这里提供一个 Kafka 生产者(Producer)的代码示例,方便后期拿来即用。
1、pom.xml中引入kafka的依赖
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.2.0</version>
</dependency>
2、编写kafka生产者producer的代码
package com.big.data.kafka;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class FlinkKafkaProducer {
private static final String TOPIC = "t_wordcount";
private static final String BROKER_LIST = "192.168.31.20:9092";
/**
* 消息发送确认 0,只要消息提交到消息缓冲,就视为消息发送成功 1,只要消息发送到分区Leader且写入磁盘,就视为消息发送成功
* all,消息发送到分区Leader且写入磁盘,同时其他副本分区也同步到磁盘,才视为消息发送成功
*/
private static final String ACKS_CONFIG = "1";
/**
* 缓存消息数达到此数值后批量提交
*/
private static final String BATCH_SIZE_CONFIG = "1000";
private static KafkaProducer<String, String> producer;
static {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
properties.setProperty(ProducerConfig.ACKS_CONFIG, ACKS_CONFIG);
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, BATCH_SIZE_CONFIG);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producer = new KafkaProducer<String, String>(properties);
}
public void send() {
try {
// 等待启动日志打印完后再发送消息
String message = "this is a test";
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);
producer.send(record, (recordMetadata, e) -> {
if (e != null) {
System.out.println("发送消息异常!");
}
if (recordMetadata != null) {
// topic 下可以有多个分区,每个分区的消费者维护一个 offset
log.info("消息发送成功:{} - {}", recordMetadata.partition(), recordMetadata.offset());
}
});
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != producer) {
producer.close();
}
}
}
public static void main(String[] args) {
FlinkKafkaProducer flinkKafkaProducer = new FlinkKafkaProducer();
flinkKafkaProducer.send();
}
}








还没有评论,来说两句吧...