I've recently been giving Spark-related demos, so here is a standalone example: a Kafka producer and a consumer written in Scala. Straight to the code.

Producer code example:
package com.kafka.producer

import com.alibaba.fastjson.JSON
import com.alibaba.fastjson.serializer.SerializeConfig
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import java.util.Properties

object KafkaProducer {

  def main(args: Array[String]): Unit = {
    // Initialize the Kafka producer configuration
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.31.20:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // Create the Kafka producer
    val producer = new KafkaProducer[String, String](props)

    // Generate the message payload
    val content: String = generatorUserInfo()

    // Simulate sending the message 100 times in a loop
    for (i <- 1 to 100) {
      producer.send(new ProducerRecord[String, String]("ttest2", content))
    }

    // Remember to close the client when sending is finished
    producer.close()
  }

  /**
   * Simulate generating data. Since this is only a demo and nothing further is
   * done with the data, the sample values are hard-coded.
   * @return the user info serialized as a JSON string
   */
  def generatorUserInfo(): String = {
    val user = new UserPoJo()
    user.id = 1
    user.name = "田七"
    user.age = 20
    JSON.toJSONString(user, new SerializeConfig(true))
  }
}
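The producer serializes a UserPoJo object with fastjson, but that class is not shown above. The field assignments imply mutable id/name/age fields; a minimal sketch that would satisfy the code (the field types are assumptions, not taken from the original post) could look like this:

package com.kafka.producer

// Minimal sketch of the UserPoJo referenced by the producer above.
// The original class is not shown; the fields and their types (Int/String/Int)
// are assumed from the assignments in generatorUserInfo().
class UserPoJo {
  var id: Int = 0
  var name: String = ""
  var age: Int = 0
}

Passing new SerializeConfig(true) tells fastjson to serialize in field-based mode, which is what lets plain Scala var fields (without JavaBean-style getters/setters) show up in the JSON output.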
Consumer code example:
package com.kafka.consumer

import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}

import java.util.{Collections, Properties}

object KafkaConsumer {

  def main(args: Array[String]): Unit = {
    // Initialize the Kafka consumer configuration
    val prop = new Properties
    prop.put("bootstrap.servers", "192.168.31.20:9092")
    // Specify the consumer group
    prop.put("group.id", "group1")
    // Where to start consuming when there is no committed offset: earliest/latest/none
    prop.put("auto.offset.reset", "earliest")
    // Deserializers for the message key and value
    prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // Enable automatic offset commits
    prop.put("enable.auto.commit", "true")
    // Session timeout
    prop.put("session.timeout.ms", "30000")

    // Create the Consumer instance
    val kafkaConsumer = new KafkaConsumer[String, String](prop)

    // Subscribe to the topic
    kafkaConsumer.subscribe(Collections.singletonList("ttest2"))

    // Start consuming
    while (true) {
      // If Kafka has no messages, poll() blocks for at most the timeout given
      // here (2 seconds) before returning. If there are still unconsumed
      // messages, it returns them right away without waiting out the timeout.
      val msgs: ConsumerRecords[String, String] = kafkaConsumer.poll(2000)
      val it = msgs.iterator()
      // poll() returns a batch, so loop over the records to process them
      while (it.hasNext) {
        val msg = it.next()
        println(msg.toString)
        println(s"partition: ${msg.partition()}, offset: ${msg.offset()}, key: ${msg.key()}, value: ${msg.value()}")
      }
    }
  }
}
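The consumer above relies on automatic offset commits. If you want offsets committed only after the records have actually been processed (at-least-once handling), a variant of the polling loop with manual commits looks roughly like this. It is only a sketch, not part of the original demo, and it reuses the same prop and topic as above, except that auto-commit is switched off:

// Sketch: manual-commit variant of the consumer loop above.
// Assumes the same imports and the same `prop` configuration, except:
prop.put("enable.auto.commit", "false")

val consumer = new KafkaConsumer[String, String](prop)
consumer.subscribe(Collections.singletonList("ttest2"))

while (true) {
  val records = consumer.poll(2000)
  val it = records.iterator()
  while (it.hasNext) {
    val msg = it.next()
    // process the record here
    println(s"partition: ${msg.partition()}, offset: ${msg.offset()}, value: ${msg.value()}")
  }
  // Commit the offsets of this batch only after every record in it
  // has been processed.
  if (!records.isEmpty) consumer.commitSync()
}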
Finally, run the producer and then the consumer to see the messages come through.

Also remember to add the relevant dependencies to your Maven pom.xml:
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
<dependency>
    <groupId>com.google.collections</groupId>
    <artifactId>google-collections</artifactId>
    <version>1.0</version>
</dependency>
<!-- Scala libraries -->
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.12.14</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-compiler</artifactId>
    <version>2.12.14</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-reflect</artifactId>
    <version>2.12.14</version>
</dependency>
<!-- Since this is a Spark program, the Spark Kafka integration artifacts are used here
     (they bring in the Kafka client transitively). If you are not using Spark,
     depend on the plain Kafka client instead. -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
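Since the dependencies above pull in spark-sql-kafka-0-10, the same topic can also be read from Spark with Structured Streaming. Below is a minimal sketch, assuming spark-sql/spark-core are also on the classpath; the local[*] master and the console sink are placeholders for a quick local test, not part of the original demo:

import org.apache.spark.sql.SparkSession

// Sketch: read the same topic with Structured Streaming via spark-sql-kafka-0-10.
object SparkKafkaReader {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")          // placeholder for a quick local test
      .appName("kafka-demo")
      .getOrCreate()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "192.168.31.20:9092")
      .option("subscribe", "ttest2")
      .option("startingOffsets", "earliest")
      .load()

    // key/value arrive as binary columns; cast them to strings for display
    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}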