上一篇文章《Dinky 实时计算平台系列(十九)FlinkSql作业开发之FlinkSql-Kafka》我们介绍了flinksql中kafka的connector,在实际的业务中我们还会涉及到的有mysql的cdc,本文的话我们介绍下cdc。下面直接开始
1)准备一个mysql
这里mysql的话,我们也有相关的安装文档,大家看下即可,详见:《centos如何使用docker安装mysql?》。
2)为mysql开启binlog
mysql开启binlog,这里我们也有文章介绍过,详见:《mysql开启binlog需要配置哪些参数》。
3)为mysql创建一个子账户
这里我们尽量避免使用root账户进行数据同步,尽量创建一个子账户,具体使用命令如下:
CREATE USER slave1 IDENTIFIED BY 'slave1'; GRANT ALL PRIVILEGES ON *.* TO 'slave1'@'%'; FLUSH PRIVILEGES;
4)下载flink-sql-mysql-cdc的依赖包
这里和kafka一样,我们需要下载对应的flink-sql-mysql-cdc的依赖包,下载地址是:
https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.1/flink-sql-connector-mysql-cdc-2.4.1.jar
下载之后,我们把它放到这两个文件夹中,分别是:${Dinky_HOME}/plugins/flink1.17/dinky/和${Flink_home}/lib/目录下,如下图:
备注:
这里其实还需要一个mysql-connector-java-xxx.jar,但是由于flink已经自带了,所以不需要再单独下载了。
5)进入mysql创建一张users的表
CREATE TABLE `users` ( `id` int(11) NOT NULL, `name` varchar(255) DEFAULT NULL COMMENT '姓名', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
创建成功之后就可以看到这张表了
6)编写flink-sql作业
然后我们登陆到dinky平台,进入到数据开发模块,创建一个名为test3的作业,然后填充如下的flink-sql
CREATE TABLE IF NOT EXISTS users ( id STRING, name STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '192.168.31.218', 'port' = '3306', 'username' = 'slave1', 'password' = 'slave1', 'database-name' = 'test', 'table-name' = 'users' ); select * from users;
示例图如下:
7)运行flink-sql-cdc的作业
这里我们把这个作业给运行起来
然后我们去mysql数据库中添加两条数据
然后再回到dinky平台中,点击获取最新的数据:
就可以看到具体的数据了。
以上就是flink-sql-mysql-cdc数据同步的案例。
还没有评论,来说两句吧...