Golang实现简单爬虫框架(5)——项目重构与数据存储
source link: https://www.tuicool.com/articles/YZrmqii
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
前言
在上一篇文章 《Golang实现简单爬虫框架(4)——队列实现并发任务调度》 中,我们使用用队列实现了任务调度,接下来首先对两种并发方式做一个同构,使代码统一。然后添加数据存储模块。
注意:本次并发是在上一篇文章简单并发实现的基础上修改,所以没有贴出全部代码,只是贴出部分修改部分,要查看完整项目代码,可以查看上篇文章,或者从github下载 项目源代码查看
1、项目重构
(1)并发引擎
通过分析我们发现,两种不同调度的区别是每个 worker
一个 channel
还是 所有 worker
共用一个 channel
,所以我们在接口中定义一个函数 WorkerChan()
,用来决定这件事,即 worker
一个 channel
还是 所有 worker
共用一个 channel
。此时 ConfigMasterWorkerChan
就不再需要了。
在项目文件concurrent.go中我们定义一个任务调度器Scheduler,如下:
// 任务调度器 type Scheduler interface { Submit(request Request) // 提交任务 ConfigMasterWorkerChan(chan Request) WorkerReady(w chan Request) Run() }
但是在简单并发中我们只实现了 Submit
和 ConfigMasterWorkerChan
接口,而使用队列调度中却实现了接口的所有方法,所有我们同构一下使 concurrent.go
文件可以适用于两种不同的调度器。
因为在 createworker
函数中要使用 WorkerReady
函数,所以要传入一个 Scheduler
,但是这样显得比较重,我们可以利用接口组合,新建一个接口 ReadyNotifier
,这样在 createworker
函数中传入 ReadyNotifier
即可。
修改后的任务调度如下:
type Scheduler interface { ReadyNotifier Submit(request Request) // 提交任务 WorkerChan() chan Request Run() } type ReadyNotifier interface { WorkerReady(chan Request) }
此时创建goroutine修改如下:
// 创建 goroutine for i := 0; i < e.WorkerCount; i++ { //任务是每个 worker 一个 channel 还是 所有 worker 共用一个 channel 由WorkerChan 来决定 createWorker(e.Scheduler.WorkerChan(), out, e.Scheduler) }
修改后的concurrent.go文件如下:
package engine import ( "log" ) // 并发引擎 type ConcurrendEngine struct { Scheduler Scheduler WorkerCount int } // 任务调度器 type Scheduler interface { ReadyNotifier Submit(request Request) // 提交任务 WorkerChan() chan Request Run() } type ReadyNotifier interface { WorkerReady(chan Request) } func (e *ConcurrendEngine) Run(seeds ...Request) { out := make(chan ParseResult) e.Scheduler.Run() // 创建 goruntine for i := 0; i < e.WorkerCount; i++ { // 任务是每个 worker 一个 channel 还是 所有 worker 共用一个 channel 由WorkerChan 来决定 createWorker(e.Scheduler.WorkerChan(), out, e.Scheduler) } // engine把请求任务提交给 Scheduler for _, request := range seeds { e.Scheduler.Submit(request) } itemCount := 0 for { // 接受 Worker 的解析结果 result := <-out for _, item := range result.Items { log.Printf("Got item: #%d: %v\n", itemCount, item) itemCount++ } // 然后把 Worker 解析出的 Request 送给 Scheduler for _, request := range result.Requests { e.Scheduler.Submit(request) } } } func createWorker(in chan Request, out chan ParseResult, ready ReadyNotifier) { go func() { for { ready.WorkerReady(in) // 告诉调度器任务空闲 request := <-in result, err := worker(request) if err != nil { continue } out <- result } }() }
(2)简单并发调度器
scheduler/simple.go
package scheduler import "crawler/engine" type SimpleScheduler struct { workerChan chan engine.Request } func (s *SimpleScheduler) WorkerChan() chan engine.Request { // 此时所有 worker 共用同一个 channel,直接返回即可 return s.workerChan } func (s *SimpleScheduler) WorkerReady(w chan engine.Request) { } func (s *SimpleScheduler) Run() { // 创建出 workchannel s.workerChan = make(chan engine.Request) } func (s *SimpleScheduler) Submit(request engine.Request) { // send request down to worker chan go func() { s.workerChan <- request }() }
(3)队列实现调度器
scheduler/queued.go
添加 WorkerChan()
的实现即可
package scheduler import "crawler/engine" // 使用队列来调度任务 type QueuedScheduler struct { requestChan chan engine.Request workerChan chan chan engine.Request } func (s *QueuedScheduler) WorkerChan() chan engine.Request { // 对于队列实现来讲,每个 worker 共用一个 channel return make(chan engine.Request) } // 提交请求任务到 requestChan func (s *QueuedScheduler) Submit(request engine.Request) { s.requestChan <- request } // 告诉外界有一个 worker 可以接收 request func (s *QueuedScheduler) WorkerReady(w chan engine.Request) { s.workerChan <- w } func (s *QueuedScheduler) Run() { s.workerChan = make(chan chan engine.Request) s.requestChan = make(chan engine.Request) go func() { // 创建请求队列和工作队列 var requestQ []engine.Request var workerQ []chan engine.Request for { var activeWorker chan engine.Request var activeRequest engine.Request if len(requestQ) > 0 && len(workerQ) > 0 { activeWorker = workerQ[0] activeRequest = requestQ[0] } select { case r := <-s.requestChan: // 当 requestChan 收到数据 requestQ = append(requestQ, r) case w := <-s.workerChan: // 当 workerChan 收到数据 workerQ = append(workerQ, w) case activeWorker <- activeRequest: // 当请求队列和认读队列都不为空时,给任务队列分配任务 requestQ = requestQ[1:] workerQ = workerQ[1:] } } }() }
(4)main函数
经过上述同构,在main函数中如需切换不同调度器,只需要相应的配置即可。
package main import ( "crawler/engine" "crawler/scheduler" "crawler/zhenai/parser" ) func main() { e := engine.ConcurrendEngine{ //Scheduler: &scheduler.QueuedScheduler{}, // 队列实现调度器 Scheduler: &scheduler.SimpleScheduler{}, // 简单并发调度 WorkerCount: 50, } e.Run(engine.Request{ Url: "http://www.zhenai.com/zhenghun", ParseFunc: parser.ParseCityList, }) }
2、数据存储
(1)Mgo的介绍安装
爬取到的数据不能仅仅在控制台打印出来,所以我们还要给爬虫添加数据存储模块。我们本次选择使用mongodb来存储我们的数据。
mgo(音mango)是 MongoDB 的 Go语言 驱动,它用基于Go语法的简单API实现了丰富的特性,并经过良好测试。
官方网址: http://labix.org/mgo
文档: API docs for mgo
首先我们要安装mgo,打开终端,输入下面代码完成安装
go get gopkg.in/mgo.v2
mgo基本操作都很简单,有数据库操作经验都可以很快上手。
(2)爬虫引擎与数据格式
首先,爬虫引擎获取到数据要把数据发送给数据存储模块,而数据的传递用要用到 channel
,所以打开 concurrent.go
文件,在引擎添加 ItemChan
属性,如下所示:
爬取到数据需要把数据发送到数据存储模块,
package engine // 并发引擎 type ConcurrendEngine struct { Scheduler Scheduler // 任务调度器 WorkerCount int // 并发任务数量 ItemChan chan Item // 数据保存 channel } // ... for { // 接受 Worker 的解析结果 result := <-out for _, item := range result.Items { // 当抓取一组数据后,进行保存 go func(item2 Item) { e.ItemChan <- item2 }(item) } // ... } // ...
在 engine/types.go
中定义Item类型:
package engine // 请求结构 type Request struct { Url string // 请求地址 ParseFunc func([]byte) ParseResult } // 解析结果结构 type ParseResult struct { Requests []Request // 解析出的请求 Items []Item // 解析出的内容 } // 解析出的用户数据格式 type Item struct { Url string // 个人信息Url地址 Type string // table Id string // Id Payload interface{} // 详细信息 } func NilParseFun([]byte) ParseResult { return ParseResult{} }
(3)存储模块的实现
在根目录下创建persist文件夹,然后创建itemsaver.go文件
// persist/itemsaver.go package persist import ( "context" "crawler/engine" "errors" "gopkg.in/mgo.v2" "gopkg.in/olivere/elastic.v5" "log" ) func ItemSaver(index string) (chan engine.Item, error) { // mongodb connect session, err := mgo.Dial("localhost:27017") if err != nil { panic(err) } out := make(chan engine.Item) go func() { itemCount := 0 for { // 接收到发送的 item item := <-out log.Printf("Item Saver: got item #%d: %v\n", itemCount, item) itemCount++ // Save data in mongodb err := mongo_save(session, index, item) if err != nil { // if have err, ignore it log.Printf("Item Saver: error, saving item %v: %v", item, err) } } }() return out, nil } // 使用 MongoDB 保存数据 func mongo_save(session *mgo.Session, dbName string, item engine.Item) error { if item.Type == "" { return errors.New("must supply Type") } c := session.DB(dbName).C(item.Type) // 选择要操作的数据库与集合 err := c.Insert(item) // 插入数据 if err != nil { log.Fatal(err) } return nil }
(4)存储测试文件
我们把一条数据存入mongodb,然后再取出来,比对读出的数据和写入的数据是否相同
// persist/itemsaver_test.gp package persist import ( "crawler/engine" "crawler/model" "encoding/json" "fmt" "gopkg.in/mgo.v2" "gopkg.in/mgo.v2/bson" "log" "testing" ) func TestMongoSave(t *testing.T) { // mongodb connect session, err := mgo.Dial("localhost:27017") if err != nil { panic(err) } expected := engine.Item{ Url: "http://album.zhenai.com/u/1946858930", Type: "zhenai", Id: "1946858930", Payload: model.Profile{ Name: "為你垨候", Gender: "女士", Age: 40, Height: 163, Weight: 54, Income: "5-8千", Marriage: "未婚", Address: "佛山顺德区", }, } // 保存数据 err = mongo_save(session, "crawler", expected) if err != nil { panic(err) } c := session.DB("crawler").C("zhenai") var result engine.Item // 查询数据 err = c.Find(bson.M{"id": "1946858930"}).One(&result) // result 为 Json 类型 if err != nil { log.Fatal(err) } fmt.Printf("%s, %s, %v\n", result.Url, result.Id, result.Payload) }
(5)parser模块
我们要在 parse/profile.go
文件中组装好需要保存到数据库的数据格式
// ... result := engine.ParseResult{ Items: []engine.Item{ { Url: url, Type: "zhenai", Id: extractString([]byte(url), idUrlRe), Payload: profile, }, }, } // ...
(6)main函数
package main import ( "crawler/engine" "crawler/persist" "crawler/scheduler" "crawler/zhenai/parser" ) func main() { itemChan, err := persist.ItemSaver() if err != nil { panic(err) } e := engine.ConcurrendEngine{ //Scheduler: &scheduler.QueuedScheduler{}, Scheduler: &scheduler.SimpleScheduler{}, WorkerCount: 100, ItemChan: itemChan, } e.Run(engine.Request{ Url: "http://www.zhenai.com/zhenghun", ParseFunc: parser.ParseCityList, }) }
运行项目,打开mongodb可视化工具,可以看到爬取了54410条数据
3、总结
我们首先把两种并发方式做一个同构,使代码统一,直接在main函数中使用不同的配置就可以切换调度器,简单方便。然后使用Mgo驱动操作数据,添加到mongodb中。内容有点多,很多代码没有完整的展示出来,希望大家可以下载 项目源代码 ,回滚到对应提交记录查看,效果会更好。 别无所求,只求随手给个star
下篇博客中我们会再当前博客的基础上添加数据展示功能
如果想获取 Google工程师深度讲解go语言 视频资源的,可以在评论区留下邮箱。
如果觉得文章还可以,劳烦大人随手点个赞。。。
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK