在flink的job开发过程中,我们整体会经历三个阶段,分别是:source -> Transform -> sink。今天我们就来介绍下这个sink,即把数据写入某个地方。
在flink的job中,我们经过各种算子的操作后会得到我们想要的结果数据,那么最后我们肯定是需要把数据写入到我们定义的目标中。这里我们常见的把数据写入的目标有:
1、mysql 2、redis 3、kafka 4、elasticsearch 5、文件 等等。。。。
在flink中自带有一些sink,例如:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataSource = environment.addSource(new MySources()); dataSource.writeAsCsv("/aaaa"); //把数据写成csv格式的文件 dataSource.writeAsText("/aaaa") //把数据写成text格式的文件 dataSource.writeToSocket("192.168.31.30", 9999, new TextSerializationSchema()); //把数据写入到socket中 dataSource.print(); //把数据打印到控制台进行显示
当然flink的sink和source都一样,网上有大量的开源第三方依赖,可以直接拿来即用,同时我们也可以自定义sink,那么如何操作呢?
1、第一种方式是集成自RichSinkFunction类,示例代码如下:
package com.flink.demo.sources; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import com.flink.demo.RsPoJo; public class MysqlSink extends RichSinkFunction<RsPoJo>{ /** * */ private static final long serialVersionUID = 1L; @Override public void invoke(RsPoJo value, Context context) throws Exception { super.invoke(value, context); } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); } @Override public void close() throws Exception { super.close(); } }
在这个方法里面,我们主要实现invoke(),open(),close()方法。
2、第二种方式是实现SinkFunction类,示例代码如下:
package com.flink.demo.sources; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import com.flink.demo.RsPoJo; public class RedisSink implements SinkFunction<RsPoJo>{ /** * */ private static final long serialVersionUID = 1L; @Override public void invoke(RsPoJo value, Context context) throws Exception { // TODO Auto-generated method stub SinkFunction.super.invoke(value, context); } }
在这个方法里面,我们主要实现invoke()方法即可
最后个人推荐使用第一种方法,有完整的生命周期,在开发的时候,逻辑比较清晰明了。
还没有评论,来说两句吧...