在使用rabbitmq作为消息队列的时候,我们需要了解下rabbitmq的消息模式。rabbitmq的消息模式有5种,分别是:simple模式,Fanout模式,Direct模式,Topic模式,work模式,。本文主要介绍的是Work模式。
work模式可以看做是一个负载均衡的工作模式,例如我们在实际环境中有多个消费者,那么这些消费者的消费能力可能是不一样的,那么我们就需要去均衡每个消费者的消费能力,例如:每个消费者固定消费多少数据,或者消费能力强的多消费一些数据。生产者这边是不需要做任何改动的。
Work模式主要有2种:
1、轮询的方式,即按照顺序轮询的方式,每一个消费者的消息队列获取的消息数量是一样的,例如5个队列,10条消息,那么每个队列都是2条消息,很均衡。 2、公平分发,即消费能力强的多发送一些消息,消费能力弱的少发送一些消息。直白点就是能力强的多干,能力弱的少干。
还有一点就是这种work模式是在消费者端控制的。下面分别演示下Work模式下的负载均衡。
一、添加maven依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.9.0</version> </dependency>
二、编写生产者
package com.rabbitmq.producer; import java.nio.charset.StandardCharsets; import org.junit.After; import org.junit.Before; import org.junit.Test; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import lombok.extern.slf4j.Slf4j; @Slf4j public class WorkProducer { private Connection connection = null; private Channel channel = null; private ConnectionFactory factory = null; private String queueName = "test"; /** * 初始化rabbitmq连接 */ @Before public void init() { factory = new ConnectionFactory(); factory.setHost("192.168.31.20"); // 设置rabbitmq的登录账号和密码 factory.setUsername("guest"); factory.setPassword("guest"); factory.setPort(5673); // 默认端口是5672,这里服务器端口冲突,因此改到5673 factory.setVirtualHost("/"); } @After public void destroy() { // 关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception e) { log.error(e.getMessage(), e); } } // 关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception e) { log.error(e.getMessage(), e); } } } @Test public void sendMsg() throws Exception { // 2、创建连接、通道 connection = factory.newConnection(); channel = connection.createChannel(); // 发送消息内容 String message = "take a message"; // 4、发送消息到指定队列,这里我们使用MessageProperties.PERSISTENT_TEXT_PLAIN对消息进行持久化 channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); log.info("发送消息完成,消息内容是:{}", message); } }
三、编写消费者
3.1、编写轮询的消费者
package com.rabbitmq.producer; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; import lombok.extern.slf4j.Slf4j; @Slf4j public class WorkLunxunConsumer { private final static String QUEUE_NAME = "test"; public static void main(String[] args) throws IOException, TimeoutException { // 1、创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.20"); factory.setUsername("guest"); factory.setPassword("guest"); factory.setPort(5673); factory.setVirtualHost("/"); // 2、获取 Connection和 Channel Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 3、声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); log.info("接收到的消息是:{}", message); try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) { log.error(e.getMessage(),e); } }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { // 如果消息被取消,则什么都不做 }); log.info("消费者启动成功"); } }
3.2、编写公平的消费者
package com.rabbitmq.producer; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; import lombok.extern.slf4j.Slf4j; @Slf4j public class WorkLunxun2Consumer { private final static String QUEUE_NAME = "test"; public static void main(String[] args) throws IOException, TimeoutException { // 1、创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.31.20"); factory.setUsername("guest"); factory.setPassword("guest"); factory.setPort(5673); factory.setVirtualHost("/"); // 2、获取 Connection和 Channel Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 3、声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //使用QOS机制,每次从队列里面取1条 channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); log.info("接收到的消息是:{}", message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) { log.error(e.getMessage(),e); } }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { // 如果消息被取消,则什么都不做 }); log.info("消费者启动成功"); } }
可以看出,在公平里面,我们使用了qos机制,即每次从消息队列里面取出来多少条数据,待数据处理完之后,我们在使用手动的ack机制,以保证数据是被一条条的处理完毕的,如果没处理完,则不会进行手动basicack,这样队列里面的消息就会发送给其他的消费者处理。
备注:
1、这个qos机制在生产环境里面几乎是必用的。
2、手动ack机制是我们保证消费每一条queue里面的消息不丢失的保障。
还没有评论,来说两句吧...