2 个回答
用Table API定义表,通过SQL查询
DataStream<Tuple2<String, Integer>> inputDS = env.addSource(...);
// 通过Table API注册为临时表
Table inputTable = tableEnv.fromDataStream(inputDS, "user, cnt");
tableEnv.createTemporaryView("input_table", inputTable);
// 通过SQL查询
Table sqlResult = tableEnv.sqlQuery(
"SELECT user, SUM(cnt) as total FROM input_table GROUP BY user"
);
发布于:2周前 (05-29) IP属地:
用SQL创建表,通过Table API操作
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 通过SQL创建源表
tableEnv.executeSql(
"CREATE TABLE kafka_source (" +
" user STRING, " +
" cnt INT" +
") WITH ('connector' = 'kafka', ...)"
);
// 通过Table API查询并过滤数据
Table resultTable = tableEnv.from("kafka_source")
.filter("cnt > 100")
.select("user, cnt");
// 转换为DataStream
DataStream<Row> resultDS = tableEnv.toAppendStream(resultTable, Row.class);
发布于:2周前 (05-29) IP属地:
我来回答
您需要 登录 后回答此问题!