在日常的业务场景中,我们使用flink做cdc的工作的时候,大多数都有这样的需求,把mysql的数据近视的导入到doris中做分析。为了方便简写代码,这里我们使用Dinky来演示下把mysql的数据使用cdc同步到doris中去,下面直接开始演示:
1)下载flink-doris-connector依赖包
这里我们需要使用flink-doris-connector的依赖包,所以这里的话,我们需要去官网把这个依赖包下载后放到flink的执行环境中去,下载地址是:flink-doris-connector官网下载。这里我的flink版本是1.17的,所以我们下载的是1.17版本的,如下图:
下载完毕之后,我们把这个依赖包放到${flink_home}/lib/目录下,再重启下flink集群。
2)下载mysql cdc的依赖包
这里同样的我们需要把flink-sql-connector-mysql-cdc的依赖包下载下来,放到flink的${flink_home}/lib目录下,flink-sql-connector-mysql-cdc依赖包的下载地址是:flink-sql-connector-mysql-cdc依赖下载。
3)下载mysql-connector-java依赖包
这里我们还要下载mysql-connector-java依赖包,把这个依赖包同样的放到flink的${flink_home}/lib目录下,mysql-connector-java依赖包的下载地址是:mysql-connector-java依赖包下载.
4)准备一个mysql
这里我们需要准备一个mysql,并且开启binlog,这里的演示我们就省略了。
5)准备一份示例sql
这里我们准备一份示例sql,用来做后面的同步演示,例如我们这里的示例数据表是:
这里的示例表数据会在文末提供。
6)doris创建对应的表
这里既然我们想要把mysql的数据同步到doris中去,那么肯定是需要在doris中创建一张相同的表,这里的建表语句是:
CREATE TABLE `township` ( `id` int(11) NOT NULL, `province` varchar(60) NULL, `province_code` varchar(60) NULL, `city` varchar(60) NULL, `city_code` varchar(60) NULL, `area` varchar(60) NULL, `area_code` varchar(60) NULL, `town` varchar(60) NULL, `town_code` varchar(60) NULL ) ENGINE=OLAP UNIQUE KEY(`id`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`) BUCKETS 1 PROPERTIES ( "replication_allocation" = "tag.location.default: 1" );
这里需要注意几个问题:
1、如果使用cdc同步到doris的话,会涉及到mysql的新增,修改和删除,那么doris中只能使用unique类型的表。 2、mysql的varchar字段如果是20的话,那么doris中需要*3,也就是60,这个是一定要注意的问题。
7)进入dinky创建作业
这里我们进入到dinky的页面上,点击数据开发模块
然后在下面创建一个mysql-2-doris名称的作业:
然后把下面的sql语句复制进去:
-- 开启checkpoint SET 'execution.checkpointing.interval' = '30s'; CREATE TABLE source_township ( id int NOT NULL, province STRING, province_code STRING, city STRING, city_code STRING, area STRING, area_code STRING, town STRING, town_code STRING, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '192.168.31.218', 'port' = '33306', 'username' = 'root', 'password' = '123456', 'database-name' = 'test3', 'table-name' = 'township' ); CREATE TABLE doris_township_sink ( id int NOT NULL, province STRING, province_code STRING, city STRING, city_code STRING, area STRING, area_code STRING, town STRING, town_code STRING ) WITH ( 'connector' = 'doris', 'fenodes' = '192.168.31.218:8030', 'table.identifier' = 'demo.township', 'username' = 'root', 'password' = 'root123@.', 'sink.label-prefix' = 'doris_label_township2' ); INSERT INTO doris_township_sink select id,province,province_code,city,city_code,area,area_code,town,town_code from source_township;
在这个sql中我们主要做的事情有:
1、开启checkpoint 2、创建一张source_township的source表,配置的属性是mysql-cdc 3、创建一张doris_township_sink的sink表,配置的属性是doris。 4、从source_township表中查询数据,再插入到doris_township_sink表中。
当数据放入到sink表之后,数据就会自然落地到doris中去了。然后我们测试一下,点击上面的运行按钮
然后我们区看看doris中是否有数据:
可以看到doris中的历史数据都已经进来了。此时我们区mysql中修改一条数据:
然后我们再去看看doris中数据是否修改了:
可以看到数据已经进来了,说明同步没有任何问题。这样子可以达到的效果是把对应表的历史数据都同步到doris中,并且后续的数据也会自动更新到doris中,是不是很简单?
备注:
1、这里需要注意doris对应的varchar长度*3的问题。
2、这里需要注意在Flink sql中创建doris sink表的属性里面的label问题。这里的label不能重复。
最后按照惯例,附上本案例的示例sql,登录后即可下载。
还没有评论,来说两句吧...