Reading data from Kafka and writing it into Doris is a very common scenario in day-to-day work. The following example demonstrates how to read Kafka data with Spark Structured Streaming and write it into Doris. Straight to the code:
val kafkaSource = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "****************")
  .option("subscribe", sourceTopic)
  // whether to fail the query immediately when data loss is detected at the
  // source (e.g. offsets were deleted), instead of continuing
  .option("failOnDataLoss", false)
  // where to start reading Kafka data the first time the query runs
  .option("startingOffsets", "earliest")
  .load()

kafkaSource
  .selectExpr("CAST(value AS STRING)")
  .writeStream
  .format("doris")
  .option("checkpointLocation", "hdfs://xxxx/user/kafka/ckp")
  .option("doris.table.identifier", "xxxx.xxxx")
  .option("doris.fenodes", "xxxxx:xxxx")
  .option("user", "xxxx")
  .option("password", "****")
  .option("max_filter_ratio", "0.1")
  // with this option set, the value column of the Kafka message is written
  // to Doris as-is, without any further processing
  .option("doris.sink.streaming.passthrough", "true")
  .option("doris.sink.properties.format", "json")
  .option("doris.write.fields", "column1,column2,column3,....")
  // other options
  .start()
  .awaitTermination()
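With doris.sink.streaming.passthrough enabled, the raw JSON string in the value column is handed to Doris unchanged. If you need to reshape or filter the payload before writing, you can parse it yourself with from_json instead. A minimal sketch, assuming a hypothetical two-field JSON payload; the schema and field names below are placeholders:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// hypothetical schema for the JSON messages; replace with your real fields
val valueSchema = new StructType()
  .add("id", IntegerType)
  .add("name", StringType)

kafkaSource
  .selectExpr("CAST(value AS STRING) AS value")
  // parse the JSON string into typed columns instead of passing it through
  .select(from_json(col("value"), valueSchema).alias("data"))
  .select("data.*")
  .writeStream
  .format("doris")
  .option("checkpointLocation", "hdfs://xxxx/user/kafka/ckp")
  .option("doris.table.identifier", "xxxx.xxxx")
  .option("doris.fenodes", "xxxxx:xxxx")
  .option("user", "xxxx")
  .option("password", "****")
  .start()
  .awaitTermination()

In this variant the DataFrame columns map to Doris table columns by name, so the passthrough and format options are no longer needed.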
Finally, remember to add the following dependencies to your Maven pom.xml:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.2.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>spark-doris-connector-3.2_2.12</artifactId>
    <version>1.3.0</version>
</dependency>
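If you submit the job with spark-submit rather than bundling everything into a fat jar, the connector and the Kafka source can also be pulled in at submit time via --packages; the coordinates below match the dependencies above, while the main class and jar name are placeholders:

spark-submit \
  --class com.example.KafkaToDoris \
  --packages org.apache.doris:spark-doris-connector-3.2_2.12:1.3.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 \
  your-app.jar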