5 个回答
consumer端的措施有:
关闭自动 offset,手动提交offset
设置 enable.auto.commit = false , 默认值true,自动提交
使用kafka的Consumer的类,用方法consumer.commitSync()提交
或者使用spring-kafka的 Acknowledgment类,用方法ack.acknowledge()提交(推荐使用)
发布于:6个月前 (07-28) IP属地:四川省
producer端的措施有:如果是同步模式的话可以设置如下的信息:
producer.type=sync
request.required.acks=1
副本数量>=2
增加重试次数
更改调用方式;Producer.send(msg, callback)
发布于:6个月前 (07-28) IP属地:四川省
如果是异步方式的话,可以设置如下信息:
通过buffer来进行控制数据的发送,有两个值来进行控制,缓冲时间阈值与缓冲消息的数量阈值,如果buffer满了数据还没有发送出去,如果设置的是立即清理模式,风险很大,一定要设置为阻塞模式
producer.type=async
request.required.acks=1
queue.buffering.max.ms=5000
queue.buffering.max.messages=10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200
queue.buffering.max.ms=5000
通过buffer来进行控制数据的发送,有两个值来进行控制,缓冲时间阈值与缓冲消息的数量阈值,如果buffer满了数据还没有发送出去,如果设置的是立即清理模式,风险很大,一定要设置为阻塞模式
发布于:6个月前 (07-28) IP属地:四川省
broker端的措施有:
Topic 副本因子个数:replication.factor >= 3
同步副本列表(ISR):min.insync.replicas = 2
禁用unclean选举:unclean.leader.election.enable=false
replication.factor = min.insync.replicas +1
发布于:6个月前 (07-28) IP属地:四川省
我来回答
您需要 登录 后回答此问题!