3 个回答
在 DataStream API 中,可结合 ReduceFunction 或 AggregateFunction 实现增量聚合,减少状态开销;下面给出 Flink SQL 的写法:先按 1 分钟滚动窗口聚合各商品的点击量,再将每个窗口内点击量最高的 3 个商品写入结果表。
-- Sink table that receives the top-3 products selected by the INSERT below.
-- Connector options are elided ("...") in this snippet and must be filled in.
CREATE TABLE top3_products (
    product_id   STRING,       -- product identifier
    click_count  BIGINT,       -- aggregated click total for the product
    window_start TIMESTAMP(3)  -- start timestamp of the aggregation window
) WITH (...);
-- Writes, for every 1-minute tumbling window, the 3 products with the highest
-- summed click_count into top3_products.
--
-- Uses the Flink "Window Top-N" pattern (windowing TVF, Flink 1.13+):
--   1. innermost query: aggregate clicks per product per window;
--   2. middle query:    rank products inside each window;
--   3. outer query:     keep ranks 1..3.
--
-- The previous HAVING ... IN (SELECT TOP3_CLICKS(...)) filter relied on an
-- undefined function, compared per-window sums with non-windowed all-time
-- sums, and could not bound the result to three rows per window.
--
-- Columns are mapped to the sink by position:
--   product_id -> product_id, total_clicks -> click_count,
--   window_start -> window_start.
-- NOTE(review): assumes clickstream.event_time is declared as a time
-- attribute (the original TUMBLE group window required this as well).
INSERT INTO top3_products
SELECT
    product_id,
    total_clicks,
    window_start
FROM (
    SELECT
        product_id,
        total_clicks,
        window_start,
        -- Partitioning by both window columns is what lets the planner
        -- recognise this as a Window Top-N; ties are broken arbitrarily.
        ROW_NUMBER() OVER (
            PARTITION BY window_start, window_end
            ORDER BY total_clicks DESC
        ) AS rn
    FROM (
        SELECT
            product_id,
            SUM(click_count) AS total_clicks,
            window_start,
            window_end
        FROM TABLE(
            TUMBLE(TABLE clickstream, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
        )
        GROUP BY
            product_id,
            window_start,
            window_end
    )
)
WHERE rn <= 3;
发布于:4天前 IP属地:
对全量数据排序后取前N条,适用于实时性要求低但需全局一致性的场景。
-- Global Top-5: the five products with the most click events across the
-- whole clickstream table (no time window, no partitioning).
WITH product_clicks AS (
    -- One row per product with its total number of click events.
    SELECT
        product_id,
        COUNT(*) AS click_count
    FROM clickstream
    GROUP BY product_id
),
ranked_products AS (
    -- Rank products by click volume, highest first; ties get arbitrary order.
    SELECT
        product_id,
        click_count,
        ROW_NUMBER() OVER (ORDER BY click_count DESC) AS rn
    FROM product_clicks
)
SELECT
    product_id,
    click_count
FROM ranked_products
WHERE rn <= 5;
发布于:4天前 IP属地:
通过 TUMBLE 或 HOP 窗口定义时间范围,先在每个窗口内完成聚合,再结合 ROW_NUMBER() 函数在窗口内排序并筛选前 N 条记录。
-- Per-hour Top-3: for every 1-hour tumbling window, the 3 products with the
-- most click events, returned together with the window start time.
--
-- Written as a Flink "Window Top-N" (windowing TVF, Flink 1.13+): aggregate
-- per window first, then rank inside each window in an enclosing query.
-- The earlier form ranked with PARTITION BY TUMBLE_START(...) at the same
-- query level as the legacy GROUP BY TUMBLE(...), which does not match the
-- documented Window Top-N shape.
-- NOTE(review): assumes clickstream.event_time is declared as a time
-- attribute (the original TUMBLE group window required this as well).
SELECT
    product_id,
    click_count,
    window_start
FROM (
    SELECT
        product_id,
        click_count,
        window_start,
        -- Both window columns are required in PARTITION BY for the planner
        -- to treat this as a Window Top-N; ties are broken arbitrarily.
        ROW_NUMBER() OVER (
            PARTITION BY window_start, window_end
            ORDER BY click_count DESC
        ) AS rn
    FROM (
        SELECT
            product_id,
            COUNT(*) AS click_count,
            window_start,
            window_end
        FROM TABLE(
            TUMBLE(TABLE clickstream, DESCRIPTOR(event_time), INTERVAL '1' HOUR)
        )
        GROUP BY
            product_id,
            window_start,
            window_end
    )
)
WHERE rn <= 3;
发布于:4天前 IP属地:
我来回答
您需要 登录 后回答此问题!