系列第4篇:Python+Go构建企业级AI Agent实战指南(4/13)

标签: DeerFlow | Skill开发 | 数据分析 | Python | 源码解析


一、开篇:Skill是Agent的"超能力"

如果把AI Agent比作一个职场新人,那么**Skill(技能)**就是他的专业能力:

  • 会Excel → 数据处理Skill
  • 会Python → 编程Skill
  • 会写报告 → 文案Skill

DeerFlow的Skill机制,让Agent的能力扩展变得像安装APP一样简单。本文将深入Skill.md开发,带你构建一个生产级的数据分析Agent。


二、Skill.md文件详解

2.1 Skill.md结构

# Skill.md 标准结构

## 基本信息
name: Skill名称
description: Skill功能描述
version: 版本号
author: 作者

## 能力列表 (capabilities)
定义Agent能做什么

## 工作流程 (workflow)  
定义Agent如何执行任务

## 约束条件 (constraints)
定义边界和限制

## 示例 (examples)
提供使用示例

2.2 完整Skill.md示例

# sales_analyzer/Skill.md

## 基本信息
name: SalesAnalyzer
description: 专业的销售数据分析智能体,提供销售趋势分析、预测和洞察
version: 2.0.0
author: AI Team

## 能力列表
capabilities:
  - name: load_sales_data
    description: 加载销售数据,支持CSV、Excel格式
    parameters:
      - name: file_path
        type: string
        required: true
        description: 数据文件路径
      - name: date_column
        type: string
        required: false
        default: "date"
        description: 日期列名称
    returns:
      type: dataframe
      description: 加载的数据框

  - name: analyze_trends
    description: 分析销售趋势,包括同比、环比、移动平均线
    parameters:
      - name: df
        type: dataframe
        required: true
      - name: metric
        type: string
        required: true
        description: 分析指标,如sales、revenue
      - name: period
        type: string
        enum: [daily, weekly, monthly]
        default: "monthly"
    returns:
      type: dict
      description: 趋势分析结果

  - name: forecast_sales
    description: 使用 Prophet 模型预测未来销售
    parameters:
      - name: df
        type: dataframe
        required: true
      - name: periods
        type: integer
        required: false
        default: 30
        description: 预测天数
    returns:
      type: dataframe
      description: 预测结果

  - name: generate_insights
    description: 基于分析结果生成业务洞察
    parameters:
      - name: analysis_results
        type: dict
        required: true
    returns:
      type: list
      description: 洞察列表

## 工作流程
workflow:
  1. 接收用户的数据文件路径和分析需求
  2. 验证文件格式和完整性
  3. 加载数据并执行基础清洗
  4. 根据需求选择分析维度(趋势/预测/对比)
  5. 执行分析并生成可视化
  6. 提取关键洞察和业务建议
  7. 整合输出完整的分析报告

## 约束条件
constraints:
  - 数据文件大小不超过500MB
  - 支持格式:CSV、Excel (.xlsx, .xls)
  - 必须包含日期列
  - 数值列不能包含非数字字符
  - 预测功能需要至少3个月历史数据

## 示例
examples:
  - input: "分析sales_2024.csv的销售趋势"
    output: |
      📊 销售趋势分析报告
      
      1. 整体趋势:2024年销售额同比增长23%
      2. 季节性:Q4为销售旺季,占全年35%
      3. 异常点:3月份销售额下降15%,建议关注
      
      [附:趋势图、预测图]

  - input: "预测下个月的销售额"
    output: |
      🔮 销售预测
      
      基于历史数据,预测下月销售额:
      - 预测值:$125,000
      - 置信区间:$118,000 - $132,000
      - 环比预期:+8%

三、Skill的Python实现

3.1 项目结构

sales_analyzer/
├── Skill.md              # 技能定义文件
├── __init__.py
├── agent.py              # Agent主类
├── tools/
│   ├── __init__.py
│   ├── data_loader.py    # 数据加载工具
│   ├── analyzer.py       # 分析工具
│   ├── forecaster.py     # 预测工具
│   └── visualizer.py     # 可视化工具
├── prompts/
│   └── analysis_prompt.txt
└── tests/
    └── test_analyzer.py

3.2 完整代码实现

# sales_analyzer/agent.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import matplotlib.pyplot as plt
import seaborn as sns
from prophet import Prophet
import warnings
warnings.filterwarnings('ignore')

from deerflow import Agent, Tool
from deerflow.types import ToolResult

plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS']
plt.rcParams['axes.unicode_minus'] = False


class SalesAnalyzerAgent(Agent):
    """
    销售数据分析智能体
    
    提供销售趋势分析、预测和洞察生成功能
    """
    
    def __init__(self, config: Optional[Dict] = None):
        super().__init__()
        self.config = config or {}
        self.data = None
        self.setup_tools()
    
    def setup_tools(self):
        """注册所有工具"""
        
        @Tool(
            name="load_sales_data",
            description="加载销售数据文件"
        )
        def load_data(file_path: str, date_column: str = "date") -> ToolResult:
            """加载并验证销售数据"""
            try:
                # 根据扩展名选择读取方式
                if file_path.endswith('.csv'):
                    df = pd.read_csv(file_path)
                elif file_path.endswith(('.xlsx', '.xls')):
                    df = pd.read_excel(file_path)
                else:
                    return ToolResult.error("不支持的文件格式,请使用CSV或Excel")
                
                # 验证日期列
                if date_column not in df.columns:
                    return ToolResult.error(f"未找到日期列: {date_column}")
                
                # 转换日期格式
                df[date_column] = pd.to_datetime(df[date_column])
                df = df.sort_values(date_column)
                
                # 存储数据
                self.data = df
                
                # 返回数据摘要
                summary = {
                    "rows": len(df),
                    "columns": list(df.columns),
                    "date_range": f"{df[date_column].min()} 至 {df[date_column].max()}",
                    "numeric_columns": df.select_dtypes(include=[np.number]).columns.tolist()
                }
                
                return ToolResult.success(summary)
                
            except Exception as e:
                return ToolResult.error(f"数据加载失败: {str(e)}")
        
        @Tool(
            name="analyze_trends",
            description="分析销售趋势"
        )
        def analyze_trends(
            metric: str = "sales",
            period: str = "monthly"
        ) -> ToolResult:
            """分析销售趋势指标"""
            if self.data is None:
                return ToolResult.error("请先加载数据")
            
            if metric not in self.data.columns:
                return ToolResult.error(f"未找到指标列: {metric}")
            
            try:
                df = self.data.copy()
                
                # 按周期聚合
                if period == "daily":
                    df_grouped = df.groupby(df['date'].dt.date)[metric].sum()
                elif period == "weekly":
                    df_grouped = df.groupby(df['date'].dt.to_period('W'))[metric].sum()
                else:  # monthly
                    df_grouped = df.groupby(df['date'].dt.to_period('M'))[metric].sum()
                
                # 计算关键指标
                total = df_grouped.sum()
                mean = df_grouped.mean()
                growth_rate = (df_grouped.iloc[-1] - df_grouped.iloc[0]) / df_grouped.iloc[0] * 100
                
                # 计算移动平均线
                ma_7 = df_grouped.rolling(window=7, min_periods=1).mean()
                ma_30 = df_grouped.rolling(window=30, min_periods=1).mean()
                
                # 找出峰值和谷值
                peak_idx = df_grouped.idxmax()
                valley_idx = df_grouped.idxmin()
                
                result = {
                    "period": period,
                    "metric": metric,
                    "total": round(total, 2),
                    "average": round(mean, 2),
                    "growth_rate": round(growth_rate, 2),
                    "peak": {"period": str(peak_idx), "value": round(df_grouped.max(), 2)},
                    "valley": {"period": str(valley_idx), "value": round(df_grouped.min(), 2)},
                    "trend_data": df_grouped.to_dict()
                }
                
                return ToolResult.success(result)
                
            except Exception as e:
                return ToolResult.error(f"趋势分析失败: {str(e)}")
        
        @Tool(
            name="forecast_sales",
            description="预测未来销售趋势"
        )
        def forecast_sales(
            metric: str = "sales",
            periods: int = 30
        ) -> ToolResult:
            """使用Prophet模型进行销售预测"""
            if self.data is None:
                return ToolResult.error("请先加载数据")
            
            try:
                # 准备Prophet格式数据
                df_prophet = self.data.rename(columns={
                    'date': 'ds',
                    metric: 'y'
                })[['ds', 'y']]
                
                # 创建并训练模型
                model = Prophet(
                    yearly_seasonality=True,
                    weekly_seasonality=True,
                    daily_seasonality=False
                )
                model.fit(df_prophet)
                
                # 生成预测
                future = model.make_future_dataframe(periods=periods)
                forecast = model.predict(future)
                
                # 提取预测结果
                forecast_future = forecast[forecast['ds'] > df_prophet['ds'].max()]
                
                result = {
                    "forecast_periods": periods,
                    "predictions": [
                        {
                            "date": row['ds'].strftime('%Y-%m-%d'),
                            "value": round(row['yhat'], 2),
                            "lower": round(row['yhat_lower'], 2),
                            "upper": round(row['yhat_upper'], 2)
                        }
                        for _, row in forecast_future.head(10).iterrows()
                    ],
                    "trend_direction": "up" if forecast['trend'].iloc[-1] > forecast['trend'].iloc[-30] else "down"
                }
                
                return ToolResult.success(result)
                
            except Exception as e:
                return ToolResult.error(f"预测失败: {str(e)}")
        
        @Tool(
            name="generate_report",
            description="生成完整的分析报告"
        )
        def generate_report(
            trend_analysis: Dict,
            forecast: Optional[Dict] = None
        ) -> ToolResult:
            """生成Markdown格式的分析报告"""
            
            report = f"""# 📊 销售数据分析报告

生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}

## 1. 数据概览

- **分析指标**: {trend_analysis['metric']}
- **时间周期**: {trend_analysis['period']}
- **总销售额**: ¥{trend_analysis['total']:,.2f}
- **平均销售额**: ¥{trend_analysis['average']:,.2f}

## 2. 趋势分析

### 2.1 整体趋势
- **增长率**: {trend_analysis['growth_rate']:+.2f}%
- **趋势方向**: {'📈 上升' if trend_analysis['growth_rate'] > 0 else '📉 下降'}

### 2.2 关键节点
- **销售峰值**: ¥{trend_analysis['peak']['value']:,.2f} ({trend_analysis['peak']['period']})
- **销售谷值**: ¥{trend_analysis['valley']['value']:,.2f} ({trend_analysis['valley']['period']})

"""
            
            if forecast:
                report += f"""
## 3. 销售预测

### 3.1 未来{trend_analysis['period']}预测
- **趋势方向**: {'📈 预期上升' if forecast['trend_direction'] == 'up' else '📉 预期下降'}
- **预测周期**: {forecast['forecast_periods']} 天

### 3.2 近期预测值
| 日期 | 预测值 | 置信区间下限 | 置信区间上限 |
|------|--------|-------------|-------------|
"""
                for pred in forecast['predictions'][:5]:
                    report += f"| {pred['date']} | ¥{pred['value']:,.2f} | ¥{pred['lower']:,.2f} | ¥{pred['upper']:,.2f} |\n"
            
            report += """
## 4. 业务建议

基于以上分析,建议关注以下方面:

1. **趋势把握**: 根据增长趋势调整库存和人员配置
2. **季节性**: 关注销售峰值周期,提前做好营销准备
3. **异常监控**: 建立预警机制,及时发现销售异常

---
*报告由 SalesAnalyzer Agent 自动生成*
"""
            
            return ToolResult.success({"report": report})
        
        # 注册所有工具
        self.register_tool(load_data)
        self.register_tool(analyze_trends)
        self.register_tool(forecast_sales)
        self.register_tool(generate_report)
    
    async def run_analysis(self, file_path: str, metric: str = "sales") -> str:
        """执行完整的分析流程"""
        
        print("🚀 开始销售数据分析...")
        
        # 1. 加载数据
        print("📖 加载数据...")
        load_result = await self.call_tool("load_sales_data", file_path=file_path)
        if not load_result.success:
            return f"❌ 数据加载失败: {load_result.data}"
        print(f"✅ 数据加载成功: {load_result.data}")
        
        # 2. 趋势分析
        print("📊 分析趋势...")
        trend_result = await self.call_tool("analyze_trends", metric=metric)
        if not trend_result.success:
            return f"❌ 趋势分析失败: {trend_result.data}"
        print("✅ 趋势分析完成")
        
        # 3. 销售预测
        print("🔮 预测未来趋势...")
        forecast_result = await self.call_tool("forecast_sales", metric=metric, periods=30)
        if not forecast_result.success:
            print(f"⚠️ 预测失败: {forecast_result.data}")
            forecast_result = None
        else:
            print("✅ 预测完成")
        
        # 4. 生成报告
        print("📝 生成报告...")
        report_result = await self.call_tool(
            "generate_report",
            trend_analysis=trend_result.data,
            forecast=forecast_result.data if forecast_result else None
        )
        
        if report_result.success:
            # 保存报告
            output_path = f"sales_report_{datetime.now().strftime('%Y%m%d_%H%M')}.md"
            with open(output_path, 'w', encoding='utf-8') as f:
                f.write(report_result.data['report'])
            print(f"✅ 报告已保存: {output_path}")
            return report_result.data['report']
        else:
            return f"❌ 报告生成失败: {report_result.data}"


# 使用示例
async def main():
    # 创建示例数据
    np.random.seed(42)
    dates = pd.date_range('2024-01-01', periods=365, freq='D')
    base_sales = 1000
    trend = np.linspace(0, 500, 365)
    seasonal = 200 * np.sin(2 * np.pi * np.arange(365) / 365.25)
    noise = np.random.normal(0, 100, 365)
    sales = base_sales + trend + seasonal + noise
    
    sample_df = pd.DataFrame({
        'date': dates,
        'sales': sales.astype(int),
        'orders': (sales / 50).astype(int)
    })
    sample_df.to_csv('sample_sales.csv', index=False)
    print("✅ 示例数据已创建: sample_sales.csv")
    
    # 创建Agent并执行分析
    agent = SalesAnalyzerAgent()
    report = await agent.run_analysis('sample_sales.csv', metric='sales')
    
    print("\n" + "="*60)
    print("📄 报告预览:")
    print("="*60)
    print(report[:1000])
    print("...")


if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

四、测试与调试

4.1 单元测试

# tests/test_analyzer.py
import pytest
import pandas as pd
from sales_analyzer.agent import SalesAnalyzerAgent

@pytest.fixture
def sample_data():
    """创建测试数据"""
    dates = pd.date_range('2024-01-01', periods=100, freq='D')
    return pd.DataFrame({
        'date': dates,
        'sales': range(100),
        'revenue': [x * 10 for x in range(100)]
    })

@pytest.fixture
def agent():
    return SalesAnalyzerAgent()

@pytest.mark.asyncio
async def test_load_data(agent, tmp_path):
    """测试数据加载"""
    # 创建临时CSV
    csv_path = tmp_path / "test.csv"
    sample_data().to_csv(csv_path, index=False)
    
    result = await agent.call_tool("load_sales_data", file_path=str(csv_path))
    assert result.success
    assert result.data['rows'] == 100

@pytest.mark.asyncio
async def test_analyze_trends(agent, tmp_path):
    """测试趋势分析"""
    # 先加载数据
    csv_path = tmp_path / "test.csv"
    sample_data().to_csv(csv_path, index=False)
    await agent.call_tool("load_sales_data", file_path=str(csv_path))
    
    # 测试分析
    result = await agent.call_tool("analyze_trends", metric="sales")
    assert result.success
    assert 'growth_rate' in result.data

五、结语

通过本文,你已经掌握了:

  1. ✅ Skill.md文件的完整结构和编写规范
  2. ✅ DeerFlow Agent的Python实现
  3. ✅ 工具注册和调用机制
  4. ✅ 完整的数据分析Agent开发流程

下一篇,我们将探讨Python与国产大模型的深度集成。


系列文章导航: ← 3. Python构建AI多智能体系统实战 4. Python技能开发实战(本文) 5. Python与国产大模型的深度集成 →


本文首发于CSDN,转载请注明出处。

标签: DeerFlow | Skill开发 | 数据分析 | Python | 源码解析

更多推荐