在flink的job开发过程中,我们整体会经历三个阶段,分别是:source -> Transform -> sink。今天我们就来介绍下这个source。
在flink开发里面,不管是DataSet还是DataStream都需要去读取数据源,因此添加一个数据源是非常必要的,在flink开发里面,我们常见的添加数据源的方式是:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = environment.addSource(new MySources());
这里的数据源可以是常见的list,数组,数据库的数据,缓存的数据,消息队列的数据等等,不同的数据源有对应的数据源接入方式,我们常听的flink-connector也就是一种数据源的接入方式,例如常见的:
kafka-connector metaq-connector doris-connector 等等。
同时如果没有现成的connector的话,我们也可以自定义一个connector,自己自定义connector的方式主要是:
1、第一种方式是继承自RichSourceFunction类,同时实现对应的方法:
open() close() run() cancel()
例如:
package com.flink.demo.function;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import com.flink.demo.RsPoJo;
public class MysqlSource extends RichSourceFunction<RsPoJo> {
@Override
public void open(Configuration parameters) throws Exception {
// TODO Auto-generated method stub
super.open(parameters);
}
@Override
public void close() throws Exception {
// TODO Auto-generated method stub
super.close();
}
@Override
public void run(SourceContext<RsPoJo> ctx) throws Exception {
// TODO Auto-generated method stub
}
@Override
public void cancel() {
// TODO Auto-generated method stub
}
}2、第二种方式是实现SourceFunction接口,同时需要实现的方法是:
run() close()
示例如下:
package com.flink.demo.function;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.flink.demo.RsPoJo;
public class RedisSource implements SourceFunction<RsPoJo>{
@Override
public void run(SourceContext<RsPoJo> ctx) throws Exception {
}
@Override
public void cancel() {
// TODO Auto-generated method stub
}
}个人比较推荐继承自RichSourceFunction类,这样在代码编写起来比较直观一些。









还没有评论,来说两句吧...