之前我们已经讲解了rocketmq,在这里我们暂时不介绍部署了(太简单了)。直接开始介绍Producer的demo。
在实战demo的时候,我们要提醒一下,rocketmq的所有topic默认是不允许自动创建的。个人认为生产环境的topic都需要经过统一规划,不允许自动创建,不然项目跑一段时间,如果有些公司人员更新比较频繁的话,到时候就留一坨烂账,因此这里个人还是不建议开启自动创建topic。
在rocketmq中,手动创建topic的语句是:
./mqadmin updateTopic -n 192.168.31.30:9876 -b 127.0.0.1:10911 -t singleTopic
当看到create topic to 127.0.0.1:10911 success.即代表topic创建成功了。
好了下面我们来演示下三种不同的producer,分别是:单向生产者,同步生产者,异步生产者。
一、单向生产者
package com.test.rocketmq; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import lombok.extern.slf4j.Slf4j; /** * 单向发送 * * @author Administrator * */ @Slf4j public class SingleProducer { private DefaultMQProducer producer; public SingleProducer() throws Exception { producer = new DefaultMQProducer("SingleProducerGroup"); // NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址 producer.setNamesrvAddr("192.168.31.30:9876"); producer.start(); } public void send() throws Exception { Message msg = new Message("singleTopic", // 发送的topic "singe", // tags "123", // keys3 "this is single test".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容 ); producer.sendOneway(msg); log.info("发送消息完毕"); } public void close() { if (null != producer) { producer.shutdown(); } } public static void main(String[] args) throws Exception { SingleProducer singleProducer = new SingleProducer(); singleProducer.send(); singleProducer.close(); } }
二、同步生产者
package com.test.rocketmq; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; /** * 同步发送 * * @author Administrator * */ @Slf4j public class TProducer { private DefaultMQProducer producer; public TProducer() throws Exception { producer = new DefaultMQProducer("TProducerGroup"); // NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址 producer.setNamesrvAddr("192.168.31.30:9876"); // 设置同步发送失败重试次数,默认2 producer.setRetryTimesWhenSendFailed(3); // 设置发送超时时间,包括同步和异步,默认3000ms producer.setSendMsgTimeout(3000); producer.start(); } public void send() throws Exception { Message msg = new Message("tTopic", // 发送的topic "singe", // tags "123", // keys3 "this is single test".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容 ); SendResult sendResult = producer.send(msg); log.info("发送消息完毕:{}",JSON.toJSONString(sendResult)); } public void close() { if (null != producer) { producer.shutdown(); } } public static void main(String[] args) throws Exception { TProducer singleProducer = new TProducer(); singleProducer.send(); singleProducer.close(); } }
三、异步生产者
package com.test.rocketmq; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; /** * 异步发送 * * @author Administrator * */ @Slf4j public class SProducer { private DefaultMQProducer producer; public SProducer() throws Exception { producer = new DefaultMQProducer("SProducerGroup"); // NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址 producer.setNamesrvAddr("192.168.31.30:9876"); // 设置同步发送失败重试次数,默认2 producer.setRetryTimesWhenSendAsyncFailed(3); // 设置发送超时时间,包括同步和异步,默认3000ms producer.setSendMsgTimeout(3000); producer.start(); } public void send() throws Exception { Message msg = new Message("sTopic", // 发送的topic "singe", // tags "123", // keys3 "this is single test".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容 ); producer.send(msg, new SendCallback() { /** * 消息成功执行这里 * * @param sendResult */ @Override public void onSuccess(SendResult sendResult) { log.info("发送消息完毕:{}", JSON.toJSONString(sendResult)); } /** * 消息失败执行这里 * * @param e */ @Override public void onException(Throwable e) { log.info("发送消息失败:{}",e.getMessage()); log.error(e.getMessage(),e); } }); } public void close() { if (null != producer) { producer.shutdown(); } } public static void main(String[] args) throws Exception { SProducer singleProducer = new SProducer(); singleProducer.send(); singleProducer.close(); } }
最后总结一下:
吞吐量:
1.单向发送模式因为发送过去之后不用接收结果,所以这种方式吞吐量是最高的,
2.同步发送模式是吞吐量最慢的方式,因为它需要等待MQ给它响应结果,它才能继续往下执行
3.异步发送模式吞吐量是比同步发送高比单向发送低的.
安全性:
1.同步发送模式安全性是最高的
2.异步发送模式和单向发送模式都容易丢失消息
还没有评论,来说两句吧...