1. 搭建后台系统

前两篇介绍了全栈系统里面移动端和前端:
移动端篇:H5+搭建移动端应用
前端篇:Vue2.0搭建PC前端
项目线上地址:项目访问链接,账号:admin 密码:admin
本文讲述用flask创建一个简单的后台系统,系统包含以下内容:
登录模块:利用flask中session实现
用户部门模型:定义部门信息数据模型
用户职位模型:定义职位信息数据模型
用户分组模型:定义分组信息数据模型
用户角色模型:定义角色信息数据模型
用户信息模型:定义用户信息数据模型,演示用户增、删、改、查业务实现

1.1. 后端系统开发选用的技术栈如下:

开发语言:python3.7
数据库:MySQL5.7
开发框架:Flask0.10.1
开发工具:PyCharm2008
其他工具1:XAPP(集成apach和mysql)
其他工具2:Navicat 11.0.10

1.2. 系统的详细开发过程

1.2.1. 用PyCharm创建项目

在这里插入图片描述

项目创建完成后运行如下图:
在这里插入图片描述

1.2.2. 创建代码包文件

为了项目工程化,我们需要创建一个文件包FlaskDemo,创建好后,把资源文件static和模板文件templates移到包里面,如图
在这里插入图片描述

1.2.3. 安装项目需要的依赖库

项目里面需要用到session、数据库访问、邮件、WebSocket,需要提前安装,命令如下:
venv\Scripts\python.exe -m pip install -i https://pypi.tuna.tsinghua.edu.cn/simple/ -r requirements.txt
requirements.txt内容如下:

blinker==1.4
cachelib==0.1
certifi==2020.4.5.1
cffi==1.14.0
chardet==3.0.4
click==7.1.2
Flask==1.1.2
Flask-Cors==3.0.8
Flask-Mail==0.9.1
Flask-Session==0.3.2
Flask-Sockets==0.2.1
Flask-SQLAlchemy==2.4.3
gevent==20.5.2
gevent-websocket==0.10.1
greenlet==0.4.15
idna==2.9
itsdangerous==1.1.0
Jinja2==2.11.2
MarkupSafe==1.1.1
pycparser==2.20
PyMySQL==0.9.3
PySnooper==0.4.1
redis==3.5.2
requests==2.23.0
six==1.15.0
SQLAlchemy==1.3.17
urllib3==1.25.9
Werkzeug==1.0.1
zope.event==4.4
zope.interface==5.1.0

1.2.4. 创建项目配置文件和目录

在这里插入图片描述
项目目录结构如上图,文件和目录的说明如下:
controls:数据模型定义和业务操作
logs:日志文件目录
sources:用户上传资源文件目录
static:前端页面打包发布后存放的目录
templates:flask模板文件目录,这个目录下面的html是浏览器访问的入口
viewsclient:定义路由,并把请求分发到相应的数据模型
init.py:创建Flask App,初始化数据模型等操作
common.py:定义公共方法
config.py:flask配置文件
manager.py:flask启动管理
models.py:数据模型初始化
status_code.py:定义API请求返回码
tests:单元测试代码目录

1.2.4.1. 创建config.py

这个文件里面配置flask的基本配置:如数据库引擎、登录验证、邮件等

class Config:
    DEBUG = False
    # 默认数据库引擎,数据库根目录
    SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://{0}:{1}@{2}:3306'.format(DB_USER, DB_PWD, DB_ADDR)
    # 配置多个数据库连接
    SQLALCHEMY_BINDS = {
        'client': 'mysql+pymysql://{0}:{1}@{2}:3306/{3}?charset=utf8'.format(DB_USER, DB_PWD, DB_ADDR, DB_NAME)
    }

    # 在app设置里开启自动提交会出现 sqlalchemy.exc.InvalidRequestError: Can't reconnect until invalid transaction 
    # is rolled back
    SQLALCHEMY_COMMIT_ON_TEARDOWN = False
    SQLALCHEMY_TRACK_MODIFICATIONS = False  # 关闭数据追踪,避免内存资源浪费

    # session配置,用于登录验证
    # openssl rand -hex 32
    SECRET_KEY = "dc96245b9fa5070a4994f761a772592b0147b32ccd9c0e3150bb30371e727f70"

    # 将session存储到redis中
    SESSION_TYPE = "redis"
    SESSION_USE_SIGNER = True
    PERMANENT_SESSION_LIFETIME = 30*60  # 秒

    # 文件上传配置
    UPLOAD_FOLDER = 'static/uploads/'  # 上传目录
    MAX_CONTENT_LENGTH = 100 * 1024 * 1024  # 上传大小限制

    # 邮件发送设置
    MAIL_SERVER = 'smtp.ym.163.com'  # 邮箱服务器
    MAIL_USERNAME = 'xxxxxxxQS@163.com'  # 邮箱用户
    MAIL_PASSWORD = 'xxxxxxx'  # 用户密码
1.2.4.2. 创建manager.py

这个文件里面配置flask启动管理

def create_app(config):
    app = Flask(__name__)
    app.config.from_object(config)
    app.url_map.converters['html'] = HTMLConverter

    # session初始化,将session存储到redis中
    Session(app)

    # 系统需要的资源初始化
    system_init()

    # 日志处理
    import logging
    from logging.handlers import RotatingFileHandler
    logging.basicConfig(level=logging.DEBUG)
    file_log_handler = RotatingFileHandler(SYSTEM_LOGS_FILE, maxBytes=1024 * 1024 * 10,
                                           backupCount=10)  # 日志文件最大10M,最多备份10个
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(filename)s:%(lineno)d %(message)s')
    file_log_handler.setFormatter(formatter)
    logging.getLogger().addHandler(file_log_handler)
    logging.getLogger().setLevel(logging.DEBUG)  # 处理日志不能显示到终端的问题

    return app
1.2.4.3. 创建status_code.py

这个文件定义接口返回码

class RET:
    OK = "0"
    DBERR = "4001"
    NODATA = "4002"
    DATAEXIST = "4003"
    DATAERR = "4004"
    SESSIONERR = "4101"
    LOGINERR = "4102"
    PARAMERR = "4103"
    USERERR = "4104"
    ROLEERR = "4105"
    PWDERR = "4106"
    CODEERR = "4107"
    AUTHEERR = "4108"
    KEYEERR = "4109"
    LOGINEERR = "4110"
    REQERR = "4201"
    IPERR = "4202"
    THIRDERR = "4301"
    IOERR = "4302"
    SERVERERR = "4500"
    UNKOWNERR = "4501"
    DATATYPE = '4006'
    DATAMISS = '4005'

ret_map = {
    RET.OK: u"成功",
    RET.DBERR: u"数据库查询错误",
    RET.NODATA: u"无数据",
    RET.DATAEXIST: u"数据已存在",
    RET.DATAERR: u"数据错误",
    RET.SESSIONERR: u"用户未登录",
    RET.LOGINERR: u"用户登录失败",
    RET.PARAMERR: u"参数错误",
    RET.USERERR: u"用户不存在或未激活",
    RET.ROLEERR: u"用户身份错误",
    RET.PWDERR: u"密码错误",
    RET.CODEERR: u"验证码错误",
    RET.AUTHEERR: u"权限错误",
    RET.KEYEERR: u"由于您长时间未进行任何操作,出于数据安全性要求,请重新登录",
    RET.LOGINEERR: u"用户已经登录",
    RET.REQERR: u"非法请求或请求次数受限",
    RET.IPERR: u"IP受限",
    RET.THIRDERR: u"第三方系统错误",
    RET.IOERR: u"文件读写错误",
    RET.SERVERERR: u"内部错误",
    RET.UNKOWNERR: u"未知错误",
    RET.DATATYPE: u"数据类型错误",
    RET.DATAMISS: u"参数缺失",
}
1.2.4.4. 创建common.py

这个文件定义公共方法

def system_init():  # 系统需要的资源初始化
    my_mkdir(SYSTEM_LOGS_DIR)  # 创建系统日志目录
    my_mkdir(CLIENT_SOURCES_DIR)  # 创建客户资源目录


def my_mkdir(path: str):  # 当面目录下创建文件夹
    if not os.path.exists(path):
        os.mkdir(path)
    return path


def get_md5(data: str, salt: bool=True):
    if salt:
        return hashlib.md5('1qa{0}{1}[;.]'.format(data, time.time()).encode(encoding='UTF-8')).hexdigest()
    else:
        return hashlib.md5(data.encode(encoding='UTF-8')).hexdigest()


def get_curr_format_time():
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
1.2.4.5. 创建__init__.py

这个文件创建Flask App

# 创建Flask App
app = create_app(DevelopConfig)
# 跨域请求
CORS(app, supports_credentials=True)

# 创建WebSocket,用于服务器主动向web端推送消息,避免web端轮询查询
sockets = Sockets(app)

# 用于发送邮件
mail = Mail(app)

# 2.初始化数据库
db.init_app(app)

# 3.注册蓝图
app.register_blueprint(client_page, url_prefix='/')

def init_data():
    with app.app_context():
        # 初始化数据库配置
        init_database()

@sockets.route('/echo')
def echo_socket(ws):
    key = str(ws).split(' ')[-1].rstrip('>')  # 用作服务器给web端推送的标识
    while not ws.closed:
        ws.send(json.dumps({'wskey': key}, ensure_ascii=False))  # 回传给clicent
        """ 服务端必须接收到客户端发的消息才能保持该服务运行,如果ws.receive()没有接收到客户端发送的
                 消息,那么它会关闭与客户端建立的链接
                 底层解释:Read and return a message from the stream. If `None` is returned, then
                the socket is considered closed/errored.
                所以客户端只建立连接,不与服务端交互通信,则无法实现自由通信状态,之后在客户端代码处会有详细内容。
                 """
        message = ws.receive()  # 接收到消息
        if message is not None:
            ws.send(json.dumps({'wskey': key}, ensure_ascii=False))  # 回传给clicent

init_data()
1.2.4.6. 创建models.py

这个文件初始化数据模型

from FlaskDemo.common import DB_NAME
from FlaskDemo.controls.Base import db, create_db_client
from FlaskDemo.controls.AccountDpart import AccountDpart
from FlaskDemo.controls.AccountGroup import AccountGroup
from FlaskDemo.controls.AccountPost import AccountPost
from FlaskDemo.controls.AccountRole import AccountRole
from FlaskDemo.controls.AccountUsers import AccountUsers


def create_client_db():
    # 创建所以客户数据表
    create_db_client(DB_NAME)

    # 初始化表数据
    AccountDpart().init_data()  # 初始化部门数据
    AccountRole.init_data()  # 初始化角色数据
    AccountPost.init_data()  # 初始化职位数据
    AccountUsers.init_data()  # 初始化用户数据
    AccountGroup.init_data()  # 初始化用户组数据


def init_database():
    # 创建客户数据库和数据表
    create_client_db()
1.2.4.7. 创建viewsclient

这个文件夹定义接口路由,采用的思路:定义增、删、改、查基本路由,再分发到具体的数据模型处理,这样避免定义很多个接口

定义蓝图,关联url接口地址

client_page = Blueprint('client_page', __name__)  # 定义蓝图

定义跟路由关联的接口函数

@client_page.route('/')
def index():
    return render_template('tempclient/login.html', error='')

浏览器输入:http://127.0.0.1:5000/就会跳转到跟路由关联的函数,即会跳转到tempclient目录下面的login.html文件

定义登录接口

# 登录入口
@client_page.route('/login', methods=['POST'])
def login():
    # 接收参数
    params = request.form
    logging.debug(json.dumps(params, indent=4, ensure_ascii=False))
    username = params.get('username')  # 用户名
    password = params.get('password')  # 密码
    code = params.get('code', '1234')  # 验证码
    logging.debug('username={0}, pwd={1}, code={2}'.format(username, password, code))

定义登出接口

# 用户退出
@client_page.route('/logout', methods=['POST'])
@client_is_login
def logout():
    # 删除当前session中的用户名,就退出登录
    if 'username' in session:
        session.pop('username', None)
    return jsonify(code=RET.OK, msg=ret_map[RET.OK])

定义基本路由:get、add、update、delete

# 查询数据接口
@client_page.route('/get/<qrytype>/<qryfunc>', methods=['GET'])
@client_is_login
def get(qrytype: str, qryfunc: str):
    args = request.args.to_dict()
    logging.debug('[get]qrytype = {0}, qryfunc = {1}'.format(qrytype, qryfunc))
    logging.debug(json.dumps(args, indent=4, ensure_ascii=False))

    return handle_requests(qrytype, qryfunc, args)


# 新增数据接口
@client_page.route('/add/<qrytype>/<qryfunc>', methods=['POST'])
@client_is_login
def add(qrytype: str, qryfunc: str):
    args = request.form.to_dict()
    logging.debug('[add]qrytype = {0}, qryfunc = {1}'.format(qrytype, qryfunc))
    logging.debug(json.dumps(args, indent=4, ensure_ascii=False))

    return handle_requests(qrytype, qryfunc, args)


# 编辑数据接口
@client_page.route('/update/<qrytype>/<qryfunc>/<Id>', methods=['PUT'])
@client_is_login
def update(qrytype: str, qryfunc: str, Id: int):
    logging.debug('[update]qrytype = {0}, qryfunc = {1}, Id = {2}'.format(qrytype, qryfunc, Id))
    for form in request.form:
        args = form
        break
    logging.debug(json.dumps(args, indent=4, ensure_ascii=False))

    return handle_requests(qrytype, qryfunc, args, Id)


# 删除数据接口
@client_page.route('/delete/<qrytype>/<qryfunc>/<Id>', methods=['DELETE'])
@client_is_login
def delete(qrytype: str, qryfunc: str, Id: int):
    args = request.form.to_dict()
    logging.debug('[delete]qrytype = {0}, qryfunc = {1}, Id = {2}'.format(qrytype, qryfunc, Id))
    logging.debug(json.dumps(args, indent=4, ensure_ascii=False))

    return handle_requests(qrytype, qryfunc, args, Id)

路由分发函数
后台定义函数get_type_query,获取字典:{‘数据模型名称’: 数据模型对象}

def get_type_query():
    typedict = {}
    for clazz in db.Model._decl_class_registry.values():
        try:
            typedict[clazz.__name__] = clazz
        except Exception as e:
            print(e)
    # print(typedict)
    return typedict
函数运行结果:
{'AccountDpart': <class 'FlaskDemo.controls.AccountDpart.AccountDpart'>, 
'AccountGroup': <class 'FlaskDemo.controls.AccountGroup.AccountGroup'>, 
'AccountPost': <class 'FlaskDemo.controls.AccountPost.AccountPost'>, 
'AccountRole': <class 'FlaskDemo.controls.AccountRole.AccountRole'>, 
'AccountUsers': <class 'FlaskDemo.controls.AccountUsers.AccountUsers'>}

web前端调用url传入格式:http://127.0.0.1:5000/get/[qrytype]/[qryfunc]
qrytype:查询类型,即对应的数据模型
qryfunc:数据模型中定义的具体业务函数
如查询用户表中所有用户数据,可以这样写:http://127.0.0.1:5000/get/AccountUsers/get_value_list

@pysnooper.snoop('pysnooper.log')  # 输出到文件
def handle_requests(qrytype: str, qryfunc: str, argc, Id: int = None):
    dbClass = get_type_query().get(qrytype)
    if not dbClass:
        return jsonify(code=RET.PARAMERR, msg=u'查询类型错误')

    dbcls = dbClass()
    if isinstance(argc, str):
        try:
            argcjson = json.loads(argc)
        except Exception as e:
            logging.error('[handle_requests]Failed to json.load, {0}'.format(e))
            logging.error(traceback.format_exc())
            return jsonify(code=RET.PARAMERR, msg='Failed to json.load, {0}'.format(e))
    elif isinstance(argc, dict):
        argcjson = argc
    else:
        return jsonify(code=RET.PARAMERR, msg=u'argc查询类型错误')
    # print(dir(dbcls))

    if qryfunc in dir(dbcls):
        if Id:
            value = dbcls.__getattribute__(qryfunc)(argcjson, Id)
        else:
            value = dbcls.__getattribute__(qryfunc)(argcjson)
        return value
    else:
        return jsonify(code=RET.PARAMERR, msg=qryfunc + '方法不存在,请联系开发人员')
1.2.4.8. 创建controls

这数据模型和业务逻辑目录
在这里插入图片描述

1.2.4.8.1. Base.py

这个文件定义数据模型基类BaseModel,定义全局数据库对象db,所有数据模型继承BaseModel和db,并在BaseModel类
中定义基本的数据操作函数,执行出错会事务回滚:
update:更新记录
add_update:增加一条记录
add_all_update:增加多条记录
delete:删除一条记录
to_json:SQLAlchemy转json对象

# 客户数据库连接,用作数据模型
db = SQLAlchemy()


class BaseModel(object):
    def update(self):  # 更新记录
        try:
            db.session.commit()  # 事务提交
        except Exception as e:
            db.session.rollback()  # 事务回滚
            logging.error('[BaseModel->update]Failed to run update, {0}'.format(e))
            logging.error(traceback.format_exc())
            return False

        return True

    def add_update(self):  # 增加一条记录
        try:
            db.session.add(self)  # 添加数据对象
            db.session.commit()  # 事务提交
        except Exception as e:
            db.session.rollback()  # 事务回滚
            logging.error('[BaseModel->add_update]Failed to run add_update, {0}'.format(e))
            logging.error(traceback.format_exc())
            return False

        return True

    def add_all_update(self, datas: list):  # 增加多条记录
        try:
            db.session.add_all(datas)  # 添加数据对象
            db.session.commit()  # 事务提交
        except Exception as e:
            db.session.rollback()  # 事务回滚
            logging.error('[BaseModel->add_all_update]Failed to run add_all_update, {0}'.format(e))
            logging.error(traceback.format_exc())
            return False

        return True

    def delete(self):  # 删除一条记录
        try:
            db.session.delete(self)  # 删除数据对象
            db.session.commit()  # 事务提交
        except Exception as e:
            db.session.rollback()  # 事务回滚
            logging.error('[BaseModel->delete]Failed to run delete, {0}'.format(e))
            logging.error(traceback.format_exc())
            return False

        return True

    def to_json(self):  # SQLAlchemy转json对象
        fields = {}
        for field in [x for x in self.__dict__ if not x.startswith('_') and x != 'metadata']:
            data = self.__getattribute__(field)
            try:
                if isinstance(data, datetime):
                    data = data.strftime('%Y-%m-%d %H:%M:%S')
                json.dumps(data)  # this will fail on non-encodable values, like other classes
                fields[field] = data
            except TypeError:
                fields[field] = None
                logging.error(traceback.format_exc())
        return fields
1.2.4.8.2. AccountDpart.py

这个文件是部门数据模型AccountDpart,定义用户部门信息,定义了3个业务函数:
init_data:初始化部门数据,数据模型里面根据需要定义该函数
get_count:查询数据表中数据数量,一般用于前端页面分页,所有数据模型中都要定义
get_value_list:查询和该模型相关的所有数据,所有数据模型中都要定义

# 客户部门信息数据表模型
class AccountDpart(BaseModel, db.Model):
    __bind_key__ = 'client'  # 数据库引擎,这就表示数据库名称,在config.py中定义
    __tablename__ = 'account_depart'  # 设置信息->部门信息表

    Id = db.Column(db.INTEGER, primary_key=True)  # 部门ID,
    Name = db.Column(db.String(255), unique=True)  # 部门名称
    Remarks = db.Column(db.String(255))  # 部门描述
    RegTime = db.Column(TIMESTAMP, server_default=func.current_timestamp(), index=True)  # 检验限制创建时间

    @staticmethod
    def init_data():
        datadict = [{'Name': '系统部', 'Remarks': '系统超级管理员'},
                    {'Name': '总经办', 'Remarks': '总经理办公室'}]
        for data in datadict:
            result = AccountDpart.query.filter(AccountDpart.Name == data.get('Name', '')).first()

            if not result:
                AccountDpart(Name=data.get('Name', ''), Remarks=data.get('Remarks', '')).add_update()

    @staticmethod
    def get_count(argc: dict):
        value = {'count': db.session.query(func.count(AccountDpart.Id)).scalar()}
        return {'code': RET.OK, 'data': value}

    @staticmethod
    def get_value_list(argc: dict):
        results = db.session.query(AccountDpart).all()
        values = []

        for res in results:
            values.append([res.to_json()])

        return {'code': RET.OK, 'data': values}
1.2.4.8.3. AccountGroup.py

这个文件是用户分组数据模型,定义用户分组信息
init_data:初始化分组数据,数据模型里面根据需要定义该函数
get_count:查询数据表中数据数量,一般用于前端页面分页,所有数据模型中都要定义
get_value_list:查询和该模型相关的所有数据,所有数据模型中都要定义

# 客户用户组信息数据表模型
class AccountGroup(BaseModel, db.Model):
    __bind_key__ = 'client'  # 数据库引擎,这就表示数据库名称,在config.py中定义
    __tablename__ = 'account_group'  # 账户信息->用户组表

    Id = db.Column(db.INTEGER, primary_key=True)
    GroupName = db.Column(db.String(255), unique=True)  # 用户组名称
    Permiss = db.Column(db.Text)  # 用户组权限,json数组,对应访问页面的信息
    Remarks = db.Column(db.String(255))  # 备注
    RegTime = db.Column(TIMESTAMP, server_default=func.current_timestamp(), index=True)  # 检验限制创建时间

    @staticmethod
    def init_data():
        datadict = [{'Id': 1, 'GroupName': '系统管理组', 'Permiss': '{"首页":[], "设置":[]}', 'Remarks': '超级管理组'}]
        for data in datadict:
            result = AccountGroup.query.filter(AccountGroup.Id == data.get('Id')).first()

            if not result:
                AccountGroup(Id=data.get('Id', 1), GroupName=data.get('GroupName', ''), Permiss=data.get('Permiss', ''),
                             Remarks=data.get('Remarks', '')).add_update()

    @staticmethod
    def get_count(argc: dict):
        value = {'count': db.session.query(func.count(AccountGroup.Id)).scalar()}
        return {'code': RET.OK, 'data': value}

    @staticmethod
    def get_value_list(argc: dict):
        results = db.session.query(AccountGroup).all()
        values = []

        for res in results:
            values.append([res.to_json()])

        return {'code': RET.OK, 'data': values}
1.2.4.8.4. AccountPost.py

这个文件是用户职位数据模型,定义用户职位信息
init_data:初始化职位数据,数据模型里面根据需要定义该函数
get_count:查询数据表中数据数量,一般用于前端页面分页,所有数据模型中都要定义
get_value_list:查询和该模型相关的所有数据,所有数据模型中都要定义

# 客户职位信息数据表模型
class AccountPost(BaseModel, db.Model):
    __bind_key__ = 'client'  # 数据库引擎,这就表示数据库名称,在config.py中定义
    __tablename__ = 'account_post'  # 设置信息->职位信息表

    Id = db.Column(db.INTEGER, primary_key=True)  # 职位ID,
    DepartId = db.Column(db.INTEGER, nullable=False)  # 部门ID,对应数据表cli_acc_depart的ID
    Name = db.Column(db.String(255))  # 职位名称
    Remarks = db.Column(db.String(255))  # 职位描述
    Authview = db.Column(db.Text, default='{}')  # 用户查看权限
    Authoper = db.Column(db.Text, default='{}')  # 用户操作权限
    RegTime = db.Column(TIMESTAMP, server_default=func.current_timestamp(), index=True)  # 检验限制创建时间

    @staticmethod
    def init_data():
        datadict = [{'DepartId': 1, 'Name': '系统管理员', 'Remarks': '系统管理员'},
                    {'DepartId': 2, 'Name': '普通用户', 'Remarks': '普通用户'}]
        for data in datadict:
            result = AccountPost.query.filter(AccountPost.DepartId == data.get('DepartId')).first()

            if not result:
                AccountPost(DepartId=data.get('DepartId', 2), Name=data.get('Name', ''),
                            Remarks=data.get('Remarks', '')).add_update()

    @staticmethod
    def get_count(argc: dict):
        value = {'count': db.session.query(func.count(AccountPost.Id)).scalar()}
        return {'code': RET.OK, 'data': value}

    @staticmethod
    def get_value_list(argc: dict):
        results = db.session.query(AccountPost, AccountDpart).filter(AccountPost.DepartId == AccountDpart.Id).all()
        values = []

        for res in results:
            values.append([res[0].to_json(), res[1].to_json()])

        return {'code': RET.OK, 'data': values}
1.2.4.8.5. AccountRole.py

这个文件是用户角色数据模型,定义用户角色信息
init_data:初始化分组数据,数据模型里面根据需要定义该函数
get_count:查询数据表中数据数量,一般用于前端页面分页,所有数据模型中都要定义
get_value_list:查询和该模型相关的所有数据,所有数据模型中都要定义

# 客户角色信息数据表模型
class AccountRole(BaseModel, db.Model):
    __bind_key__ = 'client'  # 数据库引擎,这就表示数据库名称,在config.py中定义
    __tablename__ = 'account_role'  # 账户信息->用户角色表

    Id = db.Column(db.INTEGER, primary_key=True)  # 角色ID,1超级管理员,>1表示自定义权限
    RoleName = db.Column(db.String(255), unique=True)  # 角色名称
    Permiss = db.Column(db.Text)  # 角色权限,json数组,对应访问页面的信息
    Remarks = db.Column(db.String(255))  # 备注
    RegTime = db.Column(TIMESTAMP, server_default=func.current_timestamp(), index=True)  # 检验限制创建时间

    @staticmethod
    def init_data():
        datadict = [{'Id': 1, 'RoleName': '系统管理员',
                     'Permiss': '["增加","删除","修改","查询","发送","下载","安装","上传"]', 'Remarks': '系统管理员'},
                    {'Id': 2, 'RoleName': '普通用户',
                     'Permiss': '["查询","发送","下载","安装","上传"]', 'Remarks': '普通用户'}
                    ]
        for data in datadict:
            result = AccountRole.query.filter(AccountRole.Id == data.get('Id')).first()

            if not result:
                AccountRole(Id=data.get('Id', 1), RoleName=data.get('RoleName', ''), Permiss=data.get('Permiss', ''),
                            Remarks=data.get('Remarks', '')).add_update()

    @staticmethod
    def get_count(argc: dict):
        value = {'count': db.session.query(func.count(AccountRole.Id)).scalar()}
        return {'code': RET.OK, 'data': value}

    @staticmethod
    def get_value_list(argc: dict):
        results = db.session.query(AccountRole).all()
        values = []

        for res in results:
            values.append([res.to_json()])

        return {'code': RET.OK, 'data': values}
1.2.4.8.6. AccountUsers.py

这个文件是分组数据模型,定义用户分组信息
init_data:初始化分组数据,数据模型里面根据需要定义该函数
get_count:查询数据表中数据数量,一般用于前端页面分页,所有数据模型中都要定义
get_value_list:查询和该模型相关的所有数据,所有数据模型中都要定义
add_value:增加用户数据,所有数据模型中都要定义
update_value:更新用户数据,根据用户ID来更新,所有数据模型中都要定义
delete_value:删除用户数据,根据用户ID来更新,所有数据模型中都要定义

# 客户用户信息数据表模型
class AccountUsers(BaseModel, db.Model):
    __bind_key__ = 'client'  # 数据库引擎,这就表示数据库名称,在config.py中定义
    __tablename__ = 'account_users'  # 账户信息->用户表

    Id = db.Column(db.INTEGER, primary_key=True)
    Name = db.Column(db.String(50), unique=True)  # 用户名称
    Pwd = db.Column(db.String(300))  # 用户密码
    Nick = db.Column(db.String(150))  # 用户姓名
    PostId = db.Column(db.INTEGER, nullable=False)  # 职位ID
    Mobile = db.Column(db.String(150))  # 用户手机号码
    Email = db.Column(db.String(150))  # 用户邮箱地址
    RoleIds = db.Column(db.String(255))  # 用户角色ID,json数组
    GroupIds = db.Column(db.String(255))  # 用户组ID,json数组
    Authview = db.Column(db.Text, default='{}')  # 用户查看权限
    Authoper = db.Column(db.Text, default='{}')  # 用户操作权限
    Valid = db.Column(db.INTEGER, default=1)  # 账号是否有效:0注销,1生效
    RegTime = db.Column(TIMESTAMP, server_default=func.current_timestamp(), index=True)  # 检验限制创建时间

    @staticmethod
    def init_data():
        datadict = [
            {'Id': 1, 'Name': 'superadmin', 'Pwd': generate_password_hash('superadmin'), 'Nick': '超级管理员', 'PostId': 1},
            {'Id': 2, 'Name': 'admin', 'Pwd': generate_password_hash('admin'), 'Nick': '超级管理员', 'PostId': 1},
            {'Id': 3, 'Name': 'general', 'Pwd': generate_password_hash('general'), 'Nick': '超级管理员', 'PostId': 2}]
        for data in datadict:
            result = AccountUsers.query.filter(AccountUsers.Id == data.get('Id')).first()

            if not result:
                AccountUsers(Id=data.get('Id'), Name=data.get('Name'), Pwd=data.get('Pwd'), Nick=data.get('Nick'),
                             PostId=data.get('PostId')).add_update()

    @staticmethod
    def get_count(argc: dict):
        value = {'count': db.session.query(func.count(AccountUsers.Id)).scalar()}
        return {'code': RET.OK, 'data': value}

    @staticmethod
    def get_value_list(argc: dict):
        skip, limit = argc.get('skip', 0), argc.get('limit', 100)
        results = db.session.query(AccountUsers, AccountPost, AccountDpart). \
            filter(AccountUsers.PostId == AccountPost.Id).filter(AccountPost.DepartId == AccountDpart.Id).\
            offset(skip).limit(limit).all()
        values = []

        for res in results:
            values.append([res[0].to_json(), res[1].to_json(), res[2].to_json()])

        return {'code': RET.OK, 'data': values}

    @staticmethod
    def add_value(argc: dict):
        Name = argc.get('Name')  # 用户名
        Nick = argc.get('Nick')  # 姓名
        PostId = argc.get('PostId')  # 用户职位ID

        # 验证参数是否存在
        if not all([Name, Nick, PostId]):
            return {'code': RET.PARAMERR, 'msg': ret_map[RET.PARAMERR]}

        db_user = AccountUsers.query.filter(AccountUsers.Name == Name).first()
        if db_user:
            return {'code': RET.PARAMERR, 'msg': '用户名已经存在'}

        db_post = AccountPost.query.filter(AccountPost.Id == PostId).first()
        if not db_post:
            return {'code': RET.PARAMERR, 'msg': '部门ID不存在'}

        if AccountUsers(Name=Name, Pwd=generate_password_hash('123456'), Nick=Nick, PostId=PostId).add_update():
            return {'code': RET.OK, 'msg': ret_map[RET.OK]}
        else:
            logging.error(u'增加用户更新数据库失败,用户名:{0}'.format(Name))
            return {'code': RET.DBERR, 'msg': ret_map[RET.DBERR]}

    @staticmethod
    def update_value(argc: dict, Id: int):
        db_user = AccountUsers.query.filter(AccountUsers.Id == Id).first()
        if not db_user:
            return {'code': RET.PARAMERR, 'msg': '用户ID不存在'}

        for k, v in argc.items():
            if hasattr(db_user, k):
                db_user.__setattr__(k, v)

        if db_user.add_update():
            return {'code': RET.OK, 'msg': ret_map[RET.OK]}
        else:
            logging.error(u'编辑用户更新数据库失败')
            return {'code': RET.DBERR, 'msg': ret_map[RET.DBERR]}


    @staticmethod
    def delete_value(argc: dict, Id: int):
        db_user = AccountUsers.query.filter(AccountUsers.Id == Id).first()
        if not db_user:
            return {'code': RET.PARAMERR, 'msg': '用户ID不存在'}

        if db_user.delete():
            return {'code': RET.OK, 'msg': ret_map[RET.OK]}
        else:
            return {'code': RET.DBERR, 'msg': ret_map[RET.DBERR]}

1.3. 源码文件

后台源码:https://download.csdn.net/download/yyt593891927/12522457

访问地址:http://127.0.0.1:5000
默认用户名:admin
默认密码:admin

1.4. 后记

本文完整讲述了利用flask创建后台系统,下一章介绍用Vue2.0创建系统对应的前端页面。

Logo

前往低代码交流专区

更多推荐