背景:

基于chatglm构建agnet:chatglm实现Agent控制 - 知乎

前面一篇文章已经介绍了如何去搭建LLM Agent控制系统,也简单介绍了如何去构建Toolset和构建Action。但是在上篇文章中Toolset其实是基于搜索api构建的,从这篇文章开始后面几篇文章会围绕具体的工具展开介绍如何搭建专业工具。这篇文章介绍的是如何构建临时文件填充工具:向量检索。

向量检索有两大部分:内容存储部分、内容检索部分。

开始细节讲解之前,先来一个整体例子介绍:何谓向量化:

## 
# 导入分割文本的工具,并把上面给出的解释分成文档块

from langchain.text_splitter import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size = 100,
    chunk_overlap  = 20,
)
explanation = '''Autoencoder(自动编码器)是一种非常有趣的数学模型,就像一个魔法盒子,可以帮助我们理解数据是如何在空间中转换和变换的。

这个魔法盒子里有两个部分:编码器(Encoder)和解码器(Decoder)。

首先,让我们来看一下编码器。它是一个把输入数据(比如一张图片、一段视频或者一篇文章)变得更容易看懂的神奇机器。它将输入数据压缩成一个更小的空间,这样原始数据中的信息就会更加强烈地保留下来。

接下来是解码器。它是一个把编码器压缩后的结果恢复成原始数据的神奇机器。它将编码器得到的结果还原成输入数据(也就是我们刚刚压缩过的数据),这样解码器就得到了和原始数据完全一样的输出。

那么,为什么说Autoencoder能够提高数据处理的效率呢?

因为通过有效的数据压缩和恢复,Autoencoder能够减少数据量,从而更快地处理和分析数据。这就好像把一个大箱子变成了一个更小的箱子,虽然里面东西的总量没有变,但是可以更轻松地拿取和移动箱子。

所以,Autoencoder是一个非常有趣的数学模型,它可以帮助我们更好地理解数据在空间中的转换和变换。
'''

texts = text_splitter.create_documents([explanation])

切割完后数据如下:

对切割完的数据embedding:

from langchain.embeddings import HuggingFaceEmbeddings
       
model_name = "nghuyong/ernie-3.0-xbase-zh"
#model_name = "nghuyong/ernie-3.0-nano-zh"
#model_name = "shibing624/text2vec-base-chinese"
#model_name = "GanymedeNil/text2vec-large-chinese"
model_kwargs = {'device': 'cpu'}
encode_kwargs = {'normalize_embeddings': False}
hf = HuggingFaceEmbeddings(
   model_name=model_name,
   model_kwargs=model_kwargs,
   encode_kwargs=encode_kwargs,
   cache_folder = "/root/autodl-tmp/ChatGLM2-6B/llm_model"
)
query_result = hf.embed_query(texts[0].page_content)
print(query_result)

embbding后的数据如下:

技术点:

数据加载

使用文档加载器从文档源加载数据。 文档是一段文本和关联的元数据。 例如,有一些文档加载器可以加载简单的 .txt 文件、加载任何网页的文本内容,甚至加载 YouTube 视频的脚本。

文档加载器公开了一个“加载”方法,用于从配置的源将数据加载为文档。 它们还可以选择实现“延迟加载”,以便将数据延迟加载到内存中。

数据类型

txt

from langchain.document_loaders import TextLoader

loader = TextLoader("./index.md")
loader.load()

pdf

#pip install pypdf
from langchain.document_loaders import PyPDFLoader

loader = PyPDFLoader("example_data/layout-parser-paper.pdf")
pages = loader.load_and_split()
pages[0]

数据分块

RecursiveCharacterTextSplitter:对于一般文本,推荐使用此文本分割器。 它由字符列表参数化。 它尝试按顺序分割它们,直到块足够小。 默认列表为 ["\n\n", "\n", " ", ""]。 这样做的效果是尝试将所有段落(然后是句子,然后是单词)尽可能长时间地放在一起,因为这些通常看起来是语义相关性最强的文本片段。

文本如何分割:按字符列表

如何测量块大小:按字符数

实现代码:

from langchain.text_splitter import RecursiveCharacterTextSplitter
text_splitter = RecursiveCharacterTextSplitter(
    # Set a really small chunk size, just to show.
    chunk_size = 100,
    chunk_overlap  = 20,
    length_function = len,
)
texts = text_splitter.create_documents([explanation])
print(texts[0])

CharacterTextSplitter:是最简单的方法。 这基于字符(默认为“\n\n”)进行分割,并按字符数测量块长度。

文本如何分割:按单个字符

如何测量块大小:按字符数

from langchain.text_splitter import CharacterTextSplitter
from langchain.document_loaders import TextLoader

loader = TextLoader("/root/autodl-tmp/ChatGLM2-6B/read.txt")
documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.split_documents(documents)

文本embbeding处理

embbeding的作用就是对上面切好块的数据块,用预训练好的文本模型来做一道编码转换,压成有语意的向量。后面在检索的时候,就是通过对检索的问题做Embbding向量化,然后通过问题向量和存储的数据做相似度计算;把相关的数据捞出来做后续处理。

from langchain.embeddings import HuggingFaceEmbeddings
       
model_name = "nghuyong/ernie-3.0-xbase-zh"
#model_name = "nghuyong/ernie-3.0-nano-zh"
#model_name = "shibing624/text2vec-base-chinese"
#model_name = "GanymedeNil/text2vec-large-chinese"
model_kwargs = {'device': 'cpu'}
encode_kwargs = {'normalize_embeddings': False}
hf = HuggingFaceEmbeddings(
   model_name=model_name,
   model_kwargs=model_kwargs,
   encode_kwargs=encode_kwargs,
   cache_folder = "/root/autodl-tmp/ChatGLM2-6B/llm_model"
)

数据存储向量库

存储到本地:

from langchain.document_loaders import TextLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import FAISS

# 导入分割文本的工具,并把上面给出的解释分成文档块

from langchain.text_splitter import RecursiveCharacterTextSplitter

loader = TextLoader("/root/autodl-tmp/ChatGLM2-6B/read.txt")
documents = loader.load()
text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
docs = text_splitter.split_documents(documents)
db = FAISS.from_documents(docs, hf)

如果数据量大,且数据是需要长期使用的,而不是临时使用用完就丢,数据可以存储到es:

import os
import shutil
from elasticsearch import Elasticsearch
from langchain.vectorstores import ElasticKnnSearch
from langchain.document_loaders import TextLoader
from langchain.text_splitter import CharacterTextSplitter
from configs.params import ESParams
from embedding import Embeddings
from typing import Dict


def _default_knn_mapping(dims: int) -> Dict:
    """Generates a default index mapping for kNN search."""
    return {
        "properties": {
            "text": {"type": "text"},
            "vector": {
                "type": "dense_vector",
                "dims": dims,
                "index": True,
                "similarity": "cosine",
            },
        }
    }


def load_file(filepath, chunk_size, chunk_overlap):
    loader = TextLoader(filepath, encoding='utf-8')
    documents = loader.load()
    text_splitter = CharacterTextSplitter(separator='\n', chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    docs = text_splitter.split_documents(documents)
    return docs


class ES:
    def __init__(self, embedding_model_path):
        self.es_params = ESParams()
        self.client = Elasticsearch(['{}:{}'.format(self.es_params.url, self.es_params.port)],
                                    basic_auth=(self.es_params.username, self.es_params.passwd),
                                    verify_certs=False)
        self.embedding = Embeddings(embedding_model_path)
        self.es = ElasticKnnSearch(index_name=self.es_params.index_name, embedding=self.embedding,
                                   es_connection=self.client)

    def doc_upload(self, file_obj, chunk_size, chunk_overlap):
        try:
            if not self.client.indices.exists(index=self.es_params.index_name):
                dims = len(self.embedding.embed_query("test"))
                mapping = _default_knn_mapping(dims)
                self.client.indices.create(index=self.es_params.index_name, body={"mappings": mapping})
            filename = os.path.split(file_obj.name)[-1]
            file_path = 'data/' + filename
            shutil.move(file_obj.name, file_path)
            docs = load_file(file_path, chunk_size, chunk_overlap)
            self.es.add_documents(docs)
            return "插入成功"
        except Exception as e:
            return e

    def exact_search(self, query, top_k):
        result = []
        similar_docs = self.es.similarity_search_with_score(query, k=top_k)
        for i in similar_docs:
            result.append({
                'content': i[0].page_content,
                'source': i[0].metadata['source'],
                'score': i[1]
            })
        return result

    def knn_search(self, query, top_k):
        result = []
        query_vector = self.embedding.embed_query(query)
        similar_docs = self.es.knn_search(query=query, query_vector=query_vector, k=top_k)
        hits = [hit for hit in similar_docs["hits"]["hits"]]
        for i in hits:
            result.append({
                'content': i['_source']['text'],
                'source': i['_source']['metadata']['source'],
                'score': i['_score']
            })
        return result

    def hybrid_search(self, query, top_k, knn_boost):
        result = []
        query_vector = self.embedding.embed_query(query)
        similar_docs = self.es.knn_hybrid_search(query=query, query_vector=query_vector, knn_boost=knn_boost,
                                                 query_boost=1 - knn_boost, k=top_k)
        hits = [hit for hit in similar_docs["hits"]["hits"]]
        for i in hits:
            result.append({
                'content': i['_source']['text'],
                'source': i['_source']['metadata']['source'],
                'score': i['_score']
            })
        result = result[:top_k]
        return result


from configs.params import ESParams
from elasticsearch import Elasticsearch

es_params = ESParams()
index_name = es_params.index_name

# %% 初始化ES对象
client = Elasticsearch(['{}:{}'.format(es_params.url, es_params.port)],
                       basic_auth=(es_params.username, es_params.passwd),
                       verify_certs=False)

# %% 连通测试
client.ping()

# %% 检查索引是否存在
index_exists = client.indices.exists(index=index_name)

# %% 新建索引
response = client.indices.create(index=index_name, body=mapping)

# %% 插入数据
response = client.index(index=index_name, id=document_id, document=data)

# %% 更新
rp = client.update(index=index_name, id=document_id, body={"doc": data})

# %% 检查文档是否存在
document_exists = client.exists(index=index_name, id=document_id)

# %% 根据ID删除文档
response = client.delete(index=index_name, id=document_id)

用户检索

检索转写

基于距离的向量数据库检索在高维空间中嵌入(表示)查询,并根据“距离”查找相似的嵌入文档。 但是,如果查询措辞发生细微变化,或者嵌入不能很好地捕获数据的语义,检索可能会产生不同的结果。 有时会进行及时的工程/调整来手动解决这些问题,但这可能很乏味。

MultiQueryRetriever 通过使用 LLM 从不同角度为给定的用户输入查询生成多个查询,从而自动执行提示调整过程。 对于每个查询,它都会检索一组相关文档,并采用所有查询之间的唯一并集来获取更大的一组潜在相关文档。 通过对同一问题生成多个视角,MultiQueryRetriever 或许能够克服基于距离的检索的一些限制,并获得更丰富的结果集。

from langchain.retrievers.multi_query import MultiQueryRetriever

llm = ChatGLM(model_path="/root/autodl-tmp/ChatGLM2-6B/llm_model/models--THUDM--chatglm2-6b/snapshots/8eb45c842594b8473f291d0f94e7bbe86ffc67d8")
llm.load_model()

retriever_from_llm = MultiQueryRetriever.from_llm(
    retriever=db.as_retriever(), llm=llm
)
query = "chatglm是什么?"
len(retriever_from_llm.get_relevant_documents(query=query))

从多个角度去生成了可能的提问方式:

这部分功能是怎么实现的呢,其实很简单就是让LLM大模型对输入的问题从多个角度来生成可能的问法。你可以通过下面的代码来按自己要求生成自己的问法。把下面代码中 QUERY_PROMPT的template改成你自己的控制约束就行了。

from typing import List
from langchain import LLMChain
from pydantic import BaseModel, Field
from langchain.prompts import PromptTemplate
from langchain.output_parsers import PydanticOutputParser


# Output parser will split the LLM result into a list of queries
class LineList(BaseModel):
    # "lines" is the key (attribute name) of the parsed output
    lines: List[str] = Field(description="Lines of text")


class LineListOutputParser(PydanticOutputParser):
    def __init__(self) -> None:
        super().__init__(pydantic_object=LineList)

    def parse(self, text: str) -> LineList:
        lines = text.strip().split("\n")
        return LineList(lines=lines)


output_parser = LineListOutputParser()

QUERY_PROMPT = PromptTemplate(
    input_variables=["question"],
    template="""You are an AI language model assistant. Your task is to generate five 
    different versions of the given user question to retrieve relevant documents from a vector 
    database. By generating multiple perspectives on the user question, your goal is to help
    the user overcome some of the limitations of the distance-based similarity search. 
    Provide these alternative questions seperated by newlines.
    Original question: {question}""",
)
llm = ChatGLM(model_path="/root/autodl-tmp/ChatGLM2-6B/llm_model/models--THUDM--chatglm2-6b/snapshots/8eb45c842594b8473f291d0f94e7bbe86ffc67d8")
llm.load_model()

# Chain
llm_chain = LLMChain(llm=llm, prompt=QUERY_PROMPT, output_parser=output_parser)

# Other inputs
question = "What are the approaches to Task Decomposition?"

检索结果整理

如果只是把问题做embbding,然后通过相似度检索召回文本,代码实现如下。

query = "chatglm是什么?"
docs = db.similarity_search(query)

检索回来的结果如下:

如果是直接把召回的结果作为答案,那这个肯定达不到我们希望的效果。所以比然要对召回的结果做处理,一般的处理其实就是让LLM模型,利用召回的数据作为参考资料,回答我们的问题。其实这背后的逻辑就是对信息做了过滤,把相关的数据权重加重,然后让模型基于加重权重的数据做回答。

Stuff检索框架

填充文档链(“填充”如“填充”或“填充”)是最直接的文档链。 它需要一个文档列表,将它们全部插入到提示中,并将该提示传递给LLM模型。该链非常适合文档较小且大多数调用只传递少量文档的应用程序。

实现代码如下:

from langchain.chains import RetrievalQA
ruff = RetrievalQA.from_chain_type(
    llm=llm, chain_type="stuff", retriever=db.as_retriever()
)
ruff.run(query)

生成结果如下:

refine检索框架

细化文档链通过循环输入文档并迭代更新其答案来构建响应。 对于每个文档,它将所有非文档输入、当前文档和最新的中间答案传递给 LLM 链以获得新答案。

由于 Refine 链一次仅将单个文档传递给 LLM,因此它非常适合需要分析的文档数量多于模型上下文的任务。 明显的权衡是,该链将比 Stuff 文档链进行更多的 LLM 调用。 还有一些任务很难迭代完成。 例如,当文档频繁相互交叉引用或一项任务需要来自许多文档的详细信息时,Refine 链可能会表现不佳。

from langchain.chains import RetrievalQA
ruff = RetrievalQA.from_chain_type(
    llm=llm, chain_type="refine", retriever=db.as_retriever()
)
ruff.run(query)

生成结果如下:

maprecude信息聚合检索框架

MapReduce 文档链首先将 LLM 链单独应用于每个文档(Map 步骤),将链输出视为新文档。 然后,它将所有新文档传递到单独的组合文档链以获得单个输出(Reduce 步骤)。 它可以选择首先压缩或折叠映射的文档,以确保它们适合组合文档链(这通常会将它们传递给LLM)。 如有必要,该压缩步骤会递归执行。

实现代码如下:

from langchain.chains import RetrievalQA
ruff = RetrievalQA.from_chain_type(
    llm=llm, chain_type="map_reduce", retriever=db.as_retriever()
)
ruff.run(query)

生成效果如下:

用户自定义检索框架

上面介绍了几种端到端query向量数据库生成答案的方式,但往往这样的端到端生成方法无法满足我们需要。我们要如何去改进提高呢,要回答这个问题就要知道基于向量库生成答案到底做了什么事。拆看代码看其实就两部分:1.通过语意相似度召回相关资料 2.把召回的相关资料做了摘要总结生成答案。可以通过下面一个示例代码,看如何来把两个部分合成一个。

# 导入LLMChain并定义一个链,用语言模型和提示作为参数。

from langchain.chains import LLMChain
chain = LLMChain(llm=llm, prompt=prompt)

# 只指定输入变量来运行链。
print(chain.run("autoencoder"))
# 定义一个第二个提示

second_prompt = PromptTemplate(
    input_variables=["ml_concept"],
    template="把{ml_concept}的概念描述转换成用500字向我解释,就像我是一个五岁的孩子一样",
)
chain_two = LLMChain(llm=llm, prompt=second_prompt)

# 用上面的两个链定义一个顺序链:第二个链把第一个链的输出作为输入

from langchain.chains import SimpleSequentialChain
overall_chain = SimpleSequentialChain(chains=[chain, chain_two], verbose=True)

# 只指定第一个链的输入变量来运行链。
explanation = overall_chain.run("autoencoder")
print(explanation)

所以我们如果要定制化解决方案,其实只要把摘要生成答案部分加入策略就行了。具体代码如下,只要把你的约束策略放到prompt_template就定制化满足你的要求了。

from langchain.prompts import PromptTemplate

prompt_template = """Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer.

{context}

Question: {question}
Helpful Answer:"""
PROMPT = PromptTemplate(
    template=prompt_template, input_variables=["context", "question"]
)

chain = load_summarize_chain(llm, chain_type="stuff", prompt=PROMPT)
chain.run(docs)

把上面模块封装成Tool

#把基于向量库的检索生成封装成Tool
ruff = RetrievalQA.from_chain_type(
    llm=llm, chain_type="stuff", retriever=db.as_retriever()
)

tools = [
    Tool(
        name="Ruff QA System",
        func=ruff.run,
        description="useful for when you need to answer questions about ruff (a python linter). Input should be a fully formed question.",
    ),
]

#挂到Agent上测试效果
agent = initialize_agent(
    tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True
)

agent.run(
    "What did biden say about ketanji brown jackson in the state of the union address?"
)

小结:

1.总体介绍了基于向量检索的框架,主要分为两大块:内容存储、内容检索

2.具体介绍了内容存储部分技术细节:数据加载模块、数据切块模块、数据embbeding模块、数据存储模块及代码实现

3.具体介绍了内容检索部分:向量相似度召回+基于上下文生成问题答案,实现原理和实现代码

4.介绍了如何把向量检索生成封装成tool供agnet使用

项目代码:https://github.com/liangwq/Chatglm_lora_multi-gpu/tree/main/APP_example/chatglm_agent

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐