75

Python 并发编程

 5 years ago
source link: https://www.tuicool.com/articles/E7Nba2f
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

资料推荐

这里只是学习资料的一个笔记与总结, 更详细、仔细的学习还请各位看官自行看看原始的资料。在此罗列一下参考到的有用的资料。

主要参考资料:

Python并行编程 中文版

译者非常的用心, 原著(英文版)的代码译者应该大部分都亲自测试过。 因为原著很多疑似疏漏的地方, 译者都做了特别的标识。在此鄙视一下由 张龙 翻译并出版成书的版本。 翻译狗屁不通, 上面的代码应该也没跑过。

使用Python进行并发编程-asyncio篇( 一 )

使用Python进行并发编程-asyncio篇( 二 )

异步编程讲得还是满详细的! 学到了很多东西!

基于线程的并行

Python Thread的定义

class threading.Thread(group=None,  # 一般设置为 None ,这是为以后的一些特性预留的
                       target=None, # 当线程启动的时候要执行的函数
                       name=None,   # 线程的名字,默认会分配一个唯一名字 Thread-N
                       args=(),     # 传递给 target 的参数,要使用tuple类型
                       kwargs={})   # 传递给 target 的参数,要使用dict类型
 

例子: 简单的Python Thread 示例代码

import threading
import time
 
def first_function():
    print(threading.currentThread().getName() + str(' is Starting '))
    time.sleep(2)
    print (threading.currentThread().getName() + str(' is Exiting '))
    return
 
def second_function():
    print(threading.currentThread().getName() + str(' is Starting '))
    time.sleep(2)
    print (threading.currentThread().getName() + str(' is Exiting '))
    return
 
def third_function():
    print(threading.currentThread().getName() + str(' is Starting '))
    time.sleep(2)
    print(threading.currentThread().getName() + str(' is Exiting '))
    return
 
if __name__ == "__main__":
    t1 = threading.Thread(name='first_function', target=first_function)
    t2 = threading.Thread(name='second_function', target=second_function)
    t3 = threading.Thread(name='third_function', target=third_function)
    t1.start()
    t2.start()
    t3.start()
    print("main thread!!!")
 

输出示例:

first_function is Starting 
second_function is Starting 
third_function is Starting 
main thread!!!
# 问题: 请问如果多执行几次, 下面的输出结果的顺序是固定的吗?
first_function is Exiting 
second_function is Exiting 
third_function is Exiting 
 
 
# 结论: 测试结果是固定的
 

问题2: 在最底部按如下顺序加入join()

t1.join()
t2.join()
t3.join()
 
"""
结果:第二部分的顺序看起来是随机的
"""
 

问题3: 按照以下顺序开始&join()

t1.start()
t1.join()
 
t2.start()
t2.join()
 
t3.start()
t3.join()
 
print("main thread!!!")
 
"""
输出顺序固定, 如下所示:
first_function is Starting 
first_function is Exiting 
second_function is Starting 
second_function is Exiting 
third_function is Starting 
third_function is Exiting 
main thread!!!
 
我的理解: join() 是让当前线程完成了再回到主线程, 多线程就成了“单线程”在执行了
"""
 

几个线程的事实

参考资料: https://blog.csdn.net/zhiyuan_2007/article/details/48807761

  1. python 默认参数创建线程后,不管主线程是否执行完毕,都会等待子线程执行完毕才一起退出,有无join结果一样
  2. 如果创建线程,并且设置了daemon为true,即thread.setDaemon(True), 则主线程执行完毕后自动退出,不会等待子线程的执行结果。而且随着主线程退出,子线程也消亡。

  3. join方法的作用是阻塞,等待子线程结束,join方法有一个参数是timeout,即如果主线程等待timeout,子线程还没有结束,则主线程强制结束子线程。

  4. 如果线程daemon属性为False, 则join里的timeout参数无效。主线程会一直等待子线程结束。

  5. 如果线程daemon属性为True, 则join里的timeout参数是有效的, 主线程会等待timeout时间后,结束子线程。此处有一个坑,即如果同时有N个子线程join(timeout),那么实际上主线程会等待的超时时间最长为 N * timeout, 因为每个子线程的超时开始时刻是上一个子线程超时结束的时刻。

自定义线程类的实现

  1. 定义一个 Thread 的子类
  2. 覆盖构造函数 __init__(self [,args]) , 可以添加更多的参数
  3. 覆盖 run() 函数, 实现该线程要做的事情
  4. 启动方法: 调用 start() 而不是 run()
  5. 同样的, 也可以调用 join() 函数

一个使用多线程来抓取豆瓣API的例子:

参考: https://zhuanlan.zhihu.com/p/34004447

import threading
import requests
from bs4 import BeautifulSoup
 
class MyThread(threading.Thread):
 
    def __init__(self, i):
        threading.Thread.__init__(self)
        self.i = i
 
    def run(self):
        url = 'https://movie.douban.com/top250?start={}&filter='.format(self.i*25)
        r = requests.get(url)
        soup = BeautifulSoup(r.content, 'html.parser')
        lis = soup.find('ol', class_='grid_view').find_all('li')
        title_lst = []
        for li in lis:
            title = li.find('span', class_="title").text
            title_lst.append(title)
        print( "thread-%s" % self.i, ",".join(title_lst))
 
for i in range(10):
    th = MyThread(i)
    th.start()
 

输出示例: (宽度有限, 用省略号来替代了)

thread-6 心迷宫,纵横四海,荒...
thread-7 燃情岁月,未麻的部屋...
thread-9 绿里奇迹,2001太...
thread-1 蝙蝠侠:黑暗骑士,乱...
thread-8 谍影重重,战争之王,...
thread-0 肖申克的救赎,霸王别...
thread-5 蝙蝠侠:黑暗骑士崛起...
thread-3 猫鼠游戏,沉默的羔羊...
thread-4 消失的爱人,大鱼,一...
thread-2 指环王2:双塔奇兵,...
 

多线程的应用场景

补充:

Python的GIL是什么鬼,多线程性能究竟如何

一些简单的结论:

  1. 全称 Global Interpreter Lock : 全局解释器锁
  2. 作用: CPython 引入的锁机制。 GIL在解释器层面阻止了真正的并行运行
  3. 因此, 如果是CPU密集型的任务, 多线程反而会拖累整体性能

  4. I/O 密集型才是CPython 解释器下多线程的正确应用场景

    注意: 如果读的是本地文件, 也需要读取不同的文件, 否则不一定能提高性能。

    典型应用: 多线程抓取网络数据 / 调用WebAPI 。。。

  5. Why IO: 因为在进行IO调用的时候, GIL会释放相应的锁!

多线程同步

书中介绍了多种同步机制: Lock / RLock / 信号量(semaphore) / 事件 / with / 队列(quene)

我没有细看, 暂时还不需要处理这种场景。 (当前处理的场景还不需要这么复杂。)

多线程运行结果汇总

举个例子: 上面我们通过多线程抓取了豆瓣Top250 的电影名称,那么我们如何直接得到汇总之后的运行结果呢?

基于上面使用线程类的方式

import threading
import requests
from bs4 import BeautifulSoup
 
class MyThread(threading.Thread):
 
    def __init__(self, i):
        threading.Thread.__init__(self)
        self.i = i
        self.result = []    # 新增一个变量用于存放结果
 
    def run(self):
        url = 'https://movie.douban.com/top250?start={}&filter='.format(self.i*25)
        r = requests.get(url)
        soup = BeautifulSoup(r.content, 'html.parser')
        lis = soup.find('ol', class_='grid_view').find_all('li')
        title_lst = []
        for li in lis:
            title = li.find('span', class_="title").text
            title_lst.append(title)
        print( "thread-%s" % self.i, ",".join(title_lst)[:10] + "...")
        self.result = title_lst # 存储运算结果
 
    def get_result(self):   # 专门用于返回运算结果
        return self.result
 
 
result_list = []
thread_list = []
for i in range(10):
    th = MyThread(i)
    th.start()
    thread_list.append(th) # 把全部线程类都放到一个list之中
 
for th in thread_list :     # 遍历线程类list
    th.join()               # 让每个线程都执行完
    result_list.extend(th.get_result()) # 获取线程类的执行结果
 
print("==== final result:", len(result_list))
print(result_list)
 

运行结果如下:

thread-4 消失的爱人,大鱼,一...
thread-8 谍影重重,战争之王,...
thread-1 蝙蝠侠:黑暗骑士,乱...
thread-3 猫鼠游戏,沉默的羔羊...
thread-7 燃情岁月,未麻的部屋...
thread-0 肖申克的救赎,霸王别...
thread-5 蝙蝠侠:黑暗骑士崛起...
thread-2 指环王2:双塔奇兵,...
thread-9 绿里奇迹,2001太...
thread-6 心迷宫,纵横四海,荒...
==== final result: 250
['肖申克的救赎', '霸王别姬', 。。。。。]
 
 

基于 threading.Thread 直接调用的方式

目前没有找到直接从 Thread 类返回结果的方式。

线程池ThreadPool的方式

from multiprocessing.pool import ThreadPool
import requests
from bs4 import BeautifulSoup
 
 
def run(page_no):
    url = 'https://movie.douban.com/top250?start={}&filter='.format(page_no * 25)
    r = requests.get(url)
    soup = BeautifulSoup(r.content, 'html.parser')
    lis = soup.find('ol', class_='grid_view').find_all('li')
    title_lst = []
    for li in lis:
        title = li.find('span', class_="title").text
        title_lst.append(title)
    print("thread-%s" % page_no, ",".join(title_lst)[:10] + "...")
    return title_lst
 
import time
t0 = time.time()
pool = ThreadPool(processes=4)
result_list = pool.map(run, [0,1])
print(len(result_list))
print(result_list)
t1 = time.time()
print("cost time:", (t1 - t0))
 

输出结果:

thread-1 蝙蝠侠:黑暗骑士,乱...
thread-0 肖申克的救赎,霸王别...
2
[['肖申克的救赎', '霸王别姬', ...], 
 ['蝙蝠侠:黑暗骑士', '乱世佳人', ...]]
cost time: 2.3958077430725098
 

注意: 刚才只抓了两页, 因此结果集的长度=2, 现在改成抓取4页, 即有如下改动:

# 原来
result_list = pool.map(run, [0,1])
 
# 改为
result_list = pool.map(run, [0,1,2,3])
 

输出结果:

thread-3 猫鼠游戏,沉默的羔羊...
thread-0 肖申克的救赎,霸王别...
thread-1 蝙蝠侠:黑暗骑士,乱...
thread-2 指环王2:双塔奇兵,...
4
[
    ['肖申克的救赎', '霸王别姬', ...], 
    ['蝙蝠侠:黑暗骑士', '乱世佳人', ...], 
    ['指环王2:双塔奇兵', '教父2', ...], 
    ['猫鼠游戏', '沉默的羔羊', ...]]
cost time: 2.5353810787200928
 

从运行结果上面来说, 时间并没有增加太多(网络开销无法固定)。 如果我们改成抓取10页, 执行速度就要慢得多了(毕竟我们设置线程池的数量为 4

线程池

ThreadPool VS Pool

from multiprocessing.pool import ThreadPool
from multiprocessing import Pool
from multiprocessing.dummy import Pool

具体选择哪个线程池, 两个线程池有什么区别? 我也在好奇之中。。。

貌似推荐 dummy.Pool 的人多一些。 两个线程池在Python官网的介绍都比较少。。。

之所以推荐 dummy.Pool 的人多一些, 原因总结如下:

  1. 实现了跟Pool 完全一致的API , 切换线程池、进程池比较方便
  2. ThreadPool 没有文档说明。。。

几种执行方式

  • apply_async
  • apply
  • map_async
  • map

简单说说他们的区别与联系:

  • map & apply

    他们都是同步/阻塞的

    即: map/apply之后直接运行线程/进程,运行结束后再执行之后语句

  • map 对比 apply 就是调用参数不太一样

    def apply(self, func, args=(), kwds={}):
    def map(self, func, iterable, chunksize=None):
    
  • _async 就是原来同步的基础之上变成是异步执行。直到遇到 wait() 之后才阻塞

几种执行方式例子:

参考: python进程池multiprocessing.Pool和线程池multiprocessing.dummy.Pool实例

正确关闭pool的姿势

需要注意的是, 正确关闭Pool的方式:

pool = ThreadPool(x)
# do something ...
pool.close()    # 先close
pool.join()     # 再join
 

异步编程

一个关键概念: 协程

除了并发之外, 还有一种编程模式:异步。 其中很重要的一个概念就是: 协程 (Coroutine / 微线程,纤程)

引用一个文章的说法: 小米安全中心: 乱谈 Python 并发

Python的线程(包括进程)其实都是对系统原生内核级线程的包装,切换时,需要在内核与用户态空间来来回回,开销明显会大很多,而且多个线程完全由系统来调度,什么时候执行哪个线程是无法预知的。相比而言,协程就轻量级很多,是对线程的一种模拟,原理与内核级线程类似,只不过切换时,上下文环境保存在用户态的堆栈里,协程“挂起”的时候入栈,“唤醒”的时候出栈,所以其调度是可以人为控制的,这也是“协程”名字的来由,大伙协作着来,别总抢来抢去的,伤感情。

简单来说: 协程是一种用户态的轻量级线程。 因此在任务切换的时候要更轻量得多。

futures 做爬虫的例子

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor,as_completed
import time 
import requests
 
def download(url):
    headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:63.0) Gecko/20100101 Firefox/63.0',
                'Connection':'keep-alive',
                'Host':'example.webscraping.com'}
    response = requests.get(url, headers=headers)
    return response.status_code
 
if __name__ == '__main__':
    urllist = ['http://example.webscraping.com/places/default/view/Afghanistan-1',
               'http://example.webscraping.com/places/default/view/Aland-Islands-2']
 
    start = time.time()               
    pool = ProcessPoolExecutor(max_workers = 2)  # 也可以改成 ThreadPoolExecutor          
    futures = [pool.submit(download,url) for url in urllist]
    for future in futures:
        print('执行中:%s, 已完成:%s' % (future.running(), future.done()))
    print('#### 分界线 ####')
    for future in as_completed(futures, timeout=2):
        print('执行中:%s, 已完成:%s' % (future.running(), future.done()))
        print(future.result())
    end = time.time()
    print('使用多线程--timestamp:{:.3f}'.format(end-start))
 

注意:

  1. Python的这个 concurrent.futures 库感觉跟Java 的很像, 可能相互借鉴与学习的结果。
  2. 不过感觉这样写还略麻烦。 可以考虑使用 aiohttp 来做网络爬虫, 用法跟直接使用 requests 比较像, 但是需要接触到新的关键字: asycn wait / asycn with

  3. 使用了异步编程之后, 感觉整个人生观都变了, 需要注意时刻在你的代码中使用异步操作,你如果在代码中使用同步操作,爬虫并不会报错,但是速度可能会受影响。 (就像在写nodejs, 需要调整到回调思维那样)

异步编程个人经验小结

  1. 心智模式上, 线程池、进程池的思维模式我们还是比较熟悉的, 跟之前的同步思想差别不大

    但是异步编程就差别很多了, 而且很多地方都用到了 asycn / await 的关键字。

  2. 可维护性上,考虑到项目交接等等,我的选择: (IO密集型的任务)

    1. 单进程、单线程同步
    2. 单进程、多线程
    3. 异步编程

    也就是说: 除非性能扛不住, 否则就用最简单的编程模型。

  3. 如果是网络爬虫, aiohttp + asycnio 搭配的执行效率是最高的

    性能数据参考: 使用Python进行并发编程-asyncio篇(一)

  4. 你碰到了CPU密集型? 比如机器学习、数据分析等等~

    个人不推荐使用多进程,各种开销、维护很头疼。 推荐使用消息队列、Celery 等等。

  5. asyncio 还在快速发展之中, 在Python3.7 之中,新增了 asyncio.run 的写法, 否则我们得这样写:

    loop = asyncio.get_event_loop()
    loop.run_until_complete(target_function())
    loop.close()
     
    

aiohttp做爬虫的例子

import aiohttp
import asyncio
 
 
NUMBERS = range(12)
# URL = 'http://httpbin.org/get?a={}'
URL = "https://www.baidu.com/s?wd={}"
sema = asyncio.Semaphore(10)        # 通过信号量控制并发数
 
final_list = []
 
def update_dict(new_ele) :
    """
    更新final_list, 相当于把异步并发的结果合并到final_list之中。 
    并且在这里控制, 如果长度到达一定的标准, 就可以做下一步的事情。 这一步是同步的。
    """
    final_list.append(new_ele)
    print("============ list length:", len(final_list))
 
    if len(final_list) >= 3 :
        print("3333 sleep 3 seconds, could insert data list into db")
        time.sleep(3)
        final_list = []
 
async def fetch_async(a):
    """
    使用aiohttp发出异步请求
    """
    async with aiohttp.request('GET', URL.format(a)) as r:
        data = await r.text()
    return data
 
async def print_result(a):
    with (await sema):
        r = await fetch_async(a)
        update_dict(r)
        print('fetch({}) = {}'.format(a, r))
 
import time
 
t0 = time.time()
loop = asyncio.get_event_loop()
f = asyncio.wait([print_result(num) for num in NUMBERS])
loop.run_until_complete(f)
 
t1 = time.time()
print("============  cost time:", t1 - t0)
 

一个可以优化的地方: 官网建议重用 ClientSession , 使用方法跟 requestssession 比较类似。 在这里图省事, 暂时没有改造成这样的形式。重用Session, 根据我的理解, 抓取同一个域名会比较有效。(么有实际测试过~)

原文链接:https://www.flyml.net/2019/07/07/python-parallel-programming/


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK