一、前情提要

        今天我们来探讨智能投资顾问系统的终极演进形态——混合式架构。如果说反应式架构是短跑选手,能够在秒级内给出快速响应;深思式架构是马拉松选手,能够进行深度的分析和规划;那么混合式架构就是十项全能运动员,它集两者之长,根据不同的场景需求智能切换工作模式,真正实现了因材施教的个性化投资顾问服务。

        在前面几篇文章的基础上,今天我们将详细介绍混合式智能投资顾问系统的架构设计与实现原理,更新技术栈改进系统流程和运行方式,通过智能路由机制,在反应式快速响应与深思熟虑式深度分析之间实现动态平衡,为不同复杂度的投资咨询需求提供最优解决方案。

二、系统演进

        从单一到融合的必然演进,在深入探讨反应式和深思式架构后,我们发现两者各有优劣:

  • 反应式架构虽然响应迅速,但分析深度有限,仅仅只适合标准化问题,难以处理复杂场景,同时个性化程度相对较低;
  • 深思式架构分析深度足够,但响应时间较长;大模型接口资源消耗大,难以支撑高并发;同时对于简单问题存在过度处理的问题
  • 混合式架构顺应而生,不是选择用快刀还是利剑,而是根据不同的使用场景,智能选择最合适的匹配方式,由此,必定会有混合式架构的诞生。

三、混合架构的设计原则

  • 适应性:根据查询特征自动选择处理路径
  • 效率性:合理分配计算资源,避免过度处理
  • 用户体验:在响应速度和分析深度间找到最佳平衡点
  • 可扩展性:支持模块化扩展和性能线性提升

四、混合式架构的核心思想

        混合式架构的核心设计理念基于情境感知的智能路由和优势资源的动态调配。不同于单一架构的局限性,混合架构通过实时评估查询复杂度、用户价值和系统资源,在反应式的速度优势与深思熟虑式的深度优势之间实现最优平衡。

1. 智能路由

        混合式架构的智能路由机制如同架构的大脑,这就像一个经验丰富的导诊台护士,能够快速判断患者的病情轻重,然后分派到不同的诊室:

  • 急诊室(反应式通道):处理简单紧急的咨询
  • 专科门诊(混合式通道):处理中等复杂度的规划
  • 专家会诊(深思式通道):处理复杂的综合性问题

2. 关联与演进

        让我们通过一个具体的例子来说明三种架构的关联,例如用户查询:"我35岁,年收入50万,有房有车无负债,现有100万存款想做投资,能承受中等风险,希望5年内资产增长50%"

反应式处理:

# 快速提取关键信息后直接返回
"建议采用平衡型投资策略,配置50%权益类资产..."

深思式处理:

# 经过深度分析后返回
"""
经过全面分析,为您制定以下投资规划:
1. 资产配置:股票40%、债券30%、基金20%、现金10%
2. 预期收益:年化8.4%
3. 风险评估:中等偏上
...(详细分析报告)
"""

混合式处理:

# 快速响应 + 逐步深度分析
"""
 快速建议:基于您的风险偏好,建议平衡型配置

 深度分析(逐步加载):
• 用户画像分析完成...
• 市场环境评估中...
• 投资策略生成...
• 风险评估优化...

最终为您生成个性化投资方案...
"""

3. 技术创新

动态负载均衡

混合式架构引入了动态负载均衡机制,能够根据系统实时状态智能调整路由策略:

class DynamicLoadBalancer:
    def adjust_routing_strategy(self, system_status):
        if system_status.reactive_load > 80%:
            # 反应式负载高,适当提升混合式门槛
            self.complexity_thresholds['hybrid'] = 0.4
        elif system_status.deliberative_load > 60%:
            # 深思式负载高,启用快速模式
            self.enable_quick_deliberative_mode()

渐进式响应机制

混合式架构创新性地提出了渐进式响应的概念:

用户查询
    ↓
即时响应(1秒内)
    ↓  "正在为您深度分析..."
初步建议(3秒内)
    ↓  "基于快速分析,建议..."
深度分析(5-8秒)
    ↓  "深度分析完成,优化建议..."
完整报告(10-15秒)
    ↓  "个性化投资方案已生成"

这种机制既保证了用户体验的流畅性,又提供了深度的分析价值。

五、系统架构设计

1. 整体架构概览

混合式智能投资顾问系统架构
├── 接入层
│   ├── 用户请求接收
│   ├── 输入验证与预处理
│   └── 会话管理
├── 智能路由层
│   ├── 复杂度评估引擎
│   ├── 用户价值分析
│   └── 路由决策器
├── 处理引擎层
│   ├── 反应式处理通道
│   ├── 混合式处理通道
│   └── 深思式处理通道
├── 数据层
│   ├── 用户画像库
│   ├── 知识库
│   └── 性能监控库
└── 输出层
    ├── 结果格式化
    ├── 风险控制
    └── 质量验证

2. 核心组件详解

2.1 智能路由引擎

职责:

  • 实时分析用户查询的复杂度特征
  • 评估用户历史价值和当前需求
  • 选择最优处理通道
  • 动态调整路由策略

关键技术:

# 复杂度评估算法
复杂度分数 = (文本长度分数 + 金融术语分数 + 数字数量分数 + 规划词汇分数) / 4

# 路由决策逻辑
if 复杂度 ≤ 0.3: 选择反应式通道
elif 复杂度 ≥ 0.7 and 用户价值 > 0.5: 选择深思式通道  
else: 选择混合式通道

2.2 多通道处理引擎

  • 反应式通道:基于规则引擎的快速匹配、标准化建议模板、极速响应(0.5-2秒)
  • 混合式通道:双线程并行处理、结果智能融合、平衡响应(2-8秒)
  • 深思式通道:多阶段深度分析、个性化策略生成、深度处理(8-15秒)

3. 状态管理与性能优化

3.1 实时状态监控系统

状态数据模型:

状态信息 = {
    'current_step': '当前主步骤名称',
    'progress': 进度百分比,
    'architecture': '当前使用架构',
    'user_id': '用户标识',
    'current_query': '当前查询内容',
    'sub_steps': ['子步骤1', '子步骤2', ...]
}

监控维度:

  • 处理进度:实时显示完成百分比
  • 步骤跟踪:清晰标识当前执行阶段
  • 资源使用:监控各通道负载情况
  • 性能指标:响应时间和成功率统计

3.2 性能优化策略

负载均衡机制:

  • 基于系统负载动态调整路由阈值
  • 高负载时适当提升反应式处理比例
  • 资源竞争时的优先级调度

弹性伸缩策略:

if 反应式QPS > 阈值:
    提升混合式处理门槛
    启用缓存加速机制
    
if 深思式并发 > 限制:
    启用快速深思模式
    调整超时时间设置
    
if 系统负载 > 容量:
    临时拒绝低优先级请求
    启用服务降级策略

多级缓存体系:

  • 查询结果缓存:相同查询的直接返回
  • 用户画像缓存:活跃用户数据的快速访问
  • 策略模板缓存:常用投资策略的预生成
  • 市场数据缓存:实时市场信息的本地缓存

六、系统运行流程

1. 流程图

2. 智能路由分析

2.1 复杂度评估维度

文本特征分析:

  • 长度因子:查询文本长度与信息密度
  • 专业术语:金融投资领域专业词汇出现频率
  • 数字特征:金额、年限、比例等数值信息
  • 规划词汇:目标、计划、规划等前瞻性词汇

计算模型:

复杂度分数  =   文本长度分数 × 0.25 +  金融术语分数 × 0.35 +   数字数量分数 × 0.25 +  规划词汇分数 × 0.15

2.2 用户价值评估

评估指标:

  • 交互频率:历史查询次数和活跃度
  • 查询质量:平均复杂度和需求深度
  • 架构偏好:历史处理通道选择模式
  • 价值潜力:基于用户画像的长期价值预估

3. 多通道处理详细流程

3.1 反应式处理通道

执行序列:

  • 1. 快速意图识别
    • 关键词提取和分类
    • 意图优先级排序
    • 紧急程度判断
  • 2. 规则引擎匹配
    • 业务规则条件判断
    • 模板选择和建议生成
    • 风险提示自动添加
  • 3. 快速建议生成
    • 标准化建议填充
    • 个性化参数调整
    • 输出格式化和美化

3.2 混合式处理通道

并行处理架构:

混合式处理流程:
├── 反应式线程(快速通道)
│   ├── 即时意图分析
│   ├── 基础建议生成
│   └── 快速结果返回
├── 深思式线程(快速深度)
│   ├── 简化用户分析
│   ├── 关键因素评估
│   └── 优化建议生成
└── 结果融合引擎
    ├── 建议优先级排序
    ├── 冲突检测和解决
    └── 最终结果整合

架构优势:

  • 线程安全:确保并行处理的数据一致性
  • 超时控制:防止单一线程阻塞整体流程
  • 资源平衡:合理分配CPU和内存资源
  • 优雅降级:在部分服务不可用时保持基本功能

3.3 深思式处理通道

五阶段深度分析:

阶段一:深度用户画像

  • 人口统计特征分析
  • 财务状况全面评估
  • 风险偏好精确测量
  • 投资目标明确化

阶段二:市场环境分析

  • 宏观经济周期判断
  • 行业发展趋势评估
  • 市场估值水平分析
  • 政策环境影响考量

阶段三:多策略生成

  • 风险收益特征建模
  • 资产配置方案设计
  • 投资组合优化
  • 备选策略比较

阶段四:风险评估优化

  • 风险承受能力匹配
  • 压力测试情景分析
  • 风险控制措施制定
  • 应急预案准备

阶段五:个性化建议

  • 完整投资建议书生成
  • 实施计划详细制定
  • 后续跟踪方案设计
  • 风险提示完整披露

七、示例:混合式智能投资顾问系统

1. 系统概述

        本系统实现了智能投资顾问的混合式架构,通过智能路由机制在反应式快速响应,与深思式深度分析之间动态选择最优处理路径。

核心特性:

  • 1. 智能路由决策 - 基于查询复杂度自动选择处理通道
  • 2. 多通道并行处理 - 支持反应式、混合式、深思式三种处理模式
  • 3. 实时状态监控 - 可视化展示系统运行状态
  • 4. 用户画像学习 - 基于历史交互持续优化服务

2. 代码分解

2.1 混合式智能投资顾问主类

class HybridInvestmentAdvisor:
    """

    职责:协调整个系统的运行,包括路由决策、多通道处理、状态管理和可视化
       设计模式:Facade模式 + Strategy模式
    - Facade:对外提供统一的process_query接口
    - Strategy:不同的处理通道采用不同的策略
    """

2.2 初始化混合式顾问系统

def __init__(self):
        # 初始化各处理通道
        self.reactive_advisor = ReactiveAdvisor()      # 反应式处理组件
        self.deliberative_advisor = DeliberativeAdvisor()  # 深思式处理组件
        self.router = IntelligentRouter()              # 智能路由引擎
        
        # 数据存储结构
        self.user_profiles = {}                        # 用户画像:{user_id: profile_data}
        self.visualization_data = []                   # 可视化性能数据
        self.animation_frames = []                     # 动画帧序列
        
        # 系统当前状态(用于实时监控)
        self.current_status = {
            'step': '等待输入',           # 当前主步骤名称
            'progress': 0,                # 进度百分比(0-100)
            'current_architecture': None, # 当前使用的架构类型
            'current_user': None,         # 当前处理的用户ID
            'current_query': None,        # 当前处理的查询内容
            'sub_steps': []               # 当前步骤的子步骤列表
        }

 组件说明:

  • - reactive_advisor: 处理简单快速查询
  • - deliberative_advisor: 处理复杂深度分析  
  • - router: 智能路由决策引擎
  • - user_profiles: 用户画像存储
  • - visualization_data: 性能监控数据
  • - current_status: 当前系统状态(用于可视化)
  • - animation_frames: 动画帧序列(用于GIF生成)

2.3 处理用户查询的主入口方法

def process_query(self, user_id: str, user_input: str) -> Dict[str, Any]:

        # 阶段1: 初始化状态 - 设置开始处理状态
        self._update_status('开始处理', 0, user_id, user_input, [])
        self._capture_frame()  # 捕获初始状态帧
        
        start_time = time.time()  # 开始计时
        
        # 阶段2: 智能路由分析 - 决定使用哪种处理通道
        self._update_status('路由分析', 20, user_id, user_input, 
                           ['接收用户输入', '分析查询复杂度'])
        self._capture_frame()  # 捕获路由分析帧
        
        print(f"\n 开始处理用户 {user_id} 的查询: {user_input}")
        
        # 调用路由引擎分析查询复杂度并选择处理架构
        route_info = self.router.analyze_route(user_input, self.user_profiles.get(user_id, {}))
        print(f"   路由决策: {route_info['architecture']} (复杂度: {route_info['complexity_score']:.2f})")
        
        # 阶段3: 基于路由决策执行相应处理
        self._update_status('执行处理', 50, user_id, user_input, 
                           ['路由分析完成', f'选择{route_info["architecture"]}模式', '开始处理'])
        self._capture_frame()  # 捕获处理开始帧
        
        # 根据路由结果选择处理通道
        if route_info['architecture'] == 'reactive':
            result = self._reactive_processing(user_id, user_input)
        elif route_info['architecture'] == 'deliberative':
            result = self._deliberative_processing(user_id, user_input)
        else:  # hybrid
            result = self._hybrid_processing(user_id, user_input)
        
        # 阶段4: 结果处理和输出
        self._update_status('生成结果', 80, user_id, user_input, 
                           ['处理完成', '生成最终建议', '格式化输出'])
        self._capture_frame()  # 捕获结果生成帧
        
        # 计算处理时间并添加到结果中
        processing_time = time.time() - start_time
        result['processing_time'] = f"{processing_time:.2f}秒"
        result['architecture_used'] = route_info['architecture']
        
        # 更新用户画像和性能数据
        self._update_user_profile(user_id, user_input, result, route_info)
        
        # 记录本次处理的性能数据
        self.visualization_data.append({
            'timestamp': datetime.now(),
            'user_id': user_id,
            'architecture': route_info['architecture'],
            'complexity': route_info['complexity_score'],
            'processing_time': processing_time,
            'input_length': len(user_input)
        })
        
        # 阶段5: 完成处理
        self._update_status('完成', 100, user_id, user_input, ['所有步骤完成', '准备下一个查询'])
        self._capture_frame()  # 捕获完成帧
        
        return result

执行流程:

  • 1. 状态初始化 → 2. 路由分析 → 3. 通道处理 → 4. 结果生成 → 5. 数据更新          

输入参数:

  • - user_id: 用户标识,用于个性化服务和画像更新
  • - user_input: 用户输入的查询文本

返回结果:

  • - 包含建议内容、处理时间、使用架构的字典

2.4 反应式处理通道

def _reactive_processing(self, user_id: str, user_input: str) -> Dict[str, Any]:

        # 设置当前架构类型(用于可视化高亮)
        self.current_status['current_architecture'] = 'reactive'
        
        # 定义反应式处理的步骤序列
        steps = ['调用Qwen API', '快速意图分析', '规则匹配', '生成快速建议']
        
        # 逐步执行每个步骤并捕获动画帧
        for i, step in enumerate(steps):
            self._update_sub_steps(steps[:i+1])  # 更新当前完成的子步骤
            self._capture_frame()  # 捕获每个步骤的帧
            time.sleep(0.3)  # 模拟处理时间(实际系统中为真实处理)
        
        print(" 执行反应式处理...")
        # 调用反应式顾问处理查询
        result = self.reactive_advisor.process(user_input)
        
        # 标记处理完成
        self._update_sub_steps(['反应式处理完成'])
        self._capture_frame()
        return result
  • 特点:快速响应,适合简单查询
  • 处理时间:0.5-2秒
  • 适用场景:基金查询、风险评估、简单问答

2.5 混合式处理通道

def _hybrid_processing(self, user_id: str, user_input: str) -> Dict[str, Any]:

        self.current_status['current_architecture'] = 'hybrid'
        
        # 混合式处理的步骤序列
        steps = ['启动反应式线程', '启动深思式线程', '并行处理中', '结果融合']
        
        for i, step in enumerate(steps):
            self._update_sub_steps(steps[:i+1])
            self._capture_frame()
            time.sleep(0.3)
        
        print(" 执行混合式并行处理...")
        
        def reactive_task():
            """反应式处理线程任务"""
            self._update_sub_steps(['反应式线程运行中'])
            self._capture_frame()
            return self.reactive_advisor.process(user_input)
        
        def deliberative_task():
            """深思式快速分析线程任务"""  
            self._update_sub_steps(['深思式线程运行中'])
            self._capture_frame()
            return self.deliberative_advisor.quick_analysis(user_input)
        
        # 使用线程池并行执行两个任务
        with ThreadPoolExecutor(max_workers=2) as executor:
            # 提交任务到线程池
            reactive_future = executor.submit(reactive_task)
            deliberative_future = executor.submit(deliberative_task)
            
            # 获取处理结果(设置超时防止阻塞)
            reactive_result = reactive_future.result(timeout=3.0)
            deliberative_result = deliberative_future.result(timeout=8.0)
        
        # 融合两个通道的结果
        self._update_sub_steps(['结果融合中'])
        self._capture_frame()
        result = self._fuse_results(reactive_result, deliberative_result)
        self._update_sub_steps(['混合处理完成'])
        self._capture_frame()
        
        return result
  • 特点:并行处理 + 结果融合,平衡速度与深度
  • 处理时间:2-8秒  
  • 适用场景:中等复杂度咨询、资产配置

2.6 融合反应式和深思式处理结果

def _fuse_results(self, reactive_result: Dict, deliberative_result: Dict) -> Dict[str, Any]:

        return {
            'type': 'hybrid_advice',
            'quick_advice': reactive_result.get('advice', ''),           # 反应式的快速建议
            'detailed_analysis': deliberative_result.get('analysis', ''), # 深思式的深度分析
            'recommended_strategy': deliberative_result.get('strategy', {}), # 投资策略
            'risk_assessment': deliberative_result.get('risk', {}),      # 风险评估
            'fusion_note': '本建议融合了快速响应和深度分析的优点'         # 融合说明
        }
  • 策略:取反应式的快速建议 + 深思式的深度分析
  • 形成既有速度又有深度的综合建议

2.7 更新用户画像

def _update_user_profile(self, user_id: str, user_input: str, result: Dict, route_info: Dict):
        # 初始化新用户画像
        if user_id not in self.user_profiles:
            self.user_profiles[user_id] = {
                'query_count': 0,           # 查询次数
                'avg_complexity': 0,        # 平均复杂度
                'preferred_architecture': {}, # 偏好架构统计
                'last_interaction': None    # 最后交互时间
            }
        
        profile = self.user_profiles[user_id]
        profile['query_count'] += 1
        profile['last_interaction'] = datetime.now()
        
        # 更新平均复杂度(滑动平均)
        old_avg = profile['avg_complexity']
        profile['avg_complexity'] = (
            (old_avg * (profile['query_count'] - 1) + route_info['complexity_score']) 
            / profile['query_count']
        )
        
        # 更新架构使用统计
        arch = route_info['architecture']
        if arch not in profile['preferred_architecture']:
            profile['preferred_architecture'][arch] = 0
        profile['preferred_architecture'][arch] += 1
  • 基于本次交互更新用户特征,用于后续个性化服务优化

2.8 更新系统当前状态

def _update_status(self, step: str, progress: int, user_id: str, query: str, sub_steps: List[str]):

        self.current_status = {
            'step': step,
            'progress': progress,
            'current_architecture': self.current_status.get('current_architecture'),
            'current_user': user_id,
            'current_query': query,
            'sub_steps': sub_steps
        }
  • 用于实时监控和可视化显示

2.9 更新子步骤列表

def _update_sub_steps(self, sub_steps: List[str]):

        self.current_status['sub_steps'] = sub_steps
  • 用于显示当前正在执行的具体操作

2.10 捕获当前系统状态为动画帧

def _capture_frame(self):
        # 创建当前状态的可视化图像
        fig = self._create_visualization_frame()
        
        # 将图像保存到内存缓冲区
        buf = io.BytesIO()
        fig.savefig(buf, format='png', dpi=100, bbox_inches='tight')
        buf.seek(0)
        
        # 将图像帧添加到序列中
        self.animation_frames.append(Image.open(buf))
        plt.close(fig)  # 关闭图像释放内存
  • 将当前可视化状态保存为图像帧,用于生成GIF动画

2.11 创建系统状态可视化帧

def _create_visualization_frame(self):

        # 创建3x2的子图布局
        fig, ((ax1, ax2), (ax3, ax4), (ax5, ax6)) = plt.subplots(3, 2, figsize=(16, 12))
        fig.suptitle('混合式智能投资顾问 - 实时运行监控', fontsize=16, fontweight='bold')
        
        # 隐藏最后一个子图(保持布局对称)
        ax6.axis('off')
        
        # 绘制各个监控面板
        self._draw_architecture_overview(ax1)    # 架构概览面板
        self._draw_processing_flow(ax2)          # 处理流程面板
        self._draw_current_status(ax3)           # 当前状态面板
        self._draw_performance_metrics(ax4)      # 性能指标面板
        self._draw_qwen_usage(ax5)               # Qwen使用情况面板
        
        plt.tight_layout()
        return fig

生成包含5个监控面板的完整可视化界面:

  • 1. 架构概览 2. 处理流程 3. 当前状态 4. 性能指标 5. Qwen使用情况

2.12 绘制架构概览面板

def _draw_architecture_overview(self, ax):
        ax.set_title(' 混合架构概览', fontweight='bold', fontsize=12)
        
        # 定义三种处理通道的显示信息
        modules = [
            ('反应式\n0.5-2秒', 0.2, 0.7, '#4CAF50', 'reactive'),
            ('混合式\n2-8秒', 0.5, 0.7, '#2196F3', 'hybrid'), 
            ('深思式\n8-15秒', 0.8, 0.7, '#FF9800', 'deliberative')
        ]
        
        current_arch = self.current_status.get('current_architecture')
        
        # 绘制每个架构模块
        for name, x, y, color, arch_type in modules:
            # 当前使用的架构高亮显示
            if arch_type == current_arch:
                edge_color = 'red'
                edge_width = 3
                alpha = 1.0
            else:
                edge_color = 'black'
                edge_width = 1
                alpha = 0.7
            
            # 绘制架构圆圈
            circle = Circle((x, y), 0.08, fill=True, color=color, 
                          alpha=alpha, edgecolor=edge_color, linewidth=edge_width)
            ax.add_patch(circle)
            ax.text(x, y, name, ha='center', va='center', 
                   fontsize=9, fontweight='bold', color='white')
        
        # 绘制连接线
        ax.plot([0.3, 0.45], [0.7, 0.7], 'k-', alpha=0.5, linewidth=2)
        ax.plot([0.55, 0.7], [0.7, 0.7], 'k-', alpha=0.5, linewidth=2)
        
        # 绘制路由引擎
        router_circle = Circle((0.5, 0.3), 0.1, fill=True, color='lightgray',
                             alpha=0.8, edgecolor='blue', linewidth=2)
        ax.add_patch(router_circle)
        ax.text(0.5, 0.3, '智能路由\n引擎', ha='center', va='center', 
               fontsize=9, fontweight='bold')
        
        ax.set_xlim(0, 1)
        ax.set_ylim(0, 1)
        ax.axis('off')
  • 显示三种处理通道和智能路由引擎的关系
  • 当前使用的架构会高亮显示

2.13 绘制处理流程面板

def _draw_processing_flow(self, ax):

        ax.set_title(' 实时处理流程', fontweight='bold', fontsize=12)
        
        # 定义处理流程的各个阶段
        stages = [
            (0.1, 0.8, '用户输入', '#E3F2FD', 0),
            (0.3, 0.8, '路由分析', '#BBDEFB', 20),
            (0.5, 0.8, '执行处理', '#90CAF9', 50),
            (0.7, 0.8, '生成结果', '#64B5F6', 80),
            (0.9, 0.8, '完成输出', '#42A5F5', 100)
        ]
        
        current_progress = self.current_status.get('progress', 0)
        
        # 绘制每个处理阶段
        for i, (x, y, name, color, stage_progress) in enumerate(stages):
            # 已完成或正在进行的阶段高亮显示
            if current_progress >= stage_progress:
                fill_color = color
                text_color = 'black'
                alpha = 1.0
            else:
                fill_color = 'lightgray'
                text_color = 'gray'
                alpha = 0.5
            
            # 绘制阶段方块
            rect = FancyBboxPatch((x-0.08, y-0.05), 0.16, 0.1,
                                boxstyle="round,pad=0.02", 
                                facecolor=fill_color, edgecolor='black', alpha=alpha)
            ax.add_patch(rect)
            ax.text(x, y, name, ha='center', va='center', 
                   fontsize=8, fontweight='bold', color=text_color)
            
            # 绘制阶段间的箭头
            if i < len(stages) - 1:
                arrow_color = 'green' if current_progress > stage_progress else 'gray'
                ax.annotate('', xy=(x+0.1, y), xytext=(x+0.18, y),
                           arrowprops=dict(arrowstyle='->', lw=2, color=arrow_color))
        
        # 绘制进度条
        ax.add_patch(Rectangle((0.1, 0.6), 0.8, 0.05, fill=False, edgecolor='black'))
        ax.add_patch(Rectangle((0.1, 0.6), 0.8 * (current_progress/100), 0.05, 
                             facecolor='green', alpha=0.7))
        ax.text(0.5, 0.55, f'进度: {current_progress}%', ha='center', va='center', 
               fontsize=10, fontweight='bold')
        
        ax.set_xlim(0, 1)
        ax.set_ylim(0, 1)
        ax.axis('off')
  • 显示当前处理的进度和阶段状态

2.14 绘制当前状态面板

def _draw_current_status(self, ax):

        ax.set_title(' 当前执行状态', fontweight='bold', fontsize=12)
        
        status = self.current_status
        
        # 显示当前主步骤
        ax.text(0.05, 0.9, f'当前步骤: {status["step"]}', fontsize=11, fontweight='bold',
               bbox=dict(boxstyle="round,pad=0.3", facecolor='lightyellow'))
        
        # 显示用户和架构信息
        ax.text(0.05, 0.75, f'用户: {status["current_user"] or "等待中"}', fontsize=9)
        ax.text(0.05, 0.70, f'架构: {status["current_architecture"] or "未选择"}', fontsize=9)
        
        # 显示当前查询(截断长文本)
        if status["current_query"]:
            query_display = status["current_query"][:25] + "..." if len(status["current_query"]) > 25 else status["current_query"]
            ax.text(0.05, 0.65, f'查询: {query_display}', fontsize=8, style='italic')
        
        # 显示子步骤执行情况
        ax.text(0.05, 0.55, '执行子步骤:', fontsize=10, fontweight='bold')
        for i, sub_step in enumerate(status["sub_steps"]):
            y_pos = 0.5 - i * 0.07
            # 当前正在执行的子步骤用箭头标识
            if i == len(status["sub_steps"]) - 1 and status["progress"] < 100:
                ax.text(0.08, y_pos, f' {sub_step}', fontsize=8, color='blue', fontweight='bold')
            else:
                ax.text(0.08, y_pos, f'✓ {sub_step}', fontsize=8, color='green')
        
        # 显示Qwen API状态
        ax.text(0.05, 0.2, ' Qwen API状态: 运行中', fontsize=9, 
               bbox=dict(boxstyle="round,pad=0.2", facecolor='lightgreen'))
        
        # 显示更新时间
        current_time = datetime.now().strftime('%H:%M:%S')
        ax.text(0.7, 0.1, f'更新时间: {current_time}', fontsize=7, style='italic')
        
        ax.set_xlim(0, 1)
        ax.set_ylim(0, 1)
        ax.axis('off')
  • 显示详细的系统运行状态信息

2.15 绘制性能指标面板

def _draw_performance_metrics(self, ax):

        ax.set_title(' 性能指标监控', fontweight='bold', fontsize=11)
        
        # 检查是否有数据可显示
        if not self.visualization_data:
            ax.text(0.5, 0.5, '等待数据...', ha='center', va='center', fontsize=10)
            ax.set_xlim(0, 1)
            ax.set_ylim(0, 1)
            ax.axis('off')
            return
        
        # 提取架构使用数据
        data = self.visualization_data
        architectures = [d['architecture'] for d in data]
        
        # 定义架构类型和颜色
        arch_types = ['reactive', 'hybrid', 'deliberative']
        arch_colors = ['#4CAF50', '#2196F3', '#FF9800']
        
        # 统计各架构使用次数
        counts = [architectures.count(arch) for arch in arch_types]
        
        # 绘制柱状图
        bars = ax.bar(arch_types, counts, color=arch_colors, alpha=0.7)
        ax.set_ylabel('使用次数')
        ax.set_xlabel('架构类型')
        
        # 在柱子上显示具体数值
        for bar, count in zip(bars, counts):
            height = bar.get_height()
            ax.text(bar.get_x() + bar.get_width()/2., height + 0.1,
                   f'{count}', ha='center', va='bottom', fontweight='bold', fontsize=9)
  • 显示各架构使用频率的统计信息

2.16 绘制Qwen使用情况面板

def _draw_qwen_usage(self, ax):

        ax.set_title(' Qwen API 使用情况', fontweight='bold', fontsize=11)
        
        if not self.visualization_data:
            ax.text(0.5, 0.5, '等待数据...', ha='center', va='center', fontsize=9)
            ax.set_xlim(0, 1)
            ax.set_ylim(0, 1)
            return
        
        # 模拟Qwen API调用统计(实际系统中从API监控获取)
        qwen_calls = len(self.visualization_data) * 2  # 假设每次查询平均2次API调用
        
        # 绘制使用情况饼图
        labels = ['API调用成功', '剩余额度']
        sizes = [min(qwen_calls, 80), max(100 - qwen_calls, 0)]
        colors = ['#66BB6A', '#EF5350']
        
        wedges, texts, autotexts = ax.pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%',
                                         startangle=90, textprops={'fontsize': 8})
        
        # 设置百分比文本样式
        for autotext in autotexts:
            autotext.set_color('white')
            autotext.set_fontweight('bold')
        
        ax.set_aspect('equal')
  • 显示API调用情况的饼图

2.17 保存动画为GIF文件

def save_animation_gif(self, filename: str = 'hybrid_advisor_animation.gif'):

        if not self.animation_frames:
            print(" 没有可用的动画帧")
            return
        
        print(f" 正在生成GIF动画,共{len(self.animation_frames)}帧...")
        
        # 使用PIL库保存GIF动画
        self.animation_frames[0].save(
            filename,
            save_all=True,
            append_images=self.animation_frames[1:],
            duration=500,  # 每帧显示500毫秒
            loop=0,       # 无限循环
            optimize=True # 优化文件大小
        )
        
        print(f" GIF动画已保存为: {filename}")
        return filename
  • 将捕获的动画帧序列保存为GIF动画,用于演示系统运行流程

2.18 智能路由引擎

class IntelligentRouter:

    def __init__(self):
        # 设置复杂度阈值
        self.complexity_thresholds = {'reactive': 0.3, 'hybrid': 0.7, 'deliberative': 0.3}
    
    def analyze_route(self, user_input: str, user_profile: Dict) -> Dict[str, Any]:
        """
        分析查询并选择处理通道
        ====================
        基于查询复杂度和用户价值做出路由决策
        """
        # 计算查询复杂度分数
        complexity_score = self._assess_complexity(user_input)
        # 评估用户价值
        user_value_score = self._assess_user_value(user_profile)
        
        # 路由决策逻辑
        if complexity_score <= self.complexity_thresholds['reactive']:
            architecture = 'reactive'  # 简单查询使用反应式
        elif complexity_score >= self.complexity_thresholds['deliberative'] and user_value_score > 0.5:
            architecture = 'deliberative'  # 复杂查询且高价值用户使用深思式
        else:
            architecture = 'hybrid'  # 其他情况使用混合式
        
        return {
            'architecture': architecture,
            'complexity_score': complexity_score,
            'user_value_score': user_value_score
        }
    
    def _assess_complexity(self, user_input: str) -> float:
        """
        评估查询复杂度
        ============
        基于多个维度计算查询的复杂程度
        """
        factors = {
            'length': min(len(user_input) / 100, 1.0),  # 文本长度
            'financial_terms': min(self._count_financial_terms(user_input) / 5, 1.0),  # 金融术语
            'numbers': min(len(re.findall(r'\d+', user_input)) / 3, 1.0),  # 数字数量
        }
        # 返回平均复杂度分数
        return sum(factors.values()) / len(factors)
    
    def _count_financial_terms(self, text: str) -> int:
        """统计文本中的金融术语数量"""
        terms = ['基金', '股票', '债券', '投资', '收益', '风险', '配置', '组合', '资产', '理财']
        return sum(1 for term in terms if term in text)
    
    def _assess_user_value(self, user_profile: Dict) -> float:
        """评估用户价值(基于历史交互)"""
        if not user_profile:
            return 0.5  # 新用户默认价值
        query_count = user_profile.get('query_count', 0)
        return min(query_count / 10, 1.0)  # 基于查询次数计算价值

职责:分析用户查询的复杂度,智能选择最优处理通道

路由策略:

  • - 简单查询(≤0.3) → 反应式通道
  • - 复杂查询(≥0.7) + 高价值用户 → 深思式通道  
  • - 其他情况 → 混合式通道

2.19 反应式投资顾问

class ReactiveAdvisor:
    
    def process(self, user_input: str) -> Dict[str, Any]:
        """处理简单查询,快速返回建议"""
        time.sleep(0.5)  # 模拟处理时间
        advice = "基于快速分析,建议您根据个人风险承受能力进行多元化资产配置。"
        return {
            'type': 'reactive_advice',
            'advice': advice,
            'response_time': '快速响应'
        }
  • 特点:快速响应,适合简单查询
  • 处理时间:0.5-2秒

2.20 深思式投资顾问  

class DeliberativeAdvisor:
    
    def process(self, user_input: str) -> Dict[str, Any]:
        """深度处理复杂查询"""
        time.sleep(1)  # 模拟深度分析时间
        analysis = "经过深度分析,我们建议采用平衡型投资策略,重点关注长期价值投资。"
        return {
            'type': 'deliberative_advice',
            'analysis': analysis,
            'strategy': {
                'name': '个性化资产配置策略',
                'allocation': {'股票': '40%', '债券': '35%', '基金': '20%', '现金': '5%'}
            }
        }
    
    def quick_analysis(self, user_input: str) -> Dict[str, Any]:
        """快速深度分析(混合模式使用)"""
        time.sleep(0.8)  # 较快的深度分析
        return {
            'type': 'quick_deliberative',
            'analysis': "快速深度分析结果:基于关键因素评估的优化建议",
            'strategy': {
                'name': '优化配置策略',
                'allocation': {'股票': '45%', '债券': '30%', '基金': '20%', '现金': '5%'}
            }
        }
  • 特点:深度分析,适合复杂规划
  • 处理时间:8-15秒

2.21 创建混合式顾问实例

def create_detailed_animation():

    print("=== 混合式智能投资顾问 ===")
    print("正在创建详细的执行流程动画...\n")
    
    # 创建混合式顾问实例
    advisor = HybridInvestmentAdvisor()
    
    # 测试用例 - 覆盖所有处理通道
    test_cases = [
        ("基金风险大吗?", "user_001"),                      # 简单查询 → 反应式
        ("有20万怎么投资?", "user_002"),                    # 中等查询 → 混合式  
        ("我35岁,年收入50万,如何规划退休投资?", "user_003"), # 复杂查询 → 深思式
    ]
    
    print(" 开始执行测试用例...")
    
    # 执行所有测试用例
    for i, (query, user_id) in enumerate(test_cases, 1):
        print(f"\n 执行测试案例 {i}: {query}")
        print("-" * 50)
        
        # 处理查询并捕获动画帧
        result = advisor.process_query(user_id, query)
        
        print(f" 处理完成!")
        print(f"   使用架构: {result['architecture_used']}")
        print(f"   处理时间: {result['processing_time']}")
        
        # 添加结果展示帧
        for _ in range(2):
            advisor._capture_frame()
    
    print(f"\n 收集到 {len(advisor.animation_frames)} 个动画帧")
    
    
    # 显示执行统计信息
    print("\n执行统计:")
    print("=" * 40)
    for user_id, profile in advisor.user_profiles.items():
        print(f"用户 {user_id}:")
        print(f"  查询次数: {profile['query_count']}")
        print(f"  平均复杂度: {profile['avg_complexity']:.2f}")
        print(f"  架构偏好: {profile['preferred_architecture']}")
    
    return gif_filename

2.22 主程序入口

if __name__ == "__main__":

   create_detailed_animation()

3. 输出结果

开始处理用户 user_001 的查询: 简单查询:基金风险大吗?
   路由决策: reactive (复杂度: 0.13)
 执行反应式处理...
 处理完成!
   使用架构: reactive
   处理时间: 7.23秒  

 执行测试案例 2: 中等查询:有20万怎么投资?
--------------------------------------------------

 开始处理用户 user_002 的查询: 中等查询:有20万怎么投资?
   路由决策: reactive (复杂度: 0.17)
 执行反应式处理...
 处理完成!
   使用架构: reactive
   处理时间: 7.58秒  

 执行测试案例 3: 复杂查询:我35岁,年收入50万,如何规划退休投资?
--------------------------------------------------

 开始处理用户 user_003 的查询: 复杂查询:我35岁,年收入50万,如何规划退休投资?
   路由决策: hybrid (复杂度: 0.45)
 执行混合式并行处理...

=== 混合式智能投资顾问生成 ===
正在创建详细的执行流程动画...

 开始执行测试用例...

 执行测试案例 1: 基金风险大吗?
--------------------------------------------------
 开始处理用户 user_001 的查询: 基金风险大吗?
   路由决策: reactive (复杂度: 0.16)
 执行反应式处理...
 处理完成!
   使用架构: reactive
   处理时间: 5.95秒

 执行测试案例 2: 有20万怎么投资?
--------------------------------------------------

 开始处理用户 user_002 的查询: 有20万怎么投资?
   路由决策: reactive (复杂度: 0.21)
 执行反应式处理...
 处理完成!
   使用架构: reactive
   处理时间: 6.58秒

 执行测试案例 3: 我35岁,年收入50万,如何规划退休投资?
--------------------------------------------------

 开始处理用户 user_003 的查询: 我35岁,年收入50万,如何规划退休投资?
   路由决策: hybrid (复杂度: 0.36)
 执行混合式并行处理...
 处理完成!
   使用架构: hybrid
   处理时间: 8.09秒

 执行统计:
========================================
用户 user_001:
  查询次数: 1
  平均复杂度: 0.16
  架构偏好: {'reactive': 1}
用户 user_002:
  查询次数: 1
  平均复杂度: 0.21
  架构偏好: {'reactive': 1}
用户 user_003:
  查询次数: 1
  平均复杂度: 0.36
  架构偏好: {'hybrid': 1}

生成完成!

八、总结

        混合式架构不是简单的技术堆砌,而是基于深刻业务理解的架构创新,开创性的智能路由机制,在速度与深度间找到平衡点,智能分配计算资源,避免不必要的深度处理,用户体验升级,个性化提升,使用便捷。

        混合式架构代表了智能投顾技术发展的成熟阶段,它告诉我们:最好的技术方案不是非此即彼的选择,而是恰到好处的融合。在这种架构下,我们既能够给用户闪电般的响应速度,又能够提供专家级的深度分析,真正实现了智能投资顾问的终极目标,让专业的投资建议触手可及。

Logo

更多推荐