15. 如何打造垂直领域高质量数据集?大模型高质量数据集的打造流程

高质量数据集概述

什么是高质量数据集?

高质量数据集是垂直领域大模型成功的基石,它不仅要满足数量要求,更要在准确性、多样性、一致性和时效性等方面达到专业标准。

1.1 数据集质量维度

核心质量指标

| 质量维度 | 定义 | 评估方法 | 重要性 |
| --- | --- | --- | --- |
| 准确性 | 数据内容的真实性和正确性 | 专家验证、交叉验证 | 极高 |
| 完整性 | 数据记录的完整程度 | 缺失值分析、字段完整性 | — |
| 一致性 | 数据格式和标准的一致性 | 格式验证、标准检查 | — |
| 时效性 | 数据的更新程度和新鲜度 | 时间戳分析、版本控制 | — |
| 代表性 | 数据覆盖目标领域的程度 | 分布分析、覆盖度评估 | — |
| 多样性 | 数据来源和类型的多样性 | 来源统计、类型分布 | — |
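
下面给出一个最小的质量指标计算示意,演示表中"缺失值分析""格式验证"两类评估方法如何落地;字段名(instruction/output)与长度阈值均为示例假设,需按实际数据集调整。

```python
# quality_metrics_example.py —— 质量指标计算的最小示意(字段名与阈值均为假设)
import pandas as pd

def completeness(df: pd.DataFrame, required_cols=("instruction", "output")) -> float:
    """完整性:必填字段同时非空的样本比例(缺失值分析的简化实现)"""
    cols = list(required_cols)
    non_missing = df[cols].notna().all(axis=1)
    non_empty = df[cols].fillna("").astype(str).apply(lambda s: s.str.strip() != "").all(axis=1)
    return float((non_missing & non_empty).mean())

def format_consistency(df: pd.DataFrame, col: str = "output", max_len: int = 2000) -> float:
    """一致性:回答长度落在约定区间内的样本比例(格式验证的简化实现)"""
    lengths = df[col].astype(str).str.len()
    return float(((lengths > 0) & (lengths <= max_len)).mean())

if __name__ == "__main__":
    df = pd.DataFrame([
        {"instruction": "糖尿病有哪些症状?", "output": "常见症状包括多饮、多尿、体重下降等。"},
        {"instruction": "高血压如何诊断?", "output": None},
    ])
    print("完整性:", completeness(df))        # 第二条缺少 output,预期 0.5
    print("一致性:", format_consistency(df))
```
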
1.2 垂直领域数据特点

医疗领域特征

  • 专业术语密集,需要标准化处理
  • 隐私敏感,需要严格脱敏(脱敏示例见下方)
  • 知识更新快,需要持续维护
  • 准确性要求极高,容错率低
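
针对"隐私敏感,需要严格脱敏"一点,下面给出一个最小的正则脱敏示意:仅覆盖手机号与身份证号,规则与替换标记均为演示用假设,生产环境应结合 NER 与人工复核。

```python
# deidentify_example.py —— 简单正则脱敏示意
import re

# 注意顺序:先匹配更长的身份证号,避免手机号规则误伤其中的连续数字
PATTERNS = [
    ("ID_CARD", re.compile(r"\d{17}[\dXx]")),   # 18 位身份证号
    ("PHONE", re.compile(r"1[3-9]\d{9}")),      # 大陆手机号
]

def deidentify(text: str) -> str:
    for label, pattern in PATTERNS:
        text = pattern.sub(f"[{label}]", text)
    return text

print(deidentify("患者电话 13812345678,身份证号 11010119900101123X,主诉头痛三天。"))
# 输出:患者电话 [PHONE],身份证号 [ID_CARD],主诉头痛三天。
```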

金融领域特征

  • 数值精度要求高,小数位敏感(Decimal 示例见下方)
  • 时效性强,需要实时更新
  • 合规要求严格,需要审计追踪
  • 多源异构,需要统一格式
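
针对"数值精度要求高,小数位敏感"一点,下面用一个极简示例说明为什么金融数值应使用 decimal.Decimal 而非 float 存储与运算:

```python
# decimal_precision_example.py —— 金融数值建议用 Decimal 而非 float
from decimal import Decimal, ROUND_HALF_UP

print(0.1 + 0.2)                          # 0.30000000000000004,二进制浮点误差
print(Decimal("0.1") + Decimal("0.2"))    # 0.3,十进制精确运算

amount = Decimal("1234.5675")
print(amount.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP))  # 金额保留两位小数:1234.57
```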

法律领域特征

  • 逻辑结构严谨,需要保持完整性
  • 引用关系复杂,需要维护关联
  • 地域差异明显,需要分类处理
  • 版本控制重要,需要追踪变更

数据集构建流程

2.1 需求分析与规划

2.1.1 业务需求定义

医疗问诊数据集需求示例

# medical_dataset_requirements.yaml
dataset_specifications:
  name: "医疗问诊高质量数据集"
  version: "1.0.0"
  domain: "医疗问诊"
  
  scope:
    coverage_areas:
      - "内科常见病"
      - "外科基础病"
      - "儿科多发病"
      - "妇科基础病"
      - "急诊处理"
    
    excluded_areas:
      - "罕见疾病"
      - "实验性治疗"
      - "未确认疗法"
  
  quality_requirements:
    accuracy_target: 0.95
    completeness_target: 0.98
    consistency_target: 0.99
    medical_expert_validation: true
    peer_review_required: true
  
  quantity_requirements:
    total_samples: 50000
    min_samples_per_disease: 100
    max_samples_per_disease: 1000
    train_val_test_split: [0.8, 0.1, 0.1]
  
  format_requirements:
    input_format: "instruction-input-output"
    output_length_range: [50, 500]
    language: "中文"
    encoding: "UTF-8"
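
这份需求文件可以在流水线入口做一次自动校验,避免配置错误。下面是一个最小校验脚本的示意(依赖 PyYAML;文件名沿用上文示例,校验规则仅覆盖少数字段,可按需扩展):

```python
# validate_requirements.py —— 需求文件加载与校验的最小示意
import yaml

def validate_requirements(path: str) -> list:
    with open(path, "r", encoding="utf-8") as f:
        spec = yaml.safe_load(f)["dataset_specifications"]

    problems = []
    quality = spec["quality_requirements"]
    for key in ("accuracy_target", "completeness_target", "consistency_target"):
        if not 0 < quality[key] <= 1:
            problems.append(f"{key} 应在 (0, 1] 区间内")

    split = spec["quantity_requirements"]["train_val_test_split"]
    if abs(sum(split) - 1.0) > 1e-6:
        problems.append("train_val_test_split 之和应为 1")

    return problems

if __name__ == "__main__":
    issues = validate_requirements("medical_dataset_requirements.yaml")
    print("校验通过" if not issues else issues)
```
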
2.1.2 数据源识别与评估

数据源分类矩阵

# data_source_evaluation.py
data_source_categories = {
    "primary_sources": {
        "electronic_health_records": {
            "quality_score": 9.5,
            "availability": "high",
            "access_difficulty": "high",
            "privacy_level": "critical",
            "update_frequency": "real-time"
        },
        "medical_literature": {
            "quality_score": 9.0,
            "availability": "medium",
            "access_difficulty": "low",
            "privacy_level": "none",
            "update_frequency": "monthly"
        },
        "clinical_guidelines": {
            "quality_score": 9.8,
            "availability": "high",
            "access_difficulty": "low",
            "privacy_level": "none",
            "update_frequency": "quarterly"
        }
    },
    
    "secondary_sources": {
        "medical_forums": {
            "quality_score": 6.5,
            "availability": "high",
            "access_difficulty": "low",
            "privacy_level": "medium",
            "update_frequency": "daily"
        },
        "health_websites": {
            "quality_score": 7.0,
            "availability": "high",
            "access_difficulty": "low",
            "privacy_level": "low",
            "update_frequency": "weekly"
        }
    }
}
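
有了上面的评估矩阵,可以按加权得分对数据源排一个优先级,作为收集顺序的参考。下面是一个示意函数,沿用上文的 data_source_categories 字典,权重与可用性映射均为假设值:

```python
# rank_sources.py —— 按加权得分排序数据源的示意(权重为假设值)
def rank_sources(categories: dict, quality_weight: float = 0.7, availability_weight: float = 0.3) -> list:
    availability_map = {"high": 1.0, "medium": 0.6, "low": 0.3}
    ranked = []
    for category, sources in categories.items():
        for name, info in sources.items():
            score = (info["quality_score"] / 10 * quality_weight
                     + availability_map.get(info["availability"], 0.0) * availability_weight)
            ranked.append((name, category, round(score, 3)))
    return sorted(ranked, key=lambda item: item[2], reverse=True)

for name, category, score in rank_sources(data_source_categories):
    print(f"{score:.3f}  {name}  ({category})")
```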

2.2 数据收集与获取

2.2.1 多源数据收集策略

分层数据收集框架

# multi_source_collector.py
import asyncio
import aiohttp
from typing import List, Dict
import json
from datetime import datetime

class MultiSourceDataCollector:
    """多源数据收集器"""
    
    def __init__(self, config_path: str):
        self.config = self.load_config(config_path)
        self.collectors = self.initialize_collectors()
    
    def load_config(self, config_path: str) -> Dict:
        """加载收集器配置(此处假设为 JSON 格式的配置文件)"""
        with open(config_path, 'r', encoding='utf-8') as f:
            return json.load(f)
        
    def initialize_collectors(self) -> Dict:
        """初始化各个收集器"""
        return {
            "pubmed": PubMedCollector(self.config["pubmed"]),
            "clinical_trials": ClinicalTrialsCollector(self.config["clinical_trials"]),
            "medical_wiki": MedicalWikiCollector(self.config["medical_wiki"]),
            "health_forums": HealthForumsCollector(self.config["health_forums"])
        }
    
    async def collect_all_sources(self, query_terms: List[str]) -> Dict:
        """并行收集所有数据源"""
        tasks = []
        
        for source_name, collector in self.collectors.items():
            task = asyncio.create_task(
                self.collect_from_source(source_name, collector, query_terms)
            )
            tasks.append(task)
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # 合并结果
        combined_data = {}
        for i, (source_name, result) in enumerate(zip(self.collectors.keys(), results)):
            if isinstance(result, Exception):
                print(f"Error collecting from {source_name}: {result}")
                combined_data[source_name] = []
            else:
                combined_data[source_name] = result
        
        return combined_data
    
    async def collect_from_source(self, source_name: str, collector, query_terms: List[str]):
        """从特定源收集数据"""
        try:
            data = await collector.collect(query_terms)
            print(f"Collected {len(data)} items from {source_name}")
            return data
        except Exception as e:
            print(f"Failed to collect from {source_name}: {e}")
            return []

# PubMed收集器实现
class PubMedCollector:
    """PubMed数据收集器"""
    
    def __init__(self, config: Dict):
        self.api_key = config.get("api_key")
        self.base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"
        self.rate_limit = config.get("rate_limit", 3)  # 每秒请求数
        
    async def collect(self, query_terms: List[str]) -> List[Dict]:
        """收集PubMed数据"""
        results = []
        semaphore = asyncio.Semaphore(self.rate_limit)
        
        async with aiohttp.ClientSession() as session:
            for term in query_terms:
                async with semaphore:
                    data = await self.search_pubmed(session, term)
                    processed_data = self.process_pubmed_data(data)
                    results.extend(processed_data)
        
        return results
    
    async def search_pubmed(self, session: aiohttp.ClientSession, query: str) -> Dict:
        """搜索PubMed"""
        search_url = f"{self.base_url}/esearch.fcgi"
        params = {
            "db": "pubmed",
            "term": query,
            "retmode": "json",
            "retmax": 100
        }
        
        if self.api_key:
            params["api_key"] = self.api_key
        
        async with session.get(search_url, params=params) as response:
            if response.status == 200:
                return await response.json()
            else:
                print(f"PubMed API error: {response.status}")
                return {}
    
    def process_pubmed_data(self, data: Dict) -> List[Dict]:
        """处理PubMed数据"""
        results = []
        
        search_results = data.get("esearchresult", {})
        id_list = search_results.get("idlist", [])
        
        for pmid in id_list:
            # 注意:esearch 只返回 PMID 列表,标题、摘要等需再调用 efetch/esummary 获取;
            # 下面的 extract_* 方法为待实现的占位接口
            result = {
                "source": "pubmed",
                "id": pmid,
                "title": self.extract_title(data),
                "abstract": self.extract_abstract(data),
                "keywords": self.extract_keywords(data),
                "timestamp": datetime.now().isoformat()
            }
            results.append(result)
        
        return results
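
下面是收集器的一个运行示意:配置文件路径与查询词均为假设,且 ClinicalTrialsCollector 等其余收集器类需按相同接口自行实现后才能完整运行。

```python
# collect_example.py —— 并行收集多个数据源的运行示意
import asyncio

async def main():
    collector = MultiSourceDataCollector("config/collectors.json")
    data = await collector.collect_all_sources(["diabetes mellitus", "hypertension"])
    for source, items in data.items():
        print(f"{source}: {len(items)} 条")

if __name__ == "__main__":
    asyncio.run(main())
```
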
2.2.2 数据爬取与API集成

智能爬虫系统

# intelligent_scraper.py
import scrapy
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
import re
from urllib.parse import urljoin
from datetime import datetime

class MedicalSpider(scrapy.Spider):
    """医疗数据爬虫"""
    
    name = 'medical_spider'
    allowed_domains = ['mayoclinic.org', 'medlineplus.gov', 'webmd.com']
    
    custom_settings = {
        'ROBOTSTXT_OBEY': True,
        'DOWNLOAD_DELAY': 1,
        'RANDOMIZE_DOWNLOAD_DELAY': True,
        'USER_AGENT': 'MedicalBot/1.0 (Educational Purpose)',
        'FEED_EXPORT_ENCODING': 'utf-8',
        'CONCURRENT_REQUESTS': 4,
        'CONCURRENT_REQUESTS_PER_DOMAIN': 2,
    }
    
    def __init__(self, start_urls=None, *args, **kwargs):
        super(MedicalSpider, self).__init__(*args, **kwargs)
        self.start_urls = start_urls or [
            'https://www.mayoclinic.org/diseases-conditions',
            'https://medlineplus.gov/healthtopics.html'
        ]
    
    def parse(self, response):
        """解析医疗条件页面"""
        # 提取条件列表
        conditions = response.css('.condition-link::text').getall()
        
        for condition in conditions:
            condition_url = self.build_condition_url(condition, response.url)
            yield scrapy.Request(
                condition_url,
                callback=self.parse_condition,
                meta={'condition': condition.strip()}
            )
    
    def parse_condition(self, response):
        """解析具体疾病页面"""
        condition = response.meta['condition']
        
        # 提取症状
        symptoms_section = response.css('#symptoms')
        symptoms = symptoms_section.css('li::text').getall()
        
        # 提取治疗信息
        treatment_section = response.css('#treatment')
        treatments = treatment_section.css('li::text').getall()
        
        # 提取原因
        causes_section = response.css('#causes')
        causes = causes_section.css('p::text').getall()
        
        yield {
            'condition': condition,
            'symptoms': symptoms,
            'treatments': treatments,
            'causes': causes,
            'source_url': response.url,
            'scraped_at': datetime.now().isoformat(),
            'confidence_score': self.calculate_confidence(response)
        }
    
    def calculate_confidence(self, response):
        """计算数据置信度"""
        # 基于页面结构和内容完整性计算置信度
        confidence = 0.0
        
        # 检查是否有症状部分
        if response.css('#symptoms'):
            confidence += 0.3
        
        # 检查是否有治疗部分
        if response.css('#treatment'):
            confidence += 0.3
        
        # 检查内容长度
        content_length = len(response.text)
        if content_length > 1000:
            confidence += 0.2
        
        # 检查是否有医学关键词
        medical_keywords = ['symptom', 'treatment', 'diagnosis', 'condition']
        keyword_count = sum(1 for keyword in medical_keywords if keyword in response.text.lower())
        confidence += (keyword_count / len(medical_keywords)) * 0.2
        
        return min(confidence, 1.0)
    
    def build_condition_url(self, condition, base_url):
        """构建疾病详情页URL(URL 模式因站点而异,此处仅为示意)"""
        condition_slug = condition.lower().replace(' ', '-')
        return urljoin(base_url, f'/{condition_slug}/symptoms-causes/syc-2030000')

# 运行爬虫
def run_medical_spider(output_file="medical_data.json"):
    """运行医疗数据爬虫,通过 FEEDS 设置将结果导出为 JSON 文件"""
    settings = get_project_settings()
    settings.set("FEEDS", {output_file: {"format": "json", "encoding": "utf8"}})
    process = CrawlerProcess(settings)
    process.crawl(MedicalSpider)
    process.start()
    
    print(f"Medical data scraped and saved to {output_file}")

2.3 数据清洗与预处理

2.3.1 医疗文本清洗

专业文本清洗管道

# medical_text_cleaning.py
import re
import spacy
from typing import List, Dict
import unicodedata

class MedicalTextCleaner:
    """医疗文本清洗器"""
    
    def __init__(self):
        # en_core_sci_sm 为 scispaCy 提供的科学文献模型,需额外安装 scispacy 及对应模型包
        self.nlp = spacy.load("en_core_sci_sm")
        
    def clean_medical_text(self, text: str) -> Dict:
        """清洗医疗文本"""
        original_text = text
        
        # 步骤1: 基础清洗
        text = self.basic_cleaning(text)
        
        # 步骤2: 医学术语标准化
        text = self.standardize_medical_terms(text)
        
        # 步骤3: 缩写扩展
        text = self.expand_abbreviations(text)
        
        # 步骤4: 实体识别和标准化
        entities = self.extract_medical_entities(text)
        
        # 步骤5: 语法检查
        text = self.grammar_check(text)
        
        return {
            "original_text": original_text,
            "cleaned_text": text,
            "entities": entities,
            "cleaning_metadata": self.get_cleaning_metadata(original_text, text)
        }
    
    def basic_cleaning(self, text: str) -> str:
        """基础文本清洗"""
        # 移除HTML标签
        text = re.sub(r'<[^>]+>', '', text)
        
        # 标准化Unicode字符
        text = unicodedata.normalize('NFKC', text)
        
        # 移除多余的空白字符
        text = re.sub(r'\s+', ' ', text)
        
        # 修正标点符号
        text = re.sub(r'\s+([,.!?;:])', r'\1', text)
        
        return text.strip()
    
    def standardize_medical_terms(self, text: str) -> str:
        """标准化医学术语"""
        # 统一疾病名称
        disease_mapping = {
            "heart attack": "myocardial infarction",
            "high blood pressure": "hypertension",
            "sugar diabetes": "diabetes mellitus",
            "bp": "blood pressure",
            "hr": "heart rate"
        }
        
        for common_term, medical_term in disease_mapping.items():
            text = re.sub(rf'\b{common_term}\b', medical_term, text, flags=re.IGNORECASE)
        
        return text
    
    def expand_abbreviations(self, text: str) -> str:
        """扩展医学缩写"""
        abbreviation_map = {
            r'\bq\.d\.?\b': 'once daily',
            r'\bb\.i\.d\.?\b': 'twice daily',
            r'\bt\.i\.d\.?\b': 'three times daily',
            r'\bq\.i\.d\.?\b': 'four times daily',
            r'\bp\.o\.?\b': 'by mouth',
            r'\bi\.m\.?\b': 'intramuscular',
            r'\bi\.v\.?\b': 'intravenous',
            r'\bs\.c\.?\b': 'subcutaneous',
            r'\bp\.r\.n\.?\b': 'as needed',
            r'\bstat\b': 'immediately'
        }
        
        for pattern, expansion in abbreviation_map.items():
            text = re.sub(pattern, expansion, text, flags=re.IGNORECASE)
        
        return text
    
    def extract_medical_entities(self, text: str) -> List[Dict]:
        """提取医学实体"""
        doc = self.nlp(text)
        entities = []
        
        for ent in doc.ents:
            # 注意:en_core_sci_sm 默认只输出通用的 ENTITY 标签,
            # DISEASE/SYMPTOM 等细分标签需换用专用医学 NER 模型(如 en_ner_bc5cdr_md)
            if ent.label_ in ["DISEASE", "SYMPTOM", "MEDICATION", "PROCEDURE", "ENTITY"]:
                entities.append({
                    "text": ent.text,
                    "label": ent.label_,
                    "start": ent.start_char,
                    "end": ent.end_char,
                    "confidence": 0.9  # spaCy默认置信度
                })
        
        return entities
    
    def grammar_check(self, text: str) -> str:
        """基础语法检查"""
        # 修正常见语法错误
        corrections = {
            r'\bain\'t\b': "isn't",
            r'\bgonna\b': "going to",
            r'\bgotta\b': "have got to",
            r'\bwanna\b': "want to"
        }
        
        for pattern, correction in corrections.items():
            text = re.sub(pattern, correction, text, flags=re.IGNORECASE)
        
        return text
    
    def get_cleaning_metadata(self, original: str, cleaned: str) -> Dict:
        """获取清洗元数据"""
        return {
            "original_length": len(original),
            "cleaned_length": len(cleaned),
            "length_change": len(cleaned) - len(original),
            "abbreviations_expanded": self.count_abbreviations_expanded(original, cleaned),
            "entities_found": len(self.extract_medical_entities(cleaned))
        }
    
    def count_abbreviations_expanded(self, original: str, cleaned: str) -> int:
        """估算扩展的缩写数量(粗略近似:缩写展开通常使清洗后的文本变长)"""
        return max(0, len(cleaned) - len(original))
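
清洗器的一个使用示意如下(输出字段与上文 clean_medical_text 的返回值一致):

```python
# 使用示意:清洗一段含缩写的英文病历文本
cleaner = MedicalTextCleaner()
result = cleaner.clean_medical_text("Patient has high blood pressure. Take aspirin b.i.d. p.o. as needed.")
print(result["cleaned_text"])       # 预期缩写被展开(twice daily、by mouth),术语被标准化为 hypertension
print(result["cleaning_metadata"])
```
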
2.3.2 数据去重与融合

智能去重算法

# intelligent_deduplication.py
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
from typing import List, Tuple

class IntelligentDeduplicator:
    """智能去重器"""
    
    def __init__(self, similarity_threshold=0.85):
        self.similarity_threshold = similarity_threshold
        self.vectorizer = TfidfVectorizer(
            max_features=5000,
            ngram_range=(1, 3),
            stop_words='english'
        )
        
    def find_duplicates(self, texts: List[str]) -> List[Tuple[int, int, float]]:
        """查找重复文本"""
        if len(texts) < 2:
            return []
        
        # 向量化文本
        tfidf_matrix = self.vectorizer.fit_transform(texts)
        
        # 计算相似度矩阵
        similarity_matrix = cosine_similarity(tfidf_matrix)
        
        # 找到相似度超过阈值的文本对
        duplicates = []
        n = len(texts)
        
        for i in range(n):
            for j in range(i+1, n):
                similarity = similarity_matrix[i][j]
                if similarity > self.similarity_threshold:
                    duplicates.append((i, j, similarity))
        
        return duplicates
    
    def remove_duplicates(self, texts: List[str], keep_strategy="longest") -> List[str]:
        """移除重复文本"""
        duplicates = self.find_duplicates(texts)
        
        if not duplicates:
            return texts
        
        # 构建重复关系图
        duplicate_groups = self.build_duplicate_groups(duplicates, len(texts))
        
        # 根据策略选择保留的文本
        unique_texts = []
        
        for group in duplicate_groups:
            if len(group) == 1:
                unique_texts.append(texts[group[0]])
            else:
                kept_index = self.select_text_to_keep(group, texts, keep_strategy)
                unique_texts.append(texts[kept_index])
        
        return unique_texts
    
    def build_duplicate_groups(self, duplicates: List[Tuple[int, int, float]], n_texts: int) -> List[List[int]]:
        """构建重复文本组"""
        # 使用并查集算法
        parent = list(range(n_texts))
        
        def find(x):
            if parent[x] != x:
                parent[x] = find(parent[x])
            return parent[x]
        
        def union(x, y):
            px, py = find(x), find(y)
            if px != py:
                parent[px] = py
        
        # 合并重复的文本
        for i, j, _ in duplicates:
            union(i, j)
        
        # 构建组
        groups = {}
        for i in range(n_texts):
            root = find(i)
            if root not in groups:
                groups[root] = []
            groups[root].append(i)
        
        return list(groups.values())
    
    def select_text_to_keep(self, group: List[int], texts: List[str], strategy: str) -> int:
        """选择要保留的文本"""
        if strategy == "longest":
            return max(group, key=lambda i: len(texts[i]))
        elif strategy == "shortest":
            return min(group, key=lambda i: len(texts[i]))
        elif strategy == "highest_quality":
            # 这里可以实现质量评分逻辑
            return max(group, key=lambda i: self.quality_score(texts[i]))
        else:
            return group[0]  # 默认保留第一个
    
    def quality_score(self, text: str) -> float:
        """计算文本质量分数"""
        score = 0.0
        
        # 长度因子
        if 100 <= len(text) <= 1000:
            score += 0.3
        
        # 完整性因子
        if text.endswith('.') or text.endswith('?'):
            score += 0.2
        
        # 关键词密度因子
        words = text.split()
        if len(words) > 10:
            unique_words = set(words)
            score += (len(unique_words) / len(words)) * 0.3
        
        # 语法正确性因子(简化版)
        if not any(char.isdigit() for char in text):
            score += 0.2
        
        return min(score, 1.0)
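
去重器的一个使用示意如下,前两段文本语义高度相似;相似度阈值需根据语料实验调节:

```python
# 使用示意:对一小批文本去重
texts = [
    "Diabetes symptoms include frequent urination and excessive thirst.",
    "Symptoms of diabetes include excessive thirst, frequent urination and fatigue.",
    "Hypertension is diagnosed by repeated blood pressure measurements.",
]
dedup = IntelligentDeduplicator(similarity_threshold=0.5)
print(dedup.remove_duplicates(texts, keep_strategy="longest"))
```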

2.4 数据标注与验证

2.4.1 众包标注平台

标注任务设计

# annotation_task_design.py
from typing import List, Dict, Optional
from dataclasses import dataclass
import json

@dataclass
class AnnotationTask:
    """标注任务定义"""
    task_id: str
    task_type: str  # "classification", "ner", "qa", "ranking"
    instructions: str
    examples: List[Dict]
    guidelines: str
    difficulty: str  # "easy", "medium", "hard"
    estimated_time: int  # 分钟
    reward: float
    
class MedicalAnnotationPlatform:
    """医疗标注平台"""
    
    def __init__(self):
        self.annotators = {}
        self.quality_control = QualityControlSystem()
        
    def create_annotation_task(self, data_item: Dict, task_type: str) -> AnnotationTask:
        """创建标注任务"""
        
        if task_type == "medical_qa":
            return self.create_medical_qa_task(data_item)
        elif task_type == "symptom_classification":
            return self.create_symptom_classification_task(data_item)
        elif task_type == "treatment_ranking":
            return self.create_treatment_ranking_task(data_item)
        else:
            raise ValueError(f"Unknown task type: {task_type}")
    
    def create_medical_qa_task(self, data_item: Dict) -> AnnotationTask:
        """创建医疗问答标注任务"""
        return AnnotationTask(
            task_id=f"medical_qa_{data_item['id']}",
            task_type="qa",
            instructions="""
            请根据提供的医疗问题,判断AI生成的回答是否准确、完整、安全。
            
            评估标准:
            1. 准确性:信息是否正确,有无医学错误
            2. 完整性:是否全面回答了问题
            3. 安全性:是否包含适当的医疗免责声明
            4. 可读性:语言是否清晰易懂
            
            评分:1-5分,5分为最佳
            """,
            examples=[
                {
                    "question": "What are the symptoms of diabetes?",
                    "ai_answer": "Common symptoms include frequent urination, excessive thirst, and fatigue. However, please consult a healthcare professional for proper diagnosis.",
                    "rating": 5,
                    "feedback": "Good answer with appropriate disclaimer"
                }
            ],
            guidelines="Focus on medical accuracy and patient safety",
            difficulty="medium",
            estimated_time=3,
            reward=0.5
        )
    
    def distribute_tasks(self, tasks: List[AnnotationTask], annotator_pool: List[str]):
        """分发标注任务"""
        distribution = {}
        
        for task in tasks:
            # 选择最合适的标注员
            suitable_annotators = self.select_suitable_annotators(task, annotator_pool)
            
            # 分配任务给多个标注员(冗余标注)
            assigned_annotators = suitable_annotators[:3]  # 3个标注员
            
            distribution[task.task_id] = {
                "task": task,
                "annotators": assigned_annotators,
                "deadline": self.calculate_deadline(task)
            }
        
        return distribution
    
    def select_suitable_annotators(self, task: AnnotationTask, annotator_pool: List[str]) -> List[str]:
        """选择合适的标注员:专业背景、历史准确率、近期活跃度需同时满足"""
        suitable = []
        
        for annotator_id in annotator_pool:
            annotator = self.annotators.get(annotator_id)
            
            if not annotator:
                continue
            
            # 检查专业背景(医疗问答任务要求具备医学背景)
            if task.task_type == "qa" and not annotator.get("medical_background"):
                continue
            
            # 检查历史表现
            if annotator.get("accuracy_score", 0) <= 0.85:
                continue
            
            # 检查活跃度
            if annotator.get("recent_activity", 0) <= 0.8:
                continue
            
            suitable.append(annotator_id)
        
        return suitable[:10]  # 返回前10个合适的标注员

2.4.2 质量控制机制

多轮标注验证

# quality_control_system.py
import statistics
from typing import List, Dict, Tuple

class QualityControlSystem:
    """质量控制系统"""
    
    def __init__(self):
        self.consensus_threshold = 0.7
        self.expert_weight = 2.0
        self.min_annotations = 3
        
    def calculate_inter_annotator_agreement(self, annotations: List[Dict]) -> float:
        """计算标注者间一致性"""
        if len(annotations) < 2:
            return 0.0
        
        # 提取评分
        ratings = [ann['rating'] for ann in annotations]
        
        # 严格做法是计算 Fleiss' Kappa 或 Krippendorff's Alpha(示例见本节末尾)
        # 这里使用简化的标准差方法
        if len(set(ratings)) == 1:
            return 1.0
        
        std_dev = statistics.stdev(ratings)
        max_rating = max(ratings)
        min_rating = min(ratings)
        rating_range = max_rating - min_rating
        
        # 标准化标准差
        normalized_std = std_dev / rating_range if rating_range > 0 else 0
        
        # 转换为一致性分数
        agreement = max(0, 1 - normalized_std)
        
        return agreement
    
    def resolve_disagreements(self, annotations: List[Dict]) -> Dict:
        """解决标注分歧"""
        agreement = self.calculate_inter_annotator_agreement(annotations)
        
        if agreement >= self.consensus_threshold:
            # 高一致性,使用简单平均
            return self.simple_consensus(annotations)
        else:
            # 低一致性,需要专家介入
            return self.expert_resolution(annotations)
    
    def simple_consensus(self, annotations: List[Dict]) -> Dict:
        """简单共识算法"""
        ratings = [ann['rating'] for ann in annotations]
        avg_rating = statistics.mean(ratings)
        
        # 选择最接近平均值的标注
        closest_annotation = min(annotations, key=lambda x: abs(x['rating'] - avg_rating))
        
        return {
            "final_annotation": closest_annotation,
            "consensus_method": "simple_average",
            "agreement_score": self.calculate_inter_annotator_agreement(annotations),
            "num_annotations": len(annotations)
        }
    
    def expert_resolution(self, annotations: List[Dict]) -> Dict:
        """专家解决机制"""
        # 识别专家标注员
        expert_annotations = [ann for ann in annotations if ann.get('is_expert', False)]
        
        if expert_annotations:
            # 使用专家意见
            expert_consensus = self.expert_consensus(expert_annotations)
            return {
                "final_annotation": expert_consensus,
                "consensus_method": "expert_consensus",
                "agreement_score": self.calculate_inter_annotator_agreement(expert_annotations),
                "num_experts": len(expert_annotations)
            }
        else:
            # 没有专家,需要重新标注
            return {
                "final_annotation": None,
                "consensus_method": "requires_expert_review",
                "agreement_score": self.calculate_inter_annotator_agreement(annotations),
                "action": "send_to_expert_panel"
            }
    
    def expert_consensus(self, expert_annotations: List[Dict]) -> Dict:
        """专家共识算法"""
        # 加权平均,专家权重更高
        weighted_ratings = []
        
        for ann in expert_annotations:
            rating = ann['rating']
            weight = ann.get('expert_weight', self.expert_weight)
            weighted_ratings.extend([rating] * int(weight))
        
        consensus_rating = statistics.mean(weighted_ratings)
        
        # 选择最接近共识的专家标注
        closest_expert = min(expert_annotations, 
                           key=lambda x: abs(x['rating'] - consensus_rating))
        
        return closest_expert
    
    def quality_assurance_check(self, dataset: List[Dict]) -> Dict:
        """质量保证检查"""
        quality_report = {
            "total_samples": len(dataset),
            "quality_score": 0.0,
            "issues_found": [],
            "recommendations": []
        }
        
        # 准确性检查
        accuracy_issues = self.check_accuracy(dataset)
        if accuracy_issues:
            quality_report["issues_found"].extend(accuracy_issues)
        
        # 一致性检查
        consistency_score = self.check_consistency(dataset)
        quality_report["quality_score"] = consistency_score
        
        # 完整性检查
        completeness_issues = self.check_completeness(dataset)
        if completeness_issues:
            quality_report["issues_found"].extend(completeness_issues)
        
        # 生成改进建议
        quality_report["recommendations"] = self.generate_quality_recommendations(quality_report)
        
        return quality_report
    
    def check_accuracy(self, dataset: List[Dict]) -> List[Dict]:
        """检查准确性(has_medical_inconsistencies 等判定方法为待实现的占位接口)"""
        issues = []
        
        for i, item in enumerate(dataset):
            # 检查医学事实一致性
            if self.has_medical_inconsistencies(item):
                issues.append({
                    "type": "medical_inconsistency",
                    "sample_id": item.get("id", i),
                    "description": "Medical facts are inconsistent",
                    "severity": "high"
                })
            
            # 检查逻辑一致性
            if self.has_logical_inconsistencies(item):
                issues.append({
                    "type": "logical_inconsistency", 
                    "sample_id": item.get("id", i),
                    "description": "Logical flow is inconsistent",
                    "severity": "medium"
                })
        
        return issues
    
    def check_consistency(self, dataset: List[Dict]) -> float:
        """检查一致性(calculate_*_consistency 等子指标计算为待实现的占位接口)"""
        if len(dataset) < 2:
            return 1.0
        
        # 计算格式一致性
        format_consistency = self.calculate_format_consistency(dataset)
        
        # 计算术语一致性
        terminology_consistency = self.calculate_terminology_consistency(dataset)
        
        # 计算风格一致性
        style_consistency = self.calculate_style_consistency(dataset)
        
        # 综合一致性分数
        overall_consistency = (
            format_consistency * 0.4 +
            terminology_consistency * 0.4 +
            style_consistency * 0.2
        )
        
        return overall_consistency
    
    def generate_quality_recommendations(self, quality_report: Dict) -> List[str]:
        """生成质量改进建议"""
        recommendations = []
        
        if quality_report["quality_score"] < 0.8:
            recommendations.append("Consider re-annotating low-quality samples")
        
        high_severity_issues = [issue for issue in quality_report["issues_found"] 
                              if issue.get("severity") == "high"]
        
        if len(high_severity_issues) > 5:
            recommendations.append("Review annotation guidelines and provide additional training")
        
        if quality_report["quality_score"] < 0.9:
            recommendations.append("Implement additional quality control checkpoints")
        
        return recommendations
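
上文的标注者间一致性采用了简化的标准差方法;如需更严格的度量,可以使用 Fleiss' Kappa。下面是一个最小实现示意,输入为「样本 × 类别」的计数矩阵,要求每个样本的标注人数相同:

```python
# fleiss_kappa_example.py —— Fleiss' Kappa 的最小实现示意
import numpy as np

def fleiss_kappa(counts) -> float:
    """counts[i][j] 表示第 i 个样本被标为第 j 类的标注员人数,每行之和等于标注员数 n。"""
    M = np.asarray(counts, dtype=float)
    N, _ = M.shape
    n = M[0].sum()                                          # 每个样本的标注员数
    p_j = M.sum(axis=0) / (N * n)                           # 各类别的总体比例
    P_i = (np.square(M).sum(axis=1) - n) / (n * (n - 1))    # 每个样本内部的一致性
    P_bar, P_e = P_i.mean(), np.square(p_j).sum()
    return float((P_bar - P_e) / (1 - P_e))

# 3 名标注员对 4 个样本打 1-5 分,转换为 5 类计数矩阵后的示例输入
counts = [
    [0, 0, 0, 1, 2],   # 样本1:一人打4分,两人打5分
    [0, 0, 3, 0, 0],   # 样本2:三人一致打3分
    [1, 1, 1, 0, 0],   # 样本3:三人完全不一致
    [0, 0, 0, 2, 1],   # 样本4
]
print(f"Fleiss' Kappa = {fleiss_kappa(counts):.3f}")
```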

数据集构建实操

3.1 DIY数据集构建工具

3.1.1 数据集构建流水线

完整构建脚本

#!/bin/bash
# build_medical_dataset.sh

# 配置参数
DATASET_NAME="medical_consultation_v1"
OUTPUT_DIR="./datasets/${DATASET_NAME}"
MIN_SAMPLES=50000
QUALITY_THRESHOLD=0.85

# 创建输出目录
mkdir -p ${OUTPUT_DIR}/{raw,processed,annotation,final}

echo "=== 开始构建医疗问诊数据集 ==="
echo "数据集名称: ${DATASET_NAME}"
echo "目标样本数: ${MIN_SAMPLES}"
echo "质量阈值: ${QUALITY_THRESHOLD}"

# 步骤1: 数据收集
echo "步骤1: 收集原始数据..."
python scripts/collect_data.py \
    --sources pubmed,clinical_trials,medical_wiki \
    --queries ./config/medical_queries.txt \
    --output ${OUTPUT_DIR}/raw/collected_data.jsonl \
    --max_samples 100000

# 步骤2: 数据清洗
echo "步骤2: 清洗数据..."
python scripts/clean_data.py \
    --input ${OUTPUT_DIR}/raw/collected_data.jsonl \
    --output ${OUTPUT_DIR}/processed/cleaned_data.jsonl \
    --config ./config/cleaning_config.yaml

# 步骤3: 去重处理
echo "步骤3: 去重处理..."
python scripts/deduplicate.py \
    --input ${OUTPUT_DIR}/processed/cleaned_data.jsonl \
    --output ${OUTPUT_DIR}/processed/deduplicated_data.jsonl \
    --similarity_threshold 0.85

# 步骤4: 标注任务
echo "步骤4: 创建标注任务..."
python scripts/create_annotation_tasks.py \
    --input ${OUTPUT_DIR}/processed/deduplicated_data.jsonl \
    --output ${OUTPUT_DIR}/annotation/tasks.json \
    --num_tasks 1000

# 步骤5: 质量控制(假设人工标注完成后,结果已保存为 annotation/annotated_data.jsonl)
echo "步骤5: 质量控制..."
python scripts/quality_control.py \
    --input ${OUTPUT_DIR}/annotation/annotated_data.jsonl \
    --output ${OUTPUT_DIR}/processed/quality_checked_data.jsonl \
    --quality_threshold ${QUALITY_THRESHOLD}

# 步骤6: 数据集分割
echo "步骤6: 数据集分割..."
python scripts/split_dataset.py \
    --input ${OUTPUT_DIR}/processed/quality_checked_data.jsonl \
    --output_dir ${OUTPUT_DIR}/final \
    --split_ratio 0.8,0.1,0.1 \
    --shuffle true

# 步骤7: 生成数据集报告
echo "步骤7: 生成数据集报告..."
python scripts/generate_dataset_report.py \
    --dataset_dir ${OUTPUT_DIR}/final \
    --output ${OUTPUT_DIR}/dataset_report.html

# 步骤8: 打包数据集
echo "步骤8: 打包数据集..."
cd ${OUTPUT_DIR}/final
tar -czf ../${DATASET_NAME}.tar.gz *.jsonl
cd ../..

echo "=== 数据集构建完成 ==="
echo "输出位置: ${OUTPUT_DIR}/final/"
echo "报告文件: ${OUTPUT_DIR}/dataset_report.html"
echo "打包文件: ${OUTPUT_DIR}/${DATASET_NAME}.tar.gz"
3.1.2 数据质量评估工具

质量评估仪表板

# dataset_quality_dashboard.py
import streamlit as st
import pandas as pd
import plotly.express as px
import json

class DatasetQualityDashboard:
    """数据集质量仪表板"""
    
    def __init__(self):
        st.set_page_config(
            page_title="医疗数据集质量仪表板",
            page_icon="🏥",
            layout="wide"
        )
        
    def run_dashboard(self):
        """运行仪表板"""
        st.title("🏥 医疗数据集质量仪表板")
        
        # 侧边栏
        with st.sidebar:
            st.header("数据集选择")
            dataset_path = st.text_input(
                "数据集路径",
                value="./datasets/medical_consultation_v1/final/train.jsonl"
            )
            
            sample_size = st.slider("样本数量", 100, 10000, 1000)
        
        # 加载数据
        try:
            df = self.load_dataset_sample(dataset_path, sample_size)
            st.success(f"成功加载 {len(df)} 条数据")
        except Exception as e:
            st.error(f"加载数据失败: {e}")
            return
        
        # 主要指标
        col1, col2, col3, col4 = st.columns(4)
        
        with col1:
            st.metric("总样本数", f"{len(df):,}")
        
        with col2:
            avg_length = df['output'].str.len().mean()
            st.metric("平均回答长度", f"{avg_length:.0f} 字符")
        
        with col3:
            unique_conditions = df['condition'].nunique() if 'condition' in df.columns else "N/A"
            st.metric("疾病种类", unique_conditions)
        
        with col4:
            quality_score = df.get('quality_score', pd.Series([0.85])).mean()
            st.metric("质量分数", f"{quality_score:.2f}")
        
        # 数据分布可视化
        st.header("📊 数据分布分析")
        
        col1, col2 = st.columns(2)
        
        with col1:
            # 回答长度分布
            fig_length = px.histogram(
                df, 
                x=df['output'].str.len(),
                nbins=50,
                title="回答长度分布",
                labels={'x': '字符数', 'y': '频数'}
            )
            st.plotly_chart(fig_length, use_container_width=True)
        
        with col2:
            # 疾病类型分布(如果有)
            if 'condition' in df.columns:
                condition_counts = df['condition'].value_counts().head(20)
                fig_conditions = px.bar(
                    x=condition_counts.index,
                    y=condition_counts.values,
                    title="Top 20 疾病类型分布",
                    labels={'x': '疾病类型', 'y': '数量'}
                )
                fig_conditions.update_xaxes(tickangle=45)
                st.plotly_chart(fig_conditions, use_container_width=True)
        
        # 质量指标
        st.header("✅ 质量指标分析")
        
        if 'quality_metrics' in df.columns:
            quality_df = pd.json_normalize(df['quality_metrics'])
            
            col1, col2, col3 = st.columns(3)
            
            with col1:
                st.subheader("准确性分布")
                accuracy_scores = quality_df.get('accuracy', pd.Series([0.9] * len(df)))
                fig_accuracy = px.histogram(
                    accuracy_scores,
                    nbins=20,
                    title="准确性分数分布"
                )
                st.plotly_chart(fig_accuracy, use_container_width=True)
            
            with col2:
                st.subheader("完整性分布")
                completeness_scores = quality_df.get('completeness', pd.Series([0.9] * len(df)))
                fig_completeness = px.histogram(
                    completeness_scores,
                    nbins=20,
                    title="完整性分数分布"
                )
                st.plotly_chart(fig_completeness, use_container_width=True)
            
            with col3:
                st.subheader("一致性分布")
                consistency_scores = quality_df.get('consistency', pd.Series([0.9] * len(df)))
                fig_consistency = px.histogram(
                    consistency_scores,
                    nbins=20,
                    title="一致性分数分布"
                )
                st.plotly_chart(fig_consistency, use_container_width=True)
        
        # 数据样本展示
        st.header("📝 数据样本")
        
        sample_idx = st.slider("选择样本索引", 0, len(df)-1, 0)
        sample = df.iloc[sample_idx]
        
        with st.expander("查看样本详情"):
            col1, col2 = st.columns(2)
            
            with col1:
                st.subheader("输入")
                st.text_area("", sample.get('instruction', 'N/A'), height=100, disabled=True)
                
                if 'input' in sample and sample['input']:
                    st.subheader("上下文")
                    st.text_area("", sample['input'], height=50, disabled=True)
            
            with col2:
                st.subheader("输出")
                st.text_area("", sample.get('output', 'N/A'), height=150, disabled=True)
        
        # 导出功能
        st.header("📤 导出选项")
        
        export_format = st.selectbox("导出格式", ["JSON", "CSV", "Parquet"])
        
        if st.button("导出数据"):
            if export_format == "JSON":
                json_data = df.to_json(orient='records', force_ascii=False, indent=2)
                st.download_button(
                    label="下载 JSON",
                    data=json_data,
                    file_name="medical_dataset.json",
                    mime="application/json"
                )
            elif export_format == "CSV":
                csv_data = df.to_csv(index=False)
                st.download_button(
                    label="下载 CSV",
                    data=csv_data,
                    file_name="medical_dataset.csv",
                    mime="text/csv"
                )
            elif export_format == "Parquet":
                # 需要安装 pyarrow
                try:
                    import pyarrow.parquet as pq
                    parquet_data = df.to_parquet()
                    st.download_button(
                        label="下载 Parquet",
                        data=parquet_data,
                        file_name="medical_dataset.parquet",
                        mime="application/octet-stream"
                    )
                except ImportError:
                    st.error("需要安装 pyarrow 才能导出 Parquet 格式")
    
    def load_dataset_sample(self, dataset_path: str, sample_size: int) -> pd.DataFrame:
        """加载数据集样本"""
        try:
            # 尝试作为JSONL文件加载
            data = []
            with open(dataset_path, 'r', encoding='utf-8') as f:
                for i, line in enumerate(f):
                    if i >= sample_size:
                        break
                    data.append(json.loads(line.strip()))
            
            return pd.DataFrame(data)
            
        except Exception as e:
            st.error(f"加载数据失败: {e}")
            # 返回示例数据
            return self.generate_sample_data()
    
    def generate_sample_data(self) -> pd.DataFrame:
        """生成示例数据"""
        sample_data = [
            {
                "instruction": "What are the symptoms of diabetes?",
                "input": "",
                "output": "Common symptoms of diabetes include frequent urination, excessive thirst, unexplained weight loss, increased hunger, fatigue, blurred vision, and slow-healing wounds. However, please consult a healthcare professional for proper diagnosis and treatment.",
                "condition": "diabetes",
                "quality_score": 0.92
            },
            {
                "instruction": "How is hypertension diagnosed?",
                "input": "",
                "output": "Hypertension is diagnosed by measuring blood pressure on multiple occasions. A diagnosis is made when blood pressure readings are consistently 140/90 mmHg or higher. Your doctor may also recommend additional tests to check for underlying conditions. Regular monitoring is essential.",
                "condition": "hypertension",
                "quality_score": 0.88
            }
        ]
        
        return pd.DataFrame(sample_data * 500)  # 重复生成更多数据

# 运行仪表板
if __name__ == "__main__":
    dashboard = DatasetQualityDashboard()
    dashboard.run_dashboard()

3.2 自动化数据集维护

3.2.1 持续集成流程

GitHub Actions工作流

# .github/workflows/dataset-update.yml
name: Dataset Update Pipeline

on:
  schedule:
    - cron: '0 2 * * 1'  # 每周一凌晨2点运行
  workflow_dispatch:
    inputs:
      update_type:
        description: 'Update type'
        required: true
        default: 'incremental'
        type: choice
        options:
        - incremental
        - full

jobs:
  data-collection:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v3
    
    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.10'
    
    - name: Install dependencies
      run: |
        pip install -r requirements.txt
        pip install -e .
    
    - name: Collect new data
      run: |
        python scripts/collect_latest_data.py \
          --update-type ${{ github.event.inputs.update_type || 'incremental' }} \
          --output data/new_data.jsonl
    
    - name: Process new data
      run: |
        python scripts/process_new_data.py \
          --input data/new_data.jsonl \
          --output data/processed_data.jsonl
    
    - name: Quality check
      run: |
        python scripts/quality_check.py \
          --input data/processed_data.jsonl \
          --threshold 0.85
    
    - name: Merge with existing dataset
      run: |
        python scripts/merge_datasets.py \
          --existing data/current_dataset.jsonl \
          --new data/processed_data.jsonl \
          --output data/updated_dataset.jsonl
    
    - name: Generate quality report
      run: |
        python scripts/generate_quality_report.py \
          --dataset data/updated_dataset.jsonl \
          --output reports/quality_report.html
    
    - name: Upload artifacts
      uses: actions/upload-artifact@v3
      with:
        name: updated-dataset
        path: |
          data/updated_dataset.jsonl
          reports/quality_report.html
    
    # 注意:下方 body 中引用的 steps.stats.* 输出需由一个前置的、id 为 stats 的统计步骤产生
    - name: Create Pull Request
      uses: peter-evans/create-pull-request@v5
      with:
        token: ${{ secrets.GITHUB_TOKEN }}
        commit-message: "chore: update medical dataset"
        title: "Dataset Update - ${{ github.run_number }}"
        body: |
          ## 数据集更新报告
          
          ### 更新统计
          - 新增样本: ${{ steps.stats.outputs.new_samples }}
          - 质量分数: ${{ steps.stats.outputs.quality_score }}
          - 更新日期: ${{ steps.stats.outputs.update_date }}
          
          ### 质量报告
          详见: reports/quality_report.html
        branch: dataset-update-${{ github.run_number }}
        delete-branch: true
3.2.2 数据版本管理

语义化版本管理

# dataset_versioning.py
import semver
import json
import hashlib
from datetime import datetime
from typing import Dict, List

class DatasetVersionManager:
    """数据集版本管理器"""
    
    def __init__(self, major=1, minor=0, patch=0):
        self.version = semver.VersionInfo(major, minor, patch)
        self.version_history = []
        
    def bump_version(self, change_type: str, changes: Dict) -> str:
        """版本号递增"""
        if change_type == "major":
            # 重大变更:数据格式改变、质量要求重大调整
            new_version = self.version.bump_major()
        elif change_type == "minor":
            # 次要变更:新增大量数据、新增功能
            new_version = self.version.bump_minor()
        elif change_type == "patch":
            # 补丁变更:小量数据修正、bug修复
            new_version = self.version.bump_patch()
        else:
            raise ValueError(f"Unknown change type: {change_type}")
        
        # 记录版本历史
        version_record = {
            "version": str(new_version),
            "previous_version": str(self.version),
            "change_type": change_type,
            "changes": changes,
            "timestamp": datetime.now().isoformat(),
            "data_hash": self.calculate_dataset_hash(changes.get("dataset_path"))
        }
        
        self.version_history.append(version_record)
        self.version = new_version
        
        return str(new_version)
    
    def generate_changelog(self) -> str:
        """生成变更日志"""
        changelog = f"# 数据集变更日志\n\n"
        changelog += f"当前版本: {self.version}\n\n"
        
        for record in reversed(self.version_history[-10:]):  # 最近10个版本
            changelog += f"## {record['version']} ({record['timestamp'][:10]})\n\n"
            changelog += f"**变更类型**: {record['change_type']}\n\n"
            
            changes = record['changes']
            if 'new_samples' in changes:
                changelog += f"- 新增样本: {changes['new_samples']:,}\n"
            
            if 'quality_improvements' in changes:
                changelog += f"- 质量改进: {changes['quality_improvements']}\n"
            
            if 'bug_fixes' in changes:
                changelog += f"- Bug修复: {changes['bug_fixes']}\n"
            
            changelog += "\n"
        
        return changelog
    
    def calculate_dataset_hash(self, dataset_path: str) -> str:
        """计算数据集哈希"""
        if not dataset_path:
            return ""
        
        try:
            with open(dataset_path, 'rb') as f:
                file_hash = hashlib.sha256()
                while chunk := f.read(8192):
                    file_hash.update(chunk)
                return file_hash.hexdigest()[:16]
        except FileNotFoundError:
            return "file_not_found"
    
    def compare_versions(self, version1: str, version2: str) -> Dict:
        """比较两个版本"""
        v1 = semver.VersionInfo.parse(version1)
        v2 = semver.VersionInfo.parse(version2)
        
        comparison = {
            "version1": str(v1),
            "version2": str(v2),
            "is_newer": v1 > v2,
            "is_compatible": v1.major == v2.major,
            "difference": {
                "major": v1.major - v2.major,
                "minor": v1.minor - v2.minor,
                "patch": v1.patch - v2.patch
            }
        }
        
        return comparison

# 使用示例
version_manager = DatasetVersionManager(1, 0, 0)

# 记录一次小版本更新
new_version = version_manager.bump_version("minor", {
    "new_samples": 5000,
    "quality_improvements": "Improved medical terminology standardization",
    "source": "Added data from clinical trials"
})

print(f"新版本号: {new_version}")
print(version_manager.generate_changelog())

3.3 数据集发布与共享

3.3.1 数据集打包与文档

标准化发布流程

# dataset_publisher.py
import os
import json
import shutil
import hashlib
import zipfile
from datetime import datetime
from typing import Dict, List

class DatasetPublisher:
    """数据集发布器"""
    
    def __init__(self, output_dir: str = "./published_datasets"):
        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)
        
    def publish_dataset(self, dataset_config: Dict, data_files: List[str]) -> str:
        """发布数据集"""
        
        # 生成发布ID
        publish_id = f"medical_dataset_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        publish_path = os.path.join(self.output_dir, publish_id)
        os.makedirs(publish_path, exist_ok=True)
        
        # 1. 复制数据文件
        data_dir = os.path.join(publish_path, "data")
        os.makedirs(data_dir, exist_ok=True)
        
        for file_path in data_files:
            if os.path.exists(file_path):
                shutil.copy2(file_path, data_dir)
        
        # 2. 生成数据集卡片
        dataset_card = self.generate_dataset_card(dataset_config)
        card_path = os.path.join(publish_path, "README.md")
        with open(card_path, 'w', encoding='utf-8') as f:
            f.write(dataset_card)
        
        # 3. 生成元数据文件
        metadata = self.generate_metadata(dataset_config, data_files)
        metadata_path = os.path.join(publish_path, "metadata.json")
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(metadata, f, indent=2, ensure_ascii=False)
        
        # 4. 生成使用示例
        examples = self.generate_usage_examples(dataset_config)
        examples_path = os.path.join(publish_path, "examples.py")
        with open(examples_path, 'w', encoding='utf-8') as f:
            f.write(examples)
        
        # 5. 生成许可证文件(generate_license 为待实现的辅助方法,应返回所选许可证的全文)
        license_text = self.generate_license(dataset_config.get("license", "apache-2.0"))
        license_path = os.path.join(publish_path, "LICENSE")
        with open(license_path, 'w', encoding='utf-8') as f:
            f.write(license_text)
        
        # 6. 创建压缩包
        archive_path = self.create_archive(publish_path, publish_id)
        
        print(f"数据集发布完成: {publish_path}")
        print(f"压缩包: {archive_path}")
        
        return publish_path
    
    def generate_dataset_card(self, config: Dict) -> str:
        """生成数据集卡片"""
        card_content = f"""# {config.get('name', 'Medical Dataset')}

## 数据集概述

- **名称**: {config.get('name', 'Medical Dataset')}
- **版本**: {config.get('version', '1.0.0')}
- **发布日期**: {datetime.now().strftime('%Y-%m-%d')}
- **数据集大小**: {config.get('total_samples', 'Unknown')} 条样本
- **领域**: {config.get('domain', 'Medical')}

## 数据集描述

{config.get('description', '这是一个医疗领域的高质量数据集。')}

### 数据组成

- 训练集: {config.get('train_size', 'N/A')} 条样本
- 验证集: {config.get('val_size', 'N/A')} 条样本  
- 测试集: {config.get('test_size', 'N/A')} 条样本

### 数据格式

每条数据包含以下字段:
- `instruction`: 指令或问题
- `input`: 输入上下文(可选)
- `output`: 期望的回答
- `metadata`: 元数据信息

### 质量指标

- 准确性: {config.get('quality_metrics', {}).get('accuracy', 'N/A')}
- 完整性: {config.get('quality_metrics', {}).get('completeness', 'N/A')}
- 一致性: {config.get('quality_metrics', {}).get('consistency', 'N/A')}

## 使用示例

    ```python
    from datasets import load_dataset

    # 加载数据集
    dataset = load_dataset("path/to/data")

    # 使用示例
    for item in dataset['train']:
        print(f"指令: {item['instruction']}")
        print(f"回答: {item['output']}")
    ```

## 许可证

{config.get('license', 'Apache 2.0')}

## 引用

如果您使用此数据集,请引用:
    ```bibtex
    {config.get('citation', 'Citation information to be added')}
    ```

## 联系方式

{config.get('contact', 'Contact information to be added')}
"""
        return card_content
    
    def generate_metadata(self, config: Dict, data_files: List[str]) -> Dict:
        """生成元数据"""
        metadata = {
            "name": config.get('name'),
            "version": config.get('version'),
            "description": config.get('description'),
            "created_at": datetime.now().isoformat(),
            "dataset_size": {
                "total_samples": config.get('total_samples'),
                "train_samples": config.get('train_size'),
                "val_samples": config.get('val_size'),
                "test_samples": config.get('test_size')
            },
            "files": [
                {
                    "name": os.path.basename(file_path),
                    "size": os.path.getsize(file_path),
                    "checksum": self.calculate_file_checksum(file_path)
                }
                for file_path in data_files
            ],
            "quality_metrics": config.get('quality_metrics', {}),
            "license": config.get('license', 'apache-2.0'),
            "tags": config.get('tags', []),
            "languages": config.get('languages', ['zh']),
            "task_categories": config.get('task_categories', ['text-generation'])
        }
        
        return metadata
    
    def generate_usage_examples(self, config: Dict) -> str:
        """生成使用示例"""
        examples = f"""# 数据集使用示例

import json
import pandas as pd
from datasets import load_dataset

# 方法1: 直接加载JSON文件
def load_json_dataset(file_path):
    data = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            data.append(json.loads(line.strip()))
    return data

# 方法2: 使用pandas加载
def load_pandas_dataset(file_path):
    return pd.read_json(file_path, lines=True)

# 方法3: 使用HuggingFace datasets
def load_hf_dataset(dataset_path):
    dataset = load_dataset('json', data_files=dataset_path)
    return dataset

# 使用示例
if __name__ == "__main__":
    # 加载数据
    dataset = load_json_dataset("train.jsonl")
    
    # 查看样本
    for i, sample in enumerate(dataset[:5]):
        print(f"=== 样本 {i+1} ===")
        print(f"指令: {{sample['instruction']}}")
        print(f"回答: {{sample['output'][:200]}}...")
        print()
    
    # 数据统计
    print(f"总样本数: {{len(dataset)}}")
    print(f"平均回答长度: {{sum(len(s['output']) for s in dataset) / len(dataset):.0f}} 字符")
"""
        return examples
    
    def calculate_file_checksum(self, file_path: str) -> str:
        """计算文件校验和"""
        sha256_hash = hashlib.sha256()
        with open(file_path, "rb") as f:
            for byte_block in iter(lambda: f.read(4096), b""):
                sha256_hash.update(byte_block)
        return sha256_hash.hexdigest()
    
    def create_archive(self, source_path: str, archive_name: str) -> str:
        """创建压缩包"""
        archive_path = os.path.join(self.output_dir, f"{archive_name}.zip")
        
        with zipfile.ZipFile(archive_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for root, dirs, files in os.walk(source_path):
                for file in files:
                    file_path = os.path.join(root, file)
                    arc_path = os.path.relpath(file_path, source_path)
                    zipf.write(file_path, arc_path)
        
        return archive_path
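
发布器的一个调用示意如下,配置字段与文件路径均为假设;注意 generate_license 等辅助方法需补全后才能完整运行:

```python
# 使用示意:发布一个数据集版本
if __name__ == "__main__":
    publisher = DatasetPublisher(output_dir="./published_datasets")
    config = {
        "name": "医疗问诊高质量数据集",
        "version": "1.0.0",
        "domain": "医疗问诊",
        "total_samples": 50000,
        "train_size": 40000, "val_size": 5000, "test_size": 5000,
        "quality_metrics": {"accuracy": 0.95, "completeness": 0.98, "consistency": 0.99},
        "license": "apache-2.0",
        "languages": ["zh"],
    }
    publisher.publish_dataset(config, ["datasets/medical_consultation_v1/final/train.jsonl"])
```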

总结

构建高质量的垂直领域数据集是一个系统性的工程,需要从需求分析、数据收集、清洗处理、质量控制到发布维护的全流程管理。关键要点包括:

  1. 明确需求定义:深入了解业务场景和质量要求
  2. 多源数据整合:结合专业数据源和智能收集技术
  3. 严格质量控制:建立多层次的质量保证体系
  4. 自动化工具链:构建高效的自动化处理流程
  5. 持续维护更新:建立数据生命周期管理机制
  6. 标准化发布:遵循行业标准进行数据集发布

通过遵循这些最佳实践,可以构建出满足垂直领域大模型训练需求的高质量数据集,为模型的成功应用奠定坚实基础。
