zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Go分布式爬虫笔记(二十二)

2023-09-11 14:16:28 时间

22 辅助任务管理:任务优先级、去重与失败处理

设置爬虫最大深度

目的:

  • 防止访问陷入到死循环
  • 控制爬取的有效链接的数量

最大爬取深度是和任务有关的,因此我们要在 Request 中加上 MaxDepth 这个字段,它可以标识到爬取的最大深度。Depth 则表示任务的当前深度,最初始的深度为 0。

type Request struct {
  Url       string
  Cookie    string
  WaitTime  time.Duration
  Depth     int
  MaxDepth  int
  ParseFunc func([]byte, *Request) ParseResult
}

那在异步爬取的情况下,我们怎么知道当前网站的深度呢?最好的时机是在采集引擎采集并解析爬虫数据,并将下一层的请求放到队列中的时候。以我们之前写好的 ParseURL 函数为例,在添加下一层的 URL 时,我们将 Depth 加 1,这样就标识了下一层的深度。

func ParseURL(contents []byte, req *collect.Request) collect.ParseResult {
  re := regexp.MustCompile(urlListRe)

  matches := re.FindAllSubmatch(contents, -1)
  result := collect.ParseResult{}

  for _, m := range matches {
    u := string(m[1])
    result.Requesrts = append(
      result.Requesrts, &collect.Request{
        Url:      u,
        WaitTime: req.WaitTime,
        Cookie:   req.Cookie,
        Depth:    req.Depth + 1,
        MaxDepth: req.MaxDepth,
        ParseFunc: func(c []byte, request *collect.Request) collect.ParseResult {
          return GetContent(c, u)
        },
      })
  }
  return result
}

最后一步,我们在爬取新的网页之前,判断最大深度。如果当前深度超过了最大深度,那就不再进行爬取。


func (r *Request) Check() error {
  if r.Depth > r.MaxDepth {
    return errors.New("Max depth limit reached")
  }
  return nil
}

func (s *Schedule) CreateWork() {
  for {
    r := <-s.workerCh
    if err := r.Check(); err != nil {
      s.Logger.Error("check failed",
        zap.Error(err),
      )
      continue
    }
    ...
  }
}

避免请求重复

目的:

  • 避免死循环
  • 无效爬取

考虑点:

  • 用什么数据结构来存储数据才能保证快速地查找到请求的记录?
    哈希表
  • 如何保证并发查找与写入时,不出现并发冲突问题?
    锁, sync.Map
  • 在什么条件下,我们才能确认请求是重复的,从而停止爬取?
    任务进行前检查

在解决上面的三个问题之前,我们先优化一下代码。我们之前的 Request 结构体会在每一次请求时发生变化,但是我们希望有一个字段能够表示一整个网站的爬取任务,因此我们需要抽离出一个新的结构 Task ​作为一个爬虫任务,而 Request 则作为单独的请求存在。有些参数是整个任务共有的,例如 Task 中的 Cookie、MaxDepth(最大深度)、WaitTime(默认等待时间)和 RootReq(任务中的第一个请求)。


type Task struct {
  Url         string
  Cookie      string
  WaitTime    time.Duration
  MaxDepth    int
  RootReq     *Request
  Fetcher     Fetcher
}

// 单个请求
type Request struct {
  Task      *Task
  Url       string
  Depth     int
  ParseFunc func([]byte, *Request) ParseResult
}

由于抽象出了 Task,代码需要做对应的修改,例如我们需要把初始的 Seed 种子任务替换为 Task 结构。

for i := 0; i <= 0; i += 25 {
    str := fmt.Sprintf("<https://www.douban.com/group/szsh/discussion?start=%d>", i)
    seeds = append(seeds, &collect.Task{
      ...
      Url:      str,
      RootReq: &collect.Request{
        ParseFunc: doubangroup.ParseURL,
      },
    })
  }

同时,在深度检查时,每一个请求的最大深度需要从 Task 字段中获取。


func (r *Request) Check() error {
  if r.Depth > r.Task.MaxDepth {
    return errors.New("Max depth limit reached")
  }
  return nil
}

接下来,我们继续用一个哈希表结构来存储历史请求。
由于我们希望随时访问哈希表中的历史请求,所以把它放在 Request、Task 中都不合适。 放在调度引擎中也不合适,因为调度引擎从功能上讲,应该只负责调度才对。所以,我们还需要完成一轮抽象,将调度引擎抽离出来作为一个接口,让它只做调度的工作,不用负责存储全局变量等任务。

type Crawler struct {
	out         chan collect.ParseResult //负责处理爬取后的数据,完成下一步的存储操作。schedule 函数会创建调度程序,负责的是调度的核心逻辑。
	Visited     map[string]bool          //存储请求访问信息
	VisitedLock sync.Mutex
	options
}

type Scheduler interface {
	Schedule()                //启动调度器
	Push(...*collect.Request) //将请求放入到调度器中
	Pull() *collect.Request   //从调度器中获取请求
}

type Schedule struct {
	requestCh chan *collect.Request //负责接收请求
	workerCh  chan *collect.Request //负责分配任务给 worker
	reqQueue  []*collect.Request
	Logger    *zap.Logger
}

Visited 中的 Key 是请求的唯一标识,我们现在先将唯一标识设置为 URL + method 方法,并使用 MD5 生成唯一键。后面我们还会为唯一标识加上当前请求的规则条件。

// 请求的唯一识别码
func (r *Request) Unique() string {
  block := md5.Sum([]byte(r.Url + r.Method))
  return hex.EncodeToString(block[:])
}

接着,编写 HasVisited 方法,判断当前请求是否已经被访问过。StoreVisited 方法用于将请求存储到 Visited 哈希表中。

func (e *Crawler) HasVisited(r *collect.Request) bool {
  e.VisitedLock.Lock()
  defer e.VisitedLock.Unlock()
  unique := r.Unique()
  return e.Visited[unique]
}

func (e *Crawler) StoreVisited(reqs ...*collect.Request) {
  e.VisitedLock.Lock()
  defer e.VisitedLock.Unlock()

  for _, r := range reqs {
    unique := r.Unique()
    e.Visited[unique] = true
  }
}

最后在 Worker 中,在执行 request 前,判断当前请求是否已被访问。如果请求没有被访问过,将 request 放入 Visited 哈希表中。

func (s *Crawler) CreateWork() {
  for {
    r := s.scheduler.Pull()
    if err := r.Check(); err != nil {
      s.Logger.Error("check failed",
        zap.Error(err),
      )
      continue
    }
    // 判断当前请求是否已被访问
    if s.HasVisited(r) {
      s.Logger.Debug("request has visited",
        zap.String("url:", r.Url),
      )
      continue
    }
    // 设置当前请求已被访问
    s.StoreVisited(r)
    ...
  }
}

设置优先队列

爬虫任务的优先级有时并不是相同的,一些任务需要优先处理。因此,接下来我们就来设置一个任务的优先队列。优先队列还可以分成多个等级,在这里将它简单地分为了两个等级,即优先队列和普通队列。优先级更高的请求会存储到 priReqQueue 优先队列中。

type Schedule struct {
  requestCh   chan *collect.Request
  workerCh    chan *collect.Request
  priReqQueue []*collect.Request
  reqQueue    []*collect.Request
  Logger      *zap.Logger
}

设置随机User-Agent

避免服务器检测到我们使用了同一个 User-Agent,继而判断出是同一个客户端在发出请求,我们可以为发送的 User-Agent 加入随机性。

这个操作的本质就是将浏览器的不同型号与不同版本拼接起来,组成一个新的 User-Agent。

随机生成 User-Agent 的逻辑位于 extensions/randomua.go 中,里面枚举了不同型号的浏览器和不同型号的版本,并且通过排列组合产生了不同的 User-Agent。

最后一步,我们要在采集引擎中调用 GenerateRandomUA 函数,将请求头设置为随机的 User-Agent,如下所示:

func (b BrowserFetch) Get(request *spider.Request) ([]byte, error) {
   ...
   req.Header.Set("User-Agent", extensions.GenerateRandomUA())
   resp, err := client.Do(req)

失败处理

我们在爬取网站时,网络超时等诸多潜在风险都可能导致爬取失败。这时,我们可以对失败的任务进行重试。但是如果网站多次失败,那就没有必要反复重试了,我们可以将它们放入单独的队列中。为了防止失败请求日积月久导致的内存泄露,同时也为了在程序崩溃后能够再次加载这些失败网站,我们最后还需要将这些失败网站持久化到数据库或文件中。

我们先完成前半部分,即失败重试。我们要在全局 Crawler 中存储 failures 哈希表,设置 Key 为请求的唯一键,用于快速查找。failureLock 互斥锁用于并发安全。

type Crawler struct {
   ...
   failures    map[string]*collect.Request // 失败请求id -> 失败请求
   failureLock sync.Mutex
}

当请求失败之后,调用 SetFailure 方法将请求加入到 failures 哈希表中,并且把它重新交由调度引擎进行调度。这里我们为任务 Task 引入了一个新的字段 Reload,标识当前任务的网页是否可以重复爬取。如果不可以重复爬取,我们需要在失败重试前删除 Visited 中的历史记录。

「此文章为4月Day6学习笔记,内容来源于极客时间《Go分布式爬虫实战》,强烈推荐该课程!/推荐该课程」