摘要

本文通过一个完整的数据分析平台案例,演示如何使用 OpenClaw 构建智能数据分析系统。文章涵盖数据采集、数据清洗、数据分析、可视化展示等核心功能,帮助开发者掌握 OpenClaw 在数据分析场景的应用。通过详细的系统设计和代码实现,让读者了解数据分析平台的完整构建过程。📊


1. 引言 - 数据分析平台概述

1.1 数据分析需求

企业数据分析面临诸多挑战,传统方案难以满足现代业务需求:

挑战 传统方案 OpenClaw方案
数据分散 手动汇总 自动采集整合
分析门槛高 需要专业分析师 自然语言查询
响应慢 批量处理 实时分析
洞察浅 描述性分析 预测性分析
协作难 报告分发 智能问答

1.2 平台架构设计

应用服务层

分析引擎层

数据处理层

数据源层

数据库

API接口

文件上传

实时流

数据采集

数据清洗

数据转换

数据存储

统计分析

机器学习

自然语言查询

可视化引擎

报表中心

智能问答

预警系统

数据API

1.3 核心功能规划

功能模块 核心能力 技术实现
数据采集 多源数据接入 连接器 + 流式采集
数据清洗 数据质量保障 规则引擎 + 异常检测
数据分析 多维度分析 SQL + ML
智能查询 自然语言交互 NLP + SQL生成
可视化 图表展示 图表库 + 自动推荐

2. 数据采集模块

2.1 多源数据连接器

from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import pandas as pd

@dataclass
class DataSource:
    """数据源配置"""
    name: str
    type: str
    connection: Dict[str, Any]
    schema: Optional[Dict] = None

class DataConnector(ABC):
    """数据连接器基类"""
    
    @abstractmethod
    def connect(self, source: DataSource) -> bool:
        """建立连接"""
        pass
    
    @abstractmethod
    def fetch(self, query: str) -> pd.DataFrame:
        """获取数据"""
        pass
    
    @abstractmethod
    def get_schema(self) -> Dict:
        """获取数据结构"""
        pass

class DatabaseConnector(DataConnector):
    """数据库连接器"""
    
    def __init__(self):
        self.connection = None
        self.source = None
    
    def connect(self, source: DataSource) -> bool:
        """建立数据库连接"""
        self.source = source
        
        # 根据数据库类型选择驱动
        db_type = source.type
        
        if db_type == "mysql":
            import pymysql
            self.connection = pymysql.connect(
                host=source.connection["host"],
                port=source.connection.get("port", 3306),
                user=source.connection["user"],
                password=source.connection["password"],
                database=source.connection["database"]
            )
        
        elif db_type == "postgresql":
            import psycopg2
            self.connection = psycopg2.connect(
                host=source.connection["host"],
                port=source.connection.get("port", 5432),
                user=source.connection["user"],
                password=source.connection["password"],
                database=source.connection["database"]
            )
        
        elif db_type == "sqlite":
            import sqlite3
            self.connection = sqlite3.connect(source.connection["path"])
        
        return self.connection is not None
    
    def fetch(self, query: str) -> pd.DataFrame:
        """执行查询并返回DataFrame"""
        if not self.connection:
            raise Exception("未建立连接")
        
        return pd.read_sql(query, self.connection)
    
    def get_schema(self) -> Dict:
        """获取数据库结构"""
        if self.source.type == "mysql":
            query = f"""
            SELECT table_name, column_name, data_type 
            FROM information_schema.columns 
            WHERE table_schema = '{self.source.connection["database"]}'
            """
        elif self.source.type == "postgresql":
            query = f"""
            SELECT table_name, column_name, data_type 
            FROM information_schema.columns 
            WHERE table_schema = 'public'
            """
        else:
            return {}
        
        df = self.fetch(query)
        
        # 构建schema字典
        schema = {}
        for _, row in df.iterrows():
            table = row["table_name"]
            if table not in schema:
                schema[table] = {"columns": []}
            schema[table]["columns"].append({
                "name": row["column_name"],
                "type": row["data_type"]
            })
        
        return schema

class APIConnector(DataConnector):
    """API连接器"""
    
    def __init__(self):
        self.source = None
        self.session = None
    
    def connect(self, source: DataSource) -> bool:
        """配置API连接"""
        self.source = source
        
        import requests
        self.session = requests.Session()
        
        # 设置认证
        if "api_key" in source.connection:
            self.session.headers["Authorization"] = f"Bearer {source.connection['api_key']}"
        
        return True
    
    def fetch(self, endpoint: str, params: Dict = None) -> pd.DataFrame:
        """获取API数据"""
        if not self.session:
            raise Exception("未建立连接")
        
        base_url = self.source.connection["base_url"]
        url = f"{base_url}/{endpoint}"
        
        response = self.session.get(url, params=params)
        
        if response.status_code == 200:
            data = response.json()
            
            # 处理嵌套数据
            if isinstance(data, list):
                return pd.DataFrame(data)
            elif isinstance(data, dict):
                # 尝试找到数据列表
                for key in ["data", "results", "items"]:
                    if key in data and isinstance(data[key], list):
                        return pd.DataFrame(data[key])
                return pd.DataFrame([data])
        
        raise Exception(f"API请求失败: {response.status_code}")
    
    def get_schema(self) -> Dict:
        """获取API数据结构"""
        # 通过示例数据推断结构
        return {}

class FileConnector(DataConnector):
    """文件连接器"""
    
    def __init__(self):
        self.source = None
        self.data = None
    
    def connect(self, source: DataSource) -> bool:
        """加载文件"""
        self.source = source
        file_path = source.connection["path"]
        file_type = source.connection.get("type", "csv")
        
        if file_type == "csv":
            self.data = pd.read_csv(file_path)
        elif file_type == "excel":
            self.data = pd.read_excel(file_path)
        elif file_type == "json":
            self.data = pd.read_json(file_path)
        elif file_type == "parquet":
            self.data = pd.read_parquet(file_path)
        else:
            return False
        
        return True
    
    def fetch(self, query: str = None) -> pd.DataFrame:
        """获取数据"""
        return self.data.copy()
    
    def get_schema(self) -> Dict:
        """获取数据结构"""
        if self.data is None:
            return {}
        
        return {
            "columns": [
                {"name": col, "type": str(self.data[col].dtype)}
                for col in self.data.columns
            ]
        }

# 使用示例
# 数据库连接
db_source = DataSource(
    name="main_db",
    type="mysql",
    connection={
        "host": "localhost",
        "user": "root",
        "password": "password",
        "database": "mydb"
    }
)

db_connector = DatabaseConnector()
db_connector.connect(db_source)
df = db_connector.fetch("SELECT * FROM users LIMIT 10")
print(f"获取 {len(df)} 条记录")

# API连接
api_source = DataSource(
    name="weather_api",
    type="api",
    connection={
        "base_url": "https://api.weather.com",
        "api_key": "your_api_key"
    }
)

api_connector = APIConnector()
api_connector.connect(api_source)
weather_df = api_connector.fetch("current", {"city": "beijing"})
print(f"天气数据: {weather_df.shape}")

# 文件连接
file_source = DataSource(
    name="sales_data",
    type="file",
    connection={
        "path": "/data/sales.csv",
        "type": "csv"
    }
)

file_connector = FileConnector()
file_connector.connect(file_source)
sales_df = file_connector.fetch()
print(f"销售数据: {sales_df.shape}")

2.2 数据采集调度器

from typing import Dict, List, Callable
from datetime import datetime, timedelta
import threading
import time

@dataclass
class CollectionTask:
    """采集任务"""
    id: str
    name: str
    connector: DataConnector
    query: str
    schedule: str  # cron表达式
    callback: Callable
    last_run: datetime = None
    next_run: datetime = None
    status: str = "pending"

class DataCollectionScheduler:
    """数据采集调度器"""
    
    def __init__(self):
        self.tasks: Dict[str, CollectionTask] = {}
        self.running = False
        self.thread = None
    
    def add_task(self, task: CollectionTask):
        """添加采集任务"""
        # 计算下次运行时间
        task.next_run = self._parse_schedule(task.schedule)
        self.tasks[task.id] = task
    
    def remove_task(self, task_id: str):
        """移除采集任务"""
        if task_id in self.tasks:
            del self.tasks[task_id]
    
    def start(self):
        """启动调度器"""
        self.running = True
        self.thread = threading.Thread(target=self._run_loop, daemon=True)
        self.thread.start()
    
    def stop(self):
        """停止调度器"""
        self.running = False
        if self.thread:
            self.thread.join(timeout=5)
    
    def _run_loop(self):
        """调度循环"""
        while self.running:
            now = datetime.now()
            
            for task in self.tasks.values():
                if task.next_run and now >= task.next_run:
                    self._execute_task(task)
                    task.last_run = now
                    task.next_run = self._parse_schedule(task.schedule)
            
            time.sleep(1)
    
    def _execute_task(self, task: CollectionTask):
        """执行采集任务"""
        try:
            task.status = "running"
            
            # 获取数据
            df = task.connector.fetch(task.query)
            
            # 调用回调
            task.callback(df)
            
            task.status = "success"
        
        except Exception as e:
            task.status = "failed"
            print(f"任务 {task.name} 执行失败: {e}")
    
    def _parse_schedule(self, schedule: str) -> datetime:
        """解析调度时间"""
        # 简化实现:支持简单格式
        # 实际应使用 croniter 库
        
        if schedule == "every_minute":
            return datetime.now() + timedelta(minutes=1)
        elif schedule == "every_hour":
            return datetime.now() + timedelta(hours=1)
        elif schedule == "every_day":
            return datetime.now() + timedelta(days=1)
        else:
            return datetime.now() + timedelta(hours=1)
    
    def get_task_status(self) -> List[Dict]:
        """获取任务状态"""
        return [
            {
                "id": task.id,
                "name": task.name,
                "status": task.status,
                "last_run": task.last_run.isoformat() if task.last_run else None,
                "next_run": task.next_run.isoformat() if task.next_run else None
            }
            for task in self.tasks.values()
        ]

# 使用示例
scheduler = DataCollectionScheduler()

def process_data(df: pd.DataFrame):
    """处理采集的数据"""
    print(f"处理 {len(df)} 条数据")
    # 存储到数据仓库
    # 或触发后续分析

# 添加采集任务
task1 = CollectionTask(
    id="task_001",
    name="用户数据采集",
    connector=db_connector,
    query="SELECT * FROM users WHERE created_at > NOW() - INTERVAL 1 HOUR",
    schedule="every_hour",
    callback=process_data
)

scheduler.add_task(task1)
scheduler.start()

3. 数据清洗模块

3.1 数据质量检测

from typing import Dict, List, Tuple
import numpy as np

class DataQualityChecker:
    """数据质量检测器"""
    
    def __init__(self):
        self.rules: List[Dict] = []
    
    def add_rule(self, column: str, rule_type: str, params: Dict = None):
        """添加质量规则"""
        self.rules.append({
            "column": column,
            "type": rule_type,
            "params": params or {}
        })
    
    def check(self, df: pd.DataFrame) -> Dict:
        """执行质量检测"""
        results = {
            "total_rows": len(df),
            "total_columns": len(df.columns),
            "issues": [],
            "score": 100
        }
        
        for rule in self.rules:
            column = rule["column"]
            rule_type = rule["type"]
            params = rule["params"]
            
            if column not in df.columns:
                results["issues"].append({
                    "column": column,
                    "type": "missing_column",
                    "message": f"列 {column} 不存在"
                })
                continue
            
            col_data = df[column]
            
            if rule_type == "not_null":
                null_count = col_data.isnull().sum()
                if null_count > 0:
                    results["issues"].append({
                        "column": column,
                        "type": "null_values",
                        "count": null_count,
                        "percentage": null_count / len(df) * 100
                    })
            
            elif rule_type == "unique":
                dup_count = col_data.duplicated().sum()
                if dup_count > 0:
                    results["issues"].append({
                        "column": column,
                        "type": "duplicates",
                        "count": dup_count
                    })
            
            elif rule_type == "range":
                min_val = params.get("min")
                max_val = params.get("max")
                
                if min_val is not None:
                    below_min = (col_data < min_val).sum()
                    if below_min > 0:
                        results["issues"].append({
                            "column": column,
                            "type": "below_min",
                            "count": below_min,
                            "min": min_val
                        })
                
                if max_val is not None:
                    above_max = (col_data > max_val).sum()
                    if above_max > 0:
                        results["issues"].append({
                            "column": column,
                            "type": "above_max",
                            "count": above_max,
                            "max": max_val
                        })
            
            elif rule_type == "pattern":
                pattern = params.get("pattern")
                if pattern:
                    import re
                    invalid = ~col_data.astype(str).str.match(pattern, na=False)
                    invalid_count = invalid.sum()
                    if invalid_count > 0:
                        results["issues"].append({
                            "column": column,
                            "type": "pattern_mismatch",
                            "count": invalid_count,
                            "pattern": pattern
                        })
        
        # 计算质量分数
        if results["issues"]:
            issue_penalty = sum(
                issue.get("percentage", 5) 
                for issue in results["issues"]
            )
            results["score"] = max(0, 100 - issue_penalty)
        
        return results
    
    def get_profile(self, df: pd.DataFrame) -> Dict:
        """获取数据概要"""
        profile = {
            "row_count": len(df),
            "column_count": len(df.columns),
            "memory_usage": df.memory_usage(deep=True).sum(),
            "columns": {}
        }
        
        for col in df.columns:
            col_data = df[col]
            col_profile = {
                "dtype": str(col_data.dtype),
                "null_count": col_data.isnull().sum(),
                "null_percentage": col_data.isnull().sum() / len(df) * 100,
                "unique_count": col_data.nunique()
            }
            
            # 数值类型统计
            if col_data.dtype in ["int64", "float64"]:
                col_profile.update({
                    "min": col_data.min(),
                    "max": col_data.max(),
                    "mean": col_data.mean(),
                    "median": col_data.median(),
                    "std": col_data.std()
                })
            
            # 字符串类型统计
            elif col_data.dtype == "object":
                col_profile.update({
                    "min_length": col_data.astype(str).str.len().min(),
                    "max_length": col_data.astype(str).str.len().max(),
                    "avg_length": col_data.astype(str).str.len().mean()
                })
            
            profile["columns"][col] = col_profile
        
        return profile

# 使用示例
checker = DataQualityChecker()

# 添加质量规则
checker.add_rule("user_id", "not_null")
checker.add_rule("user_id", "unique")
checker.add_rule("age", "range", {"min": 0, "max": 150})
checker.add_rule("email", "pattern", {"pattern": r"^[\w\.-]+@[\w\.-]+\.\w+$"})

# 执行检测
df = pd.DataFrame({
    "user_id": [1, 2, 3, None, 5],
    "age": [25, 30, -5, 40, 200],
    "email": ["a@b.com", "invalid", "c@d.com", "e@f.com", "g@h.com"]
})

results = checker.check(df)
print(f"质量分数: {results['score']}")
print(f"问题数: {len(results['issues'])}")

# 获取数据概要
profile = checker.get_profile(df)
print(f"数据概要: {profile['row_count']} 行, {profile['column_count']} 列")

3.2 数据清洗处理器

from typing import Dict, List, Callable

class DataCleaner:
    """数据清洗处理器"""
    
    def __init__(self):
        self.steps: List[Dict] = []
    
    def add_step(self, name: str, processor: Callable, params: Dict = None):
        """添加清洗步骤"""
        self.steps.append({
            "name": name,
            "processor": processor,
            "params": params or {}
        })
    
    def clean(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict]:
        """执行清洗"""
        original_count = len(df)
        report = {
            "original_rows": original_count,
            "steps": []
        }
        
        result_df = df.copy()
        
        for step in self.steps:
            before_count = len(result_df)
            
            result_df = step["processor"](result_df, **step["params"])
            
            after_count = len(result_df)
            
            report["steps"].append({
                "name": step["name"],
                "rows_before": before_count,
                "rows_after": after_count,
                "rows_removed": before_count - after_count
            })
        
        report["final_rows"] = len(result_df)
        report["rows_removed"] = original_count - len(result_df)
        
        return result_df, report

# 预定义清洗处理器
def remove_duplicates(df: pd.DataFrame, subset: List[str] = None) -> pd.DataFrame:
    """去除重复"""
    return df.drop_duplicates(subset=subset)

def fill_missing(df: pd.DataFrame, columns: Dict[str, Any] = None) -> pd.DataFrame:
    """填充缺失值"""
    result = df.copy()
    
    for col, value in (columns or {}).items():
        if col in result.columns:
            result[col] = result[col].fillna(value)
    
    return result

def remove_outliers(df: pd.DataFrame, column: str, method: str = "iqr", threshold: float = 1.5) -> pd.DataFrame:
    """去除异常值"""
    if column not in df.columns:
        return df
    
    if method == "iqr":
        Q1 = df[column].quantile(0.25)
        Q3 = df[column].quantile(0.75)
        IQR = Q3 - Q1
        
        lower = Q1 - threshold * IQR
        upper = Q3 + threshold * IQR
        
        return df[(df[column] >= lower) & (df[column] <= upper)]
    
    elif method == "zscore":
        from scipy import stats
        z_scores = stats.zscore(df[column].dropna())
        return df[abs(z_scores) <= threshold]
    
    return df

def standardize_text(df: pd.DataFrame, column: str, lowercase: bool = True, strip: bool = True) -> pd.DataFrame:
    """标准化文本"""
    result = df.copy()
    
    if column in result.columns:
        if lowercase:
            result[column] = result[column].astype(str).str.lower()
        if strip:
            result[column] = result[column].astype(str).str.strip()
    
    return result

def convert_types(df: pd.DataFrame, columns: Dict[str, str]) -> pd.DataFrame:
    """转换数据类型"""
    result = df.copy()
    
    for col, dtype in columns.items():
        if col in result.columns:
            try:
                result[col] = result[col].astype(dtype)
            except Exception as e:
                print(f"转换 {col} 失败: {e}")
    
    return result

# 使用示例
cleaner = DataCleaner()

# 添加清洗步骤
cleaner.add_step("去除重复", remove_duplicates, {"subset": ["user_id"]})
cleaner.add_step("填充缺失", fill_missing, {"columns": {"age": 0, "name": "未知"}})
cleaner.add_step("去除异常值", remove_outliers, {"column": "age", "method": "iqr"})
cleaner.add_step("标准化文本", standardize_text, {"column": "email", "lowercase": True})
cleaner.add_step("类型转换", convert_types, {"columns": {"age": "int64", "created_at": "datetime64"}})

# 执行清洗
cleaned_df, report = cleaner.clean(df)
print(f"清洗报告: {report}")

4. 数据分析引擎

4.1 统计分析器

from typing import Dict, List, Optional
from scipy import stats
import numpy as np

class StatisticalAnalyzer:
    """统计分析器"""
    
    def __init__(self, df: pd.DataFrame):
        self.df = df
    
    def descriptive_stats(self, columns: List[str] = None) -> Dict:
        """描述性统计"""
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns.tolist()
        
        result = {}
        
        for col in columns:
            if col not in self.df.columns:
                continue
            
            data = self.df[col].dropna()
            
            result[col] = {
                "count": len(data),
                "mean": data.mean(),
                "std": data.std(),
                "min": data.min(),
                "q1": data.quantile(0.25),
                "median": data.median(),
                "q3": data.quantile(0.75),
                "max": data.max(),
                "skewness": data.skew(),
                "kurtosis": data.kurtosis()
            }
        
        return result
    
    def correlation_analysis(self, method: str = "pearson") -> pd.DataFrame:
        """相关性分析"""
        numeric_df = self.df.select_dtypes(include=[np.number])
        return numeric_df.corr(method=method)
    
    def hypothesis_test(self, column1: str, column2: str, test_type: str = "ttest") -> Dict:
        """假设检验"""
        data1 = self.df[column1].dropna()
        data2 = self.df[column2].dropna()
        
        if test_type == "ttest":
            statistic, pvalue = stats.ttest_ind(data1, data2)
            return {
                "test": "t-test",
                "statistic": statistic,
                "p_value": pvalue,
                "significant": pvalue < 0.05
            }
        
        elif test_type == "mannwhitney":
            statistic, pvalue = stats.mannwhitneyu(data1, data2)
            return {
                "test": "Mann-Whitney U",
                "statistic": statistic,
                "p_value": pvalue,
                "significant": pvalue < 0.05
            }
        
        elif test_type == "chi2":
            contingency = pd.crosstab(self.df[column1], self.df[column2])
            statistic, pvalue, dof, expected = stats.chi2_contingency(contingency)
            return {
                "test": "Chi-square",
                "statistic": statistic,
                "p_value": pvalue,
                "dof": dof,
                "significant": pvalue < 0.05
            }
        
        return {}
    
    def anova(self, group_column: str, value_column: str) -> Dict:
        """方差分析"""
        groups = self.df.groupby(group_column)[value_column]
        
        group_data = [group.dropna().values for name, group in groups]
        
        statistic, pvalue = stats.f_oneway(*group_data)
        
        return {
            "test": "ANOVA",
            "statistic": statistic,
            "p_value": pvalue,
            "significant": pvalue < 0.05,
            "groups": len(group_data)
        }
    
    def time_series_analysis(self, date_column: str, value_column: str, freq: str = "D") -> Dict:
        """时间序列分析"""
        df = self.df.copy()
        df[date_column] = pd.to_datetime(df[date_column])
        df = df.set_index(date_column)
        
        # 重采样
        resampled = df[value_column].resample(freq)
        
        result = {
            "daily_stats": {
                "mean": resampled.mean().to_dict(),
                "sum": resampled.sum().to_dict(),
                "count": resampled.count().to_dict()
            }
        }
        
        # 趋势分析
        from scipy.signal import detrend
        values = df[value_column].values
        trend = detrend(values)
        
        result["detrended"] = trend.tolist()
        
        return result

# 使用示例
analyzer = StatisticalAnalyzer(sales_df)

# 描述性统计
stats_result = analyzer.descriptive_stats(["price", "quantity", "total"])
print(f"统计结果: {stats_result}")

# 相关性分析
corr = analyzer.correlation_analysis()
print(f"相关性矩阵:\n{corr}")

# 假设检验
test_result = analyzer.hypothesis_test("group_a", "group_b", "ttest")
print(f"检验结果: {test_result}")

4.2 自然语言查询

from typing import Dict, List, Optional
import re

class NaturalLanguageQuery:
    """自然语言查询处理器"""
    
    def __init__(self, schema: Dict):
        self.schema = schema
        self.query_templates = self._build_templates()
    
    def _build_templates(self) -> List[Dict]:
        """构建查询模板"""
        return [
            {
                "pattern": r"(.+)的平均值",
                "sql_template": "SELECT AVG({column}) FROM {table}",
                "type": "aggregation"
            },
            {
                "pattern": r"(.+)的总和",
                "sql_template": "SELECT SUM({column}) FROM {table}",
                "type": "aggregation"
            },
            {
                "pattern": r"(.+)的最大值",
                "sql_template": "SELECT MAX({column}) FROM {table}",
                "type": "aggregation"
            },
            {
                "pattern": r"(.+)的最小值",
                "sql_template": "SELECT MIN({column}) FROM {table}",
                "type": "aggregation"
            },
            {
                "pattern": r"按(.+)分组统计(.+)",
                "sql_template": "SELECT {group_column}, COUNT(*) FROM {table} GROUP BY {group_column}",
                "type": "grouping"
            },
            {
                "pattern": r"(.+)前(\d+)名",
                "sql_template": "SELECT * FROM {table} ORDER BY {column} DESC LIMIT {limit}",
                "type": "ranking"
            }
        ]
    
    def parse(self, question: str) -> Dict:
        """解析自然语言问题"""
        result = {
            "question": question,
            "sql": None,
            "type": None,
            "confidence": 0
        }
        
        for template in self.query_templates:
            match = re.search(template["pattern"], question)
            
            if match:
                result["type"] = template["type"]
                
                # 提取参数
                if template["type"] == "aggregation":
                    column_name = match.group(1)
                    column = self._find_column(column_name)
                    table = self._find_table(column)
                    
                    if column and table:
                        result["sql"] = template["sql_template"].format(
                            column=column,
                            table=table
                        )
                        result["confidence"] = 0.8
                
                elif template["type"] == "grouping":
                    group_col_name = match.group(1)
                    value_col_name = match.group(2)
                    
                    group_column = self._find_column(group_col_name)
                    table = self._find_table(group_column)
                    
                    if group_column and table:
                        result["sql"] = template["sql_template"].format(
                            group_column=group_column,
                            table=table
                        )
                        result["confidence"] = 0.7
                
                elif template["type"] == "ranking":
                    column_name = match.group(1)
                    limit = match.group(2)
                    
                    column = self._find_column(column_name)
                    table = self._find_table(column)
                    
                    if column and table:
                        result["sql"] = template["sql_template"].format(
                            column=column,
                            table=table,
                            limit=limit
                        )
                        result["confidence"] = 0.8
                
                break
        
        return result
    
    def _find_column(self, name: str) -> Optional[str]:
        """查找匹配的列名"""
        name_lower = name.lower()
        
        for table_name, table_info in self.schema.items():
            for column in table_info.get("columns", []):
                if name_lower in column["name"].lower():
                    return column["name"]
        
        return None
    
    def _find_table(self, column: str) -> Optional[str]:
        """查找列所在的表"""
        for table_name, table_info in self.schema.items():
            for col in table_info.get("columns", []):
                if col["name"] == column:
                    return table_name
        
        return None
    
    def execute(self, question: str, connector: DataConnector) -> pd.DataFrame:
        """执行自然语言查询"""
        parsed = self.parse(question)
        
        if parsed["sql"]:
            return connector.fetch(parsed["sql"])
        
        return pd.DataFrame()

# 使用示例
schema = {
    "sales": {
        "columns": [
            {"name": "product", "type": "string"},
            {"name": "price", "type": "float"},
            {"name": "quantity", "type": "int"},
            {"name": "region", "type": "string"}
        ]
    }
}

nlq = NaturalLanguageQuery(schema)

# 解析问题
result = nlq.parse("价格的平均值")
print(f"SQL: {result['sql']}")
print(f"置信度: {result['confidence']}")

# 执行查询
# df = nlq.execute("销售额前10名", db_connector)

5. 可视化引擎

5.1 图表生成器

from typing import Dict, List, Optional
import matplotlib.pyplot as plt
import seaborn as sns

class ChartGenerator:
    """图表生成器"""
    
    def __init__(self, style: str = "seaborn"):
        plt.style.use(style)
        self.figures: List[plt.Figure] = []
    
    def bar_chart(self, df: pd.DataFrame, x: str, y: str, title: str = None) -> plt.Figure:
        """柱状图"""
        fig, ax = plt.subplots(figsize=(10, 6))
        
        df.plot.bar(x=x, y=y, ax=ax)
        
        if title:
            ax.set_title(title)
        
        ax.set_xlabel(x)
        ax.set_ylabel(y)
        
        plt.tight_layout()
        self.figures.append(fig)
        
        return fig
    
    def line_chart(self, df: pd.DataFrame, x: str, y: str, title: str = None) -> plt.Figure:
        """折线图"""
        fig, ax = plt.subplots(figsize=(12, 6))
        
        df.plot.line(x=x, y=y, ax=ax)
        
        if title:
            ax.set_title(title)
        
        plt.tight_layout()
        self.figures.append(fig)
        
        return fig
    
    def pie_chart(self, df: pd.DataFrame, values: str, labels: str, title: str = None) -> plt.Figure:
        """饼图"""
        fig, ax = plt.subplots(figsize=(8, 8))
        
        ax.pie(df[values], labels=df[labels], autopct='%1.1f%%')
        
        if title:
            ax.set_title(title)
        
        self.figures.append(fig)
        
        return fig
    
    def scatter_plot(self, df: pd.DataFrame, x: str, y: str, hue: str = None, title: str = None) -> plt.Figure:
        """散点图"""
        fig, ax = plt.subplots(figsize=(10, 8))
        
        if hue:
            for category in df[hue].unique():
                subset = df[df[hue] == category]
                ax.scatter(subset[x], subset[y], label=category, alpha=0.6)
            ax.legend()
        else:
            ax.scatter(df[x], df[y], alpha=0.6)
        
        ax.set_xlabel(x)
        ax.set_ylabel(y)
        
        if title:
            ax.set_title(title)
        
        plt.tight_layout()
        self.figures.append(fig)
        
        return fig
    
    def heatmap(self, df: pd.DataFrame, title: str = None) -> plt.Figure:
        """热力图"""
        fig, ax = plt.subplots(figsize=(10, 8))
        
        sns.heatmap(df, annot=True, fmt=".2f", cmap="coolwarm", ax=ax)
        
        if title:
            ax.set_title(title)
        
        plt.tight_layout()
        self.figures.append(fig)
        
        return fig
    
    def histogram(self, df: pd.DataFrame, column: str, bins: int = 30, title: str = None) -> plt.Figure:
        """直方图"""
        fig, ax = plt.subplots(figsize=(10, 6))
        
        ax.hist(df[column], bins=bins, edgecolor='black')
        
        ax.set_xlabel(column)
        ax.set_ylabel('Frequency')
        
        if title:
            ax.set_title(title)
        
        plt.tight_layout()
        self.figures.append(fig)
        
        return fig
    
    def box_plot(self, df: pd.DataFrame, x: str, y: str, title: str = None) -> plt.Figure:
        """箱线图"""
        fig, ax = plt.subplots(figsize=(10, 6))
        
        df.boxplot(column=y, by=x, ax=ax)
        
        if title:
            ax.set_title(title)
        
        plt.tight_layout()
        self.figures.append(fig)
        
        return fig
    
    def save_all(self, directory: str, prefix: str = "chart"):
        """保存所有图表"""
        import os
        
        os.makedirs(directory, exist_ok=True)
        
        for i, fig in enumerate(self.figures):
            path = os.path.join(directory, f"{prefix}_{i+1}.png")
            fig.savefig(path, dpi=150)
            print(f"保存图表: {path}")
    
    def close_all(self):
        """关闭所有图表"""
        for fig in self.figures:
            plt.close(fig)
        self.figures.clear()

# 使用示例
chart_gen = ChartGenerator()

# 生成图表
chart_gen.bar_chart(sales_df, x="product", y="sales", title="产品销售额")
chart_gen.line_chart(time_df, x="date", y="revenue", title="收入趋势")
chart_gen.pie_chart(category_df, values="amount", labels="category", title="类别占比")
chart_gen.scatter_plot(sales_df, x="price", y="quantity", hue="region", title="价格与销量关系")

# 保存图表
chart_gen.save_all("/output/charts", "analysis")

6. 最佳实践

6.1 平台设计原则

原则 说明 实践
易用性 降低使用门槛 自然语言查询
可扩展 支持新数据源 插件式连接器
高性能 快速响应 缓存 + 索引
安全性 数据保护 权限控制

6.2 常见问题

问题 原因 解决方案
查询慢 数据量大 分区 + 索引
结果不准 数据质量差 数据清洗
图表乱码 编码问题 设置字体

7. 总结

7.1 核心要点

本文通过完整的数据分析平台案例,展示了 OpenClaw 在数据分析场景的应用:

模块 核心功能 技术要点
数据采集 多源接入 连接器 + 调度
数据清洗 质量保障 规则 + 处理器
数据分析 多维分析 统计 + ML
智能查询 自然语言 NLP + SQL
可视化 图表展示 自动推荐

7.2 下一步学习

  • 第76篇:OpenClaw 实战案例:内容创作系统

参考资料


Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐