In this article, we'll walk through using Spark to read data from Hive and write it to Kafka. Straight to the code:
```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder()
  .appName("HiveToKafka")
  .config("spark.sql.warehouse.dir", "hdfs://xxxx/user/hive/warehouse")
  .config("hive.metastore.uris", "thrift://xxxx:9083")
  .enableHiveSupport()
  .getOrCreate()
import spark.implicits._

// The partition date to sync, passed in as the first program argument
val syncDate = args(0)
val kafkaBrokers = "***********"

// Read the Hive partition and serialize each row to a JSON string
val df1 = spark.sql(
  s"""
     |select * from
     |database.table
     |where pdate = '$syncDate'
     |""".stripMargin)
  .toJSON
  .toDF("value")

// Kafka producer configuration
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers) // Kafka cluster address
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

// Send the DataFrame to Kafka, one producer per partition:
// KafkaProducer is not serializable, so it must be created on the
// executors inside foreachPartition, not on the driver
df1.foreachPartition { (partition: Iterator[Row]) =>
  val producer = new KafkaProducer[String, String](props)
  partition.foreach { row =>
    val value = row.getAs[String]("value")
    val record = new ProducerRecord[String, String]("target_topic", value)
    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
        if (exception == null) {
          println("partition: " + metadata.partition() + " offset: " + metadata.offset())
        } else {
          exception.printStackTrace()
        }
      }
    })
  }
  producer.close()
}
```
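As a side note: since the dependency list below already pulls in spark-sql-kafka-0-10, the same write can also be done with Spark's built-in Kafka batch sink instead of managing a KafkaProducer by hand. A minimal sketch, reusing the `spark`, `syncDate`, and `kafkaBrokers` values from above (the sink expects a column named `value`, which `toJSON` / `toDF("value")` already provides):

```scala
// Alternative: write the same JSON rows with Spark's built-in Kafka sink
// (provided by the spark-sql-kafka-0-10 dependency listed below)
spark.sql(
  s"""
     |select * from
     |database.table
     |where pdate = '$syncDate'
     |""".stripMargin)
  .toJSON
  .toDF("value")                                   // the sink reads the "value" column
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBrokers) // same broker list as above
  .option("topic", "target_topic")                 // same target topic as above
  .save()
```

The trade-off is that you give up the per-record Callback logging, but Spark manages the producer lifecycle and batching internally.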
Finally, remember to add the following dependencies to your Maven pom.xml:
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.2.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.2.1</version>
</dependency>
```
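For completeness, the packaged job would typically be launched with spark-submit. The main class and jar names below are placeholders for illustration; the trailing date argument is what ends up as `args(0)` (i.e. `syncDate`):

```bash
# Hypothetical class/jar names; the last argument becomes args(0)
spark-submit \
  --class com.example.HiveToKafka \
  --master yarn \
  --deploy-mode cluster \
  hive-to-kafka_2.12-1.0.jar \
  2024-01-01
```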