1 个回答
数据源source示例如下:
-- Source table: reads JSON events from Kafka with a 5-second watermark.
-- (The truncated, unclosed duplicate of the sink DDL that was pasted here
-- has been removed; the sink example appears below in full.)
CREATE TABLE kafka_source (
    user_id STRING,
    event_time TIMESTAMP(3),
    log_message STRING,
    -- tolerate events arriving up to 5 seconds out of order
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',                              -- fixed value 'kafka'
    'topic' = 'input_topic',                            -- Kafka topic name
    'properties.bootstrap.servers' = 'localhost:9092',  -- Kafka broker address
    'properties.group.id' = 'flink_group',              -- consumer group id
    'format' = 'json',                                  -- payload format (e.g. JSON, CSV)
    'scan.startup.mode' = 'earliest-offset'             -- startup offsets: earliest-offset / latest-offset / group-offsets / specific-offsets
    -- To start from explicit per-partition offsets, use instead:
    --   'scan.startup.mode' = 'specific-offsets',
    --   'scan.startup.specific-offsets' = 'partition:0,offset:1000'
    -- (the original value '0,1000' is not a valid format, and the option is
    -- ignored unless scan.startup.mode = 'specific-offsets')
    -- NOTE: 'scan.partition-count' is not a Flink Kafka connector option;
    -- source parallelism is set via job/table parallelism configuration.
);
-- Sink table: writes processed rows back to Kafka as JSON.
-- (The original statement was truncated mid-comment and missing its closing ');'.)
CREATE TABLE kafka_sink (
    processed_data STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'output_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',                            -- payload format
    'sink.parallelism' = '2',                     -- writer parallelism
    'sink.delivery-guarantee' = 'at-least-once'   -- delivery semantics: none / at-least-once / exactly-once
);
发布于:1周前 (05-28) IP属地:
我来回答
您需要 登录 后回答此问题!