8

python 基于aiohttp的异步爬虫实战 - 钢铁侠的知识库

 2 years ago
source link: https://www.cnblogs.com/jiba/p/16672319.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

钢铁知识库,一个学习python爬虫、数据分析的知识库。人生苦短,快用python。

之前我们使用requests库爬取某个站点的时候,每发出一个请求,程序必须等待网站返回响应才能接着运行,而在整个爬虫过程中,整个爬虫程序是一直在等待的,实际上没有做任何事情。

像这种占用磁盘/内存IO、网络IO的任务,大部分时间是CPU在等待的操作,就叫IO密集型任务。对于这种情况有没有优化方案呢,当然有,那就是使用aiohttp库实现异步爬虫。

aiohttp是什么

我们在使用requests请求时,只能等一个请求先出去再回来,才会发送下一个请求。明显效率不高阿,这时候如果换成异步请求的方式,就不会有这个等待。一个请求发出去,不管这个请求什么时间响应,程序通过await挂起协程对象后直接进行下一个请求。

解决方法就是通过 aiohttp + asyncio,什么是aiohttp?一个基于 asyncio 的异步 HTTP 网络模块,可用于实现异步爬虫,速度明显快于 requests 的同步爬虫。

requests和aiohttp区别

区别就是一个同步一个是异步。话不多说直接上代码看效果。

安装aiohttp

pip install aiohttp
  • requests同步示例:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# author: 钢铁知识库
import time
import requests

# 同步请求
def main():
    start = time.time()
    for i in range(5):
        res = requests.get('http://httpbin.org/delay/2')
        print(f'当前时间:{datetime.datetime.now()}, status_code = {res.status_code}')
    print(f'requests同步耗时:{time.time() - start}')
    
if __name__ == '__main__':
    main()

'''
当前时间:2022-09-05 15:44:51.991685, status_code = 200
当前时间:2022-09-05 15:44:54.528918, status_code = 200
当前时间:2022-09-05 15:44:57.057373, status_code = 200
当前时间:2022-09-05 15:44:59.643119, status_code = 200
当前时间:2022-09-05 15:45:02.167362, status_code = 200
requests同步耗时:12.785893440246582
'''

可以看到5次请求总共用12.7秒,再来看同样的请求异步多少时间。

  • aiohttp异步示例:
#!/usr/bin/env python
# file: day6-9同步和异步.py
# author: 钢铁知识库
import asyncio
import time
import aiohttp

async def async_http():
    # 声明一个支持异步的上下文管理器
    async with aiohttp.ClientSession() as session:
        res = await session.get('http://httpbin.org/delay/2')
        print(f'当前时间:{datetime.datetime.now()}, status_code = {res.status}')

tasks = [async_http() for _ in range(5)]
start = time.time()
# Python 3.7 及以后,不需要显式声明事件循环,可以使用 asyncio.run()来代替最后的启动操作
asyncio.run(asyncio.wait(tasks))
print(f'aiohttp异步耗时:{time.time() - start}')

'''
当前时间:2022-09-05 15:42:32.363966, status_code = 200
当前时间:2022-09-05 15:42:32.366957, status_code = 200
当前时间:2022-09-05 15:42:32.374973, status_code = 200
当前时间:2022-09-05 15:42:32.384909, status_code = 200
当前时间:2022-09-05 15:42:32.390318, status_code = 200
aiohttp异步耗时:2.5826876163482666
'''

两次对比可以看到执行过程,时间一个是顺序执行,一个是同时执行。这就是同步和异步的区别。

aiohttp使用介绍

接下来我们会详细介绍aiohttp库的用法和爬取实战。aiohttp 是一个支持异步请求的库,它和 asyncio 配合使用,可以使我们非常方便地实现异步请求操作。asyncio模块,其内部实现了对TCP、UDP、SSL协议的异步操作,但是对于HTTP请求,就需要aiohttp实现了。

aiohttp分为两部分,一部分是Client,一部分是Server。下面来说说aiohttp客户端部分的用法。

先写一个简单的案例

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : 钢铁知识库
import asyncio
import aiohttp

async def get_api(session, url):
    # 声明一个支持异步的上下文管理器
    async with session.get(url) as response:
        return await response.text(), response.status

async def main():
    async with aiohttp.ClientSession() as session:
        html, status = await get_api(session, 'http://httpbin.org/delay/2')
        print(f'html: {html[:50]}')
        print(f'status : {status}')

if __name__ == '__main__':
    #  Python 3.7 及以后,不需要显式声明事件循环,可以使用 asyncio.run(main())来代替最后的启动操作
    asyncio.get_event_loop().run_until_complete(main())
'''
html: {
  "args": {}, 
  "data": "", 
  "files": {}, 
  
status : 200

Process finished with exit code 0
'''

aiohttp请求的方法和之前有明显区别,主要包括如下几点:

  1. 除了导入aiohttp库,还必须引入asyncio库,因为要实现异步,需要启动协程。
  2. 异步的方法定义不同,前面都要统一加async来修饰。
  3. with as用于声明上下文管理器,帮我们自动分配和释放资源,加上async代码支持异步。
  4. 对于返回协程对象的操作,前面需要加await来修饰。response.text()返回的是协程对象。
  5. 最后运行启用循环事件

注意:Python3.7及以后的版本中,可以使用asyncio.run(main())代替最后的启动操作。

URL参数设置

对于URL参数的设置,我们可以借助params设置,传入一个字典即可,实例如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : 钢铁知识库
import aiohttp
import asyncio

async def main():
    params = {'name': '钢铁知识库', 'age': 23}
    async with aiohttp.ClientSession() as session:
        async with session.get('https://www.httpbin.org/get', params=params) as res:
            print(await res.json())

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
'''
{'args': {'age': '23', 'name': '钢铁知识库'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.8.1', 'X-Amzn-Trace-Id': 'Root=1-63162e34-1acf7bde7a6d801368494c72'}, 'origin': '122.55.11.188', 'url': 'https://www.httpbin.org/get?name=钢铁知识库&age=23'}
'''

可以看到实际请求的URL后面带了后缀,这就是params的内容。

除了get请求,aiohttp还支持其它请求类型,如POST、PUT、DELETE等,和requests使用方式类似。

session.post('http://httpbin.org/post', data=b'data')
session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')

要使用这些方法,只需要把对应的方法和参数替换一下。用法和get类似就不再举例。

响应的几个方法

对于响应来说,我们可以用如下方法分别获取其中的响应情况。状态码、响应头、响应体、响应体二进制内容、响应体JSON结果,实例如下:

#!/usr/bin/env python
# @Author  : 钢铁知识库
import aiohttp
import asyncio

async def main():
    data = {'name': '钢铁知识库', 'age': 23}
    async with aiohttp.ClientSession() as session:
        async with session.post('https://www.httpbin.org/post', data=data) as response:
            print('status:', response.status)  # 状态码
            print('headers:', response.headers)  # 响应头
            print('body:', await response.text())  # 响应体
            print('bytes:', await response.read())  # 响应体二进制内容
            print('json:', await response.json())  # 响应体json数据

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
'''
status: 200
headers: <CIMultiDictProxy('Date': 'Tue, 06 Sep 2022 00:18:36 GMT', 'Content-Type': 'application/json', 'Content-Length': '534', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>
body: {
  "args": {}, 
  "data": "", 
  "files": {}, 
  "form": {
    "age": "23", 
    "name": "\u94a2\u94c1\u77e5\u8bc6\u5e93"
  }, 
  "headers": {
    "Accept": "*/*", 
    "Accept-Encoding": "gzip, deflate", 
    "Content-Length": "57", 
    "Content-Type": "application/x-www-form-urlencoded", 
    "Host": "www.httpbin.org", 
    "User-Agent": "Python/3.8 aiohttp/3.8.1", 
    "X-Amzn-Trace-Id": "Root=1-631691dc-6aa1b2b85045a1a0481d06e1"
  }, 
  "json": null, 
  "origin": "122.55.11.188", 
  "url": "https://www.httpbin.org/post"
}

bytes: b'{\n  "args": {}, \n  "data": "", \n  "files": {}, \n  "form": {\n    "age": "23", \n    "name": "\\u94a2\\u94c1\\u77e5\\u8bc6\\u5e93"\n  }, \n  "headers": {\n    "Accept": "*/*", \n    "Accept-Encoding": "gzip, deflate", \n    "Content-Length": "57", \n    "Content-Type": "application/x-www-form-urlencoded", \n    "Host": "www.httpbin.org", \n    "User-Agent": "Python/3.8 aiohttp/3.8.1", \n    "X-Amzn-Trace-Id": "Root=1-631691dc-6aa1b2b85045a1a0481d06e1"\n  }, \n  "json": null, \n  "origin": "122.5.132.196", \n  "url": "https://www.httpbin.org/post"\n}\n'
json: {'args': {}, 'data': '', 'files': {}, 'form': {'age': '23', 'name': '钢铁知识库'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '57', 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.8.1', 'X-Amzn-Trace-Id': 'Root=1-631691dc-6aa1b2b85045a1a0481d06e1'}, 'json': None, 'origin': '122.55.11.188', 'url': 'https://www.httpbin.org/post'}
'''

可以看到有些字段前面需要加await,因为其返回的是一个协程对象(如async修饰的方法),那么前面就要加await。

我们可以借助ClientTimeout对象设置超时,例如要设置1秒的超时时间,可以这么实现:

#!/usr/bin/env python
# @Author  : 钢铁知识库
import aiohttp
import asyncio

async def main():
    # 设置 1 秒的超时 
    timeout = aiohttp.ClientTimeout(total=1)
    data = {'name': '钢铁知识库', 'age': 23}
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get('https://www.httpbin.org/delay/2', data=data) as response:
            print('status:', response.status)  # 状态码

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
'''
Traceback (most recent call last):
####中间省略####
    raise asyncio.TimeoutError from None
asyncio.exceptions.TimeoutError
'''

这里设置了超时1秒请求延时2秒,发现抛出异常asyncio.TimeoutError,如果正常则响应200。

aiohttp可以支持非常高的并发量,但面对高并发网站可能会承受不住,随时有挂掉的危险,这时需要对并发进行一些控制。现在我们借助asyncio 的Semaphore来控制并发量,实例如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : 钢铁知识库
import asyncio
from datetime import datetime
import aiohttp

# 声明最大并发量
semaphore = asyncio.Semaphore(2)

async def get_api():
    async with semaphore:
        print(f'scrapting...{datetime.now()}')
        async with session.get('https://www.baidu.com') as response:
            await asyncio.sleep(2)
            # print(f'当前时间:{datetime.now()}, {response.status}')

async def main():
    global session
    session = aiohttp.ClientSession()
    tasks = [asyncio.ensure_future(get_api()) for _ in range(1000)]
    await asyncio.gather(*tasks)
    await session.close()

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
'''
scrapting...2022-09-07 08:11:14.190000
scrapting...2022-09-07 08:11:14.292000
scrapting...2022-09-07 08:11:16.482000
scrapting...2022-09-07 08:11:16.504000
scrapting...2022-09-07 08:11:18.520000
scrapting...2022-09-07 08:11:18.521000
'''

在main方法里,我们声明了1000个task,如果没有通过Semaphore进行并发限制,那这1000放到gather方法后会被同时执行,并发量相当大。有了信号量的控制之后,同时运行的task数量就会被控制,这样就能给aiohttp限制速度了。

aiohttp异步爬取实战

接下来我们通过异步方式练手一个小说爬虫,需求如下:

需求页面:https://dushu.baidu.com/pc/detail?gid=4308080950

目录接口:https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"4308080950"}

详情接口:https://dushu.baidu.com/api/pc/getChapterContent?data={"book_id":"4295122774","cid":"4295122774|116332"}

关键参数:book_id:小说ID、cid:章节id

采集要求:使用协程方式写入,数据存放进mongo

需求分析:点开需求页面,通过F12抓包可以发现两个接口。一个目录接口,一个详情接口。
首先第一步先请求目录接口拿到cid章节id,然后将cid传递给详情接口拿到小说数据,最后存入mongo即可。

话不多说,直接上代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author  : 钢铁知识库
# 不合适就是不合适,真正合适的,你不会有半点犹豫。
import asyncio
import json,re
import logging
import aiohttp
import requests
from utils.conn_db import ConnDb

# 日志格式
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')

# 章节目录api
b_id = '4308080950'
url = 'https://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"'+b_id+'"}'
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                  "Chrome/104.0.0.0 Safari/537.36"
}
# 并发声明
semaphore = asyncio.Semaphore(5)

async def download(title,b_id, cid):
    data = {
        "book_id": b_id,
        "cid": f'{b_id}|{cid}',
    }
    data = json.dumps(data)
    detail_url = 'https://dushu.baidu.com/api/pc/getChapterContent?data={}'.format(data)
    async with semaphore:
        async with aiohttp.ClientSession(headers=headers) as session:
            async with session.get(detail_url) as response:
                res = await response.json()
                content = {
                    'title': title,
                    'content': res['data']['novel']['content']
                }
                # print(title)
                await save_data(content)

async def save_data(data):
    if data:
        client = ConnDb().conn_motor_mongo()
        db = client.baidu_novel
        collection = db.novel
        logging.info('saving data %s', data)
        await collection.update_one(
            {'title': data.get('title')},
            {'$set': data},
            upsert=True
        )

async def main():
    res = requests.get(url, headers=headers)
    tasks = []
    for re in res.json()['data']['novel']['items']:     # 拿到某小说目录cid
        title = re['title']
        cid = re['cid']
        tasks.append(download(title, b_id, cid))    # 将请求放到列表里,再通过gather执行并发
    await asyncio.gather(*tasks)

if __name__ == '__main__':
    asyncio.run(main())

至此,我们就使用aiohttp完成了对小说章节的爬取。

要实现异步处理,得先要有挂起操作,当一个任务需要等待 IO 结果的时候,可以挂起当前任务,转而去执行其他任务,这样才能充分利用好资源,要实现异步,需要了解 await 的用法,使用 await 可以将耗时等待的操作挂起,让出控制权。当协程执行的时候遇到 await,时间循环就会将本协程挂起,转而去执行别的协程,直到其他的协程挂起或执行完毕。

await 后面的对象必须是如下格式之一:

  • A native coroutine object returned from a native coroutine function,一个原生 coroutine 对象。
  • A generator-based coroutine object returned from a function decorated with types.coroutine,一个由 types.coroutine 修饰的生成器,这个生成器可以返回 coroutine 对象。
  • An object with an await method returning an iterator,一个包含 await 方法的对象返回的一个迭代器。

---- 20220909 钢铁知识库

以上就是借助协程async和异步aiohttp两个主要模块完成异步爬虫的内容,
aiohttp 以异步方式爬取网站的耗时远小于 requests 同步方式,以上列举的例子希望对你有帮助。

注意,线程和协程是两个概念,后面找机会我们再聊聊进程和线程、线程和协程的关系。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK