How do Flink SQL and the DataStream API interoperate?

Asked by: 帅平    Category: Big Data
2 answers
肺少女
The Flink SQL → DataStream direction works as follows:
1. Define a data source with SQL for the DataStream to consume
// Define a SQL table (e.g. a Kafka source).
// Note: `user` is backticked because USER is a reserved keyword in Flink SQL.
tableEnv.executeSql(
    "CREATE TABLE kafka_source (" +
    "  `user` STRING, " +
    "  cnt INT" +
    ") WITH (...)"
);
// Convert to a DataStream (toAppendStream with Row.class yields a DataStream<Row>)
Table table = tableEnv.sqlQuery("SELECT * FROM kafka_source");
DataStream<Row> ds = tableEnv.toAppendStream(table, Row.class);
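In Flink 1.13 and later, toDataStream is the recommended replacement for the now-deprecated toAppendStream. A minimal sketch of the same conversion, assuming Flink 1.13+:
// Alternative since Flink 1.13: toDataStream converts an insert-only Table
// straight to a DataStream<Row>.
Table appendTable = tableEnv.sqlQuery("SELECT * FROM kafka_source");
DataStream<Row> appendStream = tableEnv.toDataStream(appendTable);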
2. Enrich the logic with the DataStream API
// Pre-filter the data with SQL
Table filteredTable = tableEnv.sqlQuery("SELECT * FROM kafka_source WHERE cnt > 100");
DataStream<Row> filteredDS = tableEnv.toAppendStream(filteredTable, Row.class);
// Window aggregation with the DataStream API.
// A lambda producing a generic Tuple2 needs an explicit type hint via returns().
DataStream<Tuple2<String, Integer>> windowedDS = filteredDS
    .map(row -> Tuple2.of((String) row.getField(0), (Integer) row.getField(1)))
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(value -> value.f0)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))  // event-time windows need watermarks, see below
    .sum(1);
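The event-time window above only fires if the stream carries timestamps and watermarks. If the source table does not declare a WATERMARK in its DDL, they can be assigned on the DataStream side. A minimal sketch, assuming a hypothetical third column ts (BIGINT, epoch millis) is also selected into the Row:
// Sketch: assign timestamps/watermarks on the converted stream.
// Assumption: row.getField(2) is a hypothetical event-time column in epoch millis.
// Requires org.apache.flink.api.common.eventtime.WatermarkStrategy and java.time.Duration.
DataStream<Row> withWatermarks = filteredDS.assignTimestampsAndWatermarks(
    WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((row, previousTs) -> (Long) row.getField(2)));
// withWatermarks can then replace filteredDS in the windowed pipeline above.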
Posted: 2 weeks ago (05-29)
曾经多难忘
The DataStream → Flink SQL direction works as follows:
1. Register the DataStream as a table
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// Create a DataStream (e.g. Kafka data)
DataStream<Tuple2<String, Integer>> inputDS = env.addSource(...);
// Register it as a table. The Tuple2 fields f0/f1 are renamed to user/cnt via the
// Table API; a Schema cannot rename physical fields that the Tuple2 type does not have.
tableEnv.createTemporaryView(
    "kafka_source",
    tableEnv.fromDataStream(inputDS).as("user", "cnt"));
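If the stream carries a POJO whose field names already match the desired columns, no renaming is needed at all. A minimal sketch, where UserCount is a hypothetical POJO (public fields user/cnt, a public no-arg constructor, plus the two-arg convenience constructor used below):
// Sketch: the schema is derived automatically from the (hypothetical) UserCount POJO.
DataStream<UserCount> pojoDS = env.fromElements(
    new UserCount("alice", 120), new UserCount("bob", 80));
tableEnv.createTemporaryView("kafka_source", pojoDS);  // columns: user, cnt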
2. Query the DataStream data with SQL
-- Aggregate per user (`user` is backticked because USER is a reserved keyword)
SELECT `user`, SUM(cnt) AS total
FROM kafka_source
GROUP BY `user`;
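For a quick check, the query can also be executed and printed directly from the table environment; a minimal sketch (on an unbounded stream the result keeps updating):
// Run the aggregation and print the continuously updating result to stdout.
tableEnv.sqlQuery(
        "SELECT `user`, SUM(cnt) AS total FROM kafka_source GROUP BY `user`")
    .execute()
    .print();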
3. Convert the SQL result back to a DataStream
// Run the query with sqlQuery (not executeSql) so the result can be converted to a stream
Table result = tableEnv.sqlQuery(
    "SELECT `user`, SUM(cnt) AS total FROM kafka_source GROUP BY `user`");
// A GROUP BY aggregation produces an updating result, so use toRetractStream rather than
// toAppendStream; each element is a (Boolean, Row) pair where false marks a retraction.
DataStream<Tuple2<Boolean, Row>> resultDS = tableEnv.toRetractStream(result, Row.class);
resultDS.print();
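Since Flink 1.13 there is also toChangelogStream, which encodes the update kind in each Row's RowKind instead of a Boolean flag; a minimal sketch, assuming Flink 1.13+:
// Alternative: consume the updating result as a changelog stream.
// Each Row carries a RowKind (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE).
DataStream<Row> changelogDS = tableEnv.toChangelogStream(result);
changelogDS.print();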
Posted: 2 weeks ago (05-29)