线上kafka消息生产量短时间内大量飙升,消息挤压严重,集群压力剧增,请问如何紧急优化?

提问者:帅平 问题分类:消息队列
线上kafka消息生产量短时间内大量飙升,消息挤压严重,集群压力剧增,发出大量的警告和错误消息:
WARN [ReplicaManager brokerId=1] Produce request with correlation id 12345 from client client1 on partition topic1-15 failed due to request timeout  
ERROR [KafkaRequestHandlerPool-0] Error when handling request: clientId=client2, correlationId=23456, api=PRODUCE  
java.lang.OutOfMemoryError: Java heap space  
WARN [ReplicaFetcherThread-0-3] Error in fetch from broker 3 for partition topic2-5

请问如何紧急优化?
7 个回答
羡风不停留
羡风不停留
高阶优化的方案有:
1、添加broker节点,扩展集群
# 配置新的broker节点  
# server.properties for new brokers  
broker.id=6  
listeners=PLAINTEXT://new-broker:9092  
# 启动新节点  
bin/kafka-server-start.sh config/server.properties
2、将部分热点主题的分区迁移到新节点
# 生成迁移计划,将部分分区迁移到新节点  
bin/kafka-reassign-partitions.sh --bootstrap-server broker1:9092 --generate --topics-to-move-json-file hot-topics.json --broker-list "1,2,3,4,5,6"  
# 执行迁移计划  
bin/kafka-reassign-partitions.sh --bootstrap-server broker1:9092 --execute --reassignment-json-file migration.json
3、把生产者修改为异步发送模型
在代码里面添加一个queue,消息先写入queue中,再定时拉取一批数据发送过去。
发布于:9小时前 IP属地:
一闪一闪亮晶晶べ
一闪一闪亮晶晶べ
4、根据消息流量特征重新设计分区策略
# 为高流量主题增加分区数  
bin/kafka-topics.sh --bootstrap-server broker1:9092 --alter --topic high-traffic-topic --partitions 100  
# 为关键业务主题增加副本因子  
bin/kafka-topics.sh --bootstrap-server broker1:9092 --alter --topic critical-topic --replica-assignment "1:2:3:4:5,2:3:4:5:1,..."
#实现自定义分区器,避免热点分区
https://www.80wz.com/zb_users/upload/2025/05/20250513103911174710395179781.txt
发布于:8小时前 IP属地:
怕她脏还是爱她葬
怕她脏还是爱她葬
3、将日志分散到多个物理磁盘,并且为不同类型的主题配置不同的存储策略
# 配置多个日志目录,分散I/O压力  
log.dirs=/data1/kafka-logs,/data2/kafka-logs,/data3/kafka-logs,/data4/kafka-logs
# 为高吞吐量主题配置专用存储策略  
bin/kafka-configs.sh --bootstrap-server broker1:9092 --entity-type topics --entity-name high-throughput-topic --alter --add-config "retention.ms=86400000,cleanup.policy=delete"  
# 为需要长期保存的主题配置压缩策略  
bin/kafka-configs.sh --bootstrap-server broker1:9092 --entity-type topics --entity-name archive-topic --alter --add-config "cleanup.policy=compact"
发布于:9小时前 IP属地:
情若相惜
情若相惜
除了紧急处理措施之外,长久的优化措施如下:
1、增加热点主题分区数
# 使用kafka-topics.sh工具增加分区数  
bin/kafka-topics.sh --bootstrap-server broker1:9092 --alter --topic hot-topic --partitions 60
2、重平衡分区分配
# 生成分区重分配计划  
bin/kafka-reassign-partitions.sh --bootstrap-server broker1:9092 --generate --topics-to-move-json-file topics.json --broker-list "1,2,3,4,5"  
# 执行分区重分配  
bin/kafka-reassign-partitions.sh --bootstrap-server broker1:9092 --execute --reassignment-json-file reassignment.json
3、优化日志配置
#修改server.properties配置,优化日志处理
# 增加日志段大小,减少小文件数量  
log.segment.bytes=1073741824  
# 优化日志刷盘策略,避免频繁刷盘  
log.flush.interval.messages=50000  
log.flush.interval.ms=10000  
# 优化日志清理线程数  
log.cleaner.threads=4
4、调整副本同步参数
# 增加副本拉取线程数  
num.replica.fetchers=8  
# 增加副本拉取大小  
replica.fetch.max.bytes=10485760  
# 调整ISR收缩时间,避免频繁的ISR变化  
replica.lag.time.max.ms=30000
发布于:9小时前 IP属地:
不换爱人
不换爱人
5、实施动态配置调整
# 使用kafka-configs.sh工具动态调整配置  
bin/kafka-configs.sh --bootstrap-server broker1:9092 --entity-type brokers --entity-name 1 --alter --add-config "num.io.threads=32,num.network.threads=16"
发布于:9小时前 IP属地:
午后的阳光让人昏沉
午后的阳光让人昏沉
紧急优化的措施可以采取如下几项:
1、增加jvm堆内存
# 修改Kafka启动脚本中的KAFKA_HEAP_OPTS  
export KAFKA_HEAP_OPTS="-Xms16G -Xmx16G"
2、优化GC配置

# 添加以下GC参数  
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent"
3、增加网络和I/O线程数
#修改server.properties配置:
# 增加网络线程数,从默认的3增加到16  
num.network.threads=16  
# 增加I/O线程数,从默认的8增加到32  
num.io.threads=32  
# 增加请求队列大小  
queued.max.requests=1000
4、调整请求处理参数
# 增加socket接收缓冲区  
socket.receive.buffer.bytes=1048576  
# 增加socket发送缓冲区  
socket.send.buffer.bytes=1048576  
# 增加最大请求大小  
max.request.size=10485760
发布于:9小时前 IP属地:
安若兮
安若兮
5、优化生产者客户端配置
# 增加批处理大小  
batch.size=131072  
# 设置linger.ms使消息有时间进行批处理  
linger.ms=50  
# 启用压缩  
compression.type=lz4  
# 增加生产者缓冲区  
buffer.memory=536870912  
# 增加重试次数和重试间隔  
retries=10  
retry.backoff.ms=100
6、优化消费者客户端配置
# 增加单次拉取大小  
fetch.max.bytes=52428800  
# 增加最大拉取等待时间  
fetch.max.wait.ms=500  
# 增加消费者缓冲区大小  
max.partition.fetch.bytes=10485760
7、临时限流措施
// 在生产者客户端实施限流  
Properties props = new Properties();  
props.put("throttle.rate", "10000"); // 每秒限制10000条消息
发布于:9小时前 IP属地:
我来回答