1、下载项目代码

源码地址:
https://github.com/xujinhelaw/chat-bot-ananas.git
clone代码

git clone https://github.com/xujinhelaw/chat-bot-ananas.git

2、代码结构

项目结构如下

chat-bot-ananas/ (根项目)
├── llm-server/ (大模型服务端模块)
│   ├── api.py(大模型启动和开发接口代码)
│   ├── chatmachine.py(大模型访问客户端代码)
│   ├── download.py(大模型下载代码)
│   ├── environment.yml(大模型部署和访问客户端需要的依赖包)

3、搭建python的执行环境

首先要安装python环境管理和依赖软件包管理工具,这里选择conda,conda支持创建不同的环境,下载安装不同版本的Python和软件包,并支持灵活切换。

3.1 安装miniconda(仅包含python、conda 和 conda 的依赖项)

官网:
https://www.anaconda.com/
最新版本:
https://docs.conda.io/projects/conda/en/latest/index.html
旧版本:
https://repo.anaconda.com/miniconda/
miniconda
conda安装成功
miniconda安装成功

3.2 配置下载源

打开的 Anaconda prompt 控制台(非必须但建议配,因为environment.yml有指定)

conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/pytorch/

3.3 配置conda的环境信息(非必须,如果需要powershell直接使用conda命令,则配置)

在Path变量中添加conda.exe的路径
conda环境变量配置

4 执行代码

4.1 创建py环境并安装依赖包

打开的 Anaconda prompt 控制台,进到chat-bot-ananas/llm-server/目录,导入环境文件chat-bot-ananas/llm-server/environment.yml

conda env create -f environment.yml
# windows 切换环境
conda activate qwen
# Linux/Unix 切换环境
source activate qwen

environment.yml依赖如下:

name: qwen
channels:
  - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main/
  - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
  - https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/pytorch/
  - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/r
  - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/pro
  - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/msys2
dependencies:
  - python=3.8.10
  - pip=24.2
  - pip:
    - -i https://pypi.tuna.tsinghua.edu.cn/simple/
    - fastapi==0.104.1
    - uvicorn==0.24.0.post1
    - requests==2.32.4
    - modelscope==1.9.5
    - transformers==4.35.2
    - transformers_stream_generator==0.0.4
    - pip==24.2

依赖包安装成功

4.2 模型下载

到modelscope查看要下载的模型的名称
https://www.modelscope.cn/home
大模型名称

执行模型下载代码

python download.py

如果执行过程中有哪些包没有安装,例如
缺包报错

则执行下面命令单独安装依赖包

pip install modelscope==1.9.5

模型下载代码如下所示:

import torch
from modelscope import snapshot_download, AutoModel, AutoTokenizer
from modelscope import GenerationConfig
# 第一个参数为模型名称,参数cache_dir为模型的下载路径
model_dir = snapshot_download('qwen/Qwen-7B-Chat', cache_dir='/root/model',revision='v1.1.4')

等待模型下载完成
模型下载完成

4.3 启动大模型

执行模型启动代码

python api.py

大模型启动和接口开放代码如下:

#fastapi 用于开放大模型访问接口
from fastapi import FastAPI, Request
#StreamingResponse 用于开放大模型访问接口流式响应
from fastapi.responses import StreamingResponse
#threading  用于多线程处理大模型接口访问
from threading import Thread
#在没有大并发的情况下,asyncio 异步io用于模拟延迟
import asyncio
#transformers ,大名鼎鼎的大模型注意力机制框架实现库
from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig, TextIteratorStreamer
#一个基于asyncio开发的一个轻量级高效的web服务器框架
import uvicorn
import json
import datetime
# torch 开源机器学习库,主要用于张量计算和梯度计算
import torch
#解决跨域问题
from fastapi.middleware.cors import CORSMiddleware

# 设置设备参数
DEVICE = "cuda"  # 使用CUDA
DEVICE_ID = "0"  # CUDA设备ID,如果未设置则为空
CUDA_DEVICE = f"{DEVICE}:{DEVICE_ID}" if DEVICE_ID else DEVICE  # 组合CUDA设备信息

# 清理GPU内存函数
def torch_gc():
    if torch.cuda.is_available():  # 检查是否可用CUDA
        with torch.cuda.device(CUDA_DEVICE):  # 为训练工具指定CUDA设备
            torch.cuda.empty_cache()  # 清空CUDA缓存
            torch.cuda.ipc_collect()  # 收集CUDA内存碎片

# 创建FastAPI应用
app = FastAPI()

# 添加 CORS 中间件
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # 或指定具体域名,如 ["http://localhost:6006"]
    allow_credentials=True,
    allow_methods=["*"],  # 允许所有方法(GET, POST, OPTIONS 等)
    allow_headers=["*"],  # 允许所有头
)

# --- 1. 定义流式生成函数 ---
def generate_stream(messages: str, max_new_tokens: int = 512, temperature=0.7):
    global model, tokenizer  # 声明全局变量以便在函数内部使用模型和分词器
    print(f"生成流式响应的异步生成器,messages is {messages}")

    input_ids = tokenizer.apply_chat_template(
        messages,  # 要格式化的消息
        tokenize=False, # 不进行分词
        add_generation_prompt=True # 添加生成提示
    )
    # 对输入进行编码
    model_inputs = tokenizer([input_ids], return_tensors="pt").to(model.device)

    # 创建TextIteratorStreamer,用于流式获取生成的token
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, timeout=10.0)

    # 执行大模型计算输出
    model.generate(model_inputs.input_ids, max_new_tokens=512, temperature=0.7, streamer=streamer)

    # 对输出进行流式响应
    async def stream_response():
        for text in streamer:
            if text:
                # 处理消息分割符,<|im_start|>表示一条消息的开始,
                #<|im_end|> 表示一条消息的结束,<|endoftext|>表示文本生成结束
                if text.find("<|im_end|>")!= -1:
                    text = text.split("<|im_end|>")[0]
                if text.find("<|endoftext|>")!=-1:
                    text = text.split("<|endoftext|>")[0]
                chunk = {
                    "id": f"chatcmpl-{datetime.datetime.now().timestamp()}",
                    "object": "chat.completion.chunk",
                    "created": int(datetime.datetime.now().timestamp()),
                    "model": "qwen",
                    "choices": [
                        {
                            "index": 0,
                            "delta": {"content": text},
                            "finish_reason": None
                        }
                    ]
                }
                # 用yield迭代推送对话内容
                yield f"data: {json.dumps(chunk)}\n\n"
                await asyncio.sleep(0.1) #模拟延迟
                print(f"data is: {json.dumps(chunk)}")
                print(f"content is:{repr(text)}")

        # 发送结束 chunk
        final_chunk = {
            "id": f"chatcmpl-{datetime.datetime.now().timestamp()}",
            "object": "chat.completion.chunk",
            "created": int(datetime.datetime.now().timestamp()),
            "model": "qwen",
            "choices": [
                {
                    "index": 0,
                    "delta": {},
                    "finish_reason": "stop"
                }
            ]
        }
        yield f"data: {json.dumps(final_chunk)}\n\n"
        yield "data: [DONE]\n\n"
        print(f"响应结束!!!")
    # StreamingResponse下的返回信息
    return StreamingResponse(
        stream_response(),
        media_type="text/event-stream"
    )

@app.post("/v1/chat/completions")
async def stream_chat(request: Request):
    """
    流式聊天API端点。
    客户端发送JSON: {"prompt": "你的问题", "max_tokens": 512}
    服务器返回text/event-stream流。
    """
    json_post_raw = await request.json()  # 获取POST请求的JSON数据
    print(f"生成流式响应的异步生成器,request_raw is {json_post_raw}")
    json_post = json.dumps(json_post_raw)  # 将JSON数据转换为字符串
    json_post_list = json.loads(json_post)  # 将字符串转换为Python对象
    messages = json_post_list.get('messages')  # 获取请求中的提示
    print(f"生成流式响应的异步生成器,messages is {messages}")
    return generate_stream(messages,10240)

# 主函数入口
if __name__ == '__main__':
    # 加载预训练的分词器和模型
    print("正在加载模型... ...")
    tokenizer = AutoTokenizer.from_pretrained("./qwen/Qwen-7B-Chat", trust_remote_code=True)
    # 可以通过自定义device_map将模型的不同分层分配到不同的GPU达到GPU的高效使用
    model = AutoModelForCausalLM.from_pretrained("./qwen/Qwen-7B-Chat",torch_dtype=torch.float16, device_map="auto", trust_remote_code=True).eval()
    model.generation_config = GenerationConfig.from_pretrained("./qwen/Qwen-7B-Chat", trust_remote_code=True) # 可指定
    model.eval()  # 设置模型为评估模式
    print("模型加载完成。")
    # 启动FastAPI应用
    uvicorn.run(app, host='0.0.0.0', port=6006)  # 在指定端口和主机上启动应用

4.4 访问大模型

重新开一个终端,通过curl命令访问

curl -H "Content-Type:octet-stream" -X POST -d '{"messages": [{"role": "system","content": "你是一个 helpful assistant。"},
 {"role": "user", "content": "介绍一下你自己"}]}' http://127.0.0.1:6006/v1/chat/completions

大模型curl成功

4.5 通过python实现的客户端访问大模型

启动客户端

# 因为是新开的终端,记得切到虚拟环境
# windows 切换环境
conda activate qwen
# Linux/Unix 切换环境
source activate qwen
python chatmachine.py

大模型访问的客户端代码如下:

import requests
import json
 
class ChatBot:
    def __init__(self, api_key, model="Qwen-7B-Chat", url="http://127.0.0.1:6006"):
        self.api_key = api_key
        self.model = model
        self.url = url
        self.headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {self.api_key}'
        }
 
        self.messages = [
            {
                "role": "system",
                "content": "you are a chatbot"
            }
        ]

    def chat(self, user_message):
        print(f"user_message is {user_message}")
        self.messages.append({"role": "user", "content": user_message})

        payload = json.dumps({
            "model": self.model,
            "messages": self.messages
        }).encode("utf-8")

        print(f"payload is {payload}")

        response = requests.post(self.url, headers=self.headers, data=payload)

        if response.status_code == 200:
            response_data = response.json()
            bot_reply = response_data['choices'][0]['message']['content']
            self.messages.append({"role": "assistant", "content": bot_reply})
            return bot_reply
        else:
            return f"Error: {response.status_code}, {response.text}"
    def chatLocal(self, prompt):
        headers = {'Content-Type': 'application/json'}
        data = {"prompt": prompt, "history": []}
        response = requests.post(url='http://127.0.0.1:6006', headers=headers, data=json.dumps(data))
        return response.json()['response']

    def chatSSE(self, user_message):
        headers = {'Content-Type': 'application/octet-stream'}
        print(f"user_message is {user_message}")
        self.messages.append({"role": "user", "content": user_message})

        payload = json.dumps({
            "model": self.model,
            "messages": self.messages
        }).encode("utf-8")

        print(f"payload is {payload}")
        try:
            print("发起模型请求")
            response = requests.post(url='http://127.0.0.1:6006/v1/chat/completions/', headers=headers, data=payload,stream=True)
            print(f"response: {response}")
            print("机器人: ")
            for line in response.iter_lines():
                if not line:
                    continue
                line = line.decode('utf-8').strip()
                if line.startswith("data: "):
                    data_str = line[6:].strip()
                if data_str == "[DONE]":
                    print("") #换行
                    break
                try:
                    data = json.loads(data_str)
                    content_str = data["choices"][0]["delta"].get("content", "")
                    if content_str:
                        yield content_str
                except json.JSONDecodeError:
                    continue
        except requests.RequestException as e:
            print(f"Request failed: {e}")

def main():
    api_key = "你的API密钥"  
    # 聊天机器人的入口
    bot = ChatBot(api_key)
    print("你好! 你可以开始与机器人对话了. 输入 'exit' 来结束对话.")
 
    while True:
        user_input = input("你: ")
        if user_input.lower() == 'exit':
            break

        for data in bot.chatSSE(user_input):
            print(data, end="", flush=True)
 
if __name__ == "__main__":
    main()

python客户端访问大模型
大模型访问客户端如果本地python的聊天机器人不能满足要求,需要构建网页版的聊天机器人,可以参见:https://www.jianshu.com/p/03f79ac2b145

5 附录

其他有用的命令

# 创建环境
conda create --name myenv python=3.8
# 删除环境
conda remove --name myenv --all
# 查看已有的 conda 环境
conda env list
# windows 切换环境
conda activate myenv
# Linux/Unix 切换环境
source activate myenv
# 退出环境
conda deactivate
# 下载安装 fastapi
conda install fastapi
# 卸载 fastapi
conda remove fastapi
# 导出环境
conda env export > environment.yml
# 导入环境
conda env create -f environment.yml
# 查询pip安装的包的版本
pip show transformers|grep Version
Logo

更多推荐