在flink sql开发中,我们对于数据源来说,首先需要做的就是create table,也就是创建一张表,在flink sql中关于create table的语句和传统的mysql中创建表的是不完全一样,也就是只有一部分是一样的。在flink sql中创建表的话,完整的语法如下:
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name ( { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n] [ <watermark_definition> ] [ <table_constraint> ][ , ...n] ) [COMMENT table_comment] [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)] WITH (key1=val1, key2=val2, ...) [ LIKE source_table [( <like_options> )] | AS select_query ] <physical_column_definition>: column_name column_type [ <column_constraint> ] [COMMENT column_comment] <column_constraint>: [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED <table_constraint>: [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED <metadata_column_definition>: column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ] <computed_column_definition>: column_name AS computed_column_expression [COMMENT column_comment] <watermark_definition>: WATERMARK FOR rowtime_column_name AS watermark_strategy_expression <source_table>: [catalog_name.][db_name.]table_name <like_options>: { { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS } | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS } }[, ...]
下面我们挨个来剖析一下这个语法:
1)关于columns
这里的columns主要是创建表的时候指定列,这里指定列的语法和传统的mysql是一样的,例如:
CREATE TABLE users( `user_id` BIGINT, `name` STRING ) WITH ( ... );
在列里面除了指定对应的类型之外,还可以指定metadata,这里的metadata是flink sql中的扩展,主要是用metadata关键词指定记录中的某个字段,例如我们需要基于时间流来处理数据,数据从kafka中读取,那么我们就需要使用metadata关键词来指定一个事件的字段,以方便后续的处理,例如:
CREATE TABLE users ( `user_id` BIGINT, `name` STRING, `login_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' ) WITH ( 'connector' = 'kafka' ... );
同时我们也可以直接指定某个字段为metadata,例如:
CREATE TABLE users ( `user_id` BIGINT, `name` STRING, `timestamp` TIMESTAMP_LTZ(3) METADATA ) WITH ( 'connector' = 'kafka' ... );
在columns里面我们还可能会涉及到窗口函数,也就是需要指定某个字段为水位线的时间字段,例如:
CREATE TABLE users ( `user_id` BIGINT, `name` STRING, `timestamp` TIMESTAMP_LTZ(3) METADATA, WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka' ... );
在column里面同样还有一个东西就是主键,和传统的mysql是一样的,它主要是也是flink table sql里面某一列值唯一并且不包含null。例如:
CREATE TABLE users ( `user_id` BIGINT, `name` STRING, `timestamp` TIMESTAMP_LTZ(3) METADATA, WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND, PRIMARY KEY(user_id) NOT ENFORCED ) WITH ( 'connector' = 'kafka' ... );
2)with options
在创建表的时候,后面我们会带有一个with,这里主要是填写一些底层的连接器的信息。在flink中,底层不存储数据,他只是一个计算框架,包括这里的flink table也是一样的,这里的with可以是数据来源的数据源,也可以是写入存储的sink的数据源。with示例有:
CREATE TABLE users ( `user_id` BIGINT, `name` STRING, `timestamp` TIMESTAMP_LTZ(3) METADATA, WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND, PRIMARY KEY(user_id) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'scan.startup.mode' = 'earliest-offset' ... )
3)like
这里的like我们不是介绍的sql语句里面的模糊查询,而是一种相识度的创建表,例如我们想创建一张users1的表,users1的表里面大部分字段和users都是相同的,那么使用like创建表的示例如下:
CREATE TABLE users1( WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND ) WITH ( 'scan.startup.mode' = 'latest-offset' ) LIKE users;
在like里面也有一些配置项,例如:
#新表包含源表(source table)所有的表属性,如果和源表的表属性重复则会直接失败,例如新表和源表存在相同 key 的属性。 including #新表不包含源表指定的任何表属性 excluding #新表包含源表的表属性,但如果出现重复项,则会用新表的表属性覆盖源表中的重复表属性,例如,两个表中都存在相同 key 的属性,则会使用当前语句中定义的 key 的属性值。 overwriting
带有like的options的参数有:
CREATE TABLE users1( WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND ) WITH ( 'scan.startup.mode' = 'latest-offset' ) LIKE users( -- 排除需要生成 watermark 的计算列之外的所有内容。 EXCLUDING ALL -- 去除不适用于 kafka 的所有分区和文件系统的相关属性。 INCLUDING GENERATED );
当然除了以上的最通用的创建表之外,还有一种as select_statement的方式来创建表。他主要以查询出来的结果对数据进行建表,示例sql如下:
CREATE TABLE users2 WITH ( 'connector' = 'kafka', ... ) AS SELECT id, name, age FROM users WHERE mod(id, 10) = 0;
以上就是创建表的常用的集中方式。
还没有评论,来说两句吧...