java中向kafka中发送数据,如何保证数据不丢失?

提问者:帅平 问题分类:消息队列
java中向kafka中发送数据,如何保证数据不丢失?
2 个回答
你以为我的心是不锈钢么
你以为我的心是不锈钢么
如果是使用kafka做事务消息,实现分布式事务的话,示例代码如下:
@Bean
public KafkaTransactionManager<String, String> transactionManager() {
    return new KafkaTransactionManager<>(producerFactory());
}
@Transactional
public void processOrder(Order order) {
    paymentService.charge(order);
    kafkaTemplate.send("payment-topic", order.getId(), order.toJson());
    inventoryService.reduceStock(order); 
}
发布于:1个月前 (03-21) IP属地:四川省
深海少女心
深海少女心
采用同步发送+重试的策略即可,示例代码如下:
@Bean
public KafkaTemplate<String, String> reliableKafkaTemplate() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(ProducerConfig.ACKS_CONFIG, "all"); // 必须所有副本确认
    configs.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
    configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // 防止乱序
    configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 开启幂等
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configs));
}
// 发送模板
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("order-topic", key, value);
future.get(10, TimeUnit.SECONDS); // 同步等待确认
发布于:1个月前 (03-21) IP属地:四川省
我来回答