在软件开发领域,需求管理一直是项目成功的核心关键。随着项目复杂度提升和团队规模扩大,传统依赖文档、邮件和会议的需求管理方式显露出明显短板:版本混乱、协作困难、知识难以沉淀。更值得注意的是,行业内能够真正实现需求结构化、资产化,并结合AI技术进行智能化辅助的系统并不多见。我们公司是一家垂直领域专攻企业级需求与非企业级需求管理的公司, 我们公司的大模型应用连接:http://aipoc.chtech.cn:8880/#/login 欢迎试用。

最近原OpenAI的karpathy大佬在github上开源了一个NanoChat代码(https://github.com/karpathy/nanochat),这是一个基于 PyTorch 的聊天 AI 项目,包含训练、推理和评估等功能。这个架构体现了当前开源 LLM 项目的最佳实践,是理解大语言模型工作原理的绝佳案例。让我们一起来学习一下!

第一次讲解:项目概述和基础工具类

项目整体结构

┌─────────────────────────────────────────────────────────────┐
│                   应用层 (Application Layer)                 │
├─────────────────────────────────────────────────────────────┤
│ 训练脚本 (train.py) │ 推理脚本 (chat.py) │ 评估脚本 (eval.py) │
└─────────────────────────────────────────────────────────────┘
                              │
┌─────────────────────────────────────────────────────────────┐
│                   核心引擎层 (Core Engine Layer)             │
├─────────────────────────────────────────────────────────────┤
│   GPT模型  │   推理引擎   │   分词器    │   优化器    │ 评估器 │
└─────────────────────────────────────────────────────────────┘
                              │
┌─────────────────────────────────────────────────────────────┐
│                   数据层 (Data Layer)                        │
├─────────────────────────────────────────────────────────────┤
│  数据加载器  │   数据集    │  检查点管理  │   工具执行   │
└─────────────────────────────────────────────────────────────┘
                              │
┌─────────────────────────────────────────────────────────────┐
│                   基础设施层 (Infrastructure Layer)          │
├─────────────────────────────────────────────────────────────┤
│   分布式训练  │   日志系统   │   配置管理  │   报告系统   │    │
└─────────────────────────────────────────────────────────────┘

nanochat是一个完整的AI对话系统,包含:

  • 基础模型训练 (GPT架构)

  • 分词器 (BPE tokenizer)

  • 优化器 (AdamW和Muon)

  • 评估系统 (CORE metric)

  • 推理引擎 (高效生成)

  • 工具调用 (Python执行)

  • 报告系统 (训练监控)

1. common.py - 通用工具函数

"""
Common utilities for nanochat.
通用的工具函数和配置
"""

import os
import re
import logging
import torch
import torch.distributed as dist

class ColoredFormatter(logging.Formatter):
    """自定义格式化器,为日志消息添加颜色"""
    # ANSI颜色代码
    COLORS = {
        'DEBUG': '\033[36m',    # 青色
        'INFO': '\033[32m',     # 绿色  
        'WARNING': '\033[33m',  # 黄色
        'ERROR': '\033[31m',    # 红色
        'CRITICAL': '\033[35m', # 洋红色
    }
    RESET = '\033[0m'
    BOLD = '\033[1m'
    
    def format(self, record):
        # 为日志级别名称添加颜色
        levelname = record.levelname
        if levelname in self.COLORS:
            record.levelname = f"{self.COLORS[levelname]}{self.BOLD}{levelname}{self.RESET}"
        # 格式化消息
        message = super().format(record)
        # 为消息中的特定部分添加颜色
        if levelname == 'INFO':
            # 高亮数字和百分比
            message = re.sub(r'(\d+\.?\d*\s*(?:GB|MB|%|docs))', rf'{self.BOLD}\1{self.RESET}', message)
            message = re.sub(r'(Shard \d+)', rf'{self.COLORS["INFO"]}{self.BOLD}\1{self.RESET}', message)
        return message

def setup_default_logging():
    """设置默认的日志配置"""
    handler = logging.StreamHandler()
    handler.setFormatter(ColoredFormatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    logging.basicConfig(
        level=logging.INFO,
        handlers=[handler]
    )

setup_default_logging()
logger = logging.getLogger(__name__)

def get_base_dir():
    """获取项目基础目录,默认在 ~/.cache/nanochat"""
    if os.environ.get("NANOCHAT_BASE_DIR"):
        nanochat_dir = os.environ.get("NANOCHAT_BASE_DIR")
    else:
        home_dir = os.path.expanduser("~")
        cache_dir = os.path.join(home_dir, ".cache")
        nanochat_dir = os.path.join(cache_dir, "nanochat")
    os.makedirs(nanochat_dir, exist_ok=True)
    return nanochat_dir

def print0(s="",**kwargs):
    """只在rank 0的进程打印,避免分布式训练中的重复输出"""
    ddp_rank = int(os.environ.get('RANK', 0))
    if ddp_rank == 0:
        print(s, **kwargs)

def is_ddp():
    """检查是否在分布式数据并行(DDP)环境中运行"""
    # TODO 是否有更合适的方法
    return int(os.environ.get('RANK', -1)) != -1

def get_dist_info():
    """获取分布式训练的信息"""
    if is_ddp():
        assert all(var in os.environ for var in ['RANK', 'LOCAL_RANK', 'WORLD_SIZE'])
        ddp_rank = int(os.environ['RANK'])
        ddp_local_rank = int(os.environ['LOCAL_RANK'])
        ddp_world_size = int(os.environ['WORLD_SIZE'])
        return True, ddp_rank, ddp_local_rank, ddp_world_size
    else:
        return False, 0, 0, 1

def compute_init():
    """基础初始化函数,在多个地方重复使用"""

    # 目前需要CUDA
    assert torch.cuda.is_available(), "分布式运行目前需要CUDA"

    # 可重现性设置
    torch.manual_seed(42)
    torch.cuda.manual_seed(42)
    # 暂时跳过完全可重现性,可能稍后研究速度下降问题
    # torch.use_deterministic_algorithms(True)
    # torch.backends.cudnn.deterministic = True
    # torch.backends.cudnn.benchmark = False

    # 精度设置
    torch.set_float32_matmul_precision("high") # 对矩阵乘法使用tf32而不是fp32

    # 分布式设置:分布式数据并行(DDP),可选
    ddp, ddp_rank, ddp_local_rank, ddp_world_size = get_dist_info()
    if ddp:
        device = torch.device("cuda", ddp_local_rank)
        torch.cuda.set_device(device) # 让"cuda"默认使用此设备
        dist.init_process_group(backend="nccl", device_id=device)
        dist.barrier()
    else:
        device = torch.device("cuda")

    if ddp_rank == 0:
        logger.info(f"分布式世界大小: {ddp_world_size}")

    return ddp, ddp_rank, ddp_local_rank, ddp_world_size, device

def compute_cleanup():
    """与compute_init配套的清理函数,在脚本退出前清理"""
    if is_ddp():
        dist.destroy_process_group()

class DummyWandb:
    """如果我们不想使用wandb但希望有相同的接口签名时很有用"""
    def __init__(self):
        pass
    def log(self, *args, **kwargs):
        pass
    def finish(self):
        pass

关键概念解释:

  1. 分布式训练 (DDP):

    • 在多GPU上训练模型的技术

    • RANK: 进程编号

    • WORLD_SIZE: 总进程数

    • LOCAL_RANK: 本地GPU编号

  2. 日志系统:

    • 使用颜色来区分不同级别的日志信息

    • 只在主进程(rank 0)输出,避免重复

  3. 设备管理:

    • 自动检测CUDA设备

    • 设置合适的精度(tf32比fp32快但精度稍低)

  4. 路径管理:

    • 统一的数据和缓存目录管理

    • 支持环境变量覆盖默认路径

这个文件是整个项目的基础设施,提供了日志、分布式训练、路径管理等通用功能。这些工具函数会在项目的其他部分频繁使用。

Logo

纵情码海钱塘涌,杭州开发者创新动! 属于杭州的开发者社区!致力于为杭州地区的开发者提供学习、合作和成长的机会;同时也为企业交流招聘提供舞台!

更多推荐