Python爬虫实战⑫|Scrapy Item与Pipeline,数据提取与存储
·
author: 专注Python实战,分享爬虫与数据分析干货
title: Python爬虫实战⑫|Scrapy Item与Pipeline,数据提取与存储
update: 2026-04-26
tags: Python,爬虫,Scrapy,Item,Pipeline,数据清洗,图片下载
作者:专注Python实战,分享爬虫与数据分析干货
更新时间:2026年4月
适合人群:已掌握Scrapy基础、想深入数据处理流程的开发者
前言:Scrapy的数据流,你真的懂吗?
Request → Downloader → Response → Spider.parse()
↓
yield Item
↓
Item Pipeline(多个)
↓
保存到文件/数据库
Scrapy的数据流非常清晰:Spider解析页面 → 产出Item → Pipeline处理 → 最终存储。
今天深入讲解Item和Pipeline,写出生产级的Scrapy爬虫。
一、Item进阶用法
1.1 Item Loader(Item加载器)
不用Item Loader时,提取逻辑全写在Spider里:
# Traditional approach: all extraction logic lives in the spider
def parse(self, response):
    """Build a MyItem by querying the response directly with XPath."""
    title = response.xpath("//h1/text()").get()
    price = response.xpath("//span[@class='price']/text()").get()
    # getall() returns every matching text node, so desc is a list of strings.
    desc = response.xpath("//div[@class='desc']//text()").getall()
    yield MyItem(title=title, price=price, desc=desc)
Item Loader方式: 提取逻辑和数据处理分离,代码更清晰:
# Processors live in the standalone ``itemloaders`` package;
# ``scrapy.loader.processors`` is deprecated (since Scrapy 2.3).
from itemloaders.processors import Identity, Join, MapCompose, TakeFirst
from scrapy.loader import ItemLoader


def _strip_or_none(text):
    """Strip *text*; return None for blank strings so MapCompose drops them."""
    return text.strip() or None


def _remove_price_symbols(text):
    """Remove currency signs (half- and full-width) and thousands separators."""
    return text.replace("¥", "").replace("￥", "").replace(",", "")


class ProductLoader(ItemLoader):
    """Product item loader: data-cleaning rules live here, not in the spider."""

    # Default output processor: keep the first non-empty value of each field.
    default_output_processor = TakeFirst()

    # Input processors run on every extracted value as it is added (data cleaning).
    # No str.title here: it would corrupt names such as "iPhone" -> "Iphone".
    title_in = MapCompose(_strip_or_none)
    # Remove symbols *before* stripping so "¥ 1,299" becomes "1299", not " 1299".
    price_in = MapCompose(_remove_price_symbols, _strip_or_none)
    # Description text nodes: strip each fragment and drop blank ones...
    desc_in = MapCompose(_strip_or_none)

    # Output processors.
    title_out = TakeFirst()
    price_out = TakeFirst()
    # ...then join the remaining fragments into a single string.
    desc_out = Join()
    # List-valued fields (ImagesPipeline expects image_urls to be a list)
    # must not be collapsed by the default TakeFirst.
    image_urls_out = Identity()
# Using the ItemLoader in a spider
def parse_product(self, response):
    """Extract one product page into a ProductItem via ProductLoader."""
    loader = ProductLoader(item=ProductItem(), response=response)
    # add_xpath runs the field's input processor (<field>_in) on the extracted values.
    loader.add_xpath("title", "//h1[@class='product-title']/text()")
    loader.add_xpath("price", "//span[@class='price']/text()")
    loader.add_xpath("desc", "//div[@class='description']//text()")
    # Record the page URL; ValidationPipeline treats "url" as a required field.
    loader.add_value("url", response.url)
    item = loader.load_item()
    # ImagesPipeline downloads from *image_urls* (a list of absolute URLs) and
    # writes its results into *images*, so the scraped src values must not go
    # into "images". They are assigned after load_item() so that a TakeFirst
    # output processor cannot collapse the list to a single string.
    item["image_urls"] = [
        response.urljoin(src)
        for src in response.xpath("//div[@class='gallery']//img/@src").getall()
    ]
    return item
1.2 内置处理器
# Built-in processors live in the ``itemloaders`` package
# (``scrapy.loader.processors`` is deprecated since Scrapy 2.3).
from itemloaders.processors import (
    Compose,     # chain functions over the whole list of values
    Identity,    # return the values unchanged
    Join,        # join the values into one string (default separator " ")
    MapCompose,  # apply functions to each value; None results are dropped
    SelectJmes,  # query JSON with a JMESPath expression (needs jmespath installed)
    TakeFirst,   # return the first non-null, non-empty value
)

# TakeFirst works on the values collected for ONE field: when several
# selectors feed the same field, the first non-empty value wins.
loader.add_xpath("title", "//h1/text()")
loader.add_xpath("title", "//h2/text()")  # fallback when the page has no <h1> text
# -> title is the <h1> text if present, otherwise the <h2> text

# Join: concatenate all values of a field (declare text_out = Join() on the loader)
loader.add_xpath("text", "//p/text()")
# -> the text of every <p> tag joined into one string


# MapCompose: apply a function to each value
def clean_price(x):
    """Remove currency signs and thousands separators, then strip whitespace."""
    return x.replace("¥", "").replace("￥", "").replace(",", "").strip()


loader.add_xpath("price", "//span/text()", MapCompose(clean_price))
二、Pipeline进阶用法
2.1 多Pipeline组合
# settings.py
ITEM_PIPELINES = {
    # Lower numbers run first (values are customarily in the 0-1000 range).
    "myproject.pipelines.ValidationPipeline": 100,   # 1. validate required fields
    "myproject.pipelines.CleaningPipeline": 200,     # 2. clean data
    "myproject.pipelines.DuplicatesPipeline": 300,   # 3. de-duplicate
    "myproject.pipelines.CustomImagePipeline": 400,  # 4. download images (needs IMAGES_STORE)
    "myproject.pipelines.MongoDBPipeline": 500,      # 5. store in MongoDB
    "myproject.pipelines.MySQLPipeline": 600,        # 6. store in MySQL
}
2.2 完整Pipeline实例
import json

import pymongo
from itemadapter import ItemAdapter
from scrapy import Item, Field
from scrapy.exceptions import DropItem
class ValidationPipeline:
    """Check required fields and normalize the price field.

    Invalid items are discarded by raising ``DropItem``. Returning ``None``
    does NOT drop an item in Scrapy: the ``None`` is handed to the next
    pipeline, which then fails on it. Scrapy logs dropped items with the
    exception message, so no separate warning is emitted here.
    """

    # Fields that must be present and non-empty for an item to be kept.
    required_fields = ("title", "url")

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        for field in self.required_fields:
            if not adapter.get(field):
                raise DropItem(f"缺少必填字段 {field},丢弃")

        raw_price = adapter.get("price")
        if raw_price:
            try:
                price = float(raw_price)
            except (ValueError, TypeError):
                # Unparseable price (non-numeric text, a list, ...): keep the
                # item but zero the price.
                adapter["price"] = 0.0
            else:
                if price <= 0:
                    raise DropItem(f"价格异常: {price},丢弃")
                adapter["price"] = price

        return item
class CleaningPipeline:
    """Normalize text fields, ratings and tag lists in place."""

    # Free-text fields whose whitespace is collapsed to single spaces.
    text_fields = ("title", "desc", "author")

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)

        for field in self.text_fields:
            value = adapter.get(field)
            if not value:
                continue
            if isinstance(value, (list, tuple)):
                # e.g. desc extracted with .getall(): join the text fragments
                # instead of storing str(list), which would be "['a', 'b']".
                value = " ".join(str(part) for part in value)
            # split() with no argument splits on any whitespace run and ignores
            # leading/trailing whitespace, so this both strips and collapses.
            adapter[field] = " ".join(str(value).split())

        # Ratings: rounded to one decimal place; unparseable values become 0.0.
        rating = adapter.get("rating")
        if rating:
            try:
                adapter["rating"] = round(float(rating), 1)
            except (ValueError, TypeError):
                adapter["rating"] = 0.0

        # Tags: strip each entry and drop blank or None entries.
        tags = adapter.get("tags")
        if tags and isinstance(tags, list):
            stripped = (str(tag).strip() for tag in tags if tag is not None)
            adapter["tags"] = [tag for tag in stripped if tag]

        return item
class DuplicatesPipeline:
    """Drop items already seen in this crawl, keyed by url (falling back to title)."""

    def __init__(self):
        # Identifiers seen so far, kept in memory for the pipeline's lifetime.
        self.seen_ids = set()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        item_id = adapter.get("url") or adapter.get("title")
        if not item_id:
            # Nothing to de-duplicate on: let the item through instead of
            # treating every id-less item after the first as a duplicate.
            return item
        if item_id in self.seen_ids:
            # DropItem is how Scrapy discards an item; returning None would
            # pass None on to the next pipeline.
            raise DropItem(f"重复数据,已丢弃: {item_id}")
        self.seen_ids.add(item_id)
        return item
class MongoDBPipeline:
    """Store items in MongoDB, one collection per spider (named after spider.name)."""

    def __init__(self, mongo_uri, mongo_db):
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db

    @classmethod
    def from_crawler(cls, crawler):
        """Build the pipeline from the MONGO_URI / MONGO_DATABASE settings."""
        return cls(
            mongo_uri=crawler.settings.get("MONGO_URI", "mongodb://localhost:27017"),
            mongo_db=crawler.settings.get("MONGO_DATABASE", "crawler_db"),
        )

    def open_spider(self, spider):
        # One client for the whole crawl, created when the spider opens.
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]

    def process_item(self, item, spider):
        # asdict() recursively converts nested items into plain dicts that BSON
        # can encode (dict(adapter) is only a shallow copy). It also returns a
        # fresh dict, so the "_id" that insert_one adds does not leak into the item.
        self.db[spider.name].insert_one(ItemAdapter(item).asdict())
        return item

    def close_spider(self, spider):
        self.client.close()
class MySQLPipeline:
    """Insert each item as a row of the ``products`` table (best-effort).

    Connection parameters come from the MYSQL_* settings. Every insert is
    committed on its own; a failed insert is rolled back and logged, and the
    item is still passed on to later pipelines.
    """

    def __init__(self, db_config):
        self.db_config = db_config

    @classmethod
    def from_crawler(cls, crawler):
        """Build the pipeline from the MYSQL_* settings."""
        settings = crawler.settings
        return cls(db_config={
            "host": settings.get("MYSQL_HOST", "localhost"),
            # getint: values passed via `-s MYSQL_PORT=...` arrive as strings,
            # but pymysql needs an integer port.
            "port": settings.getint("MYSQL_PORT", 3306),
            "user": settings.get("MYSQL_USER", "root"),
            "password": settings.get("MYSQL_PASSWORD", ""),
            "database": settings.get("MYSQL_DATABASE", "crawler_db"),
        })

    def open_spider(self, spider):
        # Imported here rather than at module level, so this module can be
        # imported even where pymysql is not installed.
        import pymysql

        self.conn = pymysql.connect(**self.db_config, charset="utf8mb4")
        self.cursor = self.conn.cursor()

    def process_item(self, item, spider):
        adapter = ItemAdapter(item)
        # Parameterized query: values are sent separately, never formatted into SQL.
        sql = "INSERT INTO products (title, price, rating, url) VALUES (%s, %s, %s, %s)"
        row = (
            adapter.get("title"),
            adapter.get("price", 0),
            adapter.get("rating", 0),
            adapter.get("url"),
        )
        try:
            self.cursor.execute(sql, row)
            self.conn.commit()
        except Exception as e:
            # Deliberately best-effort: one failed row must not stop the crawl,
            # so roll it back, log it and keep the item flowing.
            self.conn.rollback()
            spider.logger.error(f"MySQL插入失败: {e}")
        return item

    def close_spider(self, spider):
        self.cursor.close()
        self.conn.close()
2.3 settings.py配置
import os

# MongoDB settings
MONGO_URI = "mongodb://localhost:27017"
MONGO_DATABASE = "crawler_db"

# MySQL settings
MYSQL_HOST = "localhost"
MYSQL_PORT = 3306
MYSQL_USER = "root"
# Read the password from the environment instead of committing it in settings.py.
MYSQL_PASSWORD = os.environ.get("MYSQL_PASSWORD", "")
MYSQL_DATABASE = "crawler_db"

# Enabled pipelines (lower number runs first). The MySQL settings above are
# only used once "myproject.pipelines.MySQLPipeline" is added here.
ITEM_PIPELINES = {
    "myproject.pipelines.ValidationPipeline": 100,
    "myproject.pipelines.CleaningPipeline": 200,
    "myproject.pipelines.DuplicatesPipeline": 300,
    "myproject.pipelines.MongoDBPipeline": 400,
}
三、图片下载Pipeline
Scrapy内置了图片下载功能(ImagesPipeline,依赖Pillow,会把所有图片统一转换为JPEG格式保存),只需简单配置:
3.1 安装依赖
pip install pillow -i https://pypi.tuna.tsinghua.edu.cn/simple
3.2 启用图片Pipeline
# settings.py
# Directory where ImagesPipeline stores files (the pipeline is disabled without it).
IMAGES_STORE = "images"

ITEM_PIPELINES = {
    "scrapy.pipelines.images.ImagesPipeline": 1,  # built-in image downloader, runs first
    "myproject.pipelines.ProductPipeline": 100,
}

# Thumbnail name -> (width, height) bounding box.
IMAGES_THUMBS = dict(small=(100, 100), big=(400, 400))

# Images smaller than this (in pixels) are skipped.
IMAGES_MIN_WIDTH = IMAGES_MIN_HEIGHT = 100
3.3 Item定义图片字段
import scrapy


class ProductItem(scrapy.Item):
    """Product record used by the loader, the validation/storage pipelines and ImagesPipeline."""

    title = scrapy.Field()
    price = scrapy.Field()
    desc = scrapy.Field()    # product description text
    url = scrapy.Field()     # page URL; ValidationPipeline requires it
    rating = scrapy.Field()  # read by CleaningPipeline and MySQLPipeline
    # Input for ImagesPipeline: a list of absolute image URLs to download.
    image_urls = scrapy.Field()
    # Output of ImagesPipeline: filled automatically with one dict per
    # downloaded image (url, path, checksum, ...). Do not set it yourself.
    images = scrapy.Field()
    # Not filled by Scrapy; populate it yourself (e.g. in item_completed) if needed.
    image_paths = scrapy.Field()
3.4 自定义图片Pipeline
import hashlib
import logging
import posixpath
from urllib.parse import urlparse

from itemadapter import ItemAdapter
from scrapy.http import Request
from scrapy.pipelines.images import ImagesPipeline

logger = logging.getLogger(__name__)

# Characters that are illegal in Windows file names (covers "/" on POSIX too).
_ILLEGAL_FILENAME_CHARS = frozenset('<>:"/\\|?*')


class CustomImagePipeline(ImagesPipeline):
    """Image pipeline that names stored files after the product title."""

    # Cap on the title part of a file name; most filesystems limit names to 255 bytes.
    max_title_length = 80

    def get_media_requests(self, item, info):
        """Yield one download Request per URL in the item's image_urls."""
        adapter = ItemAdapter(item)
        for image_url in adapter.get("image_urls") or []:
            yield Request(
                image_url,
                meta={"item_title": adapter.get("title")},  # pass extra info along
            )

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the storage path ``products/<title>_<image-name>.jpg``.

        ImagesPipeline re-encodes every image as JPEG, so the extension is
        always ``.jpg``. The URL's query string is ignored, and a SHA1 of the
        URL is used when the URL path has no file name (e.g. ends with "/").
        """
        title = ItemAdapter(item).get("title") if item is not None else None
        title = str(title or "unknown")[: self.max_title_length]
        basename = posixpath.basename(urlparse(request.url).path)
        stem = posixpath.splitext(basename)[0]
        if not stem:
            stem = hashlib.sha1(request.url.encode("utf-8")).hexdigest()
        filename = f"{title}_{stem}.jpg"
        # Drop characters that are illegal in file names, plus control characters.
        filename = "".join(
            c for c in filename if c not in _ILLEGAL_FILENAME_CHARS and c.isprintable()
        )
        return f"products/{filename}"

    def image_downloaded(self, response, request, info, *, item=None):
        """Log the finished download, then let ImagesPipeline process and store it."""
        title = ItemAdapter(item).get("title") if item is not None else None
        logger.info("图片下载完成: %s", title)
        return super().image_downloaded(response, request, info, item=item)
四、知识卡
| 功能 | 代码 |
|---|---|
| ItemLoader | from scrapy.loader import ItemLoader |
| TakeFirst | 取第一个非空值 |
| MapCompose | 逐值应用函数 |
| Join | 拼接为字符串 |
| Pipeline顺序 | settings里数字越小越先执行 |
| raise DropItem | 丢弃该Item(返回None不会丢弃,而是把None传给下一个Pipeline) |
| from_crawler | 从settings初始化Pipeline |
| open_spider | 爬虫启动时调用 |
| close_spider | 爬虫结束时调用 |
| ImagesPipeline | Scrapy内置图片下载 |
| IMAGES_STORE | 图片保存目录 |
五、课后作业
必做题:
- 用ItemLoader重构爬虫的数据提取逻辑
- 编写ValidationPipeline验证必填字段
- 配置MongoDBPipeline实现数据存储
选做题:
- 实现自定义图片下载Pipeline
- 实现一个图片去重Pipeline
完成作业的同学,把运行截图发到评论区!
Item + Pipeline = Scrapy数据处理的黄金组合。 Item定义数据结构,Pipeline负责处理逻辑,分工明确,易于维护。
本篇要点:
- Item Loader数据加载器
- 内置处理器(TakeFirst/Join/MapCompose)
- 多Pipeline组合
- 数据验证、清洗、去重、存储Pipeline
- MongoDB/MySQL存储Pipeline
- Scrapy图片下载Pipeline
下一篇学习Scrapy中间件与分布式爬虫——让爬虫规模化。
有问题欢迎评论区留言,大家一起讨论!
标签:Python | Scrapy | Item | Pipeline | ItemLoader | 数据清洗 | 图片下载
更多推荐
所有评论(0)