上文《Flink CDC实战系列(一)Flink cdc 把mysql数据写入到elasticsearch》我们已经介绍了使用flink cdc把数据从mysql写入到elasticsearch。本文的话我们介绍下使用flink cdc把数据从mysql写入到kafka中。这里我们还是使用dinky进行演示,下面直接开始:
1)准备dinky环境
这里的话我们直接使用dinky1.0版本,正好做下dinky1.0版本的测试。详细的安装和使用可参考:《Dinky使用教程》。这里我们已经准备好了一个dinky的环境,使用的dinky版本是1.0.1版本,flink版本是1.18.1。
2)准备kafka环境
我们使用docker启动一个kafka实例即可,切记这里kafka启动的时候需要启动zookeeper的docker实例和kafka的docker实例,示例如下:
3)准备mysql实例
这里需要准备一个mysql的实例,切记一定要开启binlog。
4)添加flink-cdc-mysql的connector依赖
接下来我们去如下的地址下载flink-cdc-mysql的依赖包,然后把他放在dinky的flink对应依赖目录里面去。
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.1/flink-sql-connector-mysql-cdc-2.4.1.jar
5)添加flink-kafka的connector依赖
这里的kafka-connector是现成的,我们可以直接在maven repository里面进行下载,地址是:
https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka/3.1.0-1.18
下载完成之后,记得放在dinky的flink对应依赖目录里面去。
6)mysql创建库表
这里我们在mysql的test库里面创建一张users的表,示例如下:
DROP TABLE IF EXISTS `users`; CREATE TABLE `users` ( `id` int(11) NOT NULL AUTO_INCREMENT, `user_id` int(11) NOT NULL, `time` datetime NOT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 11 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic; -- ---------------------------- -- Records of users -- ---------------------------- INSERT INTO `users` VALUES (1, 1, '2024-01-24 13:14:00'); INSERT INTO `users` VALUES (2, 1, '2024-01-24 13:14:00'); INSERT INTO `users` VALUES (3, 1, '2024-01-24 13:14:00'); INSERT INTO `users` VALUES (4, 1, '2024-01-24 13:14:00'); INSERT INTO `users` VALUES (5, 8, '2024-02-14 13:14:00'); INSERT INTO `users` VALUES (6, 8, '2024-02-13 13:14:00'); INSERT INTO `users` VALUES (7, 8, '2024-03-24 13:14:00'); INSERT INTO `users` VALUES (8, 8, '2024-03-23 13:14:00'); INSERT INTO `users` VALUES (9, 8, '2024-04-13 13:14:00'); INSERT INTO `users` VALUES (10, 8, '2024-04-13 13:14:00');
这里我们弄好了,如下图:
7)开发flinksql
接下来我们就开发flinksql了,这里进入到dinky的dashboard,然后点击数据开发,然后创建一个作业,把如下的信息填写进去:
--在flink sql环境执行创建source-mysql表 CREATE TABLE users_source ( `id` int, `user_id` int, `time` timestamp, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector'='mysql-cdc', 'hostname' = '192.168.31.217', 'port' = '33306', 'username' = 'root', 'password' = '123456', 'database-name' = 'test', 'table-name' = 'users' ); --在flink sql环境执行创建kafka对应的表 CREATE TABLE users_sink( `id` int, `user_id` int, `time` timestamp, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'test.users', 'properties.bootstrap.servers' = '192.168.31.254:9092', 'properties.group.id' = 'flink-cdc-kafka-group', 'key.format' = 'json', 'value.format' = 'json' ); --执行插入动作 insert into users_sink select * from users_source;
如下图:
然后我们点击右上角的运行按钮:
然后我们进入到kafka的docker容器里面,启动一个控制台消费者:
kafka-console-consumer.sh --bootstrap-server 192.168.31.254:9092 --topic test.users
稍等片刻就可以看到已经把mysql的数据同步到了kafka里面了。
以上就是使用flink cdc把mysql的数据同步到kafka的案例。
备注:
1、这里我们把数据同步到kafka的时候一定要记得使用json的格式,方便后续的程序进行解析。
还没有评论,来说两句吧...