“程序出错了怎么办?”——这是写自动化脚本必须考虑的问题。好的异常处理能让脚本更健壮,出问题时有据可查。这篇文章分享异常处理的最佳实践和实用技巧。

一、异常处理基础回顾

# 基本语法
try:
    result = risky_operation()
except SomeError as e:
    handle_error(e)
except (Error1, Error2) as e:
    handle_multiple(e)
except Exception as e:
    handle_all_other(e)
else:
    # try块成功执行后执行
    print("操作成功")
finally:
    # 无论是否异常都执行
    cleanup()

二、异常处理原则

原则1:具体捕获,宽泛记录

# 不好:捕获所有异常
try:
    process_data()
except Exception as e:
    print("出错了")

# 好:具体捕获
try:
    process_data()
except FileNotFoundError as e:
    print(f"文件不存在: {e.filename}")
except PermissionError as e:
    print(f"没有权限: {e}")
except Exception as e:
    print(f"未知错误: {e}")

原则2:异常要记录,不是吞掉

import logging

logger = logging.getLogger(__name__)

# 不好:静默吞掉异常
try:
    send_notification()
except:
    pass

# 好:记录异常
try:
    send_notification()
except Exception as e:
    logger.error(f"发送通知失败: {e}", exc_info=True)

原则3:异常要传播还是处理?

# 需要传播的异常:让调用者知道
def read_config():
    try:
        with open('config.json') as f:
            return json.load(f)
    except FileNotFoundError:
        # 配置缺失是严重问题,应该让调用者处理
        raise ConfigNotFoundError("config.json not found")

# 需要处理的异常:本地处理更合适
def get_default_config():
    try:
        return read_config()
    except ConfigNotFoundError:
        # 找不到配置就用默认值
        return {'debug': False, 'timeout': 30}

三、实用异常处理模式

模式1:重试机制

import time
from functools import wraps

def retry(max_attempts=3, delay=1, backoff=2, exceptions=(Exception,)):
    """重试装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempts = 0
            current_delay = delay
            while attempts < max_attempts:
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    attempts += 1
                    if attempts >= max_attempts:
                        raise
                    logger.warning(
                        f"{func.__name__} 失败,{current_delay}秒后重试 "
                        f"({attempts}/{max_attempts}): {e}"
                    )
                    time.sleep(current_delay)
                    current_delay *= backoff
        return wrapper
    return decorator

# 使用示例
@retry(max_attempts=3, delay=1, exceptions=(ConnectionError, TimeoutError))
def fetch_data(url):
    return requests.get(url)

模式2:上下文管理器

from contextlib import contextmanager

@contextmanager
def managed_resource(name, timeout=30):
    """资源管理上下文"""
    logger.info(f"获取资源: {name}")
    resource = acquire_resource(name)
    try:
        yield resource
    except Exception as e:
        logger.error(f"资源 {name} 操作失败: {e}")
        raise
    finally:
        logger.info(f"释放资源: {name}")
        release_resource(resource)

# 使用
with managed_resource('database') as db:
    db.execute(query)

模式3:优雅降级

def get_data_with_fallback(primary_func, fallback_func, *args, **kwargs):
    """带降级方案的数据获取"""
    try:
        return primary_func(*args, **kwargs)
    except Exception as e:
        logger.warning(f"主方案失败,使用降级方案: {e}")
        try:
            return fallback_func(*args, **kwargs)
        except Exception as e2:
            logger.error(f"降级方案也失败: {e2}")
            return None

# 使用
data = get_data_with_fallback(
    primary_func=fetch_from_api,
    fallback_func=fetch_from_cache,
    url='https://api.example.com/data'
)

模式4:批量操作容错

from dataclasses import dataclass
from typing import List, Callable, Any

@dataclass
class Result:
    success: bool
    data: Any = None
    error: str = None

def batch_process_with_errors(items: List, process_func: Callable) -> List[Result]:
    """批量处理,返回每个结果"""
    results = []
    for item in items:
        try:
            data = process_func(item)
            results.append(Result(success=True, data=data))
        except Exception as e:
            results.append(Result(success=False, error=str(e)))
            logger.error(f"处理 {item} 失败: {e}")
    return results

def get_summary(results: List[Result]) -> dict:
    """获取处理摘要"""
    success = sum(1 for r in results if r.success)
    failed = sum(1 for r in results if not r.success)
    return {'total': len(results), 'success': success, 'failed': failed}

四、自定义异常

class AutomationError(Exception):
    """自动化脚本基础异常"""
    pass

class ConfigError(AutomationError):
    """配置错误"""
    pass

class NetworkError(AutomationError):
    """网络相关错误"""
    pass

class DataProcessingError(AutomationError):
    """数据处理错误"""
    pass

# 使用
raise ConfigError("配置文件格式错误")

五、全局异常处理

在脚本入口处设置全局异常处理器:

import sys
import traceback
import logging
from pathlib import Path

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.FileHandler('error.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

def global_exception_handler(exc_type, exc_value, exc_traceback):
    """全局异常处理器"""
    if issubclass(exc_type, KeyboardInterrupt):
        # 用户中断,正常退出
        sys.__excepthook__(exc_type, exc_value, exc_traceback)
        return
    
    error_msg = ''.join(traceback.format_exception(exc_type, exc_value, exc_traceback))
    logger.critical(f"未捕获的异常:\n{error_msg}")
    
    # 写入错误报告文件
    report_file = Path('crash_report.txt')
    report_file.write_text(
        f"时间: {datetime.now()}\n"
        f"异常类型: {exc_type.__name__}\n"
        f"异常信息: {exc_value}\n"
        f"完整堆栈:\n{error_msg}"
    )

# 注册全局处理器
sys.excepthook = global_exception_handler

def main():
    # 你的主逻辑
    pass

if __name__ == '__main__':
    try:
        main()
    except Exception as e:
        logger.exception(f"程序异常退出: {e}")
        sys.exit(1)

六、日志记录最佳实践

import logging
from logging.handlers import RotatingFileHandler

def setup_logger(name, log_file='app.log', level=logging.INFO):
    """配置日志记录器"""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    
    # 避免重复添加handler
    if logger.handlers:
        return logger
    
    # 文件处理器(自动轮转)
    file_handler = RotatingFileHandler(
        log_file, maxBytes=10*1024*1024, backupCount=5, encoding='utf-8'
    )
    file_handler.setLevel(level)
    
    # 控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setLevel(level)
    
    # 格式
    formatter = logging.Formatter(
        '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
    )
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)
    
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    
    return logger

# 使用
logger = setup_logger(__name__)
logger.debug('调试信息')
logger.info('普通信息')
logger.warning('警告信息')
logger.error('错误信息', exc_info=True)  # 包含堆栈
logger.critical('严重错误')

七、验证和测试异常处理

import pytest

def test_retry_success():
    """测试重试成功"""
    attempts = 0
    
    @retry(max_attempts=3, delay=0.1)
    def succeed_on_third():
        nonlocal attempts
        attempts += 1
        if attempts < 3:
            raise ValueError("not ready")
        return "success"
    
    assert succeed_on_third() == "success"
    assert attempts == 3

def test_retry_all_fail():
    """测试重试全部失败"""
    @retry(max_attempts=3, delay=0.1)
    def always_fail():
        raise ValueError("always fails")
    
    with pytest.raises(ValueError):
        always_fail()

def test_batch_process_partial_failure():
    """测试批量处理部分失败"""
    def may_fail(x):
        if x == 2:
            raise ValueError("bad luck")
        return x * 2
    
    results = batch_process_with_errors([1, 2, 3], may_fail)
    summary = get_summary(results)
    
    assert summary['total'] == 3
    assert summary['success'] == 2
    assert summary['failed'] == 1

总结

异常处理的最佳实践:

  1. 具体捕获:不要用except Exception
  2. 记录异常:用logging记录详细信息
  3. 不要吞掉异常:除非有意为之
  4. 重试机制:对于临时性故障
  5. 优雅降级:有备选方案
  6. 全局处理:捕获未预期的异常
  7. 测试异常处理:确保覆盖边界情况

好的异常处理让脚本在出问题时有据可查,能自动恢复,是自动化脚本健壮性的保障。

更多推荐