2

52讲轻松搞定网络爬虫笔记5

 1 year ago
source link: https://www.qixinbo.info/2023/01/15/web-crawler-5/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

52讲轻松搞定网络爬虫

代理的基本原理和用法

我们在做爬虫的过程中经常会遇到这样的情况,最初爬虫正常运行,正常抓取数据,一切看起来都是那么的美好,然而一杯茶的功夫可能就会出现错误,比如 403 Forbidden,这时候打开网页一看,可能会看到 “您的 IP 访问频率太高” 这样的提示,或者跳出一个验证码让我们输入,输入之后才可能解封,但是输入之后过一会儿就又这样了。

出现这种现象的原因是网站采取了一些反爬虫的措施,比如服务器会检测某个 IP 在单位时间内的请求次数,如果超过了这个阈值,那么会直接拒绝服务,返回一些错误信息,这种情况可以称之为封 IP,于是乎就成功把我们的爬虫禁掉了。

既然服务器检测的是某个 IP 单位时间的请求次数,那么我们借助某种方式来伪装我们的 IP,让服务器识别不出是由我们本机发起的请求,不就可以成功防止封 IP 了吗?所以这时候代理就派上用场了。

本课时我们先来看下代理的基本原理和使用代理处理反爬虫的方法。

代理实际上指的就是代理服务器,英文叫作 proxy server,它的功能是代理网络用户去获取网络信息。形象地说,它是网络信息的中转站。在我们正常请求一个网站时,是发送了请求给 Web 服务器,Web 服务器把响应传回给我们。如果设置了代理服务器,实际上就是在本机和服务器之间搭建了一个桥,此时本机不是直接向 Web 服务器发起请求,而是向代理服务器发出请求,请求会发送给代理服务器,然后由代理服务器再发送给 Web 服务器,接着由代理服务器再把 Web 服务器返回的响应转发给本机。这样我们同样可以正常访问网页,但这个过程中 Web 服务器识别出的真实 IP 就不再是我们本机的 IP 了,就成功实现了 IP 伪装,这就是代理的基本原理。

代理的作用

那么,代理有什么作用呢?我们可以简单列举如下。

  • 突破自身 IP 访问限制,访问一些平时不能访问的站点。
  • 访问一些单位或团体内部资源,如使用教育网内地址段免费代理服务器,就可以用于对教育网开放的各类 FTP 下载上传,以及各类资料查询共享等服务。
  • 提高访问速度,通常代理服务器都设置一个较大的硬盘缓冲区,当有外界的信息通过时,也将其保存到缓冲区中,当其他用户再访问相同的信息时, 则直接由缓冲区中取出信息,传给用户,以提高访问速度。
  • 隐藏真实 IP,上网者也可以通过这种方法隐藏自己的 IP,免受攻击,对于爬虫来说,我们用代理就是为了隐藏自身 IP,防止自身的 IP 被封锁。

对于爬虫来说,由于爬虫爬取速度过快,在爬取过程中可能遇到同一个 IP 访问过于频繁的问题,此时网站就会让我们输入验证码登录或者直接封锁 IP,这样会给爬取带来极大的不便。

使用代理隐藏真实的 IP,让服务器误以为是代理服务器在请求自己。这样在爬取过程中通过不断更换代理,就不会被封锁,可以达到很好的爬取效果。

代理分类时,既可以根据协议区分,也可以根据其匿名程度区分,下面分别总结如下:

根据协议区分

根据代理的协议,代理可以分为如下类别:

  • FTP 代理服务器,主要用于访问 FTP 服务器,一般有上传、下载以及缓存功能,端口一般为 21、2121 等。
  • HTTP 代理服务器,主要用于访问网页,一般有内容过滤和缓存功能,端口一般为 80、8080、3128 等。
  • SSL/TLS 代理,主要用于访问加密网站,一般有 SSL 或 TLS 加密功能(最高支持 128 位加密强度),端口一般为 443。
  • RTSP 代理,主要用于 Realplayer 访问 Real 流媒体服务器,一般有缓存功能,端口一般为 554。
  • Telnet 代理,主要用于 telnet 远程控制(黑客入侵计算机时常用于隐藏身份),端口一般为 23。
  • POP3/SMTP 代理,主要用于 POP3/SMTP 方式收发邮件,一般有缓存功能,端口一般为 110/25。
  • SOCKS 代理,只是单纯传递数据包,不关心具体协议和用法,所以速度快很多,一般有缓存功能,端口一般为 1080。SOCKS 代理协议又分为 SOCKS4 和 SOCKS5,SOCKS4 协议只支持 TCP,而 SOCKS5 协议支持 TCP 和 UDP,还支持各种身份验证机制、服务器端域名解析等。简单来说,SOCK4 能做到的 SOCKS5 都可以做到,但 SOCKS5 能做到的 SOCK4 不一定能做到。

根据匿名程度区分

根据代理的匿名程度,代理可以分为如下类别。

  • 高度匿名代理,高度匿名代理会将数据包原封不动的转发,在服务端看来就好像真的是一个普通客户端在访问,而记录的 IP 是代理服务器的 IP。
  • 普通匿名代理,普通匿名代理会在数据包上做一些改动,服务端上有可能发现这是个代理服务器,也有一定几率追查到客户端的真实 IP。代理服务器通常会加入的 HTTP 头有 HTTP_VIA 和 HTTP_X_FORWARDED_FOR。
  • 透明代理,透明代理不但改动了数据包,还会告诉服务器客户端的真实 IP。这种代理除了能用缓存技术提高浏览速度,能用内容过滤提高安全性之外,并无其他显著作用,最常见的例子是内网中的硬件防火墙。
  • 间谍代理,间谍代理指组织或个人创建的,用于记录用户传输的数据,然后进行研究、监控等目的的代理服务器。

常见代理类型

  • 使用网上的免费代理,最好使用高匿代理,使用前抓取下来筛选一下可用代理,也可以进一步维护一个代理池。
  • 使用付费代理服务,互联网上存在许多代理商,可以付费使用,质量比免费代理好很多。
  • ADSL 拨号,拨一次号换一次 IP,稳定性高,也是一种比较有效的解决方案。
  • 蜂窝代理,即用 4G 或 5G 网卡等制作的代理,由于蜂窝网络用作代理的情形较少,因此整体被封锁的几率会较低,但搭建蜂窝代理的成本较高。

在前面我们介绍了多种请求库,如 Requests、Selenium、Pyppeteer 等。我们接下来首先贴近实战,了解一下代理怎么使用,为后面了解代理池打下基础。

下面我们来梳理一下这些库的代理的设置方法。

做测试之前,我们需要先获取一个可用代理。搜索引擎搜索 “代理” 关键字,就可以看到许多代理服务网站,网站上会有很多免费或付费代理,比如免费代理“快代理”:https://www.kuaidaili.com/free/。但是这些免费代理大多数情况下都是不好用的,所以比较靠谱的方法是购买付费代理。付费代理各大代理商家都有套餐,数量不用多,稳定可用即可,我们可以自行选购。

如果本机有相关代理软件的话,软件一般会在本机创建 HTTP 或 SOCKS 代理服务,本机直接使用此代理也可以。

在这里,我的本机安装了一部代理软件,它会在本地的 7890 端口上创建 HTTP 代理服务,即代理为127.0.0.1:7890,另外还会在 7891 端口创建 SOCKS 代理服务,即代理为 127.0.0.1:7891。

我只要设置了这个代理,就可以成功将本机 IP 切换到代理软件连接的服务器的 IP 了。下面的示例里,我将使用上述代理来演示其设置方法,你也可以自行替换成自己的可用代理。设置代理后测试的网址是:http://httpbin.org/get,我们访问该网址可以得到请求的相关信息,其中 origin 字段就是客户端的 IP,我们可以根据它来判断代理是否设置成功,即是否成功伪装了 IP。

requests 设置代理

对于 requests 来说,代理设置非常简单,我们只需要传入 proxies 参数即可。

我在这里以我本机的代理为例,来看下 requests 的 HTTP 代理的设置,代码如下:

import requests
proxy = '127.0.0.1:7890'
proxies = {
   'http': 'http://' + proxy,
   'https': 'https://' + proxy,
}
try:
   response = requests.get('https://httpbin.org/get', proxies=proxies)
   print(response.text)
except requests.exceptions.ConnectionError as e:
   print('Error', e.args)
运行结果:
{
 "args": {},
 "headers": {
   "Accept": "*/*",
   "Accept-Encoding": "gzip, deflate",
   "Host": "httpbin.org",
   "User-Agent": "python-requests/2.22.0",
   "X-Amzn-Trace-Id": "Root=1-5e8f358d-87913f68a192fb9f87aa0323"
 },
 "origin": "210.173.1.204",
 "url": "https://httpbin.org/get"
}

可以发现,我们通过一个字典的形式就设置好了 HTTP 代理,它分为两个类别,有 HTTP 和 HTTPS,如果我们访问的链接是 HTTP 协议,那就用 http 字典名指定的代理,如果是 HTTPS 协议,那就用 https 字典名指定的代理。

其运行结果的 origin 如是代理服务器的 IP,则证明代理已经设置成功。

如果代理需要认证,同样在代理的前面加上用户名密码即可,代理的写法就变成如下所示:

proxy = 'username:[email protected]:7890'

这里只需要将 username 和 password 替换即可。

如果需要使用 SOCKS 代理,则可以使用如下方式来设置:

import requests
proxy = '127.0.0.1:7891'
proxies = {
   'http': 'socks5://' + proxy,
   'https': 'socks5://' + proxy
}
try:
   response = requests.get('https://httpbin.org/get', proxies=proxies)
   print(response.text)
except requests.exceptions.ConnectionError as e:
   print('Error', e.args)

在这里,我们需要额外安装一个包,这个包叫作 requests[socks],安装命令如下所示:

pip3 install "requests[socks]"

运行结果是完全相同的:

{
 "args": {},
 "headers": {
   "Accept": "*/*",
   "Accept-Encoding": "gzip, deflate",
   "Host": "httpbin.org",
   "User-Agent": "python-requests/2.22.0",
   "X-Amzn-Trace-Id": "Root=1-5e8f364a-589d3cf2500fafd47b5560f2"
 },
 "origin": "210.173.1.204",
 "url": "https://httpbin.org/get"
}

另外,还有一种设置方式即使用 socks 模块,也需要像上文一样安装 socks 库。这种设置方法如下所示:

import requests
import socks
import socket
socks.set_default_proxy(socks.SOCKS5, '127.0.0.1', 7891)
socket.socket = socks.socksocket
try:
   response = requests.get('https://httpbin.org/get')
   print(response.text)
except requests.exceptions.ConnectionError as e:
   print('Error', e.args)

使用这种方法也可以设置 SOCKS 代理,运行结果完全相同。相比第一种方法,此方法是全局设置。我们可以在不同情况下选用不同的方法。

Selenium 设置代理

Selenium 同样可以设置代理,在这里以 Chrome 为例来介绍下其设置方法。

对于无认证的代理,设置方法如下:

from selenium import webdriver
proxy = '127.0.0.1:7890'
options = webdriver.ChromeOptions()
options.add_argument('--proxy-server=http://' + proxy)
browser = webdriver.Chrome(options=options)
browser.get('https://httpbin.org/get')
print(browser.page_source)
browser.close()

运行结果如下:

{
 "args": {},
 "headers": {
   "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
   "Accept-Encoding": "gzip, deflate",
   "Accept-Language": "zh-CN,zh;q=0.9",
   "Host": "httpbin.org",
   "Upgrade-Insecure-Requests": "1",
   "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36",
   "X-Amzn-Trace-Id": "Root=1-5e8f39cd-60930018205fd154a9af39cc"
 },
 "origin": "210.173.1.204",
 "url": "http://httpbin.org/get"
}

代理设置成功,origin 同样为代理 IP 的地址。

如果代理是认证代理,则设置方法相对比较麻烦,设置方法如下所示:

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import zipfile
 
ip = '127.0.0.1'
port = 7890
username = 'foo'
password = 'bar'
 
manifest_json = """{"version":"1.0.0","manifest_version": 2,"name":"Chrome Proxy","permissions": ["proxy","tabs","unlimitedStorage","storage","<all_urls>","webRequest","webRequestBlocking"],"background": {"scripts": ["background.js"]
   }
}
"""
background_js = """
var config = {
       mode: "fixed_servers",
       rules: {
         singleProxy: {
           scheme: "http",
           host: "%(ip) s",
           port: %(port) s
         }
       }
     }
 
chrome.proxy.settings.set({value: config, scope: "regular"}, function() {});
 
function callbackFn(details) {
   return {
       authCredentials: {username: "%(username) s",
           password: "%(password) s"
       }
   }
}
 
chrome.webRequest.onAuthRequired.addListener(
           callbackFn,
           {urls: ["<all_urls>"]},
           ['blocking']
)
""" % {'ip': ip, 'port': port, 'username': username, 'password': password}
 
plugin_file = 'proxy_auth_plugin.zip'
with zipfile.ZipFile(plugin_file, 'w') as zp:
   zp.writestr("manifest.json", manifest_json)
   zp.writestr("background.js", background_js)
options = Options()
options.add_argument("--start-maximized")
options.add_extension(plugin_file)
browser = webdriver.Chrome(options=options)
browser.get('https://httpbin.org/get')
print(browser.page_source)
browser.close()

这里需要在本地创建一个 manifest.json 配置文件和 background.js 脚本来设置认证代理。运行代码之后本地会生成一个 proxy_auth_plugin.zip 文件来保存当前配置。

运行结果和上例一致,origin 同样为代理 IP。

SOCKS 代理的设置也比较简单,把对应的协议修改为 socks5 即可,如无密码认证的代理设置方法为:

from selenium import webdriver

proxy = '127.0.0.1:7891'
options = webdriver.ChromeOptions()
options.add_argument('--proxy-server=socks5://' + proxy)
browser = webdriver.Chrome(options=options)
browser.get('https://httpbin.org/get')
print(browser.page_source)
browser.close()

运行结果是一样的。

aiohttp 设置代理

对于 aiohttp 来说,我们可以通过 proxy 参数直接设置即可,HTTP 代理设置如下:

import asyncio
import aiohttp

proxy = 'http://127.0.0.1:7890'

async def main():
   async with aiohttp.ClientSession() as session:
       async with session.get('https://httpbin.org/get', proxy=proxy) as response:
           print(await response.text())

if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())

如果代理有用户名密码,像 requests 一样,把 proxy 修改为如下内容:

proxy = 'http://username:[email protected]:7890'

这里只需要将 username 和 password 替换即可。

对于 SOCKS 代理,我们需要安装一个支持库,叫作 aiohttp-socks,安装命令如下:

pip3 install aiohttp-socks

可以借助于这个库的 ProxyConnector 来设置 SOCKS 代理,代码如下:

import asyncio
import aiohttp
from aiohttp_socks import ProxyConnector
 
connector = ProxyConnector.from_url('socks5://127.0.0.1:7891')
 
async def main():
   async with aiohttp.ClientSession(connector=connector) as session:
       async with session.get('https://httpbin.org/get') as response:
           print(await response.text())

if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())

运行结果是一样的。

另外这个库还支持设置 SOCKS4、HTTP 代理以及对应的代理认证,可以参考其官方介绍。

Pyppeteer 设置代理

对于 Pyppeteer 来说,由于其默认使用的是类似 Chrome 的 Chromium 浏览器,因此设置方法和 Selenium 的 Chrome 是一样的,如 HTTP 无认证代理设置方法都是通过 args 来设置,实现如下:

import asyncio
from pyppeteer import launch

proxy = '127.0.0.1:7890'

async def main():
   browser = await launch({'args': ['--proxy-server=http://' + proxy], 'headless': False})
   page = await browser.newPage()
   await page.goto('https://httpbin.org/get')
   print(await page.content())
   await browser.close()

if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())

运行结果:

{
 "args": {},
 "headers": {
   "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
   "Accept-Encoding": "gzip, deflate, br",
   "Accept-Language": "zh-CN,zh;q=0.9",
   "Host": "httpbin.org",
   "Upgrade-Insecure-Requests": "1",
   "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3494.0 Safari/537.36",
   "X-Amzn-Trace-Id": "Root=1-5e8f442c-12b1ed7865b049007267a66c"
 },
 "origin": "210.173.1.204",
 "url": "https://httpbin.org/get"
}

同样可以看到设置成功。

对于 SOCKS 代理,也是一样的,只需要将协议修改为 socks5 即可,代码实现如下:

import asyncio
from pyppeteer import launch

proxy = '127.0.0.1:7891'

async def main():
   browser = await launch({'args': ['--proxy-server=socks5://' + proxy], 'headless': False})
   page = await browser.newPage()
   await page.goto('https://httpbin.org/get')
   print(await page.content())
   await browser.close()

if __name__ == '__main__':
   asyncio.get_event_loop().run_until_complete(main())

运行结果也是一样的。

以上总结了各个库的代理使用方式,以后如果遇到封 IP 的问题,我们就可以轻松通过加代理的方式来解决啦。

本节代码:https://github.com/Python3WebSpider/ProxyTest

代理池的搭建和使用

我们在上一课时了解了利用代理可以解决目标网站封 IP 的问题,但是如何实时高效地获取到大量可用的代理又是一个问题。

首先在互联网上有大量公开的免费代理,当然我们也可以购买付费的代理 IP,但是代理不论是免费的还是付费的,都不能保证是可用的,因为可能此 IP 已被其他人使用来爬取同样的目标站点而被封禁,或者代理服务器突然发生故障或网络繁忙。一旦我们选用了一个不可用的代理,这势必会影响爬虫的工作效率。

所以,我们需要提前做筛选,将不可用的代理剔除掉,保留可用代理。那么这个怎么来实现呢?这里就需要借助于一个叫作代理池的东西了。

接下来本课时我们就介绍一下如何搭建一个高效易用的代理池。

在这里代理池的存储我们需要借助于 Redis,因此这个需要额外安装。总体来说,本课时需要的环境如下:

  • 安装并成功运行和连接一个 Redis 数据库,安装方法见:https://cuiqingcai.com/5219.html
  • 安装好 Python3(至少为 Python 3.6 版本),并能成功运行 Python 程序。

安装好一些必要的库,包括 aiohttp、requests、redis-py、pyquery、Flask 等。

建议使用 Python 虚拟环境安装,参考安装命令如下:

做好了如上准备工作,我们便可以开始实现或运行本课时所讲的代理池了。

代理池的目标

我们需要做到下面的几个目标,来实现易用高效的代理池。

  • 基本模块分为 4 块:存储模块、获取模块、检测模块、接口模块。
  • 存储模块:负责存储抓取下来的代理。首先要保证代理不重复,要标识代理的可用情况,还要动态实时处理每个代理,所以一种比较高效和方便的存储方式就是使用 Redis 的 Sorted Set,即有序集合。
  • 获取模块:需要定时在各大代理网站抓取代理。代理可以是免费公开代理也可以是付费代理,代理的形式都是 IP 加端口,此模块尽量从不同来源获取,尽量抓取高匿代理,抓取成功之后将可用代理保存到数据库中。
  • 检测模块:需要定时检测数据库中的代理。这里需要设置一个检测链接,最好是爬取哪个网站就检测哪个网站,这样更加有针对性,如果要做一个通用型的代理,那可以设置百度等链接来检测。另外,我们需要标识每一个代理的状态,如设置分数标识,100 分代表可用,分数越少代表越不可用。检测一次,如果代理可用,我们可以将分数标识立即设置为 100 满分,也可以在原基础上加 1 分;如果代理不可用,可以将分数标识减 1 分,当分数减到一定阈值后,代理就直接从数据库移除。通过这样的标识分数,我们就可以辨别代理的可用情况,选用的时候会更有针对性。
  • 接口模块:需要用 API 来提供对外服务的接口。其实我们可以直接连接数据库来获取对应的数据,但是这样就需要知道数据库的连接信息,并且要配置连接,而比较安全和方便的方式就是提供一个 Web API 接口,我们通过访问接口即可拿到可用代理。另外,由于可用代理可能有多个,那么我们可以设置一个随机返回某个可用代理的接口,这样就能保证每个可用代理都可以取到,实现负载均衡。

以上内容是设计代理的一些基本思路。接下来我们设计整体的架构,然后用代码实现代理池。

代理池的架构

根据上文的描述,代理池的架构如图所示。

Ciqah16ULCeAIIqyAABXJVAIMbk947.png

代理池分为 4 个模块:存储模块、获取模块、检测模块、接口模块。

  • 存储模块使用 Redis 的有序集合,用来做代理的去重和状态标识,同时它也是中心模块和基础模块,将其他模块串联起来。
  • 获取模块定时从代理网站获取代理,将获取的代理传递给存储模块,并保存到数据库。
  • 检测模块定时通过存储模块获取所有代理,并对代理进行检测,根据不同的检测结果对代理设置不同的标识。
  • 接口模块通过 Web API 提供服务接口,接口通过连接数据库并通过 Web 形式返回可用的代理。

代理池的实现

接下来我们分别用代码来实现一下这四个模块。

注:完整的代理池代码量较大,因此本课时的代码不必一步步跟着编写,最后去了解源码即可。

这里我们使用 Redis 的有序集合,集合的每一个元素都是不重复的,对于代理池来说,集合的元素就变成了一个个代理,也就是 IP 加端口的形式,如 60.207.237.111:8888,这样的一个代理就是集合的一个元素。另外,有序集合的每一个元素都有一个分数字段,分数是可以重复的,可以是浮点数类型,也可以是整数类型。该集合会根据每一个元素的分数对集合进行排序,数值小的排在前面,数值大的排在后面,这样就可以实现集合元素的排序了。

对于代理池来说,这个分数可以作为判断一个代理是否可用的标志,100 为最高分,代表最可用,0 为最低分,代表最不可用。如果要获取可用代理,可以从代理池中随机获取分数最高的代理,注意是随机,这样可以保证每个可用代理都会被调用到。

分数是我们判断代理稳定性的重要标准,设置分数规则如下所示。

  • 分数 100 为可用,检测器会定时循环检测每个代理可用情况,一旦检测到有可用的代理就立即置为 100,检测到不可用就将分数减 1,分数减至 0 后代理移除。
  • 新获取的代理的分数为 10,如果测试可行,分数立即置为 100,不可行则分数减 1,分数减至 0 后代理移除。

这只是一种解决方案,当然可能还有更合理的方案。之所以设置此方案有如下几个原因。

  • 在检测到代理可用时,分数立即置为 100,这样可以保证所有可用代理有更大的机会被获取到。你可能会问,为什么不将分数加 1 而是直接设为最高 100 呢?设想一下,有的代理是从各大免费公开代理网站获取的,常常一个代理并没有那么稳定,平均 5 次请求可能有两次成功,3 次失败,如果按照这种方式来设置分数,那么这个代理几乎不可能达到一个高的分数,也就是说即便它有时是可用的,但是筛选的分数最高,那这样的代理几乎不可能被取到。如果想追求代理稳定性,可以用上述方法,这种方法可确保分数最高的代理一定是最稳定可用的。所以,这里我们采取 “可用即设置 100” 的方法,确保只要可用的代理都可以被获取到。
  • 在检测到代理不可用时,分数减 1,分数减至 0 后,代理移除。这样一个有效代理如果要被移除需要连续不断失败 100 次,也就是说当一个可用代理如果尝试了 100 次都失败了,就一直减分直到移除,一旦成功就重新置回 100。尝试机会越多,则这个代理拯救回来的机会越多,这样就不容易将曾经的一个可用代理丢弃,因为代理不可用的原因很可能是网络繁忙或者其他人用此代理请求太过频繁,所以在这里将分数为 100。
  • 新获取的代理的分数设置为 10,代理如果不可用,分数就减 1,分数减到 0,代理就移除,如果代理可用,分数就置为 100。由于很多代理是从免费网站获取的,所以新获取的代理无效的比例非常高,可能可用的代理不足 10%。所以在这里我们将分数设置为 10,检测的机会没有可用代理的 100 次那么多,这也可以适当减少开销。

上述代理分数的设置思路不一定是最优思路,但据个人实测,它的实用性还是比较强的。

在这里首先给出存储模块的实现代码,见:https://github.com/Python3WebSpider/ProxyPool/tree/master/proxypool/storages,建议直接对照源码阅读。

在代码中,我们定义了一个类来操作数据库的有序集合,定义一些方法来实现分数的设置、代理的获取等。其核心实现代码实现如下所示:

import redis
from proxypool.exceptions import PoolEmptyException
from proxypool.schemas.proxy import Proxy
from proxypool.setting import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_KEY, PROXY_SCORE_MAX, PROXY_SCORE_MIN, \
   PROXY_SCORE_INIT
from random import choice
from typing import List
from loguru import logger
from proxypool.utils.proxy import is_valid_proxy, convert_proxy_or_proxies
REDIS_CLIENT_VERSION = redis.__version__
IS_REDIS_VERSION_2 = REDIS_CLIENT_VERSION.startswith('2.')
class RedisClient(object):
   """
   redis connection client of proxypool
   """

   def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, **kwargs):
       """
       init redis client
       :param host: redis host
       :param port: redis port
       :param password: redis password
       """
       self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True, **kwargs)

   def add(self, proxy: Proxy, score=PROXY_SCORE_INIT) -> int:
       """
       add proxy and set it to init score
       :param proxy: proxy, ip:port, like 8.8.8.8:88
       :param score: int score
       :return: result
       """
       if not is_valid_proxy(f'{proxy.host}:{proxy.port}'):
           logger.info(f'invalid proxy {proxy}, throw it')
           return
       if not self.exists(proxy):
           if IS_REDIS_VERSION_2:
               return self.db.zadd(REDIS_KEY, score, proxy.string())
           return self.db.zadd(REDIS_KEY, {proxy.string(): score})

   def random(self) -> Proxy:
       """
       get random proxy
       firstly try to get proxy with max score
       if not exists, try to get proxy by rank
       if not exists, raise error
       :return: proxy, like 8.8.8.8:8
       """
       # try to get proxy with max score
       proxies = self.db.zrangebyscore(REDIS_KEY, PROXY_SCORE_MAX, PROXY_SCORE_MAX)
       if len(proxies):
           return convert_proxy_or_proxies(choice(proxies))
       # else get proxy by rank
       proxies = self.db.zrevrange(REDIS_KEY, PROXY_SCORE_MIN, PROXY_SCORE_MAX)
       if len(proxies):
           return convert_proxy_or_proxies(choice(proxies))
       # else raise error
       raise PoolEmptyException

   def decrease(self, proxy: Proxy) -> int:
       """
       decrease score of proxy, if small than PROXY_SCORE_MIN, delete it
       :param proxy: proxy
       :return: new score
       """
       score = self.db.zscore(REDIS_KEY, proxy.string())
       # current score is larger than PROXY_SCORE_MIN
       if score and score > PROXY_SCORE_MIN:
           logger.info(f'{proxy.string()} current score {score}, decrease 1')
           if IS_REDIS_VERSION_2:
               return self.db.zincrby(REDIS_KEY, proxy.string(), -1)
           return self.db.zincrby(REDIS_KEY, -1, proxy.string())
       # otherwise delete proxy
       else:
           logger.info(f'{proxy.string()} current score {score}, remove')
           return self.db.zrem(REDIS_KEY, proxy.string())

   def exists(self, proxy: Proxy) -> bool:
       """
       if proxy exists
       :param proxy: proxy
       :return: if exists, bool
       """
       return not self.db.zscore(REDIS_KEY, proxy.string()) is None

   def max(self, proxy: Proxy) -> int:
       """
       set proxy to max score
       :param proxy: proxy
       :return: new score
       """
       logger.info(f'{proxy.string()} is valid, set to {PROXY_SCORE_MAX}')
       if IS_REDIS_VERSION_2:
           return self.db.zadd(REDIS_KEY, PROXY_SCORE_MAX, proxy.string())
       return self.db.zadd(REDIS_KEY, {proxy.string(): PROXY_SCORE_MAX})

   def count(self) -> int:
       """
       get count of proxies
       :return: count, int
       """
       return self.db.zcard(REDIS_KEY)

   def all(self) -> List[Proxy]:
       """
       get all proxies
       :return: list of proxies
       """
       return convert_proxy_or_proxies(self.db.zrangebyscore(REDIS_KEY, PROXY_SCORE_MIN, PROXY_SCORE_MAX))

   def batch(self, start, end) -> List[Proxy]:
       """
       get batch of proxies
       :param start: start index
       :param end: end index
       :return: list of proxies
       """
       return convert_proxy_or_proxies(self.db.zrevrange(REDIS_KEY, start, end - 1))
if __name__ == '__main__':
   conn = RedisClient()
   result = conn.random()
   print(result)

首先我们定义了一些常量,如 PROXY_SCORE_MAX、PROXY_SCORE_MIN、PROXY_SCORE_INIT 分别代表最大分数、最小分数、初始分数。REDIS_HOST、REDIS_PORT、REDIS_PASSWORD 分别代表了 Redis 的连接信息,即地址、端口、密码。REDIS_KEY 是有序集合的键名,我们可以通过它来获取代理存储所使用的有序集合。

RedisClient 这个类可以用来操作 Redis 的有序集合,其中定义了一些方法来对集合中的元素进行处理,它的主要功能如下所示。

  • __init__ 方法是初始化的方法,其参数是 Redis 的连接信息,默认的连接信息已经定义为常量,在 __init__ 方法中初始化了一个 StrictRedis 的类,建立 Redis 连接。
  • add 方法向数据库添加代理并设置分数,默认的分数是 PROXY_SCORE_INIT 也就是 10,返回结果是添加的结果。
  • random 方法是随机获取代理的方法,首先获取 100 分的代理,然后随机选择一个返回。如果不存在 100 分的代理,则此方法按照排名来获取,选取前 100 名,然后随机选择一个返回,否则抛出异常。
  • decrease 方法是在代理检测无效的时候设置分数减 1 的方法,代理传入后,此方法将代理的分数减 1,如果分数达到最低值,那么代理就删除。
  • exists 方法可判断代理是否存在集合中。
  • max 方法将代理的分数设置为 PROXY_SCORE_MAX,即 100,也就是当代理有效时的设置。
  • count 方法返回当前集合的元素个数。
  • all 方法返回所有的代理列表,供检测使用。

定义好了这些方法,我们可以在后续的模块中调用此类来连接和操作数据库。如想要获取随机可用的代理,只需要调用 random 方法即可,得到的就是随机的可用代理。

获取模块主要是为了从各大网站抓取代理并调用存储模块进行保存,代码实现见:https://github.com/Python3WebSpider/ProxyPool/tree/master/proxypool/crawlers

获取模块的逻辑相对简单,比如我们可以定义一些抓取代理的方法,示例如下:

from proxypool.crawlers.base import BaseCrawler
from proxypool.schemas.proxy import Proxy
import re
MAX_PAGE = 5
BASE_URL = 'http://www.ip3366.net/free/?stype=1&page={page}'
class IP3366Crawler(BaseCrawler):
   """
   ip3366 crawler, http://www.ip3366.net/
   """
   urls = [BASE_URL.format(page=i) for i in range(1, 8)]

   def parse(self, html):
       """
       parse html file to get proxies
       :return:
       """
       ip_address = re.compile('<tr>\s*<td>(.*?)</td>\s*<td>(.*?)</td>')
       # \s * 匹配空格,起到换行作用
       re_ip_address = ip_address.findall(html)
       for address, port in re_ip_address:
           proxy = Proxy(host=address.strip(), port=int(port.strip()))
           yield proxy

我们在这里定义了一个代理 Crawler 类,用来抓取某一网站的代理,这里是抓取的 IP3366 的公开代理,通过 parse 方法来解析页面的源码并构造一个个 Proxy 对象返回即可。

另外在其父类 BaseCrawler 里面定义了通用的页面抓取方法,它可以读取子类里面定义的 urls 全局变量并进行爬取,然后调用子类的 parse 方法来解析页面,代码实现如下:

from retrying import retry
import requests
from loguru import logger
class BaseCrawler(object):
   urls = []

   @retry(stop_max_attempt_number=3, retry_on_result=lambda x: x is None)
   def fetch(self, url, **kwargs):
       try:
           response = requests.get(url, **kwargs)
           if response.status_code == 200:
               return response.text
       except requests.ConnectionError:
           return

   @logger.catch
   def crawl(self):
       """
       crawl main method
       """
       for url in self.urls:
           logger.info(f'fetching {url}')
           html = self.fetch(url)
           for proxy in self.parse(html):
               logger.info(f'fetched proxy {proxy.string()} from {url}')
               yield proxy

所以,我们如果要扩展一个代理的 Crawler,只需要继承 BaseCrawler 并实现 parse 方法即可,扩展性较好。

因此,这一个个的 Crawler 就可以针对各个不同的代理网站进行代理的抓取。最后有一个统一的方法将 Crawler 汇总起来,遍历调用即可。

如何汇总呢?在这里我们可以检测代码只要定义有 BaseCrawler 的子类就算一个有效的代理 Crawler,可以直接通过遍历 Python 文件包的方式来获取,代码实现如下:

import pkgutil
from .base import BaseCrawler
import inspect
# load classes subclass of BaseCrawler
classes = []
for loader, name, is_pkg in pkgutil.walk_packages(__path__):
   module = loader.find_module(name).load_module(name)
   for name, value in inspect.getmembers(module):
       globals()[name] = value
       if inspect.isclass(value) and issubclass(value, BaseCrawler) and value is not BaseCrawler:
           classes.append(value)
__all__ = __ALL__ = classes

在这里我们调用了 walk_packages 方法,遍历了整个 crawlers 模块下的类,并判断了它是 BaseCrawler 的子类,那就将其添加到结果中,并返回。

最后只要将 classes 遍历并依次实例化,调用其 crawl 方法即可完成代理的爬取和提取,代码实现见:https://github.com/Python3WebSpider/ProxyPool/blob/master/proxypool/processors/getter.py

我们已经成功将各个网站的代理获取下来了,现在就需要一个检测模块来对所有代理进行多轮检测。代理检测可用,分数就设置为 100,代理不可用,分数减 1,这样就可以实时改变每个代理的可用情况。如要获取有效代理只需要获取分数高的代理即可。

由于代理的数量非常多,为了提高代理的检测效率,我们在这里使用异步请求库 aiohttp 来进行检测。

requests 作为一个同步请求库,我们在发出一个请求之后,程序需要等待网页加载完成之后才能继续执行。也就是这个过程会阻塞等待响应,如果服务器响应非常慢,比如一个请求等待十几秒,那么我们使用 requests 完成一个请求就会需要十几秒的时间,程序也不会继续往下执行,而在这十几秒的时间里程序其实完全可以去做其他的事情,比如调度其他的请求或者进行网页解析等。

对于响应速度比较快的网站来说,requests 同步请求和 aiohttp 异步请求的效果差距没那么大。可对于检测代理来说,检测一个代理一般需要十多秒甚至几十秒的时间,这时候使用 aiohttp 异步请求库的优势就大大体现出来了,效率可能会提高几十倍不止。

所以,我们的代理检测使用异步请求库 aiohttp,实现示例如下所示:

import asyncio
import aiohttp
from loguru import logger
from proxypool.schemas import Proxy
from proxypool.storages.redis import RedisClient
from proxypool.setting import TEST_TIMEOUT, TEST_BATCH, TEST_URL, TEST_VALID_STATUS
from aiohttp import ClientProxyConnectionError, ServerDisconnectedError, ClientOSError, ClientHttpProxyError
from asyncio import TimeoutError
EXCEPTIONS = (
   ClientProxyConnectionError,
   ConnectionRefusedError,
   TimeoutError,
   ServerDisconnectedError,
   ClientOSError,
   ClientHttpProxyError
)
class Tester(object):
   """
   tester for testing proxies in queue
   """

   def __init__(self):
       """
       init redis
       """
       self.redis = RedisClient()
       self.loop = asyncio.get_event_loop()

   async def test(self, proxy: Proxy):
       """
       test single proxy
       :param proxy: Proxy object
       :return:
       """
       async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=False)) as session:
           try:
               logger.debug(f'testing {proxy.string()}')
               async with session.get(TEST_URL, proxy=f'http://{proxy.string()}', timeout=TEST_TIMEOUT,
                                      allow_redirects=False) as response:
                   if response.status in TEST_VALID_STATUS:
                       self.redis.max(proxy)
                       logger.debug(f'proxy {proxy.string()} is valid, set max score')
                   else:
                       self.redis.decrease(proxy)
                       logger.debug(f'proxy {proxy.string()} is invalid, decrease score')
           except EXCEPTIONS:
               self.redis.decrease(proxy)
               logger.debug(f'proxy {proxy.string()} is invalid, decrease score')

   @logger.catch
   def run(self):
       """
       test main method
       :return:
       """
       # event loop of aiohttp
       logger.info('stating tester...')
       count = self.redis.count()
       logger.debug(f'{count} proxies to test')
       for i in range(0, count, TEST_BATCH):
           # start end end offset
           start, end = i, min(i + TEST_BATCH, count)
           logger.debug(f'testing proxies from {start} to {end} indices')
           proxies = self.redis.batch(start, end)
           tasks = [self.test(proxy) for proxy in proxies]
           # run tasks using event loop
           self.loop.run_until_complete(asyncio.wait(tasks))
if __name__ == '__main__':
   tester = Tester()
   tester.run()

这里定义了一个类 Tester,__init__ 方法中建立了一个 RedisClient 对象,供该对象中其他方法使用。接下来定义了一个 test 方法,这个方法用来检测单个代理的可用情况,其参数就是被检测的代理。注意,test 方法前面加了 async 关键词,代表这个方法是异步的。方法内部首先创建了 aiohttp 的 ClientSession 对象,可以直接调用该对象的 get 方法来访问页面。

测试的链接在这里定义为常量 TEST_URL。如果针对某个网站有抓取需求,建议将 TEST_URL 设置为目标网站的地址,因为在抓取的过程中,代理本身可能是可用的,但是该代理的 IP 已经被目标网站封掉了。例如,某些代理可以正常访问百度等页面,但是对知乎来说可能就被封了,所以我们可以将 TEST_URL 设置为知乎的某个页面的链接,当请求失败、代理被封时,分数自然会减下来,失效的代理就不会被取到了。

如果想做一个通用的代理池,则不需要专门设置 TEST_URL,可以将其设置为一个不会封 IP 的网站,也可以设置为百度这类响应稳定的网站。

我们还定义了 TEST_VALID_STATUS 变量,这个变量是一个列表形式,包含了正常的状态码,如可以定义成 [200]。当然某些目标网站可能会出现其他的状态码,你可以自行配置。

程序在获取 Response 后需要判断响应的状态,如果状态码在 TEST_VALID_STATUS 列表里,则代表代理可用,可以调用 RedisClient 的 max 方法将代理分数设为 100,否则调用 decrease 方法将代理分数减 1,如果出现异常也同样将代理分数减 1。

另外,我们设置了批量测试的最大值为 TEST_BATCH,也就是一批测试最多 TEST_BATCH 个,这可以避免代理池过大时一次性测试全部代理导致内存开销过大的问题。当然也可以用信号量来实现并发控制。

随后,在 run 方法里面获取了所有的代理列表,使用 aiohttp 分配任务,启动运行。这样在不断的运行过程中,代理池中无效的代理的分数会一直被减 1,直至被清除,有效的代理则会一直保持 100 分,供随时取用。

这样,测试模块的逻辑就完成了。

通过上述 3 个模块,我们已经可以做到代理的获取、检测和更新,数据库就会以有序集合的形式存储各个代理及其对应的分数,分数 100 代表可用,分数越小代表越不可用。

但是我们怎样方便地获取可用代理呢?可以用 RedisClient 类直接连接 Redis,然后调用 random 方法。这样做没问题,效率很高,但是会有几个弊端。

  • 如果其他人使用这个代理池,他需要知道 Redis 连接的用户名和密码信息,这样很不安全。
  • 如果代理池需要部署在远程服务器上运行,而远程服务器的 Redis 只允许本地连接,那么我们就不能远程直连 Redis 来获取代理。
  • 如果爬虫所在的主机没有连接 Redis 模块,或者爬虫不是由 Python 语言编写的,那么我们就无法使用 RedisClient 来获取代理。
  • 如果 RedisClient 类或者数据库结构有更新,那么爬虫端必须同步这些更新,这样非常麻烦。

综上考虑,为了使代理池可以作为一个独立服务运行,我们最好增加一个接口模块,并以 Web API 的形式暴露可用代理。

这样一来,获取代理只需要请求接口即可,以上的几个缺点弊端也可以避免。

我们使用一个比较轻量级的库 Flask 来实现这个接口模块,实现示例如下所示:

from flask import Flask, g
from proxypool.storages.redis import RedisClient
from proxypool.setting import API_HOST, API_PORT, API_THREADED
__all__ = ['app']
app = Flask(__name__)
def get_conn():
   """
   get redis client object
   :return:
   """
   if not hasattr(g, 'redis'):
       g.redis = RedisClient()
   return g.redis
@app.route('/')
def index():
   """
   get home page, you can define your own templates
   :return:
   """
   return '<h2>Welcome to Proxy Pool System</h2>'
@app.route('/random')
def get_proxy():
   """
   get a random proxy
   :return: get a random proxy
   """
   conn = get_conn()
   return conn.random().string()
@app.route('/count')
def get_count():
   """
   get the count of proxies
   :return: count, int
   """
   conn = get_conn()
   return str(conn.count())
if __name__ == '__main__':
   app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED)

在这里,我们声明了一个 Flask 对象,定义了 3 个接口,分别是首页、随机代理页、获取数量页。

运行之后,Flask 会启动一个 Web 服务,我们只需要访问对应的接口即可获取到可用代理。

调度模块就是调用以上所定义的 3 个模块,将这 3 个模块通过多进程的形式运行起来,示例如下所示:

import time
import multiprocessing
from proxypool.processors.server import app
from proxypool.processors.getter import Getter
from proxypool.processors.tester import Tester
from proxypool.setting import CYCLE_GETTER, CYCLE_TESTER, API_HOST, API_THREADED, API_PORT, ENABLE_SERVER, \
   ENABLE_GETTER, ENABLE_TESTER, IS_WINDOWS
from loguru import logger
if IS_WINDOWS:
   multiprocessing.freeze_support()
tester_process, getter_process, server_process = None, None, None
class Scheduler():
   """
   scheduler
   """

   def run_tester(self, cycle=CYCLE_TESTER):
       """
       run tester
       """
       if not ENABLE_TESTER:
           logger.info('tester not enabled, exit')
           return
       tester = Tester()
       loop = 0
       while True:
           logger.debug(f'tester loop {loop} start...')
           tester.run()
           loop += 1
           time.sleep(cycle)

   def run_getter(self, cycle=CYCLE_GETTER):
       """
       run getter
       """
       if not ENABLE_GETTER:
           logger.info('getter not enabled, exit')
           return
       getter = Getter()
       loop = 0
       while True:
           logger.debug(f'getter loop {loop} start...')
           getter.run()
           loop += 1
           time.sleep(cycle)

   def run_server(self):
       """
       run server for api
       """
       if not ENABLE_SERVER:
           logger.info('server not enabled, exit')
           return
       app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED)

   def run(self):
       global tester_process, getter_process, server_process
       try:
           logger.info('starting proxypool...')
           if ENABLE_TESTER:
               tester_process = multiprocessing.Process(target=self.run_tester)
               logger.info(f'starting tester, pid {tester_process.pid}...')
               tester_process.start()

           if ENABLE_GETTER:
               getter_process = multiprocessing.Process(target=self.run_getter)
               logger.info(f'starting getter, pid{getter_process.pid}...')
               getter_process.start()

           if ENABLE_SERVER:
               server_process = multiprocessing.Process(target=self.run_server)
               logger.info(f'starting server, pid{server_process.pid}...')
               server_process.start()

           tester_process.join()
           getter_process.join()
           server_process.join()
       except KeyboardInterrupt:
           logger.info('received keyboard interrupt signal')
           tester_process.terminate()
           getter_process.terminate()
           server_process.terminate()
       finally:
           # must call join method before calling is_alive
           tester_process.join()
           getter_process.join()
           server_process.join()
           logger.info(f'tester is {"alive" if tester_process.is_alive() else "dead"}')
           logger.info(f'getter is {"alive" if getter_process.is_alive() else "dead"}')
           logger.info(f'server is {"alive" if server_process.is_alive() else "dead"}')
           logger.info('proxy terminated')
if __name__ == '__main__':
   scheduler = Scheduler()
   scheduler.run()

3 个常量 ENABLE_TESTER、ENABLE_GETTER、ENABLE_SERVER 都是布尔类型,表示测试模块、获取模块、接口模块的开关,如果都为 True,则代表模块开启。

启动入口是 run 方法,这个方法分别判断 3 个模块的开关。如果开关开启,启动时程序就新建一个 Process 进程,设置好启动目标,然后调用 start 方法运行,这样 3 个进程就可以并行执行,互不干扰。

3 个调度方法结构也非常清晰。比如,run_tester 方法用来调度测试模块,首先声明一个 Tester 对象,然后进入死循环不断循环调用其 run 方法,执行完一轮之后就休眠一段时间,休眠结束之后重新再执行。在这里,休眠时间也定义为一个常量,如 20 秒,即每隔 20 秒进行一次代理检测。

最后,只需要调用 Scheduler 的 run 方法即可启动整个代理池。

以上内容便是整个代理池的架构和相应实现逻辑。

接下来我们将代码整合一下,将代理运行起来,运行之后的输出结果如下所示:

2020-04-13 02:52:06.510 | INFO     | proxypool.storages.redis:decrease:73 - 60.186.146.193:9000 current score 10.0, decrease 1
2020-04-13 02:52:06.517 | DEBUG    | proxypool.processors.tester:test:52 - proxy 60.186.146.193:9000 is invalid, decrease score
2020-04-13 02:52:06.524 | INFO     | proxypool.storages.redis:decrease:73 - 60.186.151.147:9000 current score 10.0, decrease 1
2020-04-13 02:52:06.532 | DEBUG    | proxypool.processors.tester:test:52 - proxy 60.186.151.147:9000 is invalid, decrease score
2020-04-13 02:52:07.159 | INFO     | proxypool.storages.redis:max:96 - 60.191.11.246:3128 is valid, set to 100
2020-04-13 02:52:07.167 | DEBUG    | proxypool.processors.tester:test:46 - proxy 60.191.11.246:3128 is valid, set max score
2020-04-13 02:52:17.271 | INFO     | proxypool.storages.redis:decrease:73 - 59.62.7.130:9000 current score 10.0, decrease 1
2020-04-13 02:52:17.280 | DEBUG    | proxypool.processors.tester:test:52 - proxy 59.62.7.130:9000 is invalid, decrease score
2020-04-13 02:52:17.288 | INFO     | proxypool.storages.redis:decrease:73 - 60.167.103.74:1133 current score 10.0, decrease 1
2020-04-13 02:52:17.295 | DEBUG    | proxypool.processors.tester:test:52 - proxy 60.167.103.74:1133 is invalid, decrease score
2020-04-13 02:52:17.302 | INFO     | proxypool.storages.redis:decrease:73 - 60.162.71.113:9000 current score 10.0, decrease 1
2020-04-13 02:52:17.309 | DEBUG    | proxypool.processors.tester:test:52 - proxy 60.162.71.113:9000 is invalid, decrease score

以上是代理池的控制台输出,可以看到可用代理设置为 100,不可用代理分数减 1。

接下来我们再打开浏览器,当前配置了运行在 5555 端口,所以打开:http://127.0.0.1:5555,即可看到其首页,如图所示。

Ciqah16UK_WAX8YSAAEMqmBH7rI997.png

再访问 http://127.0.0.1:5555/random,即可获取随机可用代理,如图所示。

Cgq2xl6UK_WARL3GAADK8PRJ5EQ293.png

我们只需要访问此接口即可获取一个随机可用代理,这非常方便。

获取代理的代码如下所示:

import requests
 
PROXY_POOL_URL = 'http://localhost:5555/random'
 
def get_proxy():
   try:
       response = requests.get(PROXY_POOL_URL)
       if response.status_code == 200:
           return response.text
   except ConnectionError:
       return None

这样便可以获取到一个随机代理了,它是字符串类型,此代理可以按照上一课时所示的方法设置,如 requests 的使用方法如下所示:

import requests
 
proxy = get_proxy()
proxies = {
   'http': 'http://' + proxy,
   'https': 'https://' + proxy,
}
try:
   response = requests.get('http://httpbin.org/get', proxies=proxies)
   print(response.text)
except requests.exceptions.ConnectionError as e:
   print('Error', e.args)

有了代理池之后,我们再取出代理即可有效防止 IP 被封禁的情况。

本课时代码地址为:https://github.com/Python3WebSpider/ProxyPool,代码量相比之前的案例复杂了很多,逻辑也相对完善。另外代码库中还提供了 Docker 和 Kubernetes 的运行和部署操作,可以帮助我们更加快捷地运行代理池,如果你感兴趣可以了解下。

验证码反爬虫的基本原理

我们在浏览网站的时候经常会遇到各种各样的验证码,在多数情况下这些验证码会出现在登录账号的时候,也可能会出现在访问页面的过程中,严格来说,这些行为都算验证码反爬虫。

本课时我们就来介绍下验证码反爬虫的基本原理及常见的验证码和解决方案。

验证码,全称叫作 Completely Automated Public Turing test to tell Computers and Humans Apart,意思是全自动区分计算机和人类的图灵测试,取了它们关键词的首字母变成了 CAPTCHA,它是一种用来区分用户是计算机还是人的公共全自动程序。

它有什么用呢?当然很多用处,如:

  • 网站注册的时候加上验证码,可以一定程度上防止恶意大批量注册。
  • 网站登录的时候加上验证码,可以一定程度上防止恶意密码爆破。
  • 网站在发表评论的时候加上验证码,可以在一定程度上防止恶意灌水。
  • 网站在投票的时候加上验证码,可以在一定程度上防止恶意刷票。
  • 网站在被频繁访问的时候或者浏览行为不正常的时候,一般可能是遇到了爬虫,可以一定程度上防止爬虫的爬取。

总的来说呢,以上的行为都可以称之为验证码反爬虫行为。使用验证码可以防止各种可以用程序模拟的行为。有了验证码,机器要想完全自动化执行就会遇到一些麻烦,当然这个麻烦的大小就取决于验证码的破解难易程度了。

验证码反爬虫

那为什么会出现验证码呢?在大多数情形下是因为网站的访问频率过高或者行为异常,或者是为了直接限制某些自动化行为。归类如下:

  • 很多情况下,比如登录和注册,这些验证码几乎是必现的,它的目的就是为了限制恶意注册、恶意爆破等行为,这也算反爬的一种手段。
  • 一些网站遇到访问频率过高的行为的时候,可能会直接弹出一个登录窗口,要求我们登录才能继续访问,此时的验证码就直接和登录表单绑定在一起了,这就算检测到异常之后利用强制登录的方式进行反爬。
  • 一些较为常规的网站如果遇到访问频率稍高的情形的时候,会主动弹出一个验证码让用户识别并提交,验证当前访问网站的是不是真实的人,用来限制一些机器的行为,实现反爬虫。

这几种情形都能在一定程度上限制程序的一些自动化行为,因此都可以称之为反爬虫。

验证码反爬虫的原理

在模块一的时候,我们已经讲到过 Session 的基本概念了,它是存在于服务端的,用于保存当前用户的会话信息,这个信息对于验证码的机制非常重要。

服务端是可以往 Session 对象里面存一些值的,比如我们要生成一个图形验证码,比如 1234 这四个数字的图形验证码。

首先客户端要显示某个验证码,这个验证码相关的信息肯定要从服务器端来获取。比如说请求了这个生成验证码的接口,我们要生成一个图形验证码,内容为 1234,这时候服务端会将 1234 这四个数字保存到 Session 对象里面,然后把 1234 这个结果返回给客户端,或者直接把生成好的验证码图形返回也是可以的,客户端会将其呈现出来,用户就能看到验证码的内容了。

用户看到验证码之后呢,就会在表单里面输入验证码的内容,点击提交按钮的时候,这些信息就会又发送给服务器,服务器拿着提交的信息和 Session 里面保存的验证码信息后进行对比,如果一致,那就代表验证码输入正确,校验成功,然后就继续放行恢复正常状态。如果不一致,那就代表校验失败,会继续进行校验。

目前市面上大多数的验证码都是基于这个机制来实现的,归类如下:

  • 对于图形验证码,服务器会把图形的内容保存到 Session,然后将验证码图返回或者客户端自行显示,等用户提交表单之后校验 Session 里验证码的值和用户提交的值。
  • 对于行为验证码,服务器会做一些计算,把一些 Key、Token 等信息也储存在 Session 里面,用户首先要完成客户端的校验,如果校验成功才能提交表单,当客户端的校验完成之后,客户端会把验证之后计算产生的 Key、Token、Code 等信息发送到服务端,服务端会再做一次校验,如果服务端也校验通过了,那就算真正的通过了。
  • 对于手机验证码,服务器会预先生成一个验证码的信息,然后会把这个验证码的结果还有要发送的手机号发送给短信发送服务商,让服务商下发验证码给用户,用户再把这个码提交给服务器,服务器判断 Session 里面的验证码和提交的验证码是否一致即可。

还有很多其他的验证码,其原理基本都是一致的。

常见验证码

下面我们再来看看市面上的一些常见的验证码,并简单介绍一些识别思路。

图形验证码

最基本的验证码就是图形验证码了,比如下图。

Ciqah16W5UyAEhSHAAAX1y2xT2g229.png

一般来说,识别思路有这么几种:

  • 利用 OCR 识别,比如 Tesserocr 等库,或者直接调用 OCR 接口,如百度、腾讯的,识别效果相比 Tesserocr 更好。
  • 打码平台,把验证码发送给打码平台,平台内实现了一些强大的识别算法或者平台背后有人来专门做识别,速度快,省心。
  • 深度学习训练,这类验证码也可以使用 CNN 等深度学习模型来训练分类算法,但是如果种类繁多或者写法各异的话,其识别精度会有一些影响。

行为验证码

现在我们能见到非常多类型的行为验证码,可以说是十分流行了,比如极验、腾讯、网易盾等等都有类似的验证码服务,另外验证的方式也多种多样,如滑动、拖动、点选、逻辑判断等等,如图所示。

Cgq2xl6W5UyAGwElAADQXzjdcRk446.png
CgoCgV6W5U2AP1KFAALydwxJhow955.png
Ciqah16W5U2AO2ACAAK0QZRq5dQ995.png
Cgq2xl6W5U2AUskeAAFkApuai3Y580.png

这里推荐的识别方案有以下几种:

  • 打码平台,这里面很多验证码都是与坐标相关的,我们可以直接将验证码截图发送给打码平台,打码平台背后会有人帮我们找到对应的位置坐标,获取位置坐标之后就可以来模拟了。这时候模拟的方法有两种,一种是模拟行为,使用 Selenium 等实现,模拟完成之后通常能登录或者解锁某个 Session 封锁状态,获取有效 Cookies 即可。另一种是在 JavaScript 层级上模拟,这种难度更高,模拟完了可以直接获取验证码提交的一些 Token 值等内容。
  • 深度学习,利用一些图像标注加深度学习的方法同样可以识别验证码,其实主要还是识别位置,有了位置之后同样可以模拟。

短信、扫码验证码

另外我们可能遇到一些类似短信、扫码的验证码,这种操作起来就会更加麻烦,一些解决思路如下:

  • 手机号可以不用自己的,可以从某些平台来获取,平台维护了一套手机短信收发系统,填入手机号,并通过 API 获取短信验证码即可。
  • 另外也可以购买一些专业的收码设备或者安装一些监听短信的软件,它会有一些机制把一些手机短信信息导出到某个接口或文本或数据库,然后再提取即可。
  • 对于扫码验证的情况,如果不用自己的账号,可以把码发送到打码平台,让对方用自己的账号扫码处理,但这种情况多数需要定制,可以去跟平台沟通。另外的方案就涉及到逆向和破解相关的内容了,一般需要逆向手机 App 内的扫码和解析逻辑,然后再模拟,这里就不再展开讲了。

基本上验证码都是类似的,其中有一些列举不全,但是基本类别都能大致归类。

以上我们就介绍了验证码反爬虫的基本原理和一些验证码识别的思路。在后面的课时我会介绍使用打码平台和深度学习的方式来识别验证码的方案。

学会用打码平台处理验证码

在前一课时我们介绍了多种多样的验证码,有图形文字的、有模拟点选的、有拖动滑动的,但其实归根结底都需要人来对某种情形做一些判断,然后把结果返回并提交。如果此时提交的验证码结果是正确的,并且通过了一些验证码的检测,就能成功突破这个验证码了。

那么,既然验证码就是让人来识别的,那么机器怎么办呢?如果我们也不会什么算法,怎么去解这些验证码呢?此时如果有一个帮助我们来识别验证码的工具或平台就好了,让工具或平台把验证码识别的结果返回给我们,我们拿着结果提交,那不就好了吗?

有这种工具或平台吗?还真有专门的打码平台帮助我们来识别各种各样的验证码,平台内部对算法和人力做了集成,可以 7x24 小时来识别各种验证码,包括识别图形、坐标点、缺口等各种验证码,返回对应的结果或坐标,正好可以解决我们的问题。

本课时我们就来介绍利用打码平台来识别验证码的流程。

本课时我们以一种点选验证码为例来讲解打码平台的使用方法,验证码的链接为:https://captcha3.scrape.center/,这个网站在每次登录的时候都会弹出一个验证码,其验证码效果图如下所示。

Cgq2xl6ZWNWAUhQaAAMYtARC5aI058.png

这个验证码上面显示了几个汉字,同时在图中也显示了几个汉字,我们需要按照顺序依次点击汉字在图中的位置,点击完成之后确认提交,即可完成验证。

这种验证码如果我们没有任何图像识别算法基础的话,是很难去识别的,所以这里我们可以借助打码平台来帮助我们识别汉字的位置。

我们使用的 Python 库是 Selenium,使用的浏览器为 Chrome。

在本课时开始之前请确保已经正确安装好 Selenium 库、Chrome 浏览器,并配置好 ChromeDriver,相关流程可以参考 Selenium 那一课时的介绍。

另外本课时使用的打码平台是超级鹰,链接为:https://www.chaojiying.com/,在使用之前请你自己注册账号并获取一些题分供测试,另外还可以了解平台可识别的验证码的类别。

打码平台能提供的服务种类一般都非常广泛,可识别的验证码类型也非常多,其中就包括点触验证码。

超级鹰平台同样支持简单的图形验证码识别。超级鹰平台提供了如下一些服务。

  • 英文数字:提供最多 20 位英文数字的混合识别;

  • 中文汉字:提供最多 7 个汉字的识别;

  • 纯英文:提供最多 12 位的英文识别;

  • 纯数字:提供最多 11 位的数字识别;

  • 任意特殊字符:提供不定长汉字英文数字、拼音首字母、计算题、成语混合、集装箱号等字符的识别;

  • 坐标选择识别:如复杂计算题、选择题四选一、问答题、点击相同的字、物品、动物等返回多个坐标的识别。

具体如有变动以官网为准:https://www.chaojiying.com/price.html

这里需要处理的就是坐标多选识别的情况。我们先将验证码图片提交给平台,平台会返回识别结果在图片中的坐标位置,然后我们再解析坐标模拟点击。

下面我们就用程序来实现。

获取 API

在官方网站下载对应的 Python API,链接为:https://www.chaojiying.com/api-14.html。API 是 Python 2 版本的,是用 requests 库来实现的。我们可以简单更改几个地方,即可将其修改为 Python 3 版本。

修改之后的 API 如下所示:

import requests
from hashlib import md5
class Chaojiying(object):

   def __init__(self, username, password, soft_id):
       self.username = username
       self.password = md5(password.encode('utf-8')).hexdigest()
       self.soft_id = soft_id
       self.base_params = {
           'user': self.username,
           'pass2': self.password,
           'softid': self.soft_id,
       }
       self.headers = {
           'Connection': 'Keep-Alive',
           'User-Agent': 'Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0)',
       }

   def post_pic(self, im, codetype):
       """
       im: 图片字节
       codetype: 题目类型 参考 http://www.chaojiying.com/price.html
       """
       params = {
           'codetype': codetype,
       }
       params.update(self.base_params)
       files = {'userfile': ('ccc.jpg', im)}
       r = requests.post('http://upload.chaojiying.net/Upload/Processing.php', data=params, files=files,
                         headers=self.headers)
       return r.json()

   def report_error(self, im_id):
       """
       im_id:报错题目的图片ID
       """
       params = {
           'id': im_id,
       }
       params.update(self.base_params)
       r = requests.post('http://upload.chaojiying.net/Upload/ReportError.php', data=params, headers=self.headers)
       return r.json()

这里定义了一个 Chaojiying 类,其构造函数接收三个参数,分别是超级鹰的用户名、密码以及软件 ID,保存以备使用。

最重要的一个方法叫作 post_pic,它需要传入图片对象和验证码类型的代号。该方法会将图片对象和相关信息发给超级鹰的后台进行识别,然后将识别成功的 JSON 返回。

另一个方法叫作 report_error,它是发生错误时的回调。如果验证码识别错误,调用此方法会返回相应的题分。

接下来,我们以 https://captcha3.scrape.center/ 为例来演示下识别的过程。

首先我们引入一些必要的包,然后初始化一些变量,如 WebDriver、Chaojiying 对象等,代码实现如下所示:

import time
from io import BytesIO
from PIL import Image
from selenium import webdriver
from selenium.webdriver import ActionChains
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from chaojiying import Chaojiying
USERNAME = 'admin'
PASSWORD = 'admin'
CHAOJIYING_USERNAME = ''
CHAOJIYING_PASSWORD = ''
CHAOJIYING_SOFT_ID = 893590
CHAOJIYING_KIND = 9102
if not CHAOJIYING_USERNAME or not CHAOJIYING_PASSWORD:
   print('请设置用户名和密码')
   exit(0)
class CrackCaptcha():
   def __init__(self):
       self.url = 'https://captcha3.scrape.center/'
       self.browser = webdriver.Chrome()
       self.wait = WebDriverWait(self.browser, 20)
       self.username = USERNAME
       self.password = PASSWORD
       self.chaojiying = Chaojiying(CHAOJIYING_USERNAME, CHAOJIYING_PASSWORD, CHAOJIYING_SOFT_ID)

这里的 USERNAME、PASSWORD 是示例网站的用户名和密码,都设置为 admin 即可。另外 CHAOJIYING_USERNAME、CHAOJIYING_PASSWORD 就是超级鹰打码平台的用户名和密码,可以自行设置成自己的。

另外这里定义了一个 CrackCaptcha 类,初始化了浏览器对象和打码平台的操作对象。

接下来我们用 Selenium 模拟呼出验证码开始验证就好啦。

获取验证码

接下来的步骤就是完善相关表单,模拟点击呼出验证码了,代码实现如下所示:

def open(self):
   """
   打开网页输入用户名密码
   :return: None
   """
   self.browser.get(self.url)
   # 填入用户名密码
   username = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, 'input[type="text"]')))
   password = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, 'input[type="password"]')))
   username.send_keys(self.username)
   password.send_keys(self.password)
def get_captcha_button(self):
   """
   获取初始验证按钮
   :return:
   """
   button = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, 'button[type="button"]')))
   return button

这里我们调用了 open 方法负责填写表单,get_captcha_button 方法获取验证码按钮,之后触发点击,这时候就可以看到页面已经把验证码呈现出来了。

有了验证码的图片,我们下一步要做的就是把验证码的具体内容获取下来,然后发送给打码平台识别。

那怎么获取验证码的图片呢?我们可以先获取验证码图片的位置和大小,从网页截图里截取相应的验证码图片即可,代码实现如下所示:

def get_captcha_element(self):
   """
   获取验证图片对象
   :return: 图片对象
   """
   # 验证码图片加载出来
   self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, 'img.geetest_item_img')))
   # 验证码完整节点
   element = self.wait.until(EC.presence_of_element_located((By.CLASS_NAME, 'geetest_panel_box')))
   print('成功获取验证码节点')
   return element
def get_captcha_position(self):
   """
   获取验证码位置
   :return: 验证码位置元组
   """
   element = self.get_captcha_element()
   time.sleep(2)
   location = element.location
   size = element.size
   top, bottom, left, right = location['y'], location['y'] + size['height'], location['x'], location['x'] + size[
       'width']
   return (top, bottom, left, right)
def get_screenshot(self):
   """
   获取网页截图
   :return: 截图对象
   """
   screenshot = self.browser.get_screenshot_as_png()
   screenshot = Image.open(BytesIO(screenshot))
   screenshot.save('screenshot.png')
   return screenshot
def get_captcha_image(self, name='captcha.png'):
   """
   获取验证码图片
   :return: 图片对象
   """
   top, bottom, left, right = self.get_captcha_position()
   print('验证码位置', top, bottom, left, right)
   screenshot = self.get_screenshot()
   captcha = screenshot.crop((left, top, right, bottom))
   captcha.save(name)
   return captcha

这里 get_captcha_image 方法即为从网页截图中截取对应的验证码图片,其中验证码图片的相对位置坐标由 get_captcha_position 方法返回得到。所以就是利用了先截图再裁切的方法获取了验证码。

注意:如果你的屏幕是高清屏如 Mac 的 Retina 屏幕的话,可能需要适当调整下屏幕分辨率或者对获取到的验证码位置做一些倍数偏移计算。

最后我们得到的验证码是 Image 对象,其结果样例如图所示。

CgoCgV6ZWNWAT2cwAANbWOcLXsY268.png

识别验证码

现在我们有了验证码图了,下一步就是把图发送给打码平台了。

我们调用 Chaojiying 对象的 post_pic 方法,即可把图片发送给超级鹰后台,这里发送的图像是字节流格式,代码实现如下所示:

image = self.get_touclick_image()
bytes_array = BytesIO()
image.save(bytes_array, format='PNG')
# 识别验证码
result = self.chaojiying.post_pic(bytes_array.getvalue(), CHAOJIYING_KIND)
print(result)

运行之后,result 变量就是超级鹰后台的识别结果。可能运行需要等待几秒,它会返回一个 JSON 格式的字符串。

如果识别成功,典型的返回结果如下所示:

{'err_no': 0, 'err_str': 'OK', 'pic_id': '6002001380949200001', 'pic_str': '132,127|56,77', 'md5': '1f8e1d4bef8b11484cb1f1f34299865b'}
其中,pic_str 就是识别的文字的坐标,是以字符串形式返回的,每个坐标都以 | 分隔。接下来我们只需要将其解析,然后模拟点击,代码实现如下所示:
def get_points(self, captcha_result):
   """
   解析识别结果
   :param captcha_result: 识别结果
   :return: 转化后的结果
   """
   groups = captcha_result.get('pic_str').split('|')
   locations = [[int(number) for number in group.split(',')] for group in groups]
   return locations
def touch_click_words(self, locations):
   """
   点击验证图片
   :param locations: 点击位置
   :return: None
   """
   for location in locations:
       ActionChains(self.browser).move_to_element_with_offset(self.get_captcha_element(), location[0], location[1]).click().perform()
       time.sleep(1)

这里用 get_points 方法将识别结果变成列表的形式。touch_click_words 方法则通过调用 move_to_element_with_offset 方法依次传入解析后的坐标,点击即可。

这样我们就模拟完成坐标的点选了,运行效果如下所示。

Ciqah16ZWNaAJNOKAAM9-CV9h3A564.png

最后再模拟点击提交验证的按钮,等待验证通过就会自动登录啦,后续实现在此不再赘述。

如何判断登录是否成功呢?同样可以使用 Selenium 的判定条件,比如判断页面里面出现了某个文字就代表登录成功了,代码如下:

# 判定是否成功
success = self.wait.until(EC.text_to_be_present_in_element((By.TAG_NAME, 'h2'), '登录成功'))

比如这里我们判定了点击确认按钮,页面会不会跳转到提示成功的页面,成功的页面包含一个 h2 节点,包含“登录成功”四个字,就代表登录成功啦。

这样我们就借助在线验证码平台完成了点触验证码的识别。此方法是一种通用方法,我们也可以用此方法来识别图文、数字、算术等各种各样的验证码。

本课时我们通过在线打码平台辅助完成了验证码的识别。这种识别方法非常强大,几乎任意的验证码都可以识别。如果遇到难题,借助打码平台无疑是一个极佳的选择。

更智能的深度学习处理验证码

我们在前面讲解了如何使用打码平台来识别验证码,简单高效。但是也有一些缺点,比如效率可能没那么高,准确率也不一定能做到完全可控,并且需要付出一定的费用。

本课时我们就来介绍使用深度学习来识别验证码的方法,训练好对应的模型就能更好地对验证码进行识别,并且准确率可控,节省一定的成本。

本课时我们以深度学习识别滑块验证码为例来讲解深度学习对于此类验证码识别的实现。

滑块验证码是怎样的呢?如图所示,验证码是一张矩形图,图片左侧会出现一个滑块,右侧会出现一个缺口,下侧会出现一个滑轨。左侧的滑块会随着滑轨的拖动而移动,如果能将左侧滑块匹配滑动到右侧缺口处,就算完成了验证。

1.png

由于这种验证码交互形式比较友好,且安全性、美观度上也会更高,像这种类似的验证码也变得越来越流行。另外不仅仅是“极验”,其他很多验证码服务商也推出了类似的验证码服务,如“网易易盾”等,上图所示的就是“网易易盾”的滑动验证码。

没错,这种滑动验证码的出现确实让很多网站变得更安全。但是做爬虫的可就苦恼了,如果想采用自动化的方法来绕过这种滑动验证码,关键点在于以下两点:

  • 找出目标缺口的位置。
  • 模拟人的滑动轨迹将滑块滑动到缺口处。

那么问题来了,第一步怎么做呢?

接下来我们就来看看如何利用深度学习来实现吧。

我们的目标就是输入一张图,输出缺口的的位置,所以只需要将这个问题归结成一个深度学习的“目标检测”问题就好了。

首先在开始之前简单说下目标检测。什么叫目标检测?顾名思义,就是把我们想找的东西找出来。比如给一张“狗”的图片,如图所示:

2.png

我们想知道这只狗在哪,它的舌头在哪,找到了就把它们框选出来,这就是目标检测。

经过目标检测算法处理之后,我们期望得到的图片是这样的:

3.png

可以看到这只狗和它的舌头就被框选出来了,这样就完成了一个不错的目标检测。

当前做目标检测的算法主要有两个方向,有一阶段式和两阶段式,英文分别叫作 One stage 和 Two stage,简述如下。

  • Two Stage:算法首先生成一系列目标所在位置的候选框,然后再对这些框选出来的结果进行样本分类,即先找出来在哪,然后再分出来是什么,俗话说叫“看两眼”,这种算法有 R-CNN、Fast R-CNN、Faster R-CNN 等,这些算法架构相对复杂,但准确率上有优势。
  • One Stage:不需要产生候选框,直接将目标定位和分类的问题转化为回归问题,俗话说叫“看一眼”,这种算法有 YOLO、SSD,这些算法虽然准确率上不及 Two stage,但架构相对简单,检测速度更快。

所以这次我们选用 One Stage 的有代表性的目标检测算法 YOLO 来实现滑动验证码缺口的识别。

YOLO,英文全称叫作 You Only Look Once,取了它们的首字母就构成了算法名,目前 YOLO 算法最新的版本是 V3 版本,这里算法的具体流程我们就不过多介绍了,如果你感兴趣可以搜一下相关资料了解下,另外也可以了解下 YOLO V1~V3 版本的不同和改进之处,这里列几个参考链接。

回归我们本课时的主题,我们要做的是缺口的位置识别,那么第一步应该做什么呢?

我们的目标是要训练深度学习模型,那我们总得需要让模型知道要学点什么东西吧,这次我们做缺口识别,那么我们需要让模型学的就是找到这个缺口在哪里。由于一张验证码图片只有一个缺口,要分类就是一类,所以我们只需要找到缺口位置就行了。

好,那模型要学如何找出缺口的位置,就需要我们提供样本数据让模型来学习才行。样本数据怎样的呢?样本数据就得有带缺口的验证码图片以及我们自己标注的缺口位置。只有把这两部分都告诉模型,模型才能去学习。等模型学好了,当我们再给个新的验证码时,就能检测出缺口在哪里了,这就是一个成功的模型。

OK,那我们就开始准备数据和缺口标注结果吧。

数据这里用的是网易盾的验证码,验证码图片可以自行收集,写个脚本批量保存下来就行。标注的工具可以使用 LabelImg,GitHub 链接为:https://github.com/tzutalin/labelImg,利用它我们可以方便地进行检测目标位置的标注和类别的标注,如这里验证码和标注示例如下:

4.png

标注完了会生成一系列 xml 文件,你需要解析 xml 文件把位置的坐标和类别等处理一下,转成训练模型需要的数据。

在这里我已经整理好了我的数据集,完整 GitHub 链接为:https://github.com/Python3WebSpider/DeepLearningSlideCaptcha,我标注了 200 多张图片,然后处理了 xml 文件,变成训练 YOLO 模型需要的数据格式,验证码图片和标注结果见 data/captcha 文件夹。

如果要训练自己的数据,数据格式准备见:https://github.com/eriklindernoren/PyTorch-YOLOv3#train-on-custom-dataset

上一步我已经把标注好的数据处理好了,可以直接拿来训练了。

由于 YOLO 模型相对比较复杂,所以这个项目我就直接基于开源的 PyTorch-YOLOV3 项目来进行修改了,模型使用的深度学习框架为 PyTorch,具体的 YOLO V3 模型的实现这里不再阐述了。

另外推荐使用 GPU 训练,不然拿 CPU 直接训练速度会很慢。我的 GPU 是 P100,几乎十几秒就训练完一轮。

下面就直接把代码克隆下来吧。

由于本项目我把训练好的模型也放上去了,使用了 Git LFS,所以克隆时间较长,克隆命令如下:

git clone https://github.com/Python3WebSpider/DeepLearningSlideCaptcha.git

如果想加速克隆,可以暂时先跳过大文件模型下载,可以执行命令:

GIT_LFS_SKIP_SMUDGE=1 git clone https://github.com/Python3WebSpider/DeepLearningSlideCaptcha.git

代码克隆下载之后,我们还需要下载一些预训练模型。

YOLOV3 的训练要加载预训练模型才能有不错的训练效果,预训练模型下载命令如下:

bash prepare.sh

执行这个脚本,就能下载 YOLO V3 模型的一些权重文件,包括 yolov3 和 weights,还有 darknet 的 weights,在训练之前我们需要用这些权重文件初始化 YOLO V3 模型。

注意:Windows 下建议使用 Git Bash 来运行上述命令。

另外还需要安装一些必须的库,如 PyTorch、TensorBoard 等,建议使用 Python 虚拟环境,运行命令如下:

pip3 install -r requirements.txt

这些库都安装好了之后,就可以开始训练了。

本项目已经提供了标注好的数据集,在 data/captcha,可以直接使用。

当前数据训练脚本:

bash train.sh

实测 P100 训练时长约 15 秒一个 epoch,大约几分钟即可训练出较好效果。

训练差不多了,我们便可以使用 TensorBoard 来看看 loss 和 mAP 的变化,运行 TensorBoard:

tensorboard --logdir='logs' --port=6006 --host 0.0.0.0

loss_1 变化如下:

5.png

val_mAP 变化如下:

6.png

可以看到 loss 从最初的非常高下降到了很低,准确率也逐渐接近 100%。

另外训练过程中还能看到如下的输出结果:

---- [Epoch 99/100, Batch 27/29] ----
+------------+--------------+--------------+--------------+
| Metrics    | YOLO Layer 0 | YOLO Layer 1 | YOLO Layer 2 |
+------------+--------------+--------------+--------------+
| grid_size  | 14           | 28           | 56           |
| loss       | 0.028268     | 0.046053     | 0.043745     |
| x          | 0.002108     | 0.005267     | 0.008111     |
| y          | 0.004561     | 0.002016     | 0.009047     |
| w          | 0.001284     | 0.004618     | 0.000207     |
| h          | 0.000594     | 0.000528     | 0.000946     |
| conf       | 0.019700     | 0.033624     | 0.025432     |
| cls        | 0.000022     | 0.000001     | 0.000002     |
| cls_acc    | 100.00%      | 100.00%      | 100.00%      |
| recall50   | 1.000000     | 1.000000     | 1.000000     |
| recall75   | 1.000000     | 1.000000     | 1.000000     |
| precision  | 1.000000     | 0.800000     | 0.666667     |
| conf_obj   | 0.994271     | 0.999249     | 0.997762     |
| conf_noobj | 0.000126     | 0.000158     | 0.000140     |
+------------+--------------+--------------+--------------+
Total loss 0.11806630343198776

这里显示了训练过程中各个指标的变化情况,如 loss、recall、precision、confidence 等,分别代表训练过程的损失(越小越好)、召回率(能识别出的结果占应该识别出结果的比例,越高越好)、精确率(识别出的结果中正确的比率,越高越好)、置信度(模型有把握识别对的概率,越高越好),可以作为参考。

训练完毕之后会在 checkpoints 文件夹生成 pth 文件,可直接使用模型来预测生成标注结果。

如果你没有训练自己的模型的话,这里我已经把训练好的模型放上去了,可以直接使用我训练好的模型来测试。如之前跳过了 Git LFS 文件下载,则可以使用如下命令下载 Git LFS 文件:

git lfs pull

此时 checkpoints 文件夹会生成训练好的 pth 文件。

测试脚本:

sh detect.sh

该脚本会读取 captcha 下的 test 文件夹所有图片,并将处理后的结果输出到 result 文件夹。

运行结果样例:

Performing object detection:
        + Batch 0, Inference Time: 0:00:00.044223
        + Batch 1, Inference Time: 0:00:00.028566
        + Batch 2, Inference Time: 0:00:00.029764
        + Batch 3, Inference Time: 0:00:00.032430
        + Batch 4, Inference Time: 0:00:00.033373
        + Batch 5, Inference Time: 0:00:00.027861
        + Batch 6, Inference Time: 0:00:00.031444
        + Batch 7, Inference Time: 0:00:00.032110
        + Batch 8, Inference Time: 0:00:00.029131

Saving images:
(0) Image: 'data/captcha/test/captcha_4497.png'
        + Label: target, Conf: 0.99999
(1) Image: 'data/captcha/test/captcha_4498.png'
        + Label: target, Conf: 0.99999
(2) Image: 'data/captcha/test/captcha_4499.png'
        + Label: target, Conf: 0.99997
(3) Image: 'data/captcha/test/captcha_4500.png'
        + Label: target, Conf: 0.99999
(4) Image: 'data/captcha/test/captcha_4501.png'
        + Label: target, Conf: 0.99997
(5) Image: 'data/captcha/test/captcha_4502.png'
        + Label: target, Conf: 0.99999
(6) Image: 'data/captcha/test/captcha_4503.png'
        + Label: target, Conf: 0.99997
(7) Image: 'data/captcha/test/captcha_4504.png'
        + Label: target, Conf: 0.99998
(8) Image: 'data/captcha/test/captcha_4505.png'
        + Label: target, Conf: 0.99998

拿几个样例结果看下:

7.png
8.png
9.png

这里我们可以看到,利用训练好的模型我们就成功识别出缺口的位置了,另外程序还会打印输出这个边框的中心点和宽高信息。

有了这个边界信息,我们再利用某些手段拖动滑块即可通过验证了,比如可以模拟加速减速过程,或者可以录制人的轨迹再执行都是可以的,由于本课时更多是介绍深度学习识别相关内容,所以关于拖动轨迹不再展开讲解。

本课时我们介绍了使用深度学习识别滑动验证码缺口的方法,包括标注、训练、测试等环节都进行了阐述。有了它,我们就能轻松方便地对缺口进行识别了。

代码:https://github.com/Python3WebSpider/DeepLearningSlideCaptcha


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK