Flask+Vue搭建系统
一个人的系统欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants创建一个自定义列表如何创建一个注脚注释也是必不可少的KaTeX数学公式新的甘特图功能,丰富你的文章UML 图表FLowchart流程图导出与导入导出导入欢迎使用Markdow
1. 搭建后台系统
前两篇介绍了全栈系统里面移动端和前端:
移动端篇:H5+搭建移动端应用
前端篇:Vue2.0搭建PC前端
项目线上地址:项目访问链接,账号:admin 密码:admin
本文讲述用flask创建一个简单的后台系统,系统包含以下内容:
登录模块:利用flask中session实现
用户部门模型:定义部门信息数据模型
用户职位模型:定义职位信息数据模型
用户分组模型:定义分组信息数据模型
用户角色模型:定义角色信息数据模型
用户信息模型:定义用户信息数据模型,演示用户增、删、改、查业务实现
1.1. 后端系统开发选用的技术栈如下:
开发语言:python3.7
数据库:MySQL5.7
开发框架:Flask0.10.1
开发工具:PyCharm2008
其他工具1:XAPP(集成apach和mysql)
其他工具2:Navicat 11.0.10
1.2. 系统的详细开发过程
1.2.1. 用PyCharm创建项目
项目创建完成后运行如下图:
1.2.2. 创建代码包文件
为了项目工程化,我们需要创建一个文件包FlaskDemo,创建好后,把资源文件static和模板文件templates移到包里面,如图
1.2.3. 安装项目需要的依赖库
项目里面需要用到session、数据库访问、邮件、WebSocket,需要提前安装,命令如下:
venv\Scripts\python.exe -m pip install -i https://pypi.tuna.tsinghua.edu.cn/simple/ -r requirements.txt
requirements.txt内容如下:
blinker==1.4
cachelib==0.1
certifi==2020.4.5.1
cffi==1.14.0
chardet==3.0.4
click==7.1.2
Flask==1.1.2
Flask-Cors==3.0.8
Flask-Mail==0.9.1
Flask-Session==0.3.2
Flask-Sockets==0.2.1
Flask-SQLAlchemy==2.4.3
gevent==20.5.2
gevent-websocket==0.10.1
greenlet==0.4.15
idna==2.9
itsdangerous==1.1.0
Jinja2==2.11.2
MarkupSafe==1.1.1
pycparser==2.20
PyMySQL==0.9.3
PySnooper==0.4.1
redis==3.5.2
requests==2.23.0
six==1.15.0
SQLAlchemy==1.3.17
urllib3==1.25.9
Werkzeug==1.0.1
zope.event==4.4
zope.interface==5.1.0
1.2.4. 创建项目配置文件和目录
项目目录结构如上图,文件和目录的说明如下:
controls:数据模型定义和业务操作
logs:日志文件目录
sources:用户上传资源文件目录
static:前端页面打包发布后存放的目录
templates:flask模板文件目录,这个目录下面的html是浏览器访问的入口
viewsclient:定义路由,并把请求分发到相应的数据模型
init.py:创建Flask App,初始化数据模型等操作
common.py:定义公共方法
config.py:flask配置文件
manager.py:flask启动管理
models.py:数据模型初始化
status_code.py:定义API请求返回码
tests:单元测试代码目录
1.2.4.1. 创建config.py
这个文件里面配置flask的基本配置:如数据库引擎、登录验证、邮件等
class Config:
DEBUG = False
# 默认数据库引擎,数据库根目录
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://{0}:{1}@{2}:3306'.format(DB_USER, DB_PWD, DB_ADDR)
# 配置多个数据库连接
SQLALCHEMY_BINDS = {
'client': 'mysql+pymysql://{0}:{1}@{2}:3306/{3}?charset=utf8'.format(DB_USER, DB_PWD, DB_ADDR, DB_NAME)
}
# 在app设置里开启自动提交会出现 sqlalchemy.exc.InvalidRequestError: Can't reconnect until invalid transaction
# is rolled back
SQLALCHEMY_COMMIT_ON_TEARDOWN = False
SQLALCHEMY_TRACK_MODIFICATIONS = False # 关闭数据追踪,避免内存资源浪费
# session配置,用于登录验证
# openssl rand -hex 32
SECRET_KEY = "dc96245b9fa5070a4994f761a772592b0147b32ccd9c0e3150bb30371e727f70"
# 将session存储到redis中
SESSION_TYPE = "redis"
SESSION_USE_SIGNER = True
PERMANENT_SESSION_LIFETIME = 30*60 # 秒
# 文件上传配置
UPLOAD_FOLDER = 'static/uploads/' # 上传目录
MAX_CONTENT_LENGTH = 100 * 1024 * 1024 # 上传大小限制
# 邮件发送设置
MAIL_SERVER = 'smtp.ym.163.com' # 邮箱服务器
MAIL_USERNAME = 'xxxxxxxQS@163.com' # 邮箱用户
MAIL_PASSWORD = 'xxxxxxx' # 用户密码
1.2.4.2. 创建manager.py
这个文件里面配置flask启动管理
def create_app(config):
app = Flask(__name__)
app.config.from_object(config)
app.url_map.converters['html'] = HTMLConverter
# session初始化,将session存储到redis中
Session(app)
# 系统需要的资源初始化
system_init()
# 日志处理
import logging
from logging.handlers import RotatingFileHandler
logging.basicConfig(level=logging.DEBUG)
file_log_handler = RotatingFileHandler(SYSTEM_LOGS_FILE, maxBytes=1024 * 1024 * 10,
backupCount=10) # 日志文件最大10M,最多备份10个
formatter = logging.Formatter('%(asctime)s %(levelname)s %(filename)s:%(lineno)d %(message)s')
file_log_handler.setFormatter(formatter)
logging.getLogger().addHandler(file_log_handler)
logging.getLogger().setLevel(logging.DEBUG) # 处理日志不能显示到终端的问题
return app
1.2.4.3. 创建status_code.py
这个文件定义接口返回码
class RET:
OK = "0"
DBERR = "4001"
NODATA = "4002"
DATAEXIST = "4003"
DATAERR = "4004"
SESSIONERR = "4101"
LOGINERR = "4102"
PARAMERR = "4103"
USERERR = "4104"
ROLEERR = "4105"
PWDERR = "4106"
CODEERR = "4107"
AUTHEERR = "4108"
KEYEERR = "4109"
LOGINEERR = "4110"
REQERR = "4201"
IPERR = "4202"
THIRDERR = "4301"
IOERR = "4302"
SERVERERR = "4500"
UNKOWNERR = "4501"
DATATYPE = '4006'
DATAMISS = '4005'
ret_map = {
RET.OK: u"成功",
RET.DBERR: u"数据库查询错误",
RET.NODATA: u"无数据",
RET.DATAEXIST: u"数据已存在",
RET.DATAERR: u"数据错误",
RET.SESSIONERR: u"用户未登录",
RET.LOGINERR: u"用户登录失败",
RET.PARAMERR: u"参数错误",
RET.USERERR: u"用户不存在或未激活",
RET.ROLEERR: u"用户身份错误",
RET.PWDERR: u"密码错误",
RET.CODEERR: u"验证码错误",
RET.AUTHEERR: u"权限错误",
RET.KEYEERR: u"由于您长时间未进行任何操作,出于数据安全性要求,请重新登录",
RET.LOGINEERR: u"用户已经登录",
RET.REQERR: u"非法请求或请求次数受限",
RET.IPERR: u"IP受限",
RET.THIRDERR: u"第三方系统错误",
RET.IOERR: u"文件读写错误",
RET.SERVERERR: u"内部错误",
RET.UNKOWNERR: u"未知错误",
RET.DATATYPE: u"数据类型错误",
RET.DATAMISS: u"参数缺失",
}
1.2.4.4. 创建common.py
这个文件定义公共方法
def system_init(): # 系统需要的资源初始化
my_mkdir(SYSTEM_LOGS_DIR) # 创建系统日志目录
my_mkdir(CLIENT_SOURCES_DIR) # 创建客户资源目录
def my_mkdir(path: str): # 当面目录下创建文件夹
if not os.path.exists(path):
os.mkdir(path)
return path
def get_md5(data: str, salt: bool=True):
if salt:
return hashlib.md5('1qa{0}{1}[;.]'.format(data, time.time()).encode(encoding='UTF-8')).hexdigest()
else:
return hashlib.md5(data.encode(encoding='UTF-8')).hexdigest()
def get_curr_format_time():
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
1.2.4.5. 创建__init__.py
这个文件创建Flask App
# 创建Flask App
app = create_app(DevelopConfig)
# 跨域请求
CORS(app, supports_credentials=True)
# 创建WebSocket,用于服务器主动向web端推送消息,避免web端轮询查询
sockets = Sockets(app)
# 用于发送邮件
mail = Mail(app)
# 2.初始化数据库
db.init_app(app)
# 3.注册蓝图
app.register_blueprint(client_page, url_prefix='/')
def init_data():
with app.app_context():
# 初始化数据库配置
init_database()
@sockets.route('/echo')
def echo_socket(ws):
key = str(ws).split(' ')[-1].rstrip('>') # 用作服务器给web端推送的标识
while not ws.closed:
ws.send(json.dumps({'wskey': key}, ensure_ascii=False)) # 回传给clicent
""" 服务端必须接收到客户端发的消息才能保持该服务运行,如果ws.receive()没有接收到客户端发送的
消息,那么它会关闭与客户端建立的链接
底层解释:Read and return a message from the stream. If `None` is returned, then
the socket is considered closed/errored.
所以客户端只建立连接,不与服务端交互通信,则无法实现自由通信状态,之后在客户端代码处会有详细内容。
"""
message = ws.receive() # 接收到消息
if message is not None:
ws.send(json.dumps({'wskey': key}, ensure_ascii=False)) # 回传给clicent
init_data()
1.2.4.6. 创建models.py
这个文件初始化数据模型
from FlaskDemo.common import DB_NAME
from FlaskDemo.controls.Base import db, create_db_client
from FlaskDemo.controls.AccountDpart import AccountDpart
from FlaskDemo.controls.AccountGroup import AccountGroup
from FlaskDemo.controls.AccountPost import AccountPost
from FlaskDemo.controls.AccountRole import AccountRole
from FlaskDemo.controls.AccountUsers import AccountUsers
def create_client_db():
# 创建所以客户数据表
create_db_client(DB_NAME)
# 初始化表数据
AccountDpart().init_data() # 初始化部门数据
AccountRole.init_data() # 初始化角色数据
AccountPost.init_data() # 初始化职位数据
AccountUsers.init_data() # 初始化用户数据
AccountGroup.init_data() # 初始化用户组数据
def init_database():
# 创建客户数据库和数据表
create_client_db()
1.2.4.7. 创建viewsclient
这个文件夹定义接口路由,采用的思路:定义增、删、改、查基本路由,再分发到具体的数据模型处理,这样避免定义很多个接口
定义蓝图,关联url接口地址
client_page = Blueprint('client_page', __name__) # 定义蓝图
定义跟路由关联的接口函数
@client_page.route('/')
def index():
return render_template('tempclient/login.html', error='')
浏览器输入:http://127.0.0.1:5000/就会跳转到跟路由关联的函数,即会跳转到tempclient目录下面的login.html文件
定义登录接口
# 登录入口
@client_page.route('/login', methods=['POST'])
def login():
# 接收参数
params = request.form
logging.debug(json.dumps(params, indent=4, ensure_ascii=False))
username = params.get('username') # 用户名
password = params.get('password') # 密码
code = params.get('code', '1234') # 验证码
logging.debug('username={0}, pwd={1}, code={2}'.format(username, password, code))
定义登出接口
# 用户退出
@client_page.route('/logout', methods=['POST'])
@client_is_login
def logout():
# 删除当前session中的用户名,就退出登录
if 'username' in session:
session.pop('username', None)
return jsonify(code=RET.OK, msg=ret_map[RET.OK])
定义基本路由:get、add、update、delete
# 查询数据接口
@client_page.route('/get/<qrytype>/<qryfunc>', methods=['GET'])
@client_is_login
def get(qrytype: str, qryfunc: str):
args = request.args.to_dict()
logging.debug('[get]qrytype = {0}, qryfunc = {1}'.format(qrytype, qryfunc))
logging.debug(json.dumps(args, indent=4, ensure_ascii=False))
return handle_requests(qrytype, qryfunc, args)
# 新增数据接口
@client_page.route('/add/<qrytype>/<qryfunc>', methods=['POST'])
@client_is_login
def add(qrytype: str, qryfunc: str):
args = request.form.to_dict()
logging.debug('[add]qrytype = {0}, qryfunc = {1}'.format(qrytype, qryfunc))
logging.debug(json.dumps(args, indent=4, ensure_ascii=False))
return handle_requests(qrytype, qryfunc, args)
# 编辑数据接口
@client_page.route('/update/<qrytype>/<qryfunc>/<Id>', methods=['PUT'])
@client_is_login
def update(qrytype: str, qryfunc: str, Id: int):
logging.debug('[update]qrytype = {0}, qryfunc = {1}, Id = {2}'.format(qrytype, qryfunc, Id))
for form in request.form:
args = form
break
logging.debug(json.dumps(args, indent=4, ensure_ascii=False))
return handle_requests(qrytype, qryfunc, args, Id)
# 删除数据接口
@client_page.route('/delete/<qrytype>/<qryfunc>/<Id>', methods=['DELETE'])
@client_is_login
def delete(qrytype: str, qryfunc: str, Id: int):
args = request.form.to_dict()
logging.debug('[delete]qrytype = {0}, qryfunc = {1}, Id = {2}'.format(qrytype, qryfunc, Id))
logging.debug(json.dumps(args, indent=4, ensure_ascii=False))
return handle_requests(qrytype, qryfunc, args, Id)
路由分发函数
后台定义函数get_type_query,获取字典:{‘数据模型名称’: 数据模型对象}
def get_type_query():
typedict = {}
for clazz in db.Model._decl_class_registry.values():
try:
typedict[clazz.__name__] = clazz
except Exception as e:
print(e)
# print(typedict)
return typedict
函数运行结果:
{'AccountDpart': <class 'FlaskDemo.controls.AccountDpart.AccountDpart'>,
'AccountGroup': <class 'FlaskDemo.controls.AccountGroup.AccountGroup'>,
'AccountPost': <class 'FlaskDemo.controls.AccountPost.AccountPost'>,
'AccountRole': <class 'FlaskDemo.controls.AccountRole.AccountRole'>,
'AccountUsers': <class 'FlaskDemo.controls.AccountUsers.AccountUsers'>}
web前端调用url传入格式:http://127.0.0.1:5000/get/[qrytype]/[qryfunc]
qrytype:查询类型,即对应的数据模型
qryfunc:数据模型中定义的具体业务函数
如查询用户表中所有用户数据,可以这样写:http://127.0.0.1:5000/get/AccountUsers/get_value_list
@pysnooper.snoop('pysnooper.log') # 输出到文件
def handle_requests(qrytype: str, qryfunc: str, argc, Id: int = None):
dbClass = get_type_query().get(qrytype)
if not dbClass:
return jsonify(code=RET.PARAMERR, msg=u'查询类型错误')
dbcls = dbClass()
if isinstance(argc, str):
try:
argcjson = json.loads(argc)
except Exception as e:
logging.error('[handle_requests]Failed to json.load, {0}'.format(e))
logging.error(traceback.format_exc())
return jsonify(code=RET.PARAMERR, msg='Failed to json.load, {0}'.format(e))
elif isinstance(argc, dict):
argcjson = argc
else:
return jsonify(code=RET.PARAMERR, msg=u'argc查询类型错误')
# print(dir(dbcls))
if qryfunc in dir(dbcls):
if Id:
value = dbcls.__getattribute__(qryfunc)(argcjson, Id)
else:
value = dbcls.__getattribute__(qryfunc)(argcjson)
return value
else:
return jsonify(code=RET.PARAMERR, msg=qryfunc + '方法不存在,请联系开发人员')
1.2.4.8. 创建controls
这数据模型和业务逻辑目录
1.2.4.8.1. Base.py
这个文件定义数据模型基类BaseModel,定义全局数据库对象db,所有数据模型继承BaseModel和db,并在BaseModel类
中定义基本的数据操作函数,执行出错会事务回滚:
update:更新记录
add_update:增加一条记录
add_all_update:增加多条记录
delete:删除一条记录
to_json:SQLAlchemy转json对象
# 客户数据库连接,用作数据模型
db = SQLAlchemy()
class BaseModel(object):
def update(self): # 更新记录
try:
db.session.commit() # 事务提交
except Exception as e:
db.session.rollback() # 事务回滚
logging.error('[BaseModel->update]Failed to run update, {0}'.format(e))
logging.error(traceback.format_exc())
return False
return True
def add_update(self): # 增加一条记录
try:
db.session.add(self) # 添加数据对象
db.session.commit() # 事务提交
except Exception as e:
db.session.rollback() # 事务回滚
logging.error('[BaseModel->add_update]Failed to run add_update, {0}'.format(e))
logging.error(traceback.format_exc())
return False
return True
def add_all_update(self, datas: list): # 增加多条记录
try:
db.session.add_all(datas) # 添加数据对象
db.session.commit() # 事务提交
except Exception as e:
db.session.rollback() # 事务回滚
logging.error('[BaseModel->add_all_update]Failed to run add_all_update, {0}'.format(e))
logging.error(traceback.format_exc())
return False
return True
def delete(self): # 删除一条记录
try:
db.session.delete(self) # 删除数据对象
db.session.commit() # 事务提交
except Exception as e:
db.session.rollback() # 事务回滚
logging.error('[BaseModel->delete]Failed to run delete, {0}'.format(e))
logging.error(traceback.format_exc())
return False
return True
def to_json(self): # SQLAlchemy转json对象
fields = {}
for field in [x for x in self.__dict__ if not x.startswith('_') and x != 'metadata']:
data = self.__getattribute__(field)
try:
if isinstance(data, datetime):
data = data.strftime('%Y-%m-%d %H:%M:%S')
json.dumps(data) # this will fail on non-encodable values, like other classes
fields[field] = data
except TypeError:
fields[field] = None
logging.error(traceback.format_exc())
return fields
1.2.4.8.2. AccountDpart.py
这个文件是部门数据模型AccountDpart,定义用户部门信息,定义了3个业务函数:
init_data:初始化部门数据,数据模型里面根据需要定义该函数
get_count:查询数据表中数据数量,一般用于前端页面分页,所有数据模型中都要定义
get_value_list:查询和该模型相关的所有数据,所有数据模型中都要定义
# 客户部门信息数据表模型
class AccountDpart(BaseModel, db.Model):
__bind_key__ = 'client' # 数据库引擎,这就表示数据库名称,在config.py中定义
__tablename__ = 'account_depart' # 设置信息->部门信息表
Id = db.Column(db.INTEGER, primary_key=True) # 部门ID,
Name = db.Column(db.String(255), unique=True) # 部门名称
Remarks = db.Column(db.String(255)) # 部门描述
RegTime = db.Column(TIMESTAMP, server_default=func.current_timestamp(), index=True) # 检验限制创建时间
@staticmethod
def init_data():
datadict = [{'Name': '系统部', 'Remarks': '系统超级管理员'},
{'Name': '总经办', 'Remarks': '总经理办公室'}]
for data in datadict:
result = AccountDpart.query.filter(AccountDpart.Name == data.get('Name', '')).first()
if not result:
AccountDpart(Name=data.get('Name', ''), Remarks=data.get('Remarks', '')).add_update()
@staticmethod
def get_count(argc: dict):
value = {'count': db.session.query(func.count(AccountDpart.Id)).scalar()}
return {'code': RET.OK, 'data': value}
@staticmethod
def get_value_list(argc: dict):
results = db.session.query(AccountDpart).all()
values = []
for res in results:
values.append([res.to_json()])
return {'code': RET.OK, 'data': values}
1.2.4.8.3. AccountGroup.py
这个文件是用户分组数据模型,定义用户分组信息
init_data:初始化分组数据,数据模型里面根据需要定义该函数
get_count:查询数据表中数据数量,一般用于前端页面分页,所有数据模型中都要定义
get_value_list:查询和该模型相关的所有数据,所有数据模型中都要定义
# 客户用户组信息数据表模型
class AccountGroup(BaseModel, db.Model):
__bind_key__ = 'client' # 数据库引擎,这就表示数据库名称,在config.py中定义
__tablename__ = 'account_group' # 账户信息->用户组表
Id = db.Column(db.INTEGER, primary_key=True)
GroupName = db.Column(db.String(255), unique=True) # 用户组名称
Permiss = db.Column(db.Text) # 用户组权限,json数组,对应访问页面的信息
Remarks = db.Column(db.String(255)) # 备注
RegTime = db.Column(TIMESTAMP, server_default=func.current_timestamp(), index=True) # 检验限制创建时间
@staticmethod
def init_data():
datadict = [{'Id': 1, 'GroupName': '系统管理组', 'Permiss': '{"首页":[], "设置":[]}', 'Remarks': '超级管理组'}]
for data in datadict:
result = AccountGroup.query.filter(AccountGroup.Id == data.get('Id')).first()
if not result:
AccountGroup(Id=data.get('Id', 1), GroupName=data.get('GroupName', ''), Permiss=data.get('Permiss', ''),
Remarks=data.get('Remarks', '')).add_update()
@staticmethod
def get_count(argc: dict):
value = {'count': db.session.query(func.count(AccountGroup.Id)).scalar()}
return {'code': RET.OK, 'data': value}
@staticmethod
def get_value_list(argc: dict):
results = db.session.query(AccountGroup).all()
values = []
for res in results:
values.append([res.to_json()])
return {'code': RET.OK, 'data': values}
1.2.4.8.4. AccountPost.py
这个文件是用户职位数据模型,定义用户职位信息
init_data:初始化职位数据,数据模型里面根据需要定义该函数
get_count:查询数据表中数据数量,一般用于前端页面分页,所有数据模型中都要定义
get_value_list:查询和该模型相关的所有数据,所有数据模型中都要定义
# 客户职位信息数据表模型
class AccountPost(BaseModel, db.Model):
__bind_key__ = 'client' # 数据库引擎,这就表示数据库名称,在config.py中定义
__tablename__ = 'account_post' # 设置信息->职位信息表
Id = db.Column(db.INTEGER, primary_key=True) # 职位ID,
DepartId = db.Column(db.INTEGER, nullable=False) # 部门ID,对应数据表cli_acc_depart的ID
Name = db.Column(db.String(255)) # 职位名称
Remarks = db.Column(db.String(255)) # 职位描述
Authview = db.Column(db.Text, default='{}') # 用户查看权限
Authoper = db.Column(db.Text, default='{}') # 用户操作权限
RegTime = db.Column(TIMESTAMP, server_default=func.current_timestamp(), index=True) # 检验限制创建时间
@staticmethod
def init_data():
datadict = [{'DepartId': 1, 'Name': '系统管理员', 'Remarks': '系统管理员'},
{'DepartId': 2, 'Name': '普通用户', 'Remarks': '普通用户'}]
for data in datadict:
result = AccountPost.query.filter(AccountPost.DepartId == data.get('DepartId')).first()
if not result:
AccountPost(DepartId=data.get('DepartId', 2), Name=data.get('Name', ''),
Remarks=data.get('Remarks', '')).add_update()
@staticmethod
def get_count(argc: dict):
value = {'count': db.session.query(func.count(AccountPost.Id)).scalar()}
return {'code': RET.OK, 'data': value}
@staticmethod
def get_value_list(argc: dict):
results = db.session.query(AccountPost, AccountDpart).filter(AccountPost.DepartId == AccountDpart.Id).all()
values = []
for res in results:
values.append([res[0].to_json(), res[1].to_json()])
return {'code': RET.OK, 'data': values}
1.2.4.8.5. AccountRole.py
这个文件是用户角色数据模型,定义用户角色信息
init_data:初始化分组数据,数据模型里面根据需要定义该函数
get_count:查询数据表中数据数量,一般用于前端页面分页,所有数据模型中都要定义
get_value_list:查询和该模型相关的所有数据,所有数据模型中都要定义
# 客户角色信息数据表模型
class AccountRole(BaseModel, db.Model):
__bind_key__ = 'client' # 数据库引擎,这就表示数据库名称,在config.py中定义
__tablename__ = 'account_role' # 账户信息->用户角色表
Id = db.Column(db.INTEGER, primary_key=True) # 角色ID,1超级管理员,>1表示自定义权限
RoleName = db.Column(db.String(255), unique=True) # 角色名称
Permiss = db.Column(db.Text) # 角色权限,json数组,对应访问页面的信息
Remarks = db.Column(db.String(255)) # 备注
RegTime = db.Column(TIMESTAMP, server_default=func.current_timestamp(), index=True) # 检验限制创建时间
@staticmethod
def init_data():
datadict = [{'Id': 1, 'RoleName': '系统管理员',
'Permiss': '["增加","删除","修改","查询","发送","下载","安装","上传"]', 'Remarks': '系统管理员'},
{'Id': 2, 'RoleName': '普通用户',
'Permiss': '["查询","发送","下载","安装","上传"]', 'Remarks': '普通用户'}
]
for data in datadict:
result = AccountRole.query.filter(AccountRole.Id == data.get('Id')).first()
if not result:
AccountRole(Id=data.get('Id', 1), RoleName=data.get('RoleName', ''), Permiss=data.get('Permiss', ''),
Remarks=data.get('Remarks', '')).add_update()
@staticmethod
def get_count(argc: dict):
value = {'count': db.session.query(func.count(AccountRole.Id)).scalar()}
return {'code': RET.OK, 'data': value}
@staticmethod
def get_value_list(argc: dict):
results = db.session.query(AccountRole).all()
values = []
for res in results:
values.append([res.to_json()])
return {'code': RET.OK, 'data': values}
1.2.4.8.6. AccountUsers.py
这个文件是分组数据模型,定义用户分组信息
init_data:初始化分组数据,数据模型里面根据需要定义该函数
get_count:查询数据表中数据数量,一般用于前端页面分页,所有数据模型中都要定义
get_value_list:查询和该模型相关的所有数据,所有数据模型中都要定义
add_value:增加用户数据,所有数据模型中都要定义
update_value:更新用户数据,根据用户ID来更新,所有数据模型中都要定义
delete_value:删除用户数据,根据用户ID来更新,所有数据模型中都要定义
# 客户用户信息数据表模型
class AccountUsers(BaseModel, db.Model):
__bind_key__ = 'client' # 数据库引擎,这就表示数据库名称,在config.py中定义
__tablename__ = 'account_users' # 账户信息->用户表
Id = db.Column(db.INTEGER, primary_key=True)
Name = db.Column(db.String(50), unique=True) # 用户名称
Pwd = db.Column(db.String(300)) # 用户密码
Nick = db.Column(db.String(150)) # 用户姓名
PostId = db.Column(db.INTEGER, nullable=False) # 职位ID
Mobile = db.Column(db.String(150)) # 用户手机号码
Email = db.Column(db.String(150)) # 用户邮箱地址
RoleIds = db.Column(db.String(255)) # 用户角色ID,json数组
GroupIds = db.Column(db.String(255)) # 用户组ID,json数组
Authview = db.Column(db.Text, default='{}') # 用户查看权限
Authoper = db.Column(db.Text, default='{}') # 用户操作权限
Valid = db.Column(db.INTEGER, default=1) # 账号是否有效:0注销,1生效
RegTime = db.Column(TIMESTAMP, server_default=func.current_timestamp(), index=True) # 检验限制创建时间
@staticmethod
def init_data():
datadict = [
{'Id': 1, 'Name': 'superadmin', 'Pwd': generate_password_hash('superadmin'), 'Nick': '超级管理员', 'PostId': 1},
{'Id': 2, 'Name': 'admin', 'Pwd': generate_password_hash('admin'), 'Nick': '超级管理员', 'PostId': 1},
{'Id': 3, 'Name': 'general', 'Pwd': generate_password_hash('general'), 'Nick': '超级管理员', 'PostId': 2}]
for data in datadict:
result = AccountUsers.query.filter(AccountUsers.Id == data.get('Id')).first()
if not result:
AccountUsers(Id=data.get('Id'), Name=data.get('Name'), Pwd=data.get('Pwd'), Nick=data.get('Nick'),
PostId=data.get('PostId')).add_update()
@staticmethod
def get_count(argc: dict):
value = {'count': db.session.query(func.count(AccountUsers.Id)).scalar()}
return {'code': RET.OK, 'data': value}
@staticmethod
def get_value_list(argc: dict):
skip, limit = argc.get('skip', 0), argc.get('limit', 100)
results = db.session.query(AccountUsers, AccountPost, AccountDpart). \
filter(AccountUsers.PostId == AccountPost.Id).filter(AccountPost.DepartId == AccountDpart.Id).\
offset(skip).limit(limit).all()
values = []
for res in results:
values.append([res[0].to_json(), res[1].to_json(), res[2].to_json()])
return {'code': RET.OK, 'data': values}
@staticmethod
def add_value(argc: dict):
Name = argc.get('Name') # 用户名
Nick = argc.get('Nick') # 姓名
PostId = argc.get('PostId') # 用户职位ID
# 验证参数是否存在
if not all([Name, Nick, PostId]):
return {'code': RET.PARAMERR, 'msg': ret_map[RET.PARAMERR]}
db_user = AccountUsers.query.filter(AccountUsers.Name == Name).first()
if db_user:
return {'code': RET.PARAMERR, 'msg': '用户名已经存在'}
db_post = AccountPost.query.filter(AccountPost.Id == PostId).first()
if not db_post:
return {'code': RET.PARAMERR, 'msg': '部门ID不存在'}
if AccountUsers(Name=Name, Pwd=generate_password_hash('123456'), Nick=Nick, PostId=PostId).add_update():
return {'code': RET.OK, 'msg': ret_map[RET.OK]}
else:
logging.error(u'增加用户更新数据库失败,用户名:{0}'.format(Name))
return {'code': RET.DBERR, 'msg': ret_map[RET.DBERR]}
@staticmethod
def update_value(argc: dict, Id: int):
db_user = AccountUsers.query.filter(AccountUsers.Id == Id).first()
if not db_user:
return {'code': RET.PARAMERR, 'msg': '用户ID不存在'}
for k, v in argc.items():
if hasattr(db_user, k):
db_user.__setattr__(k, v)
if db_user.add_update():
return {'code': RET.OK, 'msg': ret_map[RET.OK]}
else:
logging.error(u'编辑用户更新数据库失败')
return {'code': RET.DBERR, 'msg': ret_map[RET.DBERR]}
@staticmethod
def delete_value(argc: dict, Id: int):
db_user = AccountUsers.query.filter(AccountUsers.Id == Id).first()
if not db_user:
return {'code': RET.PARAMERR, 'msg': '用户ID不存在'}
if db_user.delete():
return {'code': RET.OK, 'msg': ret_map[RET.OK]}
else:
return {'code': RET.DBERR, 'msg': ret_map[RET.DBERR]}
1.3. 源码文件
后台源码:https://download.csdn.net/download/yyt593891927/12522457
访问地址:http://127.0.0.1:5000
默认用户名:admin
默认密码:admin
1.4. 后记
本文完整讲述了利用flask创建后台系统,下一章介绍用Vue2.0创建系统对应的前端页面。
更多推荐
所有评论(0)