LLM-15垂直领域大模型之高质量数据集的打造流程
如何打造垂直领域高质量数据集?构建垂直领域高质量数据集需要系统化的流程和严格的质量控制,关键步骤包括:明确需求规划(定义业务范围、质量指标如准确性≥95%、完整性≥98%,以及数量要求如5万条样本)、多源数据收集(分层整合电子病历、医学文献、临床指南等权威数据源)、专业质量控制(医疗专家验证、同行评审、严格的数据脱敏)以及持续维护更新。
1. 高质量数据集概述
什么是高质量数据集?
高质量数据集是垂直领域大模型成功的基石,它不仅要满足数量要求,更要在准确性、多样性、一致性和时效性等方面达到专业标准。
1.1 数据集质量维度
核心质量指标
| 质量维度 | 定义 | 评估方法 | 重要性 |
| --- | --- | --- | --- |
| 准确性 | 数据内容的真实性和正确性 | 专家验证、交叉验证 | 极高 |
| 完整性 | 数据记录的完整程度 | 缺失值分析、字段完整性 | 高 |
| 一致性 | 数据格式和标准的一致性 | 格式验证、标准检查 | 高 |
| 时效性 | 数据的更新程度和新鲜度 | 时间戳分析、版本控制 | 中 |
| 代表性 | 数据覆盖目标领域的程度 | 分布分析、覆盖度评估 | 高 |
| 多样性 | 数据来源和类型的多样性 | 来源统计、类型分布 | 中 |
1.2 垂直领域数据特点
医疗领域特征
- 专业术语密集,需要标准化处理
- 隐私敏感,需要严格脱敏(本节末尾附一个简单的脱敏代码示意)
- 知识更新快,需要持续维护
- 准确性要求极高,容错率低
金融领域特征
- 数值精度要求高,小数位敏感
- 时效性强,需要实时更新
- 合规要求严格,需要审计追踪
- 多源异构,需要统一格式
法律领域特征
- 逻辑结构严谨,需要保持完整性
- 引用关系复杂,需要维护关联
- 地域差异明显,需要分类处理
- 版本控制重要,需要追踪变更
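针对医疗数据"隐私敏感、需要严格脱敏"的特点,下面给出一个基于正则的最简脱敏示意(假设性示例:文件名、匹配规则与占位符均为演示用途,实际脱敏应结合NER模型与人工复核,并覆盖真实病历中的全部敏感字段):
# simple_deid_example.py(示意脚本,文件名与规则均为假设)
import re

# 假设的脱敏规则:身份证号、手机号、称谓后的患者姓名,仅作演示
DEID_PATTERNS = [
    (re.compile(r'\d{17}[\dXx]'), '[ID_CARD]'),                      # 18位身份证号
    (re.compile(r'(?<!\d)1[3-9]\d{9}(?!\d)'), '[PHONE]'),            # 中国大陆手机号
    (re.compile(r'(患者|病人)[\u4e00-\u9fa5]{1,3}'), r'\1[NAME]'),   # 称谓后的姓名
]

def deidentify(text: str) -> str:
    """对医疗文本做最基础的正则脱敏,真实场景应结合NER与人工复核"""
    for pattern, replacement in DEID_PATTERNS:
        text = pattern.sub(replacement, text)
    return text

if __name__ == "__main__":
    sample = "患者张三,电话13812345678,主诉胸闷三天。"
    print(deidentify(sample))  # 患者[NAME],电话[PHONE],主诉胸闷三天。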
2. 数据集构建流程
2.1 需求分析与规划
2.1.1 业务需求定义
医疗问诊数据集需求示例
# medical_dataset_requirements.yaml
dataset_specifications:
name: "医疗问诊高质量数据集"
version: "1.0.0"
domain: "医疗问诊"
scope:
coverage_areas:
- "内科常见病"
- "外科基础病"
- "儿科多发病"
- "妇科基础病"
- "急诊处理"
excluded_areas:
- "罕见疾病"
- "实验性治疗"
- "未确认疗法"
quality_requirements:
accuracy_target: 0.95
completeness_target: 0.98
consistency_target: 0.99
medical_expert_validation: true
peer_review_required: true
quantity_requirements:
total_samples: 50000
min_samples_per_disease: 100
max_samples_per_disease: 1000
train_val_test_split: [0.8, 0.1, 0.1]
format_requirements:
input_format: "instruction-input-output"
output_length_range: [50, 500]
language: "中文"
encoding: "UTF-8"
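拿到需求配置后,可以先用一个小脚本加载并做基本校验,避免后续流程基于错误的配置运行。下面是一个最小示例(假设使用 PyYAML,文件名与上文 medical_dataset_requirements.yaml 一致):
# load_requirements_example.py(最小示例,假设已安装 PyYAML)
import yaml

def load_requirements(path: str = "medical_dataset_requirements.yaml") -> dict:
    """读取数据集需求配置,并对关键字段做基本校验"""
    with open(path, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)

    spec = config["dataset_specifications"]
    quality = spec["quality_requirements"]
    assert 0 < quality["accuracy_target"] <= 1, "accuracy_target 应在 (0, 1] 之间"

    split = spec["quantity_requirements"]["train_val_test_split"]
    assert abs(sum(split) - 1.0) < 1e-6, "train/val/test 比例之和应为 1"
    return spec

if __name__ == "__main__":
    spec = load_requirements()
    print(f"数据集: {spec['name']},目标样本数: {spec['quantity_requirements']['total_samples']}")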
2.1.2 数据源识别与评估
数据源分类矩阵
# data_source_evaluation.py
data_source_categories = {
"primary_sources": {
"electronic_health_records": {
"quality_score": 9.5,
"availability": "high",
"access_difficulty": "high",
"privacy_level": "critical",
"update_frequency": "real-time"
},
"medical_literature": {
"quality_score": 9.0,
"availability": "medium",
"access_difficulty": "low",
"privacy_level": "none",
"update_frequency": "monthly"
},
"clinical_guidelines": {
"quality_score": 9.8,
"availability": "high",
"access_difficulty": "low",
"privacy_level": "none",
"update_frequency": "quarterly"
}
},
"secondary_sources": {
"medical_forums": {
"quality_score": 6.5,
"availability": "high",
"access_difficulty": "low",
"privacy_level": "medium",
"update_frequency": "daily"
},
"health_websites": {
"quality_score": 7.0,
"availability": "high",
"access_difficulty": "low",
"privacy_level": "low",
"update_frequency": "weekly"
}
}
}
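有了数据源评估矩阵后,可以按"质量 + 可得性 − 获取难度"给各数据源打分排序,以确定收集优先级。下面是一个简化的加权排序示意(权重为假设值,可按业务需要调整):
# source_priority_example.py(基于上文 data_source_categories 的排序示意,权重为假设值)
AVAILABILITY_SCORE = {"high": 1.0, "medium": 0.6, "low": 0.3}
DIFFICULTY_PENALTY = {"low": 0.0, "medium": 0.2, "high": 0.4}

def rank_sources(categories: dict) -> list:
    """按 质量*0.6 + 可得性*0.4 - 获取难度惩罚 对数据源降序排列"""
    ranked = []
    for tier, sources in categories.items():
        for name, attrs in sources.items():
            score = (
                attrs["quality_score"] / 10 * 0.6
                + AVAILABILITY_SCORE[attrs["availability"]] * 0.4
                - DIFFICULTY_PENALTY[attrs["access_difficulty"]]
            )
            ranked.append((name, tier, round(score, 3)))
    return sorted(ranked, key=lambda x: x[2], reverse=True)

# 使用示例:for name, tier, score in rank_sources(data_source_categories): print(name, tier, score)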
2.2 数据收集与获取
2.2.1 多源数据收集策略
分层数据收集框架
# multi_source_collector.py
import asyncio
import aiohttp
from typing import List, Dict
import json
from datetime import datetime
class MultiSourceDataCollector:
"""多源数据收集器"""
    def __init__(self, config_path: str):
        self.config = self.load_config(config_path)
        self.collectors = self.initialize_collectors()

    def load_config(self, config_path: str) -> Dict:
        """从JSON配置文件加载各数据源的配置"""
        with open(config_path, "r", encoding="utf-8") as f:
            return json.load(f)
def initialize_collectors(self) -> Dict:
"""初始化各个收集器"""
return {
"pubmed": PubMedCollector(self.config["pubmed"]),
"clinical_trials": ClinicalTrialsCollector(self.config["clinical_trials"]),
"medical_wiki": MedicalWikiCollector(self.config["medical_wiki"]),
"health_forums": HealthForumsCollector(self.config["health_forums"])
}
async def collect_all_sources(self, query_terms: List[str]) -> Dict:
"""并行收集所有数据源"""
tasks = []
for source_name, collector in self.collectors.items():
task = asyncio.create_task(
self.collect_from_source(source_name, collector, query_terms)
)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
# 合并结果
combined_data = {}
for i, (source_name, result) in enumerate(zip(self.collectors.keys(), results)):
if isinstance(result, Exception):
print(f"Error collecting from {source_name}: {result}")
combined_data[source_name] = []
else:
combined_data[source_name] = result
return combined_data
async def collect_from_source(self, source_name: str, collector, query_terms: List[str]):
"""从特定源收集数据"""
try:
data = await collector.collect(query_terms)
print(f"Collected {len(data)} items from {source_name}")
return data
except Exception as e:
print(f"Failed to collect from {source_name}: {e}")
return []
# PubMed收集器实现
class PubMedCollector:
"""PubMed数据收集器"""
def __init__(self, config: Dict):
self.api_key = config.get("api_key")
self.base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"
self.rate_limit = config.get("rate_limit", 3) # 每秒请求数
async def collect(self, query_terms: List[str]) -> List[Dict]:
"""收集PubMed数据"""
        results = []
        async with aiohttp.ClientSession() as session:
            for term in query_terms:
                data = await self.search_pubmed(session, term)
                processed_data = self.process_pubmed_data(data)
                results.extend(processed_data)
                # 顺序请求下用 sleep 做简单限速(NCBI 无 api_key 时约 3 次/秒)
                await asyncio.sleep(1 / self.rate_limit)
        return results
async def search_pubmed(self, session: aiohttp.ClientSession, query: str) -> Dict:
"""搜索PubMed"""
search_url = f"{self.base_url}/esearch.fcgi"
params = {
"db": "pubmed",
"term": query,
"retmode": "json",
"retmax": 100
}
if self.api_key:
params["api_key"] = self.api_key
async with session.get(search_url, params=params) as response:
if response.status == 200:
return await response.json()
else:
print(f"PubMed API error: {response.status}")
return {}
    def process_pubmed_data(self, data: Dict) -> List[Dict]:
        """处理PubMed搜索结果(esearch 只返回PMID列表,标题和摘要需再调用 esummary/efetch 获取)"""
        results = []
        search_results = data.get("esearchresult", {})
        id_list = search_results.get("idlist", [])
        for pmid in id_list:
            results.append({
                "source": "pubmed",
                "id": pmid,
                "url": f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/",
                "timestamp": datetime.now().isoformat()
            })
        return results
2.2.2 数据爬取与API集成
智能爬虫系统
# intelligent_scraper.py
import scrapy
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
import re
from datetime import datetime
from urllib.parse import urljoin
class MedicalSpider(scrapy.Spider):
"""医疗数据爬虫"""
name = 'medical_spider'
allowed_domains = ['mayoclinic.org', 'medlineplus.gov', 'webmd.com']
custom_settings = {
'ROBOTSTXT_OBEY': True,
'DOWNLOAD_DELAY': 1,
'RANDOMIZE_DOWNLOAD_DELAY': True,
'USER_AGENT': 'MedicalBot/1.0 (Educational Purpose)',
'FEED_EXPORT_ENCODING': 'utf-8',
'CONCURRENT_REQUESTS': 4,
'CONCURRENT_REQUESTS_PER_DOMAIN': 2,
}
def __init__(self, start_urls=None, *args, **kwargs):
super(MedicalSpider, self).__init__(*args, **kwargs)
self.start_urls = start_urls or [
'https://www.mayoclinic.org/diseases-conditions',
'https://medlineplus.gov/healthtopics.html'
]
def parse(self, response):
"""解析医疗条件页面"""
# 提取条件列表
conditions = response.css('.condition-link::text').getall()
for condition in conditions:
condition_url = self.build_condition_url(condition, response.url)
yield scrapy.Request(
condition_url,
callback=self.parse_condition,
meta={'condition': condition.strip()}
)
def parse_condition(self, response):
"""解析具体疾病页面"""
condition = response.meta['condition']
# 提取症状
symptoms_section = response.css('#symptoms')
symptoms = symptoms_section.css('li::text').getall()
# 提取治疗信息
treatment_section = response.css('#treatment')
treatments = treatment_section.css('li::text').getall()
# 提取原因
causes_section = response.css('#causes')
causes = causes_section.css('p::text').getall()
yield {
'condition': condition,
'symptoms': symptoms,
'treatments': treatments,
'causes': causes,
'source_url': response.url,
'scraped_at': datetime.now().isoformat(),
'confidence_score': self.calculate_confidence(response)
}
def calculate_confidence(self, response):
"""计算数据置信度"""
# 基于页面结构和内容完整性计算置信度
confidence = 0.0
# 检查是否有症状部分
if response.css('#symptoms'):
confidence += 0.3
# 检查是否有治疗部分
if response.css('#treatment'):
confidence += 0.3
# 检查内容长度
content_length = len(response.text)
if content_length > 1000:
confidence += 0.2
# 检查是否有医学关键词
medical_keywords = ['symptom', 'treatment', 'diagnosis', 'condition']
keyword_count = sum(1 for keyword in medical_keywords if keyword in response.text.lower())
confidence += (keyword_count / len(medical_keywords)) * 0.2
return min(confidence, 1.0)
def build_condition_url(self, condition, base_url):
"""构建疾病详情页URL"""
# 实现URL构建逻辑
condition_slug = condition.lower().replace(' ', '-')
return urljoin(base_url, f'/{condition_slug}/symptoms-causes/syc-2030000')
# 运行爬虫
def run_medical_spider(output_file="medical_data.json"):
    """运行医疗数据爬虫,通过 FEEDS 将结果导出到指定文件"""
    settings = get_project_settings()
    settings.set("FEEDS", {output_file: {"format": "json", "encoding": "utf-8"}})
    process = CrawlerProcess(settings)
    process.crawl(MedicalSpider)
    process.start()
    print(f"Medical data scraped and saved to {output_file}")
2.3 数据清洗与预处理
2.3.1 医疗文本清洗
专业文本清洗管道
# medical_text_cleaning.py
import re
import spacy
from typing import List, Dict
import unicodedata
class MedicalTextCleaner:
"""医疗文本清洗器"""
    def __init__(self):
        self.nlp = spacy.load("en_core_sci_sm")  # scispaCy 提供的科学文献模型,需单独安装
        self.medical_abbreviations = self.load_medical_abbreviations()

    def load_medical_abbreviations(self) -> Dict[str, str]:
        """加载医学缩写词表(此处返回空表,实际可从外部词典文件读取)"""
        return {}
def clean_medical_text(self, text: str) -> Dict:
"""清洗医疗文本"""
original_text = text
# 步骤1: 基础清洗
text = self.basic_cleaning(text)
# 步骤2: 医学术语标准化
text = self.standardize_medical_terms(text)
# 步骤3: 缩写扩展
text = self.expand_abbreviations(text)
# 步骤4: 实体识别和标准化
entities = self.extract_medical_entities(text)
# 步骤5: 语法检查
text = self.grammar_check(text)
return {
"original_text": original_text,
"cleaned_text": text,
"entities": entities,
"cleaning_metadata": self.get_cleaning_metadata(original_text, text)
}
def basic_cleaning(self, text: str) -> str:
"""基础文本清洗"""
# 移除HTML标签
text = re.sub(r'<[^>]+>', '', text)
# 标准化Unicode字符
text = unicodedata.normalize('NFKC', text)
# 移除多余的空白字符
text = re.sub(r'\s+', ' ', text)
# 修正标点符号
text = re.sub(r'\s+([,.!?;:])', r'\1', text)
return text.strip()
def standardize_medical_terms(self, text: str) -> str:
"""标准化医学术语"""
# 统一疾病名称
disease_mapping = {
"heart attack": "myocardial infarction",
"high blood pressure": "hypertension",
"sugar diabetes": "diabetes mellitus",
"bp": "blood pressure",
"hr": "heart rate"
}
for common_term, medical_term in disease_mapping.items():
text = re.sub(rf'\b{common_term}\b', medical_term, text, flags=re.IGNORECASE)
return text
def expand_abbreviations(self, text: str) -> str:
"""扩展医学缩写"""
abbreviation_map = {
r'\bq\.d\.?\b': 'once daily',
r'\bb\.i\.d\.?\b': 'twice daily',
r'\bt\.i\.d\.?\b': 'three times daily',
r'\bq\.i\.d\.?\b': 'four times daily',
r'\bp\.o\.?\b': 'by mouth',
r'\bi\.m\.?\b': 'intramuscular',
r'\bi\.v\.?\b': 'intravenous',
r'\bs\.c\.?\b': 'subcutaneous',
r'\bp\.r\.n\.?\b': 'as needed',
r'\bstat\b': 'immediately'
}
for pattern, expansion in abbreviation_map.items():
text = re.sub(pattern, expansion, text, flags=re.IGNORECASE)
return text
def extract_medical_entities(self, text: str) -> List[Dict]:
"""提取医学实体"""
doc = self.nlp(text)
entities = []
for ent in doc.ents:
if ent.label_ in ["DISEASE", "SYMPTOM", "MEDICATION", "PROCEDURE"]:
entities.append({
"text": ent.text,
"label": ent.label_,
"start": ent.start_char,
"end": ent.end_char,
"confidence": 0.9 # spaCy默认置信度
})
return entities
def grammar_check(self, text: str) -> str:
"""基础语法检查"""
# 修正常见语法错误
corrections = {
r'\bain\'t\b': "isn't",
r'\bgonna\b': "going to",
r'\bgotta\b': "have got to",
r'\bwanna\b': "want to"
}
for pattern, correction in corrections.items():
text = re.sub(pattern, correction, text, flags=re.IGNORECASE)
return text
def get_cleaning_metadata(self, original: str, cleaned: str) -> Dict:
"""获取清洗元数据"""
return {
"original_length": len(original),
"cleaned_length": len(cleaned),
"length_change": len(cleaned) - len(original),
"abbreviations_expanded": self.count_abbreviations_expanded(original, cleaned),
"entities_found": len(self.extract_medical_entities(cleaned))
}
    def count_abbreviations_expanded(self, original: str, cleaned: str) -> int:
        """估算被扩展的缩写数量(简化实现:缩写扩展会使文本变长,用长度增量近似)"""
        return max(0, len(cleaned) - len(original))
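MedicalTextCleaner 的使用方式大致如下(示意;en_core_sci_sm 是 scispaCy 的科学文献模型,需单独安装,且 extract_medical_entities 中的实体标签集合取决于所加载的 NER 模型):
# 使用示例(示意,需先安装 scispacy 及 en_core_sci_sm 模型)
if __name__ == "__main__":
    cleaner = MedicalTextCleaner()
    raw_text = "<p>Patient has high blood pressure, takes medication b.i.d. p.o.</p>"
    result = cleaner.clean_medical_text(raw_text)
    print(result["cleaned_text"])
    print(f"识别到 {len(result['entities'])} 个医学实体")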
2.3.2 数据去重与融合
智能去重算法
# intelligent_deduplication.py
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
from typing import List, Tuple
class IntelligentDeduplicator:
"""智能去重器"""
def __init__(self, similarity_threshold=0.85):
self.similarity_threshold = similarity_threshold
self.vectorizer = TfidfVectorizer(
max_features=5000,
ngram_range=(1, 3),
stop_words='english'
)
def find_duplicates(self, texts: List[str]) -> List[Tuple[int, int, float]]:
"""查找重复文本"""
if len(texts) < 2:
return []
# 向量化文本
tfidf_matrix = self.vectorizer.fit_transform(texts)
# 计算相似度矩阵
similarity_matrix = cosine_similarity(tfidf_matrix)
# 找到相似度超过阈值的文本对
duplicates = []
n = len(texts)
for i in range(n):
for j in range(i+1, n):
similarity = similarity_matrix[i][j]
if similarity > self.similarity_threshold:
duplicates.append((i, j, similarity))
return duplicates
def remove_duplicates(self, texts: List[str], keep_strategy="longest") -> List[str]:
"""移除重复文本"""
duplicates = self.find_duplicates(texts)
if not duplicates:
return texts
# 构建重复关系图
duplicate_groups = self.build_duplicate_groups(duplicates, len(texts))
# 根据策略选择保留的文本
unique_texts = []
for group in duplicate_groups:
if len(group) == 1:
unique_texts.append(texts[group[0]])
else:
kept_index = self.select_text_to_keep(group, texts, keep_strategy)
unique_texts.append(texts[kept_index])
return unique_texts
def build_duplicate_groups(self, duplicates: List[Tuple[int, int, float]], n_texts: int) -> List[List[int]]:
"""构建重复文本组"""
# 使用并查集算法
parent = list(range(n_texts))
def find(x):
if parent[x] != x:
parent[x] = find(parent[x])
return parent[x]
def union(x, y):
px, py = find(x), find(y)
if px != py:
parent[px] = py
# 合并重复的文本
for i, j, _ in duplicates:
union(i, j)
# 构建组
groups = {}
for i in range(n_texts):
root = find(i)
if root not in groups:
groups[root] = []
groups[root].append(i)
return list(groups.values())
def select_text_to_keep(self, group: List[int], texts: List[str], strategy: str) -> int:
"""选择要保留的文本"""
if strategy == "longest":
return max(group, key=lambda i: len(texts[i]))
elif strategy == "shortest":
return min(group, key=lambda i: len(texts[i]))
elif strategy == "highest_quality":
# 这里可以实现质量评分逻辑
return max(group, key=lambda i: self.quality_score(texts[i]))
else:
return group[0] # 默认保留第一个
def quality_score(self, text: str) -> float:
"""计算文本质量分数"""
score = 0.0
# 长度因子
if 100 <= len(text) <= 1000:
score += 0.3
# 完整性因子
if text.endswith('.') or text.endswith('?'):
score += 0.2
# 关键词密度因子
words = text.split()
if len(words) > 10:
unique_words = set(words)
score += (len(unique_words) / len(words)) * 0.3
# 语法正确性因子(简化版)
if not any(char.isdigit() for char in text):
score += 0.2
return min(score, 1.0)
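IntelligentDeduplicator 的典型用法如下(示意,相似度阈值等参数需结合语料实际调试):
# 使用示例(示意)
if __name__ == "__main__":
    texts = [
        "Diabetes symptoms include frequent urination and excessive thirst.",
        "Symptoms of diabetes include excessive thirst and frequent urination.",
        "Hypertension is diagnosed by repeated blood pressure measurements.",
    ]
    dedup = IntelligentDeduplicator(similarity_threshold=0.85)
    unique_texts = dedup.remove_duplicates(texts, keep_strategy="longest")
    print(f"去重前 {len(texts)} 条,去重后 {len(unique_texts)} 条")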
2.4 数据标注与验证
2.4.1 众包标注平台
标注任务设计
# annotation_task_design.py
from typing import List, Dict, Optional
from dataclasses import dataclass
import json
@dataclass
class AnnotationTask:
"""标注任务定义"""
task_id: str
task_type: str # "classification", "ner", "qa", "ranking"
instructions: str
examples: List[Dict]
guidelines: str
difficulty: str # "easy", "medium", "hard"
estimated_time: int # 分钟
reward: float
class MedicalAnnotationPlatform:
"""医疗标注平台"""
def __init__(self):
self.annotators = {}
self.quality_control = QualityControlSystem()
def create_annotation_task(self, data_item: Dict, task_type: str) -> AnnotationTask:
"""创建标注任务"""
if task_type == "medical_qa":
return self.create_medical_qa_task(data_item)
elif task_type == "symptom_classification":
return self.create_symptom_classification_task(data_item)
elif task_type == "treatment_ranking":
return self.create_treatment_ranking_task(data_item)
else:
raise ValueError(f"Unknown task type: {task_type}")
def create_medical_qa_task(self, data_item: Dict) -> AnnotationTask:
"""创建医疗问答标注任务"""
return AnnotationTask(
task_id=f"medical_qa_{data_item['id']}",
task_type="qa",
instructions="""
请根据提供的医疗问题,判断AI生成的回答是否准确、完整、安全。
评估标准:
1. 准确性:信息是否正确,有无医学错误
2. 完整性:是否全面回答了问题
3. 安全性:是否包含适当的医疗免责声明
4. 可读性:语言是否清晰易懂
评分:1-5分,5分为最佳
""",
examples=[
{
"question": "What are the symptoms of diabetes?",
"ai_answer": "Common symptoms include frequent urination, excessive thirst, and fatigue. However, please consult a healthcare professional for proper diagnosis.",
"rating": 5,
"feedback": "Good answer with appropriate disclaimer"
}
],
guidelines="Focus on medical accuracy and patient safety",
difficulty="medium",
estimated_time=3,
reward=0.5
)
def distribute_tasks(self, tasks: List[AnnotationTask], annotator_pool: List[str]):
"""分发标注任务"""
distribution = {}
for task in tasks:
# 选择最合适的标注员
suitable_annotators = self.select_suitable_annotators(task, annotator_pool)
# 分配任务给多个标注员(冗余标注)
assigned_annotators = suitable_annotators[:3] # 3个标注员
distribution[task.task_id] = {
"task": task,
"annotators": assigned_annotators,
"deadline": self.calculate_deadline(task)
}
return distribution
def select_suitable_annotators(self, task: AnnotationTask, annotator_pool: List[str]) -> List[str]:
"""选择合适的标注员"""
suitable = []
for annotator_id in annotator_pool:
annotator = self.annotators.get(annotator_id)
if not annotator:
continue
            # 综合考察专业背景、历史准确率与近期活跃度,满足任一条件即纳入候选(每人只加入一次)
            has_background = task.task_type == "medical_qa" and annotator.get("medical_background")
            good_accuracy = annotator.get("accuracy_score", 0) > 0.85
            recently_active = annotator.get("recent_activity", 0) > 0.8
            if has_background or good_accuracy or recently_active:
                suitable.append(annotator_id)
        return suitable[:10]  # 返回前10个最合适的标注员
2.4.2 质量控制机制
多轮标注验证
# quality_control_system.py
import statistics
from typing import List, Dict, Tuple
class QualityControlSystem:
"""质量控制系统"""
def __init__(self):
self.consensus_threshold = 0.7
self.expert_weight = 2.0
self.min_annotations = 3
def calculate_inter_annotator_agreement(self, annotations: List[Dict]) -> float:
"""计算标注者间一致性"""
if len(annotations) < 2:
return 0.0
# 提取评分
ratings = [ann['rating'] for ann in annotations]
# 计算Fleiss' Kappa或Krippendorff's Alpha
# 这里使用简化的标准差方法
if len(set(ratings)) == 1:
return 1.0
std_dev = statistics.stdev(ratings)
max_rating = max(ratings)
min_rating = min(ratings)
rating_range = max_rating - min_rating
# 标准化标准差
normalized_std = std_dev / rating_range if rating_range > 0 else 0
# 转换为一致性分数
agreement = max(0, 1 - normalized_std)
return agreement
def resolve_disagreements(self, annotations: List[Dict]) -> Dict:
"""解决标注分歧"""
agreement = self.calculate_inter_annotator_agreement(annotations)
if agreement >= self.consensus_threshold:
# 高一致性,使用简单平均
return self.simple_consensus(annotations)
else:
# 低一致性,需要专家介入
return self.expert_resolution(annotations)
def simple_consensus(self, annotations: List[Dict]) -> Dict:
"""简单共识算法"""
ratings = [ann['rating'] for ann in annotations]
avg_rating = statistics.mean(ratings)
# 选择最接近平均值的标注
closest_annotation = min(annotations, key=lambda x: abs(x['rating'] - avg_rating))
return {
"final_annotation": closest_annotation,
"consensus_method": "simple_average",
"agreement_score": self.calculate_inter_annotator_agreement(annotations),
"num_annotations": len(annotations)
}
def expert_resolution(self, annotations: List[Dict]) -> Dict:
"""专家解决机制"""
# 识别专家标注员
expert_annotations = [ann for ann in annotations if ann.get('is_expert', False)]
if expert_annotations:
# 使用专家意见
expert_consensus = self.expert_consensus(expert_annotations)
return {
"final_annotation": expert_consensus,
"consensus_method": "expert_consensus",
"agreement_score": self.calculate_inter_annotator_agreement(expert_annotations),
"num_experts": len(expert_annotations)
}
else:
# 没有专家,需要重新标注
return {
"final_annotation": None,
"consensus_method": "requires_expert_review",
"agreement_score": self.calculate_inter_annotator_agreement(annotations),
"action": "send_to_expert_panel"
}
def expert_consensus(self, expert_annotations: List[Dict]) -> Dict:
"""专家共识算法"""
# 加权平均,专家权重更高
weighted_ratings = []
for ann in expert_annotations:
rating = ann['rating']
weight = ann.get('expert_weight', self.expert_weight)
weighted_ratings.extend([rating] * int(weight))
consensus_rating = statistics.mean(weighted_ratings)
# 选择最接近共识的专家标注
closest_expert = min(expert_annotations,
key=lambda x: abs(x['rating'] - consensus_rating))
return closest_expert
def quality_assurance_check(self, dataset: List[Dict]) -> Dict:
"""质量保证检查"""
quality_report = {
"total_samples": len(dataset),
"quality_score": 0.0,
"issues_found": [],
"recommendations": []
}
# 准确性检查
accuracy_issues = self.check_accuracy(dataset)
if accuracy_issues:
quality_report["issues_found"].extend(accuracy_issues)
# 一致性检查
consistency_score = self.check_consistency(dataset)
quality_report["quality_score"] = consistency_score
# 完整性检查
completeness_issues = self.check_completeness(dataset)
if completeness_issues:
quality_report["issues_found"].extend(completeness_issues)
# 生成改进建议
quality_report["recommendations"] = self.generate_quality_recommendations(quality_report)
return quality_report
def check_accuracy(self, dataset: List[Dict]) -> List[Dict]:
"""检查准确性"""
issues = []
for i, item in enumerate(dataset):
# 检查医学事实一致性
if self.has_medical_inconsistencies(item):
issues.append({
"type": "medical_inconsistency",
"sample_id": item.get("id", i),
"description": "Medical facts are inconsistent",
"severity": "high"
})
# 检查逻辑一致性
if self.has_logical_inconsistencies(item):
issues.append({
"type": "logical_inconsistency",
"sample_id": item.get("id", i),
"description": "Logical flow is inconsistent",
"severity": "medium"
})
return issues
def check_consistency(self, dataset: List[Dict]) -> float:
"""检查一致性"""
if len(dataset) < 2:
return 1.0
# 计算格式一致性
format_consistency = self.calculate_format_consistency(dataset)
# 计算术语一致性
terminology_consistency = self.calculate_terminology_consistency(dataset)
# 计算风格一致性
style_consistency = self.calculate_style_consistency(dataset)
# 综合一致性分数
overall_consistency = (
format_consistency * 0.4 +
terminology_consistency * 0.4 +
style_consistency * 0.2
)
return overall_consistency
def generate_quality_recommendations(self, quality_report: Dict) -> List[str]:
"""生成质量改进建议"""
recommendations = []
if quality_report["quality_score"] < 0.8:
recommendations.append("Consider re-annotating low-quality samples")
high_severity_issues = [issue for issue in quality_report["issues_found"]
if issue.get("severity") == "high"]
if len(high_severity_issues) > 5:
recommendations.append("Review annotation guidelines and provide additional training")
if quality_report["quality_score"] < 0.9:
recommendations.append("Implement additional quality control checkpoints")
return recommendations
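QualityControlSystem 的调用方式示意如下(标注数据为假设样例;若只有两名标注员,一致性计算也可以直接替换为 sklearn.metrics.cohen_kappa_score 等标准指标):
# 使用示例(示意,标注数据为假设样例)
if __name__ == "__main__":
    qc = QualityControlSystem()
    annotations = [
        {"annotator_id": "a1", "rating": 4, "is_expert": False},
        {"annotator_id": "a2", "rating": 5, "is_expert": False},
        {"annotator_id": "a3", "rating": 4, "is_expert": True},
    ]
    result = qc.resolve_disagreements(annotations)
    print(f"共识方式: {result['consensus_method']}, 一致性分数: {result['agreement_score']:.2f}")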
3. 数据集构建实操
3.1 DIY数据集构建工具
3.1.1 数据集构建流水线
完整构建脚本
#!/bin/bash
# build_medical_dataset.sh
# 配置参数
DATASET_NAME="medical_consultation_v1"
OUTPUT_DIR="./datasets/${DATASET_NAME}"
MIN_SAMPLES=50000
QUALITY_THRESHOLD=0.85
# 创建输出目录
mkdir -p ${OUTPUT_DIR}/{raw,processed,annotation,final}
echo "=== 开始构建医疗问诊数据集 ==="
echo "数据集名称: ${DATASET_NAME}"
echo "目标样本数: ${MIN_SAMPLES}"
echo "质量阈值: ${QUALITY_THRESHOLD}"
# 步骤1: 数据收集
echo "步骤1: 收集原始数据..."
python scripts/collect_data.py \
--sources pubmed,clinical_trials,medical_wiki \
--queries ./config/medical_queries.txt \
--output ${OUTPUT_DIR}/raw/collected_data.jsonl \
--max_samples 100000
# 步骤2: 数据清洗
echo "步骤2: 清洗数据..."
python scripts/clean_data.py \
--input ${OUTPUT_DIR}/raw/collected_data.jsonl \
--output ${OUTPUT_DIR}/processed/cleaned_data.jsonl \
--config ./config/cleaning_config.yaml
# 步骤3: 去重处理
echo "步骤3: 去重处理..."
python scripts/deduplicate.py \
--input ${OUTPUT_DIR}/processed/cleaned_data.jsonl \
--output ${OUTPUT_DIR}/processed/deduplicated_data.jsonl \
--similarity_threshold 0.85
# 步骤4: 标注任务
echo "步骤4: 创建标注任务..."
python scripts/create_annotation_tasks.py \
--input ${OUTPUT_DIR}/processed/deduplicated_data.jsonl \
--output ${OUTPUT_DIR}/annotation/tasks.json \
--num_tasks 1000
# 步骤5: 质量控制(此处假设标注平台已完成标注并导出 annotated_data.jsonl)
echo "步骤5: 质量控制..."
python scripts/quality_control.py \
--input ${OUTPUT_DIR}/annotation/annotated_data.jsonl \
--output ${OUTPUT_DIR}/processed/quality_checked_data.jsonl \
--quality_threshold ${QUALITY_THRESHOLD}
# 步骤6: 数据集分割
echo "步骤6: 数据集分割..."
python scripts/split_dataset.py \
--input ${OUTPUT_DIR}/processed/quality_checked_data.jsonl \
--output_dir ${OUTPUT_DIR}/final \
--split_ratio 0.8,0.1,0.1 \
--shuffle true
# 步骤7: 生成数据集报告
echo "步骤7: 生成数据集报告..."
python scripts/generate_dataset_report.py \
--dataset_dir ${OUTPUT_DIR}/final \
--output ${OUTPUT_DIR}/dataset_report.html
# 步骤8: 打包数据集
echo "步骤8: 打包数据集..."
cd ${OUTPUT_DIR}/final
tar -czf ../${DATASET_NAME}.tar.gz *.jsonl
cd ../..
echo "=== 数据集构建完成 ==="
echo "输出位置: ${OUTPUT_DIR}/final/"
echo "报告文件: ${OUTPUT_DIR}/dataset_report.html"
echo "打包文件: ${OUTPUT_DIR}/${DATASET_NAME}.tar.gz"
3.1.2 数据质量评估工具
质量评估仪表板
# dataset_quality_dashboard.py
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from datasets import load_dataset
import json
class DatasetQualityDashboard:
"""数据集质量仪表板"""
def __init__(self):
st.set_page_config(
page_title="医疗数据集质量仪表板",
page_icon="🏥",
layout="wide"
)
def run_dashboard(self):
"""运行仪表板"""
st.title("🏥 医疗数据集质量仪表板")
# 侧边栏
with st.sidebar:
st.header("数据集选择")
dataset_path = st.text_input(
"数据集路径",
value="./datasets/medical_consultation_v1/final/train.jsonl"
)
sample_size = st.slider("样本数量", 100, 10000, 1000)
# 加载数据
try:
df = self.load_dataset_sample(dataset_path, sample_size)
st.success(f"成功加载 {len(df)} 条数据")
except Exception as e:
st.error(f"加载数据失败: {e}")
return
# 主要指标
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("总样本数", f"{len(df):,}")
with col2:
avg_length = df['output'].str.len().mean()
st.metric("平均回答长度", f"{avg_length:.0f} 字符")
with col3:
unique_conditions = df['condition'].nunique() if 'condition' in df.columns else "N/A"
st.metric("疾病种类", unique_conditions)
with col4:
quality_score = df.get('quality_score', pd.Series([0.85])).mean()
st.metric("质量分数", f"{quality_score:.2f}")
# 数据分布可视化
st.header("📊 数据分布分析")
col1, col2 = st.columns(2)
with col1:
# 回答长度分布
fig_length = px.histogram(
df,
x=df['output'].str.len(),
nbins=50,
title="回答长度分布",
labels={'x': '字符数', 'y': '频数'}
)
st.plotly_chart(fig_length, use_container_width=True)
with col2:
# 疾病类型分布(如果有)
if 'condition' in df.columns:
condition_counts = df['condition'].value_counts().head(20)
fig_conditions = px.bar(
x=condition_counts.index,
y=condition_counts.values,
title="Top 20 疾病类型分布",
labels={'x': '疾病类型', 'y': '数量'}
)
fig_conditions.update_xaxes(tickangle=45)
st.plotly_chart(fig_conditions, use_container_width=True)
# 质量指标
st.header("✅ 质量指标分析")
if 'quality_metrics' in df.columns:
quality_df = pd.json_normalize(df['quality_metrics'])
col1, col2, col3 = st.columns(3)
with col1:
st.subheader("准确性分布")
accuracy_scores = quality_df.get('accuracy', pd.Series([0.9] * len(df)))
fig_accuracy = px.histogram(
accuracy_scores,
nbins=20,
title="准确性分数分布"
)
st.plotly_chart(fig_accuracy, use_container_width=True)
with col2:
st.subheader("完整性分布")
completeness_scores = quality_df.get('completeness', pd.Series([0.9] * len(df)))
fig_completeness = px.histogram(
completeness_scores,
nbins=20,
title="完整性分数分布"
)
st.plotly_chart(fig_completeness, use_container_width=True)
with col3:
st.subheader("一致性分布")
consistency_scores = quality_df.get('consistency', pd.Series([0.9] * len(df)))
fig_consistency = px.histogram(
consistency_scores,
nbins=20,
title="一致性分数分布"
)
st.plotly_chart(fig_consistency, use_container_width=True)
# 数据样本展示
st.header("📝 数据样本")
sample_idx = st.slider("选择样本索引", 0, len(df)-1, 0)
sample = df.iloc[sample_idx]
with st.expander("查看样本详情"):
col1, col2 = st.columns(2)
with col1:
st.subheader("输入")
st.text_area("", sample.get('instruction', 'N/A'), height=100, disabled=True)
if 'input' in sample and sample['input']:
st.subheader("上下文")
st.text_area("", sample['input'], height=50, disabled=True)
with col2:
st.subheader("输出")
st.text_area("", sample.get('output', 'N/A'), height=150, disabled=True)
# 导出功能
st.header("📤 导出选项")
export_format = st.selectbox("导出格式", ["JSON", "CSV", "Parquet"])
if st.button("导出数据"):
if export_format == "JSON":
json_data = df.to_json(orient='records', force_ascii=False, indent=2)
st.download_button(
label="下载 JSON",
data=json_data,
file_name="medical_dataset.json",
mime="application/json"
)
elif export_format == "CSV":
csv_data = df.to_csv(index=False)
st.download_button(
label="下载 CSV",
data=csv_data,
file_name="medical_dataset.csv",
mime="text/csv"
)
elif export_format == "Parquet":
# 需要安装 pyarrow
try:
import pyarrow.parquet as pq
parquet_data = df.to_parquet()
st.download_button(
label="下载 Parquet",
data=parquet_data,
file_name="medical_dataset.parquet",
mime="application/octet-stream"
)
except ImportError:
st.error("需要安装 pyarrow 才能导出 Parquet 格式")
def load_dataset_sample(self, dataset_path: str, sample_size: int) -> pd.DataFrame:
"""加载数据集样本"""
try:
# 尝试作为JSONL文件加载
data = []
with open(dataset_path, 'r', encoding='utf-8') as f:
for i, line in enumerate(f):
if i >= sample_size:
break
data.append(json.loads(line.strip()))
return pd.DataFrame(data)
except Exception as e:
st.error(f"加载数据失败: {e}")
# 返回示例数据
return self.generate_sample_data()
def generate_sample_data(self) -> pd.DataFrame:
"""生成示例数据"""
sample_data = [
{
"instruction": "What are the symptoms of diabetes?",
"input": "",
"output": "Common symptoms of diabetes include frequent urination, excessive thirst, unexplained weight loss, increased hunger, fatigue, blurred vision, and slow-healing wounds. However, please consult a healthcare professional for proper diagnosis and treatment.",
"condition": "diabetes",
"quality_score": 0.92
},
{
"instruction": "How is hypertension diagnosed?",
"input": "",
"output": "Hypertension is diagnosed by measuring blood pressure on multiple occasions. A diagnosis is made when blood pressure readings are consistently 140/90 mmHg or higher. Your doctor may also recommend additional tests to check for underlying conditions. Regular monitoring is essential.",
"condition": "hypertension",
"quality_score": 0.88
}
]
return pd.DataFrame(sample_data * 500) # 重复生成更多数据
# 运行仪表板
if __name__ == "__main__":
dashboard = DatasetQualityDashboard()
dashboard.run_dashboard()
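仪表板需要通过 Streamlit 启动,示例命令如下(假设脚本文件名与文件头注释一致):
streamlit run dataset_quality_dashboard.py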
3.2 自动化数据集维护
3.2.1 持续集成流程
GitHub Actions工作流
# .github/workflows/dataset-update.yml
name: Dataset Update Pipeline
on:
schedule:
- cron: '0 2 * * 1' # 每周一凌晨2点运行
workflow_dispatch:
inputs:
update_type:
description: 'Update type'
required: true
default: 'incremental'
type: choice
options:
- incremental
- full
jobs:
data-collection:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install -e .
- name: Collect new data
run: |
python scripts/collect_latest_data.py \
--update-type ${{ github.event.inputs.update_type || 'incremental' }} \
--output data/new_data.jsonl
- name: Process new data
run: |
python scripts/process_new_data.py \
--input data/new_data.jsonl \
--output data/processed_data.jsonl
- name: Quality check
run: |
python scripts/quality_check.py \
--input data/processed_data.jsonl \
--threshold 0.85
- name: Merge with existing dataset
run: |
python scripts/merge_datasets.py \
--existing data/current_dataset.jsonl \
--new data/processed_data.jsonl \
--output data/updated_dataset.jsonl
- name: Generate quality report
run: |
python scripts/generate_quality_report.py \
--dataset data/updated_dataset.jsonl \
--output reports/quality_report.html
- name: Upload artifacts
uses: actions/upload-artifact@v3
with:
name: updated-dataset
path: |
data/updated_dataset.jsonl
reports/quality_report.html
- name: Create Pull Request
uses: peter-evans/create-pull-request@v5
with:
token: ${{ secrets.GITHUB_TOKEN }}
commit-message: "chore: update medical dataset"
title: "Dataset Update - ${{ github.run_number }}"
body: |
## 数据集更新报告
### 更新统计
- 新增样本: ${{ steps.stats.outputs.new_samples }}
- 质量分数: ${{ steps.stats.outputs.quality_score }}
- 更新日期: ${{ steps.stats.outputs.update_date }}
### 质量报告
详见: reports/quality_report.html
branch: dataset-update-${{ github.run_number }}
delete-branch: true
3.2.2 数据版本管理
语义化版本管理
# dataset_versioning.py
import semver
import json
import hashlib
from datetime import datetime
from typing import Dict, List
class DatasetVersionManager:
"""数据集版本管理器"""
def __init__(self, major=1, minor=0, patch=0):
self.version = semver.VersionInfo(major, minor, patch)
self.version_history = []
def bump_version(self, change_type: str, changes: Dict) -> str:
"""版本号递增"""
if change_type == "major":
# 重大变更:数据格式改变、质量要求重大调整
new_version = self.version.bump_major()
elif change_type == "minor":
# 次要变更:新增大量数据、新增功能
new_version = self.version.bump_minor()
elif change_type == "patch":
# 补丁变更:小量数据修正、bug修复
new_version = self.version.bump_patch()
else:
raise ValueError(f"Unknown change type: {change_type}")
# 记录版本历史
version_record = {
"version": str(new_version),
"previous_version": str(self.version),
"change_type": change_type,
"changes": changes,
"timestamp": datetime.now().isoformat(),
"data_hash": self.calculate_dataset_hash(changes.get("dataset_path"))
}
self.version_history.append(version_record)
self.version = new_version
return str(new_version)
def generate_changelog(self) -> str:
"""生成变更日志"""
changelog = f"# 数据集变更日志\n\n"
changelog += f"当前版本: {self.version}\n\n"
for record in reversed(self.version_history[-10:]): # 最近10个版本
changelog += f"## {record['version']} ({record['timestamp'][:10]})\n\n"
changelog += f"**变更类型**: {record['change_type']}\n\n"
changes = record['changes']
if 'new_samples' in changes:
changelog += f"- 新增样本: {changes['new_samples']:,}\n"
if 'quality_improvements' in changes:
changelog += f"- 质量改进: {changes['quality_improvements']}\n"
if 'bug_fixes' in changes:
changelog += f"- Bug修复: {changes['bug_fixes']}\n"
changelog += "\n"
return changelog
def calculate_dataset_hash(self, dataset_path: str) -> str:
"""计算数据集哈希"""
if not dataset_path:
return ""
try:
with open(dataset_path, 'rb') as f:
file_hash = hashlib.sha256()
while chunk := f.read(8192):
file_hash.update(chunk)
return file_hash.hexdigest()[:16]
except FileNotFoundError:
return "file_not_found"
def compare_versions(self, version1: str, version2: str) -> Dict:
"""比较两个版本"""
v1 = semver.VersionInfo.parse(version1)
v2 = semver.VersionInfo.parse(version2)
comparison = {
"version1": str(v1),
"version2": str(v2),
"is_newer": v1 > v2,
"is_compatible": v1.major == v2.major,
"difference": {
"major": v1.major - v2.major,
"minor": v1.minor - v2.minor,
"patch": v1.patch - v2.patch
}
}
return comparison
# 使用示例
version_manager = DatasetVersionManager(1, 0, 0)
# 记录一次小版本更新
new_version = version_manager.bump_version("minor", {
"new_samples": 5000,
"quality_improvements": "Improved medical terminology standardization",
"source": "Added data from clinical trials"
})
print(f"新版本号: {new_version}")
print(version_manager.generate_changelog())
3.3 数据集发布与共享
3.3.1 数据集打包与文档
标准化发布流程
# dataset_publisher.py
import os
import json
import shutil
from datetime import datetime
from typing import Dict, List
import zipfile
class DatasetPublisher:
"""数据集发布器"""
def __init__(self, output_dir: str = "./published_datasets"):
self.output_dir = output_dir
os.makedirs(output_dir, exist_ok=True)
def publish_dataset(self, dataset_config: Dict, data_files: List[str]) -> str:
"""发布数据集"""
# 生成发布ID
publish_id = f"medical_dataset_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
publish_path = os.path.join(self.output_dir, publish_id)
os.makedirs(publish_path, exist_ok=True)
# 1. 复制数据文件
data_dir = os.path.join(publish_path, "data")
os.makedirs(data_dir, exist_ok=True)
for file_path in data_files:
if os.path.exists(file_path):
shutil.copy2(file_path, data_dir)
# 2. 生成数据集卡片
dataset_card = self.generate_dataset_card(dataset_config)
card_path = os.path.join(publish_path, "README.md")
with open(card_path, 'w', encoding='utf-8') as f:
f.write(dataset_card)
# 3. 生成元数据文件
metadata = self.generate_metadata(dataset_config, data_files)
metadata_path = os.path.join(publish_path, "metadata.json")
with open(metadata_path, 'w', encoding='utf-8') as f:
json.dump(metadata, f, indent=2, ensure_ascii=False)
# 4. 生成使用示例
examples = self.generate_usage_examples(dataset_config)
examples_path = os.path.join(publish_path, "examples.py")
with open(examples_path, 'w', encoding='utf-8') as f:
f.write(examples)
        # 5. 生成许可证文件(此处仅写入许可证标识,完整文本可从 SPDX 等模板获取)
        license_id = dataset_config.get("license", "apache-2.0")
        license_path = os.path.join(publish_path, "LICENSE")
        with open(license_path, 'w', encoding='utf-8') as f:
            f.write(f"License: {license_id}\n")
# 6. 创建压缩包
archive_path = self.create_archive(publish_path, publish_id)
print(f"数据集发布完成: {publish_path}")
print(f"压缩包: {archive_path}")
return publish_path
def generate_dataset_card(self, config: Dict) -> str:
"""生成数据集卡片"""
card_content = f"""# {config.get('name', 'Medical Dataset')}
## 数据集概述
- **名称**: {config.get('name', 'Medical Dataset')}
- **版本**: {config.get('version', '1.0.0')}
- **发布日期**: {datetime.now().strftime('%Y-%m-%d')}
- **数据集大小**: {config.get('total_samples', 'Unknown')} 条样本
- **领域**: {config.get('domain', 'Medical')}
## 数据集描述
{config.get('description', '这是一个医疗领域的高质量数据集。')}
### 数据组成
- 训练集: {config.get('train_size', 'N/A')} 条样本
- 验证集: {config.get('val_size', 'N/A')} 条样本
- 测试集: {config.get('test_size', 'N/A')} 条样本
### 数据格式
每条数据包含以下字段:
- `instruction`: 指令或问题
- `input`: 输入上下文(可选)
- `output`: 期望的回答
- `metadata`: 元数据信息
### 质量指标
- 准确性: {config.get('quality_metrics', {}).get('accuracy', 'N/A')}
- 完整性: {config.get('quality_metrics', {}).get('completeness', 'N/A')}
- 一致性: {config.get('quality_metrics', {}).get('consistency', 'N/A')}
## 使用示例
```python
from datasets import load_dataset
# 加载数据集
dataset = load_dataset("path/to/data")
# 使用示例
for item in dataset['train']:
print(f"指令: {item['instruction']}")
print(f"回答: {item['output']}")
```
## 许可证
{config.get('license', 'Apache 2.0')}
## 引用
如果您使用此数据集,请引用:
```bibtex
{config.get('citation', 'Citation information to be added')}
```
## 联系方式
{config.get('contact', 'Contact information to be added')}
"""
return card_content
def generate_metadata(self, config: Dict, data_files: List[str]) -> Dict:
"""生成元数据"""
metadata = {
"name": config.get('name'),
"version": config.get('version'),
"description": config.get('description'),
"created_at": datetime.now().isoformat(),
"dataset_size": {
"total_samples": config.get('total_samples'),
"train_samples": config.get('train_size'),
"val_samples": config.get('val_size'),
"test_samples": config.get('test_size')
},
"files": [
{
"name": os.path.basename(file_path),
"size": os.path.getsize(file_path),
"checksum": self.calculate_file_checksum(file_path)
}
for file_path in data_files
],
"quality_metrics": config.get('quality_metrics', {}),
"license": config.get('license', 'apache-2.0'),
"tags": config.get('tags', []),
"languages": config.get('languages', ['zh']),
"task_categories": config.get('task_categories', ['text-generation'])
}
return metadata
def generate_usage_examples(self, config: Dict) -> str:
"""生成使用示例"""
examples = f"""# 数据集使用示例
import json
import pandas as pd
from datasets import load_dataset
# 方法1: 直接加载JSON文件
def load_json_dataset(file_path):
data = []
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
data.append(json.loads(line.strip()))
return data
# 方法2: 使用pandas加载
def load_pandas_dataset(file_path):
return pd.read_json(file_path, lines=True)
# 方法3: 使用HuggingFace datasets
def load_hf_dataset(dataset_path):
dataset = load_dataset('json', data_files=dataset_path)
return dataset
# 使用示例
if __name__ == "__main__":
# 加载数据
dataset = load_json_dataset("train.jsonl")
# 查看样本
for i, sample in enumerate(dataset[:5]):
print(f"=== 样本 {i+1} ===")
print(f"指令: {{sample['instruction']}}")
print(f"回答: {{sample['output'][:200]}}...")
print()
# 数据统计
print(f"总样本数: {{len(dataset)}}")
print(f"平均回答长度: {{sum(len(s['output']) for s in dataset) / len(dataset):.0f}} 字符")
"""
return examples
def calculate_file_checksum(self, file_path: str) -> str:
"""计算文件校验和"""
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest()
def create_archive(self, source_path: str, archive_name: str) -> str:
"""创建压缩包"""
archive_path = os.path.join(self.output_dir, f"{archive_name}.zip")
with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, dirs, files in os.walk(source_path):
for file in files:
file_path = os.path.join(root, file)
arc_path = os.path.relpath(file_path, source_path)
zipf.write(file_path, arc_path)
return archive_path
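DatasetPublisher 的调用方式示意如下(配置项与文件路径均为假设值,数值与前文的需求配置保持一致):
# 使用示例(示意:路径与配置为假设值,data_files 中的文件需真实存在)
if __name__ == "__main__":
    publisher = DatasetPublisher(output_dir="./published_datasets")
    config = {
        "name": "医疗问诊高质量数据集",
        "version": "1.0.0",
        "domain": "医疗问诊",
        "total_samples": 50000,
        "train_size": 40000,
        "val_size": 5000,
        "test_size": 5000,
        "quality_metrics": {"accuracy": 0.95, "completeness": 0.98, "consistency": 0.99},
        "license": "apache-2.0",
    }
    data_files = [
        "./datasets/medical_consultation_v1/final/train.jsonl",
        "./datasets/medical_consultation_v1/final/val.jsonl",
        "./datasets/medical_consultation_v1/final/test.jsonl",
    ]
    publish_path = publisher.publish_dataset(config, data_files)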
总结
构建高质量的垂直领域数据集是一个系统性的工程,需要从需求分析、数据收集、清洗处理、质量控制到发布维护的全流程管理。关键要点包括:
- 明确需求定义:深入了解业务场景和质量要求
- 多源数据整合:结合专业数据源和智能收集技术
- 严格质量控制:建立多层次的质量保证体系
- 自动化工具链:构建高效的自动化处理流程
- 持续维护更新:建立数据生命周期管理机制
- 标准化发布:遵循行业标准进行数据集发布
通过遵循这些最佳实践,可以构建出满足垂直领域大模型训练需求的高质量数据集,为模型的成功应用奠定坚实基础。