zl程序教程

您现在的位置是:首页 >  工具

当前栏目

Flask-SQLAlchemy学习笔记

笔记学习 Flask SqlAlchemy
2023-06-13 09:15:18 时间

Flask-SQLAlchemy是一个Flask扩展,简化了在Flask应用中使用SQLAlchemy的操作,SQLAlchemy是一个强大的关系型数据库框架,支持多种数据库后台。其安装方式与其他扩展一样使用pip安装即可:pip install flask-sqlalchemy。 在Flask-SQLAlchemy中,指定使用何种数据库是通过URL来实现的,各种主流数据库引擎使用URL格式如下:

# hostname:数据库服务所在主机
# database:使用的数据库名
# SQLite数据库没有服务器,因此不用指定hostname,username,password,而url中的database指磁盘中的文件名
------------------------------------------------------------------------------------
|数据库引擎            |     URL                                                    |
------------------------------------------------------------------------------------
|Mysql                 |     myslq://username:password@hostname/database           |
|Postgres              |     postgresql://username:password@hostname/database      |
|SQLite(Linux,macOS)   |     sqlite:////absolute/path/to/database                  |
|SQLite(windows)       |     sqlite:///c:/absolute/path/to/database                |
------------------------------------------------------------------------------------

如何应用我们所需的数据库?

# 我们只需要在配置文件中添加如下配置即可,这里以SQLite为例:
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///"+os.path.join(db_dir, "data.sqlite")

知识点笔记

笔记与下面的实列是对应的

# Windows下flask启动:
# set FLASK_APP=sql_test.py
# flask run

# windows端进入flask shell报错: Could not locate a Flask application. You did not provide the "FLASK_APP" environment variable。。。。。。。
# 可以通过如下方法进入:
# $env:FLASK_APP="sql_test.py"
# flask shell

# 创建表,进入flask shell后,创建完表后,会在当前目录下生成一个数据库文件(***.sqlite)
# from sql_test import app_db
# app_db.create_all()   # 创建表
# app_db.drop_all()     # 删除表,当数据库模型变更时,使用app_db.create_all()不会更新当前已存在的数据库,可以通过这中暴力方式3直接删除该文件,在重新生成

# 插入行,数据,同样进入flask shell模式进行操作
# from sql_test import Role,User
# admin_role=Role(name="admin")
# mod_role=Role(name="moderator")
# user_role=Role(name="user")
# user_ldd=User(username="ldd",role=admin_role)

# 将改动/新建对象添加到数据库会话管理(app_db.session)中:
# app_db.session.add(admin_role)
# app_db.session.add(mod_role)
# app_db.session.add(user_role)
# app_db.session.add(user_ldd)
# 或者简写成:
# app_db.session.add_all([admin_role,mod_role,user_role,user_ldd])

# 提交会话:commit()
# app_db.session.commit()

# 删除:delete()
# app_db.session.delete(mod_role)   # 删除mod_role角色,注意删除后需要提交才生效

# flask shell中查看数据库中对象的属性:
# print(admin_role.id)

# 注意,对数据库的操作,都需要:添加到数据库会话管理,然后在提交才会真正的在数据库中修改

# 查询:
# 使用query对象中all()方法查询查询相应表中所有记录:Role.query.all(),这里的all()是返回所有的结果,还有一个是first()方法,其表示只返回第一个结果,如果没有取到结果则返回None
# 使用过滤器(filter_by())来更加精确的搜索数据库中的数据,如:User.query.filter_by(role=admin_role).all(),表示返回user表中角色为管理员的数据
# query对象能调用的过滤器有很多,如:
# ------------------------------------------------------
# filter()	把过滤器添加到原查询上,返回一个新查询
# filter_by()	把等值过滤器添加到原查询上,返回一个新查询
# limit()	使用指定的值限制原查询返回的结果数量,返回一个新查询
# offset()	偏移原查询返回的结果,返回一个新查询
# order_by()	根据指定条件对原查询结果进行排序,返回一个新查询
# group_by()	根据指定条件对原查询结果进行分组,返回一个新查询
# -------------------------------------------------------
# 查询执行方法:
# -------------------------------------------------------
# all()	以列表形式返回查询的所有结果
# first()	返回查询的第一个结果,如果没有结果,则返回None
# first_or_484()	返回查询的第一个结果,如果没有结果,则终止请求,返回404错误响应
# get()	返回指定主键对应的行,如果没有对应的行,则返回None
# get_or_484	返回指定主键对应的行,如果没有找到指定的主键,则终止请求,返回404错误响应
# count()	返回查询结果的数量
# paginate()	返回一个Paginate对象,它包含指定范围内的结果

# 关系查询(在role表中有相关关系的定义语句):
# users=user_role.users    # user表中用户角色为user的,查询结果:[<role 'tommonkey'>],发现结果为列表形式,自动执行查询all()方法,为了禁止自动查询执行方法,我们可以在Role中的关系定义中加入lazy="dynamic"
# users = app_db.relationship("User",backref="role",lazy="dynamic")
# 这样我们添加语句后user_role.users就不会即可执行,因此就可以在后面添加过滤器来精确查找数据了
# user_role.users.order_by(User.username).all()  # 这样添加order_by来对返回的数据按照字母顺序排列
# user_role.users.count()   # 返回数量

实列练习

from flask import Flask,url_for,redirect,render_template
from flask_sqlalchemy import SQLAlchemy
import os
from flask import session
from flask_wtf import FlaskForm
from wtforms import StringField,SubmitField
from wtforms.validators import DataRequired
from flask_bootstrap import Bootstrap
from flask_migrate import Migrate

app = Flask(__name__)
base_model = Bootstrap(app)
app_db = SQLAlchemy(app)
migrate = Migrate(app,app_db)

db_dir = os.path.dirname(__file__)
app.config["SECRET_KEY"] = "TOMMONKEY"
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///"+os.path.join(db_dir, "data.sqlite")
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False

# 定义数据库模型
class Role(app_db.Model):
    __tablename__ = "role"
    id = app_db.Column(app_db.Integer, primary_key=True)
    name = app_db.Column(app_db.String(64), unique=True)   # unique:该列不允许出现重复的值

    # 定义关系:
    users = app_db.relationship("User",backref="role")

    def __repr__(self):
         return "<role %r>" % self.name

# 定义数据库模型
class User(app_db.Model):
    __tablename__ = "user"
    id = app_db.Column(app_db.Integer,primary_key=True)
    username = app_db.Column(app_db.String(64), unique=True, index=True)

    # 定义外键,这里的role_id即为role表中的id
    role_id = app_db.Column(app_db.Integer,app_db.ForeignKey("role.id"))    # 看清楚,这里是role.id,不是role_id

    def __repr__(self):
         return "<role %r>" % self.username

class NameForm(FlaskForm):
    id = StringField(u"input your ID", validators=[DataRequired("Must input something")])
    name = StringField(u'input your name', validators=[DataRequired('must input something')])
    submit = SubmitField('Submit')

# 集成pthonshell:shell上下文处理器,为了解决每次运行时都要收到进入shell导入模型和实列这种重复的工作
@app.shell_context_processor
def make_shell_comtent():
    return dict(db=app_db,User=User,Role=Role)

# 视图中操作数据库
@app.route("/queryUser",methods=['GET','POST'])
def queryUser():
    nameform = NameForm()
    if nameform.validate_on_submit():
        if nameform.name.data is not None:
            # 如果数据库中查不到该用户,就注册一个
            username = User.query.filter_by(username=nameform.name.data).first()
            if username is None:
                new_user = User(username=nameform.name.data)
                app_db.session.add(new_user)
                app_db.session.commit()
                session['known'] = False
            else:
                session['known'] = True
            session['name'] = nameform.name.data
            nameform.name.data = ""
            return redirect(url_for("queryUser"))
        else:
            return render_template("queryUser.html",form=nameform)
    return render_template("queryUser.html",form=nameform,name=session.get("name"),known=session.get("known",False))


if __name__ == "__main__":
    make_shell_comtent()
    app.run(debug=True,host="127.0.0.1",port=5000)

数据库迁移

为什么要迁移? 当我们修改数据库模型后还要自己更新数据库,每次都得删除旧的数据库表重新生成,这样得操作是不可逆得,所以我们是数据库迁移得办法类似与git的版本控制,可以监控数据库做出了那些变化,然后以增量的形式进行更新,就算出错了,我们还可也回滚!

# 创建迁移仓库之前首先安装Flask-Migrate
pip install flask-migrate
# 导入
from flask_migrate import Migrate
# 创建对象实列
migrate = Migrate(app,db)
# 初始化命令:该命令会在当前目录下创建migrations目录,所以的迁移脚本都会存放在这里
flask db init
# 自动创建迁移脚本,有时候自动创建迁移脚本是不一定能准确生成的,所以生成脚本后,记得一定要检查一下是否正确
flsk db migrate -m "initial migrate" 
# 更新数据库
flask db upgrade
# 更多关于该拓展的使用细节,请自行百度噢

OVER