之前我们已经讲解了rocketmq,在这里我们暂时不介绍部署了(太简单了)。直接开始介绍Producer的demo。
在实战demo的时候,我们要提醒一下,rocketmq的所有topic默认是不允许自动创建的。个人认为生产环境的topic都需要经过统一规划,不允许自动创建,不然项目跑一段时间,如果有些公司人员更新比较频繁的话,到时候就留一坨烂账,因此这里个人还是不建议开启自动创建topic。
在rocketmq中,手动创建topic的语句是:
./mqadmin updateTopic -n 192.168.31.30:9876 -b 127.0.0.1:10911 -t singleTopic
当看到create topic to 127.0.0.1:10911 success.即代表topic创建成功了。
好了下面我们来演示下三种不同的producer,分别是:单向生产者,同步生产者,异步生产者。
一、单向生产者
package com.test.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import lombok.extern.slf4j.Slf4j;
/**
* 单向发送
*
* @author Administrator
*
*/
@Slf4j
public class SingleProducer {
private DefaultMQProducer producer;
public SingleProducer() throws Exception {
producer = new DefaultMQProducer("SingleProducerGroup");
// NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址
producer.setNamesrvAddr("192.168.31.30:9876");
producer.start();
}
public void send() throws Exception {
Message msg = new Message("singleTopic", // 发送的topic
"singe", // tags
"123", // keys3
"this is single test".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容
);
producer.sendOneway(msg);
log.info("发送消息完毕");
}
public void close() {
if (null != producer) {
producer.shutdown();
}
}
public static void main(String[] args) throws Exception {
SingleProducer singleProducer = new SingleProducer();
singleProducer.send();
singleProducer.close();
}
}二、同步生产者
package com.test.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
/**
* 同步发送
*
* @author Administrator
*
*/
@Slf4j
public class TProducer {
private DefaultMQProducer producer;
public TProducer() throws Exception {
producer = new DefaultMQProducer("TProducerGroup");
// NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址
producer.setNamesrvAddr("192.168.31.30:9876");
// 设置同步发送失败重试次数,默认2
producer.setRetryTimesWhenSendFailed(3);
// 设置发送超时时间,包括同步和异步,默认3000ms
producer.setSendMsgTimeout(3000);
producer.start();
}
public void send() throws Exception {
Message msg = new Message("tTopic", // 发送的topic
"singe", // tags
"123", // keys3
"this is single test".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容
);
SendResult sendResult = producer.send(msg);
log.info("发送消息完毕:{}",JSON.toJSONString(sendResult));
}
public void close() {
if (null != producer) {
producer.shutdown();
}
}
public static void main(String[] args) throws Exception {
TProducer singleProducer = new TProducer();
singleProducer.send();
singleProducer.close();
}
}三、异步生产者
package com.test.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
/**
* 异步发送
*
* @author Administrator
*
*/
@Slf4j
public class SProducer {
private DefaultMQProducer producer;
public SProducer() throws Exception {
producer = new DefaultMQProducer("SProducerGroup");
// NameServer 可以在代码中指定,也可以通过配置环境变量的方式指定NameServer的地址
producer.setNamesrvAddr("192.168.31.30:9876");
// 设置同步发送失败重试次数,默认2
producer.setRetryTimesWhenSendAsyncFailed(3);
// 设置发送超时时间,包括同步和异步,默认3000ms
producer.setSendMsgTimeout(3000);
producer.start();
}
public void send() throws Exception {
Message msg = new Message("sTopic", // 发送的topic
"singe", // tags
"123", // keys3
"this is single test".getBytes(RemotingHelper.DEFAULT_CHARSET) // 发送的内容
);
producer.send(msg, new SendCallback() {
/**
* 消息成功执行这里
*
* @param sendResult
*/
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送消息完毕:{}", JSON.toJSONString(sendResult));
}
/**
* 消息失败执行这里
*
* @param e
*/
@Override
public void onException(Throwable e) {
log.info("发送消息失败:{}",e.getMessage());
log.error(e.getMessage(),e);
}
});
}
public void close() {
if (null != producer) {
producer.shutdown();
}
}
public static void main(String[] args) throws Exception {
SProducer singleProducer = new SProducer();
singleProducer.send();
singleProducer.close();
}
}最后总结一下:
吞吐量:
1.单向发送模式因为发送过去之后不用接收结果,所以这种方式吞吐量是最高的,
2.同步发送模式是吞吐量最慢的方式,因为它需要等待MQ给它响应结果,它才能继续往下执行
3.异步发送模式吞吐量是比同步发送高比单向发送低的.
安全性:
1.同步发送模式安全性是最高的
2.异步发送模式和单向发送模式都容易丢失消息









还没有评论,来说两句吧...