2 个回答
大致分为以下几个步骤
1、设置时间语义
1、设置时间语义
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 使用事件时间(需分配时间戳和生成 Watermark)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
2、分配时间戳和watermarkDataStream<Event> dataStream = env.addSource(kafkaSource).assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event, timestamp) -> event.getTimestamp()));
3、按键区分KeyedStream<Event, String> keyedStream = dataStream.keyBy(event -> event.getUserId());
4、定义窗口分配器// 滚动事件时间窗口(5秒)
WindowedStream<Event, String, TimeWindow> windowedStream = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));
发布于:24分钟前 IP属地:
5、应用窗口函数
// 使用 reduce 函数(输入和输出类型相同)
SingleOutputStreamOperator<Event> result = windowedStream.reduce((e1, e2) -> new Event(e1.getUserId(), e1.getValue() + e2.getValue()));
// 使用 aggregate 函数(更灵活,需自定义 AggregateFunction)
SingleOutputStreamOperator<Double> avgResult = windowedStream.aggregate(new AverageAggregateFunction());
6、输出结果result.print();
env.execute("Window Operation Example");
发布于:23分钟前 IP属地:
我来回答
您需要 登录 后回答此问题!