LangChain部署rag代码笔记(代码来自赋范大模型社区开源)
本系统实现了一个完整的多模态RAG解决方案,能够处理PDF文件中的文本和图片信息,通过向量检索增强大语言模型的回答能力。系统采用模块化设计,各组件职责清晰,便于维护和扩展。多模态支持:不仅处理文本,还能提取和引用图片高效检索:使用FAISS实现快速相似性搜索流式响应:提供实时的回答生成体验会话管理:支持多轮对话,保持上下文可扩展性:各模块松耦合,便于替换或升级。
多模态RAG系统开发实战:从PDF解析到智能问答
本文将详细讲解一个完整的多模态RAG(检索增强生成)系统的实现,该系统能够处理PDF文件,提取文本和图片信息,构建向量索引,并基于检索结果进行智能问答。我们将从系统架构、核心功能到代码实现进行全面解析。
系统架构概览
该多模态RAG系统主要由四个核心模块组成:
- PDF处理服务:负责PDF上传、解析、页面渲染和Markdown转换
- 索引服务:构建和管理向量索引,支持高效相似性检索
- RAG服务:实现检索-增强-生成的核心逻辑,处理问答流程
- API接口层:提供RESTful API和SSE流接口,供前端调用
系统文件结构如下:
.
├── app.py # API接口层,FastAPI应用
├── services/
│ ├── pdf_service.py # PDF处理服务
│ ├── index_service.py # 向量索引服务
│ └── rag_service.py # RAG核心服务
└── data/ # 数据存储目录
└── {file_id}/ # 每个文件的工作目录
├── original.pdf # 原始PDF文件
├── output.md # 转换后的Markdown
├── index_faiss/ # FAISS索引文件
├── pages/ # 页面图片
│ ├── original/ # 原始页面渲染
│ └── parsed/ # 带布局标记的页面
└── images/ # 提取的图片
核心功能模块解析
1. PDF处理服务 (pdf_service.py)
该模块负责PDF文件的上传、解析和转换,是整个系统的数据源入口。
# services/pdf_service.py (核心代码片段)
def workdir(file_id: str) -> Path:
"""获取文件工作目录,不存在则创建"""
d = DATA_ROOT / file_id
d.mkdir(parents=True, exist_ok=True)
return d
def save_upload(file_id: str, upload_bytes: bytes, filename: str) -> Dict[str, Any]:
"""保存上传的PDF,并返回页数"""
pdf_path = original_pdf_path(file_id)
pdf_path.write_bytes(upload_bytes)
with fitz.open(pdf_path) as doc:
pages = doc.page_count
return {"fileId": file_id, "name": filename, "pages": pages}
def pdf_to_markdown(file_id: str):
"""将PDF转换为Markdown,提取文本、表格和图片"""
pdf_path = str(original_pdf_path(file_id))
out_md = markdown_output(file_id)
img_dir = images_dir(file_id)
# 使用unstructured库解析PDF
elements = partition_pdf(
filename=pdf_path,
infer_table_structure=True,
strategy="hi_res",
ocr_languages="chi_sim+eng",
ocr_engine="paddleocr"
)
# 提取图片并保存
image_map = {}
with fitz.open(pdf_path) as doc:
for page_num, page in enumerate(doc, start=1):
image_map[page_num] = []
for img_index, img in enumerate(page.get_images(full=True), start=1):
# 处理并保存图片...
image_map[page_num].append(img_path.name)
# 构建Markdown内容
md_lines: List[str] = []
# 处理不同类型的元素(标题、文本、表格、图片等)
# ...
out_md.write_text("\n".join(md_lines), encoding="utf-8")
return {"markdown": out_md.name, "images_dir": "images"}
def run_full_parse_pipeline(file_id: str) -> Dict[str, Any]:
"""完整解析流程:渲染页面 → 提取布局 → 生成Markdown"""
render_original_pages(file_id)
docs = unstructured_segments(file_id)
render_parsed_pages_with_boxes(file_id, docs)
md_info = pdf_to_markdown(file_id)
return {"md": md_info["markdown"]}
核心功能:
- 为每个PDF文件创建独立工作目录,便于管理
- 将PDF渲染为图片,支持原始视图和带布局标记的视图
- 使用OCR技术提取PDF中的文本(支持中英文)
- 识别并提取表格、图片等元素
- 将提取的内容转换为结构化的Markdown格式
这段代码是一个PDF处理服务的核心实现,主要功能是接收上传的PDF文件,将其转换为Markdown格式,并提取其中的文本、表格和图片等内容。让我来详细讲解各个部分:
1. 工作目录管理
def workdir(file_id: str) -> Path:
"""获取文件工作目录,不存在则创建"""
d = DATA_ROOT / file_id
d.mkdir(parents=True, exist_ok=True)
return d
这个函数负责为每个PDF文件创建独立的工作目录:
- 接收一个唯一的
file_id
作为参数 - 基于根目录
DATA_ROOT
和file_id
构建路径 - 使用
mkdir
创建目录,parents=True
允许创建多级目录,exist_ok=True
避免目录已存在时的错误 - 返回创建的目录路径
2. 保存上传的PDF文件
def save_upload(file_id: str, upload_bytes: bytes, filename: str) -> Dict[str, Any]:
"""保存上传的PDF,并返回页数"""
pdf_path = original_pdf_path(file_id)
pdf_path.write_bytes(upload_bytes)
with fitz.open(pdf_path) as doc:
pages = doc.page_count
return {"fileId": file_id, "name": filename, "pages": pages}
这个函数处理用户上传的PDF文件:
- 接收
file_id
、PDF字节数据和原始文件名 - 将字节数据写入到指定路径(
original_pdf_path
生成的路径) - 使用
fitz
库(PyMuPDF)打开PDF,获取总页数 - 返回包含文件ID、名称和页数的字典
3. PDF转Markdown的核心功能
def pdf_to_markdown(file_id: str):
"""将PDF转换为Markdown,提取文本、表格和图片"""
pdf_path = str(original_pdf_path(file_id))
out_md = markdown_output(file_id)
img_dir = images_dir(file_id)
# 使用unstructured库解析PDF
elements = partition_pdf(
filename=pdf_path,
infer_table_structure=True,
strategy="hi_res",
ocr_languages="chi_sim+eng",
ocr_engine="paddleocr"
)
这部分代码是核心转换功能的开始:
- 获取PDF路径、Markdown输出路径和图片保存目录
- 使用
unstructured
库的partition_pdf
函数解析PDF - 配置包括:启用表格结构推断、使用高分辨率策略、支持中英文OCR、使用paddleocr引擎
# 提取图片并保存
image_map = {}
with fitz.open(pdf_path) as doc:
for page_num, page in enumerate(doc, start=1):
image_map[page_num] = []
for img_index, img in enumerate(page.get_images(full=True), start=1):
# 处理并保存图片...
image_map[page_num].append(img_path.name)
这部分负责提取PDF中的图片:
- 创建
image_map
字典记录每页包含的图片 - 使用
fitz
库打开PDF,遍历每一页 - 提取页面中的所有图片并保存
- 将图片信息记录到
image_map
中,建立页面与图片的关联
# 构建Markdown内容
md_lines: List[str] = []
# 处理不同类型的元素(标题、文本、表格、图片等)
# ...
out_md.write_text("\n".join(md_lines), encoding="utf-8")
return {"markdown": out_md.name, "images_dir": "images"}
最后构建并保存Markdown内容:
- 处理解析得到的各种元素(标题、文本、表格、图片等)
- 将这些元素转换为对应的Markdown格式
- 将最终的Markdown内容写入文件
- 返回包含Markdown文件名和图片目录的字典
4. 完整解析流程
def run_full_parse_pipeline(file_id: str) -> Dict[str, Any]:
"""完整解析流程:渲染页面 → 提取布局 → 生成Markdown"""
render_original_pages(file_id)
docs = unstructured_segments(file_id)
render_parsed_pages_with_boxes(file_id, docs)
md_info = pdf_to_markdown(file_id)
return {"md": md_info["markdown"]}
这个函数定义了完整的PDF解析流程:
- 渲染原始页面(
render_original_pages
) - 提取非结构化片段(
unstructured_segments
) - 渲染带有内容框的解析页面(
render_parsed_pages_with_boxes
) - 调用
pdf_to_markdown
生成Markdown - 返回包含Markdown信息的结果
整体总结
这段代码实现了一个功能完整的PDF处理服务,主要技术点包括:
- 使用文件ID进行文件隔离,每个PDF有独立的工作目录
- 结合
unstructured
库进行PDF内容解析 - 使用
PyMuPDF
(fitz)处理PDF文件和提取图片 - 集成OCR功能,支持中英文识别
- 将PDF内容(文本、表格、图片)转换为结构化的Markdown格式
这个服务可以应用于需要将PDF文档转换为易于处理的文本格式的场景,如文档检索、内容分析等。
2. 索引服务 (index_service.py)
该模块负责将解析后的文本内容转换为向量索引,支持高效的相似性检索。
# services/index_service.py (核心代码片段)
def load_embeddings() -> OpenAIEmbeddings:
"""加载嵌入模型,支持代理配置"""
api_key = os.getenv("OPENAI_API_KEY") or os.getenv("OPENAI_EMBEDDING_API_KEY")
base_url = os.getenv("OPENAI_BASE_URL") or os.getenv("OPENAI_EMBEDDING_BASE_URL")
kwargs = {"api_key": api_key}
if base_url:
kwargs["base_url"] = base_url
return OpenAIEmbeddings(model="text-embedding-3-large", **kwargs)
def split_markdown(md_text: str) -> List[Document]:
"""将Markdown文本按标题分割为文档片段"""
headers_to_split_on = [
("#", "Header 1"),
("##", "Header 2"),
# 可添加更细粒度的分割
]
splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
docs = splitter.split_text(md_text)
# 清洗文档,过滤空内容和过长文本
cleaned: List[Document] = []
for d in docs:
txt = (d.page_content or "").strip()
if not txt:
continue
if len(txt) > 8000:
txt = txt[:8000]
cleaned.append(Document(page_content=txt, metadata=d.metadata))
return cleaned
def build_faiss_index(file_id: str) -> Dict[str, Any]:
"""构建FAISS向量索引"""
md_file = markdown_path(file_id)
if not md_file.exists():
return {"ok": False, "error": "MARKDOWN_NOT_FOUND"}
md_text = md_file.read_text(encoding="utf-8")
docs = split_markdown(md_text)
if not docs:
return {"ok": False, "error": "EMPTY_MD"}
embeddings = load_embeddings()
vs = FAISS.from_documents(docs, embedding=embeddings)
vs.save_local(str(index_dir(file_id)))
return {"ok": True, "chunks": len(docs)}
def search_faiss(file_id: str, query: str, k: int = 5) -> Dict[str, Any]:
"""检索与查询相关的文档片段"""
idx = index_dir(file_id)
if not (idx / "index.faiss").exists():
return {"ok": False, "error": "INDEX_NOT_FOUND"}
embeddings = load_embeddings()
vs = FAISS.load_local(str(idx), embeddings, allow_dangerous_deserialization=True)
hits = vs.similarity_search_with_score(query, k=k)
results = []
for doc, score in hits:
results.append({
"text": doc.page_content,
"score": float(score),
"metadata": doc.metadata,
})
return {"ok": True, "results": results}
核心功能:
- 加载嵌入模型(使用OpenAI兼容接口)
- 将Markdown文本按标题层级分割为语义片段
- 使用FAISS构建向量索引,支持高效相似性检索
- 提供索引构建和检索的基础接口
这段是一个处理文档向量索引和检索的服务代码,主要功能是将Markdown文档转换为向量索引,并提供基于语义的检索能力。让我详细讲解各个部分:
1. 嵌入模型加载
def load_embeddings() -> OpenAIEmbeddings:
"""加载嵌入模型,支持代理配置"""
api_key = os.getenv("OPENAI_API_KEY") or os.getenv("OPENAI_EMBEDDING_API_KEY")
base_url = os.getenv("OPENAI_BASE_URL") or os.getenv("OPENAI_EMBEDDING_BASE_URL")
kwargs = {"api_key": api_key}
if base_url:
kwargs["base_url"] = base_url
return OpenAIEmbeddings(model="text-embedding-3-large", **kwargs)
这个函数负责加载OpenAI的嵌入模型:
- 从环境变量获取API密钥,支持主密钥和专用嵌入密钥
- 支持配置基础URL(可用于代理或私有部署)
- 使用"text-embedding-3-large"模型,这是OpenAI的高性能嵌入模型
- 返回配置好的嵌入模型实例,用于后续的文本向量化处理
2. Markdown文档分割
def split_markdown(md_text: str) -> List[Document]:
"""将Markdown文本按标题分割为文档片段"""
headers_to_split_on = [
("#", "Header 1"),
("##", "Header 2"),
# 可添加更细粒度的分割
]
splitter = MarkdownHeaderTextSplitter(headers_to_split_on=headers_to_split_on)
docs = splitter.split_text(md_text)
# 清洗文档,过滤空内容和过长文本
cleaned: List[Document] = []
for d in docs:
txt = (d.page_content or "").strip()
if not txt:
continue
if len(txt) > 8000:
txt = txt[:8000]
cleaned.append(Document(page_content=txt, metadata=d.metadata))
return cleaned
这个函数将Markdown文本分割为适合嵌入的文档片段:
- 基于Markdown标题层级进行分割(# 和 ## 级别)
- 使用
MarkdownHeaderTextSplitter
专门处理Markdown格式 - 对分割后的文档进行清洗:
- 过滤空内容文档
- 限制最大长度(8000字符),防止过长文本
- 返回清洗后的文档列表,每个文档包含内容和元数据
这种分割方式保留了文档的结构信息,使后续检索更精准。
3. 构建FAISS向量索引
def build_faiss_index(file_id: str) -> Dict[str, Any]:
"""构建FAISS向量索引"""
md_file = markdown_path(file_id)
if not md_file.exists():
return {"ok": False, "error": "MARKDOWN_NOT_FOUND"}
md_text = md_file.read_text(encoding="utf-8")
docs = split_markdown(md_text)
if not docs:
return {"ok": False, "error": "EMPTY_MD"}
embeddings = load_embeddings()
vs = FAISS.from_documents(docs, embedding=embeddings)
vs.save_local(str(index_dir(file_id)))
return {"ok": True, "chunks": len(docs)}
这是构建向量索引的核心函数:
- 检查Markdown文件是否存在,不存在则返回错误
- 读取Markdown文本内容
- 使用
split_markdown
函数分割文本为文档片段 - 加载嵌入模型
- 使用FAISS库从文档构建向量存储(
FAISS.from_documents
) - 将构建好的索引保存到本地目录
- 返回成功状态和分割的片段数量
FAISS是Facebook开发的高效向量检索库,适合存储和查询高维向量。
4. 向量检索功能
def search_faiss(file_id: str, query: str, k: int = 5) -> Dict[str, Any]:
"""检索与查询相关的文档片段"""
idx = index_dir(file_id)
if not (idx / "index.faiss").exists():
return {"ok": False, "error": "INDEX_NOT_FOUND"}
embeddings = load_embeddings()
vs = FAISS.load_local(str(idx), embeddings, allow_dangerous_deserialization=True)
hits = vs.similarity_search_with_score(query, k=k)
results = []
for doc, score in hits:
results.append({
"text": doc.page_content,
"score": float(score),
"metadata": doc.metadata,
})
return {"ok": True, "results": results}
这个函数提供基于语义的检索功能:
- 检查索引是否存在,不存在则返回错误
- 加载嵌入模型
- 从本地加载保存的FAISS索引
- 使用
similarity_search_with_score
进行相似性检索:query
是用户的查询文本k
指定返回的最相关结果数量(默认5个)
- 处理检索结果,包含文本内容、相似度分数和元数据
- 返回格式化的检索结果
整体总结
这段代码实现了一个完整的文档向量索引与检索 pipeline:
- 文本处理:将Markdown文档按结构分割为合理的片段
- 向量生成:使用OpenAI的嵌入模型将文本转换为向量
- 索引构建:使用FAISS构建高效的向量索引
- 语义检索:根据查询文本找到最相关的文档片段
这个服务通常用于构建基于文档的问答系统、智能检索工具等,能够理解文本的语义内容,而不仅仅是关键词匹配,大大提高了检索的准确性。
技术栈方面,主要使用了:
- OpenAI的嵌入模型进行文本向量化
- LangChain库的文档分割和处理工具
- FAISS库进行向量存储和高效检索
3. RAG服务 (rag_service.py)
该模块实现了检索增强生成的核心逻辑,将检索到的信息与大语言模型结合,生成准确的回答。
# services/rag_service.py (核心代码片段)
# 会话历史管理
_sessions: dict[str, list[dict]] = defaultdict(list)
def get_history(session_id: str) -> list[dict]:
return _sessions.get(session_id, [])
def append_history(session_id: str, role: str, content: str) -> None:
_sessions[session_id].append({"role": role, "content": content})
async def retrieve(question: str, file_id: str) -> tuple[list[dict], str]:
"""检索相关文档片段并评估其相关性"""
vs = _load_vs(file_id)
hits = vs.similarity_search_with_score(question, k=K)
citations = []
ctx_snippets = []
scores = []
# 处理检索结果
for i, (doc, score) in enumerate(hits, start=1):
# 构建引用信息...
citations.append(...)
ctx_snippets.append(...)
scores.append(float(score))
context_text = "\n\n".join(ctx_snippets) if ctx_snippets else "(no hits)"
# 规则 + LLM 复核检索结果相关性
ok_by_score = _score_ok(scores)
if not ok_by_score:
grader = _get_grader()
grade_prompt = GRADE_PROMPT.format(context=context_text, question=question)
decision = await grader.ainvoke([{"role": "user", "content": grade_prompt}])
ok_by_llm = "yes" in (decision.content or "").lower()
else:
ok_by_llm = True
branch = "with_context" if ok_by_llm else "no_context"
return citations, context_text if branch == "with_context" else ""
async def answer_stream(
question: str,
citations: list[dict],
context_text: str,
branch: str,
session_id: str | None = None
) -> AsyncGenerator[dict, None]:
"""流式生成回答,并返回引用信息"""
# 先推送引用信息
if branch == "with_context" and citations:
for c in citations:
yield {"type": "citation", "data": c}
# 构建提示信息
llm = _get_llm()
history_msgs = get_history(session_id) if session_id else []
if branch == "with_context" and context_text:
user_prompt = ANSWER_WITH_CONTEXT.format(question=question, context=context_text)
else:
user_prompt = ANSWER_NO_CONTEXT.format(question=question)
# 完整消息序列:system + 历史多轮 + 当前用户
msgs = [{"role": "system", "content": SYSTEM_INSTRUCTION}]
msgs.extend(history_msgs)
msgs.append({"role": "user", "content": user_prompt})
# 流式生成回答
final_text_parts: list[str] = []
try:
async for chunk in llm.astream(msgs):
delta = getattr(chunk, "content", None)
if delta:
final_text_parts.append(delta)
yield {"type": "token", "data": delta}
except Exception:
# 非流式回退
resp = await llm.ainvoke(msgs)
text = resp.content or ""
final_text_parts.append(text)
# 分块推送...
# 附加相关图片
if branch == "with_context" and citations:
imgs = []
for c in citations[:2]:
url = c.get("previewUrl")
if url:
imgs.append(f"")
if imgs:
tail = "\n\n---\n**相关页面预览**\n\n" + "\n\n".join(imgs)
yield {"type": "token", "data": tail}
# 保存对话历史
if session_id:
append_history(session_id, "user", question)
append_history(session_id, "assistant", "".join(final_text_parts))
yield {"type": "done", "data": {"used_retrieval": branch == "with_context"}}
核心功能:
- 管理对话历史,支持多轮对话
- 实现检索结果的相关性评估(规则+LLM)
- 构建提示词,将检索到的上下文融入回答过程
- 支持流式回答生成,提升用户体验
- 在回答中插入相关图片引用,实现多模态输出
4. API接口层 (app.py)
该模块基于FastAPI实现了RESTful API和SSE流接口,提供前端访问的入口。
# app.py (核心代码片段)
app = FastAPI(
title="九天老师公开课:多模态RAG系统API",
version="1.0.0",
description="多模态RAG系统开发实战后端API。"
)
# 跨域配置
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 聊天接口(SSE流式响应)
@app.post(f"{API_PREFIX}/chat", tags=["Chat"])
async def chat_stream(req: ChatRequest):
"""SSE 事件:token | citation | done | error"""
async def gen():
try:
question = (req.message or "").strip()
session_id = (req.sessionId or "default").strip()
file_id = (req.pdfFileId or "").strip()
citations, context_text = [], ""
branch = "no_context"
if file_id:
try:
citations, context_text = await retrieve(question, file_id)
branch = "with_context" if context_text else "no_context"
except FileNotFoundError:
branch = "no_context"
# 推送引用和回答流
# ...
except Exception as e:
# 错误处理
# ...
headers = {"Cache-Control": "no-cache, no-transform", "Connection": "keep-alive"}
return StreamingResponse(gen(), media_type="text/event-stream", headers=headers)
# PDF上传接口
@app.post(f"{API_PREFIX}/pdf/upload", tags=["PDF"])
async def pdf_upload(file: UploadFile = File(...), replace: Optional[bool] = True):
# 处理文件上传
# ...
# 索引构建接口
@app.post(f"{API_PREFIX}/index/build", tags=["Index"])
async def index_build(req: BuildIndexRequest):
# 构建索引
# ...
这段代码是多模态RAG系统的API接口层实现,基于FastAPI框架开发,核心作用是对外提供标准化的接口服务,连接前端用户操作与后端业务逻辑(如PDF处理、向量检索、流式回答生成)。下面从整体架构到具体接口逐一详细讲解:
一、整体架构与核心依赖
- 框架选择:使用
FastAPI
,因其支持异步请求、自动生成API文档、性能高效,且对SSE(Server-Sent Events,服务器推送事件)有良好支持,非常适合需要流式响应的AI对话场景。 - 核心功能:提供3类核心接口(聊天流式接口、PDF上传接口、索引构建接口),并通过跨域中间件解决前后端分离架构中的跨域问题。
- 数据流转:前端通过HTTP请求调用接口,后端将请求分发到对应业务模块(如
rag_service.py
的检索与回答逻辑、pdf_service.py
的文件处理逻辑),最终将结果返回给前端。
二、关键初始化配置
1. FastAPI应用初始化
app = FastAPI(
title="九天老师公开课:多模态RAG系统API",
version="1.0.0",
description="多模态RAG系统开发实战后端API。"
)
- 初始化FastAPI应用实例
app
,并配置API的元信息(标题、版本、描述)。 - 这些元信息会自动同步到FastAPI的默认文档页面(访问
/docs
或/redoc
可查看),方便前端开发者对接接口。
2. 跨域配置(解决前后端分离问题)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 允许所有前端域名访问(生产环境建议指定具体域名,如["https://your-frontend.com"])
allow_credentials=True, # 允许携带Cookie等凭证
allow_methods=["*"], # 允许所有HTTP方法(GET、POST、PUT等)
allow_headers=["*"], # 允许所有请求头(如Authorization、Content-Type)
)
- 跨域问题的本质:浏览器的“同源策略”限制——前端(如
http://localhost:3000
)与后端(如http://localhost:8000
)域名/端口不同时,直接请求会被拦截。 - 解决方案:通过
CORSMiddleware
中间件配置跨域规则,允许前端正常调用后端接口。 - 生产环境注意:
allow_origins
建议改为具体的前端域名(而非"*"
),避免安全风险。
三、核心接口详解
1. 聊天流式接口(/chat
)—— 核心交互接口
该接口是用户与RAG系统对话的核心入口,支持SSE流式响应(即后端逐步返回回答内容,前端实时展示,类似ChatGPT的“打字”效果),同时集成了文档检索逻辑。
接口定义
@app.post(f"{API_PREFIX}/chat", tags=["Chat"])
async def chat_stream(req: ChatRequest):
"""SSE 事件:token | citation | done | error"""
async def gen():
# 业务逻辑实现
...
headers = {"Cache-Control": "no-cache, no-transform", "Connection": "keep-alive"}
return StreamingResponse(gen(), media_type="text/event-stream", headers=headers)
- 接口类型:
POST
请求(因需要传递复杂参数,如会话ID、问题、PDF文件ID)。 - 路由路径:
{API_PREFIX}/chat
(API_PREFIX
是预定义的路径前缀,如/api/v1
,用于统一管理接口版本)。 - 标签:
tags=["Chat"]
——将接口归类到“Chat”组,方便文档页面分类展示。 - 请求参数:
req: ChatRequest
——使用Pydantic模型ChatRequest
接收前端参数(如message
(用户问题)、sessionId
(会话ID)、pdfFileId
(目标PDF的ID)),FastAPI会自动校验参数格式并返回错误提示。 - 响应类型:
StreamingResponse
——FastAPI的流式响应对象,用于实现SSE。
内部逻辑拆解(gen()
生成器函数)
gen()
是异步生成器函数,负责处理请求并逐步生成响应内容(SSE事件),核心逻辑分为3步:
(1)参数解析与初始化
try:
question = (req.message or "").strip() # 获取用户问题,去除前后空格
session_id = (req.sessionId or "default").strip() # 获取会话ID,默认值为"default"(未传则使用默认会话)
file_id = (req.pdfFileId or "").strip() # 获取目标PDF的ID(用户指定基于哪个PDF回答)
citations, context_text = [], "" # 初始化引用信息(如文档片段来源)和上下文文本
branch = "no_context" # 初始化分支:默认“无上下文”(未关联PDF时)
- 会话ID(
session_id
):用于关联多轮对话历史(对应rag_service.py
的_sessions
会话管理),确保用户的多轮对话连贯(如“上一个问题的回答中提到的XX是什么意思?”)。 - PDF文件ID(
file_id
):指定RAG系统基于哪个PDF文件生成回答(若为空,则系统直接基于LLM回答,不调用检索逻辑)。
(2)检索上下文(关联PDF时)
if file_id: # 若传入了PDF的file_id,调用检索逻辑
try:
# 调用rag_service.py的retrieve函数:根据用户问题检索PDF的相关片段
citations, context_text = await retrieve(question, file_id)
# 若检索到有效上下文,切换到“有上下文”分支
branch = "with_context" if context_text else "no_context"
except FileNotFoundError:
# 若PDF文件不存在(如file_id错误),仍保持“无上下文”分支
branch = "no_context"
- 核心逻辑:将用户问题与指定PDF关联,通过
retrieve
函数从PDF中检索与问题相关的片段(上下文),并生成引用信息(citations
,如片段来源页码、预览链接)。 - 分支切换:
branch
变量控制后续回答逻辑——“有上下文”则基于PDF片段生成回答,“无上下文”则直接让LLM生成回答(类似普通聊天机器人)。
(3)推送流式响应(SSE事件)
# 推送引用信息(先于回答内容,让用户知道回答基于哪些PDF片段)
if branch == "with_context" and citations:
for c in citations:
yield {"type": "citation", "data": c} # 推送“引用”类型事件
# 调用rag_service.py的answer_stream函数,流式生成回答
async for event in answer_stream(question, citations, context_text, branch, session_id):
yield event # 推送“token”(回答片段)或“done”(完成)类型事件
except Exception as e:
# 错误处理:推送“error”类型事件,告知前端异常
yield {"type": "error", "data": f"对话出错:{str(e)}"}
- SSE事件类型:
citation
:引用信息(如“来源:PDF第3页,片段内容:XXX”),前端可展示为“参考文档”区域。token
:回答的片段(如“RAG系统的核心是将检索到的文档片段融入LLM回答,…”),前端逐段拼接展示,实现“流式打字”效果。done
:回答完成信号,前端可关闭加载状态。error
:错误信息,前端可提示用户“对话出错,请重试”。
(4)响应头配置
headers = {"Cache-Control": "no-cache, no-transform", "Connection": "keep-alive"}
return StreamingResponse(gen(), media_type="text/event-stream", headers=headers)
media_type="text/event-stream"
:SSE的标准媒体类型,告知浏览器这是流式事件响应。Cache-Control: no-cache
:禁止浏览器缓存流式内容(避免重复展示旧内容)。Connection: keep-alive
:保持HTTP连接长驻(SSE需要持续推送内容,不能断开连接)。
2. PDF上传接口(/pdf/upload
)—— 文档导入入口
@app.post(f"{API_PREFIX}/pdf/upload", tags=["PDF"])
async def pdf_upload(file: UploadFile = File(...), replace: Optional[bool] = True):
# 处理文件上传
# ...
- 功能:接收前端上传的PDF文件,调用
pdf_service.py
的save_upload
函数保存文件,并生成唯一的file_id
(后续检索、索引构建需用到该ID)。 - 请求参数:
file: UploadFile = File(...)
:前端通过form-data
形式上传的PDF文件,UploadFile
是FastAPI提供的文件对象,支持读取文件字节、文件名等信息。replace: Optional[bool] = True
:是否覆盖已存在的同名PDF(若为True
,则相同文件名的文件会被覆盖;若为False
,则返回“文件已存在”错误)。
- 返回结果:通常返回
file_id
、文件名、文件页数等信息(如{"ok": True, "fileId": "abc123", "name": "test.pdf", "pages": 10}
),供前端后续调用索引构建接口。
3. 索引构建接口(/index/build
)—— RAG检索前提
@app.post(f"{API_PREFIX}/index/build", tags=["Index"])
async def index_build(req: BuildIndexRequest):
# 构建索引
# ...
- 功能:基于已上传的PDF文件(通过
file_id
指定),调用index_service.py
的build_faiss_index
函数构建FAISS向量索引——这是RAG系统“检索”能力的前提(没有索引则无法快速匹配相关文档片段)。 - 请求参数:
req: BuildIndexRequest
——Pydantic模型,核心参数是file_id
(指定为哪个PDF构建索引)。 - 处理逻辑:
- 校验
file_id
对应的PDF是否存在。 - 调用
build_faiss_index
函数,将PDF转换为Markdown片段、生成向量、构建FAISS索引并保存到本地。 - 返回索引构建结果(如
{"ok": True, "chunks": 20}
,表示成功构建索引,共分割为20个文档片段)。
- 校验
- 注意:索引构建是“一次性操作”——PDF上传后只需构建一次索引,后续检索可直接复用索引,无需重复构建。
四、接口调用与前端交互流程
以“用户上传PDF→构建索引→基于PDF聊天”为例,完整流程如下:
- 前端调用
/pdf/upload
接口,上传PDF文件,获取file_id
。 - 前端调用
/index/build
接口,传入file_id
,触发索引构建。 - 前端调用
/chat
接口,传入message
(用户问题)、session_id
(会话ID)、file_id
(已构建索引的PDF)。 - 后端通过SSE逐步推送
citation
(引用)、token
(回答片段)、done
(完成)事件,前端实时展示。
五、核心亮点与注意事项
1. 亮点
- 流式响应:通过SSE实现“边生成边返回”,减少用户等待感,提升交互体验。
- 模块化设计:接口层仅负责“请求接收与结果返回”,业务逻辑 delegate 到其他模块(如
rag_service
、pdf_service
),代码可维护性高。 - 自动文档:FastAPI自动生成接口文档,前端无需手动编写接口文档,降低对接成本。
2. 注意事项
- 会话管理:
session_id
对应的会话历史存储在内存(rag_service.py
的_sessions
),服务重启后会话会丢失——生产环境需改用Redis等持久化存储。 - SSE连接稳定性:长连接可能因网络波动断开,前端需实现“重连逻辑”,避免对话中断。
- 安全防护:生产环境需添加接口鉴权(如JWT令牌),防止未授权用户调用接口;同时限制
file_id
的有效性,避免恶意请求。
总结
该代码是多模态RAG系统的“前端入口层”,通过FastAPI将后端的检索、回答、文件处理能力封装为标准化API,实现了“用户提问→检索上下文→流式回答”的完整交互闭环。其核心价值在于:解决前后端跨域问题、支持流式响应提升体验、提供清晰的接口分类便于对接,是连接用户操作与后端业务逻辑的关键桥梁。
核心功能:
- 提供PDF上传、解析、状态查询接口
- 实现索引构建和检索接口
- 提供基于SSE的流式聊天接口,支持实时响应
- 管理会话状态,支持多用户同时使用
系统工作流程
-
PDF处理流程:
- 用户上传PDF文件 → 系统生成唯一fileId
- 系统渲染PDF页面为图片
- 解析PDF内容,提取文本、表格和图片
- 将解析结果转换为Markdown格式
-
索引构建流程:
- 读取Markdown文件内容
- 按标题层级分割为文本片段
- 使用嵌入模型将文本片段转换为向量
- 构建并保存FAISS向量索引
-
问答流程:
- 用户输入问题,系统获取会话历史
- 检索与问题相关的文本片段
- 评估检索结果的相关性
- 构建包含上下文的提示词
- 调用LLM生成回答,并流式返回结果
- 在回答中插入相关图片引用
- 保存对话历史,支持多轮对话
总结
本系统实现了一个完整的多模态RAG解决方案,能够处理PDF文件中的文本和图片信息,通过向量检索增强大语言模型的回答能力。系统采用模块化设计,各组件职责清晰,便于维护和扩展。
该系统的特点包括:
- 多模态支持:不仅处理文本,还能提取和引用图片
- 高效检索:使用FAISS实现快速相似性搜索
- 流式响应:提供实时的回答生成体验
- 会话管理:支持多轮对话,保持上下文
- 可扩展性:各模块松耦合,便于替换或升级
通过这个系统,用户可以上传PDF文档,然后基于文档内容进行智能问答,系统会自动引用文档中的相关片段和图片,提高回答的准确性和可信度。
更多推荐
所有评论(0)