上一篇文章《Flink流处理系列(三)flink cdc读取mysql binlog》我们介绍了一下使用flink的cdc来读取mysql。这篇文章我们的案例代码还是在上一篇的基础上改动即可。
在实际的业务中,我们经常会涉及到多数据源数据的读取,特别是在数仓的ETL环节,所以对于使用flink来说,我们几乎都会涉及到从多个connector里面读取,每一个connector可能都是不同的,例如一部分数据从mysql来,一部分数据从mq来。所以这种多数据源的场景使用是非常广泛的。这篇文章我们就来演示一下使用flink从多数据源读取数据。
特殊说明
由于演示环境的问题,我们这里的多数据源拟定的场景是从多个mysql里面读取binlog,由于演示的环境没有mq,因此暂时不费时间了。
代码示例
其实flink从多数据源读取数据很简单,就是简历多个数据通道即可,例如上一篇文章我们有UserDataStream数据源,这里我们增加一个school数据源,完整代码示例如下:
package com.flink.demo.connector;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.alibaba.fastjson.JSONObject;
import com.flink.demo.utils.TransformUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
public class SchoolDataStream {
public static DataStream<JSONObject> getTeacherDataStream(StreamExecutionEnvironment env) {
// 1.创建Flink-MySQL-CDC的Source
MySqlSource<String> teacherSouce = MySqlSource.<String>builder().hostname("192.168.31.20").port(33307)
.username("root").password("123456").databaseList("users").tableList("users.school")
.startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema())
.serverTimeZone("Asia/Shanghai").build();
// 2.使用CDC Source从MySQL读取数据
DataStreamSource<String> mysqlDataStreamSource = env.fromSource(teacherSouce, WatermarkStrategy.noWatermarks(),
"SchoolDataStreamNoWatermark Source");
// 3.转换为指定格式
DataStream<JSONObject> teacherDataStream = mysqlDataStreamSource.map(rawData -> {
return TransformUtil.formatResult(rawData);
});
return teacherDataStream;
}
}这里就相当于两个数据源的功能模块都写完了,那么我们在flink job的初始化环境里面new 两个数据源即可,完整代码如下:
package com.flink.demo.mysql;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.alibaba.fastjson.JSONObject;
import com.flink.demo.connector.SchoolDataStream;
import com.flink.demo.connector.UserDataStream;
public class MySqlCdcReader {
public static void main(String[] args) throws Exception {
// 1.创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 2.1获取user表的数据源
DataStream<JSONObject> userDataStream = UserDataStream.getTeacherDataStream(env);
// 2.2获取school表的数据源
DataStream<JSONObject> schoolDataStream = SchoolDataStream.getTeacherDataStream(env);
// 3.1打印user数据源
userDataStream.print();
// 3.2打印school数据源
schoolDataStream.print();
env.execute("mysql cdc demo");
}
}上面2.1和2.2就是我们new的两个数据源,此时就相当于我们把两个数据源都接入进来了,运行起来测试下
可以看到我们对不同数据源的mysql进行增删改查的时候,这里都会有数据打印出来。说明这两个数据源的数据都会源源不断的进入到flink中来。
备注:
1、flink处理多数据源其实非常简单,需要几个数据源,就new几个就可以了。
最后,按照惯例,附上本文的案例源码,登录后即可下载。










还没有评论,来说两句吧...