Python爬虫实战⑭|Scrapy实战,新闻网站全站爬取
·
author: 专注Python实战,分享爬虫与数据分析干货
title: Python爬虫实战⑭|Scrapy实战——新闻网站全站爬取
update: 2026-04-26
tags: Python,Scrapy,爬虫实战,新闻网站,全站爬取,分布式
作者:专注Python实战,分享爬虫与数据分析干货
更新时间:2026年4月
适合人群:已掌握Scrapy、想完成完整项目的开发者
前言:做一个能上线的爬虫项目
前面学了Scrapy的各个组件,但都是零散知识。今天我们把这些知识串起来,做一个完整项目:爬取新闻网站的全站数据。
项目目标:
- 爬取多个新闻网站的标题、正文、时间、来源
- 自动发现新文章并跟进
- 数据清洗后存入MongoDB
- 完整的日志和异常处理
- 分布式部署准备
一、项目规划
1.1 选择目标网站
我们以一个模拟的新闻网站为例。实际项目中,请选择 robots.txt 允许爬取的新闻站点。
1.2 数据模型设计
# items.py
import scrapy
class NewsItem(scrapy.Item):
    """Full news record produced from an article detail page.

    ``article_id`` is declared because the validation pipeline assigns it
    (derived from the URL when the spider did not set one) and the MongoDB
    pipeline uses it as the upsert key. Scrapy raises ``KeyError`` when an
    undeclared field is assigned on an ``Item``.
    """

    article_id = scrapy.Field()    # stable identifier (filled in by the pipeline if empty)
    title = scrapy.Field()         # headline
    content = scrapy.Field()       # body text
    publish_time = scrapy.Field()  # publication time
    author = scrapy.Field()        # author / byline
    category = scrapy.Field()      # section or category
    tags = scrapy.Field()          # list of tag strings
    url = scrapy.Field()           # original article URL
    source = scrapy.Field()        # originating site
    crawl_time = scrapy.Field()    # when the page was crawled
    images = scrapy.Field()        # list of image URLs
class ArticleItem(scrapy.Item):
    """Article record emitted by the list spider.

    Besides the summary fields, this item declares ``content``, ``tags``,
    ``source`` and ``crawl_time`` because ``NewsListSpider`` assigns all of
    them. Scrapy raises ``KeyError`` when an undeclared field is assigned on
    an ``Item``.
    """

    article_id = scrapy.Field()    # numeric ID extracted from the article URL
    title = scrapy.Field()         # headline
    summary = scrapy.Field()       # short abstract / meta description
    content = scrapy.Field()       # body text
    publish_time = scrapy.Field()  # publication time
    url = scrapy.Field()           # original article URL
    thumbnail = scrapy.Field()     # absolute URL of the cover image
    category = scrapy.Field()      # section or category
    tags = scrapy.Field()          # list of tag strings
    source = scrapy.Field()        # originating site
    crawl_time = scrapy.Field()    # when the page was crawled
二、编写爬虫
2.1 列表页Spider
# spiders/news_spider.py
import scrapy
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule
from news_project.items import NewsItem, ArticleItem
from datetime import datetime
import re
class NewsListSpider(CrawlSpider):
    """Crawl list pages, discover article links and parse every article.

    Pagination links are followed automatically; article links are handed to
    :meth:`parse_article`, which builds one complete ``ArticleItem`` from a
    single response.

    NOTE: ``ArticleItem`` must declare every field assigned here (including
    ``tags``, ``source``, ``content`` and ``crawl_time``); Scrapy raises
    ``KeyError`` for undeclared fields.
    """

    name = "news_list"
    allowed_domains = ["news.example.com"]
    start_urls = [
        "https://news.example.com/",
        "https://news.example.com/tech/",
        "https://news.example.com/business/",
        "https://news.example.com/culture/",
    ]

    rules = (
        # Rule 1: follow pagination links found inside the pager elements.
        Rule(
            LinkExtractor(
                allow=r"/page/\d+",
                restrict_css=".pagination, .page-nav",
            ),
            follow=True,
        ),
        # Rule 2: article detail pages are parsed but not crawled further.
        Rule(
            LinkExtractor(
                allow=r"/article/\d+\.html",
                restrict_css=".article-list, .news-list, main",
            ),
            callback="parse_article",
            follow=False,
        ),
    )

    custom_settings = {
        "DOWNLOAD_DELAY": 2,
        "CONCURRENT_REQUESTS_PER_DOMAIN": 2,
    }

    # Compiled once at class creation instead of on every call.
    _ARTICLE_ID_RE = re.compile(r"/article/(\d+)\.html")

    # (compiled pattern, strptime format) pairs tried in order by _parse_time.
    _DATE_PATTERNS = (
        (re.compile(r"\d{4}-\d{1,2}-\d{1,2}"), "%Y-%m-%d"),
        (re.compile(r"\d{4}/\d{1,2}/\d{1,2}"), "%Y/%m/%d"),
        (re.compile(r"\d{4}年\d{1,2}月\d{1,2}日"), "%Y年%m月%d日"),
    )

    # Containers tried in order when looking for body paragraphs.
    _CONTENT_SELECTORS = (".article-content p", "article p", ".content p")

    def parse_article(self, response):
        """Parse an article page and yield one fully populated item.

        The body text is extracted from this same response. Re-requesting the
        URL would download every article twice and emit each item twice.
        """
        item = ArticleItem()

        # Article ID comes from the URL; empty when the URL has no numeric ID.
        match = self._ARTICLE_ID_RE.search(response.url)
        item["article_id"] = match.group(1) if match else ""

        item["title"] = self._first_text(
            response, "h1.article-title::text", "h1::text"
        )
        item["summary"] = self._first_text(
            response,
            "meta[name='description']::attr(content)",
            ".article-summary::text",
        )
        item["publish_time"] = self._parse_time(
            self._first_text(
                response,
                "time.publish-time::attr(datetime)",
                "span.time::text",
            )
        )
        item["category"] = self._first_text(
            response,
            ".article-category a::text",
            "nav.breadcrumb span::text",
        )

        # Thumbnails may be relative, so resolve them against the page URL.
        thumbnail = self._first_text(
            response,
            "meta[property='og:image']::attr(content)",
            ".article-cover img::attr(src)",
        )
        item["thumbnail"] = response.urljoin(thumbnail) if thumbnail else ""

        tags = response.css(".article-tags a::text").getall()
        item["tags"] = [t.strip() for t in tags if t.strip()]

        item["source"] = "示例新闻网站"
        item["url"] = response.url

        self._fill_content(item, response)
        yield item

    def parse_full_content(self, response):
        """Fill in the body of an item carried in ``response.meta["item"]``.

        Kept as a callback for requests that carry a partially filled item;
        :meth:`parse_article` no longer needs it.
        """
        item = response.meta["item"]
        self._fill_content(item, response)
        yield item

    def _fill_content(self, item, response):
        """Set ``content`` and ``crawl_time`` on ``item`` from ``response``."""
        paragraphs = []
        for query in self._CONTENT_SELECTORS:
            # string(.) concatenates all descendant text, so text inside
            # inline tags (<a>, <strong>, ...) is kept; "p::text" would only
            # return the paragraph's direct text nodes.
            texts = (p.xpath("string(.)").get() for p in response.css(query))
            paragraphs = [t.strip() for t in texts if t and t.strip()]
            if paragraphs:
                break
        item["content"] = "\n".join(paragraphs)
        item["crawl_time"] = datetime.now().isoformat()

    def _first_text(self, response, *queries):
        """Return the first non-blank, stripped match among ``queries``.

        Each query is a CSS selector evaluated against ``response``; an empty
        string is returned when none of them yields text.
        """
        for query in queries:
            value = response.css(query).get()
            if value and value.strip():
                return value.strip()
        return ""

    def _parse_time(self, time_str):
        """Normalise a publication time string to ISO 8601.

        Returns "" for empty input. A string that is already ISO formatted
        keeps its time of day; otherwise the first recognised date
        (``YYYY-MM-DD``, ``YYYY/MM/DD`` or ``YYYY年M月D日``) is used. When
        nothing can be parsed the stripped input is returned unchanged.
        """
        if not time_str:
            return ""
        text = time_str.strip()

        # Full ISO datetimes (e.g. from <time datetime="...">) keep their time.
        try:
            return datetime.fromisoformat(text).isoformat()
        except ValueError:
            pass

        for pattern, fmt in self._DATE_PATTERNS:
            match = pattern.search(text)
            if not match:
                continue
            try:
                return datetime.strptime(match.group(), fmt).isoformat()
            except ValueError:
                # Looked like a date but was not valid (e.g. month 13).
                continue
        return text
2.2 详情页Spider
# spiders/news_detail_spider.py
import scrapy
from news_project.items import NewsItem
class NewsDetailSpider(scrapy.Spider):
    """Fetch article detail pages listed one URL per line in a text file.

    Run with ``scrapy crawl news_detail -a url_file=urls.txt``.
    """

    name = "news_detail"

    def __init__(self, url_file=None, *args, **kwargs):
        """Store the path of the URL list (``None`` when not provided)."""
        super().__init__(*args, **kwargs)
        self.url_file = url_file

    def start_requests(self):
        """Yield one request per non-blank line of the URL file.

        A missing or unreadable file is logged and produces no requests
        instead of aborting with a traceback.
        """
        if not self.url_file:
            self.logger.warning("未提供URL文件,跳过")
            return

        try:
            with open(self.url_file, "r", encoding="utf-8") as f:
                urls = [line.strip() for line in f if line.strip()]
        except OSError as exc:
            self.logger.error("无法读取URL文件 %s: %s", self.url_file, exc)
            return

        for url in urls:
            yield scrapy.Request(url, callback=self.parse)

    def parse(self, response):
        """Build a ``NewsItem`` from an article page.

        Missing elements become empty strings; all text values are stripped
        and blank paragraphs are skipped so the content has no stray spaces.
        """
        item = NewsItem()
        item["url"] = response.url
        item["title"] = (response.css("h1::text").get() or "").strip()

        paragraphs = (p.strip() for p in response.css(".article p::text").getall())
        item["content"] = " ".join(p for p in paragraphs if p)

        item["publish_time"] = (
            response.css("time::attr(datetime)").get() or ""
        ).strip()
        item["source"] = (response.css(".source::text").get() or "").strip()
        yield item
三、Pipeline编写
# pipelines.py
import hashlib
from datetime import datetime

import pymongo
from itemadapter import ItemAdapter
from scrapy.exceptions import DropItem
class NewsValidationPipeline:
    """Validate required fields and normalise items before storage."""

    # Bodies longer than this many characters are truncated.
    MAX_CONTENT_LENGTH = 50000

    def process_item(self, item, spider):
        """Return the cleaned item, or raise ``DropItem`` if it is unusable.

        Raises:
            DropItem: when the title or the URL is missing. Returning
                ``None`` would not drop the item; Scrapy would pass ``None``
                on to the next pipeline.
        """
        adapter = ItemAdapter(item)
        url = adapter.get("url")

        # Required fields.
        if not adapter.get("title"):
            raise DropItem(f"缺少标题,跳过: {url}")
        if not url:
            raise DropItem("缺少URL,跳过")

        # Derive a stable ID from the URL when the spider did not provide one.
        # MD5 is used only as a fingerprint here, not for security.
        if not adapter.get("article_id"):
            digest = hashlib.md5(url.encode("utf-8")).hexdigest()
            adapter["article_id"] = digest[:16]

        # Trim the body and cap its length.
        content = adapter.get("content") or ""
        if content:
            adapter["content"] = content.strip()[: self.MAX_CONTENT_LENGTH]

        return item
class MongoDBStoragePipeline:
    """Upsert items into MongoDB, one collection per spider."""

    def __init__(self, mongo_uri, mongo_db):
        """Remember connection settings; the client is opened in open_spider."""
        self.mongo_uri = mongo_uri
        self.mongo_db = mongo_db
        self.client = None
        self.db = None

    @classmethod
    def from_crawler(cls, crawler):
        """Build the pipeline from the ``MONGO_URI`` / ``MONGO_DATABASE`` settings."""
        return cls(
            mongo_uri=crawler.settings.get("MONGO_URI", "mongodb://localhost:27017"),
            mongo_db=crawler.settings.get("MONGO_DATABASE", "news_crawler"),
        )

    def open_spider(self, spider):
        """Open the MongoDB connection when the spider starts."""
        self.client = pymongo.MongoClient(self.mongo_uri)
        self.db = self.client[self.mongo_db]
        spider.logger.info(f"连接MongoDB: {self.mongo_db}")

    def process_item(self, item, spider):
        """Insert or update the item's document, then pass the item on.

        Storage is best-effort: a database error is logged and the item still
        continues through the remaining pipelines.
        """
        doc = dict(ItemAdapter(item))

        # One collection per spider.
        collection_name = f"{spider.name}_items"

        # Key on article_id; fall back to the URL so that items without an ID
        # do not all overwrite one shared "" document.
        article_id = doc.get("article_id")
        if article_id:
            key = {"article_id": article_id}
        else:
            key = {"url": doc.get("url", "")}

        try:
            self.db[collection_name].update_one(key, {"$set": doc}, upsert=True)
        except pymongo.errors.PyMongoError as e:
            spider.logger.error(f"存储失败: {e}")
        else:
            spider.logger.debug(f"存储: {(doc.get('title') or '')[:30]}")
        return item

    def close_spider(self, spider):
        """Close the connection if one was opened."""
        if self.client is not None:
            self.client.close()
            spider.logger.info("MongoDB连接关闭")
class DuplicatesFilterPipeline:
    """Drop items whose URL was already seen during the current run."""

    def __init__(self):
        """Start with an empty in-memory set of seen URLs."""
        self.seen_urls = set()

    def open_spider(self, spider):
        """Reset the seen set at the start of each spider run."""
        self.seen_urls.clear()

    def process_item(self, item, spider):
        """Return the item the first time its URL is seen.

        Raises:
            DropItem: when the URL was already processed. Returning ``None``
                would not drop the item; Scrapy would pass ``None`` on to the
                next pipeline.
        """
        url = ItemAdapter(item).get("url", "")
        if url in self.seen_urls:
            raise DropItem(f"重复URL,跳过: {url}")
        self.seen_urls.add(url)
        return item
四、Settings配置
# settings.py
BOT_NAME = "news_project"
SPIDER_MODULES = ["news_project.spiders"]
NEWSPIDER_MODULE = "news_project.spiders"

ROBOTSTXT_OBEY = True

# The User-Agent is set through USER_AGENT rather than DEFAULT_REQUEST_HEADERS
# so that the robots.txt middleware evaluates rules for the same agent that
# is sent with page requests.
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
DEFAULT_REQUEST_HEADERS = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
}

DOWNLOAD_DELAY = 2
CONCURRENT_REQUESTS_PER_DOMAIN = 2

# Auto-throttle: adapt the delay to server latency
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 1
AUTOTHROTTLE_MAX_DELAY = 10

# MongoDB
MONGO_URI = "mongodb://localhost:27017"
MONGO_DATABASE = "news_crawler"

# Item pipelines (lower number runs first)
ITEM_PIPELINES = {
    "news_project.pipelines.DuplicatesFilterPipeline": 100,
    "news_project.pipelines.NewsValidationPipeline": 200,
    "news_project.pipelines.MongoDBStoragePipeline": 300,
}

# Write exported feeds as UTF-8 so Chinese text is not \uXXXX-escaped
FEED_EXPORT_ENCODING = "utf-8"

# Logging
LOG_FILE = "news_crawler.log"
LOG_LEVEL = "INFO"
五、运行与部署
# Run locally
cd news_project
# -s JOBDIR persists the crawl state so an interrupted run can be resumed
# by running the same command again
scrapy crawl news_list -s JOBDIR=crawls/news_list
# Export data. -o appends to the output file, so use JSON Lines (.jl):
# appending to a .json file on a resumed run produces invalid JSON
scrapy crawl news_list -o news.jl -s JOBDIR=crawls/news_list
# Run in the background on Linux (nohup)
nohup scrapy crawl news_list -s JOBDIR=crawls/news_list > crawler.log 2>&1 &
# Follow the log. settings.py sets LOG_FILE, so Scrapy writes its log to
# news_crawler.log; crawler.log only receives stray stdout/stderr output
tail -f news_crawler.log
# List the spiders available in this project (this does not show run status)
scrapy list
六、监控与维护
# monitor.py — 爬虫健康监控
import pymongo
from datetime import datetime, timedelta
def check_crawler_health(
    mongo_uri="mongodb://localhost:27017",
    db_name="news_crawler",
    collections=("news_list_items",),
):
    """Report how many documents each collection received recently.

    Counts documents whose ``crawl_time`` (an ISO 8601 string) falls within
    the last hour and the last 24 hours, prints a summary, and warns when a
    collection received nothing in the last hour.

    Args:
        mongo_uri: MongoDB connection URI.
        db_name: Database that holds the crawler collections.
        collections: Names of the collections to inspect.

    Returns:
        A dict mapping each collection name to ``{"hour": int, "day": int}``.
    """
    client = pymongo.MongoClient(mongo_uri)
    try:
        db = client[db_name]

        # Compute the cut-offs once so every collection is judged against the
        # same instant. crawl_time is stored as an ISO string, so the
        # comparison is lexicographic on ISO strings.
        now = datetime.now()
        hour_cutoff = (now - timedelta(hours=1)).isoformat()
        day_cutoff = (now - timedelta(days=1)).isoformat()

        report = {}
        for col_name in collections:
            col = db[col_name]
            hour_count = col.count_documents({"crawl_time": {"$gte": hour_cutoff}})
            day_count = col.count_documents({"crawl_time": {"$gte": day_cutoff}})
            report[col_name] = {"hour": hour_count, "day": day_count}

            print(f"\n爬虫: {col_name}")
            print(f" 最近1小时: {hour_count} 条")
            print(f" 最近24小时: {day_count} 条")
            if hour_count == 0:
                print(" ⚠️ 警告: 过去1小时无新数据,可能爬虫已停止!")
        return report
    finally:
        # Release the connection even when a query fails.
        client.close()


if __name__ == "__main__":
    check_crawler_health()
七、知识卡
| 配置 | 说明 |
|---|---|
| CrawlSpider + Rules | 自动发现链接,无需手动翻页 |
| LinkExtractor + restrict_css | 在指定区域内提取链接 |
| follow=True | 跟进链接继续爬取 |
| follow=False | 不再从匹配到的页面中继续提取链接,只把该页面交给 callback 解析 |
| JOBDIR | 保存爬虫状态,中断可续 |
| ROBOTSTXT_OBEY | 遵守robots.txt |
| MONGO_URI | MongoDB连接地址 |
| upsert=True | 不存在则插入,存在则更新 |
八、课后作业
必做题:
- 用Scrapy爬取任意新闻网站的文章列表
- 实现MongoDB存储Pipeline
- 配置自动限速和请求延迟
选做题:
- 实现爬虫健康监控脚本
- 部署到Linux服务器并设置定时任务
有问题欢迎评论区留言,大家一起讨论!
标签:Python | Scrapy | 爬虫实战 | 新闻网站 | 全站爬取 | MongoDB
更多推荐
所有评论(0)