In e-commerce digital operations, an efficient and stable product data pipeline is critical for business decision-making, competitor analysis, and user-experience optimization. This article walks through the design and implementation of a JD.com product data pipeline that combines API development with real-time collection to fetch, process, and distribute JD product data efficiently.

Solution Architecture

The JD product data pipeline uses a layered architecture built from the following core modules:

  1. Collection layer: fetches product data from the JD website in real time
  2. Processing layer: cleans, parses, and normalizes the raw data
  3. API service layer: exposes standardized endpoints for internal systems and external applications
  4. Cache layer: caches hot data to speed up access and reduce collection load
  5. Monitoring and alerting layer: watches system health and surfaces anomalies promptly

Core Technology Choices

The solution is built on the following stack:

  • Language: Python 3.9+
  • Web framework: FastAPI (high-performance API development)
  • Collection: Requests + PyQuery (page fetching and parsing)
  • Cache: Redis (data caching and distributed locks)
  • Task queue: Celery (asynchronous task processing)
  • Message queue: RabbitMQ (data distribution)
  • Database: MongoDB (unstructured product data storage)
  • Monitoring: Prometheus + Grafana (performance monitoring)

Detailed Implementation

1. Data Collection Module

The collection module fetches product data from the JD website; it has to cope with anti-crawling measures while keeping throughput reasonable.

import time
import random
import json
import re
from typing import Any, Dict, List, Optional
import requests
from pyquery import PyQuery as pq
from fake_useragent import UserAgent
import redis
from redis.exceptions import RedisError
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class JDDataCollector:
    def __init__(self, redis_host: str = 'localhost', redis_port: int = 6379, redis_db: int = 0):
        """Initialize the JD data collector."""
        self.ua = UserAgent()
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, db=redis_db, decode_responses=True)
        self.proxies_pool = self._load_proxies()  # proxy pool
        self.request_interval = (1.5, 3.5)  # bounds for the randomized per-request delay (anti-crawl)
        
    def _load_proxies(self) -> List[str]:
        """Load the proxy IP list (in production, pull from a proxy provider's API)."""
        try:
            # Example only; replace with a real proxy service in production
            response = requests.get("https://api.proxies.com/get_proxies", timeout=10)
            if response.status_code == 200:
                return response.json().get('proxies', [])
            return []
        except Exception as e:
            logger.error(f"Failed to load proxy pool: {str(e)}")
            return []
    
    def _get_random_proxy(self) -> Optional[Dict]:
        """Pick a random proxy from the pool."""
        if not self.proxies_pool:
            return None
        proxy = random.choice(self.proxies_pool)
        # HTTP proxies typically tunnel HTTPS via CONNECT, so both mappings use the http:// scheme
        return {
            'http': f'http://{proxy}',
            'https': f'http://{proxy}'
        }
    
    def _get_headers(self) -> Dict[str, str]:
        """Build randomized request headers."""
        return {
            'User-Agent': self.ua.random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Referer': 'https://www.jd.com/',
            'Connection': 'keep-alive',
            'Cache-Control': 'max-age=0'
        }
    
    def _is_rate_limited(self, product_id: str) -> bool:
        """Check whether this product has hit the request-frequency limit."""
        key = f"jd:rate_limit:{product_id}"
        try:
            # Has this product ID been requested within the last 10 seconds?
            if self.redis_client.exists(key):
                return True
            # Mark it, expiring after 10 seconds
            self.redis_client.setex(key, 10, 1)
            return False
        except RedisError as e:
            logger.warning(f"Redis operation failed: {str(e)}; skipping rate-limit check")
            return False
    
    def get_product_detail(self, product_id: str) -> Optional[Dict]:
        """
        Fetch the detail data for one product.
        :param product_id: JD product ID
        :return: product detail dict, or None on failure
        """
        # Rate-limit check
        if self._is_rate_limited(product_id):
            logger.warning(f"Product {product_id} requested too frequently; rate limit triggered")
            return None
            
        url = f"https://item.jd.com/{product_id}.html"
        headers = self._get_headers()
        proxies = self._get_random_proxy()
        
        try:
            # Random delay to mimic human browsing
            time.sleep(random.uniform(*self.request_interval))
            
            # Send the request
            response = requests.get(
                url, 
                headers=headers, 
                proxies=proxies,
                timeout=10,
                allow_redirects=True
            )
            
            # Check the response status
            if response.status_code != 200:
                logger.error(f"Request for product {product_id} failed, status code: {response.status_code}")
                return None
                
            # Parse the page
            doc = pq(response.text)
            
            # Extract the basic product fields
            product_data = {
                "product_id": product_id,
                "title": self._extract_title(doc),
                "price": self._get_product_price(product_id),
                "images": self._extract_images(doc),
                "category": self._extract_category(doc),
                "specifications": self._extract_specifications(doc),
                "seller_info": self._extract_seller_info(doc),
                "stock_status": self._check_stock(product_id),
                "tags": self._extract_tags(doc),
                "crawl_time": time.strftime("%Y-%m-%d %H:%M:%S")
            }
            
            logger.info(f"Fetched detail for product {product_id}")
            return product_data
            
        except requests.exceptions.RequestException as e:
            logger.error(f"Request error for product {product_id}: {str(e)}")
            return None
        except Exception as e:
            logger.error(f"Parse error for product {product_id}: {str(e)}")
            return None
    
    def _extract_title(self, doc: pq) -> Optional[str]:
        """Extract the product title."""
        title = doc('.sku-name').text().strip()
        return title if title else None
    
    def _get_product_price(self, product_id: str) -> Optional[float]:
        """Fetch the product price via JD's price endpoint."""
        try:
            price_url = f"https://p.3.cn/prices/mgets?skuIds=J_{product_id}"
            response = requests.get(price_url, headers=self._get_headers(), timeout=5)
            
            if response.status_code == 200 and response.text:
                price_data = json.loads(response.text)
                if price_data and len(price_data) > 0:
                    price = price_data[0].get('p')
                    return float(price) if price else None
            return None
        except Exception as e:
            logger.error(f"Failed to fetch price for product {product_id}: {str(e)}")
            return None
    
    def _extract_images(self, doc: pq) -> List[str]:
        """Extract product image URLs."""
        images = []
        
        # Main image
        main_img = doc('#spec-img').attr('src')
        if main_img:
            img_url = f"https:{main_img}" if main_img.startswith('//') else main_img
            images.append(img_url)
        
        # Thumbnails
        for img in doc('.spec-items img').items():
            img_src = img.attr('src') or img.attr('data-src')
            if img_src:
                img_url = f"https:{img_src}" if img_src.startswith('//') else img_src
                # Swap in the high-resolution variant
                img_url = img_url.replace('/n9/', '/n1/')
                images.append(img_url)
        
        # Deduplicate and return
        return list(set(images))
    
    def _extract_category(self, doc: pq) -> List[str]:
        """Extract the category breadcrumb."""
        category_path = doc('.breadcrumb').text().strip()
        if category_path:
            return [c.strip() for c in category_path.split('>') if c.strip()]
        return []
    
    def _extract_specifications(self, doc: pq) -> Dict[str, str]:
        """Extract the specification parameters."""
        specs = {}
        for item in doc('.parameter2 li').items():
            text = item.text().strip()
            if ':' in text:
                key, value = text.split(':', 1)
                specs[key.strip()] = value.strip()
        return specs
    
    def _extract_seller_info(self, doc: pq) -> Dict[str, Any]:
        """Extract seller information."""
        seller_info = {
            "name": None,
            "url": None,
            "is_self_operated": False,
            "score": None
        }
        
        # Check for a JD first-party (self-operated) listing
        if doc('.u-jd').length > 0:
            seller_info["is_self_operated"] = True
            seller_info["name"] = "JD self-operated"
        
        # Third-party seller details
        shop_link = doc('.shopname a')
        if shop_link.length > 0:
            seller_info["name"] = shop_link.text().strip()
            shop_url = shop_link.attr('href')
            if shop_url:
                seller_info["url"] = f"https:{shop_url}" if shop_url.startswith('//') else shop_url
        
        return seller_info
    
    def _check_stock(self, product_id: str) -> str:
        """Check the product's stock status."""
        try:
            stock_url = f"https://c0.3.cn/stock?skuId={product_id}&area=1_72_2799_0&venderId=1000000000&buyNum=1"
            response = requests.get(stock_url, headers=self._get_headers(), timeout=5)
            
            if response.status_code == 200 and response.text:
                # Parse the JSONP response (greedy match: the JSON payload may itself contain parentheses)
                json_str = re.findall(r'\((.*)\)', response.text)[0]
                stock_data = json.loads(json_str)
                stock_status = stock_data.get('stock', {}).get('stockStatus', 0)
                return "in_stock" if stock_status == 1 else "out_of_stock"
            return "unknown"
        except Exception as e:
            logger.error(f"Stock check failed for product {product_id}: {str(e)}")
            return "unknown"
    
    def _extract_tags(self, doc: pq) -> List[str]:
        """Extract product tags."""
        tags = []
        for tag in doc('.tag-common').items():
            tag_text = tag.text().strip()
            if tag_text:
                tags.append(tag_text)
        return tags
    
    def batch_collect_products(self, product_ids: List[str]) -> List[Dict]:
        """Collect data for a batch of products."""
        results = []
        for product_id in product_ids:
            data = self.get_product_detail(product_id)
            if data:
                results.append(data)
            # Extra delay between items during batch collection
            time.sleep(random.uniform(0.5, 1.5))
        return results
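
A quick way to sanity-check the collector is to run it standalone. The sketch below assumes Redis is reachable on localhost:6379; the product IDs are placeholders, not real SKUs:

# Minimal smoke test for JDDataCollector.
# Assumes Redis on localhost:6379; the IDs below are placeholders.
if __name__ == '__main__':
    collector = JDDataCollector()

    detail = collector.get_product_detail('100000000001')
    if detail:
        print(detail['title'], detail['price'], detail['stock_status'])

    batch = collector.batch_collect_products(['100000000001', '100000000002'])
    print(f"collected {len(batch)} of 2 products")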

2. Data Processing and Caching Module

This module cleans and normalizes the raw collected data and implements an efficient caching layer.

import json
import time
from typing import Dict, Optional, List
import redis
from pymongo import MongoClient
from pymongo.errors import PyMongoError
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class DataProcessor:
    def __init__(self, 
                 redis_host: str = 'localhost', 
                 redis_port: int = 6379, 
                 redis_db: int = 0,
                 mongo_uri: str = 'mongodb://localhost:27017/',
                 mongo_db: str = 'jd_product_db'):
        """Initialize the data processor."""
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, db=redis_db, decode_responses=True)
        self.mongo_client = MongoClient(mongo_uri)
        self.db = self.mongo_client[mongo_db]
        self.product_collection = self.db['products']
        
        # Create indexes to speed up queries (create_index is idempotent)
        self.product_collection.create_index('product_id', unique=True)
        self.product_collection.create_index('crawl_time')
        self.product_collection.create_index('category')
    
    def process_product_data(self, raw_data: Dict) -> Optional[Dict]:
        """
        Clean and normalize one raw product record.
        :param raw_data: raw collected data
        :return: normalized record, or None on failure
        """
        if not raw_data or 'product_id' not in raw_data:
            logger.error("Invalid product data: missing product_id")
            return None
            
        try:
            # Cleaning and normalization
            processed = {
                'product_id': raw_data['product_id'],
                # The collector may return title=None, so guard before stripping; cap the length
                'title': (raw_data.get('title') or '').strip()[:500],
                'price': float(raw_data.get('price', 0)) if raw_data.get('price') else 0,
                # original_price is optional; the collector may not supply it
                'original_price': float(raw_data.get('original_price', 0)) if raw_data.get('original_price') else 0,
                # Discount expressed as a percentage of the original price, one decimal place
                'discount': round(
                    (float(raw_data.get('price', 0)) / float(raw_data.get('original_price', 1)) * 100) 
                    if (raw_data.get('price') and raw_data.get('original_price') and float(raw_data.get('original_price', 0)) > 0) 
                    else 0, 
                    1
                ),
                'images': [img.strip() for img in raw_data.get('images', []) if img and img.strip()],
                'category': raw_data.get('category', []),
                'main_category': raw_data.get('category', [])[0] if raw_data.get('category') else None,
                'specifications': raw_data.get('specifications', {}),
                'seller_info': raw_data.get('seller_info', {}),
                'is_self_operated': raw_data.get('seller_info', {}).get('is_self_operated', False),
                'stock_status': raw_data.get('stock_status', 'unknown'),
                'tags': raw_data.get('tags', []),
                'crawl_time': raw_data.get('crawl_time', time.strftime("%Y-%m-%d %H:%M:%S")),
                'update_time': time.strftime("%Y-%m-%d %H:%M:%S")
            }
            
            # Pull the brand out of the specifications ('品牌' is the key JD uses on-page)
            processed['brand'] = processed['specifications'].get('品牌', None)
            
            return processed
        except Exception as e:
            logger.error(f"Failed to process product {raw_data.get('product_id')}: {str(e)}")
            return None
    
    def cache_product_data(self, product_data: Dict, expire_seconds: int = 3600) -> bool:
        """
        Cache a product record in Redis.
        :param product_data: product record
        :param expire_seconds: TTL in seconds (default: 1 hour)
        :return: True on success, False otherwise
        """
        if not product_data or 'product_id' not in product_data:
            return False
            
        product_id = product_data['product_id']
        cache_key = f"jd:product:{product_id}"
        
        try:
            # Store as a JSON string
            self.redis_client.setex(
                cache_key, 
                expire_seconds, 
                json.dumps(product_data, ensure_ascii=False)
            )
            logger.info(f"Cached product {product_id} with a TTL of {expire_seconds} seconds")
            return True
        except Exception as e:
            logger.error(f"Failed to cache product {product_id}: {str(e)}")
            return False
    
    def get_cached_product(self, product_id: str) -> Optional[Dict]:
        """
        Read a product record from the cache.
        :param product_id: product ID
        :return: product record, or None if absent
        """
        cache_key = f"jd:product:{product_id}"
        
        try:
            data = self.redis_client.get(cache_key)
            if data:
                logger.info(f"Cache hit for product {product_id}")
                return json.loads(data)
            return None
        except Exception as e:
            logger.error(f"Failed to read cache for product {product_id}: {str(e)}")
            return None
    
    def save_to_database(self, product_data: Dict) -> bool:
        """
        Persist a product record to MongoDB.
        :param product_data: product record
        :return: True on success, False otherwise
        """
        if not product_data or 'product_id' not in product_data:
            return False
            
        product_id = product_data['product_id']
        
        try:
            # Upsert: update if present, insert otherwise
            result = self.product_collection.update_one(
                {'product_id': product_id},
                {'$set': product_data},
                upsert=True
            )
            
            if result.upserted_id:
                logger.info(f"Inserted product {product_id} into the database")
            else:
                logger.info(f"Updated product {product_id}")
            return True
        except PyMongoError as e:
            logger.error(f"Failed to save product {product_id} to the database: {str(e)}")
            return False
    
    def batch_process_and_save(self, products: List[Dict]) -> Dict:
        """
        Process and persist a batch of product records.
        :param products: list of raw product records
        :return: processing statistics
        """
        stats = {
            'total': len(products),
            'processed': 0,
            'cached': 0,
            'saved': 0,
            'failed': 0
        }
        
        for product in products:
            try:
                # Normalize
                processed = self.process_product_data(product)
                if not processed:
                    stats['failed'] += 1
                    continue
                
                stats['processed'] += 1
                
                # Cache
                if self.cache_product_data(processed):
                    stats['cached'] += 1
                
                # Persist
                if self.save_to_database(processed):
                    stats['saved'] += 1
            except Exception as e:
                logger.error(f"Batch processing failed for product {product.get('product_id')}: {str(e)}")
                stats['failed'] += 1
        
        logger.info(f"Batch processing finished: {json.dumps(stats, ensure_ascii=False)}")
        return stats
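
The collector and processor are meant to be chained: raw records from the former feed process_product_data, then the cache and the database. A minimal end-to-end sketch, assuming local Redis and MongoDB and a placeholder product ID:

if __name__ == '__main__':
    from jd_collector import JDDataCollector  # module name as used in the API layer

    collector = JDDataCollector()
    processor = DataProcessor()

    raw = collector.get_product_detail('100000000001')  # placeholder ID
    if raw:
        clean = processor.process_product_data(raw)
        if clean:
            processor.cache_product_data(clean, expire_seconds=1800)  # 30-minute TTL
            processor.save_to_database(clean)
            print(f"stored {clean['product_id']} ({clean['brand'] or 'no brand'})")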

3. API Service Layer

The API layer is built with FastAPI and exposes standardized endpoints for data access.

from fastapi import FastAPI, Query, HTTPException, Depends, Request
from fastapi.middleware.cors import CORSMiddleware
import time
import asyncio
import logging
import redis
from jd_collector import JDDataCollector
from data_processor import DataProcessor

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Create the application
app = FastAPI(
    title="JD Product Data API",
    description="Real-time retrieval and query endpoints for JD product data",
    version="1.0.0"
)

# Allow cross-origin requests
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize the pipeline components
collector = JDDataCollector()
processor = DataProcessor()

# Simple request-timing middleware
@app.middleware("http")
async def request_middleware(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    
    # Access log
    logger.info(
        f"Request: {request.method} {request.url} | "
        f"Status: {response.status_code} | "
        f"Duration: {process_time:.4f}s | "
        f"Client IP: {request.client.host}"
    )
    return response

# Module-level Redis connection for rate limiting (avoids reconnecting on every request)
rate_limit_redis = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# Rate-limiting dependency
async def rate_limiter(request: Request):
    """Throttle requests per client IP."""
    client_ip = request.client.host
    key = f"rate_limit:{client_ip}"
    
    try:
        # Count requests within the current 1-minute window
        count = rate_limit_redis.incr(key)
        if count == 1:
            rate_limit_redis.expire(key, 60)  # window expires after 1 minute
    except redis.RedisError as e:
        # Fail open if Redis is unavailable; raising inside this try would
        # otherwise swallow the HTTPException below
        logger.warning(f"Rate-limit check failed: {str(e)}")
        return client_ip
    
    # Allow at most 60 requests per minute
    if count > 60:
        raise HTTPException(
            status_code=429,
            detail="Too many requests; please retry later",
            headers={"Retry-After": "60"}
        )
    
    return client_ip

@app.get("/health", summary="服务健康检查")
async def health_check():
    """检查API服务是否正常运行"""
    return {
        "status": "healthy",
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
        "service": "jd-product-api"
    }

@app.get("/api/product/{product_id}", 
         summary="获取单个商品详情",
         dependencies=[Depends(rate_limiter)])
async def get_product(
    product_id: str,
    refresh: bool = Query(False, description="是否强制刷新数据,不使用缓存")
):
    """
    获取京东商品的详细信息
    
    - **product_id**: 京东商品ID
    - **refresh**: 是否强制刷新数据,不使用缓存
    """
    # 先检查缓存
    if not refresh:
        cached_data = processor.get_cached_product(product_id)
        if cached_data:
            return {
                "code": 200,
                "message": "success",
                "data": cached_data,
                "source": "cache"
            }
    
    # 缓存未命中或需要刷新,从源头获取
    raw_data = collector.get_product_detail(product_id)
    if not raw_data:
        raise HTTPException(status_code=404, detail=f"商品 {product_id} 数据获取失败")
    
    # 处理数据
    processed_data = processor.process_product_data(raw_data)
    if not processed_data:
        raise HTTPException(status_code=500, detail=f"商品 {product_id} 数据处理失败")
    
    # 缓存数据
    processor.cache_product_data(processed_data)
    
    # 保存到数据库(异步)
    async def save_async():
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(None, processor.save_to_database, processed_data)
    
    asyncio.create_task(save_async())
    
    return {
        "code": 200,
        "message": "success",
        "data": processed_data,
        "source": "fresh"
    }

@app.get("/api/products", 
         summary="批量获取商品详情",
         dependencies=[Depends(rate_limiter)])
async def get_products(
    ids: str = Query(..., description="商品ID列表,用逗号分隔,最多10个"),
    refresh: bool = Query(False, description="是否强制刷新数据,不使用缓存")
):
    """批量获取多个京东商品的详细信息"""
    product_ids = ids.split(',')[:10]  # 限制最多10个商品
    if not product_ids:
        raise HTTPException(status_code=400, detail="请提供有效的商品ID列表")
    
    results = []
    for pid in product_ids:
        try:
            # 检查缓存
            if not refresh:
                cached_data = processor.get_cached_product(pid)
                if cached_data:
                    results.append({
                        "product_id": pid,
                        "data": cached_data,
                        "source": "cache",
                        "error": None
                    })
                    continue
            
            # 从源头获取
            raw_data = collector.get_product_detail(pid)
            if not raw_data:
                results.append({
                    "product_id": pid,
                    "data": None,
                    "source": None,
                    "error": "数据获取失败"
                })
                continue
            
            # 处理数据
            processed_data = processor.process_product_data(raw_data)
            if not processed_data:
                results.append({
                    "product_id": pid,
                    "data": None,
                    "source": None,
                    "error": "数据处理失败"
                })
                continue
            
            # 缓存数据
            processor.cache_product_data(processed_data)
            
            # 保存到数据库(异步)
            async def save_async(data):
                loop = asyncio.get_event_loop()
                await loop.run_in_executor(None, processor.save_to_database, data)
            
            asyncio.create_task(save_async(processed_data))
            
            results.append({
                "product_id": pid,
                "data": processed_data,
                "source": "fresh",
                "error": None
            })
            
        except Exception as e:
            results.append({
                "product_id": pid,
                "data": None,
                "source": None,
                "error": str(e)
            })
        
        # 避免请求过于密集
        await asyncio.sleep(0.1)
    
    return {
        "code": 200,
        "message": "success",
        "count": len(results),
        "data": results
    }

@app.get("/api/category/{category_name}", 
         summary="按分类获取商品",
         dependencies=[Depends(rate_limiter)])
async def get_products_by_category(
    category_name: str,
    page: int = Query(1, ge=1, description="页码"),
    limit: int = Query(10, ge=1, le=50, description="每页数量")
):
    """按分类获取商品数据"""
    try:
        skip = (page - 1) * limit
        
        # 从数据库查询
        products = list(processor.product_collection.find(
            {"main_category": category_name},
            {"_id": 0}  # 排除MongoDB的_id字段
        ).skip(skip).limit(limit).sort("crawl_time", -1))
        
        # 获取总数
        total = processor.product_collection.count_documents({"main_category": category_name})
        
        return {
            "code": 200,
            "message": "success",
            "data": {
                "products": products,
                "pagination": {
                    "page": page,
                    "limit": limit,
                    "total": total,
                    "pages": (total + limit - 1) // limit
                }
            }
        }
    except Exception as e:
        logger.error(f"按分类查询商品失败: {str(e)}")
        raise HTTPException(status_code=500, detail="查询商品分类数据失败")

@app.get("/api/search", 
         summary="搜索商品",
         dependencies=[Depends(rate_limiter)])
async def search_products(
    keyword: str = Query(..., description="搜索关键词"),
    page: int = Query(1, ge=1, description="页码"),
    limit: int = Query(10, ge=1, le=50, description="每页数量")
):
    """搜索商品数据"""
    try:
        skip = (page - 1) * limit
        
        # 构建搜索条件
        query = {
            "$or": [
                {"title": {"$regex": keyword, "$options": "i"}},
                {"brand": {"$regex": keyword, "$options": "i"}},
                {"tags": {"$regex": keyword, "$options": "i"}}
            ]
        }
        
        # 从数据库查询
        products = list(processor.product_collection.find(
            query,
            {"_id": 0}  # 排除MongoDB的_id字段
        ).skip(skip).limit(limit).sort("crawl_time", -1))
        
        # 获取总数
        total = processor.product_collection.count_documents(query)
        
        return {
            "code": 200,
            "message": "success",
            "data": {
                "products": products,
                "pagination": {
                    "page": page,
                    "limit": limit,
                    "total": total,
                    "pages": (total + limit - 1) // limit
                }
            }
        }
    except Exception as e:
        logger.error(f"搜索商品失败: {str(e)}")
        raise HTTPException(status_code=500, detail="搜索商品数据失败")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=False, workers=4)
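
Once the service is up, the endpoints can be exercised with any HTTP client. A sketch using requests; the host and port match the uvicorn settings above, and the product IDs are placeholders:

import requests

BASE = 'http://localhost:8000'

# Single product, forcing a refresh past the cache
resp = requests.get(f'{BASE}/api/product/100000000001', params={'refresh': 'true'})
print(resp.json()['source'])  # 'fresh' or 'cache'

# Batch lookup: comma-separated IDs, at most 10 per call
resp = requests.get(f'{BASE}/api/products', params={'ids': '100000000001,100000000002'})
for item in resp.json()['data']:
    print(item['product_id'], item['error'] or 'ok')

# Paginated search over stored data
resp = requests.get(f'{BASE}/api/search', params={'keyword': '手机', 'page': 1, 'limit': 5})
print(resp.json()['data']['pagination'])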

4. Asynchronous Task Queue

Celery handles asynchronous task processing, improving the system's concurrency and resilience.

import time
import json
from celery import Celery
from celery.utils.log import get_task_logger
from jd_collector import JDDataCollector
from data_processor import DataProcessor

# Create the Celery application
app = Celery(
    'jd_tasks',
    broker='amqp://guest@localhost//',  # RabbitMQ as the message broker
    backend='redis://localhost:6379/0'  # Redis as the result backend
)

# Task configuration
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Shanghai',
    enable_utc=True,
    worker_concurrency=4,  # number of worker processes
    task_acks_late=True,   # acknowledge only after the task completes
    worker_prefetch_multiplier=1  # prefetch one task at a time
)

# Logger
logger = get_task_logger(__name__)

# Pipeline components
collector = JDDataCollector()
processor = DataProcessor()

@app.task(bind=True, max_retries=3, retry_backoff=True)
def fetch_product_data(self, product_id: str):
    """
    Asynchronously fetch and process one product.
    :param product_id: product ID
    :return: the processed product record
    """
    try:
        logger.info(f"Fetching data for product {product_id}")
        
        # Fetch the raw data
        raw_data = collector.get_product_detail(product_id)
        if not raw_data:
            raise ValueError(f"Failed to fetch raw data for product {product_id}")
        
        # Normalize
        processed_data = processor.process_product_data(raw_data)
        if not processed_data:
            raise ValueError(f"Failed to process data for product {product_id}")
        
        # Cache and persist
        processor.cache_product_data(processed_data)
        processor.save_to_database(processed_data)
        
        logger.info(f"Finished fetching and processing product {product_id}")
        return processed_data
        
    except Exception as e:
        # Single retry point; re-raising here respects max_retries and backoff
        logger.error(f"Task failed for product {product_id}: {str(e)}")
        raise self.retry(exc=e)

@app.task(bind=True)
def batch_fetch_products(self, product_ids: list):
    """
    Fetch a batch of products.
    :param product_ids: list of product IDs
    :return: processing statistics
    """
    start_time = time.time()
    logger.info(f"Starting batch processing of {len(product_ids)} products")
    
    results = {
        "success": 0,
        "failed": 0,
        "total": len(product_ids),
        "failed_ids": [],
        "time_used": 0
    }
    
    try:
        # Batch collection
        raw_products = collector.batch_collect_products(product_ids)
        
        # Batch processing and persistence
        process_result = processor.batch_process_and_save(raw_products)
        
        # Tally the results
        results["success"] = process_result["saved"]
        results["failed"] = process_result["failed"]
        results["failed_ids"] = [
            pid for pid in product_ids 
            if not any(p.get("product_id") == pid for p in raw_products)
        ]
        
    except Exception as e:
        logger.error(f"Batch task failed: {str(e)}")
        results["failed"] = len(product_ids)
        results["failed_ids"] = product_ids
        
    finally:
        results["time_used"] = time.time() - start_time
        logger.info(f"Batch processing finished: {json.dumps(results, ensure_ascii=False)}")
    
    # Return outside the finally block so it can never mask an exception
    return results

@app.task
def scheduled_product_update(category: str = None, limit: int = 100):
    """
    Periodically refresh stored product data.
    :param category: product category; None refreshes across all categories
    :param limit: maximum number of products per run
    """
    logger.info(f"Starting scheduled product refresh, category: {category}, limit: {limit}")
    
    try:
        # Select products due for a refresh (stalest update_time first)
        query = {}
        if category:
            query["main_category"] = category
            
        # Fetch the oldest records
        old_products = list(processor.product_collection.find(
            query
        ).sort("update_time", 1).limit(limit))
        
        # Collect their IDs
        product_ids = [p["product_id"] for p in old_products]
        logger.info(f"Refreshing {len(product_ids)} products")
        
        # Refresh in batch
        if product_ids:
            batch_fetch_products.delay(product_ids)
            
        return {
            "status": "success",
            "updated_count": len(product_ids)
        }
        
    except Exception as e:
        logger.error(f"Scheduled product refresh failed: {str(e)}")
        return {
            "status": "failed",
            "error": str(e)
        }

# Periodic task schedule
app.conf.beat_schedule = {
    'update-popular-products-every-6-hours': {
        'task': 'tasks.scheduled_product_update',
        'schedule': 6 * 3600.0,  # every 6 hours
        'args': (None, 200),  # all categories, 200 products per run
    },
    'update-home-appliances-daily': {
        'task': 'tasks.scheduled_product_update',
        'schedule': 24 * 3600.0,  # once a day
        'args': ('家用电器', 300),  # the '家用电器' (home appliances) category, 300 per run
    },
}
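
Tasks are enqueued with .delay() and consumed by a separate worker process; beat drives the schedule above. A minimal sketch, assuming this module is saved as tasks.py (matching the task names in beat_schedule) and using placeholder product IDs:

# Enqueue work from application code (assumes this module is tasks.py).
from tasks import fetch_product_data, batch_fetch_products

result = fetch_product_data.delay('100000000001')  # placeholder ID
print(result.get(timeout=120))  # blocks until the worker finishes

batch_fetch_products.delay(['100000000001', '100000000002'])

# Typical worker/beat invocations, run in separate shells:
#   celery -A tasks worker --loglevel=info
#   celery -A tasks beat --loglevel=info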

System Deployment and Monitoring

Deployment Architecture

The recommended deployment packages the system as Docker containers, comprising the following services:

  1. FastAPI application (4 worker processes)
  2. Celery worker (asynchronous task processing)
  3. Celery beat (scheduled tasks)
  4. Redis (cache and result backend)
  5. MongoDB (persistent data storage)
  6. RabbitMQ (message queue)
  7. Prometheus + Grafana (monitoring)

Monitoring Metrics

Key monitoring metrics include the following (a minimal instrumentation sketch follows the list):

  1. API performance: response time, request volume, error rate
  2. Collection metrics: success rate, average duration, block rate
  3. System resources: CPU usage, memory footprint, network IO
  4. Cache hit ratio: Redis cache hit statistics
  5. Task queue: queue depth, throughput, failure rate
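
As a sketch of how the collection metrics might be exported to Prometheus, the snippet below instruments the collector with prometheus_client; the metric names are illustrative, not a fixed convention:

from prometheus_client import Counter, Histogram, start_http_server

# Illustrative metric names; align them with your Grafana dashboards.
CRAWL_REQUESTS = Counter(
    'jd_crawl_requests_total', 'Product page fetch attempts', ['outcome']
)
CRAWL_LATENCY = Histogram(
    'jd_crawl_latency_seconds', 'Time spent fetching one product page'
)

def instrumented_fetch(collector, product_id):
    """Wrap JDDataCollector.get_product_detail with outcome/latency metrics."""
    with CRAWL_LATENCY.time():
        data = collector.get_product_detail(product_id)
    CRAWL_REQUESTS.labels(outcome='success' if data else 'failure').inc()
    return data

# Expose /metrics on port 9100 for Prometheus to scrape
start_http_server(9100)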

Advantages and Extensions

Advantages

  1. High performance: FastAPI plus asynchronous task processing supports high-concurrency workloads
  2. Stability: thorough error handling and retry logic keep data collection reliable
  3. Extensibility: the modular design makes feature growth and horizontal scaling straightforward
  4. Anti-crawling countermeasures: a proxy pool, randomized headers, and frequency control work in combination
  5. Caching: a multi-level cache design cuts duplicate requests and speeds up responses

Directions for Extension

  1. User authentication: add API keys and OAuth 2.0 to control access
  2. Data subscriptions: let users subscribe to specific products and receive push notifications on changes
  3. Analytics: add price-trend analysis, competitor comparison, and other advanced features
  4. Distributed crawling: scale out to a distributed architecture for large-scale collection
  5. Multi-platform support: extend collection to Taobao, Pinduoduo, and other e-commerce platforms

Summary

This article walked through a complete JD product data pipeline, from collection, processing, and storage to the API service layer, resulting in an efficient, stable, and extensible system for retrieving and distributing product data. Built on a modern stack and a layered architecture, it can support e-commerce analytics, competitor monitoring, price tracking, and similar business scenarios.

In production, the solution still needs tuning to the specific business requirements and data volume; the anti-crawling strategy and overall performance in particular call for continuous iteration as the target site's defenses and your own needs evolve.
