2 个回答
Lookup Join(查询维表Join)用于将流数据与静态或缓慢变化的维度表(如用户信息、产品详情)进行关联,以丰富流事件的业务属性。其核心实现依赖于广播状态(Broadcast State)或异步外部查询。
1、广播状态(Broadcast State):将维度表数据广播到所有并行任务节点,流数据在本地直接匹配广播状态,避免网络传输开销。适用场景是维度表较小(通常小于100MB),且更新频率低(如每日更新)。使用示例如下:
1)定义维度表
1、广播状态(Broadcast State):将维度表数据广播到所有并行任务节点,流数据在本地直接匹配广播状态,避免网络传输开销。适用场景是维度表较小(通常小于100MB),且更新频率低(如每日更新)。使用示例如下:
1)定义维度表
CREATE TABLE dim_product (
product_id STRING,
product_name STRING,
price DOUBLE
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydb',
'table-name' = 'product',
'broadcast' = 'true' -- 标记为广播表
);
2、执行Join操作SELECT
o.order_id,
o.user_id,
p.product_name
FROM orders o
JOIN dim_product p
ON o.product_id = p.product_id;
发布于:1周前 (05-29) IP属地:
2、异步外部查询(Async I/O):当维度表较大或存储在外部系统(如Redis、MySQL)时,通过异步IO实时查询维度数据。适用场景是维度表无法广播(数据量过大),且需实时获取最新状态。具体实现示例如下:
1)定义异步连接器
1)定义异步连接器
CREATE TABLE dim_user (
user_id STRING,
user_name STRING,
address STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydb',
'table-name' = 'user',
'lookup.async' = 'true', -- 启用异步查询
'lookup.fetch-size' = '1000' -- 每次查询批量大小
);
2)执行join操作SELECT
o.order_id,
u.user_name
FROM orders o
JOIN dim_user u
ON o.user_id = u.user_id;
发布于:1周前 (05-29) IP属地:
我来回答
您需要 登录 后回答此问题!