代码质量是软件开发的核心要素之一。自动化代码审查可以及早发现问题,保证代码风格一致,提高团队协作效率。本文将介绍如何用Python实现代码质量检查、代码风格审查和自动化linting。

常用代码检查工具

  • Pylint:全面的Python代码分析器
  • Flake8:代码风格检查工具
  • Black:代码格式化工具
  • Mypy:静态类型检查
  • Bandit:安全检查
  • Safety:依赖漏洞检查

基础代码检查框架

import ast
import html
import json
import re
import subprocess
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

@dataclass
class Issue:
    """A single finding reported by one of the checking tools."""
    file: str  # path of the file the finding refers to
    line: int  # line number of the finding (the parsers in this file use 0 when unknown)
    column: int  # column of the finding (0 when the tool does not report one)
    severity: str  # 'error', 'warning', 'info'
    message: str  # human-readable description of the finding
    rule: str  # rule / message identifier emitted by the tool
    tool: str  # name of the tool that produced the finding

class CodeChecker:
    """Base class for tool-backed code checkers.

    Subclasses override ``run`` and ``parse_output``. Findings are collected
    in ``self.issues`` and can be written out with ``save_report``.
    """

    name = "base"

    def __init__(self, project_path: str):
        """Remember the project location and start with no findings.

        Args:
            project_path: Directory of the project to check.
        """
        self.project_path = Path(project_path)
        self.issues: List[Issue] = []

    def run(self) -> List[Issue]:
        """Run the check and return the findings.

        Raises:
            NotImplementedError: Always; subclasses must override.
        """
        raise NotImplementedError

    def parse_output(self, output: str) -> List[Issue]:
        """Convert raw tool output into issues.

        Raises:
            NotImplementedError: Always; subclasses must override.
        """
        raise NotImplementedError

    def save_report(self, output_file: str):
        """Write ``self.issues`` to ``output_file`` as a JSON array.

        Missing parent directories are created first, so a target such as
        ``reports/pylint.json`` works in a fresh checkout.

        Args:
            output_file: Destination path of the JSON report.
        """
        report_path = Path(output_file)
        report_path.parent.mkdir(parents=True, exist_ok=True)

        records = [
            {
                'file': issue.file,
                'line': issue.line,
                'column': issue.column,
                'severity': issue.severity,
                'message': issue.message,
                'rule': issue.rule,
                'tool': issue.tool,
            }
            for issue in self.issues
        ]

        with open(report_path, 'w', encoding='utf-8') as f:
            # ensure_ascii=False keeps non-ASCII messages readable in the file.
            json.dump(records, f, indent=2, ensure_ascii=False)


class PylintChecker(CodeChecker):
    """Checker that runs Pylint and converts its JSON messages into issues."""

    name = "pylint"

    def run(self) -> List[Issue]:
        """Run Pylint over the project directory.

        Returns:
            ``self.issues``, holding only the findings of this run. The list
            is empty when Pylint is not installed or printed nothing.
        """
        # Reset first so calling run() twice does not duplicate findings.
        self.issues = []
        # The subprocess runs with cwd set to the project directory. A
        # relative project path passed as the target would then be resolved
        # relative to itself (my_project/my_project), so use an absolute path.
        target = self.project_path.resolve()

        try:
            result = subprocess.run(
                ['pylint', str(target),
                 '--output-format=json', '--exit-zero'],
                capture_output=True,
                text=True,
                cwd=target
            )
        except FileNotFoundError:
            print("Pylint未安装,请运行: pip install pylint")
            return self.issues

        if result.stdout:
            self.issues.extend(self.parse_output(result.stdout))

        return self.issues

    def parse_output(self, output: str) -> List[Issue]:
        """Parse Pylint's ``--output-format=json`` output.

        Args:
            output: Text captured from Pylint's stdout.

        Returns:
            One issue per JSON message; an empty list when the text is not
            valid JSON (a notice is printed instead of failing the run).
        """
        try:
            messages = json.loads(output)
        except json.JSONDecodeError:
            # Stay best-effort, but do not hide the problem completely.
            print("无法解析Pylint的JSON输出")
            return []

        return [
            Issue(
                file=item.get('path', ''),
                line=item.get('line', 0),
                column=item.get('column', 0),
                severity=self._map_severity(item.get('type', 'info')),
                message=item.get('message', ''),
                rule=item.get('message-id', ''),
                tool='pylint'
            )
            for item in messages
        ]

    def _map_severity(self, pylint_type: str) -> str:
        """Map a Pylint message category onto 'error'/'warning'/'info'.

        Unknown categories fall back to 'info'.
        """
        mapping = {
            # 'fatal' means Pylint could not process the file at all, so it
            # must not be downgraded to 'info'.
            'fatal': 'error',
            'error': 'error',
            'warning': 'warning',
            'convention': 'info',
            'refactor': 'info',
            'info': 'info',
        }
        return mapping.get(pylint_type, 'info')


class Flake8Checker(CodeChecker):
    """Checker that runs Flake8 and converts its report into issues."""

    name = "flake8"

    # One line of Flake8's default format: "path:row:col: CODE text".
    # The lazy path group keeps Windows drive letters ("C:\...") intact, and
    # the code is optional so lines without one are still accepted.
    _TEXT_LINE = re.compile(
        r'^(?P<path>.+?):(?P<row>\d+):(?P<col>\d+):\s*'
        r'(?:(?P<code>[A-Z]+\d+)\s+)?(?P<text>.*)$'
    )

    def run(self) -> List[Issue]:
        """Run Flake8 over the project directory.

        The default text format is requested explicitly: ``json`` is not one
        of Flake8's built-in formatters, so relying on it yields unparseable
        output unless an extra plugin happens to be installed.

        Returns:
            ``self.issues``, holding only the findings of this run.
        """
        # Reset first so calling run() twice does not duplicate findings.
        self.issues = []
        # cwd is the project directory, so pass an absolute target; a
        # relative one would be resolved relative to itself.
        target = self.project_path.resolve()

        try:
            result = subprocess.run(
                ['flake8', str(target),
                 '--format=default', '--max-line-length=120'],
                capture_output=True,
                text=True,
                cwd=target
            )
        except FileNotFoundError:
            print("Flake8未安装,请运行: pip install flake8")
            return self.issues

        if result.stdout:
            self.issues.extend(self.parse_output(result.stdout))

        return self.issues

    def parse_output(self, output: str) -> List[Issue]:
        """Parse Flake8 output given either as a JSON object or as text.

        Args:
            output: Text captured from Flake8's stdout.

        Returns:
            The issues found in ``output``; lines that do not look like a
            Flake8 finding are skipped.
        """
        stripped = output.strip()
        if stripped.startswith('{'):
            try:
                data = json.loads(stripped)
            except json.JSONDecodeError:
                data = None
            if isinstance(data, dict):
                return self._parse_json(data)
        return self._parse_text(output)

    def _parse_json(self, data: dict) -> List[Issue]:
        """Convert a ``{file: [error, ...]}`` mapping into issues."""
        issues = []
        for file_path, errors in data.items():
            for error in errors:
                code = error.get('code', '')
                # Accept both key spellings: row/col (what this parser always
                # read) and line_number/column_number.
                # NOTE(review): the latter are presumably what the
                # flake8-json plugin emits — confirm against its output.
                issues.append(Issue(
                    file=file_path,
                    line=error.get('row', error.get('line_number', 0)),
                    column=error.get('col', error.get('column_number', 0)),
                    severity=self._severity(error.get('type') or code),
                    message=error.get('text', ''),
                    rule=code,
                    tool='flake8'
                ))
        return issues

    def _parse_text(self, output: str) -> List[Issue]:
        """Convert ``path:row:col: CODE text`` lines into issues."""
        issues = []
        for line in output.splitlines():
            match = self._TEXT_LINE.match(line.strip())
            if not match:
                continue
            code = match.group('code') or ''
            issues.append(Issue(
                file=match.group('path'),
                line=int(match.group('row')),
                column=int(match.group('col')),
                # Without a code there is nothing to classify by; keep the
                # historical 'warning' for such lines.
                severity=self._severity(code) if code else 'warning',
                message=match.group('text').strip(),
                rule=code,
                tool='flake8'
            ))
        return issues

    @staticmethod
    def _severity(code: str) -> str:
        """Return 'warning' for W-class codes and 'error' for everything else."""
        return 'warning' if code.startswith('W') else 'error'


class MypyChecker(CodeChecker):
    """Checker that runs Mypy and converts its diagnostics into issues."""

    name = "mypy"

    # "file:line[:col]: level: message  [code]". Column and error code are
    # optional because they depend on the Mypy version and flags.
    _DIAGNOSTIC = re.compile(
        r'^(?P<file>.+?):(?P<line>\d+):(?:(?P<col>\d+):)?\s*'
        r'(?P<level>error|warning|note):\s*(?P<msg>.*?)'
        r'(?:\s+\[(?P<code>[\w-]+)\])?\s*$'
    )

    _SEVERITY = {'error': 'error', 'warning': 'warning', 'note': 'info'}

    def run(self) -> List[Issue]:
        """Run Mypy over the project directory.

        Returns:
            ``self.issues``, holding only the findings of this run.
        """
        # Reset first so calling run() twice does not duplicate findings.
        self.issues = []
        # cwd is the project directory, so pass an absolute target; a
        # relative one would be resolved relative to itself.
        target = self.project_path.resolve()

        try:
            # The former "--json-report /tmp/mypy.json" option is dropped:
            # its output was never read and the hard-coded /tmp path is not
            # portable.
            result = subprocess.run(
                ['mypy', str(target), '--show-column-numbers'],
                capture_output=True,
                text=True,
                cwd=target
            )
        except FileNotFoundError:
            print("Mypy未安装,请运行: pip install mypy")
            return self.issues

        # Mypy prints its diagnostics on stdout. stderr is still scanned, as
        # before, in case anything parseable ends up there.
        self.issues.extend(self.parse_output(result.stdout or ''))
        self.issues.extend(self.parse_output(result.stderr or ''))

        return self.issues

    def parse_output(self, output: str) -> List[Issue]:
        """Parse Mypy's line-oriented diagnostics.

        Args:
            output: Text captured from Mypy.

        Returns:
            One issue per diagnostic line. ``note`` lines become 'info'. The
            rule is the bracketed error code when present, else 'type-error'.
        """
        issues = []

        for line in output.splitlines():
            match = self._DIAGNOSTIC.match(line)
            if not match:
                continue
            column = match.group('col')
            issues.append(Issue(
                file=match.group('file'),
                line=int(match.group('line')),
                column=int(column) if column else 0,
                severity=self._SEVERITY[match.group('level')],
                message=match.group('msg'),
                rule=match.group('code') or 'type-error',
                tool='mypy'
            ))

        return issues


class BanditChecker(CodeChecker):
    """Checker that runs Bandit and converts its JSON report into issues."""

    name = "bandit"

    # Bandit grades findings as LOW / MEDIUM / HIGH.
    _SEVERITY = {'HIGH': 'error', 'MEDIUM': 'warning', 'LOW': 'info'}

    def run(self) -> List[Issue]:
        """Run Bandit recursively over the project directory.

        Returns:
            ``self.issues``, holding only the findings of this run.
        """
        # Reset first so calling run() twice does not duplicate findings.
        self.issues = []
        # cwd is the project directory, so pass an absolute target; a
        # relative one would be resolved relative to itself.
        target = self.project_path.resolve()

        try:
            result = subprocess.run(
                ['bandit', '-r', str(target), '-f', 'json'],
                capture_output=True,
                text=True,
                cwd=target
            )
        except FileNotFoundError:
            print("Bandit未安装,请运行: pip install bandit")
            return self.issues

        if result.stdout:
            self.issues.extend(self.parse_output(result.stdout))

        return self.issues

    def parse_output(self, output: str) -> List[Issue]:
        """Parse Bandit's ``-f json`` report.

        Args:
            output: Text captured from Bandit's stdout.

        Returns:
            One issue per entry of the report's ``results`` list; an empty
            list when the text is not valid JSON (a notice is printed).
        """
        try:
            data = json.loads(output)
        except json.JSONDecodeError:
            # Stay best-effort, but do not hide the problem completely.
            print("无法解析Bandit的JSON输出")
            return []

        issues = []
        for result in data.get('results', []):
            level = str(result.get('issue_severity', '')).upper()
            issues.append(Issue(
                file=result.get('filename', ''),
                line=result.get('line_number', 0),
                column=result.get('col_offset', 0),
                # Honour Bandit's own grading instead of flattening every
                # finding to 'warning'; unknown levels keep the old default.
                severity=self._SEVERITY.get(level, 'warning'),
                message=result.get('issue_text', ''),
                rule=result.get('test_id', ''),
                tool='bandit'
            ))

        return issues

代码质量检查器

class CodeQualityChecker:
    """Run every tool-backed checker and aggregate their findings."""

    def __init__(self, project_path: str):
        """Create one checker per supported tool for ``project_path``."""
        self.project_path = Path(project_path)
        self.checkers = [
            PylintChecker(project_path),
            Flake8Checker(project_path),
            MypyChecker(project_path),
            BanditChecker(project_path)
        ]
        self.all_issues: List[Issue] = []

    def run_all(self) -> List[Issue]:
        """Run all checkers, printing progress, and return every finding.

        A checker that raises is reported and skipped so the remaining tools
        still run.
        """
        self.all_issues = []

        for checker in self.checkers:
            print(f"运行 {checker.name}...")
            try:
                issues = checker.run()
                self.all_issues.extend(issues)
                print(f"  发现 {len(issues)} 个问题")
            except Exception as e:
                # Top-level boundary: one broken tool must not abort the run.
                print(f"  {checker.name} 运行失败: {e}")

        return self.all_issues

    def get_summary(self) -> Dict:
        """Aggregate ``self.all_issues``.

        Returns:
            A dict with ``total``, per-severity / per-tool / per-file counts
            (``by_severity``, ``by_tool``, ``by_file``) and
            ``critical_rules``, the list of findings whose severity is
            'error'.
        """
        summary = {
            'total': len(self.all_issues),
            'by_severity': {
                'error': 0,
                'warning': 0,
                'info': 0
            },
            'by_tool': {},
            'by_file': {},
            'critical_rules': []
        }

        for issue in self.all_issues:
            for bucket, key in (
                ('by_severity', issue.severity),
                ('by_tool', issue.tool),
                ('by_file', issue.file),
            ):
                counts = summary[bucket]
                counts[key] = counts.get(key, 0) + 1

            if issue.severity == 'error':
                summary['critical_rules'].append({
                    'file': issue.file,
                    'line': issue.line,
                    'message': issue.message,
                    'tool': issue.tool
                })

        return summary

    def print_summary(self):
        """Print totals, per-tool counts, the five worst files and up to ten errors."""
        summary = self.get_summary()

        print("\n" + "=" * 60)
        print("代码质量检查汇总")
        print("=" * 60)
        print(f"总问题数: {summary['total']}")
        print(f"  错误: {summary['by_severity']['error']}")
        print(f"  警告: {summary['by_severity']['warning']}")
        print(f"  信息: {summary['by_severity']['info']}")

        print("\n按工具统计:")
        for tool, count in summary['by_tool'].items():
            print(f"  {tool}: {count}")

        print("\n问题最多的文件:")
        worst_files = sorted(
            summary['by_file'].items(),
            key=lambda x: x[1],
            reverse=True
        )[:5]
        for file, count in worst_files:
            print(f"  {file}: {count}")

        if summary['critical_rules']:
            print("\n严重错误 (需要优先修复):")
            for rule in summary['critical_rules'][:10]:
                print(f"  [{rule['tool']}] {rule['file']}:{rule['line']}")
                print(f"    {rule['message']}")

    def generate_report(self, output_file: str = 'code_quality_report.html'):
        """Write an HTML report of the current findings.

        The table rows are built before the page template is formatted:
        nesting a triple-quoted f-string inside another one is a syntax error
        on Python versions before 3.12. Every value that originates from tool
        output is HTML-escaped, and missing parent directories of
        ``output_file`` are created.

        Args:
            output_file: Destination path of the HTML file.
        """
        summary = self.get_summary()
        by_severity = summary['by_severity']

        def esc(value) -> str:
            # Tool messages and paths may contain <, > or & (and quotes).
            return html.escape(str(value))

        tool_rows = "".join(
            f"<tr><td>{esc(tool)}</td><td>{count}</td></tr>"
            for tool, count in summary['by_tool'].items()
        )

        # Only the first 100 findings are listed to keep the page small.
        issue_rows = "".join(
            f"""
        <tr>
            <td>{esc(issue.file)}</td>
            <td>{esc(issue.line)}</td>
            <td class="severity-{esc(issue.severity)}">{esc(issue.severity)}</td>
            <td>{esc(issue.message)}</td>
            <td>{esc(issue.tool)}</td>
        </tr>
        """
            for issue in self.all_issues[:100]
        )

        generated_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        page = f"""
<!DOCTYPE html>
<html>
<head>
    <meta charset="UTF-8">
    <title>代码质量检查报告</title>
    <style>
        body {{ font-family: Arial, sans-serif; margin: 20px; }}
        h1 {{ color: #333; }}
        .summary {{ background: #f5f5f5; padding: 15px; border-radius: 5px; }}
        .severity-error {{ color: #d32f2f; font-weight: bold; }}
        .severity-warning {{ color: #f57c00; font-weight: bold; }}
        .severity-info {{ color: #1976d2; }}
        table {{ border-collapse: collapse; width: 100%; margin-top: 20px; }}
        th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
        th {{ background-color: #4CAF50; color: white; }}
        tr:nth-child(even) {{ background-color: #f2f2f2; }}
    </style>
</head>
<body>
    <h1>📊 代码质量检查报告</h1>
    <p>生成时间: {generated_at}</p>
    
    <div class="summary">
        <h2>汇总统计</h2>
        <p>总问题数: <strong>{summary['total']}</strong></p>
        <p class="severity-error">错误: {by_severity['error']}</p>
        <p class="severity-warning">警告: {by_severity['warning']}</p>
        <p class="severity-info">信息: {by_severity['info']}</p>
    </div>
    
    <h2>按工具统计</h2>
    <table>
        <tr><th>工具</th><th>问题数</th></tr>
        {tool_rows}
    </table>
    
    <h2>详细问题列表</h2>
    <table>
        <tr>
            <th>文件</th>
            <th>行号</th>
            <th>严重性</th>
            <th>问题</th>
            <th>工具</th>
        </tr>
        {issue_rows}
    </table>
</body>
</html>
"""

        report_path = Path(output_file)
        report_path.parent.mkdir(parents=True, exist_ok=True)
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write(page)

        print(f"\n报告已保存到: {output_file}")

代码风格检查

class CodeStyleChecker:
    """Project-local style checks built on ``ast`` and plain text scanning.

    Each ``check_*`` method appends its findings to ``self.issues``. Files
    that cannot be read as UTF-8 or cannot be parsed are skipped rather than
    aborting the whole run.
    """

    # snake_case, allowing leading underscores and dunder names.
    _FUNCTION_NAME = re.compile(r'^[a-z_][a-z0-9_]*$')

    class _ComplexityVisitor(ast.NodeVisitor):
        """Collect a branch-count based complexity figure per function."""

        def __init__(self):
            self.complexities = []
            self.current_complexity = 0

        def visit_FunctionDef(self, node):
            # Nested functions are measured on their own: remember the outer
            # count, start the inner one at 1, restore afterwards.
            outer = self.current_complexity
            self.current_complexity = 1

            self.generic_visit(node)

            self.complexities.append({
                'name': node.name,
                'line': node.lineno,
                'complexity': self.current_complexity
            })

            self.current_complexity = outer

        # Async functions are measured exactly like regular ones.
        visit_AsyncFunctionDef = visit_FunctionDef

        def _count_branch(self, node):
            self.current_complexity += 1
            self.generic_visit(node)

        visit_If = _count_branch
        visit_For = _count_branch
        visit_AsyncFor = _count_branch
        visit_While = _count_branch
        visit_With = _count_branch
        visit_AsyncWith = _count_branch
        visit_ExceptHandler = _count_branch

        def visit_BoolOp(self, node):
            # "a and b and c" adds two decision points.
            self.current_complexity += len(node.values) - 1
            self.generic_visit(node)

    def __init__(self, project_path: str):
        """Remember the project location and start with no findings."""
        self.project_path = Path(project_path)
        self.issues: List[Issue] = []

    # -- helpers ---------------------------------------------------------

    def _python_files(self):
        """Return every ``*.py`` file below the project, in a stable order."""
        return sorted(self.project_path.rglob('*.py'))

    @staticmethod
    def _read_source(py_file) -> Optional[str]:
        """Return the file's text, or None when it cannot be read as UTF-8."""
        try:
            return py_file.read_text(encoding='utf-8')
        except (OSError, UnicodeDecodeError):
            return None

    def _parse(self, py_file) -> Optional[ast.AST]:
        """Return the parsed module, or None when the file is not parseable."""
        source = self._read_source(py_file)
        if source is None:
            return None
        try:
            return ast.parse(source, filename=str(py_file))
        except (SyntaxError, ValueError):
            # ValueError is raised for sources containing null bytes.
            return None

    def _report(self, py_file, line, column, severity, message, rule):
        """Append one finding produced by this checker."""
        self.issues.append(Issue(
            file=str(py_file),
            line=line,
            column=column,
            severity=severity,
            message=message,
            rule=rule,
            tool='style-checker'
        ))

    # -- checks ----------------------------------------------------------

    def check_naming_conventions(self):
        """Flag class names not starting with an upper-case letter and
        function names that are not snake_case.

        Names are taken from the syntax tree, so text inside strings and
        comments is never mistaken for a definition and methods are covered
        as well as top-level definitions.
        """
        function_types = (ast.FunctionDef, ast.AsyncFunctionDef)

        for py_file in self._python_files():
            tree = self._parse(py_file)
            if tree is None:
                continue

            for node in ast.walk(tree):
                if isinstance(node, ast.ClassDef):
                    # Leading underscores mark a private class; the first
                    # real character must be upper-case.
                    if not node.name.lstrip('_')[:1].isupper():
                        self._report(
                            py_file, node.lineno, 0, 'warning',
                            f"类名 '{node.name}' 应该以大写字母开头",
                            'naming-convention')
                elif isinstance(node, function_types):
                    if not self._FUNCTION_NAME.match(node.name):
                        self._report(
                            py_file, node.lineno, 0, 'warning',
                            f"函数名 '{node.name}' 应该使用小写字母和下划线",
                            'naming-convention')

    def check_docstrings(self):
        """Flag functions, async functions and classes without a docstring."""
        documented_types = (
            ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)

        for py_file in self._python_files():
            tree = self._parse(py_file)
            if tree is None:
                continue

            for node in ast.walk(tree):
                if not isinstance(node, documented_types):
                    continue
                if ast.get_docstring(node):
                    continue
                self._report(
                    py_file, node.lineno, 0, 'info',
                    f"{node.__class__.__name__} '{node.name}' 缺少文档字符串",
                    'missing-docstring')

    def check_line_length(self, max_length: int = 120):
        """Flag lines longer than ``max_length`` characters.

        Trailing whitespace is not counted, and lines containing a URL are
        exempt because they usually cannot be wrapped.

        Args:
            max_length: Maximum allowed line length.
        """
        for py_file in self._python_files():
            source = self._read_source(py_file)
            if source is None:
                continue

            # read_text() already normalised line endings to "\n".
            for number, line in enumerate(source.split('\n'), 1):
                stripped = line.rstrip()
                if len(stripped) <= max_length:
                    continue
                if 'http://' in stripped or 'https://' in stripped:
                    continue
                self._report(
                    py_file, number, max_length, 'info',
                    f"行长度 {len(stripped)} 超过限制 {max_length}",
                    'line-too-long')

    def check_complexity(self, max_complexity: int = 10):
        """Flag functions whose complexity figure exceeds ``max_complexity``.

        The figure starts at 1 per function and grows by one for every
        ``if``, loop, ``with``, ``except`` handler and additional boolean
        operand inside it.

        Args:
            max_complexity: Highest figure that is still accepted.
        """
        for py_file in self._python_files():
            tree = self._parse(py_file)
            if tree is None:
                continue

            visitor = self._ComplexityVisitor()
            visitor.visit(tree)

            for item in visitor.complexities:
                if item['complexity'] > max_complexity:
                    self._report(
                        py_file, item['line'], 0, 'warning',
                        f"函数 '{item['name']}' 圈复杂度 {item['complexity']} 超过限制 {max_complexity}",
                        'too-complex')

    def run_all(self):
        """Run every check and return the findings of this run."""
        # Start clean so repeated runs do not report duplicates.
        self.issues = []

        print("检查命名规范...")
        self.check_naming_conventions()

        print("检查文档字符串...")
        self.check_docstrings()

        print("检查行长度...")
        self.check_line_length()

        print("检查代码复杂度...")
        self.check_complexity()

        return self.issues

Git钩子集成

class PreCommitHook:
    """Install or remove a Git pre-commit hook that checks staged Python files."""

    def __init__(self, project_path: str):
        """Point the helper at ``<project_path>/.git/hooks``."""
        self.project_path = Path(project_path)
        self.hook_dir = self.project_path / '.git' / 'hooks'

    def install_pre_commit(self):
        """Write the pre-commit script and make it executable.

        Nothing is written when the project has no ``.git`` directory;
        creating one just to hold the hook would leave a bogus ``.git``
        folder behind in a directory that is not a repository.
        """
        if not (self.project_path / '.git').is_dir():
            print(f"未找到Git仓库: {self.project_path}")
            return

        self.hook_dir.mkdir(parents=True, exist_ok=True)

        hook_content = '''#!/bin/sh
# Pre-commit hook

echo "运行代码检查..."

cd "$(git rev-parse --show-toplevel)"

# 检查文件
STAGED_FILES=$(git diff --cached --name-only --diff-filter=ACM | grep '\\.py$')

if [ -z "$STAGED_FILES" ]; then
    echo "没有暂存Python文件"
    exit 0
fi

# 检查语法
python -m py_compile $STAGED_FILES
if [ $? -ne 0 ]; then
    echo "语法错误,提交被拒绝"
    exit 1
fi

# 运行flake8
flake8 --max-line-length=120 $STAGED_FILES
if [ $? -ne 0 ]; then
    echo "Flake8检查失败,提交被拒绝"
    exit 1
fi

echo "代码检查通过"
exit 0
'''

        hook_file = self.hook_dir / 'pre-commit'
        # newline='\n' stops Windows from writing CRLF, which would put a
        # carriage return at the end of the "#!/bin/sh" line.
        with open(hook_file, 'w', encoding='utf-8', newline='\n') as f:
            f.write(hook_content)

        hook_file.chmod(0o755)
        print(f"Pre-commit钩子已安装到: {hook_file}")

    def uninstall_pre_commit(self):
        """Delete the pre-commit hook file if it exists."""
        hook_file = self.hook_dir / 'pre-commit'
        if hook_file.exists():
            hook_file.unlink()
            print("Pre-commit钩子已卸载")

使用示例

def main():
    """Run the tool checks, the style checks and install the Git hook.

    The HTML report is written to ``reports/code_quality.html``; the
    ``reports`` directory is created first because opening the report file
    for writing fails when its directory does not exist.
    """
    project_path = './my_project'

    # Combined tool-based checks
    print("=" * 60)
    print("开始代码质量检查")
    print("=" * 60)

    checker = CodeQualityChecker(project_path)
    checker.run_all()
    checker.print_summary()
    Path('reports').mkdir(parents=True, exist_ok=True)
    checker.generate_report('reports/code_quality.html')

    # Project-local style checks
    print("\n" + "=" * 60)
    print("检查代码风格")
    print("=" * 60)

    style_checker = CodeStyleChecker(project_path)
    style_issues = style_checker.run_all()
    print(f"发现 {len(style_issues)} 个风格问题")

    # Install the Git hook
    hook = PreCommitHook(project_path)
    hook.install_pre_commit()


if __name__ == '__main__':
    main()

CI/CD集成配置

# .github/workflows/code-quality.yml
# Workflow that runs the linters over src/ on every push and pull request.
name: Code Quality

on: [push, pull_request]

jobs:
  quality:
    runs-on: ubuntu-latest
    
    steps:
    - uses: actions/checkout@v3
    
    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.10'
    
    - name: Install dependencies
      run: |
        pip install pylint flake8 mypy bandit
    
    # "|| true" makes the step succeed whatever the tool's exit status is,
    # so Pylint, Mypy and Bandit findings are shown without failing the job.
    - name: Run Pylint
      run: pylint src/ --exit-zero || true
    
    # Flake8 is the only linter step without "|| true".
    - name: Run Flake8
      run: flake8 src/ --max-line-length=120
    
    - name: Run Mypy
      run: mypy src/ || true
    
    - name: Run Bandit
      run: bandit -r src/ || true
    
    # radon is installed inside this step, not in "Install dependencies".
    - name: Check code complexity
      run: |
        pip install radon
        radon cc src/ -a -k

总结

自动化代码审查是保证代码质量的重要手段。通过集成多种检查工具,可以全面覆盖代码的各个方面:语法、风格、类型、安全性等。建议将代码检查集成到CI/CD流程中,确保每次提交都经过检查。

更多推荐