26

并发体验:Python抓图的8种方式

 6 years ago
source link: http://www.10tiao.com/html/761/201806/2650368493/1.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

本文系作者「无名小妖」的第二篇原创投稿文章,作者通过用爬虫示例来说明并发相关的多线程、多进程、协程之间的执行效率对比。如果你喜欢写博客,想投稿可微信我,有稿费酬劳。

假设我们现在要在网上下载图片,一个简单的方法是用 requests+BeautifulSoup。注:本文所有例子都使用python3.5)

单线程

示例 1:get_photos.py

import os
import time
import uuid

import requests
from bs4 import BeautifulSoup

def out_wrapper(func):  # 记录程序执行时间的简单装饰器
   def inner_wrapper():
       start_time = time.time()
       func()
       stop_time = time.time()
       print('Used time {}'.format(stop_time-start_time))
   return inner_wrapper

def save_flag(img, filename):  # 保存图片
   path = os.path.join('down_photos', filename)
   with open(path, 'wb') as fp:
       fp.write(img)

def download_one(url):  # 下载一个图片
   image = requests.get(url)
   save_flag(image.content, str(uuid.uuid4()))

def user_conf():  # 返回30个图片的url
   url = 'https://unsplash.com/'
   ret = requests.get(url)
   soup = BeautifulSoup(ret.text, "lxml")
   zzr = soup.find_all('img')
   ret = []
   num = 0
   for item in zzr:
       if item.get("src").endswith('80') and num < 30:
           num += 1
           ret.append(item.get("src"))
   return ret

@out_wrapper
def download_many():
   zzr = user_conf()
   for item in zzr:
       download_one(item)

if __name__ == '__main__':
   download_many()

示例1进行的是顺序下载,下载30张图片的平均时间在60s左右(结果因实验环境不同而不同)。

这个代码能用但并不高效,怎么才能提高效率呢?

参考开篇的示意图,有三种方式:多进程、多线程和协程。下面我们一一说明:

我们都知道 Python 中存在 GIL(主要是Cpython),但 GIL 并不影响 IO 密集型任务,因此对于 IO 密集型任务而言,多线程更加适合(线程可以开100个,1000个而进程同时运行的数量受 CPU 核数的限制,开多了也没用)

不过,这并不妨碍我们通过实验来了解多进程。

多进程

示例2

from multiprocessing import Process
from get_photos import out_wrapper, download_one, user_conf

@out_wrapper
def download_many():
   zzr = user_conf()
   task_list = []
   for item in zzr:
       t = Process(target=download_one, args=(item,))
       t.start()
       task_list.append(t)
   [t.join() for t in task_list]  # 等待进程全部执行完毕(为了记录时间)

if __name__ == '__main__':
   download_many()

本示例重用了示例1的部分代码,我们只需关注使用多进程的这部分。

笔者测试了3次(使用的机器是双核超线程,即同时只能有4个下载任务在进行),输出分别是:19.5s、17.4s和18.6s。速度提升并不是很多,也证明了多进程不适合io密集型任务。

还有一种使用多进程的方法,那就是内置模块futures中的ProcessPoolExecutor。

示例3

from concurrent import futures
from get_photos import out_wrapper, download_one, user_conf

@out_wrapper
def download_many():
   zzr = user_conf()
   with futures.ProcessPoolExecutor(len(zzr)) as executor:
       res = executor.map(download_one, zzr)
   return len(list(res))

if __name__ == '__main__':
   download_many()

使用 ProcessPoolExecutor 代码简洁了不少,executor.map 和标准库中的 map用法类似。耗时和示例2相差无几。多进程就到这里,下面来体验一下多线程。

多线程

示例4

import threading
from get_photos import out_wrapper, download_one, user_conf

@out_wrapper
def download_many():
   zzr = user_conf()
   task_list = []
   for item in zzr:
       t = threading.Thread(target=download_one, args=(item,))
       t.start()
       task_list.append(t)
   [t.join() for t in task_list]

if __name__ == '__main__':
   download_many()

threading 和 multiprocessing 的语法基本一样,但是速度在9s左右,相较多进程提升了1倍。

下面的示例5和示例6中分别使用内置模块 futures.ThreadPoolExecutor 中的 map 和submit、as_completed

示例5

from concurrent import futures
from get_photos import out_wrapper, download_one, user_conf

@out_wrapper
def download_many():
   zzr = user_conf()
   with futures.ThreadPoolExecutor(len(zzr)) as executor:
       res = executor.map(download_one, zzr)
   return len(list(res))

if __name__ == '__main__':
   download_many()

示例6:

from concurrent import futures
from get_photos import out_wrapper, download_one, user_conf

@out_wrapper
def download_many():
    zzr = user_conf()
    with futures.ThreadPoolExecutor(len(zzr)) as executor:
        to_do = [executor.submit(download_one, item) for item in zzr]
        ret = [future.result() for future in futures.as_completed(to_do)]
    return ret

if __name__ == '__main__':
    download_many()

Executor.map 由于和内置的map用法相似所以更易于使用,它有个特性:返回结果的顺序与调用开始的顺序一致。不过,通常更可取的方式是,不管提交的顺序,只要有结果就获取。

为此,要把 Executor.submit 和 futures.as_completed结合起来使用。

最后到了协程,这里分别介绍 gevent 和 asyncio。

gevent

示例7

from gevent import monkey
monkey.patch_all()

import gevent
from get_photos import out_wrapper, download_one, user_conf

@out_wrapper
def download_many():
    zzr = user_conf()
    jobs = [gevent.spawn(download_one, item) for item in zzr]
    gevent.joinall(jobs)

if __name__ == '__main__':
    download_many()

asyncio

示例8

import uuid
import asyncio

import aiohttp
from get_photos import out_wrapper, user_conf, save_flag

async def download_one(url):
   async with aiohttp.ClientSession() as session:
       async with session.get(url) as resp:
           save_flag(await resp.read(), str(uuid.uuid4()))

@out_wrapper
def download_many():
   urls = user_conf()
   loop = asyncio.get_event_loop()
   to_do = [download_one(url) for url in urls]
   wait_coro = asyncio.wait(to_do)
   res, _ = loop.run_until_complete(wait_coro)
   loop.close()
   return len(res)

if __name__ == '__main__':
   download_many()

协程的耗时和多线程相差不多,区别在于协程是单线程。具体原理限于篇幅这里就不赘述了。

但是我们不得不说一下asyncio,asyncio是Python3.4加入标准库的,在3.5为其添加async和await关键字。或许对于上述多线程多进程的例子你稍加研习就能掌握,但是想要理解asyncio你不得不付出更多的时间和精力。

另外,使用线程写程序比较困难,因为调度程序任何时候都能中断线程。必须保留锁以保护程序,防止多步操作在执行的过程中中断,防止数据处于无效状态。

而协程默认会做好全方位保护,我们必须显式产出才能让程序的余下部分运行。对协程来说,无需保留锁,在多个线程之间同步操作,协程自身就会同步,因为在任意时刻只有一个协程运行。想交出控制权时,可以使用 yield 或 yield from(await) 把控制权交还调度程序。

总结

本篇文章主要是将python中并发相关的模块进行基本用法的介绍,全做抛砖引玉。而这背后相关的进程、线程、协程、阻塞io、非阻塞io、同步io、异步io、事件驱动等概念和asyncio的用法并未介绍。大家感兴趣的话可以自行google或者百度,也可以在下方留言,大家一起探讨。


(如果本文对你有帮助,可以对作者打赏)


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK