现如今大部分ToC的业务中都会有一些建数仓的需求,所以对于流批一体化建设数仓的需求非常多。目前建立流批一体化的数仓方案主要以Flinkcdc+doris为主(大多数企业目前几乎都在使用此方案)。所以这里我们也来介绍下当前的这个方案。
在做流批一体化数仓的时候,一般我们在线部分的数据大部分都是从mq或者cdc这样的方式来的,所以例如线上业务型的数据一般都主要以mysql为主。那么flinkcdc的是使用频率就非常高。
目前网上使用flinkcdc同步数据到doris得话,大部分都是使用flinksql进行同步得,此时如果对于mysql上有数据库表结构变更得话,我们就需要使用修改doris相关得库表信息,同时修改flinksql的作业,再进行重启。是不是很麻烦。
在doris相关的starrocks其实也提供了相关的方案,就是使用smt工具,具体的可以参考:
https://docs.starrocks.io/zh/docs/loading/Flink_cdc_load/#%E5%90%8C%E6%AD%A5%E5%BA%93%E8%A1%A8%E7%BB%93%E6%9E%84
今天我们介绍的主要是编写一个flinkcdc的job任务,可以实现的目的是:
1、实现mysql的整库cdc同步到doris中 2、mysql库里面表结构更新之后,不用手动去doris修改表结构,可以实现自动的数据同步 3、mysql库里面表结构更新之后,flinkjob不用做任何修改,也不用重启,可以实现无缝同步,包括新增字段的数据同步。
下面我们就来演示一下:
1)准备mysql
这里我们准备一个mysql示例,同时需要开启binlog的配置。在本站我们已经写过对应的文章《mysql开启binlog需要配置哪些参数》。大家可以参考一下,这里我们假设已经部署好了一个开启binlog的mysql实例:
2)准备一个doris的环境
这里我们准备一个doris的环境,演示的话,使用一个单机就好
3)mysql创建库表
这里我们首先在mysql中创建一个演示的库和表,方便一会做数据演示,创建的库名称为test,创建的表名称为user,对应的建表语句是:
CREATE TABLE `user` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8; INSERT INTO `test`.`user`(`id`, `name`) VALUES (1, '张三'); INSERT INTO `test`.`user`(`id`, `name`) VALUES (2, '李四');
创建好了,示例图如下:
4)doris创建库表
这里我们在doris里面同样创建一个名称为test和user的表,主要是为了和mysql对应起来,创建的示例语句如下:
create databse test; use test; CREATE TABLE `user` ( `id` int(11) NULL COMMENT '唯一id', `name` varchar(20) NULL, `age` tinyint(4) NULL, `sex` tinyint(4) NULL ) ENGINE=OLAP UNIQUE KEY(`id`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`) BUCKETS 1 PROPERTIES ( "replication_allocation" = "tag.location.default: 1", "is_being_synced" = "false", "storage_format" = "V2", "light_schema_change" = "true", "disable_auto_compaction" = "false", "enable_single_replica_compaction" = "false" );
创建好了之后,如下图:
6)创建flinkcdc job
接着我们就来重头戏了,就是编写一个java的项目,实现这个flink cdc的job就可以完成,整体的思路如下:
1、创建一个maven项目 2、引入相关的依赖 3、编写MysqlCdcJob任务。
引入的maven依赖如下:
<dependencies> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>1.18.0</version> </dependency> <!--mysql --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.47</version> </dependency> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-sql-connector-mysql-cdc</artifactId> <version>2.4.2</version> </dependency> <!-- flink-doris-connector --> <dependency> <groupId>org.apache.doris</groupId> <artifactId>flink-doris-connector-1.16</artifactId> <version>1.4.0</version> </dependency> <dependency> <groupId>org.apache.doris</groupId> <artifactId>thrift-service</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>1.15.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>1.18.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-bridge --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge</artifactId> <version>1.18.0</version> <scope>provided</scope> </dependency> <!-- SLF4J依赖,编译时和运行时都需要 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.26</version> </dependency> <!-- Logback依赖,运行时需要 --> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>1.2.3</version> <scope>runtime</scope> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.3</version> <scope>runtime</scope> </dependency> </dependencies>
然后编写cdcjob,这里的示例代码的话在文末提供下载,直接下载即可。整个job的代码很简单,整个job的代码只有81行,示例图如下:
整个项目的示例图如下:
然后我们把项目运行起来:
此时我们由于mysql里面已经有两条示例数据了,所以我们看看doris中是否已经同步过来了这两条数据:
可以看到flink的cdc是成功的,并且数据已经同步到doris中了。
接着我们演示下mysql新增字段,在mysql上新增一个sex字段,示例如下:
这里我们使用navicat直接新增的,当然也可以使用alter语句进行配置。然后我们看看217mysql的情况,然后再看看doris的情况:
可以看到mysql上新增了sex字段,doris自动新增了sex字段。可以看到整个flinkjob已经把这个新增的字段同步过来了,实现了自动doris的表结构变更。flink上的执行日志如下:
接着我们给id为1的数据赋予sex的值,看下是否可以同步过去:
然后点击保存,看啊可能doris的sex字段是否同步进来了:
可以看到完全没有问题,然后我们再在mysql中新增一条王五的记录,然后再看下doris:
也同步过来了,接着我们在mysql中删除sex字段,再看看效果:
可以看到doris也同步的删除了sex字段。是不是非常方便。全程没有重启或者改动这个flinkjob的任何操作:
以上就是使用Flink CDC同步整库的数据到doris,能实现自动变更库表数据结构。最后按照惯例,附上本案例的源码,登陆后即可下载。
还没有评论,来说两句吧...