Clawdbot汉化版代码实例:用Python封装Clawdbot API实现批量会话分析
本文介绍了如何在星图GPU平台上自动化部署Clawdbot汉化版增加企业微信入口镜像,实现企业级AI会话数据分析。通过Python封装API,可批量导出对话记录并进行统计分析,典型应用包括自动生成使用报告并推送至企业微信,助力团队优化AI服务体验。
·
Clawdbot汉化版代码实例:用Python封装Clawdbot API实现批量会话分析
1. 项目背景与需求场景
在实际的企业应用场景中,我们经常需要对大量的AI对话会话进行分析和统计。Clawdbot作为一个优秀的本地化AI对话平台,虽然提供了丰富的命令行工具,但在批量处理和数据分析方面还存在一些不便。
通过Python封装Clawdbot API,我们可以实现:
- 批量会话导出:一次性导出所有历史对话记录
- 会话数据分析:统计对话频率、时长、主题分布等指标
- 自动化报告生成:定期生成对话分析报告
- 企业微信集成:将分析结果推送到企业微信工作群
这种集成方式特别适合需要监控AI助手使用情况、分析用户需求、优化AI服务的团队和企业。
2. 环境准备与基础配置
2.1 安装必要的Python库
首先确保你的Python环境已经安装以下依赖库:
pip install requests pandas numpy matplotlib seaborn
2.2 配置Clawdbot访问信息
创建配置文件 clawdbot_config.py:
# Clawdbot API配置
CLAWDBOT_CONFIG = {
'base_url': 'http://localhost:18789',
'api_token': 'dev-test-token',
'gateway_token': 'dev-test-token',
'data_dir': '/root/.clawdbot',
'sessions_path': '/root/.clawdbot/agents/main/sessions'
}
# 企业微信配置
WECHAT_CONFIG = {
'webhook_url': 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR_KEY',
'enabled': True
}
3. Clawdbot API封装实现
3.1 基础API客户端类
创建 ClawdbotClient 类来封装所有API调用:
import requests
import json
import os
from typing import Dict, List, Optional
class ClawdbotClient:
def __init__(self, base_url: str, api_token: str):
self.base_url = base_url.rstrip('/')
self.api_token = api_token
self.headers = {
'Authorization': f'Bearer {api_token}',
'Content-Type': 'application/json'
}
def send_message(self, message: str, agent: str = 'main',
thinking: str = 'medium', session_id: Optional[str] = None) -> Dict:
"""发送消息到Clawdbot"""
payload = {
'agent': agent,
'message': message,
'thinking': thinking
}
if session_id:
payload['session_id'] = session_id
response = requests.post(
f'{self.base_url}/api/chat',
headers=self.headers,
json=payload,
timeout=30
)
if response.status_code == 200:
return response.json()
else:
raise Exception(f'API请求失败: {response.status_code} - {response.text}')
def get_session_history(self, session_id: str) -> List[Dict]:
"""获取特定会话的历史记录"""
response = requests.get(
f'{self.base_url}/api/sessions/{session_id}/history',
headers=self.headers,
timeout=30
)
if response.status_code == 200:
return response.json().get('messages', [])
else:
raise Exception(f'获取会话历史失败: {response.status_code}')
def list_sessions(self, limit: int = 100, offset: int = 0) -> Dict:
"""列出所有会话"""
params = {'limit': limit, 'offset': offset}
response = requests.get(
f'{self.base_url}/api/sessions',
headers=self.headers,
params=params,
timeout=30
)
if response.status_code == 200:
return response.json()
else:
raise Exception(f'列出会话失败: {response.status_code}')
3.2 会话数据分析类
创建 SessionAnalyzer 类来处理会话数据:
import pandas as pd
from datetime import datetime, timedelta
import re
from collections import Counter
class SessionAnalyzer:
def __init__(self, client: ClawdbotClient):
self.client = client
self.sessions_data = []
def load_sessions(self, days: int = 7):
"""加载指定天数内的会话数据"""
end_time = datetime.now()
start_time = end_time - timedelta(days=days)
# 模拟从API获取会话数据
# 实际使用时替换为真实的API调用
sessions = self.client.list_sessions(limit=1000)
for session in sessions.get('items', []):
session_id = session['id']
created_at = datetime.fromisoformat(session['created_at'].replace('Z', '+00:00'))
if created_at >= start_time:
history = self.client.get_session_history(session_id)
self.sessions_data.extend(history)
def analyze_message_patterns(self) -> Dict:
"""分析消息模式"""
if not self.sessions_data:
return {}
df = pd.DataFrame(self.sessions_data)
# 计算基本统计
user_messages = df[df['role'] == 'user']
assistant_messages = df[df['role'] == 'assistant']
analysis = {
'total_messages': len(df),
'user_messages': len(user_messages),
'assistant_messages': len(assistant_messages),
'avg_response_length': assistant_messages['content'].str.len().mean(),
'common_topics': self._extract_topics(user_messages['content']),
'busiest_hours': self._analyze_peak_hours(df),
'session_duration_stats': self._analyze_session_duration(df)
}
return analysis
def _extract_topics(self, messages: pd.Series) -> List[Dict]:
"""提取常见话题"""
# 简单的关键词提取
common_words = [
'如何', '怎么', '为什么', '请帮', '解释', '说明',
'代码', '编程', '写一个', '制作', '设计', '建议'
]
topic_counter = Counter()
for message in messages:
if isinstance(message, str):
for word in common_words:
if word in message:
topic_counter[word] += 1
return [{'topic': k, 'count': v} for k, v in topic_counter.most_common(10)]
def _analyze_peak_hours(self, df: pd.DataFrame) -> List[Dict]:
"""分析高峰时段"""
df['hour'] = pd.to_datetime(df['timestamp']).dt.hour
hour_counts = df['hour'].value_counts().sort_index()
return [{'hour': f'{h}:00', 'count': count}
for h, count in hour_counts.items()]
def _analyze_session_duration(self, df: pd.DataFrame) -> Dict:
"""分析会话时长统计"""
# 这里需要根据实际数据结构实现
return {
'avg_duration_minutes': 0,
'max_duration_minutes': 0,
'min_duration_minutes': 0
}
4. 企业微信集成实现
4.1 企业微信消息推送
import requests
import json
class WeChatNotifier:
def __init__(self, webhook_url: str):
self.webhook_url = webhook_url
def send_text_message(self, content: str, mentioned_list: List[str] = None):
"""发送文本消息到企业微信"""
payload = {
'msgtype': 'text',
'text': {
'content': content,
'mentioned_list': mentioned_list or []
}
}
response = requests.post(
self.webhook_url,
headers={'Content-Type': 'application/json'},
data=json.dumps(payload),
timeout=10
)
if response.status_code == 200:
return True
else:
print(f'企业微信消息发送失败: {response.status_code}')
return False
def send_markdown_message(self, title: str, content: str):
"""发送Markdown格式消息"""
payload = {
'msgtype': 'markdown',
'markdown': {
'content': f'## {title}\n{content}'
}
}
response = requests.post(
self.webhook_url,
headers={'Content-Type': 'application/json'},
data=json.dumps(payload),
timeout=10
)
return response.status_code == 200
def create_daily_report(analysis: Dict) -> str:
"""创建每日报告"""
report = [
"🤖 Clawdbot 每日使用报告",
"========================",
f"📊 总消息数: {analysis.get('total_messages', 0)}",
f"👤 用户消息: {analysis.get('user_messages', 0)}",
f"🤖 AI回复: {analysis.get('assistant_messages', 0)}",
"",
"🔥 热门话题:"
]
for topic in analysis.get('common_topics', [])[:5]:
report.append(f"- {topic['topic']}: {topic['count']}次")
report.extend([
"",
"⏰ 高峰时段:"
])
for hour_data in analysis.get('busiest_hours', [])[:3]:
report.append(f"- {hour_data['hour']}: {hour_data['count']}条消息")
return "\n".join(report)
5. 完整使用示例
5.1 主程序实现
def main():
# 初始化客户端
client = ClawdbotClient(
base_url='http://localhost:18789',
api_token='dev-test-token'
)
# 初始化分析器
analyzer = SessionAnalyzer(client)
# 加载最近7天的数据
print("正在加载会话数据...")
analyzer.load_sessions(days=7)
# 进行分析
print("正在进行数据分析...")
analysis = analyzer.analyze_message_patterns()
# 生成报告
report = create_daily_report(analysis)
print("\n分析报告:")
print(report)
# 推送到企业微信
if WECHAT_CONFIG['enabled']:
notifier = WeChatNotifier(WECHAT_CONFIG['webhook_url'])
success = notifier.send_markdown_message(
"Clawdbot每日报告",
report
)
if success:
print("✅ 报告已推送到企业微信")
else:
print("❌ 企业微信推送失败")
# 保存分析结果到文件
with open('clawdbot_analysis.json', 'w', encoding='utf-8') as f:
json.dump(analysis, f, ensure_ascii=False, indent=2)
print("✅ 分析结果已保存到 clawdbot_analysis.json")
if __name__ == '__main__':
main()
5.2 定时任务配置
创建定时任务脚本 cron_analysis.py:
#!/usr/bin/env python3
"""
Clawdbot分析定时任务
每天上午9点运行一次
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from clawdbot_analysis import main
if __name__ == '__main__':
try:
main()
print("分析任务完成")
except Exception as e:
print(f"分析任务失败: {str(e)}")
# 可以在这里添加错误通知逻辑
配置crontab每天自动运行:
# 编辑crontab
crontab -e
# 添加以下行(每天上午9点运行)
0 9 * * * /usr/bin/python3 /path/to/cron_analysis.py >> /var/log/clawdbot_analysis.log 2>&1
6. 高级功能扩展
6.1 情感分析集成
from textblob import TextBlob
import jieba
class SentimentAnalyzer:
def analyze_sentiment(self, text: str) -> Dict:
"""分析文本情感"""
# 使用TextBlob进行英文情感分析
blob = TextBlob(text)
sentiment_en = blob.sentiment
# 使用jieba进行中文情感分析(需要额外训练模型)
words = list(jieba.cut(text))
return {
'polarity': sentiment_en.polarity,
'subjectivity': sentiment_en.subjectivity,
'word_count': len(words),
'is_positive': sentiment_en.polarity > 0.1,
'is_negative': sentiment_en.polarity < -0.1
}
def enhance_analysis_with_sentiment(analyzer: SessionAnalyzer):
"""增强分析结果的情感维度"""
sentiment_analyzer = SentimentAnalyzer()
for message in analyzer.sessions_data:
if message['role'] == 'user':
sentiment = sentiment_analyzer.analyze_sentiment(message['content'])
message['sentiment'] = sentiment
6.2 数据可视化报告
import matplotlib.pyplot as plt
import seaborn as sns
def generate_visual_report(analysis: Dict, output_dir: str = 'reports'):
"""生成可视化报告"""
os.makedirs(output_dir, exist_ok=True)
# 消息数量统计图
plt.figure(figsize=(10, 6))
message_types = ['用户消息', 'AI回复']
counts = [analysis['user_messages'], analysis['assistant_messages']]
plt.bar(message_types, counts)
plt.title('消息类型分布')
plt.savefig(f'{output_dir}/message_types.png')
plt.close()
# 高峰时段图
hours_data = analysis['busiest_hours']
hours = [int(h['hour'].split(':')[0]) for h in hours_data]
counts = [h['count'] for h in hours_data]
plt.figure(figsize=(12, 6))
plt.plot(hours, counts, marker='o')
plt.title('消息发送高峰时段')
plt.xlabel('小时')
plt.ylabel('消息数量')
plt.grid(True)
plt.savefig(f'{output_dir}/peak_hours.png')
plt.close()
7. 总结与展望
通过这个Python封装方案,我们实现了对Clawdbot会话数据的批量分析和企业微信集成。这个方案具有以下优势:
- 自动化处理:可以定时自动运行,无需人工干预
- 深度分析:提供丰富的会话数据统计和分析功能
- 企业集成:完美支持企业微信消息推送
- 扩展性强:可以轻松添加新的分析维度和集成平台
在实际部署时,建议:
- 根据实际业务需求调整分析指标
- 设置合适的定时任务频率(每天/每周)
- 定期检查和更新API集成配置
- 监控分析任务的运行状态和日志
这个方案为企业和团队提供了强大的Clawdbot会话监控和分析能力,帮助更好地理解用户需求,优化AI服务体验。
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。
更多推荐



所有评论(0)