In the previous post,《大数据实战系列(三)Spark 实现读取hive数据写入kafka》, we used Spark to read data from Hive and write it to Kafka. In this post we do the opposite: read data from Kafka with Spark Structured Streaming and write it into Hive. Straight to the code:
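One note before the listing: it assumes a SparkSession named spark with Hive support already exists, that topicName holds the topic to subscribe to, and that from_json / lit come from org.apache.spark.sql.functions. The original omits this setup; a minimal sketch might look like the following:

import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{from_json, lit}
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// enableHiveSupport() is required so that saveAsTable writes to the Hive metastore
val spark = SparkSession.builder()
  .appName("kafka2Hive")
  .enableHiveSupport()
  .getOrCreate()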
val kafkaDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "xxxx")
  .option("subscribe", topicName)
  .option("failOnDataLoss", false)        // whether to fail the query immediately when data is found to be missing, e.g. it was deleted from the topic
  .option("fetchOffset.numRetries", 3)    // maximum number of retries when fetching offsets
  .option("maxOffsetsPerTrigger", 500)    // rate limiting: maximum number of records read per trigger; if unset, reads as fast as possible
  .option("startingOffsets", "earliest")  // where to start reading on the very first run, before a checkpoint exists
  .load()
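// The Kafka source returns a fixed schema (key, value, topic, partition, offset,
// timestamp, timestampType); the JSON payload we care about sits in the binary `value` column.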
import spark.implicits._
val schema: StructType = StructType(
  Array(
    StructField("timestamp", StringType, true),
    StructField("vehicleid", StringType, true)
    // ... remaining fields omitted in the original
  )
)
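// A matching Kafka message value would be a JSON string along these lines
// (hypothetical example): {"timestamp":"1710489600000","vehicleid":"V001", ...}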
val parsedDF = kafkaDF
  .selectExpr("CAST(value AS STRING)")
  .select(from_json($"value", schema).as("data"))
  .select($"data.*")
// .createOrReplaceTempView("stream_table")
val syncDate = args(0)  // partition date (e.g. 2024-03-15) passed in as the first program argument
// Alternative: register the temp view above and query it with Spark SQL
// val df: DataFrame = spark.sql("select * from stream_table where pDate='2024-03-15'")
val query = parsedDF.writeStream
  .queryName("kafka2Hive")
  .outputMode(OutputMode.Append())
  .option("checkpointLocation", "hdfs://xxxx/user/hive/ckp")  // stores the consumed offsets; the query is bound to this directory. Changing it gives the query a new identity (much like changing a Kafka group.id), so do not change it lightly once you are writing to Hive
  .foreachBatch((df: Dataset[Row], batchId: Long) => {
    myWriteFun(df, batchId, "database.table", syncDate)
  })
  .trigger(Trigger.ProcessingTime(2000))  // fire a micro-batch roughly every 2 seconds
  .start()
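// Note (not shown in the original): in a standalone application the driver should
// block until the stream is stopped, e.g. with query.awaitTermination()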
def myWriteFun(df: Dataset[Row], batchId: Long, tableName: String, pDate: String): Unit = {
  println("batchId = " + batchId)
  if (df.count() != 0) {
    df.persist()
    // partitionBy expects a column name, so materialize the partition date as a column first
    df.withColumn("pDate", lit(pDate))
      .write
      .format("hive")
      .mode(SaveMode.Append)
      .partitionBy("pDate")
      .saveAsTable(tableName)
    df.unpersist()
  }
}

Finally, remember to add the following dependencies to your Maven pom.xml:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.2.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
