mcp-server案例分享
MCP Server 的架构与工作原理
MCP Server 采用客户端-服务器(Client-Server)架构,其中客户端(MCP Client)负责与服务器建立连接,发起请求,而服务器端则处理请求并返回响应。这种架构确保了数据交互的高效性与安全性。例如,客户端可以向服务器发送请求,如“查询数据库中的某个记录”或“调用某个API”,而服务器则根据请求类型,调用相应的资源或工具,完成任务并返回结果。

MCP Server 支持动态发现和实时更新机制。例如,当新的资源或工具被添加到服务器时,客户端可以自动感知并使用这些新功能,从而提高系统的灵活性和扩展性

MCP Server 的主要功能
资源暴露与工具提供:
MCP Server 可以将本地文件、数据库、API等资源作为数据实体暴露给AI模型,同时提供工具功能,帮助AI完成复杂任务,如数据检索、内容生成、实时更新等。例如,它支持对MySQL、PostgreSQL等数据库的查询和操作,也支持对本地文件系统的读写和目录管理。
会话管理与动态通知:
MCP Server 能够管理客户端与服务器的连接,确保会话的时效性和稳定性,同时通过实时推送机制,将最新的资源信息及时传递给AI模型,以保证数据的准确性和实时性。
安全性与隐私保护:
MCP Server 采用加密认证和访问控制机制,确保数据传输的安全性,避免敏感信息泄露。例如,它支持本地运行,避免将敏感数据上传至第三方平台,从而保护用户隐私。
标准化与模块化:
MCP Server 提供了标准化的通信协议,支持两种传输协议(STDIO和SSE),并允许开发者通过插件扩展功能,使其具备灵活性和扩展性。例如,它支持通过HTTP标准POST请求与客户端进行交互,同时支持WebSocket实现实时数据推送。
多场景应用:
MCP Server 可以应用于多种场景,包括但不限于:
本地资源集成:如文件操作、数据库管理、API调用等。
云服务交互:如与GitHub、Slack、Google Drive等云服务的集成。
AI助手扩展:如为ChatGPT等AI助手提供上下文支持和工具调用能力
目前mcp-server发展速度非常快。在短短1个多月的时间目前mcp-server已经发展超过5000个mcp-server

前端时间也给大家分享过一个速来围观!vs code + cline 联手 MCP-server,解锁大模型万物互联新玩法! 使用cline 实现mysql数据库的mcp-server 的一个案例。之前的这个案例主要介绍了如何使用这个mcp-server。今天给大家带来的是我们基于一个叫做fastapi_mcp的一个框架实现即梦AI 文生视频的一个mcp-server.那么话不多说下面带大家实现这个MCP- server.

我们这里使用Cherry Studio实现这个文生视频的mcp-server

上图中我们输入需要调用的工具名称,提示词 以及需要调用文生视频mcp-server apikey后 后端模型通过意图识别判断调用了这个文生视频的mcp-server从而实现了调用即梦AI 创建一个文生视频。返回的视频链接我们可以在即梦AI 看到预览

2.MCP Server 制作
在制作这个MCP Server之前我们首选介绍一下fastapi_mcp,它的实现fastapi的mcp接口实现。项目的源码地址是

https://github.com/tadata-org/fastapi_mcp

它的主要实现参考下面的代码

from fastapi import FastAPI
from fastapi_mcp import add_mcp_server

app = FastAPI()

mcp_server = add_mcp_server(
    app,                                    # Your FastAPI app
    mount_path="/mcp",                      # Where to mount the MCP server
    name="My API MCP",                      # Name for the MCP server
    describe_all_responses=True,            # False by default. Include all possible response schemas in tool descriptions, instead of just the successful response.
    describe_full_response_schema=True      # False by default. Include full JSON schema in tool descriptions, instead of just an LLM-friendly response example.
)

# Optionally add custom tools in addition to existing APIs.
@mcp_server.tool()
async def get_server_time() -> str:
    """Get the current server time."""
    from datetime import datetime
    return datetime.now().isoformat()
运行本项目
python
运行

通过上面的函数就将我们之前对外暴露的fastap 服务转换成支持mcp -server了

我们的即梦AI文生视频的mcp -server 主要功能如下:

1. 视频生成服务
该服务通过调用极梦(剪映)的API来生成AI视频,主要功能点包括:

视频生成接口 :提供 /jimeng/generate_video/ 接口,接收文本提示词、视频宽高比、时长和帧率等参数,生成对应的AI视频
认证机制 :实现了基于Bearer Token的认证系统,通过 verify_auth_token 函数验证请求的合法性
视频处理流程 :
调用极梦API生成视频
轮询检查视频生成状态
下载生成的视频到本地临时存储
上传视频到腾讯云COS对象存储
返回视频URL和预览信息
2. MCP工具集成
服务集成了FastAPI MCP(Model Control Protocol)框架,提供了可被其他服务调用的工具:

MCP服务器配置 :通过 add_mcp_server 将服务注册为MCP服务
视频生成工具 :提供 generate_video_mcp 工具,可以被其他支持MCP协议的服务(如AI助手)调用
3. 辅助功能
文件管理 :
生成带时间戳的唯一文件名
下载视频到本地
上传视频到腾讯云COS
清理临时文件
配置管理 :从配置文件读取API密钥、存储路径等信息
日志记录 :详细记录API调用、视频生成过程和错误信息
4. 错误处理
完善的异常处理机制,包括API调用失败、视频生成超时、文件处理错误等情况
返回标准化的HTTP错误响应
技术特点
使用FastAPI构建高性能异步API
集成腾讯云COS对象存储服务
实现MCP协议支持,便于与AI系统集成
完善的日志和错误处理机制
这个服务主要用于根据文本提示词生成AI视频,并提供标准化的接口供其他系统调用。
以上这个服务代码主要是基于我之前写的即梦AI 文生视频fastapi 服务端接口服务。主体功能还是原来的代码逻辑只是增加了一下上面代码中fastapi_mcp、@mcp_server.tool() 从而实现fastapi_mcp功能。

改造后的代码如下:

jimeng_video_service.py

from fastapi import FastAPI, HTTPException,Depends, Header
from pydantic import BaseModel
import logging
import time
import requests
import uuid
import configparser
import json
import os
import datetime
import random
from qcloud_cos import CosConfig
from qcloud_cos import CosS3Client
from fastapi_mcp import add_mcp_server

# 设置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(
    title="Jimeng Video Service API",
    description="一个用于生成AI视频的服务 API",
    version="1.0.0",
)

# 读取配置文件
config = configparser.ConfigParser()
# 在读取配置文件部分添加 COS 配置
config.read('f:\\work\\code\\2024pythontest\\jimeng\\config.ini', encoding='utf-8')

# 添加 COS 配置读取
region = config.get('common', 'region')
secret_id = config.get('common', 'secret_id')
secret_key = config.get('common', 'secret_key')
bucket = config.get('common', 'bucket')
# 在读取 video_output_path 后添加目录检查和创建逻辑
video_output_path = config.get('common', 'video_output_path')
if not os.path.exists(video_output_path):
    os.makedirs(video_output_path)
    logger.info(f"创建视频输出目录: {video_output_path}")

class VideoRequest(BaseModel):
    prompt: str
    aspect_ratio: str = "16:9"
    duration_ms: int = 5000
    fps: int = 24

def verify_auth_token(authorization: str = Header(None)):
    """验证 Authorization Header 中的 Bearer Token"""
    if not authorization:
        raise HTTPException(status_code=401, detail="Missing Authorization Header")
    
    scheme, _, token = authorization.partition(" ")
    if scheme.lower() != "bearer":
        raise HTTPException(status_code=401, detail="Invalid Authorization Scheme")
    
    # 从配置文件读取有效token列表
    valid_tokens = json.loads(config.get('auth', 'valid_tokens'))
    if token not in valid_tokens:
        raise HTTPException(status_code=403, detail="Invalid or Expired Token")
    
    return token

# 修改视频生成接口,添加鉴权依赖
# 添加新的辅助函数
def generate_timestamp_filename_for_video(extension='mp4'):
    timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    random_number = random.randint(1000, 9999)
    filename = f"video_{timestamp}_{random_number}.{extension}"
    return filename

def download_video(url, output_path):
    response = requests.get(url, stream=True)
    response.raise_for_status()
    
    filename = generate_timestamp_filename_for_video()
    file_path = os.path.join(output_path, filename)
    
    with open(file_path, 'wb') as file:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                file.write(chunk)
    
    return filename, file_path

def upload_to_cos(region, secret_id, secret_key, bucket, file_name, base_path):
    config = CosConfig(
        Region=region,
        SecretId=secret_id,
        SecretKey=secret_key
    )
    client = CosS3Client(config)
    file_path = os.path.join(base_path, file_name)
    
    response = client.upload_file(
        Bucket=bucket,
        LocalFilePath=file_path,
        Key=file_name,
        PartSize=10,
        MAXThread=10,
        EnableMD5=False
    )
    
    if response['ETag']:
        url = f"https://{bucket}.cos.{region}.myqcloud.com/{file_name}"
        return url
    return None

# 修改 generate_video 函数中的返回部分
@app.post("/jimeng/generate_video/")
async def generate_video(request: VideoRequest, auth_token: str = Depends(verify_auth_token)):
    try:
        logger.info(f"generate_video API 调用开始,提示词: {request.prompt}")
        start_time = time.time()
        
        # 从配置文件中获取视频API相关配置
        video_api_cookie = config.get('video_api', 'cookie')
        video_api_sign = config.get('video_api', 'sign')
        
        # 初始化视频生成API相关配置
        video_api_headers = {
            'accept': 'application/json, text/plain, */*',
            'accept-language': 'zh-CN,zh;q=0.9',
            'app-sdk-version': '48.0.0',
            'appid': '513695',
            'appvr': '5.8.0',
            'content-type': 'application/json',
            'cookie': video_api_cookie,
            'device-time': str(int(time.time())),
            'lan': 'zh-Hans',
            'loc': 'cn',
            'origin': 'https://jimeng.jianying.com',
            'pf': '7',
            'priority': 'u=1, i',
            'referer': 'https://jimeng.jianying.com/ai-tool/video/generate',
            'sec-ch-ua': '"Google Chrome";v="129", "Not=A?Brand";v="8", "Chromium";v="129"',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '"Windows"',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-origin',
            'sign': video_api_sign,
            'sign-ver': '1',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36'
        }
        video_api_base = "https://jimeng.jianying.com/mweb/v1"
        
        # 生成唯一的submit_id
        submit_id = str(uuid.uuid4())
        
        # 准备请求数据
        generate_video_payload = {
            "submit_id": submit_id,
            "task_extra": "{\"promptSource\":\"custom\",\"originSubmitId\":\"0340110f-5a94-42a9-b737-f4518f90361f\",\"isDefaultSeed\":1,\"originTemplateId\":\"\",\"imageNameMapping\":{},\"isUseAiGenPrompt\":false,\"batchNumber\":1}",
            "http_common_info": {"aid": 513695},
            "input": {
                "video_aspect_ratio": request.aspect_ratio,
                "seed": 2934141961,
                "video_gen_inputs": [
                    {
                        "prompt": request.prompt,
                        "fps": request.fps,
                        "duration_ms": request.duration_ms,
                        "video_mode": 2,
                        "template_id": ""
                    }
                ],
                "priority": 0,
                "model_req_key": "dreamina_ic_generate_video_model_vgfm_lite"
            },
            "mode": "workbench",
            "history_option": {},
            "commerce_info": {
                "resource_id": "generate_video",
                "resource_id_type": "str",
                "resource_sub_type": "aigc",
                "benefit_type": "basic_video_operation_vgfm_lite"
            },
            "client_trace_data": {}
        }

        # 发送生成视频请求
        generate_video_url = f"{video_api_base}/generate_video?aid=513695"
        logger.info(f"发送视频生成请求...")
        
        response = requests.post(generate_video_url, headers=video_api_headers, json=generate_video_payload)
        if response.status_code != 200:
            raise HTTPException(status_code=500, detail=f"视频生成请求失败,状态码:{response.status_code}")
            
        response_data = response.json()
        if not response_data or "data" not in response_data or "aigc_data" not in response_data["data"]:
            raise HTTPException(status_code=500, detail="视频生成接口返回格式错误")
            
        task_id = response_data["data"]["aigc_data"]["task"]["task_id"]
        logger.info(f"视频生成任务已创建,任务ID: {task_id}")
        
        # 轮询检查视频生成状态
        mget_generate_task_url = f"{video_api_base}/mget_generate_task?aid=513695"
        mget_generate_task_payload = {"task_id_list": [task_id]}
        
        # 最多尝试30次,每次间隔2秒
        for attempt in range(30):
            time.sleep(2)
            logger.info(f"检查视频状态,第 {attempt + 1} 次尝试...")
            
            response = requests.post(mget_generate_task_url, headers=video_api_headers, json=mget_generate_task_payload)
            if response.status_code != 200:
                logger.warning(f"状态检查失败,状态码:{response.status_code}")
                continue
            
            response_data = response.json()
            if not response_data or "data" not in response_data or "task_map" not in response_data["data"]:
                logger.warning("状态检查返回格式错误")
                continue
            
            task_data = response_data["data"]["task_map"].get(task_id)
            if not task_data:
                logger.warning(f"未找到任务 {task_id} 的状态信息")
                continue
            
            task_status = task_data.get("status")
            logger.info(f"任务状态: {task_status}")
            
            if task_status == 50:  # 视频生成完成
                if "item_list" in task_data and task_data["item_list"] and "video" in task_data["item_list"][0]:
                    video_data = task_data["item_list"][0]["video"]
                    if "transcoded_video" in video_data and "origin" in video_data["transcoded_video"]:
                        video_url = video_data["transcoded_video"]["origin"]["video_url"]
                        elapsed_time = time.time() - start_time
                        logger.info(f"视频生成成功,耗时 {elapsed_time:.2f} 秒,URL: {video_url}")
                        
                        # 下载视频到本地
                        try:
                            filename, file_path = download_video(video_url, video_output_path)
                            logger.info(f"视频已下载到本地: {file_path}")
                            
                            # 上传到腾讯 COS
                            cos_url = upload_to_cos(region, secret_id, secret_key, bucket, filename, video_output_path)
                            if cos_url:
                                logger.info(f"视频已上传到 COS: {cos_url}")
                                # 删除本地文件
                                os.remove(file_path)
                                return {
                                    "video_url": cos_url,
                                    "task_id": task_id,
                                    "markdown": f"<video controls><source src='{cos_url}' type='video/mp4'>视频预览</video>"
                                }
                            else:
                                raise HTTPException(status_code=500, detail="上传视频到 COS 失败")
                        except Exception as e:
                            logger.error(f"处理视频文件失败: {str(e)}")
                            raise HTTPException(status_code=500, detail=f"处理视频文件失败: {str(e)}")

                raise HTTPException(status_code=500, detail="视频生成完成但未找到下载地址")
                
        raise HTTPException(status_code=500, detail="视频生成超时")
        
    except Exception as e:
        logger.error(f"视频生成失败: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

# 修改 MCP 服务器配置
mcp_server = add_mcp_server(
    app,
    mount_path="/mcp",
    name="Jimeng Video MCP",
    description="集成了智能视频生成功能的 MCP 服务",
    base_url="http://localhost:8088"
)

# 添加自定义 MCP 工具
@mcp_server.tool()
async def generate_video_mcp(
    prompt: str,
    aspect_ratio: str = "16:9",
    duration_ms: int = 5000,
    fps: int = 24,
    authorization: str = Header(...)
) -> dict:
    """
    生成一个基于文本提示的 AI 视频。

    Args:
        prompt: 用于生成视频的文本提示词
        aspect_ratio: 视频宽高比,默认为 "16:9"
        duration_ms: 视频时长(毫秒),默认为 5000
        fps: 视频帧率,默认为 24
        authorization: Bearer token 用于认证(必填)

    Returns:
        dict: 包含以下字段的字典:
            - video_url: 生成视频的 URL
            - task_id: 任务 ID
            - markdown: 视频预览的 markdown 代码
    """
    request = VideoRequest(
        prompt=prompt,
        aspect_ratio=aspect_ratio,
        duration_ms=duration_ms,
        fps=fps
    )
    return await generate_video(request, auth_token=verify_auth_token(authorization))

if __name__ == "__main__":
    import uvicorn
    # 修改启动配置
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8088,
        log_level="info",
        reload=False  # 禁用热重载以避免初始化问题
    )
运行本项目
python
运行


上述代码编写完成后我们启动它

3.MCP Server 运行
点击trae run python file 完成 服务端启动

当然你也可以使用python jimeng_video_service.py 启动这个服务

上述启动和我们普通的fastapi 服务端启动类似。启动后他对外提供一个叫做http://localhost:8088/mcp sse服务

4 .Cherry Studio 配置mcp server
我们下载最新的Cherry Studio,这里为什么下载最新的 我记得早期1.0之前的版本好像是不支持mcp server的 新版本是支持的,支持不支持大家可以看这个

点开这里,我们添加一个服务器,选择SSE

我们在URL 填写上面创建的SSE服务器地址http://localhost:8088/mcp 点击保存按钮完成设置。这里点击保存后,客户端Cherry Studio会想服务器发起请求。

看到这个就说明客户端和服务器之间产生通讯。

5 .Cherry Studio 调用mcp server
这里我们选择一个模型支持function call 的模型,怎么判断呢模型设置里面有个小工具按钮,我们这里拿火山引擎提供的deepseek-V3模型作为案例

打开一个聊天对话窗口 选择火山引擎提供的deepseek-V3

选择模型后下面聊天窗口就会多出MCP Server小窗口,我们选择开启MCP服务器并开启我们需要的文生视频MCPserver

以上设置完成后,我们就可以进入聊天对话界面了。为了方便测试我们输入以下提示词

请帮我调用即梦AI文生视频mcpserver 工具,用户输入的提示词“小马过河”,需要的authorization 为”bearer sk-zhouhui1122444”
运行本项目
1
因为mcpserver 需要2个必填参数 一个是提示词,一个是鉴权,所以这2个值需要用户输入。当然你也可以一个一个填写通过模型引导你填入。我这里偷懒就直接把需要的2个参考一并告诉模型,这样它就不需要和客户交互直接执行函数调用了。怎么查看必填项呢

我们可以在之前的mcpserver 设置查看到

这2个必填项主要是服务端代码来实现的。

填写提示词后 deepseek-v3模型会通过意图识别自动开启函数调用

在调用过程中我们也可以看后端程序的运行,直到后面返回结果给前端

这里比较遗憾的是cherry studio 不知道怎么实现视频的预览,所以需要我们把生成的视频URL 链接复制到浏览器上重新下载才能看到效果。
 

Logo

更多推荐