spark shuffle 过程是什么?

提问者:帅平 问题分类:面试刷题
spark shuffle 过程是什么?
3 个回答
此生只爱你
此生只爱你
spark1.1 之前只有HashShuffle, 并在0.8.1版本时对其进行了优化,引入了优化后的HashShuffle, 实现了shuffle consolidate机制(文件合并机制). 在spark1.1之后引入了SortShuffle, 又在1.4版本中引入了 tungsten sort shuffle. 在1.6版本中删除了hash中的shuffle consolidate机制, 此后在2.0以后的版本中只支持sort, tungsten sort shuffle两种方式.
发布于:3个月前 (01-22) IP属地:四川省
肺少女
肺少女
SortShuffleManager的过程如下:
1、普通运行机制,数据根据不同的shuffle算子选择不同的数据结构, 如果是reduceByKey类的聚合算子会选用Map数据结构,如果是join类的普通算子则会选用Array数据结构. 数据先写入内存数据结构,达到阈值时, 就会溢写到磁盘, 在溢写磁盘之前, 会先根据key 对内存中的数据进行排序, 排序过后会分批写入磁盘, 默认batch数量是1w条. 批次写可以减少磁盘的IO次数. 一个task处理数据过程中会发生多次磁盘溢写操作, 进而有多个临时文件, 最后这些临时文件会被merge成一个磁盘文件,此外还会写一份对应的索引文件,其中标识了下游多个task的数据在文件中的start offset和end offset.
2、bypass 运行机制, 当reduce端task比较少时,基于hash Shuffle的实现明显比基于sort shuffle 的实现快. 因此sort shuffle 实现了一个带hash风格的回退方案,即 bypass 机制, bypass 运行机制触发条件: a. shuffle map task 数量小于 spark.shuffle.sort.bypassMergeTrheshold=200(默认)参数的值. b.不是聚合类shuffle算子, bypass 运行机制下每个上游task的数据写入流程同 hash shuffle 机制, 写入磁盘时不会进行排序操作, 不同的是bypass机制会将所有的临时文件合并成一个磁盘文件, 并会创建一个索引文件. 因为对比hash shuffle多了磁盘合并操作, 所以性能更好.
3、tungsten sort shuffle 运行机制, 其针对普通sort机制, 优化了排序过程. tungsten sort排序时, 并不是排序内容本身,而是对内容序列化后的字节数组的指针进行排序. 因为直接对序列化后的二进制数据进行排序, 没有序列化与反序列化的过程, 从而大大降低了内存消耗和GC.
发布于:3个月前 (01-22) IP属地:四川省
自愈
自愈
HashShuffleManager的过程如下:
1、shuffle write: 对key进行hash, 将相同的key写入到同一个磁盘文件中.数据写入磁盘前先写入内存缓冲, 当缓冲填满后再溢写到磁盘中.在 Spark 0.8.1后,可以通过配置属性 spark.shuffie.consolidateFiles=true开启shuffle consolidate机制, 此时就会出现shuffleFlieGroup的概念, shuffleFlieGroup对应一批磁盘文件,shuffleFlieGroup第一次创建后,所有的task都会复用已创建的shuffleFlieGroup,包括其中的磁盘文件,不会将数据写入新的磁盘文件中. 如此, 多个task的磁盘文件进行了一定程度上的合并,从而大幅减少了磁盘文件的数量, 提升了shuffle write的性能.
2、shuffle read: 各个节点上拉取数据, 一边拉取一边聚合, 每个shuffle read task都有一个自己的buffer缓冲区,每次只拉取与buffer缓冲大小相同的数据, 然后进行聚合.
发布于:3个月前 (01-22) IP属地:四川省
我来回答