NLP大语言模型数据准备完整指南

1. 概述

大语言模型(LLM)的性能很大程度上取决于训练数据的质量和规模。数据准备是整个模型训练流程中最关键且耗时的环节,通常占据整个项目60-80%的时间。

1.1 数据准备流程概览

原始数据收集 → 数据清洗 → 数据预处理 → 分词/编码 → 数据集构建 → 质量验证

2. 数据收集

2.1 数据源类型

公开数据集
  • Common Crawl: 网页爬虫数据,规模达PB级
  • Wikipedia: 高质量百科全书数据
  • BookCorpus: 图书文本数据
  • OpenWebText: Reddit高质量链接内容
  • C4 (Colossal Clean Crawled Corpus): Google清洗后的网页数据
  • The Pile: 825GB多源混合数据集
专有数据
  • 企业内部文档
  • 领域专业资料
  • 用户生成内容(需遵守隐私法规)

2.2 数据收集工具

# 常用爬虫框架
- Scrapy: 强大的爬虫框架
- BeautifulSoup: HTML/XML解析
- Selenium: 动态网页爬取
- Requests: HTTP请求库

# 数据下载工具
- wget/curl: 命令行下载工具
- Hugging Face Datasets: 数据集管理库
- TorchData: PyTorch数据加载工具

3. 数据清洗

3.1 去重处理

精确去重
# 使用哈希算法进行文档级去重
import hashlib
from collections import defaultdict

def exact_dedup(documents):
    seen_hashes = set()
    unique_docs = []
    
    for doc in documents:
        doc_hash = hashlib.sha256(doc.encode()).hexdigest()
        if doc_hash not in seen_hashes:
            seen_hashes.add(doc_hash)
            unique_docs.append(doc)
    
    return unique_docs
模糊去重
# 使用MinHash进行近似去重
from datasketch import MinHash, MinHashLSH

def fuzzy_dedup(documents, threshold=0.9):
    lsh = MinHashLSH(threshold=threshold)
    unique_docs = []
    
    for idx, doc in enumerate(documents):
        minhash = MinHash()
        for word in doc.split():
            minhash.update(word.encode('utf8'))
        
        if not lsh.query(minhash):
            lsh.insert(f"doc_{idx}", minhash)
            unique_docs.append(doc)
    
    return unique_docs

3.2 质量过滤

语言检测
from langdetect import detect_langs
import fasttext

# FastText语言检测模型
model = fasttext.load_model('lid.176.bin')

def filter_by_language(text, target_lang='zh'):
    predictions = model.predict(text)
    lang = predictions[0][0].replace('__label__', '')
    confidence = predictions[1][0]
    
    return lang == target_lang and confidence > 0.8
内容质量评估
  • 困惑度过滤: 移除困惑度异常高的文本
  • 长度过滤: 移除过短或过长的文档
  • 特殊字符比例: 控制特殊字符占比
  • 重复性检测: 检测文档内部重复
def quality_filter(text):
    # 长度过滤
    if len(text) < 100 or len(text) > 100000:
        return False
    
    # 特殊字符比例
    special_char_ratio = len([c for c in text if not c.isalnum()]) / len(text)
    if special_char_ratio > 0.5:
        return False
    
    # 重复行检测
    lines = text.split('\n')
    if len(set(lines)) / len(lines) < 0.7:  # 超过30%重复
        return False
    
    return True

3.3 隐私和敏感信息处理

import re
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine

# 使用正则表达式清理PII
def remove_pii(text):
    # 邮箱
    text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', 
                   '[EMAIL]', text)
    # 电话号码
    text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]', text)
    # 身份证号
    text = re.sub(r'\b\d{17}[\dXx]\b', '[ID]', text)
    # IP地址
    text = re.sub(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', '[IP]', text)
    
    return text

# 使用Presidio进行高级PII检测
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

def anonymize_text(text):
    results = analyzer.analyze(text=text, language='en')
    anonymized = anonymizer.anonymize(text=text, analyzer_results=results)
    return anonymized.text

4. 数据预处理

4.1 文本规范化

import unicodedata
import ftfy

def normalize_text(text):
    # 修复编码问题
    text = ftfy.fix_text(text)
    
    # Unicode规范化
    text = unicodedata.normalize('NFKC', text)
    
    # 空白字符规范化
    text = ' '.join(text.split())
    
    # 大小写处理(根据需求)
    # text = text.lower()
    
    return text

4.2 文本清理

import re
from bs4 import BeautifulSoup

def clean_text(text):
    # 移除HTML标签
    text = BeautifulSoup(text, 'html.parser').get_text()
    
    # 移除URLs
    text = re.sub(r'http[s]?://\S+', '', text)
    
    # 移除多余空白
    text = re.sub(r'\s+', ' ', text)
    
    # 移除控制字符
    text = ''.join(char for char in text if ord(char) >= 32)
    
    return text.strip()

5. 分词与编码

5.1 分词器类型

BPE (Byte Pair Encoding)
from tokenizers import ByteLevelBPETokenizer

# 训练BPE分词器
tokenizer = ByteLevelBPETokenizer()
tokenizer.train(
    files=['corpus.txt'],
    vocab_size=50000,
    min_frequency=2,
    special_tokens=['<s>', '</s>', '<pad>', '<unk>', '<mask>']
)
WordPiece
from tokenizers import BertWordPieceTokenizer

tokenizer = BertWordPieceTokenizer()
tokenizer.train(
    files=['corpus.txt'],
    vocab_size=30000,
    min_frequency=2,
    limit_alphabet=1000,
    wordpieces_prefix='##'
)
SentencePiece
import sentencepiece as spm

# 训练SentencePiece模型
spm.SentencePieceTrainer.train(
    input='corpus.txt',
    model_prefix='model',
    vocab_size=32000,
    character_coverage=0.9995,
    model_type='unigram',  # 或 'bpe'
    pad_id=3,
    unk_id=0,
    bos_id=1,
    eos_id=2
)

5.2 中文分词特殊处理

import jieba
from LAC import LAC

# Jieba分词
def chinese_tokenize_jieba(text):
    return list(jieba.cut(text))

# 百度LAC分词
lac = LAC(mode='seg')
def chinese_tokenize_lac(text):
    return lac.run(text)[0]

# 使用预训练模型的分词器
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained('bert-base-chinese')
tokens = tokenizer.tokenize(text)

6. 数据集构建

6.1 预训练数据格式

文本文件格式
# 每行一个文档,空行分隔
def create_pretraining_data(documents, output_file):
    with open(output_file, 'w', encoding='utf-8') as f:
        for doc in documents:
            f.write(doc + '\n\n')
JSONL格式
import json

def create_jsonl_dataset(documents, output_file):
    with open(output_file, 'w', encoding='utf-8') as f:
        for doc in documents:
            json_line = json.dumps({'text': doc}, ensure_ascii=False)
            f.write(json_line + '\n')

6.2 微调数据格式

指令微调格式
def create_instruction_data(instructions, inputs, outputs):
    dataset = []
    for inst, inp, out in zip(instructions, inputs, outputs):
        sample = {
            'instruction': inst,
            'input': inp,
            'output': out
        }
        dataset.append(sample)
    return dataset
对话格式
def create_dialogue_data(conversations):
    dataset = []
    for conv in conversations:
        formatted_conv = {
            'messages': [
                {'role': 'system', 'content': conv['system']},
                {'role': 'user', 'content': conv['user']},
                {'role': 'assistant', 'content': conv['assistant']}
            ]
        }
        dataset.append(formatted_conv)
    return dataset

7. 数据增强

7.1 回译增强

from transformers import MarianMTModel, MarianTokenizer

def back_translation(text, src_lang='zh', tgt_lang='en'):
    # 中译英
    model_name = f'Helsinki-NLP/opus-mt-{src_lang}-{tgt_lang}'
    tokenizer = MarianTokenizer.from_pretrained(model_name)
    model = MarianMTModel.from_pretrained(model_name)
    
    inputs = tokenizer(text, return_tensors="pt", padding=True)
    translated = model.generate(**inputs)
    en_text = tokenizer.decode(translated[0], skip_special_tokens=True)
    
    # 英译中
    model_name = f'Helsinki-NLP/opus-mt-{tgt_lang}-{src_lang}'
    tokenizer = MarianTokenizer.from_pretrained(model_name)
    model = MarianMTModel.from_pretrained(model_name)
    
    inputs = tokenizer(en_text, return_tensors="pt", padding=True)
    translated = model.generate(**inputs)
    augmented_text = tokenizer.decode(translated[0], skip_special_tokens=True)
    
    return augmented_text

7.2 同义词替换

import random
from synonyms import synonyms

def synonym_replacement(text, n=2):
    words = text.split()
    new_words = words.copy()
    random_word_list = list(set([word for word in words if word.isalpha()]))
    random.shuffle(random_word_list)
    
    num_replaced = 0
    for random_word in random_word_list:
        synonyms_list = synonyms(random_word)
        if len(synonyms_list) > 0:
            synonym = random.choice(synonyms_list)
            new_words = [synonym if word == random_word else word for word in new_words]
            num_replaced += 1
        if num_replaced >= n:
            break
    
    return ' '.join(new_words)

8. 技术栈与工具

8.1 数据处理框架

工具 用途 特点
Apache Spark 大规模数据处理 分布式计算,支持PB级数据
Dask Python并行计算 扩展pandas/numpy到大数据
Ray 分布式AI工作负载 高性能,易于扩展
Apache Beam 批流一体处理 统一的编程模型

8.2 NLP专用工具

工具 用途 特点
spaCy 工业级NLP 快速、准确、易用
NLTK NLP教学和研究 功能全面,文档丰富
Stanza 多语言NLP Stanford出品,学术质量
TextBlob 简单NLP任务 API简洁,适合初学者

8.3 深度学习框架集成

# PyTorch数据加载
from torch.utils.data import Dataset, DataLoader

class TextDataset(Dataset):
    def __init__(self, texts, tokenizer, max_length=512):
        self.texts = texts
        self.tokenizer = tokenizer
        self.max_length = max_length
    
    def __len__(self):
        return len(self.texts)
    
    def __getitem__(self, idx):
        text = self.texts[idx]
        encoding = self.tokenizer(
            text,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt'
        )
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten()
        }

# 创建数据加载器
dataset = TextDataset(texts, tokenizer)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

8.4 数据版本管理

# DVC (Data Version Control)
dvc init
dvc add data/corpus.txt
dvc remote add -d storage s3://mybucket/path
dvc push

# Git LFS (Large File Storage)
git lfs track "*.bin"
git add .gitattributes
git add model.bin
git commit -m "Add model file"

9. 质量控制与验证

9.1 数据质量指标

import numpy as np
from collections import Counter

def calculate_data_statistics(texts):
    stats = {
        'total_documents': len(texts),
        'total_tokens': sum(len(text.split()) for text in texts),
        'avg_length': np.mean([len(text) for text in texts]),
        'std_length': np.std([len(text) for text in texts]),
        'vocabulary_size': len(set(' '.join(texts).split()))
    }
    
    # 词频分布
    all_words = ' '.join(texts).split()
    word_freq = Counter(all_words)
    stats['top_10_words'] = word_freq.most_common(10)
    
    return stats

9.2 数据集分割

from sklearn.model_selection import train_test_split

def split_dataset(data, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1):
    assert train_ratio + val_ratio + test_ratio == 1.0
    
    # 第一次分割:训练集和临时集
    train_data, temp_data = train_test_split(
        data, 
        test_size=1-train_ratio, 
        random_state=42
    )
    
    # 第二次分割:验证集和测试集
    val_data, test_data = train_test_split(
        temp_data, 
        test_size=test_ratio/(val_ratio+test_ratio), 
        random_state=42
    )
    
    return {
        'train': train_data,
        'validation': val_data,
        'test': test_data
    }

10. 最佳实践

10.1 数据处理Pipeline

class DataProcessingPipeline:
    def __init__(self, config):
        self.config = config
        self.tokenizer = self._load_tokenizer()
    
    def _load_tokenizer(self):
        return AutoTokenizer.from_pretrained(self.config['tokenizer'])
    
    def process(self, raw_data):
        # 1. 清洗
        cleaned_data = self.clean(raw_data)
        
        # 2. 去重
        deduped_data = self.deduplicate(cleaned_data)
        
        # 3. 过滤
        filtered_data = self.filter(deduped_data)
        
        # 4. 规范化
        normalized_data = self.normalize(filtered_data)
        
        # 5. 分词
        tokenized_data = self.tokenize(normalized_data)
        
        return tokenized_data
    
    def clean(self, data):
        return [clean_text(text) for text in data]
    
    def deduplicate(self, data):
        return exact_dedup(data)
    
    def filter(self, data):
        return [text for text in data if quality_filter(text)]
    
    def normalize(self, data):
        return [normalize_text(text) for text in data]
    
    def tokenize(self, data):
        return [self.tokenizer(text) for text in data]

10.2 性能优化建议

  1. 并行处理: 使用multiprocessing或Ray进行并行数据处理
  2. 批处理: 批量处理数据而非逐条处理
  3. 缓存机制: 缓存中间结果避免重复计算
  4. 内存管理: 使用生成器处理大文件,避免内存溢出
  5. 分布式存储: 使用HDFS或对象存储处理海量数据

10.3 常见问题与解决方案

问题 解决方案
内存不足 使用流式处理,分批加载数据
处理速度慢 并行化处理,使用更高效的算法
数据不平衡 采样策略,数据增强
编码问题 统一使用UTF-8,使用ftfy修复
分词不准确 使用领域特定词典,自定义分词规则

11. 监控与日志

import logging
from tqdm import tqdm

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('data_processing.log'),
        logging.StreamHandler()
    ]
)

def process_with_monitoring(data, process_func):
    logger = logging.getLogger(__name__)
    processed_data = []
    failed_items = []
    
    for item in tqdm(data, desc="Processing data"):
        try:
            result = process_func(item)
            processed_data.append(result)
        except Exception as e:
            logger.error(f"Failed to process item: {e}")
            failed_items.append(item)
    
    logger.info(f"Successfully processed: {len(processed_data)}")
    logger.info(f"Failed items: {len(failed_items)}")
    
    return processed_data, failed_items

12. 总结

数据准备是大语言模型训练的基础,直接影响模型的最终性能。关键要点:

  1. 数据质量优于数量: 高质量的小数据集往往优于低质量的大数据集

  2. 保持数据多样性: 确保数据覆盖不同领域、风格和难度

  3. 注重隐私保护: 严格处理PII信息,遵守相关法规

  4. 建立可复现的Pipeline: 版本控制、文档化、自动化

  5. 持续迭代优化: 根据模型表现不断改进数据处理策略

  6. 数据收集:介绍了常用的公开数据集(如Common Crawl、Wikipedia等)和数据收集工具

  7. 数据清洗

    • 精确去重和模糊去重技术
    • 质量过滤(语言检测、内容质量评估)
    • 隐私信息处理(PII检测和脱敏)
  8. 数据预处理:文本规范化、HTML清理、编码修复等技术

  9. 分词与编码:详细介绍了BPE、WordPiece、SentencePiece等主流分词器,以及中文分词的特殊处理

  10. 数据集构建:预训练和微调数据的不同格式要求

  11. 数据增强:回译、同义词替换等技术

  12. 技术栈

    • 大数据处理框架(Spark、Dask、Ray)
    • NLP工具(spaCy、NLTK、Stanza)
    • 深度学习框架集成(PyTorch、TensorFlow)
    • 数据版本管理(DVC、Git LFS)
  13. 质量控制:数据统计、数据集分割、监控日志等

  14. 最佳实践:完整的数据处理Pipeline设计和性能优化建议

Logo

惟楚有才,于斯为盛。欢迎来到长沙!!! 茶颜悦色、臭豆腐、CSDN和你一个都不能少~

更多推荐