Spark—Shuffle理解

Shuffle是mapreduce编程模型的关键步骤,同样也是在程序运行中主要需要进行调优的阶段,因此更好的理解shuffle原理,才能够在程序中找到需要优化的关键点。

在理解Spark shuffle前先理解标准的mapreduce编程模型中使用的shuffle原理,也就是Hadoop中的MapReduce计算框架。

在mapreduce中默认每一个输入到reducer中的数据都根据key已经排好序了,系统将map端的输出进行排好序,reducer端拉取到数据这一过程就是shuffle。

map函数输出的结果,并不是简单的写入磁盘,针对每一个map任务都有一个环形缓冲区用于存储结果,在缓冲区被填满时溢写磁盘,在这之前要根据数据最终要传入的reducer把数据划分成对应的partition,在每个partition中再根据key进行排序。如果此时我们有指定combine函数,那么他们将会把拍好序的结果执行combine操作,之后再写入磁盘,否则将直接写入磁盘。

此时来到reducer端,map的输出数据在map任务的tasktracker的本地磁盘,那么将根据tasktracker分区文件运行reducer任务,这里需要注意的是,每个reducer任务
是在map任务运行完成后才开始运行的,也就是说每一个分区文件都已经完全生成好了(这也是mapreduce效率较低的其中原因之一),reducer任务启动同时开始复制线程进行查找map输出文件的并进行复制(如何查找到map输出文件的位置?:每个map输出完成后都会向application master会报信息,这其中包括了输出文件的位置。)。复制好map输出结果后,reducer开始合并阶段,将每一个map结果进行合并,同时会保持map已经做好的顺序进行排序。

在spark中的shuffle处理过程同样是基于迭代的方式,早期版本中spark没有采用hadoop在reducer获取前全部排序的方式。而是提供了基于hash的shuffle操作(HashShuffleManager)。

不过这种方式存在一个较大的缺点就是在ShuffleMapTask数据生成时会产生大量的小文件,以及占用较大的内存缓存开销。为了解决这个问题,spark在版本演化的过程中几次优化了Shuffle机制。

shuffle写操作

基于hash的普通shuffle写操作

早期版本中spark的每个ShuffleMapTask会根据ResultTask也就是reducetask的数量创建bucket,bucket数量就是ShuffleMapTask*ResultTask的数量,在当前的机制中bucket是一个抽象概念,每个bucket对应一个生成文件。不过这种方式存在较大的问题。

  • 在整个ShuffleMapTask生成bucket后导致小数据量的大量的文件产生,根据经验,磁盘的随机读取性能要照比顺序读差,同时每次ResultTask端拉取一个文件都需要进行一次网络io同样也是耗时操作(具体将会在后边shuffle读操作分析)。
  • 有文件产生那么就意味着内存中维护了相同数量的文件句柄,这些对象全部都分配在对内存中,可能导致oom。

假设当前存在四个ShuffleMapTask任务,但是只提供了两个Executor的CPU core,如上图所示,那么在任务运行中假设先执行了1和3,此时根据ResultTask生成了六个分区数据文件,而写个文件中的数据并不是有序的,执行完成后,在执行2和4 task,此时又生成了六个分区数据文件,至此整个任务一共产生了ShuffleMapTask*ResultTask个分区数据文件。

基于hash的优化shuffle写操作

根据普通的hash shuffle,可以看到其产生大量小文件的缺点,于是提供了优化后的hash shuffle,在新的策略中提出了consolidate机制,减少了一部分小文件的产生,不过这种策略还是不够完美。

consolidate机制,简单来说就是复用buffer缓冲区,合并一些key相同的数据到一个分区(注意此时合并后文件依然无序的),然后对同一个分区生成一份文件,这样就能够缓解普通hash shuffle出现的大量小文件的问题。

具体如何优化的呢,假设当前存在六个ShuffleMapTask,提供了两个cpu core,同时存在三个ResultTask(三个分区)。

根据spark任务的分时执行,可以假设第一个和第四个任务先执行,这样就先生成了六个bucket,每个cpu core对应三个,也就是说会生成六个文件。

此时在执行2和5任务,不过这次不会再生成新的bucket也就是说不会有新的文件生成,他们会复用内存中已经存在的bucket,继续向其对应的分区进行追加。同样3和6任务也是同样向其追加,当整个ShuffleMapTask都执行完成后,可以看到整体的文件数量变少了很多,新的文件数量可以根据core*ResultTask计算。

不过这种策略也不够完美,当ResultTask分区过多时,必有有1000个ResultTask那么也会出现较多的小文件,所以也就引入了新的shuffle机制。

当前spark版本中已经完全抛弃掉了HashShuffleManager

基于sort的普通shuffle写操作

为了缓解hash shuffle产生大量文件,以及文件句柄浪费的大量内存。spark在1.1版本中借鉴Hadoop Shuffle中的排序机制,并且对生成的结果使用了合并、索引机制。

具体是如何操作的呢?在hash shuffle中每一个bucket使用hashmap存储,他是无序的,而在当前机制中map中使用key进行分区,map的value使用Array来进行存储每一个数据,当array的数据量达到一定量时就会被写入磁盘,而在溢写磁盘之前先对该部分数据进行排序。
而排序后的文件会按照默认的数据量分批写入磁盘(5w),每次分批写入一个磁盘都会产生一个临时文件,最后当这个任务完成后会将所有的文件merge,合并为一个文件,同时对每一个临时文件的起始位置信息,再创建一个索引文件,这样就能够在拉取时找到指定的数据段位置。

这样一来ShuffleMapTask端产生的文件数量将变得更少,也就是有多少个cpu core就会有多少个文件生成。

优点:

  • 小文件明显变少了,一个task只生成一个file文件
  • file文件整体有序,加上索引文件的辅助,查找变快,虽然排序浪费一些性能,但是查找变快很多

基于bypass的sort shuffle写操作

该策略与普通的sort shuffle机制只是略去了ShuffleMapTask端的排序过程,其他过程没有变化,最后都会合并成一个文件。

该策略的运行条件:

  • shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值
  • 不是聚合类的shuffle算子(比如reduceByKey)

在shuffleMapTask数量小于默认值200时,启用bypass模式的sortShuffle(原因是数据量本身比较少,没必要进行sort全排序,因为数据量少本身查询速度就快,正好省了sort的那部分性能开销。)

shuffle的读操作

前边侧重介绍ShuffleMapTask端结果生成的过程,也就是所谓的map端写数据过程。在ResultTask端进行读取时整体流程是相同的,都是ResultTask端去ShuffleMapTask端拉取对应的数据到本地,然后开始执行任务。
这一过程中针对不同的shuffle写机制具体实现有不同的疑问:

  • Shuffle写基于hash和sort两种方式,他们对应的读去方式如何?
  • 不同的机制生成的数据,ResultTask端是如何定位到自己需要的数据所在的位置(节点、Executor、数据块位置)?

  1. spark-env在初始化阶段会对ShuffleManager、BlockManager及MapOutPutTacker进行实例化,ShuffleManager在当前版本中默认只支持SortShuffleManager(Hash已经移除),在Shuffle读中实例化BlockStoreShuffleReader,在HashShuffleManager中持有的是FileShuffleBlockResolver,而在SortShuffleManager中持有的时IndexShuffleBlockResolver,由此可以知道,在Spark的Shuffle中根据不同的写入方式采用相同的读取方式,读取的数据都放在hash表中进行存储。
  2. 在每次ShuffleMapTask生成数据后都会调用MapOutPutTacker将MapStatus(包含文件信息)发送给MapOutPutTackerMaster,所以在读的阶段Executor的MapOutPutTacker会向MapOutPutTackerMaster请求,获取到对应的MapStatus,从而获取到执行结果的元信息,由此可以知道ResultTask是通过向MapOutPutTackerMaster请求从而找到的上游Shuffle写结果的位置信息。
  3. 在获取到位置信息后会进行判断,当前结果是在本地节点还是远程节点,如果在本地那么将直接使用BlockManager进行读取数据;如果不在本地那么将会通过netty进行远程网络读取,默认spark中会采用五个线程进行并行读取。
  4. 关于读取的细节,在HashShuffleManager时是直接会找到对应的文件进行整体读取,不过当前版本使用SortShuffleManager,每个执行节点只有一个数据文件,那么就需要从MapStatus中获取到对应数据文件的index(索引)数据,然后在对应节点通过索引查找到数据块。
  5. 读取数据后,判断ShuffleDependency是否定义了聚合操作,如果存在那么将根据key进行聚合。若在ShuffleMapTask中已经进行了合并,那么将在其基础上进行聚合,合并完成后,将使用外部排序对数据进行排序。形成最后的MapPartitionRDD。

总结

至此所有关于Spark Shuffle的原理分析完成,其中只需要重点关注SortShuffle这一种机制即可,因为在Spark已经完全移除了HashShuffle,并且将Tungsten-Sort合并入SortShuffle,我们需要清楚Shuffle机制的作用是什么,使用SortShuffle能够带来什么好处,以及Shuffle的读写过程中具体的流程如何,这样方便在后续进行Shuffle优化时能够准确优化对应的细节。


以下节选自 [Spark Shuffle的技术演进]
作者:LeonLu
链接:https://www.jianshu.com/p/4c5c2e535da5

Spark Shuffle具体实现的演进

在具体的实现上,Shuffle经历了Hash、Sort、Tungsten-Sort三阶段:

Spark 0.8及以前 Hash Based Shuffle

在Shuffle Write过程按照Hash的方式重组Partition的数据,不进行排序。每个map端的任务为每个reduce端的Task生成一个文件,通常会产生大量的文件(即对应为M*R个中间文件,其中M表示map端的Task个数,R表示reduce端的Task个数),伴随大量的随机磁盘IO操作与大量的内存开销。
Shuffle Read过程如果有combiner操作,那么它会把拉到的数据保存在一个Spark封装的哈希表(AppendOnlyMap)中进行合并。
在代码结构上:

org.apache.spark.storage.ShuffleBlockManager负责Shuffle Write
org.apache.spark.BlockStoreShuffleFetcher负责Shuffle Read
org.apache.spark.Aggregator负责combine,依赖于AppendOnlyMap

Spark 0.8.1 为Hash Based Shuffle引入File Consolidation机制

通过文件合并,中间文件的生成方式修改为每个执行单位(一个Executor中的执行单位等于Core的个数除以每个Task所需的Core数)为每个reduce端的任务生成一个文件。最终可以将文件个数从MR修改为EC/T*R,其中,E表示Executor的个数,C表示每个Executor中可用Core的个数,T表示Task所分配的Core的个数。
是否采用Consolidate机制,需要配置spark.shuffle.consolidateFiles参数

Spark 0.9 引入ExternalAppendOnlyMap

在combine的时候,可以将数据spill到磁盘,然后通过堆排序merge

Spark 1.1 引入Sort Based Shuffle,但默认仍为Hash Based Shuffle

在Sort Based Shuffle的Shuffle Write阶段,map端的任务会按照Partition id以及key对记录进行排序。同时将全部结果写到一个数据文件中,同时生成一个索引文件,reduce端的Task可以通过该索引文件获取相关的数据。
在代码结构上:

从以前的ShuffleBlockManager中分离出ShuffleManager来专门管理Shuffle Writer和Shuffle Reader。两种Shuffle方式分别对应
org.apache.spark.shuffle.hash.HashShuffleManagerorg.apache.spark.shuffle.sort.SortShuffleManager

可通过spark.shuffle.manager参数配置。两种Shuffle方式有各自的ShuffleWriter:

  • org.apache.spark.shuffle.hash.HashShuffle
  • org.apache.spark.shuffle.sort.SortShuffleWriter;
  • 共用一个ShuffleReader,即org.apache.spark.shuffle.hash.HashShuffleReader
    org.apache.spark.util.collection.ExternalSorter实现排序功能。可通过对spark.shuffle.spill参数配置,决定是否可以在排序时将临时数据Spill到磁盘。

Spark 1.2 默认的Shuffle方式改为Sort Based Shuffle

Spark 1.4 引入Tungsten-Sort Based Shuffle

将数据记录用序列化的二进制方式存储,把排序转化成指针数组的排序,引入堆外内存空间和新的内存管理模型,这些技术决定了使用Tungsten-Sort要符合一些严格的限制,比如Shuffle dependency不能带有aggregation、输出不能排序等。由于堆外内存的管理基于JDK Sun Unsafe API,故Tungsten-Sort Based Shuffle也被称为Unsafe Shuffle。
在代码层面:

新增org.apache.spark.shuffle.unsafe.UnsafeShuffleManager
新增org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter(用java实现)
ShuffleReader复用HashShuffleReader

Spark 1.6 Tungsten-sort并入Sort Based Shuffle

由SortShuffleManager自动判断选择最佳Shuffle方式,如果检测到满足Tungsten-sort条件会自动采用Tungsten-sort Based Shuffle,否则采用Sort Based Shuffle。
在代码方面:

UnsafeShuffleManager合并到SortShuffleManager
HashShuffleReader 重命名为BlockStoreShuffleReader,Sort Based Shuffle和Hash Based Shuffle仍共用ShuffleReader。
Spark 2.0 Hash Based Shuffle退出历史舞台

从此Spark只有Sort Based Shuffle。