上文《ClickHouse基础系列(十八)ClickHouse Java调用实战》中我们介绍了如何使用 Java 把数据写入 ClickHouse。但在大数据场景下,目前 Flink 的使用频率非常高,所以我们再来演示一下使用 Flink SQL 的方式把数据写入 ClickHouse。
/**
 * Example batch job that uses Flink SQL to copy rows from a filesystem
 * source table into a ClickHouse table via the 'clickhouse' SQL connector
 * (flink-connector-clickhouse).
 *
 * <p>Classpath requirements: flink-table-api-java, the Flink planner,
 * flink-clients and flink-connector-clickhouse.
 */
public class FlinkToClickHouse {

    public static void main(String[] args) throws Exception {
        // Pure-SQL batch job: a TableEnvironment in batch mode is all we need.
        // (BatchTableEnvironment.create(env, settings) has no such signature,
        // and useBlinkPlanner() is gone — Blink has been the only planner
        // since Flink 1.14.)
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Default parallelism for the job (replaces env.setParallelism(10),
        // which only applied to a DataStream environment).
        tEnv.getConfig().getConfiguration().setString("parallelism.default", "10");

        // Target ClickHouse database.
        String defaultDatabase = "test";

        // Source table. NOTE: the filesystem connector additionally requires
        // a 'format' option; a trailing ';' is not accepted by executeSql().
        String inputDDL =
                "CREATE TABLE source (\n" +
                "  id INT,\n" +
                "  name STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'filesystem',\n" +
                "  'path' = '/path/to/input',\n" +
                "  'format' = 'csv'\n" +
                ")";
        tEnv.executeSql(inputDDL);

        // Sink table. The 'clickhouse' connector takes its connection options
        // directly in the WITH clause, so no catalog registration is needed —
        // TableEnvironment.registerCatalog expects a Catalog instance, not a
        // factory plus a properties map as the original code attempted.
        String outputDDL =
                "CREATE TABLE sink (\n" +
                "  id INT,\n" +
                "  name STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'clickhouse',\n" +
                "  'url' = 'clickhouse://192.168.31.254:8123',\n" +
                "  'username' = 'default',\n" +
                "  'password' = '',\n" +
                "  'database-name' = '" + defaultDatabase + "',\n" +
                "  'table-name' = 'targetTable'\n" +
                ")";
        tEnv.executeSql(outputDDL);

        // executeSql() on an INSERT submits the job by itself; the deprecated
        // sqlUpdate()/TableEnvironment.execute() pair was removed in Flink 1.12.
        // await() blocks until the batch job finishes.
        tEnv.executeSql("INSERT INTO sink SELECT * FROM source").await();
    }
}
这里我们需要引入的maven依赖主要是:
<dependency>
  <groupId>com.clickhouse</groupId>
  <artifactId>clickhouse-jdbc</artifactId>
  <version>0.6.0</version>
</dependency>
注意:0.6.0 版本的 clickhouse-jdbc 的 groupId 是 com.clickhouse(旧的 ru.yandex.clickhouse 坐标已停止维护,最高只到 0.3.x)。此外还需要引入 Flink Table 相关依赖(flink-table-api-java、planner、flink-clients)以及 flink-connector-clickhouse 连接器依赖。

发表评论