如何提高Flink的吞吐量?

提问者:帅平 问题分类:面试刷题
如何提高Flink的吞吐量?
2 个回答
自然萌鹿鹿
自然萌鹿鹿
主要的优化措施有:
1、资源与并行度优化
1、下游并行度设置为上游的1.5倍并行度
env.setParallelism(8); // 全局并行度
dataStream.map(...).setParallelism(16); // 算子级并行度
2、每个 TaskManager 的 Slot 数建议与 CPU 核数一致。
taskmanager.numberOfTaskSlots: 4  # 每个 TaskManager 4 Slot
taskmanager.memory.process.size: 8192m  # 总内存 8GB
3、增大 TaskManager 堆内存,减少 GC 暂停。
taskmanager.memory.task.heap.size: 4096m
taskmanager.memory.managed.size: 2048m
2、数据方面的优化
1、在 KeyBy 前对热点 Key 分桶(如 key + "_" + random.nextInt(10))或者使用 rebalance() 或自定义分区器。避免数据倾斜,
2、网络缓冲区调优
taskmanager.network.memory.buffers-per-channel: 4  # 每个通道缓冲区数
taskmanager.network.memory.floating-buffers-per-gate: 16  # 浮动缓冲区数
3、状态和检查点的优化
1、配置增量检查点
RocksDBStateBackend rocksDB = new RocksDBStateBackend("hdfs:///checkpoints", true);
2、配置状态压缩
env.getConfig().useSnapshotCompression(true);  // 检查点压缩
3、配置状态ttl
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(6)).build();
stateDescriptor.enableTimeToLive(ttlConfig);
发布于:1个月前 (05-19) IP属地:
我怕时间不够
我怕时间不够
4、序列化优化
1、使用Kryo注册自定义序列化器
env.getConfig().registerTypeWithKryoSerializer(User.class, UserKryoSerializer.class);
2、替代 List<Integer>,减少序列化开销。
3、使用 BroadcastState 减少网络传输。
4、用 rescale() 替代 rebalance() 局部重分区。
5、代码优化(略)
6、外部存储优化
1、sink批量写入
2、source多分区消费
发布于:1个月前 (05-19) IP属地:
我来回答