6

VLDB'20 Magnet: 领英Spark Shuffle解决方案

 2 years ago
source link: https://zhuanlan.zhihu.com/p/397391514
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

VLDB'20 Magnet: 领英Spark Shuffle解决方案

阿里巴巴集团 分布式数据库研发

本文 Magnet: Push-based Shuffle Service for Large-scale Data Processing 介绍的是领英(LinkedIn)的Shuffle 服务系统 magnet。

领英(LinkedIn)对Apache Spark进行了大规模商用,用于内部的机器学习平台、商业分析系统等等。每天都有PB级别的数据跑在上万节点构成的Spark 集群上。这其中,Spark Shuffle 造成了严重的性能瓶颈;Shuffle 的性能问题也让集群难以 scale out。magnet解决了shuffle 导致的scalability问题,并且让Spark集群端到端处理性能提升了30%。

前置知识:Spark官方优化

这篇文章是关于领英内部的Spark shuffle优化方案的。如果对于Spark shuffle本身没有深入了解,其实很难Get到文章的点,毕竟本文的shuffle优化不是一个孤立的问题,而是一脉相承自Spark 官方的优化历程。

从官方时间线来看,Spark shuffle 的演进至少有以下几个milestone:

  1. 初版使用hash-based shuffle,特点是当DAG复杂时,文件数量会爆炸;
  2. Spark 0.8 引入File Consolidation机制,根据并行度来池化文件,减少文件数量;
  3. Spark 1.1 引入sort-based shuffle,同一个map task只输出一个shuffle文件+index文件,该shuffle方式在后续版本中成为默认方式;
  4. Spark 1.4 Tungsten-Sort Based Shuffle 来优化shuffle时排序的性能

现在一个Spark DAG拉起来了:

上游的map task需要把数据传输给下游的reduce task,每个reduce task 数据集对应一个partition id,我们需要把每个map task输出的数据根据partition id 分散和发送出去。

Spark 0.8以前的解决方式是基于哈希:现在上游有M个map task,下游有R个reduce task,首先在map task所在节点上创建 M * R个本地文件,map task 输出的数据通过hash确定对应输出的文件位置,写入到本地文件中;每个reduce task 通过Driver 得知所需文件的位置并逐个拉取。如下图所示:

这种shuffle方式很快出现了问题:当DAG非常复杂,map和reduce task的数量激增,shuffle所需的文件数量会爆炸;单个shuffle文件不会很大,海量的小文件写入其实是随机IO,这也导致了IO性能很低。

从Spark 0.8开始,官方引入了File Consolidation机制来优化hash shuffle。假设节点上有C个cpu 核数,每个Task 占用T个核,那么并行度为 C / T;假设reduce task数目为R,那么只需为每个并行度池化一个文件组,其中包含R个文件用于数据输出,文件池总大小为C / T * R。shuffle过程如下图所示:

File Consolidation 机制让shuffle文件数量下降了1~2个数量级,但这还不够,Spark 1.1引入了sort-based shuffle,直接把文件数量降低到了常量级别(并行度 * 2)。

现在上游有M个map task,每个task占用T个核数,共有C个cpu核数;map task 将数据写入AppendOnlyMap并根据{partitionId, hash(key)) 进行排序,排序结果能保证属于同一个partition id的数据在物理存储上保持相邻,称作block。最终我们可以把排序结果保存在一个文件中,并且用一个index文件来记录每个block在文件中的偏移位置。

Spill技术广泛运用在大数据引擎中,用于应对内存不足的情况;Spark sort-base shuffle也利用了Spill Sort来完成输出结果的排序。

整个过程如下图所示:

Spark 1.4 引入了 Tungsten-Sort Based Shuffle,其实是对sort-based shuffle的性能优化,包括:

  1. 利用sun.misc.Unsafe直接操作原始数据,不需要事先反序列化;
  2. 使用cache-efficient的排序数据,每个sort key 占用8bytes,排序效率更高;
  3. 由于排序不需要反序列化,spill效率更高了。原先需要deserialize->compare->serialize->spill,现在可以直接将原始字节数据spill到磁盘。

领英Shuffle解决方案

和官方版本一样,领英使用Spark sort-based方式来进行shuffle。在每天PB级别的workload上,领英工程师发现了这样几个问题:

1. 磁盘IO问题

上节介绍Spark sort-based shuffle提到,同一个Task输出两个文件,包括数据文件shuffle file,由一个个Block组成;index file,标识block在文件中的位置。下游Executor获取数据位置后,根据Index拉取block数据。

下图展示了领英某天真实的block统计数据,可以看到block大多数容量为KB级别,并且block size越小时,shuffle拉取数据的延迟就越高,这其中有两个主要原因:

  1. 出于性价比考虑,主要使用的是机械硬盘,海量小文件读会使得IO效率低下;
  2. 当shuffle数据量线性增长时,伴随的block数量往往会成平方型增长,论文Riffle: optimized shuffle service for large-scale data analytics 已有证明。

2. 连接可靠性问题

由于上游多个Executor共享同一个Spark ESS(spark external shuffle service),下游同一个Executor内共享一个连接,假设ESS服务数量为S,下游Executor数量为E,那么一次shuffle需要维护的连接数量为S * E。

在一次复杂的分析任务里,S和E的数量都能达到1000级别,一次需要维护数百万条连接,这就对连接的可靠性带来了巨大的挑战;一旦某个下游Executor与上游ESS连接异常断开,整个stage中的reduce task都需要失败重试。

Magnet shuffle service

Magnet Shuffle Service是领英引入的一种spark外部shuffle服务,用于优化磁盘io效率,减少ESS连接失败,提升连接可靠性,解决数据倾斜和task stragglers,并且不会带来过多的cpu和内存消耗。

magnet主要结构和流程:

  1. Spark driver组件,协调整体的shuffle操作
  2. map任务的shuffle过程,增加了一个额外的操作push-merge,将数据推到远程shuffle服务上;
  3. magnet shuffle service是一个强化版的ESS。隶属于同一个shuffle partition的block,会在远程传输到magnet 后被merge到一个文件中;
  4. reduce任务从magnet shuffle service 接收合并好的shuffle数据,不同reduce任务可以共享shuffle数据来提升shuffle传输效率。

几个关键特性:

  • push-merge shuffle
    KB级别小文件的随机读 -> MB级别文件顺序读
    push操作与map任务解耦,Push失败不会导致map任务重试
  • Best-effort approach
    magnet可以接受block push失败,reduce任务既可以消费合并好的block数据,也可以接受未经合并的小文件。
  • Stragglers/Data Skews Mitigation
    可用来减少数据倾斜和task stragglers(即某个慢任务拖慢整个任务的执行时间)

Push-Merge

push-merge的根本目的是减少reduce侧的随机IO,在Magnet上把小文件block合并后,reduce task可以读取连续存储的、大小在MB级别的文件,这样一来,随机读取就可以变成连续磁盘空间上的IO。

push-merge的基本单位是chunk,map task输出block后,首先要将block以算法1的方式分配到chunk中去。

简单解释一下算法1,其实算法1讲的就是block攒批到chunk,chunk长度超限之后又push到magent上的过程。针对每个reduce 任务以及对应的block数据 (编号为i):

  1. 当chunk长度没有超过限制L,将block (长度为 [公式] )append到chunk中,更新chunk长度 [公式]
  2. 当chunk长度超过限制L,将chunk推到编号为k的Magnet机器上,之后写入新的block进去(重新初始化);
  3. [公式] 时,把chunk推到编号为k的Magnet机器上,并且把编号k+1,这里需要理解一下, [公式] 就是第i个任务所对应的Magnet机器编号,R/n是每台Magnet机器所对应的reduce task数量。因此这里需要更新一下机器编号,把chunk发送到下一台机器上。

这里还有一个隐患,根据算法1,所有map task都按照partition id = 1, 2 ... R的顺序来构造和push chunk,这会导致Magnet上资源的热点和严重的争用冲突。(例如,一次push的所有block都属于PartitionId=1,那么在magnet机器上进行merge时,需要排队等待partitionId=1上的文件锁)因此,设计者将chunk按照编号进行了随机化处理,来避免所有map task按照相同次序push chunk。

Magnet机器上需要维护一些重要的元信息,包括:

  1. bitmap: 存储已merge的mapper id,防止重复merge
  2. position offset: 如果本次block没有正常merge,可以恢复到上一个block的位置
  3. currentMapId:标识当前正在append的block,保证不同mapper 的block能依次append

提升连接可靠性

magnet shuffle服务通过Best-effort的方式来解决海量连接可靠性低的问题。在该体系上,所有连接异常都是non-fatal的,可以理解为每个环节上的连接断开或异常,都有一个对应的备选和兜底方案:

  • 如果Map task输出的Block没有成功Push到magnet上,并且反复重试仍然失败,则reduce task直接从ESS上拉取原始block数据;
  • 如果magnet上的block因为重复或者冲突等原因,没有正常完成merge的过程,则reduce task直接拉取未完成merge的block;
  • 如果reduce拉取已经merge好的block失败,则会直接拉取merge前的原始block。

Spark Driver维护了MergeStatus,包含已经merge的block的信息;MapStatus,代表所有map task,即未完成merge的block信息。通过维护MergeStatus和MapStatus,可以完成上述的备选方案,达到Best-effort的效果。

解决Task Straggler问题

task straggler问题也是一个非常有趣的点。下图(a)代表观测到的map task和reduce task的执行时间,横轴为时间线;图(b) 代表隐藏在背后的真实任务执行情况。

解释一下图(b),蓝色代表map task执行时间,绿色代表push-merge时间,红色代表真实的reduce task执行时间。可以看到有一个map task的push-merge执行非常缓慢,导致整个reduce task需要等待block merge完成,才能开始执行。

为了解决这个问题,magnet服务设置了push-merge超时时间,如果block没有在超时时间内完成push-merge,magnet服务会停止继续接受block,提前让reduce task开始执行;而未完成push-merge的block,根据上节中提到的Best-effort方案,reduce task会从MapStatus中获取状态与位置信息,直接拉取没有merge的block数据。图(c)展示了在task straggler问题解决后,reduce task完成时间大大提前了。

Data Skew 问题

在Spark shuffle过程中,如果某个partition的shuffle数据量远高于其他partition,则会出现数据倾斜(data skew)问题。 data skew 不是magnet特有的问题,而是在Spark上已经有成熟解决方案,即大名鼎鼎的 adaptive query execution

magnet需要适配Spark 的adaptive execution特性,同时防止一个magnet服务上因data skew而导致有 100GB / 1TB级别的数据需要merge。为此,针对上文的算法1,做出了一些改进,具体办法是,通过限制 size超过阈值的block被并入到chunk中;如果超过阈值,则会利用上节中的Best-effort方案,直接拉取未完成merge的block数据。而普通的、未有data skew情况的block,则会走正常的push-merge流程。

[1] Shen M, Zhou Y, Singh C. Magnet: push-based shuffle service for large-scale data processing[J]. Proceedings of the VLDB Endowment, 2020, 13(12): 3382-3395.

[2] Zhang H, Cho B, Seyfe E, et al. Riffle: optimized shuffle service for large-scale data analytics[C]//Proceedings of the Thirteenth EuroSys Conference. 2018: 1-15.

[3] Spark Architecture: Shuffle Spark Architecture: Shuffle

[4] Spark sql adaptive execution at 100 tb. https://software.intel.com/en-us/articles/spark-sql-adaptive-execution-at-100-tb (Retrieved 02/20/2020).


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK