【Flask基础】八,Flask数据库操作(增删改查)
【Flask基础】八,Flask数据库操作(增删改查)
·
一,原生Mysql
- 导入方式:import pymysql
- 安装方式:pip install pymysql
- 教程直通车:
【http://runoob.com】https://www.runoob.com/python3/python3-mysql.html
【w3cschool】https://www.w3cschool.cn/mysql/
【个人博客】https://www.cnblogs.com/sunBinary/p/12416792.html
1.常规操作流程
import pymysql
from pymysql.cursors import DictCursor
# 【第一步】:连接到MySQL数据库
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='199596',
charset='utf8', database='flaskp_boke', autocommit=True)
print(conn.get_server_info())#打印数据库信息
# 【第二步】:执行SQL语句
# 1. 实例化一个游标对象,2. 定义SQL语句,3.通过游标执行,4,处理执行结果
cursor = conn.cursor()
sql = "select * from users"
cursor.execute(sql)
result = cursor.fetchall()
print(result)
"""
为元祖套元祖
((1, 'woniu@woniuxy.com', 'e10adc3949ba59abbe56e057f20f883e', '蜗牛', '1.png', '12345678', 'admin', 5034, datetime.datetime(2020, 2, 5, 12, 31, 57), datetime.datetime(2020, 2, 12, 11, 45, 57)), (2, 'qiang@woniuxy.com', 'e10adc3949ba59abbe56e057f20f883e', '强哥', '2.png', '33445566', 'editor', 558, datetime.datetime(2020, 2, 6, 15, 16, 55), datetime.datetime(2020, 2, 12, 11, 46, 1)), (3, 'denny@woniuxy.com', 'e10adc3949ba59abbe56e057f20f883e', '丹尼', '3.png', '226658397', 'user', 84, datetime.datetime(2020, 2, 6, 15, 17, 30), datetime.datetime(2020, 2, 12, 11, 46, 8)), (4, 'reader1@woniuxy.com', 'e10adc3949ba59abbe56e057f20f883e', 'reader1', '8.png', '12345678', 'user', 53, datetime.datetime(2020, 2, 16, 13, 50, 12), datetime.datetime(2020, 2, 16, 13, 50, 12)), (5, 'reader2@woniuxy.com', 'e10adc3949ba59abbe56e057f20f883e', 'reader2', '6.png', '12345678', 'user', 77, datetime.datetime(2020, 2, 16, 14, 56, 37), datetime.datetime(2020, 2, 16, 14, 56, 37)), (6, 'reader3@woniuxy.com', 'e10adc3949ba59abbe56e057f20f883e', 'reader3', '13.png', '12345678', 'user', 64, datetime.datetime(2020, 2, 16, 14, 59, 12), datetime.datetime(2020, 2, 16, 14, 59, 12)), (7, 'tester@woniuxy.com', 'e10adc3949ba59abbe56e057f20f883e', 'tester', '9.png', '12345678', 'user', 56, datetime.datetime(2020, 2, 23, 3, 38, 34), datetime.datetime(2020, 2, 23, 3, 38, 34)))
"""
# 一般情况下,不建议使用下标获取列的值(但是可以获取到)
# 建议使用Key-Value来获取数据(Key==>列名,Value==>单元格的值)
# 代码可读性更强,代码维护起来更加高效
# 建议面对一些复杂的SQL,先在Navicat调试完成后再整合到代码中
cursor = conn.cursor(DictCursor) #字典游标
sql = "select * from users where userid=3"
cursor.execute(sql)
result = cursor.fetchall()
print(result)
"""
为列表套字典
[{'userid': 3, 'username': 'denny@woniuxy.com', 'password': 'e10adc3949ba59abbe56e057f20f883e', 'nickname': '丹尼', 'avatar': '3.png', 'qq': '226658397', 'role': 'user', 'credit': 84, 'createtime': datetime.datetime(2020, 2, 6, 15, 17, 30), 'updatetime': datetime.datetime(2020, 2, 12, 11, 46, 8)}]
"""
# 更新操作需要执行commit指令
sql = "update users set qq='12345678' where userid=4"
cursor.execute(sql)
conn.commit() # 提交修改:update, insert, delete
# 【第三步】:关闭数据库连接
cursor.close()
conn.close()
2.数据库 增/删/改/查
- 增加数据
#!/usr/bin/python3
import pymysql
# 打开数据库连接
db = pymysql.connect(host='localhost',
user='testuser',
password='test123',
database='TESTDB')
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# SQL 插入语句
sql = """INSERT INTO EMPLOYEE(FIRST_NAME,
LAST_NAME, AGE, SEX, INCOME)
VALUES ('Mac', 'Mohan', 20, 'M', 2000)"""
try:
# 执行sql语句
cursor.execute(sql)
# 提交到数据库执行
db.commit()
except:
# 如果发生错误则回滚
db.rollback()
# 关闭数据库连接
db.close()
- 删除数据
#!/usr/bin/python3
import pymysql
# 打开数据库连接
db = pymysql.connect(host='localhost',
user='testuser',
password='test123',
database='TESTDB')
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# SQL 删除语句
sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20)
try:
# 执行SQL语句
cursor.execute(sql)
# 提交修改
db.commit()
except:
# 发生错误时回滚
db.rollback()
# 关闭连接
db.close()
- 修改数据
#!/usr/bin/python3
import pymysql
# 打开数据库连接
db = pymysql.connect(host='localhost',
user='testuser',
password='test123',
database='TESTDB')
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# SQL 更新语句
sql = "UPDATE EMPLOYEE SET AGE = AGE + 1 WHERE SEX = '%c'" % ('M')
try:
# 执行SQL语句
cursor.execute(sql)
# 提交到数据库执行
db.commit()
except:
# 发生错误时回滚
db.rollback()
# 关闭数据库连接
db.close()
- 查询数据
#!/usr/bin/python3
import pymysql
# 打开数据库连接
db = pymysql.connect(host='localhost',
user='testuser',
password='test123',
database='TESTDB')
# 使用cursor()方法获取操作游标
cursor = db.cursor()
# SQL 查询语句
sql = "SELECT * FROM EMPLOYEE \
WHERE INCOME > %s" % (1000)
try:
# 执行SQL语句
cursor.execute(sql)
# 获取所有记录列表
results = cursor.fetchall()
for row in results:
fname = row[0]
lname = row[1]
age = row[2]
sex = row[3]
income = row[4]
# 打印结果
print ("fname=%s,lname=%s,age=%s,sex=%s,income=%s" % \
(fname, lname, age, sex, income ))
except:
print ("Error: unable to fetch data")
# 关闭数据库连接
db.close()
二,SQLAlchemy
- 导入方式:import sqlalchemy
- 安装方式:pip install sqlalchemy
- 教程直通车:
【51job】https://www.jb51.net/article/197296.htm
【个人博客】http://www.zzvips.com/article/93197.html
1.数据库连接(别管用不用,上来就是一通导入)
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Table, \
MetaData, or_, func, and_
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, session, scoped_session
engine = create_engine('mysql+pymysql://root:199596@localhost/flaskp_boke', echo=False)
DBsession = sessionmaker(bind=engine)
dbsession = scoped_session(DBsession) # 线程安全
Base = declarative_base()
md = MetaData(bind=engine)
2.数据库 增/删/改/查
- 通过表模型创建表
一般不会这么创建表,肯定是先在navicate中创建好表结构然后在代码中使用
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Table, \
MetaData, or_, func, and_
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, session, scoped_session
engine = create_engine('mysql+pymysql://root:199596@localhost/flaskp_boke', echo=False)
DBsession = sessionmaker(bind=engine)
dbsession = scoped_session(DBsession) # 线程安全
Base = declarative_base()
md = MetaData(bind=engine)
# 定义模型类 【不会这么创建表,肯定是先在navicate中创建好表结构然后在代码中使用】
class Users(Base):
__tablename__ = "userx"
# 如果需要在SQLAlchemy里面直接创建表结构,则详细定义列
userid = Column(Integer, primary_key=True)
username = Column(String(50))
password = Column(String(32))
nickname = Column(String(30))
qq = Column(String(15))
role = Column(String(10))
credit = Column(Integer)
createtime = Column(DateTime)
updatetime = Column(DateTime)
Users.metadata.create_all(engine) # 创建表
- 数据表操作前需要先将表结构导入
class Users(Base):
__table__ = Table('users', md, autoload=True) #表名 原数据 自动加载表结构
class Article(Base):
__table__ = Table('article', md, autoload=True)
class Comment(Base):
__table__ = Table('comment', md, autoload=True)
- 增加数据库数据
from sqlalchemy import create_engine, MetaData, or_, func, and_
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, session, scoped_session
engine = create_engine('mysql+pymysql://root:199596@localhost/flaskp_boke', echo=False)
DBsession = sessionmaker(bind=engine)
dbsession = scoped_session(DBsession) # 线程安全
Base = declarative_base()
md = MetaData(bind=engine)
class Users(Base):
__table__ = Table('users', md, autoload=True) #表名 原数据 自动加载表结构
class Article(Base):
__table__ = Table('article', md, autoload=True)
class Comment(Base):
__table__ = Table('comment', md, autoload=True)
# 新增
user = Users(username='reader5@woniuxy.com',
password='e10adc3949ba59abbe56e057f20f883e', role='user', credit=5)
dbsession.add(user)
dbsession.commit() # 修改类操作需要手工提交
- 查询数据库数据
from sqlalchemy import create_engine, MetaData, or_, func, and_
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, session, scoped_session
engine = create_engine('mysql+pymysql://root:199596@localhost/flaskp_boke', echo=False)
DBsession = sessionmaker(bind=engine)
dbsession = scoped_session(DBsession) # 线程安全
Base = declarative_base()
md = MetaData(bind=engine)
class Users(Base):
__table__ = Table('users', md, autoload=True) #表名 原数据 自动加载表结构
class Article(Base):
__table__ = Table('article', md, autoload=True)
class Comment(Base):
__table__ = Table('comment', md, autoload=True)
# 基础单表查询汇总, 直接打印一个类的时候,具体打印什么内容,由类的__repr__魔术方法决定,可以重写
# select userid, username from users 结果为列表包元祖[(1, 'woniu@woniuxy.com'), (2, 'qiang@woniuxy.com')]
result = dbsession.query(Users.userid, Users.username).all()
# select * from users where userid=1 and qq='12345678'
result = dbsession.query(Users).filter_by(userid=1, qq='123456789').all()
# select * from users where userid>7 or nickname='丹尼'
result = dbsession.query(Users).filter(or_(Users.userid>7, Users.nickname=='丹尼')).all()
# select * from users limit 3
result = dbsession.query(Users).limit(3).all()
# select * from users limit 3,5
result = dbsession.query(Users).limit(5).offset(3).all()
# select count(*) from users where ....
count = dbsession.query(Users).filter(Users.userid > 3).count()
print(count)
# select distinct(qq) from users #去重(几乎用不到)-- distinct(Users.qq)对应哪一列进行去重
result = dbsession.query(Users.qq).distinct(Users.qq).all()
# select * from users order by userid desc
result = dbsession.query(Users).order_by(Users.userid.desc()).all()
# select * from users where username like '%qiang%' 模糊查询
result = dbsession.query(Users).filter(Users.username.like('%qiang%')).all()
# select * from users group by role 兼有去重功能--用的比较多
result = dbsession.query(Users).group_by(Users.role).all()
# 分组后再加条件就必须用having
result = dbsession.query(Users).group_by(Users.role).having(Users.userid>2).all()
# 聚合函数:min, max, avg, sum
# select sum(credit) from users
result = dbsession.query(func.sum(Users.credit)).first()
# filter: == >= > <= < != in not
# 多表查询汇总
# 多表连接查询: select * from article inner join users on article.userid=users.userid where article.articleid=1
# 多表连接查询时,返回的结果集不再是单纯的[Model, Model]数据结构,而是每张表的结果有独立的对象来维护
result = dbsession.query(Article, Users).join(Users, Article.userid == Users.userid).filter(Article.articleid==1).all()
result = dbsession.query(Article.articleid, Article.headline, Users.nickname).join(Users, Article.userid == Users.userid).filter(Article.articleid==1).all()
print(result)
for article,users in result:
print(article.articleid,article.headline,users.userid,users.username)
for articleid, headline, nickname in result:
print(articleid, headline, nickname)
# 外连接:查询每一个用户发表过的文章的阅读总量, outerjoin默认为左外连接
# select users.userid, users.nickname, sum(article.readcount) as total from users left join article
# on users.userid=article.userid group by (users.userid)
result = dbsession.query(Users.userid, Users.nickname, func.sum(Article.readcount))\
.outerjoin(Article, Users.userid==Article.userid).group_by(Users.userid).all()
print(result)
# 复杂查询: and和or混用,username like 'qiang' or (userid>3 and nickname='reader3')
result = dbsession.query(Users).filter(or_(Users.username.like('%qiang%'), and_(Users.userid>3, Users.nickname=='reader3'))).all()
result = dbsession.query(Users).filter(and_(Users.username.like('%qiang%'), or_(Users.userid>3, Users.nickname=='reader3'))).all()
result = dbsession.query(Users).filter(Users.username.like('%qiang%'), or_(Users.userid>3, Users.nickname=='reader3')).all()
for row in result:
print(row.userid, row.username)
# 三表连接
result = dbsession.query(Comment, Users).join(Users, Comment.userid==Users.userid)\
.join(Article, Article.articleid==Comment.articleid).all()
print(result)
- 数据库数据更新
from sqlalchemy import create_engine, MetaData, or_, func, and_
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, session, scoped_session
engine = create_engine('mysql+pymysql://root:199596@localhost/flaskp_boke', echo=False)
DBsession = sessionmaker(bind=engine)
dbsession = scoped_session(DBsession) # 线程安全
Base = declarative_base()
md = MetaData(bind=engine)
class Users(Base):
__table__ = Table('users', md, autoload=True) #表名 原数据 自动加载表结构
class Article(Base):
__table__ = Table('article', md, autoload=True)
class Comment(Base):
__table__ = Table('comment', md, autoload=True)
【单独功能使用】
query = dbsession.query(Users)
xm_user = query.filter(Users.user_name == '小华').first()
xm_user.name = 'robin'
dbsession.commit()
【链式使用】
row = dbsession.query(Users).filter_by(userid=9).first()
row.username = 'reader6@woniuxy.com'
dbsession.commit()
- 数据库数据删除
from sqlalchemy import create_engine, MetaData, or_, func, and_
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, session, scoped_session
engine = create_engine('mysql+pymysql://root:199596@localhost/flaskp_boke', echo=False)
DBsession = sessionmaker(bind=engine)
dbsession = scoped_session(DBsession) # 线程安全
Base = declarative_base()
md = MetaData(bind=engine)
class Users(Base):
__table__ = Table('users', md, autoload=True) #表名 原数据 自动加载表结构
class Article(Base):
__table__ = Table('article', md, autoload=True)
class Comment(Base):
__table__ = Table('comment', md, autoload=True)
【单独功能使用】
query = session.query(Users)
xm_user = query.filter(Users.user_name == '小华').first()
dbsession.delete(xm_user)
dbsession.commit()
【链式使用】
row = dbsession.query(Users).filter_by(userid=9).delete()
dbsession.commit()
3.利用SQLAlchemy执行原生SQL
from sqlalchemy import create_engine, MetaData,
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session
engine = create_engine('mysql+pymysql://root:199596@localhost/flaskp_boke', echo=False)
DBsession = sessionmaker(bind=engine)
dbsession = scoped_session(DBsession) # 线程安全
Base = declarative_base()
md = MetaData(bind=engine)
# 利用SQLAlchemy执行原生SQL
result = dbsession.execute("select * from users where userid>5").fetchall()
rint(result)
print(result[0].username)
dbsession.execute("delete from users where userid=10")
dbsession.commit()
三,Flask_sqlalchemy
- 导入方式:from flask_sqlalchemy import SQLAlchemy
- 安装方式:pip install flask-sqlalchemy + pip install flask-mysqldb
- 教程直通车:
【个人博客】https://www.jb51.net/article/250758.htm
【个人博客】https://www.zhangshengrong.com/p/w4N7D8oGXr/
1.数据库表结构创建(一般不通过代码创建,了解即可)
# coding:utf-8
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
class Config(object):
"""配置参数"""
# sqlalchemy的配置参数
SQLALCHEMY_DATABASE_URI = "mysql://root:199596@127.0.0.1:3306/db_python"
# 设置sqlalchemy自动更跟踪数据库
SQLALCHEMY_TRACK_MODIFICATIONS = True
app.config.from_object(Config)
# 创建数据库sqlalchemy工具对象
db = SQLAlchemy(app)
class Role(db.Model):
"""用户角色/身份表"""
__tablename__ = "tbl_roles"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(32), unique=True)
#设置反向关系,额外为User添加一个role属性(反推属性)
users = db.relationship("User", backref="role")
def __repr__(self):
"""定义之后,可以让显示对象的时候更直观"""
return "Role object: name=%s" % self.name
# 表名的常见规范
# ihome -> ih_user 数据库名缩写_表名
# tbl_user tbl_表名
# 创建数据库模型类
"""
常见列选项
选项名 说明
primiary_key 如果为True,代表表的主键
unique 如果为True,代表这列不允许重复
index 如果为True,为这列创建索引,提高查询效率
nullable 如果为True,允许有空值,为False,不允许有空值
default 为这列定义默认值
"""
class User(db.Model):
"""用户表"""
__tablename__ = "tbl_users" # 指明数据库的表名
id = db.Column(db.Integer, primary_key=True) # 整型的主键,会默认设置为自增主键
name = db.Column(db.String(64), unique=True)
email = db.Column(db.String(128), unique=True)
password = db.Column(db.String(128))
role_id = db.Column(db.Integer, db.ForeignKey("tbl_roles.id"))
def __repr__(self):
return "User object: name=%s" % self.name
if __name__ == '__main__':
# 清除数据库里的所有数据--第一次创建会有脏数据
db.drop_all()
# 创建所有的表
db.create_all()
#【单条保存】
# 创建对象
role1 = Role(name="admin")
# session记录对象任务
db.session.add(role1)
# 提交任务到数据库中
db.session.commit()
role2 = Role(name="stuff")
db.session.add(role2)
db.session.commit()
#【多条保存】
us1 = User(name='wang', email='wang@163.com', password='123456', role_id=role1.id)
us2 = User(name='zhang', email='zhang@189.com', password='201512', role_id=role2.id)
us3 = User(name='chen', email='chen@126.com', password='987654', role_id=role2.id)
us4 = User(name='zhou', email='zhou@163.com', password='456789', role_id=role1.id)
# 一次保存多条数据
db.session.add_all([us1, us2, us3, us4])
db.session.commit()
2.数据库增删改查
# coding:utf-8
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
#创建flask应用对象,当前的模块名字
app = Flask(__name__)
class Config(object):
"""配置参数"""
# sqlalchemy的配置参数
SQLALCHEMY_DATABASE_URI = "mysql://root:199596@127.0.0.1:3306/flask_test"
# 设置sqlalchemy自动更跟踪数据库
SQLALCHEMY_TRACK_MODIFICATIONS = True
app.config.from_object(Config)
# 创建数据库sqlalchemy工具对象
db = SQLAlchemy(app)
# 创建数据库模型类--角色
class Role(db.Model):
"""用户角色/身份表"""
__tablename__ = "tbl_roles"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(32), unique=True)
#设定关联关系
users = db.relationship("User", backref="role")
def __repr__(self):
"""定义之后,可以让显示对象的时候更直观,一般不用改"""
return "Role object: name=%s" % self.name
# 创建数据库模型类--用户
class User(db.Model):
"""用户表"""
__tablename__ = "tbl_users" # 指明数据库的表名
id = db.Column(db.Integer, primary_key=True) # 整型的主键,会默认设置为自增主键
name = db.Column(db.String(64), unique=True)
email = db.Column(db.String(128), unique=True)
password = db.Column(db.String(128))
#外键设置格式
role_id = db.Column(db.Integer, db.ForeignKey("tbl_roles.id"))
def __repr__(self):
return "User object: name=%s" % self.name
if __name__ == '__main__':
# 清除数据库里的所有数据
db.drop_all()
# 创建所有的表
db.create_all()
"""增加数据"""
#1.创建对象
role1 = Role(name = "admin")
#2.session记录对象任务,提交单个数据
db.session.add(role1)
#3.提交session任务
db.session.commit()
role2 = Role(name = "stuff")
db.session.add(role2)
db.session.commit()
us1 = User(name='wang',email='wang@163.com',pswd='123456',role_id=role1.id)
us2 = User(name='zhang',email='zhang@189.com',pswd='201512',role_id=role2.id)
us3 = User(name='chen',email='chen@126.com',pswd='987654',role_id=role2.id)
us4 = User(name='zhou',email='zhou@163.com',pswd='456789',role_id=role1.id)
#提交多个存储数据到记录中
db.session.add_all([us1,us2,us3,us4])
db.session.commit()
"""删除数据"""
user = User.query.get(3)
db.session.delete(user)
db.session.commit()
"""修改数据"""
#方法一:
user = User.query.get(1)
user.name = "python"
db.session.add(user)
db.session.commit()
#方法二:
User.query.filter_by(name = "zhou").update({"name":"python","email":"itcast"})
db.session.commit()
"""查询数据"""
###可以将单个数据当做对象来通过属性来提取数据
#例:查询多条数据并将第一条数据的name值取出
li = Role.query.all()#flask-sqlalchemy方法
li2 = db.session.query(Role).all()#原始sqlalchemy方法
r = li[0]
temp_name = r.name
#查询数据库中单条数据,第一条数据
r = Role.query.first()
r2 = db.session.query(Role).first()
temp_name = r.name
#通过主键值来获取单条数据
r = Role.query.get(2)
r2 = db.session.query(Role).get(2)
temp_name = r.name
##############################################################3
###查询过滤器(符合条件取出)
"""
过滤器 说明
filter() 把过滤器添加到原查询上,返回一个新查询
filter_by() 把等值过滤器添加到原查询上,返回一个新查询
limit 使用指定的值限定原查询返回的结果
offset() 偏移原查询返回的结果,返回一个新查询
order_by() 根据指定条件对原查询结果进行排序,返回一个新查询
group_by() 根据指定条件对原查询结果进行分组,返回一个新查询
all() 以列表形式返回查询的所有结果
first() 返回查询的第一个结果,如果未查到,返回None
first_or_404() 返回查询的第一个结果,如果未查到,返回404
get() 返回指定主键对应的行,如不存在,返回None
get_or_404() 返回指定主键对应的行,如不存在,返回404
count() 返回查询结果的数量
paginate() 返回一个Paginate对象,它包含指定范围内的结果
"""
#不加后面all等操作的时候为一个不会执行的查询
#查询不到会返回NoneType
user1 = User.query.filter_by(name = "wang").all()
user2 = User.query.filter_by(name = "wang",role_id = 1).first()
#filter为万能过滤器,filter_by为一个特殊的等值过滤器
user1 = User.query.filter(User.name == "wang").all()
user2 = User.query.filter(User.name == "wang",User.role_id == 1).first()
#引入或者,与,非参数
from sqlalchemy import or_,and_,not_
#或操作
user1 = User.query.filter(or_(User.name == "wang",User.email.endswith("163.com"))).all()
temp_name = user1[0].name
#例:取User表中跳过两个数据后取前两个数据
user = User.query.offset(2).limit(2).all()
##排序flask-sqlalchemy写法
User.query.order_by("-id").all()
##排序flask官方写法,asc升序
User.query.order_by(User.id.desc()).all()
##分组查询(需要的时候在查吧,看着好像也没啥用),前面为显示信息
#如果需要求和操作,from sqlalchemy import func中有好多功能,用的时候再查吧
db.session.query(User.role_id).group_by(User.role_id)
####################################################################3
###关联查询
#从Role往User中查询
ro = Role.query.get(1)
user_name = ro.users[0].name
#从User往Role中查询
user = User.query.get(1)
role_name = user.role.name #role为关系别名
四,Redis(不做详细讲解,只给出教程直通车+数据库免安装资源)
- 导入方式:import redis
- 安装方式:pip install redis
- 教程直通车
【个人博客】https://blog.csdn.net/u014651560/article/details/119562421
【个人博客】https://blog.csdn.net/csdnhxs/article/details/122450575 - redis数据库免安装包:https://download.csdn.net/download/hot7732788/86403658
五,自定义ORM(使用原生pymysql)–学习构建方法
1.抽出公共功能为基类版本
import pymysql
from pymysql.cursors import DictCursor
class MySQL:
# 实例化即创建与数据库之间的连接
def __init__(self):
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='199596',
charset='utf8', database='flaskp_boke', autocommit=True)
self.cursor = conn.cursor(DictCursor)
# 封装基础查询语句
def query(self, sql):
self.cursor.execute(sql)
result = self.cursor.fetchall()
return result
# 执行修改操作
def execute(self, sql):
try:
self.cursor.execute(sql)
return 'OK'
except:
return 'Fail'
# 封装成标准的模型类,供子类继承
# 增加field()方法来指定查询哪些列,*代表所有列
class Model:
def __init__(self, **kwargs):
for k, v in kwargs.items():
self.__setattr__(k, v)
# 通过链式操作指定查询哪些列
def field(self, columns):
self.columns = columns # 动态增加类实例属性
return self
# 带列名的查询条件
def select(self, **where):
table = self.__class__.__getattribute__(self, 'table_name')
if hasattr(self, 'columns'):
sql = "select %s from %s" % (self.columns, table)
else:
sql = "select * from %s" % table
if where is not None:
sql += " where"
for k, v in where.items():
sql += " %s='%s' and" % (k, v)
sql += ' 1=1'
result = MySQL().query(sql)
return result
# 正常新增数据
def insert(self):
keys = []
values = []
for k, v in self.__dict__.items():
keys.append(k)
values.append(str(v))
sql = "insert into %s(%s) values('%s')" % (self.table_name, ','.join(keys), "','".join(values))
result = MySQL().execute(sql)
print(result)
# 定义子类Users和Article模型类
class Users(Model):
table_name = 'users'
# 调用父类的构造方法
def __init__(self, **kwargs):
super().__init__(**kwargs)
class Article(Model):
table_name = 'article'
# 调用父类的构造方法
def __init__(self, **kwargs):
super().__init__(**kwargs)
user = Users()
# result = user.select(userid=1)
result = user.field('userid, username, nickname').select(userid=2)
print(result)
article = Article()
result = article.select(articleid=1)
print(result[0]['headline'])
2.不抽出基类版本
import pymysql
from pymysql.cursors import DictCursor
class MySQL:
# 实例化即创建与数据库之间的连接
def __init__(self):
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', password='199596',
charset='utf8', database='flaskp_boke', autocommit=True)
self.cursor = conn.cursor(DictCursor) #定义返回字典的数据库游标
# 封装基础查询语句
def query(self, sql):
self.cursor.execute(sql)
result = self.cursor.fetchall()
return result
# 执行修改操作
def execute(self, sql):
try:
self.cursor.execute(sql)
return 'OK'
except:
return 'Fail'
class Users:
table_name = 'users' # 定义表名
# 构造方法,传递字典参数作为Insert的Key和Value
def __init__(self, **kwargs):
for k, v in kwargs.items():
self.__setattr__(k, v) #把表中列名变成了属性,值变成了对应属性值
print(self.__dict__)
# 封装查询操作
def select(self, **where):
sql = "select * from %s" % self.table_name
if where is not None:
sql += " where"
for k, v in where.items():
sql += " %s='%s' and" % (k, v)
sql += ' 1=1'
print(sql)
result = MySQL().query(sql)
return result
# 封装新增: insert into table(c1, c2, c3) values(v1, v2, v3)
def insert(self):
keys = []
values = []
for k, v in self.__dict__.items():
keys.append(k)
values.append(str(v))
sql = "insert into %s(%s) values('%s')" % (self.table_name, ','.join(keys), "','".join(values))
print(sql)
result = MySQL().execute(sql)
print(result)
if __name__ == '__main__':
# user = Users()
# result = user.select("userid=1 and nickname='蜗牛'")
# result = user.select(userid=1, nickname='蜗牛')
# result = user.select(userid=3)
# print(result)
user = Users(username='reader4@woniuxy.com', password='e10adc3949ba59abbe56e057f20f883e', role='user', credit=5)
user.insert()
print(user.select(username='reader4@woniuxy.com'))
更多推荐
已为社区贡献1条内容
所有评论(0)