Scrapy 中的 Pipline管道

本小节中我们将详细介绍 Scrapy 中的 Pipeline 及其多种用法和使用场景。Pipeline 是 Scrapy 框架的一个重要模块,从前面的 Scrapy 架构图中我们可以看到它位于架构图的最左边,用于连续处理从网页中抓取到的每条记录,就像一个流水线工厂加工食品那样,完成食品最后的封装、保存等操作。此外,我们还会介绍 Scrapy 内置的图片管道,可以自动下载对应地址的图片。最后,我们会基于上述内容完成一个小说网站的爬取案例。

1. Scrapy 中的 Pipeline 介绍

Pipeline 的中文意思是管道,类似于工厂的流水线那样。Scrapy 中的 Pipeline 通常是和 Items 联系在一起的,其实就是对 Items 数据的流水线处理。 一般而言,Pipeline 的典型应用场景如下:

  • 数据清洗、去重
  • 验证数据的有效性
  • 按照自定义格式保存数据
  • 存储到合适的数据库中 (如 MySQL、Redis 或者 MongoDB)

通过前面的 Scrapy 架构图可知,Pipeline 位于 Scrapy 数据处理流程的最后一步,但是它也不是必须,Pipeline 默认处于关闭状态。如果需要的话,我们只需要在 settings.py 中设置 ITEM_PIPELINES 属性值即可。它是一个数组值,我们可以定义多个 Item Pipeline,并且在 ITEM_PIPELINES 中设置相应 Pipeline 的优先级。这样 Scrapy 会依次处理这些 Pipelines,最后达到我们想要的效果。

图片描述

item 经过 pipelines 处理

注意:上面的 pipeline 顺序和功能都可以任意调整,保证逻辑性即可。比如有一个去重的 pipeline 和保存到数据库的 pipeline,那么去重的 pipeline 一定要在保存数据库之前,这样保存的就是不重复的数据。

2. 如何编写自己的 Item Pipeline

编写自己的 Item Pipeline 非常简单,我们只需要编写一个简单的类,实现四个特定名称的方法即可 (部分方法非必须)。我们来简单说明下这三个方法:

  • open_spider(spider):非必需,参数 spider 即被关闭的 Spider 对象。这个方法是 MiddlewareManager 类中的方法,在 Spider 开启时被调用,主要做一些初始化操作,如连接数据库、打开要保存的文件等;
  • close_spider(spider):非必需,参数 spider 即被关闭的 Spider 对象。这个方法也是 MiddlewareManager 类中的方法,在 Spider 关闭时被调用,主要做一些如关闭数据库连接、关闭打开的文件等操作;
  • from_crawler(cls, crawler):非必需,在 Spider启用时调用,且早于 open_spider() 方法。这个方法我们很少去重载,可以不用;
  • process_item(item, spider):必须实现。该函数有两个参数,一个是表示被处理的 Item 对象,另一个是生成该 Item 的 Spider 对象。定义的 Item pipeline 会默认调用该方法对 Item 进行处理,这也是 Pipeline 的工作核心;

完成这样一个 Item Pipeline 后,将该类的路径地址添加到 settings.py 中的 ITEM_PIPELINES 中即可。下图是我们一个简单项目完成的两个 pipelines。

图片描述

一个简单项目的 pipelines 示例

3. 实战演练

学习了上面的一些知识,我们来使用一个简单的网站进行实战演练,在该过程中介绍更多的和 Item Pipeline 相关的用法。

假设我们是一名小说爱好者,我想到起点中文网上去找一些好的小说看,我该怎么找呢?起点中文网的月票榜是一个不错的参考方式,如下图所示:

图片描述

起点中文网月票榜

其实简单看一看就知道月票榜的 url 组成:

  • 主体 url:https://www.qidian.com/rank/yuepiao
  • 参数 month:02 表示 2 月份,03 表示 3 月份,目前为止最多到 7 月份;
  • 参数 chn:表示的是分类,-1 表示全部分类。21 表示玄幻,22表示仙侠;
  • 参数 page:表示第几页,一页有20个作品。

目前我们只需要从 01 月份开始到 07 月份的月票榜中,每次都取得第一页的数据,也就是月票榜的前20 名。7 个月份的前 20 名加起来,然后再去重,就得到了曾经的占据月票榜的作品,这中间大概率都是比较好看的书。完成这个简单的需求我们按照如下的步骤进行:

创建初始项目 qidian_yuepiao:

[root@server ~]# pyenv activate scrapy-test
(scrapy-test) [root@server ~]# cd scrapy-test
(scrapy-test) [root@server scrapy-test]# scrapy startproject qidian_yuepia

(scrapy-test) [root@server qidian_yuepiao]# ls
 __init__.py  items.py  middlewares.py  pipelines.py settings.py  spider

接下来我们准备获取小说作品的字段,大概会获取如下几个数据:

  • 小说名:name;
  • 小说作者:author;
  • 小说类型:fiction_type。比如玄幻、仙侠、科幻等;
  • 小说状态:state。连载还是完结;
  • 封面图片地址:image_url;
  • images:保存图片数据;
  • brief_introduction:作品简介;
  • book_url:小说的具体地址。

根据定义的这些字段,我们可以写出对应的 Items 类,如下:

(scrapy-test) [root@server qidian_yuepiao]# cat items.py 
# Define here the models for your scraped items
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/items.html

import scrapy


class QidianYuepiaoItem(scrapy.Item):
    # define the fields for your item here like:
    name = scrapy.Field()  
    author = scrapy.Field()
    fiction_type = scrapy.Field()
    state = scrapy.Field()
    image_url = scrapy.Field() 
    images = scrapy.Field()
    brief_introduction = scrapy.Field() 
    book_url = scrapy.Field()

到了最关键的地方,需要解析网页数据,提取月票榜的作品信息。这个和前面一些,我们只需要完成相应的 xpath 即可。此外,我们会从 01 月份的月票榜开始,每次会新生成一个 url,主要改动的就是月份参数,每次将月份数加一;如果当前月份大于07,则终止。

(scrapy-test) [root@server qidian_yuepiao]# touch spiders/qidian_yuepiao_parse.py
import re

from scrapy import Request
from scrapy.spiders import Spider
from  qidian_yuepiao.items import QidianYuepiaoItem


def num_to_str(num, size=2, padding='0'):
    """
    0 - > 00   1 -> 01   11 -> 11
    :param num:
    :param size:
    :param padding:
    :return:
    """
    str_num = str(num)
    while len(str_num) < size:
        str_num = padding + str_num
    return str_num


class QidianSpider(Spider):
    name = "qidian_yuepiao_spider"
    start_urls = [
        "https://www.qidian.com/rank/yuepiao?month=01&chn=-1&page=1"
    ]

    def parse(self, response):
        fictions = response.xpath('//div[@id="rank-view-list"]/div/ul/li')
        for fiction in fictions:
            name = fiction.xpath('div[@class="book-mid-info"]/h4/a/text()').extract_first()
            author = fiction.xpath('div[@class="book-mid-info"]/p[@class="author"]/a[1]/text()').extract_first()
            fiction_type = fiction.xpath('div[@class="book-mid-info"]/p[@class="author"]/a[1]/text()').extract_first()
            # 注意一定要是列表,不然会报错
            image_url = ['http:{}'.format(fiction.xpath('div[@class="book-img-box"]/a/img/@src').extract()[0])]
            brief_introduction = fiction.xpath('div[@class="book-mid-info"]/p[@class="intro"]/text()').extract_first()
            state = fiction.xpath('div[@class="book-mid-info"]/p[@class="author"]/a[2]/text()').extract()[0]
            book_url = fiction.xpath('div[@class="book-mid-info"]/h4/a/@href').extract()[0]  

            item = QidianYuepiaoItem()
            item['name'] = name
            item['author'] = author
            item['fiction_type'] = fiction_type
            item['brief_introduction'] = brief_introduction.strip()
            item['image_url'] = image_url
            item['state'] = state
            item['book_url'] = book_url

            yield item

        # 提取月份数,同时也要提取请求的url
        url = response.url
        regex = "https://(.*)?month=(.*?)&(.*)"
        pattern = re.compile(regex)
        m = pattern.match(url)
        if not m:
            return []
        prefix = m.group(1)
        month = int(m.group(2))
        suffix = m.group(3)

        # 大于7月份则停止,目前是2020年7月20日
        if month > 7:
            return
 
        # 一定要将月份转成01, 02, s03这样的形式,否则不能正确请求到数据
        next_month = num_to_str(month + 1)
        
        next_url = f"https://{prefix}?month={next_month}&{suffix}"
        yield Request(next_url)

最后到了我们本节课的重点。首先我想要将数据保存成 json 格式,存储到文本文件中,但是在保存之前,需要对作品去重。因为有些作品会连续好几个月出现在月票榜的前20位置上,会有比较多重复。我们通过作品的 url 地址来唯一确定该小说。因此需要定义两个 Item Pipeline:

import json

from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem


class QidianYuepiaoPipeline:
    """
    保存不重复的数据到文本中
    """
    def open_spider(self, spider):
        self.file = open("yuepiao_top.json", 'w+')

    def close_spider(self, spider):
        self.file.close()

    def process_item(self, item, spider):
        data = json.dumps(dict(item), ensure_ascii=False)
        self.file.write(f"{data}n")
        return item


class DuplicatePipeline:
    """
    去除重复的数据,重复数据直接抛出异常,不会进入下一个流水线处理
    """
    def __init__(self):
        self.book_url_set = set() 

    def process_item(self, item, spider):
        if item['book_url'] in self.book_url_set:
            raise DropItem('duplicate fiction, drop it')
        self.book_url_set.add(item['book_url'])
        return item

我来简单介绍下上面实现的两个 pipelines 的代码。首先爬虫抓取的 item 结果经过的是 DuplicatePipeline 这个管道 (我们通过管道的优先级控制),我们在 DuplicatePipeline 中定义了一个全局的集合 (set),在 管道的核心方法process_item() 中,我们先判断传过来的 item 中 book_url 的值是否存在,如果存在则判定重复,然后抛出异常,这样下一个管道 (即 QidianYuepiaoPipeline) 就不会去处理;

在经过的第二个管道 (QidianYuepiaoPipeline) 中,我们主要是将不重复 item 保存到本地文件中,因此我们会在 open_spider() 方法中打开文件句柄,在 close_spider() 方法中关闭文件句柄,而在 process_item() 中将 item 数据保存到指定的文件中。

接着就是将这两个 Pipelines 加到 settings.py 中:

ITEM_PIPELINES = {
    'qidian_yuepiao.pipelines.DuplicatePipeline': 200,
    'qidian_yuepiao.pipelines.QidianYuepiaoPipeline': 300,
}

最后,我们来介绍一个 Scrapy 内置的图片管道,其实现的 Pipeline 代码位置为:scrapy/pipelines/images.py,对应的还有一个内置的文件管道。我们不需要编写任何代码,只需要在 settings.py 中指定下载的图片字段即可:

# 下载图片存储位置
IMAGES_STORE = '/root/scrapy-test/qidian_yuepiao/qidian_yuepiao/images'
# 保存下载图片url地址的字段
IMAGES_URLS_FIELD = 'image_url'
# 图片保存地址字段
IMAGES_RESULT_FIELD = 'images'
IMAGES_THUMBS = {
  'small': (102, 136),
  'big': (150, 200)
}

# ...

ITEM_PIPELINES = {
    'scrapy.pipelines.images.ImagesPipeline': 1,
    'qidian_yuepiao.pipelines.DuplicatePipeline': 200,
    'qidian_yuepiao.pipelines.QidianYuepiaoPipeline': 300,
}

由于 ImagesPipeline 继承自 FilesPipeline,我们可以从官网的介绍中知道该图片下载功能的执行流程如下:

  • 在 spider 中爬取需要下载的图片链接,将其放入 item 的 image_url 字段中;
  • spider 将得到的 item 传送到 pipeline 进行处理;
  • 当 item 到达 Image Pipeline 处理时,它会检测是否有 image_url 字段,如果存在的话,会将该 url 传递给 scrapy 调度器和下载器;
  • 下载完成后会将结果写入 item 的另一个字段 images,images 包含了图片的本地路径、图片校验、以及图片的url;

完成了以上四步之后,我们的这样一个简单需求就算完成了。还等什么,快来运行看看!以下是视频演示:

这样爬取数据是不是非常有趣?使用了 Scrapy 框架后,我们的爬取流程会变得比较固定化以及流水线化。但我们不仅仅要学会怎么使用 Scrapy 框架,还要能够基于 Scrapy 框架在特定场景下做些改造,这样才能达到完全驾驭 Scrapy 框架的目的。

4. 小结

本小节中,我们介绍了 Scrapy 中 Pipeline 相关的知识并在起点中文网上进行了简单的演示。在我们的爬虫项目中使用了两个自定义管道,分别用于去除重复小说以及将非重复的小说数据保存到本地文件中;另外我们还启用了 Scrapy 内置的图片下载管道,帮助我们自动处理图片 URL 并下载。