在日常的开发过程中我们还经常会涉及到使用 MQ 作为消息中间件的形式来进行异步任务。
对于 mq 来说,不管是哪种 mq 都可能会涉及到 消息重复接收的场景。所以我们在使用消费者的时候,一定要注意消息接收重复的场景。
备注:
1、大多数消息中间件框架都不能承诺消费者不能重复消费数据,只是大部分场景下不会发生重复消费的场景。
所以对于这里的话,我们会涉及到在消费者端做幂等的处理。对于我们来说,消费者做幂等的处理方案如下:
1、生产者发送的每一条消息都需要使用雪花 id 生成一条不重复的 msgId,然后和messagebody 组成新的 msg,发送到 mq。 2、消费者接受到消息之后,先提取 messageid,然后去 redis 中判断是否存在key,示例: redis.exists(${key_msgid}) 3、如果当前的 msgid 存在,则直接丢弃掉此消息。 4、如果当前的 msgid 不存在,则先把 msgid 写入到 redis,例如:redis.set(${key_msgid},"1",10*60*1000),然后再进行消息的处理。 5、如果消息处理失败,则直接把消息的 msgid 重新修改下,然后作为生产者发送到队列中。
根据以上的信息,我们可以合理的处理消息的幂等性。保证消息不会被重复的进行处理。
备注:
1、这里我们消费者是先把 msgid 存储在 redis 之后再进行逻辑处理。 2、在设置 redis 的时候需要指定一个过期时间,一般我们的消费者消费能力都足够强,所以一般我们设置的过期时间停留在5-10分钟即可。 3、消息逻辑处理成功或者失败之后我们都不需要手动的删除 redis 中的 msgid 这个 key,让他自动过期即可。避免过一会出现重复的消息。 4、消息处理失败的时候,我们一般手动 ack,然后重新生成新的 msgid,然后把消息发送到对应的 topic 里面。 5、这里可能存现一种问题,就是程序出现 bug,抛出了异常,导致消息没有处理掉,但是也没有把消息重新发送到 topic 里面去,所以这里我们需要在逻辑根处理的时候进行异常的捕获。然后再这里把消息发送到 topic 或者就是日志。
还没有评论,来说两句吧...