How to implement high-performance streaming deduplication in Flink SQL?

Asked by: 帅平    Category: Interview questions
3 answers
待我幼稚完
Bloom-filter-based deduplication, suited to large-scale data. Note that Flink SQL has no built-in bloom filter, so the SQL below only approximates the idea with a time-bounded anti-join that drops records whose key was already seen in the last 5 seconds; a true bloom filter needs a custom UDF or a DataStream operator. A more common pure-SQL alternative is sketched after the query.
-- Table of recently seen keys (an ordinary Kafka-backed table, not an actual bloom filter)
CREATE TABLE bloom_filter (
    user_id STRING,
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'input_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);
-- Keep only records whose user_id has not appeared in the last 5 seconds (an anti-join)
INSERT INTO deduplicated_data
SELECT 
    user_id, 
    event_time, 
    data
FROM clickstream
WHERE NOT EXISTS (
    SELECT 1 
    FROM bloom_filter 
    WHERE user_id = clickstream.user_id 
      AND event_time >= clickstream.event_time - INTERVAL '5' SECOND
);
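Since a true bloom filter is not expressible in pure Flink SQL (as noted above), the usual pure-SQL way to keep deduplication state bounded on a large key space is the standard deduplication query combined with state TTL. A minimal sketch, assuming the Flink SQL Client, Flink 1.13+, the same clickstream source and deduplicated_data sink as above, and an arbitrary 1-hour TTL:

-- Expire idle per-key deduplication state after one hour so state stays bounded
SET 'table.exec.state.ttl' = '1h';

INSERT INTO deduplicated_data
SELECT user_id, event_time, data
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY event_time ASC) AS rn
    FROM clickstream
)
WHERE rn = 1;  -- keep only the first occurrence of each user_id

Note the trade-off: state TTL bounds state size at the cost of correctness, since a duplicate arriving after its key's state has expired will pass through again.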
Posted: 4 days ago
一鹿有晗
Deduplicate within a time window, suitable when some latency is acceptable. The query below is Flink SQL's standard deduplication pattern, which keeps only the latest record per user_id; a variant that actually bounds deduplication to a time window is sketched after it.
-- Keep only the latest record per user_id
SELECT 
    user_id, 
    event_time, 
    data
FROM (
    SELECT 
        *, 
        ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY event_time DESC) AS rn
    FROM clickstream
) 
WHERE rn = 1;
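If deduplication really should be scoped to a time window, so the same user_id may legitimately reappear in a later window and per-window state is dropped when the window closes, recent Flink versions (roughly 1.15+) support window deduplication on top of a window TVF. A sketch reusing the clickstream table, with a hypothetical 5-minute tumbling window:

-- Keep the latest record per user_id within each 5-minute tumbling window
SELECT user_id, event_time, data
FROM (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY window_start, window_end, user_id
               ORDER BY event_time DESC) AS rn
    FROM TABLE(
        TUMBLE(TABLE clickstream, DESCRIPTOR(event_time), INTERVAL '5' MINUTE))
)
WHERE rn = 1;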
Posted: 4 days ago
我是小样
Use Flink's keyed state to record which keys have already been processed, so each record is handled only once. This requires dropping down to the DataStream API (Java/Scala), for example as a filter on a keyed stream:
// Keyed-state deduplication (Java DataStream API); use after keyBy(user_id)
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;

public class DeduplicateState extends RichFilterFunction<String> {
    // One Boolean flag per key; Flink scopes this state to the current key automatically
    private transient ValueState<Boolean> processedState;

    @Override
    public void open(Configuration parameters) {
        processedState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("processed", Boolean.class));
    }

    @Override
    public boolean filter(String key) throws Exception {
        if (processedState.value() != null) {
            return false;              // duplicate: this key has been seen before
        }
        processedState.update(true);   // mark the key as processed
        return true;                   // first occurrence: keep the record
    }
}
发布于:4天前 IP属地: