2 个回答
主要的优化措施有:
1、资源与并行度优化
1、资源与并行度优化
1、下游并行度设置为上游的1.5倍并行度
env.setParallelism(8); // 全局并行度
dataStream.map(...).setParallelism(16); // 算子级并行度
2、每个 TaskManager 的 Slot 数建议与 CPU 核数一致。
taskmanager.numberOfTaskSlots: 4 # 每个 TaskManager 4 Slot
taskmanager.memory.process.size: 8192m # 总内存 8GB
3、增大 TaskManager 堆内存,减少 GC 暂停。
taskmanager.memory.task.heap.size: 4096m
taskmanager.memory.managed.size: 2048m
2、数据方面的优化1、在 KeyBy 前对热点 Key 分桶(如 key + "_" + random.nextInt(10))或者使用 rebalance() 或自定义分区器。避免数据倾斜,
2、网络缓冲区调优
taskmanager.network.memory.buffers-per-channel: 4 # 每个通道缓冲区数
taskmanager.network.memory.floating-buffers-per-gate: 16 # 浮动缓冲区数
3、状态和检查点的优化1、配置增量检查点
RocksDBStateBackend rocksDB = new RocksDBStateBackend("hdfs:///checkpoints", true);
2、配置状态压缩
env.getConfig().useSnapshotCompression(true); // 检查点压缩
3、配置状态ttl
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(6)).build();
stateDescriptor.enableTimeToLive(ttlConfig);
发布于:1个月前 (05-19) IP属地:
4、序列化优化
6、外部存储优化
1、使用Kryo注册自定义序列化器
env.getConfig().registerTypeWithKryoSerializer(User.class, UserKryoSerializer.class);
2、替代 List<Integer>,减少序列化开销。
3、使用 BroadcastState 减少网络传输。
4、用 rescale() 替代 rebalance() 局部重分区。
5、代码优化(略)6、外部存储优化
1、sink批量写入
2、source多分区消费
发布于:1个月前 (05-19) IP属地:
我来回答
您需要 登录 后回答此问题!