在使用rabbitmq作为消息队列的时候,我们需要了解下rabbitmq的消息模式。rabbitmq的消息模式有5种,分别是:simple模式,Fanout模式,Direct模式,Topic模式,work模式,。本文主要介绍的是simple模式。
在rabbitmq中simple模式是最简单的模式,即一个生产者对应一个队列,同时只有一个消费者,像这种一般我们很容易理解把他作为FIFO顺序性的消息保障。
整个流程如上图,生产者把生产的消息发送给Rabbitmq进行存储,消费者从消息队列中拉取消息进行消费。下面我们使用代码进行演示下:
一、引入maven的依赖:
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.9.0</version> </dependency>
二、编写生产者
package com.rabbitmq.producer;
import java.nio.charset.StandardCharsets;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SimpleleProducer {
private Connection connection = null;
private Channel channel = null;
private ConnectionFactory factory = null;
private String queueName = "test";
/**
* 初始化rabbitmq连接
*/
@Before
public void init() {
factory = new ConnectionFactory();
factory.setHost("192.168.31.20");
// 设置rabbitmq的登录账号和密码
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5673); // 默认端口是5672,这里服务器端口冲突,因此改到5673
factory.setVirtualHost("/");
}
@After
public void destroy() {
// 关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
// 关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
@Test
public void sendMsg() throws Exception {
// 2、创建连接、通道
connection = factory.newConnection();
channel = connection.createChannel();
// 3、声明队列
channel.queueDeclare(queueName, true, false, false, null);
// 发送消息内容
String message = "take a message";
// 4、发送消息到指定队列,这里我们使用MessageProperties.PERSISTENT_TEXT_PLAIN对消息进行持久化
channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes(StandardCharsets.UTF_8));
log.info("发送消息完成,消息内容是:{}", message);
}
}三、编写消费者
package com.rabbitmq.producer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SimpleConsumer {
private final static String QUEUE_NAME = "test";
public static void main(String[] args) throws IOException, TimeoutException {
// 1、创建连接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.31.20");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5673);
factory.setVirtualHost("/");
// 2、获取 Connection和 Channel
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3、声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
log.info("接收到的消息是:{}", message);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
// 如果消息被取消,则什么都不做
});
}
}然后运行结果如下:
备注:
1、Simple模式是rabbitmq消息模式里面最简单的一种。
2、在编写rabbitmq的时候需要注意下队列的持久化和消息的持久化











还没有评论,来说两句吧...