在前面,我们成功实现了rabbitmq的安装,详见《centos7.x如何使用docker安装rabbitmq并配置管理界面?》,同样上一篇文章,我们讲解了生产者-《消息中间件实战之rabbitmq生产者》,今天我们来讲解下rabbitmq的实战编码之消费者。
第一步、在生产者的项目里面编写消费者代码
package com.demo.rabbitmq;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.*;
/**
* RabbitMQ 消费者
*/
public class RabbitMqConsumer {
// 队列名称
private String QUEUE_NAME = "test";
// 创建连接工厂
private ConnectionFactory factory = null;
// 建立到代理服务器到连接
private Connection connection = null;
// 获得通道
private Channel channel = null;
private QueueingConsumer consumer = null;
public RabbitMqConsumer() {
try {
factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("192.168.31.10");
factory.setPort(5672);
// 建立到代理服务器到连接
connection = factory.newConnection();
channel = connection.createChannel();
// 1.队列名2.是否持久化,3是否局限与链接,4不再使用是否删除,5其他的属性
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 声明一个消费者,配置好获取消息的方式
consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
public void getMsg() {
// 循环获取消息
while (true) {
QueueingConsumer.Delivery delivery = null;
try {
// 循环获取信息
// 指向下一个消息,如果没有会一直阻塞
delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("从队列:" + QUEUE_NAME + "接收到的消息是:" + msg);
//生产环境的消费者一定要手动ack数据,避免数据丢失
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
} catch (Exception e) {
e.printStackTrace();
try {
//重回队列
channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,true);
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
public void destroy() {
try {
// 关闭资源
channel.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}第二步、在测试类里面编写测试消费者的代码
package com.demo.rabbitmq;
public class RabbitmqTest {
public static void main(String[] args) {
//produceMsg();
consumingMsg();
}
/**
* 测试消费者
*/
private static void consumingMsg() {
RabbitMqConsumer consumer = new RabbitMqConsumer();
consumer.getMsg();
}
/**
* 测试生产消息
*/
private static void produceMsg() {
RabbitMqProducer rabbitMqProducer = new RabbitMqProducer();
rabbitMqProducer.send("11111");
rabbitMqProducer.destroy();
}
}第三步、查看执行结果
到这里我们的生产者和消费者都实现完了,并且已经测试通了。有几个需要注意的地方,需要大概说下:
1、生产环境里面,我们一般都是通过spring 在管理这些代码,因此这些参数一定要从配置中心进行或者,不要硬编码到项目中。 2、生产环境中一定要手动ack数据,保证消费的每一条数据都被程序完成正确的处理掉。










还没有评论,来说两句吧...