3 个回答
flink sql 查询示例如下:
SET 'execution.checkpointing.interval' = '3s';
CREATE TABLE mysql_cdc_users (
id INT,
username STRING,
gender STRING,
age INT,
password STRING,
register timestamp(3),
PRIMARY KEY (id) NOT ENFORCED
)
WITH
(
'connector' = 'mysql-cdc',
'hostname' = '192.168.1.249',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'test',
'server-time-zone' = 'UTC+8',
'scan.incremental.snapshot.enabled' = 'true',
'debezium.snapshot.mode' = 'initial', -- 或者key是scan.startup.mode,initial表示要历史数据,latest-offset表示不要历史数据
'debezium.datetime.format.date' = 'yyyy-MM-dd',
'debezium.datetime.format.time' = 'HH-mm-ss',
'debezium.datetime.format.datetime' = 'yyyy-MM-dd HH-mm-ss',
'debezium.datetime.format.timestamp' = 'yyyy-MM-dd HH-mm-ss',
'debezium.datetime.format.timestamp.zone' = 'UTC+8',
'table-name' = 'users'
);
select * from mysql_cdc_users;
发布于:4个月前 (12-19) IP属地:四川省
我来回答
您需要 登录 后回答此问题!