上文《实时流存储fluss学习系列(六)Fluss日志表介绍及使用教程》已经介绍了fluss中日志表的使用,这里再介绍另外一种主键表。
主键表的顾名思义就是当前创建的表有主键,和mysql的使用是相似的。同时主键表与日志表的不同点有:
1、主键表的每一张表中都有主键。 2、主键表的每一张表不仅支持插入,还支持更新和删除。 3、主键表的每一张表还支持部分列的更新。
下面我们也来演示下主键表相关的数据。
一、创建表
创建一张用户表,带有主键和分区:
#创建多数据源目录,指向fluss CREATE CATALOG my_fluss WITH ( 'type' = 'fluss', 'bootstrap.servers' = 'coordinator-server:9123' ); #使用fluss多数据源目录 USE CATALOG my_fluss; #创建users表 CREATE TABLE users ( user_id INT, user_name STRING, user_age INT, user_birthday string, register_year string, PRIMARY KEY (user_id,register_year) NOT ENFORCED )PARTITIONED BY (register_year) WITH ( 'table.auto-partition.enabled' = 'true', 'table.auto-partition.time-unit' = 'YEAR', 'table.auto-partition.num-precreate' = '5', 'table.auto-partition.num-retention' = '20', 'bucket.num' = '2', 'bucket.key'='user_id' );
执行上面的语句,我们的users表就创建好了
二、插入数据
接下来演示下插入数据,这里还是选择硬插入数据:
insert into users values(1,'张三',10,'1998-01-01','2024'); insert into users values(2,'李四',11,'1999-01-01','2025'); insert into users values(3,'王五',12,'2000-01-01','2026'); insert into users values(4,'赵六',13,'2000-01-01','2027');
三、查询数据
演示下查询数据
select * from users;
可以看到数据被创建出来了。
备注:
1、查询fluss里面的数据只能使用流模式进行更新,所以需要设置为流模式,例如:
SET 'execution.runtime-mode' = 'streaming';
四、更新数据
接下来演示下更新数据,这里更新数据分为两部分,分别是使用update更新和row更新。
1)使用update更新
这里update更新的话需要注意两点:
1、把flink-sql的模式修改为batch模式,不能是streaming模式。 2、更新的时候where后面必须带有所有的主键字段,缺一不可。
比如这里我们把id=3的用户年龄修改为16,执行的sql语句如下:
#设置为batch模式 SET 'execution.runtime-mode' = 'batch'; #更新语句 update users set user_age = 16 where user_id = 3 and register_year='2026';
执行完毕就可以看到更新成功了(注意这里查询的时候需要把模式切换回流模式:SET 'execution.runtime-mode' = 'streaming';)
2)部分列更新
这里部分列更新的话,其实不是说的更新,而是只插入,也就是比如users表有5个字段,根据主键第一次插入2个字段,第二次插入3个字段,最后fluss会把这5个字段的值根据主键进行合并形成完整的一条数据,下面演示下:
#插入id=7,分区为=2027的用户姓名 insert into users(user_id,user_name,register_year) values(7,'田七','2027'); #查询结果 select * from users;
#插入id=7,分区为=2027的用户年龄和生日 insert into users(user_id,user_age,user_birthday,register_year) values(7,13,'2000-01-01','2027'); #查询结果 select * from users;
#插入id=7,分区为=2027的用户年龄=20 insert into users(user_id,user_age,register_year) values(7,20,'2027'); #查询结果 select * from users;
可以数据被插入合并了,年龄也被修改了,这里的部分列更新其实就是分部分进行字段值得新增或者修改,fluss会默认根据主键进行合并。使用的语法主要是insert而不是update。
五、删除数据
这里的删除数据就比较简单了,主要是使用delete语句删除即可
#切换为batch模式 SET 'execution.runtime-mode' = 'batch'; #删除数据 delete from users where user_id=7 and register_year='2027';
以上就是关于fluss主键表相关的介绍和使用教程。
备注:
1、需要注意在update和delete的时候需要把flinksql的模式修改为batch模式,查询的时候只能使用streaming模式。
2、主键表在进行update或者delete的时候要添加where,where条件之后必须要包含所有的主键字段,缺一不可。
还没有评论,来说两句吧...