在前面,我们成功实现了rabbitmq的安装,详见《centos7.x如何使用docker安装rabbitmq并配置管理界面?》,同样上一篇文章,我们讲解了生产者-《消息中间件实战之rabbitmq生产者》,今天我们来讲解下rabbitmq的实战编码之消费者。
第一步、在生产者的项目里面编写消费者代码
package com.demo.rabbitmq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.*; /** * RabbitMQ 消费者 */ public class RabbitMqConsumer { // 队列名称 private String QUEUE_NAME = "test"; // 创建连接工厂 private ConnectionFactory factory = null; // 建立到代理服务器到连接 private Connection connection = null; // 获得通道 private Channel channel = null; private QueueingConsumer consumer = null; public RabbitMqConsumer() { try { factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("192.168.31.10"); factory.setPort(5672); // 建立到代理服务器到连接 connection = factory.newConnection(); channel = connection.createChannel(); // 1.队列名2.是否持久化,3是否局限与链接,4不再使用是否删除,5其他的属性 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 声明一个消费者,配置好获取消息的方式 consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); } catch (Exception e) { e.printStackTrace(); } } public void getMsg() { // 循环获取消息 while (true) { QueueingConsumer.Delivery delivery = null; try { // 循环获取信息 // 指向下一个消息,如果没有会一直阻塞 delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); System.out.println("从队列:" + QUEUE_NAME + "接收到的消息是:" + msg); //生产环境的消费者一定要手动ack数据,避免数据丢失 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } catch (Exception e) { e.printStackTrace(); try { //重回队列 channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,true); } catch (Exception e1) { e1.printStackTrace(); } } } } public void destroy() { try { // 关闭资源 channel.close(); } catch (Exception e) { e.printStackTrace(); } finally { try { connection.close(); } catch (Exception e) { e.printStackTrace(); } } } }
第二步、在测试类里面编写测试消费者的代码
package com.demo.rabbitmq; public class RabbitmqTest { public static void main(String[] args) { //produceMsg(); consumingMsg(); } /** * 测试消费者 */ private static void consumingMsg() { RabbitMqConsumer consumer = new RabbitMqConsumer(); consumer.getMsg(); } /** * 测试生产消息 */ private static void produceMsg() { RabbitMqProducer rabbitMqProducer = new RabbitMqProducer(); rabbitMqProducer.send("11111"); rabbitMqProducer.destroy(); } }
第三步、查看执行结果
到这里我们的生产者和消费者都实现完了,并且已经测试通了。有几个需要注意的地方,需要大概说下:
1、生产环境里面,我们一般都是通过spring 在管理这些代码,因此这些参数一定要从配置中心进行或者,不要硬编码到项目中。 2、生产环境中一定要手动ack数据,保证消费的每一条数据都被程序完成正确的处理掉。
还没有评论,来说两句吧...