多模态RAG系统开发实战:从PDF解析到智能问答

本文将详细讲解一个完整的多模态RAG(检索增强生成)系统的实现,该系统能够处理PDF文件,提取文本和图片信息,构建向量索引,并基于检索结果进行智能问答。我们将从系统架构、核心功能到代码实现进行全面解析。

系统架构概览

该多模态RAG系统主要由四个核心模块组成:

  1. PDF处理服务:负责PDF上传、解析、页面渲染和Markdown转换
  2. 索引服务:构建和管理向量索引,支持高效相似性检索
  3. RAG服务:实现检索-增强-生成的核心逻辑,处理问答流程
  4. API接口层:提供RESTful API和SSE流接口,供前端调用

系统文件结构如下:

.
├── app.py               # API接口层,FastAPI应用
├── services/
│   ├── pdf_service.py   # PDF处理服务
│   ├── index_service.py # 向量索引服务
│   └── rag_service.py   # RAG核心服务
└── data/                # 数据存储目录
    └── {file_id}/       # 每个文件的工作目录
        ├── original.pdf # 原始PDF文件
        ├── output.md    # 转换后的Markdown
        ├── index_faiss/ # FAISS索引文件
        ├── pages/       # 页面图片
        │   ├── original/ # 原始页面渲染
        │   └── parsed/  # 带布局标记的页面
        └── images/      # 提取的图片

核心功能模块解析

1. PDF处理服务 (pdf_service.py)

该模块负责PDF文件的上传、解析和转换,是整个系统的数据源入口。

# services/pdf_service.py (核心代码片段)
def workdir(file_id: str) -> Path:
    """获取文件工作目录,不存在则创建"""
    d = DATA_ROOT / file_id
    d.mkdir(parents=True, exist_ok=True)
    return d

def save_upload(file_id: str, upload_bytes: bytes, filename: str) -> Dict[str, Any]:
    """保存上传的PDF,并返回页数"""
    pdf_path = original_pdf_path(file_id)
    pdf_path.write_bytes(upload_bytes)
    with fitz.open(pdf_path) as doc:
        pages = doc.page_count
    return {"fileId": file_id, "name": filename, "pages": pages}

def pdf_to_markdown(file_id: str):
    """将PDF转换为Markdown,提取文本、表格和图片"""
    pdf_path = str(original_pdf_path(file_id))
    out_md = markdown_output(file_id)
    img_dir = images_dir(file_id)

    # 使用unstructured库解析PDF
    elements = partition_pdf(
        filename=pdf_path,
        infer_table_structure=True,
        strategy="hi_res",
        ocr_languages="chi_sim+eng",
        ocr_engine="paddleocr"
    )

    # 提取图片并保存
    image_map = {}
    with fitz.open(pdf_path) as doc:
        for page_num, page in enumerate(doc, start=1):
            image_map[page_num] = []
            for img_index, img in enumerate(page.get_images(full=True), start=1):
                # 处理并保存图片...
                image_map[page_num].append(img_path.name)

    # 构建Markdown内容
    md_lines: List[str] = []
    # 处理不同类型的元素(标题、文本、表格、图片等)
    # ...

    out_md.write_text("\n".join(md_lines), encoding="utf-8")
    return {"markdown": out_md.name, "images_dir": "images"}

def run_full_parse_pipeline(file_id: str) -> Dict[str, Any]:
    """完整解析流程:渲染页面 → 提取布局 → 生成Markdown"""
    render_original_pages(file_id)
    docs = unstructured_segments(file_id)
    render_parsed_pages_with_boxes(file_id, docs)
    md_info = pdf_to_markdown(file_id)
    return {"md": md_info["markdown"]}

核心功能

  • 为每个PDF文件创建独立工作目录,便于管理
  • 将PDF渲染为图片,支持原始视图和带布局标记的视图
  • 使用OCR技术提取PDF中的文本(支持中英文)
  • 识别并提取表格、图片等元素
  • 将提取的内容转换为结构化的Markdown格式
    这段代码是一个PDF处理服务的核心实现,主要功能是接收上传的PDF文件,将其转换为Markdown格式,并提取其中的文本、表格和图片等内容。让我来详细讲解各个部分:
1. 工作目录管理
def workdir(file_id: str) -> Path:
    """获取文件工作目录,不存在则创建"""
    d = DATA_ROOT / file_id
    d.mkdir(parents=True, exist_ok=True)
    return d

这个函数负责为每个PDF文件创建独立的工作目录:

  • 接收一个唯一的file_id作为参数
  • 基于根目录DATA_ROOTfile_id构建路径
  • 使用mkdir创建目录,parents=True允许创建多级目录,exist_ok=True避免目录已存在时的错误
  • 返回创建的目录路径
2. 保存上传的PDF文件
def save_upload(file_id: str, upload_bytes: bytes, filename: str) -> Dict[str, Any]:
    """保存上传的PDF,并返回页数"""
    pdf_path = original_pdf_path(file_id)
    pdf_path.write_bytes(upload_bytes)
    with fitz.open(pdf_path) as doc:
        pages = doc.page_count
    return {"fileId": file_id, "name": filename, "pages": pages}

这个函数处理用户上传的PDF文件:

  • 接收file_id、PDF字节数据和原始文件名
  • 将字节数据写入到指定路径(original_pdf_path生成的路径)
  • 使用fitz库(PyMuPDF)打开PDF,获取总页数
  • 返回包含文件ID、名称和页数的字典
3. PDF转Markdown的核心功能
def pdf_to_markdown(file_id: str):
    """将PDF转换为Markdown,提取文本、表格和图片"""
    pdf_path = str(original_pdf_path(file_id))
    out_md = markdown_output(file_id)
    img_dir = images_dir(file_id)

    # 使用unstructured库解析PDF
    elements = partition_pdf(
        filename=pdf_path,
        infer_table_structure=True,
        strategy="hi_res",
        ocr_languages="chi_sim+eng",
        ocr_engine="paddleocr"
    )

这部分代码是核心转换功能的开始:

  • 获取PDF路径、Markdown输出路径和图片保存目录
  • 使用unstructured库的partition_pdf函数解析PDF
  • 配置包括:启用表格结构推断、使用高分辨率策略、支持中英文OCR、使用paddleocr引擎
    # 提取图片并保存
    image_map = {}
    with fitz.open(pdf_path) as doc:
        for page_num, page in enumerate(doc, start=1):
            image_map[page_num] = []
            for img_index, img in enumerate(page.get_images(full=True), start=1):
                # 处理并保存图片...
                image_map[page_num].append(img_path.name)

这部分负责提取PDF中的图片:

  • 创建image_map字典记录每页包含的图片
  • 使用fitz库打开PDF,遍历每一页
  • 提取页面中的所有图片并保存
  • 将图片信息记录到image_map中,建立页面与图片的关联
    # 构建Markdown内容
    md_lines: List[str] = []
    # 处理不同类型的元素(标题、文本、表格、图片等)
    # ...

    out_md.write_text("\n".join(md_lines), encoding="utf-8")
    return {"markdown": out_md.name, "images_dir": "images"}

最后构建并保存Markdown内容:

  • 处理解析得到的各种元素(标题、文本、表格、图片等)
  • 将这些元素转换为对应的Markdown格式
  • 将最终的Markdown内容写入文件
  • 返回包含Markdown文件名和图片目录的字典
4. 完整解析流程
def run_full_parse_pipeline(file_id: str) -> Dict[str, Any]:
    """完整解析流程:渲染页面 → 提取布局 → 生成Markdown"""
    render_original_pages(file_id)
    docs = unstructured_segments(file_id)
    render_parsed_pages_with_boxes(file_id, docs)
    md_info = pdf_to_markdown(file_id)
    return {"md": md_info["markdown"]}

这个函数定义了完整的PDF解析流程:

  1. 渲染原始页面(render_original_pages
  2. 提取非结构化片段(unstructured_segments
  3. 渲染带有内容框的解析页面(render_parsed_pages_with_boxes
  4. 调用pdf_to_markdown生成Markdown
  5. 返回包含Markdown信息的结果
整体总结

这段代码实现了一个功能完整的PDF处理服务,主要技术点包括:

  • 使用文件ID进行文件隔离,每个PDF有独立的工作目录
  • 结合unstructured库进行PDF内容解析
  • 使用PyMuPDF(fitz)处理PDF文件和提取图片
  • 集成OCR功能,支持中英文识别
  • 将PDF内容(文本、表格、图片)转换为结构化的Markdown格式

这个服务可以应用于需要将PDF文档转换为易于处理的文本格式的场景,如文档检索、内容分析等。

2. 索引服务 (index_service.py)

该模块负责将解析后的文本内容转换为向量索引,支持高效的相似性检索。

# services/index_service.py (核心代码片段)
def load_embeddings() -> OpenAIEmbeddings:
    """加载嵌入模型,支持代理配置"""
    api_key = os.getenv("OPENAI_API_KEY") or os.getenv("OPENAI_EMBEDDING_API_KEY")
    base_url = os.getenv("OPENAI_BASE_URL") or os.getenv("OPENAI_EMBEDDING_BASE_URL")
    kwargs = {"api_key": api_key}
    if base_url:
        kwargs["base_url"] = base_url
    return OpenAIEmbeddings(model="text-embedding-3-large", **kwargs)

def split_markdown(md_text: str) -> List[Document]:
    """将Markdown文本按标题分割为文档片段"""
    headers_to_split_on = [
        ("#", "Header 1"),
        ("##", "Header 2"),
        # 可添加更细粒度的分割
    ]
    splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
    docs = splitter.split_text(md_text)
    
    # 清洗文档,过滤空内容和过长文本
    cleaned: List[Document] = []
    for d in docs:
        txt = (d.page_content or "").strip()
        if not txt:
            continue
        if len(txt) > 8000:
            txt = txt[:8000]
        cleaned.append(Document(page_content=txt, metadata=d.metadata))
    return cleaned

def build_faiss_index(file_id: str) -> Dict[str, Any]:
    """构建FAISS向量索引"""
    md_file = markdown_path(file_id)
    if not md_file.exists():
        return {"ok": False, "error": "MARKDOWN_NOT_FOUND"}
    md_text = md_file.read_text(encoding="utf-8")

    docs = split_markdown(md_text)
    if not docs:
        return {"ok": False, "error": "EMPTY_MD"}

    embeddings = load_embeddings()
    vs = FAISS.from_documents(docs, embedding=embeddings)
    vs.save_local(str(index_dir(file_id)))
    return {"ok": True, "chunks": len(docs)}

def search_faiss(file_id: str, query: str, k: int = 5) -> Dict[str, Any]:
    """检索与查询相关的文档片段"""
    idx = index_dir(file_id)
    if not (idx / "index.faiss").exists():
        return {"ok": False, "error": "INDEX_NOT_FOUND"}

    embeddings = load_embeddings()
    vs = FAISS.load_local(str(idx), embeddings, allow_dangerous_deserialization=True)
    hits = vs.similarity_search_with_score(query, k=k)
    results = []
    for doc, score in hits:
        results.append({
            "text": doc.page_content,
            "score": float(score),
            "metadata": doc.metadata,
        })
    return {"ok": True, "results": results}

核心功能

  • 加载嵌入模型(使用OpenAI兼容接口)
  • 将Markdown文本按标题层级分割为语义片段
  • 使用FAISS构建向量索引,支持高效相似性检索
  • 提供索引构建和检索的基础接口

这段是一个处理文档向量索引和检索的服务代码,主要功能是将Markdown文档转换为向量索引,并提供基于语义的检索能力。让我详细讲解各个部分:

1. 嵌入模型加载
def load_embeddings() -> OpenAIEmbeddings:
    """加载嵌入模型,支持代理配置"""
    api_key = os.getenv("OPENAI_API_KEY") or os.getenv("OPENAI_EMBEDDING_API_KEY")
    base_url = os.getenv("OPENAI_BASE_URL") or os.getenv("OPENAI_EMBEDDING_BASE_URL")
    kwargs = {"api_key": api_key}
    if base_url:
        kwargs["base_url"] = base_url
    return OpenAIEmbeddings(model="text-embedding-3-large", **kwargs)

这个函数负责加载OpenAI的嵌入模型:

  • 从环境变量获取API密钥,支持主密钥和专用嵌入密钥
  • 支持配置基础URL(可用于代理或私有部署)
  • 使用"text-embedding-3-large"模型,这是OpenAI的高性能嵌入模型
  • 返回配置好的嵌入模型实例,用于后续的文本向量化处理
2. Markdown文档分割
def split_markdown(md_text: str) -> List[Document]:
    """将Markdown文本按标题分割为文档片段"""
    headers_to_split_on = [
        ("#", "Header 1"),
        ("##", "Header 2"),
        # 可添加更细粒度的分割
    ]
    splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
    docs = splitter.split_text(md_text)
    
    # 清洗文档,过滤空内容和过长文本
    cleaned: List[Document] = []
    for d in docs:
        txt = (d.page_content or "").strip()
        if not txt:
            continue
        if len(txt) > 8000:
            txt = txt[:8000]
        cleaned.append(Document(page_content=txt, metadata=d.metadata))
    return cleaned

这个函数将Markdown文本分割为适合嵌入的文档片段:

  • 基于Markdown标题层级进行分割(# 和 ## 级别)
  • 使用MarkdownHeaderTextSplitter专门处理Markdown格式
  • 对分割后的文档进行清洗:
    • 过滤空内容文档
    • 限制最大长度(8000字符),防止过长文本
  • 返回清洗后的文档列表,每个文档包含内容和元数据

这种分割方式保留了文档的结构信息,使后续检索更精准。

3. 构建FAISS向量索引
def build_faiss_index(file_id: str) -> Dict[str, Any]:
    """构建FAISS向量索引"""
    md_file = markdown_path(file_id)
    if not md_file.exists():
        return {"ok": False, "error": "MARKDOWN_NOT_FOUND"}
    md_text = md_file.read_text(encoding="utf-8")

    docs = split_markdown(md_text)
    if not docs:
        return {"ok": False, "error": "EMPTY_MD"}

    embeddings = load_embeddings()
    vs = FAISS.from_documents(docs, embedding=embeddings)
    vs.save_local(str(index_dir(file_id)))
    return {"ok": True, "chunks": len(docs)}

这是构建向量索引的核心函数:

  1. 检查Markdown文件是否存在,不存在则返回错误
  2. 读取Markdown文本内容
  3. 使用split_markdown函数分割文本为文档片段
  4. 加载嵌入模型
  5. 使用FAISS库从文档构建向量存储(FAISS.from_documents
  6. 将构建好的索引保存到本地目录
  7. 返回成功状态和分割的片段数量

FAISS是Facebook开发的高效向量检索库,适合存储和查询高维向量。

4. 向量检索功能
def search_faiss(file_id: str, query: str, k: int = 5) -> Dict[str, Any]:
    """检索与查询相关的文档片段"""
    idx = index_dir(file_id)
    if not (idx / "index.faiss").exists():
        return {"ok": False, "error": "INDEX_NOT_FOUND"}

    embeddings = load_embeddings()
    vs = FAISS.load_local(str(idx), embeddings, allow_dangerous_deserialization=True)
    hits = vs.similarity_search_with_score(query, k=k)
    results = []
    for doc, score in hits:
        results.append({
            "text": doc.page_content,
            "score": float(score),
            "metadata": doc.metadata,
        })
    return {"ok": True, "results": results}

这个函数提供基于语义的检索功能:

  1. 检查索引是否存在,不存在则返回错误
  2. 加载嵌入模型
  3. 从本地加载保存的FAISS索引
  4. 使用similarity_search_with_score进行相似性检索:
    • query是用户的查询文本
    • k指定返回的最相关结果数量(默认5个)
  5. 处理检索结果,包含文本内容、相似度分数和元数据
  6. 返回格式化的检索结果
整体总结

这段代码实现了一个完整的文档向量索引与检索 pipeline:

  1. 文本处理:将Markdown文档按结构分割为合理的片段
  2. 向量生成:使用OpenAI的嵌入模型将文本转换为向量
  3. 索引构建:使用FAISS构建高效的向量索引
  4. 语义检索:根据查询文本找到最相关的文档片段

这个服务通常用于构建基于文档的问答系统、智能检索工具等,能够理解文本的语义内容,而不仅仅是关键词匹配,大大提高了检索的准确性。

技术栈方面,主要使用了:

  • OpenAI的嵌入模型进行文本向量化
  • LangChain库的文档分割和处理工具
  • FAISS库进行向量存储和高效检索

3. RAG服务 (rag_service.py)

该模块实现了检索增强生成的核心逻辑,将检索到的信息与大语言模型结合,生成准确的回答。

# services/rag_service.py (核心代码片段)
# 会话历史管理
_sessions: dict[str, list[dict]] = defaultdict(list)

def get_history(session_id: str) -> list[dict]:
    return _sessions.get(session_id, [])

def append_history(session_id: str, role: str, content: str) -> None:
    _sessions[session_id].append({"role": role, "content": content})

async def retrieve(question: str, file_id: str) -> tuple[list[dict], str]:
    """检索相关文档片段并评估其相关性"""
    vs = _load_vs(file_id)
    hits = vs.similarity_search_with_score(question, k=K)
    citations = []
    ctx_snippets = []
    scores = []
    
    # 处理检索结果
    for i, (doc, score) in enumerate(hits, start=1):
        # 构建引用信息...
        citations.append(...)
        ctx_snippets.append(...)
        scores.append(float(score))
    
    context_text = "\n\n".join(ctx_snippets) if ctx_snippets else "(no hits)"

    # 规则 + LLM 复核检索结果相关性
    ok_by_score = _score_ok(scores)
    if not ok_by_score:
        grader = _get_grader()
        grade_prompt = GRADE_PROMPT.format(context=context_text, question=question)
        decision = await grader.ainvoke([{"role": "user", "content": grade_prompt}])
        ok_by_llm = "yes" in (decision.content or "").lower()
    else:
        ok_by_llm = True

    branch = "with_context" if ok_by_llm else "no_context"
    return citations, context_text if branch == "with_context" else ""

async def answer_stream(
    question: str,
    citations: list[dict],
    context_text: str,
    branch: str,
    session_id: str | None = None
) -> AsyncGenerator[dict, None]:
    """流式生成回答,并返回引用信息"""
    # 先推送引用信息
    if branch == "with_context" and citations:
        for c in citations:
            yield {"type": "citation", "data": c}

    # 构建提示信息
    llm = _get_llm()
    history_msgs = get_history(session_id) if session_id else []

    if branch == "with_context" and context_text:
        user_prompt = ANSWER_WITH_CONTEXT.format(question=question, context=context_text)
    else:
        user_prompt = ANSWER_NO_CONTEXT.format(question=question)

    # 完整消息序列:system + 历史多轮 + 当前用户
    msgs = [{"role": "system", "content": SYSTEM_INSTRUCTION}]
    msgs.extend(history_msgs)
    msgs.append({"role": "user", "content": user_prompt})

    # 流式生成回答
    final_text_parts: list[str] = []
    try:
        async for chunk in llm.astream(msgs):
            delta = getattr(chunk, "content", None)
            if delta:
                final_text_parts.append(delta)
                yield {"type": "token", "data": delta}
    except Exception:
        # 非流式回退
        resp = await llm.ainvoke(msgs)
        text = resp.content or ""
        final_text_parts.append(text)
        # 分块推送...

    # 附加相关图片
    if branch == "with_context" and citations:
        imgs = []
        for c in citations[:2]:
            url = c.get("previewUrl")
            if url:
                imgs.append(f"![参考页 {c.get('rank', '')}]({url})")
        if imgs:
            tail = "\n\n---\n**相关页面预览**\n\n" + "\n\n".join(imgs)
            yield {"type": "token", "data": tail}

    # 保存对话历史
    if session_id:
        append_history(session_id, "user", question)
        append_history(session_id, "assistant", "".join(final_text_parts))

    yield {"type": "done", "data": {"used_retrieval": branch == "with_context"}}

核心功能

  • 管理对话历史,支持多轮对话
  • 实现检索结果的相关性评估(规则+LLM)
  • 构建提示词,将检索到的上下文融入回答过程
  • 支持流式回答生成,提升用户体验
  • 在回答中插入相关图片引用,实现多模态输出

4. API接口层 (app.py)

该模块基于FastAPI实现了RESTful API和SSE流接口,提供前端访问的入口。

# app.py (核心代码片段)
app = FastAPI(
    title="九天老师公开课:多模态RAG系统API",
    version="1.0.0",
    description="多模态RAG系统开发实战后端API。"
)

# 跨域配置
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# 聊天接口(SSE流式响应)
@app.post(f"{API_PREFIX}/chat", tags=["Chat"])
async def chat_stream(req: ChatRequest):
    """SSE 事件:token | citation | done | error"""
    async def gen():
        try:
            question = (req.message or "").strip()
            session_id = (req.sessionId or "default").strip()
            file_id = (req.pdfFileId or "").strip()

            citations, context_text = [], ""
            branch = "no_context"
            if file_id:
                try:
                    citations, context_text = await retrieve(question, file_id)
                    branch = "with_context" if context_text else "no_context"
                except FileNotFoundError:
                    branch = "no_context"

            # 推送引用和回答流
            # ...

        except Exception as e:
            # 错误处理
            # ...

    headers = {"Cache-Control": "no-cache, no-transform", "Connection": "keep-alive"}
    return StreamingResponse(gen(), media_type="text/event-stream", headers=headers)

# PDF上传接口
@app.post(f"{API_PREFIX}/pdf/upload", tags=["PDF"])
async def pdf_upload(file: UploadFile = File(...), replace: Optional[bool] = True):
    # 处理文件上传
    # ...

# 索引构建接口
@app.post(f"{API_PREFIX}/index/build", tags=["Index"])
async def index_build(req: BuildIndexRequest):
    # 构建索引
    # ...

这段代码是多模态RAG系统的API接口层实现,基于FastAPI框架开发,核心作用是对外提供标准化的接口服务,连接前端用户操作与后端业务逻辑(如PDF处理、向量检索、流式回答生成)。下面从整体架构到具体接口逐一详细讲解:

一、整体架构与核心依赖

  • 框架选择:使用FastAPI,因其支持异步请求、自动生成API文档、性能高效,且对SSE(Server-Sent Events,服务器推送事件)有良好支持,非常适合需要流式响应的AI对话场景。
  • 核心功能:提供3类核心接口(聊天流式接口、PDF上传接口、索引构建接口),并通过跨域中间件解决前后端分离架构中的跨域问题。
  • 数据流转:前端通过HTTP请求调用接口,后端将请求分发到对应业务模块(如rag_service.py的检索与回答逻辑、pdf_service.py的文件处理逻辑),最终将结果返回给前端。

二、关键初始化配置

1. FastAPI应用初始化
app = FastAPI(
    title="九天老师公开课:多模态RAG系统API",
    version="1.0.0",
    description="多模态RAG系统开发实战后端API。"
)
  • 初始化FastAPI应用实例app,并配置API的元信息(标题、版本、描述)。
  • 这些元信息会自动同步到FastAPI的默认文档页面(访问/docs/redoc可查看),方便前端开发者对接接口。
2. 跨域配置(解决前后端分离问题)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # 允许所有前端域名访问(生产环境建议指定具体域名,如["https://your-frontend.com"])
    allow_credentials=True,  # 允许携带Cookie等凭证
    allow_methods=["*"],  # 允许所有HTTP方法(GET、POST、PUT等)
    allow_headers=["*"],  # 允许所有请求头(如Authorization、Content-Type)
)
  • 跨域问题的本质:浏览器的“同源策略”限制——前端(如http://localhost:3000)与后端(如http://localhost:8000)域名/端口不同时,直接请求会被拦截。
  • 解决方案:通过CORSMiddleware中间件配置跨域规则,允许前端正常调用后端接口。
  • 生产环境注意:allow_origins建议改为具体的前端域名(而非"*"),避免安全风险。

三、核心接口详解

1. 聊天流式接口(/chat)—— 核心交互接口

该接口是用户与RAG系统对话的核心入口,支持SSE流式响应(即后端逐步返回回答内容,前端实时展示,类似ChatGPT的“打字”效果),同时集成了文档检索逻辑。

接口定义
@app.post(f"{API_PREFIX}/chat", tags=["Chat"])
async def chat_stream(req: ChatRequest):
    """SSE 事件:token | citation | done | error"""
    async def gen():
        # 业务逻辑实现
        ...
    headers = {"Cache-Control": "no-cache, no-transform", "Connection": "keep-alive"}
    return StreamingResponse(gen(), media_type="text/event-stream", headers=headers)
  • 接口类型POST请求(因需要传递复杂参数,如会话ID、问题、PDF文件ID)。
  • 路由路径{API_PREFIX}/chatAPI_PREFIX是预定义的路径前缀,如/api/v1,用于统一管理接口版本)。
  • 标签tags=["Chat"]——将接口归类到“Chat”组,方便文档页面分类展示。
  • 请求参数req: ChatRequest——使用Pydantic模型ChatRequest接收前端参数(如message(用户问题)、sessionId(会话ID)、pdfFileId(目标PDF的ID)),FastAPI会自动校验参数格式并返回错误提示。
  • 响应类型StreamingResponse——FastAPI的流式响应对象,用于实现SSE。
内部逻辑拆解(gen()生成器函数)

gen()是异步生成器函数,负责处理请求并逐步生成响应内容(SSE事件),核心逻辑分为3步:

(1)参数解析与初始化
try:
    question = (req.message or "").strip()  # 获取用户问题,去除前后空格
    session_id = (req.sessionId or "default").strip()  # 获取会话ID,默认值为"default"(未传则使用默认会话)
    file_id = (req.pdfFileId or "").strip()  # 获取目标PDF的ID(用户指定基于哪个PDF回答)

    citations, context_text = [], ""  # 初始化引用信息(如文档片段来源)和上下文文本
    branch = "no_context"  # 初始化分支:默认“无上下文”(未关联PDF时)
  • 会话ID(session_id:用于关联多轮对话历史(对应rag_service.py_sessions会话管理),确保用户的多轮对话连贯(如“上一个问题的回答中提到的XX是什么意思?”)。
  • PDF文件ID(file_id:指定RAG系统基于哪个PDF文件生成回答(若为空,则系统直接基于LLM回答,不调用检索逻辑)。
(2)检索上下文(关联PDF时)
if file_id:  # 若传入了PDF的file_id,调用检索逻辑
    try:
        # 调用rag_service.py的retrieve函数:根据用户问题检索PDF的相关片段
        citations, context_text = await retrieve(question, file_id)
        # 若检索到有效上下文,切换到“有上下文”分支
        branch = "with_context" if context_text else "no_context"
    except FileNotFoundError:
        # 若PDF文件不存在(如file_id错误),仍保持“无上下文”分支
        branch = "no_context"
  • 核心逻辑:将用户问题与指定PDF关联,通过retrieve函数从PDF中检索与问题相关的片段(上下文),并生成引用信息(citations,如片段来源页码、预览链接)。
  • 分支切换:branch变量控制后续回答逻辑——“有上下文”则基于PDF片段生成回答,“无上下文”则直接让LLM生成回答(类似普通聊天机器人)。
(3)推送流式响应(SSE事件)
# 推送引用信息(先于回答内容,让用户知道回答基于哪些PDF片段)
if branch == "with_context" and citations:
    for c in citations:
        yield {"type": "citation", "data": c}  # 推送“引用”类型事件

# 调用rag_service.py的answer_stream函数,流式生成回答
async for event in answer_stream(question, citations, context_text, branch, session_id):
    yield event  # 推送“token”(回答片段)或“done”(完成)类型事件

except Exception as e:
    # 错误处理:推送“error”类型事件,告知前端异常
    yield {"type": "error", "data": f"对话出错:{str(e)}"}
  • SSE事件类型
    • citation:引用信息(如“来源:PDF第3页,片段内容:XXX”),前端可展示为“参考文档”区域。
    • token:回答的片段(如“RAG系统的核心是将检索到的文档片段融入LLM回答,…”),前端逐段拼接展示,实现“流式打字”效果。
    • done:回答完成信号,前端可关闭加载状态。
    • error:错误信息,前端可提示用户“对话出错,请重试”。
(4)响应头配置
headers = {"Cache-Control": "no-cache, no-transform", "Connection": "keep-alive"}
return StreamingResponse(gen(), media_type="text/event-stream", headers=headers)
  • media_type="text/event-stream":SSE的标准媒体类型,告知浏览器这是流式事件响应。
  • Cache-Control: no-cache:禁止浏览器缓存流式内容(避免重复展示旧内容)。
  • Connection: keep-alive:保持HTTP连接长驻(SSE需要持续推送内容,不能断开连接)。
2. PDF上传接口(/pdf/upload)—— 文档导入入口
@app.post(f"{API_PREFIX}/pdf/upload", tags=["PDF"])
async def pdf_upload(file: UploadFile = File(...), replace: Optional[bool] = True):
    # 处理文件上传
    # ...
  • 功能:接收前端上传的PDF文件,调用pdf_service.pysave_upload函数保存文件,并生成唯一的file_id(后续检索、索引构建需用到该ID)。
  • 请求参数
    • file: UploadFile = File(...):前端通过form-data形式上传的PDF文件,UploadFile是FastAPI提供的文件对象,支持读取文件字节、文件名等信息。
    • replace: Optional[bool] = True:是否覆盖已存在的同名PDF(若为True,则相同文件名的文件会被覆盖;若为False,则返回“文件已存在”错误)。
  • 返回结果:通常返回file_id、文件名、文件页数等信息(如{"ok": True, "fileId": "abc123", "name": "test.pdf", "pages": 10}),供前端后续调用索引构建接口。
3. 索引构建接口(/index/build)—— RAG检索前提
@app.post(f"{API_PREFIX}/index/build", tags=["Index"])
async def index_build(req: BuildIndexRequest):
    # 构建索引
    # ...
  • 功能:基于已上传的PDF文件(通过file_id指定),调用index_service.pybuild_faiss_index函数构建FAISS向量索引——这是RAG系统“检索”能力的前提(没有索引则无法快速匹配相关文档片段)。
  • 请求参数req: BuildIndexRequest——Pydantic模型,核心参数是file_id(指定为哪个PDF构建索引)。
  • 处理逻辑
    1. 校验file_id对应的PDF是否存在。
    2. 调用build_faiss_index函数,将PDF转换为Markdown片段、生成向量、构建FAISS索引并保存到本地。
    3. 返回索引构建结果(如{"ok": True, "chunks": 20},表示成功构建索引,共分割为20个文档片段)。
  • 注意:索引构建是“一次性操作”——PDF上传后只需构建一次索引,后续检索可直接复用索引,无需重复构建。
四、接口调用与前端交互流程

以“用户上传PDF→构建索引→基于PDF聊天”为例,完整流程如下:

  1. 前端调用/pdf/upload接口,上传PDF文件,获取file_id
  2. 前端调用/index/build接口,传入file_id,触发索引构建。
  3. 前端调用/chat接口,传入message(用户问题)、session_id(会话ID)、file_id(已构建索引的PDF)。
  4. 后端通过SSE逐步推送citation(引用)、token(回答片段)、done(完成)事件,前端实时展示。
五、核心亮点与注意事项
1. 亮点
  • 流式响应:通过SSE实现“边生成边返回”,减少用户等待感,提升交互体验。
  • 模块化设计:接口层仅负责“请求接收与结果返回”,业务逻辑 delegate 到其他模块(如rag_servicepdf_service),代码可维护性高。
  • 自动文档:FastAPI自动生成接口文档,前端无需手动编写接口文档,降低对接成本。
2. 注意事项
  • 会话管理session_id对应的会话历史存储在内存(rag_service.py_sessions),服务重启后会话会丢失——生产环境需改用Redis等持久化存储。
  • SSE连接稳定性:长连接可能因网络波动断开,前端需实现“重连逻辑”,避免对话中断。
  • 安全防护:生产环境需添加接口鉴权(如JWT令牌),防止未授权用户调用接口;同时限制file_id的有效性,避免恶意请求。

总结

该代码是多模态RAG系统的“前端入口层”,通过FastAPI将后端的检索、回答、文件处理能力封装为标准化API,实现了“用户提问→检索上下文→流式回答”的完整交互闭环。其核心价值在于:解决前后端跨域问题、支持流式响应提升体验、提供清晰的接口分类便于对接,是连接用户操作与后端业务逻辑的关键桥梁。

核心功能

  • 提供PDF上传、解析、状态查询接口
  • 实现索引构建和检索接口
  • 提供基于SSE的流式聊天接口,支持实时响应
  • 管理会话状态,支持多用户同时使用

系统工作流程

  1. PDF处理流程

    • 用户上传PDF文件 → 系统生成唯一fileId
    • 系统渲染PDF页面为图片
    • 解析PDF内容,提取文本、表格和图片
    • 将解析结果转换为Markdown格式
  2. 索引构建流程

    • 读取Markdown文件内容
    • 按标题层级分割为文本片段
    • 使用嵌入模型将文本片段转换为向量
    • 构建并保存FAISS向量索引
  3. 问答流程

    • 用户输入问题,系统获取会话历史
    • 检索与问题相关的文本片段
    • 评估检索结果的相关性
    • 构建包含上下文的提示词
    • 调用LLM生成回答,并流式返回结果
    • 在回答中插入相关图片引用
    • 保存对话历史,支持多轮对话

总结

本系统实现了一个完整的多模态RAG解决方案,能够处理PDF文件中的文本和图片信息,通过向量检索增强大语言模型的回答能力。系统采用模块化设计,各组件职责清晰,便于维护和扩展。

该系统的特点包括:

  • 多模态支持:不仅处理文本,还能提取和引用图片
  • 高效检索:使用FAISS实现快速相似性搜索
  • 流式响应:提供实时的回答生成体验
  • 会话管理:支持多轮对话,保持上下文
  • 可扩展性:各模块松耦合,便于替换或升级

通过这个系统,用户可以上传PDF文档,然后基于文档内容进行智能问答,系统会自动引用文档中的相关片段和图片,提高回答的准确性和可信度。

Logo

更多推荐