上一篇文章我们介绍了RocketMQ是如何保证高可用的,这篇文章我们介绍下RocketMQ是如何消费的。
在生产者成功发送消息到Broker,Broker在成功存储消息之后,消费者要消费消息了。
消费者在启动的时候会从NameSrever拉取消费者订阅的topic的路由信息,这样就知道订阅的topic有哪些queue,以及queue所在Broker的地址信息。
为什么消费者需要知道topic对应的哪些queue呢?
其实主要是因为消费者在消费消息的时候是以队列为消费单元的,消费者需要告诉Broker拉取的是哪个队列的消息,至于如何拉到消息的,后面再说。
消费的两种模式
前面说过,消费者是有个消费者组的概念,在启动消费者的时候会指定该消费者属于哪个消费者组。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sanyouConsumer");
一个消费者组中可以有多个消费者,不同消费者组之间消费消息是互不干扰的。
在同一个消费者组中,消息消费有两种模式。
1、集群模式 2、广播模式
集群模式
同一条消息只能被同一个消费组下的一个消费者消费,也就是说,同一条消息在同一个消费者组底下只会被消费一次,这就叫集群消费。
集群消费的实现就是将队列按照一定的算法分配给消费者,默认是按照平均分配的。
如图所示,将每个队列分配只分配给同一个消费者组中的一个消费者,这样消息就只会被一个消费者消费,从而实现了集群消费的效果。
RocketMQ默认是集群消费的模式。
广播模式
广播模式就是同一条消息可以被同一个消费者组下的所有消费者消费。
其实实现也很简单,就是将所有队列分配给每个消费者,这样每个消费者都能读取topic底下所有的队列的数据,就实现了广播模式。
如果你想使用广播模式,只需要在代码中指定即可。
consumer.setMessageModel(MessageModel.BROADCASTING);
ConsumerQueue
我们提到消费者是从队列中拉取消息的,但是这里不经就有一个疑问,那就是消息明明都存在CommitLog文件中的,那么是如何去队列中拉的呢?难道是去遍历所有的文件,找到对应队列的消息进行消费么?
答案是否定的,因为这种每次都遍历数据的效率会很低,所以为了解决这种问题,引入了ConsumeQueue的这个概念,而消费实际是从ConsumeQueue中拉取数据的。
用户在创建topic的时候,Broker会为topic创建队列,并且每个队列其实会有一个编号queueId,每个队列都会对应一个ConsumeQueue,比如说一个topic在某个Broker上有4个队列,那么就有4个ConsumeQueue。
前面说过,消息在发送的时候,会根据一定的算法选择一个队列,之后再发送消息的时候会携带选择队列的queueId,这样Broker就知道消息属于哪个队列的了。当消息被存到CommitLog之后,其实还会往这条消息所在的队列的ConsumeQueue插一条数据。
ConsumeQueue也是由多个文件组成,每个文件默认是存30万条数据。
插入ConsumeQueue中的每条数据由20个字节组成,包含3部分信息,消息在CommitLog的起始位置(8个字节),消息在CommitLog存储的长度(8个字节),还有就是tag的hashCode(4个字节)。
所以当消费者从Broker拉取消息的时候,会告诉Broker拉取哪个队列(queueId)的消息、这个队列的哪个位置的消息(queueOffset)。
queueOffset就是指上图中ConsumeQueue一条数据的编号,单调递增的。
Broker在接受到消息的时候,找个指定队列的ConsumeQueue,由于每条数据固定是20个字节,所以可以轻易地计算出queueOffset对应的那条数据在哪个文件的哪个位置上,然后读出20个字节,从这20个字节中在解析出消息在CommitLog的起始位置和存储的长度,之后再到CommitLog中去查找,这样就找到了消息,然后在进行一些处理操作返回给消费者。
到这,我们就清楚的知道消费者是如何从队列中拉取消息的了,其实就是先从这个队列对应的ConsumeQueue中找到消息所在CommmitLog中的位置,然后再从CommmitLog中读取消息的。
还没有评论,来说两句吧...