4 个回答
元数据扩展标记(Metadata Tagging)
-- 写入时附加标记到Commit信息
INSERT INTO user_behavior /*+ OPTIONS('commit.user'='admin', 'commit.tags'='source=kafka,env=prod') */
SELECT user_id, action FROM streampark_source;
-- 查看带特定标签的Commit记录
SELECT * FROM `user_behavior$commits`
WHERE commit_tags LIKE '%env=prod%';
发布于:1周前 (05-27) IP属地:
分区标记(Partition-Level Tags)
-- 按业务单元+数据时效分区
CREATE TABLE sales (
product_id STRING,
amount DOUBLE
) PARTITIONED BY (business_unit STRING, data_freshness STRING)
WITH (
'partition.tag.business_unit' = 'europe,asia', -- 预定义允许值
'partition.tag.data_freshness' = 'hot,warm,cold'
);
-- 插入时指定分区标签
INSERT INTO sales PARTITION (business_unit='asia', data_freshness='hot')
SELECT product_id, amount FROM temp_orders;
-- 根据标签清理冷数据
ALTER TABLE sales DROP PARTITION (data_freshness='cold');
发布于:1周前 (05-27) IP属地:
动态列标记(Row-Level Tags)
-- 写入时动态添加标记列(如数据质量标记)
INSERT INTO user_behavior
SELECT
user_id,
action,
CASE WHEN user_id IS NULL THEN 'invalid' ELSE 'valid' END AS data_quality_tag
FROM kafka_source;
-- 查询时基于标签过滤
SELECT * FROM user_behavior WHERE data_quality_tag = 'valid';
发布于:1周前 (05-27) IP属地:
表属性标记(Table-Level Tags)
-- 创建表时定义业务标签(部门+数据分类)
CREATE TABLE user_behavior (
user_id BIGINT,
action STRING
) WITH (
'tag.department' = 'marketing',
'tag.data-class' = 'PII',
'comment' = '用户点击行为日志,包含敏感信息'
);
-- 修改表标签
ALTER TABLE user_behavior SET ('tag.data-class' = 'PII,behavior');
-- 查询过滤:通过INFORMATION_SCHEMA检索带特定标签的表
SELECT table_name FROM INFORMATION_SCHEMA.TABLES
WHERE table_options LIKE '%tag.department=marketing%';
发布于:1周前 (05-27) IP属地:
我来回答
您需要 登录 后回答此问题!