In the previous article of this series, 《大数据实战系列(三)Spark 实现读取hive数据写入kafka》, we used Spark to read data from Hive and write it to Kafka. In this article we go the other way: use Spark Structured Streaming to read data from Kafka and write it into Hive. Straight to the code:
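The code below assumes a SparkSession named spark with Hive support is already available, and that topicName and the program argument args(0) come from your own application. A minimal sketch of that setup, with placeholder settings, might look like this:

import org.apache.spark.sql.SparkSession

// Minimal sketch: Hive support must be enabled so that saveAsTable()
// goes through the Hive metastore. The app name is a placeholder.
val spark = SparkSession.builder()
  .appName("kafka2Hive")
  .enableHiveSupport()
  .getOrCreate()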
import org.apache.spark.sql.{Dataset, Row, SaveMode}
import org.apache.spark.sql.functions.{from_json, lit}
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val kafkaDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "xxxx")   // Kafka broker list
  .option("subscribe", topicName)              // topic to consume
  .option("failOnDataLoss", false)      // whether to fail immediately if source data is found missing (e.g. deleted)
  .option("fetchOffset.numRetries", 3)  // max number of retries when fetching offsets
  .option("maxOffsetsPerTrigger", 500)  // rate limiting: max records read per trigger; if unset, reads as fast as possible
  .option("startingOffsets", "earliest") // where to start reading Kafka on the first run
  .load()

import spark.implicits._

val schema: StructType = StructType(
  Array(
    StructField("timestamp", StringType, true),
    StructField("vehicleid", StringType, true)
    // ... remaining fields omitted
  )
)

// Parse the Kafka value (a JSON string) into columns according to the schema.
val parsedDF = kafkaDF.selectExpr("CAST(value AS STRING)")
  .select(from_json($"value", schema).as("data"))
  .select($"data.*")
// .createOrReplaceTempView("stream_table")

val syncDate = args(0)   // partition date passed in as the first program argument

// Query with Spark SQL if a temp view was registered:
// val df: DataFrame = spark.sql("select * from stream_table where pDate='2024-03-15'")

val query = parsedDF.writeStream
  .queryName("kafka2Hive")
  .outputMode(OutputMode.Append())
  // The checkpoint directory stores the offsets and is bound to this query's id
  // (changing it is analogous to changing a consumer group.id), so when writing
  // to Hive do not change it casually.
  .option("checkpointLocation", "hdfs://xxxx/user/hive/ckp")
  .foreachBatch((df: Dataset[Row], batchId: Long) => {
    myWriteFun(df, batchId, "database.table", syncDate)
  })
  .trigger(Trigger.ProcessingTime(2000))   // trigger a micro-batch every 2 seconds
  .start()

query.awaitTermination()   // keep the application alive while the stream runs

def myWriteFun(df: Dataset[Row], batchId: Long, tableName: String, pDate: String): Unit = {
  println("BatchId: " + batchId)
  if (df.count() != 0) {
    df.persist()
    // Tag every row of this batch with the sync date and append it into the
    // corresponding pDate partition of the Hive table.
    df.withColumn("pDate", lit(pDate))
      .write.format("hive")
      .mode(SaveMode.Append)
      .partitionBy("pDate")
      .saveAsTable(tableName)
    df.unpersist()
  }
}
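Once the job is running, it is worth spot-checking that each micro-batch actually lands in the target partition. The snippet below is a hypothetical verification step, reusing the database.table name and the pDate partition column from the example above; run it from a separate spark-shell or job with Hive support enabled:

// Hypothetical spot check: count the rows that have arrived in one partition.
val landed = spark.sql(
  "SELECT COUNT(*) AS cnt FROM database.table WHERE pDate = '2024-03-15'")
landed.show()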
Finally, remember to add the following dependencies in Maven:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.2.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
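A deployment note: on most clusters the core Spark modules (spark-core, spark-sql, spark-streaming, spark-hive) are already provided by the runtime and can typically be marked <scope>provided</scope>, whereas the Kafka connector spark-sql-kafka-0-10_2.12 and kafka-clients usually are not, so bundle them into your application jar or supply them at submit time, for example via spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1.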