现各公司都在建设数仓,尤其现在的实时数仓,所以对于CDC的需求是非常硬性的,CDC在我们之前也有使用,因此准备更新下CDC相关的文章,本篇我们介绍Canal的服务端安装并且使用JAVA客户端读取数据变更。
一、首先说下canal是什么?
Canal是一个专注mysql实时同步的一个工具,这个工具其实主要利用的原理是mysql的主从复制原理。即master-slave的架构。canal启动后,把自己伪装成mysql的slave角色,然后向master发起dump请求,master接收到dump请求后,给canal推送binlog。此时canal接收到binlog之后对binlog进行解析就能得到mysql的master服务节点的实时数据更新情况。下面的图示展示的是canal做数据同步的流程信息:
二、canal的下载
canal主要是阿里巴巴开源的,因此我们在阿里巴巴的github页面就能找到canal的的下载信息。页面地址是:https://github.com/alibaba/canal/releases
这里最新的文档版本是1.1.6版本。
所以下面的演示部署主要以1.1.6版本为主。
三、canal服务端安装
3.1、下载canal1.1.6版本
这里我们已经下载好了,并且安装。
3.2、在mysql中创建用于同步的账户,并且分配权限
CREATE USER slave1 IDENTIFIED BY 'slave1'; GRANT ALL PRIVILEGES ON *.* TO 'slave1'@'%'; FLUSH PRIVILEGES;
这里我们的权限一般我在生产里面不要配置all权限,根据实际情况配置即可。这里是演示,因此我们配置的是all权限。
3.3、配置配置文件
在canal里面,配置文件的地址是在${canal_home}/conf目录,这里我们主要的目的是为了使用java读取客户端的变更,因此我们需要在服务端直接启动canal的server即可,所以我们需要在canal的配置文件里面配置下我们的mysql信息,这里的配置文件是:$canal/conf/example/instance.properties
主要修改的内容是:
# position info canal.instance.master.address=127.0.0.1:3306 canal.instance.master.journal.name=mall-mysql-bin.000003 canal.instance.master.position=740 canal.instance.master.timestamp= canal.instance.master.gtid= # username/password canal.instance.dbUsername=slave1 canal.instance.dbPassword=slave1
这上面的配置binlog文件和position怎么来呢?其实就是在mysql端,我们执行下命令:
show master status
把这里面的内容复制进去即可。
3.4、启动canal服务端
进入到${canal}/bin目录下,执行:
./startup.sh
然后我们执行下jps命令即可看到CanalLauncher进程:
此时就代表canal启动完成了。
备注:canal是一个java客户端,所以我们需要在canal的服务器上部署jdk版本。
四、编写java程序连接canal服务端读取mysql的变化
4.1、查看canal服务端的监听端口
在上一个步骤我们启动了canal的服务端,我们看下canal启动了哪些端口监听,执行下对应的命令
这里我们看到启动了11110端口,11111端口,11112端口。
4.2、创建maven项目,并且引入依赖
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.0</version> </dependency>
4.3、编写监听类进行监听,同时打印出来相关的变更数据
package com.canal.demo; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.List; import com.alibaba.fastjson.JSON; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; import com.alibaba.otter.canal.protocol.Message; import lombok.extern.slf4j.Slf4j; @Slf4j public class Application { public static void main(String[] args) { // 创建链接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("1.13.172.231", 11111), "example", "", ""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); int totalEmptyCount = 120; while (emptyCount < totalEmptyCount) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; log.info("正在接受数据库变更信息。。。"); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 } } finally { connector.disconnect(); } } private static void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { log.error(e.getMessage()); } EventType eventType = rowChage.getEventType(); log.info("当前监听到的binlog变化文件是:{}", entry.getHeader().getLogfileName()); log.info("当前监听到的binlog对应修改的数据库名是:{}", entry.getHeader().getSchemaName()); log.info("当前监听到的binlog对应修改的表名是:{}", entry.getHeader().getTableName()); log.info("当前监听到的binlog对应操作表的操作类型是:{}", eventType); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { log.info("正在执行delete语句,原row对应的数据是:{}", JSON.toJSONString(getColumRs(rowData.getBeforeColumnsList()))); } else if (eventType == EventType.INSERT) { log.info("正在执行insert语句,原row对应的数据是:{}", JSON.toJSONString(getColumRs(rowData.getAfterColumnsList()))); } else { // 这里就代表执行了update语句,因此我们可以获取到变更之前的内容和变更之后的内容 log.info("正在执行update语句,原row对应的数据是:{}", JSON.toJSONString(getColumRs(rowData.getBeforeColumnsList()))); log.info("正在执行update语句,原row对应修改后的数据是:{}", JSON.toJSONString(getColumRs(rowData.getAfterColumnsList()))); } } } } /** * 这里把对应变化的字段打印出来。 * * @param columns */ private static HashMap<String, Object> getColumRs(List<Column> columns) { HashMap<String, Object> rs = new HashMap<String, Object>(); for (Column column : columns) { rs.put(column.getName(), column.getValue()); } return rs; } }
4.4、启动下测试
1、启动这个java类
2、去数据库里面操作下数据。
然后我们可以看到控制台的输出如下:
这数据是非常实时的,数据库只要有变更,canal-client这边就会立马感知到数据的变化,在实时数据变更做数仓的环境下,这是非常好使的。
最后我们当然还是要把源码附带上,登录即可看到java端的源码。
还没有评论,来说两句吧...