In the previous article we showed how to write Kafka data into Iceberg in near real time with Spark. In this article we do the same thing with Flink SQL: consume data from Kafka and write it into an Iceberg table in near real time.
In Flink this kind of write mainly relies on the Kafka connector, combined with an insert into ... select ... statement. The complete code is shown below:
package com.flink.iceberg.demo;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Demo: read data from Kafka and insert it into Iceberg using Flink SQL.
 */
public class KafkaFlinkSqlInsertIceberg {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

        env.enableCheckpointing(1000);
        // Use exactly-once checkpointing mode
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Require at least 0.5 s between the end of one checkpoint and the start of the next
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // A checkpoint must finish within 1 minute, otherwise it is discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Allow only one checkpoint in flight at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Retain externalized checkpoints after the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Prefer restoring from a checkpoint even if a more recent savepoint exists
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

        /**
         * 1. The catalog and the Iceberg table must be created beforehand.
         */
        // 1. Create the catalog
        tblEnv.executeSql("CREATE CATALOG flink_cal_2 WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://node1:9000/flink_iceberg1')");

        // 2. Use the catalog created above
        // tblEnv.useCatalog("flink_cal_2");

        // 3. Create the database
        // tblEnv.executeSql("create database test4");

        // 4. Use the database
        // tblEnv.useDatabase("test4");

        // 5. Create the Iceberg table flink_cal_2.test4.users
        // tblEnv.executeSql("create table flink_cal_2.test4.users(id int,name string,age int)");

        // 6. Create the Kafka connector table used to consume data from Kafka
        tblEnv.executeSql("create table kafka_input_table(" +
                " id int," +
                " name varchar," +
                " age int" +
                ") with (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'test20'," +
                " 'properties.bootstrap.servers'='192.168.31.20:9092'," +
                " 'scan.startup.mode'='latest-offset'," +
                " 'properties.group.id' = 'flink-iceberg'," +
                " 'format' = 'json'" +
                ")");

        // 7. Enable table.dynamic-table-options.enabled so the OPTIONS hint in SQL is accepted
        Configuration configuration = tblEnv.getConfig().getConfiguration();
        configuration.setBoolean("table.dynamic-table-options.enabled", true);

        // 8. Write the Kafka data into the table flink_cal_2.test4.users
        tblEnv.executeSql("insert into flink_cal_2.test4.users select id,name,age from kafka_input_table");

        // 9. Query the Iceberg table as a streaming source
        TableResult tableResult = tblEnv.executeSql(
                "select * from flink_cal_2.test4.users /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");

        // 10. Print the result rows
        tableResult.print();
    }
}
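Note that the DDL in steps 2 to 5 is commented out in the job above because the database and the Iceberg table only need to exist before the first run. Below is a minimal sketch of that one-time setup, assuming the same catalog and warehouse path; the class name CreateIcebergTable is only an illustrative choice, and the same statements could just as well be run once from the Flink SQL client.

package com.flink.iceberg.demo;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Illustrative one-time setup class (hypothetical name), mirroring the commented-out steps 2-5 above.
public class CreateIcebergTable {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

        // Register the same Hadoop catalog used by the streaming job
        tblEnv.executeSql("CREATE CATALOG flink_cal_2 WITH (" +
                "'type'='iceberg'," +
                "'catalog-type'='hadoop'," +
                "'warehouse'='hdfs://node1:9000/flink_iceberg1')");

        // Switch to the catalog, then create the database and the target Iceberg table
        tblEnv.useCatalog("flink_cal_2");
        tblEnv.executeSql("create database test4");
        tblEnv.useDatabase("test4");
        tblEnv.executeSql("create table flink_cal_2.test4.users(id int, name string, age int)");
    }
}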
Now start this Flink job, then use the Kafka producer written in Scala in the earlier article to send some test data. You should see the queried table rows printed in the Flink job's console, as shown in the screenshot below:
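If you do not have that Scala producer at hand, the following minimal Java sketch can stand in for it. It is an assumption rather than the original producer, and simply sends JSON records matching the id/name/age schema of kafka_input_table to the test20 topic; the class name TestDataProducer is hypothetical.

package com.flink.iceberg.demo;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical stand-in for the Scala producer from the earlier article.
public class TestDataProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.31.20:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 1; i <= 10; i++) {
                // JSON payload matching the columns of kafka_input_table: id, name, age
                String value = "{\"id\":" + i + ",\"name\":\"user" + i + "\",\"age\":" + (20 + i) + "}";
                producer.send(new ProducerRecord<>("test20", String.valueOf(i), value));
            }
            producer.flush();
        }
    }
}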
Flink SQL already simplifies a great deal of day-to-day big data work, so it is well worth experimenting with.
Finally, as usual, the source code for this example is attached and can be downloaded after logging in.