Python实战:高效爬取A股全量股票数据并自动化存储

1. 数据采集环境搭建

在开始爬取A股数据之前,我们需要配置合适的开发环境。建议使用Python 3.7+版本,这是目前大多数金融数据接口兼容性最好的版本。以下是环境配置的核心步骤:

# 安装必要库(建议使用虚拟环境)
pip install requests beautifulsoup4 pandas openpyxl lxml

关键组件说明

  • requests :用于发送HTTP请求获取网页内容
  • beautifulsoup4 :HTML解析利器
  • pandas :数据清洗与分析的核心工具
  • openpyxl :Excel文件操作支持
  • lxml :高性能HTML/XML解析器

提示:如果遇到网络问题,可以尝试使用国内镜像源安装,如清华源: pip install -i https://pypi.tuna.tsinghua.edu.cn/simple package_name

2. 目标网站分析与选择

A股数据获取有多种渠道,我们需要评估不同数据源的特点:

数据源 优点 缺点 反爬难度
东方财富网 数据全面,更新及时 反爬机制较强
新浪财经 接口稳定,结构清晰 部分数据需要登录
网易财经 历史数据丰富 动态加载内容较多
雪球网 社区数据有特色 需要处理大量动态内容

本例选择新浪财经接口 ,因其具有以下优势:

  • 提供清晰的API接口
  • 数据格式规范
  • 无需登录即可获取基础信息

3. 核心爬虫实现

3.1 请求头伪装技术

金融网站通常有严格的反爬机制,合理的请求头设置是成功的第一步:

headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
    'Connection': 'keep-alive',
    'Referer': 'https://finance.sina.com.cn/'
}

3.2 多线程采集优化

A股股票数量庞大(超过4000只),单线程采集效率低下。我们可以使用 concurrent.futures 实现并发请求:

from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_stock_data(stock_code):
    url = f'https://finance.sina.com.cn/realstock/company/{stock_code}/nc.shtml'
    try:
        response = requests.get(url, headers=headers, timeout=10)
        # 解析逻辑...
        return parsed_data
    except Exception as e:
        print(f"Error fetching {stock_code}: {str(e)}")
        return None

with ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(fetch_stock_data, code) for code in stock_codes]
    results = [f.result() for f in as_completed(futures) if f.result()]

3.3 数据解析技巧

使用BeautifulSoup解析HTML时,需要注意新浪财经的页面结构特点:

from bs4 import BeautifulSoup

def parse_stock_page(html):
    soup = BeautifulSoup(html, 'lxml')
    
    stock_info = {
        'code': soup.select('.code')[0].text.strip(),
        'name': soup.select('.name')[0].text.strip(),
        'price': soup.select('.price')[0].text.strip(),
        'change': soup.select('.change')[0].text.strip(),
        'volume': soup.select('.volume')[0].text.strip(),
        'market_cap': soup.select('.market-cap')[0].text.strip(),
        'pe_ratio': soup.select('.pe-ratio')[0].text.strip()
    }
    
    # 处理特殊标记(ST/*ST)
    if 'ST' in stock_info['name']:
        stock_info['special_flag'] = 'ST'
    elif '*ST' in stock_info['name']:
        stock_info['special_flag'] = '*ST'
    else:
        stock_info['special_flag'] = ''
    
    return stock_info

4. 数据存储方案

4.1 Excel存储优化

使用pandas直接导出Excel虽然简单,但大数据量时存在性能问题。我们可以采用以下优化策略:

import pandas as pd
from openpyxl.utils.dataframe import dataframe_to_rows
from openpyxl import Workbook

def save_to_excel_optimized(data, filename):
    # 创建Workbook对象
    wb = Workbook()
    ws = wb.active
    
    # 添加表头
    headers = list(data[0].keys())
    ws.append(headers)
    
    # 批量写入数据
    for item in data:
        row = [item[key] for key in headers]
        ws.append(row)
    
    # 自动调整列宽
    for column in ws.columns:
        max_length = 0
        column_letter = column[0].column_letter
        for cell in column:
            try:
                if len(str(cell.value)) > max_length:
                    max_length = len(str(cell.value))
            except:
                pass
        adjusted_width = (max_length + 2) * 1.2
        ws.column_dimensions[column_letter].width = adjusted_width
    
    wb.save(filename)

4.2 多格式输出支持

除了Excel,我们可以增加CSV和JSON格式输出,方便不同场景使用:

def export_data(data, filename, format='excel'):
    df = pd.DataFrame(data)
    
    if format == 'excel':
        df.to_excel(f'{filename}.xlsx', index=False)
    elif format == 'csv':
        df.to_csv(f'{filename}.csv', index=False, encoding='utf_8_sig')
    elif format == 'json':
        df.to_json(f'{filename}.json', orient='records', force_ascii=False)
    else:
        raise ValueError("Unsupported format")

5. 异常处理与日志记录

健壮的爬虫需要完善的异常处理机制:

import logging
from datetime import datetime

# 配置日志系统
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'stock_crawler_{datetime.now().strftime("%Y%m%d")}.log'),
        logging.StreamHandler()
    ]
)

def safe_request(url, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = requests.get(url, headers=headers, timeout=15)
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            wait_time = (attempt + 1) * 5
            logging.warning(f"Attempt {attempt + 1} failed: {str(e)}. Retrying in {wait_time} seconds...")
            time.sleep(wait_time)
    
    logging.error(f"Failed after {max_retries} attempts: {url}")
    return None

6. 完整实现代码

以下是整合所有模块的完整实现:

import requests
from bs4 import BeautifulSoup
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import logging
from datetime import datetime
from openpyxl import Workbook

# 日志配置
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f'stock_crawler_{datetime.now().strftime("%Y%m%d")}.log'),
        logging.StreamHandler()
    ]
)

# 请求头配置
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2',
    'Connection': 'keep-alive',
    'Referer': 'https://finance.sina.com.cn/'
}

def get_all_stock_codes():
    """获取沪深两市所有股票代码"""
    # 实际项目中应从可靠数据源获取,这里使用示例数据
    return ['000001', '600000', '300001']  # 示例代码

def parse_stock_page(html):
    """解析股票详情页"""
    soup = BeautifulSoup(html, 'lxml')
    
    # 实际解析逻辑需要根据新浪财经页面结构调整
    stock_info = {
        'code': soup.select('.code')[0].text.strip(),
        'name': soup.select('.name')[0].text.strip(),
        'price': soup.select('.price')[0].text.strip(),
        'change': soup.select('.change')[0].text.strip(),
        'volume': soup.select('.volume')[0].text.strip(),
        'market_cap': soup.select('.market-cap')[0].text.strip(),
        'pe_ratio': soup.select('.pe-ratio')[0].text.strip()
    }
    
    # 处理特殊标记
    if 'ST' in stock_info['name']:
        stock_info['special_flag'] = 'ST'
    elif '*ST' in stock_info['name']:
        stock_info['special_flag'] = '*ST'
    else:
        stock_info['special_flag'] = ''
    
    return stock_info

def fetch_single_stock(stock_code):
    """获取单只股票数据"""
    url = f'https://finance.sina.com.cn/realstock/company/{stock_code}/nc.shtml'
    try:
        response = requests.get(url, headers=HEADERS, timeout=10)
        response.raise_for_status()
        return parse_stock_page(response.text)
    except Exception as e:
        logging.error(f"Error fetching {stock_code}: {str(e)}")
        return None

def fetch_all_stocks():
    """批量获取所有股票数据"""
    stock_codes = get_all_stock_codes()
    results = []
    
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = {executor.submit(fetch_single_stock, code): code for code in stock_codes}
        
        for future in as_completed(futures):
            code = futures[future]
            try:
                result = future.result()
                if result:
                    results.append(result)
                    logging.info(f"Successfully fetched {code}")
            except Exception as e:
                logging.error(f"Error processing {code}: {str(e)}")
    
    return results

def save_to_excel(data, filename):
    """保存数据到Excel"""
    df = pd.DataFrame(data)
    
    # 优化Excel写入
    writer = pd.ExcelWriter(filename, engine='openpyxl')
    df.to_excel(writer, index=False)
    
    # 调整列宽
    worksheet = writer.sheets['Sheet1']
    for column in worksheet.columns:
        max_length = 0
        column_letter = column[0].column_letter
        for cell in column:
            try:
                if len(str(cell.value)) > max_length:
                    max_length = len(str(cell.value))
            except:
                pass
        adjusted_width = (max_length + 2) * 1.2
        worksheet.column_dimensions[column_letter].width = adjusted_width
    
    writer.save()
    logging.info(f"Data saved to {filename}")

if __name__ == '__main__':
    start_time = time.time()
    
    logging.info("Starting stock data collection...")
    stock_data = fetch_all_stocks()
    
    if stock_data:
        save_to_excel(stock_data, 'A股全量股票数据.xlsx')
        logging.info(f"Completed! Total {len(stock_data)} stocks fetched.")
    else:
        logging.error("No data was collected.")
    
    elapsed = time.time() - start_time
    logging.info(f"Total execution time: {elapsed:.2f} seconds")

7. 项目扩展建议

  1. 增量更新机制 :记录上次采集时间,只获取新增或变更的数据
  2. 数据验证 :添加数据校验逻辑,确保采集的数据质量
  3. 定时任务 :使用APScheduler等工具实现定时自动采集
  4. 数据库存储 :对于长期数据积累,建议使用MySQL或MongoDB
  5. 可视化分析 :结合Matplotlib/Pyecharts对采集的数据进行分析展示

注意:实际运行前请确保遵守目标网站的robots.txt规定,合理设置采集间隔,避免对目标服务器造成过大压力。商业使用前请确认数据授权情况。

更多推荐