1. Introduction

With the rapid growth of online education, Lizhi Weike (荔枝微课), a well-known Chinese knowledge-payment platform, has accumulated a large catalogue of quality courses. For education researchers, market analysts and content creators, access to this data is valuable. This article explains in detail how to use modern Python crawling techniques to collect Lizhi Weike data efficiently and in a compliant way.

A web crawler is a program that fetches web content automatically, either by simulating a browser or by sending HTTP requests directly. As websites' anti-crawling defences keep improving, simple traditional crawlers struggle in this environment, so this article assembles a set of up-to-date techniques into a robust, efficient crawler.

One point must be clear before we start: crawling should respect the site's robots.txt policy and data copyright, and must not involve malicious scraping or violations of user privacy. This article is intended for technical study only.

2. Analyzing the Lizhi Weike Website

2.1 Site Structure

Lizhi Weike (https://www.lizhiweike.com) is a typical content-driven site, made up mainly of the following page types:

  • Homepage: recommended courses, popular categories, and so on

  • Course list pages: courses listed by category

  • Course detail pages: full information about a single course

  • Instructor pages: instructor profiles and the courses they offer

2.2 Data API Analysis

Most modern sites separate the front end from the back end and load data dynamically through API endpoints. Inspecting the site with the browser's developer tools shows that Lizhi Weike mainly relies on the following kinds of endpoints:

  1. Course list API: returns paginated course data

  2. Course detail API: returns detailed information about a specific course

  3. Search API: returns results for a keyword

These endpoints usually return JSON, which is faster and more stable to consume than parsing HTML.
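
As a quick probe, the sketch below calls one such JSON endpoint with httpx and prints the top-level keys of the payload. The endpoint path and query parameters are assumptions for illustration only; take the real ones from the browser's Network panel.

python

import asyncio
import httpx

async def preview_api(url: str) -> None:
    # Hypothetical endpoint and parameters; replace them with what the
    # Network panel actually shows for the live site.
    async with httpx.AsyncClient(timeout=10) as client:
        resp = await client.get(url, params={'page': 1, 'limit': 20})
        resp.raise_for_status()
        data = resp.json()
        # Inspect only the top-level keys to understand the payload structure
        print(list(data.keys()))

# asyncio.run(preview_api('https://www.lizhiweike.com/api/courses'))  # assumed URL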

2.3 Anti-Crawling Mechanisms

Lizhi Weike employs several anti-crawling techniques:

  1. User-Agent checks: the browser identifier in the request headers is validated

  2. Per-IP rate limiting: the request rate of a single IP address is capped

  3. Signed request parameters: API requests must carry a computed signature

  4. Dynamic cookie validation: the user's session state is verified

  5. JavaScript rendering: some content is loaded dynamically via JavaScript

3. Technology Choices and Environment Setup

3.1 Technology Stack

Given these characteristics, we choose the following stack:

  • Request library: httpx (async, with HTTP/2 support; used here for quick API probing)

  • Browser automation: Playwright (a modern browser automation tool)

  • Parsing library: parsel (supports both XPath and CSS selectors)

  • Async framework: asyncio + aiohttp (high-performance asynchronous requests)

  • Proxy management: a custom rotating proxy pool built on aiohttp (see section 4.2)

  • Data storage: Pandas + SQLAlchemy (flexible data processing and persistence)

3.2 Environment Setup

First install the required Python libraries:

bash

pip install httpx playwright parsel aiohttp fake-useragent pandas sqlalchemy openpyxl
python -m playwright install
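
Note that httpx only negotiates HTTP/2 when the optional h2 extra is installed (pip install "httpx[http2]"). A minimal sketch to confirm that a server actually speaks HTTP/2:

python

import asyncio
import httpx

async def check_http2(url: str) -> None:
    # http2=True requires the 'h2' package (pip install "httpx[http2]")
    async with httpx.AsyncClient(http2=True) as client:
        resp = await client.get(url)
        # resp.http_version is 'HTTP/2' when HTTP/2 was negotiated
        print(resp.http_version, resp.status_code)

# asyncio.run(check_http2('https://www.lizhiweike.com'))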

4. Anti-Crawling Mechanisms and Countermeasures

4.1 User-Agent Rotation

Use the fake-useragent library to generate random browser identifiers:

python

from fake_useragent import UserAgent

def get_random_headers():
    ua = UserAgent()
    return {
        'User-Agent': ua.random,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1',
    }

4.2 IP Proxy Pool

Build an IP proxy pool from a free or paid proxy service:

python

import aiohttp
import asyncio
from typing import List

class ProxyPool:
    def __init__(self):
        self.proxies = []
        self.current_index = 0
    
    async def refresh_proxies(self):
        # Fetch fresh proxy IPs from your proxy provider; the URL and the
        # JSON shape below are placeholders for a real service.
        async with aiohttp.ClientSession() as session:
            async with session.get('https://api.proxy-service.com/proxies') as resp:
                data = await resp.json()
                self.proxies = [f"http://{p['ip']}:{p['port']}" for p in data['proxies']]
    
    def get_proxy(self):
        if not self.proxies:
            return None
        proxy = self.proxies[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.proxies)
        return proxy
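
A possible way to plug the pool into a request: aiohttp accepts an HTTP proxy per request via the proxy argument (SOCKS proxies would need an additional connector library). A minimal sketch:

python

import aiohttp

async def fetch_via_proxy(pool: ProxyPool, url: str) -> str:
    # Rotate to the next proxy on every call; proxy=None means a direct connection
    proxy = pool.get_proxy()
    async with aiohttp.ClientSession() as session:
        async with session.get(url, proxy=proxy,
                               timeout=aiohttp.ClientTimeout(total=15)) as resp:
            return await resp.text()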

4.3 Request Rate Control

Implement adaptive request spacing to avoid tripping the rate limiter:

python

import asyncio
import random
import time

class RequestLimiter:
    def __init__(self, max_requests_per_minute=30):
        self.max_requests = max_requests_per_minute
        self.request_times = []
    
    async def acquire(self):
        now = time.time()
        # Drop request timestamps older than one minute
        self.request_times = [t for t in self.request_times if now - t < 60]
        
        # Wait if the per-minute limit has been reached
        if len(self.request_times) >= self.max_requests:
            sleep_time = 60 - (now - self.request_times[0]) + random.uniform(0.1, 0.5)
            await asyncio.sleep(sleep_time)
            # Refresh the record after sleeping
            self.request_times = [t for t in self.request_times if time.time() - t < 60]
        
        self.request_times.append(time.time())
        # Add a small random delay to mimic human behaviour
        await asyncio.sleep(random.uniform(1, 3))
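
The dynamic cookie check listed in section 2.3 cannot be handled by headers and proxies alone. One common approach, sketched below under the assumption that the site issues its session cookies after the page's JavaScript has run, is to let Playwright open the page once and then reuse the resulting cookies in plain aiohttp requests:

python

import aiohttp
from playwright.async_api import async_playwright

async def get_session_cookies(url: str) -> dict:
    # Let a real browser run the site's JavaScript once and collect the
    # cookies it sets for the session.
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()
        await page.goto(url, wait_until='networkidle')
        cookies = await page.context.cookies()
        await browser.close()
    return {c['name']: c['value'] for c in cookies}

async def fetch_with_cookies(url: str) -> str:
    cookies = await get_session_cookies(url)
    # Reuse the browser-issued cookies in lightweight aiohttp requests
    async with aiohttp.ClientSession(cookies=cookies) as session:
        async with session.get(url) as resp:
            return await resp.text()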

5. Core Crawler Implementation

5.1 Base Crawler Class

First, implement a base crawler class that encapsulates the common crawling functionality:

python

import asyncio
import aiohttp
import json
import logging
from urllib.parse import urljoin, urlencode
from typing import Dict, Any, Optional, List

class BaseCrawler:
    def __init__(self, base_url: str, max_concurrent: int = 5):
        self.base_url = base_url
        self.max_concurrent = max_concurrent
        self.session: Optional[aiohttp.ClientSession] = None
        self.logger = self._setup_logger()
        self.limiter = RequestLimiter()
    
    def _setup_logger(self):
        logger = logging.getLogger(self.__class__.__name__)
        logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        return logger
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            headers=get_random_headers(),
            timeout=aiohttp.ClientTimeout(total=30)
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
    
    async def fetch(self, url: str, params: Dict = None, method: str = 'GET') -> Optional[str]:
        """发送HTTP请求并返回响应文本"""
        await self.limiter.acquire()
        
        try:
            full_url = urljoin(self.base_url, url)
            if params and method == 'GET':
                full_url += '?' + urlencode(params)
            
            async with self.session.request(method, full_url) as response:
                if response.status == 200:
                    return await response.text()
                else:
                    self.logger.error(f"请求失败: {response.status} - {full_url}")
                    return None
        except Exception as e:
            self.logger.error(f"请求异常: {e}")
            return None
    
    async def fetch_json(self, url: str, params: Dict = None) -> Optional[Dict]:
        """获取JSON格式的响应"""
        text = await self.fetch(url, params)
        if text:
            try:
                return json.loads(text)
            except json.JSONDecodeError:
                self.logger.error("JSON解析失败")
        return None

5.2 Lizhi Weike Crawler

Building on the base class, implement a crawler tailored to Lizhi Weike. The endpoint paths, parameters and response fields used below are illustrative and should be verified against what the browser's network panel actually shows:

python

import hashlib
import time
from parsel import Selector

class LizhiWeikeCrawler(BaseCrawler):
    def __init__(self):
        super().__init__('https://www.lizhiweike.com')
        self.api_base = 'https://www.lizhiweike.com/api'
    
    def _generate_signature(self, params: Dict) -> str:
        """生成API请求签名(示例实现,实际需要根据网站具体算法调整)"""
        # 实际签名算法需要通过逆向工程分析JavaScript代码获得
        # 这里只是一个示例
        param_str = '&'.join([f'{k}={v}' for k, v in sorted(params.items())])
        secret = 'lizhiweike_secret_key'  # 需要从JS代码中提取
        sign_str = param_str + secret
        return hashlib.md5(sign_str.encode()).hexdigest()
    
    async def get_categories(self) -> List[Dict]:
        """获取课程分类"""
        url = f'{self.api_base}/categories'
        data = await self.fetch_json(url)
        if data and data.get('success'):
            return data.get('data', [])
        return []
    
    async def get_courses_by_category(self, category_id: int, page: int = 1, limit: int = 20) -> List[Dict]:
        """根据分类获取课程列表"""
        url = f'{self.api_base}/courses'
        params = {
            'category_id': category_id,
            'page': page,
            'limit': limit,
            'timestamp': int(time.time())
        }
        # Attach the request signature
        params['sign'] = self._generate_signature(params)
        
        data = await self.fetch_json(url, params)
        if data and data.get('success'):
            return data.get('data', {}).get('courses', [])
        return []
    
    async def get_course_detail(self, course_id: int) -> Optional[Dict]:
        """获取课程详细信息"""
        url = f'{self.api_base}/course/{course_id}'
        data = await self.fetch_json(url)
        if data and data.get('success'):
            return data.get('data', {})
        return None
    
    async def search_courses(self, keyword: str, page: int = 1) -> List[Dict]:
        """搜索课程"""
        url = f'{self.api_base}/search'
        params = {
            'keyword': keyword,
            'page': page,
            'timestamp': int(time.time())
        }
        params['sign'] = self._generate_signature(params)
        
        data = await self.fetch_json(url, params)
        if data and data.get('success'):
            return data.get('data', {}).get('courses', [])
        return []
    
    async def get_instructor_info(self, instructor_id: int) -> Optional[Dict]:
        """获取讲师信息"""
        url = f'{self.api_base}/instructor/{instructor_id}'
        data = await self.fetch_json(url)
        if data and data.get('success'):
            return data.get('data', {})
        return None
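
Assuming the endpoints above match what the network panel shows, the crawler is used like this:

python

async def demo_search():
    # Usage sketch; the search endpoint and the 'id'/'title' fields are the
    # assumptions made in LizhiWeikeCrawler above.
    async with LizhiWeikeCrawler() as crawler:
        courses = await crawler.search_courses('Python')
        for course in courses[:5]:
            print(course.get('id'), course.get('title'))

# asyncio.run(demo_search())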

5.3 Handling JavaScript-Rendered Content with Playwright

For content that is loaded dynamically by JavaScript, fetch the rendered page with Playwright:

python

import logging
from typing import Dict, Optional

from parsel import Selector
from playwright.async_api import async_playwright

class JSRenderCrawler:
    def __init__(self):
        self.browser = None
        self.context = None
    
    async def __aenter__(self):
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(
            headless=True,
            args=['--no-sandbox', '--disable-setuid-sandbox']
        )
        self.context = await self.browser.new_context(
            user_agent=get_random_headers()['User-Agent']
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.browser:
            await self.browser.close()
        await self.playwright.stop()
    
    async def get_dynamic_content(self, url: str, wait_for_selector: str = None) -> Optional[str]:
        """获取JavaScript渲染后的页面内容"""
        page = await self.context.new_page()
        
        try:
            await page.goto(url, wait_until='networkidle')
            
            if wait_for_selector:
                await page.wait_for_selector(wait_for_selector)
            
            # Give the page a moment to finish late asynchronous rendering
            await page.wait_for_timeout(2000)
            
            content = await page.content()
            return content
        except Exception as e:
            logging.error(f"Playwright抓取失败: {e}")
            return None
        finally:
            await page.close()
    
    async def extract_course_data(self, course_url: str) -> Dict:
        """从课程页面提取详细信息"""
        html = await self.get_dynamic_content(course_url, '.course-detail')
        if not html:
            return {}
        
        selector = Selector(html)
        data = {}
        
        # Extract fields with CSS selectors (the class names are assumptions
        # and must be verified against the real page)
        data['title'] = selector.css('h1.course-title::text').get()
        data['price'] = selector.css('.course-price::text').get()
        data['students_count'] = selector.css('.students-count::text').get()
        data['rating'] = selector.css('.rating-value::text').get()
        
        # Extract the course syllabus (chapters and sections)
        chapters = []
        for chapter in selector.css('.chapter-item'):
            chapter_data = {
                'title': chapter.css('.chapter-title::text').get(),
                'sections': []
            }
            for section in chapter.css('.section-item'):
                section_data = {
                    'title': section.css('.section-title::text').get(),
                    'duration': section.css('.section-duration::text').get()
                }
                chapter_data['sections'].append(section_data)
            chapters.append(chapter_data)
        
        data['chapters'] = chapters
        
        return data
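
Usage sketch (the course URL and the CSS class names inside extract_course_data are assumptions and must be checked against the live page):

python

async def demo_js_render():
    async with JSRenderCrawler() as js_crawler:
        # Hypothetical course URL; replace with a real one
        data = await js_crawler.extract_course_data(
            'https://www.lizhiweike.com/course/123456'
        )
        print(data.get('title'), len(data.get('chapters', [])))

# asyncio.run(demo_js_render())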

6. Data Storage and Processing

6.1 Database Schema

Design sensible table structures for the scraped data:

python

from sqlalchemy import create_engine, Column, Integer, String, Float, Text, DateTime
from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime

Base = declarative_base()

class Course(Base):
    __tablename__ = 'courses'
    
    id = Column(Integer, primary_key=True)
    course_id = Column(Integer, unique=True, index=True)
    title = Column(String(200))
    description = Column(Text)
    price = Column(Float)
    original_price = Column(Float)
    students_count = Column(Integer)
    rating = Column(Float)
    category_id = Column(Integer)
    instructor_id = Column(Integer)
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

class Instructor(Base):
    __tablename__ = 'instructors'
    
    id = Column(Integer, primary_key=True)
    instructor_id = Column(Integer, unique=True, index=True)
    name = Column(String(100))
    description = Column(Text)
    courses_count = Column(Integer)
    followers_count = Column(Integer)
    created_at = Column(DateTime, default=datetime.now)

class Category(Base):
    __tablename__ = 'categories'
    
    id = Column(Integer, primary_key=True)
    category_id = Column(Integer, unique=True, index=True)
    name = Column(String(50))
    parent_id = Column(Integer, default=0)
    courses_count = Column(Integer)
    created_at = Column(DateTime, default=datetime.now)

6.2 Data Storage Manager

Implement a data manager that supports several storage options:

python

import pandas as pd
from sqlalchemy import create_engine

class DataManager:
    def __init__(self, database_url: str = 'sqlite:///lizhiweike.db'):
        self.engine = create_engine(database_url)
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)
    
    def save_courses(self, courses_data: List[Dict]):
        """保存课程数据到数据库"""
        session = self.Session()
        
        try:
            for course_data in courses_data:
                # Check whether the course already exists
                existing = session.query(Course).filter_by(course_id=course_data['id']).first()
                if existing:
                    # Update the existing record
                    for key, value in course_data.items():
                        if hasattr(existing, key):
                            setattr(existing, key, value)
                    existing.updated_at = datetime.now()
                else:
                    # Create a new record
                    course = Course(
                        course_id=course_data['id'],
                        title=course_data['title'],
                        description=course_data.get('description', ''),
                        price=course_data.get('price', 0),
                        original_price=course_data.get('original_price', 0),
                        students_count=course_data.get('students_count', 0),
                        rating=course_data.get('rating', 0),
                        category_id=course_data.get('category_id', 0),
                        instructor_id=course_data.get('instructor_id', 0)
                    )
                    session.add(course)
            
            session.commit()
        except Exception as e:
            session.rollback()
            logging.error(f"保存课程数据失败: {e}")
        finally:
            session.close()
    
    def export_to_excel(self, filepath: str):
        """导出数据到Excel文件"""
        with self.engine.connect() as conn:
            courses_df = pd.read_sql_table('courses', conn)
            instructors_df = pd.read_sql_table('instructors', conn)
            categories_df = pd.read_sql_table('categories', conn)
        
        with pd.ExcelWriter(filepath) as writer:
            courses_df.to_excel(writer, sheet_name='Courses', index=False)
            instructors_df.to_excel(writer, sheet_name='Instructors', index=False)
            categories_df.to_excel(writer, sheet_name='Categories', index=False)

7. Performance Optimization

7.1 Async Concurrency Control

Use asyncio.Semaphore to cap concurrency and avoid flooding the site with requests:

python

class ConcurrentCrawler(BaseCrawler):
    def __init__(self, base_url: str, max_concurrent: int = 10):
        # Reuse BaseCrawler's session handling and add a concurrency cap
        super().__init__(base_url, max_concurrent)
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def fetch_with_semaphore(self, url: str):
        # The semaphore limits how many requests are in flight at once
        async with self.semaphore:
            return await self.fetch(url)
    
    async def crawl_multiple(self, urls: List[str]):
        tasks = [self.fetch_with_semaphore(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
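
With the class based on BaseCrawler (so that self.fetch exists), a batch of pages can be fetched concurrently like this (the course IDs are placeholders):

python

async def demo_concurrent():
    urls = [f'/course/{cid}' for cid in (101, 102, 103)]  # placeholder IDs
    async with ConcurrentCrawler('https://www.lizhiweike.com') as crawler:
        results = await crawler.crawl_multiple(urls)
        # Failed requests come back as exception objects (return_exceptions=True)
        ok = [r for r in results if isinstance(r, str)]
        print(f'{len(ok)}/{len(urls)} pages fetched')

# asyncio.run(demo_concurrent())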

7.2 Caching

Implement a request cache to avoid fetching the same content twice:

python

import hashlib
import os
import pickle
from datetime import datetime, timedelta

class CacheManager:
    def __init__(self, cache_dir: str = 'cache', ttl: int = 3600):
        self.cache_dir = cache_dir
        self.ttl = ttl  # cache time-to-live in seconds
        os.makedirs(cache_dir, exist_ok=True)
    
    def _get_cache_path(self, key: str) -> str:
        filename = hashlib.md5(key.encode()).hexdigest() + '.pkl'
        return os.path.join(self.cache_dir, filename)
    
    def get(self, key: str):
        cache_path = self._get_cache_path(key)
        if not os.path.exists(cache_path):
            return None
        
        # Evict the entry if it has expired
        mtime = datetime.fromtimestamp(os.path.getmtime(cache_path))
        if datetime.now() - mtime > timedelta(seconds=self.ttl):
            os.remove(cache_path)
            return None
        
        with open(cache_path, 'rb') as f:
            return pickle.load(f)
    
    def set(self, key: str, value):
        cache_path = self._get_cache_path(key)
        with open(cache_path, 'wb') as f:
            pickle.dump(value, f)
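
One way to wire the cache into the crawler is a thin subclass that checks the cache before hitting the network; a sketch built on the fetch_json method from section 5.1 (it reuses that block's json and Dict imports):

python

class CachedCrawler(BaseCrawler):
    def __init__(self, base_url: str, cache_ttl: int = 3600):
        super().__init__(base_url)
        self.cache = CacheManager(ttl=cache_ttl)
    
    async def fetch_json_cached(self, url: str, params: Dict = None):
        # Build a cache key from the URL plus its query parameters
        key = url + json.dumps(params or {}, sort_keys=True)
        cached = self.cache.get(key)
        if cached is not None:
            return cached
        data = await self.fetch_json(url, params)
        if data is not None:
            self.cache.set(key, data)
        return data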

7.3 Resumable Crawling

Add checkpointing so that an interrupted crawl can resume where it left off:

python

import json
import os

class CheckpointManager:
    def __init__(self, checkpoint_file: str = 'checkpoint.json'):
        self.checkpoint_file = checkpoint_file
        self.data = self._load_checkpoint()
    
    def _load_checkpoint(self):
        if os.path.exists(self.checkpoint_file):
            with open(self.checkpoint_file, 'r') as f:
                return json.load(f)
        return {}
    
    def save_checkpoint(self, key: str, value):
        self.data[key] = value
        with open(self.checkpoint_file, 'w') as f:
            json.dump(self.data, f, indent=2)
    
    def get_checkpoint(self, key: str, default=None):
        return self.data.get(key, default)

8. Legal and Ethical Considerations

Legal and ethical questions must be considered when building and running a crawler:

8.1 Respecting robots.txt

Start by checking the site's robots.txt file and respect its crawling policy:

python

import urllib.parse
import urllib.robotparser

def check_robots_permission(base_url: str, user_agent: str = '*') -> bool:
    rp = urllib.robotparser.RobotFileParser()
    rp.set_url(urllib.parse.urljoin(base_url, '/robots.txt'))
    rp.read()
    return rp.can_fetch(user_agent, base_url)
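
Call the check once before starting a crawl, for example:

python

if not check_robots_permission('https://www.lizhiweike.com'):
    raise SystemExit('robots.txt disallows crawling this site; aborting')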

8.2 Data Usage Guidelines

  • Only collect publicly accessible data

  • Do not collect sensitive data such as users' personal information

  • Comply with the site's terms of service

  • Keep the crawl rate reasonable so the site is not put under strain

  • Use the collected data for learning and research purposes only

9. Complete Example

The following is a complete Lizhi Weike crawler example. It assumes the classes from the previous sections have been saved into the modules imported below:

python

import asyncio
import logging
from typing import List

# These modules are assumed to hold the classes defined in the previous
# sections (DataManager, LizhiWeikeCrawler, CheckpointManager).
from data_manager import DataManager
from lizhi_crawler import LizhiWeikeCrawler
from checkpoint_manager import CheckpointManager

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

class LizhiWeikeSpider:
    def __init__(self):
        self.crawler = LizhiWeikeCrawler()
        self.data_manager = DataManager()
        self.checkpoint = CheckpointManager()
    
    async def crawl_categories(self):
        """爬取所有分类"""
        logging.info("开始爬取分类信息")
        categories = await self.crawler.get_categories()
        # 保存分类信息
        self.data_manager.save_categories(categories)
        logging.info(f"爬取完成,共获取{len(categories)}个分类")
        return categories
    
    async def crawl_courses_by_category(self, category_id: int, max_pages: int = 10):
        """爬取指定分类下的课程"""
        logging.info(f"开始爬取分类 {category_id} 的课程")
        
        all_courses = []
        for page in range(1, max_pages + 1):
            # 检查断点
            checkpoint_key = f'category_{category_id}_page'
            last_page = self.checkpoint.get_checkpoint(checkpoint_key, 0)
            if page <= last_page:
                logging.info(f"跳过已爬取的页面: {page}")
                continue
            
            logging.info(f"爬取第 {page} 页")
            courses = await self.crawler.get_courses_by_category(category_id, page)
            
            if not courses:
                logging.info(f"第 {page} 页无数据,停止爬取")
                break
            
            all_courses.extend(courses)
            
            # Persist this page of courses
            self.data_manager.save_courses(courses)
            
            # Update the checkpoint
            self.checkpoint.save_checkpoint(checkpoint_key, page)
            
            # Politeness delay between pages
            await asyncio.sleep(2)
        
        logging.info(f"分类 {category_id} 爬取完成,共获取{len(all_courses)}门课程")
        return all_courses
    
    async def crawl_course_details(self, course_ids: List[int]):
        """爬取课程详细信息"""
        logging.info(f"开始爬取 {len(course_ids)} 门课程的详细信息")
        
        details = []
        for i, course_id in enumerate(course_ids):
            # course_detail_exists / save_course_detail are assumed DataManager
            # helpers, implemented along the same lines as save_courses
            if self.data_manager.course_detail_exists(course_id):
                logging.info(f"Details for course {course_id} already stored, skipping")
                continue
            
            logging.info(f"爬取课程详情 ({i+1}/{len(course_ids)}): {course_id}")
            detail = await self.crawler.get_course_detail(course_id)
            
            if detail:
                details.append(detail)
                self.data_manager.save_course_detail(detail)
            
            # Throttle the request rate
            if (i + 1) % 5 == 0:
                await asyncio.sleep(3)
        
        logging.info(f"课程详情爬取完成,共获取{len(details)}门课程的详细信息")
        return details
    
    async def run(self):
        """运行爬虫"""
        async with self.crawler:
            # Crawl the categories
            categories = await self.crawl_categories()
            
            # Crawl the courses under each category
            all_courses = []
            for category in categories:
                category_id = category['id']
                courses = await self.crawl_courses_by_category(category_id)
                all_courses.extend(courses)
            
            # Collect the course IDs
            course_ids = [course['id'] for course in all_courses if 'id' in course]
            
            # Crawl course details (capped to avoid issuing too many requests)
            await self.crawl_course_details(course_ids[:50])
            
            # Export the data
            self.data_manager.export_to_excel('lizhiweike_data.xlsx')
            logging.info("数据导出完成")

async def main():
    spider = LizhiWeikeSpider()
    await spider.run()

if __name__ == '__main__':
    asyncio.run(main())

10. Summary and Outlook

This article has shown in detail how to crawl Lizhi Weike data with modern Python tooling. We implemented the following:

  1. Asynchronous programming to improve crawling throughput

  2. Countermeasures for a range of anti-crawling mechanisms

  3. Handling of JavaScript-rendered content

  4. Data storage and export

  5. Performance optimization and fault-tolerance mechanisms

Key technical points:

  • Async programming: asyncio and aiohttp for a high-performance asynchronous crawler

  • Anti-crawling countermeasures: User-Agent rotation, IP proxies and request rate control

  • JS rendering: Playwright for dynamically loaded content

  • Data management: flexible storage with SQLAlchemy

  • Performance: concurrency control, caching and resumable crawling

Directions for future work:

  1. Distributed crawling: deploy the crawler across multiple machines to further increase throughput

  2. Smart parsing: use machine learning to automatically recognize and extract page data

  3. Real-time monitoring: add crawler health monitoring and alerting

  4. Data quality assessment: automatic evaluation and cleaning of the collected data
