Python自动化脚本代码审查与质量检查实战
·
代码质量是软件开发的核心要素之一。自动化代码审查可以及早发现问题,保证代码风格一致,提高团队协作效率。本文将介绍如何用Python实现代码质量检查、代码风格审查和自动化linting。
常用代码检查工具
- Pylint:全面的Python代码分析器
- Flake8:代码风格检查工具
- Black:代码格式化工具
- Mypy:静态类型检查
- Bandit:安全检查
- Safety:依赖漏洞检查
基础代码检查框架
import subprocess
import json
import re
from pathlib import Path
from dataclasses import dataclass
from typing import List, Dict, Optional
from datetime import datetime
@dataclass
class Issue:
"""代码问题"""
file: str
line: int
column: int
severity: str # 'error', 'warning', 'info'
message: str
rule: str
tool: str
class CodeChecker:
"""代码检查器基类"""
name = "base"
def __init__(self, project_path: str):
self.project_path = Path(project_path)
self.issues: List[Issue] = []
def run(self) -> List[Issue]:
"""运行检查"""
raise NotImplementedError
def parse_output(self, output: str) -> List[Issue]:
"""解析输出"""
raise NotImplementedError
def save_report(self, output_file: str):
"""保存报告"""
with open(output_file, 'w', encoding='utf-8') as f:
json.dump([{
'file': issue.file,
'line': issue.line,
'column': issue.column,
'severity': issue.severity,
'message': issue.message,
'rule': issue.rule,
'tool': issue.tool
} for issue in self.issues], f, indent=2, ensure_ascii=False)
class PylintChecker(CodeChecker):
"""Pylint检查器"""
name = "pylint"
def run(self) -> List[Issue]:
"""运行Pylint检查"""
try:
result = subprocess.run(
['pylint', str(self.project_path),
'--output-format=json', '--exit-zero'],
capture_output=True,
text=True,
cwd=self.project_path
)
if result.stdout:
self.issues.extend(self.parse_output(result.stdout))
except FileNotFoundError:
print("Pylint未安装,请运行: pip install pylint")
return self.issues
def parse_output(self, output: str) -> List[Issue]:
"""解析Pylint JSON输出"""
issues = []
try:
for item in json.loads(output):
issues.append(Issue(
file=item.get('path', ''),
line=item.get('line', 0),
column=item.get('column', 0),
severity=self._map_severity(item.get('type', 'info')),
message=item.get('message', ''),
rule=item.get('message-id', ''),
tool='pylint'
))
except json.JSONDecodeError:
pass
return issues
def _map_severity(self, pylint_type: str) -> str:
"""映射严重级别"""
mapping = {
'error': 'error',
'warning': 'warning',
'convention': 'info',
'refactor': 'info'
}
return mapping.get(pylint_type, 'info')
class Flake8Checker(CodeChecker):
"""Flake8检查器"""
name = "flake8"
def run(self) -> List[Issue]:
"""运行Flake8检查"""
try:
result = subprocess.run(
['flake8', str(self.project_path),
'--format=json', '--max-line-length=120'],
capture_output=True,
text=True,
cwd=self.project_path
)
if result.stdout:
self.issues.extend(self.parse_output(result.stdout))
except FileNotFoundError:
print("Flake8未安装,请运行: pip install flake8")
return self.issues
def parse_output(self, output: str) -> List[Issue]:
"""解析Flake8 JSON输出"""
issues = []
try:
data = json.loads(output)
for file_path, errors in data.items():
for error in errors:
issues.append(Issue(
file=file_path,
line=error.get('row', 0),
column=error.get('col', 0),
severity='warning' if error.get('type') == 'W' else 'error',
message=error.get('text', ''),
rule=error.get('code', ''),
tool='flake8'
))
except json.JSONDecodeError:
# 解析普通格式
for line in output.strip().split('\n'):
if ':' in line:
parts = line.split(':')
if len(parts) >= 4:
issues.append(Issue(
file=parts[0],
line=int(parts[1]) if parts[1].isdigit() else 0,
column=int(parts[2]) if parts[2].isdigit() else 0,
severity='warning',
message=':'.join(parts[3:]).strip(),
rule='',
tool='flake8'
))
return issues
class MypyChecker(CodeChecker):
"""Mypy类型检查器"""
name = "mypy"
def run(self) -> List[Issue]:
"""运行Mypy检查"""
try:
result = subprocess.run(
['mypy', str(self.project_path), '--json-report', '/tmp/mypy.json'],
capture_output=True,
text=True,
cwd=self.project_path
)
# 解析stderr中的错误
self.issues.extend(self.parse_output(result.stderr))
except FileNotFoundError:
print("Mypy未安装,请运行: pip install mypy")
return self.issues
def parse_output(self, output: str) -> List[Issue]:
"""解析Mypy输出"""
issues = []
for line in output.split('\n'):
# 匹配格式: file.py:line: error: message
match = re.match(r'([^:]+):(\d+):\s*(\w+):\s*(.+)', line)
if match:
issues.append(Issue(
file=match.group(1),
line=int(match.group(2)),
column=0,
severity='warning' if match.group(3) == 'warning' else 'error',
message=match.group(4),
rule='type-error',
tool='mypy'
))
return issues
class BanditChecker(CodeChecker):
"""安全检查器"""
name = "bandit"
def run(self) -> List[Issue]:
"""运行Bandit安全检查"""
try:
result = subprocess.run(
['bandit', '-r', str(self.project_path), '-f', 'json'],
capture_output=True,
text=True,
cwd=self.project_path
)
if result.stdout:
self.issues.extend(self.parse_output(result.stdout))
except FileNotFoundError:
print("Bandit未安装,请运行: pip install bandit")
return self.issues
def parse_output(self, output: str) -> List[Issue]:
"""解析Bandit JSON输出"""
issues = []
try:
data = json.loads(output)
for result in data.get('results', []):
issues.append(Issue(
file=result.get('filename', ''),
line=result.get('line_number', 0),
column=0,
severity='warning',
message=result.get('issue_text', ''),
rule=result.get('test_id', ''),
tool='bandit'
))
except json.JSONDecodeError:
pass
return issues
代码质量检查器
class CodeQualityChecker:
"""综合代码质量检查器"""
def __init__(self, project_path: str):
self.project_path = Path(project_path)
self.checkers = [
PylintChecker(project_path),
Flake8Checker(project_path),
MypyChecker(project_path),
BanditChecker(project_path)
]
self.all_issues: List[Issue] = []
def run_all(self) -> List[Issue]:
"""运行所有检查器"""
self.all_issues = []
for checker in self.checkers:
print(f"运行 {checker.name}...")
try:
issues = checker.run()
self.all_issues.extend(issues)
print(f" 发现 {len(issues)} 个问题")
except Exception as e:
print(f" {checker.name} 运行失败: {e}")
return self.all_issues
def get_summary(self) -> Dict:
"""获取问题汇总"""
summary = {
'total': len(self.all_issues),
'by_severity': {
'error': 0,
'warning': 0,
'info': 0
},
'by_tool': {},
'by_file': {},
'critical_rules': []
}
for issue in self.all_issues:
summary['by_severity'][issue.severity] = \
summary['by_severity'].get(issue.severity, 0) + 1
summary['by_tool'][issue.tool] = \
summary['by_tool'].get(issue.tool, 0) + 1
summary['by_file'][issue.file] = \
summary['by_file'].get(issue.file, 0) + 1
if issue.severity == 'error':
summary['critical_rules'].append({
'file': issue.file,
'line': issue.line,
'message': issue.message,
'tool': issue.tool
})
return summary
def print_summary(self):
"""打印汇总信息"""
summary = self.get_summary()
print("\n" + "=" * 60)
print("代码质量检查汇总")
print("=" * 60)
print(f"总问题数: {summary['total']}")
print(f" 错误: {summary['by_severity']['error']}")
print(f" 警告: {summary['by_severity']['warning']}")
print(f" 信息: {summary['by_severity']['info']}")
print("\n按工具统计:")
for tool, count in summary['by_tool'].items():
print(f" {tool}: {count}")
print("\n问题最多的文件:")
for file, count in sorted(
summary['by_file'].items(),
key=lambda x: x[1],
reverse=True
)[:5]:
print(f" {file}: {count}")
if summary['critical_rules']:
print("\n严重错误 (需要优先修复):")
for rule in summary['critical_rules'][:10]:
print(f" [{rule['tool']}] {rule['file']}:{rule['line']}")
print(f" {rule['message']}")
def generate_report(self, output_file: str = 'code_quality_report.html'):
"""生成HTML报告"""
summary = self.get_summary()
html = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>代码质量检查报告</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
h1 {{ color: #333; }}
.summary {{ background: #f5f5f5; padding: 15px; border-radius: 5px; }}
.severity-error {{ color: #d32f2f; font-weight: bold; }}
.severity-warning {{ color: #f57c00; font-weight: bold; }}
.severity-info {{ color: #1976d2; }}
table {{ border-collapse: collapse; width: 100%; margin-top: 20px; }}
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
th {{ background-color: #4CAF50; color: white; }}
tr:nth-child(even) {{ background-color: #f2f2f2; }}
</style>
</head>
<body>
<h1>📊 代码质量检查报告</h1>
<p>生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
<div class="summary">
<h2>汇总统计</h2>
<p>总问题数: <strong>{summary['total']}</strong></p>
<p class="severity-error">错误: {summary['by_severity']['error']}</p>
<p class="severity-warning">警告: {summary['by_severity']['warning']}</p>
<p class="severity-info">信息: {summary['by_severity']['info']}</p>
</div>
<h2>按工具统计</h2>
<table>
<tr><th>工具</th><th>问题数</th></tr>
{"".join(f"<tr><td>{k}</td><td>{v}</td></tr>" for k, v in summary['by_tool'].items())}
</table>
<h2>详细问题列表</h2>
<table>
<tr>
<th>文件</th>
<th>行号</th>
<th>严重性</th>
<th>问题</th>
<th>工具</th>
</tr>
{"".join(f"""
<tr>
<td>{issue.file}</td>
<td>{issue.line}</td>
<td class="severity-{issue.severity}">{issue.severity}</td>
<td>{issue.message}</td>
<td>{issue.tool}</td>
</tr>
""" for issue in self.all_issues[:100])}
</table>
</body>
</html>
"""
with open(output_file, 'w', encoding='utf-8') as f:
f.write(html)
print(f"\n报告已保存到: {output_file}")
代码风格检查
class CodeStyleChecker:
"""代码风格检查"""
def __init__(self, project_path: str):
self.project_path = Path(project_path)
self.issues: List[Issue] = []
def check_naming_conventions(self):
"""检查命名规范"""
patterns = {
'class': r'^class\s+([A-Z][a-zA-Z0-9]*)',
'function': r'^def\s+([a-z][a-z0-9_]*)',
'constant': r'^([A-Z][A-Z0-9_]*)\s*=',
'variable': r'^([a-z][a-z0-9_]*)\s*='
}
for py_file in self.project_path.rglob('*.py'):
with open(py_file, 'r', encoding='utf-8') as f:
for i, line in enumerate(f, 1):
# 跳过注释和字符串
if line.strip().startswith('#'):
continue
for pattern_name, pattern in patterns.items():
match = re.search(pattern, line)
if match:
name = match.group(1)
# 检查是否符合规范
if pattern_name == 'class' and not name[0].isupper():
self.issues.append(Issue(
file=str(py_file),
line=i,
column=0,
severity='warning',
message=f"类名 '{name}' 应该以大写字母开头",
rule='naming-convention',
tool='style-checker'
))
def check_docstrings(self):
"""检查文档字符串"""
for py_file in self.project_path.rglob('*.py'):
with open(py_file, 'r', encoding='utf-8') as f:
content = f.read()
tree = ast.parse(content)
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.ClassDef)):
# 检查是否有docstring
docstring = ast.get_docstring(node)
if not docstring:
self.issues.append(Issue(
file=str(py_file),
line=node.lineno,
column=0,
severity='info',
message=f"{node.__class__.__name__} '{node.name}' 缺少文档字符串",
rule='missing-docstring',
tool='style-checker'
))
def check_line_length(self, max_length: int = 120):
"""检查行长度"""
for py_file in self.project_path.rglob('*.py'):
with open(py_file, 'r', encoding='utf-8') as f:
for i, line in enumerate(f, 1):
# 跳过注释和URL
stripped = line.rstrip()
if len(stripped) > max_length:
# 跳过包含URL的行
if 'http://' in stripped or 'https://' in stripped:
continue
self.issues.append(Issue(
file=str(py_file),
line=i,
column=max_length,
severity='info',
message=f"行长度 {len(stripped)} 超过限制 {max_length}",
rule='line-too-long',
tool='style-checker'
))
def check_complexity(self, max_complexity: int = 10):
"""检查代码复杂度"""
import ast
class ComplexityVisitor(ast.NodeVisitor):
def __init__(self):
self.complexities = []
self.current_complexity = 0
def visit_FunctionDef(self, node):
old_complexity = self.current_complexity
self.current_complexity = 1
self.generic_visit(node)
self.complexities.append({
'name': node.name,
'line': node.lineno,
'complexity': self.current_complexity
})
self.current_complexity = old_complexity
def visit_If(self, node):
self.current_complexity += 1
self.generic_visit(node)
def visit_For(self, node):
self.current_complexity += 1
self.generic_visit(node)
def visit_While(self, node):
self.current_complexity += 1
self.generic_visit(node)
def visit_With(self, node):
self.current_complexity += 1
self.generic_visit(node)
def visit_ExceptHandler(self, node):
self.current_complexity += 1
self.generic_visit(node)
def visit_BoolOp(self, node):
self.current_complexity += len(node.values) - 1
self.generic_visit(node)
for py_file in self.project_path.rglob('*.py'):
with open(py_file, 'r', encoding='utf-8') as f:
try:
tree = ast.parse(f.read())
visitor = ComplexityVisitor()
visitor.visit(tree)
for item in visitor.complexities:
if item['complexity'] > max_complexity:
self.issues.append(Issue(
file=str(py_file),
line=item['line'],
column=0,
severity='warning',
message=f"函数 '{item['name']}' 圈复杂度 {item['complexity']} 超过限制 {max_complexity}",
rule='too-complex',
tool='style-checker'
))
except SyntaxError:
pass
def run_all(self):
"""运行所有检查"""
print("检查命名规范...")
self.check_naming_conventions()
print("检查文档字符串...")
self.check_docstrings()
print("检查行长度...")
self.check_line_length()
print("检查代码复杂度...")
self.check_complexity()
return self.issues
Git钩子集成
class PreCommitHook:
"""Pre-commit钩子"""
def __init__(self, project_path: str):
self.project_path = Path(project_path)
self.hook_dir = self.project_path / '.git' / 'hooks'
def install_pre_commit(self):
"""安装pre-commit钩子"""
self.hook_dir.mkdir(parents=True, exist_ok=True)
hook_content = '''#!/bin/sh
# Pre-commit hook
echo "运行代码检查..."
cd "$(git rev-parse --show-toplevel)"
# 检查文件
STAGED_FILES=$(git diff --cached --name-only --diff-filter=ACM | grep '\\.py$')
if [ -z "$STAGED_FILES" ]; then
echo "没有暂存Python文件"
exit 0
fi
# 检查语法
python -m py_compile $STAGED_FILES
if [ $? -ne 0 ]; then
echo "语法错误,提交被拒绝"
exit 1
fi
# 运行flake8
flake8 --max-line-length=120 $STAGED_FILES
if [ $? -ne 0 ]; then
echo "Flake8检查失败,提交被拒绝"
exit 1
fi
echo "代码检查通过"
exit 0
'''
hook_file = self.hook_dir / 'pre-commit'
with open(hook_file, 'w', encoding='utf-8') as f:
f.write(hook_content)
hook_file.chmod(0o755)
print(f"Pre-commit钩子已安装到: {hook_file}")
def uninstall_pre_commit(self):
"""卸载pre-commit钩子"""
hook_file = self.hook_dir / 'pre-commit'
if hook_file.exists():
hook_file.unlink()
print("Pre-commit钩子已卸载")
使用示例
if __name__ == '__main__':
project_path = './my_project'
# 综合代码检查
print("=" * 60)
print("开始代码质量检查")
print("=" * 60)
checker = CodeQualityChecker(project_path)
checker.run_all()
checker.print_summary()
checker.generate_report('reports/code_quality.html')
# 代码风格检查
print("\n" + "=" * 60)
print("检查代码风格")
print("=" * 60)
style_checker = CodeStyleChecker(project_path)
style_issues = style_checker.run_all()
print(f"发现 {len(style_issues)} 个风格问题")
# 安装Git钩子
hook = PreCommitHook(project_path)
hook.install_pre_commit()
CI/CD集成配置
# .github/workflows/code-quality.yml
name: Code Quality
on: [push, pull_request]
jobs:
quality:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
run: |
pip install pylint flake8 mypy bandit
- name: Run Pylint
run: pylint src/ --exit-zero || true
- name: Run Flake8
run: flake8 src/ --max-line-length=120
- name: Run Mypy
run: mypy src/ || true
- name: Run Bandit
run: bandit -r src/ || true
- name: Check code complexity
run: |
pip install radon
radon cc src/ -a -k
总结
自动化代码审查是保证代码质量的重要手段。通过集成多种检查工具,可以全面覆盖代码的各个方面:语法、风格、类型、安全性等。建议将代码检查集成到CI/CD流程中,确保每次提交都经过检查。
更多推荐
所有评论(0)