When developing with Flink SQL, creating tables is an unavoidable step, so this article collects some commonly used CREATE TABLE statement templates.
1) Creating a Kafka table in Flink SQL
Here the WITH clause specifies the kafka connector; the table can both read data from Kafka and write data to Kafka. The template is as follows:
CREATE TABLE Kafka_Table (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);
Note that the WITH clause supports several optional properties, for example:
-- Optional: 'value.fields-include' = 'ALL',
-- Optional: 'json.ignore-parse-errors' = 'true',  (only applies when 'format' = 'json')
-- Optional: 'key.fields-prefix' = 'k_',
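Once declared, the table can be queried like any other. A minimal sketch of a continuous read, counting events per user (the aggregation itself is just illustrative):

SELECT user_id, COUNT(*) AS behavior_cnt
FROM Kafka_Table
GROUP BY user_id;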
2) Creating a Doris table in Flink SQL
Here the WITH clause carries the Doris connection info, so data can be written into Doris. The template is as follows:
CREATE TABLE doris_table (
  cid INT,
  sid INT,
  NAME STRING,
  cls STRING,
  score INT,
  PRIMARY KEY (cid) NOT ENFORCED
) WITH (
  'connector' = 'doris',
  'fenodes' = '127.0.0.1:8030',
  'table.identifier' = 'test.scoreinfo',
  'username' = 'root',
  'password' = ''
);
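The Doris table is typically used as a sink. A minimal sketch, assuming a source table named score_source with matching columns (the name is illustrative):

INSERT INTO doris_table
SELECT cid, sid, NAME, cls, score
FROM score_source;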
3) Creating a JDBC table in Flink SQL
This connector mainly targets MySQL; the table can read data from MySQL and write data back to it. The template is as follows:
CREATE TABLE JDBC_table (
  id BIGINT,
  NAME STRING,
  age INT,
  STATUS BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/mydatabase',
  'table-name' = 'users',
  'username' = 'root',
  'password' = '123456'
);
Note that the WITH clause supports several optional properties, for example:
-- Optional: 'sink.parallelism' = '1',
-- Optional: 'lookup.cache.ttl' = '1000s',
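Besides plain reads and writes, a JDBC table is often used as a lookup (dimension) table joined against a stream. A minimal sketch, assuming a stream table orders with a processing-time attribute proc_time (both names are illustrative):

SELECT o.order_id, u.NAME, u.age
FROM orders AS o
JOIN JDBC_table FOR SYSTEM_TIME AS OF o.proc_time AS u
  ON o.user_id = u.id;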
4) Creating a Hive table in Flink SQL
Here the Hive connection info is provided when creating the catalog; the table can then be both read from and written to. The template is as follows:
-- Create the Hive catalog
CREATE CATALOG hive WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/app/wwwroot/MBDC/hive/conf/',
  'hadoop-conf-dir' = '/app/wwwroot/MBDC/hadoop/etc/hadoop/'
);

-- Switch to the catalog and to the Hive dialect (STORED AS is Hive-dialect syntax)
USE CATALOG hive;
SET table.sql-dialect=hive;

-- Create the Hive table
CREATE TABLE hive_stream_table (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern' = '$dt $hr:00:00',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1min',
  'sink.semantic' = 'exactly-once',
  'sink.rolling-policy.rollover-interval' = '1min',
  'sink.rolling-policy.check-interval' = '1min',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);
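To stream data into this table, switch back to the default dialect and run a regular INSERT. A minimal sketch, assuming a source table named kafka_source with an event-time column log_ts (both names are illustrative):

SET table.sql-dialect=default;

INSERT INTO hive_stream_table
SELECT
  user_id,
  order_amount,
  DATE_FORMAT(log_ts, 'yyyy-MM-dd'),  -- fills the dt partition column
  DATE_FORMAT(log_ts, 'HH')           -- fills the hr partition column
FROM kafka_source;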
5) Creating a MySQL CDC table
This one also targets MySQL, but it only reads change data from MySQL. The template is as follows:
CREATE TABLE mysql_cdc_table (
  cid INT,
  sid INT,
  cls STRING,
  score INT,
  PRIMARY KEY (cid) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '127.0.0.1',
  'port' = '3306',
  'username' = 'test',
  'password' = '123456',
  'database-name' = 'test',
  'server-time-zone' = 'UTC',
  'scan.incremental.snapshot.enabled' = 'true',
  'debezium.snapshot.mode' = 'latest-offset',  -- or use the 'scan.startup.mode' key: 'initial' reads the historical snapshot first, 'latest-offset' skips historical data
  'debezium.datetime.format.date' = 'yyyy-MM-dd',
  'debezium.datetime.format.time' = 'HH-mm-ss',
  'debezium.datetime.format.datetime' = 'yyyy-MM-dd HH-mm-ss',
  'debezium.datetime.format.timestamp' = 'yyyy-MM-dd HH-mm-ss',
  'debezium.datetime.format.timestamp.zone' = 'UTC+8',
  'table-name' = 'mysql_cdc_table'
);
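The CDC table produces a changelog stream and can be consumed like any other source. A minimal sketch that mirrors the change stream to the built-in print connector for inspection (the print_sink table is illustrative):

CREATE TABLE print_sink (
  cid INT,
  sid INT,
  cls STRING,
  score INT
) WITH (
  'connector' = 'print'
);

INSERT INTO print_sink
SELECT cid, sid, cls, score
FROM mysql_cdc_table;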
6) Creating a Hudi table
This one connects to the Hudi data lake. The template is as follows:
CREATE TABLE hudi_table (
  `goods_order_id` BIGINT COMMENT 'auto-increment primary key id',
  `goods_order_uid` STRING COMMENT 'order uid',
  `customer_uid` STRING COMMENT 'customer uid',
  `customer_name` STRING COMMENT 'customer name',
  `create_time` TIMESTAMP(3) COMMENT 'creation time',
  `update_time` TIMESTAMP(3) COMMENT 'update time',
  `create_by` STRING COMMENT 'creator uid (unique identifier)',
  `update_by` STRING COMMENT 'updater uid (unique identifier)',
  PRIMARY KEY (goods_order_id) NOT ENFORCED
) COMMENT 'hudi_table' WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://cluster1/data/bizdata/cdc/mysql/order/goods_order',  -- the path is created automatically
  'hoodie.datasource.write.recordkey.field' = 'goods_order_id',  -- record key (primary key)
  'write.precombine.field' = 'update_time',  -- on duplicate keys, keep the row with the largest value of this field; defaults to the ts field
  'read.streaming.skip_compaction' = 'true',  -- avoids duplicate consumption caused by compaction
  'write.bucket_assign.tasks' = '2',  -- number of bucket-assign tasks for concurrent writes
  'write.tasks' = '2',
  'compaction.tasks' = '1',
  'write.operation' = 'upsert',  -- UPSERT / INSERT / BULK_INSERT (upsert is slower; not a good fit for tracking-event ingestion)
  'write.rate.limit' = '20000',  -- maximum records per second
  'table.type' = 'COPY_ON_WRITE',  -- default is COPY_ON_WRITE
  'compaction.async.enabled' = 'true',  -- online compaction
  'compaction.trigger.strategy' = 'num_or_time',  -- trigger compaction by commit count or elapsed time
  'compaction.delta_commits' = '20',  -- default 5
  'compaction.delta_seconds' = '60',  -- default 1 hour
  'hive_sync.enable' = 'true',  -- enable Hive sync
  'hive_sync.mode' = 'hms',  -- sync via Hive Metastore (HMS); default is jdbc
  'hive_sync.metastore.uris' = 'thrift://cdh2.vision.com:9083',  -- required, metastore URI
  'hive_sync.jdbc_url' = 'jdbc:hive2://cdh1.vision.com:10000',  -- required, HiveServer2 address
  'hive_sync.table' = 'order_mysql_goods_order',  -- required, target Hive table; the Hudi schema and data are synced to it automatically
  'hive_sync.db' = 'cdc_ods',  -- required, target Hive database
  'hive_sync.username' = 'hive',  -- required, HMS username
  'hive_sync.password' = '123456',  -- required, HMS password
  'hive_sync.skip_ro_suffix' = 'true'  -- drop the ro suffix from synced read-optimized tables
);
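A common pattern is to pipe a CDC source (such as the one from section 5) into the Hudi table with a plain INSERT. A minimal sketch, assuming a source table goods_order_source whose columns line up with the Hudi schema (the name is illustrative):

INSERT INTO hudi_table
SELECT
  goods_order_id, goods_order_uid, customer_uid, customer_name,
  create_time, update_time, create_by, update_by
FROM goods_order_source;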
These are the CREATE TABLE templates most commonly used in Flink for connecting to external data sources.