OpenClaw 实战案例:数据分析平台构建
·
目录
摘要
本文通过一个完整的数据分析平台案例,演示如何使用 OpenClaw 构建智能数据分析系统。文章涵盖数据采集、数据清洗、数据分析、可视化展示等核心功能,帮助开发者掌握 OpenClaw 在数据分析场景的应用。通过详细的系统设计和代码实现,让读者了解数据分析平台的完整构建过程。📊
1. 引言 - 数据分析平台概述
1.1 数据分析需求
企业数据分析面临诸多挑战,传统方案难以满足现代业务需求:
| 挑战 | 传统方案 | OpenClaw方案 |
|---|---|---|
| 数据分散 | 手动汇总 | 自动采集整合 |
| 分析门槛高 | 需要专业分析师 | 自然语言查询 |
| 响应慢 | 批量处理 | 实时分析 |
| 洞察浅 | 描述性分析 | 预测性分析 |
| 协作难 | 报告分发 | 智能问答 |
1.2 平台架构设计
1.3 核心功能规划
| 功能模块 | 核心能力 | 技术实现 |
|---|---|---|
| 数据采集 | 多源数据接入 | 连接器 + 流式采集 |
| 数据清洗 | 数据质量保障 | 规则引擎 + 异常检测 |
| 数据分析 | 多维度分析 | SQL + ML |
| 智能查询 | 自然语言交互 | NLP + SQL生成 |
| 可视化 | 图表展示 | 图表库 + 自动推荐 |
2. 数据采集模块
2.1 多源数据连接器
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import pandas as pd
@dataclass
class DataSource:
    """Configuration record describing a single data source."""
    # Label for the source (the usage examples pass values such as "main_db").
    name: str
    # Source kind; DatabaseConnector.connect compares it with "mysql",
    # "postgresql" and "sqlite".
    type: str
    # Connector-specific settings, e.g. host/user/password/database for
    # databases, base_url/api_key for APIs, path/type for files.
    connection: Dict[str, Any]
    # Optional schema description; defaults to None.
    schema: Optional[Dict] = None
class DataConnector(ABC):
    """Abstract interface that every data connector implements."""

    @abstractmethod
    def connect(self, source: DataSource) -> bool:
        """Establish a connection to ``source`` and report success."""

    @abstractmethod
    def fetch(self, query: str) -> pd.DataFrame:
        """Fetch the data selected by ``query`` as a DataFrame."""

    @abstractmethod
    def get_schema(self) -> Dict:
        """Describe the structure of the connected data."""
class DatabaseConnector(DataConnector):
    """Connector for MySQL, PostgreSQL and SQLite databases.

    The driver module is imported lazily inside ``connect`` so that only the
    driver for the configured database type has to be installed.
    """

    def __init__(self):
        self.connection = None
        self.source = None

    def connect(self, source: DataSource) -> bool:
        """Open a DB-API connection described by ``source``.

        Returns:
            True when a connection was opened, False when ``source.type`` is
            not one of "mysql", "postgresql" or "sqlite".
        """
        self.source = source
        # Release any handle left by an earlier connect(); otherwise an
        # unsupported type would report success through a stale connection.
        self.close()
        settings = source.connection
        db_type = source.type
        if db_type == "mysql":
            import pymysql
            self.connection = pymysql.connect(
                host=settings["host"],
                port=settings.get("port", 3306),
                user=settings["user"],
                password=settings["password"],
                database=settings["database"],
            )
        elif db_type == "postgresql":
            import psycopg2
            self.connection = psycopg2.connect(
                host=settings["host"],
                port=settings.get("port", 5432),
                user=settings["user"],
                password=settings["password"],
                database=settings["database"],
            )
        elif db_type == "sqlite":
            import sqlite3
            self.connection = sqlite3.connect(settings["path"])
        return self.connection is not None

    def close(self) -> None:
        """Close the current connection, if any."""
        if self.connection is None:
            return
        try:
            self.connection.close()
        finally:
            self.connection = None

    def fetch(self, query: str, params=None) -> pd.DataFrame:
        """Run ``query`` and return the result as a DataFrame.

        Args:
            query: SQL text.
            params: Optional bind parameters forwarded to ``pd.read_sql``.

        Raises:
            RuntimeError: if ``connect`` has not produced a connection.
        """
        if self.connection is None:
            raise RuntimeError("未建立连接")
        return pd.read_sql(query, self.connection, params=params)

    def get_schema(self) -> Dict:
        """Return ``{table: {"columns": [{"name", "type"}, ...]}}``.

        Supported for MySQL (schema = configured database) and PostgreSQL
        (schema = ``connection["schema"]``, default "public"). Any other
        type, or a connector without a source, yields an empty dict.
        """
        if self.source is None:
            return {}
        if self.source.type == "mysql":
            schema_name = self.source.connection["database"]
        elif self.source.type == "postgresql":
            schema_name = self.source.connection.get("schema", "public")
        else:
            return {}
        # The schema name is bound as a parameter instead of being formatted
        # into the SQL text; pymysql and psycopg2 both use the %s placeholder.
        query = (
            "SELECT table_name, column_name, data_type "
            "FROM information_schema.columns "
            "WHERE table_schema = %s "
            "ORDER BY table_name, ordinal_position"
        )
        df = self.fetch(query, params=(schema_name,))
        # MySQL 8 reports information_schema headers in upper case, so
        # normalise them before looking the columns up by name.
        df.columns = [str(col).lower() for col in df.columns]
        schema: Dict = {}
        rows = zip(df["table_name"], df["column_name"], df["data_type"])
        for table, column, data_type in rows:
            entry = schema.setdefault(table, {"columns": []})
            entry["columns"].append({"name": column, "type": data_type})
        return schema
class APIConnector(DataConnector):
    """Connector that reads JSON from an HTTP API via ``requests``."""

    # Seconds to wait for the server when the source config has no "timeout";
    # without a timeout a stalled server would block the caller indefinitely.
    DEFAULT_TIMEOUT = 30
    # Keys probed, in order, for the record list inside a JSON object.
    _LIST_KEYS = ("data", "results", "items")

    def __init__(self):
        self.source = None
        self.session = None

    def connect(self, source: DataSource) -> bool:
        """Create the HTTP session and install bearer auth when configured."""
        self.source = source
        import requests
        self.session = requests.Session()
        # Configure authentication
        if "api_key" in source.connection:
            self.session.headers["Authorization"] = f"Bearer {source.connection['api_key']}"
        return True

    def fetch(self, endpoint: str, params: Optional[Dict] = None) -> pd.DataFrame:
        """GET ``endpoint`` under the configured base URL.

        Args:
            endpoint: Path relative to ``connection["base_url"]``.
            params: Optional query-string parameters.

        Returns:
            A DataFrame built from the JSON list, from the first list found
            under "data"/"results"/"items", or a single row for a plain object.

        Raises:
            RuntimeError: if not connected or the status code is not 200.
            ValueError: if the JSON payload is neither a list nor an object.
        """
        if self.session is None:
            raise RuntimeError("未建立连接")
        settings = self.source.connection
        # Join with exactly one slash regardless of how the parts are written.
        url = f"{settings['base_url'].rstrip('/')}/{endpoint.lstrip('/')}"
        timeout = settings.get("timeout", self.DEFAULT_TIMEOUT)
        response = self.session.get(url, params=params, timeout=timeout)
        if response.status_code != 200:
            raise RuntimeError(f"API请求失败: {response.status_code}")
        data = response.json()
        if isinstance(data, list):
            return pd.DataFrame(data)
        if isinstance(data, dict):
            # Look for the wrapped record list
            for key in self._LIST_KEYS:
                rows = data.get(key)
                if isinstance(rows, list):
                    return pd.DataFrame(rows)
            return pd.DataFrame([data])
        # A successful response with a scalar payload is not an HTTP failure;
        # report what actually went wrong.
        raise ValueError(f"Unsupported API payload type: {type(data).__name__}")

    def get_schema(self) -> Dict:
        """Return the API data structure (not implemented: always ``{}``)."""
        return {}
class FileConnector(DataConnector):
    """Connector that loads a whole file into memory as a DataFrame."""

    # Maps the configured file type to the pandas reader that loads it.
    _READERS = {
        "csv": pd.read_csv,
        "excel": pd.read_excel,
        "json": pd.read_json,
        "parquet": pd.read_parquet,
    }

    def __init__(self):
        self.source = None
        self.data = None

    def connect(self, source: DataSource) -> bool:
        """Load the file named by ``source.connection["path"]``.

        The reader is chosen by ``source.connection["type"]`` (default "csv").

        Returns:
            True after a successful load, False for an unsupported file type.
        """
        self.source = source
        file_path = source.connection["path"]
        file_type = source.connection.get("type", "csv")
        reader = self._READERS.get(file_type)
        if reader is None:
            return False
        self.data = reader(file_path)
        return True

    def fetch(self, query: str = None) -> pd.DataFrame:
        """Return a copy of the loaded data; ``query`` is ignored.

        Raises:
            RuntimeError: if no file has been loaded yet.
        """
        if self.data is None:
            # Same error as the sibling connectors instead of an
            # AttributeError on None.
            raise RuntimeError("未建立连接")
        return self.data.copy()

    def get_schema(self) -> Dict:
        """Return column names and dtypes, or ``{}`` when nothing is loaded."""
        if self.data is None:
            return {}
        return {
            "columns": [
                {"name": col, "type": str(self.data[col].dtype)}
                for col in self.data.columns
            ]
        }
# Usage example
# Database connection. No "port" key is given, so DatabaseConnector.connect
# uses its MySQL default of 3306.
db_source = DataSource(
    name="main_db",
    type="mysql",
    connection={
        "host": "localhost",
        "user": "root",
        "password": "password",
        "database": "mydb"
    }
)
db_connector = DatabaseConnector()
db_connector.connect(db_source)
df = db_connector.fetch("SELECT * FROM users LIMIT 10")
print(f"获取 {len(df)} 条记录")
# API connection; the api_key is sent as a Bearer token by APIConnector.connect.
api_source = DataSource(
    name="weather_api",
    type="api",
    connection={
        "base_url": "https://api.weather.com",
        "api_key": "your_api_key"
    }
)
api_connector = APIConnector()
api_connector.connect(api_source)
weather_df = api_connector.fetch("current", {"city": "beijing"})
print(f"天气数据: {weather_df.shape}")
# File connection; sales_df is reused by later examples in this file.
file_source = DataSource(
    name="sales_data",
    type="file",
    connection={
        "path": "/data/sales.csv",
        "type": "csv"
    }
)
file_connector = FileConnector()
file_connector.connect(file_source)
sales_df = file_connector.fetch()
print(f"销售数据: {sales_df.shape}")
2.2 数据采集调度器
from typing import Dict, List, Callable
from datetime import datetime, timedelta
import threading
import time
@dataclass
class CollectionTask:
    """A scheduled data-collection job handled by DataCollectionScheduler."""
    # Key under which the scheduler stores the task.
    id: str
    # Human-readable name, used in the scheduler's failure message.
    name: str
    # Connector whose fetch() is called with `query`.
    connector: DataConnector
    query: str
    # Schedule keyword. NOTE(review): the original comment says "cron
    # expression", but the scheduler only recognises "every_minute",
    # "every_hour" and "every_day".
    schedule: str
    # Called with the fetched DataFrame after each successful fetch.
    callback: Callable
    # Timestamps maintained by the scheduler; None until first set.
    last_run: Optional[datetime] = None
    next_run: Optional[datetime] = None
    # The scheduler sets "running", "success" or "failed"; starts as "pending".
    status: str = "pending"
class DataCollectionScheduler:
    """Runs collection tasks on a background thread at fixed intervals.

    The task registry is guarded by a lock because ``add_task`` and
    ``remove_task`` are called from other threads while the loop iterates it.
    """

    # Recognised schedule keywords. This is a simplified stand-in for real
    # cron parsing (e.g. the croniter library).
    _INTERVALS = {
        "every_minute": timedelta(minutes=1),
        "every_hour": timedelta(hours=1),
        "every_day": timedelta(days=1),
    }
    # Interval used for any unrecognised schedule string.
    _DEFAULT_INTERVAL = timedelta(hours=1)
    # Seconds between checks for due tasks.
    _POLL_SECONDS = 1

    def __init__(self):
        self.tasks: Dict[str, "CollectionTask"] = {}
        self.running = False
        self.thread = None
        self._lock = threading.Lock()
        self._stop_event = threading.Event()

    def add_task(self, task: "CollectionTask"):
        """Register ``task`` and compute its first run time."""
        task.next_run = self._parse_schedule(task.schedule)
        with self._lock:
            self.tasks[task.id] = task

    def remove_task(self, task_id: str):
        """Unregister the task with ``task_id``; unknown ids are ignored."""
        with self._lock:
            self.tasks.pop(task_id, None)

    def start(self):
        """Start the background loop; a no-op when it is already running."""
        if self.thread is not None and self.thread.is_alive():
            return
        self._stop_event.clear()
        self.running = True
        self.thread = threading.Thread(target=self._run_loop, daemon=True)
        self.thread.start()

    def stop(self):
        """Stop the loop and wait up to five seconds for the thread."""
        self.running = False
        # Wake the loop immediately instead of waiting out the poll interval.
        self._stop_event.set()
        if self.thread:
            self.thread.join(timeout=5)

    def _run_loop(self):
        """Poll for due tasks until ``stop`` is called."""
        while self.running:
            now = datetime.now()
            # Snapshot under the lock so that concurrent add/remove cannot
            # break iteration; tasks run outside the lock.
            with self._lock:
                due = [
                    task for task in self.tasks.values()
                    if task.next_run and now >= task.next_run
                ]
            for task in due:
                self._execute_task(task)
                task.last_run = now
                task.next_run = self._parse_schedule(task.schedule)
            self._stop_event.wait(self._POLL_SECONDS)

    def _execute_task(self, task: "CollectionTask"):
        """Fetch the task's data and hand it to the callback.

        Failures are recorded in ``task.status`` and printed rather than
        raised, so one failing task does not stop the loop.
        """
        try:
            task.status = "running"
            df = task.connector.fetch(task.query)
            task.callback(df)
            task.status = "success"
        except Exception as e:
            task.status = "failed"
            print(f"任务 {task.name} 执行失败: {e}")

    def _parse_schedule(self, schedule: str) -> datetime:
        """Return the next run time for a schedule keyword.

        Unrecognised keywords fall back to one hour from now.
        """
        interval = self._INTERVALS.get(schedule, self._DEFAULT_INTERVAL)
        return datetime.now() + interval

    def get_task_status(self) -> List[Dict]:
        """Return id, name, status and ISO-formatted run times per task."""
        with self._lock:
            tasks = list(self.tasks.values())
        return [
            {
                "id": task.id,
                "name": task.name,
                "status": task.status,
                "last_run": task.last_run.isoformat() if task.last_run else None,
                "next_run": task.next_run.isoformat() if task.next_run else None,
            }
            for task in tasks
        ]
# Usage example
scheduler = DataCollectionScheduler()


def process_data(df: pd.DataFrame):
    """Callback for collected data; prints the number of rows received."""
    print(f"处理 {len(df)} 条数据")
    # Store into the data warehouse
    # or trigger downstream analysis


# Register a collection task that reuses db_connector from the connector
# example. The query uses MySQL interval syntax.
task1 = CollectionTask(
    id="task_001",
    name="用户数据采集",
    connector=db_connector,
    query="SELECT * FROM users WHERE created_at > NOW() - INTERVAL 1 HOUR",
    schedule="every_hour",
    callback=process_data
)
scheduler.add_task(task1)
# Starts the scheduler's background daemon thread.
scheduler.start()
3. 数据清洗模块
3.1 数据质量检测
from typing import Dict, List, Tuple
import numpy as np
class DataQualityChecker:
    """Rule-based data quality checks plus per-column profiling."""

    def __init__(self):
        self.rules: List[Dict] = []

    def add_rule(self, column: str, rule_type: str, params: Optional[Dict] = None):
        """Register a rule for ``column``.

        ``rule_type`` is one of "not_null", "unique", "range" (params
        "min"/"max") or "pattern" (param "pattern"). Other types are stored
        but ignored by ``check``.
        """
        self.rules.append({
            "column": column,
            "type": rule_type,
            "params": params or {},
        })

    def check(self, df: pd.DataFrame) -> Dict:
        """Apply every rule to ``df`` and return the findings.

        Returns:
            Dict with "total_rows", "total_columns", "issues" (one dict per
            violated rule) and "score". The score starts at 100 and loses
            the issue's "percentage" when present, otherwise 5 points per
            issue, floored at 0. Counts are plain ``int`` so the result can
            be serialised.
        """
        total_rows = len(df)
        results = {
            "total_rows": total_rows,
            "total_columns": len(df.columns),
            "issues": [],
            "score": 100,
        }
        handlers = {
            "not_null": self._check_not_null,
            "unique": self._check_unique,
            "range": self._check_range,
            "pattern": self._check_pattern,
        }
        issues = results["issues"]
        for rule in self.rules:
            column = rule["column"]
            if column not in df.columns:
                issues.append({
                    "column": column,
                    "type": "missing_column",
                    "message": f"列 {column} 不存在",
                })
                continue
            handler = handlers.get(rule["type"])
            if handler is not None:
                issues.extend(handler(column, df[column], rule["params"], total_rows))
        if issues:
            issue_penalty = sum(issue.get("percentage", 5) for issue in issues)
            results["score"] = max(0, 100 - issue_penalty)
        return results

    @staticmethod
    def _check_not_null(column, col_data, params, total_rows) -> List[Dict]:
        """Report null values together with their share of all rows."""
        null_count = int(col_data.isnull().sum())
        if null_count == 0:
            return []
        return [{
            "column": column,
            "type": "null_values",
            "count": null_count,
            "percentage": null_count / total_rows * 100,
        }]

    @staticmethod
    def _check_unique(column, col_data, params, total_rows) -> List[Dict]:
        """Report rows that repeat an earlier value."""
        dup_count = int(col_data.duplicated().sum())
        if dup_count == 0:
            return []
        return [{"column": column, "type": "duplicates", "count": dup_count}]

    @staticmethod
    def _check_range(column, col_data, params, total_rows) -> List[Dict]:
        """Report values below ``min`` and/or above ``max``."""
        found = []
        min_val = params.get("min")
        max_val = params.get("max")
        if min_val is not None:
            below_min = int((col_data < min_val).sum())
            if below_min > 0:
                found.append({
                    "column": column,
                    "type": "below_min",
                    "count": below_min,
                    "min": min_val,
                })
        if max_val is not None:
            above_max = int((col_data > max_val).sum())
            if above_max > 0:
                found.append({
                    "column": column,
                    "type": "above_max",
                    "count": above_max,
                    "max": max_val,
                })
        return found

    @staticmethod
    def _check_pattern(column, col_data, params, total_rows) -> List[Dict]:
        """Report values whose string form does not match ``pattern``."""
        pattern = params.get("pattern")
        if not pattern:
            return []
        invalid = ~col_data.astype(str).str.match(pattern, na=False)
        invalid_count = int(invalid.sum())
        if invalid_count == 0:
            return []
        return [{
            "column": column,
            "type": "pattern_mismatch",
            "count": invalid_count,
            "pattern": pattern,
        }]

    def get_profile(self, df: pd.DataFrame) -> Dict:
        """Summarise ``df``: size, memory use and per-column statistics.

        Numeric columns of any width (booleans excluded) get
        min/max/mean/median/std; object and string columns get
        min/max/avg length of their string form.
        """
        row_count = len(df)
        profile = {
            "row_count": row_count,
            "column_count": len(df.columns),
            "memory_usage": int(df.memory_usage(deep=True).sum()),
            "columns": {},
        }
        types = pd.api.types
        for col in df.columns:
            col_data = df[col]
            null_count = int(col_data.isnull().sum())
            col_profile = {
                "dtype": str(col_data.dtype),
                "null_count": null_count,
                # An empty frame has no nulls; avoid dividing by zero.
                "null_percentage": null_count / row_count * 100 if row_count else 0.0,
                "unique_count": int(col_data.nunique()),
            }
            is_numeric = types.is_numeric_dtype(col_data) and not types.is_bool_dtype(col_data)
            if is_numeric:
                col_profile.update({
                    "min": col_data.min(),
                    "max": col_data.max(),
                    "mean": col_data.mean(),
                    "median": col_data.median(),
                    "std": col_data.std(),
                })
            elif types.is_object_dtype(col_data) or types.is_string_dtype(col_data):
                # Compute the lengths once and reuse them for all three stats.
                lengths = col_data.astype(str).str.len()
                col_profile.update({
                    "min_length": lengths.min(),
                    "max_length": lengths.max(),
                    "avg_length": lengths.mean(),
                })
            profile["columns"][col] = col_profile
        return profile
# Usage example
checker = DataQualityChecker()
# Register quality rules
checker.add_rule("user_id", "not_null")
checker.add_rule("user_id", "unique")
checker.add_rule("age", "range", {"min": 0, "max": 150})
checker.add_rule("email", "pattern", {"pattern": r"^[\w\.-]+@[\w\.-]+\.\w+$"})
# Run the checks on sample data that violates the rules on purpose: one null
# user_id, ages outside 0-150, and one malformed email. This rebinds `df`,
# which the connector example assigned earlier.
df = pd.DataFrame({
    "user_id": [1, 2, 3, None, 5],
    "age": [25, 30, -5, 40, 200],
    "email": ["a@b.com", "invalid", "c@d.com", "e@f.com", "g@h.com"]
})
results = checker.check(df)
print(f"质量分数: {results['score']}")
print(f"问题数: {len(results['issues'])}")
# Build the data profile
profile = checker.get_profile(df)
print(f"数据概要: {profile['row_count']} 行, {profile['column_count']} 列")
3.2 数据清洗处理器
from typing import Dict, List, Callable
class DataCleaner:
    """Ordered pipeline of DataFrame cleaning steps."""

    def __init__(self):
        self.steps: List[Dict] = []

    def add_step(self, name: str, processor: Callable, params: Dict = None):
        """Append a step; ``processor(df, **params)`` must return a DataFrame."""
        step = {
            "name": name,
            "processor": processor,
            "params": params or {},
        }
        self.steps.append(step)

    def clean(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict]:
        """Run every step on a copy of ``df``.

        Returns:
            ``(cleaned_df, report)`` where the report lists row counts before
            and after each step plus the overall totals.
        """
        rows_at_start = len(df)
        working = df.copy()
        step_reports = []
        for step in self.steps:
            rows_in = len(working)
            working = step["processor"](working, **step["params"])
            rows_out = len(working)
            step_reports.append({
                "name": step["name"],
                "rows_before": rows_in,
                "rows_after": rows_out,
                "rows_removed": rows_in - rows_out,
            })
        rows_at_end = len(working)
        report = {
            "original_rows": rows_at_start,
            "steps": step_reports,
            "final_rows": rows_at_end,
            "rows_removed": rows_at_start - rows_at_end,
        }
        return working, report
# 预定义清洗处理器
def remove_duplicates(df: pd.DataFrame, subset: List[str] = None) -> pd.DataFrame:
    """Drop duplicate rows, comparing only ``subset`` columns when given."""
    deduplicated = df.drop_duplicates(subset=subset)
    return deduplicated
def fill_missing(df: pd.DataFrame, columns: Dict[str, Any] = None) -> pd.DataFrame:
    """Fill missing values on a copy of ``df``.

    ``columns`` maps column name to fill value; names that are not present
    in the frame are skipped.
    """
    filled = df.copy()
    replacements = columns or {}
    present = [name for name in replacements if name in filled.columns]
    for name in present:
        filled[name] = filled[name].fillna(replacements[name])
    return filled
def remove_outliers(df: pd.DataFrame, column: str, method: str = "iqr", threshold: float = 1.5) -> pd.DataFrame:
    """Drop rows whose ``column`` value is an outlier.

    Args:
        df: Input frame (not modified).
        column: Column to test; if absent, ``df`` is returned unchanged.
        method: "iqr" keeps values within ``threshold`` IQRs of the
            quartiles; "zscore" keeps values whose absolute z-score
            (population standard deviation) is at most ``threshold``.
            Any other value returns ``df`` unchanged.
        threshold: Multiplier (iqr) or z-score cut-off (zscore).

    Rows where ``column`` is missing are dropped by both methods.
    """
    if column not in df.columns:
        return df
    values = df[column]
    if method == "iqr":
        q1 = values.quantile(0.25)
        q3 = values.quantile(0.75)
        iqr = q3 - q1
        return df[values.between(q1 - threshold * iqr, q3 + threshold * iqr)]
    if method == "zscore":
        # Compute the scores on the full, index-aligned column. Scoring a
        # dropna() copy yields a shorter mask that no longer lines up with
        # the frame whenever the column contains missing values.
        spread = values.std(ddof=0)
        if not spread > 0:
            # Zero (or undefined) spread: no value can be an outlier.
            return df[values.notna()]
        z_scores = (values - values.mean()) / spread
        return df[z_scores.abs() <= threshold]
    return df
def standardize_text(df: pd.DataFrame, column: str, lowercase: bool = True, strip: bool = True) -> pd.DataFrame:
    """Normalise the text in ``column`` on a copy of ``df``.

    Non-missing values are converted to ``str`` and then lower-cased and/or
    stripped of surrounding whitespace. Missing values stay missing instead
    of becoming the literal strings "nan" or "None". A column that is absent
    leaves the copy unchanged.
    """
    result = df.copy()
    if column not in result.columns or not (lowercase or strip):
        return result

    def _normalize(value):
        text = str(value)
        if lowercase:
            text = text.lower()
        if strip:
            text = text.strip()
        return text

    # na_action="ignore" skips missing entries so they survive as missing.
    result[column] = result[column].map(_normalize, na_action="ignore")
    return result
def convert_types(df: pd.DataFrame, columns: Dict[str, str]) -> pd.DataFrame:
    """Convert column dtypes on a copy of ``df``.

    ``columns`` maps column name to target dtype. Columns missing from the
    frame are skipped. A failed conversion is reported with ``print`` and
    leaves that column as it was, so one bad column does not abort the rest.
    """
    result = df.copy()
    for col, dtype in columns.items():
        if col not in result.columns:
            continue
        try:
            if dtype == "datetime64":
                # astype() rejects the unit-less "datetime64" dtype in
                # current pandas; to_datetime parses the values instead.
                result[col] = pd.to_datetime(result[col])
            else:
                result[col] = result[col].astype(dtype)
        except (ValueError, TypeError, OverflowError) as e:
            print(f"转换 {col} 失败: {e}")
    return result
# Usage example
cleaner = DataCleaner()
# Register the cleaning steps; they run in the order added.
# NOTE(review): `df` is the sample frame from the quality-check example
# (columns user_id, age, email). It has no "name" or "created_at" column, and
# fill_missing / convert_types skip columns that are not present.
cleaner.add_step("去除重复", remove_duplicates, {"subset": ["user_id"]})
cleaner.add_step("填充缺失", fill_missing, {"columns": {"age": 0, "name": "未知"}})
cleaner.add_step("去除异常值", remove_outliers, {"column": "age", "method": "iqr"})
cleaner.add_step("标准化文本", standardize_text, {"column": "email", "lowercase": True})
cleaner.add_step("类型转换", convert_types, {"columns": {"age": "int64", "created_at": "datetime64"}})
# Run the pipeline
cleaned_df, report = cleaner.clean(df)
print(f"清洗报告: {report}")
4. 数据分析引擎
4.1 统计分析器
from typing import Dict, List, Optional
from scipy import stats
import numpy as np
class StatisticalAnalyzer:
    """Statistical analyses over the columns of one DataFrame."""

    def __init__(self, df: pd.DataFrame):
        self.df = df

    def descriptive_stats(self, columns: List[str] = None) -> Dict:
        """Return summary statistics per column.

        Args:
            columns: Columns to describe; defaults to all numeric columns.
                Names missing from the frame are skipped.

        Returns:
            ``{column: {count, mean, std, min, q1, median, q3, max,
            skewness, kurtosis}}`` computed on non-null values.
        """
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns.tolist()
        result = {}
        for col in columns:
            if col not in self.df.columns:
                continue
            data = self.df[col].dropna()
            result[col] = {
                "count": len(data),
                "mean": data.mean(),
                "std": data.std(),
                "min": data.min(),
                "q1": data.quantile(0.25),
                "median": data.median(),
                "q3": data.quantile(0.75),
                "max": data.max(),
                "skewness": data.skew(),
                "kurtosis": data.kurtosis(),
            }
        return result

    def correlation_analysis(self, method: str = "pearson") -> pd.DataFrame:
        """Return the correlation matrix of the numeric columns."""
        numeric_df = self.df.select_dtypes(include=[np.number])
        return numeric_df.corr(method=method)

    @staticmethod
    def _test_summary(name: str, statistic, pvalue, alpha: float, **extra) -> Dict:
        """Build a result dict with plain Python numbers and a bool verdict."""
        summary = {
            "test": name,
            "statistic": float(statistic),
            "p_value": float(pvalue),
        }
        summary.update(extra)
        summary["significant"] = bool(pvalue < alpha)
        return summary

    def hypothesis_test(self, column1: str, column2: str, test_type: str = "ttest", alpha: float = 0.05) -> Dict:
        """Compare two columns with the chosen test.

        Args:
            column1: First column.
            column2: Second column.
            test_type: "ttest" (independent two-sample t-test),
                "mannwhitney" or "chi2" (on the cross-tabulation).
            alpha: Significance level used for the "significant" flag.

        Returns:
            Dict with "test", "statistic", "p_value", "significant" (plus
            "dof" for chi2), or ``{}`` for an unknown ``test_type``.
        """
        if test_type == "chi2":
            contingency = pd.crosstab(self.df[column1], self.df[column2])
            statistic, pvalue, dof, _expected = stats.chi2_contingency(contingency)
            return self._test_summary("Chi-square", statistic, pvalue, alpha, dof=dof)
        data1 = self.df[column1].dropna()
        data2 = self.df[column2].dropna()
        if test_type == "ttest":
            statistic, pvalue = stats.ttest_ind(data1, data2)
            return self._test_summary("t-test", statistic, pvalue, alpha)
        if test_type == "mannwhitney":
            statistic, pvalue = stats.mannwhitneyu(data1, data2)
            return self._test_summary("Mann-Whitney U", statistic, pvalue, alpha)
        return {}

    def anova(self, group_column: str, value_column: str, alpha: float = 0.05) -> Dict:
        """Run a one-way ANOVA of ``value_column`` across ``group_column``.

        ``alpha`` is the significance level for the "significant" flag. The
        result also carries the number of groups under "groups".
        """
        grouped = self.df.groupby(group_column)[value_column]
        group_data = [values.dropna().values for _name, values in grouped]
        statistic, pvalue = stats.f_oneway(*group_data)
        return self._test_summary("ANOVA", statistic, pvalue, alpha, groups=len(group_data))

    def time_series_analysis(self, date_column: str, value_column: str, freq: str = "D") -> Dict:
        """Resample a value column over time and remove its linear trend.

        Args:
            date_column: Column parsed with ``pd.to_datetime``.
            value_column: Column holding the observations.
            freq: Resampling frequency; the result key stays "daily_stats"
                whatever the frequency.

        Returns:
            Dict with "daily_stats" (mean/sum/count per period, keyed by
            period start) and "detrended", the linearly detrended non-null
            observations in chronological order.
        """
        frame = self.df[[date_column, value_column]].copy()
        frame[date_column] = pd.to_datetime(frame[date_column])
        # Order chronologically: detrending fits a line against position, so
        # rows in arbitrary order would produce a meaningless trend.
        series = frame.set_index(date_column)[value_column].sort_index()
        resampled = series.resample(freq)
        result = {
            "daily_stats": {
                "mean": resampled.mean().to_dict(),
                "sum": resampled.sum().to_dict(),
                "count": resampled.count().to_dict(),
            }
        }
        from scipy.signal import detrend
        # A single NaN would turn the whole least-squares fit into NaN.
        observed = series.dropna()
        result["detrended"] = detrend(observed.values).tolist() if len(observed) else []
        return result
# Usage example
# NOTE(review): sales_df is the frame loaded from /data/sales.csv in the
# connector example. Whether it has the columns named below cannot be
# verified from this file; confirm against the real data.
analyzer = StatisticalAnalyzer(sales_df)
# Descriptive statistics (column names missing from the frame are skipped)
stats_result = analyzer.descriptive_stats(["price", "quantity", "total"])
print(f"统计结果: {stats_result}")
# Correlation analysis
corr = analyzer.correlation_analysis()
print(f"相关性矩阵:\n{corr}")
# Hypothesis test; hypothesis_test indexes the frame directly, so a missing
# column raises KeyError.
test_result = analyzer.hypothesis_test("group_a", "group_b", "ttest")
print(f"检验结果: {test_result}")
4.2 自然语言查询
from typing import Dict, List, Optional
import re
class NaturalLanguageQuery:
    """Template-based translation of simple Chinese questions into SQL.

    ``schema`` has the shape ``{table: {"columns": [{"name": ...}, ...]}}``.
    Column phrases in a question are resolved against the column names of
    that schema.
    """

    def __init__(self, schema: Dict):
        self.schema = schema
        self.query_templates = self._build_templates()

    def _build_templates(self) -> List[Dict]:
        """Return the question patterns with their SQL templates."""
        return [
            {
                "pattern": r"(.+)的平均值",
                "sql_template": "SELECT AVG({column}) FROM {table}",
                "type": "aggregation"
            },
            {
                "pattern": r"(.+)的总和",
                "sql_template": "SELECT SUM({column}) FROM {table}",
                "type": "aggregation"
            },
            {
                "pattern": r"(.+)的最大值",
                "sql_template": "SELECT MAX({column}) FROM {table}",
                "type": "aggregation"
            },
            {
                "pattern": r"(.+)的最小值",
                "sql_template": "SELECT MIN({column}) FROM {table}",
                "type": "aggregation"
            },
            {
                "pattern": r"按(.+)分组统计(.+)",
                "sql_template": "SELECT {group_column}, COUNT(*) FROM {table} GROUP BY {group_column}",
                "type": "grouping"
            },
            {
                "pattern": r"(.+)前(\d+)名",
                "sql_template": "SELECT * FROM {table} ORDER BY {column} DESC LIMIT {limit}",
                "type": "ranking"
            }
        ]

    def parse(self, question: str) -> Dict:
        """Translate ``question`` into SQL.

        Templates are tried in order and the first one that yields SQL
        wins. A template whose pattern matches but whose column cannot be
        resolved no longer ends the search.

        Returns:
            Dict with "question", "sql" (None when nothing could be built),
            "type" (the template that produced the SQL, else the first
            template whose pattern matched, else None) and "confidence"
            (0 when no SQL was built).
        """
        result = {
            "question": question,
            "sql": None,
            "type": None,
            "confidence": 0
        }
        for template in self.query_templates:
            match = re.search(template["pattern"], question)
            if not match:
                continue
            if result["type"] is None:
                result["type"] = template["type"]
            sql, confidence = self._render(template, match)
            if sql:
                result["sql"] = sql
                result["type"] = template["type"]
                result["confidence"] = confidence
                break
        return result

    def _render(self, template: Dict, match) -> tuple:
        """Fill ``template`` from ``match``; ``(None, 0)`` when unresolved."""
        kind = template["type"]
        column = self._find_column(match.group(1))
        table = self._find_table(column)
        if not (column and table):
            return None, 0
        if kind == "aggregation":
            return template["sql_template"].format(column=column, table=table), 0.8
        if kind == "grouping":
            # Only the grouping column is used; the template counts rows.
            return template["sql_template"].format(group_column=column, table=table), 0.7
        if kind == "ranking":
            sql = template["sql_template"].format(
                column=column, table=table, limit=match.group(2)
            )
            return sql, 0.8
        return None, 0

    def _iter_column_names(self):
        """Yield every column name in schema order."""
        for table_info in self.schema.values():
            for column in table_info.get("columns", []):
                yield column["name"]

    def _find_column(self, name: str) -> Optional[str]:
        """Resolve a phrase from the question to a column name.

        First pass: the phrase is a substring of a column name. Second
        pass: a column name occurs inside the phrase, which lets questions
        carry extra words around the column name.
        """
        name_lower = name.lower()
        for column_name in self._iter_column_names():
            if name_lower in column_name.lower():
                return column_name
        for column_name in self._iter_column_names():
            if column_name and column_name.lower() in name_lower:
                return column_name
        return None

    def _find_table(self, column: str) -> Optional[str]:
        """Return the first table that has a column named ``column``."""
        for table_name, table_info in self.schema.items():
            for col in table_info.get("columns", []):
                if col["name"] == column:
                    return table_name
        return None

    def execute(self, question: str, connector: "DataConnector") -> pd.DataFrame:
        """Parse ``question`` and run the SQL through ``connector.fetch``.

        Returns an empty DataFrame when no SQL could be built.
        """
        parsed = self.parse(question)
        if parsed["sql"]:
            return connector.fetch(parsed["sql"])
        return pd.DataFrame()
# Usage example
schema = {
    "sales": {
        "columns": [
            {"name": "product", "type": "string"},
            {"name": "price", "type": "float"},
            {"name": "quantity", "type": "int"},
            {"name": "region", "type": "string"}
        ]
    }
}
nlq = NaturalLanguageQuery(schema)
# Parse a question.
# NOTE(review): column lookup compares the phrase with the schema's column
# names. "价格" does not occur in any of the English names above, so this call
# yields sql=None and confidence 0. A question such as "price的平均值" resolves.
result = nlq.parse("价格的平均值")
print(f"SQL: {result['sql']}")
print(f"置信度: {result['confidence']}")
# Execute a query (left disabled: needs a live database connection)
# df = nlq.execute("销售额前10名", db_connector)
5. 可视化引擎
5.1 图表生成器
from typing import Dict, List, Optional
import matplotlib.pyplot as plt
import seaborn as sns
class ChartGenerator:
    """Creates Matplotlib charts from DataFrames and tracks the figures.

    Every chart method returns the new figure and also stores it in
    ``self.figures`` so that ``save_all`` and ``close_all`` can act on it.
    """

    def __init__(self, style: str = "seaborn"):
        plt.style.use(self._resolve_style(style))
        self.figures: List[plt.Figure] = []

    @staticmethod
    def _resolve_style(style: str) -> str:
        """Map legacy "seaborn*" style names to their current equivalents.

        Newer Matplotlib releases renamed the bundled seaborn styles to
        "seaborn-v0_8*" and later dropped the old names, so the default
        "seaborn" would fail there. Any other name is returned unchanged.
        """
        available = plt.style.available
        if style in available:
            return style
        if style.startswith("seaborn"):
            renamed = "seaborn-v0_8" + style[len("seaborn"):]
            if renamed in available:
                return renamed
        return style

    def _finish(self, fig, ax, title: str = None, tight: bool = True):
        """Apply the title and layout, then register and return ``fig``."""
        if title:
            ax.set_title(title)
        if tight:
            # Lay out this figure explicitly rather than whichever figure
            # pyplot currently considers active.
            fig.tight_layout()
        self.figures.append(fig)
        return fig

    def bar_chart(self, df: pd.DataFrame, x: str, y: str, title: str = None) -> plt.Figure:
        """Bar chart of column ``y`` against column ``x``."""
        fig, ax = plt.subplots(figsize=(10, 6))
        df.plot.bar(x=x, y=y, ax=ax)
        ax.set_xlabel(x)
        ax.set_ylabel(y)
        return self._finish(fig, ax, title)

    def line_chart(self, df: pd.DataFrame, x: str, y: str, title: str = None) -> plt.Figure:
        """Line chart of column ``y`` against column ``x``."""
        fig, ax = plt.subplots(figsize=(12, 6))
        df.plot.line(x=x, y=y, ax=ax)
        return self._finish(fig, ax, title)

    def pie_chart(self, df: pd.DataFrame, values: str, labels: str, title: str = None) -> plt.Figure:
        """Pie chart of ``values`` labelled by ``labels`` with percentages."""
        fig, ax = plt.subplots(figsize=(8, 8))
        ax.pie(df[values], labels=df[labels], autopct='%1.1f%%')
        return self._finish(fig, ax, title, tight=False)

    def scatter_plot(self, df: pd.DataFrame, x: str, y: str, hue: str = None, title: str = None) -> plt.Figure:
        """Scatter plot; with ``hue`` each category is drawn as its own series."""
        fig, ax = plt.subplots(figsize=(10, 8))
        if hue:
            for category in df[hue].unique():
                subset = df[df[hue] == category]
                ax.scatter(subset[x], subset[y], label=category, alpha=0.6)
            ax.legend()
        else:
            ax.scatter(df[x], df[y], alpha=0.6)
        ax.set_xlabel(x)
        ax.set_ylabel(y)
        return self._finish(fig, ax, title)

    def heatmap(self, df: pd.DataFrame, title: str = None) -> plt.Figure:
        """Annotated heatmap of ``df`` (for example a correlation matrix)."""
        fig, ax = plt.subplots(figsize=(10, 8))
        sns.heatmap(df, annot=True, fmt=".2f", cmap="coolwarm", ax=ax)
        return self._finish(fig, ax, title)

    def histogram(self, df: pd.DataFrame, column: str, bins: int = 30, title: str = None) -> plt.Figure:
        """Histogram of ``column`` with ``bins`` bins."""
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.hist(df[column], bins=bins, edgecolor='black')
        ax.set_xlabel(column)
        ax.set_ylabel('Frequency')
        return self._finish(fig, ax, title)

    def box_plot(self, df: pd.DataFrame, x: str, y: str, title: str = None) -> plt.Figure:
        """Box plot of column ``y`` grouped by column ``x``."""
        fig, ax = plt.subplots(figsize=(10, 6))
        df.boxplot(column=y, by=x, ax=ax)
        return self._finish(fig, ax, title)

    def save_all(self, directory: str, prefix: str = "chart"):
        """Save every tracked figure as ``<prefix>_<n>.png`` in ``directory``.

        The directory is created when missing; numbering starts at 1.
        """
        import os
        os.makedirs(directory, exist_ok=True)
        for i, fig in enumerate(self.figures, start=1):
            path = os.path.join(directory, f"{prefix}_{i}.png")
            fig.savefig(path, dpi=150)
            print(f"保存图表: {path}")

    def close_all(self):
        """Close every tracked figure and forget it."""
        for fig in self.figures:
            plt.close(fig)
        self.figures.clear()
# Usage example
chart_gen = ChartGenerator()
# Generate charts.
# NOTE(review): time_df and category_df are not defined anywhere in the code
# shown in this article; they must be created before these calls. Whether
# sales_df has "product", "sales", "price", "quantity" and "region" columns
# depends on /data/sales.csv and should be confirmed.
chart_gen.bar_chart(sales_df, x="product", y="sales", title="产品销售额")
chart_gen.line_chart(time_df, x="date", y="revenue", title="收入趋势")
chart_gen.pie_chart(category_df, values="amount", labels="category", title="类别占比")
chart_gen.scatter_plot(sales_df, x="price", y="quantity", hue="region", title="价格与销量关系")
# Save the charts as analysis_1.png, analysis_2.png, ... under /output/charts
chart_gen.save_all("/output/charts", "analysis")
6. 最佳实践
6.1 平台设计原则
| 原则 | 说明 | 实践 |
|---|---|---|
| 易用性 | 降低使用门槛 | 自然语言查询 |
| 可扩展 | 支持新数据源 | 插件式连接器 |
| 高性能 | 快速响应 | 缓存 + 索引 |
| 安全性 | 数据保护 | 权限控制 |
6.2 常见问题
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 查询慢 | 数据量大 | 分区 + 索引 |
| 结果不准 | 数据质量差 | 数据清洗 |
| 图表乱码 | 默认字体缺少中文字形 | 设置中文字体(如 plt.rcParams["font.sans-serif"]) |
7. 总结
7.1 核心要点
本文通过完整的数据分析平台案例,展示了 OpenClaw 在数据分析场景的应用:
| 模块 | 核心功能 | 技术要点 |
|---|---|---|
| 数据采集 | 多源接入 | 连接器 + 调度 |
| 数据清洗 | 质量保障 | 规则 + 处理器 |
| 数据分析 | 多维分析 | 统计 + ML |
| 智能查询 | 自然语言 | NLP + SQL |
| 可视化 | 图表展示 | 自动推荐 |
7.2 下一步学习
- 第76篇:OpenClaw 实战案例:内容创作系统
参考资料
更多推荐



所有评论(0)