1 个回答
Flink对接pulsar作为source的代码示例是:
PulsarSource<String> source = PulsarSource.builder()
.setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 10000); .setServiceUrl(serviceUrl)
.setAdminUrl(adminUrl)
.setStartCursor(StartCursor.earliest())
.setTopics("my-topic")
.setDeserializationSchema(new SimpleStringSchema())
.setSubscriptionName("my-subscription")
.build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
发布于:9个月前 (04-11) IP属地:四川省
我来回答
您需要 登录 后回答此问题!