理论是灰色的,而实践之树常青。上一讲我们探讨了SAP协议的设计哲学,今天让我们进入实战战场,看看如何用SAP构建真正的AI Agent系统。

引言:从协议到系统的跨越

理解了SAP协议的语法和理念,下一步就是将其落地为可运行的系统。但这个过程充满了实际挑战:如何让LLM稳定输出协议格式?如何处理并发请求?如何设计可扩展的架构?

本讲将解决三个核心问题

  1. System Prompt工程:如何让LLM成为SAP协议专家?
  2. 错误处理实战:从理论到实践的完整错误处理体系
  3. 架构设计模式:可扩展、高性能的SAP Agent运行时设计

一、System Prompt设计:让LLM成为协议专家

1.1 基础Prompt模板:清晰、明确、可执行

一个好的System Prompt需要平衡三个目标:教会规则、提供示例、防止滥用

# SAP协议基础System Prompt
system_prompt: |
  你是一个AI助手,通过SPARK Agent Protocol(SAP)与后端系统交互。
  
  ## 协议格式
  当你需要查询信息或执行操作时,使用以下格式:
  
  @@<type>:<name>#<id>
  {
    "参数1": "值1",
    "参数2": "值2"
  }
  @@end
  
  ## 消息类型
  - query: 查询信息(只读)
  - action: 执行操作(写操作)
  - describe: 查询操作详情
  - result: 系统返回结果(你不主动使用)
  - event: 系统事件(你不主动使用)
  - error: 错误信息(你不主动使用)
  
  ## 重要规则
  1. JSON必须是合法JSON
  2. 每个块必须以@@end结束
  3. 可以在自然语言中嵌入多个协议块
  4. 需要系统能力时必须使用协议
  5. 不要输出未闭合的协议块
  
  ## 可用操作示例
  - dataset.table-exists: 检查表是否存在
  - file.read: 读取文件
  - system.info: 获取系统信息
  
  ## 示例对话
  用户:检查订单表是否存在
  
  你:我需要先检查订单表。
  
  @@query:dataset.table-exists#1
  {
    "table": "orders"
  }
  @@end

1.2 高级Prompt技巧:层次化、场景化、动态化

技巧1:层次化角色定义
# 第一层:核心角色
你是一个专业的数据工程师AI助手,擅长数据库操作、文件处理和系统管理。

# 第二层:协议规范
你通过SAP协议与系统交互,协议格式为@@type:name#id {json} @@end

# 第三层:安全约束
你只允许执行有权限的操作,如果遇到权限问题,明确告知用户。

# 第四层:交互风格
你的回答应该专业但友好,在发送协议前解释你要做什么。
技巧2:场景化示例嵌入
## 场景1:数据查询
用户:查询上周的销售数据

正确做法:
我将帮您查询上周的销售数据。首先检查销售表是否存在,然后查询数据。

@@query:dataset.table-exists#1
{"table": "sales"}
@@end

## 场景2:错误处理
用户:删除不存在的文件

正确做法:
@@action:file.delete#1
{"path": "nonexistent.txt"}
@@end

如果收到RESOURCE_NOT_FOUND错误,回复用户:文件不存在,无需删除。

## 场景3:多步骤操作
用户:备份数据库并压缩

正确做法:
我需要执行多个步骤:1) 导出数据 2) 压缩文件 3) 验证结果

@@action:database.export#1
{"database": "main", "format": "sql"}
@@end

@@action:file.compress#2
{"source": "backup.sql", "destination": "backup.zip"}
@@end
技巧3:动态能力发现
# 在Prompt中明确告知AI如何发现新能力
如果你不确定某个操作是否存在,或者不知道参数格式,可以使用describe操作:

@@describe:dataset.create-table#learn_1
{}
@@end

系统会返回完整的操作说明,包括参数格式、示例和可能错误。

你也可以查询系统所有能力:

@@query:system.capabilities#discover_1
{"scope": "all"}
@@end

1.3 Prompt优化策略:Token效率与效果平衡

优化维度 策略 效果 风险
长度优化 使用缩写、精简示例 减少20-30% Token 可能损失清晰度
结构优化 分章节、使用标记 提高可读性 增加复杂度
示例优化 选择典型场景 提高学习效率 可能覆盖不全
更新策略 动态加载部分Prompt 灵活适应变化 实现复杂度高

最佳实践:采用三段式Prompt结构

# 1. 核心角色和协议定义(固定,300-500 tokens)
base_prompt = "你是AI助手,使用SAP协议..."

# 2. 当前会话上下文(动态更新)
context_prompt = f"""
当前用户:{user_name}
项目上下文:{project_context}
最近操作:{recent_actions}
"""

# 3. 能力列表(可动态发现)
capabilities_prompt = """
可用操作:
- dataset.query-data
- file.read-content
...
"""

二、错误处理实战:从防御到恢复的完整体系

2.1 错误分类与处理策略

在实战中,我们需要将理论中的错误分类转化为具体的处理逻辑:

成功

失败

有效

无效

有权限

无权限

成功

失败

接收请求

解析SAP消息

验证参数

返回解析错误

检查权限

返回验证错误

执行操作

返回权限错误

返回成功结果

分析错误类型

资源错误

执行错误

系统错误

RESOURCE_NOT_FOUND等

TIMEOUT, EXTERNAL_ERROR等

INTERNAL_ERROR等

生成可操作错误

返回结构化错误

2.2 错误处理中间件设计

一个健壮的SAP系统需要多层错误处理:

/**
 * SAP错误处理中间件链
 */
public class SapErrorHandlingChain {
    
    private final List<ErrorHandler> handlers = Arrays.asList(
        new ValidationErrorHandler(),
        new PermissionErrorHandler(), 
        new ResourceErrorHandler(),
        new ExecutionErrorHandler(),
        new SystemErrorHandler(),
        new FallbackErrorHandler()
    );
    
    /**
     * 处理异常并生成SAP错误消息
     */
    public String handleException(SapMessage request, Exception e) {
        SapError error = null;
        
        // 按顺序尝试各个处理器
        for (ErrorHandler handler : handlers) {
            if (handler.canHandle(e)) {
                error = handler.handle(request, e);
                break;
            }
        }
        
        // 确保总有错误返回
        if (error == null) {
            error = new FallbackErrorHandler().handle(request, e);
        }
        
        return error.toSapMessage();
    }
}

/**
 * 基础错误处理器
 */
public abstract class ErrorHandler {
    public abstract boolean canHandle(Exception e);
    public abstract SapError handle(SapMessage request, Exception e);
}

/**
 * 资源不存在错误处理器
 */
public class ResourceNotFoundHandler extends ErrorHandler {
    
    @Override
    public boolean canHandle(Exception e) {
        return e instanceof ResourceNotFoundException;
    }
    
    @Override
    public SapError handle(SapMessage request, Exception e) {
        ResourceNotFoundException ex = (ResourceNotFoundException) e;
        
        return SapError.builder()
            .code("RESOURCE_NOT_FOUND")
            .message(String.format("资源 '%s' 不存在", ex.getResourceName()))
            .category(ErrorCategory.RESOURCE_ERROR)
            .details(Map.of(
                "resource_type", ex.getResourceType(),
                "resource_name", ex.getResourceName(),
                "suggested_actions", getSuggestedActions(ex)
            ))
            .suggestion(generateSuggestions(ex))
            .retryable(false)
            .timestamp(Instant.now())
            .build();
    }
    
    private List<String> generateSuggestions(ResourceNotFoundException ex) {
        List<String> suggestions = new ArrayList<>();
        
        suggestions.add("检查资源名称是否正确");
        
        if (ex.getResourceType().equals("TABLE")) {
            suggestions.add("使用 dataset.list-tables 查看所有表");
            suggestions.add("或使用 dataset.create-table 创建新表");
        } else if (ex.getResourceType().equals("FILE")) {
            suggestions.add("使用 file.list 查看目录内容");
        }
        
        return suggestions;
    }
}

2.3 渐进式错误反馈实战

在实际对话中,AI需要引导用户逐步修正问题:

class ProgressiveErrorHandler:
    """渐进式错误处理"""
    
    def __init__(self):
        self.error_history = {}  # 请求ID -> 错误历史
        
    def handle_error(self, request_id, current_error, user_input=None):
        """处理错误,考虑历史上下文"""
        
        # 获取这个请求的错误历史
        history = self.error_history.get(request_id, [])
        
        # 分析错误模式
        error_pattern = self.analyze_error_pattern(history, current_error)
        
        # 生成针对性的错误信息
        if error_pattern == "MISSING_REQUIRED_FIELD":
            suggestion = self.generate_field_suggestion(history, current_error)
        elif error_pattern == "VALIDATION_FAILED":
            suggestion = self.generate_validation_suggestion(history, current_error)
        elif error_pattern == "RESOURCE_NOT_FOUND":
            suggestion = self.generate_resource_suggestion(history, current_error)
        else:
            suggestion = current_error.get("suggestion", [])
        
        # 更新历史
        history.append({
            "error": current_error,
            "timestamp": datetime.now(),
            "user_input": user_input
        })
        self.error_history[request_id] = history
        
        # 构建响应
        response = {
            "code": current_error["code"],
            "message": self.format_message(current_error, history),
            "suggestion": suggestion,
            "history_length": len(history),
            "is_first_error": len(history) == 1
        }
        
        return response
    
    def analyze_error_pattern(self, history, current_error):
        """分析错误模式"""
        if not history:
            return "FIRST_ERROR"
        
        last_error = history[-1]["error"]
        
        # 如果是同一个错误连续出现
        if last_error["code"] == current_error["code"]:
            return "REPEATED_ERROR"
        
        # 从缺少字段到验证失败
        if (last_error["code"] == "MISSING_REQUIRED_FIELD" and 
            current_error["code"] == "VALIDATION_FAILED"):
            return "PROGRESSIVE_VALIDATION"
        
        return "NEW_ERROR"
    
    def format_message(self, error, history):
        """根据历史格式化错误信息"""
        if len(history) == 1:
            return error["message"]
        elif len(history) == 2:
            return f"{error['message']} (这是第二次出现此问题)"
        else:
            return f"{error['message']} (已尝试{len(history)}次,建议检查输入格式)"

2.4 可恢复错误与重试机制

有些错误可以通过重试解决,需要明确告知AI:

{
  "code": "EXTERNAL_SERVICE_UNAVAILABLE",
  "message": "数据库连接暂时不可用",
  "category": "EXECUTION_ERROR",
  "details": {
    "service": "database",
    "error_detail": "Connection timeout after 5000ms"
  },
  "suggestion": [
    "等待30秒后重试",
    "检查数据库服务状态",
    "如果问题持续,联系系统管理员"
  ],
  "retryable": true,
  "retry_info": {
    "max_retries": 3,
    "retry_delay_ms": 30000,
    "backoff_factor": 2
  },
  "timestamp": "2024-01-15T10:30:45.123Z"
}

重试策略实现

public class RetryableErrorHandler {
    
    private static final Map<String, RetryPolicy> RETRY_POLICIES = Map.of(
        "EXTERNAL_SERVICE_UNAVAILABLE", new RetryPolicy(3, 30000, 2.0),
        "NETWORK_TIMEOUT", new RetryPolicy(5, 10000, 1.5),
        "RATE_LIMIT_EXCEEDED", new RetryPolicy(2, 60000, 1.0)
    );
    
    public boolean shouldRetry(SapError error) {
        return error.isRetryable() && 
               error.getRetryCount() < getMaxRetries(error.getCode());
    }
    
    public long calculateDelay(SapError error) {
        RetryPolicy policy = RETRY_POLICIES.get(error.getCode());
        if (policy == null) {
            return 5000; // 默认5秒
        }
        
        // 指数退避
        return (long) (policy.baseDelayMs * 
                      Math.pow(policy.backoffFactor, error.getRetryCount()));
    }
    
    static class RetryPolicy {
        int maxRetries;
        long baseDelayMs;
        double backoffFactor;
        
        RetryPolicy(int maxRetries, long baseDelayMs, double backoffFactor) {
            this.maxRetries = maxRetries;
            this.baseDelayMs = baseDelayMs;
            this.backoffFactor = backoffFactor;
        }
    }
}

三、SAP Agent运行时架构设计

3.1 整体架构:模块化、可扩展、高性能

支撑系统

服务层

技能执行层

技能注册中心

运行时核心

协议层

客户端层

扩展技能

核心技能

用户界面/聊天界面

命令行工具

HTTP API网关

SAP协议解析器

参数验证器

响应编码器

流式处理器

消息路由器

中间件链

请求上下文

指标收集

技能注册表

动态发现

版本管理

DatasetSkill

FileSkill

SystemSkill

WorkflowSkill

自定义技能1

自定义技能2

插件技能

数据库

文件系统

缓存服务

外部服务

认证授权

日志审计

监控告警

配置管理

3.2 核心组件详细设计

3.2.1 消息路由器:智能路由与负载均衡
@Component
public class SapMessageRouter {
    
    @Autowired
    private SkillRegistry skillRegistry;
    
    @Autowired
    private List<RouterMiddleware> middlewares;
    
    @Autowired
    private MetricsCollector metricsCollector;
    
    /**
     * 路由SAP消息到对应技能
     */
    public CompletableFuture<SapMessage> route(SapMessage request) {
        // 开始计时
        long startTime = System.nanoTime();
        
        // 执行前置中间件
        SapMessage processedRequest = executePreMiddlewares(request);
        
        // 提取domain
        String domain = extractDomain(processedRequest.getName());
        
        // 查找技能
        SapSkill skill = skillRegistry.findSkill(domain);
        if (skill == null) {
            return CompletableFuture.completedFuture(
                createSkillNotFoundError(request, domain)
            );
        }
        
        // 检查技能是否支持该操作
        if (!skill.supportsOperation(extractOperation(processedRequest.getName()))) {
            return CompletableFuture.completedFuture(
                createUnsupportedOperationError(request)
            );
        }
        
        // 执行技能
        return skill.execute(processedRequest)
            .thenApply(response -> {
                // 执行后置中间件
                SapMessage processedResponse = executePostMiddlewares(request, response);
                
                // 收集指标
                recordMetrics(request, processedResponse, startTime);
                
                return processedResponse;
            })
            .exceptionally(error -> {
                // 错误处理
                return handleExecutionError(request, error);
            });
    }
    
    /**
     * 执行前置中间件链
     */
    private SapMessage executePreMiddlewares(SapMessage request) {
        SapMessage current = request;
        
        for (RouterMiddleware middleware : middlewares) {
            try {
                current = middleware.preProcess(current);
            } catch (MiddlewareException e) {
                // 中间件可以中断处理链
                throw e;
            }
        }
        
        return current;
    }
    
    /**
     * 提取domain
     */
    private String extractDomain(String actionName) {
        int dotIndex = actionName.indexOf('.');
        if (dotIndex <= 0) {
            throw new InvalidActionNameException("Action name must be in format 'domain.operation'");
        }
        return actionName.substring(0, dotIndex);
    }
    
    /**
     * 记录性能指标
     */
    private void recordMetrics(SapMessage request, SapMessage response, long startTime) {
        long duration = System.nanoTime() - startTime;
        
        metricsCollector.recordRequest(
            request.getType(),
            request.getName(),
            response.getType(),
            duration
        );
    }
}
3.2.2 技能注册表:动态发现与版本管理
@Service
public class SkillRegistry {
    
    private final Map<String, SkillRegistration> registry = new ConcurrentHashMap<>();
    private final SkillDiscoveryService discoveryService;
    private final SkillVersionManager versionManager;
    
    /**
     * 注册技能
     */
    public void registerSkill(SapSkill skill, SkillMetadata metadata) {
        String domain = skill.getDomain();
        
        SkillRegistration registration = new SkillRegistration(
            skill,
            metadata,
            Instant.now(),
            SkillStatus.ACTIVE
        );
        
        registry.put(domain, registration);
        
        // 通知发现服务
        discoveryService.notifySkillRegistered(domain, metadata);
    }
    
    /**
     * 查找技能
     */
    public SapSkill findSkill(String domain) {
        SkillRegistration registration = registry.get(domain);
        if (registration == null || registration.getStatus() != SkillStatus.ACTIVE) {
            return null;
        }
        
        // 检查版本兼容性
        if (!versionManager.isCompatible(registration.getMetadata().getVersion())) {
            throw new VersionIncompatibleException(
                "Skill version is not compatible: " + 
                registration.getMetadata().getVersion()
            );
        }
        
        return registration.getSkill();
    }
    
    /**
     * 获取所有可用技能
     */
    public List<SkillMetadata> listSkills() {
        return registry.values().stream()
            .filter(reg -> reg.getStatus() == SkillStatus.ACTIVE)
            .map(SkillRegistration::getMetadata)
            .collect(Collectors.toList());
    }
    
    /**
     * 动态加载技能
     */
    public void loadSkill(String skillClass, Properties config) {
        try {
            Class<?> clazz = Class.forName(skillClass);
            SapSkill skill = (SapSkill) clazz.getDeclaredConstructor().newInstance();
            
            // 配置技能
            skill.configure(config);
            
            // 创建元数据
            SkillMetadata metadata = SkillMetadata.builder()
                .domain(skill.getDomain())
                .version("1.0.0")
                .description(skill.getDescription())
                .operations(skill.getSupportedOperations())
                .build();
            
            // 注册技能
            registerSkill(skill, metadata);
            
        } catch (Exception e) {
            throw new SkillLoadException("Failed to load skill: " + skillClass, e);
        }
    }
    
    /**
     * 技能注册信息
     */
    @Data
    @AllArgsConstructor
    private static class SkillRegistration {
        private final SapSkill skill;
        private final SkillMetadata metadata;
        private final Instant registeredAt;
        private volatile SkillStatus status;
    }
}
3.2.3 中间件系统:横切关注点统一处理
/**
 * 路由器中间件接口
 */
public interface RouterMiddleware {
    
    /**
     * 前置处理
     */
    SapMessage preProcess(SapMessage request) throws MiddlewareException;
    
    /**
     * 后置处理
     */
    default SapMessage postProcess(SapMessage request, SapMessage response) {
        return response;
    }
}

/**
 * 认证中间件
 */
@Component
@Order(10) // 执行顺序
public class AuthenticationMiddleware implements RouterMiddleware {
    
    @Autowired
    private AuthenticationService authService;
    
    @Override
    public SapMessage preProcess(SapMessage request) {
        // 从请求中提取认证信息
        String token = extractToken(request);
        
        if (token == null) {
            throw new AuthenticationException("Missing authentication token");
        }
        
        // 验证token
        UserContext user = authService.authenticate(token);
        if (user == null) {
            throw new AuthenticationException("Invalid authentication token");
        }
        
        // 将用户上下文添加到请求中
        return request.withContext("user", user);
    }
    
    private String extractToken(SapMessage request) {
        // 从header或body中提取token
        if (request.getBody() instanceof Map) {
            Map<String, Object> body = (Map<String, Object>) request.getBody();
            return (String) body.get("_token");
        }
        return null;
    }
}

/**
 * 限流中间件
 */
@Component
@Order(20)
public class RateLimitMiddleware implements RouterMiddleware {
    
    @Autowired
    private RateLimiter rateLimiter;
    
    @Override
    public SapMessage preProcess(SapMessage request) {
        String userId = getUserId(request);
        String operation = request.getName();
        
        if (!rateLimiter.tryAcquire(userId, operation)) {
            throw new RateLimitExceededException(
                "Rate limit exceeded for operation: " + operation
            );
        }
        
        return request;
    }
    
    private String getUserId(SapMessage request) {
        UserContext user = (UserContext) request.getContext("user");
        return user != null ? user.getId() : "anonymous";
    }
}

/**
 * 日志中间件
 */
@Component
@Order(30)
public class LoggingMiddleware implements RouterMiddleware {
    
    private static final Logger logger = LoggerFactory.getLogger(LoggingMiddleware.class);
    
    @Override
    public SapMessage preProcess(SapMessage request) {
        logger.info("Incoming SAP request: type={}, name={}, id={}", 
                   request.getType(), request.getName(), request.getId());
        return request;
    }
    
    @Override
    public SapMessage postProcess(SapMessage request, SapMessage response) {
        long duration = calculateDuration(request, response);
        
        logger.info("SAP request completed: name={}, id={}, duration={}ms, status={}",
                   request.getName(), request.getId(), duration, 
                   response.getType().equals("error") ? "error" : "success");
        
        return response;
    }
    
    private long calculateDuration(SapMessage request, SapMessage response) {
        // 从请求/响应中提取时间戳计算持续时间
        return 0; // 简化实现
    }
}

3.3 技能开发框架

3.3.1 技能基类:标准化接口与生命周期
/**
 * SAP技能基类
 */
public abstract class SapSkill {
    
    private final String domain;
    private final String version;
    private final List<String> supportedOperations;
    
    protected SapSkill(String domain, String version) {
        this.domain = domain;
        this.version = version;
        this.supportedOperations = discoverSupportedOperations();
    }
    
    /**
     * 执行SAP请求
     */
    public abstract CompletableFuture<SapMessage> execute(SapMessage request);
    
    /**
     * 获取技能描述
     */
    public abstract String getDescription();
    
    /**
     * 获取技能元数据
     */
    public SkillMetadata getMetadata() {
        return SkillMetadata.builder()
            .domain(domain)
            .version(version)
            .description(getDescription())
            .operations(getSupportedOperations())
            .build();
    }
    
    /**
     * 发现支持的操作用注解或配置文件
     */
    private List<String> discoverSupportedOperations() {
        List<String> operations = new ArrayList<>();
        
        // 通过反射查找带有@SapOperation注解的方法
        for (Method method : this.getClass().getDeclaredMethods()) {
            SapOperation annotation = method.getAnnotation(SapOperation.class);
            if (annotation != null) {
                operations.add(annotation.value());
            }
        }
        
        return operations;
    }
    
    /**
     * 检查是否支持某个操作
     */
    public boolean supportsOperation(String operation) {
        return supportedOperations.contains(operation);
    }
    
    /**
     * 创建成功响应
     */
    protected SapMessage createSuccess(SapMessage request, Object data) {
        Map<String, Object> body = new HashMap<>();
        body.put("status", "success");
        body.put("data", data);
        body.put("timestamp", Instant.now().toString());
        
        return new SapMessage(
            "result",
            request.getName(),
            request.getId(),
            body
        );
    }
    
    /**
     * 创建进度事件
     */
    protected SapMessage createProgressEvent(String requestId, String task, int percent) {
        Map<String, Object> body = new HashMap<>();
        body.put("task", task);
        body.put("percent", percent);
        body.put("timestamp", Instant.now().toString());
        
        return new SapMessage(
            "event",
            "progress",
            requestId,
            body
        );
    }
}
3.3.2 文件操作技能示例
/**
 * 文件操作技能实现
 */
@Service
@SapSkillMetadata(
    domain = "file",
    version = "1.1.0",
    description = "文件系统操作技能,支持读写、列表、删除等操作"
)
public class FileSkill extends SapSkill {
    
    public FileSkill() {
        super("file", "1.1.0");
    }
    
    @Override
    public String getDescription() {
        return "提供文件系统操作能力,包括读写文件、目录操作等";
    }
    
    @Override
    public CompletableFuture<SapMessage> execute(SapMessage request) {
        return CompletableFuture.supplyAsync(() -> {
            String operation = extractOperation(request.getName());
            
            switch (operation) {
                case "read":
                    return handleRead(request);
                case "write":
                    return handleWrite(request);
                case "list":
                    return handleList(request);
                case "delete":
                    return handleDelete(request);
                case "info":
                    return handleInfo(request);
                default:
                    throw new UnsupportedOperationException(
                        "Unsupported file operation: " + operation
                    );
            }
        });
    }
    
    @SapOperation("read")
    private SapMessage handleRead(SapMessage request) {
        Map<String, Object> params = (Map<String, Object>) request.getBody();
        
        // 验证参数
        validateRequired(params, "path");
        
        String path = (String) params.get("path");
        String encoding = (String) params.getOrDefault("encoding", "UTF-8");
        
        try {
            // 读取文件
            String content = Files.readString(Paths.get(path), Charset.forName(encoding));
            
            // 构建响应
            Map<String, Object> data = new HashMap<>();
            data.put("content", content);
            data.put("path", path);
            data.put("encoding", encoding);
            data.put("size", content.length());
            
            return createSuccess(request, data);
            
        } catch (IOException e) {
            throw new SkillExecutionException("Failed to read file: " + path, e);
        }
    }
    
    @SapOperation("write")
    private SapMessage handleWrite(SapMessage request) {
        Map<String, Object> params = (Map<String, Object>) request.getBody();
        
        validateRequired(params, "path", "content");
        
        String path = (String) params.get("path");
        String content = (String) params.get("content");
        boolean append = (Boolean) params.getOrDefault("append", false);
        String encoding = (String) params.getOrDefault("encoding", "UTF-8");
        
        try {
            StandardOpenOption option = append ? 
                StandardOpenOption.APPEND : StandardOpenOption.TRUNCATE_EXISTING;
            
            Files.writeString(
                Paths.get(path),
                content,
                Charset.forName(encoding),
                StandardOpenOption.CREATE,
                option
            );
            
            // 返回写入结果
            Map<String, Object> data = new HashMap<>();
            data.put("path", path);
            data.put("bytes_written", content.getBytes(encoding).length);
            data.put("append", append);
            
            return createSuccess(request, data);
            
        } catch (IOException e) {
            throw new SkillExecutionException("Failed to write file: " + path, e);
        }
    }
    
    /**
     * 验证必需参数
     */
    private void validateRequired(Map<String, Object> params, String... requiredFields) {
        for (String field : requiredFields) {
            if (!params.containsKey(field) || params.get(field) == null) {
                throw new ValidationException("Missing required parameter: " + field);
            }
        }
    }
}

3.4 性能优化实战

3.4.1 连接池与资源管理
@Component
public class ResourcePoolManager {
    
    private final Map<Class<?>, GenericObjectPool<?>> pools = new ConcurrentHashMap<>();
    
    /**
     * 获取数据库连接
     */
    public Connection getConnection() throws Exception {
        return getResource(Connection.class, () -> {
            // 创建新连接
            return dataSource.getConnection();
        });
    }
    
    /**
     * 获取泛型资源
     */
    @SuppressWarnings("unchecked")
    public <T> T getResource(Class<T> resourceType, Supplier<T> creator) throws Exception {
        GenericObjectPool<T> pool = (GenericObjectPool<T>) pools.computeIfAbsent(
            resourceType,
            k -> createPool(creator)
        );
        
        return pool.borrowObject();
    }
    
    /**
     * 归还资源
     */
    public <T> void returnResource(Class<T> resourceType, T resource) {
        @SuppressWarnings("unchecked")
        GenericObjectPool<T> pool = (GenericObjectPool<T>) pools.get(resourceType);
        
        if (pool != null) {
            pool.returnObject(resource);
        }
    }
    
    private <T> GenericObjectPool<T> createPool(Supplier<T> creator) {
        GenericObjectPoolConfig<T> config = new GenericObjectPoolConfig<>();
        config.setMaxTotal(20);
        config.setMaxIdle(10);
        config.setMinIdle(5);
        config.setTestOnBorrow(true);
        
        return new GenericObjectPool<>(new BasePooledObjectFactory<T>() {
            @Override
            public T create() throws Exception {
                return creator.get();
            }
            
            @Override
            public PooledObject<T> wrap(T obj) {
                return new DefaultPooledObject<>(obj);
            }
        }, config);
    }
}
3.4.2 缓存策略实现
@Component
public class SapResponseCache {
    
    private final Cache<String, CachedResponse> cache;
    
    public SapResponseCache() {
        this.cache = Caffeine.newBuilder()
            .maximumSize(1000)
            .expireAfterWrite(5, TimeUnit.MINUTES)
            .recordStats()
            .build();
    }
    
    /**
     * 获取缓存响应
     */
    public Optional<SapMessage> get(String cacheKey) {
        CachedResponse cached = cache.getIfPresent(cacheKey);
        if (cached == null || isStale(cached)) {
            return Optional.empty();
        }
        return Optional.of(cached.getResponse());
    }
    
    /**
     * 设置缓存
     */
    public void put(String cacheKey, SapMessage response, CacheConfig config) {
        CachedResponse cached = new CachedResponse(
            response,
            Instant.now(),
            config.getTtlSeconds()
        );
        
        cache.put(cacheKey, cached);
    }
    
    /**
     * 生成缓存键
     */
    public String generateKey(SapMessage request) {
        // 基于请求内容生成唯一键
        String requestHash = DigestUtils.md5Hex(
            request.getType() + ":" + 
            request.getName() + ":" + 
            JsonUtils.toJson(request.getBody())
        );
        
        return String.format("sap:%s:%s", request.getType(), requestHash);
    }
    
    /**
     * 检查缓存是否过期
     */
    private boolean isStale(CachedResponse cached) {
        return cached.getCachedAt()
            .plusSeconds(cached.getTtlSeconds())
            .isBefore(Instant.now());
    }
    
    @Data
    @AllArgsConstructor
    private static class CachedResponse {
        private final SapMessage response;
        private final Instant cachedAt;
        private final long ttlSeconds;
    }
    
    @Data
    public static class CacheConfig {
        private long ttlSeconds = 300; // 默认5分钟
        private boolean enabled = true;
    }
}
3.4.3 异步处理与流式响应
@Component
public class StreamResponseHandler {
    
    private final SseEmitterFactory sseEmitterFactory;
    
    /**
     * 处理流式响应
     */
    public SseEmitter handleStreamRequest(SapMessage request) {
        SseEmitter emitter = sseEmitterFactory.create();
        
        // 异步处理
        CompletableFuture.runAsync(() -> {
            try {
                // 发送开始事件
                emitter.send(createProgressEvent(request.getId(), 0));
                
                // 模拟分步处理
                for (int i = 1; i <= 10; i++) {
                    Thread.sleep(1000); // 模拟耗时操作
                    
                    // 发送进度事件
                    emitter.send(createProgressEvent(request.getId(), i * 10));
                    
                    // 检查客户端是否断开
                    if (emitter.isDisconnected()) {
                        break;
                    }
                }
                
                // 发送完成事件
                emitter.send(createCompletionEvent(request.getId()));
                emitter.complete();
                
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        });
        
        return emitter;
    }
    
    /**
     * 创建进度事件
     */
    private SseEmitter.SseEventBuilder createProgressEvent(String requestId, int percent) {
        Map<String, Object> data = new HashMap<>();
        data.put("request_id", requestId);
        data.put("percent", percent);
        data.put("timestamp", Instant.now().toString());
        
        String eventData = String.format(
            "@@event:progress#%s\n%s\n@@end",
            requestId,
            JsonUtils.toJson(data)
        );
        
        return SseEmitter.event()
            .name("sap")
            .data(eventData);
    }
}

四、监控与可观测性

4.1 指标收集与暴露

# application-metrics.yml
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,prometheus
  metrics:
    export:
      prometheus:
        enabled: true
    distribution:
      percentiles-histogram:
        http.server.requests: true
    tags:
      application: sap-agent-runtime
    enable:
      jvm: true
      system: true
      logback: true

# 自定义SAP指标
sap:
  metrics:
    enabled: true
    request-duration-buckets: 10,50,100,500,1000,5000
    request-size-buckets: 1024,10240,102400,1048576

4.2 关键监控指标

@Component
public class SapMetricsCollector {
    
    private final MeterRegistry meterRegistry;
    
    // 计数器
    private final Counter requestCounter;
    private final Counter errorCounter;
    
    // 计时器
    private final Timer requestTimer;
    
    // 计量器
    private final DistributionSummary requestSizeSummary;
    
    public SapMetricsCollector(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        
        // 初始化指标
        this.requestCounter = Counter.builder("sap.requests.total")
            .description("Total number of SAP requests")
            .tag("type", "all")
            .register(meterRegistry);
        
        this.errorCounter = Counter.builder("sap.errors.total")
            .description("Total number of SAP errors")
            .tag("type", "all")
            .register(meterRegistry);
        
        this.requestTimer = Timer.builder("sap.request.duration")
            .description("Time taken to process SAP requests")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(meterRegistry);
        
        this.requestSizeSummary = DistributionSummary.builder("sap.request.size")
            .description("Size of SAP requests in bytes")
            .baseUnit("bytes")
            .register(meterRegistry);
    }
    
    /**
     * 记录请求指标
     */
    public void recordRequest(SapMessage request, SapMessage response, long durationMs) {
        String type = request.getType();
        String name = request.getName();
        String status = response.getType().equals("error") ? "error" : "success";
        
        // 计数
        requestCounter.increment();
        
        if ("error".equals(status)) {
            errorCounter.increment();
        }
        
        // 计时
        requestTimer.record(durationMs, TimeUnit.MILLISECONDS);
        
        // 记录带标签的指标
        meterRegistry.counter("sap.requests.by_type", 
            "type", type,
            "name", name,
            "status", status
        ).increment();
        
        // 记录请求大小
        int requestSize = estimateRequestSize(request);
        requestSizeSummary.record(requestSize);
    }
    
    /**
     * 获取当前指标快照
     */
    public Map<String, Object> getMetricsSnapshot() {
        Map<String, Object> snapshot = new HashMap<>();
        
        // 请求统计
        snapshot.put("total_requests", requestCounter.count());
        snapshot.put("total_errors", errorCounter.count());
        snapshot.put("error_rate", errorCounter.count() / Math.max(requestCounter.count(), 1));
        
        // 延迟统计
        snapshot.put("request_duration_p50", requestTimer.takeSnapshot().percentile(0.5));
        snapshot.put("request_duration_p95", requestTimer.takeSnapshot().percentile(0.95));
        snapshot.put("request_duration_p99", requestTimer.takeSnapshot().percentile(0.99));
        
        return snapshot;
    }
}

4.3 告警规则配置

# prometheus-alerts.yml
groups:
  - name: sap_agent_alerts
    rules:
      - alert: HighErrorRate
        expr: rate(sap_errors_total[5m]) / rate(sap_requests_total[5m]) > 0.1
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "SAP Agent错误率过高"
          description: "过去5分钟错误率超过10%,当前值: {{ $value }}"
      
      - alert: HighRequestLatency
        expr: histogram_quantile(0.95, rate(sap_request_duration_seconds_bucket[5m])) > 5
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "SAP Agent请求延迟过高"
          description: "95分位请求延迟超过5秒,当前值: {{ $value }}s"
      
      - alert: SkillExecutionFailed
        expr: increase(sap_skill_errors_total{skill!=""}[5m]) > 10
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "技能执行频繁失败"
          description: "技能 {{ $labels.skill }} 在5分钟内失败超过10次"

结语:从协议到系统的完整实现

通过本讲的实战指南,我们完成了从SAP协议理论到完整系统实现的跨越。我们不仅设计了清晰的System Prompt让LLM理解协议,还构建了健壮的错误处理体系和可扩展的运行时架构。

关键收获

  1. System Prompt是协议落地的关键:需要精心设计平衡指导性和灵活性
  2. 错误处理需要结构化+渐进式:让AI能够从错误中学习并恢复
  3. 架构设计遵循关注点分离:中间件、路由器、技能注册表各司其职
  4. 性能优化需要多层次策略:从连接池到缓存再到异步处理
  5. 可观测性不可或缺:监控、指标、告警是生产系统的眼睛

但我们的探索还未结束。在下篇中,我们将:

  1. 深度对比主流框架:SAP vs LangChain vs OpenAI Function Calling
  2. 性能基准测试实战:用数据说话,看SAP的真实表现
  3. 生态建设路线图:如何构建SAP的开源生态
  4. 快速开始完整指南:从零到一的完整项目示例

无论你是想要评估SAP协议的技术决策者,还是准备将其应用于项目的开发者,下篇都将为你提供最终的决策依据和实践指南。


下一篇预告:《SAP协议系列(下):生态展望——为什么SAP是AI Agent的未来通信标准》

我们将回答最关键的问题:

  • SAP在实际性能测试中表现如何?
  • 与主流框架相比,SAP的优势和劣势是什么?
  • 如何开始你的第一个SAP项目?
  • SAP生态系统的未来发展是什么?

敬请期待最终篇!

Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐