4 个回答
Flink中水位线(Watermark)的生成策略有:
1、单调递增时间戳,适用于数据流中事件时间严格有序(无乱序),每个新事件的时间戳都比前一个大。示例
1、单调递增时间戳,适用于数据流中事件时间严格有序(无乱序),每个新事件的时间戳都比前一个大。示例
WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forMonotonousTimestamps()
.withTimestampAssigner((event, timestamp) -> event.getEventTime());
2、允许固定乱序,适用于数据流存在有限程度的乱序(如最多延迟5秒),需平衡延迟与准确性。WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getEventTime());
发布于:8小时前 IP属地:
5、断点式水位线,适用于数据流中存在特殊标记事件(如心跳事件),需基于这些事件显式推进水位线。
public class PunctuatedWatermarkGenerator implements WatermarkGenerator<Event> {
@Override
public void onEvent(Event event, long timestamp, WatermarkOutput output) {
if (event.isHeartbeat()) {
// 收到心跳事件时生成水位线
output.emitWatermark(new Watermark(timestamp - 5000)); // 允许5秒乱序
}
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 无需周期性生成
}
}
// 注册断点式生成器
WatermarkStrategy<Event> strategy = WatermarkStrategy
.forGenerator(ctx -> new PunctuatedWatermarkGenerator())
.withTimestampAssigner((event, timestamp) -> event.getEventTime());
6、自适应水位线,适用于数据流的乱序程度动态变化(如网络波动),需自动调整延迟阈值。发布于:7小时前 IP属地:
4、处理时间水位线,适用于无需处理乱序,直接基于处理时间(系统时间)推进窗口,延迟最低但无法处理事件时间乱序。
// 显式使用处理时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// 或通过WatermarkStrategy(Flink 1.11+)
WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forGenerator(ctx -> new PunctuatedGenerator())
.withTimestampAssigner((event, timestamp) -> System.currentTimeMillis());
发布于:7小时前 IP属地:
3、自定义水位线生成器,适用于需要动态调整延迟或基于数据特征生成水位线(如特定事件触发水位线推进)。
public class DynamicWatermarkGenerator implements WatermarkGenerator<Event> {
private long maxTimestamp;
private final long outOfOrderness = 5000; // 允许5秒乱序
@Override
public void onEvent(Event event, long timestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, timestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - outOfOrderness - 1));
}
}
// 在数据流中使用
WatermarkStrategy<Event> strategy = WatermarkStrategy
.forGenerator(ctx -> new DynamicWatermarkGenerator())
.withTimestampAssigner((event, timestamp) -> event.getEventTime());
发布于:7小时前 IP属地:
我来回答
您需要 登录 后回答此问题!