承接上篇《Flink学习系列(八)flink的transform介绍》。之前的文章我们已经介绍过了flink的一些简单应用,从这篇开始,我们会开始介绍一些flink的进阶知识点。本篇我们介绍下flink的cdc操作。
一、cdc是什么
cdc我们可以理解为可以检测某个对象的数据变更记录的工具。说的再形象点就是有一个工具,在启动的时候就可以开启某个对象的监听,这个对象有任何操作变更的时候,这个工具都能感知到,所以这样的工具我们把他称为cdc。
二、常用的cdc工具
大家在工作中常用到的cdc的工具有阿里巴巴的cannel,他是一个监听mysqlbinlog变更的工具。还有一个是readhat出厂的工具:Debezium 。这个工具一般会搭配kafka使用。
备注:我们在工作中或者面试过程中经常听到的一般都是cannel,但是Debezium工具在大数据的环境里面的使用范围才是最大的。资深的大数据工程师都会对Debezium有了解。
三、flink的cdc是什么
Flink在前面我们已经介绍过了,他只是一个分布式计算框架,但是为什么大家会听到有flinkcdc呢,这里我们回忆一下上面介绍的Debezium和cannel工具。
例如我们有一个场景,需要监听mysql表的变化,如果mysql的某个库的某张表里面有变化,我们需要把对应的信息进行解析处理,然后把数据写入新的mysql。这时候我们一般怎么做呢?那肯定是先使用cancel或者Debezium监听mysql的binlog。然后把数据处理后发送到kafka,然后再使用消费者从kafka里面获取数据进行业务操作。
相信熟悉业务的人都知道是这个流程吧。
那么flink长度cdc是什么呢? 其实flink的cdc就是一个自定义的datasource,他里面集成了debezium和kafka的功能,简化了我们开发和运维部署debezium和kafka工具的工作。使得我们在flink里面初始化cdc的配置里面进行下配置,然后我们就可以直接读取到mysql binlog这样的数据。
四、flink的cdc支持哪些数据源
flink的cdc不仅仅支持mysql,还支持其他的数据源,在这里我们列举一下
序号 | 数据源 | 数据源支持版本 |
1 | debezium | |
2 | mongodb | 3.6,4.x,5.0 |
3 | mysql | 5.6,5.7,8.0 |
4 | oceanbase | 3.1.x |
5 | oracle | 11,12,19 |
6 | postgresql | 9.6,10,11,12 |
7 | sqlserver | 2012,2014,2016,2017,2019 |
8 | tidb | 5.1.x,5.2.x,5.3.x,5.4.x,6.0.0 |
五、flink的cdc怎么使用
这里我们以mysql为演示:
1、maven pom里面导入
<dependency> <groupId>com.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>2.0.0</version> </dependency>
2、代码如下
package com.fututebytedance; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import com.ververica.cdc.connectors.mysql.MySqlSource; import com.ververica.cdc.debezium.DebeziumSourceFunction; import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema; public class FlinkCDC { public static void main(String[] args) throws Exception { // 1.获取Flink 执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 2.通过FlinkCDC构建SourceFunction DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder().hostname("192.168.31.30") .port(3307).username("root").password("123456").serverTimeZone("UTC").databaseList("test") .tableList("test.user").deserializer(new StringDebeziumDeserializationSchema()) // .startupOptions(StartupOptions.initial()) .build(); DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction); // 3.数据打印 dataStreamSource.print(); // 4.启动任务 env.execute("FlinkCDC"); } }
3、然后我们操作数据库即可看到对应的结果
4、完整的我们结果我们粘贴出来
SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1656488885, file=mysql-bin.000004, pos=3168, row=1, server_id=123654, event=2}} ConnectRecord{topic='mysql_binlog_source.test.user', kafkaPartition=null, key=Struct{id=9}, keySchema=Schema{mysql_binlog_source.test.user.Key:STRUCT}, value=Struct{after=Struct{id=9,username=田七,userage=22,sex=1},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1656488885000,db=test,table=user,server_id=123654,file=mysql-bin.000004,pos=3292,row=0},op=c,ts_ms=1656488887035}, valueSchema=Schema{mysql_binlog_source.test.user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)} SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={transaction_id=null, ts_sec=1656488919, file=mysql-bin.000004, pos=3441, row=1, server_id=123654, event=2}} ConnectRecord{topic='mysql_binlog_source.test.user', kafkaPartition=null, key=Struct{id=9}, keySchema=Schema{mysql_binlog_source.test.user.Key:STRUCT}, value=Struct{before=Struct{id=9,username=田七,userage=22,sex=1},source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1656488919000,db=test,table=user,server_id=123654,file=mysql-bin.000004,pos=3565,row=0},op=d,ts_ms=1656488920668}, valueSchema=Schema{mysql_binlog_source.test.user.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
备注:flink的cdc里面mysql的我们使用最多,但是目前只能检测到insert,update,delete,像alter这样的sql监听不到。
还没有评论,来说两句吧...