上文《ClickHouse基础系列(十八)ClickHouse Java调用实战》中我们介绍了使用Java把数据写入到ClickHouse。但在大数据的场景里面,目前Flink的使用频率是非常高的,所以我们再来演示一下使用Flink SQL的方式把数据写入到ClickHouse。
public class FlinkToClickHouse { public static void main(String[] args) throws Exception { // 创建env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置并行度 env.setParallelism(10); // 创建TableEnvironment EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); BatchTableEnvironment tEnv = BatchTableEnvironment.create(env, settings); // 注册ClickHouse Catalog String catalogName = "clickhouse_catalog"; String defaultDatabase = "test"; Map<String, String> properties = new HashMap<>(); properties.put("url", "jdbc:clickhouse://192.168.31.254:8123/"); properties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver"); properties.put("username", "default"); properties.put("password", ""); tEnv.registerCatalog(catalogName, new ClickHouseCatalogFactory(), properties); tEnv.useCatalog(catalogName); // 定义输入表格 String inputDDL = "CREATE TABLE source (\n" + " id INT,\n" + " name STRING\n" + ") WITH (\n" + " 'connector' = 'filesystem',\n" + " 'path' = '/path/to/input'\n" + ");"; tEnv.executeSql(inputDDL); // 定义目标表格 String outputDDL = "CREATE TABLE sink (\n" + " id INT,\n" + " name STRING\n" + ") WITH (\n" + " 'connector' = 'clickhouse',\n" + " 'database-name' = '" + defaultDatabase + "',\n" + " 'table-name' = 'targetTable'\n" + ");"; tEnv.executeSql(outputDDL); // 查询并写入结果 String query = "INSERT INTO sink SELECT * FROM source;"; tEnv.sqlUpdate(query); // 提交任务 tEnv.execute("Write to ClickHouse"); } }
这里除了Flink本身以及flink-connector-clickhouse连接器的依赖之外,还需要引入的Maven依赖主要是:
<!-- ClickHouse legacy JDBC driver (provides ru.yandex.clickhouse.ClickHouseDriver).
     0.3.2 is the last release under the ru.yandex.clickhouse groupId; 0.6.x
     only exists under the newer com.clickhouse coordinates. -->
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.2</version>
</dependency>
发表评论