Paimon与Flink的集成方式有哪些?

提问者:帅平 问题分类:面试刷题
Paimon与Flink的集成方式有哪些?
2 个回答
素梦瑾然
素梦瑾然
Paimon与Flink集成的方式有:
1、使用Flink sql作为 Sink 表写入,例如:
CREATE TABLE paimon_orders (
    order_id STRING PRIMARY KEY,
    amount    DECIMAL(10,2),
    dt        STRING
) WITH (
    'connector' = 'paimon',
    'path'      = 's3://bucket/paimon/orders',
    'bucket'    = '4' -- 分4个桶
);
-- 从Kafka实时写入
INSERT INTO paimon_orders 
SELECT order_id, amount, DATE_FORMAT(proc_time, 'yyyy-MM-dd') 
FROM kafka_source;
2、使用Flink sql作为source进行读取,例如:
-- 流式读取增量数据(默认读取最新快照)
SELECT * FROM paimon_orders /*+ OPTIONS('scan.mode'='latest') */;
-- 批量读取历史快照(时间旅行)
SELECT * FROM paimon_orders FOR SYSTEM_TIME AS OF TIMESTAMP '2023-10-01 12:00:00';
发布于:2周前 (05-26) IP属地:
吹南风
吹南风
3、使用DataStream API写入paimon
DataStream<RowData> stream = ...; // 来自Kafka的流
Configuration conf = new Configuration();
conf.setString("path", "s3://bucket/paimon/orders");
conf.setInteger("bucket", 4);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(
    "CREATE TABLE paimon_orders (...) WITH ('connector'='paimon', ...)");
// 转换为Table并写入
Table table = tEnv.fromDataStream(stream, Schema.define(...));
table.executeInsert("paimon_orders");
4、使用DataStream API读取paimon
Table paimonTable = tEnv.from("paimon_orders");
DataStream<Row> stream = tEnv.toDataStream(paimonTable);
stream.print();
发布于:2周前 (05-26) IP属地:
我来回答