3 Answers
Deduplication based on a Bloom filter, suited to very large data volumes where storing every key exactly would be too expensive. Note that Flink SQL has no built-in Bloom filter, so the SQL below only emulates the idea with a table of already-seen events; a real Bloom filter is implemented in the DataStream API (see the Java sketch after the SQL).
-- Table of already-seen events (stands in for the Bloom filter's membership set)
CREATE TABLE bloom_filter (
    user_id STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'input_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);
-- Anti-join against the seen-events table to keep only unseen records
-- (assumes clickstream and the deduplicated_data sink are already defined)
INSERT INTO deduplicated_data
SELECT
    user_id,
    event_time,
    data
FROM clickstream
WHERE NOT EXISTS (
    SELECT 1
    FROM bloom_filter
    WHERE bloom_filter.user_id = clickstream.user_id
      AND bloom_filter.event_time >= clickstream.event_time - INTERVAL '5' SECOND
);
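Since Flink SQL has no Bloom-filter primitive, in practice this pattern is written in the DataStream API. A minimal sketch follows, assuming Guava's BloomFilter is on the classpath and that the stream has already been reduced to user_id strings; the filter lives in operator memory, is not checkpointed (so it resets on restart), and its false positives can occasionally drop genuinely new keys, which is acceptable only for approximate dedup at large scale.

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

// Approximate dedup: drop records whose user_id was probably seen before.
public class BloomDedupFilter extends RichFilterFunction<String> {
    private transient BloomFilter<String> seen;

    @Override
    public void open(Configuration parameters) {
        // Expected insertions (10M) and false-positive rate (1%) are tuning assumptions.
        seen = BloomFilter.create(
                Funnels.stringFunnel(StandardCharsets.UTF_8), 10_000_000, 0.01);
    }

    @Override
    public boolean filter(String userId) {
        if (seen.mightContain(userId)) {
            return false; // probably a duplicate (may rarely be a false positive)
        }
        seen.put(userId);
        return true; // first time this task sees the key
    }
}

Keying the stream by user_id before applying the filter keeps each key on one parallel instance; for dedup that survives restarts, the filter's bytes would need to be kept in Flink state instead of plain operator memory.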
Deduplicate by keeping a single record per key; suitable for scenarios that can tolerate updating results. The query below is Flink SQL's standard ROW_NUMBER deduplication pattern: ordering by event_time DESC keeps the latest record per user (ASC would keep the first occurrence instead).
-- Keep only the newest record per user_id
SELECT
user_id,
event_time,
data
FROM (
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY event_time DESC) AS rn
FROM clickstream
)
WHERE rn = 1;
Use Flink's Keyed State to remember which keys have already been processed, so each key is emitted only once. Note that this state grows with the number of distinct keys; for unbounded key spaces, configure a state TTL on the descriptor.
// Keyed-state deduplication (DataStream API; requires Java/Scala)
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class DeduplicateFunction extends RichFlatMapFunction<String, String> {
    private transient ValueState<Boolean> processedState;

    @Override
    public void open(Configuration parameters) {
        processedState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("processed", Boolean.class));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        // State is scoped to the current key, so no explicit key argument is needed.
        if (processedState.value() == null) {
            processedState.update(true); // mark this key as processed
            out.collect(value);          // emit only the first occurrence
        }
    }
}
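A minimal usage sketch, assuming events is a DataStream&lt;String&gt; and the whole record serves as the dedup key (both names are illustrative assumptions):

// keyBy scopes the ValueState to each distinct record value
DataStream<String> deduped = events
        .keyBy(value -> value)
        .flatMap(new DeduplicateFunction());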