7

Python中的多进程(multiprocessing)

 3 years ago
source link: https://note.qidong.name/2018/11/python-multiprocessing/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Python中的多进程(multiprocessing)

2018-11-21 22:22:14 +08  字数:2530  标签: Python

Python中的多线程、包括协程,由于CPython的GIL(Global interpreter Lock ,全局解释锁)问题,只能实现并发(Concurrency),不能实现并行(Parallelism)。 因此,在并行计算场景,多进程是Python最简单的选择。

Python多进程概念

概念,就是class。 了解概念,就会了解class之间的关系。

Process

与普通的多进程类似,Python多进程的核心概念是Process(进程)。 一个简单的进程使用示例如下:

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p0 = Process(target=f, args=('alice',))
    p1 = Process(target=f, args=('bob',))
    p0.start()
    p1.start()
    p0.join()
    p1.join()

进程使用三部曲:

  1. 创建Process
  2. start,开始执行。
  3. join,等待执行完毕。

Pipe

Pipe即管道,是Bash中最常见的跨进程通信手段。 echo hello | tee stdout.log,中间的|就是管道,把前一个进程的stdout传递给下一个进程。

Pipe创建时,返回两个Connection,前者负责send而后者负责recv。 两个进程各执一端,就可以实现单向通信。

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    receiver, sender = Pipe()
    p = Process(target=f, args=(sender,))
    p.start()
    print(receiver.recv())   # prints "[42, None, 'hello']"
    p.join()

如果在创建Pipe时,指定duplex=True,比如Pipe(True),两个Connection即可实现双向通信。 默认duplex=False

Queue

Queue是一个基于标准模块queue、包装了Pipe的类。 它不仅具有先进先出(FIFO)的特性,还能实现跨进程通信。

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

在实际使用中,除了少数简单场景外,都不会直接使用ProcessPipeQueue来实现多进程。 这种低层级(low level,无贬义)的API,可读性差,容易出错。 常用的是高层级API——进程池。

Pool

由于Process创建、销毁有较大开销,并且并行数受机器CPU数量的限制,过多无益。 一个Pool(进程池)会统一创建并维持一定数量的Process,并行地执行Task。 在所有Task执行完毕后,再统一地关闭Process

这里Task(任务)的概念,并未被实现为一个class,而是一个callable,比如下面的fg

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
from multiprocessing.pool import Pool


def f(x):
    return x * x


def g(x, y):
    return x**y


def main():
    with Pool(4) as pool:
        result = pool.map(f, [1, 2, 3, 4, 5])
        print(type(result))
        print(result)

    with Pool(4) as pool:
        result = pool.starmap(g, [(1, 3), (2, 4), (3, 5), (4, 6), (5, 7)])
        print(type(result))
        print(result)


if __name__ == '__main__':
    main()

以上代码保存为multi.py文件,执行结果如下:

$ python3 multi.py
<class 'list'>
[1, 4, 9, 16, 25]
<class 'list'>
[1, 16, 243, 4096, 78125]

map的用法类似内置函数map,专门处理单个参数的callable; 而starmap则是用来处理多个参数的callable

此外,还有利用Pool执行单个Task的apply。 除非Task本身就是一个个来的,否则使用apply的效率不高。

比起apply,更值得关注的是imapimap_unorderedimapmap非常类似,而这个多出来的i,则是Iterablemap使用的是listimap则是Iterable,前者效率略高,而后者内存消耗显著的小。 在处理结果上,imap可以尽快返回一个Iterable的结果,而map则需要等待全部Task执行完毕,返回list。 无论map还是imap,都需要按顺序等待Task执行完成,而imap_unordered则不必。 imap_unordered返回的Iterable,会优先迭代到先执行完成的Task。 三者各有特点,要按需使用。

AsyncResult

以上为进程池的同步使用方案。 同步方案会卡在mapstarmap这一行,直到所有任务都执行完毕。 有时,我们会需要一个异步方案,这时就需要用到map_asyncstarmap_async。 它们返回的结果,就是AsyncResult

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
from multiprocessing.pool import Pool


def f(x):
    return x * x


def g(x, y):
    return x**y


def main():
    with Pool(4) as pool:
        result = pool.map_async(f, [1, 2, 3, 4, 5])
        print(type(result))
        print(result.get())

    with Pool(4) as pool:
        result = pool.starmap_async(g, [(1, 3), (2, 4), (3, 5), (4, 6), (5, 7)])
        print(type(result))
        print(result.get())


if __name__ == '__main__':
    main()

以上代码保存为multi_async.py文件,执行结果如下:

$ python3 multi_async.py
<class 'multiprocessing.pool.MapResult'>
[1, 4, 9, 16, 25]
<class 'multiprocessing.pool.MapResult'>
[1, 16, 243, 4096, 78125]

以上代码中,实际等待位置是result.get()那一行。

Timeout

以上多进程代码,其实是不完善的。 除非Task非常简单,并无IO、网络等资源依赖,否则多进程也好、多线程也好,都有可能执行不完。 为了避免未知原因的挂起,及时止损,通常需要设置timeoutAsyncResult在阻塞时,可以用waitget,设置timeout参数。

#!/usr/bin/env python3
# -*- coding:utf-8 -*-

import time
from multiprocessing.pool import Pool, TimeoutError


def sleep(duration):
    time.sleep(duration)
    with open('result.log', 'a') as file:
        file.write(str(duration))
        file.write('\n')
    return duration


def main():
    with Pool(4) as pool:
        result = pool.map_async(sleep, range(8))
        try:
            print(result.get(timeout=5))
        except TimeoutError:
            print(TimeoutError.__name__)


if __name__ == '__main__':
    main()

以上代码保存为timeout.py文件,执行结果如下:

$ python3 timeout.py
TimeoutError
$ cat result.log
0
1
2
3
4

可以看到,由于timeout=5,4秒以前的Task都成功了,而大于(等于)5秒的Task都失败了。

get需要等待所有进程结束时,需要在Pool关闭以前。 因此,需要在with作用域中执行,否则将超时或(没设timeout)挂死。 如果使用wait,则get可以在with以外获取结果。 因此,更推荐使用wait配合get

def main():
    with Pool(4) as pool:
        result = pool.map_async(sleep, range(8))
        result.wait()
    try:
        print(result.get(9))
    except TimeoutError:
        print(TimeoutError.__name__)

替换main(),执行结果如下:

$ python3 timeout.py
[0, 1, 2, 3, 4, 5, 6, 7]

总结

前面提到Python做并行计算的选择,多进程multiprocessing只是最简单的一个选择。 另外还有两个常见选择: 一是使用其它解释器实现的Python,比如PyPy、Jython等; 二是使用C语言优化需要并行的代码,在Native层绕过GIL的限制; 三是使用协程(或线程)加subprocess,这也算是多进程的一个方案。 此外,确认代码是否真的会被GIL所影响,是首要工作。 如果代码中真正耗时的计算是在Native层执行——这在Python中非常常见,比如OpenCV——那么用多线程也没问题。

另外,要注意多进程的测试覆盖问题。 在另一个进程执行的代码,是无法被coverage确认为已覆盖的。 需要对执行内容进行单独测试,或者在程序中预留未用多进程优化的原始方案。

其实,多进程带来的额外通信、切换开销,有时候也是很明显的。 还有个问题是,主进程被杀掉后,子进程会仍然存活,这在某些场景下会产生未知问题。 所以,在机器不是很强大的场景下,用原始的单线程串行方案,是最经济实用的选择。

参考


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK