In this post we show how to use Spark to read data from Hive and write it to Kafka. Here is the code:
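import java.util.Properties

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}

// Assumption: `conf` is not defined in the original snippet; an empty SparkConf
// stands in here for any extra Spark settings you want to pass.
val conf = new SparkConf()

val spark = SparkSession.builder()
  .appName("HiveToKafka")
  .config("spark.sql.warehouse.dir", "hdfs://xxxx/user/hive/warehouse")
  .config("hive.metastore.uris", "thrift://xxxx:9083")
  .config(conf)
  .enableHiveSupport()
  .getOrCreate()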
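import spark.implicits._

val syncDate = args(0) // partition date, passed as the first program argument
val kafkaBrokers = "***********"

// Read one date partition from Hive and serialize every row to a JSON string
// held in a single column named "value"
val df1 = spark.sql(
  s"""
     |select * from
     |database.table
     |where pdate = '$syncDate'
     |""".stripMargin)
  .toJSON
  .toDF("value")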
// Kafka producer configuration
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers) // Kafka cluster address
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// Send the DataFrame to Kafka, creating one producer per partition
df1.foreachPartition { (partition: Iterator[Row]) =>
  val producer = new KafkaProducer[String, String](props)
  try {
    partition.foreach { row =>
      val value = row.getAs[String]("value")
      val record = new ProducerRecord[String, String]("target_topic", value)
      producer.send(record, new Callback {
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
          if (exception == null) {
            println(s"partition: ${metadata.partition()} offset: ${metadata.offset()}")
          } else {
            exception.printStackTrace()
          }
        }
      })
    }
  } finally {
    // close() waits for buffered records to be sent, and runs even if a send fails
    producer.close()
  }
}

Finally, remember to add the following dependencies in Maven:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.2.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
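Since spark-sql-kafka-0-10 is already in the dependency list, the same batch write could probably also be done with Spark's built-in Kafka sink instead of a hand-managed KafkaProducer. The sketch below is an alternative, not the approach used in the code above. The object name `HiveToKafkaBuiltinSink` is hypothetical, and it assumes the warehouse and metastore settings come from the cluster configuration rather than being set in code. The table, topic, and broker placeholder are the ones used earlier in the post.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical entry point; the name is chosen for illustration only.
object HiveToKafkaBuiltinSink {
  def main(args: Array[String]): Unit = {
    val syncDate = args(0)              // partition date, as in the code above
    val kafkaBrokers = "***********"    // placeholder kept from the post

    // Assumption: Hive warehouse/metastore settings are supplied by the
    // cluster configuration (e.g. hive-site.xml), so none are set here.
    val spark = SparkSession.builder()
      .appName("HiveToKafkaBuiltinSink")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql(s"select * from database.table where pdate = '$syncDate'")
      .toJSON                           // one JSON string per row
      .toDF("value")                    // the Kafka sink reads the "value" column
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaBrokers)
      .option("topic", "target_topic")
      .save()

    spark.stop()
  }
}
```

With this variant Spark manages the producers itself, so there is no per-record callback. If you need to log the partition and offset of each record, as the callback above does, the foreachPartition version is the one to keep.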








