1 个回答
可以做如下的配置:
Properties props = new Properties();
// 增大单次拉取数据量(默认1MB)
props.put("fetch.max.bytes", 10485760);
// 提升单次poll返回记录数(默认500)
props.put("max.poll.records", 5000);
// 延长会话超时(默认45s,建议2-5分钟)
props.put("session.timeout.ms", 120000);
// 调整心跳间隔(默认3s)
props.put("heartbeat.interval.ms", 5000);
// 消费者并行度建议:分区数 >= 消费者数*并发线程数
ExecutorService threadPool = Executors.newFixedThreadPool(8);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
threadPool.submit(() -> processBatch(records)); // 异步批处理
}
备注:采用批量提交偏移量:enable.auto.commit=false + 手动提交
使用ConsumerRebalanceListener实现动态负载感知
对延迟不敏感的场景可开启fetch.max.wait.ms(默认500ms)
发布于:1个月前 (03-21) IP属地:四川省
我来回答
您需要 登录 后回答此问题!