MCP调用遇到的坑
总体实现不难,主要注意官方MCP的接口协议,和实际调试遇到的问题。当然也需要规范线程池的使用,控制资源调度和并发量。
为什么要使用MCP?
在大模型时代,我们用豆包、chatgpt等工具去提问问题,大模型会在已经训练的数据中搜索答案。而有了MCP,你要搜索高德地图,大模型去调用高德的服务,你要搜索火车票,大模型会去调用12306的服务,当然也可以调用任意一家开放了MCP接口的企业的服务!
1、产品需求
上周产品需求是集成阿里的MCP工具,官方的地址:https://www.modelscope.cn/mcp
系统功能截图如下,此功能
2、实现方案
介绍:调用MCP实际分为两步,第一步连接sse接口,第二步,发送post请求。第二步请求的结果会在第一步sse接口中返回。
很简单,只需要用http工具去请求这俩接口,拼装一下参数即可。
3、遇到问题
在开发过程中遇到两个坑:
1、第一个坑是mcp接收消息,长消息只接收到前一部分数据,后面数据丢失
经过排查发现,长消息服务端是通过多段流返回的,需要拼接处理,这边贴一下代码,主要
看subscribe方法的实现。
public SseEmitter getHttpSSE(String url, String Authorization, String logTitle) {
// 流式请求参数
WebClient webClient = SslUtils.createWebClient();
log.info("【{}】模型sse接口请求:url={},Authorization={}", logTitle, url, Authorization);
// 请求参数封装
WebClient.RequestHeadersUriSpec<?> getRequest = webClient.get();
// 创建SSE发射器,设置超时时间(0表示不超时)
SseEmitter emitter = new SseEmitter(0L);
// 处理连接关闭事件
emitter.onCompletion(() -> System.out.println("SSE连接已完成"));
emitter.onTimeout(() -> handleTimeout(emitter));
emitter.onError(e -> handleError(emitter, e));
final AtomicReference<StringBuilder> bufferRef = new AtomicReference<>(new StringBuilder());
getRequest
.uri(url)
// 模型权限认证
.header("Authorization", Authorization)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.header(HttpHeaders.ACCEPT, MediaType.TEXT_EVENT_STREAM_VALUE)
.retrieve()
.bodyToFlux(DataBuffer.class)
.onBackpressureBuffer()
// 连接结束关闭接口
.doFinally(signalType -> {
emitter.complete();
})
.subscribe(
// 处理接收到的每一条数据
data -> {
try {
byte[] bytes = new byte[data.readableByteCount()];
data.read(bytes);
String chunk = new String(bytes);
// 累积数据块
StringBuilder currentBuffer = bufferRef.get();
currentBuffer.append(chunk);
// 检查是否包含完整的事件(以两个换行符结尾)
String accumulatedData = currentBuffer.toString();
int endIndex = accumulatedData.indexOf("\n\n");
if (endIndex != -1) {
// 提取完整事件
String completeEvent = accumulatedData.substring(0, endIndex + 2);
// 保留剩余数据用于下一次处理
String remainingData = accumulatedData.substring(endIndex + 2);
bufferRef.set(new StringBuilder(remainingData));
// 解析完整事件
Map<String, String> parsed = parseEventString(completeEvent);
if (CollUtil.isNotEmpty(parsed)) {
emitter.send(SseEmitter.event()
.name(parsed.get("event"))
.data(parsed.get("data"), MediaType.TEXT_PLAIN));
}
}
DataBufferUtils.release(data);
} catch (IOException e) {
emitter.completeWithError(e);
}
},
// 处理错误
error -> {
handleError(emitter, error);
emitter.completeWithError(error);
}
);
return emitter;
}
第二个坑是:请求post接口时,入参和协议完全一致,返回却是失败
此处不得不用wireshark抓取两个请求的网络包做了比对
最终发现两次请求头中header的参数存在差异,最后排除法测试出,高德部分工具不支持content-type为application/json;charset=UTF-8,只能设置为application/json
4、总结
总体实现不难,主要注意官方MCP的接口协议,和实际调试遇到的问题。当然也需要规范线程池的使用,控制资源调度和并发量。
更多推荐
所有评论(0)