Qwen3-4B-Thinking-GGUF部署教程:Chainlit多用户权限隔离与审计日志

1. 引言:从单机部署到多用户协作的挑战

如果你已经成功在单机上部署了Qwen3-4B-Thinking模型,体验过它的强大推理能力,那么接下来可能会遇到一个现实问题:如何让团队里的多个成员都能安全、有序地使用这个AI助手?

想象一下这样的场景:开发团队需要用它来审查代码,产品经理想让它帮忙写需求文档,测试人员用它生成测试用例。如果大家都用同一个账号登录,不仅权限混乱,出了问题也找不到是谁干的。更麻烦的是,如果某个成员不小心删除了重要对话记录,或者输入了不合适的提示词,你连追溯都困难。

这就是我们今天要解决的问题。我将带你一步步搭建一个支持多用户、权限隔离、完整审计日志的Qwen3-4B-Thinking部署方案。通过Chainlit这个强大的前端框架,我们可以轻松实现:

  • 用户认证:不同成员用不同账号登录
  • 权限控制:不同角色看到不同的功能界面
  • 操作审计:谁在什么时候做了什么,一目了然
  • 对话隔离:每个人的对话记录互不干扰

整个方案基于你已经熟悉的vLLM部署环境,不需要复杂的架构改造。即使你之前只用过简单的单用户部署,跟着这个教程也能轻松上手。

2. 环境准备与基础部署

2.1 检查现有部署状态

首先确认你的Qwen3-4B-Thinking模型已经通过vLLM成功部署。打开终端,运行:

# 查看模型服务日志
cat /root/workspace/llm.log

如果看到类似下面的输出,说明模型服务运行正常:

INFO 07-15 10:30:25 llm_engine.py:73] Initializing an LLM engine...
INFO 07-15 10:30:28 model_runner.py:83] Loading model weights...
INFO 07-15 10:30:45 llm_engine.py:161] LLM engine is ready.

2.2 安装Chainlit及相关依赖

接下来安装Chainlit和必要的扩展包。Chainlit不仅提供了漂亮的聊天界面,还内置了用户管理的基础功能。

# 安装Chainlit核心包
pip install chainlit

# 安装用户认证相关的扩展
pip install python-jose[cryptography]  # JWT token处理
pip install passlib[bcrypt]  # 密码哈希
pip install python-multipart  # 表单数据处理

# 安装数据库支持(使用SQLite,简单易用)
pip install sqlalchemy

2.3 创建项目目录结构

为了代码清晰,建议按以下结构组织文件:

qwen-multi-user/
├── app.py              # 主应用文件
├── auth.py             # 认证和权限管理
├── database.py         # 数据库模型和操作
├── config.py           # 配置文件
├── requirements.txt    # 依赖列表
├── static/            # 静态文件
│   └── styles.css     # 自定义样式
└── templates/         # HTML模板(如果需要自定义登录页)

创建基础配置文件 config.py

# config.py - 应用配置
import os
from datetime import timedelta

class Config:
    # 应用密钥,用于加密会话和令牌
    SECRET_KEY = os.environ.get('SECRET_KEY') or 'your-secret-key-change-in-production'
    
    # JWT配置
    JWT_SECRET_KEY = SECRET_KEY
    JWT_ACCESS_TOKEN_EXPIRES = timedelta(hours=1)
    JWT_REFRESH_TOKEN_EXPIRES = timedelta(days=30)
    
    # 数据库配置
    DATABASE_URL = 'sqlite:///users.db'  # 使用SQLite,生产环境可换MySQL/PostgreSQL
    
    # vLLM模型服务地址
    VLLM_API_URL = 'http://localhost:8000/v1/completions'
    
    # 用户角色定义
    ROLES = {
        'admin': ['*'],  # 管理员拥有所有权限
        'developer': ['code_review', 'debug_assist', 'documentation'],
        'product_manager': ['requirement_analysis', 'documentation', 'brainstorming'],
        'tester': ['test_case_generation', 'bug_analysis'],
        'viewer': ['view_only']  # 只读用户
    }
    
    # 审计日志配置
    AUDIT_LOG_ENABLED = True
    LOG_FILE = 'audit.log'

3. 实现多用户认证与权限系统

3.1 设计用户数据库模型

database.py 中定义用户数据模型:

# database.py - 数据库模型
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text, Boolean
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
import hashlib
import uuid

Base = declarative_base()

class User(Base):
    """用户表"""
    __tablename__ = 'users'
    
    id = Column(Integer, primary_key=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), unique=True, nullable=False)
    password_hash = Column(String(255), nullable=False)
    role = Column(String(50), default='viewer')  # 用户角色
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    last_login = Column(DateTime, nullable=True)
    
    # 用于密码重置等操作
    reset_token = Column(String(100), nullable=True)
    reset_token_expiry = Column(DateTime, nullable=True)

class AuditLog(Base):
    """审计日志表"""
    __tablename__ = 'audit_logs'
    
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, nullable=False)
    username = Column(String(50), nullable=False)
    action = Column(String(100), nullable=False)  # 操作类型:login, query, delete, etc.
    details = Column(Text, nullable=True)  # 操作详情
    ip_address = Column(String(50), nullable=True)
    user_agent = Column(Text, nullable=True)
    timestamp = Column(DateTime, default=datetime.utcnow)
    
    # 对于模型查询,记录相关信息
    model_query = Column(Text, nullable=True)
    model_response = Column(Text, nullable=True)
    tokens_used = Column(Integer, nullable=True)

class Conversation(Base):
    """用户对话记录表"""
    __tablename__ = 'conversations'
    
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, nullable=False)
    conversation_id = Column(String(100), default=lambda: str(uuid.uuid4()))
    title = Column(String(200), nullable=True)  # 对话标题(自动生成)
    messages = Column(Text, nullable=False)  # JSON格式的对话记录
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    is_archived = Column(Boolean, default=False)

# 初始化数据库
def init_db():
    from config import Config
    engine = create_engine(Config.DATABASE_URL)
    Base.metadata.create_all(engine)
    return engine

def get_session():
    engine = init_db()
    Session = sessionmaker(bind=engine)
    return Session()

3.2 实现用户认证逻辑

auth.py 中实现完整的认证系统:

# auth.py - 认证和权限管理
from datetime import datetime, timedelta
from functools import wraps
import jwt
from passlib.context import CryptContext
from database import get_session, User, AuditLog
from config import Config
import hashlib

# 密码加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

class AuthManager:
    def __init__(self):
        self.config = Config
    
    def hash_password(self, password: str) -> str:
        """哈希密码"""
        return pwd_context.hash(password)
    
    def verify_password(self, plain_password: str, hashed_password: str) -> bool:
        """验证密码"""
        return pwd_context.verify(plain_password, hashed_password)
    
    def create_user(self, username: str, email: str, password: str, role: str = 'viewer'):
        """创建新用户"""
        session = get_session()
        try:
            # 检查用户是否已存在
            existing_user = session.query(User).filter(
                (User.username == username) | (User.email == email)
            ).first()
            
            if existing_user:
                return False, "用户名或邮箱已存在"
            
            # 创建新用户
            new_user = User(
                username=username,
                email=email,
                password_hash=self.hash_password(password),
                role=role,
                created_at=datetime.utcnow()
            )
            
            session.add(new_user)
            session.commit()
            
            # 记录审计日志
            self.log_audit(
                user_id=new_user.id,
                username=username,
                action='user_created',
                details=f'创建用户 {username},角色:{role}'
            )
            
            return True, "用户创建成功"
        except Exception as e:
            session.rollback()
            return False, f"创建用户失败:{str(e)}"
        finally:
            session.close()
    
    def authenticate_user(self, username: str, password: str, ip: str = None, user_agent: str = None):
        """用户认证"""
        session = get_session()
        try:
            user = session.query(User).filter(User.username == username).first()
            
            if not user or not user.is_active:
                return None, "用户不存在或已被禁用"
            
            if not self.verify_password(password, user.password_hash):
                # 记录失败的登录尝试
                self.log_audit(
                    user_id=user.id,
                    username=username,
                    action='login_failed',
                    details='密码错误',
                    ip_address=ip,
                    user_agent=user_agent
                )
                return None, "密码错误"
            
            # 更新最后登录时间
            user.last_login = datetime.utcnow()
            session.commit()
            
            # 生成JWT令牌
            token = self.create_access_token({
                'user_id': user.id,
                'username': user.username,
                'role': user.role
            })
            
            # 记录成功的登录
            self.log_audit(
                user_id=user.id,
                username=username,
                action='login_success',
                details='登录成功',
                ip_address=ip,
                user_agent=user_agent
            )
            
            return {
                'access_token': token,
                'token_type': 'bearer',
                'user': {
                    'id': user.id,
                    'username': user.username,
                    'email': user.email,
                    'role': user.role
                }
            }, "登录成功"
            
        except Exception as e:
            return None, f"认证失败:{str(e)}"
        finally:
            session.close()
    
    def create_access_token(self, data: dict):
        """创建JWT访问令牌"""
        to_encode = data.copy()
        expire = datetime.utcnow() + self.config.JWT_ACCESS_TOKEN_EXPIRES
        to_encode.update({'exp': expire})
        encoded_jwt = jwt.encode(to_encode, self.config.JWT_SECRET_KEY, algorithm='HS256')
        return encoded_jwt
    
    def verify_token(self, token: str):
        """验证JWT令牌"""
        try:
            payload = jwt.decode(token, self.config.JWT_SECRET_KEY, algorithms=['HS256'])
            return payload
        except jwt.ExpiredSignatureError:
            return None
        except jwt.InvalidTokenError:
            return None
    
    def check_permission(self, user_role: str, required_permission: str) -> bool:
        """检查用户是否有特定权限"""
        if user_role not in self.config.ROLES:
            return False
        
        user_permissions = self.config.ROLES[user_role]
        
        # 管理员拥有所有权限
        if user_role == 'admin':
            return True
        
        # 检查具体权限
        return required_permission in user_permissions
    
    def log_audit(self, user_id: int, username: str, action: str, 
                  details: str = None, ip_address: str = None, 
                  user_agent: str = None, model_query: str = None,
                  model_response: str = None, tokens_used: int = None):
        """记录审计日志"""
        if not self.config.AUDIT_LOG_ENABLED:
            return
        
        session = get_session()
        try:
            log_entry = AuditLog(
                user_id=user_id,
                username=username,
                action=action,
                details=details,
                ip_address=ip_address,
                user_agent=user_agent,
                model_query=model_query,
                model_response=model_response,
                tokens_used=tokens_used,
                timestamp=datetime.utcnow()
            )
            session.add(log_entry)
            session.commit()
        except Exception as e:
            print(f"记录审计日志失败:{str(e)}")
        finally:
            session.close()

# 权限装饰器
def require_permission(permission: str):
    """权限检查装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # 这里简化处理,实际应用中需要从请求中获取用户信息
            user_info = kwargs.get('user_info', {})
            user_role = user_info.get('role', 'viewer')
            
            auth = AuthManager()
            if not auth.check_permission(user_role, permission):
                return {
                    'success': False,
                    'message': f'权限不足,需要 {permission} 权限'
                }
            return func(*args, **kwargs)
        return wrapper
    return decorator

4. 集成Chainlit实现多用户界面

4.1 创建主应用文件

现在我们来创建主应用文件 app.py,集成Chainlit和我们的认证系统:

# app.py - 主应用文件
import chainlit as cl
from chainlit.types import AskFileResponse
import aiohttp
import json
from datetime import datetime
from typing import Optional, List
import asyncio

from auth import AuthManager, require_permission
from database import get_session, User, Conversation, AuditLog
from config import Config

# 初始化认证管理器
auth_manager = AuthManager()

# 存储用户会话状态
user_sessions = {}

class UserSession:
    """用户会话管理"""
    def __init__(self, user_id: int, username: str, role: str):
        self.user_id = user_id
        self.username = username
        self.role = role
        self.conversation_history = []
        self.current_conversation_id = None
        self.token_count = 0
        
    def add_message(self, role: str, content: str):
        """添加消息到当前对话"""
        message = {
            'role': role,
            'content': content,
            'timestamp': datetime.utcnow().isoformat()
        }
        self.conversation_history.append(message)
        
        # 记录到数据库
        self.save_conversation()
    
    def save_conversation(self):
        """保存对话到数据库"""
        if not self.conversation_history:
            return
        
        session = get_session()
        try:
            # 如果是新对话,创建对话记录
            if not self.current_conversation_id:
                # 自动生成对话标题(使用第一条用户消息的前50个字符)
                user_messages = [msg for msg in self.conversation_history if msg['role'] == 'user']
                title = user_messages[0]['content'][:50] + '...' if user_messages else '新对话'
                
                new_conversation = Conversation(
                    user_id=self.user_id,
                    title=title,
                    messages=json.dumps(self.conversation_history, ensure_ascii=False)
                )
                session.add(new_conversation)
                session.commit()
                self.current_conversation_id = new_conversation.id
            else:
                # 更新现有对话
                conversation = session.query(Conversation).filter(
                    Conversation.id == self.current_conversation_id,
                    Conversation.user_id == self.user_id
                ).first()
                
                if conversation:
                    conversation.messages = json.dumps(self.conversation_history, ensure_ascii=False)
                    conversation.updated_at = datetime.utcnow()
                    session.commit()
        except Exception as e:
            print(f"保存对话失败:{str(e)}")
        finally:
            session.close()

@cl.on_chat_start
async def on_chat_start():
    """聊天开始时的处理"""
    # 检查用户是否已登录
    user_session = cl.user_session.get("user_session")
    
    if not user_session:
        # 显示登录界面
        await show_login_interface()
        return
    
    # 已登录用户,显示欢迎信息
    await cl.Message(
        content=f"欢迎回来,{user_session.username}!\n\n"
                f"您的角色:{user_session.role}\n"
                f"今天您已经使用了 {user_session.token_count} 个token\n\n"
                f"请输入您的问题,我将为您调用Qwen3-4B-Thinking模型进行回答。"
    ).send()
    
    # 显示历史对话(如果有)
    await load_conversation_history(user_session)

async def show_login_interface():
    """显示登录界面"""
    # 使用Chainlit的输入组件实现登录表单
    settings = await cl.ChatSettings(
        [
            cl.input_widget.TextInput(
                id="username",
                label="用户名",
                placeholder="请输入用户名"
            ),
            cl.input_widget.TextInput(
                id="password",
                label="密码",
                placeholder="请输入密码",
                type="password"
            )
        ]
    ).send()
    
    # 等待用户输入
    res = await cl.AskUserMessage(
        content="请在上方输入用户名和密码,然后点击登录按钮",
        timeout=300
    ).send()
    
    if res:
        # 获取用户输入
        username = settings.get("username")
        password = settings.get("password")
        
        if not username or not password:
            await cl.Message(content="用户名和密码不能为空").send()
            await show_login_interface()
            return
        
        # 进行认证
        auth_result, message = auth_manager.authenticate_user(
            username=username,
            password=password,
            ip=cl.user_session.get("client_ip", ""),
            user_agent=cl.user_session.get("user_agent", "")
        )
        
        if auth_result:
            # 认证成功,创建用户会话
            user_session = UserSession(
                user_id=auth_result['user']['id'],
                username=auth_result['user']['username'],
                role=auth_result['user']['role']
            )
            
            cl.user_session.set("user_session", user_session)
            cl.user_session.set("access_token", auth_result['access_token'])
            
            # 发送欢迎消息
            await cl.Message(
                content=f"登录成功!欢迎 {username}。\n\n"
                        f"您的权限级别:{user_session.role}\n"
                        f"现在您可以开始使用Qwen3-4B-Thinking模型了。"
            ).send()
            
            # 重新加载聊天界面
            await on_chat_start()
        else:
            await cl.Message(content=f"登录失败:{message}").send()
            await show_login_interface()

async def load_conversation_history(user_session: UserSession):
    """加载用户的历史对话"""
    session = get_session()
    try:
        conversations = session.query(Conversation).filter(
            Conversation.user_id == user_session.user_id,
            Conversation.is_archived == False
        ).order_by(Conversation.updated_at.desc()).limit(10).all()
        
        if conversations:
            # 创建对话选择界面
            elements = []
            for conv in conversations:
                elements.append(
                    cl.Button(
                        name=f"conv_{conv.id}",
                        value=conv.id,
                        label=conv.title,
                        description=conv.updated_at.strftime("%Y-%m-%d %H:%M")
                    )
                )
            
            # 显示历史对话按钮
            actions = await cl.Message(
                content="您有以下历史对话:",
                actions=elements
            ).send()
            
            # 等待用户选择
            if actions:
                res = await cl.AskActionMessage(
                    content="请选择要加载的对话,或输入新问题开始新对话",
                    actions=elements,
                    timeout=60
                ).send()
                
                if res and res.get('value'):
                    # 加载选中的对话
                    conversation_id = int(res['value'])
                    conversation = session.query(Conversation).filter(
                        Conversation.id == conversation_id,
                        Conversation.user_id == user_session.user_id
                    ).first()
                    
                    if conversation:
                        user_session.current_conversation_id = conversation_id
                        user_session.conversation_history = json.loads(conversation.messages)
                        
                        # 显示对话历史
                        await cl.Message(content="已加载历史对话:").send()
                        for msg in user_session.conversation_history[-5:]:  # 显示最近5条
                            role = "您" if msg['role'] == 'user' else "AI助手"
                            await cl.Message(content=f"{role}: {msg['content'][:100]}...").send()
    
    except Exception as e:
        print(f"加载历史对话失败:{str(e)}")
    finally:
        session.close()

@cl.on_message
async def on_message(message: cl.Message):
    """处理用户消息"""
    # 获取用户会话
    user_session = cl.user_session.get("user_session")
    
    if not user_session:
        await cl.Message(content="请先登录").send()
        await show_login_interface()
        return
    
    # 检查用户是否有查询权限
    if not auth_manager.check_permission(user_session.role, 'query_model'):
        await cl.Message(
            content="抱歉,您的账户没有使用模型的权限。请联系管理员。"
        ).send()
        return
    
    # 显示思考状态
    msg = cl.Message(content="")
    await msg.send()
    
    # 添加用户消息到历史
    user_session.add_message('user', message.content)
    
    try:
        # 调用vLLM API
        async with aiohttp.ClientSession() as session:
            payload = {
                "model": "Qwen3-4B-Thinking",
                "prompt": message.content,
                "max_tokens": 1024,
                "temperature": 0.7,
                "top_p": 0.9
            }
            
            async with session.post(
                Config.VLLM_API_URL,
                json=payload,
                timeout=aiohttp.ClientTimeout(total=300)
            ) as response:
                if response.status == 200:
                    result = await response.json()
                    ai_response = result['choices'][0]['text']
                    
                    # 更新消息内容
                    await msg.stream_token(ai_response)
                    
                    # 添加AI回复到历史
                    user_session.add_message('assistant', ai_response)
                    
                    # 记录审计日志
                    auth_manager.log_audit(
                        user_id=user_session.user_id,
                        username=user_session.username,
                        action='model_query',
                        details='查询模型',
                        model_query=message.content[:500],  # 只记录前500字符
                        model_response=ai_response[:500],
                        tokens_used=result.get('usage', {}).get('total_tokens', 0),
                        ip_address=cl.user_session.get("client_ip", ""),
                        user_agent=cl.user_session.get("user_agent", "")
                    )
                    
                    # 更新token计数
                    user_session.token_count += result.get('usage', {}).get('total_tokens', 0)
                    
                else:
                    error_text = await response.text()
                    await msg.update(content=f"模型调用失败:{error_text}")
                    
                    # 记录错误日志
                    auth_manager.log_audit(
                        user_id=user_session.user_id,
                        username=user_session.username,
                        action='model_error',
                        details=f'模型调用失败,状态码:{response.status}',
                        model_query=message.content[:500]
                    )
    
    except Exception as e:
        error_msg = f"请求出错:{str(e)}"
        await msg.update(content=error_msg)
        
        # 记录异常日志
        auth_manager.log_audit(
            user_id=user_session.user_id,
            username=user_session.username,
            action='system_error',
            details=f'系统异常:{str(e)}'
        )

@cl.on_chat_end
def on_chat_end():
    """聊天结束时的清理工作"""
    user_session = cl.user_session.get("user_session")
    if user_session:
        # 保存最终对话状态
        user_session.save_conversation()
        
        # 记录登出日志
        auth_manager.log_audit(
            user_id=user_session.user_id,
            username=user_session.username,
            action='logout',
            details='用户退出聊天'
        )

@cl.password_auth_callback
def password_auth_callback(username: str, password: str) -> Optional[cl.User]:
    """Chainlit内置的密码认证回调(备用方案)"""
    auth_result, message = auth_manager.authenticate_user(username, password)
    
    if auth_result:
        user_info = auth_result['user']
        return cl.User(
            identifier=user_info['username'],
            metadata={
                "role": user_info['role'],
                "user_id": user_info['id']
            }
        )
    return None

@cl.header_auth_callback
def header_auth_callback(headers: dict) -> Optional[cl.User]:
    """Header认证回调(用于API调用)"""
    auth_header = headers.get("Authorization", "")
    if auth_header.startswith("Bearer "):
        token = auth_header[7:]
        payload = auth_manager.verify_token(token)
        
        if payload:
            return cl.User(
                identifier=payload['username'],
                metadata={
                    "role": payload['role'],
                    "user_id": payload['user_id']
                }
            )
    return None

# 自定义动作处理
@cl.action_callback("new_conversation")
async def on_new_conversation(action: cl.Action):
    """开始新对话"""
    user_session = cl.user_session.get("user_session")
    if user_session:
        user_session.conversation_history = []
        user_session.current_conversation_id = None
        await cl.Message(content="已开始新对话").send()

@cl.action_callback("save_conversation")
async def on_save_conversation(action: cl.Action):
    """保存当前对话"""
    user_session = cl.user_session.get("user_session")
    if user_session:
        user_session.save_conversation()
        await cl.Message(content="对话已保存").send()

@cl.action_callback("view_audit_logs")
@require_permission('view_audit')
async def on_view_audit_logs(action: cl.Action):
    """查看审计日志(需要权限)"""
    user_session = cl.user_session.get("user_session")
    if not user_session:
        return
    
    session = get_session()
    try:
        # 只显示当前用户自己的日志(管理员可以看到所有)
        if user_session.role == 'admin':
            logs = session.query(AuditLog).order_by(AuditLog.timestamp.desc()).limit(50).all()
        else:
            logs = session.query(AuditLog).filter(
                AuditLog.user_id == user_session.user_id
            ).order_by(AuditLog.timestamp.desc()).limit(50).all()
        
        if logs:
            log_text = "## 最近操作记录\n\n"
            for log in logs:
                log_text += f"**{log.timestamp.strftime('%Y-%m-%d %H:%M')}** - {log.action}\n"
                if log.details:
                    log_text += f"详情:{log.details}\n"
                if log.model_query:
                    log_text += f"查询:{log.model_query[:100]}...\n"
                log_text += "---\n"
            
            await cl.Message(content=log_text).send()
        else:
            await cl.Message(content="暂无操作记录").send()
    
    except Exception as e:
        await cl.Message(content=f"获取日志失败:{str(e)}").send()
    finally:
        session.close()

# 设置Chainlit配置
cl.instrument()

4.2 创建Chainlit配置文件

创建 chainlit.md 文件来配置应用界面:

# Qwen3-4B-Thinking 多用户平台

欢迎使用基于Qwen3-4B-Thinking模型的多用户AI助手平台。

## 🚀 功能特性

- **多用户支持**:支持多个用户同时使用,数据完全隔离
- **权限管理**:基于角色的权限控制系统
- **完整审计**:记录所有用户操作,便于追溯
- **对话历史**:自动保存和加载历史对话
- **Token统计**:实时统计模型使用量

## 👥 用户角色说明

- **管理员**:拥有所有权限,可以管理用户和查看所有日志
- **开发者**:可以使用代码审查、调试辅助等功能
- **产品经理**:可以使用需求分析、文档生成等功能
- **测试人员**:可以使用测试用例生成、缺陷分析等功能
- **观察者**:只能查看,不能使用模型

## 🔒 安全说明

- 所有密码都经过加密存储
- 用户会话使用JWT令牌保护
- 操作日志完整记录,便于审计
- 数据隔离,用户只能访问自己的对话记录

## 📞 技术支持

如遇到问题,请联系系统管理员。

5. 部署与使用指南

5.1 初始化系统

首先,我们需要初始化数据库和创建管理员账户:

# 进入项目目录
cd qwen-multi-user

# 初始化数据库
python -c "
from database import init_db
init_db()
print('数据库初始化完成')
"

# 创建管理员账户
python -c "
from auth import AuthManager
auth = AuthManager()
success, message = auth.create_user('admin', 'admin@example.com', 'Admin123!', 'admin')
print(f'创建管理员账户:{message}')
"

5.2 启动应用

启动Chainlit应用:

# 启动应用(默认端口8000)
chainlit run app.py -w --port 8000

# 或者指定主机和端口
chainlit run app.py -h 0.0.0.0 -p 8000

应用启动后,访问 http://你的服务器IP:8000 即可看到登录界面。

5.3 创建更多用户

你可以通过脚本批量创建用户,或者让用户自行注册(如果需要的话)。这里提供一个创建示例用户的脚本:

# create_users.py - 批量创建用户
from auth import AuthManager

users = [
    {'username': 'developer1', 'email': 'dev1@example.com', 'password': 'Dev123!', 'role': 'developer'},
    {'username': 'pm1', 'email': 'pm1@example.com', 'password': 'Pm123!', 'role': 'product_manager'},
    {'username': 'tester1', 'email': 'tester1@example.com', 'password': 'Test123!', 'role': 'tester'},
    {'username': 'viewer1', 'email': 'viewer1@example.com', 'password': 'View123!', 'role': 'viewer'},
]

auth = AuthManager()

for user in users:
    success, message = auth.create_user(
        username=user['username'],
        email=user['email'],
        password=user['password'],
        role=user['role']
    )
    print(f"创建用户 {user['username']}: {message}")

5.4 使用示例

5.4.1 用户登录
  1. 访问应用地址,看到登录界面
  2. 输入用户名和密码
  3. 系统根据角色显示不同的功能界面
5.4.2 不同角色的界面差异

管理员界面

  • 可以看到所有功能
  • 有"查看审计日志"按钮
  • 有"用户管理"选项

开发者界面

  • 可以看到代码相关功能
  • 可以使用代码审查、调试辅助
  • 不能查看审计日志

观察者界面

  • 只能查看历史对话
  • 不能发送新的查询
  • 功能菜单最少
5.4.3 审计日志查看

管理员可以查看完整的审计日志:

# 查看审计日志的示例代码
from database import get_session, AuditLog
from datetime import datetime, timedelta

def get_recent_audit_logs(hours=24):
    """获取最近24小时的审计日志"""
    session = get_session()
    try:
        time_threshold = datetime.utcnow() - timedelta(hours=hours)
        
        logs = session.query(AuditLog).filter(
            AuditLog.timestamp >= time_threshold
        ).order_by(AuditLog.timestamp.desc()).all()
        
        for log in logs:
            print(f"{log.timestamp} - {log.username} - {log.action}")
            if log.details:
                print(f"  详情:{log.details}")
            if log.model_query:
                print(f"  查询:{log.model_query[:50]}...")
            print()
            
        return logs
    finally:
        session.close()

# 运行查看
get_recent_audit_logs()

5.5 高级功能扩展

5.5.1 添加API接口

如果你需要提供API服务,可以添加FastAPI或Flask来提供RESTful API:

# api.py - 提供API接口
from fastapi import FastAPI, Depends, HTTPException, Header
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from typing import Optional
import uvicorn

from auth import AuthManager
from database import get_session, User

app = FastAPI(title="Qwen3-4B-Thinking API")
security = HTTPBearer()
auth_manager = AuthManager()

class QueryRequest(BaseModel):
    prompt: str
    max_tokens: int = 1024
    temperature: float = 0.7

class QueryResponse(BaseModel):
    success: bool
    response: Optional[str] = None
    tokens_used: Optional[int] = None
    error: Optional[str] = None

def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """获取当前用户"""
    token = credentials.credentials
    payload = auth_manager.verify_token(token)
    
    if not payload:
        raise HTTPException(status_code=401, detail="无效的令牌")
    
    return payload

@app.post("/api/query", response_model=QueryResponse)
async def query_model(
    request: QueryRequest,
    current_user: dict = Depends(get_current_user)
):
    """API查询接口"""
    # 检查权限
    if not auth_manager.check_permission(current_user['role'], 'query_model'):
        raise HTTPException(status_code=403, detail="权限不足")
    
    try:
        # 调用vLLM API
        import aiohttp
        async with aiohttp.ClientSession() as session:
            payload = {
                "model": "Qwen3-4B-Thinking",
                "prompt": request.prompt,
                "max_tokens": request.max_tokens,
                "temperature": request.temperature
            }
            
            async with session.post(
                "http://localhost:8000/v1/completions",
                json=payload,
                timeout=aiohttp.ClientTimeout(total=300)
            ) as response:
                if response.status == 200:
                    result = await response.json()
                    
                    # 记录审计日志
                    auth_manager.log_audit(
                        user_id=current_user['user_id'],
                        username=current_user['username'],
                        action='api_query',
                        details='API查询',
                        model_query=request.prompt[:500],
                        model_response=result['choices'][0]['text'][:500],
                        tokens_used=result.get('usage', {}).get('total_tokens', 0)
                    )
                    
                    return QueryResponse(
                        success=True,
                        response=result['choices'][0]['text'],
                        tokens_used=result.get('usage', {}).get('total_tokens', 0)
                    )
                else:
                    error_text = await response.text()
                    return QueryResponse(
                        success=False,
                        error=f"模型调用失败:{error_text}"
                    )
    
    except Exception as e:
        return QueryResponse(
            success=False,
            error=f"请求出错:{str(e)}"
        )

@app.get("/api/users/me")
async def get_current_user_info(current_user: dict = Depends(get_current_user)):
    """获取当前用户信息"""
    return current_user

@app.get("/api/stats/token-usage")
async def get_token_usage(
    days: int = 7,
    current_user: dict = Depends(get_current_user)
):
    """获取token使用统计(仅管理员)"""
    if current_user['role'] != 'admin':
        raise HTTPException(status_code=403, detail="需要管理员权限")
    
    from datetime import datetime, timedelta
    from sqlalchemy import func
    
    session = get_session()
    try:
        time_threshold = datetime.utcnow() - timedelta(days=days)
        
        # 按用户统计
        stats = session.query(
            AuditLog.username,
            func.sum(AuditLog.tokens_used).label('total_tokens'),
            func.count(AuditLog.id).label('query_count')
        ).filter(
            AuditLog.timestamp >= time_threshold,
            AuditLog.tokens_used.isnot(None)
        ).group_by(AuditLog.username).all()
        
        return {
            "period_days": days,
            "stats": [
                {
                    "username": stat.username,
                    "total_tokens": stat.total_tokens or 0,
                    "query_count": stat.query_count
                }
                for stat in stats
            ]
        }
    finally:
        session.close()

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8001)
5.5.2 添加用户管理界面

为管理员添加用户管理功能:

# admin.py - 管理员功能
import chainlit as cl
from database import get_session, User
from auth import AuthManager

auth_manager = AuthManager()

@cl.action_callback("manage_users")
async def on_manage_users(action: cl.Action):
    """用户管理界面"""
    user_session = cl.user_session.get("user_session")
    
    if not user_session or user_session.role != 'admin':
        await cl.Message(content="需要管理员权限").send()
        return
    
    # 获取所有用户
    session = get_session()
    try:
        users = session.query(User).order_by(User.created_at.desc()).all()
        
        if users:
            user_list = "## 用户列表\n\n"
            user_list += "| 用户名 | 邮箱 | 角色 | 状态 | 创建时间 |\n"
            user_list += "|--------|------|------|------|----------|\n"
            
            for user in users:
                status = "✅ 活跃" if user.is_active else "❌ 禁用"
                user_list += f"| {user.username} | {user.email} | {user.role} | {status} | {user.created_at.strftime('%Y-%m-%d')} |\n"
            
            await cl.Message(content=user_list).send()
            
            # 提供管理操作
            actions = [
                cl.Button(name="create_user", value="create", label="创建用户"),
                cl.Button(name="disable_user", value="disable", label="禁用用户"),
                cl.Button(name="reset_password", value="reset", label="重置密码"),
            ]
            
            await cl.Message(
                content="请选择要执行的操作:",
                actions=actions
            ).send()
        else:
            await cl.Message(content="暂无用户数据").send()
    
    except Exception as e:
        await cl.Message(content=f"获取用户列表失败:{str(e)}").send()
    finally:
        session.close()

6. 总结与最佳实践

6.1 部署总结

通过本教程,我们成功构建了一个完整的Qwen3-4B-Thinking多用户部署方案。这个方案的主要优势包括:

  1. 完整的用户管理体系:支持多用户注册、登录、权限控制
  2. 细粒度的权限隔离:不同角色看到不同的功能界面
  3. 完整的审计日志:所有操作都有记录,便于追溯和审计
  4. 对话历史管理:自动保存和加载用户对话
  5. Token使用统计:监控模型使用情况,便于成本控制

6.2 安全最佳实践

在生产环境中使用时,建议遵循以下安全最佳实践:

  1. 修改默认密钥:务必修改 config.py 中的 SECRET_KEY
  2. 使用强密码策略:要求用户使用复杂密码
  3. 启用HTTPS:在生产环境一定要启用HTTPS
  4. 定期备份数据库:定期备份用户数据和对话记录
  5. 监控审计日志:定期检查审计日志,发现异常行为
  6. 限制API调用频率:防止恶意用户过度使用

6.3 性能优化建议

如果用户量较大,可以考虑以下优化:

  1. 使用生产级数据库:将SQLite换成MySQL或PostgreSQL
  2. 添加缓存层:对频繁查询的结果进行缓存
  3. 实现连接池:数据库连接使用连接池
  4. 异步处理:将耗时的操作改为异步处理
  5. 负载均衡:多个实例部署,使用负载均衡器

6.4 扩展功能建议

根据实际需求,你还可以扩展以下功能:

  1. 用户注册功能:允许用户自行注册
  2. 密码重置:通过邮件重置密码
  3. 对话分享:允许用户分享对话记录
  4. 模型版本管理:支持切换不同版本的模型
  5. 使用配额:为不同用户设置不同的使用限额
  6. 插件系统:支持自定义插件扩展功能

6.5 故障排除

如果遇到问题,可以按以下步骤排查:

  1. 检查模型服务:确保vLLM服务正常运行
  2. 检查数据库:确认数据库文件权限正确
  3. 查看日志:检查Chainlit和应用的日志输出
  4. 验证网络:确保服务间网络连通
  5. 测试认证:使用脚本单独测试认证功能

这个多用户部署方案不仅适用于Qwen3-4B-Thinking模型,也可以轻松适配其他AI模型。通过Chainlit的强大前端能力和我们构建的后台管理系统,你可以快速搭建起一个安全、可控、易用的AI服务平台。


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐