深入分析 crawl 命令的执行过程

今天我们来跟踪学习 scrapy crawl spider_name 命令的执行过程,从这个过程中我们将看到 Scrapy 的引擎模块的作用。它是整个 Scrapy 其他模块共同的沟通主体,在 Scrapy 中处于核心模块的地位,可以说 Scrapy 的引擎模块就是它的大脑。本小节就让我们一起来探索 Scrapy “大脑” 的奥秘吧 !

1. Crawler 类及其相关类

我们先简单介绍下 Scrapy 中几个常用的基础类:Crawler、CrawlerRunner、CrawlerProcess。这些是我们分析 Scrapy 源码的基础知识点。

1.1 Crawler 类

# 源码位置:scrapy/crawler.py
# ...

class Crawler:

    def __init__(self, spidercls, settings=None):
        if isinstance(spidercls, Spider):
            raise ValueError('The spidercls argument must be a class, not an object')

        if isinstance(settings, dict) or settings is None:
            settings = Settings(settings)

        self.spidercls = spidercls
        self.settings = settings.copy()
        self.spidercls.update_settings(self.settings)

        # 初始化一些属性
        # ...

        # 日志格式化类
        lf_cls = load_object(self.settings['LOG_FORMATTER'])
        self.logformatter = lf_cls.from_crawler(self)
        # 扩展管理类
        self.extensions = ExtensionManager.from_crawler(self)

        self.settings.freeze()
        self.crawling = False
        # 爬虫模块
        self.spider = None
        # 引擎模块
        self.engine = None

    @defer.inlineCallbacks
    def crawl(self, *args, **kwargs):
        if self.crawling:
            raise RuntimeError("Crawling already taking place")
        self.crawling = True

        try:
            self.spider = self._create_spider(*args, **kwargs)
            self.engine = self._create_engine()
            # 获取初始下载请求
            start_requests = iter(self.spider.start_requests())
            # 调用引擎的open_spider()方法
            yield self.engine.open_spider(self.spider, start_requests)
            # 生成一个Deferred对象
            yield defer.maybeDeferred(self.engine.start)
        except Exception:
            self.crawling = False
            if self.engine is not None:
                yield self.engine.close()
            raise

    def _create_spider(self, *args, **kwargs):
        # 实例化Spider类
        return self.spidercls.from_crawler(self, *args, **kwargs)

    def _create_engine(self):
        # 生成引擎实例
        return ExecutionEngine(self, lambda _: self.stop())

    @defer.inlineCallbacks
    def stop(self):
        """Starts a graceful stop of the crawler and returns a deferred that is
        fired when the crawler is stopped."""
        if self.crawling:
            self.crawling = False
            yield defer.maybeDeferred(self.engine.stop)

Crawler 类的属性比较丰富,方法较少,最核心的就是这个 crawl() 方法了。可以看到它其实就完成了以下几个动作:

  • 获取初始的 Request 请求;
  • 调用引擎的 open_spider() 方法;
  • 将引擎的 start() 方法添加到 Deferred 对象中;

1.2 CrawlerRunner

该类有比较详细的注释,源码里对它的介绍如下:

这是一个简便的辅助类,用于跟踪、管理和运行已设置中的爬虫程序。

我们先来看该类的实例化方法:

# 源码位置:scrapy/crawler.py
# ...

class CrawlerRunner:
    # ...
    
    @staticmethod
    def _get_spider_loader(settings):
        """ Get SpiderLoader instance from settings """
        cls_path = settings.get('SPIDER_LOADER_CLASS')
        # 导入spider loader类
        loader_cls = load_object(cls_path)
        excs = (DoesNotImplement, MultipleInvalid) if MultipleInvalid else DoesNotImplement
        try:
            verifyClass(ISpiderLoader, loader_cls)
        except excs:
            # 打印异常信息
            # ...
        return loader_cls.from_settings(settings.frozencopy())
    
    def __init__(self, settings=None):
        # scrapy全局配置
        if isinstance(settings, dict) or settings is None:
            settings = Settings(settings)
        self.settings = settings
        # 根据爬虫名,加载相应的Spdier类
        self.spider_loader = self._get_spider_loader(settings)
        # 用于保存所有的Crawler对象
        self._crawlers = set()
        # self._active用于保存调用Crawler.crawl()方法,返回的defer对象
        self._active = set()
        self.bootstrap_failed = False
        self._handle_twisted_reactor()

注意比较重要的属性值有:

  • self.settings:保存了全局的配置信息;
  • self.spider_loader:用于根据相应的爬虫名加载对应的 Spider 类;
  • self._crawlers:用于保存所有的 Crawler 对象,使用的是 python 中的集合类型;

接下来我们看看该类中几个重要方法:

# 源码位置:scrapy/crawler.py
# ...

class CrawlerRunner:
    # ...
    
    @property
    def spiders(self):
        # 打印告警信息
        # ...
        return self.spider_loader

    def crawl(self, crawler_or_spidercls, *args, **kwargs):
        if isinstance(crawler_or_spidercls, Spider):
            # 抛出异常,不能是Spider对象,只能是Spider类或者Crawler对象
            # ...
            
        crawler = self.create_crawler(crawler_or_spidercls)
        return self._crawl(crawler, *args, **kwargs)

    def _crawl(self, crawler, *args, **kwargs):
        self.crawlers.add(crawler)
        # 调用crawl()方法生成一个Deferred对象
        d = crawler.crawl(*args, **kwargs)
        # 加入活跃爬虫的集合
        self._active.add(d)

        def _done(result):
            # 执行完后的收尾工作
            self.crawlers.discard(crawler)
            self._active.discard(d)
            self.bootstrap_failed |= not getattr(crawler, 'spider', None)
            return result
        # 将_done()方法加入成功和失败的回调链中
        return d.addBoth(_done)

    def create_crawler(self, crawler_or_spidercls):
        if isinstance(crawler_or_spidercls, Spider):
            # 抛出异常,不能是Spider对象,只能是Spider类或者Crawler对象
            # ...
        if isinstance(crawler_or_spidercls, Crawler):
            # 如果是Crawler实例
            return crawler_or_spidercls
        # 否则创建Crawler实例
        return self._create_crawler(crawler_or_spidercls)

    def _create_crawler(self, spidercls):
        if isinstance(spidercls, str):
            spidercls = self.spider_loader.load(spidercls)
        # 返回Crawler实例
        return Crawler(spidercls, self.settings)    

上面的代码量都比较少,比较容易理解。不用纠结代码的细节,把握代码的作用即可,对于 Scrapy 框架有一个全貌即可。来简单描述下上面的方法:

  • crawl():爬虫开始执行爬取动作,crawl 命令调用的入口方法;
  • create_crawler():创建一个 Crawler 实例;

在看看这个类剩余的几个方法:

# 源码位置:scrapy/crawler.py
# ...

class CrawlerRunner:
    # ...
    
    def stop(self):
        return defer.DeferredList([c.stop() for c in list(self.crawlers)])

    @defer.inlineCallbacks
    def join(self):
        while self._active:
            yield defer.DeferredList(self._active)

    def _handle_twisted_reactor(self):
        if self.settings.get("TWISTED_REACTOR"):
            verify_installed_reactor(self.settings["TWISTED_REACTOR"])

来简单说明下上面几个方法:

  • stop()join() 方法都是停止或者同步管理的爬虫,返回一个 Deferred 对象;

  • _handle_twisted_reactor:默认配置中 TWISTED_REACTOR 的值为 None。其校验安装的 reactor 代码也比较简单,就是导入 TWISTED_REACTOR 路径对应的类,然后判断 twisted 中的 reactor 是否为该类的一个实例,不是就抛出异常:

    # 源码位置:scrapy/utils/reactor.py
    # ...
    
    def verify_installed_reactor(reactor_path):
        from twisted.internet import reactor
        # 导入reactor_path位置的类
        reactor_class = load_object(reactor_path)
        # 判断twisted中的reactor是否为其实例
        if not isinstance(reactor, reactor_class):
            # 抛出异常
            # ...
    

到此,这个类的相关属性及方法就介绍完毕了,接下来我们学习它的一个子类:CrawlerProcess

1.3 CrawlerProcess

该类用于在一个进程中同时运行多个 Scrapy 爬虫,它继承自上面的 CrawlerRunner 类。先学习其 __init__() 方法:

# 源码位置:scrapy/crawler.py
# ...

class CrawlerProcess(CrawlerRunner):
    def __init__(self, settings=None, install_root_handler=True):
        # 调用父类的__init__()方法
        super(CrawlerProcess, self).__init__(settings)
        # 给一些信号设置相应的回调方法
        install_shutdown_handlers(self._signal_shutdown)
        # 初始化scrapy中默认的日志配置
        configure_logging(self.settings, install_root_handler)
        # 打印一些基本信息
        log_scrapy_info(self.settings)
        
    def _signal_shutdown(self, signum, _):
        from twisted.internet import reactor
        # 注册信号回调方法
        install_shutdown_handlers(self._signal_kill)
        signame = signal_names[signum]
        # ...
        reactor.callFromThread(self._graceful_stop_reactor)

    def _signal_kill(self, signum, _):
        from twisted.internet import reactor
        install_shutdown_handlers(signal.SIG_IGN)
        signame = signal_names[signum]
        logger.info('Received %(signame)s twice, forcing unclean shutdown',
                    {'signame': signame})
        reactor.callFromThread(self._stop_reactor)
        
    # ...

可以看到 CrawlerProcess 类对象的初始化方法中先是调用父类的 __init__() 初始化相关属性,接着注册信号的回调方法,主要是终止信号。我们来看看这个 install_shutdown_handlers() 方法的代码:

# 源码位置:scrapy/utils/ossignal.py
# ...

def install_shutdown_handlers(function, override_sigint=True):
    """Install the given function as a signal handler for all common shutdown
    signals (such as SIGINT, SIGTERM, etc). If override_sigint is ``False`` the
    SIGINT handler won't be install if there is already a handler in place
    (e.g.  Pdb)
    """
    from twisted.internet import reactor
    reactor._handleSignals()
    # 注册SIGTERM信号的回调
    signal.signal(signal.SIGTERM, function)
    if signal.getsignal(signal.SIGINT) == signal.default_int_handler or 
            override_sigint:
        signal.signal(signal.SIGINT, function)
    # Catch Ctrl-Break in windows
    if hasattr(signal, 'SIGBREAK'):
        # 注册SIGBREAK信号的回调
        signal.signal(signal.SIGBREAK, function)

可以看到,上面的代码就是一些注册信号的回调方法。接下来我们来看 CrawlerRunner 类中比较重要的几个方法:

# 源码位置:scrapy/crawler.py
# ...

class CrawlerProcess(CrawlerRunner):
    # ...
    
    def start(self, stop_after_crawl=True):
        from twisted.internet import reactor
        if stop_after_crawl:
            d = self.join()
            # Don't start the reactor if the deferreds are already fired
            if d.called:
                return
            d.addBoth(self._stop_reactor)

        resolver_class = load_object(self.settings["DNS_RESOLVER"])
        resolver = create_instance(resolver_class, self.settings, self, reactor=reactor)
        resolver.install_on_reactor()
        tp = reactor.getThreadPool()
        tp.adjustPoolsize(maxthreads=self.settings.getint('REACTOR_THREADPOOL_MAXSIZE'))
        reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
        reactor.run(installSignalHandlers=False)  # blocking call

    def _graceful_stop_reactor(self):
        # 先调用stop()方法停止爬虫进程
        d = self.stop()
        # 最后回调停止reactor的方法
        d.addBoth(self._stop_reactor)
        return d

    def _stop_reactor(self, _=None):
        from twisted.internet import reactor
        try:
            reactor.stop()
        except RuntimeError:  # raised if already stopped or in shutdown stage
            pass

    # ...

来看这个 start() 方法,其中比较重要的就是调用了 reactor.run() 方法。_graceful_stop_reactor()_stop_reactor() 从名字上就可以看出它们是关于停止 reactor 的方法。

2. crawl 命令追踪

现在我们要追踪前面多次使用的 scrapy crawl xxxx 命令的执行过程。根据前面的学习成果,我们知道 crawl 命令会调用 scrapy/commands/crawl.py 文件中 Command 类的 run() 方法:

# 源码位置:scrapy/commands/crawl.py
# ...

class Command(BaseRunSpiderCommand):
    # ...
    
    def run(self, args, opts):
        # 没带参数或者带多了参数,抛出异常
        # ...
        
        # 获取爬虫名
        spname = args[0]

        # 核心是调用self.crawler_process执行爬取动作
        crawl_defer = self.crawler_process.crawl(spname, **opts.spargs)

        if getattr(crawl_defer, 'result', None) is not None and issubclass(crawl_defer.result.type, Exception):
            # 如果结果异常或者没有结果,设置退出码为1
            self.exitcode = 1
        else:
            # 调用self.crawler_process的start()方法
            self.crawler_process.start()

            if self.crawler_process.bootstrap_failed or 
                    (hasattr(self.crawler_process, 'has_exception') and self.crawler_process.has_exception):
                self.exitcode = 1

首先我们从早先的调用代码中可以知道 self.crawler_process 其实是 CrawlerProcess 类的一个实例:

# 源码位置:scrapy/cmdline.py
# ...

def execute(argv=None, settings=None):
    # ...
    
    # 核心,设置command类的核心处理类
    cmd.crawler_process = CrawlerProcess(settings)
    
    # ...

这样命令的核心执行方法其实就是调用 CrawlerProcess 对象的 crawl() 方法。我们从一小节的学习中可知:CrawlerProcess 对象的 crawl() 方法其实是父类 (CrawlerRunner) 中的 crawl() 方法,且其调用的核心就两行语句:

crawler = self.create_crawler(crawler_or_spidercls)
return self._crawl(crawler, *args, **kwargs)

这个 crawler 是 Crawler 类的一个实例,最后在调用 self._crawl() 方法时,通过代码可知该方法其实是调用 Crawler 对象的 crawl() 方法开始执行 Scrapy 爬虫程序:

# 源码位置:scrapy/crawler.py
# ...

class CrawlerRunner:
    # ...

    def _crawl(self, crawler, *args, **kwargs):
        # ...
        d = crawler.crawl(*args, **kwargs)
        # ...

到此,我们知道 crawl 命令执行的核心方法其实是 Crawler 对象中的 crawl() 方法。 接下来,我们就是要从这个 crawl() 方法入手。

前面我们已经介绍过 crawl() 方法,该方法会先根据传入的爬虫名来创建 Spider 实例以及 Engine 实例:

self.spider = self._create_spider(*args, **kwargs)
self.engine = self._create_engine()

接着是得到起始的爬取 URL:

start_requests = iter(self.spider.start_requests())

这部分调用的正是 Spider 对象的 start_requests()方法获取初始的 URLs,我们也可以在自定义的爬虫中通过重写该方法来获取自定义的初始 URLs。

在接下来就是两个 yield 语句,这两句也是核心

yield self.engine.open_spider(self.spider, start_requests)
yield defer.maybeDeferred(self.engine.start)

这样子,我们的跟踪目标就要转入到 Scrapy 源码的核心目录下了,其调用的是引擎模块中的两个方法,位于我们之前接触的 engine.py 文件中。来分别看着两个方法的代码:

# 源码位置: scrapy/core/engine.py
# ...

class ExecutionEngine:
    # ...
    
    @defer.inlineCallbacks
    def open_spider(self, spider, start_requests=(), close_if_idle=True):
        if not self.has_capacity():
            raise RuntimeError("No free spider slot when opening %r" % spider.name)
        logger.info("Spider opened", extra={'spider': spider})
        # 获取CallLaterOnce的实例,延迟调用一次
        nextcall = CallLaterOnce(self._next_request, spider)
        # 获取调度器
        scheduler = self.scheduler_cls.from_crawler(self.crawler)
        # 通过Spider中间件后,获取初始的urls,对应着数据流图的1
        start_requests = yield self.scraper.spidermw.process_start_requests(start_requests, spider)
        slot = Slot(start_requests, close_if_idle, nextcall, scheduler)
        self.slot = slot
        self.spider = spider
        # 打开调度器
        yield scheduler.open(spider)
        yield self.scraper.open_spider(spider)
        # 统计用的
        self.crawler.stats.open_spider(spider)
        yield self.signals.send_catch_log_deferred(signals.spider_opened, spider=spider)
        # 延迟调用self._next_request()方法,内部使用reactor.callLater()方法实现
        slot.nextcall.schedule()
        slot.heartbeat.start(5)

看上面的代码,在 open_spider() 方法中我们可以看到,在我们调用的 Spider 对象中产生的起始请求在经过 Spider 中间件处理后再给回引擎模块;接着我们还会打开调度器以及爬取开关 (self.scraper.open_spider()) 和统计开关 (self.crawler.stats.open_spider())。

紧接着就是内部使用 reactor.callLater() 方法实现对 self._next_request() 方法的延迟调用:

# 源码位置:scrapy/utils/reactor.py
# ...

class CallLaterOnce:
    """Schedule a function to be called in the next reactor loop, but only if
    it hasn't been already scheduled since the last time it ran.
    """

    def __init__(self, func, *a, **kw):
        self._func = func
        self._a = a
        self._kw = kw
        self._call = None

    def schedule(self, delay=0):
        '''设置延迟调用'''
        from twisted.internet import reactor
        if self._call is None:
            self._call = reactor.callLater(delay, self)

    def cancel(self):
        if self._call:
            self._call.cancel()

    def __call__(self):
        # 在对象调用后,self._call设置为空,并调用方法
        self._call = None
        return self._func(*self._a, **self._kw)
    
# ...

看完上面的代码后,我们可知接下来就要进入 self._next_request() 方法中执行。继续来看该方法的代码:

# 源码位置:scrapy/core/engine.py
# ...

class ExecutionEngine:
    # ...
    
    def _next_request(self, spider):
        slot = self.slot
        # 不正确的情况,直接return 
        # ...

        while not self._needs_backout(spider):
            """爬虫不需要返回,然后会从调度器获取下一个请求"""
            if not self._next_request_from_scheduler(spider):
                break

        if slot.start_requests and not self._needs_backout(spider):
            try:
                request = next(slot.start_requests)
            except StopIteration:
                slot.start_requests = None
            except Exception:
                slot.start_requests = None
                logger.error('Error while obtaining start requests',
                             exc_info=True, extra={'spider': spider})
            else:
                self.crawl(request, spider)

        if self.spider_is_idle(spider) and slot.close_if_idle:
            self._spider_idle(spider)

注意:一开始的调度器中并没有请求的 URLs,因此 _next_request() 方法每次调用时会从 start_requests 中获取请求,然后调用 self.crawl() 方法。我们来看看具体的 self.crawl() 方法的代码:

# 源码位置:scrapy/core/engine.py
# ...

class ExecutionEngine:
    # ...
    
    def crawl(self, request, spider):
        if spider not in self.open_spiders:
            raise RuntimeError("Spider %r not opened when crawling: %s" % (spider.name, request))
        # 将请求加入调度器队列
        self.schedule(request, spider)
        # 继续调用self._next_request()方法
        self.slot.nextcall.schedule()

    def schedule(self, request, spider):
        self.signals.send_catch_log(signals.request_scheduled, request=request, spider=spider)
        # 这里就是简单将请求压入调度
        if not self.slot.scheduler.enqueue_request(request):
            self.signals.send_catch_log(signals.request_dropped, request=request, spider=spider)

这个代码就非常清楚了,self.crawl() 方法并不直接下载对应 url 的网页,而是将其推入调度器的下载队列,然后再调回 self._next_request() 方法 (对应着就是 self.slot.nextcall.schedule() 这一句)。而真正执行下载任务的代码为:

while not self._needs_backout(spider):
    """爬虫不需要返回,然后会从调度器获取下一个请求"""
    if not self._next_request_from_scheduler(spider):
        break

这段代码会循环从调度器中获取下一个请求,如果没有则跳出循环往下执行。如果有请求,则会执行 self._download() 方法去下载对应的网页,而这个方法正是在第24节中介绍过的下载网页的入口。

另外,我们还剩下一个 yield 没介绍,就是引擎的启动:yield defer.maybeDeferred(self.engine.start)。该方法比较简单,就是记录下起始时间以及设置 self.running 标志:

# 源码位置:scrapy/core/engine.py
# ...

class ExecutionEngine:
    # ...

    @defer.inlineCallbacks
    def start(self):
        """Start the execution engine"""
        if self.running:
            raise RuntimeError("Engine already running")
        # 记录引擎开始执行时间
        self.start_time = time()
        # 发送引擎执行信号
        yield self.signals.send_catch_log_deferred(signal=signals.engine_started)
        # 设置running标识
        self.running = True
        self._closewait = defer.Deferred()
        yield self._closewait

来总结下 crawl 命令的执行流程:

图片描述

crawl命令的执行流程

对于 crawl 命令的追踪到这里就结束了。当然,这里我们还有一些内容没有介绍到,比如调度器如何实现请求的入队、出队等,再比如 Spider 模块和 Engine 模块的交互过程等,这些就留给读者课后继续探索了。

3. 小结

本小节中我们主要介绍了 scrapy/crawler.py 文件中的代码以及追踪了 scrapy crawl xxx 命令的执行过程。通过追踪这个命令,我们可以了解 Scrapy 爬虫的整个运行流程,回过头来在看 Scrapy 的架构图以及数据流图,是不是又多了一份理解与认识?当然,对于 Scrapy 框架源码我们还有许多代码没有介绍到,比如 scrapy 的扩展模块 (scrapy/extensions)、链接抽取模块 (scrapy/linkextractors)。这些部分并不属于 Scrapy 的核心部分,也不是学习 Scrapy 的重点,读者有兴趣可以课后认真研究这些模块的代码,以增强对 Scrapy 框架的认识。好了,整个 Scrapy 框架的的介绍到此就结束了,我们青山不改,江湖再会!