Python Web Scraping in Practice: Collecting Lizhi Weike Data with Modern Techniques
This article walks through building a scraper for the Lizhi Weike (荔枝微课) website with modern Python tooling: asynchronous requests with asyncio and aiohttp for throughput, countermeasures for common anti-scraping defenses (User-Agent rotation, IP proxies, request throttling), Playwright for JavaScript-rendered content, and Pandas plus SQLAlchemy for storage and export, rounded out with caching, concurrency limits, and checkpointing for robustness.
1. Introduction
With the rapid growth of online education, Lizhi Weike (荔枝微课) has become one of China's best-known paid-knowledge platforms, hosting a large catalog of courses. For education researchers, market analysts, and content creators, this data is valuable. This article explains how to collect Lizhi Weike data efficiently and responsibly using current Python scraping techniques.
A web crawler is a program that fetches web content automatically, either by sending HTTP requests directly or by simulating a browser. As anti-scraping defenses keep improving, simple traditional crawlers no longer cope well with modern sites, so this article assembles a set of newer tools and patterns to build a robust, efficient scraper.
Before starting, one point must be clear: a crawler should respect the site's robots.txt policy and data copyrights, and must not scrape abusively or violate user privacy. Everything below is intended for technical learning only.
2. Analyzing the Lizhi Weike Website
2.1 Site Structure
Lizhi Weike (https://www.lizhiweike.com) is a typical content-driven site with the following main page types:
- Home page: featured courses, popular categories, and so on
- Course list pages: courses grouped by category
- Course detail pages: full information for a single course
- Instructor pages: instructor profiles and the courses they offer
2.2 API Endpoints
Like most modern sites, Lizhi Weike uses a decoupled front end that loads data dynamically through API endpoints. Inspecting traffic with the browser developer tools shows three main endpoint types:
- Course list API: returns paginated course data
- Course detail API: returns the full record for a single course
- Search API: returns results for a keyword query
These endpoints usually return JSON, which is faster and more stable to consume than parsing HTML, as the quick check below illustrates.
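As a first probe, the sketch below requests one of these endpoints with httpx and prints the top-level structure of the JSON response. The path and parameters (/api/courses, category_id, page) are assumptions for illustration only; the real endpoint names and parameters must be read from the browser's network panel.
```python
import asyncio

import httpx

API_URL = "https://www.lizhiweike.com/api/courses"  # assumed endpoint; verify in devtools


async def peek_course_api() -> None:
    # http2=True needs the optional extra: pip install "httpx[http2]"
    async with httpx.AsyncClient(http2=True, timeout=15) as client:
        resp = await client.get(API_URL, params={"category_id": 1, "page": 1})
        resp.raise_for_status()
        payload = resp.json()
        # Look at the top-level keys before writing any parsing code
        keys = list(payload) if isinstance(payload, dict) else f"list of {len(payload)} items"
        print(keys)


if __name__ == "__main__":
    asyncio.run(peek_course_api())
```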
2.3 Anti-Scraping Mechanisms
Lizhi Weike combines several anti-scraping techniques:
- User-Agent checks: the browser identifier in the request headers is validated
- Per-IP rate limiting: the request rate of a single IP is capped
- Signed request parameters: API calls must carry a computed signature
- Dynamic cookie validation: the user session state is verified
- JavaScript rendering: part of the content is loaded dynamically on the client
3. Technology Choices and Environment Setup
3.1 Technology Stack
Given the characteristics above, the following stack is used:
- HTTP client: httpx (HTTP/2 support, asynchronous requests)
- Browser automation: Playwright (a modern browser automation framework)
- Parsing: parsel (XPath and CSS selectors)
- Async framework: asyncio + aiohttp (high-throughput asynchronous requests)
- Proxy management: a small custom proxy pool built on aiohttp (see section 4.2)
- Data storage: Pandas + SQLAlchemy (flexible processing and persistence)
3.2 Environment Setup
First install the required Python packages, then download the Playwright browsers:
```bash
pip install httpx playwright parsel aiohttp pandas sqlalchemy fake-useragent
python -m playwright install
```
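As a quick sanity check that the browsers installed correctly, the following sketch launches headless Chromium and prints its version string:
```python
import asyncio

from playwright.async_api import async_playwright


async def check_playwright() -> None:
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        # Prints the version of the Chromium build bundled with Playwright
        print("Chromium version:", browser.version)
        await browser.close()


if __name__ == "__main__":
    asyncio.run(check_playwright())
```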
4. Anti-Scraping Analysis and Countermeasures
4.1 User-Agent Rotation
Use the fake-useragent library to generate a random browser identifier:
```python
from fake_useragent import UserAgent


def get_random_headers():
    ua = UserAgent()
    return {
        'User-Agent': ua.random,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
    }
```
4.2 IP Proxy Pool
Build an IP proxy pool from a free or paid proxy service:
```python
import asyncio
from typing import List

import aiohttp


class ProxyPool:
    def __init__(self):
        self.proxies: List[str] = []
        self.current_index = 0

    async def refresh_proxies(self):
        # Fetch fresh proxy IPs from the proxy provider (placeholder URL)
        async with aiohttp.ClientSession() as session:
            async with session.get('https://api.proxy-service.com/proxies') as resp:
                data = await resp.json()
                self.proxies = [f"http://{p['ip']}:{p['port']}" for p in data['proxies']]

    def get_proxy(self):
        if not self.proxies:
            return None
        proxy = self.proxies[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.proxies)
        return proxy
```
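The pool is not wired into the crawler classes that follow, so here is a minimal sketch of how a rotated proxy could be applied to a single request; aiohttp accepts a per-request proxy= URL, and the provider endpoint above is a placeholder.
```python
async def fetch_via_proxy(pool: ProxyPool, url: str) -> str:
    # Refresh the pool lazily on first use
    if not pool.proxies:
        await pool.refresh_proxies()
    async with aiohttp.ClientSession() as session:
        # proxy=None simply disables the proxy for this request
        async with session.get(url, proxy=pool.get_proxy(),
                               timeout=aiohttp.ClientTimeout(total=30)) as resp:
            return await resp.text()
```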
4.3 Request Rate Control
Throttle requests intelligently to avoid tripping the site's rate limits:
```python
import asyncio
import random
import time


class RequestLimiter:
    def __init__(self, max_requests_per_minute=30):
        self.max_requests = max_requests_per_minute
        self.request_times = []

    async def acquire(self):
        now = time.time()
        # Drop records older than one minute
        self.request_times = [t for t in self.request_times if now - t < 60]
        # If the limit is reached, wait until the oldest request falls out of the window
        if len(self.request_times) >= self.max_requests:
            sleep_time = 60 - (now - self.request_times[0]) + random.uniform(0.1, 0.5)
            await asyncio.sleep(sleep_time)
            # Refresh the window after sleeping
            self.request_times = [t for t in self.request_times if time.time() - t < 60]
        self.request_times.append(time.time())
        # Add a small random delay to mimic human behaviour
        await asyncio.sleep(random.uniform(1, 3))
```
5. Core Crawler Implementation
5.1 A Base Crawler Class
First, a base class that encapsulates the common crawling plumbing:
```python
import asyncio
import json
import logging
from typing import Any, Dict, List, Optional
from urllib.parse import urlencode, urljoin

import aiohttp


class BaseCrawler:
    def __init__(self, base_url: str, max_concurrent: int = 5):
        self.base_url = base_url
        self.max_concurrent = max_concurrent
        self.session: Optional[aiohttp.ClientSession] = None
        self.logger = self._setup_logger()
        self.limiter = RequestLimiter()

    def _setup_logger(self):
        logger = logging.getLogger(self.__class__.__name__)
        logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        return logger

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            headers=get_random_headers(),
            timeout=aiohttp.ClientTimeout(total=30)
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def fetch(self, url: str, params: Dict = None, method: str = 'GET') -> Optional[str]:
        """Send an HTTP request and return the response body as text."""
        await self.limiter.acquire()
        try:
            full_url = urljoin(self.base_url, url)
            if params and method == 'GET':
                full_url += '?' + urlencode(params)
            async with self.session.request(method, full_url) as response:
                if response.status == 200:
                    return await response.text()
                else:
                    self.logger.error(f"Request failed: {response.status} - {full_url}")
                    return None
        except Exception as e:
            self.logger.error(f"Request error: {e}")
            return None

    async def fetch_json(self, url: str, params: Dict = None) -> Optional[Dict]:
        """Fetch a URL and decode the response as JSON."""
        text = await self.fetch(url, params)
        if text:
            try:
                return json.loads(text)
            except json.JSONDecodeError:
                self.logger.error("Failed to decode JSON")
        return None
```
5.2 The Lizhi Weike Crawler
Building on the base class, implement a crawler specific to Lizhi Weike:
```python
import hashlib
import time
from typing import Dict, List, Optional


class LizhiWeikeCrawler(BaseCrawler):
    def __init__(self):
        super().__init__('https://www.lizhiweike.com')
        self.api_base = 'https://www.lizhiweike.com/api'

    def _generate_signature(self, params: Dict) -> str:
        """Generate the API request signature (illustrative only; the real
        algorithm must be recovered by reverse-engineering the site's JavaScript)."""
        param_str = '&'.join([f'{k}={v}' for k, v in sorted(params.items())])
        secret = 'lizhiweike_secret_key'  # placeholder; extract the real key from the JS
        sign_str = param_str + secret
        return hashlib.md5(sign_str.encode()).hexdigest()

    async def get_categories(self) -> List[Dict]:
        """Fetch the course categories."""
        url = f'{self.api_base}/categories'
        data = await self.fetch_json(url)
        if data and data.get('success'):
            return data.get('data', [])
        return []

    async def get_courses_by_category(self, category_id: int, page: int = 1, limit: int = 20) -> List[Dict]:
        """Fetch one page of courses for a category."""
        url = f'{self.api_base}/courses'
        params = {
            'category_id': category_id,
            'page': page,
            'limit': limit,
            'timestamp': int(time.time())
        }
        # Attach the signature
        params['sign'] = self._generate_signature(params)
        data = await self.fetch_json(url, params)
        if data and data.get('success'):
            return data.get('data', {}).get('courses', [])
        return []

    async def get_course_detail(self, course_id: int) -> Optional[Dict]:
        """Fetch the full record for a single course."""
        url = f'{self.api_base}/course/{course_id}'
        data = await self.fetch_json(url)
        if data and data.get('success'):
            return data.get('data', {})
        return None

    async def search_courses(self, keyword: str, page: int = 1) -> List[Dict]:
        """Search courses by keyword."""
        url = f'{self.api_base}/search'
        params = {
            'keyword': keyword,
            'page': page,
            'timestamp': int(time.time())
        }
        params['sign'] = self._generate_signature(params)
        data = await self.fetch_json(url, params)
        if data and data.get('success'):
            return data.get('data', {}).get('courses', [])
        return []

    async def get_instructor_info(self, instructor_id: int) -> Optional[Dict]:
        """Fetch an instructor's profile."""
        url = f'{self.api_base}/instructor/{instructor_id}'
        data = await self.fetch_json(url)
        if data and data.get('success'):
            return data.get('data', {})
        return None
```
5.3 Handling JavaScript-Rendered Content with Playwright
Content that is loaded dynamically by JavaScript is fetched with Playwright:
```python
import logging
from typing import Dict, Optional

from parsel import Selector
from playwright.async_api import async_playwright


class JSRenderCrawler:
    def __init__(self):
        self.browser = None
        self.context = None

    async def __aenter__(self):
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(
            headless=True,
            args=['--no-sandbox', '--disable-setuid-sandbox']
        )
        self.context = await self.browser.new_context(
            user_agent=get_random_headers()['User-Agent']
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.browser:
            await self.browser.close()
        await self.playwright.stop()

    async def get_dynamic_content(self, url: str, wait_for_selector: str = None) -> Optional[str]:
        """Return the page HTML after JavaScript rendering."""
        page = await self.context.new_page()
        try:
            await page.goto(url, wait_until='networkidle')
            if wait_for_selector:
                await page.wait_for_selector(wait_for_selector)
            # Give the page a moment to finish loading
            await page.wait_for_timeout(2000)
            content = await page.content()
            return content
        except Exception as e:
            logging.error(f"Playwright fetch failed: {e}")
            return None
        finally:
            await page.close()

    async def extract_course_data(self, course_url: str) -> Dict:
        """Extract details from a rendered course page."""
        html = await self.get_dynamic_content(course_url, '.course-detail')
        if not html:
            return {}
        selector = Selector(html)
        data = {}
        # Extract fields with CSS selectors
        data['title'] = selector.css('h1.course-title::text').get()
        data['price'] = selector.css('.course-price::text').get()
        data['students_count'] = selector.css('.students-count::text').get()
        data['rating'] = selector.css('.rating-value::text').get()
        # Extract the course outline
        chapters = []
        for chapter in selector.css('.chapter-item'):
            chapter_data = {
                'title': chapter.css('.chapter-title::text').get(),
                'sections': []
            }
            for section in chapter.css('.section-item'):
                section_data = {
                    'title': section.css('.section-title::text').get(),
                    'duration': section.css('.section-duration::text').get()
                }
                chapter_data['sections'].append(section_data)
            chapters.append(chapter_data)
        data['chapters'] = chapters
        return data
```
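A minimal usage sketch, assuming a reachable course page URL (the path below is purely illustrative):
```python
async def demo_js_render() -> None:
    course_url = 'https://www.lizhiweike.com/course/123456'  # hypothetical URL
    async with JSRenderCrawler() as js_crawler:
        detail = await js_crawler.extract_course_data(course_url)
        print(detail.get('title'), len(detail.get('chapters', [])))

# asyncio.run(demo_js_render())
```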
6. Data Storage and Processing
6.1 Database Schema
Design table structures for the scraped data:
```python
from datetime import datetime

from sqlalchemy import Column, DateTime, Float, Integer, String, Text, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class Course(Base):
    __tablename__ = 'courses'

    id = Column(Integer, primary_key=True)
    course_id = Column(Integer, unique=True, index=True)
    title = Column(String(200))
    description = Column(Text)
    price = Column(Float)
    original_price = Column(Float)
    students_count = Column(Integer)
    rating = Column(Float)
    category_id = Column(Integer)
    instructor_id = Column(Integer)
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)


class Instructor(Base):
    __tablename__ = 'instructors'

    id = Column(Integer, primary_key=True)
    instructor_id = Column(Integer, unique=True, index=True)
    name = Column(String(100))
    description = Column(Text)
    courses_count = Column(Integer)
    followers_count = Column(Integer)
    created_at = Column(DateTime, default=datetime.now)


class Category(Base):
    __tablename__ = 'categories'

    id = Column(Integer, primary_key=True)
    category_id = Column(Integer, unique=True, index=True)
    name = Column(String(50))
    parent_id = Column(Integer, default=0)
    courses_count = Column(Integer)
    created_at = Column(DateTime, default=datetime.now)
```
6.2 Data Storage Manager
Implement a storage manager that supports both database persistence and file export:
```python
import logging
from datetime import datetime
from typing import Dict, List

import pandas as pd
from sqlalchemy import create_engine


class DataManager:
    def __init__(self, database_url: str = 'sqlite:///lizhiweike.db'):
        self.engine = create_engine(database_url)
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)

    def save_courses(self, courses_data: List[Dict]):
        """Persist a batch of course records."""
        session = self.Session()
        try:
            for course_data in courses_data:
                # Check whether the course already exists
                existing = session.query(Course).filter_by(course_id=course_data['id']).first()
                if existing:
                    # Update the existing record
                    for key, value in course_data.items():
                        if hasattr(existing, key):
                            setattr(existing, key, value)
                    existing.updated_at = datetime.now()
                else:
                    # Create a new record
                    course = Course(
                        course_id=course_data['id'],
                        title=course_data['title'],
                        description=course_data.get('description', ''),
                        price=course_data.get('price', 0),
                        original_price=course_data.get('original_price', 0),
                        students_count=course_data.get('students_count', 0),
                        rating=course_data.get('rating', 0),
                        category_id=course_data.get('category_id', 0),
                        instructor_id=course_data.get('instructor_id', 0)
                    )
                    session.add(course)
            session.commit()
        except Exception as e:
            session.rollback()
            logging.error(f"Failed to save course data: {e}")
        finally:
            session.close()

    def export_to_excel(self, filepath: str):
        """Export all tables to an Excel workbook."""
        with self.engine.connect() as conn:
            courses_df = pd.read_sql_table('courses', conn)
            instructors_df = pd.read_sql_table('instructors', conn)
            categories_df = pd.read_sql_table('categories', conn)
        with pd.ExcelWriter(filepath) as writer:
            courses_df.to_excel(writer, sheet_name='Courses', index=False)
            instructors_df.to_excel(writer, sheet_name='Instructors', index=False)
            categories_df.to_excel(writer, sheet_name='Categories', index=False)
```
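The complete example in section 9 also calls save_categories, course_detail_exists, and save_course_detail on the data manager, which are not shown above. The sketch below is one possible implementation of those helpers; treating a detail payload as a regular course record is an assumption, and in practice these methods would be added to DataManager itself rather than a subclass.
```python
class ExtendedDataManager(DataManager):
    """Illustrative helpers assumed by the section 9 example."""

    def save_categories(self, categories_data: List[Dict]):
        """Persist category records, updating rows that already exist."""
        session = self.Session()
        try:
            for cat in categories_data:
                existing = session.query(Category).filter_by(category_id=cat['id']).first()
                if existing:
                    existing.name = cat.get('name', existing.name)
                    existing.courses_count = cat.get('courses_count', existing.courses_count)
                else:
                    session.add(Category(
                        category_id=cat['id'],
                        name=cat.get('name', ''),
                        parent_id=cat.get('parent_id', 0),
                        courses_count=cat.get('courses_count', 0)
                    ))
            session.commit()
        except Exception as e:
            session.rollback()
            logging.error(f"Failed to save categories: {e}")
        finally:
            session.close()

    def course_detail_exists(self, course_id: int) -> bool:
        """Return True if a course with this id is already stored."""
        session = self.Session()
        try:
            return session.query(Course).filter_by(course_id=course_id).first() is not None
        finally:
            session.close()

    def save_course_detail(self, detail: Dict):
        """Store a course detail payload (assumed to share the course fields)."""
        self.save_courses([detail])
```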
7. Performance Optimization
7.1 Controlling Async Concurrency
Use asyncio.Semaphore to cap the number of concurrent requests; here the limiter is layered on top of BaseCrawler so that fetch is available:
```python
class ConcurrentCrawler(BaseCrawler):
    def __init__(self, base_url: str, max_concurrent: int = 10):
        super().__init__(base_url, max_concurrent)
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_with_semaphore(self, url: str):
        async with self.semaphore:
            return await self.fetch(url)

    async def crawl_multiple(self, urls: List[str]):
        tasks = [self.fetch_with_semaphore(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
```
7.2 Caching
Cache responses so that identical content is not requested twice:
```python
import hashlib
import os
import pickle
from datetime import datetime, timedelta


class CacheManager:
    def __init__(self, cache_dir: str = 'cache', ttl: int = 3600):
        self.cache_dir = cache_dir
        self.ttl = ttl  # cache time-to-live in seconds
        os.makedirs(cache_dir, exist_ok=True)

    def _get_cache_path(self, key: str) -> str:
        filename = hashlib.md5(key.encode()).hexdigest() + '.pkl'
        return os.path.join(self.cache_dir, filename)

    def get(self, key: str):
        cache_path = self._get_cache_path(key)
        if not os.path.exists(cache_path):
            return None
        # Evict the entry if it has expired
        mtime = datetime.fromtimestamp(os.path.getmtime(cache_path))
        if datetime.now() - mtime > timedelta(seconds=self.ttl):
            os.remove(cache_path)
            return None
        with open(cache_path, 'rb') as f:
            return pickle.load(f)

    def set(self, key: str, value):
        cache_path = self._get_cache_path(key)
        with open(cache_path, 'wb') as f:
            pickle.dump(value, f)
```
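The cache is not hooked into the crawler classes above; a minimal sketch of wrapping fetch_json with it could look like this (the cache key scheme is an assumption):
```python
from typing import Dict, Optional
from urllib.parse import urlencode

cache = CacheManager(ttl=3600)


async def cached_fetch_json(crawler: BaseCrawler, url: str, params: Dict = None) -> Optional[Dict]:
    # Cache key: URL plus sorted query parameters
    key = url + '?' + urlencode(sorted((params or {}).items()))
    hit = cache.get(key)
    if hit is not None:
        return hit
    data = await crawler.fetch_json(url, params)
    if data is not None:
        cache.set(key, data)
    return data
```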
7.3 Resumable Crawling
Persist progress checkpoints so an interrupted crawl can pick up where it left off:
```python
import json
import os


class CheckpointManager:
    def __init__(self, checkpoint_file: str = 'checkpoint.json'):
        self.checkpoint_file = checkpoint_file
        self.data = self._load_checkpoint()

    def _load_checkpoint(self):
        if os.path.exists(self.checkpoint_file):
            with open(self.checkpoint_file, 'r') as f:
                return json.load(f)
        return {}

    def save_checkpoint(self, key: str, value):
        self.data[key] = value
        with open(self.checkpoint_file, 'w') as f:
            json.dump(self.data, f, indent=2)

    def get_checkpoint(self, key: str, default=None):
        return self.data.get(key, default)
```
8. Legal and Ethical Considerations
Legal and ethical questions must be taken into account when building and running a crawler:
8.1 Respect robots.txt
First check the site's robots.txt and honour its crawling policy:
```python
import urllib.parse
import urllib.robotparser


def check_robots_permission(base_url: str, user_agent: str = '*') -> bool:
    rp = urllib.robotparser.RobotFileParser()
    rp.set_url(urllib.parse.urljoin(base_url, '/robots.txt'))
    rp.read()
    return rp.can_fetch(user_agent, base_url)
```
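Used as a guard before the spider starts, for example:
```python
if not check_robots_permission('https://www.lizhiweike.com'):
    raise SystemExit("robots.txt disallows crawling this site; aborting.")
```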
8.2 Data Usage Guidelines
- Only collect data that is publicly accessible
- Do not collect personal information or other sensitive user data
- Comply with the site's terms of service
- Keep the request rate modest to avoid burdening the site
- Use the collected data for study and research only
9. Complete Example
The following ties everything together into a complete Lizhi Weike spider:
```python
import asyncio
import logging
from typing import List

from data_manager import DataManager
from lizhi_crawler import LizhiWeikeCrawler
# CheckpointManager is the class from section 7.3, assumed to live in the same project

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)


class LizhiWeikeSpider:
    def __init__(self):
        self.crawler = LizhiWeikeCrawler()
        self.data_manager = DataManager()
        self.checkpoint = CheckpointManager()

    async def crawl_categories(self):
        """Crawl all course categories."""
        logging.info("Crawling categories")
        categories = await self.crawler.get_categories()
        # Persist the categories
        self.data_manager.save_categories(categories)
        logging.info(f"Done, fetched {len(categories)} categories")
        return categories

    async def crawl_courses_by_category(self, category_id: int, max_pages: int = 10):
        """Crawl the courses of one category."""
        logging.info(f"Crawling courses in category {category_id}")
        all_courses = []
        for page in range(1, max_pages + 1):
            # Resume from the last checkpoint
            checkpoint_key = f'category_{category_id}_page'
            last_page = self.checkpoint.get_checkpoint(checkpoint_key, 0)
            if page <= last_page:
                logging.info(f"Skipping already crawled page {page}")
                continue
            logging.info(f"Crawling page {page}")
            courses = await self.crawler.get_courses_by_category(category_id, page)
            if not courses:
                logging.info(f"Page {page} is empty, stopping")
                break
            all_courses.extend(courses)
            # Persist the courses
            self.data_manager.save_courses(courses)
            # Update the checkpoint
            self.checkpoint.save_checkpoint(checkpoint_key, page)
            # Pause between pages
            await asyncio.sleep(2)
        logging.info(f"Category {category_id} done, {len(all_courses)} courses fetched")
        return all_courses

    async def crawl_course_details(self, course_ids: List[int]):
        """Crawl detail records for a list of course ids."""
        logging.info(f"Crawling details for {len(course_ids)} courses")
        details = []
        for i, course_id in enumerate(course_ids):
            # Skip courses whose details were already stored
            if self.data_manager.course_detail_exists(course_id):
                logging.info(f"Detail for course {course_id} already exists, skipping")
                continue
            logging.info(f"Crawling course detail ({i + 1}/{len(course_ids)}): {course_id}")
            detail = await self.crawler.get_course_detail(course_id)
            if detail:
                details.append(detail)
                self.data_manager.save_course_detail(detail)
            # Throttle every few requests
            if (i + 1) % 5 == 0:
                await asyncio.sleep(3)
        logging.info(f"Done, fetched details for {len(details)} courses")
        return details

    async def run(self):
        """Run the spider end to end."""
        async with self.crawler:
            # Crawl categories
            categories = await self.crawl_categories()
            # Crawl the courses of every category
            all_courses = []
            for category in categories:
                category_id = category['id']
                courses = await self.crawl_courses_by_category(category_id)
                all_courses.extend(courses)
            # Collect course ids
            course_ids = [course['id'] for course in all_courses if 'id' in course]
            # Crawl course details (capped to limit the number of requests)
            await self.crawl_course_details(course_ids[:50])
            # Export the data
            self.data_manager.export_to_excel('lizhiweike_data.xlsx')
            logging.info("Export finished")


async def main():
    spider = LizhiWeikeSpider()
    await spider.run()


if __name__ == '__main__':
    asyncio.run(main())
```
10. Conclusion and Outlook
This article has walked through scraping Lizhi Weike data with modern Python tooling. The implementation covers:
- Asynchronous programming to raise crawl throughput
- Countermeasures for a range of anti-scraping mechanisms
- Handling of JavaScript-rendered content
- Data storage and export
- Performance optimizations and fault tolerance
Key technical points:
- Async programming: a high-performance crawler built on asyncio and aiohttp
- Anti-scraping countermeasures: User-Agent rotation, IP proxies, and request throttling
- JS rendering: Playwright for dynamically loaded content
- Data management: flexible storage with SQLAlchemy
- Performance: concurrency limits, caching, and resumable crawling
Possible future improvements:
- Distributed crawling: run the spider across multiple machines for higher throughput
- Smarter parsing: use machine learning to identify and extract page data automatically
- Live monitoring: add runtime status monitoring and alerting
- Data quality: automate quality assessment and cleaning of the collected data