NLP大语言模型数据准备
数据准备是大语言模型训练的基础,直接影响模型的最终性能。数据质量优于数量: 高质量的小数据集往往优于低质量的大数据集保持数据多样性: 确保数据覆盖不同领域、风格和难度注重隐私保护: 严格处理PII信息,遵守相关法规建立可复现的Pipeline: 版本控制、文档化、自动化持续迭代优化: 根据模型表现不断改进数据处理策略数据收集:介绍了常用的公开数据集(如Common Crawl、Wikipedia等
·
NLP大语言模型数据准备完整指南
1. 概述
大语言模型(LLM)的性能很大程度上取决于训练数据的质量和规模。数据准备是整个模型训练流程中最关键且耗时的环节,通常占据整个项目60-80%的时间。
1.1 数据准备流程概览
原始数据收集 → 数据清洗 → 数据预处理 → 分词/编码 → 数据集构建 → 质量验证
2. 数据收集
2.1 数据源类型
公开数据集
- Common Crawl: 网页爬虫数据,规模达PB级
- Wikipedia: 高质量百科全书数据
- BookCorpus: 图书文本数据
- OpenWebText: Reddit高质量链接内容
- C4 (Colossal Clean Crawled Corpus): Google清洗后的网页数据
- The Pile: 825GB多源混合数据集
专有数据
- 企业内部文档
- 领域专业资料
- 用户生成内容(需遵守隐私法规)
2.2 数据收集工具
# 常用爬虫框架
- Scrapy: 强大的爬虫框架
- BeautifulSoup: HTML/XML解析
- Selenium: 动态网页爬取
- Requests: HTTP请求库
# 数据下载工具
- wget/curl: 命令行下载工具
- Hugging Face Datasets: 数据集管理库
- TorchData: PyTorch数据加载工具
3. 数据清洗
3.1 去重处理
精确去重
# 使用哈希算法进行文档级去重
import hashlib
from collections import defaultdict
def exact_dedup(documents):
seen_hashes = set()
unique_docs = []
for doc in documents:
doc_hash = hashlib.sha256(doc.encode()).hexdigest()
if doc_hash not in seen_hashes:
seen_hashes.add(doc_hash)
unique_docs.append(doc)
return unique_docs
模糊去重
# 使用MinHash进行近似去重
from datasketch import MinHash, MinHashLSH
def fuzzy_dedup(documents, threshold=0.9):
lsh = MinHashLSH(threshold=threshold)
unique_docs = []
for idx, doc in enumerate(documents):
minhash = MinHash()
for word in doc.split():
minhash.update(word.encode('utf8'))
if not lsh.query(minhash):
lsh.insert(f"doc_{idx}", minhash)
unique_docs.append(doc)
return unique_docs
3.2 质量过滤
语言检测
from langdetect import detect_langs
import fasttext
# FastText语言检测模型
model = fasttext.load_model('lid.176.bin')
def filter_by_language(text, target_lang='zh'):
predictions = model.predict(text)
lang = predictions[0][0].replace('__label__', '')
confidence = predictions[1][0]
return lang == target_lang and confidence > 0.8
内容质量评估
- 困惑度过滤: 移除困惑度异常高的文本
- 长度过滤: 移除过短或过长的文档
- 特殊字符比例: 控制特殊字符占比
- 重复性检测: 检测文档内部重复
def quality_filter(text):
# 长度过滤
if len(text) < 100 or len(text) > 100000:
return False
# 特殊字符比例
special_char_ratio = len([c for c in text if not c.isalnum()]) / len(text)
if special_char_ratio > 0.5:
return False
# 重复行检测
lines = text.split('\n')
if len(set(lines)) / len(lines) < 0.7: # 超过30%重复
return False
return True
3.3 隐私和敏感信息处理
import re
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
# 使用正则表达式清理PII
def remove_pii(text):
# 邮箱
text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
'[EMAIL]', text)
# 电话号码
text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]', text)
# 身份证号
text = re.sub(r'\b\d{17}[\dXx]\b', '[ID]', text)
# IP地址
text = re.sub(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', '[IP]', text)
return text
# 使用Presidio进行高级PII检测
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()
def anonymize_text(text):
results = analyzer.analyze(text=text, language='en')
anonymized = anonymizer.anonymize(text=text, analyzer_results=results)
return anonymized.text
4. 数据预处理
4.1 文本规范化
import unicodedata
import ftfy
def normalize_text(text):
# 修复编码问题
text = ftfy.fix_text(text)
# Unicode规范化
text = unicodedata.normalize('NFKC', text)
# 空白字符规范化
text = ' '.join(text.split())
# 大小写处理(根据需求)
# text = text.lower()
return text
4.2 文本清理
import re
from bs4 import BeautifulSoup
def clean_text(text):
# 移除HTML标签
text = BeautifulSoup(text, 'html.parser').get_text()
# 移除URLs
text = re.sub(r'http[s]?://\S+', '', text)
# 移除多余空白
text = re.sub(r'\s+', ' ', text)
# 移除控制字符
text = ''.join(char for char in text if ord(char) >= 32)
return text.strip()
5. 分词与编码
5.1 分词器类型
BPE (Byte Pair Encoding)
from tokenizers import ByteLevelBPETokenizer
# 训练BPE分词器
tokenizer = ByteLevelBPETokenizer()
tokenizer.train(
files=['corpus.txt'],
vocab_size=50000,
min_frequency=2,
special_tokens=['<s>', '</s>', '<pad>', '<unk>', '<mask>']
)
WordPiece
from tokenizers import BertWordPieceTokenizer
tokenizer = BertWordPieceTokenizer()
tokenizer.train(
files=['corpus.txt'],
vocab_size=30000,
min_frequency=2,
limit_alphabet=1000,
wordpieces_prefix='##'
)
SentencePiece
import sentencepiece as spm
# 训练SentencePiece模型
spm.SentencePieceTrainer.train(
input='corpus.txt',
model_prefix='model',
vocab_size=32000,
character_coverage=0.9995,
model_type='unigram', # 或 'bpe'
pad_id=3,
unk_id=0,
bos_id=1,
eos_id=2
)
5.2 中文分词特殊处理
import jieba
from LAC import LAC
# Jieba分词
def chinese_tokenize_jieba(text):
return list(jieba.cut(text))
# 百度LAC分词
lac = LAC(mode='seg')
def chinese_tokenize_lac(text):
return lac.run(text)[0]
# 使用预训练模型的分词器
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained('bert-base-chinese')
tokens = tokenizer.tokenize(text)
6. 数据集构建
6.1 预训练数据格式
文本文件格式
# 每行一个文档,空行分隔
def create_pretraining_data(documents, output_file):
with open(output_file, 'w', encoding='utf-8') as f:
for doc in documents:
f.write(doc + '\n\n')
JSONL格式
import json
def create_jsonl_dataset(documents, output_file):
with open(output_file, 'w', encoding='utf-8') as f:
for doc in documents:
json_line = json.dumps({'text': doc}, ensure_ascii=False)
f.write(json_line + '\n')
6.2 微调数据格式
指令微调格式
def create_instruction_data(instructions, inputs, outputs):
dataset = []
for inst, inp, out in zip(instructions, inputs, outputs):
sample = {
'instruction': inst,
'input': inp,
'output': out
}
dataset.append(sample)
return dataset
对话格式
def create_dialogue_data(conversations):
dataset = []
for conv in conversations:
formatted_conv = {
'messages': [
{'role': 'system', 'content': conv['system']},
{'role': 'user', 'content': conv['user']},
{'role': 'assistant', 'content': conv['assistant']}
]
}
dataset.append(formatted_conv)
return dataset
7. 数据增强
7.1 回译增强
from transformers import MarianMTModel, MarianTokenizer
def back_translation(text, src_lang='zh', tgt_lang='en'):
# 中译英
model_name = f'Helsinki-NLP/opus-mt-{src_lang}-{tgt_lang}'
tokenizer = MarianTokenizer.from_pretrained(model_name)
model = MarianMTModel.from_pretrained(model_name)
inputs = tokenizer(text, return_tensors="pt", padding=True)
translated = model.generate(**inputs)
en_text = tokenizer.decode(translated[0], skip_special_tokens=True)
# 英译中
model_name = f'Helsinki-NLP/opus-mt-{tgt_lang}-{src_lang}'
tokenizer = MarianTokenizer.from_pretrained(model_name)
model = MarianMTModel.from_pretrained(model_name)
inputs = tokenizer(en_text, return_tensors="pt", padding=True)
translated = model.generate(**inputs)
augmented_text = tokenizer.decode(translated[0], skip_special_tokens=True)
return augmented_text
7.2 同义词替换
import random
from synonyms import synonyms
def synonym_replacement(text, n=2):
words = text.split()
new_words = words.copy()
random_word_list = list(set([word for word in words if word.isalpha()]))
random.shuffle(random_word_list)
num_replaced = 0
for random_word in random_word_list:
synonyms_list = synonyms(random_word)
if len(synonyms_list) > 0:
synonym = random.choice(synonyms_list)
new_words = [synonym if word == random_word else word for word in new_words]
num_replaced += 1
if num_replaced >= n:
break
return ' '.join(new_words)
8. 技术栈与工具
8.1 数据处理框架
| 工具 | 用途 | 特点 |
|---|---|---|
| Apache Spark | 大规模数据处理 | 分布式计算,支持PB级数据 |
| Dask | Python并行计算 | 扩展pandas/numpy到大数据 |
| Ray | 分布式AI工作负载 | 高性能,易于扩展 |
| Apache Beam | 批流一体处理 | 统一的编程模型 |
8.2 NLP专用工具
| 工具 | 用途 | 特点 |
|---|---|---|
| spaCy | 工业级NLP | 快速、准确、易用 |
| NLTK | NLP教学和研究 | 功能全面,文档丰富 |
| Stanza | 多语言NLP | Stanford出品,学术质量 |
| TextBlob | 简单NLP任务 | API简洁,适合初学者 |
8.3 深度学习框架集成
# PyTorch数据加载
from torch.utils.data import Dataset, DataLoader
class TextDataset(Dataset):
def __init__(self, texts, tokenizer, max_length=512):
self.texts = texts
self.tokenizer = tokenizer
self.max_length = max_length
def __len__(self):
return len(self.texts)
def __getitem__(self, idx):
text = self.texts[idx]
encoding = self.tokenizer(
text,
truncation=True,
padding='max_length',
max_length=self.max_length,
return_tensors='pt'
)
return {
'input_ids': encoding['input_ids'].flatten(),
'attention_mask': encoding['attention_mask'].flatten()
}
# 创建数据加载器
dataset = TextDataset(texts, tokenizer)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
8.4 数据版本管理
# DVC (Data Version Control)
dvc init
dvc add data/corpus.txt
dvc remote add -d storage s3://mybucket/path
dvc push
# Git LFS (Large File Storage)
git lfs track "*.bin"
git add .gitattributes
git add model.bin
git commit -m "Add model file"
9. 质量控制与验证
9.1 数据质量指标
import numpy as np
from collections import Counter
def calculate_data_statistics(texts):
stats = {
'total_documents': len(texts),
'total_tokens': sum(len(text.split()) for text in texts),
'avg_length': np.mean([len(text) for text in texts]),
'std_length': np.std([len(text) for text in texts]),
'vocabulary_size': len(set(' '.join(texts).split()))
}
# 词频分布
all_words = ' '.join(texts).split()
word_freq = Counter(all_words)
stats['top_10_words'] = word_freq.most_common(10)
return stats
9.2 数据集分割
from sklearn.model_selection import train_test_split
def split_dataset(data, train_ratio=0.8, val_ratio=0.1, test_ratio=0.1):
assert train_ratio + val_ratio + test_ratio == 1.0
# 第一次分割:训练集和临时集
train_data, temp_data = train_test_split(
data,
test_size=1-train_ratio,
random_state=42
)
# 第二次分割:验证集和测试集
val_data, test_data = train_test_split(
temp_data,
test_size=test_ratio/(val_ratio+test_ratio),
random_state=42
)
return {
'train': train_data,
'validation': val_data,
'test': test_data
}
10. 最佳实践
10.1 数据处理Pipeline
class DataProcessingPipeline:
def __init__(self, config):
self.config = config
self.tokenizer = self._load_tokenizer()
def _load_tokenizer(self):
return AutoTokenizer.from_pretrained(self.config['tokenizer'])
def process(self, raw_data):
# 1. 清洗
cleaned_data = self.clean(raw_data)
# 2. 去重
deduped_data = self.deduplicate(cleaned_data)
# 3. 过滤
filtered_data = self.filter(deduped_data)
# 4. 规范化
normalized_data = self.normalize(filtered_data)
# 5. 分词
tokenized_data = self.tokenize(normalized_data)
return tokenized_data
def clean(self, data):
return [clean_text(text) for text in data]
def deduplicate(self, data):
return exact_dedup(data)
def filter(self, data):
return [text for text in data if quality_filter(text)]
def normalize(self, data):
return [normalize_text(text) for text in data]
def tokenize(self, data):
return [self.tokenizer(text) for text in data]
10.2 性能优化建议
- 并行处理: 使用multiprocessing或Ray进行并行数据处理
- 批处理: 批量处理数据而非逐条处理
- 缓存机制: 缓存中间结果避免重复计算
- 内存管理: 使用生成器处理大文件,避免内存溢出
- 分布式存储: 使用HDFS或对象存储处理海量数据
10.3 常见问题与解决方案
| 问题 | 解决方案 |
|---|---|
| 内存不足 | 使用流式处理,分批加载数据 |
| 处理速度慢 | 并行化处理,使用更高效的算法 |
| 数据不平衡 | 采样策略,数据增强 |
| 编码问题 | 统一使用UTF-8,使用ftfy修复 |
| 分词不准确 | 使用领域特定词典,自定义分词规则 |
11. 监控与日志
import logging
from tqdm import tqdm
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('data_processing.log'),
logging.StreamHandler()
]
)
def process_with_monitoring(data, process_func):
logger = logging.getLogger(__name__)
processed_data = []
failed_items = []
for item in tqdm(data, desc="Processing data"):
try:
result = process_func(item)
processed_data.append(result)
except Exception as e:
logger.error(f"Failed to process item: {e}")
failed_items.append(item)
logger.info(f"Successfully processed: {len(processed_data)}")
logger.info(f"Failed items: {len(failed_items)}")
return processed_data, failed_items
12. 总结
数据准备是大语言模型训练的基础,直接影响模型的最终性能。关键要点:
-
数据质量优于数量: 高质量的小数据集往往优于低质量的大数据集
-
保持数据多样性: 确保数据覆盖不同领域、风格和难度
-
注重隐私保护: 严格处理PII信息,遵守相关法规
-
建立可复现的Pipeline: 版本控制、文档化、自动化
-
持续迭代优化: 根据模型表现不断改进数据处理策略
-
数据收集:介绍了常用的公开数据集(如Common Crawl、Wikipedia等)和数据收集工具
-
数据清洗:
- 精确去重和模糊去重技术
- 质量过滤(语言检测、内容质量评估)
- 隐私信息处理(PII检测和脱敏)
-
数据预处理:文本规范化、HTML清理、编码修复等技术
-
分词与编码:详细介绍了BPE、WordPiece、SentencePiece等主流分词器,以及中文分词的特殊处理
-
数据集构建:预训练和微调数据的不同格式要求
-
数据增强:回译、同义词替换等技术
-
技术栈:
- 大数据处理框架(Spark、Dask、Ray)
- NLP工具(spaCy、NLTK、Stanza)
- 深度学习框架集成(PyTorch、TensorFlow)
- 数据版本管理(DVC、Git LFS)
-
质量控制:数据统计、数据集分割、监控日志等
-
最佳实践:完整的数据处理Pipeline设计和性能优化建议
更多推荐

所有评论(0)