Python自动化脚本进阶日志配置:结构化日志与分布式追踪实战
·
大家好,我是扣扣。之前我们聊过日志基础配置,今天来聊聊进阶玩法——结构化日志和分布式追踪。
为什么需要结构化日志?
传统的文本日志是这样的:
2024-04-22 10:30:15 [INFO] User login success, user_id=12345
2024-04-22 10:30:16 [WARNING] Retry request 3 times
2024-04-22 10:30:17 [ERROR] Database connection failed
看起来清晰,但机器解析困难。如果你想统计某个用户的所有操作、或者按时间聚合错误日志,文本日志就力不从心了。
结构化日志把日志变成JSON格式:
{"time": "2024-04-22T10:30:15", "level": "INFO", "event": "user_login", "user_id": 12345, "ip": "192.168.1.100"}
这样就能在ELK、Loki、Splunk等日志分析工具中按字段轻松查询和统计。
实战一:使用loguru实现结构化日志
loguru是我最喜欢的Python日志库,比标准库logging好用太多了。
from loguru import logger
import sys
import json
from pathlib import Path
from datetime import datetime
class StructuredLogger:
    """Structured logger built on top of the global loguru ``logger``.

    On construction it removes every existing loguru handler and installs three:

    * stdout      -- colourised, human-readable, level DEBUG and above;
    * app file    -- one JSON object per line (``serialize=True``), INFO and above,
                     kept 30 days and zip-compressed after rotation;
    * error file  -- JSON lines, ERROR and above, kept 90 days.

    Args:
        log_dir: Directory for the log files; created (with parents) if missing.
        rotation: loguru rotation spec for both file sinks (default: at midnight).
    """

    def __init__(self, log_dir: str = './logs', rotation: str = '00:00'):
        self.log_dir = Path(log_dir)
        self.log_dir.mkdir(parents=True, exist_ok=True)

        # Remove the default handler (and any others) so records are not emitted twice.
        logger.remove()

        # Console output (pretty, coloured format).
        logger.add(
            sys.stdout,
            format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
            level="DEBUG",
            colorize=True,
        )

        # File output (JSON lines). loguru expands the "{time:...}" placeholder each
        # time it opens a new file, so after a rotation the new file carries the
        # current date. A date computed once here with datetime.now() would keep
        # the start-up date for every later file.
        logger.add(
            self.log_dir / "app_{time:YYYYMMDD}.log",
            format="{message}",
            level="INFO",
            rotation=rotation,
            retention="30 days",
            compression="zip",
            serialize=True,  # key point: each record is written as a JSON object
        )

        # Errors also go to a separate file with longer retention.
        logger.add(
            self.log_dir / "error_{time:YYYYMMDD}.log",
            format="{message}",
            level="ERROR",
            rotation=rotation,
            retention="90 days",
            serialize=True,
        )

    def log_event(self, event: str, **kwargs):
        """Log a business event at INFO level.

        The message text stays a JSON object ``{"event", "timestamp", **kwargs}``
        for readers that parse the message. The same fields are also bound into
        loguru's ``extra``. In the serialized output they therefore appear as
        real JSON fields under ``record.extra``, not only inside an escaped string.

        Args:
            event: Event name, e.g. ``"user_login"``.
            **kwargs: Additional event fields. Values JSON cannot encode are
                stringified (``default=str``) instead of raising in the caller.
        """
        payload = {
            "event": event,
            "timestamp": datetime.now().isoformat(),
            **kwargs,
        }
        # depth=1 attributes the record (function/line) to the caller of log_event.
        logger.opt(depth=1).bind(**payload).info(
            json.dumps(payload, ensure_ascii=False, default=str)
        )

    def log_error(self, error: Exception, context: dict = None):
        """Log *error* at ERROR level together with its traceback.

        Args:
            error: The exception to record. Its own traceback is attached through
                ``logger.opt(exception=error)``. This also works when called
                outside the ``except`` block that caught it, whereas
                ``logger.exception`` only sees the exception currently being handled.
            context: Optional extra fields, stored under the ``"context"`` key.
        """
        error_info = {
            "event": "error",
            "error_type": type(error).__name__,
            "error_message": str(error),
            "timestamp": datetime.now().isoformat(),
        }
        if context:
            error_info["context"] = context
        logger.opt(exception=error, depth=1).bind(**error_info).error(
            json.dumps(error_info, ensure_ascii=False, default=str)
        )
# Usage example
if __name__ == '__main__':
    structured = StructuredLogger('./logs')

    # A plain business event
    structured.log_event("user_login", user_id=12345, username="test_user", ip="192.168.1.100")

    # A business operation with some metrics attached
    structured.log_event(
        "file_processed",
        file_path="/data/file.pdf",
        file_size=1024000,
        processing_time_ms=2500,
        status="success",
    )

    # An error together with its context
    failure_context = {"task_id": "task_001", "operation": "division"}
    try:
        _ = 1 / 0
    except Exception as exc:
        structured.log_error(exc, failure_context)
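生成的JSON日志(示意,字段已精简;loguru在`serialize=True`时的实际输出形如`{"text": ..., "record": {...}}`,记录内容都在`record`下)。注意其中的`message`是一段被二次转义的JSON字符串,日志系统无法直接按`user_id`等字段检索,所以最好再用`logger.bind()`把这些字段放进`extra`: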
{"record": {"elapsed": {"seconds": 0, "nanoseconds": 120000}, "exception": null, "extra": {}, "file": {"name": "test.py", "path": "D:/test.py"}, "function": "<module>", "level": {"name": "INFO", "no": 20}, "message": "{\"event\": \"user_login\", \"timestamp\": \"2024-04-22T10:30:15.123\", \"user_id\": 12345}", "name": "__main__", "line": 58, "process": {"id": 1234, "name": "MainProcess"}, "thread": {"id": 12345, "name": "MainThread"}, "time": {"year": 2024, "month": 4, "day": 22, "hour": 10, "minute": 30, "second": 15, "microsecond": 123000}}}
实战二:分布式追踪请求ID
在微服务架构中,一个请求会经过多个服务,如何追踪请求的完整链路?这就需要请求ID(trace_id)。
import contextvars
import json
import uuid
from functools import wraps

import requests
from loguru import logger
# Request context kept in context variables. Values are context-local: a value
# set in one thread or asyncio task is not seen by other threads/tasks, so
# concurrent requests don't overwrite each other's IDs.
trace_id_var = contextvars.ContextVar('trace_id', default=None)
user_id_var = contextvars.ContextVar('user_id', default=None)


class DistributedLogger:
    """Logger that stamps every record with the service name and trace context.

    Args:
        service_name: Name of this service, included in every log line.
    """

    def __init__(self, service_name: str):
        self.service_name = service_name
        self._setup_logger()

    def _setup_logger(self):
        """Replace all loguru handlers with one printing "[service] [trace_id] message"."""
        logger.remove()
        logger.add(
            # loguru passes the sink a message that already ends with "\n";
            # end="" avoids printing an extra blank line after every record.
            lambda msg: print(self._format_message(msg), end=""),
            format="{message}",
            level="DEBUG",
        )

    def _format_message(self, message):
        """Prefix *message* with the service name and the current trace ID."""
        trace_id = trace_id_var.get() or "no-trace"
        return f"[{self.service_name}] [{trace_id}] {message}"

    def generate_trace_id(self):
        """Return a new random trace ID (UUID4 as a string)."""
        return str(uuid.uuid4())

    def set_context(self, trace_id: str = None, user_id: str = None):
        """Set the trace/user ID for the current context; falsy values are ignored."""
        if trace_id:
            trace_id_var.set(trace_id)
        if user_id:
            user_id_var.set(user_id)

    def info(self, msg: str, **kwargs):
        """Log *msg* at INFO with the trace context and *kwargs* appended as JSON."""
        logger.info(self._add_context(msg, kwargs))

    def error(self, msg: str, **kwargs):
        """Log *msg* at ERROR with the trace context and *kwargs* appended as JSON."""
        logger.error(self._add_context(msg, kwargs))

    def _add_context(self, msg: str, extra: dict) -> str:
        """Return "<msg> | <json>" where the JSON holds trace_id, user_id, service and *extra*.

        Values JSON cannot encode natively (dates, Decimals, ...) are stringified,
        so a logging call never raises TypeError. Non-ASCII text is kept readable.
        """
        context = {
            "trace_id": trace_id_var.get(),
            "user_id": user_id_var.get(),
            "service": self.service_name,
            **extra,
        }
        return f"{msg} | {json.dumps(context, ensure_ascii=False, default=str)}"
class RequestTracker:
    """Decorator factory that logs start, success and failure of function calls.

    While the decorated function runs, a trace ID is always active. An existing
    one is reused. Otherwise a new one is generated for the duration of that
    call only and removed again afterwards.

    Args:
        logger: DistributedLogger used to write the start/finish/failure records.
    """

    def __init__(self, logger: DistributedLogger):
        self.logger = logger

    def track(self, func):
        """Wrap *func* so every call is logged under the current trace ID."""

        @wraps(func)
        def wrapper(*args, **kwargs):
            token = None
            if not trace_id_var.get():
                # Start a trace scoped to this call. Without resetting it below, the
                # generated ID would stay in the context and be reused by every
                # later, unrelated call in the same thread.
                token = trace_id_var.set(self.logger.generate_trace_id())
            try:
                self.logger.info(f"开始执行 {func.__name__}",
                                 args=str(args)[:100],
                                 kwargs=str(kwargs)[:100])
                try:
                    result = func(*args, **kwargs)
                    self.logger.info(f"完成 {func.__name__}", status="success")
                    return result
                except Exception as e:
                    self.logger.error(f"执行失败 {func.__name__}",
                                      error=str(e),
                                      error_type=type(e).__name__)
                    raise
            finally:
                if token is not None:
                    trace_id_var.reset(token)

        return wrapper
# HTTP client that propagates the trace ID automatically
class TracedSession(requests.Session):
    """requests.Session that forwards the current trace ID to downstream services.

    When a trace ID is active, every request carries it in the ``X-Trace-ID``
    header. Each request is logged before sending and after the response (or
    failure).

    Args:
        logger: DistributedLogger used for the request/response records.
    """

    def __init__(self, logger: DistributedLogger):
        super().__init__()
        self.logger = logger

    def request(self, method, url, **kwargs):
        """Send a request with the trace header injected (see requests.Session.request)."""
        trace_id = trace_id_var.get()

        # Work on a copy: the caller may pass headers=None, or reuse the same
        # dict for other requests, which must not be modified behind its back.
        headers = dict(kwargs.get('headers') or {})
        if trace_id:
            # Only send the header when there is a trace; an empty value would
            # look like a (blank) trace ID downstream.
            headers['X-Trace-ID'] = trace_id
        kwargs['headers'] = headers

        self.logger.info(f"发送请求 {method} {url}", trace_id=trace_id)
        try:
            response = super().request(method, url, **kwargs)
            self.logger.info(f"收到响应 {response.status_code}",
                             url=url,
                             status_code=response.status_code)
            return response
        except Exception as e:
            self.logger.error(f"请求失败 {url}", error=str(e))
            raise
# Usage example
if __name__ == '__main__':
    # Do NOT call this variable `logger`. That would rebind the loguru `logger`
    # imported at the top of this snippet. DistributedLogger.info() calls the
    # module-level `logger.info`, so it would then call itself recursively
    # until RecursionError.
    dist_logger = DistributedLogger('order-service')
    tracker = RequestTracker(dist_logger)
    session = TracedSession(dist_logger)  # HTTP calls made with it carry X-Trace-ID

    # Simulate handling one incoming request
    trace_id = dist_logger.generate_trace_id()
    dist_logger.set_context(trace_id=trace_id, user_id="user_12345")

    @tracker.track
    def create_order(order_id: str, amount: float):
        dist_logger.info("创建订单", order_id=order_id, amount=amount)

        # Call the inventory service
        @tracker.track
        def check_inventory(product_id: str):
            dist_logger.info("检查库存", product_id=product_id)
            return True

        inventory_ok = check_inventory("prod_001")
        if inventory_ok:
            dist_logger.info("订单创建成功", order_id=order_id)
        else:
            dist_logger.error("库存不足", order_id=order_id)

    create_order("order_001", 99.9)
实战三:日志性能优化
高频写入日志可能成为性能瓶颈,这里有几个优化技巧:
from loguru import logger
import atexit
import sys
class OptimizedLogger:
    """loguru sink that batches formatted records in memory and appends them to a file.

    Records are collected in ``self.buffer``. They are written to *log_file* with
    a single ``write`` call once ``buffer_size`` of them have accumulated, on
    :meth:`flush`, and on :meth:`close`, which is registered to run at
    interpreter exit.

    Args:
        log_file: Path of the file records are appended to (UTF-8).
        buffer_size: Number of records to collect before writing (default 100).
    """

    def __init__(self, log_file: str, buffer_size: int = 100):
        import threading

        self.buffer = []
        self.buffer_size = buffer_size  # write to disk after this many records
        self.log_file = log_file
        # Guards the buffer: the loguru worker thread appends to it while
        # flush()/close() may run on another thread.
        self._lock = threading.Lock()
        # NOTE: rotation/retention/compression are only accepted by loguru for
        # file-path sinks. Passing them with a callable sink raises TypeError, so
        # they are not used here. Pass a path to logger.add() if rotation is needed.
        self._handler_id = logger.add(
            self._write_buffer,
            format="{message}",
            level="INFO",
            enqueue=True,  # records go through a queue drained by a background thread, so callers don't block on I/O
        )
        # Write out whatever is still buffered when the interpreter exits.
        atexit.register(self.close)

    def _write_buffer(self, message):
        """Sink called by loguru: buffer one formatted record and write when the buffer is full."""
        with self._lock:
            # The formatted message already ends with "\n"; don't add another.
            self.buffer.append(str(message))
            if len(self.buffer) >= self.buffer_size:
                self._flush_locked()

    def _flush_locked(self):
        """Append all buffered records to the file in one write. Caller must hold the lock."""
        if not self.buffer:
            return
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(''.join(self.buffer))
        self.buffer.clear()

    def flush(self):
        """Write any buffered records to the file now."""
        with self._lock:
            self._flush_locked()

    def close(self):
        """Detach the sink from loguru and write out remaining records. Safe to call twice."""
        handler_id, self._handler_id = self._handler_id, None
        if handler_id is not None:
            # Wait until records still sitting in the enqueue queue reached the sink.
            logger.complete()
            try:
                logger.remove(handler_id)
            except ValueError:
                pass  # handler already removed elsewhere (e.g. by a global logger.remove())
        self.flush()

    def log_batch(self, events: list):
        """Log each event at INFO; batching of the file writes happens in the sink."""
        for event in events:
            logger.info(event)
# Sampled logging: emit only a fraction of the records to avoid flooding the log
class SampledLogger:
    """Emit DEBUG records for only a random fraction of calls.

    Args:
        sample_rate: Probability that a given call is logged (default 0.1,
            i.e. roughly one call in ten).
    """

    def __init__(self, sample_rate: float = 0.1):
        import random

        self.random = random
        self.sample_rate = sample_rate

    def should_log(self) -> bool:
        """Draw a random number in [0, 1) and report whether it falls inside the sample."""
        draw = self.random.random()
        return draw < self.sample_rate

    def debug(self, msg: str, **kwargs):
        """Log *msg* at DEBUG level, but only for sampled calls."""
        if not self.should_log():
            return
        logger.debug(msg, **kwargs)
# Conditional logging: emit only when a condition holds
class ConditionalLogger:
    """INFO logger that can be switched off globally or per call, and counts outcomes.

    Args:
        enabled: Master switch; when False every call is counted as skipped.
    """

    def __init__(self, enabled: bool = True):
        self.enabled = enabled
        # Number of calls that were logged vs. suppressed.
        self.stats = {'logged': 0, 'skipped': 0}

    def info(self, msg: str, condition: bool = True, **kwargs):
        """Log *msg* at INFO only if the logger is enabled and *condition* is truthy."""
        if not (self.enabled and condition):
            self.stats['skipped'] += 1
            return
        logger.info(msg, **kwargs)
        self.stats['logged'] += 1

    def get_stats(self):
        """Return the live counters dict (the same object, not a copy)."""
        return self.stats
日志分析实战
生成日志后,如何分析?这里提供一个简单的分析脚本:
import json
from pathlib import Path
from collections import Counter
from datetime import datetime
def analyze_logs(log_file: str, top_n: int = 10):
    """Print event and error statistics for a JSON-lines log file.

    Each line is expected to be a loguru ``serialize=True`` record,
    ``{"text": ..., "record": {"message": ...}}``, whose message is itself a JSON
    object such as ``{"event": ..., "timestamp": ..., "error_type": ...}``. A flat
    ``{"message": ...}`` line is accepted as well. Lines that are not JSON, or
    whose message is not a JSON object, are skipped.

    Args:
        log_file: Path of the log file (read as UTF-8).
        top_n: Number of entries shown in each ranking (default 10).

    Returns:
        A dict with the Counters ``"events"`` (by event name), ``"errors"`` (by
        error type) and ``"errors_by_hour"`` (keyed by the two-digit hour string
        taken from the message timestamp).
    """
    error_counts = Counter()
    event_counts = Counter()
    errors_by_hour = Counter()

    with open(log_file, 'r', encoding='utf-8') as f:
        for line in f:
            msg = _extract_log_message(line)
            if msg is None:
                continue
            if 'event' in msg:
                event_counts[msg['event']] += 1
            if 'error_type' in msg:
                error_counts[msg['error_type']] += 1
                # When the error happened: ISO-8601 "YYYY-MM-DDTHH:..." -> chars 11-12 are the hour
                timestamp = msg.get('timestamp')
                if isinstance(timestamp, str) and len(timestamp) >= 13:
                    errors_by_hour[timestamp[11:13]] += 1

    # Print the statistics
    print("=== 事件统计 ===")
    for event, count in event_counts.most_common(top_n):
        print(f"{event}: {count}")
    print("\n=== 错误类型统计 ===")
    for error_type, count in error_counts.most_common(top_n):
        print(f"{error_type}: {count}")
    print("\n=== 错误时间分布 ===")
    for hour, count in sorted(errors_by_hour.items()):
        print(f"{hour}:00 - {count}个错误")

    return {"events": event_counts, "errors": error_counts, "errors_by_hour": errors_by_hour}


def _extract_log_message(line: str):
    """Return the decoded JSON-object message of one log line, or None if unusable."""
    try:
        data = json.loads(line)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict):
        return None
    # loguru's serialize=True nests the message: {"text": ..., "record": {"message": ...}}
    record = data.get('record')
    raw = record.get('message') if isinstance(record, dict) else data.get('message')
    if not isinstance(raw, str):
        return None
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError:
        return None  # plain-text message, not a structured event
    return msg if isinstance(msg, dict) else None
if __name__ == '__main__':
    import sys

    # Usage: python <script> [LOG_FILE]. Without an argument, the demo file is analysed.
    analyze_logs(sys.argv[1] if len(sys.argv) > 1 else './logs/app_20240422.log')
总结
- 结构化日志优势
  - 机器可解析,方便统计分析
  - 支持复杂查询
  - 便于接入日志系统
- 分布式追踪要点
  - 唯一请求ID贯穿全程
  - 自动注入到HTTP头
  - 日志中包含追踪上下文
- 性能优化技巧
  - 使用enqueue异步写入
  - 批量日志缓冲
  - 采样/条件日志
好了,今天的分享就到这里。日志虽小,但做好了对排查问题至关重要。我是扣扣,有问题留言~🙃
更多推荐
所有评论(0)