Flask入门三(Flask-session的使用、数据库链接池、wtforms、Flask定制命令、Flask-Cache、信号)
Flask内置的Session会把数据加密后保存到浏览器 我们自己重写Session类保存到Reids中只需要重写open_session和save_session方法。而在这中间有一个模块就做了这件事,那就是flask-session,以把数据存放到文件、redis、mongodb、关系型数据库等中。每个人:线程用自己的从conn和cursor,在视图函数中,拿到链接和cursor。但是这种如果
文章目录
一、Flask-session使用
Flask内置的Session会把数据加密后保存到浏览器 我们自己重写Session类保存到Reids中只需要重写open_session和save_session方法
而在这中间有一个模块就做了这件事,那就是flask-session,以把数据存放到文件、redis、mongodb、关系型数据库等中
安装flask-session
pip install flask-session
1.使用方式一
from flask import Flask,session
app = Flask(__name__)
app.debug=True
app.secret_key='jlkdoasfiuz'
# 只要使用session就需要secret_key
1.安装flask-session pip install flask-session
'使用方式一'
2.导入(这里我写入到redis缓存数据库中)
from flask_session import RedisSessionInterface
3.把app.session_interface替换成RedisSessionInterface的对象
# 替换了就会走RedisSessionInterface的open_session和save_session
from redis import Redis
conn=Redis(host='127.0.0.1',port=6379,db=2)
# 需要传入的参数 redis, key_prefix, use_signer, permanent, sid_length
'''
1.redis 是传入链接的redis库,链接对象
2.key_prefix 是保存在redis中名称的前缀
3.use_signer 是如果是False就无需配置secret_key,默认设置True
4.permanent 是关闭浏览器cookie是否失效
5.sid_length 是生成session_key的长度,会以cookie形式写入到浏览器cookie中,但是去掉redis中session这个前缀
去掉前缀就是session_key的长度限制
'''
app.session_interface=RedisSessionInterface(
redis=conn,key_prefix='session',use_signer=False,
permanent=True,sid_length=32
)
@app.route('/set_session')
def set_session():
session['name'] = 'jack'
return 'set_session'
@app.route('/get_session')
def get_session():
print(session.get('name'))
return 'get_session'
if __name__ == '__main__':
app.run()
2.使用方式二
from flask import Flask,session
app = Flask(__name__)
app.debug=True
app.secret_key='jlkdoasfiuz'
# 只要使用session就需要secret_key
1.安装flask-session pip install flask-session
'使用方式二'
2.在flask配置文件中加入配置
from redis import Redis # 导入redis
app.config['SESSION_TYPE'] = 'redis' # 配置链接的类型
app.config['SESSION_REDIS']=Redis(host='127.0.0.1',port=6379,db=2)
# app.config['SESSION_KEY_PREFIX'] = 'session' # 如果不写,默认以SESSION_COOKIE_NAME作为key
# app.config.from_pyfile('./settings') # 第二种导入配置文件方式
3.导入Session
from flask_session import Session
Session(app) # 核心和方式一一模一样,具体看源码
@app.route('/set_session')
def set_session():
session['name'] = 'jack'
return 'set_session'
@app.route('/get_session')
def get_session():
print(session.get('name'))
return 'get_session'
if __name__ == '__main__':
app.run()
3.读RedisSessionInterface源码
1.RedisSessionInterface的open_session(在它的父类中)
def open_session(self, app, request):
# -取到前端传入,在cookie中得随机字符串
sid = request.cookies.get(app.config["SESSION_COOKIE_NAME"])
if not sid:
sid = self._generate_sid(self.sid_length)
# 当sid不为空,把sid传入到session_class得到对象
return self.session_class(sid=sid, permanent=self.permanent)
if self.use_signer: # 用来加密,所以第一种方式的ues_signer最好不要改为False
try:
sid = self._unsign(app, sid)
except BadSignature:
sid = self._generate_sid(self.sid_length)
return self.session_class(sid=sid, permanent=self.permanent)
return self.fetch_session(sid)
def fetch_session(self, sid):
# 取到随机字符串
prefixed_session_id = self.key_prefix + sid
# 从redis中取出key为前缀+随机字符串对应的value值
value = self.redis.get(prefixed_session_id)
if value is not None:
try:
# 解密成字符串
session_data = self.serializer.loads(value)
# 把解密后的数据,组装到 session对象中
return self.session_class(session_data, sid=sid)
except pickle.UnpicklingError:
return self.session_class(sid=sid, permanent=self.permanent)
return self.session_class(sid=sid, permanent=self.permanent)
2.RedisSessionInterface的save_session(在它自己内)
def save_session(self, app, session, response):
if not self.should_set_cookie(app, session):
return
domain = self.get_cookie_domain(app)
path = self.get_cookie_path(app)
if not session: # 如果session有值
if session.modified: # 如果值被修改过,就把cookie中的删除,并且删除redis中的
self.redis.delete(self.key_prefix + session.sid)
response.delete_cookie(
app.config["SESSION_COOKIE_NAME"], domain=domain, path=path
)
return
#
expiration_datetime = self.get_expiration_time(app, session)
serialized_session_data = self.serializer.dumps(dict(session))
# 放到redis中
self.redis.set(
name=self.key_prefix + session.sid,
value=serialized_session_data,
ex=total_seconds(app.permanent_session_lifetime), # 过期时间
)
# 把session对应的随机字符串放到cookie中
self.set_cookie_to_response(app, session, response, expiration_datetime)
4.flask-session补充
- session的前缀如果不传,默认:config.setdefault('SESSION_KEY_PREFIX', 'session:')
- session过期时间:通过配置,如果不写,会有默认
'PERMANENT_SESSION_LIFETIME': timedelta(days=31),#这个配置文件控制
-设置cookie时,如何设定关闭浏览器则cookie失效
permanent=False
app.config['SESSION_PERMANENT'] = False
二、数据库连接池
1.flask中使用mysql
'settings.py'
SECRET_KEY = 'fdsjakluiz'
DEBUG = True
MYSQL_USER = 'root'
MYSQL_HOST = '127.0.0.1'
MYSQL_PORT = 3306
MYSQL_PASSWORD = '1234'
MYSQL_DATABASE = 'cnblogs'
JSON_AS_ASCII = False
'app.py'
import pymysql.cursors
from flask import Flask, jsonify
app = Flask(__name__)
app.config.from_pyfile('./settings.py')
# pymysql操作mysql
from pymysql import Connect
conn = Connect(
user=app.config.get('MYSQL_USER'),
password=app.config.get('MYSQL_PASSWORD'),
host=app.config.get('MYSQL_HOST'),
database=app.config.get('MYSQL_DATABASE'),
port=app.config.get('MYSQL_PORT'),
)
# pymysql.cursors.DictCursor查出来的是列表套字典的形式
cursor = conn.cursor(pymysql.cursors.DictCursor)
# cursor = conn.cursor()
# app.config['JSON_AS_ASCII'] = False # 前端显示json格式中文
@app.route('/')
def articles():
cursor.execute('select id,title,author from article limit 10')
article_list = cursor.fetchall() # 拿出所有
return jsonify(article_list)
if __name__ == '__main__':
app.run()
这种方式
conn
和cursor
如果是全局,出现如下问题:
'上面的 conn和cursor 都是全局的'
假设极端情况:同时并发两个用户
-一个用户查询所有文章
-一个用户查询所有用户
'在线程中全局变量是共享的'
就会出现,第一个线程拿着cursor执行了:
cursor.execute('select id,title,author from article limit 10')
然后第二个线程拿着 cursor 执行了:
cursor.execute('select id,name from user limit 10')
第一个线程开始执行:(用的全是同一个cursor)
article_list = cursor.fetchall()
就会出现查询article的cursor取出来的数据是 用户相关数据---》出现数据错乱
2.上述问题解决
每个人:线程用自己的从conn和cursor,在视图函数中,拿到链接和cursor
import pymysql.cursors
from flask import Flask, jsonify
app = Flask(__name__)
app.config.from_pyfile('./settings.py')
# pymysql操作mysql
from pymysql import Connect
@app.route('/')
def articles():
conn = Connect(
user=app.config.get('MYSQL_USER'),
password=app.config.get('MYSQL_PASSWORD'),
host=app.config.get('MYSQL_HOST'),
database=app.config.get('MYSQL_DATABASE'),
port=app.config.get('MYSQL_PORT'),
)
cursor = conn.cursor(pymysql.cursors.DictCursor)
cursor.execute('select id,title,author from article limit 10')
article_list = cursor.fetchall() # 拿出所有
return jsonify(article_list)
@app.route('/desc')
def desc():
conn = Connect(
user=app.config.get('MYSQL_USER'),
password=app.config.get('MYSQL_PASSWORD'),
host=app.config.get('MYSQL_HOST'),
database=app.config.get('MYSQL_DATABASE'),
port=app.config.get('MYSQL_PORT'),
)
cursor = conn.cursor(pymysql.cursors.DictCursor)
cursor.execute('select id,real_desc from article limit 10')
article_list = cursor.fetchall() # 拿出所有
return jsonify(article_list)
if __name__ == '__main__':
app.run()
但是这种如果并发量过高,就会出现连接数过多的问题,mysql的性能就降低了
使用数据库连接池
'上述操作存在的问题'
1.原生pymysql操作,最好有一个rom---->sqlalchemy
2.并发问题:conn和cursor要做成单例,还是每个视图函数一个?
-如果使用单例,数据会错乱
-咱们需要,每个视图函数,哪一个链接,如果并发数过多,mysql链接数就会很多,所以使用连接池解决
# django orm操作,一个请求,就会拿到一个mysql的链接,用完后就释放
'所以想要彻底解决,得使用数据库连接池'
-限定 mysql链接最多,无论多少线程操作,都是从池中取链接使用
'解决上面的两个问题'
-数据库连接池
-创建一个全局的池
-每次进入视图函数,从池中取一个连接使用,使用完放回到池中,只要控制池的大小,就能控制mysql连接数
1.第三方数据库连接池
'pool.py' 在这个文件中配置池也也可以在视图函数中配置
# 1 安装 pip install dbutils
# 2 使用:实例化得到一个池对象---》池是单例
from dbutils.pooled_db import PooledDB
import pymysql
POOL=PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=3,
# 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='1234',
database='cnblogs',
charset='utf8'
)
'app.py' 视图函数
from flask import Flask,jsonify
app = Flask(__name__)
app.config.from_pyfile('./settings.py')
import time
from pool import POOL
from pymysql.cursors import DictCursor
## 3 在视图函数中导入使用
@app.route('/article')
def article():
conn = POOL.connection()
cursor = conn.cursor(DictCursor)
# 获取10条文章
cursor.execute('select id,title,author from article limit 10')
time.sleep(1)
# 切换
res = cursor.fetchall()
print(res)
return jsonify({'code': 100, 'msg': '成功', 'result': res})
if __name__ == '__main__':
app.run()
2.操作数据库不带池版
from flask import Flask,jsonify
app = Flask(__name__)
app.config.from_pyfile('./settings.py')
import time
## 3 在视图函数中导入使用
@app.route('/article')
def article():
import pymysql
from pymysql.cursors import DictCursor
conn = pymysql.connect(user='root',
password="1234",
host='127.0.0.1',
database='cnblogs',
port=3306)
cursor = conn.cursor(DictCursor)
# 获取10条文章
cursor.execute('select id,title,author from article limit 10')
time.sleep(1)
# 切换
res = cursor.fetchall()
print(res)
return jsonify({'code': 100, 'msg': '成功', 'result': res})
if __name__ == '__main__':
app.run(port=5001)
3.池版和非池版压测
压测代码 jmeter工具---》java
import requests
from threading import Thread
# 没有连接池
def task():
# res = requests.get('http://127.0.0.1:5000/article') # 带连接池版
res = requests.get('http://127.0.0.1:5001/article') # 不带连接池版
print(res.json())
if __name__ == '__main__':
l = []
for i in range(100):
t = Thread(target=task)
t.start()
l.append(t)
for i in l:
i.join()
'''效果是:使用池的连接数明显小,不使用池连接数明显很大'''
查看数据库连接数:
show status like '%Threads%';
三、wtforms
wtforms是一个支持多个web框架的form组件,主要用于对用户请求数据进行验证、渲染错误信息、渲染页面
app.py
from flask import Flask,render_template,request,redirect
from wtforms import Form
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets
app=Flask(__name__)
app.debug=True
class LoginForm(Form):
# 字段(内部包含正则表达式)
name = simple.StringField(
label='用户名',
validators=[
validators.DataRequired(message='用户名不能为空.'),
validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
],
widget=widgets.TextInput(), # 页面上显示的插件
render_kw={'class': 'form-control'}
)
# 字段(内部包含正则表达式)
pwd = simple.PasswordField(
label='密码',
validators=[
validators.DataRequired(message='密码不能为空.'),
validators.Length(min=8, message='用户名长度必须大于%(min)d'),
validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'GET':
form = LoginForm()
return render_template('login.html', form=form)
else:
form = LoginForm(formdata=request.form)
if form.validate():
print('用户提交数据通过格式验证,提交的值为:', form.data)
else:
print(form.errors)
return render_template('login.html', form=form)
if __name__ == '__main__':
app.run()
login.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>登录</h1>
<form method="post">
<p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>
<p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
<input type="submit" value="提交">
</form>
</body>
</html>
四、Flask定制命令
1.使用 flask-script定制命令(老版本,新版本不用了)
flask 老版本中,没有命令运行项目,自定制命令
flask-script 解决了这个问题:flask项目可以通过命令运行,可以定制命令
新版的flask--》官方支持定制命令 click 定制命令,这个模块就弃用了
flask-migrate 老版本基于flask-script,新版本基于flask-click写的
使用步骤
-1 pip3 install Flask-Script==2.0.3
-2 pip3 install flask==1.1.4
-3 pip3 install markupsafe=1.1.1
-4 使用
from flask_script import Manager
manager = Manager(app)
if __name__ == '__main__':
manager.run()
-5 自定制命令
@manager.command
def custom(arg):
"""自定义命令
python manage.py custom 123
"""
print(arg)
- 6 执行自定制命令
python manage.py custom 123
2.新版本定制命令
from flask import Flask
import click
app = Flask(__name__)
自定制命令,通过create-user传入用户名就可以创建一个用户来
@app.cli.command('create-user')
@click.argument('name')
def create_user(name):
# from pool import POOL
# conn = POOL.connection()
# cursor=conn.cursor()
# cursor.excute('insert into user (username,password) values (%s,%s)',args=[name,'hello123'])
# conn.commit()
print(name)
命令行中执行
-flask --app .\Flask定制命令.py:app create-user jack
-简写成 前提条件式app所在的py文件名叫app.py
-flask create-user jack
@app.route('/')
def index():
return 'index'
-运行项目的命令:flask --app .\Flask定制命令.py:app run
if __name__ == '__main__':
app.run()
3.Django中自定制命令
1 app下新建文件夹
management/commands/
2 在该文件夹下新建py文件,随便命名(命令名)
3 在py文件中写代码
from django.core.management.base import BaseCommand
class Command(BaseCommand):
help = '命令提示'
def handle(self, *args, **kwargs):
命令逻辑
4 使用命令
python manage.py py文件(命令名)
五、Flask-Cache
具体使用可以自寻去官方查看:https://flask-caching.readthedocs.io/en/latest/
from flask import Flask,render_template
# 安装 pip install Flask-Caching
from flask_caching import Cache
config = {
"DEBUG": True,
"CACHE_TYPE": "SimpleCache",
"CACHE_DEFAULT_TIMEOUT": 300
}
app = Flask(__name__)
app.config.from_mapping(config)
cache = Cache(app)
@app.route("/")
@cache.cached(timeout=50)
def index():
return render_template('index.html')
@app.route('/set_cache')
def set_cache():
cache.set('name','xxxx')
return 'set_cache'
@app.route('/get_cache')
def get_cache():
res = cache.get('name')
return res
if __name__ == '__main__':
app.run()
六、信号
信号量:
Semaphore信号量可以理解为多把锁,同时允许多个线程来更改数据
信号:
Signal是Flask框架中的信号基于blinker,其主要就是让开发者可是在flask请求过程中定制一些用户行为
1.内置信号
内置信号,只在flask请求过程中,源码中定义的信号,这个无需我们定义和触发,只要写了函数跟它对应,执行到这里就会自动触发函数的执行
安装信号:pip install blinker
request_started = _signals.signal('request-started') # 请求到来前执行
request_finished = _signals.signal('request-finished') # 请求结束后执行
before_render_template = _signals.signal('before-render-template') # 模板渲染前执行
template_rendered = _signals.signal('template-rendered') # 模板渲染后执行
got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行
request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)
appcontext_pushed = _signals.signal('appcontext-pushed') # 应用上下文push时执行
appcontext_popped = _signals.signal('appcontext-popped') # 应用上下文pop时执行
message_flashed = _signals.signal('message-flashed') # 调用flask在其中添加数据时,自动触发
可用于模版渲染之前执行
(before_render_template)
,可以记录日志
2.内置信号的使用步骤
'以模版渲染之前的信号函数为例'
from flask import Flask,render_template
app = Flask(__name__)
1.写一个函数
def before_render(*args,**kwargs):
print(args)
# args 第0个位置就是app对象
print(kwargs)
# kwargs 其他都在这里 哪个模板,上下文,请求对象,session
print('记录日志,模版要渲染了')
2.跟内置信号绑定
from flask.signals import before_render_template
before_render_template.connect(before_render)
3.源码中触发信号执行(我们无需动),等待信号被触发执行
# before_render_template.send() 源码在模版渲染之前,它已经写了
'这样以后,只要有模版都会执行信号函数,不管是哪个函数,它会自己区分'
@app.route('/')
def index():
return render_template('index.html')
if __name__ == '__main__':
app.run(debug=True)
3.自定义信号
from flask import Flask,render_template
app = Flask(__name__)
1.自定义信号
from flask.signals import _signals
# 自定义一个数据库保存执行的信号
db_save = _signals.signal('db_save')
2.写一个函数
def db_save_fun(*args,**kwargs):
print(args)
# args 没有拿到数据
print(kwargs)
# kwargs 拿到其他数据,自定义的都是可以自己定制返回数据
print('表数据插入了')
3.跟自定义信号绑定
db_save.connect(db_save_fun)
4.在真正的数据库保存的时候执行
import pymysql
from pymysql import Connect
@app.route('/create_article')
def create_article():
conn = Connect(
user='root',
password='1234',
host='127.0.0.1',
database='cnblogs',
port=3306,
)
cursor = conn.cursor()
cursor.execute('INSERT INTO article (title,author) VALUES (%s,%s)',args=['测试标题1','测试作者1'])
conn.commit() # 提交
'在这里需要手动触发信号执行,需要在代码中写,可以传参数,传入的参数,自定义信号绑定的函数可以拿到'
db_save.send(table_name='article', info={'title', '测试标题', 'author', '测试作者'}) # 这里是自己定制写的
conn.close()
return '插入成功'
if __name__ == '__main__':
app.run(debug=True)
4.Django中信号的使用
Django提供一种信号机制。其实就是观察者模式,又叫发布-订阅(Publish/Subscribe) 。当发生一些动作的时候,发出信号,然后监听了这个信号的函数就会执行。
通俗来讲,就是一些动作发生的时候,信号允许特定的发送者去提醒一些接受者。用于在框架执行操作时解耦。
利用信号能干啥?
1 某个位置记录日志
2 用户只要一注册---》发个注册邮件通知
3 用户只要一异地登录,发个邮件
使用信号好处?
解耦(降低耦合度)
Django中内置信号
Model signals
-pre_init # django的modal执行其构造方法前,自动触发
-post_init # django的modal执行其构造方法后,自动触发
-pre_save # django的modal对象保存前,自动触发
-post_save # django的modal对象保存后,自动触发
-pre_delete # django的modal对象删除前,自动触发
-post_delete # django的modal对象删除后,自动触发
-m2m_changed # django的modal中使用m2m字段操作第三张表(add,remove,clear)前后,自动触发
-class_prepared # 程序启动时,检测已注册的app中modal类,对于每一个类,自动触发
Management signals
-pre_migrate # 执行migrate命令前,自动触发
-post_migrate # 执行migrate命令后,自动触发
Request/response signals
-request_started # 请求到来前,自动触发
-request_finished # 请求结束后,自动触发
-got_request_exception # 请求异常后,自动触发
Test signals
-setting_changed # 使用test测试修改配置文件时,自动触发
-template_rendered # 使用test测试渲染模板时,自动触发
Database Wrappers
-connection_created # 创建数据库连接时,自动触发
对于Django内置的信号,仅需注册指定信号,当程序执行相应操作时,自动触发注册函数:
使用Django内置信号方式
from django.core.signals import request_finished
from django.core.signals import request_started
from django.core.signals import got_request_exception
from django.db.models.signals import class_prepared
from django.db.models.signals import pre_init, post_init
from django.db.models.signals import pre_save, post_save
from django.db.models.signals import pre_delete, post_delete
from django.db.models.signals import m2m_changed
from django.db.models.signals import pre_migrate, post_migrate
from django.test.signals import setting_changed
from django.test.signals import template_rendered
from django.db.backends.signals import connection_created
使用案例
'(只要用户一注册,我们就发邮件,伪代码)'
'views.py'
from .models import User
def index(request):
User.objects.create(name='jack',password='123')
return HttpResponse('注册成功')
'放到__init__.py里面'
from django.db.models.signals import post_save # 导入需要的信号
import logging
def callBack(sender, **kwargs):
print(sender) # 拿到的是表
print(kwargs.get('instance').name) # 这里就可以拿到对象
print('发短信了')
pre_save.connect(callBack)
Django使用自定义信号
a. 定义信号(一般创建一个py文件)(toppings,size 是接受的参数)
import django.dispatch
pizza_done = django.dispatch.Signal(providing_args=["toppings", "size"])
b. 注册信号
def callback(sender, **kwargs):
print("callback")
print(sender,kwargs)
pizza_done.connect(callback)
c. 触发信号
from 路径 import pizza_done
pizza_done.send(sender='seven',toppings=123, size=456)
由于内置信号的触发者已经集成到Django中,所以其会自动调用,而对于自定义信号则需要开发者在任意位置触发。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)