Reading data directly from Kafka and writing it into Doris is a very common scenario in day-to-day work. Below is a demonstration of using Spark to read Kafka data and write it to Doris. Straight to the code:
val kafkaSource = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "****************")
  .option("subscribe", sourceTopic)
  .option("failOnDataLoss", false)        // whether to fail immediately if data is found to be missing from the source (e.g. deleted) while reading
  .option("startingOffsets", "earliest")  // where to start reading Kafka data on the first run
  .load()

kafkaSource
  .selectExpr("CAST(value as STRING)")
  .writeStream
  .format("doris")
  .option("checkpointLocation", "hdfs://xxxx/user/kafka/ckp")
  .option("doris.table.identifier", "xxxx.xxxx")
  .option("doris.fenodes", "xxxxx:xxxx")
  .option("user", "xxxx")
  .option("password", "****")
  .option("max_filter_ratio", "0.1")
  // with this option the value column of the Kafka message is written as-is, without any processing
  .option("doris.sink.streaming.passthrough", "true")
  .option("doris.sink.properties.format", "json")
  .option("doris.write.fields", "column1,column2,...")
  // other options
  .start()
  .awaitTermination()
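The passthrough option above writes the raw Kafka value straight into Doris. If you instead want to parse the JSON payload in Spark before writing, a minimal sketch could look like the following (assumptions: the messages are JSON, and column1/column2 are hypothetical field names standing in for your real columns):

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}

// Hypothetical schema for the JSON payload; replace with your real columns.
val valueSchema = new StructType()
  .add("column1", StringType)
  .add("column2", StringType)

kafkaSource
  .selectExpr("CAST(value AS STRING) AS value")
  .select(from_json(col("value"), valueSchema).as("data"))
  .select("data.*")                       // expand the parsed JSON into top-level columns
  .writeStream
  .format("doris")
  .option("checkpointLocation", "hdfs://xxxx/user/kafka/ckp")  // same Doris options as above
  .option("doris.table.identifier", "xxxx.xxxx")
  .option("doris.fenodes", "xxxxx:xxxx")
  .option("user", "xxxx")
  .option("password", "****")
  .start()
  .awaitTermination()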
Finally, remember to add the following Maven dependencies:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.2.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>spark-doris-connector-3.2_2.12</artifactId>
    <version>1.3.0</version>
</dependency>
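If your project is built with sbt rather than Maven, a sketch of the equivalent dependency declarations (using the same versions as above) would be:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "3.2.1",
  "org.apache.spark" %% "spark-sql"                  % "3.2.1",
  "org.apache.spark" %% "spark-streaming"            % "3.2.1" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10"       % "3.2.1",
  "org.apache.kafka"  % "kafka-clients"              % "3.2.3",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.2.1",
  // the Doris connector artifactId already carries the Scala suffix, so use a plain "%"
  "org.apache.doris"  % "spark-doris-connector-3.2_2.12" % "1.3.0"
)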