上一篇我们介绍了rocketmq生产者的代码示例,这篇我们介绍下消费者Consumer的代码示例。话不多说,直接上代码:
package com.test.rocketmq; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import java.io.UnsupportedEncodingException; import java.util.List; /** * rocketmq消费者 * * @author Administrator * */ @Slf4j public class RConsumer { DefaultMQPushConsumer consumer = null; public RConsumer() throws Exception { // 使用指定的消费者组名称实例化 consumer = new DefaultMQPushConsumer("RConsumerGroup"); // 指定名称服务器地址 consumer.setNamesrvAddr("192.168.31.30:9876"); // 如果指定的消费者组是全新的,请指定从哪里开始 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 再订阅一个主题来消费 consumer.subscribe("tTopic", "*"); } public void subscribe() throws MQClientException { consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { log.info("接收到的消息是:{}", JSON.toJSONString(msgs)); if(msgs.size() > 0) { MessageExt messageExt = msgs.get(0); String body = null; try { body = new String(messageExt.getBody(), "utf-8"); } catch (Exception e) { log.error(e.getMessage(),e); } log.info("解码出来的body是:{}",body); } // 根据业务处理返回success或者fail return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } public void close() { if (null != consumer) { consumer.shutdown(); } } public static void main(String[] args) throws Exception { RConsumer rConsumer = new RConsumer(); rConsumer.subscribe(); } }
备注:
不管是哪种生产者,这个消费者的代码都差不多的,所以消费者示例只有这一个。
还没有评论,来说两句吧...