Flink对接pulsar作为数据源输入的source应该如何编写?

提问者:帅平 问题分类:大数据
Flink对接pulsar作为数据源输入的source应该如何编写?
1 个回答
肺少女
肺少女
Flink对接pulsar作为source的代码示例是:
PulsarSource<String> source = PulsarSource.builder()
    .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 10000);     .setServiceUrl(serviceUrl)
    .setAdminUrl(adminUrl)
    .setStartCursor(StartCursor.earliest())
    .setTopics("my-topic")
    .setDeserializationSchema(new SimpleStringSchema())
    .setSubscriptionName("my-subscription")
    .build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
发布于:6个月前 (04-11) IP属地:四川省
我来回答