author: 专注Python实战,分享爬虫与数据分析干货
title: Python爬虫实战⑭|Scrapy实战——新闻网站全站爬取
update: 2026-04-26
tags: Python,Scrapy,爬虫实战,新闻网站,全站爬取,分布式

作者:专注Python实战,分享爬虫与数据分析干货
更新时间:2026年4月
适合人群:已掌握Scrapy、想完成完整项目的开发者


前言:做一个能上线的爬虫项目

前面学了Scrapy的各个组件,但都是零散知识。今天我们把这些知识串起来,做一个完整项目:爬取新闻网站的全站数据。

项目目标:

  • 爬取多个新闻网站的标题、正文、时间、来源
  • 自动发现新文章并跟进
  • 数据清洗后存入MongoDB
  • 完整的日志和异常处理
  • 分布式部署准备

一、项目规划

1.1 选择目标网站

本文以一个模拟的新闻网站(news.example.com)为例。实际项目中,请只选择robots.txt允许爬取的新闻站点,并遵守对方的访问频率限制。

1.2 数据模型设计

# items.py

import scrapy

class NewsItem(scrapy.Item):
    """Full news article record.

    ``article_id`` is declared because the pipelines in this project write
    and read that key (the validation pipeline assigns it, the MongoDB
    pipeline upserts on it); ``scrapy.Item`` raises ``KeyError`` when an
    undeclared field is assigned.
    """
    article_id = scrapy.Field()   # stable identifier used as the storage key
    title = scrapy.Field()        # headline
    content = scrapy.Field()      # body text
    publish_time = scrapy.Field() # publication time
    author = scrapy.Field()       # author / byline
    category = scrapy.Field()     # category
    tags = scrapy.Field()         # list of tags
    url = scrapy.Field()          # original article URL
    source = scrapy.Field()       # source website
    crawl_time = scrapy.Field()   # time the page was crawled
    images = scrapy.Field()       # list of image URLs


class ArticleItem(scrapy.Item):
    """Article record emitted by the list spider.

    Every key the spider assigns must be declared here: ``scrapy.Item``
    raises ``KeyError`` on assignment to an undeclared field. ``tags``,
    ``source``, ``content`` and ``crawl_time`` are assigned by
    ``NewsListSpider`` and are therefore declared alongside the summary
    fields.
    """
    article_id = scrapy.Field()   # article ID (extracted from the URL)
    title = scrapy.Field()        # headline
    summary = scrapy.Field()      # short abstract
    publish_time = scrapy.Field() # publication time
    url = scrapy.Field()          # original article URL
    thumbnail = scrapy.Field()    # cover / thumbnail image URL
    category = scrapy.Field()     # category
    tags = scrapy.Field()         # list of tags
    source = scrapy.Field()       # source website
    content = scrapy.Field()      # body text
    crawl_time = scrapy.Field()   # time the page was crawled

二、编写爬虫

2.1 列表页Spider

# spiders/news_spider.py

import scrapy
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule
from news_project.items import NewsItem, ArticleItem
from datetime import datetime
import re


class NewsListSpider(CrawlSpider):
    """Crawl list pages and emit one ``ArticleItem`` per article page.

    Pagination links are followed by the first rule; article links matched
    by the second rule are handed to ``parse_article``. Each article page is
    downloaded once and produces exactly one item, so downstream URL
    de-duplication never sees a partial copy followed by a complete one.
    """

    name = "news_list"

    allowed_domains = ["news.example.com"]

    start_urls = [
        "https://news.example.com/",
        "https://news.example.com/tech/",
        "https://news.example.com/business/",
        "https://news.example.com/culture/",
    ]

    rules = (
        # Rule 1: pagination links found inside the pager widgets; followed,
        # no callback.
        Rule(
            LinkExtractor(
                allow=r"/page/\d+",
                restrict_css=".pagination, .page-nav"
            ),
            follow=True,
        ),
        # Rule 2: article detail pages; parsed by parse_article and not used
        # for further link discovery.
        Rule(
            LinkExtractor(
                allow=r"/article/\d+\.html",
                restrict_css=".article-list, .news-list, main"
            ),
            callback="parse_article",
            follow=False,
        ),
    )

    custom_settings = {
        "DOWNLOAD_DELAY": 2,
        "CONCURRENT_REQUESTS_PER_DOMAIN": 2,
    }

    # Body-paragraph selectors, tried in order until one yields text.
    _CONTENT_SELECTORS = (
        ".article-content p::text",
        "article p::text",
        ".content p::text",
    )

    # Date-only fallbacks used when the value is not a plain ISO-8601 string.
    _DATE_PATTERNS = (
        (re.compile(r"\d{4}-\d{2}-\d{2}"), "%Y-%m-%d"),
        (re.compile(r"\d{4}/\d{2}/\d{2}"), "%Y/%m/%d"),
        (re.compile(r"\d{4}年\d{1,2}月\d{1,2}日"), "%Y年%m月%d日"),
    )

    def parse_article(self, response):
        """Build the complete item for one article page and yield it once.

        All fields, including the body text, are read from ``response``;
        no second request for the same URL is issued.
        """
        item = ArticleItem()

        # Article ID taken from the URL; empty when the URL has no numeric ID.
        match = re.search(r"/article/(\d+)\.html", response.url)
        item["article_id"] = match.group(1) if match else ""

        item["title"] = self._first_match(
            response, "h1.article-title::text", "h1::text"
        )
        item["summary"] = self._first_match(
            response,
            "meta[name='description']::attr(content)",
            ".article-summary::text",
        )
        item["publish_time"] = self._parse_time(
            self._first_match(
                response,
                "time.publish-time::attr(datetime)",
                "span.time::text",
            )
        )
        item["category"] = self._first_match(
            response, ".article-category a::text", "nav.breadcrumb span::text"
        )

        thumbnail = self._first_match(
            response,
            "meta[property='og:image']::attr(content)",
            ".article-cover img::attr(src)",
        )
        # urljoin("") would return the page URL itself, so guard the empty case.
        item["thumbnail"] = response.urljoin(thumbnail) if thumbnail else ""

        tags = response.css(".article-tags a::text").getall()
        item["tags"] = [t.strip() for t in tags if t.strip()]

        item["source"] = "示例新闻网站"
        item["url"] = response.url

        item["content"] = self._extract_content(response)
        item["crawl_time"] = datetime.now().isoformat()

        yield item

    def parse_full_content(self, response):
        """Fill body text into the item carried in ``response.meta["item"]``.

        ``parse_article`` no longer schedules this callback; it is kept so
        that requests created elsewhere with an ``item`` in ``meta`` still
        work.
        """
        item = response.meta["item"]
        item["content"] = self._extract_content(response)
        item["crawl_time"] = datetime.now().isoformat()
        yield item

    @staticmethod
    def _first_match(response, *queries):
        """Return the first non-empty result of the CSS queries, stripped, else ""."""
        for query in queries:
            value = response.css(query).get()
            if value:
                return value.strip()
        return ""

    def _extract_content(self, response):
        """Return the article body as newline-joined, non-empty paragraphs."""
        paragraphs = []
        for query in self._CONTENT_SELECTORS:
            paragraphs = response.css(query).getall()
            if paragraphs:
                break
        return "\n".join(p.strip() for p in paragraphs if p.strip())

    def _parse_time(self, time_str):
        """Normalise a publish-time string to ISO-8601.

        Returns "" for empty input. A value that is already ISO-8601 keeps
        its time of day (and UTC offset, if present). Otherwise the first
        recognised date in the text is returned at midnight. Text with no
        recognisable date is returned stripped but otherwise unchanged.
        """
        if not time_str:
            return ""

        text = time_str.strip()

        try:
            return datetime.fromisoformat(text).isoformat()
        except ValueError:
            # Not a plain ISO string; fall through to the date-only patterns.
            pass

        for pattern, fmt in self._DATE_PATTERNS:
            match = pattern.search(text)
            if not match:
                continue
            try:
                return datetime.strptime(match.group(), fmt).isoformat()
            except ValueError:
                # Digits matched but are not a valid date (e.g. month 13).
                continue

        return text

2.2 详情页Spider

# spiders/news_detail_spider.py

import scrapy
from news_project.items import NewsItem


class NewsDetailSpider(scrapy.Spider):
    """Fetch article detail pages for the URLs listed in a text file.

    Run with ``-a url_file=PATH``; the file holds one URL per line and blank
    lines are ignored.
    """

    name = "news_detail"

    def __init__(self, url_file=None, *args, **kwargs):
        """Store the path of the URL list (``None`` when not supplied)."""
        super().__init__(*args, **kwargs)
        self.url_file = url_file

    def start_requests(self):
        """Yield one request per non-blank line of ``url_file``."""
        if not self.url_file:
            self.logger.warning("未提供URL文件,跳过")
            return

        # Read everything first so the file is closed before crawling starts.
        with open(self.url_file, "r", encoding="utf-8") as f:
            urls = [line.strip() for line in f if line.strip()]

        for url in urls:
            yield scrapy.Request(url, callback=self.parse)

    def parse(self, response):
        """Extract a ``NewsItem`` from one article page."""
        # Local import: this module does not import datetime at file level.
        from datetime import datetime

        item = NewsItem()

        item["url"] = response.url
        item["title"] = (response.css("h1::text").get() or "").strip()

        # Drop whitespace-only text nodes so they do not add stray spaces.
        paragraphs = (
            p.strip() for p in response.css(".article p::text").getall()
        )
        item["content"] = " ".join(p for p in paragraphs if p)

        item["publish_time"] = (
            response.css("time::attr(datetime)").get() or ""
        ).strip()
        item["source"] = (response.css(".source::text").get() or "").strip()
        item["crawl_time"] = datetime.now().isoformat()

        yield item

三、Pipeline编写

# pipelines.py

import hashlib
from datetime import datetime

import pymongo
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem


class NewsValidationPipeline:
    """Validate required fields and normalise the item before storage.

    Invalid items are rejected by raising ``DropItem``. Returning ``None``
    from ``process_item`` does not drop anything: the ``None`` is passed to
    the next pipeline as if it were the item.
    """

    # Upper bound on stored body length, in characters.
    MAX_CONTENT_LENGTH = 50000

    def process_item(self, item, spider):
        """Return the cleaned item, or raise ``DropItem`` if it is unusable.

        Raises:
            DropItem: when the item has no title or no URL.
        """
        adapter = ItemAdapter(item)

        # Required fields.
        if not adapter.get("title"):
            raise DropItem(f"缺少标题,跳过: {adapter.get('url')}")

        if not adapter.get("url"):
            raise DropItem("缺少URL,跳过")

        # Derive an ID from the URL when the spider did not provide one.
        # MD5 is used only as a short fingerprint, not for security.
        if not adapter.get("article_id"):
            url = adapter.get("url", "")
            adapter["article_id"] = hashlib.md5(url.encode()).hexdigest()[:16]

        # Trim surrounding whitespace and cap the body length.
        content = adapter.get("content", "")
        if content:
            adapter["content"] = content.strip()[: self.MAX_CONTENT_LENGTH]

        return item


class MongoDBStoragePipeline:
    """Upsert items into MongoDB, one collection per spider."""

    def __init__(self, mongo_uri, mongo_db):
        """Remember connection settings; the connection is opened per spider."""
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db
        self.client = None
        self.db = None

    @classmethod
    def from_crawler(cls, crawler):
        """Build the pipeline from the ``MONGO_URI`` / ``MONGO_DATABASE`` settings."""
        return cls(
            mongo_uri=crawler.settings.get("MONGO_URI", "mongodb://localhost:27017"),
            mongo_db=crawler.settings.get("MONGO_DATABASE", "news_crawler"),
        )

    def open_spider(self, spider):
        """Create the client and select the database when the spider starts."""
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]
        spider.logger.info(f"连接MongoDB: {self.mongo_db}")

    def process_item(self, item, spider):
        """Upsert the item keyed by ``article_id`` and pass it on.

        Storage is best-effort: a MongoDB error is logged and the item still
        continues down the pipeline. Other exceptions are not swallowed.
        """
        adapter = ItemAdapter(item)
        doc = dict(adapter)

        # One collection per spider.
        collection_name = f"{spider.name}_items"

        # Upsert so that re-crawling an article updates it instead of
        # inserting a duplicate.
        try:
            self.db[collection_name].update_one(
                {"article_id": doc.get("article_id", "")},
                {"$set": doc},
                upsert=True,
            )
        except pymongo.errors.PyMongoError as e:
            spider.logger.error(f"存储失败: {e}")
        else:
            # `or ""` guards against a title that is present but None.
            spider.logger.debug(f"存储: {(doc.get('title') or '')[:30]}")

        return item

    def close_spider(self, spider):
        """Close the client if one was opened."""
        if self.client is not None:
            self.client.close()
            self.client = None
            self.db = None
        spider.logger.info("MongoDB连接关闭")


class DuplicatesFilterPipeline:
    """Drop items whose URL has already been seen during this run.

    The seen-set lives in memory only, so it is rebuilt from scratch each
    time the spider is opened.
    """

    def __init__(self):
        self.seen_urls = set()

    def open_spider(self, spider):
        """Reset the seen-set at the start of a run."""
        self.seen_urls.clear()

    def process_item(self, item, spider):
        """Return the item on first sight of its URL.

        Items without a URL are passed through untouched rather than being
        treated as duplicates of one another.

        Raises:
            DropItem: when the item's URL was already seen. Returning
                ``None`` would not drop the item; it would hand ``None`` to
                the next pipeline.
        """
        url = ItemAdapter(item).get("url", "")

        if not url:
            return item

        if url in self.seen_urls:
            raise DropItem(f"重复URL,跳过: {url}")

        self.seen_urls.add(url)
        return item

四、Settings配置

# settings.py

# Project identity and the package where spiders are discovered / generated.
BOT_NAME = "news_project"
SPIDER_MODULES = ["news_project.spiders"]
NEWSPIDER_MODULE = "news_project.spiders"

# Respect each site's robots.txt.
ROBOTSTXT_OBEY = True

# Headers sent with every request unless a request overrides them.
DEFAULT_REQUEST_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                  "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
}

# Politeness: delay between requests and per-domain concurrency cap.
DOWNLOAD_DELAY = 2
CONCURRENT_REQUESTS_PER_DOMAIN = 2

# AutoThrottle: start delay and upper bound for the adaptive delay.
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1
AUTOTHROTTLE_MAX_DELAY = 10

# MongoDB connection (read by MongoDBStoragePipeline.from_crawler).
MONGO_URI = "mongodb://localhost:27017"
MONGO_DATABASE = "news_crawler"

# Item pipelines; lower numbers run first:
# de-duplicate -> validate/clean -> store.
ITEM_PIPELINES = {
    "news_project.pipelines.DuplicatesFilterPipeline": 100,
    "news_project.pipelines.NewsValidationPipeline": 200,
    "news_project.pipelines.MongoDBStoragePipeline": 300,
}

# Logging: write the log to this file at INFO level.
LOG_FILE = "news_crawler.log"
LOG_LEVEL = "INFO"

五、运行与部署

# Run locally
cd news_project
scrapy crawl news_list -s JOBDIR=crawls/news_list

# -s JOBDIR=<dir> persists the crawl state in <dir>; re-running the same
# command after an interruption resumes the crawl.

# Export the scraped items to a file
scrapy crawl news_list -o news.json -s JOBDIR=crawls/news_list

# Run in the background on Linux (nohup)
# NOTE: settings.py sets LOG_FILE, so Scrapy's own log presumably goes to
# that file and crawler.log only receives what is written to stdout/stderr.
nohup scrapy crawl news_list -s JOBDIR=crawls/news_list > crawler.log 2>&1 &

# Follow the redirected output
tail -f crawler.log

# List the spiders available in this project (this does not show run status)
scrapy list

六、监控与维护

# monitor.py — 爬虫健康监控

import pymongo
from datetime import datetime, timedelta

def check_crawler_health(
    mongo_uri="mongodb://localhost:27017",
    db_name="news_crawler",
    collections=("news_list_items",),
):
    """Print how many items each collection received recently.

    For every collection, counts the documents whose ``crawl_time`` falls in
    the last hour and in the last 24 hours, and prints a warning when the
    last hour is empty.

    ``crawl_time`` is compared as an ISO-8601 string, which orders
    chronologically only if the stored values are naive local-time
    ``datetime.isoformat()`` strings like the cut-offs built here.

    Args:
        mongo_uri: MongoDB connection string.
        db_name: Database that holds the crawler collections.
        collections: Names of the collections to inspect.
    """
    now = datetime.now()
    hour_cutoff = (now - timedelta(hours=1)).isoformat()
    day_cutoff = (now - timedelta(days=1)).isoformat()

    client = pymongo.MongoClient(mongo_uri)
    try:
        db = client[db_name]

        for col_name in collections:
            col = db[col_name]

            hour_count = col.count_documents({"crawl_time": {"$gte": hour_cutoff}})
            day_count = col.count_documents({"crawl_time": {"$gte": day_cutoff}})

            print(f"\n爬虫: {col_name}")
            print(f"  最近1小时: {hour_count} 条")
            print(f"  最近24小时: {day_count} 条")

            if hour_count == 0:
                print("  ⚠️ 警告: 过去1小时无新数据,可能爬虫已停止!")
    finally:
        # Always release the connection, even when a query raises.
        client.close()


if __name__ == "__main__":
    check_crawler_health()

七、知识卡

| 配置 | 说明 |
| --- | --- |
| CrawlSpider + Rules | 自动发现链接,无需手动翻页 |
| LinkExtractor + restrict_css | 只在指定区域内提取链接 |
| follow=True | 继续从匹配到的页面中提取并跟进链接 |
| follow=False | 不再从匹配到的页面中提取链接,只交给callback解析 |
| JOBDIR | 保存爬虫状态,中断后可续爬 |
| ROBOTSTXT_OBEY | 遵守robots.txt |
| MONGO_URI | MongoDB连接地址 |
| upsert=True | 不存在则插入,存在则更新 |

八、课后作业

必做题:

  1. 用Scrapy爬取任意新闻网站的文章列表
  2. 实现MongoDB存储Pipeline
  3. 配置自动限速和请求延迟

选做题:

  1. 实现爬虫健康监控脚本
  2. 部署到Linux服务器并设置定时任务

有问题欢迎评论区留言,大家一起讨论!


标签:Python | Scrapy | 爬虫实战 | 新闻网站 | 全站爬取 | MongoDB

更多推荐