5.1 项目愿景:AI的“通用数据总线”

经过前面四个章节的铺垫,我们已经掌握了MCP的核心概念、协议规范,并亲手构建了Server和Host。现在,是时候将所有知识融会贯通,挑战一个更宏大、更接近真实世界应用场景的“实践项目”了。

项目目标: 构建一个多源AI研究助理 (Multi-Source AI Research Assistant)

这个助理将具备以下能力:

  1. 连接多个异构数据源:它能同时与多个MCP Server建立连接。每个Server都代表一个独立的数据源或一组工具。在我们的项目中,我们将构建两个Server:
    • 本地文件系统Server:与第三章类似,提供对本地研究资料(如PDF、Markdown文件)的访问能力。
    • 模拟Wikipedia Server:一个全新的Server,它将模拟一个在线知识库(如Wikipedia),提供按主题查询文章摘要的工具。
  2. 统一的交互接口:AI助理(Host)将通过MCP协议与这两个完全不同的Server进行交互,而不需要为每个Server编写特殊的连接或解析代码。这正是MCP“一次实现,到处连接”设计哲学的体现。
  3. 智能综合与回答:当用户提出一个研究问题时(例如:“请总结一下关于‘Transformer模型’的资料,并结合我本地的笔记‘attention_is_all_you_need.md’进行说明。”),AI助理将:
    • 智能路由:判断需要从哪些Server获取信息。
    • 并行查询:同时向Wikipedia Server查询“Transformer模型”,并向文件系统Server读取本地笔记。
    • 信息综合:将从多个来源获取的信息整合在一起,形成一个全面、连贯的答案,最终呈现给用户。

这个项目将完美地诠释MCP如何充当AI应用的“通用数据总线”。无论后端的数据源是文件系统、数据库、Web API还是专有服务,只要它们都通过MCP Server暴露能力,上层的AI应用(Host)就可以用一种标准化的方式无缝地接入和使用它们,极大地降低了构建复杂、多源信息应用的难度。
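
为了更直观地感受这一点,下面给出两条示意性的JSON-RPC请求:它们分别发往文件系统Server和Wikipedia Server,方法名和参数各不相同,但信封结构完全一致,因此Host侧的发送与解析逻辑可以完全复用(具体方法与参数以5.3节的实现为准):

{"jsonrpc": "2.0", "id": 1, "method": "fs/readFile",
 "params": {"path": "notes/attention_is_all_you_need.md"}}

{"jsonrpc": "2.0", "id": 2, "method": "project/executeTool",
 "params": {"name": "project/getSummary", "parameters": {"topic": "transformer model"}}}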


5.2 项目架构设计

在动手编码之前,我们先用一张清晰的架构图来描绘整个系统的全貌。

5.2.1 Mermaid流程图

graph TD
    User["👨‍💻 研究员"]

    subgraph Host["AI Research Assistant (Host)"]
        Assistant["🤖 AI研究助理"]
        Mux["🔌 MCP Host Multiplexer"]
    end

    subgraph Servers["MCP Servers (能力提供者)"]
        subgraph ServerA["Server A: Local Filesystem"]
            FsServer["📄 文件系统Server"]
            Workspace["🗂️ 本地研究资料"]
        end
        subgraph ServerB["Server B: Wikipedia API"]
            WikiServer["🌐 模拟Wikipedia Server"]
            WikiApi["🌍 Wikipedia API"]
        end
    end

    User -- "提问" --> Assistant
    Assistant -- "综合信息, 生成答案" --> User
    Assistant -- "1. 连接/发送请求" --> Mux
    Mux -- "2. 接收响应" --> Assistant
    Mux -- "fs/readFile, ..." --> FsServer
    FsServer -- "响应" --> Mux
    FsServer -- "操作" --> Workspace
    Mux -- "project/getSummary" --> WikiServer
    WikiServer -- "响应" --> Mux
    WikiServer -- "API调用" --> WikiApi

5.2.2 架构解析

  1. 用户 (User):项目的起点,向AI研究助理提出自然语言问题。

  2. AI研究助理 (Host):这是我们实践项目的核心,它本身也是一个MCP Host。但与第四章不同的是,它内部包含一个关键组件:

    • MCP Host Multiplexer (MUX):可以称之为“多路复用器”。这是我们Host端需要实现的核心逻辑。它负责维护与多个下游Server的连接,当Host需要发送请求时,MUX会根据请求的方法(如fs/前缀或project/前缀)或目标Server的标识,将请求路由到正确的Server。它还负责接收来自所有Server的响应,并将其分发回Host内部等待的协程。
  3. MCP Servers:我们有两个独立的Server进程。

    • 文件系统Server:基本复用第三章的代码,负责暴露本地文件系统的读写能力。
    • 模拟Wikipedia Server:一个全新的Python程序。它将实现一个自定义的project/getSummary工具。当被调用时,它会(在我们的模拟中)返回一个关于特定主题的硬编码文本摘要,以模仿对真实Wikipedia API的调用。
  4. 数据源 (Data Sources):

    • 本地研究资料:一个包含Markdown文件和子目录的文件夹。
    • Wikipedia API:一个外部知识源。在本项目中,模拟Server并不真正调用它,而是用硬编码数据来模拟这一交互。

这个架构的精妙之处在于,AI研究助理(Host)的核心逻辑与数据源的具体实现完全解耦。未来如果想增加新的数据源,比如一个SQL数据库或一个Notion页面,我们只需要再实现一个新的MCP Server,然后在Host的MUX中注册它即可,Host的核心代码几乎无需改动。
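
作为示意,下面给出一个按方法名前缀进行路由的扩展思路。这只是一个假设性的代码草图,建立在5.3.4节将要实现的McpMultiplexer之上(PrefixRoutingMultiplexer、request_by_prefix等名字均为本示例自拟);实际项目代码为简单起见按Server名称显式路由:

# 示意代码:按方法名前缀路由请求(依赖5.3.4节定义的McpMultiplexer)
from typing import Dict

class PrefixRoutingMultiplexer(McpMultiplexer):
    def __init__(self, prefix_map: Dict[str, str]):
        super().__init__()
        self.prefix_map = prefix_map  # 例如 {"fs/": "fs", "project/": "wiki"}

    async def request_by_prefix(self, method: str, params: Dict) -> Dict:
        # 找到第一个匹配的前缀,把请求交给对应的Server连接
        for prefix, server_name in self.prefix_map.items():
            if method.startswith(prefix):
                return await self.request(server_name, method, params)
        raise ValueError(f"No server registered for method '{method}'")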


5.3 编码实现

现在,让我们开始动手构建这个系统。我们将创建一个新的项目目录 ai-research-assistant。

5.3.1 项目初始化

# 创建项目主目录
mkdir ai-research-assistant
cd ai-research-assistant

# 创建Python虚拟环境
python3 -m venv .venv
source .venv/bin/activate

# 安装必要的库 (本章的MCP通信为手工实现,代码只直接依赖openai)
pip install openai

# 创建Server和Host的目录
mkdir fs_server
mkdir wiki_server
mkdir assistant_host

# 创建本地研究资料
mkdir -p fs_server/workspace/notes
echo "Attention is All You Need 论文的核心思想是..." > fs_server/workspace/notes/attention_is_all_you_need.md
echo "# 项目简介\n这是一个多源AI研究助理项目" > fs_server/workspace/README.md

# 创建空的主程序文件
touch fs_server/main.py
touch wiki_server/main.py
touch assistant_host/main.py
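
执行完上述命令后,项目的目录结构大致如下:

ai-research-assistant/
├── .venv/
├── assistant_host/
│   └── main.py
├── fs_server/
│   ├── main.py
│   └── workspace/
│       ├── README.md
│       └── notes/
│           └── attention_is_all_you_need.md
└── wiki_server/
    └── main.py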

5.3.2 实现文件系统Server (fs_server/main.py)

这部分代码与我们第三章的Server非常相似,只是做了一些微调,使其成为一个独立的、可执行的程序。我们直接使用最终代码。

# fs_server/main.py

import asyncio
import json
import logging
import os
import sys
import base64
from typing import Dict, Any

# --- 安全路径函数 (与第三章基本相同,校验逻辑稍作加强) ---
def _to_safe_path(workspace_root: str, relative_path: str) -> str:
    base_path = os.path.abspath(workspace_root)
    requested_path = os.path.abspath(os.path.join(base_path, relative_path))
    # 用commonpath判断,避免 "workspace_evil" 这类前缀绕过
    if os.path.commonpath([base_path, requested_path]) != base_path:
        raise ValueError("Path traversal attempt detected")
    return requested_path

class FileSystemServer:
    def __init__(self, workspace_root: str):
        self.workspace_root = os.path.abspath(workspace_root)
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - FS_SERVER - %(levelname)s - %(message)s')

    async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        method = request.get("method")
        params = request.get("params", {})
        request_id = request.get("id")

        try:
            if method == "fs/listDirectory":
                result = self._list_directory(params.get("path", "."))
            elif method == "fs/readFile":
                result = self._read_file(params.get("path"))
            else:
                raise NotImplementedError(f"Method '{method}' not supported.")
            
            return {"jsonrpc": "2.0", "id": request_id, "result": result}
        except Exception as e:
            return {"jsonrpc": "2.0", "id": request_id, "error": {"code": -32000, "message": str(e)}}

    def _list_directory(self, path: str):
        safe_path = _to_safe_path(self.workspace_root, path)
        # 遍历目录,返回文件/目录条目列表 (与第三章相同)
        entries = []
        for entry in os.scandir(safe_path):
            entries.append({"name": entry.name, "type": "directory" if entry.is_dir() else "file"})
        return entries

    def _read_file(self, path: str):
        safe_path = _to_safe_path(self.workspace_root, path)
        # 以二进制读取文件并做Base64编码 (与第三章相同)
        with open(safe_path, 'rb') as f:
            content = f.read()
        return {"content": base64.b64encode(content).decode('utf-8')}

async def main():
    server = FileSystemServer("./workspace")

    # 将stdin包装成异步StreamReader,逐行读取JSON-RPC请求
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    await asyncio.get_running_loop().connect_read_pipe(lambda: protocol, sys.stdin)

    logging.info("File System Server started. Waiting for requests...")

    while True:
        line = await reader.readline()
        if not line:
            break
        request = json.loads(line.decode('utf-8'))
        response = await server.handle_request(request)
        response_line = json.dumps(response) + '\n'
        sys.stdout.write(response_line)
        sys.stdout.flush()

if __name__ == "__main__":
    asyncio.run(main())
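
在接入Host之前,可以先单独验证这个Server:它从stdin逐行读取JSON-RPC请求,并把响应写到stdout,因此用一条管道命令即可完成冒烟测试(示例命令,在fs_server目录下执行):

cd fs_server
echo '{"jsonrpc": "2.0", "id": 1, "method": "fs/listDirectory", "params": {"path": "."}}' | python3 main.py
# stdout预期输出类似: {"jsonrpc": "2.0", "id": 1, "result": [{"name": "notes", "type": "directory"}, ...]}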

5.3.3 实现模拟Wikipedia Server (wiki_server/main.py)

这个Server的结构与文件系统Server类似,但它实现的是自定义工具而不是文件系统能力。

# wiki_server/main.py

import asyncio
import json
import logging
import sys
from typing import Dict, Any

class WikiServer:
    def __init__(self):
        # 模拟的维基百科数据库
        self._db = {
            "transformer model": "The Transformer model, introduced in 'Attention Is All You Need', is a deep learning architecture based on the self-attention mechanism. It is notable for processing entire input sequences at once, enabling significant parallelization and reducing training time.",
            "mcp": "The Model Context Protocol (MCP) is an open protocol designed to standardize how AI applications provide context to large language models. It acts like a universal interface, decoupling the AI's reasoning core from the specifics of data sources and tools."
        }
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - WIKI_SERVER - %(levelname)s - %(message)s')

    async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        method = request.get("method")
        params = request.get("params", {})
        request_id = request.get("id")

        try:
            if method == "project/listTools":
                result = self._list_tools()
            elif method == "project/executeTool":
                result = self._execute_tool(params.get("name"), params.get("parameters"))
            else:
                raise NotImplementedError(f"Method '{method}' not supported.")
            
            return {"jsonrpc": "2.0", "id": request_id, "result": result}
        except Exception as e:
            return {"jsonrpc": "2.0", "id": request_id, "error": {"code": -32000, "message": str(e)}}

    def _list_tools(self):
        return [{
            "name": "project/getSummary",
            "description": "Retrieves a brief summary of a given topic from the knowledge base.",
            "parameters": [{
                "name": "topic",
                "type": "string",
                "required": True
            }]
        }]

    def _execute_tool(self, name: str, parameters: Dict[str, Any]):
        if name != "project/getSummary":
            raise ValueError(f"Tool '{name}' not found.")
        
        topic = parameters.get("topic", "").lower()
        summary = self._db.get(topic, f"Sorry, I don't have information on '{topic}'.")
        
        return {
            "result": {"summary": summary},
            "stdout": f"Successfully retrieved summary for '{topic}'.",
            "stderr": None
        }

async def main():
    # main函数与fs_server/main.py基本相同,只是换成WikiServer
    server = WikiServer()

    # 将stdin包装成异步StreamReader,逐行读取JSON-RPC请求
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    await asyncio.get_running_loop().connect_read_pipe(lambda: protocol, sys.stdin)

    logging.info("Wikipedia Server started. Waiting for requests...")

    while True:
        line = await reader.readline()
        if not line:
            break
        request = json.loads(line.decode('utf-8'))
        response = await server.handle_request(request)
        response_line = json.dumps(response) + '\n'
        sys.stdout.write(response_line)
        sys.stdout.flush()

if __name__ == "__main__":
    asyncio.run(main())
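
同样地,可以用一条管道命令对这个Server做冒烟测试(示例命令,在wiki_server目录下执行):

cd wiki_server
echo '{"jsonrpc": "2.0", "id": 1, "method": "project/executeTool", "params": {"name": "project/getSummary", "parameters": {"topic": "mcp"}}}' | python3 main.py
# stdout预期输出的result中包含以 "The Model Context Protocol (MCP) is an open protocol..." 开头的摘要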

5.3.4 实现AI研究助理 (assistant_host/main.py)

这是我们项目的核心,它将实现前面提到的Multiplexer,并集成OpenAI来理解和回答问题。

# assistant_host/main.py

import asyncio
import json
import logging
import os
import base64
from typing import Dict, Any, Optional, List
import openai

# --- StdioMcpConnection 类 --- #
# 这是一个辅助类,封装了与单个Server子进程的通信逻辑
# 它与第四章的StdioMcpHost非常相似
class StdioMcpConnection:
    def __init__(self, name: str, command: str, cwd: str):
        self.name = name
        self.command = command
        self.cwd = cwd
        self.process: Optional[asyncio.subprocess.Process] = None
        self.reader: Optional[asyncio.StreamReader] = None
        self.writer: Optional[asyncio.StreamWriter] = None
        self._request_id_counter = 0
        self._pending_requests: Dict[int, asyncio.Future] = {}
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - ASSISTANT - %(levelname)s - %(message)s')

    async def start(self):
        self.process = await asyncio.create_subprocess_shell(
            self.command, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, 
            stderr=asyncio.subprocess.PIPE, cwd=self.cwd
        )
        self.reader, self.writer = self.process.stdout, self.process.stdin
        asyncio.create_task(self._listen())
        asyncio.create_task(self._log_stderr())
        logging.info(f"Connection '{self.name}' started.")

    async def _listen(self):
        # ... (与第四章 _listen_to_server 类似)
        while self.process.returncode is None:
            line = await self.reader.readline()
            if not line: break
            response = json.loads(line.decode('utf-8'))
            req_id = response.get('id')
            if req_id in self._pending_requests:
                self._pending_requests.pop(req_id).set_result(response)

    async def _log_stderr(self):
        # ... (与第四章 _listen_to_stderr 类似)
        while self.process.returncode is None:
            line = await self.process.stderr.readline()
            if not line: break
            logging.info(f"[{self.name}_STDERR] {line.decode('utf-8').strip()}")

    async def send_request(self, method: str, params: Dict) -> Dict:
        # ... (与第四章 send_request 类似)
        self._request_id_counter += 1
        req_id = self._request_id_counter
        req = {"jsonrpc": "2.0", "id": req_id, "method": method, "params": params}
        future = asyncio.get_running_loop().create_future()
        self._pending_requests[req_id] = future
        self.writer.write((json.dumps(req) + '\n').encode('utf-8'))
        await self.writer.drain()
        return await asyncio.wait_for(future, timeout=15.0)

    async def stop(self):
        if self.process: self.process.terminate(); await self.process.wait()

# --- MCP Multiplexer --- #
class McpMultiplexer:
    def __init__(self):
        self.connections: Dict[str, StdioMcpConnection] = {}

    def add_connection(self, conn: StdioMcpConnection):
        self.connections[conn.name] = conn

    async def start_all(self):
        await asyncio.gather(*(conn.start() for conn in self.connections.values()))

    async def stop_all(self):
        await asyncio.gather(*(conn.stop() for conn in self.connections.values()))

    async def request(self, server_name: str, method: str, params: Dict = {}) -> Dict:
        if server_name not in self.connections:
            raise ValueError(f"Server '{server_name}' not found.")
        return await self.connections[server_name].send_request(method, params)

# --- AI Research Assistant --- #
class AiResearchAssistant:
    def __init__(self, multiplexer: McpMultiplexer, openai_api_key: str):
        self.mux = multiplexer
        # 使用openai>=1.0的异步客户端 (旧版openai.ChatCompletion接口在新版本中已被移除)
        self.client = openai.AsyncOpenAI(api_key=openai_api_key)

    async def answer(self, question: str) -> str:
        logging.info(f"Received question: '{question}'")
        # 简单的关键词路由逻辑
        context_parts = []
        tasks = []

        if "transformer" in question.lower():
            tasks.append(self.mux.request("wiki", "project/executeTool", 
                {"name": "project/getSummary", "parameters": {"topic": "transformer model"}}))
        
        if "mcp" in question.lower():
            tasks.append(self.mux.request("wiki", "project/executeTool", 
                {"name": "project/getSummary", "parameters": {"topic": "mcp"}}))

        if "attention_is_all_you_need.md" in question:
            tasks.append(self.mux.request("fs", "fs/readFile", {"path": "notes/attention_is_all_you_need.md"}))

        if not tasks:
            return "I'm not sure how to answer that. Please ask about 'Transformer model', 'MCP', or include 'attention_is_all_you_need.md' in your question."

        logging.info(f"Gathering context from {len(tasks)} sources...")
        responses = await asyncio.gather(*tasks, return_exceptions=True)

        # 构建上下文
        for i, res in enumerate(responses):
            if isinstance(res, Exception):
                context_parts.append(f"Error from source {i}: {res}")
                continue
            
            if 'result' in res:
                if 'summary' in res['result'].get('result', {}):
                    context_parts.append(f"[Wikipedia Summary]:\n{res['result']['result']['summary']}")
                elif 'content' in res['result']:
                    content = base64.b64decode(res['result']['content']).decode('utf-8')
                    context_parts.append(f"[Local File Content]:\n{content}")
        
        context_str = "\n\n---\n\n".join(context_parts)
        prompt = f"Based on the following context from multiple sources, please provide a comprehensive answer to the user's question.\n\nContext:\n{context_str}\n\nQuestion: {question}\n\nAnswer:"

        logging.info("Sending request to OpenAI...")
        response = await openai.ChatCompletion.acreate(
            model="gpt-4",
            messages=[{"role": "system", "content": "You are a helpful research assistant."}, {"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content

async def main():
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        print("Error: OPENAI_API_KEY environment variable not set.")
        return

    # 1. 创建 Multiplexer
    mux = McpMultiplexer()

    # 2. 定义并添加 Server 连接
    fs_conn = StdioMcpConnection("fs", "python3 main.py", "../fs_server")
    wiki_conn = StdioMcpConnection("wiki", "python3 main.py", "../wiki_server")
    mux.add_connection(fs_conn)
    mux.add_connection(wiki_conn)

    # 3. 创建 AI 助理
    assistant = AiResearchAssistant(mux, api_key)

    try:
        # 启动所有 Server
        await mux.start_all()
        await asyncio.sleep(2) # 等待 Server 初始化
        logging.info("All servers started. AI Research Assistant is ready.")

        # 交互式问答
        while True:
            question = await asyncio.to_thread(input, "\nAsk your research question (or type 'exit'): ")
            if question.lower() == 'exit': break
            answer = await assistant.answer(question)
            print(f"\n🤖 Assistant:\n{answer}")

    finally:
        logging.info("Shutting down all servers...")
        await mux.stop_all()

if __name__ == "__main__":
    asyncio.run(main())
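
如果暂时没有OpenAI API密钥,也可以先只验证Multiplexer与两个Server的连通性。下面是一个假设性的小脚本(例如保存为assistant_host/test_mux.py,文件名为示例自拟),它复用main.py中定义的类,只发起两条MCP请求并打印原始响应:

# assistant_host/test_mux.py : 示意脚本,复用main.py中的类,仅验证MCP连通性
import asyncio
import json
from main import McpMultiplexer, StdioMcpConnection

async def smoke_test():
    mux = McpMultiplexer()
    mux.add_connection(StdioMcpConnection("fs", "python3 main.py", "../fs_server"))
    mux.add_connection(StdioMcpConnection("wiki", "python3 main.py", "../wiki_server"))
    try:
        await mux.start_all()
        await asyncio.sleep(1)  # 等待子进程就绪
        fs_resp = await mux.request("fs", "fs/listDirectory", {"path": "."})
        wiki_resp = await mux.request("wiki", "project/listTools", {})
        print("fs:", json.dumps(fs_resp, ensure_ascii=False, indent=2))
        print("wiki:", json.dumps(wiki_resp, ensure_ascii=False, indent=2))
    finally:
        await mux.stop_all()

if __name__ == "__main__":
    asyncio.run(smoke_test())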

5.4 运行与见证奇迹

现在,所有组件都已就绪。我们将启动我们的AI研究助理,并向它提出一个需要综合多个数据源才能回答的问题。

操作步骤:

  1. 设置OpenAI API密钥。在你的终端中,确保设置了环境变量:

    export OPENAI_API_KEY="your-openai-api-key"
    
  2. 激活虚拟环境。确保你位于ai-research-assistant目录下,并已激活.venv。

  3. 启动AI研究助理。在终端中执行:

    cd assistant_host
    python3 main.py
    
  4. 观察启动日志。你会看到类似下面的输出,表明Host已成功启动了两个独立的Server子进程:

    2023-10-27 12:00:00,123 - ASSISTANT - INFO - Connection 'fs' started.
    2023-10-27 12:00:00,124 - ASSISTANT - INFO - Connection 'wiki' started.
    2023-10-27 12:00:00,125 - ASSISTANT - INFO - [fs_STDERR] 2023-10-27 12:00:00,124 - FS_SERVER - INFO - File System Server started. Waiting for requests...
    2023-10-27 12:00:00,126 - ASSISTANT - INFO - [wiki_STDERR] 2023-10-27 12:00:00,125 - WIKI_SERVER - INFO - Wikipedia Server started. Waiting for requests...
    2023-10-27 12:00:02,130 - ASSISTANT - INFO - All servers started. AI Research Assistant is ready.
    
    Ask your research question (or type 'exit'): 
    
  5. 提出一个综合性问题。现在,输入那个需要跨源信息的问题:

    Ask your research question (or type 'exit'): Can you explain the Transformer model and relate it to the notes in 'attention_is_all_you_need.md'?
    
  6. 见证魔法。AI助理会:

    • 向wiki_server请求“transformer model”的摘要。
    • 向fs_server请求读取notes/attention_is_all_you_need.md文件的内容。
    • 将两个结果合并为上下文,发送给OpenAI GPT-4。
    • 最后,打印出由大模型生成的、综合了在线知识和本地笔记的、连贯的答案。

    你可能会得到类似这样的回答:

    🤖 Assistant:
    Of course. The Transformer model, as described in the knowledge base, is a deep learning architecture introduced in the paper 'Attention Is All You Need'. Its key innovation is the self-attention mechanism, which allows it to process entire input sequences simultaneously, leading to significant gains in training efficiency through parallelization.
    
    This directly corresponds with your local notes from 'attention_is_all_you_need.md', which state that the core idea of the paper is indeed the attention mechanism. By combining these two sources, we can understand that the Transformer model's architecture fundamentally relies on the concept of 'attention' to weigh the importance of different words in the input sequence, moving away from the recurrent or convolutional layers that were common in sequence-to-sequence tasks before it.
    

5.5 教程终章:回顾与展望

恭喜你!你已经成功完成了MCP教程的“实践项目”,并构建了一个真正意义上的、上下文感知的、多源AI应用。这个项目不仅是一个技术练习,更是对MCP核心价值的一次深刻体验。

我们在这五章中学到了什么?

  1. 核心哲学:我们理解了AI应用开发中的“巴别塔”困境,以及MCP如何借鉴LSP的思想,通过解耦AI的“思考大脑”与“感知/行动”能力来解决这个问题。
  2. 协议规范:我们学习了MCP基于JSON-RPC 2.0的通信方式,以及fs/*、project/*等核心能力的定义。
  3. Server构建:我们掌握了如何从零开始,使用Python构建一个安全的、功能明确的MCP Server,将任何数据或工具封装成标准化的能力。
  4. Host构建:我们学会了如何构建MCP Host,无论是简单的命令行工具,还是复杂的、能与Server子进程通过stdio进行健壮通信的异步应用。
  5. 系统集成:在最终的项目中,我们将所有部分组合在一起,构建了一个能与多个异构Server并行通信的复杂Host,真正释放了MCP作为“通用数据总线”的潜力。

未来可以走向何方?

这个实践项目仅仅是一个开始。基于MCP的架构,你可以轻松地扩展你的AI研究助理:

  • 接入数据库:编写一个新的sql_server.py,实现一个project/executeQuery工具,让你的AI能查询公司的销售数据(下文给出一个极简示意)。
  • 连接Notion/Slack:编写一个notion_server.py,实现project/searchPages,让AI能搜索你的团队知识库。
  • 构建RAG系统:将文件系统Server替换为一个更强大的RAG(Retrieval-Augmented Generation)Server,它内部使用向量数据库来提供更智能的文档检索能力。
  • Agent间协作:想象一下,每个Agent本身都是一个MCP Server,暴露自己的特殊能力。一个“主管Agent”(Host)可以通过MCP来协调多个“专家Agent”(Servers)共同完成一个复杂任务。
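
以第一个方向为例,下面是一个极简的示意片段(假设性的sql_server/main.py,沿用WikiServer的project/listTools与project/executeTool模式;sales.db等名字仅为举例):

# sql_server/main.py : 示意片段,把SQLite查询封装成MCP工具(结构仿照WikiServer)
import sqlite3
from typing import Dict, Any

class SqlServer:
    def __init__(self, db_path: str = "sales.db"):
        self.db_path = db_path  # 示例数据库路径

    def _list_tools(self):
        return [{
            "name": "project/executeQuery",
            "description": "Runs a read-only SQL query against the sales database.",
            "parameters": [{"name": "sql", "type": "string", "required": True}]
        }]

    def _execute_tool(self, name: str, parameters: Dict[str, Any]):
        if name != "project/executeQuery":
            raise ValueError(f"Tool '{name}' not found.")
        sql = parameters.get("sql", "")
        # 仅允许SELECT,避免AI误执行写操作
        if not sql.strip().lower().startswith("select"):
            raise ValueError("Only SELECT statements are allowed.")
        with sqlite3.connect(self.db_path) as conn:
            rows = conn.execute(sql).fetchall()
        return {"result": {"rows": rows}, "stdout": f"Query returned {len(rows)} rows.", "stderr": None}

# handle_request与main()可与wiki_server/main.py相同,只需把WikiServer替换为SqlServer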

Model Context Protocol为我们打开了一扇通往模块化、可扩展、可互操作的AI应用生态系统的大门。掌握了它,你就不再是为每个AI应用重复造轮子,而是在一个标准化的、可组合的框架上进行创新。希望这个教程能为你未来的AI开发之旅,提供一块坚实的基石。
