1. 项目概述:为什么需要“通用模板”?

干了这么多年测试开发,我最大的感受就是,很多活儿是重复的。今天要写个脚本去捞日志,明天要搞个工具去批量改配置,后天又要搭个数据校验的框架。每次都是从零开始,打开IDE,新建文件,然后开始敲那些几乎一样的导入语句、日志配置、异常处理……效率低不说,还容易出错,特别是赶进度的时候,一着急,连基本的错误重试都忘了加。

所以,我花了很长时间,把自己和团队里常用的自动化脚本套路,沉淀成了两套“通用模板”。这玩意儿不是什么高深莫测的框架,它就是两个Python文件。但你拿到手,改改配置、填填核心逻辑,一个健壮、可维护的自动化脚本就出来了。一套模板专门对付那些 一次性的、临时的、但逻辑可能有点复杂 的日常任务,比如数据清洗、环境检查、批量操作;另一套模板则用来构建 需要周期性执行、有明确状态流转、甚至要跟其他系统联动 的自动化流程,比如定时巡检、报表生成、流水线任务。

它们的价值不在于技术多新颖,而在于“开箱即用”和“减少决策”。你不用再纠结日志该用 logging 还是 print ,不用再琢磨异常怎么捕获才合理,也不用再重新设计命令行参数解析。模板都给你定好了最佳实践,你只需要关注最核心的业务逻辑。这就是“提效”最实在的体现。

2. 第一套模板:面向临时任务的“瑞士军刀”脚本

这套模板的设计哲学是: 轻量、灵活、自包含 。一个 .py 文件搞定所有事情,扔到任何有Python环境的机器上都能跑。它适合处理那些“一次性”但可能下次还会微调再用的任务。

2.1 模板结构与核心设计思想

整个模板的结构非常清晰,遵循了“配置 -> 初始化 -> 主逻辑 -> 收尾”的线性流程。但它的巧妙之处在于,通过几个关键组件,将脚本的可靠性提升了好几个档次。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
脚本描述:这里用一句话说明这个脚本是干什么的。
作者:YourName
日期:2023-10-27
"""

import argparse
import logging
import sys
import os
import traceback
from typing import Optional, Any
from datetime import datetime

# 可能常用的第三方库,按需取消注释
# import requests
# import pandas as pd
# from selenium import webdriver

设计思想解析

  1. Shebang和编码声明 #!/usr/bin/env python3 让脚本在Unix/Linux系统下可以直接 ./script.py 执行(需加执行权限)。 utf-8 编码声明避免中文乱码。
  2. 模块化导入 :将标准库、第三方库、自定义库分块导入,清晰且易于管理。 typing 用于类型提示,在现代Python开发中是提升代码可读性和IDE友好性的好习惯。
  3. 日志先行 :日志是调试和运维的“眼睛”。在脚本开头就定义好日志,确保任何地方都能记录状态。

2.2 核心组件深度拆解:日志、参数与异常处理

接下来是模板的三大支柱:日志配置、命令行参数解析和全局异常捕获。这是脚本是否“专业”和“健壮”的分水岭。

日志配置 :我强烈建议抛弃 print ,使用 logging 模块。模板里配置了一个同时输出到控制台和文件的日志器。

def setup_logging(log_level: str = "INFO", log_file: Optional[str] = None) -> logging.Logger:
    """
    配置日志。
    Args:
        log_level: 日志级别,如 'DEBUG', 'INFO', 'WARNING'
        log_file: 日志文件路径,如果不提供则只输出到控制台
    Returns:
        配置好的 logger 实例
    """
    logger = logging.getLogger(__name__)
    # 避免重复添加handler
    if logger.handlers:
        return logger

    logger.setLevel(getattr(logging, log_level.upper(), logging.INFO))

    # 定义日志格式
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
    )

    # 控制台Handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    # 文件Handler (可选)
    if log_file:
        # 创建日志目录
        log_dir = os.path.dirname(log_file)
        if log_dir and not os.path.exists(log_dir):
            os.makedirs(log_dir)
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    return logger

实操心得 :这里有个关键细节, if logger.handlers: 这行检查是为了防止 setup_logging 函数被多次调用时,给同一个logger重复添加handler,导致日志重复打印。这是实践中很容易踩的坑。

命令行参数解析 :使用 argparse 库,让脚本可以通过命令行灵活配置,而不是把参数硬编码在代码里。

def parse_args():
    """
    解析命令行参数。
    """
    parser = argparse.ArgumentParser(description='你的脚本描述在这里')
    parser.add_argument('-u', '--username', type=str, required=True, help='用户名,必填')
    parser.add_argument('-p', '--password', type=str, help='密码,如果不填则通过安全方式获取')
    parser.add_argument('-e', '--env', type=str, choices=['dev', 'test', 'prod'], default='test',
                        help='运行环境,默认为test')
    parser.add_argument('--debug', action='store_true', help='开启调试模式,将输出DEBUG级别日志')
    parser.add_argument('--dry-run', action='store_true', help='试运行模式,只打印将要执行的操作而不实际执行')
    # 可以添加更多参数...
    return parser.parse_args()

注意事项

  1. required=True 用于指定必填参数,如果用户没提供, argparse 会自动报错并显示帮助信息,省去我们自己写校验逻辑。
  2. action='store_true' 用于实现布尔开关,指定该参数即为 True ,不指定则为 False
  3. choices 参数限制输入范围,有效防止无效值传入。
  4. --dry-run (试运行)是一个非常重要的参数,在涉及数据删除、线上操作等危险动作前,先用此模式跑一遍,确认逻辑无误,能避免灾难性后果。

全局异常处理与脚本退出码 :这是保证脚本在任何异常情况下都能“体面退出”的关键。我们使用 sys.exit(code) 来返回退出码,调用方(如Jenkins、Cron)可以根据退出码判断脚本执行成功与否。

def main():
    """脚本主函数"""
    args = parse_args()
    log_level = "DEBUG" if args.debug else "INFO"
    logger = setup_logging(log_level=log_level, log_file=f"./logs/script_{datetime.now().strftime('%Y%m%d')}.log")

    exit_code = 0  # 0 代表成功,非0代表失败
    try:
        logger.info("脚本开始执行")
        logger.info(f"接收到的参数: {vars(args)}")

        # >>>>>>>>>>>>>>>>>>> 你的核心业务逻辑从这里开始 <<<<<<<<<<<<<<<<<<<<
        # 示例:一个简单的处理
        if args.dry_run:
            logger.warning("DRY-RUN 模式:模拟执行操作...")
            # 这里写模拟执行的逻辑
            result = simulate_core_operation(args)
        else:
            logger.info("正式执行模式...")
            # 这里写实际执行的逻辑
            result = real_core_operation(args)
        # >>>>>>>>>>>>>>>>>>> 你的核心业务逻辑到这里结束 <<<<<<<<<<<<<<<<<<<<

        logger.info(f"脚本执行成功,结果: {result}")

    except argparse.ArgumentError as e:
        logger.error(f"命令行参数错误: {e}")
        exit_code = 2
    except KeyboardInterrupt:
        logger.warning("用户中断执行 (Ctrl+C)")
        exit_code = 130  # Unix/Linux 标准中断退出码
    except Exception as e:
        logger.error(f"脚本执行过程中发生未预期异常: {e}")
        logger.error(traceback.format_exc())  # 打印完整的异常堆栈,便于定位
        exit_code = 1
    finally:
        # 无论成功失败,都会执行的清理工作,如关闭数据库连接、浏览器驱动等
        cleanup_resources()
        logger.info(f"脚本退出,退出码: {exit_code}")
    sys.exit(exit_code)

核心技巧

  1. vars(args) 可以将 argparse 的命名空间对象转换成字典,方便日志记录全部参数。
  2. 不同的异常类型用不同的 except 块捕获,并设置不同的退出码。这能让上游系统更精确地知道失败原因(如参数错误、用户中断、程序异常)。
  3. finally 块确保资源清理(如关闭文件句柄、网络连接)一定会执行,避免资源泄漏。
  4. traceback.format_exc() 在捕获到未知异常时至关重要,它能打印出错误发生的完整调用链,是线上排查问题的“救命稻草”。

2.3 模板应用实例:一个批量查询API状态的脚本

假设我们需要一个脚本,批量检查一组微服务的健康状态(通过调用其 /health 端点)。

第一步:复制模板 。将上述模板代码保存为 check_service_health.py

第二步:填充核心逻辑 。找到 main() 函数中“你的核心业务逻辑”部分,替换为以下内容:

        # 核心业务逻辑
        services = [
            {"name": "user-service", "url": f"http://{args.env}-user-service.example.com/health"},
            {"name": "order-service", "url": f"http://{args.env}-order-service.example.com/health"},
            {"name": "payment-service", "url": f"http://{args.env}-payment-service.example.com/health"},
        ]

        unhealthy_services = []
        for svc in services:
            try:
                # 使用 requests 库,需要先 pip install requests
                import requests
                resp = requests.get(svc['url'], timeout=5)
                if resp.status_code == 200:
                    logger.info(f"服务 {svc['name']} 状态健康")
                else:
                    logger.warning(f"服务 {svc['name']} 返回异常状态码: {resp.status_code}")
                    unhealthy_services.append(svc['name'])
            except requests.exceptions.RequestException as e:
                logger.error(f"请求服务 {svc['name']} 失败: {e}")
                unhealthy_services.append(svc['name'])

        if unhealthy_services:
            logger.error(f"以下服务不健康: {unhealthy_services}")
            # 这里可以触发告警,如发送邮件、钉钉消息等
            # send_alert(unhealthy_services)
            exit_code = 1  # 将退出码置为失败
        else:
            logger.info("所有服务状态健康")

第三步:定义参数和运行 。我们可能不需要额外的命令行参数,直接用 --env 指定环境即可。

# 试运行,检查测试环境
python check_service_health.py --env test --dry-run

# 正式运行,检查生产环境
python check_service_health.py --env prod

# 开启调试模式,查看详细日志
python check_service_health.py --env test --debug

你看,我们几乎没有在“脚手架”代码上花时间,直接聚焦在“检查服务健康”这个业务逻辑上,而且产出的脚本自带完善的日志、错误处理和命令行接口,可靠性极高。

3. 第二套模板:面向流程的“状态机”式任务编排

第一套模板适合线性任务。但当任务变得复杂,包含多个步骤,且步骤间有依赖关系、可能失败、需要重试时,我们就需要第二套模板。它的核心思想是: 将流程抽象为一系列步骤(Step),每个步骤独立且可配置,并由一个执行引擎(Engine)负责调度、状态管理和持久化

这套模板模仿了简单工作流引擎的概念,特别适合用于搭建自动化测试流水线、数据ETL任务、每日巡检报告等场景。

3.1 架构设计:步骤、上下文与执行引擎

这套模板包含三个核心类: Step Context Engine

  1. Step(步骤) :代表流程中的一个原子操作。每个Step需要实现 execute 方法,接收一个 Context 对象,并返回成功或失败。Step应该是无状态的,其运行所需数据和产生的数据都存放在 Context 中。
  2. Context(上下文) :一个在整个流程中传递的字典-like对象,用于在不同Step之间共享数据。比如,Step A从数据库读取的数据可以存入 context[‘raw_data’] ,Step B再从里面取出来处理。
  3. Engine(引擎) :负责按顺序执行一系列Step。它控制流程的生命周期:开始 -> 执行Step1 -> 更新上下文 -> 执行Step2 -> … -> 结束。引擎还负责错误处理、日志记录和可能的状态持久化(比如把执行进度记录到文件或数据库,防止脚本中途崩溃后全量重跑)。

3.2 模板代码实现详解

我们来实现一个基础版本。首先,定义 Step 基类和 Context 类。

# workflow_engine.py
import logging
import json
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from datetime import datetime

class Context:
    """流程上下文,用于在步骤间传递数据"""
    def __init__(self, initial_data: Optional[Dict[str, Any]] = None):
        self._data = initial_data or {}
        self.execution_id = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.logger = logging.getLogger(self.__class__.__name__)

    def set(self, key: str, value: Any):
        self._data[key] = value

    def get(self, key: str, default: Any = None) -> Any:
        return self._data.get(key, default)

    def to_dict(self) -> Dict[str, Any]:
        return self._data.copy()

class Step(ABC):
    """步骤抽象基类"""
    def __init__(self, name: str):
        self.name = name
        self.logger = logging.getLogger(self.__class__.__name__)

    @abstractmethod
    def execute(self, context: Context) -> bool:
        """
        执行步骤的核心逻辑。
        Args:
            context: 流程上下文
        Returns:
            bool: True表示成功,False表示失败
        """
        pass

    def __str__(self):
        return f"Step[{self.name}]"

接下来,实现一个简单的 SequentialEngine (顺序执行引擎)。

class SequentialEngine:
    """顺序执行引擎"""
    def __init__(self, steps: List[Step], context: Optional[Context] = None):
        self.steps = steps
        self.context = context or Context()
        self.logger = logging.getLogger(self.__class__.__name__)
        self._execution_log: List[Dict] = []  # 记录每个步骤的执行结果

    def run(self) -> bool:
        """
        按顺序执行所有步骤。
        Returns:
            bool: 整个流程是否全部成功
        """
        self.logger.info(f"开始执行流程,执行ID: {self.context.execution_id}")
        overall_success = True

        for i, step in enumerate(self.steps):
            step_log = {"step_name": step.name, "status": "unknown", "error": None}
            self.logger.info(f"开始执行步骤 ({i+1}/{len(self.steps)}): {step}")
            try:
                success = step.execute(self.context)
                if success:
                    step_log["status"] = "success"
                    self.logger.info(f"步骤执行成功: {step}")
                else:
                    step_log["status"] = "failed"
                    self.logger.error(f"步骤执行失败: {step}")
                    overall_success = False
                    # 这里可以定义步骤失败后的策略:继续、暂停还是终止?
                    # 我们选择失败即终止整个流程(Fail Fast)
                    break
            except Exception as e:
                step_log["status"] = "error"
                step_log["error"] = str(e)
                self.logger.exception(f"步骤执行时发生异常: {step}")  # logger.exception会记录堆栈
                overall_success = False
                break
            finally:
                self._execution_log.append(step_log)

        final_status = "成功" if overall_success else "失败"
        self.logger.info(f"流程执行{final_status}。总计步骤: {len(self.steps)}, 成功步骤: {len([l for l in self._execution_log if l['status']=='success'])}")
        # 可以将执行日志保存下来,用于后续分析或审计
        self._save_execution_log()
        return overall_success

    def _save_execution_log(self):
        """保存执行日志到文件"""
        log_file = f"./workflow_logs/execution_{self.context.execution_id}.json"
        import os
        log_dir = os.path.dirname(log_file)
        if log_dir and not os.path.exists(log_dir):
            os.makedirs(log_dir)
        try:
            with open(log_file, 'w', encoding='utf-8') as f:
                json.dump(self._execution_log, f, indent=2, ensure_ascii=False)
            self.logger.debug(f"执行日志已保存至: {log_file}")
        except Exception as e:
            self.logger.error(f"保存执行日志失败: {e}")

    def get_execution_log(self) -> List[Dict]:
        """获取本次执行的详细日志"""
        return self._execution_log.copy()

3.3 如何基于模板构建一个自动化流程

假设我们要构建一个“自动化日报生成”流程,包含三个步骤:1. 从数据库拉取数据;2. 处理分析数据;3. 生成PDF报告并邮件发送。

第一步:为每个步骤创建具体的Step类。

# daily_report_steps.py
from workflow_engine import Step, Context
import pandas as pd
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# 假设有数据库连接和PDF生成库
# import pymysql
# from reportlab.pdfgen import canvas

class FetchDataStep(Step):
    def execute(self, context: Context) -> bool:
        self.logger.info(f"执行步骤: {self.name}")
        # 模拟从数据库获取数据
        try:
            # conn = pymysql.connect(...)
            # df = pd.read_sql("SELECT * FROM daily_metrics", conn)
            # conn.close()
            df = pd.DataFrame({ # 模拟数据
                'date': ['2023-10-26', '2023-10-27'],
                'sales': [1000, 1200],
                'users': [150, 180]
            })
            context.set('raw_data', df)
            self.logger.info(f"成功获取数据,共 {len(df)} 条记录")
            return True
        except Exception as e:
            self.logger.error(f"获取数据失败: {e}")
            return False

class ProcessDataStep(Step):
    def execute(self, context: Context) -> bool:
        self.logger.info(f"执行步骤: {self.name}")
        df = context.get('raw_data')
        if df is None:
            self.logger.error("上下文中未找到 'raw_data'")
            return False
        try:
            # 进行一些数据处理,例如计算日均值
            df['date'] = pd.to_datetime(df['date'])
            avg_sales = df['sales'].mean()
            avg_users = df['users'].mean()
            summary = {
                'avg_sales': avg_sales,
                'avg_users': avg_users,
                'data_points': len(df)
            }
            context.set('processed_summary', summary)
            self.logger.info(f"数据处理完成,摘要: {summary}")
            return True
        except Exception as e:
            self.logger.error(f"处理数据失败: {e}")
            return False

class GenerateReportStep(Step):
    def __init__(self, name: str, recipient_email: str):
        super().__init__(name)
        self.recipient_email = recipient_email

    def execute(self, context: Context) -> bool:
        self.logger.info(f"执行步骤: {self.name}")
        summary = context.get('processed_summary')
        if summary is None:
            self.logger.error("上下文中未找到 'processed_summary'")
            return False
        try:
            # 1. 生成PDF(模拟)
            report_content = f"""
            每日业务报告
            =============
            统计日期: {context.execution_id}
            平均销售额: {summary['avg_sales']:.2f}
            平均用户数: {summary['avg_users']:.2f}
            数据点数: {summary['data_points']}
            """
            # pdf_path = f"./reports/report_{context.execution_id}.pdf"
            # ... 实际生成PDF的代码 ...
            self.logger.info("报告内容生成成功")

            # 2. 发送邮件(模拟,实际需要配置SMTP服务器)
            # self._send_email(report_content)
            self.logger.info(f"模拟发送报告至 {self.recipient_email}")
            return True
        except Exception as e:
            self.logger.error(f"生成或发送报告失败: {e}")
            return False

    def _send_email(self, content: str):
        # 实际的邮件发送逻辑
        msg = MIMEMultipart()
        msg['From'] = 'automation@example.com'
        msg['To'] = self.recipient_email
        msg['Subject'] = f'每日业务报告 - {datetime.now().strftime("%Y-%m-%d")}'
        msg.attach(MIMEText(content, 'plain'))
        # with smtplib.SMTP('smtp.example.com', 587) as server:
        #     server.starttls()
        #     server.login('user', 'password')
        #     server.send_message(msg)
        pass

第二步:组装流程并执行。

# main.py
import logging
from workflow_engine import SequentialEngine, Context
from daily_report_steps import FetchDataStep, ProcessDataStep, GenerateReportStep

def setup_logging():
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler(),
            logging.FileHandler(f'./workflow_logs/engine_{datetime.now().strftime("%Y%m%d")}.log')
        ]
    )

if __name__ == "__main__":
    setup_logging()
    logger = logging.getLogger(__name__)

    # 1. 定义步骤
    steps = [
        FetchDataStep(name="拉取业务数据"),
        ProcessDataStep(name="处理与分析数据"),
        GenerateReportStep(name="生成并发送日报", recipient_email="team@example.com")
    ]

    # 2. 创建上下文(可以传入初始参数,如数据库连接字符串)
    context = Context(initial_data={'env': 'prod'})

    # 3. 创建并执行引擎
    engine = SequentialEngine(steps=steps, context=context)
    success = engine.run()

    # 4. 根据结果处理(例如,失败时发送告警)
    if not success:
        logger.error("日报生成流程失败!")
        # send_alert(engine.get_execution_log())
        exit(1)
    else:
        logger.info("日报生成流程执行完毕。")
        exit(0)

运行这个 main.py ,你就会看到一个清晰的、分步骤执行的自动化流程。每个步骤的成功失败都被独立记录,上下文确保了数据在步骤间流转,引擎控制了整体流程和状态。

高级技巧与扩展

  1. 步骤依赖与跳过 :可以在 Step 类中加入 depends_on 属性,引擎在执行前检查依赖步骤是否成功。还可以加入 skip 条件,根据上下文数据决定是否跳过该步骤。
  2. 并行执行 :可以实现一个 ParallelEngine ,利用 concurrent.futures 线程池来并发执行没有依赖关系的步骤,大幅提升效率。
  3. 状态持久化 :将 Context _execution_log 序列化后保存到数据库(如SQLite、Redis)。这样即使脚本进程被杀死,重启后可以从上一个成功的步骤继续执行(断点续跑)。
  4. 可视化与监控 :将 _execution_log 结构化存储后,可以很容易地开发一个简单的Web界面来查看历史流程的执行状态、耗时和日志,方便运维。

4. 模板的进阶使用与融合实践

两套模板并非互斥,在实际项目中,我经常将它们结合使用,形成更强大的自动化能力。

4.1 场景:使用“瑞士军刀”脚本驱动“状态机”流程

想象一个更复杂的场景:我们需要每周一凌晨运行一个 数据备份与校验流程 。这个流程包含:1. 从生产数据库备份数据;2. 将备份文件上传到云存储;3. 从云存储下载备份文件到测试环境;4. 在测试环境恢复数据并运行校验脚本;5. 发送校验结果通知。

这个流程步骤多,且有明确的顺序和依赖(上传成功才能下载)。这显然是第二套模板(状态机)的用武之地。但是,每个步骤本身可能就是一个复杂的操作,比如“运行校验脚本”,这个脚本本身可能需要参数、需要完善的日志和错误处理——这正是第一套模板的专长。

解决方案 :为“运行校验脚本”这个步骤( RunValidationStep ),在其 execute 方法中,不是直接写一堆校验逻辑,而是去 调用一个由第一套模板构建的独立脚本

class RunValidationStep(Step):
    def execute(self, context: Context) -> bool:
        backup_file_path = context.get('backup_file_path')
        if not backup_file_path:
            self.logger.error("上下文缺少备份文件路径")
            return False

        # 使用第一套模板风格的脚本
        # 假设我们已经有一个健壮的校验脚本:validate_backup.py
        # 它接受 --backup-file 和 --env 参数,并返回适当的退出码
        import subprocess
        cmd = [
            'python', 'path/to/validate_backup.py',
            '--backup-file', backup_file_path,
            '--env', 'testing',
            '--log-level', 'INFO'
        ]
        self.logger.info(f"执行校验命令: {' '.join(cmd)}")
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
            # 将子进程的日志输出到当前日志
            if result.stdout:
                self.logger.info(f"校验脚本输出: {result.stdout}")
            if result.stderr:
                self.logger.error(f"校验脚本错误: {result.stderr}")

            # 判断子进程退出码,0表示成功
            if result.returncode == 0:
                self.logger.info("数据校验通过")
                context.set('validation_result', 'PASS')
                return True
            else:
                self.logger.error(f"数据校验失败,退出码: {result.returncode}")
                context.set('validation_result', 'FAIL')
                return False
        except subprocess.TimeoutExpired:
            self.logger.error("校验脚本执行超时")
            return False
        except Exception as e:
            self.logger.exception(f"执行校验脚本时发生异常: {e}")
            return False

这样做的好处是:

  • 解耦 :校验逻辑( validate_backup.py )可以独立开发、测试和运行。
  • 复用 validate_backup.py 这个“瑞士军刀”脚本可以在其他地方被单独调用。
  • 维护性 :步骤( RunValidationStep )只负责流程控制,具体校验能力由专门的脚本提供,职责清晰。

4.2 性能与稳定性考量

当自动化脚本承担起重要职责时,性能和稳定性就必须纳入考量。

  1. 超时控制 :任何网络请求、外部命令调用都必须设置超时。如上例中的 subprocess.run(..., timeout=300) 。避免因为一个挂起的操作阻塞整个流程。
  2. 重试机制 :对于可能因网络抖动等临时性问题失败的操作,应加入重试逻辑。可以封装一个带重试的装饰器或工具函数。
    import time
    from functools import wraps
    
    def retry_on_failure(max_retries=3, delay=2, exceptions=(Exception,)):
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                last_exception = None
                for attempt in range(1, max_retries + 1):
                    try:
                        return func(*args, **kwargs)
                    except exceptions as e:
                        last_exception = e
                        if attempt < max_retries:
                            wait = delay * attempt  # 指数退避
                            logging.warning(f"函数 {func.__name__} 第{attempt}次尝试失败,{wait}秒后重试。错误: {e}")
                            time.sleep(wait)
                        else:
                            logging.error(f"函数 {func.__name__} 重试{max_retries}次后仍失败。")
                raise last_exception
            return wrapper
        return decorator
    
    # 在Step的execute方法或关键函数上使用
    class MyStep(Step):
        @retry_on_failure(max_retries=3, exceptions=(requests.exceptions.ConnectionError,))
        def execute(self, context):
            response = requests.get('https://api.example.com', timeout=10)
            response.raise_for_status()
            # ... 处理逻辑
    
  3. 资源清理 :确保在 finally 块或 Context 析构时,释放所有资源(数据库连接、浏览器驱动、临时文件等)。
  4. 配置外部化 :不要将数据库密码、API密钥等敏感信息硬编码在脚本中。使用配置文件(如 config.ini config.yaml )或环境变量来管理,模板初始化时读取。
    # 在模板开头或专门的配置模块中
    import os
    from dotenv import load_dotenv  # 需要 pip install python-dotenv
    load_dotenv()  # 从 .env 文件加载环境变量
    
    DB_HOST = os.getenv('DB_HOST', 'localhost')
    API_KEY = os.getenv('API_KEY')
    if not API_KEY:
        raise ValueError("请在环境变量中设置 API_KEY")
    

5. 避坑指南与最佳实践总结

在多年使用和推广这些模板的过程中,我和团队踩过不少坑,也积累了一些确保脚本长期稳定运行的经验。

5.1 常见问题与排查技巧

  1. 脚本在Cron或定时任务中不执行或报错

    • 问题 :手动运行正常,放到Cron里就失败。
    • 排查
      • 环境变量 :Cron的环境与用户Shell环境不同。在脚本开头显式设置 PATH PYTHONPATH ,或使用绝对路径调用Python和第三方库。
      • 工作目录 :Cron的执行目录通常是用户的家目录。脚本内涉及相对路径(如 ./logs/ )会出错。使用 os.path.dirname(os.path.abspath(__file__)) 获取脚本所在绝对路径,再基于此构建其他路径。
      • 输出重定向 :Cron默认会邮件发送命令输出。务必在脚本内做好日志记录到文件,并在Cron命令末尾添加 >> /path/to/cron.log 2>&1 来捕获所有输出。
  2. 日志文件无限增长,占满磁盘

    • 问题 :脚本每天运行,日志文件越来越大。
    • 解决 :使用 logging.handlers.RotatingFileHandler TimedRotatingFileHandler 。模板中可以升级 setup_logging 函数。
      from logging.handlers import RotatingFileHandler
      # 替换原来的 FileHandler
      file_handler = RotatingFileHandler(log_file, maxBytes=10*1024*1024, backupCount=5, encoding='utf-8')
      # maxBytes=10MB, backupCount=5 表示保留最近5个10MB的日志备份
      
  3. 脚本被意外中断,导致数据处于中间状态

    • 问题 :执行到一半被 kill 或系统重启。
    • 解决
      • 原子操作 :尽可能让每个步骤的操作是原子的。例如,移动文件时,先写入临时文件,完成后用原子操作(如 os.rename )替换目标文件。
      • 状态标记 :对于第二套模板,实现状态持久化。每个步骤开始前,在外部存储(如一个小型SQLite数据库或一个状态文件)中标记“进行中”;成功后标记“完成”。脚本重启后,先检查状态,跳过已完成的步骤。
      • 幂等性设计 :努力让脚本可以安全地重复执行。例如,插入数据前先检查是否存在,或者使用“INSERT ... ON DUPLICATE KEY UPDATE”。
  4. 依赖的第三方服务不稳定

    • 问题 :调用外部API或数据库偶尔超时或失败。
    • 解决 :如前所述,实现 重试机制 ,并采用 指数退避 策略。同时,为关键的外部调用设置合理的 超时时间 ,并做好异常捕获和降级处理(如返回默认值、记录错误后继续执行后续非依赖步骤)。

5.2 让脚本更“专业”的细节

  1. 添加脚本使用说明 :在脚本文件开头,用多行注释写一个清晰的 --help 信息,说明功能、参数、示例。 argparse 会自动生成帮助,但一个手写的说明更友好。
  2. 版本管理 :在脚本中定义一个 __version__ 变量,方便追踪。
  3. 单元测试 :为脚本的核心逻辑函数编写单元测试(使用 pytest )。特别是第二套模板中的每个 Step ,由于其独立性,非常适合做单元测试。这能极大提升脚本的可靠性和可维护性。
  4. 代码风格与静态检查 :使用 black 自动格式化代码,使用 isort 整理导入语句,使用 flake8 pylint 进行代码风格检查。保持代码整洁,方便团队协作。
  5. 打包与分发 :如果脚本需要在多台机器上使用,可以考虑用 setuptools 打包,或者制作成Docker镜像,实现环境的一致性。

这两套模板,从简单的临时脚本到复杂的自动化流程,基本覆盖了测开日常工作中80%的自动化场景。它们提供的不是功能,而是一种 规范 起点 。当你拿到一个新需求,不再是从一片空白的编辑器开始,而是思考:“这个需求,是用模板一还是模板二来套?” 这个思考过程本身,就是效率的巨大提升。剩下的,就是把你天马行空的业务逻辑,填充到这些经过千锤百炼的“骨架”之中。

更多推荐