【flask】SQLAlchemy 的增删改查(CRUD)
1.增加数据增加数据的步骤很简单,只需要把模型类对象db.session.add(模型类对象)即可,不过如果没有开启自动提交的话,需要手动提交db.session.commit()from flask import Flaskfrom flask_sqlalchemy import SQLAlchemyapp = Flask(__name__)app.config["SQLALCHEMY_DATA
·
1.增加数据
增加数据的步骤很简单,只需要把模型类对象db.session.add(模型类对象)
即可,不过如果没有开启自动提交的话,需要手动提交db.session.commit()
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql://root:mysql@192.168.203.132:3306/python39"
app.config["SQLALCHEMY_ECHO"] = True
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
db = SQLAlchemy(app)
# 模型类
class Goods(db.Model):
__tablename__ = "tb_goods"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64))
count = db.Column(db.Integer, default=0)
@app.route("/add")
def add():
goods1 = Goods(name='小米10', count=1000) # 实例化模型类对象
db.session.add(goods1) # 添加单个数据
# db.session.add_all([goods1, goods2, goods3]) # 添加多个数据
# 若只是修改数据,不存在则新增,可以使用merge()方法
db.session.commit() # 提交会话(事务)
return "add success"
if __name__ == '__main__':
db.drop_all() # 删除所有表,为方便测试,实际勿用
db.create_all() # 创建所有表
app.run(debug=True)
某些情况,我们需要获取未插入数据库的数据主键id,可以使用flush()
goods1 = Goods(name='小米10', count=1000)
db.session.add(goods1)
db.session.flush()
print(goods1.id)
db.session.commit()
注意: SQLAlchemy会自动创建一个隐式事务(implicit),需要手动提交,但是事务失败,它会自动回滚
2.查询数据
2.1 准备数据
我们先定义好用户模型类,并且添加数据
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql://root:mysql@192.168.203.132:3306/python39"
app.config["SQLALCHEMY_ECHO"] = True
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
db = SQLAlchemy(app)
class User(db.Model):
__tablename__ = "users" # 表名 默认使用类名的小写
# 定义类属性 记录字段
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64))
email = db.Column(db.String(64))
age = db.Column(db.Integer)
def __repr__(self): # 自定义 交互模式 & print() 的对象打印
return "(%s, %s, %s, %s)" % (self.id, self.name, self.email, self.age)
@app.route("/add")
def add():
user1 = User(name='wang', email='wang@163.com', age=20)
user2 = User(name='zhang', email='zhang@189.com', age=33)
user3 = User(name='chen', email='chen@126.com', age=23)
user4 = User(name='zhou', email='zhou@163.com', age=29)
user5 = User(name='tang', email='tang@itheima.com', age=25)
user6 = User(name='wu', email='wu@gmail.com', age=25)
user7 = User(name='qian', email='qian@gmail.com', age=23)
user8 = User(name='liu', email='liu@itheima.com', age=30)
user9 = User(name='li', email='li@163.com', age=28)
user10 = User(name='sun', email='sun@163.com', age=26)
db.session.add_all([user1, user2, user3, user4, user5, user6, user7, user8, user9, user10])
db.session.commit()
return "add success"
if __name__ == '__main__':
db.drop_all() # 删除所有表,为方便测试,实际勿用
db.create_all() # 创建所有表
app.run(debug=True)
2.2 查询执行器
方法 | 说明 |
---|---|
all() | 返回一个元素为查询结果的列表 |
count() | 返回查询结果的数量 |
first() | 返回第一个结果或None |
first_or_404() | 返回第一个结果或404 |
get(主键) | 返回主键对应的对象或None |
get_or_404(主键) | 返回主键对应的对象或None |
paginate(页码, 每页条数) | 返回一个分页查询结果 |
2.3 查询过滤器
过滤器 | 说明 |
---|---|
filter_by(字段名=值) | 等值查询 |
filter(函数引用/比较运算) | 函数或比较查询 |
limit(限定条数) | 限定返回结果数量 |
offset(偏移条数) | 对查询结果进行偏移 |
order_by(排序字段) | 对查询结果进行排序 |
options() | 针对原查询限定查询的字段 |
提示:以上查询得到的都是一个BaseQuery对象
2.4 查询实例
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql://root:mysql@192.168.203.132:3306/python39"
app.config["SQLALCHEMY_ECHO"] = True
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
db = SQLAlchemy(app)
class User(db.Model):
__tablename__ = "users" # 表名 默认使用类名的小写
...
@app.route("/add")
def add():
...
return "add success"
@app.route("/query")
def query():
# 简单查询
User.query.all() # 查询所有用户数据
User.query.count() # 查询有多少个用户
User.query.first() # 查询第1个用户
# 根据id查询 返回模型对象/None
User.query.get(5)
User.query.filter_by(id=5).all()
User.query.filter(User.id == 5).first()
# 查询名字以某个字符开始/结尾/包含的所有用户
User.query.filter(User.name.endswith("g")).all()
User.query.filter(User.name.startswith("w")).all()
User.query.filter(User.name.contains("n")).all()
User.query.filter(User.name.like("w%n%g")).all() # 模糊查询
# 查询名字不等于wang的所有用户
from sqlalchemy import not_, and_, or_
User.query.filter(not_(User.name == "wang")).all()
User.query.filter(User.name != "wang").all()
User.query.filter(User.id.in_([1, 5, 6])).all() # 查询id在某个范围的用户
# 排序查询
User.query.order_by(User.age, User.id.desc()).limit(5).all() # 所有用户先按年龄从小到大, 再按id从大到小排序, 取前5个
# 分页查询
pn = User.query.paginate(3, 3)
# pn.pages 总页数 pn.page 当前页码 pn.items 当前页的数据 pn.total 总条数
# 去重
db.query(distinct(User.name)).all()
db.query(User.name,User.age).distinct(User.name, User.age).filter(xxx).all()
# 聚合查询
# 查询每个年龄的人数 select age, count(name) from t_user group by age 分组聚合
from sqlalchemy import func
data = db.session.query(User.age, func.count(User.id).label("count")).group_by(User.age).all()
for item in data:
# print(item[0], item[1])
print(item.age, item.count)
# 只查询所有人的姓名和邮箱 优化查询 User.query.all() # 相当于select *
from sqlalchemy.orm import load_only
data = User.query.options(load_only(User.name, User.email)).all()
# 另一种写法: data = session.query(User).with_entities(User.id).filter(xxx).all()
for item in data:
print(item.name, item.email)
data = db.session.query(User.name, User.email).all()
for item in data:
print(item.name, item.email)
return "query success"
if __name__ == '__main__':
db.drop_all() # 删除所有表,为方便测试,实际勿用
db.create_all() # 创建所有表
app.run(debug=True)
3.更新数据
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql://root:mysql@192.168.203.132:3306/python39"
app.config["SQLALCHEMY_ECHO"] = True
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
db = SQLAlchemy(app)
class Goods(db.Model):
__tablename__ = "goods"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64))
count = db.Column(db.Integer, default=0)
@app.route("/add")
def add():
goods = Goods(name="下午茶", count=99)
db.session.add(goods)
db.session.commit()
return "add success"
@app.route("/update")
def update():
# 方案1:先查询模型对象,修改对象属性,再提交到数据库
goods = Goods.query.filter(Goods.name == "下午茶").first()
goods.count -= 1
db.session.add(goods) # 可以省略这步
db.session.commit()
# 方案2(推荐):
Goods.query.filter(Goods.name == "下午茶").update({"count": Goods.count - 1})
db.session.commit()
return "update success"
@app.route("/")
def index():
return "index page"
if __name__ == '__main__':
db.drop_all()
db.create_all()
app.run(debug=True)
4.删除数据
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql://root:mysql@192.168.203.132:3306/python39"
app.config["SQLALCHEMY_ECHO"] = True
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
db = SQLAlchemy(app)
class Goods(db.Model):
__tablename__ = "tb_goods"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64))
count = db.Column(db.Integer, default=0)
@app.route("/add")
def add():
goods = Goods(name="下午茶", count=99)
db.session.add(goods)
db.session.commit()
return "add success"
@app.route("/delete")
def delete():
# 方案1:
goods = Goods.query.filter(Goods.name == "下午茶").first()
db.session.delete(goods)
db.session.commit()
# 方案2(推荐):
Goods.query.filter(Goods.name == "山寨奔驰跑车").delete()
db.session.commit()
return "delete success"
@app.route("/")
def index():
return "index page"
if __name__ == '__main__':
db.drop_all()
db.create_all()
app.run(debug=True)
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
已为社区贡献6条内容
所有评论(0)