最近线上的rabbitmq消费者出现了消息积压的情况,因此收到了报警。果断上去查看,挤压的消息确实够多,很多数据都没有被及时的处理,所以第一时间检查了代码,一看消费者一直在消费同一条消息,也就是出现了死循环。对于根据经验可以直接判断是消费者端没有出现ack。果断查看代码就是如此,下面我们贴一下伪代码:
@RabbitListener(queues = "topic1") // 监听队列名称
public void getMsg(Message message, Channel channel, String msg) {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("消费的队列消息来自:" + message.getMessageProperties().getConsumerQueue());
log.info("获取到的消息是:{}", msg);
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
log.error(e.getMessage(), e);
try {
channel.basicReject(deliveryTag, true);
} catch (IOException e1) {
log.error("当前消息:{} 没有被正确消费", msg);
log.error(e1.getMessage(), e1);
}
}
}从上面的伪代码可以看到,程序如果正确处理消息的话是没问题的,但是如果因为一些异常导致的时候,会使用:
channel.basicReject(deliveryTag, true);
这里的代码把消息重新返回队列,此时呢?由于消息是直接返回到队首,所以下一秒又会收到相同的消息,如此继续接收此消息,并且继续出现异常,然后继续返回队首,然后继续接收此消息,这样子造成了无限死循环,此时如果所有的消费者都出现这样的情况的话,所有的消费者就会全部被死循环占用,导致正常的消息不能被正常的进行消费。
所以对于我们日常编码的时候一定要注意合理的使用各个组件的api函数,而不是只是简单的凭介绍就直接调用相关的api,这样在线上会造成无可估量的重大事故。
对于这种死循环来说我们如何解决呢?其实常用的做法就是使用rabbitmq的死信队列。当消息处理异常的时候,我们把这条消息发送到死信队列里面去,然后经过ttl之后,此消息再度流转到当前的消息队列,供此消费者进行消费。这样子做的好处是:
1、消息处理异常的时候会进行ack,不会造成正常消息队列的异常消息给消费者造成死循环的问题。使消费者可以正常的处理其他的消息。 2、消息即使处理失败,发送到死信队列之后也不会造成消息的丢失。 3、经过死信队列与正常队列的理由绑定,可以实现一套消费者逻辑,不需要实现监听多个队列来处理消费者信息。
其实在之前我们介绍过编写rabbitmq的消费者,但是那时候其实主要是使用spring的方式来实现的,现在大部分都是一些web项目,所以大家都是使用Springboot项目来实现的,示例如下:
一、引入maven依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
二、创建消费者监听
package com.example.demo.consumer;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
@Service
@Slf4j
public class RabbitmqConsumer {
@Autowired
private DeadProducer deadProducer;
@RabbitListener(queues = "topic1") // 监听队列名称
public void getMsg(Message message, Channel channel, String msg) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info("消费的队列消息来自:" + message.getMessageProperties().getConsumerQueue());
log.info("获取到的消息是:{}", msg);
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
log.error(e.getMessage(), e);
Boolean succ = deadProducer.sendMsg(msg);
if(succ) {
//一般发送不成功都是rabbitmq挂了,其他的错误原因一般初始化都解决了。
channel.basicAck(deliveryTag, true);
}else {
//人工处理,以防万一
log.info("topic1 消费者处理失败,消息是:{} 需要人工处理",msg);
}
}
}
}三、创建生产者
死信队列的生产者比较特殊,需要在初始化的时候绑定路由,所以这里的话我们单独作为一个生产者来使用。示例代码如下:
package com.example.demo.consumer;
import java.util.HashMap;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;
@Component
@Slf4j
public class DeadProducer {
// 死信交换机
private static final String DELAY_EXCHANGE = "delay-topic1-exchange";
// 死信路由
private static final String DELAY_ROUTING_KEY = "delay-topic1-routing-key";
// 死信队列
private static final String DELAY_QUEUE = "delay-topic1";
// 普通交换机
private static final String NORMAL_EXCHANGE = "topic1-exchange";
// 普通路由
private static final String NORMAL_ROUTING_KEY = "topic1-routing-key";
// 普通队列
private static final String NORMAL_QUEUE = "topic1";
@Value("${spring.rabbitmq.host}")
private String mq_host;
@Value("${spring.rabbitmq.port}")
private String mq_port;
@Value("${spring.rabbitmq.username}")
private String mq_username;
@Value("${spring.rabbitmq.password}")
private String mq_password;
private Connection connection = null;
private Channel channel = null;
private AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().deliveryMode(2).expiration("10000")
.build();
@PostConstruct
public void initProducer() {
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(mq_host);
connectionFactory.setPort(Integer.valueOf(mq_port));
connectionFactory.setUsername(mq_username);
connectionFactory.setPassword(mq_password);
// 设置连接超时时间:20秒
connectionFactory.setConnectionTimeout(20000);
connection = connectionFactory.newConnection();
channel = connection.createChannel();
HashMap<String, Object> arg = new HashMap<String, Object>();
/**
* 消息发送给延时队列 设置延时队列的过期时间为5秒钟 5秒之后,延时队列将消息发送给普通队列
*/
arg.put("x-dead-letter-exchange", NORMAL_EXCHANGE);
arg.put("x-dead-letter-routing-key", NORMAL_ROUTING_KEY);
arg.put("x-message-ttl", 10000);
channel.exchangeDeclare(DELAY_EXCHANGE, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(DELAY_QUEUE, true, false, false, arg);
channel.queueBind(DELAY_QUEUE, DELAY_EXCHANGE, DELAY_ROUTING_KEY);
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(NORMAL_QUEUE, true, false, false, null);
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY);
} catch (Exception e) {
log.error("rabbitmq死信队列生产者初始化失败");
throw new RuntimeException(e);
}
}
public Boolean sendMsg(String msg) {
try {
channel.basicPublish(DELAY_EXCHANGE, DELAY_ROUTING_KEY, properties, msg.getBytes());
} catch (Exception e) {
log.error(e.getMessage(), e);
return false;
}
return true;
}
@PreDestroy
public void destroy() {
try {
if (null != channel) {
channel.close();
}
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
try {
if (null != connection) {
connection.close();
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
}这里可能有的同学会有疑问:
这样子的话,只要涉及到死信队列是不是都要新创建一个生产者?
我的答案是这样子的,我们目前来说主要是依靠单线程进行发送,总体线程数还是可控的,同时项目中涉及到的异步不多,所以这时候我们不同队列对应的生产者都会新创建一个。
最后再贴上我们的配置信息:
spring: #rabbitmq 配置 rabbitmq: host: 49.4.14.12 username: guest password: guest #虚拟主机 virtual-host: / #端口 port: 5672 listener: simple: #消费者最小数量 concurrency: 50 #消费者最大数量 max-concurrency: 50 #限制消费者,每次只能处理一条消息,处理完才能继续下一条消息 prefetch: 1 #启动时是否默认启动容器,默认为 true auto-startup: true #开启手动确认模式 acknowledge-mode: manual template: retry: #启用消息重试机制,默认为 false enabled: true #初始重试间隔时间 initial-interval: 1000ms #重试最大次数,默认为 3 次 max-attempts: 3 #重试最大时间间隔,默认 10000ms max-interval: 10000ms #重试的间隔乘数,第一次等 1s,第二次等 2s,第三次等 4s multiplier: 2
最后我们运行一下即可看在rabbitmq的dashboard上看到对应的信息,示例如下:










还没有评论,来说两句吧...