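The previous post showed how to write Kafka data into Iceberg in near real time with Spark. This post does the same thing with Flink SQL.
In Flink, this write path relies mainly on the Kafka connector: the topic is declared as a source table, and a statement of the form insert into xxxx select copies its rows into the Iceberg table. Here is the code: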
package com.flink.iceberg.demo;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Demo: read data from Kafka and insert it into Iceberg with Flink SQL.
 */
public class KafkaFlinkSqlInsertIceberg {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

        // Trigger a checkpoint every second; the Iceberg sink commits written data when a checkpoint completes
        env.enableCheckpointing(1000);
        // Use exactly-once checkpointing mode
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Leave at least 0.5 seconds between the end of one checkpoint and the start of the next
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // A checkpoint must finish within 1 minute, otherwise it is discarded
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Allow only one checkpoint to be in progress at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Retain externalized checkpoints after the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Recover from the latest checkpoint even when a more recent savepoint exists
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

        /*
         * The catalog and the Iceberg table must exist before data is written.
         * The INSERT in step 8 assumes flink_cal_2.test4.users already exists;
         * uncomment steps 2 to 5 if it does not.
         */
        // 1. Create the catalog
        tblEnv.executeSql(
                "CREATE CATALOG flink_cal_2 WITH ("
                        + "'type'='iceberg',"
                        + "'catalog-type'='hadoop',"
                        + "'warehouse'='hdfs://node1:9000/flink_iceberg1')");
        // 2. Use the catalog
        // tblEnv.useCatalog("flink_cal_2");
        // 3. Create the database
        // tblEnv.executeSql("create database test4");
        // 4. Use the database
        // tblEnv.useDatabase("test4");
        // 5. Create the Iceberg table users
        // tblEnv.executeSql("create table flink_cal_2.test4.users(id int,name string,age int)");

        // 6. Create the Kafka connector table that consumes data from Kafka
        tblEnv.executeSql(
                "create table kafka_input_table("
                        + " id int,"
                        + " name varchar,"
                        + " age int"
                        + ") with ("
                        + " 'connector' = 'kafka',"
                        + " 'topic' = 'test20',"
                        + " 'properties.bootstrap.servers'='192.168.31.20:9092',"
                        + " 'scan.startup.mode'='latest-offset',"
                        + " 'properties.group.id' = 'flink-iceberg',"
                        + " 'format' = 'json'"
                        + ")");

        // 7. Enable table.dynamic-table-options.enabled so the OPTIONS hint in step 9 is accepted
        Configuration configuration = tblEnv.getConfig().getConfiguration();
        configuration.setBoolean("table.dynamic-table-options.enabled", true);

        // 8. Write the data into flink_cal_2.test4.users (executeSql submits the INSERT job and returns)
        tblEnv.executeSql("insert into flink_cal_2.test4.users select id,name,age from kafka_input_table");

        // 9. Query the table as a stream, checking for new snapshots every second
        TableResult tableResult = tblEnv.executeSql(
                "select * from flink_cal_2.test4.users /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/");

        // 10. Print the result rows
        tableResult.print();
    }
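}

Start this Flink job first, then send data with the Scala Kafka producer written in the earlier post. The Flink job's console then prints the matching rows queried from the table (shown as a screenshot in the original post).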
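For reference, the source table uses 'format' = 'json' with the columns id, name and age, so each Kafka message should be a JSON object with those field names. The snippet below is a minimal sketch of such a payload, not the producer from the earlier post: the class UserJsonSample and its methods are hypothetical names introduced here, and only the standard library is used.

```java
import java.util.Locale;

// Hypothetical helper, not part of the original post: formats one record in the
// shape expected by kafka_input_table (id int, name varchar, age int).
class UserJsonSample {

    // Escape backslashes and double quotes so the name stays a valid JSON string.
    // Control characters are not handled in this sketch.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Build the JSON text for one message; field names match the table columns.
    static String toJson(int id, String name, int age) {
        return String.format(Locale.ROOT,
                "{\"id\":%d,\"name\":\"%s\",\"age\":%d}", id, escape(name), age);
    }

    public static void main(String[] args) {
        // Prints {"id":1,"name":"zhangsan","age":18}
        System.out.println(toJson(1, "zhangsan", 18));
    }
}
```

A string like this is what a producer would send as the message value to the test20 topic. Because the source table starts from latest-offset, only messages sent after the job is running are picked up.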
Flink SQL now cuts down a lot of the work in many big data tasks, so it is worth trying out more often.
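One small way to keep the SQL readable inside Java is to assemble each statement one clause per line instead of as a single concatenated string. The sketch below does this for the Kafka source table from the job. The class KafkaSourceDdl and its build method are hypothetical names, and the name column is declared as STRING here (an assumption, chosen to match the Iceberg table's column type) rather than varchar.

```java
// Hypothetical helper, not part of the original post: assembles the Kafka
// source table DDL with one clause per line.
class KafkaSourceDdl {

    static String build(String table, String topic, String bootstrapServers, String groupId) {
        return String.join("\n",
                "CREATE TABLE " + table + " (",
                "  id INT,",
                "  name STRING,",
                "  age INT",
                ") WITH (",
                "  'connector' = 'kafka',",
                "  'topic' = '" + topic + "',",
                "  'properties.bootstrap.servers' = '" + bootstrapServers + "',",
                "  'properties.group.id' = '" + groupId + "',",
                "  'scan.startup.mode' = 'latest-offset',",
                "  'format' = 'json'",
                ")");
    }

    public static void main(String[] args) {
        // Same values as in the job above
        System.out.println(build("kafka_input_table", "test20",
                "192.168.31.20:9092", "flink-iceberg"));
    }
}
```

The returned string can be passed to tblEnv.executeSql in place of the inline concatenation. The values are inserted without any quoting or validation, so this is only suitable for trusted, hard-coded settings.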
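Finally, as usual, the source code for this example is attached to the original post and can be downloaded after logging in.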