7

从MR到Spark再到Ray,谈分布式编程的发展

 4 years ago
source link: http://dockone.io/article/9648
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

作为和Spark同出一脉的Ray,目前三个合伙人也创立了公司,叫Anyscale,参考这篇新闻《 Anyscale raises $20.6 million to simplify writing AI and ML applications with Ray - Top Tech News 》。

获得了2000多万美元的投资。然后我之前也写了篇文章, 谈了谈对Ray的认知,以及如何充分整合Spark/Ray的优势

其实现在看来,把他们搭配起来,确实是个不错的组合。不过今天,我想从一个新的角度去看Ray,希望能从MR->Spark->Ray的发展历程看看分布式编程的发展历程。说实在的,我不知道“分布式编程”这个是我自己臆想出来的词汇还是真的有这个术语,反正含义就是我需要编写一段代码,但是这段代码可以从单机多机随便跑,不用做任何调整。

另外,我之前还觉得Spark和Ray并不是同一个层级的产品,因为觉得Ray更像一个通用的资源调度框架,只是不同于Yarn,Kubernetes之类面向应用,Ray是面向业务问题的,而Spark则是一个面向数据处理的产品。现在回过头来看,他们其实是一毛一样的。Ray的系统层对应的其实就是Spark的RDD,然后基于Ray系统层或者Spark RDD,我们都可以开发应用层的东西,比如DataFrame API,SQL执行引擎,流式计算引擎,机器学习算法。同样的,就算没有这些应用层,我们也可以轻而易举的使用系统层/RDD来直接解决一个很简单的问题,比如做一个简单的word count。所以这么看来,他们其实是一个东西,唯一的区别是系统层的抽象(不同取决于要解决的问题)。Ray的系统层是以Task为抽象粒度的,用户可以在代码里任意生成和组合task,比如拆分成多个Stage,每个Task执行什么逻辑,每个task需要多少资源,非常自由,对资源把控力很强。RDD则是以数据作为抽象对象的,你关心的应该是数据如何处理,而不是去如何拆解任务,关心资源如何被分配,这其中涉及的概念比如Job,Stage,task你最好都不要管,RDD自己来决定。虽然抽象不一样,但是最终都能以此来构建解决各种问题的应用框架,比如RDD也能完成流,批,机器学习相关的工作,Ray也是如此。所以从这个角度来看,他们其实并没有太过本质的区别。从交互语言而言,双方目前都支持使用python/java来写任务。另外,就易用性而言,双方差距很小,各有优势。

恩,不知不觉跑题了。我们回到这篇文章的主题吧。为了编写一个可以跑在多几个机器上的应用,MR做了一个范式规定,就是你编写map,reduce函数(在类里面),然后MR作为调度框架去调用你的map/reduce。

public class WordCount {

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();

        StringTokenizer tokenizer = new StringTokenizer(line);

        while (tokenizer.hasMoreTokens()) {

            value.set(tokenizer.nextToken());

            context.write(value, new IntWritable(1));

        }

    }

}



public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int sum = 0;

        for (IntWritable x : values) {

            sum += x.get();

        }

        context.write(key, new IntWritable(sum));

    }

}



public static void main(String[] args) throws Exception {



    Configuration conf = new Configuration();

    Job job = new Job(conf, "My Word Count Program");

    job.setJarByClass(WordCount.class);

    job.setMapperClass(Map.class);

    job.setReducerClass(Reduce.class);

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(IntWritable.class);

    job.setInputFormatClass(TextInputFormat.class);

    job.setOutputFormatClass(TextOutputFormat.class);

    Path outputPath = new Path(args[1]);

    //Configuring the input/output path from the filesystem into the job

    FileInputFormat.addInputPath(job, new Path(args[0]));

    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    //deleting the output path automatically from hdfs so that we don't have to delete it explicitly

    outputPath.getFileSystem(conf).delete(outputPath);

    //exiting the job only if the flag value becomes false

    System.exit(job.waitForCompletion(true) ? 0 : 1);

}

} 

MR自身是一个分布式计算框架,依附于它,你可以把这段代码分布式跑起来。map/reduce部分都是分布在不同的进程中执行的。为了能够分布式执行,我们看,他制定了非常多的规范和接口,这看起来非常不像我们正常写单机程序的样子。

Spark除了性能提升以外,最大的优势是,把代码搞的更像单机程序了,下面看一个:

val text = sc.textFile("mytextfile.txt") 

val counts = text.flatMap(line => line.split(" ")

).map(word => (word,1)).reduceByKey(_+_) counts.collect

上面代码看起来已经和单机的scala处理别无二致了,基本没有强制你需要实现什么接口。所以这也导致了一个额外的问题,用户真的以完全以单机的思维去写代码,但是毕竟他还是分布式的,所以有的时候用户会写出问题。比如申明了一个变量,然后在map里进行累加,发现输出结果不对。Spark能做到这么简洁,核心基础是函数序列化,也就是系统会把你写的那些函数(你的功能)自动序列化到各个节点去执行,函数之间会产生依赖图。这个是Spark对整个分布式编程的一大贡献。在MR中,是把Jar包分发到各个节点,然后执行相应的代码。分发的是二进制库,而不是函数。除此之外,函数序列化发送,还给Spark带来了一大波好处,比如你可以使用spark-shell,类似python-shell,然后在命令行里面直接写分布式代码,实时回显结果。

同时,Spark很早就看上了Python作为当前最优秀的人机交互通用语言,创造性的引入Python的支持,能让你用Python去写这些程序,并且保证和写scala的模式完全一致。可怜的MR虽然也能用Python,但是是通过输入输出流来让Java的进程和Python进程进行交互,效率性能都极低,而且python的代码看起来也不自然,又是单独一套方式,而不是吻合MR的模式写。Spark让自己的scala版exeucor不但能执行java/scala的函数,还能将Python的函数委托给Python Worker执行。executor从task执行者,摇身一变,还成了类似yarn的nodemanager,可以管理其他子worker了(启动python进程执行Python函数)。

所以,Spark是使得分布式程序看起来更像单机程序的始作俑者,而且还提供了非常多的好用的API,比如DataFrame,SQL等等。

现在Ray来了,Ray吸取了Spark的诸多营养,走的更远了,首先,既然Python作为世界上当前最优秀的人机交互通用语言,我直接就让Python成为头等公民,安装部署啥的全部采用Python生态。比如你pip 安装了ray,接着就可以用ray这个命令部署集群跑在Kubernetes/Yarn或者多个主机上,也可以直接使用Python引入ray API进行编程。易用性再次提高一大截。其次,作为分布式应用,他除了能够把Python函数发到任意N台机器上执行,还提供了Actor(类)的机制,让你的类也可以发到任意N台机器上执行。我们来看看函数的例子:

import ray

ray.init()



@ray.remote

def f(x):

return x * x



futures = [f.remote(i) for i in range(4)]

print(ray.get(futures))

再来看类的例子:

import ray

ray.init()



@ray.remote(num_cpus=2, num_gpus=1)

class Counter(object):

def __init__(self):

    self.value = 0



def increment(self):

    self.value += 1

    return self.value

f = Foo.remote()

在@ray.remote里你可以设置这个类或者函数需要的资源等各项参数。大家可以看看更复杂的例子,比如实现一些MR/Parameter Server,看起来和单机代码区别不大。

额外值得一提的是,Ray还解决一个非常重要的问题,就是进程间数据交换问题。在Ray里他会把数据存储到一个内置的分布式内存存储里,但是写入和读取就存在一个问题,有序列化反序列开销,这个无论是同一语言还是不同的语言都会遇到。于是他们开发了Apache Arrow项目。这个项目很快也被Spark引入,给PySpark性能提升非常大。

所以,从MR到Spark到Ray,本质上都是让分布式代码更符合单机代码的要求,这块要得益于Python/Java等语言对函数序列化的支持。同时,Python已经成为真正的“人机交互标准语言”,因为他的受众非常的广。经过两代框架的努力,不同语言的交互性能已经得到极大的提升。我们即可用通过Python 进程内wrapper其他语言,比如C++来提升Python的运行速度,也可以通过Arrow来完成进程间的数据交互,来提升Python的执行效率。

当然,对于一个可编程的分布式执行引擎而言,Ray相比Spark的RDD是一个更好的更通用的分布式执行引擎,他的抽象层次更低一些,但是引入了一些更友好的东西,比如build-in python支持,除了函数还能更好的支持有状态的类,以及你可以在你的task里启动新的task,并且这个新启动的task也会被分布式的调度到一台服务器上执行,而不是在当前task的Python进程中执行,这个是当前其的引擎比如Spark RDD/Flink做不到的,在这些框架里,任务的产生和分发都只能主节点来做,而Ray则不存在这个问题。

最后Bonus,我在分享上一篇文章的的时候,说了这么一句话:

Spark现有整合AI的模式不是最佳的,而是高成本的。Spark应该通过间接拥抱Ray来拥抱其他的机器学习库。

为啥这么说呢?为啥Spark现在要整合AI的成本是比较高的呢?根子在于Spark Executor最早设计的时候是为了执行task而准备的,后面随着拥抱Python,同时具备了NodeManager的角色,根据需要启动Python Worker执行任务,但最终Python执行前后都需要经过Executor。为了支持复杂的机器学习算法的各种模式,比如最早,Spark并发执行的task和task之间是无关联的,各自跑完了就退出了,现在为了为了满足一些算法的范式,需要让task都启动后才能做下面一件事,所以引入了barria API(这个设计治标不治本,只能解决一类问题,机器学习层次终极要求你需要解决task之间通讯的问题,以及在task中需要能够产生新的task)。这些都需要不断的对spark core做一些改进和调整,而这种调整在Spark目前的体量下,属于重型修改,所以每次都需要引入大版本才能发布这些功能。理论上,最好不要动系统层(RDD),而是在应用层完成这些。但是我们知道,因为RDD曾抽象有点太高(最早是给数据处理做的设计),导致其任务/资源的管理,调度经常需要根据需求做调整,所以最好的办法是,拥抱Ray来达成最佳效果。但是现在两个都是商业公司,事情变得不好说了。

原文链接: https://zhuanlan.zhihu.com/p/98033020


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK