author: 专注Python实战,分享爬虫与数据分析干货
title: Python爬虫实战⑫|Scrapy Item与Pipeline,数据提取与存储
update: 2026-04-26
tags: Python,爬虫,Scrapy,Item,Pipeline,数据清洗,图片下载

作者:专注Python实战,分享爬虫与数据分析干货
更新时间:2026年4月
适合人群:已掌握Scrapy基础、想深入数据处理流程的开发者


前言:Scrapy的数据流,你真的懂吗?

Request → Downloader → Response → Spider.parse()
                              ↓
                         yield Item
                              ↓
                      Item Pipeline(多个)
                              ↓
                       保存到文件/数据库

Scrapy的数据流非常清晰:Spider解析页面 → 产出Item → Pipeline处理 → 最终存储。

今天深入讲解Item和Pipeline,写出生产级的Scrapy爬虫。


一、Item进阶用法

1.1 Item Loader(Item加载器)

不用Item Loader时,提取逻辑全写在Spider里:

# 传统方式:提取逻辑全在Spider
def parse(self, response):
    item = MyItem()
    item["title"] = response.xpath("//h1/text()").get()
    item["price"] = response.xpath("//span[@class='price']/text()").get()
    item["desc"] = response.xpath("//div[@class='desc']//text()").getall()
    yield item

Item Loader方式: 提取逻辑和数据处理分离,代码更清晰:

from scrapy.loader import ItemLoader
from scrapy.loader.processors import TakeFirst, MapCompose, Join

class ProductLoader(ItemLoader):
    """产品数据加载器"""

    # 默认输出处理器:取第一个值
    default_output_processor = TakeFirst()

    # 各字段的输入处理器(数据清洗)
    title_in = MapCompose(str.strip, str.title)
    price_in = MapCompose(str.strip, lambda x: x.replace("¥", ""))
    desc_in = Join()  # 将多个文本拼接成一个

    # 各字段的输出处理器
    title_out = TakeFirst()
    price_out = TakeFirst()
# Spider中使用ItemLoader
def parse_product(self, response):
    loader = ProductLoader(item=ProductItem(), response=response)

    # 添加选择器(会自动执行in处理器)
    loader.add_xpath("title", "//h1[@class='product-title']/text()")
    loader.add_xpath("price", "//span[@class='price']/text()")
    loader.add_xpath("desc", "//div[@class='description']//text()")
    loader.add_xpath("images", "//div[@class='gallery']//img/@src")

    return loader.load_item()

1.2 内置处理器

from scrapy.loader.processors import (
    TakeFirst,     # 取第一个
    Join,          # 拼接成字符串
    MapCompose,    # 对每个值应用函数
    Compose,       # 组合多个函数
    Identity,      # 原样返回
    SelectJmes,    # JSON字段提取
)

# TakeFirst:返回第一个非空值
loader.add_xpath("title", "//h1/text()")
loader.add_xpath("title2", "//h2/text()")
# → 取第一个有值的字段

# Join:拼接所有值
loader.add_xpath("text", "//p/text()")
# → 将所有p标签的文本拼接

# MapCompose:逐个应用函数
def clean_price(x):
    return x.replace("¥", "").replace(",", "").strip()

loader.add_xpath("price", "//span/text()", MapCompose(clean_price))

二、Pipeline进阶用法

2.1 多Pipeline组合

# settings.py
ITEM_PIPELINES = {
    # 数字越小越先执行
    "myproject.pipelines.ValidationPipeline": 100,   # 1. 数据验证
    "myproject.pipelines.CleaningPipeline": 200,      # 2. 数据清洗
    "myproject.pipelines.DuplicatesPipeline": 300,     # 3. 去重
    "myproject.pipelines.ImageDownloadPipeline": 400, # 4. 图片下载
    "myproject.pipelines.MongoDBPipeline": 500,      # 5. MongoDB存储
    "myproject.pipelines.MySQLPipeline": 600,        # 6. MySQL存储
}

2.2 完整Pipeline实例

import pymongo
import json
from itemadapter import ItemAdapter
from scrapy import Item, Field

class ValidationPipeline:
    """验证字段是否完整"""

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # 检查必填字段
        required_fields = ["title", "url"]
        for field in required_fields:
            if not adapter.get(field):
                spider.logger.warning(f"缺少必填字段 {field},丢弃")
                return None  # 返回None会丢弃此Item

        # 检查数据类型
        if adapter.get("price"):
            try:
                price = float(adapter["price"])
                if price <= 0:
                    spider.logger.warning(f"价格异常: {price},丢弃")
                    return None
                adapter["price"] = price
            except ValueError:
                adapter["price"] = 0.0

        return item


class CleaningPipeline:
    """数据清洗"""

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        # 清理文本字段
        for field in ["title", "desc", "author"]:
            value = adapter.get(field)
            if value:
                adapter[field] = str(value).strip()
                # 去除多余空格
                adapter[field] = " ".join(adapter[field].split())

        # 评分标准化
        rating = adapter.get("rating")
        if rating:
            try:
                adapter["rating"] = round(float(rating), 1)
            except (ValueError, TypeError):
                adapter["rating"] = 0.0

        # 处理列表字段
        tags = adapter.get("tags")
        if tags and isinstance(tags, list):
            adapter["tags"] = [t.strip() for t in tags if t.strip()]

        return item


class DuplicatesPipeline:
    """去重Pipeline"""

    def __init__(self):
        self.seen_ids = set()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        item_id = adapter.get("url") or adapter.get("title")

        if item_id in self.seen_ids:
            spider.logger.debug(f"重复数据,已丢弃: {item_id}")
            return None

        self.seen_ids.add(item_id)
        return item


class MongoDBPipeline:
    """MongoDB存储Pipeline"""

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        """从settings获取配置"""
        return cls(
            mongo_uri=crawler.settings.get("MONGO_URI", "mongodb://localhost:27017"),
            mongo_db=crawler.settings.get("MONGO_DATABASE", "crawler_db"),
        )

    def open_spider(self, spider):
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        self.db[spider.name].insert_one(dict(adapter))
        return item

    def close_spider(self, spider):
        self.client.close()


class MySQLPipeline:
    """MySQL存储Pipeline"""

    def __init__(self, db_config):
        self.db_config = db_config

    @classmethod
    def from_crawler(cls, crawler):
        return cls(db_config={
            "host": crawler.settings.get("MYSQL_HOST", "localhost"),
            "port": crawler.settings.get("MYSQL_PORT", 3306),
            "user": crawler.settings.get("MYSQL_USER", "root"),
            "password": crawler.settings.get("MYSQL_PASSWORD", ""),
            "database": crawler.settings.get("MYSQL_DATABASE", "crawler_db"),
        })

    def open_spider(self, spider):
        import pymysql
        self.conn = pymysql.connect(**self.db_config, charset="utf8mb4")
        self.cursor = self.conn.cursor()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        sql = "INSERT INTO products (title, price, rating, url) VALUES (%s, %s, %s, %s)"
        try:
            self.cursor.execute(sql, (
                adapter.get("title"),
                adapter.get("price", 0),
                adapter.get("rating", 0),
                adapter.get("url"),
            ))
            self.conn.commit()
        except Exception as e:
            self.conn.rollback()
            spider.logger.error(f"MySQL插入失败: {e}")
        return item

    def close_spider(self, spider):
        self.cursor.close()
        self.conn.close()

2.3 settings.py配置

# MongoDB配置
MONGO_URI = "mongodb://localhost:27017"
MONGO_DATABASE = "crawler_db"

# MySQL配置
MYSQL_HOST = "localhost"
MYSQL_PORT = 3306
MYSQL_USER = "root"
MYSQL_PASSWORD = "your_password"
MYSQL_DATABASE = "crawler_db"

# 启用Pipeline
ITEM_PIPELINES = {
    "myproject.pipelines.ValidationPipeline": 100,
    "myproject.pipelines.CleaningPipeline": 200,
    "myproject.pipelines.DuplicatesPipeline": 300,
    "myproject.pipelines.MongoDBPipeline": 400,
}

三、图片下载Pipeline

Scrapy内置了图片下载功能,只需简单配置:

3.1 安装依赖

pip install pillow -i https://pypi.tuna.tsinghua.edu.cn/simple

3.2 启用图片Pipeline

# settings.py
ITEM_PIPELINES = {
    "scrapy.pipelines.images.ImagesPipeline": 1,  # 内置图片下载
    "myproject.pipelines.ProductPipeline": 100,
}

IMAGES_STORE = "images"  # 图片保存目录
IMAGES_THUMBS = {         # 生成缩略图
    "small": (100, 100),
    "big": (400, 400),
}
IMAGES_MIN_WIDTH = 100    # 最小宽度
IMAGES_MIN_HEIGHT = 100   # 最小高度

3.3 Item定义图片字段

import scrapy

class ProductItem(scrapy.Item):
    title = scrapy.Field()
    price = scrapy.Field()
    image_urls = scrapy.Field()       # 图片URL列表
    images = scrapy.Field()          # 下载后的图片信息(Scrapy自动填充)
    image_paths = scrapy.Field()     # 图片保存路径

3.4 自定义图片Pipeline

from scrapy.pipelines.images import ImagesPipeline
from scrapy.http import Request
from itemadapter import ItemAdapter

class CustomImagePipeline(ImagesPipeline):
    """自定义图片下载Pipeline"""

    def get_media_requests(self, item, info):
        """下载图片"""
        adapter = ItemAdapter(item)
        image_urls = adapter.get("image_urls", [])
        for image_url in image_urls:
            yield Request(
                image_url,
                meta={"item_title": adapter.get("title")}  # 传递额外信息
            )

    def file_path(self, request, response=None, info=None, *, item=None):
        """自定义文件名"""
        adapter = ItemAdapter(item)
        title = adapter.get("title", "unknown")
        filename = f"{title}_{request.url.split('/')[-1]}"
        # 清理文件名中的非法字符
        filename = "".join(c for c in filename if c not in '<>:"/\\|?*')
        return f"products/{filename}"

    def image_downloaded(self, response, request, info, *, item=None):
        """图片下载完成后的处理"""
        adapter = ItemAdapter(item)
        print(f"图片下载完成: {adapter.get('title')}")
        return super().image_downloaded(response, request, info, item=item)

四、知识卡

功能 代码
ItemLoader from scrapy.loader import ItemLoader
TakeFirst 取第一个非空值
MapCompose 逐值应用函数
Join 拼接为字符串
Pipeline顺序 settings里数字越小越先执行
返回None 丢弃该Item
from_crawler 从settings初始化Pipeline
open_spider 爬虫启动时调用
close_spider 爬虫结束时调用
ImagesPipeline Scrapy内置图片下载
IMAGES_STORE 图片保存目录

五、课后作业

必做题:

  1. 用ItemLoader重构爬虫的数据提取逻辑
  2. 编写ValidationPipeline验证必填字段
  3. 配置MongoDBPipeline实现数据存储

选做题:

  1. 实现自定义图片下载Pipeline
  2. 实现一个图片去重Pipeline

完成作业的同学,把运行截图发到评论区!


Item + Pipeline = Scrapy数据处理的黄金组合。 Item定义数据结构,Pipeline负责处理逻辑,分工明确,易于维护。

本篇要点:

  • Item Loader数据加载器
  • 内置处理器(TakeFirst/Join/MapCompose)
  • 多Pipeline组合
  • 数据验证、清洗、去重、存储Pipeline
  • MongoDB/MySQL存储Pipeline
  • Scrapy图片下载Pipeline

下一篇学习Scrapy中间件与分布式爬虫——让爬虫规模化。

有问题欢迎评论区留言,大家一起讨论!


标签:Python | Scrapy | Item | Pipeline | ItemLoader | 数据清洗 | 图片下载

更多推荐