现各公司都在建设数仓,尤其现在的实时数仓,所以对于CDC的需求是非常硬性的,CDC在我们之前也有使用,因此准备更新下CDC相关的文章,本篇我们介绍Canal的服务端安装并且使用JAVA客户端读取数据变更。
一、首先说下canal是什么?
Canal是一个专注mysql实时同步的一个工具,这个工具其实主要利用的原理是mysql的主从复制原理。即master-slave的架构。canal启动后,把自己伪装成mysql的slave角色,然后向master发起dump请求,master接收到dump请求后,给canal推送binlog。此时canal接收到binlog之后对binlog进行解析就能得到mysql的master服务节点的实时数据更新情况。下面的图示展示的是canal做数据同步的流程信息:
二、canal的下载
canal主要是阿里巴巴开源的,因此我们在阿里巴巴的github页面就能找到canal的的下载信息。页面地址是:https://github.com/alibaba/canal/releases
这里最新的文档版本是1.1.6版本。
所以下面的演示部署主要以1.1.6版本为主。
三、canal服务端安装
3.1、下载canal1.1.6版本
这里我们已经下载好了,并且安装。
3.2、在mysql中创建用于同步的账户,并且分配权限
CREATE USER slave1 IDENTIFIED BY 'slave1'; GRANT ALL PRIVILEGES ON *.* TO 'slave1'@'%'; FLUSH PRIVILEGES;
这里我们的权限一般我在生产里面不要配置all权限,根据实际情况配置即可。这里是演示,因此我们配置的是all权限。
3.3、配置配置文件
在canal里面,配置文件的地址是在${canal_home}/conf目录,这里我们主要的目的是为了使用java读取客户端的变更,因此我们需要在服务端直接启动canal的server即可,所以我们需要在canal的配置文件里面配置下我们的mysql信息,这里的配置文件是:$canal/conf/example/instance.properties
主要修改的内容是:
# position info canal.instance.master.address=127.0.0.1:3306 canal.instance.master.journal.name=mall-mysql-bin.000003 canal.instance.master.position=740 canal.instance.master.timestamp= canal.instance.master.gtid= # username/password canal.instance.dbUsername=slave1 canal.instance.dbPassword=slave1
这上面的配置binlog文件和position怎么来呢?其实就是在mysql端,我们执行下命令:
show master status
把这里面的内容复制进去即可。
3.4、启动canal服务端
进入到${canal}/bin目录下,执行:
./startup.sh
然后我们执行下jps命令即可看到CanalLauncher进程:
此时就代表canal启动完成了。
备注:canal是一个java客户端,所以我们需要在canal的服务器上部署jdk版本。
四、编写java程序连接canal服务端读取mysql的变化
4.1、查看canal服务端的监听端口
在上一个步骤我们启动了canal的服务端,我们看下canal启动了哪些端口监听,执行下对应的命令
这里我们看到启动了11110端口,11111端口,11112端口。
4.2、创建maven项目,并且引入依赖
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.0</version> </dependency>
4.3、编写监听类进行监听,同时打印出来相关的变更数据
package com.canal.demo;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class Application {
public static void main(String[] args) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("1.13.172.231", 11111),
"example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
log.info("正在接受数据库变更信息。。。");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
}
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
log.error(e.getMessage());
}
EventType eventType = rowChage.getEventType();
log.info("当前监听到的binlog变化文件是:{}", entry.getHeader().getLogfileName());
log.info("当前监听到的binlog对应修改的数据库名是:{}", entry.getHeader().getSchemaName());
log.info("当前监听到的binlog对应修改的表名是:{}", entry.getHeader().getTableName());
log.info("当前监听到的binlog对应操作表的操作类型是:{}", eventType);
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
log.info("正在执行delete语句,原row对应的数据是:{}",
JSON.toJSONString(getColumRs(rowData.getBeforeColumnsList())));
} else if (eventType == EventType.INSERT) {
log.info("正在执行insert语句,原row对应的数据是:{}",
JSON.toJSONString(getColumRs(rowData.getAfterColumnsList())));
} else {
// 这里就代表执行了update语句,因此我们可以获取到变更之前的内容和变更之后的内容
log.info("正在执行update语句,原row对应的数据是:{}",
JSON.toJSONString(getColumRs(rowData.getBeforeColumnsList())));
log.info("正在执行update语句,原row对应修改后的数据是:{}",
JSON.toJSONString(getColumRs(rowData.getAfterColumnsList())));
}
}
}
}
/**
* 这里把对应变化的字段打印出来。
*
* @param columns
*/
private static HashMap<String, Object> getColumRs(List<Column> columns) {
HashMap<String, Object> rs = new HashMap<String, Object>();
for (Column column : columns) {
rs.put(column.getName(), column.getValue());
}
return rs;
}
}4.4、启动下测试
1、启动这个java类
2、去数据库里面操作下数据。
然后我们可以看到控制台的输出如下:
这数据是非常实时的,数据库只要有变更,canal-client这边就会立马感知到数据的变化,在实时数据变更做数仓的环境下,这是非常好使的。
最后我们当然还是要把源码附带上,登录即可看到java端的源码。

















还没有评论,来说两句吧...