zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Scrapy源码阅读分析_3_核心组件

scrapy组件源码 分析 核心 阅读
2023-09-11 14:13:59 时间

 

From:https://blog.csdn.net/weixin_37947156/article/details/74481758

 

这篇这要是关于核心组件,讲解这些核心组件初始化都做了哪些工作。包括:引擎、下载器、调度器、爬虫类、输出处理器 等的初始化。每个核心组件下其实都包含一些小的组件在里面,帮助处理某一环节的各种流程。

  • 核心组件初始化
  • 核心组件交互流程

 

 

爬虫类

 

接着上次代码讲,上次的运行入口执行到最后是执行了 Crawler 的 crawl 方法:

    @defer.inlineCallbacks
    def crawl(self, *args, **kwargs):
        assert not self.crawling, "Crawling already taking place"
        self.crawling = True

        try:
            # 到现在,才是实例化一个爬虫实例
            self.spider = self._create_spider(*args, **kwargs)

            # 创建引擎
            self.engine = self._create_engine()

            # 调用爬虫类的start_requests方法
            start_requests = iter(self.spider.start_requests())

            # 执行引擎的open_spider,并传入爬虫实例和初始请求
            yield self.engine.open_spider(self.spider, start_requests)
            yield defer.maybeDeferred(self.engine.start)
        except Exception:
            # In Python 2 reraising an exception after yield discards
            # the original traceback (see https://bugs.python.org/issue7563),
            # so sys.exc_info() workaround is used.
            # This workaround also works in Python 3, but it is not needed,
            # and it is slower, so in Python 3 we use native `raise`.
            if six.PY2:
                exc_info = sys.exc_info()

            self.crawling = False
            if self.engine is not None:
                yield self.engine.close()

            if six.PY2:
                six.reraise(*exc_info)
            raise

在这里,就交由 scrapy 引擎 来处理了。

依次来看,爬虫类是如何实例化的?上文已讲解过,在 Crawler 实例化时,会创建 SpiderLoader,它会根据用户的配置文件settings.py 找到存放爬虫的位置,我们写的爬虫都会放在这里。

然后 SpiderLoader 会扫描这里的所有文件,并找到 父类是 scrapy.Spider 爬虫类,然后根据爬虫类中的 name 属性(在编写爬虫时,这个属性是必填的),最后生成一个 {spider_name: spider_cls} 的 字典,然后根据 scrapy crawl <spider_name> 命令,根据 spider_name 找到对应的爬虫类,然后实例化它,在这里就是调用了 _create_spider 方法:

class Crawler(object):

    def __init__(self, spidercls, settings=None):
        if isinstance(settings, dict) or settings is None:
            settings = Settings(settings)

        self.spidercls = spidercls
        self.settings = settings.copy()
        self.spidercls.update_settings(self.settings)

        d = dict(overridden_settings(self.settings))
        logger.info("Overridden settings: %(settings)r", {'settings': d})

        self.signals = SignalManager(self)
        self.stats = load_object(self.settings['STATS_CLASS'])(self)

        handler = LogCounterHandler(self, level=self.settings.get('LOG_LEVEL'))
        logging.root.addHandler(handler)
        if get_scrapy_root_handler() is not None:
            # scrapy root handler already installed: update it with new settings
            install_scrapy_root_handler(self.settings)
        # lambda is assigned to Crawler attribute because this way it is not
        # garbage collected after leaving __init__ scope
        self.__remove_handler = lambda: logging.root.removeHandler(handler)
        self.signals.connect(self.__remove_handler, signals.engine_stopped)

        lf_cls = load_object(self.settings['LOG_FORMATTER'])
        self.logformatter = lf_cls.from_crawler(self)
        self.extensions = ExtensionManager.from_crawler(self)

        self.settings.freeze()
        self.crawling = False
        self.spider = None
        self.engine = None

    @property
    def spiders(self):
        if not hasattr(self, '_spiders'):
            warnings.warn("Crawler.spiders is deprecated, use "
                          "CrawlerRunner.spider_loader or instantiate "
                          "scrapy.spiderloader.SpiderLoader with your "
                          "settings.",
                          category=ScrapyDeprecationWarning, stacklevel=2)
            self._spiders = _get_spider_loader(self.settings.frozencopy())
        return self._spiders

    @defer.inlineCallbacks
    def crawl(self, *args, **kwargs):
        assert not self.crawling, "Crawling already taking place"
        self.crawling = True

        try:
            # 到现在,才是实例化一个爬虫实例
            self.spider = self._create_spider(*args, **kwargs)

            # 创建引擎
            self.engine = self._create_engine()

            # 调用爬虫类的start_requests方法
            start_requests = iter(self.spider.start_requests())

            # 执行引擎的open_spider,并传入爬虫实例和初始请求
            yield self.engine.open_spider(self.spider, start_requests)
            yield defer.maybeDeferred(self.engine.start)
        except Exception:
            # In Python 2 reraising an exception after yield discards
            # the original traceback (see https://bugs.python.org/issue7563),
            # so sys.exc_info() workaround is used.
            # This workaround also works in Python 3, but it is not needed,
            # and it is slower, so in Python 3 we use native `raise`.
            if six.PY2:
                exc_info = sys.exc_info()

            self.crawling = False
            if self.engine is not None:
                yield self.engine.close()

            if six.PY2:
                six.reraise(*exc_info)
            raise

    def _create_spider(self, *args, **kwargs):
        # 调用类方法from_crawler实例化
        return self.spidercls.from_crawler(self, *args, **kwargs)

    def _create_engine(self):
        return ExecutionEngine(self, lambda _: self.stop())

    @defer.inlineCallbacks
    def stop(self):
        if self.crawling:
            self.crawling = False
            yield defer.maybeDeferred(self.engine.stop)

实例化爬虫比较有意思,它不是通过普通的构造方法进行初始化,而是调用了类方法 from_crawler 进行的初始化,找到scrapy.Spider 类:(scrapy/spiders/__init__.py)

class Spider(object_ref):
    """Base class for scrapy spiders. All spiders must inherit from this
    class.
    """

    name = None
    custom_settings = None  # 自定义设置 

    def __init__(self, name=None, **kwargs):

        # spider name 必填
        if name is not None:
            self.name = name
        elif not getattr(self, 'name', None):
            raise ValueError("%s must have a name" % type(self).__name__)
        self.__dict__.update(kwargs)

        # 如果没有设置 start_urls,默认是[]
        if not hasattr(self, 'start_urls'):
            self.start_urls = []

    @property
    def logger(self):
        logger = logging.getLogger(self.name)
        return logging.LoggerAdapter(logger, {'spider': self})

    def log(self, message, level=logging.DEBUG, **kw):
        """Log the given message at the given log level

        This helper wraps a log call to the logger within the spider, but you
        can use it directly (e.g. Spider.logger.info('msg')) or use any other
        Python logger too.
        """
        self.logger.log(level, message, **kw)

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        spider = cls(*args, **kwargs)
        spider._set_crawler(crawler)
        return spider

    def set_crawler(self, crawler):
        warnings.warn("set_crawler is deprecated, instantiate and bound the "
                      "spider to this crawler with from_crawler method "
                      "instead.",
                      category=ScrapyDeprecationWarning, stacklevel=2)
        assert not hasattr(self, 'crawler'), "Spider already bounded to a " \
                                             "crawler"
        self._set_crawler(crawler)

    def _set_crawler(self, crawler):
        self.crawler = crawler

        # 把settings对象赋给spider实例
        self.settings = crawler.settings
        crawler.signals.connect(self.close, signals.spider_closed)

    def start_requests(self):
        cls = self.__class__
        if method_is_overridden(cls, Spider, 'make_requests_from_url'):
            warnings.warn(
                "Spider.make_requests_from_url method is deprecated; it "
                "won't be called in future Scrapy releases. Please "
                "override Spider.start_requests method instead (see %s.%s)." % (
                    cls.__module__, cls.__name__
                ),
            )
            for url in self.start_urls:
                yield self.make_requests_from_url(url)
        else:
            for url in self.start_urls:
                yield Request(url, dont_filter=True)

    def make_requests_from_url(self, url):
        """ This method is deprecated. """
        return Request(url, dont_filter=True)

    def parse(self, response):
        raise NotImplementedError('{}.parse callback is not defined'.format(self.__class__.__name__))

    @classmethod
    def update_settings(cls, settings):
        settings.setdict(cls.custom_settings or {}, priority='spider')

    @classmethod
    def handles_request(cls, request):
        return url_is_from_spider(request.url, cls)

    @staticmethod
    def close(spider, reason):
        closed = getattr(spider, 'closed', None)
        if callable(closed):
            return closed(reason)

    def __str__(self):
        return "<%s %r at 0x%0x>" % (type(self).__name__, self.name, id(self))

    __repr__ = __str__

在这里可以看到,这个类方法其实也是调用了构造方法,进行实例化,同时也拿到了 settings 配置,

再看构造方法干了些什么?就是我们平时编写爬虫类时,最常用的几个属性:name、start_urls、custom_settings

  • name:在运行爬虫时通过它找到对应的爬虫脚本而使用;
  • start_urls:定义种子URL;
  • custom_settings:从字面意思可以看出,爬虫自定义配置,会覆盖配置文件的配置项;

 

 

引擎

 

分析完爬虫类的初始化后,还是回到Crawlercrawl方法(scrapy/crawler.py 中 Crawler 类 的 crawl 方法

紧接着就是创建 引擎对象,也就是 _create_engine 方法,这里直接进行了引擎初始化操作,看看都发生了什么?

在这里能看到,进行了核心组件的定义和初始化,包括:SchedulerDownloaderScrapyer,其中 Scheduler 只进行了类定义,没有实例化。

 

 

调度器

 

调度器初始化发生在引擎的 open_spider 方法中,

我们提前来看一下 调度器 的 初始化 完成了哪些工作?

调度器的初始化主要做了2件事:

  • 实例化请求指纹过滤器:用来过滤重复请求,可自己重写替换之;
  • 定义各种不同类型的任务队列:优先级任务队列、基于磁盘的任务队列、基于内存的任务队列;

 

 

请求指纹过滤器

 

先来看请求指纹过滤器是什么?在配置文件中定义的默认指纹过滤器是 RFPDupeFilter
DUPEFILTER_CLASS = 'scrapy.dupefilters.RFPDupeFilter'

请求指纹过滤器初始化时定义了指纹集合,这个集合使用内存实现的 set,而且可以控制这些指纹是否存入磁盘供下次重复使用。

指纹过滤器的主要职责是:过滤重复请求,可自定义过滤规则。

在下篇文章中会介绍到,每个请求是根据什么规则生成指纹,进而实现重复请求过滤逻辑的。

 

 

任务队列

 

调度器默认定义的2种队列类型:

  • 基于磁盘的任务队列:在配置文件可配置存储路径,每次执行后会把队列任务保存到磁盘上;
  • 基于内存的任务队列:每次都在内存中执行,下次启动则消失;

配置文件默认定义如下:

如果用户在配置文件中定义了 JOBDIR,那么则每次把任务队列保存在磁盘中,下次启动时自动加载。

如果没有定义,那么则使用的是内存队列。

细心的你会发现,默认定义的这些队列结构都是 后进先出 的,什么意思呢?

也就是说:Scrapy默认的采集规则是深度优先采集!

如何改变这种机制,变为 广度优先采集 呢?那么你可以看一下 scrapy.squeues 模块,其中定义了:

# 先进先出磁盘队列(pickle序列化)
PickleFifoDiskQueue = _serializable_queue(queue.FifoDiskQueue, _pickle_serialize, pickle.loads)

# 后进先出磁盘队列(pickle序列化)
PickleLifoDiskQueue = _serializable_queue(queue.LifoDiskQueue, _pickle_serialize, pickle.loads)

# 先进先出磁盘队列(marshal序列化)
MarshalFifoDiskQueue = _serializable_queue(queue.FifoDiskQueue, marshal.dumps, marshal.loads)

# 后进先出磁盘队列(marshal序列化)
MarshalLifoDiskQueue = _serializable_queue(queue.LifoDiskQueue, marshal.dumps, marshal.loads)

# 先进先出内存队列
FifoMemoryQueue = queue.FifoMemoryQueue

# 后进先出内存队列
LifoMemoryQueue = queue.LifoMemoryQueue

你只需要在配置文件中把队列类修改为 先进先出 队列类就可以了!有没有发现,模块化、组件替代再次发挥威力!

如果你想追究这些队列是如何实现的,可以参考scrapy作者写的 scrapy/queuelib 模块。

 

 

下载器

 

回头继续看引擎的初始化,来看下载器是如何初始化的。

在默认的配置文件 default_settings.py 中,下载器配置如下:

DOWNLOADER = 'scrapy.core.downloader.Downloader'

Downloader 实例化:

这个过程主要是初始化了 下载处理器下载器中间件管理器 以及从配置文件中拿到抓取请求控制相关参数。

下载器 DownloadHandlers 是做什么的?

下载器中间件 DownloaderMiddlewareManager 初始化发生了什么?

 

 

下载处理器

 

下载处理器在默认的配置文件中是这样配置的:

看到这里你应该能明白了,说白了就是需下载的资源是什么类型,就选用哪一种下载处理器进行网络下载,其中最常用的就是http https 对应的处理器。

从这里你也能看出,scrapy的架构是非常低耦合的,任何涉及到的组件及模块都是可重写和配置的。scrapy提供了基础的服务组件,你也可以自己实现其中的某些组件,修改配置即可达到替换的目的。

到这里,大概就能明白,下载处理器的工作就是:管理着各种资源对应的下载器,在真正发起网络请求时,选取对应的下载器进行资源下载。

但是请注意,在这个初始化过程中,这些下载器是没有被实例化的,也就是说,在真正发起网络请求时,才会进行初始化,而且只会初始化一次,后面会讲到。

 

 

下载器中间件管理器

 

下面来看下载器中间件 DownloaderMiddlewareManager 初始化,同样的这里又调用了类方法 from_crawler 进行初始化,DownloaderMiddlewareManager 继承了 MiddlewareManager 类,来看它在初始化做了哪些工作:

(scrapy/core/downloader/middleware.py)

from collections import defaultdict, deque
import logging
import pprint

from scrapy.exceptions import NotConfigured
from scrapy.utils.misc import create_instance, load_object
from scrapy.utils.defer import process_parallel, process_chain, process_chain_both

logger = logging.getLogger(__name__)


class MiddlewareManager(object):
    """所有中间件的父类,提供中间件公共的方法"""

    component_name = 'foo middleware'

    def __init__(self, *middlewares):
        self.middlewares = middlewares

        # 定义中间件方法
        self.methods = defaultdict(deque)
        for mw in middlewares:
            self._add_middleware(mw)

    @classmethod
    def _get_mwlist_from_settings(cls, settings):
        # 具体有哪些中间件类,子类定义
        raise NotImplementedError

    @classmethod
    def from_settings(cls, settings, crawler=None):
        # 调用子类_get_mwlist_from_settings得到所有中间件类的模块
        mwlist = cls._get_mwlist_from_settings(settings)
        middlewares = []
        enabled = []

        # 依次实例化
        for clspath in mwlist:
            try:
                # 加载这些中间件模块
                mwcls = load_object(clspath)
                mw = create_instance(mwcls, settings, crawler)
                middlewares.append(mw)
                enabled.append(clspath)
            except NotConfigured as e:
                if e.args:
                    clsname = clspath.split('.')[-1]
                    logger.warning("Disabled %(clsname)s: %(eargs)s",
                                   {'clsname': clsname, 'eargs': e.args[0]},
                                   extra={'crawler': crawler})

        logger.info("Enabled %(componentname)ss:\n%(enabledlist)s",
                    {'componentname': cls.component_name,
                     'enabledlist': pprint.pformat(enabled)},
                    extra={'crawler': crawler})

        # 调用构造方法
        return cls(*middlewares)

    @classmethod
    def from_crawler(cls, crawler):
        # 调用 from_settings
        return cls.from_settings(crawler.settings, crawler)

    def _add_middleware(self, mw):
        # 默认定义的,子类可覆盖
        # 如果中间件类有定义open_spider,则加入到methods
        if hasattr(mw, 'open_spider'):
            self.methods['open_spider'].append(mw.open_spider)

        # 如果中间件类有定义close_spider,则加入到methods
        # methods就是一串中间件的方法链,后期会依次调用
        if hasattr(mw, 'close_spider'):
            self.methods['close_spider'].appendleft(mw.close_spider)

    def _process_parallel(self, methodname, obj, *args):
        return process_parallel(self.methods[methodname], obj, *args)

    def _process_chain(self, methodname, obj, *args):
        return process_chain(self.methods[methodname], obj, *args)

    def _process_chain_both(self, cb_methodname, eb_methodname, obj, *args):
        return process_chain_both(self.methods[cb_methodname], \
            self.methods[eb_methodname], obj, *args)

    def open_spider(self, spider):
        return self._process_parallel('open_spider', spider)

    def close_spider(self, spider):
        return self._process_parallel('close_spider', spider)

create_instance 函数:

def create_instance(objcls, settings, crawler, *args, **kwargs):
    """Construct a class instance using its ``from_crawler`` or
    ``from_settings`` constructors, if available.

    At least one of ``settings`` and ``crawler`` needs to be different from
    ``None``. If ``settings `` is ``None``, ``crawler.settings`` will be used.
    If ``crawler`` is ``None``, only the ``from_settings`` constructor will be
    tried.

    ``*args`` and ``**kwargs`` are forwarded to the constructors.

    Raises ``ValueError`` if both ``settings`` and ``crawler`` are ``None``.
    """
    if settings is None:
        if crawler is None:
            raise ValueError("Specifiy at least one of settings and crawler.")
        settings = crawler.settings

    # 如果此中间件类定义了from_crawler,则调用此方法实例化
    if crawler and hasattr(objcls, 'from_crawler'):
        return objcls.from_crawler(crawler, *args, **kwargs)

    # 如果此中间件类定义了from_settings,则调用此方法实例化
    elif hasattr(objcls, 'from_settings'):
        return objcls.from_settings(settings, *args, **kwargs)
    else:
        # 上面2个方法都没有,则直接调用构造实例化
        return objcls(*args, **kwargs)

DownloaderMiddlewareManager 实例化:

下载器中间件管理器 继承了 MiddlewareManager 类,然后重写了 _add_middleware 方法,为下载行为定义默认的 下载前、下载后、异常时 对应的处理方法。 

中间件的职责是什么?从这里能大概看出,从某个组件流向另一个组件时,会经过一系列中间件,每个中间件都定义了自己的处理流程,相当于一个个管道,输入时可以针对数据进行处理,然后送达到另一个组件,另一个组件处理完逻辑后,又经过这一系列中间件,这些中间件可再针对这个响应结果进行处理,最终输出。

 

 

Scraper

 

下载器实例化完了之后,回到引擎的初始化方法中,然后是实例化 Scraper,在Scrapy源码分析(一)架构概览中已经大概说到,这个类没有在架构图中出现,但这个类其实是处于 EngineSpidersPipeline 之间,是连通这3个组件的桥梁。

来看它的初始化:scrapy/core/scraper.py

 

 

爬虫中间件管理器

 

SpiderMiddlewareManager 初始化:

爬虫中间件管理器初始化与之前的下载器中间件管理器类似,先是从配置文件中加载了默认的爬虫中间件类,然后依次注册爬虫中间件的一系列流程方法。

配置文件中定义的默认的爬虫中间件类如下:

这些默认的爬虫中间件职责分别如下:

  • HttpErrorMiddleware:会针对响应不是 200 错误进行逻辑处理;
  • OffsiteMiddleware:如果Spider中定义了 allowed_domains,会自动过滤除此之外的域名请求;
  • RefererMiddleware:追加 Referer 头信息;
  • UrlLengthMiddleware:控制过滤URL长度超过配置的请求;
  • DepthMiddleware:过滤超过配置深入的抓取请求;

当然,你也可以定义自己的爬虫中间件,来处理自己需要的逻辑。

 

 

Pipeline管理器

 

爬虫中间件管理器初始化完之后,然后就是 Pipeline 组件的初始化,默认的 Pipeline 组件是 ItemPipelineManager

可以看到 ItemPipelineManager 也是一个中间件管理器的子类,由于它的行为非常类似于中间件,但由于功能较为独立,所以属于核心组件之一。

从 Scraper 的初始化能够看到,它管理着 Spiders 和 Pipeline 相关的交互逻辑。

 

 

总结

 

到这里,所有组件:引擎、下载器、调度器、爬虫类、输出处理器都依次初始化完成,每个核心组件下其实都包含一些小的组件在里面,帮助处理某一环节的各种流程。