zl程序教程

您现在的位置是:首页 >  后端

当前栏目

4-爬虫框架-分布式异步爬虫

2023-09-14 09:00:33 时间

#####

分布式爬虫

一、分布式爬虫架构

爬虫Server,负责管理所有URL(即,爬虫客户端的下载任务)的状态,通过我们前面介绍的UrlPool(网址池)进行管理。

Server提供接口给Clients,以便它们获取URL和提交URL。

 

爬虫Client,负责URL的下载、网页的解析以及存储等各种。Client通过接口向Server请求需要被下载的URL,

下载完成后向Server报告URL是否下载成功,同时把从网页中提取到的URLs提交给Server,Server把它们放入URLPool。

#### 

二、分布式爬虫Server端实现

这个分布式框架的server端是使用的sanic异步的web框架来实现的,

现在我们写的这个服务端,只是用到了sanic的api,没有界面,用到的功能很简单,

未来,你可能再公司可能要求你能再页面设置一些参数,再页面监控爬虫运行的情况,这个时候你就需要对sanic框架如何写页面再学习学习了,

from sanic import Sanic
from sanic import response

from urlpool import UrlPool

urlpool = UrlPool(__file__)

# 初始化urlpool,根据你的需要进行修改
hub_urls = []
urlpool.set_hubs(hub_urls, 300)
urlpool.add('https://news.sina.com.cn/')

# init
app = Sanic(__name__)


@app.listener('after_server_stop')
async def cache_urlpool(app, loop):
    global urlpool
    print('caching urlpool after_server_stop')
    del urlpool
    print('bye!')


@app.route('/task')
async def task_get(request):
    count = request.args.get('count', 10)
    try:
        count = int(count)
    except:
        count = 10
    urls = urlpool.pop(count)
    return response.json(urls)


@app.route('/task', methods=['POST', ])
async def task_post(request):
    result = request.json
    urlpool.set_status(result['url'], result['status'])
    if result['url_real'] != result['url']:
        urlpool.set_status(result['url_real'], result['status'])
    if result['newurls']:
        print('receive URLs:', len(result['newurls']))
        for url in result['newurls']:
            urlpool.add(url)
    return response.text('ok')


if __name__ == '__main__':
    app.run(
        host='0.0.0.0',
        port=8080,
        debug=False,
        access_log=False,
        workers=1)

 

####  

@app.listener(‘after_server_stop’)

在Server退出前,缓存URLPool里面的url。

整个web服务就实现了一个接口:/task, 通过GET方法让client获取URLs,通过POST让client提交URLs。

####

三、爬虫Client的实现

#!/usr/bin/env python3
# encoding: UTF-8
# author: veelion
# file: bee_client.py

import re
import cchardet
import traceback
import time
import json
import asyncio
import urllib.parse as urlparse
import aiohttp
import uvloop


asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())



p_tag_a = re.compile(
    r'<a[^>]*?href=[\'"]?([^> \'"]+)[^>]*?>(.*?)</a>',
    re.I|re.S|re.M)


def extract_links_re(url, html):
    newlinks = set()
    aa = p_tag_a.findall(html)
    for a in aa:
        link = a[0].strip()
        if not link:
            continue
        link = urlparse.urljoin(url, link)
        if not link.startswith('http'):
            continue
        newlinks.add(link)
    return newlinks



class CrawlerClient:
    def __init__(self, ):
        self._workers = 0
        self.workers_max = 10
        self.server_host = 'localhost'
        self.server_port = 8080
        self.headers = {'User-Agent': ('Mozilla/5.0 (compatible; MSIE 9.0; '
      'Windows NT 6.1; Win64; x64; Trident/5.0)')}

        self.loop = asyncio.get_event_loop()
        self.queue = asyncio.Queue(loop=self.loop)
        self.session = aiohttp.ClientSession(loop=self.loop)

    async def download(self, url, timeout=25):
        status_code = 900
        html = ''
        url_now = url
        try:
            async with self.session.get(url_now, headers=self.headers, timeout=timeout) as response:
                status_code = response.status
                html = await response.read()
                encoding = cchardet.detect(html)['encoding']
                html = html.decode(encoding, errors='ignore')
                url_now = str(response.url)
        except Exception as e:
            # traceback.print_exc()
            print('=== exception: ', e, type(e), str(e))
            msg = 'Failed download: {} | exception: {}, {}'.format(url, str(type(e)), str(e))
            print(msg)
        return status_code, html, url_now

    async def get_urls(self,):
        count = self.workers_max - self.queue.qsize()
        if count <= 0:
            print('no need to get urls this time')
            return None
        url = 'http://%s:%s/task?count=%s' % (
            self.server_host,
            self.server_port,
            count
        )
        try:
            async with self.session.get(url, timeout=3) as response:
                if response.status not in [200, 201]:
                    return
                jsn = await response.text()
                urls = json.loads(jsn)
                msg = ('get_urls()  to get [%s] but got[%s], @%s') % (
                    count, len(urls),
                    time.strftime('%Y-%m-%d %H:%M:%S'))
                print(msg)
                for kv in urls.items():
                    await self.queue.put(kv)
                print('queue size:', self.queue.qsize(), ', _workers:', self._workers)
        except:
            traceback.print_exc()
            return

    async def send_result(self, result):
        url = 'http://%s:%s/task' % (
            self.server_host,
            self.server_port
        )
        try:
            async with self.session.post(url, json=result, timeout=3) as response:
                return response.status
        except:
            traceback.print_exc()
            pass

    def save_html(self, url, html):
        print('saved:', url, len(html))

    def filter_good(self, urls):
        '''根据抓取目的过滤提取的URLs,只要你想要的'''
        good = []
        for url in urls:
            if url.startswith('http'):
                good.append(url)
        return good

    async def process(self, url, ishub):
        status, html, url_now = await self.download(url)
        self._workers -= 1
        print('downloaded:', url, ', html:', len(html))
        if html:
            newurls = extract_links_re(url, html)
            newurls = self.filter_good(newurls)
            self.save_html(url, html)
        else:
            newurls = []
        result = {
            'url': url,
            'url_real': url_now,
            'status': status,
            'newurls': newurls,
        }
        await self.send_result(result)

    async def loop_get_urls(self,):
        print('loop_get_urls() start')
        while 1:
            await self.get_urls()
            await asyncio.sleep(1)

    async def loop_crawl(self,):
        print('loop_crawl() start')
        asyncio.ensure_future(self.loop_get_urls())
        counter = 0
        while 1:
            item = await self.queue.get()
            url, url_level = item
            self._workers += 1
            counter += 1
            asyncio.ensure_future(self.process(url, url_level))

            if self._workers > self.workers_max:
                print('====== got workers_max, sleep 3 sec to next worker =====')
                await asyncio.sleep(3)

    def start(self):
        try:
            self.loop.run_until_complete(self.loop_crawl())
        except KeyboardInterrupt:
            print('stopped by yourself!')
            pass


def run():
    ant = CrawlerClient()
    ant.start()


if __name__ == '__main__':
    run()

#####

我们把Client写成一个类,这个类一部分接口是与Server交互用的,也就是从Server那里获取要下载的URL,
以及向Server提交新得到的URLs。另外一部分接口是与互联网交互的,即下载URLs。 通过异步IO,我们可以把Client的并发提高上去,达到更高的抓取速度。 先来看看这个类的初始化: 其中,self._workers 记录当前正在下载的协程(即,并发数); sellf.workers_max 是限制最大并发数,根据自己的CPU性能和网络带宽调整其大小,从而达到最大效率利用硬件资源。 download()方法是对aiohttp的封装,方便异步下载。 下面是与Server服务进行交互的两个方法: get_urls() send_result() 下面是下载任务相关的方法,其中: save_html()根据自己需要可以把下载的网页保存到数据库; filter_good()清洗提取到的URLs,把不需要下载的URLs扔掉。 process()是个协程定义,它的工作就是下载、提取、保存、提交,这个协程会在抓取循环中被不断的创建,达到并发的目的。 最后,我们定义两个循环, 一个用于定时向Server请求要下载的URLs, 另一个用于调度下载任务处理协程。 通过self._workers这个计数器限制协程数量。 start()方法就是整个类的执行入口。

 

 

######

总结:

这个框架,从单个同步爬虫--到异步并发爬虫----到分布式爬虫,

的确有很多借鉴的地方,

1,下载器,对mysql的封装,

2,网址池的实现,这个给了我很多的灵感,但是使用的leveldb,我并没有很多的实践经验,

3,异步并发,这个很重要,我需要再次对异步并发的这个知识点做研究,

4,分布式爬虫,这个也给我很多灵感,但是使用了sanic框架,我也没有很多的实践经验,

这都很是问题,所以我认为重心还是应该放到scrapy上面,深深的挖掘scrapy,才是我应该做的,

####

新闻类的网页,几乎没有反扒,

这个时候主要考察,我们写爬虫的能力,怎么并发提高抓取的效率,

但是现在的网站反扒很厉害了,

 

 

 

#####

 

 

 

 

 

####