zl程序教程

您现在的位置是:首页 >  后端

当前栏目

三十五、python学习之Flask框架(七)数据库:Flask对数据库的基本操作、常见关系模板、数据库迁移、综合案例:图书管理

2023-09-27 14:29:29 时间

补充:
  使用SQL_Alchemy定义一个模型类,不可以不指定primary_key=True创建表.

一、数据库基本操作

1. 数据库的基本操作(CRUD):

  • 在Flask-SQLAlchemy中,插入、修改、删除操作,均由数据库会话管理。

    • 会话用 db.session 表示。在准备把数据写入数据库前,要先将数据添加到会话中然后调用 commit() 方法提交会话。
  • 在 Flask-SQLAlchemy 中,查询操作是通过 query 对象操作数据。

    • 最基本的查询是返回表中所有数据,可以通过过滤器进行更精确的数据库查询。

2.在视图函数中定义模型类:

# !/usr/bin python
# coding=utf-8

from flask import Flask
# 导入flask_sqlalchemy扩展
from flask_sqlalchemy import SQLAlchemy


app = Flask(__name__)
# 配置数据库连接
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql://root:19970125@127.0.0.1:3306/python32"
# 关闭动态追踪修改的警告
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 展示sql语句
app.config['SQLALCHEMY_ECHO'] = True

# 实例化sqlalchemy对象
db = SQLAlchemy(app)

# 定义用户类型数据模型
class Role(db.Model):
    __tablename__ = "roles"
    id = db.Column(db.Integer, primary_key = True)
    name = db.Column(db.String(32), unique=True)
    
	# 定义关系引用,第一个参数User表示多方的类名
    # 第二个backref表示的是反向引用,给User模型用,实现多对一的查询
    # 等号左边给一方Role使用,backref给多方User使用
    us = db.relationship('User',backref='role')

    # 定义方法,输出查询结果
    def __repr__(self):
        return "name:%s" % self.name


# 定义用户数据模型
class User(db.Model):
    __tabelname__ = "users"
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(32))
    email = db.Column(db.String(32), unique=True)
    pswd = db.Column(db.String(32))
    # 定义外键
    role_id = db.Column(db.Integer, db.ForeignKey("roles.id"))

    def __repr__(self):
        return "name:%s, email:%s, passwd:%s" % (self.name, self.email, self.pswd)


# 定义视图
@app.route("/")
def index():
    return "hello world!!"


if __name__ == '__main__':
    # 删除数据库
    db.drop_all()
    # 创建新库
    db.create_all()

    # 实例化用户类型模型类
    ro1 = Role(name = "admin")
    ro2 = Role(name = "user")
    # 通过db.session提交数据库
    db.session.add_all([ro1, ro2])
    db.session.commit()

    # 实例化用户模型累
    us0 = User(name = "张三", email = "117128@qq.com", pswd = "123456", role_id = ro1.id)
    us1 = User(name='wang', email='wang@163.com', pswd='123456', role_id=ro1.id)
    us2 = User(name='zhang', email='zhang@189.com', pswd='201512', role_id=ro2.id)
    us3 = User(name='chen', email='chen@126.com', pswd='987654', role_id=ro2.id)
    us4 = User(name='zhou', email='zhou@163.com', pswd='456789', role_id=ro1.id)
    db.session.add_all([us0,us1,us2,us3,us4])
    db.session.commit()

    app.run(debug=True)

3.常用的SQLAlchemy查询过滤器:

过滤器说明
filter()把过滤器添加到原查询上,返回一个新查询
filter_by()把等值过滤器添加到原查询上,返回一个新查询
limit()使用指定的值限定原查询返回的结果,限制查询的条数
offset()偏移原查询返回的结果,返回一个新的查询
order_by()根据指定的条件对原查询结果进行排序,返回一个新查询
group_by()根据指定条件对原查询结果进行分组,返回一个新查询

分析:

  • filter(): 可以进行逻辑判断,(相对重要)
  • filter_by():等值就是赋值语句
  • order_by(): (相对重要)
    • 参数:desc:使用降序排序; asc:使用升序排序(默认)

4.常用的SQLAlchemy查询执行器:

方法说明
all()以列表形式返回查询结果
first()返回查询结果的第一个结果,如果未找查到,返回None
first_or_404()返回查询的第一个结果,如果未查到返回404
get()返回指定的主键对应的行,如果不存在,返回None
get_or_404()返回指定主键对应的行,如果不存在,返回404
count()返回查询结果的数量
paginate()返回一个paginate对象,它包含指定范围内的结果

分析:

  • 查询器得到一个类,通过执行器拿到结果
  • all,first,get常用
  • paginate:分页,比较难,重点掌握一下

5.数据库的查询操作:

5.1 查询所有

User.query.all()    # 返回一个列表,是对象列表

解决问题:在模型类中定义方法,实现查询结果显示成可读字符串:

def __repr__(self):
	return 'name: %s' % self.name   # 返回刻度对象中的属性
	 # Python中这个_repr_函数,对应repr(object)这个函数,返回一个可以用来表示对象的可打印字符串.
	 # __str__就是基于__repr__实现的.

5.2 查询一个

User.query.first()  # 直接返回一个数据

5.3 查询过滤器:类似于sql中的where

User.query.filter(User.name == "wang")  # 返回一个基本查询对象
  • 是过滤查询;
  • 接收的参数必须使用模型的类名;
  • 必须使用查询过滤器;
  • filter查询不加条件,相当于查询所有;

加上执行器即可显示查询内容:

User.query.filter(User.name == "wang").first()

可以使用字符串的操作语句,来约束要查询的条件:
例如:

User.query.filter(User.name.endseith("g")).all()    # 查询name以字符串"g"结尾的数据
User.query.filter(User.name.startswith("z")).all()  # 查询name以"z"开头的数据

使用查询执行器显示需求的数据:
例如:

User.query.filter(User.name.startswith("z")).count  # 查询name以"z"开头的数据个数

在filter中加多个查询条件(条件以逗号分隔):
例如:

User.query.filter(User.name != "wang", User.email.endswith('163.com)).all()

sqlalchemy中的and_, or_, not_操作:
例如:

# 导入sqlalchemy中的判断语句
from sqlalchemy import or_, and_, not_
User.query.filter(or_(User.name != "wang", User.email.endswith('163.com))).all()```

5.4等值过滤器:

User.query.filter(name = "wang")    # 返回一个基本查询对象
  • 只能使用的等值操作;
  • 参数为具体模型的字段名,但是不能出现具体的类名;
  • 必须使用查询执行器

5.5 限制查询结果的数量:

User.query.filter().limit(2).all()  # 返回一个列表,限制查询结果的数量

5.6 分页查询:

paginate = User.query.filter().paginate(1, 2, False)
    # 分页操作的参数,可以是2个,也可以是3个
    # 参数1:当前页号(1~n);
    # 参数2:每一页的数据;
    # 参数3:如果分页异常不报错;
    # 得到一个分页对象;
paginate.page   # 获取当前页
paginate.pages  # 获取总页数
paginate.items  # 获取当前页中的数据

5.7 get查询:

User,get(key)   # get查询接受的参数是主键值,不填报错

5.8 排序查询:

User.query.order_by().all() # 默认根据主键的顺序排
User.query.order_by(User.id.desc()).all()   # 使用id字段降序排序
User.query.order_by(User.id.asc()).all()   # 使用id字段升序排序

5.9 练习:

  • 查询所有用户数据:
User.query.all()
  • 查询有多少个用户:
User.query.count()
  • 查询第1个用户:
User.query.first()
  • 查询id为4的用户[3种方式]:
# 方法一:
User.query.filter(User.id==4).all()
# 方法二:
User.query.filter_by(id=4).all()
# 方法三:
User.query.get(4)
  • 查询名字结尾字符为z的所有数据[开始/包含]:
User.query.filter(User.name.startswith("z")).all()
  • 查询名字不等于wang的所有数据[2种方式]:
# 方法一:
User.query.filter(User.name != "wang").all()
# 方法二:
from sqlalchemy import or_, and_,not_
User.query.filter(not_(User.name=="wang")).all()
  • 查询名字和邮箱都以 chen 开头的所有数据[2种方式]:
# 方法一:
User.query.filter(User.name.startswith("chen"), User.email.startswith("chen")).all()
# 方法二:
User.query.filter(and_(User.name.startswith("chen"), User.email.startswith("chen"))).all()
  • 查询password是 123456 或者 emailitheima.com 结尾的所有数据:
User.query.filter(or_(User.pswd == "123456", User.email=="itcast.com")).all()
  • 查询id为 [1, 3, 5, 7, 9] 的用户列表:
User.query.filter(User.id.in_([1,3,5,7,9])).all()
  • 查询name为zhou的角色数据:
User.query.filter(User.name == "zhou").all()
  • 查询所有用户数据,并以邮箱排序:
User.query.order_by(User.email).all()
  • 每页3个,查询第2页的数据:
User.query.paginate(2,3,False).items

6.数据库修改操作:

完成后需要由sqlalchemy的对象db通过session.commit()提交操作

6.1更新数据:

方法一:

user = User.query.first()
user.name = 'dong'
db.session.commit()

方法二:

User.query,filter(User.name == "zhang").update({"name":"li"})
db.session.commit()

方法三:

user = user = User.query.get(2)
user.name = "zhang"
User.query.all()
db.session.add(user)    # 通过额外的一个对象修改的数据,需要先添加这个对象,再提交
db.session.commit()

7. 一对多和多对多查询:

7.1 一对多:

r = Role.query.get(1)	# 拿到对象
r.us	# 使用关系引用返回的对象

7.2 多对一

u = User.query.get(2)
u.role  

二、综合案例:图书管理系统:

1.开发基本过程:

  • 需求: 实现图书案例,数据的增删改查.
  • 流程:
    • 1.web表单:wtf扩展,添加数据,验证函数,csrf保护,自定义表单类,设置秘钥,表单域中需要设置csrf_token
    • 2.模板:使用模板语法,获取视图返回的数据,使用语句遍历后端返回的数据;
    • 3.模型类:flask_sqlalchemy,配置数据的连接和动态追踪修改,定义模型(作者和图书),创建表,添加测试数据
    • 4.业务逻辑:视图中数显wtf表单,实现数据的查询,删除,调用模板

2. 代码实现:

  • 实现简单的功能,然后版本迭代

版本一: 功能实现

模板文件:index.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>图书管理</title>
</head>
<body>
    <form method="post">
        {{ form.csrf_token }}
        <p>作者: {{ form.wtf_author }}</p>
        <p>书名: {{ form.wtf_book }}</p>
        <p>{{ form.wtf_submit}}</p>
    </form>

    <ul>
        {% for author in authors %}
            <li>{{ author.name}}</li>
            <a href="/delete_author/{{ author.id }}">删除</a>
            <br>
        {% endfor %}
    </ul>

    <ul>
        {% for book in books %}
            <li>{{ book.info }}</li>
            <a href="/delete_book/{{ book.id }}">删除</a>
            <br>
        {% endfor %}
    </ul>
</body>
</html>

视图模块:demo02.py

导入使用的模块

from flask import Flask, render_template,url_for,redirect
# 导入wtf表单
from flask_wtf import FlaskForm
# 导入wtf的字段类型
from wtforms import StringField, SubmitField
# 导入表单的验证字段
from wtforms.validators import DataRequired
# 导入sqlalcjemy模块
from flask_sqlalchemy import SQLAlchemy

实例化Flask对象, 数据库对象和设置配置文件

app = Flask(__name__)

# 设置秘钥
app.config['SECRET_KEY'] = "frekflnlngldnglk"
# 配置数据库
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql://root:passwd@127.0.0.1:3306/auth_book"
# 关闭动检测
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False

db = SQLAlchemy(app)

定义表单类

# 定义表单
class Form(FlaskForm):
    wtf_author = StringField(validators=[DataRequired()])
    wtf_book = StringField(validators=[DataRequired()])
    wtf_submit = SubmitField("提交")

定义作者模型类

# 定义作者模型
class Author(db.Model):
    __tablename__ = "authors"
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(32), unique=True)

    # 重写方法
    def __repr__(self):
        return "name: %s \n" % self.name

定义书籍模型类

# 定义书籍
class Book(db.Model):
    __tablename__ = "books"
    id = db.Column(db.Integer, primary_key=True)
    info = db.Column(db.String(32), unique=True)

    # 重写方法
    def __repr__(self):
        return "book_name: %s \n" % self.info

实现主页面视图函数:

# 定义主页面的视图
@app.route('/',methods=['GET','POST'])
def index():
    form = Form()
    # 获取数据
    authors = Author.query.all()
    books = Book.query.all()

    if form.validate_on_submit():
        author = form.wtf_author.data
        book = form.wtf_book.data

        au = Author(name = author)
        bk = Book(info = book)
        db.session.add_all([au, bk])
        db.session.commit()

        authors = Author.query.all()
        books = Book.query.all()

    print(form.wtf_author.data)
    print(form.wtf_book.data)
    print(form.validate_on_submit())

    return render_template("index.html", authors = authors, books = books,form=form)

实现删除作者的视图函数:

 # 定义删除作者的视图函数
@app.route("/delete_author/<int:id>")
def del_auth(id):
    author = Author.query.get(id)
    db.session.delete(author)
    db.session.commit()
    return redirect(url_for("index"))

实现删除图书的视图函数:

 # 定义删除图书的视图函数
@app.route("/delete_book/<int:id>")
def del_book(id):
    book = Book.query.get(id)
    db.session.delete(book)
    db.session.commit()
    return redirect(url_for("index"))

入口主函数

if __name__ == '__main__':
    app.run(debug=True)

版本二: 在原有基础上做异常处理:

主页面视图加入异常处理:

# 定义主页面的视图
@app.route('/',methods=['GET','POST'])
def index():
    form = Form()
    authors, books = None, None
    # 获取数据
    try:
        authors = Author.query.all()
        books = Book.query.all()
    except Exception as e:
        print(e)

    if form.validate_on_submit():
        author = form.wtf_author.data
        book = form.wtf_book.data

        try:
            au = Author(name = author)
            bk = Book(info = book)
            db.session.add_all([au, bk])
            db.session.commit()
        except Exception as e:
            print(e)
            # 添加数据如果发生异常,需要进行回滚
            db.session.rollback()

        # 获取数据
        try:
            authors = Author.query.all()
            books = Book.query.all()
        except Exception as e:
                print(e)

    return render_template("index.html", authors = authors, books = books,form=form)

删除作者视图加入异常处理:

# 定义删除作者的视图函数
@app.route("/delete_author/<int:id>")
def del_auth(id):
    author = Author.query.get(id)
    try:
        # 执行删除
        db.session.delete(author)
        db.session.commit()
    except Exception as e:
        print(e)
        db.session.rollback()
    return redirect(url_for("index"))

删除图书视图加入异常处理:

# 定义删除图书的视图函数
@app.route("/delete_book/<int:id>")
def del_book(id):
    book = Book.query.get(id)
    try:
        # 执行删除
        db.session.delete(book)
        db.session.commit()
    except Exception as e:
        print(e)
        db.session.rollback()
    return redirect(url_for("index"))

需要异常处理的情况:

  • 对数据库的增删改查都需要异常处理;
  • 修改数据库时需要异常处理。如果发生异常,需要回滚数据库:db.session.rollback()

在请求过程中自动提交数据:(但是不建议使用)

app.config["SQLALCHEMY_COMMIT_ON_TEARDOWN] = True

三、常见关系模板代码:

1.一对多:

示例场景:

  • 用户与其发布的帖子(用户表与帖子表)
  • 角色与所属于该角色的用户(角色表与多用户表)

实例代码:

class Role(db.Model):
    """角色表"""
    __tablename__ = 'roles'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    users = db.relationship('User', backref='role', lazy='dynamic')

class User(db.Model):
    """用户表"""
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True, index=True)

2.多对多:

示例场景

  • 讲师与其上课的班级(讲师表与班级表)
  • 用户与其收藏的新闻(用户表与新闻表)
  • 学生与其选修的课程(学生表与选修课程表)

示例代码:

tb_student_course = db.Table('tb_student_course',
                             db.Column('student_id', db.Integer, db.ForeignKey('students.id')),
                             db.Column('course_id', db.Integer, db.ForeignKey('courses.id'))
                             )

class Student(db.Model):
    __tablename__ = "students"
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)

    courses = db.relationship('Course', secondary=tb_student_course,
                              backref=db.backref('students', lazy='dynamic'),
                              lazy='dynamic')

class Course(db.Model):
    __tablename__ = "courses"
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)```

3.自关联一对多:

示例场景

  • 评论与该评论的子评论(评论表)
  • 参考网易新闻
  • 省市区表

实例代码:

class Comment(db.Model):
    """评论"""
    __tablename__ = "comments"

    id = db.Column(db.Integer, primary_key=True)
    # 评论内容
    content = db.Column(db.Text, nullable=False)
    # 父评论id
    parent_id = db.Column(db.Integer, db.ForeignKey("comments.id"))
    # 父评论(也是评论模型)
    parent = db.relationship("Comment", remote_side=[id],
                             backref=db.backref('childs', lazy='dynamic'))

# 测试代码
if __name__ == '__main__':
    db.drop_all()
    db.create_all()

    com1 = Comment(content='我是主评论1')
    com2 = Comment(content='我是主评论2')
    com11 = Comment(content='我是回复主评论1的子评论1')
    com11.parent = com1
    com12 = Comment(content='我是回复主评论1的子评论2')
    com12.parent = com1

    db.session.add_all([com1, com2, com11, com12])
    db.session.commit()
    app.run(debug=True)

4.自关联多对多:

示例场景

  • 用户关注其他用户(用户表,中间表)

示例代码:

tb_user_follows = db.Table(
    "tb_user_follows",
    db.Column('follower_id', db.Integer, db.ForeignKey('info_user.id'), primary_key=True),  # 粉丝id
    db.Column('followed_id', db.Integer, db.ForeignKey('info_user.id'), primary_key=True)  # 被关注人的id
)

class User(db.Model):
    """用户表"""
    __tablename__ = "info_user"

    id = db.Column(db.Integer, primary_key=True)  
    name = db.Column(db.String(32), unique=True, nullable=False)

    # 用户所有的粉丝,添加了反向引用followed,代表用户都关注了哪些人
    followers = db.relationship('User',
                                secondary=tb_user_follows,
                                primaryjoin=id == tb_user_follows.c.followed_id,
                                secondaryjoin=id == tb_user_follows.c.follower_id,
                                backref=db.backref('followed', lazy='dynamic'),
                                lazy='dynamic')

四、数据库的迁移:

1.为什么要迁移:

  • 在开发过程中,需要修改数据库模型,而且还要在修改之后更新数据库。最直接的方式就是删除旧表,但这样会丢失数据。
  • 更好的解决办法是使用数据库迁移框架,它可以追踪数据库模式的变化,然后把变动应用到数据库中。
  • 在Flask中可以使用Flask-Migrate扩展,来实现数据迁移。并且集成到Flask-Script中,所有操作通过命令就能完成。
  • 为了导出数据库迁移命令,Flask-Migrate提供了一个MigrateCommand类,可以附加到flask-script的manager对象上。

2.迁移流程:

  • 在开发过程中,需要修改数据库模型,而且还要在修改之后更新数据库。最直接的方式就是删除旧表,但这样会丢失数据。
  • 更好的解决办法是使用数据库迁移框架,它可以追踪数据库模式的变化,然后把变动应用到数据库中。
  • 在Flask中可以使用Flask-Migrate扩展,来实现数据迁移。并且集成到Flask-Script中,所有操作通过命令就能完成。
  • 为了导出数据库迁移命令,Flask-Migrate提供了一个MigrateCommand类,可以附加到flask-script的manager对象上。

文件名:sql_moe.py

from flask import Flask

from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate,MigrateCommand
from flask_script import Shell,Manager

app = Flask(__name__)
manager = Manager(app)

app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/Flask_test'
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
db = SQLAlchemy(app)

#第一个参数是Flask的实例,第二个参数是Sqlalchemy数据库实例
migrate = Migrate(app,db) 

#manager是Flask-Script的实例,这条语句在flask-Script中添加一个db命令
manager.add_command('db',MigrateCommand)

#定义模型Role
class Role(db.Model):
    # 定义表名
    __tablename__ = 'roles'
    # 定义列对象
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    user = db.relationship('User', backref='role')

    #repr()方法显示一个可读字符串,
    def __repr__(self):
        return 'Role:'.format(self.name)

#定义用户
class User(db.Model):
    __talbe__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(64), unique=True, index=True)
    #设置外键
    role_id = db.Column(db.Integer, db.ForeignKey('roles.id'))

    def __repr__(self):
        return 'User:'.format(self.username)


if __name__ == '__main__':
    manager.run()

步骤:

  • python 文件名.py db init

  • python 文件名 db migrate -m “版本描述”

  • python 文件名.py db upgrade

  • 观察mysql表结构,完成第一次迁移

  • 根据要求修改模型

  • python 文件名.py db migrate -m “版本描述”

  • python 文件名.py db upgrade

  • 观察表结构,完成修改

  • 若返回版本,则利用python 文件名 db history 查看版本号

  • python 文件名 db downgrade(或 upgrade) 版本号