Clawdbot汉化版代码实例:用Python封装Clawdbot API实现批量会话分析

1. 项目背景与需求场景

在实际的企业应用场景中,我们经常需要对大量的AI对话会话进行分析和统计。Clawdbot作为一个优秀的本地化AI对话平台,虽然提供了丰富的命令行工具,但在批量处理和数据分析方面还存在一些不便。

通过Python封装Clawdbot API,我们可以实现:

  • 批量会话导出:一次性导出所有历史对话记录
  • 会话数据分析:统计对话频率、时长、主题分布等指标
  • 自动化报告生成:定期生成对话分析报告
  • 企业微信集成:将分析结果推送到企业微信工作群

这种集成方式特别适合需要监控AI助手使用情况、分析用户需求、优化AI服务的团队和企业。

2. 环境准备与基础配置

2.1 安装必要的Python库

首先确保你的Python环境已经安装以下依赖库:

pip install requests pandas numpy matplotlib seaborn

2.2 配置Clawdbot访问信息

创建配置文件 clawdbot_config.py

# Clawdbot API配置
CLAWDBOT_CONFIG = {
    'base_url': 'http://localhost:18789',
    'api_token': 'dev-test-token',
    'gateway_token': 'dev-test-token',
    'data_dir': '/root/.clawdbot',
    'sessions_path': '/root/.clawdbot/agents/main/sessions'
}

# 企业微信配置
WECHAT_CONFIG = {
    'webhook_url': 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR_KEY',
    'enabled': True
}

3. Clawdbot API封装实现

3.1 基础API客户端类

创建 ClawdbotClient 类来封装所有API调用:

import requests
import json
import os
from typing import Dict, List, Optional

class ClawdbotClient:
    def __init__(self, base_url: str, api_token: str):
        self.base_url = base_url.rstrip('/')
        self.api_token = api_token
        self.headers = {
            'Authorization': f'Bearer {api_token}',
            'Content-Type': 'application/json'
        }
    
    def send_message(self, message: str, agent: str = 'main', 
                    thinking: str = 'medium', session_id: Optional[str] = None) -> Dict:
        """发送消息到Clawdbot"""
        payload = {
            'agent': agent,
            'message': message,
            'thinking': thinking
        }
        
        if session_id:
            payload['session_id'] = session_id
        
        response = requests.post(
            f'{self.base_url}/api/chat',
            headers=self.headers,
            json=payload,
            timeout=30
        )
        
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f'API请求失败: {response.status_code} - {response.text}')
    
    def get_session_history(self, session_id: str) -> List[Dict]:
        """获取特定会话的历史记录"""
        response = requests.get(
            f'{self.base_url}/api/sessions/{session_id}/history',
            headers=self.headers,
            timeout=30
        )
        
        if response.status_code == 200:
            return response.json().get('messages', [])
        else:
            raise Exception(f'获取会话历史失败: {response.status_code}')
    
    def list_sessions(self, limit: int = 100, offset: int = 0) -> Dict:
        """列出所有会话"""
        params = {'limit': limit, 'offset': offset}
        response = requests.get(
            f'{self.base_url}/api/sessions',
            headers=self.headers,
            params=params,
            timeout=30
        )
        
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f'列出会话失败: {response.status_code}')

3.2 会话数据分析类

创建 SessionAnalyzer 类来处理会话数据:

import pandas as pd
from datetime import datetime, timedelta
import re
from collections import Counter

class SessionAnalyzer:
    def __init__(self, client: ClawdbotClient):
        self.client = client
        self.sessions_data = []
    
    def load_sessions(self, days: int = 7):
        """加载指定天数内的会话数据"""
        end_time = datetime.now()
        start_time = end_time - timedelta(days=days)
        
        # 模拟从API获取会话数据
        # 实际使用时替换为真实的API调用
        sessions = self.client.list_sessions(limit=1000)
        
        for session in sessions.get('items', []):
            session_id = session['id']
            created_at = datetime.fromisoformat(session['created_at'].replace('Z', '+00:00'))
            
            if created_at >= start_time:
                history = self.client.get_session_history(session_id)
                self.sessions_data.extend(history)
    
    def analyze_message_patterns(self) -> Dict:
        """分析消息模式"""
        if not self.sessions_data:
            return {}
        
        df = pd.DataFrame(self.sessions_data)
        
        # 计算基本统计
        user_messages = df[df['role'] == 'user']
        assistant_messages = df[df['role'] == 'assistant']
        
        analysis = {
            'total_messages': len(df),
            'user_messages': len(user_messages),
            'assistant_messages': len(assistant_messages),
            'avg_response_length': assistant_messages['content'].str.len().mean(),
            'common_topics': self._extract_topics(user_messages['content']),
            'busiest_hours': self._analyze_peak_hours(df),
            'session_duration_stats': self._analyze_session_duration(df)
        }
        
        return analysis
    
    def _extract_topics(self, messages: pd.Series) -> List[Dict]:
        """提取常见话题"""
        # 简单的关键词提取
        common_words = [
            '如何', '怎么', '为什么', '请帮', '解释', '说明',
            '代码', '编程', '写一个', '制作', '设计', '建议'
        ]
        
        topic_counter = Counter()
        for message in messages:
            if isinstance(message, str):
                for word in common_words:
                    if word in message:
                        topic_counter[word] += 1
        
        return [{'topic': k, 'count': v} for k, v in topic_counter.most_common(10)]
    
    def _analyze_peak_hours(self, df: pd.DataFrame) -> List[Dict]:
        """分析高峰时段"""
        df['hour'] = pd.to_datetime(df['timestamp']).dt.hour
        hour_counts = df['hour'].value_counts().sort_index()
        
        return [{'hour': f'{h}:00', 'count': count} 
                for h, count in hour_counts.items()]
    
    def _analyze_session_duration(self, df: pd.DataFrame) -> Dict:
        """分析会话时长统计"""
        # 这里需要根据实际数据结构实现
        return {
            'avg_duration_minutes': 0,
            'max_duration_minutes': 0,
            'min_duration_minutes': 0
        }

4. 企业微信集成实现

4.1 企业微信消息推送

import requests
import json

class WeChatNotifier:
    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url
    
    def send_text_message(self, content: str, mentioned_list: List[str] = None):
        """发送文本消息到企业微信"""
        payload = {
            'msgtype': 'text',
            'text': {
                'content': content,
                'mentioned_list': mentioned_list or []
            }
        }
        
        response = requests.post(
            self.webhook_url,
            headers={'Content-Type': 'application/json'},
            data=json.dumps(payload),
            timeout=10
        )
        
        if response.status_code == 200:
            return True
        else:
            print(f'企业微信消息发送失败: {response.status_code}')
            return False
    
    def send_markdown_message(self, title: str, content: str):
        """发送Markdown格式消息"""
        payload = {
            'msgtype': 'markdown',
            'markdown': {
                'content': f'## {title}\n{content}'
            }
        }
        
        response = requests.post(
            self.webhook_url,
            headers={'Content-Type': 'application/json'},
            data=json.dumps(payload),
            timeout=10
        )
        
        return response.status_code == 200

def create_daily_report(analysis: Dict) -> str:
    """创建每日报告"""
    report = [
        "🤖 Clawdbot 每日使用报告",
        "========================",
        f"📊 总消息数: {analysis.get('total_messages', 0)}",
        f"👤 用户消息: {analysis.get('user_messages', 0)}",
        f"🤖 AI回复: {analysis.get('assistant_messages', 0)}",
        "",
        "🔥 热门话题:"
    ]
    
    for topic in analysis.get('common_topics', [])[:5]:
        report.append(f"- {topic['topic']}: {topic['count']}次")
    
    report.extend([
        "",
        "⏰ 高峰时段:"
    ])
    
    for hour_data in analysis.get('busiest_hours', [])[:3]:
        report.append(f"- {hour_data['hour']}: {hour_data['count']}条消息")
    
    return "\n".join(report)

5. 完整使用示例

5.1 主程序实现

def main():
    # 初始化客户端
    client = ClawdbotClient(
        base_url='http://localhost:18789',
        api_token='dev-test-token'
    )
    
    # 初始化分析器
    analyzer = SessionAnalyzer(client)
    
    # 加载最近7天的数据
    print("正在加载会话数据...")
    analyzer.load_sessions(days=7)
    
    # 进行分析
    print("正在进行数据分析...")
    analysis = analyzer.analyze_message_patterns()
    
    # 生成报告
    report = create_daily_report(analysis)
    print("\n分析报告:")
    print(report)
    
    # 推送到企业微信
    if WECHAT_CONFIG['enabled']:
        notifier = WeChatNotifier(WECHAT_CONFIG['webhook_url'])
        success = notifier.send_markdown_message(
            "Clawdbot每日报告", 
            report
        )
        
        if success:
            print("✅ 报告已推送到企业微信")
        else:
            print("❌ 企业微信推送失败")
    
    # 保存分析结果到文件
    with open('clawdbot_analysis.json', 'w', encoding='utf-8') as f:
        json.dump(analysis, f, ensure_ascii=False, indent=2)
    
    print("✅ 分析结果已保存到 clawdbot_analysis.json")

if __name__ == '__main__':
    main()

5.2 定时任务配置

创建定时任务脚本 cron_analysis.py

#!/usr/bin/env python3
"""
Clawdbot分析定时任务
每天上午9点运行一次
"""

import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from clawdbot_analysis import main

if __name__ == '__main__':
    try:
        main()
        print("分析任务完成")
    except Exception as e:
        print(f"分析任务失败: {str(e)}")
        # 可以在这里添加错误通知逻辑

配置crontab每天自动运行:

# 编辑crontab
crontab -e

# 添加以下行(每天上午9点运行)
0 9 * * * /usr/bin/python3 /path/to/cron_analysis.py >> /var/log/clawdbot_analysis.log 2>&1

6. 高级功能扩展

6.1 情感分析集成

from textblob import TextBlob
import jieba

class SentimentAnalyzer:
    def analyze_sentiment(self, text: str) -> Dict:
        """分析文本情感"""
        # 使用TextBlob进行英文情感分析
        blob = TextBlob(text)
        sentiment_en = blob.sentiment
        
        # 使用jieba进行中文情感分析(需要额外训练模型)
        words = list(jieba.cut(text))
        
        return {
            'polarity': sentiment_en.polarity,
            'subjectivity': sentiment_en.subjectivity,
            'word_count': len(words),
            'is_positive': sentiment_en.polarity > 0.1,
            'is_negative': sentiment_en.polarity < -0.1
        }

def enhance_analysis_with_sentiment(analyzer: SessionAnalyzer):
    """增强分析结果的情感维度"""
    sentiment_analyzer = SentimentAnalyzer()
    
    for message in analyzer.sessions_data:
        if message['role'] == 'user':
            sentiment = sentiment_analyzer.analyze_sentiment(message['content'])
            message['sentiment'] = sentiment

6.2 数据可视化报告

import matplotlib.pyplot as plt
import seaborn as sns

def generate_visual_report(analysis: Dict, output_dir: str = 'reports'):
    """生成可视化报告"""
    os.makedirs(output_dir, exist_ok=True)
    
    # 消息数量统计图
    plt.figure(figsize=(10, 6))
    message_types = ['用户消息', 'AI回复']
    counts = [analysis['user_messages'], analysis['assistant_messages']]
    
    plt.bar(message_types, counts)
    plt.title('消息类型分布')
    plt.savefig(f'{output_dir}/message_types.png')
    plt.close()
    
    # 高峰时段图
    hours_data = analysis['busiest_hours']
    hours = [int(h['hour'].split(':')[0]) for h in hours_data]
    counts = [h['count'] for h in hours_data]
    
    plt.figure(figsize=(12, 6))
    plt.plot(hours, counts, marker='o')
    plt.title('消息发送高峰时段')
    plt.xlabel('小时')
    plt.ylabel('消息数量')
    plt.grid(True)
    plt.savefig(f'{output_dir}/peak_hours.png')
    plt.close()

7. 总结与展望

通过这个Python封装方案,我们实现了对Clawdbot会话数据的批量分析和企业微信集成。这个方案具有以下优势:

  1. 自动化处理:可以定时自动运行,无需人工干预
  2. 深度分析:提供丰富的会话数据统计和分析功能
  3. 企业集成:完美支持企业微信消息推送
  4. 扩展性强:可以轻松添加新的分析维度和集成平台

在实际部署时,建议:

  • 根据实际业务需求调整分析指标
  • 设置合适的定时任务频率(每天/每周)
  • 定期检查和更新API集成配置
  • 监控分析任务的运行状态和日志

这个方案为企业和团队提供了强大的Clawdbot会话监控和分析能力,帮助更好地理解用户需求,优化AI服务体验。


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐