1 个回答
client端幂等性发送流程:
1、producer send方法将消息加入缓存区。添加时会判断是否需要新建一个 ProducerBatch,这时这个 ProducerBatch 还是没有 PID 和 sequence number 信息的;
2、sender发送线程,在run方法中,判断当前的 PID 是否需要重置。重置的原因是因为:如果有 topic-partition 的 batch 重试多次失败最后因为超时而被移除,这时 sequence number 将无法做到连续,因为 sequence number 有部分已经分配出去,这时系统依赖自身的机制无法继续进行下去(因为幂等性是要保证不丢不重的),相当于程序遇到了一个 fatal 异常,PID 会进行重置。遇到这个问题时是无法保证 exactly once 的(有数据已经发送失败了,并且超过了重试次数);
3、判断是否需要申请 PID,如果需要的话,这里会阻塞直到获取到相应的 PID 信息;
4、判断这个 topic-partition 是否可以继续发送(如果出现前面2中的情况是不允许发送的)、判断 PID 是否有效、如果这个 batch 是重试的 batch,那么需要判断这个 batch 之前是否还有 batch 没有发送完成,如果有,这里会先跳过这个 Topic-Partition 的发送,直到前面的 batch 发送完成。如果这个 ProducerBatch 还没有这个相应的 PID 和 sequence number 信息,会在这里进行相应的设置
5、发送 ProduceRequest 请求
发布于:3个月前 (01-24) IP属地:四川省
我来回答
您需要 登录 后回答此问题!