1. Ecosystem Overview

1.1 Introduction to the Hadoop Ecosystem

The Hadoop ecosystem is a large collection of open-source software built around the core Apache Hadoop components, providing a complete solution for big-data storage, processing, analysis, and management.

from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
from datetime import datetime

class ComponentCategory(Enum):
    """组件类别"""
    STORAGE = "存储"
    PROCESSING = "处理"
    COORDINATION = "协调"
    WORKFLOW = "工作流"
    MONITORING = "监控"
    SECURITY = "安全"
    DATA_INGESTION = "数据摄取"
    QUERY_ENGINE = "查询引擎"
    MACHINE_LEARNING = "机器学习"
    STREAMING = "流处理"

class MaturityLevel(Enum):
    """成熟度级别"""
    INCUBATING = "孵化中"
    STABLE = "稳定"
    MATURE = "成熟"
    DEPRECATED = "已弃用"

@dataclass
class EcosystemComponent:
    """生态系统组件"""
    name: str
    category: ComponentCategory
    description: str
    key_features: List[str]
    use_cases: List[str]
    maturity: MaturityLevel
    dependencies: List[str]
    alternatives: List[str]
    learning_curve: str  # 学习曲线:简单、中等、困难
    popularity_score: int  # 流行度评分 1-10

class HadoopEcosystemGuide:
    """
    Hadoop生态系统指南
    """
    
    def __init__(self):
        self.components = self._initialize_components()
        self.integration_patterns = self._initialize_integration_patterns()
        self.architecture_templates = self._initialize_architecture_templates()
    
    def _initialize_components(self) -> List[EcosystemComponent]:
        """
        初始化生态系统组件
        
        Returns:
            List[EcosystemComponent]: 组件列表
        """
        components = [
            # 存储组件
            EcosystemComponent(
                name="HDFS",
                category=ComponentCategory.STORAGE,
                description="Hadoop分布式文件系统,提供高容错性的分布式存储",
                key_features=[
                    "分布式存储",
                    "高容错性",
                    "高吞吐量",
                    "数据复制",
                    "大文件优化"
                ],
                use_cases=[
                    "大数据存储",
                    "数据湖构建",
                    "批处理数据源",
                    "数据归档"
                ],
                maturity=MaturityLevel.MATURE,
                dependencies=["Java"],
                alternatives=["Amazon S3", "Google Cloud Storage", "Azure Blob Storage"],
                learning_curve="中等",
                popularity_score=9
            ),
            EcosystemComponent(
                name="HBase",
                category=ComponentCategory.STORAGE,
                description="基于HDFS的分布式、面向列的NoSQL数据库",
                key_features=[
                    "列式存储",
                    "实时读写",
                    "自动分片",
                    "强一致性",
                    "与Hadoop集成"
                ],
                use_cases=[
                    "实时数据访问",
                    "时序数据存储",
                    "大规模结构化数据",
                    "在线服务后端"
                ],
                maturity=MaturityLevel.MATURE,
                dependencies=["HDFS", "ZooKeeper"],
                alternatives=["Cassandra", "MongoDB", "DynamoDB"],
                learning_curve="困难",
                popularity_score=7
            ),
            
            # 处理组件
            EcosystemComponent(
                name="Apache Spark",
                category=ComponentCategory.PROCESSING,
                description="统一的大数据处理引擎,支持批处理、流处理、机器学习和图计算",
                key_features=[
                    "内存计算",
                    "多语言支持",
                    "统一API",
                    "流批一体",
                    "机器学习库"
                ],
                use_cases=[
                    "大数据分析",
                    "实时流处理",
                    "机器学习",
                    "ETL处理",
                    "图计算"
                ],
                maturity=MaturityLevel.MATURE,
                dependencies=["Scala", "Java", "Python"],
                alternatives=["Apache Flink", "Apache Storm", "MapReduce"],
                learning_curve="中等",
                popularity_score=10
            ),
            EcosystemComponent(
                name="Apache Flink",
                category=ComponentCategory.STREAMING,
                description="分布式流处理引擎,提供低延迟、高吞吐量的流处理能力",
                key_features=[
                    "真正的流处理",
                    "事件时间处理",
                    "状态管理",
                    "容错机制",
                    "精确一次语义"
                ],
                use_cases=[
                    "实时数据处理",
                    "事件驱动应用",
                    "实时分析",
                    "复杂事件处理"
                ],
                maturity=MaturityLevel.STABLE,
                dependencies=["Java", "Scala"],
                alternatives=["Apache Spark Streaming", "Apache Storm", "Apache Kafka Streams"],
                learning_curve="困难",
                popularity_score=8
            ),
            
            # 查询引擎
            EcosystemComponent(
                name="Apache Hive",
                category=ComponentCategory.QUERY_ENGINE,
                description="基于Hadoop的数据仓库软件,提供SQL查询接口",
                key_features=[
                    "SQL接口",
                    "元数据管理",
                    "多种存储格式",
                    "分区支持",
                    "UDF支持"
                ],
                use_cases=[
                    "数据仓库",
                    "批量数据分析",
                    "ETL处理",
                    "报表生成"
                ],
                maturity=MaturityLevel.MATURE,
                dependencies=["HDFS", "MapReduce/Tez/Spark"],
                alternatives=["Apache Impala", "Presto", "Apache Drill"],
                learning_curve="简单",
                popularity_score=8
            ),
            EcosystemComponent(
                name="Apache Impala",
                category=ComponentCategory.QUERY_ENGINE,
                description="高性能的分布式SQL查询引擎,专为Hadoop设计",
                key_features=[
                    "MPP架构",
                    "内存计算",
                    "实时查询",
                    "标准SQL",
                    "与Hadoop集成"
                ],
                use_cases=[
                    "交互式查询",
                    "实时分析",
                    "商业智能",
                    "即席查询"
                ],
                maturity=MaturityLevel.STABLE,
                dependencies=["HDFS", "HBase", "Hive Metastore"],
                alternatives=["Presto", "Apache Drill", "Spark SQL"],
                learning_curve="中等",
                popularity_score=7
            ),
            
            # 协调组件
            EcosystemComponent(
                name="Apache ZooKeeper",
                category=ComponentCategory.COORDINATION,
                description="分布式协调服务,为分布式应用提供配置管理、命名服务等",
                key_features=[
                    "配置管理",
                    "命名服务",
                    "分布式锁",
                    "集群管理",
                    "通知机制"
                ],
                use_cases=[
                    "配置中心",
                    "服务发现",
                    "分布式锁",
                    "集群协调",
                    "主从选举"
                ],
                maturity=MaturityLevel.MATURE,
                dependencies=["Java"],
                alternatives=["Apache Curator", "etcd", "Consul"],
                learning_curve="中等",
                popularity_score=8
            ),
            
            # 数据摄取
            EcosystemComponent(
                name="Apache Kafka",
                category=ComponentCategory.DATA_INGESTION,
                description="分布式流处理平台,提供高吞吐量的消息队列服务",
                key_features=[
                    "高吞吐量",
                    "持久化存储",
                    "分布式架构",
                    "实时处理",
                    "多消费者支持"
                ],
                use_cases=[
                    "消息队列",
                    "日志收集",
                    "事件流",
                    "数据管道",
                    "实时数据传输"
                ],
                maturity=MaturityLevel.MATURE,
                dependencies=["Java", "ZooKeeper"],
                alternatives=["Apache Pulsar", "RabbitMQ", "Amazon Kinesis"],
                learning_curve="中等",
                popularity_score=9
            ),
            EcosystemComponent(
                name="Apache Flume",
                category=ComponentCategory.DATA_INGESTION,
                description="分布式日志收集系统,专为Hadoop环境设计",
                key_features=[
                    "可靠数据传输",
                    "灵活配置",
                    "多种数据源",
                    "事务支持",
                    "负载均衡"
                ],
                use_cases=[
                    "日志收集",
                    "数据摄取",
                    "实时数据传输",
                    "ETL前端"
                ],
                maturity=MaturityLevel.STABLE,
                dependencies=["Java"],
                alternatives=["Apache Kafka", "Logstash", "Filebeat"],
                learning_curve="简单",
                popularity_score=6
            ),
            
            # 工作流
            EcosystemComponent(
                name="Apache Oozie",
                category=ComponentCategory.WORKFLOW,
                description="Hadoop作业调度系统,管理复杂的数据处理工作流",
                key_features=[
                    "工作流调度",
                    "依赖管理",
                    "错误处理",
                    "Web界面",
                    "多种作业类型"
                ],
                use_cases=[
                    "ETL流程",
                    "批处理调度",
                    "数据管道",
                    "作业编排"
                ],
                maturity=MaturityLevel.STABLE,
                dependencies=["Hadoop"],
                alternatives=["Apache Airflow", "Azkaban", "Luigi"],
                learning_curve="中等",
                popularity_score=5
            ),
            
            # 监控
            EcosystemComponent(
                name="Apache Ambari",
                category=ComponentCategory.MONITORING,
                description="Hadoop集群管理和监控平台",
                key_features=[
                    "集群管理",
                    "服务监控",
                    "配置管理",
                    "告警系统",
                    "Web界面"
                ],
                use_cases=[
                    "集群部署",
                    "运维监控",
                    "配置管理",
                    "性能监控"
                ],
                maturity=MaturityLevel.STABLE,
                dependencies=["Python", "Java"],
                alternatives=["Cloudera Manager", "Hortonworks Data Platform"],
                learning_curve="中等",
                popularity_score=6
            ),
            
            # 机器学习
            EcosystemComponent(
                name="Apache Mahout",
                category=ComponentCategory.MACHINE_LEARNING,
                description="可扩展的机器学习库,专为大数据环境设计",
                key_features=[
                    "分布式算法",
                    "可扩展性",
                    "多种算法",
                    "与Hadoop集成",
                    "数学库"
                ],
                use_cases=[
                    "推荐系统",
                    "聚类分析",
                    "分类算法",
                    "协同过滤"
                ],
                maturity=MaturityLevel.STABLE,
                dependencies=["Hadoop", "Spark"],
                alternatives=["Spark MLlib", "TensorFlow", "Scikit-learn"],
                learning_curve="困难",
                popularity_score=4
            )
        ]
        
        return components
    
    def _initialize_integration_patterns(self) -> Dict[str, Dict[str, Any]]:
        """
        初始化集成模式
        
        Returns:
            Dict[str, Dict[str, Any]]: 集成模式
        """
        patterns = {
            "Lambda架构": {
                "description": "结合批处理和流处理的混合架构",
                "components": ["Kafka", "Spark", "HBase", "HDFS"],
                "use_cases": ["实时分析", "历史数据处理", "低延迟查询"],
                "advantages": ["容错性强", "支持复杂查询", "数据一致性"],
                "disadvantages": ["复杂度高", "维护成本大", "数据重复"]
            },
            "Kappa架构": {
                "description": "纯流处理架构,统一批处理和流处理",
                "components": ["Kafka", "Flink", "Elasticsearch"],
                "use_cases": ["实时处理", "事件驱动", "简化架构"],
                "advantages": ["架构简单", "实时性好", "维护容易"],
                "disadvantages": ["历史数据处理复杂", "状态管理挑战"]
            },
            "数据湖架构": {
                "description": "集中存储结构化和非结构化数据的架构",
                "components": ["HDFS", "Spark", "Hive", "HBase"],
                "use_cases": ["数据存储", "数据探索", "机器学习"],
                "advantages": ["灵活性高", "成本低", "支持多种数据类型"],
                "disadvantages": ["数据质量挑战", "治理复杂"]
            },
            "现代数据栈": {
                "description": "云原生的现代数据处理架构",
                "components": ["Kafka", "Spark", "Delta Lake", "Kubernetes"],
                "use_cases": ["云端数据处理", "DevOps集成", "弹性扩展"],
                "advantages": ["云原生", "弹性扩展", "运维简化"],
                "disadvantages": ["云厂商绑定", "成本控制"]
            }
        }
        
        return patterns
    
    def _initialize_architecture_templates(self) -> Dict[str, Dict[str, Any]]:
        """
        初始化架构模板
        
        Returns:
            Dict[str, Dict[str, Any]]: 架构模板
        """
        templates = {
            "基础大数据平台": {
                "description": "适合初学者的基础Hadoop平台",
                "core_components": ["HDFS", "YARN", "MapReduce", "Hive"],
                "optional_components": ["HBase", "ZooKeeper", "Oozie"],
                "complexity": "简单",
                "use_cases": ["数据仓库", "批处理", "报表分析"],
                "deployment_size": "小型(3-10节点)"
            },
            "实时分析平台": {
                "description": "支持实时数据处理和分析的平台",
                "core_components": ["Kafka", "Spark Streaming", "HBase", "Elasticsearch"],
                "optional_components": ["Flink", "Redis", "Grafana"],
                "complexity": "中等",
                "use_cases": ["实时监控", "事件处理", "在线推荐"],
                "deployment_size": "中型(10-50节点)"
            },
            "机器学习平台": {
                "description": "支持大规模机器学习的数据平台",
                "core_components": ["Spark", "HDFS", "Jupyter", "MLflow"],
                "optional_components": ["Kubeflow", "TensorFlow", "PyTorch"],
                "complexity": "困难",
                "use_cases": ["模型训练", "特征工程", "模型部署"],
                "deployment_size": "大型(50+节点)"
            },
            "数据湖平台": {
                "description": "统一存储和处理多种数据类型的平台",
                "core_components": ["HDFS", "Spark", "Hive", "Presto"],
                "optional_components": ["Delta Lake", "Apache Iceberg", "Ranger"],
                "complexity": "中等",
                "use_cases": ["数据探索", "自助分析", "数据科学"],
                "deployment_size": "中大型(20-100节点)"
            }
        }
        
        return templates
    
    def get_components_by_category(self, category: ComponentCategory) -> List[EcosystemComponent]:
        """
        按类别获取组件
        
        Args:
            category: 组件类别
            
        Returns:
            List[EcosystemComponent]: 组件列表
        """
        return [comp for comp in self.components if comp.category == category]
    
    def get_component_by_name(self, name: str) -> Optional[EcosystemComponent]:
        """
        按名称获取组件
        
        Args:
            name: 组件名称
            
        Returns:
            Optional[EcosystemComponent]: 组件信息
        """
        # Accept both the full catalog name ("Apache Spark") and the short name
        # ("Spark") so lookups from the learning-path helper resolve correctly.
        normalized = name.lower()
        for comp in self.components:
            comp_name = comp.name.lower()
            if comp_name == normalized or comp_name.removeprefix("apache ") == normalized:
                return comp
        return None
    
    def recommend_components(self, use_case: str, complexity_preference: str = "中等") -> List[Dict[str, Any]]:
        """
        根据用例推荐组件
        
        Args:
            use_case: 使用场景
            complexity_preference: 复杂度偏好
            
        Returns:
            List[Dict[str, Any]]: 推荐组件列表
        """
        recommendations = []
        
        for component in self.components:
            # 检查用例匹配
            use_case_match = any(use_case.lower() in uc.lower() for uc in component.use_cases)
            
            # 检查复杂度匹配
            complexity_match = component.learning_curve == complexity_preference
            
            if use_case_match:
                score = component.popularity_score
                if complexity_match:
                    score += 2
                
                recommendations.append({
                    'component': component,
                    'score': score,
                    'reason': f"适用于{use_case},学习曲线{component.learning_curve}"
                })
        
        # 按评分排序
        recommendations.sort(key=lambda x: x['score'], reverse=True)
        
        return recommendations[:5]  # 返回前5个推荐
    
    def generate_architecture_recommendation(self, requirements: Dict[str, Any]) -> Dict[str, Any]:
        """
        生成架构推荐
        
        Args:
            requirements: 需求描述
            
        Returns:
            Dict[str, Any]: 架构推荐
        """
        use_cases = requirements.get('use_cases', [])
        data_volume = requirements.get('data_volume', 'medium')  # small, medium, large
        real_time_requirement = requirements.get('real_time', False)
        complexity_tolerance = requirements.get('complexity', '中等')
        
        recommended_components = []
        
        # 基础组件(总是需要)
        recommended_components.extend(['HDFS', 'YARN'])
        
        # 根据用例添加组件
        if 'batch_processing' in use_cases or 'data_warehouse' in use_cases:
            recommended_components.extend(['Hive', 'Spark'])
        
        if real_time_requirement:
            recommended_components.extend(['Kafka', 'Spark Streaming'])
            if complexity_tolerance == '困难':
                recommended_components.append('Flink')
        
        if 'machine_learning' in use_cases:
            recommended_components.extend(['Spark', 'Jupyter'])
        
        if 'nosql' in use_cases or 'real_time_access' in use_cases:
            recommended_components.append('HBase')
        
        if len(recommended_components) > 5:  # 复杂系统需要协调
            recommended_components.append('ZooKeeper')
        
        # 选择架构模板
        template = None
        if real_time_requirement:
            template = self.architecture_templates['实时分析平台']
        elif 'machine_learning' in use_cases:
            template = self.architecture_templates['机器学习平台']
        elif data_volume == 'large':
            template = self.architecture_templates['数据湖平台']
        else:
            template = self.architecture_templates['基础大数据平台']
        
        return {
            'recommended_components': list(set(recommended_components)),
            'architecture_template': template,
            'integration_pattern': self._suggest_integration_pattern(requirements),
            'deployment_considerations': self._generate_deployment_considerations(requirements),
            'learning_path': self._generate_learning_path(recommended_components)
        }
    
    def _suggest_integration_pattern(self, requirements: Dict[str, Any]) -> Dict[str, Any]:
        """
        建议集成模式
        
        Args:
            requirements: 需求描述
            
        Returns:
            Dict[str, Any]: 集成模式建议
        """
        use_cases = requirements.get('use_cases', [])
        has_batch = 'batch_processing' in use_cases or requirements.get('batch_processing', False)
        if requirements.get('real_time', False) and has_batch:
            return self.integration_patterns['Lambda架构']
        elif requirements.get('real_time', False):
            return self.integration_patterns['Kappa架构']
        elif 'data_exploration' in use_cases or requirements.get('data_exploration', False):
            return self.integration_patterns['数据湖架构']
        else:
            return self.integration_patterns['现代数据栈']
    
    def _generate_deployment_considerations(self, requirements: Dict[str, Any]) -> List[str]:
        """
        生成部署考虑事项
        
        Args:
            requirements: 需求描述
            
        Returns:
            List[str]: 部署考虑事项
        """
        considerations = []
        
        data_volume = requirements.get('data_volume', 'medium')
        if data_volume == 'large':
            considerations.append("考虑使用SSD存储提高I/O性能")
            considerations.append("规划足够的网络带宽")
            considerations.append("考虑数据分层存储策略")
        
        if requirements.get('real_time', False):
            considerations.append("优化网络延迟")
            considerations.append("配置足够的内存")
            considerations.append("考虑使用SSD作为缓存")
        
        if requirements.get('high_availability', False):
            considerations.append("部署多个数据中心")
            considerations.append("配置组件高可用")
            considerations.append("建立灾备方案")
        
        considerations.extend([
            "制定监控和告警策略",
            "规划容量增长",
            "建立安全策略",
            "制定备份和恢复计划"
        ])
        
        return considerations
    
    def _generate_learning_path(self, components: List[str]) -> List[Dict[str, Any]]:
        """
        生成学习路径
        
        Args:
            components: 组件列表
            
        Returns:
            List[Dict[str, Any]]: 学习路径
        """
        learning_order = {
            'HDFS': 1,
            'YARN': 2,
            'MapReduce': 3,
            'Hive': 4,
            'Spark': 5,
            'HBase': 6,
            'Kafka': 7,
            'ZooKeeper': 8,
            'Flink': 9
        }
        
        # 按学习顺序排序
        sorted_components = sorted(
            [comp for comp in components if comp in learning_order],
            key=lambda x: learning_order.get(x, 999)
        )
        
        learning_path = []
        for i, comp in enumerate(sorted_components, 1):
            component_obj = self.get_component_by_name(comp)
            if component_obj:
                learning_path.append({
                    'step': i,
                    'component': comp,
                    'difficulty': component_obj.learning_curve,
                    'estimated_time': self._estimate_learning_time(component_obj.learning_curve),
                    'prerequisites': component_obj.dependencies
                })
        
        return learning_path
    
    def _estimate_learning_time(self, difficulty: str) -> str:
        """
        估算学习时间
        
        Args:
            difficulty: 难度级别
            
        Returns:
            str: 估算学习时间
        """
        time_mapping = {
            '简单': '1-2周',
            '中等': '2-4周',
            '困难': '4-8周'
        }
        return time_mapping.get(difficulty, '2-4周')
    
    def generate_ecosystem_report(self) -> Dict[str, Any]:
        """
        生成生态系统报告
        
        Returns:
            Dict[str, Any]: 生态系统报告
        """
        category_stats = {}
        for category in ComponentCategory:
            components = self.get_components_by_category(category)
            category_stats[category.value] = {
                'count': len(components),
                'components': [comp.name for comp in components],
                'avg_popularity': sum(comp.popularity_score for comp in components) / len(components) if components else 0
            }
        
        maturity_stats = {}
        for maturity in MaturityLevel:
            components = [comp for comp in self.components if comp.maturity == maturity]
            maturity_stats[maturity.value] = len(components)
        
        return {
            'total_components': len(self.components),
            'category_distribution': category_stats,
            'maturity_distribution': maturity_stats,
            'top_components': sorted(
                self.components,
                key=lambda x: x.popularity_score,
                reverse=True
            )[:5],
            'integration_patterns': list(self.integration_patterns.keys()),
            'architecture_templates': list(self.architecture_templates.keys())
        }

# 使用示例
if __name__ == "__main__":
    # 创建生态系统指南
    ecosystem = HadoopEcosystemGuide()
    
    print("=== Hadoop生态系统组件指南 ===")
    
    # 按类别显示组件
    print("\n=== 按类别显示组件 ===")
    for category in ComponentCategory:
        components = ecosystem.get_components_by_category(category)
        if components:
            print(f"\n{category.value}:")
            for comp in components:
                print(f"  - {comp.name}: {comp.description}")
    
    # 组件推荐示例
    print("\n=== 组件推荐示例 ===")
    recommendations = ecosystem.recommend_components("实时", "中等")
    print("针对'实时'用例的推荐:")
    for rec in recommendations:
        print(f"  {rec['component'].name} (评分: {rec['score']}) - {rec['reason']}")
    
    # 架构推荐示例
    print("\n=== 架构推荐示例 ===")
    requirements = {
        'use_cases': ['batch_processing', 'real_time'],
        'data_volume': 'large',
        'real_time': True,
        'complexity': '中等',
        'high_availability': True
    }
    
    arch_rec = ecosystem.generate_architecture_recommendation(requirements)
    print("推荐组件:", arch_rec['recommended_components'])
    print("架构模板:", arch_rec['architecture_template']['description'])
    print("集成模式:", arch_rec['integration_pattern']['description'])
    
    print("\n部署考虑事项:")
    for consideration in arch_rec['deployment_considerations']:
        print(f"  - {consideration}")
    
    print("\n学习路径:")
    for step in arch_rec['learning_path']:
        print(f"  {step['step']}. {step['component']} ({step['difficulty']}, 预计{step['estimated_time']})")
    
    # 生态系统报告
    print("\n=== 生态系统报告 ===")
    report = ecosystem.generate_ecosystem_report()
    print(f"总组件数: {report['total_components']}")
    
    print("\n类别分布:")
    for category, stats in report['category_distribution'].items():
        print(f"  {category}: {stats['count']}个组件 (平均流行度: {stats['avg_popularity']:.1f})")
    
    print("\n成熟度分布:")
    for maturity, count in report['maturity_distribution'].items():
        print(f"  {maturity}: {count}个组件")
    
    print("\n最受欢迎的组件:")
    for comp in report['top_components']:
        print(f"  {comp.name} (流行度: {comp.popularity_score})")

1.2 Ecosystem Architecture Layers

from typing import Any, Dict, List
from dataclasses import dataclass
from enum import Enum

class ArchitectureLayer(Enum):
    """架构层次"""
    STORAGE = "存储层"
    RESOURCE_MANAGEMENT = "资源管理层"
    PROCESSING = "处理层"
    COORDINATION = "协调层"
    APPLICATION = "应用层"
    INTERFACE = "接口层"

@dataclass
class LayerComponent:
    """层次组件"""
    name: str
    layer: ArchitectureLayer
    description: str
    responsibilities: List[str]
    interfaces: List[str]
    dependencies: List[str]

class EcosystemArchitecture:
    """
    生态系统架构
    """
    
    def __init__(self):
        self.layers = self._initialize_layers()
        self.component_relationships = self._initialize_relationships()
    
    def _initialize_layers(self) -> Dict[ArchitectureLayer, List[LayerComponent]]:
        """
        初始化架构层次
        
        Returns:
            Dict[ArchitectureLayer, List[LayerComponent]]: 层次组件映射
        """
        layers = {
            ArchitectureLayer.STORAGE: [
                LayerComponent(
                    name="HDFS",
                    layer=ArchitectureLayer.STORAGE,
                    description="分布式文件系统",
                    responsibilities=["数据存储", "数据复制", "容错处理"],
                    interfaces=["WebHDFS", "Java API", "命令行"],
                    dependencies=["Java"]
                ),
                LayerComponent(
                    name="HBase",
                    layer=ArchitectureLayer.STORAGE,
                    description="NoSQL数据库",
                    responsibilities=["实时数据访问", "列式存储", "自动分片"],
                    interfaces=["Java API", "REST API", "Thrift"],
                    dependencies=["HDFS", "ZooKeeper"]
                )
            ],
            ArchitectureLayer.RESOURCE_MANAGEMENT: [
                LayerComponent(
                    name="YARN",
                    layer=ArchitectureLayer.RESOURCE_MANAGEMENT,
                    description="资源管理器",
                    responsibilities=["资源分配", "作业调度", "集群管理"],
                    interfaces=["ResourceManager API", "Web UI"],
                    dependencies=["HDFS"]
                )
            ],
            ArchitectureLayer.PROCESSING: [
                LayerComponent(
                    name="MapReduce",
                    layer=ArchitectureLayer.PROCESSING,
                    description="批处理框架",
                    responsibilities=["批量数据处理", "分布式计算"],
                    interfaces=["Java API", "Streaming API"],
                    dependencies=["YARN", "HDFS"]
                ),
                LayerComponent(
                    name="Spark",
                    layer=ArchitectureLayer.PROCESSING,
                    description="统一计算引擎",
                    responsibilities=["批处理", "流处理", "机器学习", "图计算"],
                    interfaces=["Scala API", "Python API", "Java API", "R API"],
                    dependencies=["YARN", "HDFS"]
                )
            ],
            ArchitectureLayer.COORDINATION: [
                LayerComponent(
                    name="ZooKeeper",
                    layer=ArchitectureLayer.COORDINATION,
                    description="分布式协调服务",
                    responsibilities=["配置管理", "命名服务", "分布式锁"],
                    interfaces=["Java API", "C API", "命令行"],
                    dependencies=["Java"]
                )
            ],
            ArchitectureLayer.APPLICATION: [
                LayerComponent(
                    name="Hive",
                    layer=ArchitectureLayer.APPLICATION,
                    description="数据仓库软件",
                    responsibilities=["SQL查询", "元数据管理", "数据仓库"],
                    interfaces=["HiveQL", "JDBC", "ODBC"],
                    dependencies=["HDFS", "MapReduce/Spark"]
                ),
                LayerComponent(
                    name="HBase",
                    layer=ArchitectureLayer.APPLICATION,
                    description="NoSQL数据库",
                    responsibilities=["实时数据访问", "大规模数据存储"],
                    interfaces=["Java API", "REST API", "Shell"],
                    dependencies=["HDFS", "ZooKeeper"]
                )
            ],
            ArchitectureLayer.INTERFACE: [
                LayerComponent(
                    name="Ambari",
                    layer=ArchitectureLayer.INTERFACE,
                    description="集群管理界面",
                    responsibilities=["集群监控", "配置管理", "服务管理"],
                    interfaces=["Web UI", "REST API"],
                    dependencies=["所有Hadoop组件"]
                )
            ]
        }
        
        return layers
    
    def _initialize_relationships(self) -> Dict[str, List[str]]:
        """
        初始化组件关系
        
        Returns:
            Dict[str, List[str]]: 组件依赖关系
        """
        relationships = {
            "HDFS": [],  # 基础层,无依赖
            "YARN": ["HDFS"],
            "MapReduce": ["YARN", "HDFS"],
            "Spark": ["YARN", "HDFS"],
            "Hive": ["HDFS", "MapReduce"],
            "HBase": ["HDFS", "ZooKeeper"],
            "ZooKeeper": [],
            "Kafka": ["ZooKeeper"],
            "Flume": ["HDFS"],
            "Oozie": ["YARN", "HDFS"],
            "Ambari": ["所有组件"]
        }
        
        return relationships
    
    def get_layer_components(self, layer: ArchitectureLayer) -> List[LayerComponent]:
        """
        获取指定层的组件
        
        Args:
            layer: 架构层次
            
        Returns:
            List[LayerComponent]: 组件列表
        """
        return self.layers.get(layer, [])
    
    def get_component_dependencies(self, component_name: str) -> List[str]:
        """
        获取组件依赖
        
        Args:
            component_name: 组件名称
            
        Returns:
            List[str]: 依赖组件列表
        """
        return self.component_relationships.get(component_name, [])
    
    def generate_deployment_order(self, components: List[str]) -> List[List[str]]:
        """
        生成部署顺序
        
        Args:
            components: 要部署的组件列表
            
        Returns:
            List[List[str]]: 按层次分组的部署顺序
        """
        # 拓扑排序算法
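        # (Kahn-style, level by level: components whose in-set dependencies are
        #  already satisfied are peeled off together; each pass is one deploy phase)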
        in_degree = {comp: 0 for comp in components}
        graph = {comp: [] for comp in components}
        
        # 构建图和计算入度
        for comp in components:
            deps = self.get_component_dependencies(comp)
            for dep in deps:
                if dep in components:
                    graph[dep].append(comp)
                    in_degree[comp] += 1
        
        # 拓扑排序
        result = []
        queue = [comp for comp in components if in_degree[comp] == 0]
        
        while queue:
            current_level = queue[:]
            queue = []
            result.append(current_level)
            
            for comp in current_level:
                for neighbor in graph[comp]:
                    in_degree[neighbor] -= 1
                    if in_degree[neighbor] == 0:
                        queue.append(neighbor)
        
        return result
    
    def analyze_architecture_complexity(self, components: List[str]) -> Dict[str, Any]:
        """
        分析架构复杂度
        
        Args:
            components: 组件列表
            
        Returns:
            Dict[str, Any]: 复杂度分析结果
        """
        # 计算层次分布
        layer_distribution = {}
        for layer in ArchitectureLayer:
            layer_components = [comp.name for comp in self.get_layer_components(layer)]
            count = len([comp for comp in components if comp in layer_components])
            if count > 0:
                layer_distribution[layer.value] = count
        
        # 计算依赖复杂度
        total_dependencies = sum(
            len(self.get_component_dependencies(comp)) 
            for comp in components
        )
        
        # 计算复杂度评分
        complexity_score = len(components) + total_dependencies
        
        if complexity_score <= 5:
            complexity_level = "简单"
        elif complexity_score <= 15:
            complexity_level = "中等"
        else:
            complexity_level = "复杂"
        
        return {
            'total_components': len(components),
            'layer_distribution': layer_distribution,
            'total_dependencies': total_dependencies,
            'complexity_score': complexity_score,
            'complexity_level': complexity_level,
            'deployment_phases': len(self.generate_deployment_order(components))
        }
    
    def generate_architecture_diagram_data(self, components: List[str]) -> Dict[str, Any]:
        """
        生成架构图数据
        
        Args:
            components: 组件列表
            
        Returns:
            Dict[str, Any]: 架构图数据
        """
        nodes = []
        edges = []
        
        # 生成节点
        for comp in components:
            # 查找组件所属层次
            layer = None
            for arch_layer, layer_components in self.layers.items():
                if any(lc.name == comp for lc in layer_components):
                    layer = arch_layer.value
                    break
            
            nodes.append({
                'id': comp,
                'label': comp,
                'layer': layer or "未知",
                'dependencies': self.get_component_dependencies(comp)
            })
        
        # 生成边
        for comp in components:
            deps = self.get_component_dependencies(comp)
            for dep in deps:
                if dep in components:
                    edges.append({
                        'from': dep,
                        'to': comp,
                        'type': 'dependency'
                    })
        
        return {
            'nodes': nodes,
            'edges': edges,
            'layers': list(set(node['layer'] for node in nodes))
        }

# 使用示例
if __name__ == "__main__":
    # 创建架构分析器
    architecture = EcosystemArchitecture()
    
    print("=== Hadoop生态系统架构分析 ===")
    
    # 显示各层组件
    print("\n=== 架构层次 ===")
    for layer in ArchitectureLayer:
        components = architecture.get_layer_components(layer)
        if components:
            print(f"\n{layer.value}:")
            for comp in components:
                print(f"  - {comp.name}: {comp.description}")
                print(f"    职责: {', '.join(comp.responsibilities)}")
                print(f"    接口: {', '.join(comp.interfaces)}")
    
    # 分析示例架构
    example_components = ["HDFS", "YARN", "Spark", "Hive", "HBase", "ZooKeeper"]
    
    print("\n=== 部署顺序分析 ===")
    deployment_order = architecture.generate_deployment_order(example_components)
    for i, phase in enumerate(deployment_order, 1):
        print(f"阶段 {i}: {', '.join(phase)}")
    
    print("\n=== 架构复杂度分析 ===")
    complexity = architecture.analyze_architecture_complexity(example_components)
    print(f"总组件数: {complexity['total_components']}")
    print(f"总依赖数: {complexity['total_dependencies']}")
    print(f"复杂度评分: {complexity['complexity_score']}")
    print(f"复杂度级别: {complexity['complexity_level']}")
    print(f"部署阶段数: {complexity['deployment_phases']}")
    
    print("\n层次分布:")
    for layer, count in complexity['layer_distribution'].items():
        print(f"  {layer}: {count}个组件")
    
    print("\n=== 架构图数据 ===")
    diagram_data = architecture.generate_architecture_diagram_data(example_components)
    print(f"节点数: {len(diagram_data['nodes'])}")
    print(f"边数: {len(diagram_data['edges'])}")
    print(f"涉及层次: {', '.join(diagram_data['layers'])}")

2. Core Storage Components

2.1 HDFS in Depth

We covered HDFS in detail in the preceding chapters; here the focus is on its role within the ecosystem and on how it integrates with the other components.
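
As a concrete illustration of that role, the sketch below uses PySpark to read a raw file from HDFS, aggregate it, and write the result back to HDFS as Parquet, where engines such as Hive, Impala, or Presto could expose it through an external table. This is a minimal sketch under stated assumptions: the NameNode address hdfs://namenode:8020, the file paths, and the column name event_type are placeholders to adapt to your cluster.

from pyspark.sql import SparkSession

# Hypothetical NameNode address and paths -- replace with your cluster's values.
RAW_PATH = "hdfs://namenode:8020/data/raw/events.csv"
OUT_PATH = "hdfs://namenode:8020/data/curated/event_counts"

spark = (
    SparkSession.builder
    .appName("hdfs-integration-sketch")
    .getOrCreate()
)

# To Spark, HDFS is just another file-system URI: read raw CSV data stored there.
events = spark.read.option("header", "true").csv(RAW_PATH)

# A small aggregation; the column name "event_type" is assumed for illustration.
counts = events.groupBy("event_type").count()

# Write the result back to HDFS as Parquet; Hive, Impala, or Presto can then query
# the same files, e.g. via an external table whose LOCATION points at OUT_PATH.
counts.write.mode("overwrite").parquet(OUT_PATH)

spark.stop()

This shared-storage pattern is what most ecosystem integrations reduce to: components rarely exchange data directly; instead they read and write common datasets on HDFS (or a compatible object store).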

2.2 HBase in Detail

from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime
import json

@dataclass
class HBaseTable:
    """HBase表结构"""
    name: str
    column_families: List[str]
    regions: int
    compression: str
    bloom_filter: bool
    max_versions: int

@dataclass
class HBaseRegion:
    """HBase区域"""
    table_name: str
    start_key: str
    end_key: str
    region_server: str
    size_mb: float
    read_requests: int
    write_requests: int

class HBaseClusterManager:
    """
    HBase集群管理器
    """
    
    def __init__(self):
        self.tables = {}
        self.regions = {}
        self.region_servers = {}
        self.performance_metrics = {}
    
    def create_table(self, table_config: Dict[str, Any]) -> Dict[str, Any]:
        """
        创建HBase表
        
        Args:
            table_config: 表配置
            
        Returns:
            Dict[str, Any]: 创建结果
        """
        table = HBaseTable(
            name=table_config['name'],
            column_families=table_config.get('column_families', ['cf1']),
            regions=table_config.get('regions', 1),
            compression=table_config.get('compression', 'SNAPPY'),
            bloom_filter=table_config.get('bloom_filter', True),
            max_versions=table_config.get('max_versions', 3)
        )
        
        self.tables[table.name] = table
        
        # 创建初始区域
        self._create_initial_regions(table)
        
        return {
            'status': 'success',
            'table_name': table.name,
            'column_families': table.column_families,
            'initial_regions': table.regions,
            'configuration': {
                'compression': table.compression,
                'bloom_filter': table.bloom_filter,
                'max_versions': table.max_versions
            }
        }
    
    def _create_initial_regions(self, table: HBaseTable):
        """
        创建初始区域
        
        Args:
            table: HBase表
        """
        if table.name not in self.regions:
            self.regions[table.name] = []
        
        # 创建预分区
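        # Region i covers the zero-padded key range [i, i+1); an empty start or
        # end key marks the open-ended first or last region.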
        for i in range(table.regions):
            start_key = f"{i:08d}" if i > 0 else ""
            end_key = f"{i+1:08d}" if i < table.regions - 1 else ""
            
            region = HBaseRegion(
                table_name=table.name,
                start_key=start_key,
                end_key=end_key,
                region_server=f"rs{i % 3 + 1}",  # 简单的负载均衡
                size_mb=0.0,
                read_requests=0,
                write_requests=0
            )
            
            self.regions[table.name].append(region)
    
    def put_data(self, table_name: str, row_key: str, column_family: str, 
                 column: str, value: str) -> Dict[str, Any]:
        """
        插入数据
        
        Args:
            table_name: 表名
            row_key: 行键
            column_family: 列族
            column: 列名
            value: 值
            
        Returns:
            Dict[str, Any]: 插入结果
        """
        if table_name not in self.tables:
            return {'status': 'error', 'message': f'Table {table_name} not found'}
        
        table = self.tables[table_name]
        if column_family not in table.column_families:
            return {'status': 'error', 'message': f'Column family {column_family} not found'}
        
        # 找到对应的区域
        region = self._find_region(table_name, row_key)
        if region:
            region.write_requests += 1
            region.size_mb += len(value) / (1024 * 1024)  # 简化的大小计算
            
            return {
                'status': 'success',
                'table': table_name,
                'row_key': row_key,
                'column': f'{column_family}:{column}',
                'region_server': region.region_server,
                'timestamp': datetime.now().isoformat()
            }
        
        return {'status': 'error', 'message': 'No suitable region found'}
    
    def get_data(self, table_name: str, row_key: str,
                 column_family: Optional[str] = None, column: Optional[str] = None) -> Dict[str, Any]:
        """
        获取数据
        
        Args:
            table_name: 表名
            row_key: 行键
            column_family: 列族(可选)
            column: 列名(可选)
            
        Returns:
            Dict[str, Any]: 查询结果
        """
        if table_name not in self.tables:
            return {'status': 'error', 'message': f'Table {table_name} not found'}
        
        # 找到对应的区域
        region = self._find_region(table_name, row_key)
        if region:
            region.read_requests += 1
            
            # 模拟数据返回
            result = {
                'status': 'success',
                'table': table_name,
                'row_key': row_key,
                'region_server': region.region_server,
                'data': self._simulate_data_retrieval(table_name, row_key, column_family, column),
                'timestamp': datetime.now().isoformat()
            }
            
            return result
        
        return {'status': 'error', 'message': 'No suitable region found'}
    
    def _find_region(self, table_name: str, row_key: str) -> Optional[HBaseRegion]:
        """
        查找行键对应的区域
        
        Args:
            table_name: 表名
            row_key: 行键
            
        Returns:
            Optional[HBaseRegion]: 区域信息
        """
        if table_name not in self.regions:
            return None
        
        for region in self.regions[table_name]:
            # 简化的区域查找逻辑
            if (not region.start_key or row_key >= region.start_key) and \
               (not region.end_key or row_key < region.end_key):
                return region
        
        # 如果没找到,返回第一个区域
        return self.regions[table_name][0] if self.regions[table_name] else None
    
    def _simulate_data_retrieval(self, table_name: str, row_key: str,
                                 column_family: Optional[str], column: Optional[str]) -> Dict[str, Any]:
        """
        模拟数据检索
        
        Args:
            table_name: 表名
            row_key: 行键
            column_family: 列族
            column: 列名
            
        Returns:
            Dict[str, Any]: 模拟数据
        """
        table = self.tables[table_name]
        data = {}
        
        # 如果指定了列族和列
        if column_family and column:
            if column_family in table.column_families:
                data[f'{column_family}:{column}'] = f'value_for_{row_key}_{column}'
        # 如果只指定了列族
        elif column_family:
            if column_family in table.column_families:
                for i in range(3):  # 模拟3个列
                    data[f'{column_family}:col{i}'] = f'value_for_{row_key}_col{i}'
        # 返回所有列族的数据
        else:
            for cf in table.column_families:
                for i in range(3):
                    data[f'{cf}:col{i}'] = f'value_for_{row_key}_{cf}_col{i}'
        
        return data
    
    def scan_table(self, table_name: str, start_row: Optional[str] = None,
                   end_row: Optional[str] = None, limit: int = 100) -> Dict[str, Any]:
        """
        扫描表
        
        Args:
            table_name: 表名
            start_row: 起始行键
            end_row: 结束行键
            limit: 限制行数
            
        Returns:
            Dict[str, Any]: 扫描结果
        """
        if table_name not in self.tables:
            return {'status': 'error', 'message': f'Table {table_name} not found'}
        
        # 模拟扫描结果
        results = []
        for i in range(min(limit, 10)):  # 模拟最多10行数据
            row_key = f"row_{i:06d}"
            if start_row and row_key < start_row:
                continue
            if end_row and row_key >= end_row:
                break
            
            row_data = self._simulate_data_retrieval(table_name, row_key, None, None)
            results.append({
                'row_key': row_key,
                'data': row_data
            })
        
        return {
            'status': 'success',
            'table': table_name,
            'results': results,
            'count': len(results),
            'scan_range': {
                'start_row': start_row,
                'end_row': end_row
            }
        }
    
    def split_region(self, table_name: str, region_index: int, split_key: str) -> Dict[str, Any]:
        """
        分裂区域
        
        Args:
            table_name: 表名
            region_index: 区域索引
            split_key: 分裂键
            
        Returns:
            Dict[str, Any]: 分裂结果
        """
        if table_name not in self.regions:
            return {'status': 'error', 'message': f'Table {table_name} not found'}
        
        regions = self.regions[table_name]
        if region_index >= len(regions):
            return {'status': 'error', 'message': 'Invalid region index'}
        
        original_region = regions[region_index]
        
        # 创建两个新区域
        region1 = HBaseRegion(
            table_name=table_name,
            start_key=original_region.start_key,
            end_key=split_key,
            region_server=original_region.region_server,
            size_mb=original_region.size_mb / 2,
            read_requests=0,
            write_requests=0
        )
        
        region2 = HBaseRegion(
            table_name=table_name,
            start_key=split_key,
            end_key=original_region.end_key,
            region_server=self._select_region_server(),
            size_mb=original_region.size_mb / 2,
            read_requests=0,
            write_requests=0
        )
        
        # 替换原区域
        regions[region_index] = region1
        regions.append(region2)
        
        return {
            'status': 'success',
            'original_region': {
                'start_key': original_region.start_key,
                'end_key': original_region.end_key,
                'size_mb': original_region.size_mb
            },
            'new_regions': [
                {
                    'start_key': region1.start_key,
                    'end_key': region1.end_key,
                    'region_server': region1.region_server
                },
                {
                    'start_key': region2.start_key,
                    'end_key': region2.end_key,
                    'region_server': region2.region_server
                }
            ],
            'split_key': split_key
        }
    
    def _select_region_server(self) -> str:
        """
        选择区域服务器
        
        Returns:
            str: 区域服务器名称
        """
        # 简单的负载均衡策略
        servers = ['rs1', 'rs2', 'rs3']
        server_loads = {server: 0 for server in servers}
        
        # 计算每个服务器的负载
        for table_regions in self.regions.values():
            for region in table_regions:
                if region.region_server in server_loads:
                    server_loads[region.region_server] += 1
        
        # 返回负载最小的服务器
        return min(server_loads.items(), key=lambda x: x[1])[0]
    
    def get_table_info(self, table_name: str) -> Dict[str, Any]:
        """
        获取表信息
        
        Args:
            table_name: 表名
            
        Returns:
            Dict[str, Any]: 表信息
        """
        if table_name not in self.tables:
            return {'status': 'error', 'message': f'Table {table_name} not found'}
        
        table = self.tables[table_name]
        regions = self.regions.get(table_name, [])
        
        # 计算统计信息
        total_size = sum(region.size_mb for region in regions)
        total_read_requests = sum(region.read_requests for region in regions)
        total_write_requests = sum(region.write_requests for region in regions)
        
        region_info = []
        for i, region in enumerate(regions):
            region_info.append({
                'index': i,
                'start_key': region.start_key,
                'end_key': region.end_key,
                'region_server': region.region_server,
                'size_mb': round(region.size_mb, 2),
                'read_requests': region.read_requests,
                'write_requests': region.write_requests
            })
        
        return {
            'status': 'success',
            'table_name': table_name,
            'column_families': table.column_families,
            'configuration': {
                'compression': table.compression,
                'bloom_filter': table.bloom_filter,
                'max_versions': table.max_versions
            },
            'statistics': {
                'region_count': len(regions),
                'total_size_mb': round(total_size, 2),
                'total_read_requests': total_read_requests,
                'total_write_requests': total_write_requests
            },
            'regions': region_info
        }
    
    def compact_table(self, table_name: str, compact_type: str = 'minor') -> Dict[str, Any]:
        """
        压缩表
        
        Args:
            table_name: 表名
            compact_type: 压缩类型(minor/major)
            
        Returns:
            Dict[str, Any]: 压缩结果
        """
        if table_name not in self.tables:
            return {'status': 'error', 'message': f'Table {table_name} not found'}
        
        regions = self.regions.get(table_name, [])
        
        # 模拟压缩过程
        compacted_regions = 0
        size_reduction = 0.0
        
        for region in regions:
            if compact_type == 'minor':
                # 小压缩:减少10-20%的大小
                reduction = region.size_mb * 0.15
            else:
                # 大压缩:减少20-40%的大小
                reduction = region.size_mb * 0.30
            
            region.size_mb = max(0, region.size_mb - reduction)
            size_reduction += reduction
            compacted_regions += 1
        
        return {
            'status': 'success',
            'table_name': table_name,
            'compact_type': compact_type,
            'compacted_regions': compacted_regions,
            'size_reduction_mb': round(size_reduction, 2),
            'completion_time': datetime.now().isoformat()
        }
    
    def get_cluster_status(self) -> Dict[str, Any]:
        """
        获取集群状态
        
        Returns:
            Dict[str, Any]: 集群状态
        """
        # 统计所有表和区域
        total_tables = len(self.tables)
        total_regions = sum(len(regions) for regions in self.regions.values())
        total_size = sum(
            sum(region.size_mb for region in regions)
            for regions in self.regions.values()
        )
        
        # 统计区域服务器负载
        server_stats = {}
        for table_regions in self.regions.values():
            for region in table_regions:
                server = region.region_server
                if server not in server_stats:
                    server_stats[server] = {
                        'region_count': 0,
                        'total_size_mb': 0.0,
                        'read_requests': 0,
                        'write_requests': 0
                    }
                
                server_stats[server]['region_count'] += 1
                server_stats[server]['total_size_mb'] += region.size_mb
                server_stats[server]['read_requests'] += region.read_requests
                server_stats[server]['write_requests'] += region.write_requests
        
        return {
            'cluster_summary': {
                'total_tables': total_tables,
                'total_regions': total_regions,
                'total_size_mb': round(total_size, 2),
                'region_servers': len(server_stats)
            },
            'region_servers': {
                server: {
                    'region_count': stats['region_count'],
                    'total_size_mb': round(stats['total_size_mb'], 2),
                    'read_requests': stats['read_requests'],
                    'write_requests': stats['write_requests']
                }
                for server, stats in server_stats.items()
            },
            'tables': list(self.tables.keys())
        }

# 使用示例
if __name__ == "__main__":
    # 创建HBase集群管理器
    hbase = HBaseClusterManager()
    
    print("=== HBase集群管理示例 ===")
    
    # 创建表
    print("\n=== 创建表 ===")
    table_config = {
        'name': 'user_profiles',
        'column_families': ['personal', 'contact', 'preferences'],
        'regions': 3,
        'compression': 'SNAPPY',
        'bloom_filter': True,
        'max_versions': 5
    }
    
    result = hbase.create_table(table_config)
    print(f"创建表结果: {result['status']}")
    print(f"表名: {result['table_name']}")
    print(f"列族: {result['column_families']}")
    print(f"初始区域数: {result['initial_regions']}")
    
    # 插入数据
    print("\n=== 插入数据 ===")
    put_result = hbase.put_data('user_profiles', 'user001', 'personal', 'name', 'John Doe')
    print(f"插入结果: {put_result}")
    
    put_result = hbase.put_data('user_profiles', 'user001', 'contact', 'email', 'john@example.com')
    print(f"插入结果: {put_result}")
    
    # 查询数据
    print("\n=== 查询数据 ===")
    get_result = hbase.get_data('user_profiles', 'user001')
    print(f"查询结果: {get_result}")
    
    # 扫描表
    print("\n=== 扫描表 ===")
    scan_result = hbase.scan_table('user_profiles', limit=5)
    print(f"扫描结果: 找到 {scan_result['count']} 行数据")
    
    # 获取表信息
    print("\n=== 表信息 ===")
    table_info = hbase.get_table_info('user_profiles')
    print(f"表名: {table_info['table_name']}")
    print(f"列族: {table_info['column_families']}")
    print(f"区域数: {table_info['statistics']['region_count']}")
    print(f"总大小: {table_info['statistics']['total_size_mb']} MB")
    
    # 分裂区域
    print("\n=== 分裂区域 ===")
    split_result = hbase.split_region('user_profiles', 0, 'user500')
    print(f"分裂结果: {split_result['status']}")
    if split_result['status'] == 'success':
        print(f"分裂键: {split_result['split_key']}")
        print(f"新区域数: {len(split_result['new_regions'])}")
    
    # 压缩表
    print("\n=== 压缩表 ===")
    compact_result = hbase.compact_table('user_profiles', 'major')
    print(f"压缩结果: {compact_result}")
    
    # 获取集群状态
    print("\n=== 集群状态 ===")
    cluster_status = hbase.get_cluster_status()
    print(f"总表数: {cluster_status['cluster_summary']['total_tables']}")
    print(f"总区域数: {cluster_status['cluster_summary']['total_regions']}")
    print(f"区域服务器数: {cluster_status['cluster_summary']['region_servers']}")
    
    print("\n区域服务器负载:")
    for server, stats in cluster_status['region_servers'].items():
        print(f"  {server}: {stats['region_count']}个区域, {stats['total_size_mb']}MB")

3. Data Processing Components

3.1 Apache Spark in Detail

from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
import json

class SparkJobStatus(Enum):
    """Spark作业状态"""
    SUBMITTED = "已提交"
    RUNNING = "运行中"
    SUCCEEDED = "成功"
    FAILED = "失败"
    CANCELLED = "已取消"

class SparkApplicationType(Enum):
    """Spark应用类型"""
    BATCH = "批处理"
    STREAMING = "流处理"
    MACHINE_LEARNING = "机器学习"
    GRAPH_PROCESSING = "图处理"
    SQL = "SQL查询"

@dataclass
class SparkJob:
    """Spark作业"""
    job_id: str
    app_id: str
    name: str
    job_type: SparkApplicationType
    status: SparkJobStatus
    submit_time: datetime
    start_time: Optional[datetime]
    end_time: Optional[datetime]
    driver_memory: str
    executor_memory: str
    executor_cores: int
    num_executors: int
    input_data_size: float  # GB
    output_data_size: float  # GB
    stages: List[Dict[str, Any]]
    metrics: Dict[str, Any]

@dataclass
class SparkCluster:
    """Spark集群"""
    cluster_id: str
    master_url: str
    total_cores: int
    total_memory_gb: float
    worker_nodes: List[str]
    active_applications: List[str]
    completed_applications: List[str]

class SparkClusterManager:
    """
    Spark集群管理器
    """
    
    def __init__(self):
        self.clusters = {}
        self.applications = {}
        self.jobs = {}
        self.performance_metrics = {}
        self.resource_usage = {}
    
    def create_cluster(self, cluster_config: Dict[str, Any]) -> Dict[str, Any]:
        """
        创建Spark集群
        
        Args:
            cluster_config: 集群配置
            
        Returns:
            Dict[str, Any]: 创建结果
        """
        cluster = SparkCluster(
            cluster_id=cluster_config['cluster_id'],
            master_url=cluster_config.get('master_url', 'spark://master:7077'),
            total_cores=cluster_config.get('total_cores', 16),
            total_memory_gb=cluster_config.get('total_memory_gb', 64.0),
            worker_nodes=cluster_config.get('worker_nodes', ['worker1', 'worker2', 'worker3']),
            active_applications=[],
            completed_applications=[]
        )
        
        self.clusters[cluster.cluster_id] = cluster
        
        # 初始化资源使用情况
        self.resource_usage[cluster.cluster_id] = {
            'used_cores': 0,
            'used_memory_gb': 0.0,
            'cpu_utilization': 0.0,
            'memory_utilization': 0.0
        }
        
        return {
            'status': 'success',
            'cluster_id': cluster.cluster_id,
            'master_url': cluster.master_url,
            'total_cores': cluster.total_cores,
            'total_memory_gb': cluster.total_memory_gb,
            'worker_nodes': cluster.worker_nodes
        }
    
    def submit_application(self, app_config: Dict[str, Any]) -> Dict[str, Any]:
        """
        提交Spark应用
        
        Args:
            app_config: 应用配置
            
        Returns:
            Dict[str, Any]: 提交结果
        """
        cluster_id = app_config.get('cluster_id')
        if cluster_id not in self.clusters:
            return {'status': 'error', 'message': f'Cluster {cluster_id} not found'}
        
        cluster = self.clusters[cluster_id]
        
        # 检查资源是否足够
        required_cores = app_config.get('executor_cores', 2) * app_config.get('num_executors', 2)
        required_memory = float(app_config.get('executor_memory', '2g').replace('g', '')) * app_config.get('num_executors', 2)
        
        current_usage = self.resource_usage[cluster_id]
        if (current_usage['used_cores'] + required_cores > cluster.total_cores or
            current_usage['used_memory_gb'] + required_memory > cluster.total_memory_gb):
            return {
                'status': 'error',
                'message': 'Insufficient resources',
                'required': {'cores': required_cores, 'memory_gb': required_memory},
                'available': {
                    'cores': cluster.total_cores - current_usage['used_cores'],
                    'memory_gb': cluster.total_memory_gb - current_usage['used_memory_gb']
                }
            }
        
        # 创建应用
        app_id = f"app-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{len(self.applications)}"
        
        application = {
            'app_id': app_id,
            'name': app_config.get('name', 'Spark Application'),
            'cluster_id': cluster_id,
            'app_type': SparkApplicationType[app_config.get('app_type', 'BATCH')],  # 按枚举名称查找(配置中传入'BATCH'等名称)
            'status': SparkJobStatus.SUBMITTED,
            'submit_time': datetime.now(),
            'driver_memory': app_config.get('driver_memory', '1g'),
            'executor_memory': app_config.get('executor_memory', '2g'),
            'executor_cores': app_config.get('executor_cores', 2),
            'num_executors': app_config.get('num_executors', 2),
            'main_class': app_config.get('main_class'),
            'jar_file': app_config.get('jar_file'),
            'arguments': app_config.get('arguments', []),
            'configuration': app_config.get('configuration', {})
        }
        
        self.applications[app_id] = application
        cluster.active_applications.append(app_id)
        
        # 更新资源使用
        current_usage['used_cores'] += required_cores
        current_usage['used_memory_gb'] += required_memory
        current_usage['cpu_utilization'] = (current_usage['used_cores'] / cluster.total_cores) * 100
        current_usage['memory_utilization'] = (current_usage['used_memory_gb'] / cluster.total_memory_gb) * 100
        
        return {
            'status': 'success',
            'app_id': app_id,
            'cluster_id': cluster_id,
            'submit_time': application['submit_time'].isoformat(),
            'resource_allocation': {
                'cores': required_cores,
                'memory_gb': required_memory,
                'executors': application['num_executors']
            }
        }
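        # 资源校验示例(非源码的一部分,仅为说明上面的检查逻辑):
        # 当executor_cores=2、num_executors=4、executor_memory='4g'时,
        # required_cores = 2 * 4 = 8,required_memory = 4.0 * 4 = 16GB,
        # 只有集群剩余核数与内存均不低于该值时才允许提交。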
    
    def create_spark_job(self, job_config: Dict[str, Any]) -> Dict[str, Any]:
        """
        创建Spark作业
        
        Args:
            job_config: 作业配置
            
        Returns:
            Dict[str, Any]: 创建结果
        """
        app_id = job_config.get('app_id')
        if app_id not in self.applications:
            return {'status': 'error', 'message': f'Application {app_id} not found'}
        
        application = self.applications[app_id]
        
        job = SparkJob(
            job_id=f"job-{len(self.jobs)}",
            app_id=app_id,
            name=job_config.get('name', 'Spark Job'),
            job_type=application['app_type'],
            status=SparkJobStatus.SUBMITTED,
            submit_time=datetime.now(),
            start_time=None,
            end_time=None,
            driver_memory=application['driver_memory'],
            executor_memory=application['executor_memory'],
            executor_cores=application['executor_cores'],
            num_executors=application['num_executors'],
            input_data_size=job_config.get('input_data_size', 1.0),
            output_data_size=0.0,
            stages=[],
            metrics={}
        )
        
        self.jobs[job.job_id] = job
        
        return {
            'status': 'success',
            'job_id': job.job_id,
            'app_id': app_id,
            'submit_time': job.submit_time.isoformat()
        }
    
    def start_job(self, job_id: str) -> Dict[str, Any]:
        """
        启动作业
        
        Args:
            job_id: 作业ID
            
        Returns:
            Dict[str, Any]: 启动结果
        """
        if job_id not in self.jobs:
            return {'status': 'error', 'message': f'Job {job_id} not found'}
        
        job = self.jobs[job_id]
        job.status = SparkJobStatus.RUNNING
        job.start_time = datetime.now()
        
        # 模拟创建stages
        stages = self._create_job_stages(job)
        job.stages = stages
        
        return {
            'status': 'success',
            'job_id': job_id,
            'start_time': job.start_time.isoformat(),
            'stages': len(stages),
            'estimated_duration': self._estimate_job_duration(job)
        }
    
    def _create_job_stages(self, job: SparkJob) -> List[Dict[str, Any]]:
        """
        创建作业阶段
        
        Args:
            job: Spark作业
            
        Returns:
            List[Dict[str, Any]]: 阶段列表
        """
        stages = []
        
        if job.job_type == SparkApplicationType.BATCH:
            # 批处理作业的典型阶段
            stages = [
                {
                    'stage_id': 0,
                    'name': 'Data Loading',
                    'num_tasks': job.num_executors * 2,
                    'status': 'pending',
                    'input_size_gb': job.input_data_size,
                    'output_size_gb': job.input_data_size * 0.9
                },
                {
                    'stage_id': 1,
                    'name': 'Data Transformation',
                    'num_tasks': job.num_executors * 4,
                    'status': 'pending',
                    'input_size_gb': job.input_data_size * 0.9,
                    'output_size_gb': job.input_data_size * 0.7
                },
                {
                    'stage_id': 2,
                    'name': 'Data Aggregation',
                    'num_tasks': job.num_executors,
                    'status': 'pending',
                    'input_size_gb': job.input_data_size * 0.7,
                    'output_size_gb': job.input_data_size * 0.3
                }
            ]
        elif job.job_type == SparkApplicationType.MACHINE_LEARNING:
            # 机器学习作业的典型阶段
            stages = [
                {
                    'stage_id': 0,
                    'name': 'Feature Extraction',
                    'num_tasks': job.num_executors * 2,
                    'status': 'pending',
                    'input_size_gb': job.input_data_size,
                    'output_size_gb': job.input_data_size * 1.2
                },
                {
                    'stage_id': 1,
                    'name': 'Model Training',
                    'num_tasks': job.num_executors * 8,
                    'status': 'pending',
                    'input_size_gb': job.input_data_size * 1.2,
                    'output_size_gb': 0.1
                },
                {
                    'stage_id': 2,
                    'name': 'Model Evaluation',
                    'num_tasks': job.num_executors,
                    'status': 'pending',
                    'input_size_gb': job.input_data_size * 0.2,
                    'output_size_gb': 0.01
                }
            ]
        
        return stages
    
    def _estimate_job_duration(self, job: SparkJob) -> str:
        """
        估算作业持续时间
        
        Args:
            job: Spark作业
            
        Returns:
            str: 估算时间
        """
        # 基于数据大小和资源的简单估算
        base_time = job.input_data_size * 60  # 每GB 60秒
        resource_factor = max(1, 8 / job.num_executors)  # 资源调整因子
        
        estimated_seconds = base_time * resource_factor
        
        if estimated_seconds < 60:
            return f"{int(estimated_seconds)}秒"
        elif estimated_seconds < 3600:
            return f"{int(estimated_seconds/60)}分钟"
        else:
            return f"{estimated_seconds/3600:.1f}小时"
    
    def complete_job(self, job_id: str, success: bool = True) -> Dict[str, Any]:
        """
        完成作业
        
        Args:
            job_id: 作业ID
            success: 是否成功
            
        Returns:
            Dict[str, Any]: 完成结果
        """
        if job_id not in self.jobs:
            return {'status': 'error', 'message': f'Job {job_id} not found'}
        
        job = self.jobs[job_id]
        job.status = SparkJobStatus.SUCCEEDED if success else SparkJobStatus.FAILED
        job.end_time = datetime.now()
        
        # 计算作业指标
        if job.start_time:
            duration = (job.end_time - job.start_time).total_seconds()
            
            # 模拟输出数据大小
            if success:
                job.output_data_size = sum(stage.get('output_size_gb', 0) for stage in job.stages)
            
            job.metrics = {
                'duration_seconds': duration,
                'input_data_gb': job.input_data_size,
                'output_data_gb': job.output_data_size,
                'data_processing_rate_gb_per_sec': job.input_data_size / duration if duration > 0 else 0,
                'total_tasks': sum(stage.get('num_tasks', 0) for stage in job.stages),
                'resource_hours': (job.num_executors * job.executor_cores * duration) / 3600
            }
        
        # 释放资源
        self._release_job_resources(job)
        
        return {
            'status': 'success',
            'job_id': job_id,
            'final_status': job.status.value,
            'end_time': job.end_time.isoformat(),
            'metrics': job.metrics
        }
    
    def _release_job_resources(self, job: SparkJob):
        """
        释放作业资源
        
        Args:
            job: Spark作业
        """
        application = self.applications.get(job.app_id)
        if not application:
            return
        
        cluster_id = application['cluster_id']
        if cluster_id not in self.resource_usage:
            return
        
        # 计算释放的资源
        released_cores = job.executor_cores * job.num_executors
        released_memory = float(job.executor_memory.replace('g', '')) * job.num_executors
        
        # 更新资源使用
        current_usage = self.resource_usage[cluster_id]
        current_usage['used_cores'] = max(0, current_usage['used_cores'] - released_cores)
        current_usage['used_memory_gb'] = max(0, current_usage['used_memory_gb'] - released_memory)
        
        cluster = self.clusters[cluster_id]
        current_usage['cpu_utilization'] = (current_usage['used_cores'] / cluster.total_cores) * 100
        current_usage['memory_utilization'] = (current_usage['used_memory_gb'] / cluster.total_memory_gb) * 100
    
    def get_job_status(self, job_id: str) -> Dict[str, Any]:
        """
        获取作业状态
        
        Args:
            job_id: 作业ID
            
        Returns:
            Dict[str, Any]: 作业状态
        """
        if job_id not in self.jobs:
            return {'status': 'error', 'message': f'Job {job_id} not found'}
        
        job = self.jobs[job_id]
        
        result = {
            'job_id': job.job_id,
            'app_id': job.app_id,
            'name': job.name,
            'job_type': job.job_type.value,
            'status': job.status.value,
            'submit_time': job.submit_time.isoformat(),
            'resource_allocation': {
                'driver_memory': job.driver_memory,
                'executor_memory': job.executor_memory,
                'executor_cores': job.executor_cores,
                'num_executors': job.num_executors
            },
            'data_info': {
                'input_size_gb': job.input_data_size,
                'output_size_gb': job.output_data_size
            },
            'stages': job.stages
        }
        
        if job.start_time:
            result['start_time'] = job.start_time.isoformat()
        
        if job.end_time:
            result['end_time'] = job.end_time.isoformat()
        
        if job.metrics:
            result['metrics'] = job.metrics
        
        return result
    
    def get_cluster_status(self, cluster_id: str) -> Dict[str, Any]:
        """
        获取集群状态
        
        Args:
            cluster_id: 集群ID
            
        Returns:
            Dict[str, Any]: 集群状态
        """
        if cluster_id not in self.clusters:
            return {'status': 'error', 'message': f'Cluster {cluster_id} not found'}
        
        cluster = self.clusters[cluster_id]
        usage = self.resource_usage[cluster_id]
        
        # 统计应用状态
        active_apps = []
        for app_id in cluster.active_applications:
            if app_id in self.applications:
                app = self.applications[app_id]
                active_apps.append({
                    'app_id': app_id,
                    'name': app['name'],
                    'type': app['app_type'].value,
                    'status': app['status'].value,
                    'submit_time': app['submit_time'].isoformat()
                })
        
        return {
            'cluster_id': cluster_id,
            'master_url': cluster.master_url,
            'resources': {
                'total_cores': cluster.total_cores,
                'total_memory_gb': cluster.total_memory_gb,
                'used_cores': usage['used_cores'],
                'used_memory_gb': usage['used_memory_gb'],
                'available_cores': cluster.total_cores - usage['used_cores'],
                'available_memory_gb': cluster.total_memory_gb - usage['used_memory_gb'],
                'cpu_utilization': round(usage['cpu_utilization'], 2),
                'memory_utilization': round(usage['memory_utilization'], 2)
            },
            'worker_nodes': cluster.worker_nodes,
            'applications': {
                'active_count': len(cluster.active_applications),
                'completed_count': len(cluster.completed_applications),
                'active_applications': active_apps
            }
        }
    
    def optimize_resource_allocation(self, cluster_id: str) -> Dict[str, Any]:
        """
        优化资源分配
        
        Args:
            cluster_id: 集群ID
            
        Returns:
            Dict[str, Any]: 优化建议
        """
        if cluster_id not in self.clusters:
            return {'status': 'error', 'message': f'Cluster {cluster_id} not found'}
        
        cluster = self.clusters[cluster_id]
        usage = self.resource_usage[cluster_id]
        
        recommendations = []
        
        # CPU利用率分析
        if usage['cpu_utilization'] > 90:
            recommendations.append({
                'type': 'resource_shortage',
                'priority': 'high',
                'message': 'CPU利用率过高,建议增加worker节点或减少并发作业',
                'suggested_action': 'scale_out'
            })
        elif usage['cpu_utilization'] < 30:
            recommendations.append({
                'type': 'resource_waste',
                'priority': 'medium',
                'message': 'CPU利用率较低,可以考虑减少资源或增加作业并发度',
                'suggested_action': 'scale_in_or_increase_concurrency'
            })
        
        # 内存利用率分析
        if usage['memory_utilization'] > 85:
            recommendations.append({
                'type': 'memory_pressure',
                'priority': 'high',
                'message': '内存使用率过高,可能导致GC压力,建议优化内存配置',
                'suggested_action': 'optimize_memory_config'
            })
        
        # 负载均衡分析
        active_apps = len(cluster.active_applications)
        worker_count = len(cluster.worker_nodes)
        if active_apps > worker_count * 2:
            recommendations.append({
                'type': 'load_imbalance',
                'priority': 'medium',
                'message': '作业数量较多,建议检查负载均衡配置',
                'suggested_action': 'check_load_balancing'
            })
        
        # 配置优化建议
        optimization_suggestions = {
            'executor_memory': self._suggest_executor_memory(cluster, usage),
            'executor_cores': self._suggest_executor_cores(cluster, usage),
            'dynamic_allocation': usage['cpu_utilization'] < 50,
            'speculation': usage['cpu_utilization'] > 70
        }
        
        return {
            'cluster_id': cluster_id,
            'current_utilization': {
                'cpu': usage['cpu_utilization'],
                'memory': usage['memory_utilization']
            },
            'recommendations': recommendations,
            'optimization_suggestions': optimization_suggestions,
            'analysis_time': datetime.now().isoformat()
        }
    
    def _suggest_executor_memory(self, cluster: SparkCluster, usage: Dict[str, Any]) -> str:
        """
        建议executor内存配置
        
        Args:
            cluster: Spark集群
            usage: 资源使用情况
            
        Returns:
            str: 建议的内存配置
        """
        # 基于集群总内存和worker数量的简单计算
        memory_per_worker = cluster.total_memory_gb / len(cluster.worker_nodes)
        suggested_memory = max(1, int(memory_per_worker * 0.8 / 2))  # 保留20%给系统,每个worker运行2个executor
        return f"{suggested_memory}g"
    
    def _suggest_executor_cores(self, cluster: SparkCluster, usage: Dict[str, Any]) -> int:
        """
        建议executor核心数配置
        
        Args:
            cluster: Spark集群
            usage: 资源使用情况
            
        Returns:
            int: 建议的核心数
        """
        # 基于集群总核心数和worker数量的简单计算
        cores_per_worker = cluster.total_cores / len(cluster.worker_nodes)
        suggested_cores = max(1, int(cores_per_worker / 2))  # 每个worker运行2个executor
        return min(suggested_cores, 5)  # 限制最大核心数为5
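        # 计算示例(仅为说明上述启发式):若集群共128GB内存、32核、4个worker,
        # 则每个worker约32GB内存、8核;建议executor内存为 int(32 * 0.8 / 2) = 12,即"12g";
        # 建议executor核心数为 min(int(8 / 2), 5) = 4。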

# 使用示例
if __name__ == "__main__":
    # 创建Spark集群管理器
    spark_manager = SparkClusterManager()
    
    print("=== Spark集群管理示例 ===")
    
    # 创建集群
    print("\n=== 创建集群 ===")
    cluster_config = {
        'cluster_id': 'spark-cluster-1',
        'master_url': 'spark://master:7077',
        'total_cores': 32,
        'total_memory_gb': 128.0,
        'worker_nodes': ['worker1', 'worker2', 'worker3', 'worker4']
    }
    
    cluster_result = spark_manager.create_cluster(cluster_config)
    print(f"集群创建结果: {cluster_result}")
    
    # 提交应用
    print("\n=== 提交应用 ===")
    app_config = {
        'cluster_id': 'spark-cluster-1',
        'name': 'Data Processing Job',
        'app_type': 'BATCH',
        'driver_memory': '2g',
        'executor_memory': '4g',
        'executor_cores': 2,
        'num_executors': 4,
        'main_class': 'com.example.DataProcessor',
        'jar_file': '/path/to/app.jar'
    }
    
    app_result = spark_manager.submit_application(app_config)
    print(f"应用提交结果: {app_result}")
    
    if app_result['status'] == 'success':
        app_id = app_result['app_id']
        
        # 创建作业
        print("\n=== 创建作业 ===")
        job_config = {
            'app_id': app_id,
            'name': 'ETL Job',
            'input_data_size': 10.0  # 10GB
        }
        
        job_result = spark_manager.create_spark_job(job_config)
        print(f"作业创建结果: {job_result}")
        
        if job_result['status'] == 'success':
            job_id = job_result['job_id']
            
            # 启动作业
            print("\n=== 启动作业 ===")
            start_result = spark_manager.start_job(job_id)
            print(f"作业启动结果: {start_result}")
            
            # 获取作业状态
            print("\n=== 作业状态 ===")
            job_status = spark_manager.get_job_status(job_id)
            print(f"作业状态: {job_status['status']}")
            print(f"阶段数: {len(job_status['stages'])}")
            print(f"预估时间: {start_result.get('estimated_duration')}")
            
            # 完成作业
            print("\n=== 完成作业 ===")
            complete_result = spark_manager.complete_job(job_id, success=True)
            print(f"作业完成结果: {complete_result}")
    
    # 获取集群状态
    print("\n=== 集群状态 ===")
    cluster_status = spark_manager.get_cluster_status('spark-cluster-1')
    print(f"集群ID: {cluster_status['cluster_id']}")
    print(f"CPU利用率: {cluster_status['resources']['cpu_utilization']}%")
    print(f"内存利用率: {cluster_status['resources']['memory_utilization']}%")
    print(f"活跃应用数: {cluster_status['applications']['active_count']}")
    
    # 资源优化建议
    print("\n=== 资源优化建议 ===")
    optimization = spark_manager.optimize_resource_allocation('spark-cluster-1')
    print(f"当前CPU利用率: {optimization['current_utilization']['cpu']}%")
    print(f"当前内存利用率: {optimization['current_utilization']['memory']}%")
    
    print("\n优化建议:")
    for rec in optimization['recommendations']:
        print(f"  - {rec['type']} ({rec['priority']}): {rec['message']}")
    
    print("\n配置建议:")
    suggestions = optimization['optimization_suggestions']
    print(f"  - 建议executor内存: {suggestions['executor_memory']}")
    print(f"  - 建议executor核心数: {suggestions['executor_cores']}")
    print(f"  - 启用动态分配: {suggestions['dynamic_allocation']}")
     print(f"  - 启用推测执行: {suggestions['speculation']}")

3.2 Apache Flink详解

from typing import Dict, List, Any, Optional, Callable, Union
from dataclasses import dataclass
from enum import Enum
from datetime import datetime, timedelta
import json
import time

class FlinkJobStatus(Enum):
    """Flink作业状态"""
    CREATED = "已创建"
    RUNNING = "运行中"
    FINISHED = "已完成"
    CANCELED = "已取消"
    FAILED = "失败"
    RESTARTING = "重启中"

class FlinkJobType(Enum):
    """Flink作业类型"""
    STREAMING = "流处理"
    BATCH = "批处理"
    CEP = "复杂事件处理"
    MACHINE_LEARNING = "机器学习"
    GRAPH = "图处理"

class CheckpointStatus(Enum):
    """检查点状态"""
    COMPLETED = "已完成"
    IN_PROGRESS = "进行中"
    FAILED = "失败"

@dataclass
class FlinkCheckpoint:
    """Flink检查点"""
    checkpoint_id: int
    timestamp: datetime
    status: CheckpointStatus
    size_bytes: int
    duration_ms: int
    external_path: Optional[str] = None

@dataclass
class FlinkTaskManager:
    """Flink任务管理器"""
    tm_id: str
    host: str
    port: int
    slots_total: int
    slots_available: int
    memory_mb: int
    cpu_cores: int
    network_memory_mb: int
    managed_memory_mb: int

@dataclass
class FlinkJob:
    """Flink作业"""
    job_id: str
    name: str
    job_type: FlinkJobType
    status: FlinkJobStatus
    start_time: datetime
    end_time: Optional[datetime]
    parallelism: int
    max_parallelism: int
    checkpointing_enabled: bool
    checkpoint_interval_ms: int
    savepoint_path: Optional[str]
    restart_strategy: str
    metrics: Dict[str, Any]
    checkpoints: List[FlinkCheckpoint]

class FlinkClusterManager:
    """
    Flink集群管理器
    """
    
    def __init__(self):
        self.job_manager_host = "localhost"
        self.job_manager_port = 8081
        self.task_managers = {}
        self.jobs = {}
        self.checkpoints = {}
        self.savepoints = {}
        self.cluster_metrics = {
            'total_slots': 0,
            'available_slots': 0,
            'running_jobs': 0,
            'completed_jobs': 0,
            'failed_jobs': 0
        }
    
    def add_task_manager(self, tm_config: Dict[str, Any]) -> Dict[str, Any]:
        """
        添加任务管理器
        
        Args:
            tm_config: 任务管理器配置
            
        Returns:
            Dict[str, Any]: 添加结果
        """
        tm = FlinkTaskManager(
            tm_id=tm_config['tm_id'],
            host=tm_config.get('host', 'localhost'),
            port=tm_config.get('port', 6121),
            slots_total=tm_config.get('slots_total', 4),
            slots_available=tm_config.get('slots_total', 4),
            memory_mb=tm_config.get('memory_mb', 4096),
            cpu_cores=tm_config.get('cpu_cores', 4),
            network_memory_mb=tm_config.get('network_memory_mb', 512),
            managed_memory_mb=tm_config.get('managed_memory_mb', 1024)
        )
        
        self.task_managers[tm.tm_id] = tm
        
        # 更新集群指标
        self.cluster_metrics['total_slots'] += tm.slots_total
        self.cluster_metrics['available_slots'] += tm.slots_available
        
        return {
            'status': 'success',
            'tm_id': tm.tm_id,
            'host': tm.host,
            'slots': tm.slots_total,
            'memory_mb': tm.memory_mb
        }
    
    def submit_job(self, job_config: Dict[str, Any]) -> Dict[str, Any]:
        """
        提交Flink作业
        
        Args:
            job_config: 作业配置
            
        Returns:
            Dict[str, Any]: 提交结果
        """
        # 检查资源是否足够
        required_slots = job_config.get('parallelism', 1)
        if self.cluster_metrics['available_slots'] < required_slots:
            return {
                'status': 'error',
                'message': f'Insufficient slots. Required: {required_slots}, Available: {self.cluster_metrics["available_slots"]}'
            }
        
        # 创建作业
        job_id = f"job-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{len(self.jobs)}"
        
        job = FlinkJob(
            job_id=job_id,
            name=job_config.get('name', 'Flink Job'),
            job_type=FlinkJobType[job_config.get('job_type', 'STREAMING')],  # 按枚举名称查找(配置中传入'STREAMING'等名称)
            status=FlinkJobStatus.CREATED,
            start_time=datetime.now(),
            end_time=None,
            parallelism=job_config.get('parallelism', 1),
            max_parallelism=job_config.get('max_parallelism', 128),
            checkpointing_enabled=job_config.get('checkpointing_enabled', True),
            checkpoint_interval_ms=job_config.get('checkpoint_interval_ms', 60000),
            savepoint_path=job_config.get('savepoint_path'),
            restart_strategy=job_config.get('restart_strategy', 'fixed-delay'),
            metrics={},
            checkpoints=[]
        )
        
        self.jobs[job_id] = job
        
        # 分配资源
        self._allocate_slots(job)
        
        return {
            'status': 'success',
            'job_id': job_id,
            'name': job.name,
            'parallelism': job.parallelism,
            'submit_time': job.start_time.isoformat()
        }
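        # slot校验示例(仅为说明上面的检查逻辑):3个TaskManager、每个4个slot时,
        # 集群共12个可用slot;提交parallelism=6的作业需要6个slot,
        # 提交成功后available_slots降为6。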
    
    def _allocate_slots(self, job: FlinkJob):
        """
        为作业分配slot
        
        Args:
            job: Flink作业
        """
        slots_needed = job.parallelism
        allocated = 0
        
        for tm in self.task_managers.values():
            if allocated >= slots_needed:
                break
            
            slots_to_allocate = min(tm.slots_available, slots_needed - allocated)
            tm.slots_available -= slots_to_allocate
            allocated += slots_to_allocate
        
        # 更新集群指标
        self.cluster_metrics['available_slots'] -= slots_needed
    
    def start_job(self, job_id: str) -> Dict[str, Any]:
        """
        启动作业
        
        Args:
            job_id: 作业ID
            
        Returns:
            Dict[str, Any]: 启动结果
        """
        if job_id not in self.jobs:
            return {'status': 'error', 'message': f'Job {job_id} not found'}
        
        job = self.jobs[job_id]
        job.status = FlinkJobStatus.RUNNING
        
        # 更新集群指标
        self.cluster_metrics['running_jobs'] += 1
        
        # 如果启用检查点,创建初始检查点
        if job.checkpointing_enabled:
            self._create_checkpoint(job)
        
        return {
            'status': 'success',
            'job_id': job_id,
            'status_change': f'{FlinkJobStatus.CREATED.value} -> {FlinkJobStatus.RUNNING.value}',
            'start_time': job.start_time.isoformat()
        }
    
    def _create_checkpoint(self, job: FlinkJob) -> FlinkCheckpoint:
        """
        创建检查点
        
        Args:
            job: Flink作业
            
        Returns:
            FlinkCheckpoint: 检查点
        """
        checkpoint_id = len(job.checkpoints)
        
        checkpoint = FlinkCheckpoint(
            checkpoint_id=checkpoint_id,
            timestamp=datetime.now(),
            status=CheckpointStatus.IN_PROGRESS,
            size_bytes=0,
            duration_ms=0
        )
        
        # 模拟检查点完成
        checkpoint.status = CheckpointStatus.COMPLETED
        checkpoint.size_bytes = job.parallelism * 1024 * 1024  # 每个并行度1MB
        checkpoint.duration_ms = job.parallelism * 100  # 每个并行度100ms
        checkpoint.external_path = f"/checkpoints/{job.job_id}/chk-{checkpoint_id}"
        
        job.checkpoints.append(checkpoint)
        
        return checkpoint
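        # 取值示例(仅为说明上述模拟逻辑):并行度为6的作业,单次检查点大小为
        # 6 * 1024 * 1024字节(约6MB),耗时约600ms,外部路径形如 /checkpoints/<job_id>/chk-0。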
    
    def create_savepoint(self, job_id: str, target_directory: Optional[str] = None) -> Dict[str, Any]:
        """
        创建保存点
        
        Args:
            job_id: 作业ID
            target_directory: 目标目录
            
        Returns:
            Dict[str, Any]: 创建结果
        """
        if job_id not in self.jobs:
            return {'status': 'error', 'message': f'Job {job_id} not found'}
        
        job = self.jobs[job_id]
        
        if job.status != FlinkJobStatus.RUNNING:
            return {'status': 'error', 'message': f'Job {job_id} is not running'}
        
        savepoint_id = f"savepoint-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        savepoint_path = target_directory or f"/savepoints/{job_id}/{savepoint_id}"
        
        savepoint = {
            'savepoint_id': savepoint_id,
            'job_id': job_id,
            'path': savepoint_path,
            'timestamp': datetime.now(),
            'size_bytes': job.parallelism * 2 * 1024 * 1024,  # 每个并行度2MB
            'trigger_type': 'manual'
        }
        
        self.savepoints[savepoint_id] = savepoint
        job.savepoint_path = savepoint_path
        
        return {
            'status': 'success',
            'savepoint_id': savepoint_id,
            'path': savepoint_path,
            'size_bytes': savepoint['size_bytes'],
            'timestamp': savepoint['timestamp'].isoformat()
        }
    
    def stop_job(self, job_id: str, with_savepoint: bool = False, 
                 target_directory: Optional[str] = None) -> Dict[str, Any]:
        """
        停止作业
        
        Args:
            job_id: 作业ID
            with_savepoint: 是否创建保存点
            target_directory: 保存点目录
            
        Returns:
            Dict[str, Any]: 停止结果
        """
        if job_id not in self.jobs:
            return {'status': 'error', 'message': f'Job {job_id} not found'}
        
        job = self.jobs[job_id]
        
        result = {
            'status': 'success',
            'job_id': job_id,
            'previous_status': job.status.value
        }
        
        # 如果需要创建保存点
        if with_savepoint and job.status == FlinkJobStatus.RUNNING:
            savepoint_result = self.create_savepoint(job_id, target_directory)
            if savepoint_result['status'] == 'success':
                result['savepoint'] = savepoint_result
            else:
                return savepoint_result
        
        # 停止作业
        job.status = FlinkJobStatus.FINISHED
        job.end_time = datetime.now()
        
        # 释放资源
        self._release_slots(job)
        
        # 更新集群指标
        if result['previous_status'] == FlinkJobStatus.RUNNING.value:
            self.cluster_metrics['running_jobs'] -= 1
        self.cluster_metrics['completed_jobs'] += 1
        
        result['end_time'] = job.end_time.isoformat()
        
        return result
    
    def _release_slots(self, job: FlinkJob):
        """
        释放作业占用的slot
        
        Args:
            job: Flink作业
        """
        slots_to_release = job.parallelism
        released = 0
        
        for tm in self.task_managers.values():
            if released >= slots_to_release:
                break
            
            slots_can_release = min(tm.slots_total - tm.slots_available, slots_to_release - released)
            tm.slots_available += slots_can_release
            released += slots_can_release
        
        # 更新集群指标
        self.cluster_metrics['available_slots'] += slots_to_release
    
    def cancel_job(self, job_id: str) -> Dict[str, Any]:
        """
        取消作业
        
        Args:
            job_id: 作业ID
            
        Returns:
            Dict[str, Any]: 取消结果
        """
        if job_id not in self.jobs:
            return {'status': 'error', 'message': f'Job {job_id} not found'}
        
        job = self.jobs[job_id]
        previous_status = job.status
        
        job.status = FlinkJobStatus.CANCELED
        job.end_time = datetime.now()
        
        # 释放资源
        self._release_slots(job)
        
        # 更新集群指标
        if previous_status == FlinkJobStatus.RUNNING:
            self.cluster_metrics['running_jobs'] -= 1
        self.cluster_metrics['failed_jobs'] += 1
        
        return {
            'status': 'success',
            'job_id': job_id,
            'previous_status': previous_status.value,
            'current_status': job.status.value,
            'end_time': job.end_time.isoformat()
        }
    
    def get_job_details(self, job_id: str) -> Dict[str, Any]:
        """
        获取作业详情
        
        Args:
            job_id: 作业ID
            
        Returns:
            Dict[str, Any]: 作业详情
        """
        if job_id not in self.jobs:
            return {'status': 'error', 'message': f'Job {job_id} not found'}
        
        job = self.jobs[job_id]
        
        # 计算运行时间
        if job.end_time:
            duration = job.end_time - job.start_time
        else:
            duration = datetime.now() - job.start_time
        
        # 获取最新检查点信息
        latest_checkpoint = None
        if job.checkpoints:
            latest_checkpoint = {
                'checkpoint_id': job.checkpoints[-1].checkpoint_id,
                'timestamp': job.checkpoints[-1].timestamp.isoformat(),
                'status': job.checkpoints[-1].status.value,
                'size_bytes': job.checkpoints[-1].size_bytes,
                'duration_ms': job.checkpoints[-1].duration_ms
            }
        
        result = {
            'job_id': job.job_id,
            'name': job.name,
            'job_type': job.job_type.value,
            'status': job.status.value,
            'start_time': job.start_time.isoformat(),
            'duration_seconds': int(duration.total_seconds()),
            'parallelism': job.parallelism,
            'max_parallelism': job.max_parallelism,
            'checkpointing': {
                'enabled': job.checkpointing_enabled,
                'interval_ms': job.checkpoint_interval_ms,
                'total_checkpoints': len(job.checkpoints),
                'latest_checkpoint': latest_checkpoint
            },
            'restart_strategy': job.restart_strategy
        }
        
        if job.end_time:
            result['end_time'] = job.end_time.isoformat()
        
        if job.savepoint_path:
            result['savepoint_path'] = job.savepoint_path
        
        return result
    
    def get_cluster_overview(self) -> Dict[str, Any]:
        """
        获取集群概览
        
        Returns:
            Dict[str, Any]: 集群概览
        """
        # 统计任务管理器信息
        tm_summary = {
            'total_task_managers': len(self.task_managers),
            'total_memory_mb': sum(tm.memory_mb for tm in self.task_managers.values()),
            'total_cpu_cores': sum(tm.cpu_cores for tm in self.task_managers.values()),
            'task_managers': []
        }
        
        for tm in self.task_managers.values():
            tm_summary['task_managers'].append({
                'tm_id': tm.tm_id,
                'host': tm.host,
                'slots_total': tm.slots_total,
                'slots_available': tm.slots_available,
                'slots_used': tm.slots_total - tm.slots_available,
                'memory_mb': tm.memory_mb,
                'cpu_cores': tm.cpu_cores
            })
        
        # 统计作业信息
        job_summary = {
            'total_jobs': len(self.jobs),
            'running_jobs': self.cluster_metrics['running_jobs'],
            'completed_jobs': self.cluster_metrics['completed_jobs'],
            'failed_jobs': self.cluster_metrics['failed_jobs'],
            'jobs_by_type': {}
        }
        
        # 按类型统计作业
        for job in self.jobs.values():
            job_type = job.job_type.value
            if job_type not in job_summary['jobs_by_type']:
                job_summary['jobs_by_type'][job_type] = 0
            job_summary['jobs_by_type'][job_type] += 1
        
        return {
            'job_manager': {
                'host': self.job_manager_host,
                'port': self.job_manager_port
            },
            'cluster_metrics': self.cluster_metrics,
            'task_managers': tm_summary,
            'jobs': job_summary,
            'timestamp': datetime.now().isoformat()
        }
    
    def get_job_metrics(self, job_id: str) -> Dict[str, Any]:
        """
        获取作业指标
        
        Args:
            job_id: 作业ID
            
        Returns:
            Dict[str, Any]: 作业指标
        """
        if job_id not in self.jobs:
            return {'status': 'error', 'message': f'Job {job_id} not found'}
        
        job = self.jobs[job_id]
        
        # 模拟一些指标数据
        if job.status == FlinkJobStatus.RUNNING:
            # 运行中的作业有实时指标
            metrics = {
                'records_in_per_second': job.parallelism * 1000,
                'records_out_per_second': job.parallelism * 950,
                'bytes_in_per_second': job.parallelism * 1024 * 100,
                'bytes_out_per_second': job.parallelism * 1024 * 95,
                'backpressure_ratio': 0.1,
                'cpu_utilization': 0.75,
                'memory_utilization': 0.65,
                'gc_time_ms_per_second': 50,
                'checkpoint_duration_ms': 500,
                'checkpoint_size_bytes': job.parallelism * 1024 * 1024
            }
        else:
            # 非运行状态的作业只有历史指标
            duration = (job.end_time - job.start_time).total_seconds() if job.end_time else 0
            metrics = {
                'total_records_processed': int(job.parallelism * 1000 * duration),
                'total_bytes_processed': int(job.parallelism * 1024 * 100 * duration),
                'average_processing_rate': job.parallelism * 1000 if duration > 0 else 0,
                'total_checkpoints': len(job.checkpoints),
                'successful_checkpoints': len([cp for cp in job.checkpoints if cp.status == CheckpointStatus.COMPLETED]),
                'failed_checkpoints': len([cp for cp in job.checkpoints if cp.status == CheckpointStatus.FAILED])
            }
        
        return {
            'job_id': job_id,
            'status': job.status.value,
            'metrics': metrics,
            'timestamp': datetime.now().isoformat()
        }
    
    def restart_job(self, job_id: str, from_savepoint: Optional[str] = None) -> Dict[str, Any]:
        """
        重启作业
        
        Args:
            job_id: 作业ID
            from_savepoint: 从保存点恢复
            
        Returns:
            Dict[str, Any]: 重启结果
        """
        if job_id not in self.jobs:
            return {'status': 'error', 'message': f'Job {job_id} not found'}
        
        job = self.jobs[job_id]
        
        # 检查作业状态
        if job.status == FlinkJobStatus.RUNNING:
            return {'status': 'error', 'message': f'Job {job_id} is already running'}
        
        # 检查资源
        if self.cluster_metrics['available_slots'] < job.parallelism:
            return {
                'status': 'error',
                'message': f'Insufficient slots. Required: {job.parallelism}, Available: {self.cluster_metrics["available_slots"]}'
            }
        
        # 重启作业(先记录原状态,用于后续修正集群指标)
        previous_status = job.status
        job.status = FlinkJobStatus.RESTARTING
        
        # 分配资源
        self._allocate_slots(job)
        
        # 更新状态为运行中
        job.status = FlinkJobStatus.RUNNING
        job.start_time = datetime.now()
        job.end_time = None
        
        # 更新集群指标:若作业此前处于失败/已取消状态,需将其从failed_jobs中移除
        self.cluster_metrics['running_jobs'] += 1
        if previous_status in [FlinkJobStatus.FAILED, FlinkJobStatus.CANCELED]:
            self.cluster_metrics['failed_jobs'] -= 1
        
        result = {
            'status': 'success',
            'job_id': job_id,
            'restart_time': job.start_time.isoformat(),
            'parallelism': job.parallelism
        }
        
        if from_savepoint:
            result['restored_from_savepoint'] = from_savepoint
        
        return result

# 使用示例
if __name__ == "__main__":
    # 创建Flink集群管理器
    flink_manager = FlinkClusterManager()
    
    print("=== Flink集群管理示例 ===")
    
    # 添加任务管理器
    print("\n=== 添加任务管理器 ===")
    for i in range(3):
        tm_config = {
            'tm_id': f'taskmanager-{i+1}',
            'host': f'worker{i+1}',
            'port': 6121 + i,
            'slots_total': 4,
            'memory_mb': 4096,
            'cpu_cores': 4
        }
        
        tm_result = flink_manager.add_task_manager(tm_config)
        print(f"任务管理器 {i+1}: {tm_result}")
    
    # 提交流处理作业
    print("\n=== 提交流处理作业 ===")
    streaming_job_config = {
        'name': 'Real-time Analytics',
        'job_type': 'STREAMING',
        'parallelism': 6,
        'max_parallelism': 128,
        'checkpointing_enabled': True,
        'checkpoint_interval_ms': 30000,
        'restart_strategy': 'fixed-delay'
    }
    
    job_result = flink_manager.submit_job(streaming_job_config)
    print(f"作业提交结果: {job_result}")
    
    if job_result['status'] == 'success':
        job_id = job_result['job_id']
        
        # 启动作业
        print("\n=== 启动作业 ===")
        start_result = flink_manager.start_job(job_id)
        print(f"作业启动结果: {start_result}")
        
        # 获取作业详情
        print("\n=== 作业详情 ===")
        job_details = flink_manager.get_job_details(job_id)
        print(f"作业名称: {job_details['name']}")
        print(f"作业状态: {job_details['status']}")
        print(f"并行度: {job_details['parallelism']}")
        print(f"检查点: 启用={job_details['checkpointing']['enabled']}, 间隔={job_details['checkpointing']['interval_ms']}ms")
        
        # 获取作业指标
        print("\n=== 作业指标 ===")
        metrics = flink_manager.get_job_metrics(job_id)
        print(f"输入速率: {metrics['metrics']['records_in_per_second']} records/sec")
        print(f"输出速率: {metrics['metrics']['records_out_per_second']} records/sec")
        print(f"CPU利用率: {metrics['metrics']['cpu_utilization']*100:.1f}%")
        print(f"内存利用率: {metrics['metrics']['memory_utilization']*100:.1f}%")
        
        # 创建保存点
        print("\n=== 创建保存点 ===")
        savepoint_result = flink_manager.create_savepoint(job_id)
        print(f"保存点创建结果: {savepoint_result}")
        
        # 停止作业(带保存点)
        print("\n=== 停止作业 ===")
        stop_result = flink_manager.stop_job(job_id, with_savepoint=True)
        print(f"作业停止结果: {stop_result}")
        
        # 重启作业
        print("\n=== 重启作业 ===")
        restart_result = flink_manager.restart_job(job_id)
        print(f"作业重启结果: {restart_result}")
    
    # 获取集群概览
    print("\n=== 集群概览 ===")
    cluster_overview = flink_manager.get_cluster_overview()
    print(f"任务管理器数量: {cluster_overview['task_managers']['total_task_managers']}")
    print(f"总slot数: {cluster_overview['cluster_metrics']['total_slots']}")
    print(f"可用slot数: {cluster_overview['cluster_metrics']['available_slots']}")
    print(f"运行中作业: {cluster_overview['cluster_metrics']['running_jobs']}")
    print(f"已完成作业: {cluster_overview['cluster_metrics']['completed_jobs']}")
    
    print("\n任务管理器详情:")
    for tm in cluster_overview['task_managers']['task_managers']:
        print(f"  {tm['tm_id']}: {tm['slots_used']}/{tm['slots_total']} slots used, {tm['memory_mb']}MB memory")