线上kafka消息生产量短时间内大量飙升,消息挤压严重,集群压力剧增,请问如何紧急优化?
线上kafka消息生产量短时间内大量飙升,消息挤压严重,集群压力剧增,发出大量的警告和错误消息:
请问如何紧急优化?
WARN [ReplicaManager brokerId=1] Produce request with correlation id 12345 from client client1 on partition topic1-15 failed due to request timeout
ERROR [KafkaRequestHandlerPool-0] Error when handling request: clientId=client2, correlationId=23456, api=PRODUCE
java.lang.OutOfMemoryError: Java heap space
WARN [ReplicaFetcherThread-0-3] Error in fetch from broker 3 for partition topic2-5
请问如何紧急优化?
发布于:9小时前 IP属地:
7 个回答
高阶优化的方案有:
1、添加broker节点,扩展集群
1、添加broker节点,扩展集群
# 配置新的broker节点
# server.properties for new brokers
broker.id=6
listeners=PLAINTEXT://new-broker:9092
# 启动新节点
bin/kafka-server-start.sh config/server.properties
2、将部分热点主题的分区迁移到新节点# 生成迁移计划,将部分分区迁移到新节点
bin/kafka-reassign-partitions.sh --bootstrap-server broker1:9092 --generate --topics-to-move-json-file hot-topics.json --broker-list "1,2,3,4,5,6"
# 执行迁移计划
bin/kafka-reassign-partitions.sh --bootstrap-server broker1:9092 --execute --reassignment-json-file migration.json
3、把生产者修改为异步发送模型在代码里面添加一个queue,消息先写入queue中,再定时拉取一批数据发送过去。
发布于:9小时前 IP属地:
4、根据消息流量特征重新设计分区策略
# 为高流量主题增加分区数
bin/kafka-topics.sh --bootstrap-server broker1:9092 --alter --topic high-traffic-topic --partitions 100
# 为关键业务主题增加副本因子
bin/kafka-topics.sh --bootstrap-server broker1:9092 --alter --topic critical-topic --replica-assignment "1:2:3:4:5,2:3:4:5:1,..."
#实现自定义分区器,避免热点分区
https://www.80wz.com/zb_users/upload/2025/05/20250513103911174710395179781.txt
发布于:8小时前 IP属地:
3、将日志分散到多个物理磁盘,并且为不同类型的主题配置不同的存储策略
# 配置多个日志目录,分散I/O压力
log.dirs=/data1/kafka-logs,/data2/kafka-logs,/data3/kafka-logs,/data4/kafka-logs
# 为高吞吐量主题配置专用存储策略
bin/kafka-configs.sh --bootstrap-server broker1:9092 --entity-type topics --entity-name high-throughput-topic --alter --add-config "retention.ms=86400000,cleanup.policy=delete"
# 为需要长期保存的主题配置压缩策略
bin/kafka-configs.sh --bootstrap-server broker1:9092 --entity-type topics --entity-name archive-topic --alter --add-config "cleanup.policy=compact"
发布于:9小时前 IP属地:
除了紧急处理措施之外,长久的优化措施如下:
1、增加热点主题分区数
1、增加热点主题分区数
# 使用kafka-topics.sh工具增加分区数
bin/kafka-topics.sh --bootstrap-server broker1:9092 --alter --topic hot-topic --partitions 60
2、重平衡分区分配# 生成分区重分配计划
bin/kafka-reassign-partitions.sh --bootstrap-server broker1:9092 --generate --topics-to-move-json-file topics.json --broker-list "1,2,3,4,5"
# 执行分区重分配
bin/kafka-reassign-partitions.sh --bootstrap-server broker1:9092 --execute --reassignment-json-file reassignment.json
3、优化日志配置#修改server.properties配置,优化日志处理
# 增加日志段大小,减少小文件数量
log.segment.bytes=1073741824
# 优化日志刷盘策略,避免频繁刷盘
log.flush.interval.messages=50000
log.flush.interval.ms=10000
# 优化日志清理线程数
log.cleaner.threads=4
4、调整副本同步参数# 增加副本拉取线程数
num.replica.fetchers=8
# 增加副本拉取大小
replica.fetch.max.bytes=10485760
# 调整ISR收缩时间,避免频繁的ISR变化
replica.lag.time.max.ms=30000
发布于:9小时前 IP属地:
5、实施动态配置调整
# 使用kafka-configs.sh工具动态调整配置
bin/kafka-configs.sh --bootstrap-server broker1:9092 --entity-type brokers --entity-name 1 --alter --add-config "num.io.threads=32,num.network.threads=16"
发布于:9小时前 IP属地:
紧急优化的措施可以采取如下几项:
1、增加jvm堆内存
1、增加jvm堆内存
# 修改Kafka启动脚本中的KAFKA_HEAP_OPTS
export KAFKA_HEAP_OPTS="-Xms16G -Xmx16G"
2、优化GC配置
# 添加以下GC参数
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent"
3、增加网络和I/O线程数#修改server.properties配置:
# 增加网络线程数,从默认的3增加到16
num.network.threads=16
# 增加I/O线程数,从默认的8增加到32
num.io.threads=32
# 增加请求队列大小
queued.max.requests=1000
4、调整请求处理参数# 增加socket接收缓冲区
socket.receive.buffer.bytes=1048576
# 增加socket发送缓冲区
socket.send.buffer.bytes=1048576
# 增加最大请求大小
max.request.size=10485760
发布于:9小时前 IP属地:
5、优化生产者客户端配置
# 增加批处理大小
batch.size=131072
# 设置linger.ms使消息有时间进行批处理
linger.ms=50
# 启用压缩
compression.type=lz4
# 增加生产者缓冲区
buffer.memory=536870912
# 增加重试次数和重试间隔
retries=10
retry.backoff.ms=100
6、优化消费者客户端配置# 增加单次拉取大小
fetch.max.bytes=52428800
# 增加最大拉取等待时间
fetch.max.wait.ms=500
# 增加消费者缓冲区大小
max.partition.fetch.bytes=10485760
7、临时限流措施// 在生产者客户端实施限流
Properties props = new Properties();
props.put("throttle.rate", "10000"); // 每秒限制10000条消息
发布于:9小时前 IP属地:
我来回答
您需要 登录 后回答此问题!