在flink的job开发过程中,我们整体会经历三个阶段,分别是:source -> Transform -> sink。今天我们就来介绍下这个sink,即把数据写入某个地方。
在flink的job中,我们经过各种算子的操作后会得到我们想要的结果数据,那么最后我们肯定是需要把数据写入到我们定义的目标中。这里我们常见的把数据写入的目标有:
1、mysql 2、redis 3、kafka 4、elasticsearch 5、文件 等等。。。。
在flink中自带有一些sink,例如:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> dataSource = environment.addSource(new MySources());
dataSource.writeAsCsv("/aaaa"); //把数据写成csv格式的文件
dataSource.writeAsText("/aaaa") //把数据写成text格式的文件
dataSource.writeToSocket("192.168.31.30", 9999, new TextSerializationSchema()); //把数据写入到socket中
dataSource.print(); //把数据打印到控制台进行显示当然flink的sink和source都一样,网上有大量的开源第三方依赖,可以直接拿来即用,同时我们也可以自定义sink,那么如何操作呢?
1、第一种方式是集成自RichSinkFunction类,示例代码如下:
package com.flink.demo.sources;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import com.flink.demo.RsPoJo;
public class MysqlSink extends RichSinkFunction<RsPoJo>{
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void invoke(RsPoJo value, Context context) throws Exception {
super.invoke(value, context);
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
}
@Override
public void close() throws Exception {
super.close();
}
}在这个方法里面,我们主要实现invoke(),open(),close()方法。
2、第二种方式是实现SinkFunction类,示例代码如下:
package com.flink.demo.sources;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import com.flink.demo.RsPoJo;
public class RedisSink implements SinkFunction<RsPoJo>{
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void invoke(RsPoJo value, Context context) throws Exception {
// TODO Auto-generated method stub
SinkFunction.super.invoke(value, context);
}
}在这个方法里面,我们主要实现invoke()方法即可
最后个人推荐使用第一种方法,有完整的生命周期,在开发的时候,逻辑比较清晰明了。









还没有评论,来说两句吧...