在消息中间件里面,我们常用的有rabbitmq,kafka,rocketmq,pulsar等。这些mq我们有时间都会挨个介绍。今天这篇主要介绍下rocketmq。
一、什么是rocketmq
rocketmq是阿里开源的,贡献给apache的消息中间件,整个代码都是由java编写。rocketmq下载地址是:https://archive.apache.org/dist/rocketmq/
二、rocketmq消息模型有哪些?
rocketmq消息模型有三种,分别是:Producer,Broker,Consumer。这三个角色分别是:Producer是发送消息,Consumer是消费消息,Broker是存储消息。
三、rocketmq之producer
rocketmq的producer我们已经说过是用来生产消息的,发送消息的模式目前有:同步发送消息,异步发送消息,顺序发送消息,单向发送消息。在同步和异步发送消息的时候,我们需要broker端返回确认的信息,这个业务主要是保证我们的消息不丢失,在生产环境中常见。 单向发送消息是不需要broker返回确认信息的,常见的场景如日志收集等可以容纳消息丢失的业务场景。
四、rocketmq之Consumer
rocketmq的Consumer我们已经说过他是用来消费消息的,常见的消费模式有:拉取消费(最常用),推送消费(broker向Consumer主动推消息),广播消费(常用,每个group接收全量消息),集群消费(常用,相同Group的每个Consumer示例均摊消息),普通顺序消费(相同topic分区收到的消费消息,一般这种常见的主要是定义一个分区,顺序消费),严格顺序消费(消费者收到的所有消息均是有顺序的,例如订单创建,消费,完成这三个顺序不能乱,比较不常用)。
五、rocketmq什么情况下会导致数据丢失
1、Broker非正常关闭 2、Broker因异常导致退出 3、操作系统挂掉 4、服务器不可用 5、磁盘损坏
六、rocketmq如何做流量控制
在生产环境中,我们可能会因为部署rocketmq的服务器配置比较低,导致我们在发送的时候会返回一些被流控的错误信息,那么导致生产者被流控的原因有哪些呢?
1、commitLog文件被锁时间超过osPageCacheBusyTimeOutMills时,会被返回流控,默认的设置是:osPageCacheBusyTimeOutMills=1000ms,我们可以适当增加或减少 2、开启transientStorePoolEnable == true,如果此时连接词中的资源不足也会被进行流控 3、waitTimeMillsInSendQueue = 200ms,这个是broker每隔10秒会检查send请求队列头部的等待时间,如果超过了200ms,则直接拒绝。 4、在broker端配置了拒绝send请求,这样也会被流控
以上4点是主要导致生产者被流控的原因,当生产者被流控的时候,消息不会被重试进行投递,因此在真实的业务中,如果返回了被流控的话,需要记录好对应的日志,方便进行补偿。
除了在生产者端被流控,在消费者端也会被流控,造成消费者端被流控的原因有哪些呢?
1、消费者本地缓存消息数太多了,默认是:pullThresholdForQueue = 1000 2、消费者本地缓存的消息大小超了,默认是:pullThresholdSizeForQueue = 100MB 3、消费者本地缓存消息跨度太大了,默认是:consumeConcurrentlyMaxSpan = 2000
消费者被流控的话,就会降低拉取数据的频率,broker堆积的消息就会增多,因此在生产环境下,需要多做好监控。
七、死信队列
死信队列还有一个名词叫死亡信箱。就是我们正常消费一些数据,如果出现一些异常,或者我们不需要立即消费(例如30分钟后取消订单),那么这个时候,我们把消息发送给死信队列,那么此条消息在设定的时候后,才能被消费者所拉取到,进行消费。所以在死信队列也是一个消息队列,只是需要额外设置一个投递时间。
八、rocketmq的tag是什么?
在rocketmq中还有一个tag的概念,即标签,我们可以为每一条消息,在producer发送的时候设置一下tag,这样子在消费者进行消费的时候,可以直接通过tag过滤同一个topic里面的消息。生产者端通过:
message.setTags("TagA")
消费者在订阅的时候设置tag,最终的过滤是由broker进行过滤的。
九、rocketmq可以解决消息重复的问题吗?
在真实的业务场景中,我们会遇到消息重复的问题,这时候我们有没有办法通过rocketmq来解决消息重复的问题呢?答案是不能,即使发送的每一条消息都标记了keys,但是作为一个消息中间件,他是不理解我们的业务需求的,只要能发送成功,他就会把这个消息存储起来,并且推送给消费者。所以在业务上有消费消息重复的问题,我们需要在业务层进行幂等性操作。
十、rocketmq部署部分
1)rocket有哪些角色
NameServer(对应的进程是:NamesrvStartup) BrokerSever(对应的进程是:BrokerStartup)
2)rocketmq部署的时候jvm如何调优:
#设置堆大小一样的值,在很多应用组件里面都会遇到这种 -server -Xms8g -Xmx8g -Xmn4g #设置G1垃圾回收 -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 #开启rolling GC日志 -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m #把GC日志文件重定向到内存文件系统 -Xloggc:/dev/shm/mq_gc_%p.log123
还没有评论,来说两句吧...