2 个回答
Paimon与Flink集成的方式有:
1、使用Flink sql作为 Sink 表写入,例如:
1、使用Flink sql作为 Sink 表写入,例如:
CREATE TABLE paimon_orders (
order_id STRING PRIMARY KEY,
amount DECIMAL(10,2),
dt STRING
) WITH (
'connector' = 'paimon',
'path' = 's3://bucket/paimon/orders',
'bucket' = '4' -- 分4个桶
);
-- 从Kafka实时写入
INSERT INTO paimon_orders
SELECT order_id, amount, DATE_FORMAT(proc_time, 'yyyy-MM-dd')
FROM kafka_source;
2、使用Flink sql作为source进行读取,例如:-- 流式读取增量数据(默认读取最新快照)
SELECT * FROM paimon_orders /*+ OPTIONS('scan.mode'='latest') */;
-- 批量读取历史快照(时间旅行)
SELECT * FROM paimon_orders FOR SYSTEM_TIME AS OF TIMESTAMP '2023-10-01 12:00:00';
发布于:2周前 (05-26) IP属地:
3、使用DataStream API写入paimon
DataStream<RowData> stream = ...; // 来自Kafka的流
Configuration conf = new Configuration();
conf.setString("path", "s3://bucket/paimon/orders");
conf.setInteger("bucket", 4);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(
"CREATE TABLE paimon_orders (...) WITH ('connector'='paimon', ...)");
// 转换为Table并写入
Table table = tEnv.fromDataStream(stream, Schema.define(...));
table.executeInsert("paimon_orders");
4、使用DataStream API读取paimonTable paimonTable = tEnv.from("paimon_orders");
DataStream<Row> stream = tEnv.toDataStream(paimonTable);
stream.print();
发布于:2周前 (05-26) IP属地:
我来回答
您需要 登录 后回答此问题!