47

Golang实现简单爬虫框架(5)——项目重构与数据存储

 5 years ago
source link: https://www.tuicool.com/articles/YZrmqii
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

前言

在上一篇文章 《Golang实现简单爬虫框架(4)——队列实现并发任务调度》 中,我们使用用队列实现了任务调度,接下来首先对两种并发方式做一个同构,使代码统一。然后添加数据存储模块。

注意:本次并发是在上一篇文章简单并发实现的基础上修改,所以没有贴出全部代码,只是贴出部分修改部分,要查看完整项目代码,可以查看上篇文章,或者从github下载 项目源代码查看

1、项目重构

(1)并发引擎

通过分析我们发现,两种不同调度的区别是每个 worker 一个 channel 还是 所有 worker 共用一个 channel ,所以我们在接口中定义一个函数 WorkerChan() ,用来决定这件事,即 worker 一个 channel 还是 所有 worker 共用一个 channel 。此时 ConfigMasterWorkerChan 就不再需要了。

在项目文件concurrent.go中我们定义一个任务调度器Scheduler,如下:

// 任务调度器
type Scheduler interface {
    Submit(request Request) // 提交任务
    ConfigMasterWorkerChan(chan Request)
    WorkerReady(w chan Request)
    Run()
}

但是在简单并发中我们只实现了 SubmitConfigMasterWorkerChan 接口,而使用队列调度中却实现了接口的所有方法,所有我们同构一下使 concurrent.go 文件可以适用于两种不同的调度器。

因为在 createworker 函数中要使用 WorkerReady 函数,所以要传入一个 Scheduler ,但是这样显得比较重,我们可以利用接口组合,新建一个接口 ReadyNotifier ,这样在 createworker 函数中传入 ReadyNotifier 即可。

修改后的任务调度如下:

type Scheduler interface {
    ReadyNotifier
    Submit(request Request) // 提交任务
    WorkerChan() chan Request
    Run()
}
type ReadyNotifier interface {
    WorkerReady(chan Request)
}

此时创建goroutine修改如下:

// 创建 goroutine
for i := 0; i < e.WorkerCount; i++ {
    //任务是每个 worker 一个 channel 还是 所有 worker 共用一个 channel 由WorkerChan 来决定
    createWorker(e.Scheduler.WorkerChan(), out, e.Scheduler)
}

修改后的concurrent.go文件如下:

package engine

import (
    "log"
)

// 并发引擎
type ConcurrendEngine struct {
    Scheduler   Scheduler
    WorkerCount int
}

// 任务调度器
type Scheduler interface {
    ReadyNotifier
    Submit(request Request) // 提交任务
    WorkerChan() chan Request
    Run()
}
type ReadyNotifier interface {
    WorkerReady(chan Request)
}

func (e *ConcurrendEngine) Run(seeds ...Request) {

    out := make(chan ParseResult)
    e.Scheduler.Run()

    // 创建 goruntine
    for i := 0; i < e.WorkerCount; i++ {
        // 任务是每个 worker 一个 channel 还是 所有 worker 共用一个 channel 由WorkerChan 来决定
        createWorker(e.Scheduler.WorkerChan(), out, e.Scheduler)
    }

    // engine把请求任务提交给 Scheduler
    for _, request := range seeds {
        e.Scheduler.Submit(request)
    }

    itemCount := 0
    for {
        // 接受 Worker 的解析结果
        result := <-out
        for _, item := range result.Items {
            log.Printf("Got item: #%d: %v\n", itemCount, item)
            itemCount++
        }

        // 然后把 Worker 解析出的 Request 送给 Scheduler
        for _, request := range result.Requests {
            e.Scheduler.Submit(request)
        }
    }
}

func createWorker(in chan Request, out chan ParseResult, ready ReadyNotifier) {
    go func() {
        for {
            ready.WorkerReady(in) // 告诉调度器任务空闲
            request := <-in
            result, err := worker(request)
            if err != nil {
                continue
            }
            out <- result
        }
    }()
}

(2)简单并发调度器

scheduler/simple.go

package scheduler

import "crawler/engine"

type SimpleScheduler struct {
    workerChan chan engine.Request
}

func (s *SimpleScheduler) WorkerChan() chan engine.Request {
    // 此时所有 worker 共用同一个 channel,直接返回即可
    return s.workerChan
}

func (s *SimpleScheduler) WorkerReady(w chan engine.Request) {

}

func (s *SimpleScheduler) Run() {
    // 创建出 workchannel
    s.workerChan = make(chan engine.Request)
}

func (s *SimpleScheduler) Submit(request engine.Request) {
    // send request down to worker chan
    go func() {
        s.workerChan <- request
    }()
}

(3)队列实现调度器

scheduler/queued.go

添加 WorkerChan() 的实现即可

package scheduler

import "crawler/engine"

// 使用队列来调度任务

type QueuedScheduler struct {
    requestChan chan engine.Request
    workerChan  chan chan engine.Request
}

func (s *QueuedScheduler) WorkerChan() chan engine.Request {
    // 对于队列实现来讲,每个 worker 共用一个 channel
    return make(chan engine.Request)
}

// 提交请求任务到 requestChan
func (s *QueuedScheduler) Submit(request engine.Request) {
    s.requestChan <- request
}

// 告诉外界有一个 worker 可以接收 request
func (s *QueuedScheduler) WorkerReady(w chan engine.Request) {
    s.workerChan <- w
}

func (s *QueuedScheduler) Run() {
    s.workerChan = make(chan chan engine.Request)
    s.requestChan = make(chan engine.Request)
    go func() {
        // 创建请求队列和工作队列
        var requestQ []engine.Request
        var workerQ []chan engine.Request
        for {
            var activeWorker chan engine.Request
            var activeRequest engine.Request

            if len(requestQ) > 0 && len(workerQ) > 0 {
                activeWorker = workerQ[0]
                activeRequest = requestQ[0]
            }

            select {
            case r := <-s.requestChan: // 当 requestChan 收到数据
                requestQ = append(requestQ, r)
            case w := <-s.workerChan: // 当 workerChan 收到数据
                workerQ = append(workerQ, w)
            case activeWorker <- activeRequest: // 当请求队列和认读队列都不为空时,给任务队列分配任务
                requestQ = requestQ[1:]
                workerQ = workerQ[1:]
            }
        }
    }()
}

(4)main函数

经过上述同构,在main函数中如需切换不同调度器,只需要相应的配置即可。

package main

import (
    "crawler/engine"
    "crawler/scheduler"
    "crawler/zhenai/parser"
)

func main() {
    e := engine.ConcurrendEngine{
        //Scheduler: &scheduler.QueuedScheduler{},    // 队列实现调度器
        Scheduler:   &scheduler.SimpleScheduler{},    // 简单并发调度
        WorkerCount: 50,
    }
    e.Run(engine.Request{
        Url:       "http://www.zhenai.com/zhenghun",
        ParseFunc: parser.ParseCityList,
    })
}

2、数据存储

(1)Mgo的介绍安装

爬取到的数据不能仅仅在控制台打印出来,所以我们还要给爬虫添加数据存储模块。我们本次选择使用mongodb来存储我们的数据。

mgo(音mango)是 MongoDBGo语言 驱动,它用基于Go语法的简单API实现了丰富的特性,并经过良好测试。

官方网址: http://labix.org/mgo

文档: API docs for mgo

首先我们要安装mgo,打开终端,输入下面代码完成安装

go get gopkg.in/mgo.v2

mgo基本操作都很简单,有数据库操作经验都可以很快上手。

(2)爬虫引擎与数据格式

首先,爬虫引擎获取到数据要把数据发送给数据存储模块,而数据的传递用要用到 channel ,所以打开 concurrent.go 文件,在引擎添加 ItemChan 属性,如下所示:

爬取到数据需要把数据发送到数据存储模块,

package engine
// 并发引擎
type ConcurrendEngine struct {
    Scheduler   Scheduler // 任务调度器
    WorkerCount int       // 并发任务数量
    ItemChan    chan Item // 数据保存 channel
}

// ...
for {
    // 接受 Worker 的解析结果
    result := <-out
    for _, item := range result.Items {
        // 当抓取一组数据后,进行保存
        go func(item2 Item) {
            e.ItemChan <- item2
        }(item)
    }
    // ...
}
// ...

engine/types.go 中定义Item类型:

package engine

// 请求结构
type Request struct {
    Url       string // 请求地址
    ParseFunc func([]byte) ParseResult
}

// 解析结果结构
type ParseResult struct {
    Requests []Request // 解析出的请求
    Items    []Item    // 解析出的内容
}

// 解析出的用户数据格式
type Item struct {
    Url     string      // 个人信息Url地址
    Type    string      // table
    Id      string      // Id
    Payload interface{} // 详细信息
}

func NilParseFun([]byte) ParseResult {
    return ParseResult{}
}

(3)存储模块的实现

在根目录下创建persist文件夹,然后创建itemsaver.go文件

// persist/itemsaver.go
package persist

import (
    "context"
    "crawler/engine"
    "errors"
    "gopkg.in/mgo.v2"
    "gopkg.in/olivere/elastic.v5"
    "log"
)

func ItemSaver(index string) (chan engine.Item, error) {
    // mongodb connect
    session, err := mgo.Dial("localhost:27017")
    if err != nil {
        panic(err)
    }

    out := make(chan engine.Item)
    go func() {
        itemCount := 0
        for {
            // 接收到发送的 item
            item := <-out
            log.Printf("Item Saver: got item #%d: %v\n",
                itemCount, item)
            itemCount++

            // Save data in mongodb
            err := mongo_save(session, index, item)

            if err != nil {
                // if have err, ignore it
                log.Printf("Item Saver: error, saving item %v: %v",
                    item, err)
            }
        }
    }()
    return out, nil
}

// 使用 MongoDB 保存数据
func mongo_save(session *mgo.Session, dbName string, item engine.Item) error {
    if item.Type == "" {
        return errors.New("must supply Type")
    }
    c := session.DB(dbName).C(item.Type)    // 选择要操作的数据库与集合
    err := c.Insert(item)        // 插入数据
    if err != nil {
        log.Fatal(err)
    }
    return nil
}

(4)存储测试文件

我们把一条数据存入mongodb,然后再取出来,比对读出的数据和写入的数据是否相同

// persist/itemsaver_test.gp
package persist

import (
    "crawler/engine"
    "crawler/model"
    "encoding/json"
    "fmt"
    "gopkg.in/mgo.v2"
    "gopkg.in/mgo.v2/bson"
    "log"
    "testing"
)
func TestMongoSave(t *testing.T) {
    // mongodb connect
    session, err := mgo.Dial("localhost:27017")
    if err != nil {
        panic(err)
    }

    expected := engine.Item{
        Url:  "http://album.zhenai.com/u/1946858930",
        Type: "zhenai",
        Id:   "1946858930",
        Payload: model.Profile{
            Name:     "為你垨候",
            Gender:   "女士",
            Age:      40,
            Height:   163,
            Weight:   54,
            Income:   "5-8千",
            Marriage: "未婚",
            Address:  "佛山顺德区",
        },
    }
    // 保存数据
    err = mongo_save(session, "crawler", expected)
    if err != nil {
        panic(err)
    }

    c := session.DB("crawler").C("zhenai")

    var result engine.Item
    // 查询数据
    err = c.Find(bson.M{"id": "1946858930"}).One(&result)
    // result 为 Json 类型
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("%s, %s, %v\n", result.Url, result.Id, result.Payload)
}

(5)parser模块

我们要在 parse/profile.go 文件中组装好需要保存到数据库的数据格式

// ...
result := engine.ParseResult{
    Items: []engine.Item{
        {
            Url:     url,
            Type:    "zhenai",
            Id:      extractString([]byte(url), idUrlRe),
            Payload: profile,
        },
    },
}
// ...

(6)main函数

package main

import (
    "crawler/engine"
    "crawler/persist"
    "crawler/scheduler"
    "crawler/zhenai/parser"
)

func main() {
    itemChan, err := persist.ItemSaver()
    if err != nil {
        panic(err)
    }

    e := engine.ConcurrendEngine{
        //Scheduler: &scheduler.QueuedScheduler{},
        Scheduler:   &scheduler.SimpleScheduler{},
        WorkerCount: 100,
        ItemChan:    itemChan,
    }
    e.Run(engine.Request{
        Url:       "http://www.zhenai.com/zhenghun",
        ParseFunc: parser.ParseCityList,
    })
}

运行项目,打开mongodb可视化工具,可以看到爬取了54410条数据

1460000019340532

3、总结

我们首先把两种并发方式做一个同构,使代码统一,直接在main函数中使用不同的配置就可以切换调度器,简单方便。然后使用Mgo驱动操作数据,添加到mongodb中。内容有点多,很多代码没有完整的展示出来,希望大家可以下载 项目源代码 ,回滚到对应提交记录查看,效果会更好。 别无所求,只求随手给个star

下篇博客中我们会再当前博客的基础上添加数据展示功能

如果想获取 Google工程师深度讲解go语言 视频资源的,可以在评论区留下邮箱。

如果觉得文章还可以,劳烦大人随手点个赞。。。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK