大家好,我是扣扣。之前我们聊过日志基础配置,今天来聊聊进阶玩法——结构化日志和分布式追踪。

为什么需要结构化日志?

传统的文本日志是这样的:

2024-04-22 10:30:15 [INFO] User login success, user_id=12345
2024-04-22 10:30:16 [WARNING] Retry request 3 times
2024-04-22 10:30:17 [ERROR] Database connection failed

看起来清晰,但机器解析困难。如果你想统计某个用户的所有操作、或者按时间聚合错误日志,文本日志就力不从心了。

结构化日志把日志变成JSON格式:

{"time": "2024-04-22T10:30:15", "level": "INFO", "event": "user_login", "user_id": 12345, "ip": "192.168.1.100"}

这样可以用任何日志分析工具(ELK、Loki、Splunk)轻松查询和统计。

实战一:使用loguru实现结构化日志

loguru是我最喜欢的Python日志库,比标准库logging好用太多了。

from loguru import logger
import sys
import json
from pathlib import Path
from datetime import datetime

class StructuredLogger:
    """Configure loguru for structured (JSON) logging and expose event helpers.

    Creating an instance reconfigures the process-wide loguru ``logger``:
    every handler registered so far is removed and three sinks are installed:

    * stdout: colourised, human-readable, ``DEBUG`` and above;
    * ``app_<YYYYMMDD>.log``: serialized JSON, ``INFO`` and above,
      kept for 30 days, closed files are zipped;
    * ``error_<YYYYMMDD>.log``: serialized JSON, ``ERROR`` and above,
      kept for 90 days.

    Args:
        log_dir: Directory that receives the log files; created if missing.
        rotation: loguru rotation condition applied to both file sinks.
    """

    _CONSOLE_FORMAT = (
        "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | "
        "<level>{level: <8}</level> | "
        "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
        "<level>{message}</level>"
    )

    def __init__(self, log_dir: str = './logs', rotation: str = '00:00'):
        self.log_dir = Path(log_dir)
        self.log_dir.mkdir(parents=True, exist_ok=True)

        # Drop loguru's default stderr handler (and any other handler).
        logger.remove()

        # Human-readable console output.
        logger.add(
            sys.stdout,
            format=self._CONSOLE_FORMAT,
            level="DEBUG",
            colorize=True
        )

        # JSON file output. The "{time:...}" token is expanded by loguru each
        # time it creates a file, so a file opened after a rotation carries
        # the new date instead of the date this object was constructed on,
        # and retention matches the files of every day.
        logger.add(
            self.log_dir / "app_{time:YYYYMMDD}.log",
            format="{message}",
            level="INFO",
            rotation=rotation,
            retention="30 days",
            compression="zip",
            serialize=True  # emit each record as one JSON document
        )

        # Errors additionally go to their own file with a longer retention.
        logger.add(
            self.log_dir / "error_{time:YYYYMMDD}.log",
            format="{message}",
            level="ERROR",
            rotation=rotation,
            retention="90 days",
            serialize=True
        )

    @staticmethod
    def _to_json(payload: dict) -> str:
        """Serialize *payload* for use as a log message.

        Non-ASCII text is kept readable, and values json cannot encode
        (paths, datetimes, ...) are stringified so that a logging call never
        raises ``TypeError``.
        """
        return json.dumps(payload, ensure_ascii=False, default=str)

    def log_event(self, event: str, **kwargs):
        """Record a structured event at INFO level.

        Args:
            event: Event name, stored under the ``"event"`` key.
            **kwargs: Extra fields merged into the payload. A ``timestamp``
                keyword overrides the generated one.
        """
        payload = {
            "event": event,
            "timestamp": datetime.now().isoformat(),
            **kwargs
        }
        # depth=1 attributes the record to the caller rather than this helper.
        logger.opt(depth=1).info(self._to_json(payload))

    def log_error(self, error: Exception, context: dict = None):
        """Record *error* at ERROR level together with its traceback.

        Args:
            error: The exception to report.
            context: Optional extra data stored under the ``"context"`` key.
        """
        error_info = {
            "event": "error",
            "error_type": type(error).__name__,
            "error_message": str(error),
            "timestamp": datetime.now().isoformat()
        }
        if context:
            error_info["context"] = context

        # Attach the exception that was passed in. logger.exception() would
        # read sys.exc_info() instead, which is empty outside an except block.
        logger.opt(depth=1, exception=error).error(self._to_json(error_info))

# 使用示例
if __name__ == '__main__':
    structured = StructuredLogger('./logs')

    # A plain event.
    login_fields = {
        "user_id": 12345,
        "username": "test_user",
        "ip": "192.168.1.100",
    }
    structured.log_event("user_login", **login_fields)

    # A business operation with several measurements.
    file_fields = {
        "file_path": "/data/file.pdf",
        "file_size": 1024000,
        "processing_time_ms": 2500,
        "status": "success",
    }
    structured.log_event("file_processed", **file_fields)

    # An error, reported with some context about the failing task.
    try:
        result = 1 / 0
    except Exception as exc:
        failure_context = {"task_id": "task_001", "operation": "division"}
        structured.log_error(exc, failure_context)

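生成的JSON日志(示意,字段有删减。开启 serialize=True 后,loguru 实际写出的每一行是 {"text": ..., "record": {...}} 结构,业务字段保存在 record.message 这个 JSON 字符串里):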

{"record": {"elapsed": {"seconds": 0, "nanoseconds": 120000}, "exception": null, "extra": {}, "file": {"name": "test.py", "path": "D:/test.py"}, "function": "<module>", "level": {"name": "INFO", "no": 20}, "message": "{\"event\": \"user_login\", \"timestamp\": \"2024-04-22T10:30:15.123\", \"user_id\": 12345}", "name": "__main__", "line": 58, "process": {"id": 1234, "name": "MainProcess"}, "thread": {"id": 12345, "name": "MainThread"}, "time": {"year": 2024, "month": 4, "day": 22, "hour": 10, "minute": 30, "second": 15, "microsecond": 123000}}}

实战二:分布式追踪请求ID

在微服务架构中,一个请求会经过多个服务,如何追踪请求的完整链路?这就需要请求ID(trace_id)。

import contextvars
import json
import uuid
from functools import wraps

import requests
from loguru import logger

# Request context lives in contextvars: each thread (and each asyncio task)
# sees its own value, so concurrent requests do not overwrite each other.
trace_id_var = contextvars.ContextVar(
    "trace_id",
    default=None,
)
user_id_var = contextvars.ContextVar(
    "user_id",
    default=None,
)

class DistributedLogger:
    """Logger wrapper that stamps every message with service and trace context.

    Creating an instance reconfigures the process-wide loguru ``logger``:
    all existing handlers are removed and a single handler is installed that
    prints ``[service] [trace_id] message`` lines.

    Args:
        service_name: Name of the emitting service, included in every line.
    """

    def __init__(self, service_name: str):
        self.service_name = service_name
        self._setup_logger()

    def _setup_logger(self):
        """Replace all loguru handlers with the trace-prefixing print sink."""
        logger.remove()
        logger.add(
            # loguru hands the sink a message that already ends with a
            # newline, so print()'s own terminator is suppressed to avoid an
            # empty line after every record.
            lambda msg: print(self._format_message(msg), end=""),
            format="{message}",
            level="DEBUG"
        )

    def _format_message(self, message):
        """Prefix *message* with the service name and the current trace id.

        ``"no-trace"`` is used when no trace id is set in the current context.
        """
        trace_id = trace_id_var.get() or "no-trace"
        return f"[{self.service_name}] [{trace_id}] {message}"

    def generate_trace_id(self):
        """Return a new random trace id (a UUID4 rendered as a string)."""
        return str(uuid.uuid4())

    def set_context(self, trace_id: str = None, user_id: str = None):
        """Store the given ids in the current context.

        Falsy arguments are ignored, so an id that is already set cannot be
        cleared through this method.
        """
        if trace_id:
            trace_id_var.set(trace_id)
        if user_id:
            user_id_var.set(user_id)

    def info(self, msg: str, **kwargs):
        """Log *msg* at INFO level with the context and *kwargs* appended."""
        logger.info(self._add_context(msg, kwargs))

    def error(self, msg: str, **kwargs):
        """Log *msg* at ERROR level with the context and *kwargs* appended."""
        logger.error(self._add_context(msg, kwargs))

    def _add_context(self, msg: str, extra: dict) -> str:
        """Return ``"<msg> | <json>"`` where the JSON holds the trace context.

        Keys in *extra* override the built-in ``trace_id``/``user_id``/
        ``service`` entries. Non-ASCII text is kept readable, and values json
        cannot encode are stringified instead of raising ``TypeError``.
        """
        context = {
            "trace_id": trace_id_var.get(),
            "user_id": user_id_var.get(),
            "service": self.service_name,
            **extra
        }
        return f"{msg} | {json.dumps(context, ensure_ascii=False, default=str)}"

class RequestTracker:
    """Decorator factory that logs the start, success and failure of calls.

    Args:
        logger: The ``DistributedLogger`` used for the log lines and for
            generating trace ids.
    """

    def __init__(self, logger: DistributedLogger):
        self.logger = logger

    def track(self, func):
        """Wrap *func* so that each call is logged under a trace id.

        If the current context already has a trace id it is reused, so nested
        tracked calls share one trace. Otherwise a new id is generated for
        the duration of the call and removed again afterwards, so unrelated
        later calls do not inherit it. Exceptions raised by *func* are logged
        and re-raised unchanged.
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Only the outermost call creates (and later resets) the trace id.
            token = None
            if not trace_id_var.get():
                token = trace_id_var.set(self.logger.generate_trace_id())

            try:
                self.logger.info(f"开始执行 {func.__name__}",
                               args=str(args)[:100],
                               kwargs=str(kwargs)[:100])

                try:
                    result = func(*args, **kwargs)
                except Exception as e:
                    self.logger.error(f"执行失败 {func.__name__}",
                                    error=str(e),
                                    error_type=type(e).__name__)
                    raise

                self.logger.info(f"完成 {func.__name__}", status="success")
                return result
            finally:
                if token is not None:
                    trace_id_var.reset(token)

        return wrapper

# HTTP客户端自动传递trace_id
class TracedSession(requests.Session):
    """``requests.Session`` that forwards the current trace id on every call.

    Args:
        logger: The ``DistributedLogger`` used to log requests and responses.
    """

    def __init__(self, logger: DistributedLogger):
        super().__init__()
        self.logger = logger

    def request(self, method, url, **kwargs):
        """Send a request with an ``X-Trace-ID`` header and log the outcome.

        The header carries the trace id of the current context, or an empty
        string when none is set. Exceptions from the underlying session are
        logged and re-raised unchanged.
        """
        trace_id = trace_id_var.get()

        # Work on a copy: the caller's dict must not be modified, and an
        # explicit headers=None has to be accepted like a missing argument.
        headers = dict(kwargs.get('headers') or {})
        headers['X-Trace-ID'] = trace_id or ''
        kwargs['headers'] = headers

        self.logger.info(f"发送请求 {method} {url}", trace_id=trace_id)

        try:
            response = super().request(method, url, **kwargs)
            self.logger.info(f"收到响应 {response.status_code}",
                          url=url,
                          status_code=response.status_code)
            return response
        except Exception as e:
            self.logger.error(f"请求失败 {url}", error=str(e))
            raise

# 使用示例
if __name__ == '__main__':
    # This variable must not be called `logger`: the methods of
    # DistributedLogger call the module-level loguru `logger`, and rebinding
    # that name to a DistributedLogger makes info()/error() call themselves
    # until RecursionError.
    dlog = DistributedLogger('order-service')
    tracker = RequestTracker(dlog)
    session = TracedSession(dlog)

    # Simulate the handling of one incoming request.
    trace_id = dlog.generate_trace_id()
    dlog.set_context(trace_id=trace_id, user_id="user_12345")

    @tracker.track
    def create_order(order_id: str, amount: float):
        dlog.info("创建订单", order_id=order_id, amount=amount)

        # Stand-in for a call to the inventory service; it is tracked too and
        # shares the trace id of the enclosing call.
        @tracker.track
        def check_inventory(product_id: str):
            dlog.info("检查库存", product_id=product_id)
            return True

        inventory_ok = check_inventory("prod_001")

        if inventory_ok:
            dlog.info("订单创建成功", order_id=order_id)
        else:
            dlog.error("库存不足", order_id=order_id)

    create_order("order_001", 99.9)

实战三:日志性能优化

高频写入日志可能成为性能瓶颈,这里有几个优化技巧:

from loguru import logger
import atexit
import sys

class OptimizedLogger:
    """Register a non-blocking, size-rotated loguru file sink.

    The file path itself is given to loguru as the sink. Rotation, retention
    and compression are options of loguru's file sinks, so they only take
    effect when loguru owns the file; loguru also keeps the file open instead
    of reopening it for every message.

    Args:
        log_file: Path of the log file to write.
    """

    def __init__(self, log_file: str):
        self.log_file = log_file
        # Kept so that existing attribute access keeps working; buffering is
        # left to the file sink and these are not used by this class.
        self.buffer = []
        self.buffer_size = 100

        # The handler id allows callers to detach the sink with logger.remove().
        self._handler_id = logger.add(
            self.log_file,
            format="{message}",
            level="INFO",
            # Messages first pass through a queue, which makes the logging
            # call itself non-blocking.
            enqueue=True,
            rotation="100 MB",
            retention="7 days",
            compression="zip"
        )

    def log_batch(self, events: list):
        """Log every item of *events* at INFO level, in order."""
        for event in events:
            logger.info(event)

# 采样日志:避免日志过多
class SampledLogger:
    """Debug logger that forwards only a random fraction of its messages.

    Args:
        sample_rate: Probability with which a message is forwarded.
    """

    def __init__(self, sample_rate: float = 0.1):
        import random
        self.random = random
        self.sample_rate = sample_rate

    def should_log(self) -> bool:
        """Draw a random number and report whether it is below the rate."""
        draw = self.random.random()
        return draw < self.sample_rate

    def debug(self, msg: str, **kwargs):
        """Forward *msg* to ``logger.debug`` if this call is sampled."""
        if not self.should_log():
            return
        logger.debug(msg, **kwargs)

# 条件日志
class ConditionalLogger:
    """Info logger gated by a global switch and a per-call condition.

    Args:
        enabled: When false, every call to ``info`` is counted as skipped.
    """

    def __init__(self, enabled: bool = True):
        self.enabled = enabled
        self.stats = dict(logged=0, skipped=0)

    def info(self, msg: str, condition: bool = True, **kwargs):
        """Log *msg* if the logger is enabled and *condition* holds.

        The ``logged``/``skipped`` counters in ``stats`` are updated either way.
        """
        if not (self.enabled and condition):
            self.stats['skipped'] += 1
            return
        logger.info(msg, **kwargs)
        self.stats['logged'] += 1

    def get_stats(self):
        """Return the counter dict itself (not a copy)."""
        return self.stats

日志分析实战

生成日志后,如何分析?这里提供一个简单的分析脚本:

import json
from pathlib import Path
from collections import Counter
from datetime import datetime

def _extract_payload(line: str):
    """Return the event dict embedded in one log line, or ``None``.

    Accepts loguru's serialized layout, where the message sits under
    ``record.message``, as well as a flat layout with a top-level
    ``message`` key. The message itself must be a JSON object; anything else
    (plain text, malformed JSON, non-object values) yields ``None``.
    """
    try:
        data = json.loads(line)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict):
        return None

    record = data.get('record', data)
    if not isinstance(record, dict):
        return None

    message = record.get('message')
    if not isinstance(message, str):
        return None

    try:
        payload = json.loads(message)
    except json.JSONDecodeError:
        return None
    return payload if isinstance(payload, dict) else None


def analyze_logs(log_file: str):
    """Print event, error-type and error-hour statistics for a JSON log file.

    Lines that carry no structured payload are skipped silently.

    Args:
        log_file: Path of a UTF-8 log file with one JSON document per line.

    Raises:
        OSError: If the file cannot be opened.
    """
    error_counts = Counter()
    event_counts = Counter()
    errors_by_hour = Counter()

    with open(log_file, 'r', encoding='utf-8') as f:
        for line in f:
            msg = _extract_payload(line.strip())
            if msg is None:
                continue

            if 'event' in msg:
                event_counts[msg['event']] += 1

            if 'error_type' in msg:
                error_counts[msg['error_type']] += 1

                # The hour histogram covers error records only. Characters
                # 11-12 of an ISO-8601 timestamp are the hour.
                timestamp = msg.get('timestamp')
                if isinstance(timestamp, str) and len(timestamp) >= 13:
                    errors_by_hour[timestamp[11:13]] += 1

    # Report.
    print("=== 事件统计 ===")
    for event, count in event_counts.most_common(10):
        print(f"{event}: {count}")

    print("\n=== 错误类型统计 ===")
    for error_type, count in error_counts.most_common(10):
        print(f"{error_type}: {count}")

    print("\n=== 错误时间分布 ===")
    for hour, count in sorted(errors_by_hour.items()):
        print(f"{hour}:00 - {count}个错误")

if __name__ == '__main__':
    import sys

    # An optional first command-line argument selects the log file; without
    # it the sample path is analysed.
    analyze_logs(sys.argv[1] if len(sys.argv) > 1 else './logs/app_20240422.log')

总结

  1. 结构化日志优势

    • 机器可解析,方便统计分析
    • 支持复杂查询
    • 便于接入日志系统
  2. 分布式追踪要点

    • 唯一请求ID贯穿全程
    • 自动注入到HTTP头
    • 日志中包含追踪上下文
  3. 性能优化技巧

    • 使用enqueue异步写入
    • 批量日志缓冲
    • 采样/条件日志

好了,今天的分享就到这里。日志虽小,但做好了对排查问题至关重要。我是扣扣,有问题留言~🙃

更多推荐