我们都知道,消息是由业务系统在运行过程产生的,当我们的业务系统产生了消息,我们就可以调用RocketMQ提供的API向RocketMQ发送消息,就像下面这样
DefaultMQProducer producer = new DefaultMQProducer("sanyouProducer"); //指定NameServer的地址 producer.setNamesrvAddr("localhost:9876"); //启动生产者 producer.start(); Message msg = new Message("sanyouTopic", "TagA", "三友的java日记 ".getBytes(RemotingHelper.DEFAULT_CHARSET)); // 发送消息并得到消息的发送结果,然后打印 SendResult sendResult = producer.send(msg);
虽然代码很简单,我们不经意间可能会思考如下问题:
1、代码中只设置了NameServer的地址,那么生产者是如何知道Broker所在机器的地址,然后向Broker发送消息的? 2、一个topic会有很多队列,那么生产者是如何选择哪个队列发送消息? 3、消息一旦发送失败了怎么办?
下面我们就挨个来介绍下上面的几个问题
路由表
当Broker在启动的过程中,Broker就会往NameServer注册自己这个Broker的信息,这些信息就包括自身所在服务器的ip和端口,还有就是自己这个Broker有哪些topic和对应的队列信息,这些信息就是路由信息,后面就统一称为路由表。
当生产者启动的时候,会从NameServer中拉取到路由表,缓存到本地,同时会开启一个定时任务,默认是每隔30s从NameServer中重新拉取路由信息,更新本地缓存。
队列的选择
好了通过上面的路由表我们就明白了,原来生产者会从NameServer拉取到Broker的路由表的信息,这样生产者就知道了topic对应的队列的信息了。
但是由于一个topic可能会有很多的队列,那么应该将消息发送到哪个队列上呢?
面对这种情况,RocketMQ提供了两种消息队列的选择算法。
1、轮询算法 2、最小投递延迟算法
轮询算法 就是一个队列一个队列发送消息,这些就能保证消息能够均匀分布在不同的队列底下,这也是RocketMQ默认的队列选择算法。
但是由于机器性能或者其它情况可能会出现某些Broker上的Queue可能投递延迟较严重,这样就会导致生产者不能及时发消息,造成生产者压力过大的问题。所以RocketMQ提供了最小投递延迟算法。
最小投递延迟算法 每次消息投递的时候会统计投递的时间延迟,在选择队列的时候会优先选择投递延迟时间小的队列。这种算法可能会导致消息分布不均匀的问题。
如果你想启用最小投递延迟算法,只需要按如下方法设置一下即可。
producer.setSendLatencyFaultEnable(true);
当然除了上述两种队列选择算法之外,你也可以自定义队列选择算法,只需要实现MessageQueueSelector接口,在发送消息的时候指定即可。
SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { //从mqs中选择一个队列 return null; } }, new Object());
MessageQueueSelector RocketMQ也提供了三种实现:
1、随机算法 2、Hash算法 3、根据机房选择算法(空实现)
其他的特殊情况处理
发送异常处理
终于,不论是通过RocketMQ默认的队列选择算法也好,又或是自定义队列选择算法也罢,终于选择到了一个队列,那么此时就可以跟这个队列所在的Broker机器建立网络连接,然后通过网络请求将消息发送到Broker上。
但是不幸的事发生了,Broker挂了,又或者是机器负载太高了,发送消息超时了,那么此时RockerMQ就会进行重试。
RockerMQ重试其实很简单,就是重新选择其它Broker机器中的一个队列进行消息发送,默认会重试两次。
当然如果你的机器比较多,可以将设置重试次数设置大点。
producer.setRetryTimesWhenSendFailed(10);
消息内容过大的处理
一般情况下,消息的内容都不会太大,但是在一些特殊的场景中,消息内容可能会出现很大的情况。
遇到这种消息过大的情况,比如在默认情况下消息大小超过4M的时候,RocketMQ是会对消息进行压缩之后再发送到Broker上,这样在消息发送的时候就可以减少网络资源的占用。
还没有评论,来说两句吧...