14

利用coalesce加速Spark迭代计算

 3 years ago
source link: https://blog.csdn.net/yanxiangtianji/article/details/109043066
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

问题:join导致partition数量膨胀

Spark把每一个RDD分割为若干个partition,每一个partition上的计算是一个独立的task。每个task在执行的时候都是独立schedule的,都需要执行一遍完整的排序、系列化、计算、反序列化工作。

这其中很大一部分工作的开销基本都是恒定的,不随partition中数据的多少而变化。所以当一个RDD的partition过多的时候,计算时会有很大的overhead。

在计算的过程中,我们经常需要把两个或更多的变量放在一起做些计算,我们通常会用到join,cogroup一来的操作。以a.join(b)为例,它在执行过程中会先对两个RDD分别做mapPartitions操作再次切分a和b。这是因为默认情况下不能保证切分啊和b的方法是不一样的,也就没法只用本地数据完成join操作。重新切分之后就可以确保切分后同一个partition上的a和b的key是相同的了。然后就可以在本地执行join操作了。

由于mapPartitions的存在,join一类的操作会增加partition数量。在反复迭代的计算任务中,partition数量会被增加到难以想象,每一轮的计算时间也会不断膨胀。

下面是一个迭代计算最短路径的例子,这里我们使用BFS的算法。其中我们用graph.join(sssp)来合并图和当前已知的最短距离,然后通过flatMapcompute计算途径每一个节点出发到达其他节点的距离,最后用reduceByKey(min)计算新一轮里已知的最短距离。

# 随机生成一个有权图
# 格式:(src, [(dst, weight)*n])
n=10000
graph=[(k, [(i, random.random()) for i in random.sample(range(n),random.randint(1,n-1))]) for k in range(n)]
print(graph[0])
# 初始化从source到各个节点的已知最短距离
source=0
sssp=graph.map(lambda r:(r[0],0 if r[0]==source else math.inf)
# 定义计算函数
def compute(links, v):
	for d,w in links:
		yield (d,v+w)
# 迭代计算
for i in range(20):
	t=time.time()
	c=graph.join(sssp).flatMap(lambda kls: compute(kls[1][0],kls[1][1]))
	sssp=c.reduceByKey(min)
	#sssp=sssp.coalesce(4)
	p=sssp.aggregate((0,0), (lambda lr,v:(lr[0],lr[1]+1) if math.isinf(v[1]) else(lr[0]+v[1],lr[1])), (lambda a,b:(a[0]+b[0],a[1]+b[1])))
	print(i,time.time()-t,p,c.getNumPartitions(),sssp.getNumPartitions())

这里的getNumPartitions()函数可以看到某个RDD当前有多少partition。
下面的数据是我用3个core运行,同时把graph切成3块,初始sssp切成6块时候的计算结果。可以看到除了第一轮因为需要做些初始化工作所以速度较慢以外,其他各轮随着partition数量增多,每一轮的计算时间也在不断增长。
如果使用WebUI的话可以点开对应的stage信息,可以清晰地看到task的数量逐渐增多,同时overhead也越来越大。最终导致reduceByKey操作的时间也越来越长。

0 2.0179808139801025 (8745.09903, 1) 9 9
1 1.5022525787353516 (8745.099020000001, 0) 12 12
2 2.0394625663757324 (8745.09902, 0) 15 15
3 2.5546443462371826 (8745.099020000001, 0) 18 18
4 3.239337921142578 (8745.09902, 0) 21 21

解决:合并partition

repartition函数

最直接的合并parition的办法就是repartition函数。它可以把一个RDD转换为任意数量的partition。

a=sc.parallelize(range(100),5) # 初始为5个partition
print(a.getNumPartitions())
b=a.repartition(10) # 转为10个partition
print(b.getNumPartitions())
c=a.repartition(2) # 转为2个partition
print(c.getNumPartitions())

但是repartition函数每次执行都会对所有元素重新计算一次它应该归属的partition,这个开销可能会很大。

coalesce函数

repartition函数不同,coalesce函数只用用于缩减partition的数量。而且它不重新计算partition归属,只是单纯地合并一些数据。所以它的参数只有在小于该RDD当前的partition数量时才有意义。
它的执行原则是不进行跨通讯,单纯地将同一个core上的数据块进行合并,从而达到最终partition数量满足给出的参数。

a=sc.parallelize(range(100),5
b=a.coalesce(5)

下面是我将上面代码中关于coalesce的注释去掉之后的结果。因为我之间做了一些其他操作,所以第一轮运行时间略长。但是之后的运行时间非常稳定,不再随轮数而增长。

0 2.5019025802612305 (8745.099020000001, 0) 7 4
1 0.9806034564971924 (8745.099020000001, 0) 7 4
2 0.8405632972717285 (8745.099020000001, 0) 7 4
3 0.795809268951416 (8745.099020000001, 0) 7 4
4 0.7986171245574951 (8745.099020000001, 0) 7 4

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK