这是flink流处理的第三篇文章,咱们介绍下使用flink的cdc功能,读取mysql的binlog实现数据同步。下面直接实战演示下:
一、安装mysql
安装mysql的话,,这里选择5.7或者8.x的版本都可以,建议使用mysql5.7的版本,同时安装的mysql必须要开启binlog功能,详细的mysql安装教程可以参考《如何使用docker搭建一个mysql5.7的主从环境》。
备注:
1、使用cdc读取mysql的binlog的话,在mysql里面的binlog_format一定要配置为row。不然会报错。
二、创建一个maven项目,并且引入pom依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.flink.demo</groupId> <artifactId>FStreamingCDCDemo</artifactId> <version>1.0</version> <description>Sample application generated by com.a9ski:quick-start</description> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <lombok.version>1.18.24</lombok.version> <log4j2.version>2.17.1</log4j2.version> <flink.version>1.14.4</flink.version> <scala.binary.version>2.12</scala.binary.version> </properties> <dependencies> <!-- build dependencies --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>${lombok.version}</version> <scope>provided</scope> </dependency> <!-- Logging --> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>${log4j2.version}</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>${log4j2.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.23</version> </dependency> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-mysql-cdc</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.12</artifactId> <version>1.12.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.80</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> </project>
三、定义一个枚举,体现表更改的类型
在mysql中使用cdc进行数据读取的时候,会涉及到三种方式,分别是:
第一种是数据的插入 第二种是数据的更新 第三种是数据的删除
所以我们在这里定义一个枚举类,完整代码如下:
package com.flink.demo.enu; /** * * @author Administrator * */ public enum OpEnum { /** * 新增 */ CREATE("c", "create", "新增"), /** * 修改 */ UPDATA("u", "update", "更新"), /** * 删除 */ DELETE("d", "delete", "删除"), /** * 读 */ READ("r", "read", "首次读"); /** * 字典码 */ private String dictCode; /** * 字典码翻译值 */ private String dictValue; /** * 字典码描述 */ private String description; OpEnum(String dictCode, String dictValue, String description) { this.dictCode = dictCode; this.dictValue = dictValue; this.description = description; } public String getDictCode() { return dictCode; } public String getDictValue() { return dictValue; } public String getDescription() { return description; } }
四、进行格式转化
使用cdc从mysql的binlog里读取到的原始信息是这样子的:
{"before":{"id":2,"name":"张三","age":2,"sex":"1"},"after":{"id":2,"name":"张三","age":2,"sex":"2"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1677654460000,"snapshot":"false","db":"users","sequence":null,"table":"user","server_id":1001,"gtid":null,"file":"mall-mysql-bin.000004","pos":1205,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1677654582869,"transaction":null}
但是这里完整的代码一般不是我们想要的,我们只想要更改后的,也就是after之后的数据,所以这里我们需要把数据进行转换一下,完整代码如下:
package com.flink.demo.utils; import com.alibaba.fastjson.JSONObject; import com.flink.demo.enu.OpEnum; import lombok.extern.slf4j.Slf4j; @Slf4j public class TransformUtil { /** * 格式化抽取数据格式 去除before、after、source等冗余内容 * * @param extractData 抽取的数据 * @return */ public static JSONObject formatResult(String extractData) { System.out.println("读取到的内容是:"+extractData); JSONObject formatDataObj = new JSONObject(); JSONObject rawDataObj = JSONObject.parseObject(extractData); formatDataObj.putAll(rawDataObj); formatDataObj.remove("before"); formatDataObj.remove("after"); formatDataObj.remove("source"); String op = rawDataObj.getString("op"); if (OpEnum.DELETE.getDictCode().equals(op)) { // 新增取 before结构体数据 formatDataObj.putAll(rawDataObj.getJSONObject("before")); } else { // 其余取 after结构体数据 formatDataObj.putAll(rawDataObj.getJSONObject("after")); } return formatDataObj; } }
经过这里的转换之后,我们从上面很长的json串就变成了比较简单的字符串,同时根据之前的枚举信息,我们可以知道哪些数据是做的哪些操作。转换后的json数据是:
{"op":"u","sex":"2","name":"张三","id":2,"ts_ms":1677654582869,"age":2}
这里的json就看的比较简单了,也就是进行了更新操作,并且操作时间也有。
五、编写读取对应库表的datasource
使用flink读取的时候,我们需要编写mysql的连接,伪装成一个slave节点来读取master节点的binlog文件,完整代码如下:
package com.flink.demo.connector; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import com.alibaba.fastjson.JSONObject; import com.flink.demo.utils.TransformUtil; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; public class UserDataStream { public static DataStream<JSONObject> getTeacherDataStream(StreamExecutionEnvironment env) { // 1.创建Flink-MySQL-CDC的Source MySqlSource<String> teacherSouce = MySqlSource.<String>builder().hostname("192.168.31.20").port(33307) .username("root").password("123456").databaseList("users").tableList("users.user") .startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema()) .serverTimeZone("Asia/Shanghai").build(); // 2.使用CDC Source从MySQL读取数据 DataStreamSource<String> mysqlDataStreamSource = env.fromSource(teacherSouce, WatermarkStrategy.noWatermarks(), "UserDataStreamNoWatermark Source"); // 3.转换为指定格式 DataStream<JSONObject> teacherDataStream = mysqlDataStreamSource.map(rawData -> { return TransformUtil.formatResult(rawData); }); return teacherDataStream; } }
备注:
1、在cdc里面我们可以读取任何库表的数据,但是这里我们指定了读取某个库和某张表的数据,主要是为了业务使用。
六、编写flink的job流,把结果打印出来
上面准备工作都做完了,这里我们编写一个flink的job任务来打一下输出的信息,完整代码如下:
package com.flink.demo.mysql; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import com.alibaba.fastjson.JSONObject; import com.flink.demo.connector.UserDataStream; public class MySqlCdcReader { public static void main(String[] args) throws Exception { // 1.创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 2.获取user表的数据源 DataStream<JSONObject> teacherDataStream = UserDataStream.getTeacherDataStream(env); teacherDataStream.print(); env.execute("mysql cdc demo"); } }
七、测试运行
代码编写完了,我们运行下看看效果,同时在users库的user表里面进行增删改查操作,结果如下图:
可以看到这里的话,我们完整的读取出来了我们想要的数据。
备注:
1、这个案例我们演示的是读取指定库和指定表的数据,如果不需要指定库表的话,则把对应的库表配置去掉即可,或者可查看这篇文章《Flink学习系列(九)flink的cdc介绍》。
2、这里我们有一步是把数据进行了转换,主要是因为我们后面要写文章,需要提取相关的数据出来,做其他的业务使用。所以这里会有一个转换的使用。这种场景一般应用于做在线数据分析使用,而非单纯的做数据同步,如果需要数据同步的话,可以查看《canal相关的文章》
最后附上本案例的源码,登录后即可下载。
还没有评论,来说两句吧...