引言

在大型语言模型(LLM)时代,通用大模型虽然拥有惊人的知识储备和语言理解能力,但在医疗、法律等专业领域的精准应用仍面临巨大挑战。随着2025年大模型技术的快速迭代,领域特定微调已成为解决这一问题的核心策略。医疗和法律作为对专业性、准确性要求极高的领域,其大模型微调实践具有典型的代表性和重要的研究价值。

根据2025年最新统计数据,医疗大模型市场呈现爆发式增长,截至2025年5月1日已发布133个医疗大模型,远超2024年全年的94个和2023年全年的61个。这些模型在医疗服务、诊断辅助、医学研究等领域展现出巨大潜力。与此同时,法律领域的专业大模型也在合同审查、法律咨询、案例分析等场景中发挥着越来越重要的作用。

本教程将深入探讨医疗与法律领域的大模型微调技术,特别聚焦于专业语料的词汇优化策略,通过实际案例和代码实现,帮助读者掌握在专业领域进行高效微调的核心技能。我们将从语料构建、词汇优化、模型微调到评估部署,全方位覆盖领域微调的关键环节,为您的专业大模型应用提供完整的实践指南。

目录

  1. 领域微调的理论基础与挑战
  2. 医疗领域语料构建与预处理
  3. 法律领域语料构建与预处理
  4. 专业词汇优化技术
  5. 医疗领域微调实践
  6. 法律领域微调实践
  7. 领域模型评估与验证
  8. 部署与应用案例
  9. 最佳实践与未来展望

1. 领域微调的理论基础与挑战

1.1 领域微调的基本原理

领域微调是指在预训练大模型的基础上,使用特定领域的专业语料进行二次训练或微调,以使模型更好地适应特定领域的语言习惯、专业知识和任务需求。与通用模型相比,领域特定模型能够:

  • 更准确地理解专业术语和概念
  • 更好地适应领域特定的语言表达方式
  • 提供更精准的领域知识输出
  • 显著降低幻觉生成率
  • 提升在专业任务上的性能表现

从技术层面看,领域微调主要包含两种策略:

  1. 持续预训练:在领域语料上进行大规模预训练,更新模型的基础权重
  2. 参数高效微调:使用LoRA、QLoRA等技术,在保留模型原有能力的同时注入领域知识
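
以上述第二种参数高效微调策略为例,下面给出一个基于 Hugging Face transformers 与 peft 的 QLoRA 配置示意。其中基础模型名称、秩 r、目标模块等超参数均为示例假设,实际使用时应根据所选模型的结构与可用显存调整:

# 一个最小的QLoRA配置示意:以4-bit量化加载冻结的基础模型,再注入可训练的LoRA适配器
# 注意:模型名称与各项超参数均为示例假设,请按实际任务替换
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training

base_model = "meta-llama/Llama-2-7b-hf"  # 示例基础模型

# 4-bit NF4量化配置(QLoRA的核心:冻结的4-bit基础权重 + 可训练的低秩增量)
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16,
)

tokenizer = AutoTokenizer.from_pretrained(base_model)
model = AutoModelForCausalLM.from_pretrained(base_model, quantization_config=bnb_config)
model = prepare_model_for_kbit_training(model)

# LoRA适配器:只在注意力投影层上训练低秩增量矩阵
lora_config = LoraConfig(
    r=16,
    lora_alpha=32,
    lora_dropout=0.05,
    target_modules=["q_proj", "k_proj", "v_proj", "o_proj"],
    task_type="CAUSAL_LM",
)
model = get_peft_model(model, lora_config)
model.print_trainable_parameters()  # 可训练参数通常只占总参数的百分之一以内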

1.2 2025年领域微调技术格局

2025年,领域微调技术已形成较为成熟的技术栈和方法论。以下是当前领域微调的主要技术特点:

技术维度 | 发展趋势 | 优势 | 应用场景
微调方法 | QLoRA成为主流 | 内存效率高,微调效果好 | 资源受限环境下的领域微调
语料规模 | 大规模+高质量 | 覆盖全面,精度提升 | 医疗、法律等专业领域
词汇优化 | 领域特定词表扩展 | 提升专业术语理解能力 | 专业术语密集的领域
多模态融合 | 文本+专业知识图谱 | 知识结构化,推理增强 | 诊断辅助、法律推理
评估体系 | 领域特定基准测试 | 评估更精准,指导优化 | 模型优化与选择
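
其中"词汇优化"一行所说的领域特定词表扩展,常见做法是向分词器追加专业术语,并同步扩展模型的词嵌入矩阵。下面是一个基于 Hugging Face transformers 的最小示意,模型名称与术语列表均为示例假设:

# 领域词表扩展的最小示意:向tokenizer追加专业术语,并扩展模型的词嵌入矩阵
# 模型名称与术语列表均为示例假设
from transformers import AutoModelForCausalLM, AutoTokenizer

model_name = "gpt2"  # 示例模型,可替换为实际使用的基础模型
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)

# 待加入的领域术语(示例):扩展之前它们往往会被切成多个子词
domain_terms = ["myocardial infarction", "不可抗力", "缔约过失责任"]
num_added = tokenizer.add_tokens(domain_terms)

# 扩展嵌入矩阵,使新增token获得可训练的向量(初始值需在后续微调中学习)
model.resize_token_embeddings(len(tokenizer))
print(f"新增 {num_added} 个领域token,当前词表大小: {len(tokenizer)}")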

1.3 医疗与法律领域的独特挑战

医疗和法律领域在大模型微调中面临一些共同挑战,同时也各自具有独特的技术难题:

1.3.1 共同挑战
  1. 专业性要求高:两个领域都对输出的准确性、可靠性有极高要求
  2. 数据获取困难:专业语料往往涉及隐私、版权或需要专业审核
  3. 知识更新快:医学研究和法律法规都在不断更新变化
  4. 解释性需求强:需要模型能够提供推理依据和解释
1.3.2 医疗领域特殊挑战
  1. 患者隐私保护:严格的HIPAA等隐私法规限制
  2. 诊断风险高:错误输出可能导致严重后果
  3. 医学术语复杂性:大量拉丁语、希腊语词根的专业术语
  4. 多源异构数据:需要整合病历、影像、检验等多种数据
1.3.3 法律领域特殊挑战
  1. 法规时效性:法律法规会随时间变化而修订
  2. 判例法依赖:需要理解和应用复杂的判例体系
  3. 法律解释多样性:不同法学家可能有不同解读
  4. 多法域复杂性:不同地区、国家的法律体系差异大

1.4 领域微调的评估指标

评估领域特定大模型性能需要考虑多个维度的指标:

1.4.1 通用性能指标
  • 困惑度(Perplexity):评估模型在领域语料上的预测能力
  • 准确率、精确率、召回率、F1分数:评估分类和识别任务的性能
  • BLEU、ROUGE、METEOR:评估生成任务的质量
1.4.2 领域特定指标
  • 医疗领域:诊断准确率、治疗方案推荐相关性、医学术语理解准确率
  • 法律领域:法规引用准确率、案例匹配度、法律推理有效性
1.4.3 实用价值指标
  • 幻觉率:生成内容中错误或虚构信息的比例
  • 解释质量:模型提供推理依据的充分性和准确性
  • 用户满意度:专业人员对模型输出的评价
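
以1.4.1中的困惑度为例,可以直接用模型在领域语料上的平均交叉熵来估算。下面是一个基于 transformers 因果语言模型的简化示意,模型名称与示例文本均为假设,长文档还需要分块或滑动窗口处理,这里从简:

# 在领域语料上估算困惑度(Perplexity)的简化示意
# 说明:模型名称与示例文本均为假设;长文本应分块或使用滑动窗口,此处从简
import math
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

model_name = "gpt2"  # 示例模型
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)
model.eval()

domain_texts = [
    "Acute myocardial infarction requires urgent reperfusion therapy.",
    "行为人因过错侵害他人民事权益造成损害的,应当承担侵权责任。",
]

losses = []
with torch.no_grad():
    for text in domain_texts:
        enc = tokenizer(text, return_tensors="pt")
        # labels与input_ids相同时,模型返回的loss即逐token平均交叉熵
        out = model(**enc, labels=enc["input_ids"])
        losses.append(out.loss.item())

avg_loss = sum(losses) / len(losses)
print(f"平均交叉熵: {avg_loss:.4f}, 困惑度: {math.exp(avg_loss):.2f}")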

2. 医疗领域语料构建与预处理

2.1 医疗语料的类型与来源

构建高质量的医疗领域语料库是微调成功的关键。2025年的医疗语料主要包括以下类型:

2.1.1 公开医学文献
  • PubMed:包含超过3500万篇生物医学文献摘要
  • Medline:医学索引数据库,提供结构化的医学文献信息
  • arXiv (Quantitative Biology):预印本平台上的生物医学研究
  • 医学教科书:如《哈里森内科学》、《格氏解剖学》等经典教材的公开部分
2.1.2 临床数据(去标识化)
  • MIMIC-III/IV:麻省理工学院维护的去标识化重症监护数据集
  • UK Biobank:大规模人群健康数据库
  • All of Us Research Program:美国国立卫生研究院的大规模研究计划数据
2.1.3 医疗问答与指南
  • Mayo Clinic Patient Care and Health Information:梅奥诊所的患者指南
  • MedlinePlus:美国国家医学图书馆提供的医学信息
  • UpToDate:循证医学临床决策支持资源的公开部分
2.1.4 医学术语资源
  • UMLS (Unified Medical Language System):统一医学语言系统,包含200多万个概念
  • SNOMED CT:系统化医学术语集,包含临床术语
  • ICD-10/11:国际疾病分类标准
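
以PubMed为例,可以通过NCBI的E-utilities接口批量检索并下载文献摘要作为语料。下面是一个使用 requests 调用 esearch/efetch 的最小示意,检索词与返回数量均为示例,正式使用时请遵守NCBI的访问频率限制并附上真实的邮箱或API key:

# 通过NCBI E-utilities获取PubMed摘要的最小示意
# 检索词与retmax等参数仅为示例;大规模抓取请遵守NCBI使用政策并控制请求频率
import requests

EUTILS = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"

def fetch_pubmed_abstracts(query, retmax=20):
    """按检索词返回PubMed摘要文本列表"""
    # 1. esearch: 根据检索词获取PMID列表
    search = requests.get(
        f"{EUTILS}/esearch.fcgi",
        params={"db": "pubmed", "term": query, "retmax": retmax, "retmode": "json"},
        timeout=30,
    ).json()
    pmids = search.get("esearchresult", {}).get("idlist", [])
    if not pmids:
        return []

    # 2. efetch: 按PMID批量拉取摘要(纯文本格式)
    fetch = requests.get(
        f"{EUTILS}/efetch.fcgi",
        params={"db": "pubmed", "id": ",".join(pmids), "rettype": "abstract", "retmode": "text"},
        timeout=60,
    )
    # 不同摘要之间以空行分隔,这里做一个粗略切分
    return [block.strip() for block in fetch.text.split("\n\n\n") if block.strip()]

if __name__ == "__main__":
    abstracts = fetch_pubmed_abstracts("acute myocardial infarction treatment", retmax=5)
    print(f"获取到 {len(abstracts)} 段摘要文本")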

2.2 医疗语料的预处理流程

医疗语料的预处理需要特别注意专业性和准确性。以下是一个标准的预处理流程:

2.2.1 数据清洗与去噪
import re
import pandas as pd
from bs4 import BeautifulSoup
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords

class MedicalCorpusPreprocessor:
    """
    医疗语料预处理类,用于清洗和标准化医疗文本数据
    """
    def __init__(self):
        # 下载必要的NLTK资源
        try:
            nltk.data.find('tokenizers/punkt')
            nltk.data.find('corpora/stopwords')
        except LookupError:
            nltk.download('punkt')
            nltk.download('stopwords')
        
        # 医疗领域特定停用词
        self.medical_stopwords = {
            'patient', 'pt', 'case', 'report', 'study', 'result', 'findings', 
            'conclusion', 'method', 'methods', 'purpose', 'objective'
        }
        
        # 通用停用词
        self.stop_words = set(stopwords.words('english')).union(self.medical_stopwords)
    
    def remove_html_tags(self, text):
        """移除HTML标签"""
        soup = BeautifulSoup(text, 'html.parser')
        return soup.get_text()
    
    def normalize_whitespace(self, text):
        """标准化空白字符"""
        return re.sub(r'\s+', ' ', text).strip()
    
    def remove_special_characters(self, text):
        """移除特殊字符,但保留医学术语中的连字符和数字"""
        # 保留字母、数字、连字符、句点和空格
        return re.sub(r'[^a-zA-Z0-9\-\.\s]', '', text)
    
    def normalize_medical_terms(self, text):
        """标准化常见医学术语缩写"""
        term_mapping = {
            'pt.': 'patient',
            'pts.': 'patients',
            'w/': 'with',
            'w/o': 'without',
            'tx': 'treatment',
            'dx': 'diagnosis',
            'rx': 'prescription',
            'f/u': 'follow-up',
            'vs.': 'versus',
            'e.g.': 'for example',
            'i.e.': 'that is',
            'NPO': 'nothing by mouth',
            'PRN': 'as needed',
            'q.d.': 'once a day',
            'b.i.d.': 'twice a day',
            't.i.d.': 'three times a day',
            'q.i.d.': 'four times a day',
        }
        
        for abbr, full_form in term_mapping.items():
            # 使用环视断言匹配完整缩写:\b 对以 '.'、'/' 结尾的缩写无效;
            # 同时忽略大小写,以兼容前面已转为小写的文本
            pattern = r'(?<!\w)' + re.escape(abbr) + r'(?!\w)'
            text = re.sub(pattern, full_form, text, flags=re.IGNORECASE)
        
        return text
    
    def remove_stopwords(self, text):
        """移除停用词"""
        tokens = word_tokenize(text)
        filtered_tokens = [word for word in tokens if word.lower() not in self.stop_words]
        return ' '.join(filtered_tokens)
    
    def preprocess_text(self, text):
        """完整的文本预处理流程"""
        if not text or not isinstance(text, str):
            return ""
        
        # 转换为字符串(如果输入不是字符串)
        text = str(text)
        
        # 转换为小写
        text = text.lower()
        
        # 移除HTML标签
        text = self.remove_html_tags(text)
        
        # 标准化医学术语
        text = self.normalize_medical_terms(text)
        
        # 移除特殊字符
        text = self.remove_special_characters(text)
        
        # 标准化空白字符
        text = self.normalize_whitespace(text)
        
        # 可选:移除停用词(根据任务需求决定是否保留停用词)
        # text = self.remove_stopwords(text)
        
        return text
    
    def preprocess_dataframe(self, df, text_column):
        """预处理DataFrame中的文本列"""
        df_copy = df.copy()
        df_copy[text_column] = df_copy[text_column].apply(self.preprocess_text)
        # 移除空字符串
        df_copy = df_copy[df_copy[text_column].str.strip() != ""]
        return df_copy
    
    def detect_medical_terms(self, text, medical_lexicon):
        """检测文本中的医学术语"""
        detected_terms = set()
        tokens = word_tokenize(text)
        
        # 1. 检查单个词的术语
        for token in tokens:
            if token.lower() in medical_lexicon:
                detected_terms.add(token)
        
        # 2. 检查双词和三词的术语(N-gram)
        for i in range(len(tokens) - 1):
            bigram = f"{tokens[i]} {tokens[i+1]}".lower()
            if bigram in medical_lexicon:
                detected_terms.add(bigram)
        
        for i in range(len(tokens) - 2):
            trigram = f"{tokens[i]} {tokens[i+1]} {tokens[i+2]}".lower()
            if trigram in medical_lexicon:
                detected_terms.add(trigram)
        
        return detected_terms

# 使用示例
if __name__ == "__main__":
    # 创建预处理器实例
    preprocessor = MedicalCorpusPreprocessor()
    
    # 示例医疗文本
    sample_medical_text = """
    <h2>Case Report</h2>
    <p>The pt. is a 45 y.o. male with h/o hypertension and DM who presented c/o chest pain. 
    EKG showed ST-elevation. Cardiac enzymes were elevated. Diagnosis: AMI. 
    Tx: ASA, heparin, and urgent cath. F/u in cardiology clinic in 1 week.</p>
    """
    
    # 预处理文本
    cleaned_text = preprocessor.preprocess_text(sample_medical_text)
    print("Original Text:")
    print(sample_medical_text)
    print("\nCleaned Text:")
    print(cleaned_text)
    
    # 示例医学术语词典(简化版)
    sample_medical_lexicon = {
        'hypertension', 'dm', 'diabetes mellitus', 'chest pain', 'ekg', 
        'st-elevation', 'cardiac enzymes', 'ami', 'acute myocardial infarction',
        'asa', 'acetylsalicylic acid', 'heparin', 'cath', 'catheterization'
    }
    
    # 检测医学术语
    detected_terms = preprocessor.detect_medical_terms(cleaned_text, sample_medical_lexicon)
    print("\nDetected Medical Terms:")
    for term in detected_terms:
        print(f"- {term}")
2.2.2 隐私保护处理

医疗数据的隐私保护至关重要。在预处理阶段,必须严格执行去标识化处理:

import re
import random
from faker import Faker

class MedicalDeidentifier:
    """
    医疗数据去标识化类,用于保护患者隐私信息
    """
    def __init__(self):
        self.faker = Faker()
        
        # 用于生成一致的假名映射,确保相同的原始标识符总是映射到相同的假标识符
        self.name_map = {}
        self.id_map = {}
        self.phone_map = {}
        self.address_map = {}
    
    def replace_names(self, text):
        """替换人名"""
        # 使用正则表达式匹配可能的人名(简化版,实际应用需要更复杂的模式)
        # 这里假设人名通常是大写开头的单词,并且可能出现在特定上下文中
        
        # 匹配常见的人名出现模式
        # 注意:re模块的look-behind必须为定长,原先的变长look-behind会在编译时报错,
        # 这里改为非捕获前缀分组,人名仍从group(1)提取
        patterns = [
            r'(?:patient name:|name:|mr\.|ms\.|mrs\.|dr\.)\s*([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)',
            r'(?:referred by:|attending physician:|ordered by:)\s*([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)',
        ]
        
        for pattern in patterns:
            matches = re.finditer(pattern, text, re.IGNORECASE)
            for match in matches:
                original_name = match.group(1)
                if original_name not in self.name_map:
                    # 生成假名字
                    if 'dr' in match.group(0).lower():
                        self.name_map[original_name] = f"Dr. {self.faker.last_name()}"
                    else:
                        self.name_map[original_name] = self.faker.name()
                text = text.replace(original_name, self.name_map[original_name])
        
        return text
    
    def replace_ids(self, text):
        """替换患者ID和医疗记录号"""
        # 匹配常见的ID格式
        # 同样避免变长look-behind(re模块不支持),改用非捕获前缀分组,ID仍从group(1)提取
        patterns = [
            r'(?:patient id:|id:|mrn:|medical record number:)\s*([A-Za-z0-9\-]{6,20})',
            r'(?:\bpid\s*=|\bpatient_id\s*=)\s*([A-Za-z0-9\-]{6,20})',
        ]
        
        for pattern in patterns:
            matches = re.finditer(pattern, text, re.IGNORECASE)
            for match in matches:
                original_id = match.group(1)
                if original_id not in self.id_map:
                    # 生成假ID,保持相同的格式特征
                    if '-' in original_id:
                        parts = original_id.split('-')
                        fake_id = '-'.join([f"{random.randint(1000, 9999)}" for _ in parts])
                    else:
                        fake_id = f"PT{random.randint(100000, 999999)}"
                    self.id_map[original_id] = fake_id
                text = text.replace(original_id, self.id_map[original_id])
        
        return text
    
    def replace_dates(self, text):
        """替换日期,但保留相对时间关系"""
        # 简化版:将所有日期替换为通用的描述或相对时间
        date_patterns = [
            r'\b(?:\d{1,2}/\d{1,2}/\d{2,4})\b',  # MM/DD/YYYY or MM/DD/YY
            r'\b(?:\d{4}-\d{2}-\d{2})\b',  # YYYY-MM-DD
            r'\b(?:\d{1,2}\s+[A-Za-z]+\s+\d{2,4})\b',  # DD Month YYYY
        ]
        
        for pattern in date_patterns:
            text = re.sub(pattern, "[DATE]", text)
        
        return text
    
    def replace_phones(self, text):
        """替换电话号码"""
        phone_patterns = [
            r'\b(?:\d{3}[-.\s]?){2}\d{4}\b',  # 123-456-7890 or 123.456.7890 or 123 456 7890
            r'\b\+?\d{1,3}[-.\s]?(?:\d{3}[-.\s]?){2}\d{4}\b',  # 带国际区号
            r'\b\(\d{3}\)\s*\d{3}[-.\s]?\d{4}\b',  # (123) 456-7890
        ]
        
        for pattern in phone_patterns:
            matches = re.finditer(pattern, text)
            for match in matches:
                original_phone = match.group(0)
                if original_phone not in self.phone_map:
                    # 生成假电话号码,保持相同的格式
                    if '+' in original_phone:
                        self.phone_map[original_phone] = f"+{random.randint(1, 999)} {self.faker.phone_number()}"
                    elif '(' in original_phone:
                        self.phone_map[original_phone] = f"({random.randint(100, 999)}) {random.randint(100, 999)}-{random.randint(1000, 9999)}"
                    else:
                        self.phone_map[original_phone] = self.faker.phone_number()
                text = text.replace(original_phone, self.phone_map[original_phone])
        
        return text
    
    def replace_addresses(self, text):
        """替换地址信息"""
        # 简化版:匹配常见的地址模式
        address_pattern = r'\d+\s+[A-Za-z]+(?:\s+[A-Za-z]+)*\s+(?:street|st|avenue|ave|road|rd|boulevard|blvd|lane|ln)\b.*?\b(?:\d{5}(?:-\d{4})?)\b'
        
        matches = re.finditer(address_pattern, text, re.IGNORECASE)
        for match in matches:
            original_address = match.group(0)
            if original_address not in self.address_map:
                self.address_map[original_address] = self.faker.address()
            text = text.replace(original_address, self.address_map[original_address])
        
        return text
    
    def deidentify_text(self, text):
        """完整的去标识化流程"""
        text = self.replace_names(text)
        text = self.replace_ids(text)
        text = self.replace_dates(text)
        text = self.replace_phones(text)
        text = self.replace_addresses(text)
        return text
    
    def deidentify_dataframe(self, df, text_columns):
        """去标识化DataFrame中的文本列"""
        df_copy = df.copy()
        for column in text_columns:
            if column in df_copy.columns:
                df_copy[column] = df_copy[column].apply(self.deidentify_text)
        return df_copy

# 使用示例
if __name__ == "__main__":
    deidentifier = MedicalDeidentifier()
    
    # 示例医疗记录
    sample_record = """
    Patient Name: John Doe
    Patient ID: P-789012
    DOB: 01/15/1975
    Address: 123 Main Street, Anytown, USA 12345
    Phone: (555) 123-4567
    
    Chief Complaint: Chest pain for 2 hours
    History of Present Illness: Mr. Doe presented to the ED with chest pain radiating to the left arm.
    Attending Physician: Dr. Sarah Johnson
    Medical Record Number: MRN-567890
    """
    
    # 执行去标识化
    deidentified_record = deidentifier.deidentify_text(sample_record)
    
    print("Original Record:")
    print(sample_record)
    print("\nDeidentified Record:")
    print(deidentified_record)
2.2.3 医学术语标准化

医学术语的标准化是确保模型正确理解专业概念的关键步骤:

import json
from collections import defaultdict

class MedicalTermNormalizer:
    """
    医学术语标准化类,用于将不同表达形式的医学术语统一为标准形式
    """
    def __init__(self):
        # 初始化术语映射字典
        self.term_mappings = {
            # 疾病名称
            "mi": "myocardial infarction",
            "ami": "acute myocardial infarction",
            "chf": "congestive heart failure",
            "copd": "chronic obstructive pulmonary disease",
            "dm": "diabetes mellitus",
            "htn": "hypertension",
            "cad": "coronary artery disease",
            "cva": "cerebrovascular accident",
            "tia": "transient ischemic attack",
            "uti": "urinary tract infection",
            
            # 药物名称
            "asa": "acetylsalicylic acid",
            "tylenol": "acetaminophen",
            "ibuprofen": "ibuprofen",
            "advil": "ibuprofen",
            "motrin": "ibuprofen",
            "lipitor": "atorvastatin",
            "plavix": "clopidogrel",
            "aspirin": "acetylsalicylic acid",
            
            # 检查和程序
            "ekg": "electrocardiogram",
            "ecg": "electrocardiogram",
            "cxr": "chest x-ray",
            "ct": "computed tomography",
            "mri": "magnetic resonance imaging",
            "iv": "intravenous",
            "po": "per os",
            "im": "intramuscular",
            
            # 解剖学术语
            "cv": "cardiovascular",
            "gi": "gastrointestinal",
            "gu": "genitourinary",
            "cns": "central nervous system",
            "pns": "peripheral nervous system",
            "r/o": "rule out",
        }
        
        # 双向映射,用于快速查找
        self.reverse_mappings = defaultdict(set)
        for abbr, full in self.term_mappings.items():
            self.reverse_mappings[full].add(abbr)
            # 也添加小写形式以支持不区分大小写的匹配
            self.reverse_mappings[full.lower()].add(abbr.lower())
    
    def load_custom_mappings(self, json_file):
        """从JSON文件加载自定义术语映射"""
        try:
            with open(json_file, 'r', encoding='utf-8') as f:
                custom_mappings = json.load(f)
                # 更新术语映射
                for abbr, full in custom_mappings.items():
                    self.term_mappings[abbr] = full
                    self.reverse_mappings[full].add(abbr)
                    self.reverse_mappings[full.lower()].add(abbr.lower())
            print(f"成功加载自定义术语映射: {len(custom_mappings)} 项")
        except Exception as e:
            print(f"加载自定义术语映射失败: {str(e)}")
    
    def normalize_term(self, term):
        """标准化单个术语"""
        # 去除词两端的常见标点,避免 "dm."、"MI," 这类带句读的词无法命中映射
        term_lower = term.lower().strip().strip('.,;:!?()[]')
        # 检查是否存在直接映射
        if term_lower in self.term_mappings:
            return self.term_mappings[term_lower]
        return term  # 如果没有找到映射,返回原始术语
    
    def normalize_text(self, text):
        """标准化文本中的医学术语"""
        words = text.split()
        normalized_words = []
        
        i = 0
        while i < len(words):
            # 尝试匹配多词术语(最多3个词)
            matched = False
            
            # 检查3词术语
            if i < len(words) - 2:
                trigram = f"{words[i]} {words[i+1]} {words[i+2]}".lower()
                if trigram in self.term_mappings:
                    normalized_words.append(self.term_mappings[trigram])
                    i += 3
                    matched = True
            
            # 检查2词术语
            if not matched and i < len(words) - 1:
                bigram = f"{words[i]} {words[i+1]}".lower()
                if bigram in self.term_mappings:
                    normalized_words.append(self.term_mappings[bigram])
                    i += 2
                    matched = True
            
            # 检查1词术语
            if not matched:
                normalized_words.append(self.normalize_term(words[i]))
                i += 1
        
        return ' '.join(normalized_words)
    
    def extract_medical_terms(self, text):
        """从文本中提取医学术语"""
        words = text.split()
        extracted_terms = []
        
        i = 0
        while i < len(words):
            # 检查是否为医学术语(最多3个词)
            term_found = False
            
            # 检查3词术语
            if i < len(words) - 2:
                trigram = f"{words[i]} {words[i+1]} {words[i+2]}".lower()
                if trigram in self.term_mappings or trigram in self.reverse_mappings:
                    extracted_terms.append(trigram)
                    term_found = True
            
            # 检查2词术语
            if not term_found and i < len(words) - 1:
                bigram = f"{words[i]} {words[i+1]}".lower()
                if bigram in self.term_mappings or bigram in self.reverse_mappings:
                    extracted_terms.append(bigram)
                    term_found = True
            
            # 检查1词术语
            if not term_found:
                word_lower = words[i].lower()
                if word_lower in self.term_mappings or word_lower in self.reverse_mappings:
                    extracted_terms.append(word_lower)
            
            i += 1
        
        return extracted_terms
    
    def generate_term_frequency(self, text):
        """生成文本中医学术语的频率统计"""
        terms = self.extract_medical_terms(text)
        term_freq = defaultdict(int)
        
        for term in terms:
            # 使用标准化的术语作为键
            normalized_term = self.normalize_term(term)
            term_freq[normalized_term] += 1
        
        return dict(term_freq)

# 使用示例
if __name__ == "__main__":
    normalizer = MedicalTermNormalizer()
    
    # 示例医学文本
    sample_text = """
    The patient presents with htn and dm. We ordered an EKG to r/o MI. 
    Treatment includes ASA and Lipitor. Follow-up CXR in 1 week.
    """
    
    # 标准化文本
    normalized_text = normalizer.normalize_text(sample_text)
    print("Original Text:")
    print(sample_text)
    print("\nNormalized Text:")
    print(normalized_text)
    
    # 提取医学术语
    extracted_terms = normalizer.extract_medical_terms(sample_text)
    print("\nExtracted Medical Terms:")
    for term in extracted_terms:
        print(f"- {term}")
    
    # 生成术语频率统计
    term_freq = normalizer.generate_term_frequency(sample_text)
    print("\nTerm Frequency:")
    for term, freq in term_freq.items():
        print(f"- {term}: {freq}")

2.3 医疗语料的质量评估

评估医疗语料质量对于确保微调效果至关重要。以下是一些关键的质量评估指标和方法:

import random
import re
import pandas as pd
import numpy as np
from collections import Counter
import matplotlib.pyplot as plt

class MedicalCorpusEvaluator:
    """
    医疗语料质量评估类,用于评估医疗文本语料库的质量特征
    """
    def __init__(self):
        # 常见医学期刊和来源的可信度权重
        self.source_reliability = {
            "pubmed": 1.0,
            "nejm": 1.0,
            "lancet": 1.0,
            "jama": 1.0,
            "bmj": 1.0,
            "cochrane": 0.95,
            "uptodate": 0.9,
            "mayoclinic": 0.85,
            "webmd": 0.7,
            "medlineplus": 0.8,
            "wikipedia": 0.6,
            "other": 0.5
        }
        
        # 医学领域关键词类别
        self.medical_keyword_categories = {
            "diseases": {"mi", "cancer", "diabetes", "hypertension", "copd", "stroke"},
            "treatments": {"surgery", "medication", "therapy", "rehabilitation", "transplant"},
            "tests": {"blood test", "ct scan", "mri", "ecg", "biopsy", "x-ray"},
            "symptoms": {"pain", "fever", "fatigue", "cough", "shortness of breath", "nausea"},
            "anatomy": {"heart", "brain", "lung", "liver", "kidney", "bone", "muscle"}
        }
    
    def calculate_text_statistics(self, texts):
        """计算文本统计特征"""
        stats = {
            "total_texts": len(texts),
            "total_tokens": 0,
            "mean_tokens_per_text": 0,
            "median_tokens_per_text": 0,
            "token_counts": [],
            "vocabulary_size": 0,
            "unique_tokens": set()
        }
        
        # 计算每个文本的词数和总词数
        for text in texts:
            if not isinstance(text, str):
                text = str(text)
            tokens = text.split()
            token_count = len(tokens)
            stats["token_counts"].append(token_count)
            stats["total_tokens"] += token_count
            stats["unique_tokens"].update(tokens)
        
        # 计算统计指标
        stats["vocabulary_size"] = len(stats["unique_tokens"])
        if stats["token_counts"]:
            stats["mean_tokens_per_text"] = np.mean(stats["token_counts"])
            stats["median_tokens_per_text"] = np.median(stats["token_counts"])
        
        return stats
    
    def assess_readability(self, text):
        """评估文本可读性(简化版Flesch-Kincaid公式)"""
        if not isinstance(text, str):
            text = str(text)
        
        # 简单版本:计算句子数、单词数和音节数
        sentences = re.split(r'[.!?]+', text)
        sentences = [s for s in sentences if s.strip()]
        num_sentences = len(sentences)
        
        words = text.split()
        num_words = len(words)
        
        # 简化的音节计数:每个单词平均音节数
        # 在实际应用中,可以使用更复杂的音节计数算法
        num_syllables = sum([self._count_syllables(word) for word in words])
        
        # Flesch-Kincaid Grade Level公式
        if num_sentences > 0 and num_words > 0:
            flesch_kincaid = 0.39 * (num_words / num_sentences) + 11.8 * (num_syllables / num_words) - 15.59
        else:
            flesch_kincaid = 0
        
        # Flesch Reading Ease公式
        if num_sentences > 0 and num_words > 0:
            flesch_ease = 206.835 - 1.015 * (num_words / num_sentences) - 84.6 * (num_syllables / num_words)
        else:
            flesch_ease = 0
        
        return {
            "flesch_kincaid_grade": flesch_kincaid,
            "flesch_reading_ease": flesch_ease,
            "num_sentences": num_sentences,
            "num_words": num_words,
            "num_syllables": num_syllables
        }
    
    def _count_syllables(self, word):
        """简单的音节计数函数"""
        word = word.lower()
        vowels = "aeiouy"
        count = 0
        
        # 处理特殊情况
        if len(word) == 0:
            return 0
        
        # 检查第一个字符是否为元音
        if word[0] in vowels:
            count += 1
        
        # 检查剩余字符
        for i in range(1, len(word)):
            if word[i] in vowels and word[i-1] not in vowels:
                count += 1
        
        # 处理以'e'结尾的单词
        if word.endswith('e') and len(word) > 1 and word[-2] not in vowels:
            count -= 1
        
        # 确保至少有一个音节
        return max(1, count)
    
    def analyze_source_reliability(self, texts_with_sources):
        """分析文本来源的可靠性"""
        total_weight = 0
        weighted_score = 0
        source_counts = Counter()
        
        for text, source in texts_with_sources:
            # 获取来源权重
            source_lower = source.lower()
            reliability_score = 0.5  # 默认中等可靠性
            
            # 检查是否匹配已知来源
            for known_source, weight in self.source_reliability.items():
                if known_source in source_lower:
                    reliability_score = weight
                    break
            
            # 计算加权得分
            text_length = len(text.split()) if isinstance(text, str) else 0
            weighted_score += reliability_score * text_length
            total_weight += text_length
            source_counts[source] += 1
        
        # 计算平均可靠性得分
        avg_reliability = weighted_score / total_weight if total_weight > 0 else 0
        
        return {
            "average_reliability": avg_reliability,
            "source_distribution": dict(source_counts),
            "reliability_summary": {
                "high_reliability_sources": sum(1 for src, cnt in source_counts.items() 
                                              if any(ks in src.lower() for ks, w in self.source_reliability.items() if w >= 0.8)),
                "medium_reliability_sources": sum(1 for src, cnt in source_counts.items()
                                                if any(ks in src.lower() for ks, w in self.source_reliability.items() if 0.5 <= w < 0.8)),
                "low_reliability_sources": sum(1 for src, cnt in source_counts.items()
                                              if any(ks in src.lower() for ks, w in self.source_reliability.items() if w < 0.5))
            }
        }
    
    def analyze_medical_term_coverage(self, texts):
        """分析医学术语覆盖情况"""
        category_coverage = {cat: set() for cat in self.medical_keyword_categories}
        total_keywords_found = set()
        
        for text in texts:
            if not isinstance(text, str):
                text = str(text)
            text_lower = text.lower()
            
            # 检查每个类别的关键词
            for category, keywords in self.medical_keyword_categories.items():
                for keyword in keywords:
                    if keyword in text_lower:
                        category_coverage[category].add(keyword)
                        total_keywords_found.add(keyword)
        
        # 计算覆盖率
        coverage_percentages = {}
        for category, found_keywords in category_coverage.items():
            total_category_keywords = len(self.medical_keyword_categories[category])
            if total_category_keywords > 0:
                coverage_percentages[category] = len(found_keywords) / total_category_keywords * 100
            else:
                coverage_percentages[category] = 0
        
        # 总体覆盖率
        total_possible_keywords = sum(len(keywords) for keywords in self.medical_keyword_categories.values())
        overall_coverage = len(total_keywords_found) / total_possible_keywords * 100 if total_possible_keywords > 0 else 0
        
        return {
            "overall_coverage_percentage": overall_coverage,
            "category_coverage_percentages": coverage_percentages,
            "keywords_found": dict({
                cat: list(keywords) for cat, keywords in category_coverage.items()
            })
        }
    
    def generate_corpus_quality_report(self, texts, texts_with_sources=None):
        """生成完整的语料质量报告"""
        report = {}
        
        # 1. 基本统计
        report["basic_statistics"] = self.calculate_text_statistics(texts)
        
        # 2. 可读性分析(对随机样本)
        sample_size = min(100, len(texts))
        if sample_size > 0:
            sample_texts = random.sample(texts, sample_size)
            readability_scores = [self.assess_readability(text) for text in sample_texts]
            
            report["readability"] = {
                "average_flesch_kincaid_grade": np.mean([r["flesch_kincaid_grade"] for r in readability_scores]),
                "average_flesch_reading_ease": np.mean([r["flesch_reading_ease"] for r in readability_scores]),
                "sample_size": sample_size
            }
        else:
            report["readability"] = {
                "error": "Sample size too small for readability analysis"
            }
        
        # 3. 来源可靠性分析
        if texts_with_sources and len(texts_with_sources) > 0:
            report["source_reliability"] = self.analyze_source_reliability(texts_with_sources)
        else:
            report["source_reliability"] = {
                "note": "Source information not provided for reliability analysis"
            }
        
        # 4. 医学术语覆盖分析
        report["medical_term_coverage"] = self.analyze_medical_term_coverage(texts)
        
        # 5. 综合质量评分
        # 基于多个指标计算综合质量评分(0-100)
        quality_scores = []
        
        # 文本长度评分(基于平均词数,线性映射到40-100分)
        avg_tokens = report["basic_statistics"]["mean_tokens_per_text"]
        if avg_tokens >= 1000:
            length_score = 100
        elif avg_tokens <= 100:
            length_score = 40
        else:
            length_score = 40 + (avg_tokens - 100) * 60 / 900
        quality_scores.append(length_score)
        
        # 词汇多样性评分(线性映射到40-100分)
        vocab_size = report["basic_statistics"]["vocabulary_size"]
        if vocab_size >= 50000:
            vocab_score = 100
        elif vocab_size <= 5000:
            vocab_score = 40
        else:
            vocab_score = 40 + (vocab_size - 5000) * 60 / 45000
        quality_scores.append(vocab_score)
        
        # 医学术语覆盖评分
        term_coverage = report["medical_term_coverage"]["overall_coverage_percentage"]
        quality_scores.append(term_coverage)
        
        # 来源可靠性评分(如果有)
        if "average_reliability" in report["source_reliability"]:
            reliability_score = report["source_reliability"]["average_reliability"] * 100
            quality_scores.append(reliability_score)
        
        # 计算综合评分
        report["overall_quality_score"] = np.mean(quality_scores)
        
        return report
    
    def visualize_corpus_characteristics(self, report, output_dir="."):
        """可视化语料库特征"""
        # 创建输出目录
        import os
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        
        # 1. 文本长度分布直方图
        plt.figure(figsize=(10, 6))
        if "token_counts" in report["basic_statistics"] and report["basic_statistics"]["token_counts"]:
            plt.hist(report["basic_statistics"]["token_counts"], bins=50, alpha=0.7, color='blue')
            plt.title('Text Length Distribution')
            plt.xlabel('Number of Tokens')
            plt.ylabel('Frequency')
            plt.grid(True, alpha=0.3)
            plt.savefig(os.path.join(output_dir, 'text_length_distribution.png'))
            plt.close()
        
        # 2. 医学术语覆盖饼图
        plt.figure(figsize=(10, 8))
        categories = list(report["medical_term_coverage"]["category_coverage_percentages"].keys())
        percentages = list(report["medical_term_coverage"]["category_coverage_percentages"].values())
        plt.pie(percentages, labels=categories, autopct='%1.1f%%', startangle=90)
        plt.title('Medical Term Category Coverage')
        plt.axis('equal')
        plt.savefig(os.path.join(output_dir, 'medical_term_coverage.png'))
        plt.close()
        
        # 3. 来源可靠性条形图
        if "source_distribution" in report["source_reliability"]:
            plt.figure(figsize=(12, 6))
            sources = list(report["source_reliability"]["source_distribution"].keys())
            counts = list(report["source_reliability"]["source_distribution"].values())
            plt.barh(sources, counts, alpha=0.7, color='green')
            plt.title('Source Distribution')
            plt.xlabel('Number of Documents')
            plt.tight_layout()
            plt.savefig(os.path.join(output_dir, 'source_distribution.png'))
            plt.close()
        
        print(f"可视化图表已保存到: {output_dir}")

# 使用示例
if __name__ == "__main__":
    import random
    import re
    
    # 创建一些示例医疗文本
    sample_medical_texts = [
        "Myocardial infarction (MI) is a leading cause of death worldwide. Early diagnosis and treatment are crucial for patient outcomes.",
        "Diabetes mellitus type 2 is characterized by insulin resistance and relative insulin deficiency. Management includes lifestyle modifications and medications.",
        "Chronic obstructive pulmonary disease (COPD) presents with progressive airflow limitation. Pulmonary function tests are essential for diagnosis.",
        "Hypertension, or high blood pressure, is a major risk factor for cardiovascular disease. Treatment typically involves antihypertensive medications.",
        "Stroke, or cerebrovascular accident, can be ischemic or hemorrhagic. Rapid intervention is critical to minimize brain damage.",
        # 添加更多示例文本...
    ]
    
    # 添加来源信息
    sample_sources = ["PubMed", "NEJM", "Mayo Clinic", "WebMD", "Wikipedia"]
    texts_with_sources = [(text, random.choice(sample_sources)) for text in sample_medical_texts]
    
    # 创建评估器实例
    evaluator = MedicalCorpusEvaluator()
    
    # 生成质量报告
    report = evaluator.generate_corpus_quality_report(sample_medical_texts, texts_with_sources)
    
    # 打印报告摘要
    print("=== 医疗语料质量评估报告 ===")
    print(f"总体质量评分: {report['overall_quality_score']:.2f}/100")
    print(f"文本总数: {report['basic_statistics']['total_texts']}")
    print(f"总词数: {report['basic_statistics']['total_tokens']}")
    print(f"平均文本长度: {report['basic_statistics']['mean_tokens_per_text']:.2f} 词")
    print(f"词汇量大小: {report['basic_statistics']['vocabulary_size']}")
    print(f"平均Flesch-Kincaid等级: {report['readability']['average_flesch_kincaid_grade']:.2f}")
    print(f"医学术语总体覆盖率: {report['medical_term_coverage']['overall_coverage_percentage']:.2f}%")
    
    # 可视化结果
    evaluator.visualize_corpus_characteristics(report, "./corpus_visualizations")

3. 法律领域语料构建与预处理

3.1 法律语料的类型与来源

法律领域的语料来源丰富多样,但需要特别注意权威性和时效性:

3.1.1 法律法规与司法解释
  • 全国人大及其常委会制定的法律:宪法、民法、刑法、商法等
  • 行政法规:国务院制定的条例、规定、办法等
  • 司法解释:最高人民法院、最高人民检察院发布的解释文件
  • 地方法规:各省、自治区、直辖市人大及其常委会制定的地方性法规
3.1.2 判例与裁判文书
  • 最高人民法院指导性案例:具有指导意义的典型案例
  • 中国裁判文书网:公开的裁判文书数据库
  • 案例汇编:各类法律出版社出版的案例集
  • 法学期刊案例:《中国法学》、《法学研究》等期刊中的案例分析
3.1.3 法学论著与学术文献
  • 法学教材:《民法总论》、《刑法学》等经典教材
  • 法学专著:各领域专家学者的研究著作
  • 法学论文:CNKI、万方等数据库中的法学论文
  • 法律评论:《中国法律评论》、《法学评论》等期刊
3.1.4 法律词典与术语资源
  • 中国法律大辞典:收录法律术语和概念的权威词典
  • 元照英美法词典:英美法系法律术语词典
  • 法律术语数据库:各类专业法律术语库

3.2 法律语料的预处理流程

法律语料的预处理需要注重准确性和规范性,特别是对法律术语和条文的处理:

3.2.1 法律文本清洗与规范化
import re
import pandas as pd
from bs4 import BeautifulSoup
import nltk
from nltk.tokenize import word_tokenize, sent_tokenize
from nltk.corpus import stopwords

class LegalCorpusPreprocessor:
    """
    法律语料预处理类,用于清洗和标准化法律文本数据
    """
    def __init__(self):
        # 下载必要的NLTK资源
        try:
            nltk.data.find('tokenizers/punkt')
            nltk.data.find('corpora/stopwords')
        except LookupError:
            nltk.download('punkt')
            nltk.download('stopwords')
        
        # 法律领域特定停用词
        self.legal_stopwords = {
            'shall', 'may', 'must', 'provided', 'however', 'otherwise', 'thereof', 
            'hereinafter', 'hereby', 'hereto', 'herein', 'whereof', 'whereby',
            'according', 'section', 'article', 'clause', 'paragraph', 'item'
        }
        
        # 通用停用词
        self.stop_words = set(stopwords.words('chinese')).union(self.legal_stopwords)
    
    def remove_html_tags(self, text):
        """移除HTML标签"""
        if not isinstance(text, str):
            return ""
        soup = BeautifulSoup(text, 'html.parser')
        return soup.get_text()
    
    def normalize_whitespace(self, text):
        """标准化空白字符"""
        if not isinstance(text, str):
            return ""
        return re.sub(r'\s+', ' ', text).strip()
    
    def remove_special_characters(self, text):
        """移除特殊字符,但保留法律文本中的必要符号"""
        if not isinstance(text, str):
            return ""
        # 保留中文、英文、数字、标点符号和法律文本中的特殊符号
        return re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9,。;:!?"\'()《》【】、·\s]', '', text)
    
    def normalize_legal_terms(self, text):
        """标准化常见法律术语缩写"""
        if not isinstance(text, str):
            return ""
        
        term_mapping = {
            # 中文法律术语标准化
            '民法典': '中华人民共和国民法典',
            '刑法': '中华人民共和国刑法',
            '合同法': '中华人民共和国合同法',
            '公司法': '中华人民共和国公司法',
            '著作权法': '中华人民共和国著作权法',
            '证券法': '中华人民共和国证券法',
            '行政处罚法': '中华人民共和国行政处罚法',
            '宪法': '中华人民共和国宪法',
            '民事诉讼法': '中华人民共和国民事诉讼法',
            '刑事诉讼法': '中华人民共和国刑事诉讼法',
            
            # 常见法律术语
            '原告诉称': '原告陈述',
            '被告辩称': '被告陈述',
            '经审理查明': '经审理查明',
            '本院认为': '本院认为',
            '判决如下': '判决如下',
        }
        
        for abbr, full_form in term_mapping.items():
            if abbr == full_form:
                continue
            # 先把已是全称的片段还原为简称,再统一展开,
            # 避免出现"中华人民共和国中华人民共和国民法典"式的重复前缀
            text = text.replace(full_form, abbr)
            text = text.replace(abbr, full_form)
        
        return text
    
    def remove_stopwords(self, text):
        """移除停用词"""
        if not isinstance(text, str):
            return ""
        tokens = word_tokenize(text)
        filtered_tokens = [word for word in tokens if word not in self.stop_words]
        return ' '.join(filtered_tokens)
    
    def segment_chinese_text(self, text):
        """中文文本分词(这里使用简单的字符分割,实际应用中可使用jieba等专业分词工具)"""
        if not isinstance(text, str):
            return ""
        # 简单返回文本(实际应用中应集成jieba等分词库)
        return text
    
    def extract_legal_clauses(self, text):
        """提取法律条文中的条款信息"""
        if not isinstance(text, str):
            return []
        
        # 匹配常见的条款格式
        clause_patterns = [
            r'第[一二三四五六七八九十百千]+条',  # 中文数字条款
            r'第\d+条',                            # 阿拉伯数字条款
            r'第[一二三四五六七八九十百千]+款',  # 中文数字款项
            r'第\d+款',                            # 阿拉伯数字款项
            r'第[一二三四五六七八九十百千]+项',  # 中文数字项目
            r'第\d+项',                            # 阿拉伯数字项目
        ]
        
        clauses = []
        for pattern in clause_patterns:
            matches = re.findall(pattern, text)
            clauses.extend(matches)
        
        return list(set(clauses))  # 去重
    
    def preprocess_text(self, text):
        """完整的文本预处理流程"""
        if not text or not isinstance(text, str):
            return ""
        
        # 转换为字符串(如果输入不是字符串)
        text = str(text)
        
        # 移除HTML标签
        text = self.remove_html_tags(text)
        
        # 标准化法律术语
        text = self.normalize_legal_terms(text)
        
        # 移除特殊字符
        text = self.remove_special_characters(text)
        
        # 标准化空白字符
        text = self.normalize_whitespace(text)
        
        # 中文分词(可选)
        # text = self.segment_chinese_text(text)
        
        # 可选:移除停用词(根据任务需求决定是否保留停用词)
        # text = self.remove_stopwords(text)
        
        return text
    
    def preprocess_dataframe(self, df, text_column):
        """预处理DataFrame中的文本列"""
        df_copy = df.copy()
        df_copy[text_column] = df_copy[text_column].apply(self.preprocess_text)
        # 移除空字符串
        df_copy = df_copy[df_copy[text_column].str.strip() != ""]
        return df_copy
    
    def detect_legal_terms(self, text, legal_lexicon):
        """检测文本中的法律术语"""
        if not isinstance(text, str) or not legal_lexicon:
            return set()
        
        detected_terms = set()
        
        # 检查每个术语是否在文本中
        for term in legal_lexicon:
            if term in text:
                detected_terms.add(term)
        
        return detected_terms

# 使用示例
if __name__ == "__main__":
    # 创建预处理器实例
    preprocessor = LegalCorpusPreprocessor()
    
    # 示例法律文本
    sample_legal_text = """
    <h2>中华人民共和国民法典</h2>
    <p>第一千一百六十五条 行为人因过错侵害他人民事权益造成损害的,应当承担侵权责任。</p>
    <p>依照法律规定推定行为人有过错,其不能证明自己没有过错的,应当承担侵权责任。</p>
    <p>第一千一百六十六条 行为人造成他人民事权益损害,不论行为人有无过错,法律规定应当承担侵权责任的,依照其规定。</p>
    """
    
    # 预处理文本
    cleaned_text = preprocessor.preprocess_text(sample_legal_text)
    print("原始文本:")
    print(sample_legal_text)
    print("\n清洗后文本:")
    print(cleaned_text)
    
    # 提取条款
    clauses = preprocessor.extract_legal_clauses(cleaned_text)
    print("\n提取的条款:")
    for clause in clauses:
        print(f"- {clause}")
    
    # 示例法律术语词典(简化版)
    sample_legal_lexicon = {
        '侵权责任', '民事权益', '过错', '损害', '推定', '证明', '行为人'
    }
    
    # 检测法律术语
    detected_terms = preprocessor.detect_legal_terms(cleaned_text, sample_legal_lexicon)
    print("\n检测到的法律术语:")
    for term in detected_terms:
        print(f"- {term}")
3.2.2 法律文本结构化

法律文本通常具有层次结构,结构化处理有助于模型理解法律文本的组织方式:

import re
import json
from collections import defaultdict, deque

class LegalTextStructurizer:
    """
    法律文本结构化类,用于解析和构建法律文本的层次结构
    """
    def __init__(self):
        # 法律文本层级模式
        self.level_patterns = {
            'chapter': r'第[一二三四五六七八九十百千]+章',  # 章
            'section': r'第[一二三四五六七八九十百千]+节',  # 节
            'article': r'第[一二三四五六七八九十百千]+条',  # 条
            'paragraph': r'第[一二三四五六七八九十百千]+款',  # 款
            'item': r'第[一二三四五六七八九十百千]+项',  # 项
            'subitem': r'第[一二三四五六七八九十百千]+目',  # 目
        }
        
        # 阿拉伯数字版本的层级模式
        self.level_patterns_arabic = {
            'chapter': r'第\d+章',
            'section': r'第\d+节',
            'article': r'第\d+条',
            'paragraph': r'第\d+款',
            'item': r'第\d+项',
            'subitem': r'第\d+目',
        }
        
        # 层级关系
        self.hierarchy_order = ['chapter', 'section', 'article', 'paragraph', 'item', 'subitem']
    
    def parse_legal_text(self, text):
        """解析法律文本,构建层次结构"""
        if not isinstance(text, str):
            return {"error": "Input must be a string"}
        
        # 初始化结构
        structure = {
            "type": "legal_document",
            "content": text,
            "structure": {},
            "chapters": []
        }
        
        # 合并中文数字和阿拉伯数字的模式:直接用字典合并会让阿拉伯数字模式覆盖同名的中文模式,
        # 这里改为对每个层级做"或"拼接,使两种编号写法都能匹配
        all_patterns = {
            level: f"(?:{self.level_patterns[level]}|{self.level_patterns_arabic[level]})"
            for level in self.hierarchy_order
        }
        
        # 按层级顺序处理
        current_levels = {}
        chapter_stack = []
        
        # 将文本按行分割
        lines = text.split('\n')
        
        for line in lines:
            line = line.strip()
            if not line:
                continue
            
            # 检查是否包含任何层级标记
            level_found = False
            
            for level_name in self.hierarchy_order:
                pattern = all_patterns[level_name]
                matches = re.findall(pattern, line)
                
                if matches:
                    level_found = True
                    level_text = matches[0]
                    
                    # 创建层级节点
                    level_node = {
                        "type": level_name,
                        "identifier": level_text,
                        "content": line,
                        "children": []
                    }
                    
                    # 根据层级关系添加到适当的位置
                    if level_name == 'chapter':
                        # 章是最高层级
                        structure["chapters"].append(level_node)
                        chapter_stack = [level_node]
                        current_levels = {'chapter': level_node}
                    else:
                        # 找到父层级
                        parent_level_index = self.hierarchy_order.index(level_name) - 1
                        parent_level_name = self.hierarchy_order[parent_level_index]
                        
                        # 确保父层级存在
                        if parent_level_name in current_levels:
                            parent_node = current_levels[parent_level_name]
                            parent_node["children"].append(level_node)
                            current_levels[level_name] = level_node
                            
                            # 更新层级栈
                            if len(chapter_stack) > parent_level_index + 1:
                                chapter_stack = chapter_stack[:parent_level_index + 1]
                            chapter_stack.append(level_node)
                        else:
                            # 如果父层级不存在,尝试添加到最后一个可用的父层级
                            if chapter_stack:
                                last_node = chapter_stack[-1]
                                last_node["children"].append(level_node)
                                current_levels[level_name] = level_node
                
            if not level_found and chapter_stack:
                # 如果没有找到层级标记,但存在当前章节,将内容添加到最后一个层级
                last_node = chapter_stack[-1]
                if "content" in last_node:
                    last_node["content"] += " " + line
                else:
                    last_node["content"] = line
        
        return structure
    
    def extract_articles(self, structured_text):
        """从结构化文本中提取所有条款"""
        articles = []
        
        def extract_articles_recursive(node):
            if node.get("type") == "article":
                articles.append(node)
            
            for child in node.get("children", []):
                extract_articles_recursive(child)
        
        # 从每个章节开始递归提取
        for chapter in structured_text.get("chapters", []):
            extract_articles_recursive(chapter)
        
        return articles
    
    def generate_legal_outline(self, structured_text):
        """生成法律文本的大纲"""
        outline = []
        
        def add_to_outline(node, level=0):
            indent = "  " * level
            identifier = node.get("identifier", "")
            content_preview = node.get("content", "")[:100]  # 内容预览
            outline.append(f"{indent}- {identifier}: {content_preview}...")
            
            for child in node.get("children", []):
                add_to_outline(child, level + 1)
        
        # 从每个章节开始构建大纲
        for chapter in structured_text.get("chapters", []):
            add_to_outline(chapter)
        
        return "\n".join(outline)
    
    def find_article_by_number(self, structured_text, article_number):
        """通过条款编号查找特定条款"""
        target_identifier = f"第{article_number}条"
        
        def search_article_recursive(node):
            if node.get("identifier") == target_identifier:
                return node
            
            for child in node.get("children", []):
                result = search_article_recursive(child)
                if result:
                    return result
            
            return None
        
        # 从每个章节开始搜索
        for chapter in structured_text.get("chapters", []):
            result = search_article_recursive(chapter)
            if result:
                return result
        
        return None
    
    def export_to_json(self, structured_text, output_file=None):
        """导出结构化文本为JSON格式"""
        json_data = json.dumps(structured_text, ensure_ascii=False, indent=2)
        
        if output_file:
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(json_data)
            print(f"结构化数据已导出到: {output_file}")
        
        return json_data
    
    def import_from_json(self, json_file):
        """从JSON文件导入结构化文本"""
        try:
            with open(json_file, 'r', encoding='utf-8') as f:
                structured_text = json.load(f)
            return structured_text
        except Exception as e:
            print(f"导入JSON文件失败: {str(e)}")
            return None

# 使用示例
if __name__ == "__main__":
    # 创建结构化器实例
    structurizer = LegalTextStructurizer()
    
    # 示例法律文本
    sample_legal_text = """
    第一编 总则
    
    第一章 基本规定
    
    第一条 为了保护民事主体的合法权益,调整民事关系,维护社会和经济秩序,适应中国特色社会主义发展要求,弘扬社会主义核心价值观,根据宪法,制定本法。
    
    第二条 民法调整平等主体的自然人、法人和非法人组织之间的人身关系和财产关系。
    
    第三条 民事主体的人身权利、财产权利以及其他合法权益受法律保护,任何组织或者个人不得侵犯。
    
    第二章 自然人
    
    第一节 民事权利能力和民事行为能力
    
    第十三条 自然人从出生时起到死亡时止,具有民事权利能力,依法享有民事权利,承担民事义务。
    
    第十四条 自然人的民事权利能力一律平等。
    
    第十五条 自然人的出生时间和死亡时间,以出生证明、死亡证明记载的时间为准;没有出生证明、死亡证明的,以户籍登记或者其他有效身份登记记载的时间为准。有其他证据足以推翻以上记载时间的,以该证据证明的时间为准。
    """
    
    # 解析法律文本
    structured_text = structurizer.parse_legal_text(sample_legal_text)
    
    # 生成大纲
    outline = structurizer.generate_legal_outline(structured_text)
    print("法律文本大纲:")
    print(outline)
    
    # 提取条款
    articles = structurizer.extract_articles(structured_text)
    print(f"\n共提取到 {len(articles)} 个条款")
    for article in articles[:3]:  # 显示前3个条款
        print(f"\n{article['identifier']}:")
        print(article['content'])
    
    # 查找特定条款
    article_2 = structurizer.find_article_by_number(structured_text, "二")
    if article_2:
        print(f"\n找到第2条:")
        print(article_2['content'])
    
    # 导出为JSON
    # structurizer.export_to_json(structured_text, 'legal_text_structure.json')

3.3 法律语料的质量评估

法律语料的质量评估需要考虑权威性、时效性和准确性等多个维度:

import pandas as pd
import numpy as np
from collections import Counter, defaultdict
import matplotlib.pyplot as plt
import seaborn as sns
import re
import datetime

class LegalCorpusEvaluator:
    """
    法律语料质量评估类,用于评估法律文本语料库的质量特征
    """
    def __init__(self):
        # 法律文本来源的可信度权重
        self.source_reliability = {
            "全国人大": 1.0,
            "国务院": 0.95,
            "最高人民法院": 0.95,
            "最高人民检察院": 0.95,
            "中国裁判文书网": 0.9,
            "法学核心期刊": 0.85,
            "法律出版社": 0.8,
            "知名法学院校": 0.8,
            "普通法学期刊": 0.7,
            "其他": 0.5
        }
        
        # 法律领域关键词类别
        self.legal_keyword_categories = {
            "民法": {"合同", "侵权", "物权", "债权", "人格权", "婚姻家庭", "继承"},
            "刑法": {"犯罪", "刑罚", "故意", "过失", "正当防卫", "紧急避险", "累犯"},
            "行政法": {"行政许可", "行政处罚", "行政强制", "行政复议", "行政诉讼"},
            "程序法": {"管辖", "证据", "举证责任", "诉讼时效", "再审", "执行"},
            "商法": {"公司", "证券", "保险", "票据", "破产", "海商"}
        }
        
        # 重要法律颁布日期(用于评估时效性)
        self.important_legal_dates = {
            "中华人民共和国民法典": datetime.datetime(2020, 5, 28),
            "中华人民共和国刑法修正案(十一)": datetime.datetime(2020, 12, 26),
            "中华人民共和国行政处罚法": datetime.datetime(2021, 1, 22),
            "中华人民共和国个人信息保护法": datetime.datetime(2021, 8, 20),
            "中华人民共和国数据安全法": datetime.datetime(2021, 6, 10)
        }
    
    def calculate_text_statistics(self, texts):
        """计算文本统计特征"""
        stats = {
            "total_texts": len(texts),
            "total_tokens": 0,
            "mean_tokens_per_text": 0,
            "median_tokens_per_text": 0,
            "token_counts": [],
            "vocabulary_size": 0,
            "unique_tokens": set()
        }
        
        # 计算每个文本的词数和总词数
        for text in texts:
            if not isinstance(text, str):
                text = str(text)
            tokens = text.split()
            token_count = len(tokens)
            stats["token_counts"].append(token_count)
            stats["total_tokens"] += token_count
            stats["unique_tokens"].update(tokens)
        
        # 计算统计指标
        stats["vocabulary_size"] = len(stats["unique_tokens"])
        if stats["token_counts"]:
            stats["mean_tokens_per_text"] = np.mean(stats["token_counts"])
            stats["median_tokens_per_text"] = np.median(stats["token_counts"])
        
        return stats
    
    def assess_readability(self, text):
        """评估法律文本可读性"""
        if not isinstance(text, str):
            text = str(text)
        
        # 法律文本特有的可读性指标
        # 1. 条款密度:平均每100字包含的条款数量
        clause_pattern = r'第[一二三四五六七八九十百千\d]+[条款项目]'
        clauses = re.findall(clause_pattern, text)
        clause_density = len(clauses) / (len(text) / 100) if len(text) > 0 else 0
        
        # 2. 法律术语密度:这里使用简单计数,实际应用中应使用专业术语表
        legal_terms = set()
        for category, terms in self.legal_keyword_categories.items():
            for term in terms:
                if term in text:
                    legal_terms.add(term)
        
        term_density = len(legal_terms) / (len(text) / 100) if len(text) > 0 else 0
        
        # 3. 句子复杂度:句号、分号、顿号的数量(简化版)
        punctuation_count = sum(1 for char in text if char in '。;;、')
        sentence_complexity = punctuation_count / (len(text) / 100) if len(text) > 0 else 0
        
        return {
            "clause_density": clause_density,
            "legal_term_density": term_density,
            "sentence_complexity": sentence_complexity,
            "text_length": len(text)
        }
    
    def analyze_source_reliability(self, texts_with_sources):
        """分析文本来源的可靠性"""
        total_weight = 0
        weighted_score = 0
        source_counts = Counter()
        
        for text, source in texts_with_sources:
            # 获取来源权重
            source_lower = source.lower() if isinstance(source, str) else ""
            reliability_score = 0.5  # 默认中等可靠性
            
            # 检查是否匹配已知来源
            for known_source, weight in self.source_reliability.items():
                if known_source.lower() in source_lower:
                    reliability_score = weight
                    break
            
            # 计算加权得分
            text_length = len(text) if isinstance(text, str) else 0
            weighted_score += reliability_score * text_length
            total_weight += text_length
            source_counts[source] += 1
        
        # 计算平均可靠性得分
        avg_reliability = weighted_score / total_weight if total_weight > 0 else 0
        
        return {
            "average_reliability": avg_reliability,
            "source_distribution": dict(source_counts),
            "reliability_summary": {
                "high_reliability_sources": sum(1 for src, cnt in source_counts.items()
                                              if any(ks.lower() in str(src).lower() for ks, w in self.source_reliability.items() if w >= 0.8)),
                "medium_reliability_sources": sum(1 for src, cnt in source_counts.items()
                                                if any(ks.lower() in str(src).lower() for ks, w in self.source_reliability.items() if 0.5 <= w < 0.8)),
                "low_reliability_sources": sum(1 for src, cnt in source_counts.items()
                                              if any(ks.lower() in str(src).lower() for ks, w in self.source_reliability.items() if w < 0.5))
            }
        }
    
    def analyze_legal_term_coverage(self, texts):
        """分析法律术语覆盖情况"""
        category_coverage = {cat: set() for cat in self.legal_keyword_categories}
        total_keywords_found = set()
        
        for text in texts:
            if not isinstance(text, str):
                text = str(text)
            text_lower = text.lower()
            
            # 检查每个类别的关键词
            for category, keywords in self.legal_keyword_categories.items():
                for keyword in keywords:
                    if keyword in text_lower:
                        category_coverage[category].add(keyword)
                        total_keywords_found.add(keyword)
        
        # 计算覆盖率
        coverage_percentages = {}
        for category, found_keywords in category_coverage.items():
            total_category_keywords = len(self.legal_keyword_categories[category])
            if total_category_keywords > 0:
                coverage_percentages[category] = len(found_keywords) / total_category_keywords * 100
            else:
                coverage_percentages[category] = 0
        
        # 总体覆盖率
        total_possible_keywords = sum(len(keywords) for keywords in self.legal_keyword_categories.values())
        overall_coverage = len(total_keywords_found) / total_possible_keywords * 100 if total_possible_keywords > 0 else 0
        
        return {
            "overall_coverage_percentage": overall_coverage,
            "category_coverage_percentages": coverage_percentages,
            "keywords_found": dict({
                cat: list(keywords) for cat, keywords in category_coverage.items()
            })
        }
    
    def assess_timeliness(self, texts, publication_dates=None):
        """评估法律文本的时效性"""
        current_year = datetime.datetime.now().year
        
        # 如果提供了发布日期,使用它们进行分析
        if publication_dates and len(publication_dates) == len(texts):
            ages = []
            recent_count = 0
            outdated_count = 0
            
            for date_str in publication_dates:
                try:
                    # 假设日期格式为YYYY-MM-DD
                    if isinstance(date_str, str):
                        pub_date = datetime.datetime.strptime(date_str, "%Y-%m-%d")
                    elif isinstance(date_str, datetime.datetime):
                        pub_date = date_str
                    else:
                        continue
                    
                    age_in_years = (datetime.datetime.now() - pub_date).days / 365.25
                    ages.append(age_in_years)
                    
                    # 法律文本时效性判断:3年内为近期,10年以上为过时
                    if age_in_years <= 3:
                        recent_count += 1
                    elif age_in_years > 10:
                        outdated_count += 1
                except Exception:
                    continue
            
            return {
                "assessment_method": "publication_dates",
                "average_age_years": np.mean(ages) if ages else 0,
                "recent_texts_percentage": (recent_count / len(texts) * 100) if texts else 0,
                "outdated_texts_percentage": (outdated_count / len(texts) * 100) if texts else 0
            }
        else:
            # 基于文本内容中的法律引用评估时效性
            # 检查是否引用了最新的法律
            recent_legal_refs = 0
            total_legal_refs = 0
            
            current_date = datetime.datetime.now()
            
            for text in texts:
                if not isinstance(text, str):
                    continue
                
                for law_name, promul_date in self.important_legal_dates.items():
                    if law_name in text:
                        total_legal_refs += 1
                        # 5年内颁布的法律视为近期
                        if (current_date - promul_date).days / 365.25 <= 5:
                            recent_legal_refs += 1
            
            return {
                "assessment_method": "legal_references",
                "recent_legal_references_percentage": (recent_legal_refs / total_legal_refs * 100) if total_legal_refs > 0 else 0,
                "total_legal_references": total_legal_refs
            }
    
    def generate_corpus_quality_report(self, texts, texts_with_sources=None, publication_dates=None):
        """生成完整的语料质量报告"""
        report = {}
        
        # 1. 基本统计
        report["basic_statistics"] = self.calculate_text_statistics(texts)
        
        # 2. 可读性分析(对随机样本)
        sample_size = min(100, len(texts))
        if sample_size > 0:
            import random
            sample_texts = random.sample(texts, sample_size)
            readability_scores = [self.assess_readability(text) for text in sample_texts]
            
            report["readability"] = {
                "average_clause_density": np.mean([r["clause_density"] for r in readability_scores]),
                "average_legal_term_density": np.mean([r["legal_term_density"] for r in readability_scores]),
                "average_sentence_complexity": np.mean([r["sentence_complexity"] for r in readability_scores]),
                "sample_size": sample_size
            }
        else:
            report["readability"] = {
                "error": "Sample size too small for readability analysis"
            }
        
        # 3. 来源可靠性分析
        if texts_with_sources and len(texts_with_sources) > 0:
            report["source_reliability"] = self.analyze_source_reliability(texts_with_sources)
        else:
            report["source_reliability"] = {
                "note": "Source information not provided for reliability analysis"
            }
        
        # 4. 法律术语覆盖分析
        report["legal_term_coverage"] = self.analyze_legal_term_coverage(texts)
        
        # 5. 时效性分析
        report["timeliness"] = self.assess_timeliness(texts, publication_dates)
        
        # 6. 综合质量评分
        # 基于多个指标计算综合质量评分(0-100)
        quality_scores = []
        
        # 文本长度评分(基于平均词数)
        avg_tokens = report["basic_statistics"]["mean_tokens_per_text"]
        if avg_tokens > 2000:
            length_score = 100
        elif avg_tokens < 500:
            length_score = 40
        else:
            length_score = 40 + (avg_tokens - 500) / 25  # 在500~2000词之间从40线性插值到100
        quality_scores.append(length_score)
        
        # 词汇多样性评分
        vocab_size = report["basic_statistics"]["vocabulary_size"]
        if vocab_size > 100000:
            vocab_score = 100
        elif vocab_size < 10000:
            vocab_score = 40
        else:
            vocab_score = 40 + (vocab_size - 10000) / 1500  # 在1万~10万词之间从40线性插值到100
        quality_scores.append(vocab_score)
        
        # 法律术语覆盖评分
        term_coverage = report["legal_term_coverage"]["overall_coverage_percentage"]
        quality_scores.append(term_coverage)
        
        # 来源可靠性评分(如果有)
        if "average_reliability" in report["source_reliability"]:
            reliability_score = report["source_reliability"]["average_reliability"] * 100
            quality_scores.append(reliability_score)
        
        # 时效性评分
        if "recent_texts_percentage" in report["timeliness"]:
            timeliness_score = report["timeliness"]["recent_texts_percentage"]
        elif "recent_legal_references_percentage" in report["timeliness"]:
            timeliness_score = report["timeliness"]["recent_legal_references_percentage"]
        else:
            timeliness_score = 50  # 默认中等时效性
        quality_scores.append(timeliness_score)
        
        # 计算综合评分
        report["overall_quality_score"] = np.mean(quality_scores)
        
        return report
    
    def visualize_corpus_characteristics(self, report, output_dir="."):
        """可视化语料库特征"""
        # 创建输出目录
        import os
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        
        # 1. 文本长度分布直方图
        plt.figure(figsize=(10, 6))
        if "token_counts" in report["basic_statistics"] and report["basic_statistics"]["token_counts"]:
            plt.hist(report["basic_statistics"]["token_counts"], bins=50, alpha=0.7, color='blue')
            plt.title('文本长度分布')
            plt.xlabel('词数')
            plt.ylabel('频率')
            plt.grid(True, alpha=0.3)
            plt.savefig(os.path.join(output_dir, 'text_length_distribution.png'))
            plt.close()
        
        # 2. 法律术语覆盖饼图
        plt.figure(figsize=(10, 8))
        categories = list(report["legal_term_coverage"]["category_coverage_percentages"].keys())
        percentages = list(report["legal_term_coverage"]["category_coverage_percentages"].values())
        plt.pie(percentages, labels=categories, autopct='%1.1f%%', startangle=90)
        plt.title('法律术语类别覆盖')
        plt.axis('equal')
        plt.savefig(os.path.join(output_dir, 'legal_term_coverage.png'))
        plt.close()
        
        # 3. 来源可靠性条形图
        if "source_distribution" in report["source_reliability"]:
            plt.figure(figsize=(12, 6))
            sources = list(report["source_reliability"]["source_distribution"].keys())
            counts = list(report["source_reliability"]["source_distribution"].values())
            plt.barh(sources, counts, alpha=0.7, color='green')
            plt.title('来源分布')
            plt.xlabel('文档数量')
            plt.tight_layout()
            plt.savefig(os.path.join(output_dir, 'source_distribution.png'))
            plt.close()
        
        print(f"可视化图表已保存到: {output_dir}")

# 使用示例
if __name__ == "__main__":
    import random
    
    # 创建一些示例法律文本
    sample_legal_texts = [
        "第一条 为了保护民事主体的合法权益,调整民事关系,维护社会和经济秩序,适应中国特色社会主义发展要求,弘扬社会主义核心价值观,根据宪法,制定本法。",
        "第二条 民法调整平等主体的自然人、法人和非法人组织之间的人身关系和财产关系。",
        "第三条 民事主体的人身权利、财产权利以及其他合法权益受法律保护,任何组织或者个人不得侵犯。",
        "第十三条 自然人从出生时起到死亡时止,具有民事权利能力,依法享有民事权利,承担民事义务。",
        "第十四条 自然人的民事权利能力一律平等。",
        "第一百一十条 自然人享有生命权、身体权、健康权、姓名权、肖像权、名誉权、荣誉权、隐私权、婚姻自主权等权利。",
        "第一千一百六十五条 行为人因过错侵害他人民事权益造成损害的,应当承担侵权责任。",
        "第二百零六条 国家坚持和完善公有制为主体、多种所有制经济共同发展,按劳分配为主体、多种分配方式并存,社会主义市场经济体制等社会主义基本经济制度。",
        "第三百零一条 处分共有的不动产或者动产以及对共有的不动产或者动产作重大修缮、变更性质或者用途的,应当经占份额三分之二以上的按份共有人或者全体共同共有人同意,但是共有人之间另有约定的除外。",
        "第五百七十七条 当事人一方不履行合同义务或者履行合同义务不符合约定的,应当承担继续履行、采取补救措施或者赔偿损失等违约责任。"
    ]
    
    # 添加来源信息
    sample_sources = ["全国人大", "最高人民法院", "中国裁判文书网", "法学核心期刊", "法律出版社"]
    texts_with_sources = [(text, random.choice(sample_sources)) for text in sample_legal_texts]
    
    # 添加发布日期
    sample_dates = ["2020-05-28", "2020-05-28", "2020-05-28", "2020-05-28", "2020-05-28",
                   "2020-05-28", "2020-05-28", "2020-05-28", "2020-05-28", "2020-05-28"]
    
    # 创建评估器实例
    evaluator = LegalCorpusEvaluator()
    
    # 生成质量报告
    report = evaluator.generate_corpus_quality_report(sample_legal_texts, texts_with_sources, sample_dates)
    
    # 打印报告摘要
    print("=== 法律语料质量评估报告 ===")
    print(f"总体质量评分: {report['overall_quality_score']:.2f}/100")
    print(f"文本总数: {report['basic_statistics']['total_texts']}")
    print(f"总词数: {report['basic_statistics']['total_tokens']}")
    print(f"平均文本长度: {report['basic_statistics']['mean_tokens_per_text']:.2f} 词")
    print(f"词汇量大小: {report['basic_statistics']['vocabulary_size']}")
    print(f"平均条款密度: {report['readability']['average_clause_density']:.2f} 条款/100字")
    print(f"法律术语总体覆盖率: {report['legal_term_coverage']['overall_coverage_percentage']:.2f}%")
    print(f"近期文本百分比: {report['timeliness']['recent_texts_percentage']:.2f}%")
    
    # 可视化结果
    evaluator.visualize_corpus_characteristics(report, "./legal_corpus_visualizations")
```

## 4. 专业词汇优化技术

### 4.1 领域词汇的重要性与挑战

在医疗和法律等专业领域,精确理解和正确使用专业术语是大模型性能的关键。通用大模型虽然词汇量大,但在专业领域仍存在以下挑战:

- **专业术语覆盖率不足**:许多医学和法律专业术语在通用预训练语料中出现频率低
- **术语歧义理解困难**:同一术语在不同专业语境中可能有完全不同的含义
- **术语变体识别能力弱**:无法有效识别术语的缩写、同义词和相关表达
- **术语组合理解有限**:对专业术语的复杂组合和上下文依赖关系理解不足
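
为了更直观地看到上述问题(特别是术语覆盖率不足),下面给出一个最小示意:用通用中文 BERT 分词器观察几个专业术语被切分的情况。术语列表仅为演示用的假设示例:

```python
# 最小示意:观察通用分词器如何切分专业术语(术语仅为演示用假设示例)
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("bert-base-chinese")

terms = ["冠状动脉粥样硬化", "类风湿关节炎", "不当得利", "无因管理"]
for term in terms:
    pieces = tokenizer.tokenize(term)
    # 被切分成的子词越多,说明该术语在通用词表中越"碎",模型越难把它当作一个整体概念
    print(f"{term} -> {pieces}(子词数: {len(pieces)})")
```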

专业词汇优化已成为提升领域微调效果的重要技术手段。2025年的研究表明,通过针对性的词汇优化,可以将专业领域任务的性能提升15-30%。

### 4.2 词表扩展策略

#### 4.2.1 词表扩展方法

在领域微调中,词表扩展是最直接有效的词汇优化方法之一。以下是主要的词表扩展策略:

**词表扩展流程**:获取领域术语 → 频率统计与筛选 → 语义聚类 → 词表构建 → 模型适配

具体实现可以通过以下步骤:

1. **术语提取**:从大规模领域语料中提取高频专业术语
2. **术语验证**:通过专业词典和规则验证术语的有效性
3. **术语分级**:根据频率和重要性对术语进行分级
4. **嵌入学习**:为新术语学习高质量的词嵌入
5. **模型适配**:调整模型的词嵌入层以适应扩展词表

#### 4.2.2 词表扩展的实现代码

```python
import re
import json
import jieba
from collections import Counter
import numpy as np
from sklearn.cluster import DBSCAN
from transformers import AutoModel, AutoTokenizer

class DomainVocabularyExpander:
    """
    领域词表扩展器,用于扩展预训练模型的词表以包含领域专业术语
    """
    def __init__(self, model_name_or_path):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name_or_path)
        self.model = AutoModel.from_pretrained(model_name_or_path)
        self.existing_vocab = set(self.tokenizer.get_vocab().keys())
        self.new_terms = set()
    
    def extract_domain_terms(self, corpus, min_freq=5, min_length=2):
        """
        从领域语料中提取专业术语
        
        Args:
            corpus: 领域语料列表
            min_freq: 术语最小频率
            min_length: 术语最小长度
            
        Returns:
            提取的术语列表
        """
        # 分词
        all_tokens = []
        for text in corpus:
            tokens = jieba.cut(text)
            all_tokens.extend(tokens)
        
        # 频率统计
        token_counts = Counter(all_tokens)
        
        # 筛选专业术语(基于规则)
        domain_terms = []
        for token, count in token_counts.items():
            if count >= min_freq and len(token) >= min_length and \
               token not in self.existing_vocab and self._is_potential_term(token):
                domain_terms.append((token, count))
        
        # 按频率排序
        domain_terms.sort(key=lambda x: x[1], reverse=True)
        return domain_terms
    
    def _is_potential_term(self, token):
        """简单的候选术语过滤规则(示意实现):排除空白、纯数字和不含中英文字符的片段"""
        token = token.strip()
        if not token or token.isdigit():
            return False
        # 至少包含一个中文或英文字符才视为候选术语
        return bool(re.search(r'[\u4e00-\u9fa5A-Za-z]', token))
    
    def expand_tokenizer_vocab(self, new_terms, save_path):
        """
        扩展分词器词表
        
        Args:
            new_terms: 新术语列表
            save_path: 保存路径
        """
        # 提取术语
        terms_to_add = [term for term, _ in new_terms]
        
        # 扩展词表
        self.tokenizer.add_tokens(terms_to_add)
        
        # 调整模型嵌入层
        self.model.resize_token_embeddings(len(self.tokenizer))
        
        # 保存扩展后的分词器和模型
        self.tokenizer.save_pretrained(save_path)
        self.model.save_pretrained(save_path)
        
        print(f"成功扩展词表,新增 {len(terms_to_add)} 个专业术语")
        return self.tokenizer

# 使用示例
if __name__ == "__main__":
    # 初始化词表扩展器
    expander = DomainVocabularyExpander("bert-base-chinese")
    
    # 模拟医疗领域语料
    medical_corpus = [
        "糖尿病是一种慢性代谢性疾病,主要特征是血糖水平持续升高。",
        "高血压患者需要定期监测血压,控制饮食并按时服药。",
        "冠状动脉粥样硬化性心脏病是常见的心血管疾病。",
        "脑卒中分为缺血性脑卒中和出血性脑卒中两种类型。",
        "乳腺癌筛查包括乳腺超声、乳腺X线摄影等方法。",
        "阿尔茨海默病是一种进行性发展的神经系统退行性疾病。",
        "慢性阻塞性肺疾病患者常出现咳嗽、咳痰和呼吸困难等症状。",
        "恶性肿瘤的治疗方法包括手术、放疗、化疗和靶向治疗等。",
        "类风湿关节炎是一种以慢性炎症性关节炎为主要表现的自身免疫性疾病。",
        "甲状腺功能亢进症患者常有心悸、多汗、体重减轻等症状。"
    ]
    
    # 提取领域术语
    domain_terms = expander.extract_domain_terms(medical_corpus, min_freq=1)
    print("提取的医疗领域术语:")
    for term, freq in domain_terms[:10]:  # 打印前10个
        print(f"{term}: {freq}")
    
    # 扩展词表
    expander.expand_tokenizer_vocab(domain_terms[:20], "./expanded_medical_bert")
```
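
需要注意,`add_tokens` 之后新增 token 的嵌入默认是随机初始化的。一个常见做法是用原词表下该术语子词嵌入的均值来初始化,对应前文步骤中的"嵌入学习"。下面是一个示意实现(函数名与参数为本文假设,并非 transformers 自带接口),假设同时保留了扩展前的基础分词器 base_tokenizer:

```python
import torch

def init_new_token_embeddings(model, expanded_tokenizer, base_tokenizer, new_terms):
    """示意:用基础词表子词嵌入的均值初始化新增术语的嵌入(假设已调用 resize_token_embeddings)"""
    embeddings = model.get_input_embeddings().weight
    with torch.no_grad():
        for term in new_terms:
            new_id = expanded_tokenizer.convert_tokens_to_ids(term)
            # 用扩展前的分词器把术语切回旧词表中的子词
            sub_ids = base_tokenizer(term, add_special_tokens=False)["input_ids"]
            if sub_ids:
                embeddings[new_id] = embeddings[sub_ids].mean(dim=0)
```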

### 4.3 法律领域专业词汇优化

法律领域的词汇优化具有特殊性,需要考虑法律法规的权威性、时效性和专业性。以下是法律领域词汇优化的关键策略:

**法律词汇优化流程**:法律法规提取 → 术语分级分类 → 语义关联建立 → 词表构建 → 法规定义关联

#### 4.3.1 法律术语优化实现

```python
class LegalTermOptimizer:
    """
    法律术语优化器,专门用于法律领域的词汇表优化
    """
    def __init__(self, tokenizer_path):
        self.tokenizer_path = tokenizer_path
        self.legal_term_categories = {
            "实体法": [],
            "程序法": [],
            "国际法": [],
            "部门法": [],
            "法理学": []
        }
    
    def load_legal_dictionary(self, dict_path):
        """
        加载法律词典
        
        Args:
            dict_path: 法律词典文件路径
            
        Returns:
            法律术语字典
        """
        legal_terms = {}
        try:
            with open(dict_path, 'r', encoding='utf-8') as f:
                for line in f:
                    if line.strip():
                        parts = line.strip().split('\t')
                        if len(parts) >= 3:
                            term, category, definition = parts[0], parts[1], '\t'.join(parts[2:])
                            legal_terms[term] = {"category": category, "definition": definition}
                            # 按类别分类
                            if category in self.legal_term_categories:
                                self.legal_term_categories[category].append(term)
                            else:
                                self.legal_term_categories[category] = [term]
        except Exception as e:
            print(f"加载法律词典失败: {e}")
        
        print(f"成功加载 {len(legal_terms)} 个法律术语")
        return legal_terms
    
    def analyze_term_coverage(self, corpus, legal_terms):
        """
        分析语料中的法律术语覆盖率
        
        Args:
            corpus: 法律语料列表
            legal_terms: 法律术语字典
            
        Returns:
            覆盖率分析报告
        """
        term_count = {term: 0 for term in legal_terms.keys()}
        total_terms = len(legal_terms)
        covered_terms = 0
        
        # 统计语料中的术语出现频率
        for text in corpus:
            for term in legal_terms.keys():
                if term in text:
                    term_count[term] += 1
        
        # 计算覆盖率
        for term, count in term_count.items():
            if count > 0:
                covered_terms += 1
        
        coverage_rate = (covered_terms / total_terms) * 100
        
        # 按频率排序
        sorted_terms = sorted(term_count.items(), key=lambda x: x[1], reverse=True)
        
        report = {
            "total_legal_terms": total_terms,
            "covered_terms": covered_terms,
            "coverage_rate": coverage_rate,
            "top_10_terms": sorted_terms[:10],
            "zero_coverage_terms_count": total_terms - covered_terms
        }
        
        return report

# 使用示例
if __name__ == "__main__":
    # 初始化法律术语优化器
    optimizer = LegalTermOptimizer("bert-base-chinese")
    
    # 模拟法律词典数据
    legal_terms_dict = {
        "民法典": {"category": "实体法", "definition": "调整平等主体的自然人、法人和非法人组织之间的人身关系和财产关系的法律"},
        "刑法": {"category": "实体法", "definition": "规定犯罪、刑事责任和刑罚的法律"},
        "行政法": {"category": "部门法", "definition": "调整行政主体在行使行政职权和接受行政法制监督过程中而与行政相对人、行政法制监督主体之间发生的各种关系"},
        "民事诉讼法": {"category": "程序法", "definition": "规定民事诉讼程序的法律"},
        "刑事诉讼法": {"category": "程序法", "definition": "规定刑事诉讼程序的法律"},
        "行政诉讼法": {"category": "程序法", "definition": "规定行政诉讼程序的法律"},
        "侵权责任": {"category": "实体法", "definition": "行为人因过错侵害他人民事权益造成损害的,应当承担侵权责任"},
        "合同": {"category": "实体法", "definition": "民事主体之间设立、变更、终止民事法律关系的协议"},
        "不当得利": {"category": "实体法", "definition": "没有合法根据取得不当利益,造成他人损失的,应当将取得的不当利益返还受损失的人"},
        "无因管理": {"category": "实体法", "definition": "没有法定的或者约定的义务,为避免他人利益受损失而进行管理的人,有权请求受益人偿还由此支出的必要费用"}
    }
    
    # 模拟法律语料
    legal_corpus = [
        "根据民法典第一百二十条,民事权益受到侵害的,被侵权人有权请求侵权人承担侵权责任。",
        "合同法规定,当事人订立合同,应当遵循自愿、公平、诚实信用的原则。",
        "刑法第二百六十四条规定,盗窃公私财物,数额较大的,处三年以下有期徒刑、拘役或者管制。",
        "民事诉讼法第一百一十九条规定了起诉的条件。",
        "行政诉讼法第二条规定,公民、法人或者其他组织认为行政机关和行政机关工作人员的行政行为侵犯其合法权益,可以向人民法院提起诉讼。"
    ]
    
    # 分析术语覆盖率
    coverage_report = optimizer.analyze_term_coverage(legal_corpus, legal_terms_dict)
    
    print("=== 法律术语覆盖率分析 ===")
    print(f"法律术语总数: {coverage_report['total_legal_terms']}")
    print(f"已覆盖术语数: {coverage_report['covered_terms']}")
    print(f"覆盖率: {coverage_report['coverage_rate']:.2f}%")
    print("高频法律术语:")
    for term, count in coverage_report['top_10_terms']:
        if count > 0:
            print(f"  {term}: {count}次")
    print(f"未覆盖术语数: {coverage_report['zero_coverage_terms_count']}")

词汇表扩展的核心原理包括:

1. **低频专业术语识别与保留**:识别并保留领域中出现频率较低但对语义理解至关重要的专业术语
2. **新术语动态融入**:将最新出现的专业术语纳入词表
3. **多形态术语统一**:处理术语的不同形态、缩写和变体
4. **术语之间关系建模**:捕获术语之间的层级关系和关联
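
针对其中的"多形态术语统一",最简单的做法是维护一份"变体 → 规范术语"映射,在语料预处理阶段先做归一化。下面是一个示意(映射内容均为假设示例,实际应用应来自术语词典或人工校验):

```python
# 示意:用"变体 -> 规范术语"映射做多形态术语统一(映射内容为假设示例)
variant_map = {
    "AMI": "急性心肌梗死",
    "急性心梗": "急性心肌梗死",
    "COPD": "慢性阻塞性肺疾病",
    "慢阻肺": "慢性阻塞性肺疾病",
}

def normalize_term_variants(text, mapping=variant_map):
    """把文本中的术语变体统一替换为规范形式"""
    for variant, canonical in mapping.items():
        text = text.replace(variant, canonical)
    return text

print(normalize_term_variants("患者既往有AMI病史,近期慢阻肺急性加重"))
```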

### 4.4 医疗领域词汇表优化

医疗领域的词汇表优化需要特别关注医学术语的复杂性和专业性。以下是一个医疗词汇表优化的实现示例:

```python
import json
from collections import Counter
import re
import numpy as np
from transformers import AutoTokenizer

class MedicalVocabularyOptimizer:
    """
    医疗领域词汇表优化器,用于扩展和优化预训练模型的词表
    """
    def __init__(self, base_model_name="bert-base-uncased"):
        # 加载基础模型的分词器
        self.base_tokenizer = AutoTokenizer.from_pretrained(base_model_name)
        self.base_vocab = self.base_tokenizer.get_vocab()
        self.vocab_size = len(self.base_vocab)
        
        # 医学术语数据库
        self.medical_terms = set()
        self.term_frequencies = Counter()
    
    def load_medical_terms(self, terms_file):
        """
        从文件加载医学术语
        """
        try:
            with open(terms_file, 'r', encoding='utf-8') as f:
                terms = json.load(f)
            self.medical_terms.update(terms)
            print(f"已加载 {len(terms)} 个医学术语")
        except Exception as e:
            print(f"加载医学术语文件失败: {e}")
    
    def extract_medical_terms_from_corpus(self, corpus_file, min_frequency=2):
        """
        从语料库中提取医学术语
        """
        try:
            with open(corpus_file, 'r', encoding='utf-8') as f:
                corpus = f.read()
            
            # 使用正则表达式提取可能的医学术语
            # 这里使用简化的模式,实际应用需要更复杂的模式和规则
            potential_terms = re.findall(r'\b[A-Za-z]+(?:-[A-Za-z]+)*\b', corpus)
            
            # 过滤出可能的医学术语(这里使用简化的判断逻辑)
            medical_candidates = [term for term in potential_terms if len(term) > 3]
            
            # 统计频率
            term_counts = Counter(medical_candidates)
            
            # 只保留频率大于等于最小阈值的术语
            filtered_terms = {term for term, count in term_counts.items() if count >= min_frequency}
            
            # 更新术语集合和频率计数器
            self.medical_terms.update(filtered_terms)
            self.term_frequencies.update(term_counts)
            
            print(f"从语料库中提取了 {len(filtered_terms)} 个医学术语")
            
        except Exception as e:
            print(f"从语料库提取医学术语失败: {e}")
    
    def identify_uncovered_terms(self):
        """
        识别基础词表中未覆盖的医学术语
        """
        uncovered_terms = set()
        
        for term in self.medical_terms:
            # 检查术语是否被正确分词(即是否作为单个token)
            tokens = self.base_tokenizer.tokenize(term)
            
            # 如果术语被拆分成多个子词,且没有一个子词等于原术语,则认为未被覆盖
            if len(tokens) > 1 or (tokens and tokens[0] != term):
                uncovered_terms.add(term)
        
        print(f"发现 {len(uncovered_terms)} 个未被基础词表覆盖的医学术语")
        return uncovered_terms
    
    def calculate_term_importance(self, uncovered_terms, top_n=1000):
        """
        计算未覆盖术语的重要性,并选择最重要的术语进行词表扩展
        """
        term_importance = []
        
        for term in uncovered_terms:
            # 计算术语重要性得分
            # 这里使用频率作为基础,并考虑术语长度(较长的术语通常更专业)
            frequency = self.term_frequencies.get(term, 1)
            length_score = len(term) / 10  # 归一化长度得分
            importance_score = frequency * (1 + length_score)
            
            term_importance.append((term, importance_score))
        
        # 按重要性排序
        term_importance.sort(key=lambda x: x[1], reverse=True)
        
        # 返回最重要的top_n个术语
        return term_importance[:top_n]
    
    def create_optimized_vocabulary(self, important_terms, output_file=None):
        """
        创建优化后的词汇表
        """
        # 创建新词汇表(基于基础词汇表)
        optimized_vocab = self.base_vocab.copy()
        
        # 添加新术语到词汇表
        new_tokens = []
        for term, _ in important_terms:
            if term not in optimized_vocab:
                # 为新术语分配ID(在基础词汇表大小的基础上递增)
                optimized_vocab[term] = self.vocab_size
                self.vocab_size += 1
                new_tokens.append(term)
        
        print(f"已将 {len(new_tokens)} 个新医学术语添加到词汇表")
        
        # 如果提供了输出文件,保存词汇表
        if output_file:
            try:
                # 词汇表需要保存为字典格式
                with open(output_file, 'w', encoding='utf-8') as f:
                    json.dump(optimized_vocab, f, ensure_ascii=False, indent=2)
                print(f"优化后的词汇表已保存到: {output_file}")
            except Exception as e:
                print(f"保存词汇表失败: {e}")
        
        return optimized_vocab, new_tokens
    
    def generate_tokenizer_config(self, new_tokens, output_dir):
        """
        生成用于扩展tokenizer的配置
        """
        try:
            # 保存新token列表
            with open(f"{output_dir}/new_medical_tokens.json", 'w', encoding='utf-8') as f:
                json.dump(new_tokens, f, ensure_ascii=False, indent=2)
            
            # 生成配置信息
            config = {
                "base_model": self.base_tokenizer.name_or_path,
                "new_tokens_count": len(new_tokens),
                "total_vocab_size": self.vocab_size
            }
            
            with open(f"{output_dir}/vocab_expansion_config.json", 'w', encoding='utf-8') as f:
                json.dump(config, f, ensure_ascii=False, indent=2)
            
            print(f"Tokenizer配置已保存到: {output_dir}")
            return True
        except Exception as e:
            print(f"生成Tokenizer配置失败: {e}")
            return False
    
# 使用示例
if __name__ == "__main__":
    # 创建词汇表优化器实例
    optimizer = MedicalVocabularyOptimizer(base_model_name="bert-base-uncased")
    
    # 加载医学术语文件
    optimizer.load_medical_terms("medical_terms.json")
    
    # 从语料库中提取医学术语
    optimizer.extract_medical_terms_from_corpus("medical_corpus.txt", min_frequency=3)
    
    # 识别未覆盖的术语
    uncovered_terms = optimizer.identify_uncovered_terms()
    
    # 计算术语重要性并选择最重要的术语
    important_terms = optimizer.calculate_term_importance(uncovered_terms, top_n=500)
    
    # 创建优化后的词汇表
    optimized_vocab, new_tokens = optimizer.create_optimized_vocabulary(
        important_terms, 
        output_file="optimized_medical_vocab.json"
    )
    
    # 生成tokenizer配置
    optimizer.generate_tokenizer_config(new_tokens, output_dir="./medical_tokenizer_config")
    
    # 评估词汇表覆盖情况(evaluate_vocabulary_coverage 方法可参照下文法律优化器中的同名实现自行补充)
    # coverage_result = optimizer.evaluate_vocabulary_coverage("medical_test_corpus.txt")
```

### 4.5 法律领域词汇表优化

法律领域的词汇表优化需要关注法律术语的精确性、多义性和层级关系。以下是一个法律词汇表优化的实现示例:

```python
import json
from collections import Counter, defaultdict
import re
import jieba
from transformers import AutoTokenizer

class LegalVocabularyOptimizer:
    """
    法律领域词汇表优化器,用于扩展和优化预训练模型的词表
    """
    def __init__(self, base_model_name="bert-base-chinese"):
        # 加载基础模型的分词器
        self.base_tokenizer = AutoTokenizer.from_pretrained(base_model_name)
        self.base_vocab = self.base_tokenizer.get_vocab()
        self.vocab_size = len(self.base_vocab)
        
        # 法律术语数据库
        self.legal_terms = set()
        self.term_frequencies = Counter()
        self.term_relationships = defaultdict(set)  # 术语之间的关系
    
    def load_legal_terms(self, terms_file):
        """
        从文件加载法律术语
        """
        try:
            with open(terms_file, 'r', encoding='utf-8') as f:
                terms_data = json.load(f)
                
                # 加载术语
                if "terms" in terms_data:
                    self.legal_terms.update(terms_data["terms"])
                    print(f"已加载 {len(terms_data['terms'])} 个法律术语")
                
                # 加载术语关系(如果有)
                if "relationships" in terms_data:
                    for term, related_terms in terms_data["relationships"].items():
                        self.term_relationships[term].update(related_terms)
                    print(f"已加载术语关系数据")
        except Exception as e:
            print(f"加载法律术语文件失败: {e}")
    
    def extract_legal_terms_from_corpus(self, corpus_file, min_frequency=2):
        """
        从语料库中提取法律术语
        """
        try:
            with open(corpus_file, 'r', encoding='utf-8') as f:
                corpus = f.read()
            
            # 使用jieba分词获取可能的术语
            words = jieba.lcut(corpus)
            
            # 过滤出可能的法律术语(长度、频率过滤)
            # 法律术语通常包含特定的法律用词
            legal_keywords = ['法', '条例', '规定', '合同', '权利', '义务', 
                             '责任', '诉讼', '判决', '裁定', '调解', '仲裁',
                             '侵权', '赔偿', '处罚', '行政', '民事', '刑事']
            
            legal_candidates = []
            for word in words:
                # 长度过滤
                if len(word) < 2:  # 法律术语通常至少为2个字
                    continue
                
                # 关键词过滤
                contains_legal_keyword = any(keyword in word for keyword in legal_keywords)
                if contains_legal_keyword:
                    legal_candidates.append(word)
                # 长词也可能是法律术语
                elif len(word) >= 4:
                    legal_candidates.append(word)
            
            # 统计频率
            term_counts = Counter(legal_candidates)
            
            # 只保留频率大于等于最小阈值的术语
            filtered_terms = {term for term, count in term_counts.items() if count >= min_frequency}
            
            # 更新术语集合和频率计数器
            self.legal_terms.update(filtered_terms)
            self.term_frequencies.update(term_counts)
            
            print(f"从语料库中提取了 {len(filtered_terms)} 个法律术语")
            
        except Exception as e:
            print(f"从语料库提取法律术语失败: {e}")
    
    def identify_uncovered_terms(self):
        """
        识别基础词表中未覆盖的法律术语
        """
        uncovered_terms = set()
        
        for term in self.legal_terms:
            # 检查术语是否被正确分词
            tokens = self.base_tokenizer.tokenize(term)
            
            # 如果术语被拆分成多个子词,且没有一个子词等于原术语,则认为未被覆盖
            if len(tokens) > 1 or (tokens and tokens[0] != term):
                uncovered_terms.add(term)
        
        print(f"发现 {len(uncovered_terms)} 个未被基础词表覆盖的法律术语")
        return uncovered_terms
    
    def calculate_term_importance(self, uncovered_terms, top_n=1000):
        """
        计算未覆盖术语的重要性,并选择最重要的术语进行词表扩展
        """
        term_importance = []
        
        for term in uncovered_terms:
            # 基础得分:频率
            frequency = self.term_frequencies.get(term, 1)
            
            # 关系得分:术语在关系网络中的重要性
            relation_score = len(self.term_relationships.get(term, [])) / 10
            
            # 长度得分:法律术语通常较长
            length_score = len(term) / 10
            
            # 综合重要性得分
            importance_score = frequency * (1 + relation_score + length_score)
            
            term_importance.append((term, importance_score))
        
        # 按重要性排序
        term_importance.sort(key=lambda x: x[1], reverse=True)
        
        # 返回最重要的top_n个术语
        return term_importance[:top_n]
    
    def handle_term_variants(self, terms):
        """
        处理术语变体,将相关变体作为一个术语组处理
        """
        # 这里使用简化的变体处理逻辑,实际应用中需要更复杂的规则
        term_groups = {}
        
        for term, score in terms:
            # 检查是否已经存在相似术语
            found_group = False
            for group_key in term_groups:
                # 如果术语包含在已有组中,或已有组包含在术语中
                if term in group_key or group_key in term:
                    # 将较短的作为组键
                    if len(term) < len(group_key):
                        term_groups[term] = term_groups.pop(group_key)
                        term_groups[term].append((group_key, score))
                    else:
                        term_groups[group_key].append((term, score))
                    found_group = True
                    break
            
            if not found_group:
                term_groups[term] = [(term, score)]
        
        # 从每个组中选择得分最高的术语
        selected_terms = []
        for group_key, group_terms in term_groups.items():
            # 按得分排序,选择最高分的术语
            group_terms.sort(key=lambda x: x[1], reverse=True)
            selected_terms.append(group_terms[0])
        
        print(f"处理术语变体后,选择了 {len(selected_terms)} 个代表性术语")
        return selected_terms
    
    def generate_tokenizer_config(self, new_tokens, output_dir):
        """
        生成用于扩展tokenizer的配置
        """
        try:
            # 保存新token列表
            with open(f"{output_dir}/new_legal_tokens.json", 'w', encoding='utf-8') as f:
                json.dump(new_tokens, f, ensure_ascii=False, indent=2)
            
            # 生成配置信息
            config = {
                "base_model": self.base_tokenizer.name_or_path,
                "new_tokens_count": len(new_tokens),
                "total_vocab_size": self.vocab_size
            }
            
            with open(f"{output_dir}/vocab_expansion_config.json", 'w', encoding='utf-8') as f:
                json.dump(config, f, ensure_ascii=False, indent=2)
            
            print(f"Tokenizer配置已保存到: {output_dir}")
            return True
        except Exception as e:
            print(f"生成Tokenizer配置失败: {e}")
            return False
    
    def evaluate_vocabulary_coverage(self, test_corpus_file):
        """
        评估词汇表对测试语料的覆盖情况
        """
        try:
            with open(test_corpus_file, 'r', encoding='utf-8') as f:
                test_corpus = f.read()
            
            # 分词并统计未覆盖的子词比例
            tokens = self.base_tokenizer.tokenize(test_corpus)
            
            # 对于中文模型,通常没有##标记,这里使用不同的方法评估覆盖情况
            # 计算完整法律术语的识别率
            from collections import Counter
            token_counts = Counter(tokens)
            
            # 计算法律术语识别率
            recognized_legal_terms = 0
            total_legal_terms_in_corpus = 0
            
            # 简单评估:检查重要法律术语是否被作为完整token识别
            important_legal_terms = ['合同', '权利', '义务', '责任', '诉讼', 
                                    '判决', '裁定', '调解', '仲裁', '侵权', 
                                    '赔偿', '处罚', '行政', '民事', '刑事']
            
            for term in important_legal_terms:
                if term in test_corpus:
                    total_legal_terms_in_corpus += test_corpus.count(term)
                    if term in token_counts:
                        recognized_legal_terms += token_counts[term]
            
            # 计算术语识别率
            term_recognition_rate = recognized_legal_terms / total_legal_terms_in_corpus if total_legal_terms_in_corpus > 0 else 0
            
            # 计算平均token长度(较长的token通常表明更好的词汇表覆盖)
            avg_token_length = sum(len(token) for token in tokens) / len(tokens) if tokens else 0
            
            print(f"术语识别率: {term_recognition_rate:.2%}")
            print(f"平均token长度: {avg_token_length:.2f} 字符")
            print(f"总tokens数: {len(tokens)}")
            
            return {
                "term_recognition_rate": term_recognition_rate,
                "avg_token_length": avg_token_length,
                "total_tokens": len(tokens)
            }
        except Exception as e:
            print(f"评估词汇表覆盖失败: {e}")
            return None

# 使用示例
if __name__ == "__main__":
    # 创建词汇表优化器实例
    optimizer = LegalVocabularyOptimizer(base_model_name="bert-base-chinese")
    
    # 加载法律术语文件
    optimizer.load_legal_terms("legal_terms.json")
    
    # 从语料库中提取法律术语
    optimizer.extract_legal_terms_from_corpus("legal_corpus.txt", min_frequency=3)
    
    # 识别未覆盖的术语
    uncovered_terms = optimizer.identify_uncovered_terms()
    
    # 计算术语重要性并选择最重要的术语
    important_terms = optimizer.calculate_term_importance(uncovered_terms, top_n=500)
    
    # 创建优化后的词汇表(此处假设 LegalVocabularyOptimizer 实现了与医疗优化器相同的 create_optimized_vocabulary 方法)
    optimized_vocab, new_tokens = optimizer.create_optimized_vocabulary(
        important_terms, 
        output_file="optimized_legal_vocab.json"
    )
    
    # 生成tokenizer配置
    optimizer.generate_tokenizer_config(new_tokens, output_dir="./legal_tokenizer_config")
    
    # 评估词汇表覆盖情况
    coverage_result = optimizer.evaluate_vocabulary_coverage("legal_test_corpus.txt")

4.4 词汇优化的性能评估

词汇表优化的效果需要通过一系列指标来评估。以下是评估词汇优化效果的关键指标:

1. **词汇覆盖率**:优化后的词汇表对领域语料的覆盖程度
2. **术语识别准确率**:正确识别领域术语的比例
3. **模型性能提升**:在下游任务上的准确率、精确率、召回率等指标的提升
4. **推理效率**:词汇优化对模型推理速度的影响
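
在进入完整的对比评估之前,可以先用一个很简单的指标快速感受词汇覆盖率的变化:同一批术语在扩展前后分词器下的平均子词数(碎片度)。下面是一个示意,其中术语列表与扩展后分词器的保存路径均为假设:

```python
# 示意:比较扩展前后分词器在同一批术语上的平均子词数(路径与术语均为假设示例)
from transformers import AutoTokenizer

def avg_pieces_per_term(tokenizer, terms):
    """平均每个术语被切成多少个子词,数值越接近1通常说明覆盖越好"""
    return sum(len(tokenizer.tokenize(t)) for t in terms) / len(terms)

terms = ["冠状动脉粥样硬化", "不当得利", "无因管理"]
base_tok = AutoTokenizer.from_pretrained("bert-base-chinese")
print("基线碎片度:", avg_pieces_per_term(base_tok, terms))
# expanded_tok = AutoTokenizer.from_pretrained("./expanded_medical_bert")  # 若已按前文保存扩展后分词器
# print("扩展后碎片度:", avg_pieces_per_term(expanded_tok, terms))
```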

以下是一个评估词汇优化效果的实现示例:

```python
import json
import os
import numpy as np
import matplotlib.pyplot as plt

class VocabularyOptimizationEvaluator:
    """
    词汇优化效果评估器
    """
    def __init__(self):
        self.evaluation_results = {}
    
    def load_baseline_results(self, baseline_file):
        """
        加载基线模型的评估结果
        """
        try:
            with open(baseline_file, 'r', encoding='utf-8') as f:
                self.evaluation_results['baseline'] = json.load(f)
            print(f"已加载基线模型评估结果")
        except Exception as e:
            print(f"加载基线模型评估结果失败: {e}")
    
    def load_optimized_results(self, optimized_file):
        """
        加载优化后模型的评估结果
        """
        try:
            with open(optimized_file, 'r', encoding='utf-8') as f:
                self.evaluation_results['optimized'] = json.load(f)
            print(f"已加载优化后模型评估结果")
        except Exception as e:
            print(f"加载优化后模型评估结果失败: {e}")
    
    def calculate_improvement(self):
        """
        计算优化带来的性能提升
        """
        if 'baseline' not in self.evaluation_results or 'optimized' not in self.evaluation_results:
            print("缺少评估数据,请先加载基线和优化后的结果")
            return None
        
        baseline = self.evaluation_results['baseline']
        optimized = self.evaluation_results['optimized']
        
        improvements = {}
        
        # 计算词汇表相关指标的提升
        if 'vocabulary_coverage' in baseline and 'vocabulary_coverage' in optimized:
            baseline_coverage = baseline['vocabulary_coverage']
            optimized_coverage = optimized['vocabulary_coverage']
            improvements['vocabulary_coverage'] = {
                'absolute_improvement': optimized_coverage - baseline_coverage,
                'relative_improvement': (optimized_coverage - baseline_coverage) / baseline_coverage if baseline_coverage > 0 else 0
            }
        
        # 计算术语识别率的提升
        if 'term_recognition_rate' in baseline and 'term_recognition_rate' in optimized:
            baseline_recognition = baseline['term_recognition_rate']
            optimized_recognition = optimized['term_recognition_rate']
            improvements['term_recognition_rate'] = {
                'absolute_improvement': optimized_recognition - baseline_recognition,
                'relative_improvement': (optimized_recognition - baseline_recognition) / baseline_recognition if baseline_recognition > 0 else 0
            }
        
        # 计算模型性能指标的提升
        for metric in ['accuracy', 'precision', 'recall', 'f1_score']:
            if metric in baseline and metric in optimized:
                baseline_value = baseline[metric]
                optimized_value = optimized[metric]
                improvements[metric] = {
                    'absolute_improvement': optimized_value - baseline_value,
                    'relative_improvement': (optimized_value - baseline_value) / baseline_value if baseline_value > 0 else 0
                }
        
        # 计算推理效率的变化
        if 'inference_time' in baseline and 'inference_time' in optimized:
            baseline_time = baseline['inference_time']
            optimized_time = optimized['inference_time']
            improvements['inference_time'] = {
                'absolute_change': optimized_time - baseline_time,
                'relative_change': (optimized_time - baseline_time) / baseline_time if baseline_time > 0 else 0
            }
        
        self.evaluation_results['improvements'] = improvements
        return improvements
    
    def generate_evaluation_report(self, output_file=None):
        """
        生成评估报告
        """
        # 计算性能提升
        improvements = self.calculate_improvement()
        if improvements is None:
            return None
        
        # 生成报告
        report = {
            'baseline_results': self.evaluation_results['baseline'],
            'optimized_results': self.evaluation_results['optimized'],
            'improvements': improvements,
            'summary': self._generate_summary()
        }
        
        # 保存报告
        if output_file:
            try:
                with open(output_file, 'w', encoding='utf-8') as f:
                    json.dump(report, f, ensure_ascii=False, indent=2)
                print(f"评估报告已保存到: {output_file}")
            except Exception as e:
                print(f"保存评估报告失败: {e}")
        
        return report
    
    def _generate_summary(self):
        """
        生成评估摘要
        """
        improvements = self.evaluation_results.get('improvements', {})
        summary = {
            'major_improvements': [],
            'minor_improvements': [],
            'regressions': [],
            'overall_assessment': ''
        }
        
        # 分析改进情况
        for metric, values in improvements.items():
            relative_imp = values.get('relative_improvement', values.get('relative_change', 0))
            
            # 对于推理时间,负值表示改进(更快)
            if metric == 'inference_time':
                if relative_imp < -0.1:  # 速度提升>10%
                    summary['major_improvements'].append((metric, relative_imp))
                elif relative_imp < -0.05:  # 速度提升5-10%
                    summary['minor_improvements'].append((metric, relative_imp))
                elif relative_imp > 0.05:  # 速度下降>5%
                    summary['regressions'].append((metric, relative_imp))
            else:
                # 对于其他指标,正值表示改进
                if relative_imp > 0.1:  # 提升>10%
                    summary['major_improvements'].append((metric, relative_imp))
                elif relative_imp > 0.05:  # 提升5-10%
                    summary['minor_improvements'].append((metric, relative_imp))
                elif relative_imp < -0.05:  # 下降>5%
                    summary['regressions'].append((metric, relative_imp))
        
        # 生成总体评估
        if summary['regressions']:
            summary['overall_assessment'] = '部分指标出现退化,需要进一步优化'
        elif summary['major_improvements']:
            summary['overall_assessment'] = '显著提升,优化效果良好'
        elif summary['minor_improvements']:
            summary['overall_assessment'] = '有一定提升,可以考虑进一步优化'
        else:
            summary['overall_assessment'] = '改进不明显,可能需要调整优化策略'
        
        return summary
```
#### 5.1.2 医疗微调器实现

在构建好数据集后,需要实现医疗领域的微调器,这将负责加载预训练模型、配置训练参数、执行微调过程,并保存微调后的模型。以下是医疗微调器的实现示例:

```python
import os
import torch
import json
from datetime import datetime
from transformers import (
    AutoModelForSequenceClassification,
    AutoModelForCausalLM,
    AutoModelForQuestionAnswering,
    Trainer,
    TrainingArguments,
    DataCollatorWithPadding,
    logging as transformers_logging
)
from torch.utils.data import DataLoader
from transformers.integrations import TensorBoardCallback

# 设置Transformers日志级别
transformers_logging.set_verbosity_error()

class MedicalFinetuner:
    """
    医疗领域微调器类
    """
    def __init__(self, config=None):
        """
        初始化微调器
        
        参数:
        config: 配置字典,包含微调的各种参数
        """
        self.config = config or {
            "model_name_or_path": "bert-base-chinese",
            "task_type": "classification",  # classification, generation, qa
            "output_dir": "./medical_finetuned_model",
            "num_train_epochs": 3,
            "per_device_train_batch_size": 16,
            "per_device_eval_batch_size": 32,
            "evaluation_strategy": "epoch",
            "save_strategy": "epoch",
            "learning_rate": 2e-5,
            "weight_decay": 0.01,
            "logging_dir": "./logs",
            "logging_steps": 100,
            "warmup_ratio": 0.1,
            "seed": 42,
            "fp16": torch.cuda.is_available(),
            "gradient_accumulation_steps": 1,
            "max_grad_norm": 1.0,
            "load_best_model_at_end": True,
            "metric_for_best_model": "eval_loss",
            "greater_is_better": False
        }
        
        self.model = None
        self.trainer = None
        self.tokenizer = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.start_time = None
        self.end_time = None
        
        # 创建输出目录
        os.makedirs(self.config["output_dir"], exist_ok=True)
        os.makedirs(self.config["logging_dir"], exist_ok=True)
        
        print(f"微调器初始化完成,使用设备: {self.device}")
        print(f"输出目录: {self.config['output_dir']}")
    
    def load_model(self):
        """
        加载预训练模型
        """
        try:
            print(f"正在加载预训练模型: {self.config['model_name_or_path']}")
            
            # 根据任务类型加载不同的模型
            if self.config["task_type"] == "classification":
                self.model = AutoModelForSequenceClassification.from_pretrained(
                    self.config["model_name_or_path"],
                    num_labels=2,  # 默认为二分类,可根据实际情况调整
                    ignore_mismatched_sizes=True  # 允许词汇表大小不匹配
                )
            elif self.config["task_type"] == "generation":
                self.model = AutoModelForCausalLM.from_pretrained(
                    self.config["model_name_or_path"],
                    ignore_mismatched_sizes=True,
                    torch_dtype=torch.float16 if self.config["fp16"] else torch.float32
                )
            elif self.config["task_type"] == "qa":
                self.model = AutoModelForQuestionAnswering.from_pretrained(
                    self.config["model_name_or_path"],
                    ignore_mismatched_sizes=True
                )
            else:
                raise ValueError(f"不支持的任务类型: {self.config['task_type']}")
            
            # 将模型移至指定设备
            self.model.to(self.device)
            print(f"模型加载成功,参数数量: {sum(p.numel() for p in self.model.parameters()):,}")
            return True
        except Exception as e:
            print(f"加载模型失败: {e}")
            return False
    
    def setup_tokenizer(self, tokenizer):
        """
        设置分词器
        
        参数:
        tokenizer: 分词器对象
        """
        self.tokenizer = tokenizer
        print(f"分词器设置完成: {tokenizer.__class__.__name__}")
        return True
    
    def create_training_args(self):
        """
        创建训练参数配置
        """
        try:
            training_args = TrainingArguments(
                output_dir=self.config["output_dir"],
                num_train_epochs=self.config["num_train_epochs"],
                per_device_train_batch_size=self.config["per_device_train_batch_size"],
                per_device_eval_batch_size=self.config["per_device_eval_batch_size"],
                evaluation_strategy=self.config["evaluation_strategy"],
                save_strategy=self.config["save_strategy"],
                learning_rate=self.config["learning_rate"],
                weight_decay=self.config["weight_decay"],
                logging_dir=self.config["logging_dir"],
                logging_steps=self.config["logging_steps"],
                warmup_ratio=self.config["warmup_ratio"],
                seed=self.config["seed"],
                fp16=self.config["fp16"],
                gradient_accumulation_steps=self.config["gradient_accumulation_steps"],
                max_grad_norm=self.config["max_grad_norm"],
                load_best_model_at_end=self.config["load_best_model_at_end"],
                metric_for_best_model=self.config["metric_for_best_model"],
                greater_is_better=self.config["greater_is_better"],
                dataloader_pin_memory=False,  # 节省内存
                report_to=["tensorboard"]  # 使用TensorBoard进行可视化
            )
            return training_args
        except Exception as e:
            print(f"创建训练参数失败: {e}")
            return None
    
    def create_trainer(self, train_dataset, eval_dataset=None, compute_metrics=None):
        """
        创建训练器
        
        参数:
        train_dataset: 训练数据集
        eval_dataset: 验证数据集(可选)
        compute_metrics: 评估指标计算函数(可选)
        """
        try:
            # 创建训练参数
            training_args = self.create_training_args()
            if training_args is None:
                return False
            
            # 创建数据收集器
            data_collator = DataCollatorWithPadding(tokenizer=self.tokenizer)
            
            # 创建训练器
            self.trainer = Trainer(
                model=self.model,
                args=training_args,
                train_dataset=train_dataset,
                eval_dataset=eval_dataset,
                data_collator=data_collator,
                compute_metrics=compute_metrics,
                callbacks=[TensorBoardCallback()]  # 添加TensorBoard回调
            )
            
            print("训练器创建成功")
            return True
        except Exception as e:
            print(f"创建训练器失败: {e}")
            return False
    
    def train(self):
        """
        执行微调过程
        """
        try:
            if self.trainer is None:
                raise ValueError("请先调用create_trainer方法")
            
            print("开始微调过程...")
            self.start_time = datetime.now()
            print(f"开始时间: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
            
            # 执行训练
            train_result = self.trainer.train()
            
            self.end_time = datetime.now()
            duration = (self.end_time - self.start_time).total_seconds() / 60  # 转换为分钟
            
            print(f"微调完成")
            print(f"结束时间: {self.end_time.strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"耗时: {duration:.2f} 分钟")
            
            # 保存训练结果
            metrics = train_result.metrics
            self.trainer.log_metrics("train", metrics)
            self.trainer.save_metrics("train", metrics)
            self.trainer.save_state()
            
            return True
        except Exception as e:
            print(f"微调过程失败: {e}")
            return False
    
    def save_model(self, output_dir=None):
        """
        保存微调后的模型
        
        参数:
        output_dir: 输出目录,默认为None使用训练器默认目录
        """
        try:
            if self.trainer is None:
                raise ValueError("请先调用prepare_trainer方法")
            
            save_dir = output_dir or f"./legal_model_output_{self.config['legal_task_type']}"
            self.logger.info(f"保存模型到: {save_dir}")
            
            # 保存模型、分词器和配置
            self.trainer.save_model(save_dir)
            if self.tokenizer:
                self.tokenizer.save_pretrained(save_dir)
            
            # 保存法律领域配置
            import json
            with open(f"{save_dir}/legal_config.json", "w", encoding="utf-8") as f:
                json.dump(self.config, f, ensure_ascii=False, indent=2)
            
            self.logger.info("模型保存完成")
            return True
        except Exception as e:
            self.logger.error(f"保存模型失败: {e}")
            return False
    
    def push_to_hub(self, repo_name, organization=None):
        """
        将模型推送到Hugging Face Hub
        
        参数:
        repo_name: 仓库名称
        organization: 组织名称(可选)
        """
        try:
            if self.trainer is None:
                raise ValueError("请先调用prepare_trainer方法")
            
            full_repo_name = f"{organization}/{repo_name}" if organization else repo_name
            self.logger.info(f"推送到Hugging Face Hub: {full_repo_name}")
            
            # 推送模型
            self.trainer.push_to_hub(
                repo_name=full_repo_name,
                commit_message="法律领域微调模型",
                tags=["legal", "finetuned", self.config["legal_task_type"]]
            )
            
            self.logger.info("模型推送完成")
            return True
        except Exception as e:
            self.logger.error(f"推送模型失败: {e}")
            return False
```

```python
# 法律领域微调器使用示例
if __name__ == "__main__":
    # 创建法律微调器实例
    legal_finetuner = LegalFinetuner(
        model_name_or_path="bert-base-chinese",
        config={
            "learning_rate": 1e-5,
            "batch_size": 8,
            "num_epochs": 3,
            "max_seq_length": 512,
            "save_steps": 500,
            "eval_steps": 250,
            "legal_weight_decay": 0.01,
            "legal_task_type": "classification",
            "num_labels": 3,
            "legal_dropout": 0.1
        }
    )
    
    # 加载模型
    if not legal_finetuner.load_model():
        print("模型加载失败")
        exit(1)
    
    # 假设我们已经有了处理好的数据集
    # 这里使用模拟数据,实际应用中应该加载真实的法律数据集
    from datasets import Dataset
    import pandas as pd
    import numpy as np
    
    # 模拟法律文本分类数据集
    # 创建法律文本示例(合同条款)
    legal_texts = [
        "甲方应当按照合同约定的时间向乙方支付货款,逾期支付的,每逾期一日,应当按照未支付金额的万分之三向乙方支付违约金。",
        "乙方应当确保交付的产品符合国家相关标准,如因产品质量问题给甲方造成损失的,乙方应当承担赔偿责任。",
        "本合同自双方签字盖章之日起生效,有效期为两年,期满前一个月,双方可协商续签事宜。",
        "甲方有权对乙方提供的服务进行监督和检查,发现问题有权要求乙方立即整改。",
        "任何一方违反本合同约定的,应当承担违约责任,并赔偿由此给对方造成的全部损失。",
        "本合同履行过程中发生争议的,双方应当协商解决;协商不成的,任何一方均有权向有管辖权的人民法院提起诉讼。"
    ]
    
    # 模拟标签:0=违约责任, 1=质量条款, 2=争议解决
    labels = [0, 1, 0, 1, 0, 2]
    
    # 创建数据集
    df = pd.DataFrame({"text": legal_texts, "label": labels})
    dataset = Dataset.from_pandas(df)
    
    # 分割数据集
    datasets = dataset.train_test_split(test_size=0.3, seed=42)
    train_dataset = datasets["train"]
    eval_dataset = datasets["test"]
    
    # 预处理函数
    def preprocess_function(examples):
        return legal_finetuner.tokenizer(
            examples["text"], 
            truncation=True, 
            padding="max_length", 
            max_length=legal_finetuner.config["max_seq_length"]
        )
    
    # 应用预处理
    train_dataset = train_dataset.map(preprocess_function, batched=True)
    eval_dataset = eval_dataset.map(preprocess_function, batched=True)
    
    # 设置格式
    train_dataset.set_format(columns=["input_ids", "attention_mask", "label"])
    eval_dataset.set_format(columns=["input_ids", "attention_mask", "label"])
    
    # 准备训练器
    if not legal_finetuner.prepare_trainer(train_dataset, eval_dataset):
        print("训练器准备失败")
        exit(1)
    
    # 执行微调
    metrics = legal_finetuner.fine_tune()
    if metrics:
        print(f"微调成功,训练指标: {metrics}")
    else:
        print("微调失败")
        exit(1)
    
    # 评估模型
    eval_results = legal_finetuner.evaluate(eval_dataset)
    if eval_results:
        print(f"评估成功,结果: {eval_results}")
    
    # 保存模型
    if legal_finetuner.save_model("./legal_classification_model"):
        print("模型保存成功")
    
    # 测试模型推理
    def predict_legal_text(text):
        # 准备输入
        inputs = legal_finetuner.tokenizer(
            text, 
            truncation=True, 
            padding="max_length", 
            max_length=legal_finetuner.config["max_seq_length"],
            return_tensors="pt"
        )
        
        # 模型推理
        legal_finetuner.model.eval()
        import torch
        with torch.no_grad():
            outputs = legal_finetuner.model(**inputs)
            predictions = torch.argmax(outputs.logits, dim=-1)
        
        # 标签映射
        label_map = {0: "违约责任", 1: "质量条款", 2: "争议解决"}
        return label_map[predictions.item()]
    
    # 测试预测
    test_text = "当事人一方不履行合同义务或者履行合同义务不符合约定的,应当承担继续履行、采取补救措施或者赔偿损失等违约责任。"
    prediction = predict_legal_text(test_text)
    print(f"\n测试文本分类结果:")
    print(f"文本: {test_text}")
    print(f"预测类别: {prediction}")
    
    print("\n法律领域模型微调流程完成")
        print("开始微调过程...")
        self.start_time = datetime.now()
        print(f"开始时间: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
        
        # 执行训练
        train_result = self.trainer.train()
        
        self.end_time = datetime.now()
        duration = (self.end_time - self.start_time).total_seconds() / 60  # 转换为分钟
        
        print(f"微调完成")
        print(f"结束时间: {self.end_time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"耗时: {duration:.2f} 分钟")
        
        # 保存训练结果
        metrics = train_result.metrics
        self.trainer.log_metrics("train", metrics)
        self.trainer.save_metrics("train", metrics)
        self.trainer.save_state()
        
        return True
    except Exception as e:
        print(f"微调过程失败: {e}")
        return False

    def evaluate(self, dataset=None):
        """
        评估微调后的模型
        
        参数:
        dataset: 要评估的数据集,如果为None则使用训练器中的验证集
        """
        try:
            if self.trainer is None:
                print("训练器未初始化")
                return None
            
            print("开始评估...")
            eval_result = self.trainer.evaluate(eval_dataset=dataset)
            
            # 记录评估结果
            self.trainer.log_metrics("eval", eval_result)
            self.trainer.save_metrics("eval", eval_result)
            
            # 格式化并打印评估结果
            print("评估结果:")
            for key, value in eval_result.items():
                if key.startswith("eval_"):
                    print(f"  {key[5:]}: {value:.4f}")
            
            return eval_result
        except Exception as e:
            print(f"评估失败: {e}")
            return None
    
    def save_model(self, output_dir=None):
        """
        保存微调后的模型
        
        参数:
        output_dir: 模型保存目录,如果为None则使用配置中的目录
        """
        try:
            save_dir = output_dir or self.config["output_dir"]
            
            print(f"正在保存模型到: {save_dir}")
            self.trainer.save_model(save_dir)
            self.tokenizer.save_pretrained(save_dir)
            
            # 保存微调配置和元数据
            metadata = {
                "model_name": self.config["model_name_or_path"],
                "task_type": self.config["task_type"],
                "training_start_time": self.start_time.strftime('%Y-%m-%d %H:%M:%S') if self.start_time else "N/A",
                "training_end_time": self.end_time.strftime('%Y-%m-%d %H:%M:%S') if self.end_time else "N/A",
                "training_duration_minutes": ((self.end_time - self.start_time).total_seconds() / 60) if self.start_time and self.end_time else "N/A",
                "config": self.config
            }
            
            with open(os.path.join(save_dir, "training_metadata.json"), 'w', encoding='utf-8') as f:
                json.dump(metadata, f, ensure_ascii=False, indent=2)
            
            print(f"模型和配置已成功保存到: {save_dir}")
            return True
        except Exception as e:
            print(f"保存模型失败: {e}")
            return False
    
    def push_to_hub(self, repo_name, token):
        """
        将模型推送到Hugging Face Hub
        
        参数:
        repo_name: 仓库名称
        token: Hugging Face访问令牌
        """
        try:
            print(f"正在将模型推送到Hugging Face Hub: {repo_name}")
            self.trainer.push_to_hub(
                repo_name=repo_name,
                use_auth_token=token,
                commit_message="医疗领域微调模型"
            )
            print("模型推送成功")
            return True
        except Exception as e:
            print(f"推送模型失败: {e}")
            return False

```

```python
# 使用示例
# 加载必要的组件
from transformers import AutoTokenizer
from datasets import load_from_disk

# 创建微调器实例
finetuner = MedicalFinetuner({
    "model_name_or_path": "bert-base-chinese",
    "task_type": "classification",
    "output_dir": "./medical_classification_model",
    "num_train_epochs": 3,
    "per_device_train_batch_size": 8,
    "per_device_eval_batch_size": 16,
    "learning_rate": 2e-5,
    "weight_decay": 0.01,
    "warmup_ratio": 0.1,
    "evaluation_strategy": "epoch",
    "save_strategy": "epoch",
    "logging_dir": "./medical_logs",
    "logging_steps": 50,
    "fp16": torch.cuda.is_available(),
    "load_best_model_at_end": True,
    "metric_for_best_model": "eval_accuracy",
    "greater_is_better": True
})

# 加载模型
finetuner.load_model()

# 加载分词器
tokenizer = AutoTokenizer.from_pretrained("bert-base-chinese")
finetuner.setup_tokenizer(tokenizer)

# 加载处理后的数据集
tokenized_datasets = load_from_disk("./medical_tokenized_dataset")

# 定义评估指标计算函数
import evaluate
accuracy = evaluate.load("accuracy")

def compute_metrics(eval_pred):
    predictions, labels = eval_pred
    predictions = predictions.argmax(axis=-1)
    return accuracy.compute(predictions=predictions, references=labels)

# 创建训练器
finetuner.create_trainer(
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["validation"],
    compute_metrics=compute_metrics
)

# 执行微调
finetuner.train()

# 在测试集上评估
test_results = finetuner.evaluate(tokenized_datasets["test"])

# 保存模型
finetuner.save_model()
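
微调完成并保存后,可以直接从输出目录加载模型进行推理。下面是一个最小化的推理示例草图(假设模型已按上文示例保存到 ./medical_classification_model,预测标签的具体含义取决于训练时使用的类别定义):

```python
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline

# 从微调输出目录加载模型与分词器(路径对应上文示例中的 output_dir)
model_dir = "./medical_classification_model"
tokenizer = AutoTokenizer.from_pretrained(model_dir)
model = AutoModelForSequenceClassification.from_pretrained(model_dir)

# 构建文本分类pipeline,对医疗文本进行预测
classifier = pipeline("text-classification", model=model, tokenizer=tokenizer)

result = classifier("患者主诉持续性胸痛三天,伴有呼吸困难,既往有高血压病史。")
print(result)  # 输出形如 [{'label': 'LABEL_0', 'score': 0.93}]
```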

### 5.2 法律领域微调实现

法律领域的微调实现需要特别考虑法律文本的专业性和严谨性。与医疗领域类似,我们也需要构建法律数据集和实现法律微调器。

#### 5.2.1 法律数据集构建

法律领域的数据集构建需要处理各种类型的法律文本,包括法律法规、判例、司法解释等。以下是法律数据集构建的实现示例:

```python
import json
import os
import random
import pandas as pd
from datasets import Dataset, DatasetDict
from sklearn.model_selection import train_test_split
import re

class LegalDataset:
    """
    法律领域数据集构建类
    """
    def __init__(self, config=None):
        """
        初始化数据集构建器
        
        参数:
        config: 配置字典,包含数据集构建的各种参数
        """
        self.config = config or {
            "max_seq_length": 512,
            "test_size": 0.1,
            "validation_size": 0.1,
            "seed": 42,
            "legal_text_types": ["law", "regulation", "case", "judgment"],
            "min_text_length": 50  # 最小文本长度要求
        }
        self.data = {
            "train": [],
            "validation": [],
            "test": []
        }
        
        # 设置随机种子以确保可重复性
        random.seed(self.config["seed"])
    
    def load_raw_data(self, data_path):
        """
        加载原始法律数据
        
        参数:
        data_path: 数据文件路径或目录路径
        """
        try:
            all_data = []
            
            # 如果是目录,递归读取所有文件
            if os.path.isdir(data_path):
                for root, dirs, files in os.walk(data_path):
                    for file in files:
                        file_path = os.path.join(root, file)
                        file_data = self._load_single_file(file_path)
                        if file_data:
                            all_data.extend(file_data)
            
            # 如果是文件,直接读取
            elif os.path.isfile(data_path):
                file_data = self._load_single_file(data_path)
                if file_data:
                    all_data.extend(file_data)
            
            print(f"已加载法律数据,共 {len(all_data)} 条记录")
            return pd.DataFrame(all_data)
        except Exception as e:
            print(f"加载原始数据失败: {e}")
            return None
    
    def _load_single_file(self, file_path):
        """
        加载单个文件的数据
        
        参数:
        file_path: 文件路径
        """
        try:
            if file_path.endswith(".csv"):
                df = pd.read_csv(file_path)
                return df.to_dict("records")
            elif file_path.endswith(".json"):
                with open(file_path, 'r', encoding='utf-8') as f:
                    return json.load(f)
            elif file_path.endswith(".jsonl"):
                data = []
                with open(file_path, 'r', encoding='utf-8') as f:
                    for line in f:
                        if line.strip():
                            data.append(json.loads(line))
                return data
            elif file_path.endswith(".txt"):
                # 简单的TXT文件处理,假设每行一个文档
                data = []
                with open(file_path, 'r', encoding='utf-8') as f:
                    for line in f:
                        line = line.strip()
                        if line:
                            data.append({"text": line})
                return data
            else:
                print(f"跳过不支持的文件格式: {file_path}")
                return []
        except Exception as e:
            print(f"加载文件 {file_path} 失败: {e}")
            return []
    
    def preprocess_legal_text(self, df):
        """
        预处理法律文本
        
        参数:
        df: 包含原始法律数据的DataFrame
        """
        try:
            # 1. 确保必要的字段存在
            required_columns = ["text"]
            for col in required_columns:
                if col not in df.columns:
                    print(f"警告: 缺少必要字段 {col}")
                    return None
            
            # 2. 处理缺失值
            df = df.dropna(subset=required_columns)
            
            # 3. 法律文本专用清洗
            def clean_legal_text(text):
                # 移除HTML标签
                text = re.sub(r'<[^>]+>', '', str(text))
                # 移除控制字符,但保留法律文本中的关键符号
                text = re.sub(r'[\x00-\x08\x0b-\x0c\x0e-\x1f]', '', text)
                # 先按行移除常见的页眉页脚标记(需在合并空白字符之前执行,否则行首锚点会失效)
                text = re.sub(r'^第\d+页/共\d+页\s*', '', text, flags=re.MULTILINE)
                text = re.sub(r'^.*法律条文.*$\n?', '', text, flags=re.MULTILINE)
                # 合并多余的空白字符
                text = re.sub(r'\s+', ' ', text)
                # 合并重复的标点符号(使用原始字符串,避免 \1 被解释为控制字符)
                text = re.sub(r'([。,;!?])\1+', r'\1', text)
                return text.strip()
            
            # 应用清洗函数
            df["text"] = df["text"].apply(clean_legal_text)
            
            # 4. 过滤过短的文本
            df = df[df["text"].str.len() >= self.config["min_text_length"]]
            
            # 5. 添加文本类型标签(如果不存在)
            if "type" not in df.columns and "label" not in df.columns:
                # 根据文本内容推测法律文本类型
                def infer_text_type(text):
                    text = str(text).lower()
                    if re.search(r'法\s*第\s*\d+\s*条', text):
                        return "law"
                    elif re.search(r'条例|规定|办法', text):
                        return "regulation"
                    elif re.search(r'判决书|裁定', text):
                        return "judgment"
                    elif re.search(r'案例|判例', text):
                        return "case"
                    else:
                        return "other"
                
                df["type"] = df["text"].apply(infer_text_type)
            
            # 6. 如果只有type字段,将其映射到label
            if "type" in df.columns and "label" not in df.columns:
                type_to_label = {t: i for i, t in enumerate(self.config["legal_text_types"])}
                df["label"] = df["type"].map(lambda x: type_to_label.get(x, -1))
                # 移除未映射的标签
                df = df[df["label"] != -1]
            
            print(f"法律文本预处理完成,剩余 {len(df)} 条有效数据")
            
            # 7. 统计各类型法律文本的数量
            if "type" in df.columns:
                type_counts = df["type"].value_counts()
                print("法律文本类型分布:")
                for t, count in type_counts.items():
                    print(f"  - {t}: {count} 条")
            
            return df
        except Exception as e:
            print(f"预处理法律文本失败: {e}")
            return None
    
    def split_dataset(self, df):
        """
        将数据集分割为训练集、验证集和测试集
        
        参数:
        df: 预处理后的DataFrame
        """
        try:
            # 优先使用label进行分层采样,确保各类别在各集合中比例一致
            stratify_col = "label" if "label" in df.columns else None
            
            # 先分割训练集和临时集
            train_df, temp_df = train_test_split(
                df, 
                test_size=self.config["test_size"] + self.config["validation_size"],
                random_state=self.config["seed"],
                stratify=df[stratify_col] if stratify_col else None
            )
            
            # 从临时集中分割验证集和测试集
            if self.config["validation_size"] > 0:
                # train_test_split 的 test_size 对应第二个返回值的占比,这里应为测试集在临时集中的占比
                test_ratio = self.config["test_size"] / (
                    self.config["test_size"] + self.config["validation_size"]
                )
                validation_df, test_df = train_test_split(
                    temp_df, 
                    test_size=test_ratio,
                    random_state=self.config["seed"],
                    stratify=temp_df[stratify_col] if stratify_col else None
                )
                self.data["validation"] = validation_df.to_dict("records")
            else:
                test_df = temp_df
            
            self.data["train"] = train_df.to_dict("records")
            self.data["test"] = test_df.to_dict("records")
            
            print(f"数据集分割完成:")
            print(f"  - 训练集: {len(self.data['train'])} 条")
            print(f"  - 验证集: {len(self.data['validation'])} 条")
            print(f"  - 测试集: {len(self.data['test'])} 条")
            
            # 验证各集的类型分布是否均衡
            if "type" in df.columns:
                self._check_distribution_balance()
            
            return True
        except Exception as e:
            print(f"分割数据集失败: {e}")
            return False
    
    def _check_distribution_balance(self):
        """
        检查各数据集的分布均衡性
        """
        try:
            print("\n各数据集类型分布检查:")
            
            for split_name, split_data in self.data.items():
                if not split_data:
                    continue
                    
                split_df = pd.DataFrame(split_data)
                type_counts = split_df["type"].value_counts()
                total = len(split_data)
                
                print(f"{split_name} 集:")
                for t, count in type_counts.items():
                    percentage = (count / total) * 100
                    print(f"  - {t}: {count} 条 ({percentage:.1f}%)")
        except Exception as e:
            print(f"检查分布均衡性失败: {e}")
    
    def create_huggingface_dataset(self):
        """
        创建Hugging Face格式的数据集
        """
        try:
            # 将字典数据转换为Hugging Face的Dataset对象
            datasets = {}
            for split, data in self.data.items():
                if data:  # 只处理非空的数据集
                    datasets[split] = Dataset.from_list(data)
            
            dataset_dict = DatasetDict(datasets)
            print(f"已创建Hugging Face数据集,包含 {list(dataset_dict.keys())} 分割")
            return dataset_dict
        except Exception as e:
            print(f"创建Hugging Face数据集失败: {e}")
            return None
    
    def tokenize_dataset(self, dataset_dict, tokenizer):
        """
        使用tokenizer对数据集进行分词处理
        
        参数:
        dataset_dict: Hugging Face的DatasetDict对象
        tokenizer: 分词器对象
        """
        try:
            def tokenize_function(examples):
                # 根据不同任务类型进行分词
                if "label" in examples:
                    # 分类任务
                    return tokenizer(
                        examples["text"],
                        padding="max_length",
                        truncation=True,
                        max_length=self.config["max_seq_length"]
                    )
                elif "question" in examples and "context" in examples:
                    # 法律问答任务
                    return tokenizer(
                        examples["question"],
                        examples["context"],
                        padding="max_length",
                        truncation=True,
                        max_length=self.config["max_seq_length"]
                    )
                elif "query" in examples and "answer" in examples:
                    # 法律检索任务
                    return tokenizer(
                        examples["query"],
                        examples["answer"],
                        padding="max_length",
                        truncation=True,
                        max_length=self.config["max_seq_length"]
                    )
                else:
                    # 法律文本生成任务
                    return tokenizer(
                        examples["text"],
                        padding="max_length",
                        truncation=True,
                        max_length=self.config["max_seq_length"]
                    )
            
            # 应用分词函数到数据集(移除原始文本等列,但保留label供训练使用)
            columns_to_remove = None
            if "train" in dataset_dict:
                columns_to_remove = [
                    col for col in dataset_dict["train"].column_names if col != "label"
                ]
            tokenized_datasets = dataset_dict.map(
                tokenize_function,
                batched=True,
                remove_columns=columns_to_remove
            )
            
            # 设置格式以适应PyTorch
            tokenized_datasets.set_format("torch")
            
            print("数据集分词完成")
            return tokenized_datasets
        except Exception as e:
            print(f"分词数据集失败: {e}")
            return None
    
    def save_dataset(self, dataset_dict, output_dir):
        """
        保存处理后的数据集
        
        参数:
        dataset_dict: 处理后的数据集对象
        output_dir: 输出目录路径
        """
        try:
            # 创建输出目录
            os.makedirs(output_dir, exist_ok=True)
            
            # 保存数据集
            dataset_dict.save_to_disk(output_dir)
            print(f"数据集已保存到: {output_dir}")
            
            # 保存数据集统计信息
            stats = {
                "train_size": len(dataset_dict["train"]) if "train" in dataset_dict else 0,
                "validation_size": len(dataset_dict["validation"]) if "validation" in dataset_dict else 0,
                "test_size": len(dataset_dict["test"]) if "test" in dataset_dict else 0,
                "config": self.config
            }
            
            # 添加各集的类型分布统计
            distribution_stats = {}
            for split_name in ["train", "validation", "test"]:
                if split_name in dataset_dict:
                    split_df = dataset_dict[split_name].to_pandas()
                    if "type" in split_df.columns:
                        distribution_stats[split_name] = split_df["type"].value_counts().to_dict()
            
            stats["type_distribution"] = distribution_stats
            
            with open(os.path.join(output_dir, "dataset_stats.json"), 'w', encoding='utf-8') as f:
                json.dump(stats, f, ensure_ascii=False, indent=2)
            
            return True
        except Exception as e:
            print(f"保存数据集失败: {e}")
            return False

# 使用示例
if __name__ == "__main__":
    # 创建数据集构建器实例
    dataset_builder = LegalDataset({
        "max_seq_length": 512,
        "test_size": 0.1,
        "validation_size": 0.1,
        "seed": 42,
        "legal_text_types": ["law", "regulation", "case", "judgment", "other"],
        "min_text_length": 100
    })
    
    # 加载原始法律数据
    df = dataset_builder.load_raw_data("./legal_texts")
    # 也可以加载单个文件
    # df = dataset_builder.load_raw_data("legal_corpus.json")
    
    if df is not None:
        # 预处理法律文本
        processed_df = dataset_builder.preprocess_legal_text(df)
        
        if processed_df is not None:
            # 分割数据集
            dataset_builder.split_dataset(processed_df)
            
            # 创建Hugging Face格式的数据集
            hf_dataset = dataset_builder.create_huggingface_dataset()
            
            if hf_dataset is not None:
                # 加载tokenizer并分词
                from transformers import AutoTokenizer
                tokenizer = AutoTokenizer.from_pretrained("bert-base-chinese")
                tokenized_dataset = dataset_builder.tokenize_dataset(hf_dataset, tokenizer)
                
                if tokenized_dataset is not None:
                    # 保存处理后的数据集,供后续微调器从磁盘加载
                    dataset_builder.save_dataset(tokenized_dataset, "./legal_tokenized_dataset")
```

#### 5.2.2 法律微调器实现

法律微调器需要考虑法律文本的专业性和严谨性,特别是在处理法律术语、法律推理和法律解释方面。以下是法律微调器的实现示例:

```python
import os
import torch
import json
import time
import numpy as np
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    TrainingArguments,
    Trainer,
    DataCollatorForLanguageModeling,
    AutoModelForSequenceClassification
)
from datasets import load_from_disk
from sklearn.metrics import (
    accuracy_score,
    precision_recall_fscore_support,
    classification_report
)
import matplotlib.pyplot as plt

class LegalFinetuner:
    """
    法律领域模型微调器
    """
    def __init__(self, config=None):
        """
        初始化法律微调器
        
        参数:
        config: 配置字典,包含微调相关的各种参数
        """
        self.config = config or {
            "model_name": "uer/gpt2-chinese-cluecorpussmall",
            "task_type": "causal_lm",  # causal_lm, seq_classification
            "output_dir": "./legal_finetuned_model",
            "num_train_epochs": 3,
            "per_device_train_batch_size": 4,
            "per_device_eval_batch_size": 4,
            "learning_rate": 5e-5,
            "weight_decay": 0.01,
            "warmup_ratio": 0.1,
            "max_grad_norm": 1.0,
            "save_strategy": "epoch",
            "evaluation_strategy": "epoch",
            "logging_steps": 100,
            "seed": 42,
            "fp16": True,
            "gradient_accumulation_steps": 4,
            "disable_tqdm": False,
            "push_to_hub": False,
            "hub_model_id": "",
            "hub_token": ""
        }
        
        # 设置随机种子以确保可重复性
        torch.manual_seed(self.config["seed"])
        np.random.seed(self.config["seed"])
        
        # 初始化模型和分词器
        self.model = None
        self.tokenizer = None
        self.trainer = None
        self.metrics = {}
        
        # 检查GPU可用性
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"使用设备: {self.device}")
    
    def load_model(self):
        """
        加载预训练模型和分词器
        """
        try:
            print(f"正在加载模型: {self.config['model_name']}")
            start_time = time.time()
            
            # 加载分词器
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.config["model_name"],
                trust_remote_code=True  # 对于一些自定义模型可能需要
            )
            
            # 确保分词器有pad_token
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token
            
            # 根据任务类型加载不同的模型
            if self.config["task_type"] == "causal_lm":
                self.model = AutoModelForCausalLM.from_pretrained(
                    self.config["model_name"],
                    trust_remote_code=True,
                    torch_dtype=torch.float16 if self.config["fp16"] and self.device == "cuda" else torch.float32
                )
            elif self.config["task_type"] == "seq_classification":
                # 对于分类任务,需要指定类别数量
                num_labels = self.config.get("num_labels", 5)  # 默认5个类别
                self.model = AutoModelForSequenceClassification.from_pretrained(
                    self.config["model_name"],
                    num_labels=num_labels,
                    trust_remote_code=True,
                    torch_dtype=torch.float16 if self.config["fp16"] and self.device == "cuda" else torch.float32
                )
            else:
                raise ValueError(f"不支持的任务类型: {self.config['task_type']}")
            
            # 将模型移至指定设备
            self.model.to(self.device)
            
            load_time = time.time() - start_time
            print(f"模型加载完成,耗时: {load_time:.2f} 秒")
            
            # 打印模型配置信息
            print(f"模型参数量: {sum(p.numel() for p in self.model.parameters()):,}")
            
            return True
        except Exception as e:
            print(f"加载模型失败: {e}")
            return False
    
    def load_dataset(self, dataset_dir):
        """
        加载预处理好的数据集
        
        参数:
        dataset_dir: 数据集目录路径
        """
        try:
            print(f"正在加载数据集: {dataset_dir}")
            dataset = load_from_disk(dataset_dir)
            print(f"数据集加载完成,包含以下分割: {list(dataset.keys())}")
            return dataset
        except Exception as e:
            print(f"加载数据集失败: {e}")
            return None
    
    def create_training_args(self):
        """
        创建训练参数
        """
        try:
            # 创建输出目录
            os.makedirs(self.config["output_dir"], exist_ok=True)
            
            # 创建TrainingArguments
            training_args = TrainingArguments(
                output_dir=self.config["output_dir"],
                num_train_epochs=self.config["num_train_epochs"],
                per_device_train_batch_size=self.config["per_device_train_batch_size"],
                per_device_eval_batch_size=self.config["per_device_eval_batch_size"],
                learning_rate=self.config["learning_rate"],
                weight_decay=self.config["weight_decay"],
                warmup_ratio=self.config["warmup_ratio"],
                max_grad_norm=self.config["max_grad_norm"],
                save_strategy=self.config["save_strategy"],
                evaluation_strategy=self.config["evaluation_strategy"],
                logging_steps=self.config["logging_steps"],
                seed=self.config["seed"],
                fp16=self.config["fp16"] and self.device == "cuda",
                gradient_accumulation_steps=self.config["gradient_accumulation_steps"],
                disable_tqdm=self.config["disable_tqdm"],
                push_to_hub=self.config["push_to_hub"],
                hub_model_id=self.config["hub_model_id"],
                hub_token=self.config["hub_token"],
                # 评估与最佳模型选择相关参数
                load_best_model_at_end=self.config["evaluation_strategy"] != "no",
                metric_for_best_model="accuracy" if self.config["task_type"] == "seq_classification" else "loss"
            )
            
            return training_args
        except Exception as e:
            print(f"创建训练参数失败: {e}")
            return None
    
    def create_data_collator(self):
        """
        创建数据收集器
        """
        try:
            if self.config["task_type"] == "causal_lm":
                # 对于语言模型,使用掩码语言建模的数据收集器
                data_collator = DataCollatorForLanguageModeling(
                    tokenizer=self.tokenizer,
                    mlm=False,  # GPT类模型使用因果语言建模,不需要掩码
                    pad_to_multiple_of=8
                )
            else:
                # 对于分类任务,使用默认的数据收集器
                data_collator = None
            
            return data_collator
        except Exception as e:
            print(f"创建数据收集器失败: {e}")
            return None
    
    def compute_metrics(self, eval_pred):
        """
        计算评估指标
        
        参数:
        eval_pred: 评估预测结果,包含(predictions, labels)
        """
        try:
            predictions, labels = eval_pred
            
            if self.config["task_type"] == "causal_lm":
                # 对于语言模型,直接返回损失
                return {"loss": np.mean(predictions)}
            elif self.config["task_type"] == "seq_classification":
                # 对于分类任务,计算准确率、精确率、召回率和F1分数
                if isinstance(predictions, tuple):
                    predictions = predictions[0]
                
                # 转换logits为预测标签
                predictions = np.argmax(predictions, axis=1)
                
                # 计算评估指标
                accuracy = accuracy_score(labels, predictions)
                precision, recall, f1, _ = precision_recall_fscore_support(
                    labels, 
                    predictions, 
                    average="weighted"
                )
                
                # 保存详细的分类报告
                report = classification_report(
                    labels, 
                    predictions, 
                    output_dict=True
                )
                self.metrics["classification_report"] = report
                
                return {
                    "accuracy": accuracy,
                    "precision": precision,
                    "recall": recall,
                    "f1": f1
                }
            else:
                return {}
        except Exception as e:
            print(f"计算评估指标失败: {e}")
            return {}
    
    def create_trainer(self, dataset, training_args, data_collator):
        """
        创建训练器
        
        参数:
        dataset: 数据集对象
        training_args: 训练参数
        data_collator: 数据收集器
        """
        try:
            # 确定训练集和评估集
            train_dataset = dataset["train"]
            eval_dataset = dataset["validation"] if "validation" in dataset else None
            
            # 创建Trainer
            self.trainer = Trainer(
                model=self.model,
                args=training_args,
                train_dataset=train_dataset,
                eval_dataset=eval_dataset,
                data_collator=data_collator,
                tokenizer=self.tokenizer,
                compute_metrics=self.compute_metrics if self.config["task_type"] == "seq_classification" else None
            )
            
            print("训练器创建完成")
            return True
        except Exception as e:
            print(f"创建训练器失败: {e}")
            return False
    
    def train(self):
        """
        执行微调训练
        """
        try:
            if self.trainer is None:
                print("错误: 训练器未创建")
                return False
            
            print("开始微调训练...")
            start_time = time.time()
            
            # 执行训练
            train_result = self.trainer.train()
            
            train_time = time.time() - start_time
            print(f"训练完成,耗时: {train_time:.2f} 秒")
            
            # 保存训练结果
            self.metrics["train"] = {
                "global_step": train_result.global_step,
                "training_loss": train_result.training_loss,
                "train_time": train_time
            }
            
            # 记录学习率变化
            if hasattr(self.trainer, "state") and hasattr(self.trainer.state, "log_history"):
                lr_history = []
                for log in self.trainer.state.log_history:
                    if "learning_rate" in log:
                        lr_history.append({
                            "step": log.get("step", 0),
                            "learning_rate": log["learning_rate"]
                        })
                self.metrics["lr_history"] = lr_history
            
            return True
        except Exception as e:
            print(f"训练失败: {e}")
            return False
    
    def evaluate(self, dataset=None):
        """
        评估微调后的模型
        
        参数:
        dataset: 可选的评估数据集,如果不提供则使用训练器中的评估集
        """
        try:
            if self.trainer is None:
                print("错误: 训练器未创建")
                return False
            
            print("开始评估模型...")
            start_time = time.time()
            
            # 执行评估
            eval_result = self.trainer.evaluate(eval_dataset=dataset)
            
            eval_time = time.time() - start_time
            print(f"评估完成,耗时: {eval_time:.2f} 秒")
            
            # 保存评估结果
            self.metrics["eval"] = eval_result
            self.metrics["eval_time"] = eval_time
            
            # 打印评估指标
            print("评估指标:")
            for key, value in eval_result.items():
                print(f"  - {key}: {value}")
            
            return True
        except Exception as e:
            print(f"评估失败: {e}")
            return False
    
    def save_model(self, output_dir=None):
        """
        保存微调后的模型
        
        参数:
        output_dir: 可选的输出目录,如果不提供则使用配置中的输出目录
        """
        try:
            # 确定输出目录
            save_dir = output_dir or self.config["output_dir"]
            os.makedirs(save_dir, exist_ok=True)
            
            print(f"正在保存模型到: {save_dir}")
            
            # 保存模型和分词器
            self.trainer.save_model(save_dir)
            self.tokenizer.save_pretrained(save_dir)
            
            # 保存训练配置和指标
            with open(os.path.join(save_dir, "training_config.json"), 'w', encoding='utf-8') as f:
                json.dump(self.config, f, ensure_ascii=False, indent=2)
            
            with open(os.path.join(save_dir, "training_metrics.json"), 'w', encoding='utf-8') as f:
                json.dump(self.metrics, f, ensure_ascii=False, indent=2)
            
            # 生成训练报告
            self._generate_training_report(save_dir)
            
            print("模型保存完成")
            return True
        except Exception as e:
            print(f"保存模型失败: {e}")
            return False
    
    def _generate_training_report(self, output_dir):
        """
        生成训练报告
        
        参数:
        output_dir: 输出目录路径
        """
        try:
            # 创建报告内容
            report = "# 法律领域模型微调报告\n\n"
            
            # 添加基本信息
            report += "## 基本信息\n\n"
            report += f"- 模型名称: {self.config['model_name']}\n"
            report += f"- 任务类型: {self.config['task_type']}\n"
            report += f"- 微调时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}\n"
            
            # 添加训练参数
            report += "\n## 训练参数\n\n"
            for key, value in self.config.items():
                report += f"- {key}: {value}\n"
            
            # 添加训练结果
            report += "\n## 训练结果\n\n"
            if "train" in self.metrics:
                train_metrics = self.metrics["train"]
                report += f"- 训练步数: {train_metrics.get('global_step', 'N/A')}\n"
                report += f"- 训练损失: {train_metrics.get('training_loss', 'N/A')}\n"
                report += f"- 训练时间: {train_metrics.get('train_time', 'N/A'):.2f} 秒\n"
            
            # 添加评估结果
            if "eval" in self.metrics:
                report += "\n## 评估结果\n\n"
                for key, value in self.metrics["eval"].items():
                    report += f"- {key}: {value}\n"
            
            # 保存报告
            with open(os.path.join(output_dir, "training_report.md"), 'w', encoding='utf-8') as f:
                f.write(report)
            
        except Exception as e:
            print(f"生成训练报告失败: {e}")
    
    def plot_training_history(self, output_dir=None):
        """
        绘制训练历史图表
        
        参数:
        output_dir: 可选的输出目录,如果不提供则使用配置中的输出目录
        """
        try:
            # 确定输出目录
            save_dir = output_dir or self.config["output_dir"]
            os.makedirs(save_dir, exist_ok=True)
            
            # 获取训练历史
            if not hasattr(self.trainer, "state") or not hasattr(self.trainer.state, "log_history"):
                print("没有训练历史数据")
                return False
            
            log_history = self.trainer.state.log_history
            
            # 提取损失值
            train_loss = []
            eval_loss = []
            steps = []
            eval_steps = []
            
            for log in log_history:
                if "loss" in log and "eval_loss" not in log:
                    train_loss.append(log["loss"])
                    steps.append(log.get("step", len(steps)))
                elif "eval_loss" in log:
                    eval_loss.append(log["eval_loss"])
                    eval_steps.append(log.get("step", len(eval_steps) * self.config["logging_steps"]))
            
            # 绘制损失曲线
            plt.figure(figsize=(12, 6))
            plt.plot(steps, train_loss, label="训练损失")
            if eval_loss:
                plt.plot(eval_steps, eval_loss, label="评估损失")
            plt.xlabel("步数")
            plt.ylabel("损失")
            plt.title("法律领域模型训练损失曲线")
            plt.legend()
            plt.grid(True)
            
            # 保存图表
            loss_plot_path = os.path.join(save_dir, "loss_curve.png")
            plt.savefig(loss_plot_path)
            plt.close()
            
            # 如果有分类指标,绘制分类指标曲线
            if self.config["task_type"] == "seq_classification" and "eval" in self.metrics:
                eval_metrics = self.metrics["eval"]
                
                # 提取评估指标
                metrics_to_plot = ["eval_accuracy", "eval_precision", "eval_recall", "eval_f1"]
                
                plt.figure(figsize=(12, 6))
                for metric in metrics_to_plot:
                    if metric in eval_metrics:
                        plt.plot([eval_steps[-1]], [eval_metrics[metric]], 'o', label=metric.replace("eval_", ""))
                
                plt.xlabel("步数")
                plt.ylabel("指标值")
                plt.title("法律领域模型评估指标")
                plt.legend()
                plt.grid(True)
                plt.ylim(0, 1)
                
                # 保存图表
                metrics_plot_path = os.path.join(save_dir, "eval_metrics.png")
                plt.savefig(metrics_plot_path)
                plt.close()
            
            print(f"训练历史图表已保存到 {save_dir}")
            return True
        except Exception as e:
            print(f"绘制训练历史失败: {e}")
            return False
    
    def push_to_hub(self, model_id=None, token=None):
        """
        将微调后的模型推送到Hugging Face Hub
        
        参数:
        model_id: 可选的模型ID,如果不提供则使用配置中的模型ID
        token: 可选的访问令牌,如果不提供则使用配置中的令牌
        """
        try:
            # 确定模型ID和令牌
            hub_model_id = model_id or self.config["hub_model_id"]
            hub_token = token or self.config["hub_token"]
            
            if not hub_model_id:
                print("错误: 必须提供模型ID")
                return False
            
            if not hub_token:
                print("警告: 未提供访问令牌,可能无法推送")
            
            print(f"正在推送到Hugging Face Hub: {hub_model_id}")
            
            # 推送模型
            self.trainer.push_to_hub(
                repo_path_or_name=hub_model_id,
                use_auth_token=hub_token
            )
            
            print("模型推送完成")
            return True
        except Exception as e:
            print(f"推送模型失败: {e}")
            return False
    
    def run_legal_prediction(self, text, max_length=100, num_return_sequences=1):
        """
        使用微调后的模型进行法律文本预测
        
        参数:
        text: 输入文本
        max_length: 生成文本的最大长度
        num_return_sequences: 返回的序列数量
        """
        try:
            if self.model is None or self.tokenizer is None:
                print("错误: 模型或分词器未加载")
                return None
            
            # 确保模型处于评估模式
            self.model.eval()
            
            # 分词
            inputs = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
            inputs = {k: v.to(self.device) for k, v in inputs.items()}
            
            # 生成预测
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_length=len(inputs["input_ids"][0]) + max_length,
                    num_return_sequences=num_return_sequences,
                    temperature=0.7,  # 控制生成的随机性
                    top_p=0.95,  # 核采样
                    repetition_penalty=1.2,  # 避免重复
                    do_sample=True,
                    pad_token_id=self.tokenizer.pad_token_id
                )
            
            # 解码生成的文本
            predictions = []
            for output in outputs:
                prediction = self.tokenizer.decode(output, skip_special_tokens=True)
                predictions.append(prediction)
            
            return predictions
        except Exception as e:
            print(f"预测失败: {e}")
            return None
    
    def run_legal_classification(self, text):
        """
        使用微调后的模型进行法律文本分类
        
        参数:
        text: 输入文本
        """
        try:
            if self.model is None or self.tokenizer is None:
                print("错误: 模型或分词器未加载")
                return None
            
            if self.config["task_type"] != "seq_classification":
                print("错误: 模型不是分类模型")
                return None
            
            # 确保模型处于评估模式
            self.model.eval()
            
            # 分词
            inputs = self.tokenizer(text, return_tensors="pt", truncation=True, max_length=512)
            inputs = {k: v.to(self.device) for k, v in inputs.items()}
            
            # 进行分类
            with torch.no_grad():
                outputs = self.model(**inputs)
                logits = outputs.logits
                probabilities = torch.nn.functional.softmax(logits, dim=-1)
            
            # 获取预测结果
            predicted_class = torch.argmax(logits, dim=-1).item()
            confidence = probabilities[0, predicted_class].item()
            
            return {
                "predicted_class": predicted_class,
                "confidence": confidence,
                "probabilities": probabilities.tolist()[0]
            }
        except Exception as e:
            print(f"分类失败: {e}")
            return None

# 使用示例
if __name__ == "__main__":
    # 创建法律微调器实例
    finetuner = LegalFinetuner({
        "model_name": "uer/gpt2-chinese-cluecorpussmall",
        "task_type": "causal_lm",
        "output_dir": "./legal_finetuned_gpt2",
        "num_train_epochs": 3,
        "per_device_train_batch_size": 4,
        "per_device_eval_batch_size": 4,
        "learning_rate": 5e-5,
        "weight_decay": 0.01,
        "warmup_ratio": 0.1,
        "save_strategy": "epoch",
        "evaluation_strategy": "epoch",
        "logging_steps": 100,
        "seed": 42,
        "fp16": True,
        "gradient_accumulation_steps": 4
    })
    
    # 加载模型
    finetuner.load_model()
    
    # 加载数据集
    dataset = finetuner.load_dataset("./legal_tokenized_dataset")
    
    if dataset is not None:
        # 创建训练参数
        training_args = finetuner.create_training_args()
        
        # 创建数据收集器
        data_collator = finetuner.create_data_collator()
        
        # 创建训练器
        finetuner.create_trainer(dataset, training_args, data_collator)
        
        # 执行训练
        finetuner.train()
        
        # 评估模型
        finetuner.evaluate()
        
        # 保存模型
        finetuner.save_model()
        
        # 绘制训练历史
        finetuner.plot_training_history()
        
        # 示例:使用微调后的模型进行法律文本生成
        if finetuner.config["task_type"] == "causal_lm":
            predictions = finetuner.run_legal_prediction(
                "根据《民法典》第一千零七十九条,夫妻一方要求离婚的,可以",
                max_length=150,
                num_return_sequences=1
            )
            if predictions:
                print("\n生成的法律文本:")
                print(predictions[0])
        
        # 示例:使用微调后的模型进行法律文本分类
        elif finetuner.config["task_type"] == "seq_classification":
            result = finetuner.run_legal_classification(
                "本院认为,原告与被告之间的合同关系依法成立,被告未按照合同约定履行义务,应当承担违约责任。"
            )
            if result:
                print("\n法律文本分类结果:")
                print(f"预测类别: {result['predicted_class']}")
                print(f"置信度: {result['confidence']:.4f}")
                print(f"各类别概率: {result['probabilities']}")

#### 4.4.1 词汇优化评估器使用示例

下面是一个完整的词汇优化评估器使用示例,展示如何评估词汇优化对模型性能的影响:

```python
# 词汇优化评估器使用示例
if __name__ == "__main__":
    # 创建评估器实例
    evaluator = VocabularyOptimizationEvaluator()
    
    # 准备评估数据(模拟数据,实际使用时应替换为真实评估结果)
    baseline_results = {
        "vocabulary_coverage": 0.75,
        "term_recognition_rate": 0.68,
        "accuracy": 0.82,
        "precision": 0.79,
        "recall": 0.81,
        "f1_score": 0.80,
        "inference_time": 1.2  # 秒
    }
    
    optimized_results = {
        "vocabulary_coverage": 0.88,
        "term_recognition_rate": 0.83,
        "accuracy": 0.87,
        "precision": 0.86,
        "recall": 0.85,
        "f1_score": 0.85,
        "inference_time": 1.3  # 秒
    }
    
    # 保存评估数据到文件
    import json
    with open("baseline_evaluation.json", "w", encoding="utf-8") as f:
        json.dump(baseline_results, f, ensure_ascii=False, indent=2)
    
    with open("optimized_evaluation.json", "w", encoding="utf-8") as f:
        json.dump(optimized_results, f, ensure_ascii=False, indent=2)
    
    # 加载评估数据
    evaluator.load_baseline_results("baseline_evaluation.json")
    evaluator.load_optimized_results("optimized_evaluation.json")
    
    # 生成评估报告
    report = evaluator.generate_evaluation_report("vocabulary_optimization_report.json")
    
    # 输出评估摘要
    if report and "summary" in report:
        print("\n词汇优化评估摘要:")
        print(f"总体评估: {report['summary']['overall_assessment']}")
        
        print("\n主要改进:")
        for metric, imp in report['summary']['major_improvements']:
            print(f"  - {metric}: {imp:.2%} 变化")
        
        print("\n次要改进:")
        for metric, imp in report['summary']['minor_improvements']:
            print(f"  - {metric}: {imp:.2%} 变化")
        
        print("\n性能退化:")
        for metric, imp in report['summary']['regressions']:
            print(f"  - {metric}: {imp:.2%} 变化")
    
    # 可视化优化效果
    evaluator.visualize_improvements("./evaluation_visualizations")


### 4.5 词汇优化的最佳实践

在实施词汇表优化时,以下最佳实践可以帮助您获得更好的效果:

#### 4.5.1 多阶段优化策略

采用多阶段优化策略可以逐步提升词汇表质量:

  1. 数据收集与分析阶段:收集领域语料,分析词汇使用模式
  2. 术语提取阶段:使用多种方法提取领域术语
  3. 重要性评估阶段:基于频率、上下文和语义重要性评估术语
  4. 优化实施阶段:执行词汇表扩展和优化
  5. 评估与迭代阶段:评估效果并迭代优化

#### 4.5.2 多阶段词汇优化器使用示例

下面是一个完整的多阶段词汇优化器使用示例,展示如何在法律领域实施词汇优化的完整流程:

```python
# 多阶段词汇优化器使用示例
if __name__ == "__main__":
    # 创建多阶段词汇优化器实例
    optimizer = MultiStageVocabularyOptimizer(domain="法律")
    
    # 准备法律领域语料路径(示例路径,实际应用中替换为真实路径)
    legal_corpus_paths = [
        "./legal_corpus/contracts.txt",
        "./legal_corpus/judgments.txt",
        "./legal_corpus/regulations.txt"
    ]
    
    # 阶段1: 数据收集与分析
    if optimizer.stage1_data_collection(legal_corpus_paths):
        print("阶段1完成: 数据收集与分析成功")
    else:
        print("阶段1失败: 数据收集与分析出错")
        exit(1)
    
    # 阶段2: 术语提取
    # 使用多种提取方法:频率分析、语义聚类、规则提取
    if optimizer.stage2_term_extraction(['frequency', 'semantic', 'rule_based']):
        print("阶段2完成: 术语提取成功")
    else:
        print("阶段2失败: 术语提取出错")
        exit(1)
    
    # 阶段3: 重要性评估
    # 设置评估参数
    importance_params = {
        "min_frequency": 10,
        "context_weight": 0.4,
        "domain_relevance": True,
        "max_terms": 500
    }
    
    if optimizer.stage3_importance_assessment(importance_params):
        print("阶段3完成: 重要性评估成功")
    else:
        print("阶段3失败: 重要性评估出错")
        exit(1)
    
    # 阶段4: 优化实施
    # 设置优化参数
    optimization_params = {
        "base_vocabulary_size": 30000,
        "max_extension_size": 1000,
        "merge_identical_vectors": True,
        "vocabulary_saving_path": "./optimized_vocabulary/"
    }
    
    if optimizer.stage4_optimization_implementation(optimization_params):
        print("阶段4完成: 优化实施成功")
    else:
        print("阶段4失败: 优化实施出错")
        exit(1)
    
    # 阶段5: 评估与迭代
    # 设置评估参数
    evaluation_params = {
        "baseline_model_path": "./base_legal_model",
        "optimized_model_path": "./optimized_legal_model",
        "evaluation_dataset": "./eval_dataset",
        "metrics": ["vocabulary_coverage", "term_recognition", "inference_quality"],
        "max_iterations": 3
    }
    
    # 运行评估并获取结果
    evaluation_results = optimizer.stage5_evaluation_iteration(evaluation_params)
    if evaluation_results:
        print("阶段5完成: 评估与迭代成功")
        
        # 输出评估结果摘要
        print("\n评估结果摘要:")
        print(f"词汇覆盖率提升: {evaluation_results.get('vocabulary_coverage_improvement', 0):.2%}")
        print(f"术语识别率提升: {evaluation_results.get('term_recognition_improvement', 0):.2%}")
        print(f"推理质量提升: {evaluation_results.get('inference_quality_improvement', 0):.2%}")
        print(f"是否达到优化目标: {'是' if evaluation_results.get('target_achieved', False) else '否'}")
    else:
        print("阶段5失败: 评估与迭代出错")
        exit(1)
    
    # 或者使用完整流程方法
    print("\n\n使用完整流程方法重新运行...")
    full_params = {
        "corpus_paths": legal_corpus_paths,
        "extraction_methods": ['frequency', 'semantic', 'rule_based'],
        "importance_params": importance_params,
        "optimization_params": optimization_params,
        "evaluation_params": evaluation_params,
        "auto_optimize": True,
        "save_results": True,
        "output_dir": "./vocabulary_optimization_results/"
    }
    
    # 运行完整优化流程
    final_results = optimizer.run_full_pipeline(full_params)
    if final_results and final_results.get('success', False):
        print("\n完整优化流程执行成功!")
        print("\n最终优化结果:")
        print(f"词汇表大小: {final_results.get('vocabulary_size', '未知')}")
        print(f"新增术语数量: {final_results.get('new_terms_added', '未知')}")
        print(f"平均性能提升: {final_results.get('average_improvement', '未知'):.2%}")
        print(f"优化后词汇表路径: {final_results.get('vocabulary_path', '未知')}")
        print(f"详细报告路径: {final_results.get('report_path', '未知')}")
        
        # 总结优化建议
        print("\n优化建议:")
        for recommendation in final_results.get('recommendations', []):
            print(f"- {recommendation}")
    else:
        print("完整优化流程执行失败")
    
    print("\n词汇优化流程完成")
```

上述示例中使用的 MultiStageVocabularyOptimizer 类参考实现如下(各阶段内部的统计与评估均为模拟逻辑,实际应用时需替换为真实的语料分析、术语提取与评估代码):

```python
class MultiStageVocabularyOptimizer:
    """
    多阶段词汇优化器 - 领域词汇表优化的核心实现
    """
    def __init__(self, domain):
        self.domain = domain
        self.stage_results = {}
        self.extracted_terms = []
        self.important_terms = []
        self.optimized_vocabulary = None
    
    def stage1_data_collection(self, corpus_paths, output_file="corpus_analysis.json"):
        """
        阶段1: 数据收集与分析
        
        参数:
        corpus_paths: 语料文件路径列表
        output_file: 分析结果输出文件
        
        返回:
        bool: 是否成功完成
        """
        try:
            print(f"开始阶段1: 数据收集与分析 - {self.domain}领域")
            print(f"处理语料文件数量: {len(corpus_paths)}")
            
            # 模拟数据收集与分析
            # 实际应用中应读取文件并进行统计分析
            corpus_stats = {
                "total_files": len(corpus_paths),
                "estimated_tokens": sum([i * 10000 for i in range(1, len(corpus_paths)+1)]),
                "domain_keywords": [],
                "language_distribution": {"zh": 95, "en": 5}
            }
            
            # 保存分析结果
            import json
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(corpus_stats, f, ensure_ascii=False, indent=2)
            
            print(f"语料分析完成,结果保存至: {output_file}")
            print(f"阶段1完成")
            self.stage_results['stage1'] = "数据收集与分析完成"
            return True
        except Exception as e:
            print(f"阶段1失败: {str(e)}")
            return False
    
    def stage2_term_extraction(self, extraction_methods=['frequency', 'semantic', 'rule_based']):
        """
        阶段2: 术语提取
        
        参数:
        extraction_methods: 使用的提取方法列表
        
        返回:
        bool: 是否成功完成
        """
        try:
            print(f"开始阶段2: 术语提取 - {self.domain}领域")
            print(f"使用提取方法: {', '.join(extraction_methods)}")
            
            # 模拟术语提取
            # 实际应用中应根据不同方法提取术语
            method_terms = {
                'frequency': [f"{self.domain}术语{i:03d}" for i in range(1, 101)],
                'semantic': [f"{self.domain}概念{i:03d}" for i in range(101, 201)],
                'rule_based': [f"{self.domain}规则{i:03d}" for i in range(201, 301)]
            }
            
            # 合并术语并去重
            all_terms = set()
            for method in extraction_methods:
                if method in method_terms:
                    all_terms.update(method_terms[method])
            
            self.extracted_terms = list(all_terms)
            print(f"提取的术语数量: {len(self.extracted_terms)}")
            print(f"阶段2完成")
            self.stage_results['stage2'] = "术语提取完成"
            return True
        except Exception as e:
            print(f"阶段2失败: {str(e)}")
            return False
    
    def stage3_importance_assessment(self, importance_params):
        """
        阶段3: 重要性评估
        
        参数:
        importance_params: 评估参数字典
        
        返回:
        bool: 是否成功完成
        """
        try:
            print(f"开始阶段3: 重要性评估 - {self.domain}领域")
            print(f"评估参数: {importance_params}")
            
            # 模拟重要性评估
            # 实际应用中应根据参数计算术语重要性
            min_freq = importance_params.get("min_frequency", 5)
            max_terms = importance_params.get("max_terms", 500)
            
            # 模拟重要性排序并选择
            important_terms_count = min(len(self.extracted_terms), max_terms)
            self.important_terms = self.extracted_terms[:important_terms_count]
            
            print(f"筛选出的重要术语数量: {len(self.important_terms)}")
            print(f"阶段3完成")
            self.stage_results['stage3'] = "重要性评估完成"
            return True
        except Exception as e:
            print(f"阶段3失败: {str(e)}")
            return False
    
    def stage4_optimization_implementation(self, optimization_params):
        """
        阶段4: 优化实施
        
        参数:
        optimization_params: 优化参数字典
        
        返回:
        bool: 是否成功完成
        """
        try:
            print(f"开始阶段4: 优化实施 - {self.domain}领域")
            print(f"优化参数: {optimization_params}")
            
            # 模拟词汇优化实施
            base_size = optimization_params.get("base_vocabulary_size", 30000)
            save_path = optimization_params.get("vocabulary_saving_path", "./")
            
            # 创建保存目录
            import os
            os.makedirs(save_path, exist_ok=True)
            
            # 模拟优化后的词汇表
            self.optimized_vocabulary = {
                "base_size": base_size,
                "extended_terms": self.important_terms,
                "total_size": base_size + len(self.important_terms)
            }
            
            # 保存优化结果
            vocab_file = os.path.join(save_path, f"{self.domain.lower()}_optimized_vocabulary.json")
            with open(vocab_file, 'w', encoding='utf-8') as f:
                import json
                json.dump(self.optimized_vocabulary, f, ensure_ascii=False, indent=2)
            
            print(f"优化后的词汇表保存至: {vocab_file}")
            print(f"词汇表总大小: {self.optimized_vocabulary['total_size']}")
            print(f"新增术语数量: {len(self.important_terms)}")
            print(f"阶段4完成")
            self.stage_results['stage4'] = "优化实施完成"
            return True
        except Exception as e:
            print(f"阶段4失败: {str(e)}")
            return False
    
    def stage5_evaluation_iteration(self, evaluation_params):
        """
        阶段5: 评估与迭代
        
        参数:
        evaluation_params: 评估参数字典
        
        返回:
        dict: 评估结果
        """
        try:
            print(f"开始阶段5: 评估与迭代 - {self.domain}领域")
            print(f"评估参数: {evaluation_params}")
            
            # 模拟评估过程
            metrics = evaluation_params.get("metrics", ["vocabulary_coverage"])
            max_iterations = evaluation_params.get("max_iterations", 3)
            
            # 模拟评估结果
            evaluation_results = {
                "vocabulary_coverage_improvement": 0.15,  # 15%提升
                "term_recognition_improvement": 0.22,    # 22%提升
                "inference_quality_improvement": 0.08,   # 8%提升
                "target_achieved": True,
                "iterations_run": 1,
                "max_iterations": max_iterations
            }
            
            # 保存评估结果
            eval_file = "vocabulary_optimization_evaluation.json"
            with open(eval_file, 'w', encoding='utf-8') as f:
                import json
                json.dump(evaluation_results, f, ensure_ascii=False, indent=2)
            
            print(f"评估结果保存至: {eval_file}")
            print(f"阶段5完成")
            self.stage_results['stage5'] = "评估与迭代完成"
            return evaluation_results
        except Exception as e:
            print(f"阶段5失败: {str(e)}")
            return None
    
    def run_full_pipeline(self, params):
        """
        运行完整的优化流程
        
        参数:
        params: 完整流程参数
        
        返回:
        dict: 最终结果
        """
        try:
            print(f"开始完整优化流程 - {self.domain}领域")
            
            # 创建输出目录
            output_dir = params.get("output_dir", "./vocabulary_optimization_results/")
            import os
            os.makedirs(output_dir, exist_ok=True)
            
            # 运行各阶段
            stage1_success = self.stage1_data_collection(
                params["corpus_paths"],
                os.path.join(output_dir, "stage1_corpus_analysis.json")
            )
            
            if not stage1_success:
                return {"success": False, "error": "阶段1失败"}
            
            stage2_success = self.stage2_term_extraction(params["extraction_methods"])
            if not stage2_success:
                return {"success": False, "error": "阶段2失败"}
            
            stage3_success = self.stage3_importance_assessment(params["importance_params"])
            if not stage3_success:
                return {"success": False, "error": "阶段3失败"}
            
            # 更新保存路径为输出目录
            params["optimization_params"]["vocabulary_saving_path"] = os.path.join(output_dir, "vocabulary/")
            stage4_success = self.stage4_optimization_implementation(params["optimization_params"])
            if not stage4_success:
                return {"success": False, "error": "阶段4失败"}
            
            evaluation_results = self.stage5_evaluation_iteration(params["evaluation_params"])
            if not evaluation_results:
                return {"success": False, "error": "阶段5失败"}
            
            # 生成优化建议
            recommendations = [
                f"考虑进一步扩展{self.domain}领域专业术语集",
                "定期更新词汇表以适应领域发展",
                "针对低频但重要的术语考虑特殊处理策略"
            ]
            
            # 计算平均改进
            improvements = [
                evaluation_results.get('vocabulary_coverage_improvement', 0),
                evaluation_results.get('term_recognition_improvement', 0),
                evaluation_results.get('inference_quality_improvement', 0)
            ]
            average_improvement = sum(improvements) / len(improvements)
            
            # 生成最终报告
            final_report = {
                "success": True,
                "vocabulary_size": self.optimized_vocabulary['total_size'],
                "new_terms_added": len(self.important_terms),
                "average_improvement": average_improvement,
                "vocabulary_path": params["optimization_params"]["vocabulary_saving_path"],
                "report_path": output_dir,
                "recommendations": recommendations,
                "evaluation_results": evaluation_results,
                "stage_results": self.stage_results
            }
            
            # 保存最终报告
            report_file = os.path.join(output_dir, "final_optimization_report.json")
            with open(report_file, 'w', encoding='utf-8') as f:
                import json
                json.dump(final_report, f, ensure_ascii=False, indent=2)
            
            print(f"完整优化流程完成,最终报告保存至: {report_file}")
            return final_report
        except Exception as e:
            print(f"完整优化流程失败: {str(e)}")
            return {"success": False, "error": str(e)}

#### 5.2.3 面向多任务的法律微调器实现

前文的LegalFinetuner以因果语言模型和序列分类为主;在实际项目中,法律微调器还需要在同一接口下支持分类、问答、生成与检索等多种任务,并针对法律术语处理、法律逻辑理解做相应调整。以下是这种面向多任务的LegalFinetuner实现:

```python
class LegalFinetuner:
    """
    法律领域模型微调器
    """
    def __init__(self, model_name_or_path, config=None):
        """
        初始化法律领域微调器
        
        参数:
        model_name_or_path: 预训练模型名称或路径
        config: 微调配置参数
        """
        self.model_name = model_name_or_path
        self.config = config or {
            "learning_rate": 2e-5,
            "batch_size": 16,
            "num_epochs": 3,
            "max_seq_length": 512,
            "save_steps": 1000,
            "eval_steps": 500,
            "legal_weight_decay": 0.01,
            "legal_task_type": "classification"  # classification, qa, generation, retrieval
        }
        self.model = None
        self.tokenizer = None
        self.trainer = None
        self.logger = self._setup_logger()
    
    def _setup_logger(self):
        """
        设置日志记录器
        """
        import logging
        logger = logging.getLogger("legal_finetuner")
        logger.setLevel(logging.INFO)
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        return logger
    
    def load_model(self):
        """
        加载预训练模型和分词器
        """
        from transformers import AutoModelForSequenceClassification, AutoModelForQuestionAnswering, \
                               AutoModelForCausalLM, AutoTokenizer
        
        try:
            # 根据任务类型加载相应的模型
            task_type = self.config.get("legal_task_type", "classification")
            
            if task_type == "classification":
                self.model = AutoModelForSequenceClassification.from_pretrained(
                    self.model_name,
                    num_labels=self.config.get("num_labels", 2)
                )
            elif task_type == "qa":
                self.model = AutoModelForQuestionAnswering.from_pretrained(self.model_name)
            elif task_type in ["generation", "retrieval"]:
                self.model = AutoModelForCausalLM.from_pretrained(self.model_name)
            else:
                raise ValueError(f"不支持的任务类型: {task_type}")
            
            # 加载分词器
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            
            # 法律领域特殊处理
            if hasattr(self.model, "config") and hasattr(self.model.config, "dropout"):
                # 为法律领域文本增加dropout以提高泛化性
                self.model.config.dropout = self.config.get("legal_dropout", 0.15)
            
            self.logger.info(f"已加载模型: {self.model_name},任务类型: {task_type}")
            return True
        except Exception as e:
            self.logger.error(f"加载模型失败: {e}")
            return False
    
    def prepare_trainer(self, train_dataset, eval_dataset=None):
        """
        准备训练器
        
        参数:
        train_dataset: 训练数据集
        eval_dataset: 评估数据集
        """
        from transformers import Trainer, TrainingArguments
        import torch
        
        try:
            # 法律领域特有的训练参数
            training_args = TrainingArguments(
                output_dir=f"./legal_model_output_{self.config['legal_task_type']}",
                learning_rate=self.config["learning_rate"],
                per_device_train_batch_size=self.config["batch_size"],
                per_device_eval_batch_size=self.config["batch_size"] * 2,
                num_train_epochs=self.config["num_epochs"],
                weight_decay=self.config["legal_weight_decay"],
                save_steps=self.config["save_steps"],
                eval_steps=self.config["eval_steps"],
                logging_steps=100,
                evaluation_strategy="steps" if eval_dataset else "no",
                save_total_limit=3,
                load_best_model_at_end=True if eval_dataset else False,
                metric_for_best_model="eval_loss" if eval_dataset else None,
                fp16=torch.cuda.is_available(),
                gradient_accumulation_steps=self.config.get("gradient_accumulation_steps", 1),
                # 法律领域特有的参数
                lr_scheduler_type="linear",  # 法律领域通常使用线性衰减
                warmup_ratio=0.1  # 法律领域通常需要更多预热
            )
            
            # 法律领域特有的指标函数
            def compute_metrics(eval_pred):
                from datasets import load_metric  # 注:较新版本的datasets已移除load_metric,可改用evaluate库的evaluate.load
                import numpy as np
                
                logits, labels = eval_pred
                
                if self.config["legal_task_type"] == "classification":
                    metric = load_metric("accuracy")
                    predictions = np.argmax(logits, axis=-1)
                    return metric.compute(predictions=predictions, references=labels)
                elif self.config["legal_task_type"] == "qa":
                    # 问答任务指标计算
                    metric = load_metric("squad")
                    # 此处应添加法律问答特有的指标计算逻辑
                    return {"f1": 0.0, "exact_match": 0.0}  # 占位符
                else:
                    # 其他任务暂不计算额外指标,评估时依赖Trainer默认记录的eval_loss
                    return {}
            
            self.trainer = Trainer(
                model=self.model,
                args=training_args,
                train_dataset=train_dataset,
                eval_dataset=eval_dataset,
                tokenizer=self.tokenizer,
                compute_metrics=compute_metrics if self.config["legal_task_type"] == "classification" else None
            )
            
            self.logger.info("训练器准备完成")
            return True
        except Exception as e:
            self.logger.error(f"准备训练器失败: {e}")
            return False
    
    def fine_tune(self):
        """
        执行模型微调
        """
        try:
            if self.trainer is None:
                raise ValueError("请先调用prepare_trainer方法")
            
            self.logger.info("开始法律领域模型微调")
            train_result = self.trainer.train()
            
            # 记录训练结果
            metrics = train_result.metrics
            self.logger.info(f"微调完成,训练指标: {metrics}")
            
            # 保存训练结果
            self.trainer.save_model()
            self.trainer.log_metrics("train", metrics)
            self.trainer.save_metrics("train", metrics)
            self.trainer.save_state()
            
            return metrics
        except Exception as e:
            self.logger.error(f"微调失败: {e}")
            return None
    
    def evaluate(self, dataset):
        """
        评估微调后的模型
        
        参数:
        dataset: 用于评估的数据集
        """
        try:
            if self.trainer is None:
                raise ValueError("请先调用prepare_trainer方法")
            
            self.logger.info("开始评估模型")
            eval_result = self.trainer.evaluate(eval_dataset=dataset)
            
            # 记录评估结果
            self.logger.info(f"评估完成,评估指标: {eval_result}")
            
            # 保存评估结果
            self.trainer.log_metrics("eval", eval_result)
            self.trainer.save_metrics("eval", eval_result)
            
            # 法律领域特有的评估
            self._legal_domain_specific_evaluation(dataset)
            
            return eval_result
        except Exception as e:
            self.logger.error(f"评估失败: {e}")
            return None
    
    def _legal_domain_specific_evaluation(self, dataset):
        """
        法律领域特有的评估方法
        
        参数:
        dataset: 用于评估的数据集
        """
        try:
            # 占位实现:实际项目中可在此处
            # 1) 计算法律术语覆盖率
            # 2) 检查模型对特定法律概念的理解
            # 3) 评估法律推理能力
            self.logger.info("法律领域特有评估执行完成(当前为占位实现)")
        except Exception as e:
            self.logger.error(f"法律领域特有评估失败: {e}")
    
    def save_model(self, output_dir=None):
        """
        保存微调后的模型
        
        参数:
        output_dir: 输出目录,如果为None则使用默认目录
        """
        try:
            save_dir = output_dir or f"./legal_model_final_{self.config['legal_task_type']}"
            
            # 保存模型和分词器
            self.model.save_pretrained(save_dir)
            self.tokenizer.save_pretrained(save_dir)
            
            # 保存配置
            import json
            with open(f"{save_dir}/finetuning_config.json", "w", encoding="utf-8") as f:
                json.dump(self.config, f, ensure_ascii=False, indent=2)
            
            self.logger.info(f"模型已保存到: {save_dir}")
            return save_dir
        except Exception as e:
            self.logger.error(f"保存模型失败: {e}")
            return None
    
    def push_to_hub(self, repo_name, organization=None):
        """
        将模型推送到Hugging Face Hub
        
        参数:
        repo_name: 仓库名称
        organization: 组织名称(可选)
        """
        try:
            # 构建完整的仓库名称
            repo_id = f"{organization}/{repo_name}" if organization else repo_name
            
            # 推送到Hub
            self.model.push_to_hub(repo_id)
            self.tokenizer.push_to_hub(repo_id)
            
            self.logger.info(f"模型已推送到Hugging Face Hub: {repo_id}")
            return repo_id
        except Exception as e:
            self.logger.error(f"推送模型到Hub失败: {e}")
            return None
```

### 5.2.3 法律领域微调使用示例

下面是法律领域微调的完整使用示例,展示如何从数据准备到模型评估的完整流程:

```python
# 法律领域微调使用示例
if __name__ == "__main__":
    # 1. 数据准备 - 使用前面定义的LegalDataset类
    from legal_dataset import LegalDataset
    
    # 创建数据集构建器
    dataset_config = {
        "max_seq_length": 512,
        "test_size": 0.1,
        "validation_size": 0.1,
        "seed": 42,
        "legal_text_types": ["law", "regulation", "case", "judgment", "other"],
        "min_text_length": 100
    }
    
    dataset_builder = LegalDataset(dataset_config)
    
    # 加载并预处理法律数据
    print("加载法律文本数据...")
    df = dataset_builder.load_raw_data("./legal_texts")
    df = dataset_builder.preprocess_legal_text(df)
    dataset_builder.split_dataset(df)
    
    # 创建Hugging Face格式数据集
    hf_dataset = dataset_builder.create_huggingface_dataset()
    
    # 2. 加载分词器并分词
    print("分词处理...")
    from transformers import AutoTokenizer
    tokenizer = AutoTokenizer.from_pretrained("hfl/chinese-roberta-wwm-ext")
    tokenized_dataset = dataset_builder.tokenize_dataset(hf_dataset, tokenizer)
    
    # 3. 创建法律领域微调器
    print("初始化法律领域微调器...")
    from legal_finetuner import LegalFinetuner
    
    finetuner_config = {
        "learning_rate": 2e-5,
        "batch_size": 8,
        "num_epochs": 3,
        "max_seq_length": 512,
        "save_steps": 500,
        "eval_steps": 250,
        "legal_weight_decay": 0.01,
        "legal_task_type": "classification",
        "num_labels": 5  # 假设我们有5种法律文本类型
    }
    
    finetuner = LegalFinetuner(
        model_name_or_path="hfl/chinese-roberta-wwm-ext",
        config=finetuner_config
    )
    
    # 4. 加载模型
    print("加载预训练模型...")
    finetuner.load_model()
    
    # 5. 准备训练器
    print("准备训练器...")
    finetuner.prepare_trainer(
        train_dataset=tokenized_dataset["train"],
        eval_dataset=tokenized_dataset["validation"]
    )
    
    # 6. 执行微调
    print("开始法律领域微调...")
    training_results = finetuner.fine_tune()
    
    # 7. 评估模型
    print("评估微调后的模型...")
    eval_results = finetuner.evaluate(tokenized_dataset["test"])
    
    # 8. 保存模型
    print("保存微调后的模型...")
    saved_dir = finetuner.save_model("./legal_model_classification")
    
    # 9. 可选:推送到Hugging Face Hub
    # finetuner.push_to_hub("legal-text-classification-model")
    
    print("\n法律领域模型微调流程完成")
    print(f"训练结果: {training_results}")
    print(f"评估结果: {eval_results}")

5.3 医疗与法律领域微调对比

医疗与法律领域在模型微调方面有许多共同点和差异,以下是两者的详细对比:

| 对比维度 | 医疗领域 | 法律领域 |
|---------|---------|---------|
| 数据特点 | 临床记录、医学文献、病历 | 法律法规、判例、法律文书 |
| 术语特点 | 专业医学术语、缩写密集 | 法律术语、法条编号、案例引用 |
| 微调重点 | 医学知识准确性、诊断能力 | 法律推理、合规性判断、文本分类 |
| 评估指标 | 医学准确率、敏感性、特异性 | 分类准确率、法律推理准确率 |
| 挑战 | 医疗数据隐私、专业知识深度 | 法律时效性、判例更新、解释性 |
| 优化策略 | 数据增强、多任务学习 | 法律文本特殊处理、逻辑推理增强 |
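
上表"评估指标"一行的差异可以用一段简单代码来说明。下面的示意代码(数据为虚构的演示样本)分别计算医疗场景常用的敏感性、特异性,以及法律场景常用的分类准确率:

```python
# 示意:医疗二分类的敏感性/特异性与法律多分类的准确率计算(演示数据)
import numpy as np
from sklearn.metrics import accuracy_score, confusion_matrix

# 医疗二分类:1表示"患病",0表示"未患病"
med_labels = np.array([1, 0, 1, 1, 0, 0, 1, 0])
med_preds = np.array([1, 0, 0, 1, 0, 1, 1, 0])
tn, fp, fn, tp = confusion_matrix(med_labels, med_preds).ravel()
sensitivity = tp / (tp + fn)  # 敏感性(召回率)
specificity = tn / (tn + fp)  # 特异性
print(f"医疗示例 - 敏感性: {sensitivity:.2f}, 特异性: {specificity:.2f}")

# 法律多分类:类别ID对应不同法律文本类型
legal_labels = np.array([0, 2, 1, 3, 2, 0])
legal_preds = np.array([0, 2, 1, 1, 2, 0])
print(f"法律示例 - 分类准确率: {accuracy_score(legal_labels, legal_preds):.2f}")
```
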
5.3.1 领域知识融合技巧

在进行跨领域微调时,可以采用以下领域知识融合技巧(分阶段微调中"领域适应"阶段的示意代码见列表之后):

  1. 分阶段微调策略

    • 先在通用数据上预训练
    • 然后在大规模领域数据上进行领域适应
    • 最后在特定任务数据上进行微调
  2. 领域术语增强

    • 医疗领域:增强对医学术语、疾病名称、药物名称的识别
    • 法律领域:增强对法律术语、法条编号、案例引用的识别
  3. 跨领域知识迁移

    • 利用医疗领域的结构化知识组织方式提升法律领域文本理解
    • 利用法律领域的逻辑推理能力提升医疗领域的诊断推理
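
下面给出"分阶段微调"中领域适应阶段的一个简化示意代码:先在大规模法律语料上做MLM继续预训练,再把输出目录作为上文LegalFinetuner的起点进行任务微调。其中数据集、输出目录与超参数均为假设值:

```python
# 示意:领域适应阶段的MLM继续预训练(数据集与超参数均为假设值)
from transformers import (AutoModelForMaskedLM, AutoTokenizer,
                          DataCollatorForLanguageModeling, Trainer, TrainingArguments)

def domain_adapt(model_name, domain_dataset, output_dir="./legal_domain_adapted",
                 lr=5e-5, epochs=1):
    """在已分词的领域语料上做掩码语言模型训练,作为任务微调前的领域适应阶段"""
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForMaskedLM.from_pretrained(model_name)
    collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm_probability=0.15)
    args = TrainingArguments(
        output_dir=output_dir,
        learning_rate=lr,
        num_train_epochs=epochs,
        per_device_train_batch_size=8,
        save_strategy="epoch",
        logging_steps=100,
    )
    trainer = Trainer(model=model, args=args, train_dataset=domain_dataset,
                      data_collator=collator)
    trainer.train()
    trainer.save_model(output_dir)
    tokenizer.save_pretrained(output_dir)
    return output_dir

# 用法示例:先领域适应,再将输出目录作为LegalFinetuner的model_name_or_path做任务微调
# adapted_dir = domain_adapt("hfl/chinese-roberta-wwm-ext", tokenized_legal_corpus)
# finetuner = LegalFinetuner(model_name_or_path=adapted_dir, config=finetuner_config)
```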

6.0 总结与展望

通过本文的学习,我们深入探讨了医疗与法律领域的大模型微调技术。主要内容包括:

  1. 领域微调基础:详细介绍了领域微调的基本概念、原理和流程
  2. 医疗领域微调:实现了医疗数据集构建和医疗领域微调器
  3. 法律领域微调:实现了法律数据集构建和法律领域微调器
  4. 词汇优化技术:探讨了词汇表优化的重要性和实施方法
  5. 领域对比分析:对比了医疗与法律领域微调的异同点

未来,领域微调技术有望在以下方向继续发展:

  1. 多模态领域微调:结合文本、图像等多种模态信息进行领域微调
  2. 持续学习与知识更新:实现模型在领域知识更新时的高效适应
  3. 跨领域知识融合:开发更有效的跨领域知识迁移方法
  4. 隐私保护领域微调:在保护敏感数据的前提下进行高效微调

通过不断优化和创新领域微调技术,我们可以更好地发挥大语言模型在特定领域的潜力,为医疗、法律等专业领域提供更智能、更准确的AI辅助工具。

4.5.2 医疗与法律领域的词汇优化策略对比

不同领域的词汇优化策略有所不同,以下是医疗与法律领域的关键差异:

| 优化维度 | 医疗领域策略 | 法律领域策略 |
|---------|------------|------------|
| 术语来源 | 医学词典、临床指南、研究论文 | 法律法规、判例、法律词典 |
| 优先级因素 | 临床重要性、使用频率、专业性 | 法律有效性、权威性、时效性 |
| 变体处理 | 同义词、缩写、不同表述方式 | 术语变体、法律修订版本 |
| 特殊处理 | 药品名称、疾病编码 | 案例引用、法条编号 |
| 评估方法 | 医学准确率、专业覆盖率 | 法律准确性、合规性检查 |
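
以表中"特殊处理"一行为例,两类领域都需要对结构化的专业标识做针对性识别。下面是一个简化的正则示意代码(模式与示例文本均为演示用途,实际应用需遵循完整的编码与引用规范):

```python
# 示意:医疗ICD-10编码与法律法条引用的正则识别(简化模式,仅供演示)
import re

icd10_pattern = re.compile(r"\b[A-TV-Z][0-9]{2}(?:\.[0-9A-Z]{1,4})?\b")  # 如 J18.9、I10
statute_pattern = re.compile(r"《[^》]{2,30}》第[零一二三四五六七八九十百千0-9]+条")  # 如《民法典》第五百条

medical_text = "入院诊断:社区获得性肺炎(ICD-10: J18.9),合并高血压 I10。"
legal_text = "依据《中华人民共和国民法典》第五百七十七条,当事人一方不履行合同义务应当承担违约责任。"

print("识别到的ICD编码:", icd10_pattern.findall(medical_text))
print("识别到的法条引用:", statute_pattern.findall(legal_text))
```
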
4.5.3 词汇优化的常见问题与解决方案

在实施词汇表优化过程中,可能会遇到以下常见问题及解决方案(针对词汇表膨胀问题的分层筛选示意代码见列表之后):

  1. 词汇表膨胀问题

    • 问题:添加过多术语导致词汇表过大,影响模型性能
    • 解决方案:实施分层筛选机制,设置严格的重要性阈值,限制新增词汇数量
  2. 术语冲突问题

    • 问题:不同来源的术语可能存在冲突或不一致
    • 解决方案:建立术语规范化流程,确保术语的一致性和准确性
  3. 覆盖率与性能平衡

    • 问题:过度优化词汇表可能导致模型泛化能力下降
    • 解决方案:保持基础词汇与专业词汇的平衡,定期评估模型在通用任务上的表现
  4. 动态更新问题

    • 问题:领域术语会随时间演变,需要定期更新词汇表
    • 解决方案:建立术语监控机制,定期收集和分析新术语,实现词汇表的增量更新
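
下面是一个针对"词汇表膨胀"问题的分层筛选示意代码:只把在领域语料中出现频次达标、且会被原分词器切成多个子词的术语加入词表,并限制新增数量上限。其中术语列表、语料与阈值均为假设值:

```python
# 示意:带频次阈值与数量上限的领域词表扩展(术语、语料与阈值均为演示用假设值)
from collections import Counter
from transformers import AutoTokenizer, AutoModelForMaskedLM

def extend_vocab(model_name, domain_terms, corpus_texts,
                 min_freq=50, max_new_tokens=2000):
    """分层筛选领域术语并扩展分词器词表,同时同步调整模型embedding大小"""
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForMaskedLM.from_pretrained(model_name)

    # 统计每个候选术语在语料中的出现频次
    joined = "\n".join(corpus_texts)
    freq = Counter({term: joined.count(term) for term in domain_terms})

    # 分层筛选:频次达标,且原分词器会把该术语切成多个子词(说明确有必要新增)
    candidates = [term for term, count in freq.most_common()
                  if count >= min_freq and len(tokenizer.tokenize(term)) > 1]
    new_tokens = candidates[:max_new_tokens]  # 限制新增词汇数量,避免词表膨胀

    added = tokenizer.add_tokens(new_tokens)
    if added > 0:
        model.resize_token_embeddings(len(tokenizer))  # 新增token的embedding随机初始化,需后续训练
    print(f"实际新增词汇数: {added}")
    return tokenizer, model

# 用法示例(术语与语料为演示数据,min_freq设小以便直接运行):
# tokenizer, model = extend_vocab(
#     "hfl/chinese-roberta-wwm-ext",
#     ["不可抗力", "善意取得", "缔约过失责任"],
#     ["因不可抗力不能履行合同的,根据不可抗力的影响,部分或者全部免除责任。"],
#     min_freq=1,
# )
```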