在flink的job开发过程中,我们整体会经历三个阶段,分别是:source -> Transform -> sink。今天我们就来介绍下这个source。
在flink开发里面,不管是DataSet还是DataStream都需要去读取数据源,因此添加一个数据源是非常必要的,在flink开发里面,我们常见的添加数据源的方式是:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = environment.addSource(new MySources());
这里的数据源可以是常见的list,数组,数据库的数据,缓存的数据,消息队列的数据等等,不同的数据源有对应的数据源接入方式,我们常听的flink-connector也就是一种数据源的接入方式,例如常见的:
kafka-connector metaq-connector doris-connector 等等。
同时如果没有现成的connector的话,我们也可以自定义一个connector,自己自定义connector的方式主要是:
1、第一种方式是继承自RichSourceFunction类,同时实现对应的方法:
open() close() run() cancel()
例如:
package com.flink.demo.function; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import com.flink.demo.RsPoJo; public class MysqlSource extends RichSourceFunction<RsPoJo> { @Override public void open(Configuration parameters) throws Exception { // TODO Auto-generated method stub super.open(parameters); } @Override public void close() throws Exception { // TODO Auto-generated method stub super.close(); } @Override public void run(SourceContext<RsPoJo> ctx) throws Exception { // TODO Auto-generated method stub } @Override public void cancel() { // TODO Auto-generated method stub } }
2、第二种方式是实现SourceFunction接口,同时需要实现的方法是:
run() close()
示例如下:
package com.flink.demo.function; import org.apache.flink.streaming.api.functions.source.SourceFunction; import com.flink.demo.RsPoJo; public class RedisSource implements SourceFunction<RsPoJo>{ @Override public void run(SourceContext<RsPoJo> ctx) throws Exception { } @Override public void cancel() { // TODO Auto-generated method stub } }
个人比较推荐继承自RichSourceFunction类,这样在代码编写起来比较直观一些。
还没有评论,来说两句吧...