之前我们已经讲解了rocketmq,在这里我们暂时不介绍部署了(太简单了)。直接开始介绍Producer的demo。
在实战demo的时候,我们要提醒一下,rocketmq的所有topic默认是不允许自动创建的。个人认为生产环境的topic都需要经过统一规划,不允许自动创建,不然项目跑一段时间,如果有些公司人员更新比较频繁的话,到时候就留一坨烂账,因此这里个人还是不建议开启自动创建topic。
在rocketmq中,手动创建topic的语句是:
./mqadmin updateTopic -n 192.168.31.30:9876 -b 127.0.0.1:10911 -t singleTopic
当看到create topic to 127.0.0.1:10911 success.即代表topic创建成功了。
好了下面我们来演示下三种不同的producer,分别是:单向生产者,同步生产者,异步生产者。
一、单向生产者
package com.test.rocketmq; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import lombok.extern.slf4j.Slf4j; /** * 单向发送 * * @author Administrator * */ @Slf4j public class SingleProducer { private DefaultMQProducer producer; public SingleProducer() throws Exception { producer = new DefaultMQProducer("SingleProducerGroup"); // NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址 producer.setNamesrvAddr("192.168.31.30:9876"); producer.start(); } public void send() throws Exception { Message msg = new Message("singleTopic", // 发送的topic "singe", // tags "123", // keys3 "this is single test".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容 ); producer.sendOneway(msg); log.info("发送消息完毕"); } public void close() { if (null != producer) { producer.shutdown(); } } public static void main(String[] args) throws Exception { SingleProducer singleProducer = new SingleProducer(); singleProducer.send(); singleProducer.close(); } }
二、同步生产者
package com.test.rocketmq; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; /** * 同步发送 * * @author Administrator * */ @Slf4j public class TProducer { private DefaultMQProducer producer; public TProducer() throws Exception { producer = new DefaultMQProducer("TProducerGroup"); // NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址 producer.setNamesrvAddr("192.168.31.30:9876"); // 设置同步发送失败重试次数,默认2 producer.setRetryTimesWhenSendFailed(3); // 设置发送超时时间,包括同步和异步,默认3000ms producer.setSendMsgTimeout(3000); producer.start(); } public void send() throws Exception { Message msg = new Message("tTopic", // 发送的topic "singe", // tags "123", // keys3 "this is single test".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容 ); SendResult sendResult = producer.send(msg); log.info("发送消息完毕:{}",JSON.toJSONString(sendResult)); } public void close() { if (null != producer) { producer.shutdown(); } } public static void main(String[] args) throws Exception { TProducer singleProducer = new TProducer(); singleProducer.send(); singleProducer.close(); } }
三、异步生产者
package com.test.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
/**
* 异步发送
*
* @author Administrator
*
*/
@Slf4j
public class SProducer {
private DefaultMQProducer producer;
public SProducer() throws Exception {
producer = new DefaultMQProducer("SProducerGroup");
// NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址
producer.setNamesrvAddr("192.168.31.30:9876");
// 设置同步发送失败重试次数,默认2
producer.setRetryTimesWhenSendAsyncFailed(3);
// 设置发送超时时间,包括同步和异步,默认3000ms
producer.setSendMsgTimeout(3000);
producer.start();
}
public void send() throws Exception {
Message msg = new Message("sTopic", // 发送的topic
"singe", // tags
"123", // keys3
"this is single test".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容
);
producer.send(msg, new SendCallback() {
/**
* 消息成功执行这里
*
* @param sendResult
*/
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送消息完毕:{}", JSON.toJSONString(sendResult));
}
/**
* 消息失败执行这里
*
* @param e
*/
@Override
public void onException(Throwable e) {
log.info("发送消息失败:{}",e.getMessage());
log.error(e.getMessage(),e);
}
});
}
public void close() {
if (null != producer) {
producer.shutdown();
}
}
public static void main(String[] args) throws Exception {
SProducer singleProducer = new SProducer();
singleProducer.send();
singleProducer.close();
}
}最后总结一下:
吞吐量:
1.单向发送模式因为发送过去之后不用接收结果,所以这种方式吞吐量是最高的,
2.同步发送模式是吞吐量最慢的方式,因为它需要等待MQ给它响应结果,它才能继续往下执行
3.异步发送模式吞吐量是比同步发送高比单向发送低的.
安全性:
1.同步发送模式安全性是最高的
2.异步发送模式和单向发送模式都容易丢失消息

还没有评论,来说两句吧...