第一部分:AgenticAI基础概念

1.1 什么是AgenticAI?

AgenticAI(智能体AI)是指具有自主性、目标导向和交互能力的AI系统。与传统AI模型不同,AgenticAI能够感知环境、制定计划、执行行动并从中学习,实现复杂的多步骤任务。

python

# 基础AgenticAI类示例
class BasicAIAgent:
    def __init__(self, name, capabilities):
        self.name = name
        self.capabilities = capabilities  # 代理具备的能力
        self.memory = []  # 记忆存储
        self.goals = []   # 目标列表
    
    def perceive(self, environment):
        """感知环境信息"""
        observations = []
        for sensor in self.capabilities.get('sensors', []):
            observation = sensor.observe(environment)
            observations.append(observation)
        return observations
    
    def plan(self, observations, goals):
        """基于观察和目标制定计划"""
        plan = []
        # 简单的规划逻辑
        for goal in goals:
            if self._is_achievable(goal, observations):
                steps = self._generate_steps(goal, observations)
                plan.extend(steps)
        return plan
    
    def act(self, plan, environment):
        """执行计划中的行动"""
        results = []
        for action in plan:
            result = action.execute(environment)
            results.append(result)
            self.memory.append({
                'action': action,
                'result': result,
                'timestamp': time.time()
            })
        return results
    
    def learn(self):
        """从经验中学习"""
        recent_experiences = self.memory[-10:]  # 最近10条经验
        for experience in recent_experiences:
            if experience['result']['success']:
                # 强化成功的行为模式
                self._reinforce_successful_pattern(experience)
            else:
                # 调整失败的行为
                self._adjust_failed_behavior(experience)

1.2 AgenticAI的核心组件

graph TD
    A[AgenticAI系统] --> B[感知模块]
    A --> C[决策模块]
    A --> D[执行模块]
    A --> E[学习模块]
    
    B --> B1[环境传感器]
    B --> B2[数据处理器]
    B --> B3[状态评估器]
    
    C --> C1[目标管理器]
    C --> C2[规划器]
    C --> C3[策略选择器]
    
    D --> D1[动作执行器]
    D --> D2[通信接口]
    D --> D3[工具调用]
    
    E --> E1[经验回放]
    E --> E2[模型更新]
    E --> E3[性能评估]
    
    F[环境] --> B
    D --> F
    E --> C

第二部分:AI编程基础

2.1 Python AI开发环境搭建

python

# requirements.txt 示例
"""
torch>=2.0.0
transformers>=4.30.0
langchain>=0.0.200
openai>=1.0.0
numpy>=1.24.0
pandas>=1.5.0
matplotlib>=3.7.0
seaborn>=0.12.0
scikit-learn>=1.2.0
jupyter>=1.0.0
"""

# 环境配置脚本
import subprocess
import sys

def setup_ai_environment():
    """设置AI开发环境"""
    packages = [
        "torch", "transformers", "langchain", "openai",
        "numpy", "pandas", "matplotlib", "seaborn", 
        "scikit-learn", "jupyter"
    ]
    
    for package in packages:
        try:
            subprocess.check_call([sys.executable, "-m", "pip", "install", package])
            print(f"✓ 成功安装 {package}")
        except subprocess.CalledProcessError:
            print(f"✗ 安装 {package} 失败")
    
    # 验证安装
    try:
        import torch
        import transformers
        print("✓ AI环境配置完成!")
    except ImportError as e:
        print(f"✗ 环境配置失败: {e}")

if __name__ == "__main__":
    setup_ai_environment()

2.2 基础AI模型使用

python

import torch
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
import numpy as np
import matplotlib.pyplot as plt

class BasicAIModels:
    def __init__(self):
        self.models = {}
        self.tokenizers = {}
    
    def load_text_generation_model(self, model_name="gpt2"):
        """加载文本生成模型"""
        print(f"正在加载模型: {model_name}")
        self.tokenizers['text_gen'] = AutoTokenizer.from_pretrained(model_name)
        self.models['text_gen'] = AutoModelForCausalLM.from_pretrained(model_name)
        
        # 添加padding token如果不存在
        if self.tokenizers['text_gen'].pad_token is None:
            self.tokenizers['text_gen'].pad_token = self.tokenizers['text_gen'].eos_token
        
        print(f"✓ 模型 {model_name} 加载完成")
    
    def generate_text(self, prompt, max_length=100, temperature=0.7):
        """生成文本"""
        if 'text_gen' not in self.models:
            self.load_text_generation_model()
        
        inputs = self.tokenizers['text_gen'](prompt, return_tensors="pt")
        
        with torch.no_grad():
            outputs = self.models['text_gen'].generate(
                inputs.input_ids,
                max_length=max_length,
                temperature=temperature,
                do_sample=True,
                pad_token_id=self.tokenizers['text_gen'].eos_token_id
            )
        
        generated_text = self.tokenizers['text_gen'].decode(outputs[0], skip_special_tokens=True)
        return generated_text
    
    def load_sentiment_analysis(self):
        """加载情感分析模型"""
        self.models['sentiment'] = pipeline(
            "sentiment-analysis",
            model="distilbert-base-uncased-finetuned-sst-2-english"
        )
    
    def analyze_sentiment(self, text):
        """分析文本情感"""
        if 'sentiment' not in self.models:
            self.load_sentiment_analysis()
        
        result = self.models['sentiment'](text)
        return result

# 使用示例
if __name__ == "__main__":
    ai_models = BasicAIModels()
    
    # 文本生成示例
    prompt = "人工智能的未来发展"
    generated_text = ai_models.generate_text(prompt, max_length=150)
    print("生成的文本:", generated_text)
    
    # 情感分析示例
    sentiment = ai_models.analyze_sentiment("I love this amazing technology!")
    print("情感分析结果:", sentiment)

第三部分:构建智能代理系统

3.1 任务导向型代理

python

from typing import List, Dict, Any, Callable
import json
import time

class TaskOrientedAgent:
    def __init__(self, name: str, tools: List[Callable] = None):
        self.name = name
        self.tools = tools or []
        self.task_history = []
        self.current_goal = None
        self.state = "idle"
        
    def add_tool(self, tool: Callable, description: str):
        """添加工具到代理"""
        self.tools.append({
            'function': tool,
            'description': description,
            'name': tool.__name__
        })
    
    def set_goal(self, goal: str):
        """设置当前目标"""
        self.current_goal = goal
        self.state = "planning"
        self.task_history.append({
            'timestamp': time.time(),
            'type': 'goal_set',
            'goal': goal
        })
    
    def plan_execution(self) -> List[Dict]:
        """制定执行计划"""
        print(f"{self.name} 正在为目标制定计划: {self.current_goal}")
        
        # 简单的基于规则的规划
        plan = []
        goal_lower = self.current_goal.lower()
        
        if "计算" in goal_lower or "calculate" in goal_lower:
            plan.append({
                'action': 'calculate',
                'tool': 'calculator_tool',
                'parameters': {'expression': self._extract_expression(goal_lower)}
            })
        elif "搜索" in goal_lower or "search" in goal_lower:
            plan.append({
                'action': 'search',
                'tool': 'search_tool',
                'parameters': {'query': self._extract_query(goal_lower)}
            })
        elif "总结" in goal_lower or "summarize" in goal_lower:
            plan.append({
                'action': 'summarize',
                'tool': 'summary_tool',
                'parameters': {'text': self._extract_text(goal_lower)}
            })
        else:
            # 默认行动
            plan.append({
                'action': 'general_processing',
                'tool': 'general_tool',
                'parameters': {'input': self.current_goal}
            })
        
        return plan
    
    def execute_plan(self, plan: List[Dict]) -> Any:
        """执行计划"""
        self.state = "executing"
        results = []
        
        for step in plan:
            print(f"执行步骤: {step['action']}")
            
            # 查找对应的工具
            tool_info = next(
                (t for t in self.tools if t['name'] == step['tool']), 
                None
            )
            
            if tool_info:
                try:
                    result = tool_info['function'](**step['parameters'])
                    results.append({
                        'step': step['action'],
                        'result': result,
                        'status': 'success'
                    })
                except Exception as e:
                    results.append({
                        'step': step['action'],
                        'result': str(e),
                        'status': 'failed'
                    })
            else:
                results.append({
                    'step': step['action'],
                    'result': f"工具 {step['tool']} 未找到",
                    'status': 'failed'
                })
        
        self.state = "completed"
        return results
    
    def _extract_expression(self, text: str) -> str:
        """从文本中提取数学表达式"""
        # 简单的提取逻辑
        import re
        numbers = re.findall(r'\d+\.?\d*', text)
        operators = re.findall(r'[+\-*/]', text)
        return ' '.join(numbers + operators)
    
    def _extract_query(self, text: str) -> str:
        """从文本中提取搜索查询"""
        # 移除搜索相关关键词
        query = text.replace('搜索', '').replace('search', '').strip()
        return query
    
    def _extract_text(self, text: str) -> str:
        """从文本中提取需要总结的内容"""
        # 在实际应用中,这里会有更复杂的逻辑
        return text

# 工具函数示例
def calculator_tool(expression: str) -> float:
    """计算工具"""
    try:
        result = eval(expression)
        return f"计算结果: {expression} = {result}"
    except Exception as e:
        return f"计算错误: {e}"

def search_tool(query: str) -> str:
    """搜索工具(模拟)"""
    return f"搜索 '{query}' 的结果: 这是模拟的搜索结果"

def summary_tool(text: str) -> str:
    """总结工具"""
    words = text.split()
    if len(words) > 10:
        summary = ' '.join(words[:10]) + "..."
    else:
        summary = text
    return f"总结: {summary}"

# 使用示例
if __name__ == "__main__":
    # 创建代理
    agent = TaskOrientedAgent("任务助手")
    
    # 添加工具
    agent.add_tool(calculator_tool, "执行数学计算")
    agent.add_tool(search_tool, "搜索信息")
    agent.add_tool(summary_tool, "文本总结")
    
    # 设置目标并执行
    agent.set_goal("计算 25 + 38 的结果")
    plan = agent.plan_execution()
    results = agent.execute_plan(plan)
    
    print("执行结果:")
    for result in results:
        print(f"- {result['step']}: {result['result']} (状态: {result['status']})")

3.2 多步骤任务处理流程图

flowchart TD
    Start[开始] --> Input[输入任务]
    Input --> Parse[解析任务]
    Parse --> Goal[确定目标]
    Goal --> Plan[制定计划]
    
    Plan --> PlanStep1[步骤1: 分析需求]
    Plan --> PlanStep2[步骤2: 选择工具]
    Plan --> PlanStep3[步骤3: 安排顺序]
    
    PlanStep3 --> Execute[执行计划]
    
    Execute --> Step1[执行步骤1]
    Step1 --> Check1{步骤1成功?}
    Check1 -->|是| Step2[执行步骤2]
    Check1 -->|否| Adjust1[调整计划]
    Adjust1 --> Step1
    
    Step2 --> Check2{步骤2成功?}
    Check2 -->|是| Step3[执行步骤3]
    Check2 -->|否| Adjust2[调整计划]
    Adjust2 --> Step2
    
    Step3 --> Check3{步骤3成功?}
    Check3 -->|是| Evaluate[评估结果]
    Check3 -->|否| Adjust3[调整计划]
    Adjust3 --> Step3
    
    Evaluate --> FinalCheck{目标达成?}
    FinalCheck -->|是| Success[任务成功]
    FinalCheck -->|否| Replan[重新规划]
    Replan --> Plan
    
    Success --> Learn[学习经验]
    Learn --> End[结束]

第四部分:高级AgenticAI功能

4.1 记忆与上下文管理

python

import numpy as np
from datetime import datetime, timedelta
from collections import defaultdict
import pickle

class MemorySystem:
    def __init__(self, max_memories=1000):
        self.memories = []
        self.max_memories = max_memories
        self.memory_vectors = {}  # 用于相似性搜索的向量存储
        self.importance_weights = defaultdict(float)
    
    def add_memory(self, content: str, memory_type: str = "general", 
                   importance: float = 0.5, metadata: dict = None):
        """添加记忆"""
        memory = {
            'id': len(self.memories),
            'content': content,
            'type': memory_type,
            'timestamp': datetime.now(),
            'importance': importance,
            'last_accessed': datetime.now(),
            'access_count': 0,
            'metadata': metadata or {}
        }
        
        self.memories.append(memory)
        
        # 维护记忆数量限制
        if len(self.memories) > self.max_memories:
            self._prune_memories()
        
        # 更新重要性权重
        self._update_importance(memory_type, importance)
    
    def retrieve_relevant_memories(self, query: str, n_results: int = 5):
        """检索相关记忆"""
        # 简单的基于关键词的检索(实际应用中会使用嵌入向量)
        query_lower = query.lower()
        relevant_memories = []
        
        for memory in self.memories:
            content_lower = memory['content'].lower()
            
            # 计算简单相关性分数
            score = 0
            for word in query_lower.split():
                if word in content_lower:
                    score += 1
            
            # 考虑记忆的重要性和新鲜度
            recency = self._calculate_recency(memory['timestamp'])
            importance_score = memory['importance']
            
            final_score = score + importance_score + recency
            
            if final_score > 0:
                relevant_memories.append((memory, final_score))
        
        # 按分数排序并返回前n个结果
        relevant_memories.sort(key=lambda x: x[1], reverse=True)
        return [mem for mem, score in relevant_memories[:n_results]]
    
    def _calculate_recency(self, timestamp):
        """计算新鲜度分数"""
        time_diff = datetime.now() - timestamp
        hours_diff = time_diff.total_seconds() / 3600
        
        # 指数衰减的新鲜度分数
        recency = np.exp(-hours_diff / 24)  # 24小时衰减因子
        return recency
    
    def _prune_memories(self):
        """修剪记忆,移除不重要的"""
        if len(self.memories) <= self.max_memories:
            return
        
        # 计算每个记忆的综合分数
        memory_scores = []
        for memory in self.memories:
            recency = self._calculate_recency(memory['timestamp'])
            score = memory['importance'] + recency + (memory['access_count'] * 0.1)
            memory_scores.append((memory, score))
        
        # 按分数排序,移除分数最低的
        memory_scores.sort(key=lambda x: x[1])
        memories_to_remove = memory_scores[:len(self.memories) - self.max_memories]
        
        for memory, _ in memories_to_remove:
            self.memories.remove(memory)
    
    def _update_importance(self, memory_type: str, importance: float):
        """更新记忆类型的重要性权重"""
        self.importance_weights[memory_type] = (
            0.9 * self.importance_weights[memory_type] + 0.1 * importance
        )
    
    def save_memories(self, filepath: str):
        """保存记忆到文件"""
        with open(filepath, 'wb') as f:
            pickle.dump({
                'memories': self.memories,
                'importance_weights': dict(self.importance_weights)
            }, f)
    
    def load_memories(self, filepath: str):
        """从文件加载记忆"""
        try:
            with open(filepath, 'rb') as f:
                data = pickle.load(f)
                self.memories = data['memories']
                self.importance_weights = defaultdict(float, data['importance_weights'])
        except FileNotFoundError:
            print("记忆文件不存在,创建新的记忆系统")

# 增强的代理类,包含记忆系统
class AdvancedAIAgent(TaskOrientedAgent):
    def __init__(self, name: str, tools: List[Callable] = None):
        super().__init__(name, tools)
        self.memory_system = MemorySystem()
        self.conversation_context = []
        self.max_context_length = 10
    
    def process_with_memory(self, user_input: str) -> str:
        """使用记忆处理用户输入"""
        # 检索相关记忆
        relevant_memories = self.memory_system.retrieve_relevant_memories(user_input)
        
        # 添加上下文
        self.conversation_context.append({
            'role': 'user',
            'content': user_input,
            'timestamp': datetime.now()
        })
        
        # 构建增强的提示
        enhanced_prompt = self._build_enhanced_prompt(user_input, relevant_memories)
        
        # 处理请求(这里简化处理,实际会调用AI模型)
        response = self._generate_response(enhanced_prompt)
        
        # 保存到记忆
        self.memory_system.add_memory(
            content=f"用户: {user_input} - 助手: {response}",
            memory_type="conversation",
            importance=0.7,
            metadata={'interaction_type': 'conversation'}
        )
        
        # 更新上下文
        self.conversation_context.append({
            'role': 'assistant',
            'content': response,
            'timestamp': datetime.now()
        })
        
        # 维护上下文长度
        if len(self.conversation_context) > self.max_context_length * 2:
            self.conversation_context = self.conversation_context[-self.max_context_length * 2:]
        
        return response
    
    def _build_enhanced_prompt(self, user_input: str, relevant_memories: list) -> str:
        """构建增强的提示"""
        prompt = "基于以下信息和上下文回答用户问题:\n\n"
        
        # 添加上下文
        if self.conversation_context:
            prompt += "最近的对话上下文:\n"
            for msg in self.conversation_context[-self.max_context_length:]:
                role = "用户" if msg['role'] == 'user' else "助手"
                prompt += f"{role}: {msg['content']}\n"
            prompt += "\n"
        
        # 添加相关记忆
        if relevant_memories:
            prompt += "相关记忆:\n"
            for memory in relevant_memories:
                prompt += f"- {memory['content']}\n"
            prompt += "\n"
        
        prompt += f"当前问题: {user_input}\n\n回答:"
        return prompt
    
    def _generate_response(self, prompt: str) -> str:
        """生成响应(简化版本)"""
        # 在实际应用中,这里会调用语言模型
        # 这里使用简单的规则作为示例
        
        if "你好" in prompt or "hello" in prompt.lower():
            return "你好!我是你的AI助手,有什么我可以帮助你的吗?"
        elif "计算" in prompt:
            # 提取并执行计算
            import re
            numbers = re.findall(r'\d+', prompt)
            if len(numbers) >= 2:
                return f"计算结果: {numbers[0]} + {numbers[1]} = {int(numbers[0]) + int(numbers[1])}"
        elif "天气" in prompt:
            return "我目前无法获取实时天气信息,但建议你查看当地的天气预报应用。"
        
        return "我理解了你的问题,但我需要更多信息来提供准确的回答。你能详细说明一下吗?"

4.2 工具使用与API集成

python

import requests
import json

class ToolIntegrationSystem:
    def __init__(self):
        self.available_tools = {}
        self.api_keys = {}
    
    def register_tool(self, tool_name: str, tool_function: Callable, description: str):
        """注册工具"""
        self.available_tools[tool_name] = {
            'function': tool_function,
            'description': description,
            'parameters': self._extract_parameters(tool_function)
        }
    
    def register_api(self, api_name: str, base_url: str, api_key: str = None):
        """注册API"""
        self.api_keys[api_name] = {
            'base_url': base_url,
            'api_key': api_key
        }
    
    def execute_tool(self, tool_name: str, **kwargs):
        """执行工具"""
        if tool_name not in self.available_tools:
            return f"错误: 工具 '{tool_name}' 未找到"
        
        try:
            tool = self.available_tools[tool_name]
            result = tool['function'](**kwargs)
            return result
        except Exception as e:
            return f"执行工具时出错: {e}"
    
    def call_api(self, api_name: str, endpoint: str, method: str = "GET", data: dict = None):
        """调用API"""
        if api_name not in self.api_keys:
            return f"错误: API '{api_name}' 未注册"
        
        api_info = self.api_keys[api_name]
        url = f"{api_info['base_url']}/{endpoint}"
        headers = {}
        
        # 添加API密钥(如果存在)
        if api_info['api_key']:
            headers['Authorization'] = f"Bearer {api_info['api_key']}"
        
        try:
            if method.upper() == "GET":
                response = requests.get(url, headers=headers, params=data)
            elif method.upper() == "POST":
                headers['Content-Type'] = 'application/json'
                response = requests.post(url, headers=headers, data=json.dumps(data))
            else:
                return f"不支持的HTTP方法: {method}"
            
            if response.status_code == 200:
                return response.json()
            else:
                return f"API调用失败: {response.status_code} - {response.text}"
        
        except requests.RequestException as e:
            return f"API请求错误: {e}"
    
    def _extract_parameters(self, func: Callable) -> list:
        """提取函数参数信息"""
        import inspect
        sig = inspect.signature(func)
        parameters = []
        
        for name, param in sig.parameters.items():
            param_info = {
                'name': name,
                'type': str(param.annotation) if param.annotation != inspect.Parameter.empty else 'any',
                'required': param.default == inspect.Parameter.empty
            }
            parameters.append(param_info)
        
        return parameters
    
    def list_tools(self) -> dict:
        """列出所有可用工具"""
        return {
            name: {
                'description': tool['description'],
                'parameters': tool['parameters']
            }
            for name, tool in self.available_tools.items()
        }

# 示例工具函数
def web_search_tool(query: str, max_results: int = 5) -> str:
    """网页搜索工具(模拟)"""
    # 在实际应用中,这里会调用真实的搜索API
    return f"搜索 '{query}' 的模拟结果: 找到了 {max_results} 条相关信息"

def calculator_tool(expression: str) -> float:
    """高级计算工具"""
    try:
        # 安全地评估数学表达式
        allowed_chars = set('0123456789+-*/.() ')
        if all(c in allowed_chars for c in expression):
            result = eval(expression)
            return f"计算结果: {expression} = {result}"
        else:
            return "错误: 表达式包含不安全字符"
    except Exception as e:
        return f"计算错误: {e}"

def weather_tool(city: str) -> str:
    """天气查询工具(模拟)"""
    # 模拟天气数据
    weather_data = {
        "北京": "晴, 25°C",
        "上海": "多云, 23°C", 
        "广州": "雨, 28°C",
        "深圳": "阴, 26°C"
    }
    
    if city in weather_data:
        return f"{city}的天气: {weather_data[city]}"
    else:
        return f"未找到{city}的天气信息"

# 使用示例
if __name__ == "__main__":
    tool_system = ToolIntegrationSystem()
    
    # 注册工具
    tool_system.register_tool(
        "web_search", 
        web_search_tool, 
        "执行网页搜索"
    )
    tool_system.register_tool(
        "calculator", 
        calculator_tool, 
        "执行数学计算"
    )
    tool_system.register_tool(
        "weather", 
        weather_tool, 
        "查询天气信息"
    )
    
    # 测试工具
    print("=== 工具测试 ===")
    print(tool_system.execute_tool("web_search", query="人工智能", max_results=3))
    print(tool_system.execute_tool("calculator", expression="(15 + 23) * 2"))
    print(tool_system.execute_tool("weather", city="北京"))
    
    print("\n=== 可用工具列表 ===")
    tools = tool_system.list_tools()
    for tool_name, tool_info in tools.items():
        print(f"- {tool_name}: {tool_info['description']}")

第五部分:实际应用案例

5.1 个人学习助手

python

class PersonalLearningAssistant(AdvancedAIAgent):
    def __init__(self, name: str):
        super().__init__(name)
        self.learning_goals = []
        self.study_sessions = []
        self.knowledge_base = {}
        
        # 初始化学习相关工具
        self._initialize_learning_tools()
    
    def _initialize_learning_tools(self):
        """初始化学习工具"""
        self.add_tool(self.create_study_plan, "制定学习计划")
        self.add_tool(self.quiz_me, "生成测验题目")
        self.add_tool(self.summarize_content, "总结学习内容")
        self.add_tool(self.track_progress, "跟踪学习进度")
    
    def set_learning_goal(self, goal: str, deadline: str = None):
        """设置学习目标"""
        learning_goal = {
            'goal': goal,
            'deadline': deadline,
            'created_at': datetime.now(),
            'progress': 0.0,
            'sub_tasks': []
        }
        self.learning_goals.append(learning_goal)
        
        self.memory_system.add_memory(
            content=f"设置了学习目标: {goal}",
            memory_type="learning_goal",
            importance=0.9
        )
        
        return f"已设置学习目标: {goal}"
    
    def create_study_plan(self, goal: str, hours_per_week: int = 10, weeks: int = 4):
        """制定学习计划"""
        total_hours = hours_per_week * weeks
        plan = {
            'goal': goal,
            'total_hours': total_hours,
            'weekly_hours': hours_per_week,
            'duration_weeks': weeks,
            'weekly_topics': [],
            'resources': []
        }
        
        # 生成每周学习主题(简化版本)
        topics = self._generate_learning_topics(goal, weeks)
        plan['weekly_topics'] = topics
        
        # 推荐学习资源
        resources = self._recommend_resources(goal)
        plan['resources'] = resources
        
        self.memory_system.add_memory(
            content=f"为目标 '{goal}' 创建了学习计划,总计 {total_hours} 小时",
            memory_type="study_plan",
            importance=0.8,
            metadata={'plan_details': plan}
        )
        
        return plan
    
    def quiz_me(self, topic: str, difficulty: str = "medium", num_questions: int = 5):
        """生成测验题目"""
        # 在实际应用中,这里会调用AI模型生成题目
        questions = []
        
        question_templates = {
            "easy": [
                f"什么是{topic}的基本概念?",
                f"{topic}的主要应用领域有哪些?",
                f"列举三个与{topic}相关的关键词"
            ],
            "medium": [
                f"解释{topic}的核心原理",
                f"比较{topic}与传统方法的区别",
                f"分析{topic}的优势和局限性"
            ],
            "hard": [
                f"详细描述{topic}的实现机制",
                f"讨论{topic}在未来发展中的挑战",
                f"提出改进{topic}的创新思路"
            ]
        }
        
        templates = question_templates.get(difficulty, question_templates["medium"])
        for i in range(min(num_questions, len(templates))):
            questions.append({
                'id': i + 1,
                'question': templates[i],
                'type': 'open_ended'
            })
        
        quiz_session = {
            'topic': topic,
            'difficulty': difficulty,
            'questions': questions,
            'timestamp': datetime.now()
        }
        
        return quiz_session
    
    def summarize_content(self, content: str, max_length: int = 200):
        """总结学习内容"""
        # 简化版的总结功能
        sentences = content.split('。')
        if len(sentences) > 3:
            summary = '。'.join(sentences[:3]) + '。'
        else:
            summary = content
        
        if len(summary) > max_length:
            summary = summary[:max_length] + '...'
        
        return {
            'original_length': len(content),
            'summary_length': len(summary),
            'summary': summary,
            'reduction_ratio': f"{((len(content) - len(summary)) / len(content)) * 100:.1f}%"
        }
    
    def track_progress(self, goal: str = None):
        """跟踪学习进度"""
        if goal:
            # 跟踪特定目标进度
            target_goal = next((g for g in self.learning_goals if g['goal'] == goal), None)
            if target_goal:
                return {
                    'goal': goal,
                    'progress': f"{target_goal['progress'] * 100}%",
                    'days_since_start': (datetime.now() - target_goal['created_at']).days
                }
            else:
                return f"未找到目标: {goal}"
        else:
            # 跟踪所有目标进度
            progress_report = {
                'total_goals': len(self.learning_goals),
                'goals': []
            }
            
            for goal in self.learning_goals:
                progress_report['goals'].append({
                    'goal': goal['goal'],
                    'progress': f"{goal['progress'] * 100}%",
                    'days_active': (datetime.now() - goal['created_at']).days
                })
            
            return progress_report
    
    def _generate_learning_topics(self, goal: str, weeks: int):
        """生成学习主题"""
        # 简化版本,实际应用中会使用更智能的方法
        base_topics = [
            "基础概念和术语",
            "核心原理和机制", 
            "实际应用案例",
            "高级特性和技巧",
            "最佳实践和模式",
            "常见问题和解决方案",
            "进阶学习和资源"
        ]
        
        topics = []
        for i in range(weeks):
            if i < len(base_topics):
                topics.append(f"第{i+1}周: {base_topics[i]}")
            else:
                topics.append(f"第{i+1}周: {goal} 进阶学习")
        
        return topics
    
    def _recommend_resources(self, goal: str):
        """推荐学习资源"""
        # 模拟资源推荐
        resource_categories = {
            "编程": ["官方文档", "在线教程", "视频课程", "实践项目"],
            "数学": ["教科书", "练习册", "在线课程", "解题指南"],
            "语言": ["教科书", "听力材料", "口语练习", "语法指南"]
        }
        
        # 简单的分类逻辑
        if any(keyword in goal.lower() for keyword in ['编程', '代码', '开发']):
            category = "编程"
        elif any(keyword in goal.lower() for keyword in ['数学', '计算', '统计']):
            category = "数学"
        elif any(keyword in goal.lower() for keyword in ['英语', '语言', '口语']):
            category = "语言"
        else:
            category = "通用"
        
        if category in resource_categories:
            return resource_categories[category]
        else:
            return ["在线课程", "实践项目", "参考书籍", "社区论坛"]

# 使用示例
if __name__ == "__main__":
    assistant = PersonalLearningAssistant("学习助手")
    
    print("=== 个人学习助手演示 ===")
    
    # 设置学习目标
    goal = "Python人工智能编程"
    print(assistant.set_learning_goal(goal, "2024-12-31"))
    
    # 创建学习计划
    plan = assistant.create_study_plan(goal, hours_per_week=8, weeks=6)
    print("\n=== 学习计划 ===")
    print(f"目标: {plan['goal']}")
    print(f"总时长: {plan['total_hours']} 小时")
    print("每周主题:")
    for topic in plan['weekly_topics']:
        print(f"  - {topic}")
    
    # 生成测验
    quiz = assistant.quiz_me("机器学习", difficulty="medium", num_questions=3)
    print("\n=== 测验题目 ===")
    for question in quiz['questions']:
        print(f"{question['id']}. {question['question']}")
    
    # 总结内容
    content = "人工智能是计算机科学的一个分支,旨在创造能够执行通常需要人类智能的任务的机器。这些任务包括学习、推理、问题解决、感知和语言理解。人工智能可以分为弱人工智能和强人工智能,弱人工智能专注于特定任务,而强人工智能则具有通用的人类级智能。"
    summary = assistant.summarize_content(content)
    print("\n=== 内容总结 ===")
    print(f"总结: {summary['summary']}")
    print(f"长度减少: {summary['reduction_ratio']}")
    
    # 跟踪进度
    progress = assistant.track_progress()
    print("\n=== 学习进度 ===")
    print(f"总目标数: {progress['total_goals']}")
    for goal_progress in progress['goals']:
        print(f"- {goal_progress['goal']}: {goal_progress['progress']} 完成")

5.2 系统架构图

graph TB
    subgraph UserInterfaceLayer["用户界面层"]
        A[Web界面]
        B[移动应用]
        C[API接口]
        D[命令行工具]
    end
    
    subgraph ApplicationServiceLayer["应用服务层"]
        E[学习助手服务]
        F[任务管理服务]
        G[记忆管理服务]
        H[工具执行服务]
    end
    
    subgraph AICoreLayer["AI核心层"]
        I[语言理解模块]
        J[规划决策模块]
        K[对话管理模块]
        L[学习优化模块]
    end
    
    subgraph DataStorageLayer["数据存储层"]
        M[用户数据]
        N[记忆存储]
        O[知识库]
        P[学习记录]
    end
    
    subgraph ExternalServiceLayer["外部服务层"]
        Q[AI模型API]
        R[工具API]
        S[知识图谱]
        T[云存储]
    end
    
    A --> E
    B --> E
    C --> E
    D --> E
    
    E --> F
    E --> G
    E --> H
    
    F --> J
    G --> I
    H --> J
    
    I --> Q
    J --> Q
    K --> Q
    
    E --> M
    E --> N
    E --> O
    E --> P
    
    H --> R
    E --> S
    G --> T

 

第六部分:性能优化与最佳实践

6.1 性能监控与优化

python

import time
import psutil
import logging
from functools import wraps
from dataclasses import dataclass
from typing import Dict, List, Any

@dataclass
class PerformanceMetrics:
    response_time: float
    memory_usage: float
    cpu_usage: float
    success: bool
    timestamp: float

class PerformanceMonitor:
    def __init__(self):
        self.metrics: List[PerformanceMetrics] = []
        self.logger = logging.getLogger('PerformanceMonitor')
    
    def monitor_performance(self, func):
        """性能监控装饰器"""
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            start_memory = psutil.Process().memory_info().rss / 1024 / 1024  # MB
            start_cpu = psutil.cpu_percent()
            
            try:
                result = func(*args, **kwargs)
                success = True
            except Exception as e:
                result = None
                success = False
                self.logger.error(f"函数 {func.__name__} 执行失败: {e}")
            
            end_time = time.time()
            end_memory = psutil.Process().memory_info().rss / 1024 / 1024
            end_cpu = psutil.cpu_percent()
            
            metrics = PerformanceMetrics(
                response_time=end_time - start_time,
                memory_usage=end_memory - start_memory,
                cpu_usage=end_cpu - start_cpu,
                success=success,
                timestamp=time.time()
            )
            
            self.metrics.append(metrics)
            self._check_performance_thresholds(metrics, func.__name__)
            
            return result
        return wrapper
    
    def _check_performance_thresholds(self, metrics: PerformanceMetrics, func_name: str):
        """检查性能阈值"""
        warnings = []
        
        if metrics.response_time > 5.0:  # 5秒阈值
            warnings.append(f"响应时间过长: {metrics.response_time:.2f}秒")
        
        if metrics.memory_usage > 100:  # 100MB阈值
            warnings.append(f"内存使用过多: {metrics.memory_usage:.2f}MB")
        
        if metrics.cpu_usage > 50:  # 50%阈值
            warnings.append(f"CPU使用率过高: {metrics.cpu_usage:.2f}%")
        
        if warnings:
            warning_msg = f"性能警告 [{func_name}]: " + "; ".join(warnings)
            self.logger.warning(warning_msg)
    
    def get_performance_report(self) -> Dict[str, Any]:
        """获取性能报告"""
        if not self.metrics:
            return {"error": "没有可用的性能数据"}
        
        successful_metrics = [m for m in self.metrics if m.success]
        
        if not successful_metrics:
            return {"error": "没有成功的执行记录"}
        
        return {
            "total_executions": len(self.metrics),
            "success_rate": len(successful_metrics) / len(self.metrics),
            "avg_response_time": sum(m.response_time for m in successful_metrics) / len(successful_metrics),
            "max_response_time": max(m.response_time for m in successful_metrics),
            "avg_memory_usage": sum(m.memory_usage for m in successful_metrics) / len(successful_metrics),
            "total_monitoring_duration": self.metrics[-1].timestamp - self.metrics[0].timestamp
        }

# 优化的代理类
class OptimizedAIAgent(AdvancedAIAgent):
    def __init__(self, name: str):
        super().__init__(name)
        self.performance_monitor = PerformanceMonitor()
        self.cache = {}
        self.request_timeout = 30
    
    @PerformanceMonitor.monitor_performance
    def process_with_memory_optimized(self, user_input: str) -> str:
        """优化的记忆处理"""
        # 检查缓存
        cache_key = f"response_{hash(user_input)}"
        if cache_key in self.cache:
            return self.cache[cache_key]
        
        # 原有的处理逻辑
        response = super().process_with_memory(user_input)
        
        # 缓存结果
        self.cache[cache_key] = response
        return response
    
    def clear_cache(self):
        """清空缓存"""
        self.cache.clear()
    
    def optimize_memory_usage(self):
        """优化内存使用"""
        # 清理旧的记忆
        current_time = time.time()
        self.memory_system.memories = [
            memory for memory in self.memory_system.memories
            if current_time - memory['timestamp'].timestamp() < 30 * 24 * 3600  # 保留30天内的记忆
        ]
        
        # 限制记忆数量
        if len(self.memory_system.memories) > 500:
            self.memory_system.memories = self.memory_system.memories[-500:]
        
        # 清理缓存
        if len(self.cache) > 100:
            # 保留最近使用的100个缓存项
            self.cache = dict(list(self.cache.items())[-100:])

# 使用示例
if __name__ == "__main__":
    # 设置日志
    logging.basicConfig(level=logging.INFO)
    
    agent = OptimizedAIAgent("优化助手")
    
    # 测试性能监控
    test_inputs = [
        "你好,请介绍一下人工智能",
        "计算 123 * 456 的结果",
        "什么是机器学习",
        "Python编程的最佳实践",
        "如何学习深度学习"
    ]
    
    print("=== 性能测试 ===")
    for i, input_text in enumerate(test_inputs, 1):
        print(f"处理请求 {i}: {input_text}")
        response = agent.process_with_memory_optimized(input_text)
        print(f"响应: {response[:50]}...")
        print("-" * 50)
    
    # 获取性能报告
    report = agent.performance_monitor.get_performance_report()
    print("\n=== 性能报告 ===")
    for key, value in report.items():
        print(f"{key}: {value}")
    
    # 优化内存使用
    agent.optimize_memory_usage()
    print("\n✓ 内存优化完成")

6.2 错误处理与容错机制

python

class RobustAIAgent(OptimizedAIAgent):
    def __init__(self, name: str):
        super().__init__(name)
        self.error_count = 0
        self.max_errors = 10
        self.recovery_actions = []
        self.fallback_responses = [
            "我遇到了一些技术问题,请稍后再试。",
            "系统正在维护中,请稍后联系。",
            "我目前无法处理这个请求,你可以尝试重新表述问题。",
            "服务暂时不可用,正在努力修复中。"
        ]
    
    def safe_process_request(self, user_input: str) -> str:
        """安全处理请求,包含错误恢复"""
        try:
            # 检查错误计数
            if self.error_count >= self.max_errors:
                return "系统检测到过多错误,已进入安全模式。请稍后重试。"
            
            response = self.process_with_memory_optimized(user_input)
            self.error_count = 0  # 重置错误计数
            return response
        
        except Exception as e:
            self.error_count += 1
            self.log_error(e, user_input)
            
            # 执行恢复动作
            self.execute_recovery_actions()
            
            # 返回降级响应
            return self.get_fallback_response()
    
    def log_error(self, error: Exception, user_input: str):
        """记录错误"""
        error_info = {
            'timestamp': datetime.now(),
            'error_type': type(error).__name__,
            'error_message': str(error),
            'user_input': user_input,
            'error_count': self.error_count
        }
        
        # 在实际应用中,这里会记录到日志文件或数据库
        print(f"错误记录: {error_info}")
        
        # 添加到记忆系统用于学习
        self.memory_system.add_memory(
            content=f"系统错误: {error} - 输入: {user_input}",
            memory_type="error",
            importance=0.6,
            metadata=error_info
        )
    
    def add_recovery_action(self, action: Callable, description: str):
        """添加恢复动作"""
        self.recovery_actions.append({
            'action': action,
            'description': description
        })
    
    def execute_recovery_actions(self):
        """执行恢复动作"""
        print(f"执行恢复动作 (错误计数: {self.error_count})")
        
        for recovery in self.recovery_actions:
            try:
                recovery['action']()
                print(f"✓ 执行恢复: {recovery['description']}")
            except Exception as e:
                print(f"✗ 恢复动作失败: {e}")
    
    def get_fallback_response(self) -> str:
        """获取降级响应"""
        import random
        return random.choice(self.fallback_responses)
    
    def health_check(self) -> Dict[str, Any]:
        """健康检查"""
        return {
            'status': 'healthy' if self.error_count < 5 else 'degraded',
            'error_count': self.error_count,
            'memory_usage_mb': psutil.Process().memory_info().rss / 1024 / 1024,
            'cache_size': len(self.cache),
            'memory_count': len(self.memory_system.memories),
            'uptime': time.time() - getattr(self, '_start_time', time.time())
        }

# 恢复动作示例
def clear_cache_recovery():
    """清理缓存恢复动作"""
    # 这个函数会被添加到恢复动作中
    pass

def restart_components_recovery():
    """重启组件恢复动作"""
    # 这个函数会被添加到恢复动作中
    pass

# 使用示例
if __name__ == "__main__":
    robust_agent = RobustAIAgent("健壮助手")
    
    # 添加恢复动作
    robust_agent.add_recovery_action(
        clear_cache_recovery,
        "清理缓存"
    )
    robust_agent.add_recovery_action(
        robust_agent.optimize_memory_usage,
        "优化内存使用"
    )
    
    print("=== 健壮性测试 ===")
    
    # 测试正常请求
    response = robust_agent.safe_process_request("你好")
    print(f"正常响应: {response}")
    
    # 健康检查
    health = robust_agent.health_check()
    print(f"健康状态: {health}")

总结

本教程详细介绍了AgenticAI和AI编程的各个方面,从基础概念到高级应用,涵盖了:

  1. AgenticAI基础:智能代理的核心概念和基本架构

  2. AI编程环境:Python AI开发环境的搭建和基础模型使用

  3. 智能代理系统:任务导向型代理的设计和实现

  4. 高级功能:记忆系统、工具集成、API调用

  5. 实际应用:个人学习助手的完整实现

  6. 性能优化:监控、优化和容错机制

通过详细讲解和丰富的代码示例,你应该已经掌握了构建智能AI代理系统的核心技能。这些知识可以应用于各种实际场景,如个人助手、客服系统、教育工具等。

记住,AI代理开发是一个不断迭代的过程,持续学习新的技术和方法,结合实际需求进行创新,才能构建出真正有用的AI系统。

Logo

更多推荐