如何在FlinkSQL中配置和使用Kafka连接器?详细说明其参数。

提问者:帅平 问题分类:面试刷题
如何在FlinkSQL中配置和使用Kafka连接器?详细说明其参数。
1 个回答
有个笨蛋住进我心
有个笨蛋住进我心
数据源source示例如下:
CREATE TABLE kafka_source (
  user_id STRING,
  event_time TIMESTAMP(3),
  log_message STRING,
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',          -- 固定为'kafka'
  'topic' = 'input_topic',        -- Kafka主题名称
  'properties.bootstrap.servers' = 'localhost:9092',  -- Kafka集群地址
  'properties.group.id' = 'flink_group',               -- 消费者组ID
  'format' = 'json',             -- 数据格式(如JSON、CSV)
  'scan.startup.mode' = 'earliest-offset',  -- 起始偏移量(earliest/latest)
  'scan.startup.specific-offsets' = '0,1000',     -- 指定分区偏移量(可选)
  'scan.partition-count' = '3'    -- 并行消费的分区数
);
数据写入sink示例如下:
[code:html]CREATE TABLE kafka_sink (
processed_data STRING
) WITH (
'connector' = 'kafka',
'topic' = 'output_topic',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json', -- 数据格式
'sink.parallelism' = '2', -- 写入并行度
'sink.delivery-guarantee' = 'at-least-once' -- 保证语义(at-least-
发布于:1周前 (05-28) IP属地:
我来回答