06-Hadoop生态系统组件(1)
Hadoop生态系统是一个围绕Hadoop核心构建的开源大数据解决方案集合,涵盖存储、处理、查询等各类组件。核心组件包括:HDFS(分布式存储)、HBase(NoSQL数据库)、Spark(统一处理引擎)、Flink(流处理)、Hive(SQL查询)等。这些组件各具特点:HDFS适合大文件存储,Spark支持内存计算和多种处理模式,Flink专长于实时流处理。各组件按成熟度可分为孵化中、稳定、成熟和已弃用四个级别。
1. 生态系统概述
1.1 Hadoop生态系统简介
Hadoop生态系统是一个庞大的开源软件集合,围绕Apache Hadoop核心组件构建,为大数据存储、处理、分析和管理提供了完整的解决方案。
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
class ComponentCategory(Enum):
"""组件类别"""
STORAGE = "存储"
PROCESSING = "处理"
COORDINATION = "协调"
WORKFLOW = "工作流"
MONITORING = "监控"
SECURITY = "安全"
DATA_INGESTION = "数据摄取"
QUERY_ENGINE = "查询引擎"
MACHINE_LEARNING = "机器学习"
STREAMING = "流处理"
class MaturityLevel(Enum):
"""成熟度级别"""
INCUBATING = "孵化中"
STABLE = "稳定"
MATURE = "成熟"
DEPRECATED = "已弃用"
@dataclass
class EcosystemComponent:
"""生态系统组件"""
name: str
category: ComponentCategory
description: str
key_features: List[str]
use_cases: List[str]
maturity: MaturityLevel
dependencies: List[str]
alternatives: List[str]
learning_curve: str # 学习曲线:简单、中等、困难
popularity_score: int # 流行度评分 1-10
class HadoopEcosystemGuide:
"""
Hadoop生态系统指南
"""
def __init__(self):
self.components = self._initialize_components()
self.integration_patterns = self._initialize_integration_patterns()
self.architecture_templates = self._initialize_architecture_templates()
def _initialize_components(self) -> List[EcosystemComponent]:
"""
初始化生态系统组件
Returns:
List[EcosystemComponent]: 组件列表
"""
components = [
# 存储组件
EcosystemComponent(
name="HDFS",
category=ComponentCategory.STORAGE,
description="Hadoop分布式文件系统,提供高容错性的分布式存储",
key_features=[
"分布式存储",
"高容错性",
"高吞吐量",
"数据复制",
"大文件优化"
],
use_cases=[
"大数据存储",
"数据湖构建",
"批处理数据源",
"数据归档"
],
maturity=MaturityLevel.MATURE,
dependencies=["Java"],
alternatives=["Amazon S3", "Google Cloud Storage", "Azure Blob Storage"],
learning_curve="中等",
popularity_score=9
),
EcosystemComponent(
name="HBase",
category=ComponentCategory.STORAGE,
description="基于HDFS的分布式、面向列的NoSQL数据库",
key_features=[
"列式存储",
"实时读写",
"自动分片",
"强一致性",
"与Hadoop集成"
],
use_cases=[
"实时数据访问",
"时序数据存储",
"大规模结构化数据",
"在线服务后端"
],
maturity=MaturityLevel.MATURE,
dependencies=["HDFS", "ZooKeeper"],
alternatives=["Cassandra", "MongoDB", "DynamoDB"],
learning_curve="困难",
popularity_score=7
),
# 处理组件
EcosystemComponent(
name="Apache Spark",
category=ComponentCategory.PROCESSING,
description="统一的大数据处理引擎,支持批处理、流处理、机器学习和图计算",
key_features=[
"内存计算",
"多语言支持",
"统一API",
"流批一体",
"机器学习库"
],
use_cases=[
"大数据分析",
"实时流处理",
"机器学习",
"ETL处理",
"图计算"
],
maturity=MaturityLevel.MATURE,
dependencies=["Scala", "Java", "Python"],
alternatives=["Apache Flink", "Apache Storm", "MapReduce"],
learning_curve="中等",
popularity_score=10
),
EcosystemComponent(
name="Apache Flink",
category=ComponentCategory.STREAMING,
description="分布式流处理引擎,提供低延迟、高吞吐量的流处理能力",
key_features=[
"真正的流处理",
"事件时间处理",
"状态管理",
"容错机制",
"精确一次语义"
],
use_cases=[
"实时数据处理",
"事件驱动应用",
"实时分析",
"复杂事件处理"
],
maturity=MaturityLevel.STABLE,
dependencies=["Java", "Scala"],
alternatives=["Apache Spark Streaming", "Apache Storm", "Apache Kafka Streams"],
learning_curve="困难",
popularity_score=8
),
# 查询引擎
EcosystemComponent(
name="Apache Hive",
category=ComponentCategory.QUERY_ENGINE,
description="基于Hadoop的数据仓库软件,提供SQL查询接口",
key_features=[
"SQL接口",
"元数据管理",
"多种存储格式",
"分区支持",
"UDF支持"
],
use_cases=[
"数据仓库",
"批量数据分析",
"ETL处理",
"报表生成"
],
maturity=MaturityLevel.MATURE,
dependencies=["HDFS", "MapReduce/Tez/Spark"],
alternatives=["Apache Impala", "Presto", "Apache Drill"],
learning_curve="简单",
popularity_score=8
),
EcosystemComponent(
name="Apache Impala",
category=ComponentCategory.QUERY_ENGINE,
description="高性能的分布式SQL查询引擎,专为Hadoop设计",
key_features=[
"MPP架构",
"内存计算",
"实时查询",
"标准SQL",
"与Hadoop集成"
],
use_cases=[
"交互式查询",
"实时分析",
"商业智能",
"即席查询"
],
maturity=MaturityLevel.STABLE,
dependencies=["HDFS", "HBase", "Hive Metastore"],
alternatives=["Presto", "Apache Drill", "Spark SQL"],
learning_curve="中等",
popularity_score=7
),
# 协调组件
EcosystemComponent(
name="Apache ZooKeeper",
category=ComponentCategory.COORDINATION,
description="分布式协调服务,为分布式应用提供配置管理、命名服务等",
key_features=[
"配置管理",
"命名服务",
"分布式锁",
"集群管理",
"通知机制"
],
use_cases=[
"配置中心",
"服务发现",
"分布式锁",
"集群协调",
"主从选举"
],
maturity=MaturityLevel.MATURE,
dependencies=["Java"],
alternatives=["Apache Curator", "etcd", "Consul"],
learning_curve="中等",
popularity_score=8
),
# 数据摄取
EcosystemComponent(
name="Apache Kafka",
category=ComponentCategory.DATA_INGESTION,
description="分布式流处理平台,提供高吞吐量的消息队列服务",
key_features=[
"高吞吐量",
"持久化存储",
"分布式架构",
"实时处理",
"多消费者支持"
],
use_cases=[
"消息队列",
"日志收集",
"事件流",
"数据管道",
"实时数据传输"
],
maturity=MaturityLevel.MATURE,
dependencies=["Java", "ZooKeeper"],
alternatives=["Apache Pulsar", "RabbitMQ", "Amazon Kinesis"],
learning_curve="中等",
popularity_score=9
),
EcosystemComponent(
name="Apache Flume",
category=ComponentCategory.DATA_INGESTION,
description="分布式日志收集系统,专为Hadoop环境设计",
key_features=[
"可靠数据传输",
"灵活配置",
"多种数据源",
"事务支持",
"负载均衡"
],
use_cases=[
"日志收集",
"数据摄取",
"实时数据传输",
"ETL前端"
],
maturity=MaturityLevel.STABLE,
dependencies=["Java"],
alternatives=["Apache Kafka", "Logstash", "Filebeat"],
learning_curve="简单",
popularity_score=6
),
# 工作流
EcosystemComponent(
name="Apache Oozie",
category=ComponentCategory.WORKFLOW,
description="Hadoop作业调度系统,管理复杂的数据处理工作流",
key_features=[
"工作流调度",
"依赖管理",
"错误处理",
"Web界面",
"多种作业类型"
],
use_cases=[
"ETL流程",
"批处理调度",
"数据管道",
"作业编排"
],
maturity=MaturityLevel.STABLE,
dependencies=["Hadoop"],
alternatives=["Apache Airflow", "Azkaban", "Luigi"],
learning_curve="中等",
popularity_score=5
),
# 监控
EcosystemComponent(
name="Apache Ambari",
category=ComponentCategory.MONITORING,
description="Hadoop集群管理和监控平台",
key_features=[
"集群管理",
"服务监控",
"配置管理",
"告警系统",
"Web界面"
],
use_cases=[
"集群部署",
"运维监控",
"配置管理",
"性能监控"
],
maturity=MaturityLevel.STABLE,
dependencies=["Python", "Java"],
alternatives=["Cloudera Manager", "Hortonworks Data Platform"],
learning_curve="中等",
popularity_score=6
),
# 机器学习
EcosystemComponent(
name="Apache Mahout",
category=ComponentCategory.MACHINE_LEARNING,
description="可扩展的机器学习库,专为大数据环境设计",
key_features=[
"分布式算法",
"可扩展性",
"多种算法",
"与Hadoop集成",
"数学库"
],
use_cases=[
"推荐系统",
"聚类分析",
"分类算法",
"协同过滤"
],
maturity=MaturityLevel.STABLE,
dependencies=["Hadoop", "Spark"],
alternatives=["Spark MLlib", "TensorFlow", "Scikit-learn"],
learning_curve="困难",
popularity_score=4
)
]
return components
def _initialize_integration_patterns(self) -> Dict[str, Dict[str, Any]]:
"""
初始化集成模式
Returns:
Dict[str, Dict[str, Any]]: 集成模式
"""
patterns = {
"Lambda架构": {
"description": "结合批处理和流处理的混合架构",
"components": ["Kafka", "Spark", "HBase", "HDFS"],
"use_cases": ["实时分析", "历史数据处理", "低延迟查询"],
"advantages": ["容错性强", "支持复杂查询", "数据一致性"],
"disadvantages": ["复杂度高", "维护成本大", "数据重复"]
},
"Kappa架构": {
"description": "纯流处理架构,统一批处理和流处理",
"components": ["Kafka", "Flink", "Elasticsearch"],
"use_cases": ["实时处理", "事件驱动", "简化架构"],
"advantages": ["架构简单", "实时性好", "维护容易"],
"disadvantages": ["历史数据处理复杂", "状态管理挑战"]
},
"数据湖架构": {
"description": "集中存储结构化和非结构化数据的架构",
"components": ["HDFS", "Spark", "Hive", "HBase"],
"use_cases": ["数据存储", "数据探索", "机器学习"],
"advantages": ["灵活性高", "成本低", "支持多种数据类型"],
"disadvantages": ["数据质量挑战", "治理复杂"]
},
"现代数据栈": {
"description": "云原生的现代数据处理架构",
"components": ["Kafka", "Spark", "Delta Lake", "Kubernetes"],
"use_cases": ["云端数据处理", "DevOps集成", "弹性扩展"],
"advantages": ["云原生", "弹性扩展", "运维简化"],
"disadvantages": ["云厂商绑定", "成本控制"]
}
}
return patterns
def _initialize_architecture_templates(self) -> Dict[str, Dict[str, Any]]:
"""
初始化架构模板
Returns:
Dict[str, Dict[str, Any]]: 架构模板
"""
templates = {
"基础大数据平台": {
"description": "适合初学者的基础Hadoop平台",
"core_components": ["HDFS", "YARN", "MapReduce", "Hive"],
"optional_components": ["HBase", "ZooKeeper", "Oozie"],
"complexity": "简单",
"use_cases": ["数据仓库", "批处理", "报表分析"],
"deployment_size": "小型(3-10节点)"
},
"实时分析平台": {
"description": "支持实时数据处理和分析的平台",
"core_components": ["Kafka", "Spark Streaming", "HBase", "Elasticsearch"],
"optional_components": ["Flink", "Redis", "Grafana"],
"complexity": "中等",
"use_cases": ["实时监控", "事件处理", "在线推荐"],
"deployment_size": "中型(10-50节点)"
},
"机器学习平台": {
"description": "支持大规模机器学习的数据平台",
"core_components": ["Spark", "HDFS", "Jupyter", "MLflow"],
"optional_components": ["Kubeflow", "TensorFlow", "PyTorch"],
"complexity": "困难",
"use_cases": ["模型训练", "特征工程", "模型部署"],
"deployment_size": "大型(50+节点)"
},
"数据湖平台": {
"description": "统一存储和处理多种数据类型的平台",
"core_components": ["HDFS", "Spark", "Hive", "Presto"],
"optional_components": ["Delta Lake", "Apache Iceberg", "Ranger"],
"complexity": "中等",
"use_cases": ["数据探索", "自助分析", "数据科学"],
"deployment_size": "中大型(20-100节点)"
}
}
return templates
def get_components_by_category(self, category: ComponentCategory) -> List[EcosystemComponent]:
"""
按类别获取组件
Args:
category: 组件类别
Returns:
List[EcosystemComponent]: 组件列表
"""
return [comp for comp in self.components if comp.category == category]
def get_component_by_name(self, name: str) -> Optional[EcosystemComponent]:
"""
按名称获取组件
Args:
name: 组件名称
Returns:
Optional[EcosystemComponent]: 组件信息
"""
return next((comp for comp in self.components if comp.name == name), None)
def recommend_components(self, use_case: str, complexity_preference: str = "中等") -> List[Dict[str, Any]]:
"""
根据用例推荐组件
Args:
use_case: 使用场景
complexity_preference: 复杂度偏好
Returns:
List[Dict[str, Any]]: 推荐组件列表
"""
recommendations = []
for component in self.components:
# 检查用例匹配
use_case_match = any(use_case.lower() in uc.lower() for uc in component.use_cases)
# 检查复杂度匹配
complexity_match = component.learning_curve == complexity_preference
if use_case_match:
score = component.popularity_score
if complexity_match:
score += 2
recommendations.append({
'component': component,
'score': score,
'reason': f"适用于{use_case},学习曲线{component.learning_curve}"
})
# 按评分排序
recommendations.sort(key=lambda x: x['score'], reverse=True)
return recommendations[:5] # 返回前5个推荐
def generate_architecture_recommendation(self, requirements: Dict[str, Any]) -> Dict[str, Any]:
"""
生成架构推荐
Args:
requirements: 需求描述
Returns:
Dict[str, Any]: 架构推荐
"""
use_cases = requirements.get('use_cases', [])
data_volume = requirements.get('data_volume', 'medium') # small, medium, large
real_time_requirement = requirements.get('real_time', False)
complexity_tolerance = requirements.get('complexity', '中等')
recommended_components = []
# 基础组件(总是需要)
recommended_components.extend(['HDFS', 'YARN'])
# 根据用例添加组件
if 'batch_processing' in use_cases or 'data_warehouse' in use_cases:
recommended_components.extend(['Hive', 'Spark'])
if real_time_requirement:
recommended_components.extend(['Kafka', 'Spark Streaming'])
if complexity_tolerance == '困难':
recommended_components.append('Flink')
if 'machine_learning' in use_cases:
recommended_components.extend(['Spark', 'Jupyter'])
if 'nosql' in use_cases or 'real_time_access' in use_cases:
recommended_components.append('HBase')
if len(recommended_components) > 5: # 复杂系统需要协调
recommended_components.append('ZooKeeper')
# 选择架构模板
template = None
if real_time_requirement:
template = self.architecture_templates['实时分析平台']
elif 'machine_learning' in use_cases:
template = self.architecture_templates['机器学习平台']
elif data_volume == 'large':
template = self.architecture_templates['数据湖平台']
else:
template = self.architecture_templates['基础大数据平台']
return {
'recommended_components': list(set(recommended_components)),
'architecture_template': template,
'integration_pattern': self._suggest_integration_pattern(requirements),
'deployment_considerations': self._generate_deployment_considerations(requirements),
'learning_path': self._generate_learning_path(recommended_components)
}
def _suggest_integration_pattern(self, requirements: Dict[str, Any]) -> Dict[str, Any]:
"""
建议集成模式
Args:
requirements: 需求描述
Returns:
Dict[str, Any]: 集成模式建议
"""
if requirements.get('real_time', False) and requirements.get('batch_processing', False):
return self.integration_patterns['Lambda架构']
elif requirements.get('real_time', False):
return self.integration_patterns['Kappa架构']
elif requirements.get('data_exploration', False):
return self.integration_patterns['数据湖架构']
else:
return self.integration_patterns['现代数据栈']
def _generate_deployment_considerations(self, requirements: Dict[str, Any]) -> List[str]:
"""
生成部署考虑事项
Args:
requirements: 需求描述
Returns:
List[str]: 部署考虑事项
"""
considerations = []
data_volume = requirements.get('data_volume', 'medium')
if data_volume == 'large':
considerations.append("考虑使用SSD存储提高I/O性能")
considerations.append("规划足够的网络带宽")
considerations.append("考虑数据分层存储策略")
if requirements.get('real_time', False):
considerations.append("优化网络延迟")
considerations.append("配置足够的内存")
considerations.append("考虑使用SSD作为缓存")
if requirements.get('high_availability', False):
considerations.append("部署多个数据中心")
considerations.append("配置组件高可用")
considerations.append("建立灾备方案")
considerations.extend([
"制定监控和告警策略",
"规划容量增长",
"建立安全策略",
"制定备份和恢复计划"
])
return considerations
def _generate_learning_path(self, components: List[str]) -> List[Dict[str, str]]:
"""
生成学习路径
Args:
components: 组件列表
Returns:
List[Dict[str, str]]: 学习路径
"""
learning_order = {
'HDFS': 1,
'YARN': 2,
'MapReduce': 3,
'Hive': 4,
'Spark': 5,
'HBase': 6,
'Kafka': 7,
'ZooKeeper': 8,
'Flink': 9
}
# 按学习顺序排序
sorted_components = sorted(
[comp for comp in components if comp in learning_order],
key=lambda x: learning_order.get(x, 999)
)
learning_path = []
for i, comp in enumerate(sorted_components, 1):
component_obj = self.get_component_by_name(comp)
if component_obj:
learning_path.append({
'step': i,
'component': comp,
'difficulty': component_obj.learning_curve,
'estimated_time': self._estimate_learning_time(component_obj.learning_curve),
'prerequisites': component_obj.dependencies
})
return learning_path
def _estimate_learning_time(self, difficulty: str) -> str:
"""
估算学习时间
Args:
difficulty: 难度级别
Returns:
str: 估算学习时间
"""
time_mapping = {
'简单': '1-2周',
'中等': '2-4周',
'困难': '4-8周'
}
return time_mapping.get(difficulty, '2-4周')
def generate_ecosystem_report(self) -> Dict[str, Any]:
"""
生成生态系统报告
Returns:
Dict[str, Any]: 生态系统报告
"""
category_stats = {}
for category in ComponentCategory:
components = self.get_components_by_category(category)
category_stats[category.value] = {
'count': len(components),
'components': [comp.name for comp in components],
'avg_popularity': sum(comp.popularity_score for comp in components) / len(components) if components else 0
}
maturity_stats = {}
for maturity in MaturityLevel:
components = [comp for comp in self.components if comp.maturity == maturity]
maturity_stats[maturity.value] = len(components)
return {
'total_components': len(self.components),
'category_distribution': category_stats,
'maturity_distribution': maturity_stats,
'top_components': sorted(
self.components,
key=lambda x: x.popularity_score,
reverse=True
)[:5],
'integration_patterns': list(self.integration_patterns.keys()),
'architecture_templates': list(self.architecture_templates.keys())
}
# 使用示例
if __name__ == "__main__":
# 创建生态系统指南
ecosystem = HadoopEcosystemGuide()
print("=== Hadoop生态系统组件指南 ===")
# 按类别显示组件
print("\n=== 按类别显示组件 ===")
for category in ComponentCategory:
components = ecosystem.get_components_by_category(category)
if components:
print(f"\n{category.value}:")
for comp in components:
print(f" - {comp.name}: {comp.description}")
# 组件推荐示例
print("\n=== 组件推荐示例 ===")
recommendations = ecosystem.recommend_components("实时处理", "中等")
print("针对'实时处理'用例的推荐:")
for rec in recommendations:
print(f" {rec['component'].name} (评分: {rec['score']}) - {rec['reason']}")
# 架构推荐示例
print("\n=== 架构推荐示例 ===")
requirements = {
'use_cases': ['batch_processing', 'real_time'],
'data_volume': 'large',
'real_time': True,
'complexity': '中等',
'high_availability': True
}
arch_rec = ecosystem.generate_architecture_recommendation(requirements)
print("推荐组件:", arch_rec['recommended_components'])
print("架构模板:", arch_rec['architecture_template']['description'])
print("集成模式:", arch_rec['integration_pattern']['description'])
print("\n部署考虑事项:")
for consideration in arch_rec['deployment_considerations']:
print(f" - {consideration}")
print("\n学习路径:")
for step in arch_rec['learning_path']:
print(f" {step['step']}. {step['component']} ({step['difficulty']}, 预计{step['estimated_time']})")
# 生态系统报告
print("\n=== 生态系统报告 ===")
report = ecosystem.generate_ecosystem_report()
print(f"总组件数: {report['total_components']}")
print("\n类别分布:")
for category, stats in report['category_distribution'].items():
print(f" {category}: {stats['count']}个组件 (平均流行度: {stats['avg_popularity']:.1f})")
print("\n成熟度分布:")
for maturity, count in report['maturity_distribution'].items():
print(f" {maturity}: {count}个组件")
print("\n最受欢迎的组件:")
for comp in report['top_components']:
print(f" {comp.name} (流行度: {comp.popularity_score})")
1.2 生态系统架构层次
from typing import Dict, List, Any, Tuple
from dataclasses import dataclass
from enum import Enum
class ArchitectureLayer(Enum):
"""架构层次"""
STORAGE = "存储层"
RESOURCE_MANAGEMENT = "资源管理层"
PROCESSING = "处理层"
COORDINATION = "协调层"
APPLICATION = "应用层"
INTERFACE = "接口层"
@dataclass
class LayerComponent:
"""层次组件"""
name: str
layer: ArchitectureLayer
description: str
responsibilities: List[str]
interfaces: List[str]
dependencies: List[str]
class EcosystemArchitecture:
"""
生态系统架构
"""
def __init__(self):
self.layers = self._initialize_layers()
self.component_relationships = self._initialize_relationships()
def _initialize_layers(self) -> Dict[ArchitectureLayer, List[LayerComponent]]:
"""
初始化架构层次
Returns:
Dict[ArchitectureLayer, List[LayerComponent]]: 层次组件映射
"""
layers = {
ArchitectureLayer.STORAGE: [
LayerComponent(
name="HDFS",
layer=ArchitectureLayer.STORAGE,
description="分布式文件系统",
responsibilities=["数据存储", "数据复制", "容错处理"],
interfaces=["WebHDFS", "Java API", "命令行"],
dependencies=["Java"]
),
LayerComponent(
name="HBase",
layer=ArchitectureLayer.STORAGE,
description="NoSQL数据库",
responsibilities=["实时数据访问", "列式存储", "自动分片"],
interfaces=["Java API", "REST API", "Thrift"],
dependencies=["HDFS", "ZooKeeper"]
)
],
ArchitectureLayer.RESOURCE_MANAGEMENT: [
LayerComponent(
name="YARN",
layer=ArchitectureLayer.RESOURCE_MANAGEMENT,
description="资源管理器",
responsibilities=["资源分配", "作业调度", "集群管理"],
interfaces=["ResourceManager API", "Web UI"],
dependencies=["HDFS"]
)
],
ArchitectureLayer.PROCESSING: [
LayerComponent(
name="MapReduce",
layer=ArchitectureLayer.PROCESSING,
description="批处理框架",
responsibilities=["批量数据处理", "分布式计算"],
interfaces=["Java API", "Streaming API"],
dependencies=["YARN", "HDFS"]
),
LayerComponent(
name="Spark",
layer=ArchitectureLayer.PROCESSING,
description="统一计算引擎",
responsibilities=["批处理", "流处理", "机器学习", "图计算"],
interfaces=["Scala API", "Python API", "Java API", "R API"],
dependencies=["YARN", "HDFS"]
)
],
ArchitectureLayer.COORDINATION: [
LayerComponent(
name="ZooKeeper",
layer=ArchitectureLayer.COORDINATION,
description="分布式协调服务",
responsibilities=["配置管理", "命名服务", "分布式锁"],
interfaces=["Java API", "C API", "命令行"],
dependencies=["Java"]
)
],
ArchitectureLayer.APPLICATION: [
LayerComponent(
name="Hive",
layer=ArchitectureLayer.APPLICATION,
description="数据仓库软件",
responsibilities=["SQL查询", "元数据管理", "数据仓库"],
interfaces=["HiveQL", "JDBC", "ODBC"],
dependencies=["HDFS", "MapReduce/Spark"]
),
LayerComponent(
name="HBase",
layer=ArchitectureLayer.APPLICATION,
description="NoSQL数据库",
responsibilities=["实时数据访问", "大规模数据存储"],
interfaces=["Java API", "REST API", "Shell"],
dependencies=["HDFS", "ZooKeeper"]
)
],
ArchitectureLayer.INTERFACE: [
LayerComponent(
name="Ambari",
layer=ArchitectureLayer.INTERFACE,
description="集群管理界面",
responsibilities=["集群监控", "配置管理", "服务管理"],
interfaces=["Web UI", "REST API"],
dependencies=["所有Hadoop组件"]
)
]
}
return layers
def _initialize_relationships(self) -> Dict[str, List[str]]:
"""
初始化组件关系
Returns:
Dict[str, List[str]]: 组件依赖关系
"""
relationships = {
"HDFS": [], # 基础层,无依赖
"YARN": ["HDFS"],
"MapReduce": ["YARN", "HDFS"],
"Spark": ["YARN", "HDFS"],
"Hive": ["HDFS", "MapReduce"],
"HBase": ["HDFS", "ZooKeeper"],
"ZooKeeper": [],
"Kafka": ["ZooKeeper"],
"Flume": ["HDFS"],
"Oozie": ["YARN", "HDFS"],
"Ambari": ["所有组件"]
}
return relationships
def get_layer_components(self, layer: ArchitectureLayer) -> List[LayerComponent]:
"""
获取指定层的组件
Args:
layer: 架构层次
Returns:
List[LayerComponent]: 组件列表
"""
return self.layers.get(layer, [])
def get_component_dependencies(self, component_name: str) -> List[str]:
"""
获取组件依赖
Args:
component_name: 组件名称
Returns:
List[str]: 依赖组件列表
"""
return self.component_relationships.get(component_name, [])
def generate_deployment_order(self, components: List[str]) -> List[List[str]]:
"""
生成部署顺序
Args:
components: 要部署的组件列表
Returns:
List[List[str]]: 按层次分组的部署顺序
"""
# 拓扑排序算法
in_degree = {comp: 0 for comp in components}
graph = {comp: [] for comp in components}
# 构建图和计算入度
for comp in components:
deps = self.get_component_dependencies(comp)
for dep in deps:
if dep in components:
graph[dep].append(comp)
in_degree[comp] += 1
# 拓扑排序
result = []
queue = [comp for comp in components if in_degree[comp] == 0]
while queue:
current_level = queue[:]
queue = []
result.append(current_level)
for comp in current_level:
for neighbor in graph[comp]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
return result
def analyze_architecture_complexity(self, components: List[str]) -> Dict[str, Any]:
"""
分析架构复杂度
Args:
components: 组件列表
Returns:
Dict[str, Any]: 复杂度分析结果
"""
# 计算层次分布
layer_distribution = {}
for layer in ArchitectureLayer:
layer_components = [comp.name for comp in self.get_layer_components(layer)]
count = len([comp for comp in components if comp in layer_components])
if count > 0:
layer_distribution[layer.value] = count
# 计算依赖复杂度
total_dependencies = sum(
len(self.get_component_dependencies(comp))
for comp in components
)
# 计算复杂度评分
complexity_score = len(components) + total_dependencies
if complexity_score <= 5:
complexity_level = "简单"
elif complexity_score <= 15:
complexity_level = "中等"
else:
complexity_level = "复杂"
return {
'total_components': len(components),
'layer_distribution': layer_distribution,
'total_dependencies': total_dependencies,
'complexity_score': complexity_score,
'complexity_level': complexity_level,
'deployment_phases': len(self.generate_deployment_order(components))
}
def generate_architecture_diagram_data(self, components: List[str]) -> Dict[str, Any]:
"""
生成架构图数据
Args:
components: 组件列表
Returns:
Dict[str, Any]: 架构图数据
"""
nodes = []
edges = []
# 生成节点
for comp in components:
# 查找组件所属层次
layer = None
for arch_layer, layer_components in self.layers.items():
if any(lc.name == comp for lc in layer_components):
layer = arch_layer.value
break
nodes.append({
'id': comp,
'label': comp,
'layer': layer or "未知",
'dependencies': self.get_component_dependencies(comp)
})
# 生成边
for comp in components:
deps = self.get_component_dependencies(comp)
for dep in deps:
if dep in components:
edges.append({
'from': dep,
'to': comp,
'type': 'dependency'
})
return {
'nodes': nodes,
'edges': edges,
'layers': list(set(node['layer'] for node in nodes))
}
# 使用示例
if __name__ == "__main__":
# 创建架构分析器
architecture = EcosystemArchitecture()
print("=== Hadoop生态系统架构分析 ===")
# 显示各层组件
print("\n=== 架构层次 ===")
for layer in ArchitectureLayer:
components = architecture.get_layer_components(layer)
if components:
print(f"\n{layer.value}:")
for comp in components:
print(f" - {comp.name}: {comp.description}")
print(f" 职责: {', '.join(comp.responsibilities)}")
print(f" 接口: {', '.join(comp.interfaces)}")
# 分析示例架构
example_components = ["HDFS", "YARN", "Spark", "Hive", "HBase", "ZooKeeper"]
print("\n=== 部署顺序分析 ===")
deployment_order = architecture.generate_deployment_order(example_components)
for i, phase in enumerate(deployment_order, 1):
print(f"阶段 {i}: {', '.join(phase)}")
print("\n=== 架构复杂度分析 ===")
complexity = architecture.analyze_architecture_complexity(example_components)
print(f"总组件数: {complexity['total_components']}")
print(f"总依赖数: {complexity['total_dependencies']}")
print(f"复杂度评分: {complexity['complexity_score']}")
print(f"复杂度级别: {complexity['complexity_level']}")
print(f"部署阶段数: {complexity['deployment_phases']}")
print("\n层次分布:")
for layer, count in complexity['layer_distribution'].items():
print(f" {layer}: {count}个组件")
print("\n=== 架构图数据 ===")
diagram_data = architecture.generate_architecture_diagram_data(example_components)
print(f"节点数: {len(diagram_data['nodes'])}")
print(f"边数: {len(diagram_data['edges'])}")
print(f"涉及层次: {', '.join(diagram_data['layers'])}")
2. 核心存储组件
2.1 HDFS深入解析
我们在前面的章节已经详细介绍了HDFS,这里重点关注其在生态系统中的作用和与其他组件的集成。
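下面给出一个最小示意代码,演示其他组件或任意Python程序如何通过WebHDFS REST接口与HDFS集成(列目录、读文件)。示例仅作参考:其中NameNode地址 http://namenode:9870(Hadoop 3.x的WebHDFS默认端口,2.x为50070)和路径 /data 均为假设值,需按实际集群配置替换;示例依赖requests库。
import requests  # 假设环境中已安装requests库

# 假设的NameNode WebHDFS地址,请按实际集群替换
WEBHDFS_BASE = "http://namenode:9870/webhdfs/v1"

def list_directory(path: str) -> list:
    """使用WebHDFS的LISTSTATUS操作列出目录内容"""
    resp = requests.get(f"{WEBHDFS_BASE}{path}", params={"op": "LISTSTATUS"})
    resp.raise_for_status()
    return resp.json()["FileStatuses"]["FileStatus"]

def read_file(path: str) -> bytes:
    """使用OPEN操作读取文件内容(WebHDFS会重定向到DataNode)"""
    resp = requests.get(
        f"{WEBHDFS_BASE}{path}",
        params={"op": "OPEN"},
        allow_redirects=True
    )
    resp.raise_for_status()
    return resp.content

if __name__ == "__main__":
    # 列出假设目录/data下的文件,并读取其中一个示例文件的前200字节
    for status in list_directory("/data"):
        print(status["pathSuffix"], status["type"], status["length"])
    print(read_file("/data/sample.txt")[:200])
在生产环境中,也可以考虑使用hdfs、pyarrow等Python客户端库,或由Spark、Hive等组件直接通过hdfs://协议访问;这里选择REST接口只是为了减少示例依赖。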
2.2 HBase详解
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime
import json
@dataclass
class HBaseTable:
"""HBase表结构"""
name: str
column_families: List[str]
regions: int
compression: str
bloom_filter: bool
max_versions: int
@dataclass
class HBaseRegion:
"""HBase区域"""
table_name: str
start_key: str
end_key: str
region_server: str
size_mb: float
read_requests: int
write_requests: int
class HBaseClusterManager:
"""
HBase集群管理器
"""
def __init__(self):
self.tables = {}
self.regions = {}
self.region_servers = {}
self.performance_metrics = {}
def create_table(self, table_config: Dict[str, Any]) -> Dict[str, Any]:
"""
创建HBase表
Args:
table_config: 表配置
Returns:
Dict[str, Any]: 创建结果
"""
table = HBaseTable(
name=table_config['name'],
column_families=table_config.get('column_families', ['cf1']),
regions=table_config.get('regions', 1),
compression=table_config.get('compression', 'SNAPPY'),
bloom_filter=table_config.get('bloom_filter', True),
max_versions=table_config.get('max_versions', 3)
)
self.tables[table.name] = table
# 创建初始区域
self._create_initial_regions(table)
return {
'status': 'success',
'table_name': table.name,
'column_families': table.column_families,
'initial_regions': table.regions,
'configuration': {
'compression': table.compression,
'bloom_filter': table.bloom_filter,
'max_versions': table.max_versions
}
}
def _create_initial_regions(self, table: HBaseTable):
"""
创建初始区域
Args:
table: HBase表
"""
if table.name not in self.regions:
self.regions[table.name] = []
# 创建预分区
for i in range(table.regions):
start_key = f"{i:08d}" if i > 0 else ""
end_key = f"{i+1:08d}" if i < table.regions - 1 else ""
region = HBaseRegion(
table_name=table.name,
start_key=start_key,
end_key=end_key,
region_server=f"rs{i % 3 + 1}", # 简单的负载均衡
size_mb=0.0,
read_requests=0,
write_requests=0
)
self.regions[table.name].append(region)
def put_data(self, table_name: str, row_key: str, column_family: str,
column: str, value: str) -> Dict[str, Any]:
"""
插入数据
Args:
table_name: 表名
row_key: 行键
column_family: 列族
column: 列名
value: 值
Returns:
Dict[str, Any]: 插入结果
"""
if table_name not in self.tables:
return {'status': 'error', 'message': f'Table {table_name} not found'}
table = self.tables[table_name]
if column_family not in table.column_families:
return {'status': 'error', 'message': f'Column family {column_family} not found'}
# 找到对应的区域
region = self._find_region(table_name, row_key)
if region:
region.write_requests += 1
region.size_mb += len(value) / (1024 * 1024) # 简化的大小计算
return {
'status': 'success',
'table': table_name,
'row_key': row_key,
'column': f'{column_family}:{column}',
'region_server': region.region_server,
'timestamp': datetime.now().isoformat()
}
return {'status': 'error', 'message': 'No suitable region found'}
def get_data(self, table_name: str, row_key: str,
column_family: str = None, column: str = None) -> Dict[str, Any]:
"""
获取数据
Args:
table_name: 表名
row_key: 行键
column_family: 列族(可选)
column: 列名(可选)
Returns:
Dict[str, Any]: 查询结果
"""
if table_name not in self.tables:
return {'status': 'error', 'message': f'Table {table_name} not found'}
# 找到对应的区域
region = self._find_region(table_name, row_key)
if region:
region.read_requests += 1
# 模拟数据返回
result = {
'status': 'success',
'table': table_name,
'row_key': row_key,
'region_server': region.region_server,
'data': self._simulate_data_retrieval(table_name, row_key, column_family, column),
'timestamp': datetime.now().isoformat()
}
return result
return {'status': 'error', 'message': 'No suitable region found'}
def _find_region(self, table_name: str, row_key: str) -> Optional[HBaseRegion]:
"""
查找行键对应的区域
Args:
table_name: 表名
row_key: 行键
Returns:
Optional[HBaseRegion]: 区域信息
"""
if table_name not in self.regions:
return None
for region in self.regions[table_name]:
# 简化的区域查找逻辑
if (not region.start_key or row_key >= region.start_key) and \
(not region.end_key or row_key < region.end_key):
return region
# 如果没找到,返回第一个区域
return self.regions[table_name][0] if self.regions[table_name] else None
def _simulate_data_retrieval(self, table_name: str, row_key: str,
column_family: str, column: str) -> Dict[str, Any]:
"""
模拟数据检索
Args:
table_name: 表名
row_key: 行键
column_family: 列族
column: 列名
Returns:
Dict[str, Any]: 模拟数据
"""
table = self.tables[table_name]
data = {}
# 如果指定了列族和列
if column_family and column:
if column_family in table.column_families:
data[f'{column_family}:{column}'] = f'value_for_{row_key}_{column}'
# 如果只指定了列族
elif column_family:
if column_family in table.column_families:
for i in range(3): # 模拟3个列
data[f'{column_family}:col{i}'] = f'value_for_{row_key}_col{i}'
# 返回所有列族的数据
else:
for cf in table.column_families:
for i in range(3):
data[f'{cf}:col{i}'] = f'value_for_{row_key}_{cf}_col{i}'
return data
def scan_table(self, table_name: str, start_row: str = None,
end_row: str = None, limit: int = 100) -> Dict[str, Any]:
"""
扫描表
Args:
table_name: 表名
start_row: 起始行键
end_row: 结束行键
limit: 限制行数
Returns:
Dict[str, Any]: 扫描结果
"""
if table_name not in self.tables:
return {'status': 'error', 'message': f'Table {table_name} not found'}
# 模拟扫描结果
results = []
for i in range(min(limit, 10)): # 模拟最多10行数据
row_key = f"row_{i:06d}"
if start_row and row_key < start_row:
continue
if end_row and row_key >= end_row:
break
row_data = self._simulate_data_retrieval(table_name, row_key, None, None)
results.append({
'row_key': row_key,
'data': row_data
})
return {
'status': 'success',
'table': table_name,
'results': results,
'count': len(results),
'scan_range': {
'start_row': start_row,
'end_row': end_row
}
}
def split_region(self, table_name: str, region_index: int, split_key: str) -> Dict[str, Any]:
"""
分裂区域
Args:
table_name: 表名
region_index: 区域索引
split_key: 分裂键
Returns:
Dict[str, Any]: 分裂结果
"""
if table_name not in self.regions:
return {'status': 'error', 'message': f'Table {table_name} not found'}
regions = self.regions[table_name]
if region_index >= len(regions):
return {'status': 'error', 'message': 'Invalid region index'}
original_region = regions[region_index]
# 创建两个新区域
region1 = HBaseRegion(
table_name=table_name,
start_key=original_region.start_key,
end_key=split_key,
region_server=original_region.region_server,
size_mb=original_region.size_mb / 2,
read_requests=0,
write_requests=0
)
region2 = HBaseRegion(
table_name=table_name,
start_key=split_key,
end_key=original_region.end_key,
region_server=self._select_region_server(),
size_mb=original_region.size_mb / 2,
read_requests=0,
write_requests=0
)
# 替换原区域
regions[region_index] = region1
regions.append(region2)
return {
'status': 'success',
'original_region': {
'start_key': original_region.start_key,
'end_key': original_region.end_key,
'size_mb': original_region.size_mb
},
'new_regions': [
{
'start_key': region1.start_key,
'end_key': region1.end_key,
'region_server': region1.region_server
},
{
'start_key': region2.start_key,
'end_key': region2.end_key,
'region_server': region2.region_server
}
],
'split_key': split_key
}
def _select_region_server(self) -> str:
"""
选择区域服务器
Returns:
str: 区域服务器名称
"""
# 简单的负载均衡策略
servers = ['rs1', 'rs2', 'rs3']
server_loads = {server: 0 for server in servers}
# 计算每个服务器的负载
for table_regions in self.regions.values():
for region in table_regions:
if region.region_server in server_loads:
server_loads[region.region_server] += 1
# 返回负载最小的服务器
return min(server_loads.items(), key=lambda x: x[1])[0]
def get_table_info(self, table_name: str) -> Dict[str, Any]:
"""
获取表信息
Args:
table_name: 表名
Returns:
Dict[str, Any]: 表信息
"""
if table_name not in self.tables:
return {'status': 'error', 'message': f'Table {table_name} not found'}
table = self.tables[table_name]
regions = self.regions.get(table_name, [])
# 计算统计信息
total_size = sum(region.size_mb for region in regions)
total_read_requests = sum(region.read_requests for region in regions)
total_write_requests = sum(region.write_requests for region in regions)
region_info = []
for i, region in enumerate(regions):
region_info.append({
'index': i,
'start_key': region.start_key,
'end_key': region.end_key,
'region_server': region.region_server,
'size_mb': round(region.size_mb, 2),
'read_requests': region.read_requests,
'write_requests': region.write_requests
})
return {
'status': 'success',
'table_name': table_name,
'column_families': table.column_families,
'configuration': {
'compression': table.compression,
'bloom_filter': table.bloom_filter,
'max_versions': table.max_versions
},
'statistics': {
'region_count': len(regions),
'total_size_mb': round(total_size, 2),
'total_read_requests': total_read_requests,
'total_write_requests': total_write_requests
},
'regions': region_info
}
def compact_table(self, table_name: str, compact_type: str = 'minor') -> Dict[str, Any]:
"""
压缩表
Args:
table_name: 表名
compact_type: 压缩类型(minor/major)
Returns:
Dict[str, Any]: 压缩结果
"""
if table_name not in self.tables:
return {'status': 'error', 'message': f'Table {table_name} not found'}
regions = self.regions.get(table_name, [])
# 模拟压缩过程
compacted_regions = 0
size_reduction = 0.0
for region in regions:
if compact_type == 'minor':
# 小压缩:减少10-20%的大小
reduction = region.size_mb * 0.15
else:
# 大压缩:减少20-40%的大小
reduction = region.size_mb * 0.30
region.size_mb = max(0, region.size_mb - reduction)
size_reduction += reduction
compacted_regions += 1
return {
'status': 'success',
'table_name': table_name,
'compact_type': compact_type,
'compacted_regions': compacted_regions,
'size_reduction_mb': round(size_reduction, 2),
'completion_time': datetime.now().isoformat()
}
def get_cluster_status(self) -> Dict[str, Any]:
"""
获取集群状态
Returns:
Dict[str, Any]: 集群状态
"""
# 统计所有表和区域
total_tables = len(self.tables)
total_regions = sum(len(regions) for regions in self.regions.values())
total_size = sum(
sum(region.size_mb for region in regions)
for regions in self.regions.values()
)
# 统计区域服务器负载
server_stats = {}
for table_regions in self.regions.values():
for region in table_regions:
server = region.region_server
if server not in server_stats:
server_stats[server] = {
'region_count': 0,
'total_size_mb': 0.0,
'read_requests': 0,
'write_requests': 0
}
server_stats[server]['region_count'] += 1
server_stats[server]['total_size_mb'] += region.size_mb
server_stats[server]['read_requests'] += region.read_requests
server_stats[server]['write_requests'] += region.write_requests
return {
'cluster_summary': {
'total_tables': total_tables,
'total_regions': total_regions,
'total_size_mb': round(total_size, 2),
'region_servers': len(server_stats)
},
'region_servers': {
server: {
'region_count': stats['region_count'],
'total_size_mb': round(stats['total_size_mb'], 2),
'read_requests': stats['read_requests'],
'write_requests': stats['write_requests']
}
for server, stats in server_stats.items()
},
'tables': list(self.tables.keys())
}
# 使用示例
if __name__ == "__main__":
# 创建HBase集群管理器
hbase = HBaseClusterManager()
print("=== HBase集群管理示例 ===")
# 创建表
print("\n=== 创建表 ===")
table_config = {
'name': 'user_profiles',
'column_families': ['personal', 'contact', 'preferences'],
'regions': 3,
'compression': 'SNAPPY',
'bloom_filter': True,
'max_versions': 5
}
result = hbase.create_table(table_config)
print(f"创建表结果: {result['status']}")
print(f"表名: {result['table_name']}")
print(f"列族: {result['column_families']}")
print(f"初始区域数: {result['initial_regions']}")
# 插入数据
print("\n=== 插入数据 ===")
put_result = hbase.put_data('user_profiles', 'user001', 'personal', 'name', 'John Doe')
print(f"插入结果: {put_result}")
put_result = hbase.put_data('user_profiles', 'user001', 'contact', 'email', 'john@example.com')
print(f"插入结果: {put_result}")
# 查询数据
print("\n=== 查询数据 ===")
get_result = hbase.get_data('user_profiles', 'user001')
print(f"查询结果: {get_result}")
# 扫描表
print("\n=== 扫描表 ===")
scan_result = hbase.scan_table('user_profiles', limit=5)
print(f"扫描结果: 找到 {scan_result['count']} 行数据")
# 获取表信息
print("\n=== 表信息 ===")
table_info = hbase.get_table_info('user_profiles')
print(f"表名: {table_info['table_name']}")
print(f"列族: {table_info['column_families']}")
print(f"区域数: {table_info['statistics']['region_count']}")
print(f"总大小: {table_info['statistics']['total_size_mb']} MB")
# 分裂区域
print("\n=== 分裂区域 ===")
split_result = hbase.split_region('user_profiles', 0, 'user500')
print(f"分裂结果: {split_result['status']}")
if split_result['status'] == 'success':
print(f"分裂键: {split_result['split_key']}")
print(f"新区域数: {len(split_result['new_regions'])}")
# 压缩表
print("\n=== 压缩表 ===")
compact_result = hbase.compact_table('user_profiles', 'major')
print(f"压缩结果: {compact_result}")
# 获取集群状态
print("\n=== 集群状态 ===")
cluster_status = hbase.get_cluster_status()
print(f"总表数: {cluster_status['cluster_summary']['total_tables']}")
print(f"总区域数: {cluster_status['cluster_summary']['total_regions']}")
print(f"区域服务器数: {cluster_status['cluster_summary']['region_servers']}")
print("\n区域服务器负载:")
for server, stats in cluster_status['region_servers'].items():
print(f" {server}: {stats['region_count']}个区域, {stats['total_size_mb']}MB")
3. 数据处理组件
3.1 Apache Spark详解
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
import json
class SparkJobStatus(Enum):
"""Spark作业状态"""
SUBMITTED = "已提交"
RUNNING = "运行中"
SUCCEEDED = "成功"
FAILED = "失败"
CANCELLED = "已取消"
class SparkApplicationType(Enum):
"""Spark应用类型"""
BATCH = "批处理"
STREAMING = "流处理"
MACHINE_LEARNING = "机器学习"
GRAPH_PROCESSING = "图处理"
SQL = "SQL查询"
@dataclass
class SparkJob:
"""Spark作业"""
job_id: str
app_id: str
name: str
job_type: SparkApplicationType
status: SparkJobStatus
submit_time: datetime
start_time: Optional[datetime]
end_time: Optional[datetime]
driver_memory: str
executor_memory: str
executor_cores: int
num_executors: int
input_data_size: float # GB
output_data_size: float # GB
stages: List[Dict[str, Any]]
metrics: Dict[str, Any]
@dataclass
class SparkCluster:
"""Spark集群"""
cluster_id: str
master_url: str
total_cores: int
total_memory_gb: float
worker_nodes: List[str]
active_applications: List[str]
completed_applications: List[str]
class SparkClusterManager:
"""
Spark集群管理器
"""
def __init__(self):
self.clusters = {}
self.applications = {}
self.jobs = {}
self.performance_metrics = {}
self.resource_usage = {}
def create_cluster(self, cluster_config: Dict[str, Any]) -> Dict[str, Any]:
"""
创建Spark集群
Args:
cluster_config: 集群配置
Returns:
Dict[str, Any]: 创建结果
"""
cluster = SparkCluster(
cluster_id=cluster_config['cluster_id'],
master_url=cluster_config.get('master_url', 'spark://master:7077'),
total_cores=cluster_config.get('total_cores', 16),
total_memory_gb=cluster_config.get('total_memory_gb', 64.0),
worker_nodes=cluster_config.get('worker_nodes', ['worker1', 'worker2', 'worker3']),
active_applications=[],
completed_applications=[]
)
self.clusters[cluster.cluster_id] = cluster
# 初始化资源使用情况
self.resource_usage[cluster.cluster_id] = {
'used_cores': 0,
'used_memory_gb': 0.0,
'cpu_utilization': 0.0,
'memory_utilization': 0.0
}
return {
'status': 'success',
'cluster_id': cluster.cluster_id,
'master_url': cluster.master_url,
'total_cores': cluster.total_cores,
'total_memory_gb': cluster.total_memory_gb,
'worker_nodes': cluster.worker_nodes
}
def submit_application(self, app_config: Dict[str, Any]) -> Dict[str, Any]:
"""
提交Spark应用
Args:
app_config: 应用配置
Returns:
Dict[str, Any]: 提交结果
"""
cluster_id = app_config.get('cluster_id')
if cluster_id not in self.clusters:
return {'status': 'error', 'message': f'Cluster {cluster_id} not found'}
cluster = self.clusters[cluster_id]
# 检查资源是否足够
required_cores = app_config.get('executor_cores', 2) * app_config.get('num_executors', 2)
required_memory = float(app_config.get('executor_memory', '2g').replace('g', '')) * app_config.get('num_executors', 2)
current_usage = self.resource_usage[cluster_id]
if (current_usage['used_cores'] + required_cores > cluster.total_cores or
current_usage['used_memory_gb'] + required_memory > cluster.total_memory_gb):
return {
'status': 'error',
'message': 'Insufficient resources',
'required': {'cores': required_cores, 'memory_gb': required_memory},
'available': {
'cores': cluster.total_cores - current_usage['used_cores'],
'memory_gb': cluster.total_memory_gb - current_usage['used_memory_gb']
}
}
# 创建应用
app_id = f"app-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{len(self.applications)}"
application = {
'app_id': app_id,
'name': app_config.get('name', 'Spark Application'),
'cluster_id': cluster_id,
            'app_type': SparkApplicationType[app_config.get('app_type', 'BATCH')],  # 按枚举名称查找(如'BATCH'),而非中文枚举值
'status': SparkJobStatus.SUBMITTED,
'submit_time': datetime.now(),
'driver_memory': app_config.get('driver_memory', '1g'),
'executor_memory': app_config.get('executor_memory', '2g'),
'executor_cores': app_config.get('executor_cores', 2),
'num_executors': app_config.get('num_executors', 2),
'main_class': app_config.get('main_class'),
'jar_file': app_config.get('jar_file'),
'arguments': app_config.get('arguments', []),
'configuration': app_config.get('configuration', {})
}
self.applications[app_id] = application
cluster.active_applications.append(app_id)
# 更新资源使用
current_usage['used_cores'] += required_cores
current_usage['used_memory_gb'] += required_memory
current_usage['cpu_utilization'] = (current_usage['used_cores'] / cluster.total_cores) * 100
current_usage['memory_utilization'] = (current_usage['used_memory_gb'] / cluster.total_memory_gb) * 100
return {
'status': 'success',
'app_id': app_id,
'cluster_id': cluster_id,
'submit_time': application['submit_time'].isoformat(),
'resource_allocation': {
'cores': required_cores,
'memory_gb': required_memory,
'executors': application['num_executors']
}
}
def create_spark_job(self, job_config: Dict[str, Any]) -> Dict[str, Any]:
"""
创建Spark作业
Args:
job_config: 作业配置
Returns:
Dict[str, Any]: 创建结果
"""
app_id = job_config.get('app_id')
if app_id not in self.applications:
return {'status': 'error', 'message': f'Application {app_id} not found'}
application = self.applications[app_id]
job = SparkJob(
job_id=f"job-{len(self.jobs)}",
app_id=app_id,
name=job_config.get('name', 'Spark Job'),
job_type=application['app_type'],
status=SparkJobStatus.SUBMITTED,
submit_time=datetime.now(),
start_time=None,
end_time=None,
driver_memory=application['driver_memory'],
executor_memory=application['executor_memory'],
executor_cores=application['executor_cores'],
num_executors=application['num_executors'],
input_data_size=job_config.get('input_data_size', 1.0),
output_data_size=0.0,
stages=[],
metrics={}
)
self.jobs[job.job_id] = job
return {
'status': 'success',
'job_id': job.job_id,
'app_id': app_id,
'submit_time': job.submit_time.isoformat()
}
def start_job(self, job_id: str) -> Dict[str, Any]:
"""
启动作业
Args:
job_id: 作业ID
Returns:
Dict[str, Any]: 启动结果
"""
if job_id not in self.jobs:
return {'status': 'error', 'message': f'Job {job_id} not found'}
job = self.jobs[job_id]
job.status = SparkJobStatus.RUNNING
job.start_time = datetime.now()
# 模拟创建stages
stages = self._create_job_stages(job)
job.stages = stages
return {
'status': 'success',
'job_id': job_id,
'start_time': job.start_time.isoformat(),
'stages': len(stages),
'estimated_duration': self._estimate_job_duration(job)
}
def _create_job_stages(self, job: SparkJob) -> List[Dict[str, Any]]:
"""
创建作业阶段
Args:
job: Spark作业
Returns:
List[Dict[str, Any]]: 阶段列表
"""
stages = []
if job.job_type == SparkApplicationType.BATCH:
# 批处理作业的典型阶段
stages = [
{
'stage_id': 0,
'name': 'Data Loading',
'num_tasks': job.num_executors * 2,
'status': 'pending',
'input_size_gb': job.input_data_size,
'output_size_gb': job.input_data_size * 0.9
},
{
'stage_id': 1,
'name': 'Data Transformation',
'num_tasks': job.num_executors * 4,
'status': 'pending',
'input_size_gb': job.input_data_size * 0.9,
'output_size_gb': job.input_data_size * 0.7
},
{
'stage_id': 2,
'name': 'Data Aggregation',
'num_tasks': job.num_executors,
'status': 'pending',
'input_size_gb': job.input_data_size * 0.7,
'output_size_gb': job.input_data_size * 0.3
}
]
elif job.job_type == SparkApplicationType.MACHINE_LEARNING:
# 机器学习作业的典型阶段
stages = [
{
'stage_id': 0,
'name': 'Feature Extraction',
'num_tasks': job.num_executors * 2,
'status': 'pending',
'input_size_gb': job.input_data_size,
'output_size_gb': job.input_data_size * 1.2
},
{
'stage_id': 1,
'name': 'Model Training',
'num_tasks': job.num_executors * 8,
'status': 'pending',
'input_size_gb': job.input_data_size * 1.2,
'output_size_gb': 0.1
},
{
'stage_id': 2,
'name': 'Model Evaluation',
'num_tasks': job.num_executors,
'status': 'pending',
'input_size_gb': job.input_data_size * 0.2,
'output_size_gb': 0.01
}
]
return stages
def _estimate_job_duration(self, job: SparkJob) -> str:
"""
估算作业持续时间
Args:
job: Spark作业
Returns:
str: 估算时间
"""
# 基于数据大小和资源的简单估算
base_time = job.input_data_size * 60 # 每GB 60秒
resource_factor = max(1, 8 / job.num_executors) # 资源调整因子
estimated_seconds = base_time * resource_factor
if estimated_seconds < 60:
return f"{int(estimated_seconds)}秒"
elif estimated_seconds < 3600:
return f"{int(estimated_seconds/60)}分钟"
else:
return f"{estimated_seconds/3600:.1f}小时"
def complete_job(self, job_id: str, success: bool = True) -> Dict[str, Any]:
"""
完成作业
Args:
job_id: 作业ID
success: 是否成功
Returns:
Dict[str, Any]: 完成结果
"""
if job_id not in self.jobs:
return {'status': 'error', 'message': f'Job {job_id} not found'}
job = self.jobs[job_id]
job.status = SparkJobStatus.SUCCEEDED if success else SparkJobStatus.FAILED
job.end_time = datetime.now()
# 计算作业指标
if job.start_time:
duration = (job.end_time - job.start_time).total_seconds()
# 模拟输出数据大小
if success:
job.output_data_size = sum(stage.get('output_size_gb', 0) for stage in job.stages)
job.metrics = {
'duration_seconds': duration,
'input_data_gb': job.input_data_size,
'output_data_gb': job.output_data_size,
'data_processing_rate_gb_per_sec': job.input_data_size / duration if duration > 0 else 0,
'total_tasks': sum(stage.get('num_tasks', 0) for stage in job.stages),
'resource_hours': (job.num_executors * job.executor_cores * duration) / 3600
}
# 释放资源
self._release_job_resources(job)
return {
'status': 'success',
'job_id': job_id,
'final_status': job.status.value,
'end_time': job.end_time.isoformat(),
'metrics': job.metrics
}
def _release_job_resources(self, job: SparkJob):
"""
释放作业资源
Args:
job: Spark作业
"""
application = self.applications.get(job.app_id)
if not application:
return
cluster_id = application['cluster_id']
if cluster_id not in self.resource_usage:
return
# 计算释放的资源
released_cores = job.executor_cores * job.num_executors
released_memory = float(job.executor_memory.replace('g', '')) * job.num_executors
# 更新资源使用
current_usage = self.resource_usage[cluster_id]
current_usage['used_cores'] = max(0, current_usage['used_cores'] - released_cores)
current_usage['used_memory_gb'] = max(0, current_usage['used_memory_gb'] - released_memory)
cluster = self.clusters[cluster_id]
current_usage['cpu_utilization'] = (current_usage['used_cores'] / cluster.total_cores) * 100
current_usage['memory_utilization'] = (current_usage['used_memory_gb'] / cluster.total_memory_gb) * 100
def get_job_status(self, job_id: str) -> Dict[str, Any]:
"""
获取作业状态
Args:
job_id: 作业ID
Returns:
Dict[str, Any]: 作业状态
"""
if job_id not in self.jobs:
return {'status': 'error', 'message': f'Job {job_id} not found'}
job = self.jobs[job_id]
result = {
'job_id': job.job_id,
'app_id': job.app_id,
'name': job.name,
'job_type': job.job_type.value,
'status': job.status.value,
'submit_time': job.submit_time.isoformat(),
'resource_allocation': {
'driver_memory': job.driver_memory,
'executor_memory': job.executor_memory,
'executor_cores': job.executor_cores,
'num_executors': job.num_executors
},
'data_info': {
'input_size_gb': job.input_data_size,
'output_size_gb': job.output_data_size
},
'stages': job.stages
}
if job.start_time:
result['start_time'] = job.start_time.isoformat()
if job.end_time:
result['end_time'] = job.end_time.isoformat()
if job.metrics:
result['metrics'] = job.metrics
return result
def get_cluster_status(self, cluster_id: str) -> Dict[str, Any]:
"""
获取集群状态
Args:
cluster_id: 集群ID
Returns:
Dict[str, Any]: 集群状态
"""
if cluster_id not in self.clusters:
return {'status': 'error', 'message': f'Cluster {cluster_id} not found'}
cluster = self.clusters[cluster_id]
usage = self.resource_usage[cluster_id]
# 统计应用状态
active_apps = []
for app_id in cluster.active_applications:
if app_id in self.applications:
app = self.applications[app_id]
active_apps.append({
'app_id': app_id,
'name': app['name'],
'type': app['app_type'].value,
'status': app['status'].value,
'submit_time': app['submit_time'].isoformat()
})
return {
'cluster_id': cluster_id,
'master_url': cluster.master_url,
'resources': {
'total_cores': cluster.total_cores,
'total_memory_gb': cluster.total_memory_gb,
'used_cores': usage['used_cores'],
'used_memory_gb': usage['used_memory_gb'],
'available_cores': cluster.total_cores - usage['used_cores'],
'available_memory_gb': cluster.total_memory_gb - usage['used_memory_gb'],
'cpu_utilization': round(usage['cpu_utilization'], 2),
'memory_utilization': round(usage['memory_utilization'], 2)
},
'worker_nodes': cluster.worker_nodes,
'applications': {
'active_count': len(cluster.active_applications),
'completed_count': len(cluster.completed_applications),
'active_applications': active_apps
}
}
def optimize_resource_allocation(self, cluster_id: str) -> Dict[str, Any]:
"""
优化资源分配
Args:
cluster_id: 集群ID
Returns:
Dict[str, Any]: 优化建议
"""
if cluster_id not in self.clusters:
return {'status': 'error', 'message': f'Cluster {cluster_id} not found'}
cluster = self.clusters[cluster_id]
usage = self.resource_usage[cluster_id]
recommendations = []
# CPU利用率分析
if usage['cpu_utilization'] > 90:
recommendations.append({
'type': 'resource_shortage',
'priority': 'high',
'message': 'CPU利用率过高,建议增加worker节点或减少并发作业',
'suggested_action': 'scale_out'
})
elif usage['cpu_utilization'] < 30:
recommendations.append({
'type': 'resource_waste',
'priority': 'medium',
'message': 'CPU利用率较低,可以考虑减少资源或增加作业并发度',
'suggested_action': 'scale_in_or_increase_concurrency'
})
# 内存利用率分析
if usage['memory_utilization'] > 85:
recommendations.append({
'type': 'memory_pressure',
'priority': 'high',
'message': '内存使用率过高,可能导致GC压力,建议优化内存配置',
'suggested_action': 'optimize_memory_config'
})
# 负载均衡分析
active_apps = len(cluster.active_applications)
worker_count = len(cluster.worker_nodes)
if active_apps > worker_count * 2:
recommendations.append({
'type': 'load_imbalance',
'priority': 'medium',
'message': '作业数量较多,建议检查负载均衡配置',
'suggested_action': 'check_load_balancing'
})
# 配置优化建议
optimization_suggestions = {
'executor_memory': self._suggest_executor_memory(cluster, usage),
'executor_cores': self._suggest_executor_cores(cluster, usage),
'dynamic_allocation': usage['cpu_utilization'] < 50,
'speculation': usage['cpu_utilization'] > 70
}
return {
'cluster_id': cluster_id,
'current_utilization': {
'cpu': usage['cpu_utilization'],
'memory': usage['memory_utilization']
},
'recommendations': recommendations,
'optimization_suggestions': optimization_suggestions,
'analysis_time': datetime.now().isoformat()
}
def _suggest_executor_memory(self, cluster: SparkCluster, usage: Dict[str, Any]) -> str:
"""
建议executor内存配置
Args:
cluster: Spark集群
usage: 资源使用情况
Returns:
str: 建议的内存配置
"""
# 基于集群总内存和worker数量的简单计算
memory_per_worker = cluster.total_memory_gb / len(cluster.worker_nodes)
suggested_memory = max(1, int(memory_per_worker * 0.8 / 2)) # 保留20%给系统,每个worker运行2个executor
return f"{suggested_memory}g"
def _suggest_executor_cores(self, cluster: SparkCluster, usage: Dict[str, Any]) -> int:
"""
建议executor核心数配置
Args:
cluster: Spark集群
usage: 资源使用情况
Returns:
int: 建议的核心数
"""
# 基于集群总核心数和worker数量的简单计算
cores_per_worker = cluster.total_cores / len(cluster.worker_nodes)
suggested_cores = max(1, int(cores_per_worker / 2)) # 每个worker运行2个executor
return min(suggested_cores, 5) # 限制最大核心数为5
# 使用示例
if __name__ == "__main__":
# 创建Spark集群管理器
spark_manager = SparkClusterManager()
print("=== Spark集群管理示例 ===")
# 创建集群
print("\n=== 创建集群 ===")
cluster_config = {
'cluster_id': 'spark-cluster-1',
'master_url': 'spark://master:7077',
'total_cores': 32,
'total_memory_gb': 128.0,
'worker_nodes': ['worker1', 'worker2', 'worker3', 'worker4']
}
cluster_result = spark_manager.create_cluster(cluster_config)
print(f"集群创建结果: {cluster_result}")
# 提交应用
print("\n=== 提交应用 ===")
app_config = {
'cluster_id': 'spark-cluster-1',
'name': 'Data Processing Job',
'app_type': 'BATCH',
'driver_memory': '2g',
'executor_memory': '4g',
'executor_cores': 2,
'num_executors': 4,
'main_class': 'com.example.DataProcessor',
'jar_file': '/path/to/app.jar'
}
app_result = spark_manager.submit_application(app_config)
print(f"应用提交结果: {app_result}")
if app_result['status'] == 'success':
app_id = app_result['app_id']
# 创建作业
print("\n=== 创建作业 ===")
job_config = {
'app_id': app_id,
'name': 'ETL Job',
'input_data_size': 10.0 # 10GB
}
job_result = spark_manager.create_spark_job(job_config)
print(f"作业创建结果: {job_result}")
if job_result['status'] == 'success':
job_id = job_result['job_id']
# 启动作业
print("\n=== 启动作业 ===")
start_result = spark_manager.start_job(job_id)
print(f"作业启动结果: {start_result}")
# 获取作业状态
print("\n=== 作业状态 ===")
job_status = spark_manager.get_job_status(job_id)
print(f"作业状态: {job_status['status']}")
print(f"阶段数: {len(job_status['stages'])}")
print(f"预估时间: {start_result.get('estimated_duration')}")
# 完成作业
print("\n=== 完成作业 ===")
complete_result = spark_manager.complete_job(job_id, success=True)
print(f"作业完成结果: {complete_result}")
# 获取集群状态
print("\n=== 集群状态 ===")
cluster_status = spark_manager.get_cluster_status('spark-cluster-1')
print(f"集群ID: {cluster_status['cluster_id']}")
print(f"CPU利用率: {cluster_status['resources']['cpu_utilization']}%")
print(f"内存利用率: {cluster_status['resources']['memory_utilization']}%")
print(f"活跃应用数: {cluster_status['applications']['active_count']}")
# 资源优化建议
print("\n=== 资源优化建议 ===")
optimization = spark_manager.optimize_resource_allocation('spark-cluster-1')
print(f"当前CPU利用率: {optimization['current_utilization']['cpu']}%")
print(f"当前内存利用率: {optimization['current_utilization']['memory']}%")
print("\n优化建议:")
for rec in optimization['recommendations']:
print(f" - {rec['type']} ({rec['priority']}): {rec['message']}")
print("\n配置建议:")
suggestions = optimization['optimization_suggestions']
print(f" - 建议executor内存: {suggestions['executor_memory']}")
print(f" - 建议executor核心数: {suggestions['executor_cores']}")
print(f" - 启用动态分配: {suggestions['dynamic_allocation']}")
print(f" - 启用推测执行: {suggestions['speculation']}")
3.2 Apache Flink详解
from typing import Dict, List, Any, Optional, Callable, Union
from dataclasses import dataclass
from enum import Enum
from datetime import datetime, timedelta
import json
import time
class FlinkJobStatus(Enum):
"""Flink作业状态"""
CREATED = "已创建"
RUNNING = "运行中"
FINISHED = "已完成"
CANCELED = "已取消"
FAILED = "失败"
RESTARTING = "重启中"
class FlinkJobType(Enum):
"""Flink作业类型"""
STREAMING = "流处理"
BATCH = "批处理"
CEP = "复杂事件处理"
MACHINE_LEARNING = "机器学习"
GRAPH = "图处理"
class CheckpointStatus(Enum):
"""检查点状态"""
COMPLETED = "已完成"
IN_PROGRESS = "进行中"
FAILED = "失败"
@dataclass
class FlinkCheckpoint:
"""Flink检查点"""
checkpoint_id: int
timestamp: datetime
status: CheckpointStatus
size_bytes: int
duration_ms: int
external_path: Optional[str] = None
@dataclass
class FlinkTaskManager:
"""Flink任务管理器"""
tm_id: str
host: str
port: int
slots_total: int
slots_available: int
memory_mb: int
cpu_cores: int
network_memory_mb: int
managed_memory_mb: int
@dataclass
class FlinkJob:
"""Flink作业"""
job_id: str
name: str
job_type: FlinkJobType
status: FlinkJobStatus
start_time: datetime
end_time: Optional[datetime]
parallelism: int
max_parallelism: int
checkpointing_enabled: bool
checkpoint_interval_ms: int
savepoint_path: Optional[str]
restart_strategy: str
metrics: Dict[str, Any]
checkpoints: List[FlinkCheckpoint]
class FlinkClusterManager:
"""
Flink集群管理器
"""
def __init__(self):
self.job_manager_host = "localhost"
self.job_manager_port = 8081
self.task_managers = {}
self.jobs = {}
self.checkpoints = {}
self.savepoints = {}
self.cluster_metrics = {
'total_slots': 0,
'available_slots': 0,
'running_jobs': 0,
'completed_jobs': 0,
'failed_jobs': 0
}
def add_task_manager(self, tm_config: Dict[str, Any]) -> Dict[str, Any]:
"""
添加任务管理器
Args:
tm_config: 任务管理器配置
Returns:
Dict[str, Any]: 添加结果
"""
tm = FlinkTaskManager(
tm_id=tm_config['tm_id'],
host=tm_config.get('host', 'localhost'),
port=tm_config.get('port', 6121),
slots_total=tm_config.get('slots_total', 4),
slots_available=tm_config.get('slots_total', 4),
memory_mb=tm_config.get('memory_mb', 4096),
cpu_cores=tm_config.get('cpu_cores', 4),
network_memory_mb=tm_config.get('network_memory_mb', 512),
managed_memory_mb=tm_config.get('managed_memory_mb', 1024)
)
self.task_managers[tm.tm_id] = tm
# 更新集群指标
self.cluster_metrics['total_slots'] += tm.slots_total
self.cluster_metrics['available_slots'] += tm.slots_available
return {
'status': 'success',
'tm_id': tm.tm_id,
'host': tm.host,
'slots': tm.slots_total,
'memory_mb': tm.memory_mb
}
def submit_job(self, job_config: Dict[str, Any]) -> Dict[str, Any]:
"""
提交Flink作业
Args:
job_config: 作业配置
Returns:
Dict[str, Any]: 提交结果
"""
# 检查资源是否足够
required_slots = job_config.get('parallelism', 1)
if self.cluster_metrics['available_slots'] < required_slots:
return {
'status': 'error',
'message': f'Insufficient slots. Required: {required_slots}, Available: {self.cluster_metrics["available_slots"]}'
}
# 创建作业
job_id = f"job-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{len(self.jobs)}"
job = FlinkJob(
job_id=job_id,
name=job_config.get('name', 'Flink Job'),
job_type=FlinkJobType[job_config.get('job_type', 'STREAMING')],  # look up by enum name; the enum values are descriptions
status=FlinkJobStatus.CREATED,
start_time=datetime.now(),
end_time=None,
parallelism=job_config.get('parallelism', 1),
max_parallelism=job_config.get('max_parallelism', 128),
checkpointing_enabled=job_config.get('checkpointing_enabled', True),
checkpoint_interval_ms=job_config.get('checkpoint_interval_ms', 60000),
savepoint_path=job_config.get('savepoint_path'),
restart_strategy=job_config.get('restart_strategy', 'fixed-delay'),
metrics={},
checkpoints=[]
)
self.jobs[job_id] = job
# 分配资源
self._allocate_slots(job)
return {
'status': 'success',
'job_id': job_id,
'name': job.name,
'parallelism': job.parallelism,
'submit_time': job.start_time.isoformat()
}
def _allocate_slots(self, job: FlinkJob):
"""
为作业分配slot
Args:
job: Flink作业
"""
slots_needed = job.parallelism
allocated = 0
for tm in self.task_managers.values():
if allocated >= slots_needed:
break
slots_to_allocate = min(tm.slots_available, slots_needed - allocated)
tm.slots_available -= slots_to_allocate
allocated += slots_to_allocate
# Update cluster metrics with the number of slots actually allocated
self.cluster_metrics['available_slots'] -= allocated
def start_job(self, job_id: str) -> Dict[str, Any]:
"""
启动作业
Args:
job_id: 作业ID
Returns:
Dict[str, Any]: 启动结果
"""
if job_id not in self.jobs:
return {'status': 'error', 'message': f'Job {job_id} not found'}
job = self.jobs[job_id]
previous_status = job.status
job.status = FlinkJobStatus.RUNNING
# 更新集群指标
self.cluster_metrics['running_jobs'] += 1
# 如果启用检查点,创建初始检查点
if job.checkpointing_enabled:
self._create_checkpoint(job)
return {
'status': 'success',
'job_id': job_id,
'status_change': f'{previous_status.value} -> {FlinkJobStatus.RUNNING.value}',
'start_time': job.start_time.isoformat()
}
def _create_checkpoint(self, job: FlinkJob) -> FlinkCheckpoint:
"""
创建检查点
Args:
job: Flink作业
Returns:
FlinkCheckpoint: 检查点
"""
checkpoint_id = len(job.checkpoints)
checkpoint = FlinkCheckpoint(
checkpoint_id=checkpoint_id,
timestamp=datetime.now(),
status=CheckpointStatus.IN_PROGRESS,
size_bytes=0,
duration_ms=0
)
# 模拟检查点完成
checkpoint.status = CheckpointStatus.COMPLETED
checkpoint.size_bytes = job.parallelism * 1024 * 1024 # 每个并行度1MB
checkpoint.duration_ms = job.parallelism * 100 # 每个并行度100ms
checkpoint.external_path = f"/checkpoints/{job.job_id}/chk-{checkpoint_id}"
job.checkpoints.append(checkpoint)
return checkpoint
def create_savepoint(self, job_id: str, target_directory: Optional[str] = None) -> Dict[str, Any]:
"""
创建保存点
Args:
job_id: 作业ID
target_directory: 目标目录
Returns:
Dict[str, Any]: 创建结果
"""
if job_id not in self.jobs:
return {'status': 'error', 'message': f'Job {job_id} not found'}
job = self.jobs[job_id]
if job.status != FlinkJobStatus.RUNNING:
return {'status': 'error', 'message': f'Job {job_id} is not running'}
savepoint_id = f"savepoint-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
savepoint_path = target_directory or f"/savepoints/{job_id}/{savepoint_id}"
savepoint = {
'savepoint_id': savepoint_id,
'job_id': job_id,
'path': savepoint_path,
'timestamp': datetime.now(),
'size_bytes': job.parallelism * 2 * 1024 * 1024, # 每个并行度2MB
'trigger_type': 'manual'
}
self.savepoints[savepoint_id] = savepoint
job.savepoint_path = savepoint_path
return {
'status': 'success',
'savepoint_id': savepoint_id,
'path': savepoint_path,
'size_bytes': savepoint['size_bytes'],
'timestamp': savepoint['timestamp'].isoformat()
}
def stop_job(self, job_id: str, with_savepoint: bool = False,
target_directory: Optional[str] = None) -> Dict[str, Any]:
"""
停止作业
Args:
job_id: 作业ID
with_savepoint: 是否创建保存点
target_directory: 保存点目录
Returns:
Dict[str, Any]: 停止结果
"""
if job_id not in self.jobs:
return {'status': 'error', 'message': f'Job {job_id} not found'}
job = self.jobs[job_id]
# Guard against double-stopping: a job that has already ended released its slots when it stopped
if job.status in [FlinkJobStatus.FINISHED, FlinkJobStatus.CANCELED, FlinkJobStatus.FAILED]:
return {'status': 'error', 'message': f'Job {job_id} has already ended'}
result = {
'status': 'success',
'job_id': job_id,
'previous_status': job.status.value
}
# 如果需要创建保存点
if with_savepoint and job.status == FlinkJobStatus.RUNNING:
savepoint_result = self.create_savepoint(job_id, target_directory)
if savepoint_result['status'] == 'success':
result['savepoint'] = savepoint_result
else:
return savepoint_result
# 停止作业
job.status = FlinkJobStatus.FINISHED
job.end_time = datetime.now()
# 释放资源
self._release_slots(job)
# 更新集群指标
if result['previous_status'] == FlinkJobStatus.RUNNING.value:
self.cluster_metrics['running_jobs'] -= 1
self.cluster_metrics['completed_jobs'] += 1
result['end_time'] = job.end_time.isoformat()
return result
def _release_slots(self, job: FlinkJob):
"""
释放作业占用的slot
Args:
job: Flink作业
"""
slots_to_release = job.parallelism
released = 0
for tm in self.task_managers.values():
if released >= slots_to_release:
break
slots_can_release = min(tm.slots_total - tm.slots_available, slots_to_release - released)
tm.slots_available += slots_can_release
released += slots_can_release
# Update cluster metrics with the number of slots actually released
self.cluster_metrics['available_slots'] += released
def cancel_job(self, job_id: str) -> Dict[str, Any]:
"""
取消作业
Args:
job_id: 作业ID
Returns:
Dict[str, Any]: 取消结果
"""
if job_id not in self.jobs:
return {'status': 'error', 'message': f'Job {job_id} not found'}
job = self.jobs[job_id]
# Same guard as stop_job: do not release slots twice for a job that has already ended
if job.status in [FlinkJobStatus.FINISHED, FlinkJobStatus.CANCELED]:
return {'status': 'error', 'message': f'Job {job_id} has already ended'}
previous_status = job.status
job.status = FlinkJobStatus.CANCELED
job.end_time = datetime.now()
# 释放资源
self._release_slots(job)
# 更新集群指标
if previous_status == FlinkJobStatus.RUNNING:
self.cluster_metrics['running_jobs'] -= 1
self.cluster_metrics['failed_jobs'] += 1
return {
'status': 'success',
'job_id': job_id,
'previous_status': previous_status.value,
'current_status': job.status.value,
'end_time': job.end_time.isoformat()
}
def get_job_details(self, job_id: str) -> Dict[str, Any]:
"""
获取作业详情
Args:
job_id: 作业ID
Returns:
Dict[str, Any]: 作业详情
"""
if job_id not in self.jobs:
return {'status': 'error', 'message': f'Job {job_id} not found'}
job = self.jobs[job_id]
# 计算运行时间
if job.end_time:
duration = job.end_time - job.start_time
else:
duration = datetime.now() - job.start_time
# 获取最新检查点信息
latest_checkpoint = None
if job.checkpoints:
latest_checkpoint = {
'checkpoint_id': job.checkpoints[-1].checkpoint_id,
'timestamp': job.checkpoints[-1].timestamp.isoformat(),
'status': job.checkpoints[-1].status.value,
'size_bytes': job.checkpoints[-1].size_bytes,
'duration_ms': job.checkpoints[-1].duration_ms
}
result = {
'job_id': job.job_id,
'name': job.name,
'job_type': job.job_type.value,
'status': job.status.value,
'start_time': job.start_time.isoformat(),
'duration_seconds': int(duration.total_seconds()),
'parallelism': job.parallelism,
'max_parallelism': job.max_parallelism,
'checkpointing': {
'enabled': job.checkpointing_enabled,
'interval_ms': job.checkpoint_interval_ms,
'total_checkpoints': len(job.checkpoints),
'latest_checkpoint': latest_checkpoint
},
'restart_strategy': job.restart_strategy
}
if job.end_time:
result['end_time'] = job.end_time.isoformat()
if job.savepoint_path:
result['savepoint_path'] = job.savepoint_path
return result
def get_cluster_overview(self) -> Dict[str, Any]:
"""
获取集群概览
Returns:
Dict[str, Any]: 集群概览
"""
# 统计任务管理器信息
tm_summary = {
'total_task_managers': len(self.task_managers),
'total_memory_mb': sum(tm.memory_mb for tm in self.task_managers.values()),
'total_cpu_cores': sum(tm.cpu_cores for tm in self.task_managers.values()),
'task_managers': []
}
for tm in self.task_managers.values():
tm_summary['task_managers'].append({
'tm_id': tm.tm_id,
'host': tm.host,
'slots_total': tm.slots_total,
'slots_available': tm.slots_available,
'slots_used': tm.slots_total - tm.slots_available,
'memory_mb': tm.memory_mb,
'cpu_cores': tm.cpu_cores
})
# 统计作业信息
job_summary = {
'total_jobs': len(self.jobs),
'running_jobs': self.cluster_metrics['running_jobs'],
'completed_jobs': self.cluster_metrics['completed_jobs'],
'failed_jobs': self.cluster_metrics['failed_jobs'],
'jobs_by_type': {}
}
# 按类型统计作业
for job in self.jobs.values():
job_type = job.job_type.value
if job_type not in job_summary['jobs_by_type']:
job_summary['jobs_by_type'][job_type] = 0
job_summary['jobs_by_type'][job_type] += 1
return {
'job_manager': {
'host': self.job_manager_host,
'port': self.job_manager_port
},
'cluster_metrics': self.cluster_metrics,
'task_managers': tm_summary,
'jobs': job_summary,
'timestamp': datetime.now().isoformat()
}
def get_job_metrics(self, job_id: str) -> Dict[str, Any]:
"""
获取作业指标
Args:
job_id: 作业ID
Returns:
Dict[str, Any]: 作业指标
"""
if job_id not in self.jobs:
return {'status': 'error', 'message': f'Job {job_id} not found'}
job = self.jobs[job_id]
# 模拟一些指标数据
if job.status == FlinkJobStatus.RUNNING:
# 运行中的作业有实时指标
metrics = {
'records_in_per_second': job.parallelism * 1000,
'records_out_per_second': job.parallelism * 950,
'bytes_in_per_second': job.parallelism * 1024 * 100,
'bytes_out_per_second': job.parallelism * 1024 * 95,
'backpressure_ratio': 0.1,
'cpu_utilization': 0.75,
'memory_utilization': 0.65,
'gc_time_ms_per_second': 50,
'checkpoint_duration_ms': 500,
'checkpoint_size_bytes': job.parallelism * 1024 * 1024
}
else:
# 非运行状态的作业只有历史指标
duration = (job.end_time - job.start_time).total_seconds() if job.end_time else 0
metrics = {
'total_records_processed': int(job.parallelism * 1000 * duration),
'total_bytes_processed': int(job.parallelism * 1024 * 100 * duration),
'average_processing_rate': job.parallelism * 1000 if duration > 0 else 0,
'total_checkpoints': len(job.checkpoints),
'successful_checkpoints': len([cp for cp in job.checkpoints if cp.status == CheckpointStatus.COMPLETED]),
'failed_checkpoints': len([cp for cp in job.checkpoints if cp.status == CheckpointStatus.FAILED])
}
return {
'job_id': job_id,
'status': job.status.value,
'metrics': metrics,
'timestamp': datetime.now().isoformat()
}
def restart_job(self, job_id: str, from_savepoint: Optional[str] = None) -> Dict[str, Any]:
"""
重启作业
Args:
job_id: 作业ID
from_savepoint: 从保存点恢复
Returns:
Dict[str, Any]: 重启结果
"""
if job_id not in self.jobs:
return {'status': 'error', 'message': f'Job {job_id} not found'}
job = self.jobs[job_id]
# 检查作业状态
if job.status == FlinkJobStatus.RUNNING:
return {'status': 'error', 'message': f'Job {job_id} is already running'}
# 检查资源
if self.cluster_metrics['available_slots'] < job.parallelism:
return {
'status': 'error',
'message': f'Insufficient slots. Required: {job.parallelism}, Available: {self.cluster_metrics["available_slots"]}'
}
# Restart the job; remember the previous status so cluster metrics stay consistent
previous_status = job.status
job.status = FlinkJobStatus.RESTARTING
# Allocate slots
self._allocate_slots(job)
# Mark the job as running again
job.status = FlinkJobStatus.RUNNING
job.start_time = datetime.now()
job.end_time = None
# Update cluster metrics
self.cluster_metrics['running_jobs'] += 1
if previous_status in [FlinkJobStatus.FAILED, FlinkJobStatus.CANCELED]:
self.cluster_metrics['failed_jobs'] -= 1
result = {
'status': 'success',
'job_id': job_id,
'restart_time': job.start_time.isoformat(),
'parallelism': job.parallelism
}
if from_savepoint:
result['restored_from_savepoint'] = from_savepoint
return result
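The class above is an in-memory simulation; on a real cluster the same overview and per-job details are served by the JobManager's REST API, on port 8081 by default. A minimal read-only sketch using requests (host, port and field names are assumptions based on the standard REST endpoints):
import requests

FLINK_REST = "http://localhost:8081"  # assumed JobManager address

# Cluster-wide counters: task managers, total/available slots, running jobs, ...
overview = requests.get(f"{FLINK_REST}/overview", timeout=5).json()
print(overview.get("taskmanagers"), overview.get("slots-total"), overview.get("slots-available"))

# Per-job listing with id, name and current state
for job in requests.get(f"{FLINK_REST}/jobs/overview", timeout=5).json().get("jobs", []):
    print(job.get("jid"), job.get("name"), job.get("state"))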
# 使用示例
if __name__ == "__main__":
# 创建Flink集群管理器
flink_manager = FlinkClusterManager()
print("=== Flink集群管理示例 ===")
# 添加任务管理器
print("\n=== 添加任务管理器 ===")
for i in range(3):
tm_config = {
'tm_id': f'taskmanager-{i+1}',
'host': f'worker{i+1}',
'port': 6121 + i,
'slots_total': 4,
'memory_mb': 4096,
'cpu_cores': 4
}
tm_result = flink_manager.add_task_manager(tm_config)
print(f"任务管理器 {i+1}: {tm_result}")
# 提交流处理作业
print("\n=== 提交流处理作业 ===")
streaming_job_config = {
'name': 'Real-time Analytics',
'job_type': 'STREAMING',
'parallelism': 6,
'max_parallelism': 128,
'checkpointing_enabled': True,
'checkpoint_interval_ms': 30000,
'restart_strategy': 'fixed-delay'
}
job_result = flink_manager.submit_job(streaming_job_config)
print(f"作业提交结果: {job_result}")
if job_result['status'] == 'success':
job_id = job_result['job_id']
# 启动作业
print("\n=== 启动作业 ===")
start_result = flink_manager.start_job(job_id)
print(f"作业启动结果: {start_result}")
# 获取作业详情
print("\n=== 作业详情 ===")
job_details = flink_manager.get_job_details(job_id)
print(f"作业名称: {job_details['name']}")
print(f"作业状态: {job_details['status']}")
print(f"并行度: {job_details['parallelism']}")
print(f"检查点: 启用={job_details['checkpointing']['enabled']}, 间隔={job_details['checkpointing']['interval_ms']}ms")
# 获取作业指标
print("\n=== 作业指标 ===")
metrics = flink_manager.get_job_metrics(job_id)
print(f"输入速率: {metrics['metrics']['records_in_per_second']} records/sec")
print(f"输出速率: {metrics['metrics']['records_out_per_second']} records/sec")
print(f"CPU利用率: {metrics['metrics']['cpu_utilization']*100:.1f}%")
print(f"内存利用率: {metrics['metrics']['memory_utilization']*100:.1f}%")
# 创建保存点
print("\n=== 创建保存点 ===")
savepoint_result = flink_manager.create_savepoint(job_id)
print(f"保存点创建结果: {savepoint_result}")
# 停止作业(带保存点)
print("\n=== 停止作业 ===")
stop_result = flink_manager.stop_job(job_id, with_savepoint=True)
print(f"作业停止结果: {stop_result}")
# 重启作业
print("\n=== 重启作业 ===")
restart_result = flink_manager.restart_job(job_id)
print(f"作业重启结果: {restart_result}")
# 获取集群概览
print("\n=== 集群概览 ===")
cluster_overview = flink_manager.get_cluster_overview()
print(f"任务管理器数量: {cluster_overview['task_managers']['total_task_managers']}")
print(f"总slot数: {cluster_overview['cluster_metrics']['total_slots']}")
print(f"可用slot数: {cluster_overview['cluster_metrics']['available_slots']}")
print(f"运行中作业: {cluster_overview['cluster_metrics']['running_jobs']}")
print(f"已完成作业: {cluster_overview['cluster_metrics']['completed_jobs']}")
print("\n任务管理器详情:")
for tm in cluster_overview['task_managers']['task_managers']:
print(f" {tm['tm_id']}: {tm['slots_used']}/{tm['slots_total']} slots used, {tm['memory_mb']}MB memory")