Spark shuffle 原理分析

整体上 Spark Shuffle 整体上分为两个阶段:Shuffle Write 和 Shuffle Read。中间经历了比较大的变动,简述如下:

  • 0.8 及之前,采用的是 hash shuffle,后来引入了 consolidation 机制。
  • 1.1 加入 sort shuffle 的选项,但默认使用 hash shuffle。
  • 1.2 开始默认使用 sort shuffle。
  • 1.4 引入了 tungsten-sort shuffle,基于 unsafe 来操控内存空间。
  • 1.6版本将 tungsten-sort shuffle 与 sort shuffle 合并,由 Spark 自动决定采用哪一种方式。
  • 2.0版本之后,hash shuffle 机制被删除,只保留 sort shuffle 机制至今。

Write 阶段大体经历排序和归并以及可能的预聚合,最终每个 write task 会产生数据和索引两个文件。其中,数据文件会按照分区进行存储,即相同分区的数据在文件中是连续的,而索引文件记录了每个分区在文件中的起始和结束位置。而对于 Shuffle Read, 首先可能需要通过网络从各个 Write 任务节点获取给定分区的数据,即数据文件中某一段连续的区域,然后经过排序和归并等过程,最终形成计算结果。

HashShuffle

先来看一下最早的 HashShuffle,在 shuffle write 阶段,对相同的 key 执行 hash 算法,将相同 key 写入到同一个磁盘文件中,每一个磁盘文件都只属于下游 stage 的一个 task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲写满之后,才会溢写到磁盘文件中去。下一个 stage 的 task 有多少个,当前 stage 的每个 task 就要创建多少份磁盘文件。

shuffle write 阶段每一个 task 会给下游 stage 的每个 task 都创建一个磁盘文件,所以每一个 shuffle read task 只要从上游 stage 的所有 task 所在节点上,拉取属于自己的那一个磁盘文件即可。shuffle read 的拉取过程是一边拉取一边进行聚合的。每个 shuffle read task 都会有一个自己的 buffer 缓冲,每次最多只能拉取与 buffer 缓冲相同大小的数据,然后通过内存中通过一个 map 进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作,直到最后将所有数据到拉取完,并得到最终的结果。

这样一种 Shuffle 机制,最大的问题就是会产生大量的临时文件,比如当前 stage 有 100 个 task,下一个 stage 有 100 个 task,总共就会创建 10000 个 task。后来 spark 提供了一个 spark.shuffle.consolidateFiles 参数,设置为 true 即可开启优化机制减少文件数量,

开启 consolidate 机制之后,在 shuffle write过程中,此时会出现 shuffleFileGroup 的概念,每个 shuffleFileGroup 会对应一批磁盘文件,每个 Executor 的 cpu 执行完一批 task,接着执行下一批 task 时,下一批 task 就会复用之前已有的shuffleFileGroup。此时 task 会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。假设第一个 stage 有 100 个 task,总共还是有 10 个 Executor,每个 Executor 执行 10 个task,第二个 stage 有 100 个 task。此时每一个 Executor 只会写 100 个文件,总共只会产生 1000 个文件。

SortShuffle

SortShuffle 数据会先写入一个内存数据结构中,此时根据不同的 shuffle 算子,可能选用不同的数据结构。如果是 reduceByKey 这种聚合类的 shuffle 算子,那么会选用 Map 数据结构,一边通过 Map 进行聚合,一边写入内存,如果是 join 这种普通的 shuffle 算子,那么会选用 Array 数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。在溢写到磁盘文件之前,会先根据 key 对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件,默认的 batch 数量是 10000 条。一个 task 将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是 merge 过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。

ByPassShuffle 其实和未经优化的 HashShuffle 差不多,都会产生大量的文件,只是 ByPassShuffle 最终会做一个 merge。每个 shuffle write task 都会为每个 shuffle write task 创建一个临时磁盘文件,并将数据按 key 进行 hash 将 key 写入对应的磁盘文件之中。写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,同时也会创建一个单独的索引文件。相比 SortShuffle 省去了排序的开销。

具体使用哪一个种 Shuffle 可参考 o.a.s.shuffle.sort.SortShuffleManager 的 registerShuffle 实现方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
override def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
new BypassMergeSortShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
new SerializedShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else {
new BaseShuffleHandle(shuffleId, numMaps, dependency)
}
}
  1. 如果满足 shouldBypassMergeSort 则会使用 BypassMergeSortShuffleWriter。需要满足分区数量小于 spark.shuffle.sort.bypassMergeThreshold (默认是 200)以及不需要进行 map-side 预聚合,比如 reduceByKey 就需要先进行预聚合。
  2. 使用的序列化器支持序列化对象的重定位比如 KryoSerializer,shuffle 依赖中完全没有聚合操作,并且分区数不大于常量MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE的值(最大分区ID号+1,即2^24=16777216),则使用 UnsafeShuffleWriter。
  3. 否则使用 SortShuffleWriter。

参数调优

  1. 并行度调优,在大数据量会出现 oom 的情况,在无法增加内存加机器的情况下,可以尝试提高 task 的数量减少每一个 task 处理的数据量。设置 spark.sql.shuffle.partitions 和 spark.default.parallelism。
  2. spark.executor.cores 一定要结合上面的并行度和 executor 内存合理设置,为了避免 oom 并不一定是越高越好。
  3. 如上面所分析的,将数据写到磁盘文件之前,会先写入 buffer 缓冲中,待缓冲写满之后,才会溢写到磁盘。spark.shuffle.file.buffer 就是用来控制这个缓冲区大小的,默认为 32k,在内存资源足够的情况下建议调大这个参数减少磁盘 io。
  4. spark.reducer.maxSizeInFlight,用于设置 shuffle read task 的缓冲区大小,这个缓冲区决定了每次能够拉取多少数据,默认为 96m,可根据并行度和内存大小合理配置。
  5. spark.shuffle.io.maxRetries,默认为 3,shuffle read task 从 shuffle write task 所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。建议在大数据量情况下调大这个参数,避免网络和 gc 导致 job 跑失败。spark.shuffle.io.retryWait 用于设置重试间隔,默认为 5s。
  6. unsafe 相关的调优-todo