基于全阿里RAG技术栈,实现企业内部PDF格式规章制度内容的查询
本文介绍了一个基于阿里云技术栈的企业规章制度查询系统实现方案。系统采用PyPDF2提取PDF文档内容并进行切分,使用阿里云text-embedding-v1工具生成文本向量,通过Chroma_DB向量数据库存储和检索文档。系统核心功能包括:PDF内容提取与分段、远程Embedding处理、向量数据库存储、相似内容检索,以及调用Qwen-max大模型生成回答。该方案实现了对企业规章制度的智能问答功能
企业员工查询企业内部的规章制度,是企业经常遇到的日常问题。本例给出了基于全阿里技术栈的实现方案,轻松实现对企业规章制度的查询。
使用阿里云百炼Qwen-max大模型。
文档内容提取和内容切分,使用Python内嵌的PyPDF2。
Embedding采用阿里云内嵌的远程"text-embedding-v1"工具。
向量数据库采用Chroma_DB,与Python无缝集成。
import os
import json
from PyPDF2 import PdfReader
import chromadb
from chromadb.utils import embedding_functions
import dashscope
from dashscope import TextEmbedding, Generation
# ==================== 系统参数配置====================
DASHSCOPE_API_KEY = os.getenv("DASHSCOPE_API_KEY")
PDF_PATH = "enterprise_rules.pdf" # 要处理的PDF 文件
QWEN_MODEL = "qwen-max" # 使用Qwen大模型
EMBEDDING_MODEL = "text-embedding-v1" # 阿里云集成Embedding工具,推荐使用 v1 或 v2
CHROMA_DB_PATH = "./chroma_db_company" # 本地数据库保存路径
# ============================================================
# 设置 DashScope API Key
dashscope.api_key = DASHSCOPE_API_KEY
# ================== Step 1: 提取 PDF 并分段 ==================
def extract_text_from_pdf(pdf_path):
if not os.path.exists(pdf_path):
raise FileNotFoundError(f"找不到文件: {pdf_path}")
reader = PdfReader(pdf_path)
text = ""
for page in reader.pages:
text += page.extract_text() or ""
return text
def split_text(text, chunk_size=300, overlap=50):
words = text.split()
chunks = []
start = 0
while start < len(words):
end = start + chunk_size
chunk = " ".join(words[start:end])
chunks.append(chunk)
start = end - overlap
return [c.strip() for c in chunks if len(c.strip()) > 20]
# ================== Step 2: 调用阿里云获取 Embedding ==================
def get_embedding(texts):
if isinstance(texts, str):
texts = [texts] # 把 texts 包装成一个单元素列表,以满足 embedding 函数 input 参数要求
response = TextEmbedding.call(
model=EMBEDDING_MODEL,
input=texts #embedding 接口要求输入是“文本列表”(list of strings),而不是单个字符串。
)
if response.status_code == 200:
# 使用字典键访问数据
embeddings = [d["embedding"] for d in response.output['embeddings']]
return embeddings[0] if len(embeddings) == 1 else embeddings
else:
raise Exception(f"❌ Embedding 调用失败: {response.status_code} {response.message}")
# ================== Step 3: 初始化 ChromaDB(使用远程 Embedding)==================
client = chromadb.PersistentClient(path=CHROMA_DB_PATH)
# 我们不使用本地 embedding 函数,只存原始文档
collection = client.get_or_create_collection(
name="company_rules",
metadata={"hnsw:space": "cosine"}
)
# ================== Step 4: 将 PDF 内容写入向量库 ==================
def load_pdf_to_vector_db(pdf_path):
print("📄 正在读取 PDF...")
full_text = extract_text_from_pdf(pdf_path)
chunks = split_text(full_text, chunk_size=300, overlap=50)
print(f"✅ 切分为 {len(chunks)} 个段落")
# 批量获取 embedding(分批避免超限)
batch_size = 10
all_embeddings = []
for i in range(0, len(chunks), batch_size):
batch = chunks[i:i + batch_size]
print(f"☁️ 正在处理第 {i//batch_size + 1} 批 Embedding...")
embs = get_embedding(batch)
all_embeddings.extend(embs)
# 存入 ChromaDB
collection.upsert(
ids=[f"chunk_{i}" for i in range(len(chunks))],
documents=chunks,
embeddings=all_embeddings,
metadatas=[{"source": os.path.basename(pdf_path), "id": f"chunk_{i}"} for i in range(len(chunks))]
)
print("💾 已将文档存入本地向量数据库!")
# ================== Step 5: 检索最相关的内容 ==================
def retrieve_relevant_context(question, n_results=2):
print("🔍 正在检索知识库...")
# 对问题做 embedding
q_emb = get_embedding(question)
# 查询最相似的文档
results = collection.query(
query_embeddings=[q_emb],
n_results=n_results
)
return results['documents'][0] # 返回 top 2 段落
# ================== Step 6: 调用 Qwen 生成回答 ==================
def call_qwen(prompt):
try:
response = Generation.call(
model=QWEN_MODEL,
messages=[
{'role': 'system', 'content': '你是一个企业规章制度顾问,请根据提供的资料准确回答问题。'},
{'role': 'user', 'content': prompt}
],
temperature=0.5,
top_p=0.8
)
if response.status_code == 200:
return response.output.choices[0].message.content
else:
return f"❌ 调用失败:{response.status_code} {response.message}"
except Exception as e:
return f"🚨 调用出错:{str(e)}"
# ================== Step 7: 主问答逻辑 ==================
def ask_company_rules(question):
context = retrieve_relevant_context(question, n_results=2)
prompt = f"""
你是一名公司制度专家,请根据以下真实资料回答问题。请回答简洁、准确、口语化。
【参考资料】
{''.join(f"- {c}\n" for c in context)}
【问题】
{question}
⚠️ 要求:
1. 如果资料中没有相关信息,请说“暂时无法找到相关信息”。
2. 不要编造内容。
3. 回答用中文。
"""
print("☁️ 正在调用 阿里云百炼Qwen 大模型...")
answer = call_qwen(prompt)
return answer
# ================== 主程序入口 ==================
if __name__ == "__main__":
# 第一次运行时加载 PDF(只需一次),将内容导入向量数据库
if collection.count() == 0:
print("📥 检测到数据库为空,正在导入 PDF 数据...")
load_pdf_to_vector_db(PDF_PATH)
else:
print(f"📊 已加载 {collection.count()} 条数据")
print("\n💬 欢迎使用【公司制度智能助手】(输入 'quit' 退出)")
print("💡 示例问题:")
print(" • 年假怎么申请?")
print(" • 加班有补贴吗?")
print(" • 试用期多久?")
while True:
question = input("\n❓ 请输入你的问题:").strip()
if question.lower() == 'quit':
print("👋 感谢使用,再见!")
break
if not question:
continue
answer = ask_company_rules(question)
print("\n" + "=" * 60)
print("🤖 回答:")
print(answer)
print("=" * 60)
更多推荐
所有评论(0)