1 个回答
采用手动提交+死信队列的方式,示例代码如下:
@KafkaListener(topics = "order-topic", groupId = "payment-group")
public void listen(
ConsumerRecord<String, String> record,
Acknowledgment ack,
Consumer<String, String> consumer) {
try {
paymentService.process(record.value());
ack.acknowledge(); // 手动提交
} catch (Exception ex) {
// 记录原始消息到死信队列
kafkaTemplate.send("order-dlq", record.key(), record.value());
// 重置偏移量到5秒前
consumer.seek(record.topic(), record.partition(), record.offset() - 1);
}
}
切记要在配置下如下的属性:# 关键配置
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
spring.kafka.consumer.isolation.level=read_committed
发布于:1个月前 (03-21) IP属地:四川省
我来回答
您需要 登录 后回答此问题!