最近线上的rabbitmq消费者出现了消息积压的情况,因此收到了报警。果断上去查看,挤压的消息确实够多,很多数据都没有被及时的处理,所以第一时间检查了代码,一看消费者一直在消费同一条消息,也就是出现了死循环。对于根据经验可以直接判断是消费者端没有出现ack。果断查看代码就是如此,下面我们贴一下伪代码:
@RabbitListener(queues = "topic1") // 监听队列名称 public void getMsg(Message message, Channel channel, String msg) { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { log.info("消费的队列消息来自:" + message.getMessageProperties().getConsumerQueue()); log.info("获取到的消息是:{}", msg); channel.basicAck(deliveryTag, true); } catch (Exception e) { log.error(e.getMessage(), e); try { channel.basicReject(deliveryTag, true); } catch (IOException e1) { log.error("当前消息:{} 没有被正确消费", msg); log.error(e1.getMessage(), e1); } } }
从上面的伪代码可以看到,程序如果正确处理消息的话是没问题的,但是如果因为一些异常导致的时候,会使用:
channel.basicReject(deliveryTag, true);
这里的代码把消息重新返回队列,此时呢?由于消息是直接返回到队首,所以下一秒又会收到相同的消息,如此继续接收此消息,并且继续出现异常,然后继续返回队首,然后继续接收此消息,这样子造成了无限死循环,此时如果所有的消费者都出现这样的情况的话,所有的消费者就会全部被死循环占用,导致正常的消息不能被正常的进行消费。
所以对于我们日常编码的时候一定要注意合理的使用各个组件的api函数,而不是只是简单的凭介绍就直接调用相关的api,这样在线上会造成无可估量的重大事故。
对于这种死循环来说我们如何解决呢?其实常用的做法就是使用rabbitmq的死信队列。当消息处理异常的时候,我们把这条消息发送到死信队列里面去,然后经过ttl之后,此消息再度流转到当前的消息队列,供此消费者进行消费。这样子做的好处是:
1、消息处理异常的时候会进行ack,不会造成正常消息队列的异常消息给消费者造成死循环的问题。使消费者可以正常的处理其他的消息。 2、消息即使处理失败,发送到死信队列之后也不会造成消息的丢失。 3、经过死信队列与正常队列的理由绑定,可以实现一套消费者逻辑,不需要实现监听多个队列来处理消费者信息。
其实在之前我们介绍过编写rabbitmq的消费者,但是那时候其实主要是使用spring的方式来实现的,现在大部分都是一些web项目,所以大家都是使用Springboot项目来实现的,示例如下:
一、引入maven依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
二、创建消费者监听
package com.example.demo.consumer; import java.io.IOException; import java.util.concurrent.TimeUnit; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; @Service @Slf4j public class RabbitmqConsumer { @Autowired private DeadProducer deadProducer; @RabbitListener(queues = "topic1") // 监听队列名称 public void getMsg(Message message, Channel channel, String msg) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { log.info("消费的队列消息来自:" + message.getMessageProperties().getConsumerQueue()); log.info("获取到的消息是:{}", msg); channel.basicAck(deliveryTag, true); } catch (Exception e) { log.error(e.getMessage(), e); Boolean succ = deadProducer.sendMsg(msg); if(succ) { //一般发送不成功都是rabbitmq挂了,其他的错误原因一般初始化都解决了。 channel.basicAck(deliveryTag, true); }else { //人工处理,以防万一 log.info("topic1 消费者处理失败,消息是:{} 需要人工处理",msg); } } } }
三、创建生产者
死信队列的生产者比较特殊,需要在初始化的时候绑定路由,所以这里的话我们单独作为一个生产者来使用。示例代码如下:
package com.example.demo.consumer; import java.util.HashMap; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import lombok.extern.slf4j.Slf4j; @Component @Slf4j public class DeadProducer { // 死信交换机 private static final String DELAY_EXCHANGE = "delay-topic1-exchange"; // 死信路由 private static final String DELAY_ROUTING_KEY = "delay-topic1-routing-key"; // 死信队列 private static final String DELAY_QUEUE = "delay-topic1"; // 普通交换机 private static final String NORMAL_EXCHANGE = "topic1-exchange"; // 普通路由 private static final String NORMAL_ROUTING_KEY = "topic1-routing-key"; // 普通队列 private static final String NORMAL_QUEUE = "topic1"; @Value("${spring.rabbitmq.host}") private String mq_host; @Value("${spring.rabbitmq.port}") private String mq_port; @Value("${spring.rabbitmq.username}") private String mq_username; @Value("${spring.rabbitmq.password}") private String mq_password; private Connection connection = null; private Channel channel = null; private AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().deliveryMode(2).expiration("10000") .build(); @PostConstruct public void initProducer() { try { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(mq_host); connectionFactory.setPort(Integer.valueOf(mq_port)); connectionFactory.setUsername(mq_username); connectionFactory.setPassword(mq_password); // 设置连接超时时间:20秒 connectionFactory.setConnectionTimeout(20000); connection = connectionFactory.newConnection(); channel = connection.createChannel(); HashMap<String, Object> arg = new HashMap<String, Object>(); /** * 消息发送给延时队列 设置延时队列的过期时间为5秒钟 5秒之后,延时队列将消息发送给普通队列 */ arg.put("x-dead-letter-exchange", NORMAL_EXCHANGE); arg.put("x-dead-letter-routing-key", NORMAL_ROUTING_KEY); arg.put("x-message-ttl", 10000); channel.exchangeDeclare(DELAY_EXCHANGE, BuiltinExchangeType.DIRECT, true); channel.queueDeclare(DELAY_QUEUE, true, false, false, arg); channel.queueBind(DELAY_QUEUE, DELAY_EXCHANGE, DELAY_ROUTING_KEY); channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT, true); channel.queueDeclare(NORMAL_QUEUE, true, false, false, null); channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY); } catch (Exception e) { log.error("rabbitmq死信队列生产者初始化失败"); throw new RuntimeException(e); } } public Boolean sendMsg(String msg) { try { channel.basicPublish(DELAY_EXCHANGE, DELAY_ROUTING_KEY, properties, msg.getBytes()); } catch (Exception e) { log.error(e.getMessage(), e); return false; } return true; } @PreDestroy public void destroy() { try { if (null != channel) { channel.close(); } } catch (Exception e) { log.error(e.getMessage(), e); } finally { try { if (null != connection) { connection.close(); } } catch (Exception e) { log.error(e.getMessage(), e); } } } }
这里可能有的同学会有疑问:
这样子的话,只要涉及到死信队列是不是都要新创建一个生产者?
我的答案是这样子的,我们目前来说主要是依靠单线程进行发送,总体线程数还是可控的,同时项目中涉及到的异步不多,所以这时候我们不同队列对应的生产者都会新创建一个。
最后再贴上我们的配置信息:
spring: #rabbitmq 配置 rabbitmq: host: 49.4.14.12 username: guest password: guest #虚拟主机 virtual-host: / #端口 port: 5672 listener: simple: #消费者最小数量 concurrency: 50 #消费者最大数量 max-concurrency: 50 #限制消费者,每次只能处理一条消息,处理完才能继续下一条消息 prefetch: 1 #启动时是否默认启动容器,默认为 true auto-startup: true #开启手动确认模式 acknowledge-mode: manual template: retry: #启用消息重试机制,默认为 false enabled: true #初始重试间隔时间 initial-interval: 1000ms #重试最大次数,默认为 3 次 max-attempts: 3 #重试最大时间间隔,默认 10000ms max-interval: 10000ms #重试的间隔乘数,第一次等 1s,第二次等 2s,第三次等 4s multiplier: 2
最后我们运行一下即可看在rabbitmq的dashboard上看到对应的信息,示例如下:
还没有评论,来说两句吧...