如何将Table API与SQL结合使用?

提问者:帅平 问题分类:大数据
如何将Table API与SQL结合使用?
2 个回答
有你我就幸福
有你我就幸福
用Table API定义表,通过SQL查询
DataStream<Tuple2<String, Integer>> inputDS = env.addSource(...);
// 通过Table API注册为临时表
Table inputTable = tableEnv.fromDataStream(inputDS, "user, cnt");
tableEnv.createTemporaryView("input_table", inputTable);
// 通过SQL查询
Table sqlResult = tableEnv.sqlQuery(
    "SELECT user, SUM(cnt) as total FROM input_table GROUP BY user"
);
发布于:2周前 (05-29) IP属地:
人潮似海遇见你°
人潮似海遇见你°
用SQL创建表,通过Table API操作
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 通过SQL创建源表
tableEnv.executeSql(
    "CREATE TABLE kafka_source (" +
    "  user STRING, " +
    "  cnt INT" +
    ") WITH ('connector' = 'kafka', ...)"
);
// 通过Table API查询并过滤数据
Table resultTable = tableEnv.from("kafka_source")
    .filter("cnt > 100")
    .select("user, cnt");
// 转换为DataStream
DataStream<Row> resultDS = tableEnv.toAppendStream(resultTable, Row.class);
发布于:2周前 (05-29) IP属地:
我来回答