16

面向数据科学的Python多进程简介及应用

 3 years ago
source link: https://zhuanlan.zhihu.com/p/264795288
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

来源:python大数据分析

一、简介

进程是计算机系统中资源分配的最小单位,也是操作系统可以控制的最小单位,在数据科学中很多涉及大量计算、CPU密集型的任务都可以通过多进程并行运算的方式大幅度提升运算效率从而节省时间开销,而在 Python 中实现多进程有多种方式,本文就将针对其中较为易用的几种方式进行介绍。

二、利用multiprocessing实现多进程

multiprocessingPython 自带的用于管理进程的模块,通过合理地利用 multiprocessing ,我们可以充分榨干所使用机器的CPU运算性能,在 multiprocessing 中实现多进程也有几种方式。

2.1 Process

Processmultiprocessing 中最基础的类,用于创建进程,先来看看下面的示例:

single_process.py

import multiprocessing
import datetime
import numpy as np
import os

def job():

    print(f'进程{os.getpid()}开始计算:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    for j in range(100):
        _ = np.sum(np.random.rand(10000000))
    print(f'进程{os.getpid()}结束运算:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

if __name__ == '__main__':

    process = multiprocessing.Process(target=job)
    process.start()
jyqmA33.jpg!mobile 图1 single_process.py运行结果

在上面的例子中,我们首先定义了函数 job() ,其连续执行一项运算任务100次,并在开始和结束的时刻打印该进程对应的 pid ,用来唯一识别一个独立的进程,接着利用Process()将一个进程实例化,其主要参数如下:

target : 需要执行的运算函数 args : target函数对应的传入参数,元组形式传入

在process创建完成之后,我们对其调用 .start() 方法执行运算,这样我们就实现了单个进程的创建与使用,在此基础上,我们将上述例子多线程化:

multi_processes.py

import multiprocessing
import datetime
import numpy as np
import os

def job():

    print(f'进程{os.getpid()}开始计算:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    for j in range(100):
        _ = np.sum(np.random.rand(10000000))
    print(f'进程{os.getpid()}结束运算:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

if __name__ == '__main__':

    process_list = []

    for i in range(multiprocessing.cpu_count() - 1):
        process = multiprocessing.Process(target=job)
        process_list.append(process)

    for process in process_list:
        process.start()

    for process in process_list:
        process.join()
UfQFVzu.jpg!mobile 图2 multi_processes.py运行结果

在上面的例子中,我们首先初始化用于存放多个线程的列表 process_list ,接着用循环的方式创建了CPU核心数-1个进程并添加到 process_list 中,再接着用循环的方式将所有进程逐个激活,最后使用到 .join() 方法,这个方法用于控制进程之间的并行,如下例:

join_demo.py

import multiprocessing
import os
import datetime
import time

def job():

    print(f'进程{os.getpid()}开始:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    time.sleep(5)
    print(f'进程{os.getpid()}结束:', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))


if __name__ == '__main__':

    process1 = multiprocessing.Process(target=job)
    process2 = multiprocessing.Process(target=job)

    process1.start()
    process1.join()

    process2.start()
    process2.join()
    print('='*200)
    process3 = multiprocessing.Process(target=job)
    process4 = multiprocessing.Process(target=job)

    process3.start()
    process4.start()

    process3.join()
    process4.join()
UZNzumQ.jpg!mobile 图2 multi_processes.py运行结果

观察对应进程执行的开始结束时间信息可以发现,一个进程对象在 .start() 之后,若在其他的进程对象 .start() 之前调用 .join() 方法,则必须等到先前的进程对象运行结束才会接着执行 .join() 之后的非 .join() 的内容,即前面的进程阻塞了后续的进程,这种情况下并不能实现并行的多进程,要想实现真正的并行,需要现行对多个进程执行 .start() ,接着再对这些进程对象执行 .join() ,才能使得各个进程之间相互独立,了解了这些我们就可以利用 Process 来实现多进程运算;

2.2 Pool

除了上述的 Process ,在 multiprocessing 中还可以使用 Pool 来快捷地实现多进程,先来看下面的例子:

Pool_demo.py

from multiprocessing import Pool
import numpy as np
from pprint import pprint

def job(n):
    return np.mean(np.random.rand(n)), np.std(np.random.rand(n))

if __name__ == '__main__':
    with Pool(5) as p:
        pprint(p.map(job, [i**10 for i in range(1, 6)]))
EJZ7bmr.jpg!mobile 图3 Pool_demo.py运行结果

在上面的例子中,我们使用 Pool 这个类,将自编函数 job 利用 .map() 方法作用到后面传入序列每一个位置上,与 Python 自带的 map() 函数相似,不同的是 map() 函数将传入的函数以串行的方式作用到传入的序列每一个元素之上,而 Pool() 中的 .map() 方法则根据前面传入的并行数量5,以多进程并行的方式执行,大大提升了运算效率。

三、利用joblib实现多进程

multiprocessing 需要将执行运算的语句放置于含有 if __name__ == '__main__': 的脚本文件中下不同, joblib 将多进程的实现方式大大简化,使得我们可以在 IPython 交互式环境下中灵活地使用它,先看下面这个例子:

from joblib import Parallel, delayed
import numpy as np
import time
import datetime

def job(i):
    start = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    time.sleep(5)
    end = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    return start, end

result = Parallel(n_jobs=5, verbose=1)(delayed(job)(j) for j in range(5))
result
zIjQJrz.jpg!mobile 图4 joblib并行示例

在上面的例子中,我们从 joblib 中导入 Paralleldelayed ,仅用 Parallel(n_jobs=5, verbose=1)(delayed(job)(j) for j in range(5)) 一句就实现了并行运算的功能,其中 n_jobs 控制并行进程的数量, verbose 参数控制是否打印进程运算过程,如果你熟悉 scikit-learn ,相信这两个参数你一定不会陌生,因为 scikit-learnRandomForestClassifier 等可以并行运算的算法都是通过 joblib 来实现的。

以上就是本文的全部内容,如有笔误望指出!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK