在日常工作中,我们经常会有延时任务的需求,在此举一个场景示例:
用户在商场下单之后,需要进行付款,我们大多都可以看到有一个待付款的订单,这个订单有效期是30分钟左右(不同的系统时间不一样,常见的是30分钟),那么30分钟之后我们需要取消订单,用户在30分钟之后进行付款,我们应该提示订单失效请重新下单。
在上面这个场景里面,我们需要在订单30分钟截止的时候需要去触发取消订单,像这种操作就属于一个延迟的事件。为了减轻程序的压力,那么在到30分钟截止的那一秒再触发这个动作是不是会更好了。这种场景里面我们就希望有一个组件帮我们协调信息,到点的时候通知我们程序进行触发即可。这里我们就介绍下大家常用的消息队列rabbitmq的死信队列帮我们实现。
这个死信队列的原理是什么呢?来一张图示:
正常的一个流程就是如上图所示。大家可以理解下。下面就给大家来实战演示下:
一、使用docker启动一个rabbitmq,并启动webUI
docker pull rabbitmq:3.7.7-management docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=/ -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:3.7.7-management
再访问 http://192.168.31.30:15672 页面就可以看到对应的管理页面了 。默认登录账号密码是:admin/admin
二、代码演示下
2.1)引入maven依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.15.0</version> </dependency>
2.2)编写一个生产者 Producer2.java
package com.test.rabbitmq; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer2 { // 死信交换机 public static final String DELAY_EXCHANGE = "delay-exchange"; // 死信路由 public static final String DELAY_ROUTING_KEY = "delay-routing-key"; // 死信队列 public static final String DELAY_QUEUE = "delay-queue"; // 普通交换机 public static final String NORMAL_EXCHANGE = "normal-exchange"; // 普通路由 public static final String NORMAL_ROUTING_KEY = "normal-routing-key"; // 普通队列 public static final String NORMAL_QUEUE = "normal-queue"; public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.31.30"); connectionFactory.setPort(5672); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); // 设置连接超时时间:20秒 connectionFactory.setConnectionTimeout(20000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); HashMap<String, Object> arg = new HashMap<String, Object>(); /** * 消息发送给延时队列 设置延时队列的过期时间为5秒钟 5秒之后,延时队列将消息发送给普通队列 */ arg.put("x-dead-letter-exchange", NORMAL_EXCHANGE); arg.put("x-dead-letter-routing-key", NORMAL_ROUTING_KEY); arg.put("x-message-ttl", 5000); channel.exchangeDeclare(DELAY_EXCHANGE, BuiltinExchangeType.DIRECT, true); channel.queueDeclare(DELAY_QUEUE, true, false, false, arg); channel.queueBind(DELAY_QUEUE, DELAY_EXCHANGE, DELAY_ROUTING_KEY); channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT, true); channel.queueDeclare(NORMAL_QUEUE, true, false, false, null); channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().deliveryMode(2).expiration("10000") .build(); channel.basicPublish(DELAY_EXCHANGE, DELAY_ROUTING_KEY, properties, "hello".getBytes()); channel.close(); connection.close(); } }
三、开始测试
3.1)访问http://192.168.31.30:15672 账号:admin,密码:admin,然后点击queue的栏目
此时我们可以看到是没有queue的,queue的数量是0
3.2)运行producer2.java
3.3)运行了java程序之后,我们可以看到queue页面有两个queue
此时我们可以看到刚才我们创建的两个队列,一个delay延迟队列,一个是normal队列。然后我们在运行一下,可以看到发送的消息是首先进入到了delay-queue里面的,等待消息的ttl时间到了之后,消息就自动跑到normal-queue里面了。由此就达到了上面场景的需求,例如订单支付,我们首先把订单放到delay-queue里面。然后等待30分钟后,rabbitmqmq会自动把message转移到normal-queue中,这时候normal-queue正常的消费者就可以拿到数据,直接出发取消订单的操作。
综上总结一下:
1、延迟队列的使用场景可以很好的帮助我们到点触发action,避免程序压力过大。
2、延迟队列的话只需要控制生产者就是了,不需要控制消费者。网上大多的案例是在消费者端控制,如果这样操作的话,有些业务场景明明只需要生产者,但是确还要编写消费者去处理这种延迟队列。流程上来说是不正确的。
3、在使用延迟队列的时候,消息一定要设置持久化,如果没有持久化,重启rabbitmq数据就会丢失。
再补充下延迟队列的消费场景:
1、商城取消订单
2、消息消费延迟重试
3、定期提醒(例如:30天内没有登录,发送短信通知)
4、预定会议通知
。。。等等以上的场景。
还没有评论,来说两句吧...