SAP Protocol Series (Part 2): A Hands-On Guide to Building an AI Agent Communication System
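SAP Protocol in Practice: Core Elements of Building an AI Agent System. Abstract: This article examines how the SAP protocol is applied in real AI Agent systems and focuses on three core problems: 1) System Prompt engineering, which uses layered design, scenario-based examples, and dynamic capability discovery to make the LLM an SAP protocol expert; 2) an error-handling system, with a complete error classification and handling strategy that runs from defense to recovery; 3) architecture design patterns, proposing an extensible, high-performance design for the SAP Agent runtime. The article describes in detail P…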
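Theory is gray, but the tree of practice is evergreen. The previous part covered the design philosophy of the SAP protocol; this part moves into the field and shows how to build a real AI Agent system with SAP.
Introduction: From Protocol to System
Once the syntax and ideas of the SAP protocol are understood, the next step is to turn them into a running system. That process raises practical challenges: How do you get an LLM to emit the protocol format reliably? How do you handle concurrent requests? How do you design an extensible architecture?
This part addresses three core problems:
- System Prompt engineering: how do you make the LLM an SAP protocol expert?
- Error handling in practice: a complete error-handling system, from theory to implementation
- Architecture design patterns: an extensible, high-performance SAP Agent runtime
1. System Prompt Design: Making the LLM a Protocol Expert
1.1 A Basic Prompt Template: Clear, Explicit, Actionable
A good System Prompt has to balance three goals: teach the rules, provide examples, and prevent misuse.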
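# Basic System Prompt for the SAP protocol
system_prompt: |
  You are an AI assistant that interacts with backend systems through the SPARK Agent Protocol (SAP).
  ## Protocol format
  When you need to query information or perform an operation, use the following format:
  @@<type>:<name>#<id>
  {
    "param1": "value1",
    "param2": "value2"
  }
  @@end
  ## Message types
  - query: query information (read-only)
  - action: perform an operation (write)
  - describe: query the details of an operation
  - result: result returned by the system (you never send this)
  - event: system event (you never send this)
  - error: error information (you never send this)
  ## Important rules
  1. The body must be valid JSON
  2. Every block must end with @@end
  3. You may embed multiple protocol blocks in natural language
  4. You must use the protocol whenever you need a system capability
  5. Never output an unclosed protocol block
  ## Examples of available operations
  - dataset.table-exists: check whether a table exists
  - file.read: read a file
  - system.info: get system information
  ## Example dialogue
  User: Check whether the orders table exists
  You: I need to check the orders table first.
  @@query:dataset.table-exists#1
  {
    "table": "orders"
  }
  @@end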
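On the receiving side, the runtime has to pull these blocks back out of free-form model output. The following is a minimal sketch of such an extractor, assuming the header always sits on its own line and the body is a single JSON value; the names `BLOCK_RE`, `SapBlock`, and `extract_blocks` are hypothetical and not part of the protocol.

```python
import json
import re
from dataclasses import dataclass

# One block: "@@<type>:<name>#<id>", a JSON body, then "@@end" on its own line.
BLOCK_RE = re.compile(
    r"@@(?P<type>[a-z]+):(?P<name>[\w.\-]+)#(?P<id>[\w\-]+)\s*\n(?P<body>.*?)\n@@end",
    re.DOTALL,
)


@dataclass
class SapBlock:
    type: str
    name: str
    id: str
    body: dict


def extract_blocks(text: str) -> list[SapBlock]:
    """Return every well-formed SAP block embedded in free-form LLM output."""
    blocks = []
    for match in BLOCK_RE.finditer(text):
        try:
            body = json.loads(match.group("body"))
        except json.JSONDecodeError:
            continue  # rule 1 of the prompt: the body must be valid JSON
        blocks.append(
            SapBlock(match.group("type"), match.group("name"), match.group("id"), body)
        )
    return blocks
```

Blocks without a closing `@@end` never match, so an unclosed block is dropped rather than half-executed. A production parser would probably report such blocks back to the model as an error instead of ignoring them silently.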
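1.2 Advanced Prompt Techniques: Layered, Scenario-Based, Dynamic
Technique 1: Layered role definition
# Layer 1: core role
You are a professional data engineer AI assistant, skilled in database operations, file handling, and system administration.
# Layer 2: protocol specification
You interact with the system through the SAP protocol; the format is @@type:name#id {json} @@end
# Layer 3: security constraints
You may only perform operations you have permission for. If you hit a permission problem, tell the user explicitly.
# Layer 4: interaction style
Your answers should be professional but friendly. Before sending a protocol block, explain what you are about to do.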
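Technique 2: Embedded scenario-based examples
## Scenario 1: data query
User: Query last week's sales data
Correct approach:
I will query last week's sales data for you. First I check whether the sales table exists, then I query the data.
@@query:dataset.table-exists#1
{"table": "sales"}
@@end
## Scenario 2: error handling
User: Delete a file that does not exist
Correct approach:
@@action:file.delete#1
{"path": "nonexistent.txt"}
@@end
If you receive a RESOURCE_NOT_FOUND error, reply to the user: the file does not exist, so there is nothing to delete.
## Scenario 3: multi-step operation
User: Back up the database and compress it
Correct approach:
I need to perform several steps: 1) export the data 2) compress the file 3) verify the result
@@action:database.export#1
{"database": "main", "format": "sql"}
@@end
@@action:file.compress#2
{"source": "backup.sql", "destination": "backup.zip"}
@@end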
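Technique 3: Dynamic capability discovery
# State explicitly in the Prompt how the AI can discover new capabilities
If you are not sure whether an operation exists, or do not know its parameter format, use a describe message:
@@describe:dataset.create-table#learn_1
{}
@@end
The system returns the full operation description, including the parameter format, examples, and possible errors.
You can also query all system capabilities:
@@query:system.capabilities#discover_1
{"scope": "all"}
@@end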
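1.3 Prompt Optimization: Balancing Token Efficiency and Effectiveness
| Dimension | Strategy | Effect | Risk |
|---|---|---|---|
| Length | Use abbreviations, trim examples | Cuts tokens by 20-30% | May reduce clarity |
| Structure | Split into sections, use markers | Improves readability | Adds complexity |
| Examples | Pick typical scenarios | Improves learning efficiency | May leave gaps in coverage |
| Updates | Load parts of the Prompt dynamically | Adapts flexibly to change | High implementation complexity |
Best practice: use a three-part Prompt structure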
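# 1. Core role and protocol definition (fixed, 300-500 tokens)
base_prompt = "You are an AI assistant that uses the SAP protocol..."
# 2. Current session context (updated dynamically)
context_prompt = f"""
Current user: {user_name}
Project context: {project_context}
Recent actions: {recent_actions}
"""
# 3. Capability list (can be discovered dynamically)
capabilities_prompt = """
Available operations:
- dataset.query-data
- file.read-content
...
"""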
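2. Error Handling in Practice: A Complete System from Defense to Recovery
2.1 Error Classification and Handling Strategies
In practice, the theoretical error classification has to be turned into concrete handling logic: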
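As an illustration, the classification could be encoded as a lookup table from category to handling strategy. The category names below are assumptions modeled on the handler names in section 2.2 (only `RESOURCE_ERROR` and `EXECUTION_ERROR` appear verbatim in this article), and the strategies are examples rather than rules defined by the protocol.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Strategy:
    retryable: bool  # may the same request be sent again unchanged?
    next_step: str   # what the agent should do next


# Hypothetical mapping; individual errors can still override it
# through their own "retryable" field.
STRATEGIES = {
    "VALIDATION_ERROR": Strategy(False, "fix the parameters and resend"),
    "PERMISSION_ERROR": Strategy(False, "tell the user which permission is missing"),
    "RESOURCE_ERROR": Strategy(False, "verify the resource name or create the resource"),
    "EXECUTION_ERROR": Strategy(True, "retry after the suggested delay"),
    "SYSTEM_ERROR": Strategy(False, "stop and escalate to an administrator"),
}
FALLBACK = Strategy(False, "report the raw error to the user")


def strategy_for(category: str) -> Strategy:
    """Look up the strategy for a category, with a safe default."""
    return STRATEGIES.get(category, FALLBACK)
```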
2.2 Designing the Error-Handling Middleware
A robust SAP system needs several layers of error handling:
/**
* SAP error-handling middleware chain
*/
public class SapErrorHandlingChain {
private final List<ErrorHandler> handlers = Arrays.asList(
new ValidationErrorHandler(),
new PermissionErrorHandler(),
new ResourceNotFoundHandler(),
new ExecutionErrorHandler(),
new SystemErrorHandler(),
new FallbackErrorHandler()
);
/**
* Handle an exception and produce an SAP error message
*/
public String handleException(SapMessage request, Exception e) {
SapError error = null;
// Try each handler in order
for (ErrorHandler handler : handlers) {
if (handler.canHandle(e)) {
error = handler.handle(request, e);
break;
}
}
// Make sure an error is always returned
if (error == null) {
error = new FallbackErrorHandler().handle(request, e);
}
return error.toSapMessage();
}
}
/**
* Base error handler
*/
public abstract class ErrorHandler {
public abstract boolean canHandle(Exception e);
public abstract SapError handle(SapMessage request, Exception e);
}
/**
* Handler for resource-not-found errors
*/
public class ResourceNotFoundHandler extends ErrorHandler {
@Override
public boolean canHandle(Exception e) {
return e instanceof ResourceNotFoundException;
}
@Override
public SapError handle(SapMessage request, Exception e) {
ResourceNotFoundException ex = (ResourceNotFoundException) e;
return SapError.builder()
.code("RESOURCE_NOT_FOUND")
.message(String.format("Resource '%s' does not exist", ex.getResourceName()))
.category(ErrorCategory.RESOURCE_ERROR)
.details(Map.of(
"resource_type", ex.getResourceType(),
"resource_name", ex.getResourceName(),
// reuse generateSuggestions; the original called an undefined getSuggestedActions
"suggested_actions", generateSuggestions(ex)
))
.suggestion(generateSuggestions(ex))
.retryable(false)
.timestamp(Instant.now())
.build();
}
private List<String> generateSuggestions(ResourceNotFoundException ex) {
List<String> suggestions = new ArrayList<>();
suggestions.add("Check that the resource name is correct");
// Constant-first equals avoids a NullPointerException when the type is null
if ("TABLE".equals(ex.getResourceType())) {
suggestions.add("Use dataset.list-tables to see all tables");
suggestions.add("Or use dataset.create-table to create a new table");
} else if ("FILE".equals(ex.getResourceType())) {
suggestions.add("Use file.list to see the directory contents");
}
return suggestions;
}
}
2.3 Progressive Error Feedback in Practice
In a real conversation, the AI needs to guide the user toward a fix step by step:
from datetime import datetime


class ProgressiveErrorHandler:
    """Progressive error handling"""

    def __init__(self):
        self.error_history = {}  # request ID -> error history

    def handle_error(self, request_id, current_error, user_input=None):
        """Handle an error, taking the history into account"""
        # Fetch the error history for this request
        history = self.error_history.get(request_id, [])
        # Analyze the error pattern before the history is updated
        error_pattern = self.analyze_error_pattern(history, current_error)
        # Generate a targeted suggestion. The branch is keyed on the error code,
        # because analyze_error_pattern returns pattern names such as
        # "REPEATED_ERROR", never an error code. The generate_*_suggestion
        # helpers are not shown in this article.
        code = current_error["code"]
        if code == "MISSING_REQUIRED_FIELD":
            suggestion = self.generate_field_suggestion(history, current_error)
        elif code == "VALIDATION_FAILED":
            suggestion = self.generate_validation_suggestion(history, current_error)
        elif code == "RESOURCE_NOT_FOUND":
            suggestion = self.generate_resource_suggestion(history, current_error)
        else:
            suggestion = current_error.get("suggestion", [])
        # Update the history
        history.append({
            "error": current_error,
            "timestamp": datetime.now(),
            "user_input": user_input
        })
        self.error_history[request_id] = history
        # Build the response
        response = {
            "code": current_error["code"],
            "message": self.format_message(current_error, history),
            "suggestion": suggestion,
            "pattern": error_pattern,
            "history_length": len(history),
            "is_first_error": len(history) == 1
        }
        return response

    def analyze_error_pattern(self, history, current_error):
        """Analyze the error pattern"""
        if not history:
            return "FIRST_ERROR"
        last_error = history[-1]["error"]
        # The same error occurred twice in a row
        if last_error["code"] == current_error["code"]:
            return "REPEATED_ERROR"
        # Progressed from a missing field to a validation failure
        if (last_error["code"] == "MISSING_REQUIRED_FIELD" and
                current_error["code"] == "VALIDATION_FAILED"):
            return "PROGRESSIVE_VALIDATION"
        return "NEW_ERROR"

    def format_message(self, error, history):
        """Format the error message according to the history"""
        if len(history) == 1:
            return error["message"]
        elif len(history) == 2:
            return f"{error['message']} (this is the second time this problem has occurred)"
        else:
            return f"{error['message']} (attempted {len(history)} times; consider checking the input format)"
2.4 Recoverable Errors and Retry Mechanisms
Some errors can be resolved by retrying, and the AI needs to be told so explicitly:
{
  "code": "EXTERNAL_SERVICE_UNAVAILABLE",
  "message": "The database connection is temporarily unavailable",
  "category": "EXECUTION_ERROR",
  "details": {
    "service": "database",
    "error_detail": "Connection timeout after 5000ms"
  },
  "suggestion": [
    "Wait 30 seconds and retry",
    "Check the status of the database service",
    "If the problem persists, contact the system administrator"
  ],
  "retryable": true,
  "retry_info": {
    "max_retries": 3,
    "retry_delay_ms": 30000,
    "backoff_factor": 2
  },
  "timestamp": "2024-01-15T10:30:45.123Z"
}
Retry strategy implementation:
public class RetryableErrorHandler {
private static final Map<String, RetryPolicy> RETRY_POLICIES = Map.of(
"EXTERNAL_SERVICE_UNAVAILABLE", new RetryPolicy(3, 30000, 2.0),
"NETWORK_TIMEOUT", new RetryPolicy(5, 10000, 1.5),
"RATE_LIMIT_EXCEEDED", new RetryPolicy(2, 60000, 1.0)
);
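public boolean shouldRetry(SapError error) {
return error.isRetryable() &&
error.getRetryCount() < getMaxRetries(error.getCode());
}
// Max retries for an error code; codes without a policy get one retry (an assumed default)
private int getMaxRetries(String code) {
RetryPolicy policy = RETRY_POLICIES.get(code);
return policy != null ? policy.maxRetries : 1;
}
public long calculateDelay(SapError error) {
RetryPolicy policy = RETRY_POLICIES.get(error.getCode());
if (policy == null) {
return 5000; // default: 5 seconds
}
// Exponential backoff
return (long) (policy.baseDelayMs *
Math.pow(policy.backoffFactor, error.getRetryCount()));
}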
static class RetryPolicy {
int maxRetries;
long baseDelayMs;
double backoffFactor;
RetryPolicy(int maxRetries, long baseDelayMs, double backoffFactor) {
this.maxRetries = maxRetries;
this.baseDelayMs = baseDelayMs;
this.backoffFactor = backoffFactor;
}
}
}
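To see what these policies mean in wall-clock terms, the delay formula `baseDelayMs * backoffFactor ^ retryCount` can be tabulated. The helper below is a small sketch that assumes the retry count starts at 0 for the first retry; `retry_delays` is an illustrative name.

```python
def retry_delays(base_delay_ms: int, backoff_factor: float, max_retries: int) -> list[int]:
    """Delay in milliseconds before each retry: base * factor ** attempt."""
    return [int(base_delay_ms * backoff_factor ** attempt) for attempt in range(max_retries)]


# EXTERNAL_SERVICE_UNAVAILABLE: 3 retries, 30 s base, factor 2.0
print(retry_delays(30000, 2.0, 3))  # [30000, 60000, 120000]
```

Under that policy a request that keeps failing waits 30 s + 60 s + 120 s = 210 s in total before giving up, which is worth checking against whatever timeout the calling conversation can tolerate.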
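3. SAP Agent Runtime Architecture
3.1 Overall Architecture: Modular, Extensible, High-Performance
3.2 Core Component Design in Detail
3.2.1 Message Router: Smart Routing and Load Balancing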
@Component
public class SapMessageRouter {
@Autowired
private SkillRegistry skillRegistry;
@Autowired
private List<RouterMiddleware> middlewares;
@Autowired
private SapMetricsCollector metricsCollector;
/**
* Route an SAP message to the matching skill
*/
public CompletableFuture<SapMessage> route(SapMessage request) {
// Start timing
long startTime = System.nanoTime();
SapMessage processedRequest;
String domain;
SapSkill skill;
try {
// Run the pre-processing middlewares
processedRequest = executePreMiddlewares(request);
// Extract the domain
domain = extractDomain(processedRequest.getName());
// Look up the skill
skill = skillRegistry.findSkill(domain);
} catch (RuntimeException e) {
// Middleware, name parsing, and registry lookup fail synchronously;
// turn the failure into an SAP error instead of throwing at the caller
return CompletableFuture.completedFuture(handleExecutionError(request, e));
}
if (skill == null) {
return CompletableFuture.completedFuture(
createSkillNotFoundError(request, domain)
);
}
// Check whether the skill supports the operation
if (!skill.supportsOperation(extractOperation(processedRequest.getName()))) {
return CompletableFuture.completedFuture(
createUnsupportedOperationError(request)
);
}
// Execute the skill
return skill.execute(processedRequest)
.thenApply(response -> {
// Run the post-processing middlewares
SapMessage processedResponse = executePostMiddlewares(request, response);
// Collect metrics
recordMetrics(request, processedResponse, startTime);
return processedResponse;
})
.exceptionally(error -> {
// Error handling
return handleExecutionError(request, error);
});
}
/**
* Run the pre-processing middleware chain
*/
private SapMessage executePreMiddlewares(SapMessage request) {
SapMessage current = request;
for (RouterMiddleware middleware : middlewares) {
try {
current = middleware.preProcess(current);
} catch (MiddlewareException e) {
// A middleware may abort the chain
throw e;
}
}
return current;
}
/**
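* Extract the domain (the part before the first dot)
*/
private String extractDomain(String actionName) {
int dotIndex = actionName.indexOf('.');
// Reject names with no dot, a leading dot, or nothing after the dot
if (dotIndex <= 0 || dotIndex == actionName.length() - 1) {
throw new InvalidActionNameException("Action name must be in format 'domain.operation'");
}
return actionName.substring(0, dotIndex);
}
/**
* Extract the operation (the part after the first dot)
*/
private String extractOperation(String actionName) {
return actionName.substring(actionName.indexOf('.') + 1);
}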
/**
* Record performance metrics
*/
private void recordMetrics(SapMessage request, SapMessage response, long startTime) {
// System.nanoTime() yields nanoseconds; the collector in section 4.2 expects milliseconds
long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
metricsCollector.recordRequest(request, response, durationMs);
}
}
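Stripped of framework details, the routing flow is: run the pre-middlewares, split the name into domain and operation, dispatch, and convert any failure into an error message. The sketch below shows that flow with plain dictionaries and callables; the message shape and the error codes `UNSUPPORTED_OPERATION` and `EXECUTION_FAILED` are assumptions made for illustration.

```python
def split_name(name: str) -> tuple[str, str]:
    """Split 'domain.operation' at the first dot and reject empty halves."""
    domain, dot, operation = name.partition(".")
    if not (dot and domain and operation):
        raise ValueError("name must have the form 'domain.operation'")
    return domain, operation


def make_error(request: dict, code: str, message: str) -> dict:
    """Build an error message that echoes the request's name and id."""
    return {"type": "error", "name": request.get("name"), "id": request.get("id"),
            "body": {"code": code, "message": message}}


def route(request: dict, skills: dict, middlewares: list) -> dict:
    """Run the pre-middlewares, then dispatch to skills[domain][operation]."""
    try:
        current = request
        for middleware in middlewares:  # each one may rewrite or reject the request
            current = middleware(current)
        domain, operation = split_name(current["name"])
        handler = skills.get(domain, {}).get(operation)
        if handler is None:
            return make_error(request, "UNSUPPORTED_OPERATION", current["name"])
        data = handler(current.get("body", {}))
    except Exception as exc:  # a failure never escapes as a raw exception
        return make_error(request, "EXECUTION_FAILED", str(exc))
    return {"type": "result", "name": request["name"], "id": request["id"],
            "body": {"status": "success", "data": data}}
```

Because every path returns a message, the caller can always hand something back to the LLM, even when a middleware rejects the request.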
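3.2.2 Skill Registry: Dynamic Discovery and Version Management
@Service
@RequiredArgsConstructor // Lombok: constructor-injects the two final services below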
public class SkillRegistry {
private final Map<String, SkillRegistration> registry = new ConcurrentHashMap<>();
private final SkillDiscoveryService discoveryService;
private final SkillVersionManager versionManager;
/**
* Register a skill
*/
public void registerSkill(SapSkill skill, SkillMetadata metadata) {
String domain = skill.getDomain();
SkillRegistration registration = new SkillRegistration(
skill,
metadata,
Instant.now(),
SkillStatus.ACTIVE
);
registry.put(domain, registration);
// Notify the discovery service
discoveryService.notifySkillRegistered(domain, metadata);
}
/**
* Look up a skill
*/
public SapSkill findSkill(String domain) {
SkillRegistration registration = registry.get(domain);
if (registration == null || registration.getStatus() != SkillStatus.ACTIVE) {
return null;
}
// Check version compatibility
if (!versionManager.isCompatible(registration.getMetadata().getVersion())) {
throw new VersionIncompatibleException(
"Skill version is not compatible: " +
registration.getMetadata().getVersion()
);
}
return registration.getSkill();
}
/**
* List all available skills
*/
public List<SkillMetadata> listSkills() {
return registry.values().stream()
.filter(reg -> reg.getStatus() == SkillStatus.ACTIVE)
.map(SkillRegistration::getMetadata)
.collect(Collectors.toList());
}
/**
* Load a skill dynamically
*/
public void loadSkill(String skillClass, Properties config) {
try {
Class<?> clazz = Class.forName(skillClass);
SapSkill skill = (SapSkill) clazz.getDeclaredConstructor().newInstance();
// Configure the skill
skill.configure(config);
// Build the metadata
SkillMetadata metadata = SkillMetadata.builder()
.domain(skill.getDomain())
.version("1.0.0")
.description(skill.getDescription())
.operations(skill.getSupportedOperations())
.build();
// Register the skill
registerSkill(skill, metadata);
} catch (Exception e) {
throw new SkillLoadException("Failed to load skill: " + skillClass, e);
}
}
/**
* Skill registration record
*/
@Data
@AllArgsConstructor
private static class SkillRegistration {
private final SapSkill skill;
private final SkillMetadata metadata;
private final Instant registeredAt;
private volatile SkillStatus status;
}
}
3.2.3 Middleware System: Handling Cross-Cutting Concerns in One Place
/**
* Router middleware interface
*/
public interface RouterMiddleware {
/**
* Pre-processing
*/
SapMessage preProcess(SapMessage request) throws MiddlewareException;
/**
* Post-processing
*/
default SapMessage postProcess(SapMessage request, SapMessage response) {
return response;
}
}
/**
* Authentication middleware
*/
@Component
@Order(10) // execution order
public class AuthenticationMiddleware implements RouterMiddleware {
@Autowired
private AuthenticationService authService;
@Override
public SapMessage preProcess(SapMessage request) {
// Extract the credentials from the request
String token = extractToken(request);
if (token == null) {
throw new AuthenticationException("Missing authentication token");
}
// Validate the token
UserContext user = authService.authenticate(token);
if (user == null) {
throw new AuthenticationException("Invalid authentication token");
}
// Attach the user context to the request
return request.withContext("user", user);
}
private String extractToken(SapMessage request) {
// Extract the token from the header or body (only the body lookup is shown here)
if (request.getBody() instanceof Map) {
Map<String, Object> body = (Map<String, Object>) request.getBody();
return (String) body.get("_token");
}
return null;
}
}
/**
* Rate-limiting middleware
*/
@Component
@Order(20)
public class RateLimitMiddleware implements RouterMiddleware {
@Autowired
private RateLimiter rateLimiter;
@Override
public SapMessage preProcess(SapMessage request) {
String userId = getUserId(request);
String operation = request.getName();
if (!rateLimiter.tryAcquire(userId, operation)) {
throw new RateLimitExceededException(
"Rate limit exceeded for operation: " + operation
);
}
return request;
}
private String getUserId(SapMessage request) {
UserContext user = (UserContext) request.getContext("user");
return user != null ? user.getId() : "anonymous";
}
}
/**
* Logging middleware
*/
@Component
@Order(30)
public class LoggingMiddleware implements RouterMiddleware {
private static final Logger logger = LoggerFactory.getLogger(LoggingMiddleware.class);
@Override
public SapMessage preProcess(SapMessage request) {
logger.info("Incoming SAP request: type={}, name={}, id={}",
request.getType(), request.getName(), request.getId());
return request;
}
@Override
public SapMessage postProcess(SapMessage request, SapMessage response) {
long duration = calculateDuration(request, response);
logger.info("SAP request completed: name={}, id={}, duration={}ms, status={}",
request.getName(), request.getId(), duration,
response.getType().equals("error") ? "error" : "success");
return response;
}
private long calculateDuration(SapMessage request, SapMessage response) {
// Compute the duration from timestamps carried on the request and response
return 0; // simplified implementation
}
}
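The rate-limiting middleware above leaves the `RateLimiter` itself unspecified. One common choice is a token bucket per (user, operation) pair; the following is a sketch under that assumption, not the limiter the runtime actually uses. `TokenBucket` and its parameters are hypothetical, and the class is not thread-safe as written.

```python
import time


class TokenBucket:
    """Token-bucket limiter keyed by (user_id, operation)."""

    def __init__(self, capacity: int, refill_per_sec: float, clock=time.monotonic):
        self.capacity = capacity
        self.refill_per_sec = refill_per_sec
        self.clock = clock  # injectable so tests can control time
        self.buckets = {}   # (user_id, operation) -> (tokens, last_seen)

    def try_acquire(self, user_id: str, operation: str) -> bool:
        now = self.clock()
        key = (user_id, operation)
        tokens, last = self.buckets.get(key, (float(self.capacity), now))
        # Refill in proportion to elapsed time, capped at the bucket size
        tokens = min(self.capacity, tokens + (now - last) * self.refill_per_sec)
        allowed = tokens >= 1.0
        if allowed:
            tokens -= 1.0
        self.buckets[key] = (tokens, now)
        return allowed
```

A bucket allows short bursts up to `capacity` while holding the long-run rate at `refill_per_sec`, which suits agents that tend to issue several protocol blocks in one reply.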
3.3 Skill Development Framework
3.3.1 Skill Base Class: Standardized Interface and Lifecycle
/**
* SAP skill base class
*/
public abstract class SapSkill {
private final String domain;
private final String version;
private final List<String> supportedOperations;
protected SapSkill(String domain, String version) {
this.domain = domain;
this.version = version;
this.supportedOperations = discoverSupportedOperations();
}
/**
* Execute an SAP request
*/
public abstract CompletableFuture<SapMessage> execute(SapMessage request);
/**
* Get the skill description
*/
public abstract String getDescription();
/**
* Get the skill metadata
*/
public SkillMetadata getMetadata() {
return SkillMetadata.builder()
.domain(domain)
.version(version)
.description(getDescription())
.operations(getSupportedOperations())
.build();
}
/**
* Discover the supported operations via annotations or a configuration file
*/
private List<String> discoverSupportedOperations() {
List<String> operations = new ArrayList<>();
// Use reflection to find methods annotated with @SapOperation
for (Method method : this.getClass().getDeclaredMethods()) {
SapOperation annotation = method.getAnnotation(SapOperation.class);
if (annotation != null) {
operations.add(annotation.value());
}
}
return operations;
}
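/**
* Check whether an operation is supported
*/
public boolean supportsOperation(String operation) {
return supportedOperations.contains(operation);
}
// Accessors used by SkillRegistry
public String getDomain() { return domain; }
public String getVersion() { return version; }
public List<String> getSupportedOperations() { return supportedOperations; }
/**
* Extract the operation: the part of 'domain.operation' after the first dot
*/
protected String extractOperation(String name) {
return name.substring(name.indexOf('.') + 1);
}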
/**
* Create a success response
*/
protected SapMessage createSuccess(SapMessage request, Object data) {
Map<String, Object> body = new HashMap<>();
body.put("status", "success");
body.put("data", data);
body.put("timestamp", Instant.now().toString());
return new SapMessage(
"result",
request.getName(),
request.getId(),
body
);
}
/**
* Create a progress event
*/
protected SapMessage createProgressEvent(String requestId, String task, int percent) {
Map<String, Object> body = new HashMap<>();
body.put("task", task);
body.put("percent", percent);
body.put("timestamp", Instant.now().toString());
return new SapMessage(
"event",
"progress",
requestId,
body
);
}
}
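The annotation-plus-reflection discovery used by the base class has a close analogue in Python decorators. The sketch below mirrors the idea in a few lines; `sap_operation`, `Skill`, and `EchoSkill` are hypothetical names chosen for the example.

```python
def sap_operation(name):
    """Mark a method as the handler for one operation (mirrors @SapOperation)."""
    def mark(func):
        func._sap_operation = name
        return func
    return mark


class Skill:
    domain = ""

    def __init__(self):
        # Scan the class once and index bound handlers by operation name
        self.operations = {}
        for attr in dir(type(self)):
            op = getattr(getattr(type(self), attr, None), "_sap_operation", None)
            if op is not None:
                self.operations[op] = getattr(self, attr)

    def supports(self, operation):
        return operation in self.operations

    def execute(self, name, body):
        _, _, operation = name.partition(".")
        if not self.supports(operation):
            raise ValueError(f"unsupported operation: {name}")
        return self.operations[operation](body)


class EchoSkill(Skill):
    domain = "echo"

    @sap_operation("say")
    def handle_say(self, body):
        return {"said": body["text"]}
```

Dispatching through the discovered table keeps the list of supported operations and the dispatch logic from drifting apart, which a hand-written `switch` can easily allow.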
3.3.2 Example: A File Operation Skill
/**
* File operation skill implementation
*/
@Service
@SapSkillMetadata(
domain = "file",
version = "1.1.0",
description = "File system skill supporting read, write, list, delete, and similar operations"
)
public class FileSkill extends SapSkill {
public FileSkill() {
super("file", "1.1.0");
}
@Override
public String getDescription() {
return "Provides file system capabilities, including reading and writing files and directory operations";
}
@Override
public CompletableFuture<SapMessage> execute(SapMessage request) {
return CompletableFuture.supplyAsync(() -> {
String operation = extractOperation(request.getName());
switch (operation) {
case "read":
return handleRead(request);
case "write":
return handleWrite(request);
case "list":
return handleList(request);
case "delete":
return handleDelete(request);
case "info":
return handleInfo(request);
default:
throw new UnsupportedOperationException(
"Unsupported file operation: " + operation
);
}
});
}
@SapOperation("read")
private SapMessage handleRead(SapMessage request) {
Map<String, Object> params = (Map<String, Object>) request.getBody();
// Validate the parameters
validateRequired(params, "path");
String path = (String) params.get("path");
String encoding = (String) params.getOrDefault("encoding", "UTF-8");
try {
// Read the file
String content = Files.readString(Paths.get(path), Charset.forName(encoding));
// Build the response
Map<String, Object> data = new HashMap<>();
data.put("content", content);
data.put("path", path);
data.put("encoding", encoding);
data.put("size", content.length());
return createSuccess(request, data);
} catch (IOException e) {
throw new SkillExecutionException("Failed to read file: " + path, e);
}
}
@SapOperation("write")
private SapMessage handleWrite(SapMessage request) {
Map<String, Object> params = (Map<String, Object>) request.getBody();
validateRequired(params, "path", "content");
String path = (String) params.get("path");
String content = (String) params.get("content");
boolean append = (Boolean) params.getOrDefault("append", false);
String encoding = (String) params.getOrDefault("encoding", "UTF-8");
try {
StandardOpenOption option = append ?
StandardOpenOption.APPEND : StandardOpenOption.TRUNCATE_EXISTING;
Files.writeString(
Paths.get(path),
content,
Charset.forName(encoding),
StandardOpenOption.CREATE,
option
);
// Return the write result
Map<String, Object> data = new HashMap<>();
data.put("path", path);
data.put("bytes_written", content.getBytes(encoding).length);
data.put("append", append);
return createSuccess(request, data);
} catch (IOException e) {
throw new SkillExecutionException("Failed to write file: " + path, e);
}
}
/**
* Validate the required parameters
*/
private void validateRequired(Map<String, Object> params, String... requiredFields) {
for (String field : requiredFields) {
if (!params.containsKey(field) || params.get(field) == null) {
throw new ValidationException("Missing required parameter: " + field);
}
}
}
}
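3.4 Performance Optimization in Practice
3.4.1 Connection Pooling and Resource Management
@Component
public class ResourcePoolManager {
private final Map<Class<?>, GenericObjectPool<?>> pools = new ConcurrentHashMap<>();
// The original snippet used dataSource without declaring it; inject it here
private final DataSource dataSource;
public ResourcePoolManager(DataSource dataSource) {
this.dataSource = dataSource;
}
/**
* Get a database connection
*/
public Connection getConnection() throws Exception {
return getResource(Connection.class, () -> {
try {
// Create a new connection
return dataSource.getConnection();
} catch (SQLException e) {
// Supplier cannot throw checked exceptions, so wrap it
throw new IllegalStateException("Failed to open database connection", e);
}
});
}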
/**
* Get a pooled resource of any type
*/
@SuppressWarnings("unchecked")
public <T> T getResource(Class<T> resourceType, Supplier<T> creator) throws Exception {
GenericObjectPool<T> pool = (GenericObjectPool<T>) pools.computeIfAbsent(
resourceType,
k -> createPool(creator)
);
return pool.borrowObject();
}
/**
* Return a resource to the pool
*/
public <T> void returnResource(Class<T> resourceType, T resource) {
@SuppressWarnings("unchecked")
GenericObjectPool<T> pool = (GenericObjectPool<T>) pools.get(resourceType);
if (pool != null) {
pool.returnObject(resource);
}
}
private <T> GenericObjectPool<T> createPool(Supplier<T> creator) {
GenericObjectPoolConfig<T> config = new GenericObjectPoolConfig<>();
config.setMaxTotal(20);
config.setMaxIdle(10);
config.setMinIdle(5);
config.setTestOnBorrow(true);
return new GenericObjectPool<>(new BasePooledObjectFactory<T>() {
@Override
public T create() throws Exception {
return creator.get();
}
@Override
public PooledObject<T> wrap(T obj) {
return new DefaultPooledObject<>(obj);
}
}, config);
}
}
3.4.2 Cache Strategy Implementation
@Component
public class SapResponseCache {
private final Cache<String, CachedResponse> cache;
public SapResponseCache() {
this.cache = Caffeine.newBuilder()
.maximumSize(1000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.recordStats()
.build();
}
/**
* Get a cached response
*/
public Optional<SapMessage> get(String cacheKey) {
CachedResponse cached = cache.getIfPresent(cacheKey);
if (cached == null || isStale(cached)) {
return Optional.empty();
}
return Optional.of(cached.getResponse());
}
/**
* Store a response in the cache
*/
public void put(String cacheKey, SapMessage response, CacheConfig config) {
CachedResponse cached = new CachedResponse(
response,
Instant.now(),
config.getTtlSeconds()
);
cache.put(cacheKey, cached);
}
/**
* Generate a cache key
*/
public String generateKey(SapMessage request) {
// Build a unique key from the request content
String requestHash = DigestUtils.md5Hex(
request.getType() + ":" +
request.getName() + ":" +
JsonUtils.toJson(request.getBody())
);
return String.format("sap:%s:%s", request.getType(), requestHash);
}
/**
* Check whether a cached entry has expired
*/
private boolean isStale(CachedResponse cached) {
return cached.getCachedAt()
.plusSeconds(cached.getTtlSeconds())
.isBefore(Instant.now());
}
@Data
@AllArgsConstructor
private static class CachedResponse {
private final SapMessage response;
private final Instant cachedAt;
private final long ttlSeconds;
}
@Data
public static class CacheConfig {
private long ttlSeconds = 300; // default: 5 minutes
private boolean enabled = true;
}
}
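A cache key built from the serialized body is only stable if serialization is deterministic: two bodies with the same fields in a different order should map to the same key. The sketch below makes that explicit by sorting keys before hashing. It swaps the MD5 digest used above for SHA-256 purely as an illustration, and `cache_key` is a hypothetical helper.

```python
import hashlib
import json


def cache_key(msg_type: str, name: str, body: dict) -> str:
    """Build a key that ignores field order and whitespace in the body."""
    canonical = json.dumps(body, sort_keys=True, separators=(",", ":"), ensure_ascii=False)
    digest = hashlib.sha256(f"{msg_type}:{name}:{canonical}".encode("utf-8")).hexdigest()
    return f"sap:{msg_type}:{digest}"
```

Since `query` messages are read-only and `action` messages write, it is probably safest to cache only `query` responses. If responses depend on who is asking, the user identity would also need to be part of the key.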
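3.4.3 Asynchronous Processing and Streaming Responses
@Component
public class StreamResponseHandler {
private final SseEmitterFactory sseEmitterFactory;
public StreamResponseHandler(SseEmitterFactory sseEmitterFactory) {
this.sseEmitterFactory = sseEmitterFactory;
}
/**
* Handle a streaming response
*/
public SseEmitter handleStreamRequest(SapMessage request) {
SseEmitter emitter = sseEmitterFactory.create();
// Spring's SseEmitter has no isDisconnected(); track closure through its callbacks
AtomicBoolean closed = new AtomicBoolean(false);
emitter.onCompletion(() -> closed.set(true));
emitter.onTimeout(() -> closed.set(true));
emitter.onError(t -> closed.set(true));
// Process asynchronously
CompletableFuture.runAsync(() -> {
try {
// Send the start event
emitter.send(createProgressEvent(request.getId(), 0));
// Simulate step-by-step processing; stop once the client is gone
for (int i = 1; i <= 10 && !closed.get(); i++) {
Thread.sleep(1000); // simulate a slow operation
// Send a progress event
emitter.send(createProgressEvent(request.getId(), i * 10));
}
if (!closed.get()) {
// Send the completion event
emitter.send(createCompletionEvent(request.getId()));
emitter.complete();
}
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}
/**
* Create a progress event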
*/
private SseEmitter.SseEventBuilder createProgressEvent(String requestId, int percent) {
Map<String, Object> data = new HashMap<>();
data.put("request_id", requestId);
data.put("percent", percent);
data.put("timestamp", Instant.now().toString());
String eventData = String.format(
"@@event:progress#%s\n%s\n@@end",
requestId,
JsonUtils.toJson(data)
);
return SseEmitter.event()
.name("sap")
.data(eventData);
}
}
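The payload of each SSE frame is just an SAP `event` block. As a sketch of that framing on its own, independent of any web framework, the generator below yields one progress block per step; `format_block` and `progress_events` are illustrative names, and the body fields follow the ones used above.

```python
import json
from datetime import datetime, timezone


def format_block(msg_type, name, msg_id, body):
    """Serialize one SAP block: header line, JSON body, then @@end."""
    return f"@@{msg_type}:{name}#{msg_id}\n{json.dumps(body)}\n@@end"


def progress_events(request_id, steps):
    """Yield steps + 1 progress blocks, from 0% up to 100%."""
    if steps < 1:
        raise ValueError("steps must be at least 1")
    for done in range(steps + 1):
        yield format_block("event", "progress", request_id, {
            "request_id": request_id,
            "percent": round(100 * done / steps),
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })
```

Reusing the request id as the event id lets the consumer correlate every progress frame with the request that started the work.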
4. Monitoring and Observability
4.1 Collecting and Exposing Metrics
# application-metrics.yml
management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,prometheus
  metrics:
    export:
      prometheus:
        enabled: true
    distribution:
      percentiles-histogram:
        http.server.requests: true
    tags:
      application: sap-agent-runtime
    enable:
      jvm: true
      system: true
      logback: true
# Custom SAP metrics
sap:
  metrics:
    enabled: true
    request-duration-buckets: 10,50,100,500,1000,5000
    request-size-buckets: 1024,10240,102400,1048576
4.2 Key Monitoring Metrics
@Component
public class SapMetricsCollector {
private final MeterRegistry meterRegistry;
// Counters
private final Counter requestCounter;
private final Counter errorCounter;
// Timer
private final Timer requestTimer;
// Distribution summary
private final DistributionSummary requestSizeSummary;
public SapMetricsCollector(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
// Initialize the meters
this.requestCounter = Counter.builder("sap.requests.total")
.description("Total number of SAP requests")
.tag("type", "all")
.register(meterRegistry);
this.errorCounter = Counter.builder("sap.errors.total")
.description("Total number of SAP errors")
.tag("type", "all")
.register(meterRegistry);
this.requestTimer = Timer.builder("sap.request.duration")
.description("Time taken to process SAP requests")
.publishPercentiles(0.5, 0.95, 0.99)
.register(meterRegistry);
this.requestSizeSummary = DistributionSummary.builder("sap.request.size")
.description("Size of SAP requests in bytes")
.baseUnit("bytes")
.register(meterRegistry);
}
/**
* Record request metrics
*/
public void recordRequest(SapMessage request, SapMessage response, long durationMs) {
String type = request.getType();
String name = request.getName();
String status = response.getType().equals("error") ? "error" : "success";
// Count
requestCounter.increment();
if ("error".equals(status)) {
errorCounter.increment();
}
// Time
requestTimer.record(durationMs, TimeUnit.MILLISECONDS);
// Record tagged metrics
meterRegistry.counter("sap.requests.by_type",
"type", type,
"name", name,
"status", status
).increment();
// Record the request size
int requestSize = estimateRequestSize(request);
requestSizeSummary.record(requestSize);
}
/**
* Get a snapshot of the current metrics
*/
public Map<String, Object> getMetricsSnapshot() {
Map<String, Object> snapshot = new HashMap<>();
// Request statistics
snapshot.put("total_requests", requestCounter.count());
snapshot.put("total_errors", errorCounter.count());
snapshot.put("error_rate", errorCounter.count() / Math.max(requestCounter.count(), 1));
// Latency statistics: HistogramSnapshot exposes the published percentiles
// (0.5, 0.95, 0.99) through percentileValues(), not a percentile(double) method
for (ValueAtPercentile v : requestTimer.takeSnapshot().percentileValues()) {
snapshot.put("request_duration_p" + Math.round(v.percentile() * 100),
v.value(TimeUnit.MILLISECONDS));
}
return snapshot;
}
}
4.3 Alert Rule Configuration
# prometheus-alerts.yml
groups:
  - name: sap_agent_alerts
    rules:
      - alert: HighErrorRate
        expr: rate(sap_errors_total[5m]) / rate(sap_requests_total[5m]) > 0.1
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "SAP Agent error rate is too high"
          description: "Error rate over the last 5 minutes exceeds 10%, current value: {{ $value }}"
      # histogram_quantile needs _bucket series, so the sap.request.duration
      # timer must publish a percentile histogram for this rule to have data
      - alert: HighRequestLatency
        expr: histogram_quantile(0.95, rate(sap_request_duration_seconds_bucket[5m])) > 5
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "SAP Agent request latency is too high"
          description: "95th percentile request latency exceeds 5 seconds, current value: {{ $value }}s"
      - alert: SkillExecutionFailed
        expr: increase(sap_skill_errors_total{skill!=""}[5m]) > 10
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Skill execution is failing frequently"
          description: "Skill {{ $labels.skill }} failed more than 10 times in 5 minutes"
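Conclusion: A Complete Implementation, from Protocol to System
This hands-on guide has taken us from SAP protocol theory to a complete system implementation. We designed a clear System Prompt so the LLM understands the protocol, and we built a robust error-handling system and an extensible runtime architecture.
Key takeaways:
- The System Prompt is the key to putting the protocol into practice: it has to be designed carefully to balance guidance and flexibility
- Error handling needs to be structured and progressive, so the AI can learn from errors and recover
- The architecture follows separation of concerns: middleware, router, and skill registry each have their own job
- Performance optimization needs a multi-layered strategy: from connection pooling to caching to asynchronous processing
- Observability is indispensable: monitoring, metrics, and alerts are the eyes of a production system
But the exploration is not over. In the next part, we will cover:
- An in-depth comparison of mainstream frameworks: SAP vs LangChain vs OpenAI Function Calling
- Performance benchmarking in practice: letting the data show how SAP really performs
- An ecosystem roadmap: how to build an open-source ecosystem around SAP
- A complete quick-start guide: a full example project from zero to one
Whether you are a technical decision-maker evaluating the SAP protocol or a developer preparing to use it in a project, the next part will give you the final basis for a decision and a practical guide.
Next up: "SAP Protocol Series (Part 3): Ecosystem Outlook: Why SAP Is the Future Communication Standard for AI Agents"
We will answer the most important questions:
- How does SAP perform in real performance tests?
- Compared with mainstream frameworks, what are SAP's strengths and weaknesses?
- How do you start your first SAP project?
- Where is the SAP ecosystem heading?
Stay tuned for the final part!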