2 个回答
如果是使用kafka做事务消息,实现分布式事务的话,示例代码如下:
@Bean
public KafkaTransactionManager<String, String> transactionManager() {
return new KafkaTransactionManager<>(producerFactory());
}
@Transactional
public void processOrder(Order order) {
paymentService.charge(order);
kafkaTemplate.send("payment-topic", order.getId(), order.toJson());
inventoryService.reduceStock(order);
}
发布于:1个月前 (03-21) IP属地:四川省
采用同步发送+重试的策略即可,示例代码如下:
@Bean
public KafkaTemplate<String, String> reliableKafkaTemplate() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.ACKS_CONFIG, "all"); // 必须所有副本确认
configs.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // 防止乱序
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 开启幂等
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configs));
}
// 发送模板
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("order-topic", key, value);
future.get(10, TimeUnit.SECONDS); // 同步等待确认
发布于:1个月前 (03-21) IP属地:四川省
我来回答
您需要 登录 后回答此问题!