这里写一个kafka生产者的代码示例,方便后期拿来即可使用。
1、pom.xml中引入kafka的依赖
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.2.0</version> </dependency>
2、编写kafka生产者producer的代码
package com.big.data.kafka; import java.util.Properties; import java.util.concurrent.Future; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import lombok.extern.slf4j.Slf4j; @Slf4j public class FlinkKafkaProducer { private static final String TOPIC = "t_wordcount"; private static final String BROKER_LIST = "192.168.31.20:9092"; /** * 消息发送确认 0,只要消息提交到消息缓冲,就视为消息发送成功 1,只要消息发送到分区Leader且写入磁盘,就视为消息发送成功 * all,消息发送到分区Leader且写入磁盘,同时其他副本分区也同步到磁盘,才视为消息发送成功 */ private static final String ACKS_CONFIG = "1"; /** * 缓存消息数达到此数值后批量提交 */ private static final String BATCH_SIZE_CONFIG = "1000"; private static KafkaProducer<String, String> producer; static { Properties properties = new Properties(); properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST); properties.setProperty(ProducerConfig.ACKS_CONFIG, ACKS_CONFIG); properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE_CONFIG); properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producer = new KafkaProducer<String, String>(properties); } public void send() { try { // 等待启动日志打印完后再发送消息 String message = "this is a test"; ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message); producer.send(record, (recordMetadata, e) -> { if (e != null) { System.out.println("发送消息异常!"); } if (recordMetadata != null) { // topic 下可以有多个分区,每个分区的消费者维护一个 offset log.info("消息发送成功:{} - {}", recordMetadata.partition(), recordMetadata.offset()); } }); } catch (Exception e) { e.printStackTrace(); } finally { if (null != producer) { producer.close(); } } } public static void main(String[] args) { FlinkKafkaProducer flinkKafkaProducer = new FlinkKafkaProducer(); flinkKafkaProducer.send(); } }
还没有评论,来说两句吧...