前面我们演示了使用spark操作iceberg的数据,接着我们就来演示下flink操作iceberg数据。flink操作iceberg数据的形式主要有两种,分别是:
1、datastream 2、flink sql
现如今主要有Dinky的加持,所以在日常工作中使用flink sql的形式操作iceberg会更加顺畅一点。这里我们就主要使用flink sql的形式来操作iceberg。整体操作iceberg的顺序和操作mysql是一样的,主要流程如下:
1、创建多数据源目录catalog 2、使用多数据源目录catalog 3、创建database 4、使用database 5、创建table 6、向table插入数据 7、向table查询数据 8、显示查询结果
所以这里演示的话,我们的核心代码如下:
package com.flink.iceberg.demo; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; public class FlinkSqlInsertIceberg { public static void main(String[] args) { //初始化flink相关的环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env); env.enableCheckpointing(1000); //后面的查询主要是演示实时查询,如果要实时查询就要做如下的设置。 Configuration configuration = tblEnv.getConfig().getConfiguration(); // 支持SQL语法中的 OPTIONS 选项 configuration.setBoolean("table.dynamic-table-options.enabled", true); //1.创建多数据源目录Catalog tblEnv.executeSql("CREATE CATALOG flink_cal_1 WITH (" + "'type'='iceberg'," + "'catalog-type'='hadoop'," + "'warehouse'='hdfs://node1:9000/flink_iceberg')"); //2.使用创建的Catalog tblEnv.useCatalog("flink_cal_1"); //3.创建数据库 // tblEnv.executeSql("create database test1"); //4.使用数据库 // tblEnv.useDatabase("test1"); //5.创建users表结构 // tblEnv.executeSql("create table if not exists flink_cal_1.test1.users(id int,name string,age int,loc string) partitioned by (loc)"); //6.写入数据到users表 tblEnv.executeSql("insert into flink_cal_1.test1.users values (1,'张三',18,'beijing'),(2,'李四',19,'shanghai'),(3,'王五',20,'guangzhou')"); //7.写入之后读取一下,后面添加options主要是为了做实时查询 TableResult tableResult = tblEnv.executeSql("select * from flink_cal_1.test1.users /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/"); //8、把读取的结果显示出来 tableResult.print(); } }
最后我们运行一下这个flink job,示例图如下:
可以看到这里我们插入了数据,并且实时显示出来了。
备注:
1、在flink中只有checkpoint之后,数据才会被commit到sink里面,如果checkpoint失败了,那么就不会commit数据到sink,就无法查询对应的数据。 2、在flink中如果想要近实时的查询数据,就要设置一下conf,并且启用注解的形式来进行查询。注意这里查询结果还是近实时,不是准实时。
最后按照惯例,附上本案例的源码,登录后即可下载。
还没有评论,来说两句吧...