一、Flask-session使用

Flask内置的Session会把数据加密后保存到浏览器 我们自己重写Session类保存到Reids中只需要重写open_session和save_session方法

而在这中间有一个模块就做了这件事,那就是flask-session,以把数据存放到文件、redis、mongodb、关系型数据库等中

安装flask-session

	pip install flask-session

1.使用方式一

	from flask import Flask,session
	app = Flask(__name__)
	app.debug=True
	app.secret_key='jlkdoasfiuz'
	# 只要使用session就需要secret_key
	1.安装flask-session  pip install flask-session

	'使用方式一'
	2.导入(这里我写入到redis缓存数据库中)
	from flask_session import RedisSessionInterface
	3.把app.session_interface替换成RedisSessionInterface的对象
	# 替换了就会走RedisSessionInterface的open_session和save_session
	from redis import Redis
	conn=Redis(host='127.0.0.1',port=6379,db=2)
	# 需要传入的参数 redis, key_prefix, use_signer, permanent, sid_length
	'''
	1.redis 是传入链接的redis库,链接对象
	2.key_prefix    是保存在redis中名称的前缀
	3.use_signer    是如果是False就无需配置secret_key,默认设置True
	4.permanent 是关闭浏览器cookie是否失效
	5.sid_length    是生成session_key的长度,会以cookie形式写入到浏览器cookie中,但是去掉redis中session这个前缀
	去掉前缀就是session_key的长度限制
	'''
	app.session_interface=RedisSessionInterface(
	    redis=conn,key_prefix='session',use_signer=False,
	    permanent=True,sid_length=32
	)
	
	
	@app.route('/set_session')
	def set_session():
	    session['name'] = 'jack'
	    return 'set_session'
	
	
	@app.route('/get_session')
	def get_session():
	    print(session.get('name'))
	    return 'get_session'
	
	
	if __name__ == '__main__':
	    app.run()

2.使用方式二

	from flask import Flask,session
	app = Flask(__name__)
	app.debug=True
	app.secret_key='jlkdoasfiuz'
	# 只要使用session就需要secret_key
	1.安装flask-session  pip install flask-session
	
	'使用方式二'
	2.在flask配置文件中加入配置
	from redis import Redis  # 导入redis
	app.config['SESSION_TYPE'] = 'redis'  # 配置链接的类型
	app.config['SESSION_REDIS']=Redis(host='127.0.0.1',port=6379,db=2)
	# app.config['SESSION_KEY_PREFIX'] = 'session'  # 如果不写,默认以SESSION_COOKIE_NAME作为key
	# app.config.from_pyfile('./settings')  # 第二种导入配置文件方式
	3.导入Session
	from flask_session import Session
	Session(app)  # 核心和方式一一模一样,具体看源码
	
	
	@app.route('/set_session')
	def set_session():
	    session['name'] = 'jack'
	    return 'set_session'
	
	
	@app.route('/get_session')
	def get_session():
	    print(session.get('name'))
	    return 'get_session'
	
	
	if __name__ == '__main__':
	    app.run()

3.读RedisSessionInterface源码

	1.RedisSessionInterface的open_session(在它的父类中)
    def open_session(self, app, request):
    	# -取到前端传入,在cookie中得随机字符串
        sid = request.cookies.get(app.config["SESSION_COOKIE_NAME"])
        if not sid:
            sid = self._generate_sid(self.sid_length)
            # 当sid不为空,把sid传入到session_class得到对象
            return self.session_class(sid=sid, permanent=self.permanent)
        if self.use_signer:  # 用来加密,所以第一种方式的ues_signer最好不要改为False
            try:
                sid = self._unsign(app, sid)
            except BadSignature:
                sid = self._generate_sid(self.sid_length)
                return self.session_class(sid=sid, permanent=self.permanent)
        return self.fetch_session(sid)
	
    def fetch_session(self, sid):
        # 取到随机字符串
        prefixed_session_id = self.key_prefix + sid
        # 从redis中取出key为前缀+随机字符串对应的value值
        value = self.redis.get(prefixed_session_id)
        if value is not None:
            try:
            	# 解密成字符串
                session_data = self.serializer.loads(value)
                # 把解密后的数据,组装到 session对象中
                return self.session_class(session_data, sid=sid)
            except pickle.UnpicklingError:
                return self.session_class(sid=sid, permanent=self.permanent)
        return self.session_class(sid=sid, permanent=self.permanent)
	
	2.RedisSessionInterface的save_session(在它自己内)
   	def save_session(self, app, session, response):
       if not self.should_set_cookie(app, session):
           return
       domain = self.get_cookie_domain(app)
       path = self.get_cookie_path(app)
       
       if not session: # 如果session有值
           if session.modified:  # 如果值被修改过,就把cookie中的删除,并且删除redis中的
               self.redis.delete(self.key_prefix + session.sid)
               response.delete_cookie(
                   app.config["SESSION_COOKIE_NAME"], domain=domain, path=path
               )
           return
       # 
       expiration_datetime = self.get_expiration_time(app, session)
       serialized_session_data = self.serializer.dumps(dict(session))
		# 放到redis中
       self.redis.set(
           name=self.key_prefix + session.sid,
           value=serialized_session_data,
           ex=total_seconds(app.permanent_session_lifetime),  # 过期时间
       )
       # 把session对应的随机字符串放到cookie中
       self.set_cookie_to_response(app, session, response, expiration_datetime)

4.flask-session补充

	- session的前缀如果不传,默认:config.setdefault('SESSION_KEY_PREFIX', 'session:')
    - session过期时间:通过配置,如果不写,会有默认
    	'PERMANENT_SESSION_LIFETIME':           timedelta(days=31),#这个配置文件控制
    -设置cookie时,如何设定关闭浏览器则cookie失效
    	permanent=False
        app.config['SESSION_PERMANENT'] = False

二、数据库连接池

1.flask中使用mysql

	'settings.py'
	SECRET_KEY = 'fdsjakluiz'
	DEBUG = True
	MYSQL_USER = 'root'
	MYSQL_HOST = '127.0.0.1'
	MYSQL_PORT = 3306
	MYSQL_PASSWORD = '1234'
	MYSQL_DATABASE = 'cnblogs'
	JSON_AS_ASCII = False

	'app.py'
	import pymysql.cursors
	from flask import Flask, jsonify
	app = Flask(__name__)
	app.config.from_pyfile('./settings.py')
	
	# pymysql操作mysql
	from pymysql import Connect
	conn = Connect(
	    user=app.config.get('MYSQL_USER'),
	    password=app.config.get('MYSQL_PASSWORD'),
	    host=app.config.get('MYSQL_HOST'),
	    database=app.config.get('MYSQL_DATABASE'),
	    port=app.config.get('MYSQL_PORT'),
	)
	# pymysql.cursors.DictCursor查出来的是列表套字典的形式
	cursor = conn.cursor(pymysql.cursors.DictCursor)
	# cursor = conn.cursor()
	# app.config['JSON_AS_ASCII'] = False  # 前端显示json格式中文
	@app.route('/')
	def articles():
	    cursor.execute('select id,title,author from article limit 10')
	    article_list = cursor.fetchall()  # 拿出所有
	    return jsonify(article_list)
	
	
	if __name__ == '__main__':
	    app.run()

这种方式conncursor如果是全局,出现如下问题:

	'上面的  conn和cursor 都是全局的'
	假设极端情况:同时并发两个用户
		-一个用户查询所有文章
	    -一个用户查询所有用户
	    
	'在线程中全局变量是共享的'
	就会出现,第一个线程拿着cursor执行了:
	cursor.execute('select id,title,author from article limit 10')
	然后第二个线程拿着 cursor 执行了:
	cursor.execute('select id,name from user limit 10')
		    
	第一个线程开始执行:(用的全是同一个cursor)
	article_list = cursor.fetchall()
	就会出现查询article的cursor取出来的数据是  用户相关数据---》出现数据错乱

2.上述问题解决

每个人:线程用自己的从conn和cursor,在视图函数中,拿到链接和cursor

	import pymysql.cursors
	from flask import Flask, jsonify
	app = Flask(__name__)
	app.config.from_pyfile('./settings.py')
	
	# pymysql操作mysql
	from pymysql import Connect
	
	@app.route('/')
	def articles():
	    conn = Connect(
	        user=app.config.get('MYSQL_USER'),
	        password=app.config.get('MYSQL_PASSWORD'),
	        host=app.config.get('MYSQL_HOST'),
	        database=app.config.get('MYSQL_DATABASE'),
	        port=app.config.get('MYSQL_PORT'),
	    )
	    cursor = conn.cursor(pymysql.cursors.DictCursor)
	    cursor.execute('select id,title,author from article limit 10')
	    article_list = cursor.fetchall()  # 拿出所有
	    return jsonify(article_list)
	
	
	@app.route('/desc')
	def desc():
	    conn = Connect(
	        user=app.config.get('MYSQL_USER'),
	        password=app.config.get('MYSQL_PASSWORD'),
	        host=app.config.get('MYSQL_HOST'),
	        database=app.config.get('MYSQL_DATABASE'),
	        port=app.config.get('MYSQL_PORT'),
	    )
	    cursor = conn.cursor(pymysql.cursors.DictCursor)
	    cursor.execute('select id,real_desc from article limit 10')
	    article_list = cursor.fetchall()  # 拿出所有
	    return jsonify(article_list)
	
	
	if __name__ == '__main__':
	    app.run()

但是这种如果并发量过高,就会出现连接数过多的问题,mysql的性能就降低了

使用数据库连接池

	'上述操作存在的问题'
	1.原生pymysql操作,最好有一个rom---->sqlalchemy
	2.并发问题:conn和cursor要做成单例,还是每个视图函数一个?
		-如果使用单例,数据会错乱
		-咱们需要,每个视图函数,哪一个链接,如果并发数过多,mysql链接数就会很多,所以使用连接池解决
	
	# django orm操作,一个请求,就会拿到一个mysql的链接,用完后就释放
	'所以想要彻底解决,得使用数据库连接池'
	-限定 mysql链接最多,无论多少线程操作,都是从池中取链接使用

	'解决上面的两个问题'
	-数据库连接池
	-创建一个全局的池
	-每次进入视图函数,从池中取一个连接使用,使用完放回到池中,只要控制池的大小,就能控制mysql连接数

1.第三方数据库连接池

	'pool.py' 在这个文件中配置池也也可以在视图函数中配置
	# 1 安装 pip install dbutils
	# 2 使用:实例化得到一个池对象---》池是单例
	from dbutils.pooled_db import PooledDB
	import pymysql
	
	POOL=PooledDB(
	    creator=pymysql,  # 使用链接数据库的模块
	    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
	    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
	    maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
	    maxshared=3,
	    # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
	    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
	    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
	    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
	    ping=0,
	    # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
	    host='127.0.0.1',
	    port=3306,
	    user='root',
	    password='1234',
	    database='cnblogs',
	    charset='utf8'
	)
	
	'app.py'  视图函数
	from flask import Flask,jsonify
	app = Flask(__name__)
	app.config.from_pyfile('./settings.py')
	import time
	from pool import POOL
	from pymysql.cursors import DictCursor
	
	## 3 在视图函数中导入使用
	@app.route('/article')
	def article():
	    conn = POOL.connection()
	    cursor = conn.cursor(DictCursor)
	    # 获取10条文章
	    cursor.execute('select id,title,author from article limit 10')
	    time.sleep(1)
	    # 切换
	    res = cursor.fetchall()
	    print(res)
	    return jsonify({'code': 100, 'msg': '成功', 'result': res})
	
	
	if __name__ == '__main__':
	    app.run()

2.操作数据库不带池版

	from flask import Flask,jsonify
	app = Flask(__name__)
	app.config.from_pyfile('./settings.py')
	import time
	## 3 在视图函数中导入使用
	@app.route('/article')
	def article():
	    import pymysql
	    from pymysql.cursors import DictCursor
	    conn = pymysql.connect(user='root',
	                           password="1234",
	                           host='127.0.0.1',
	                           database='cnblogs',
	                           port=3306)
	    cursor = conn.cursor(DictCursor)
	    # 获取10条文章
	    cursor.execute('select id,title,author from article limit 10')
	    time.sleep(1)
	    # 切换
	    res = cursor.fetchall()
	    print(res)
	    return jsonify({'code': 100, 'msg': '成功', 'result': res})
	
	
	if __name__ == '__main__':
	    app.run(port=5001)

3.池版和非池版压测

	压测代码  jmeter工具---》java
	import requests
	from threading import Thread
	
	
	# 没有连接池
	def task():
	    # res = requests.get('http://127.0.0.1:5000/article')  # 带连接池版
	    res = requests.get('http://127.0.0.1:5001/article')  # 不带连接池版
	    print(res.json())
	
	
	if __name__ == '__main__':
	    l = []
	    for i in range(100):
	        t = Thread(target=task)
	        t.start()
	        l.append(t)
	
	    for i in l:
	        i.join()
	
	'''效果是:使用池的连接数明显小,不使用池连接数明显很大'''

查看数据库连接数:show status like '%Threads%';

在这里插入图片描述

三、wtforms

wtforms是一个支持多个web框架的form组件,主要用于对用户请求数据进行验证、渲染错误信息、渲染页面

app.py

from flask import Flask,render_template,request,redirect
from wtforms import Form
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets

app=Flask(__name__)
app.debug=True

class LoginForm(Form):
    # 字段(内部包含正则表达式)
    name = simple.StringField(
        label='用户名',
        validators=[
            validators.DataRequired(message='用户名不能为空.'),
            validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
        ],
        widget=widgets.TextInput(), # 页面上显示的插件
        render_kw={'class': 'form-control'}

    )
    # 字段(内部包含正则表达式)
    pwd = simple.PasswordField(
        label='密码',
        validators=[
            validators.DataRequired(message='密码不能为空.'),
            validators.Length(min=8, message='用户名长度必须大于%(min)d'),
            validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
                              message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')

        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )



@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'GET':
        form = LoginForm()
        return render_template('login.html', form=form)
    else:
        form = LoginForm(formdata=request.form)
        if form.validate():
            print('用户提交数据通过格式验证,提交的值为:', form.data)
        else:
            print(form.errors)
        return render_template('login.html', form=form)


if __name__ == '__main__':
    app.run()

login.html

	<!DOCTYPE html>
	<html lang="en">
	<head>
	    <meta charset="UTF-8">
	    <title>Title</title>
	</head>
	<body>
	<h1>登录</h1>
	<form method="post">
	    <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>
	    <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
	    <input type="submit" value="提交">
	</form>
	</body>
	</html>

四、Flask定制命令

1.使用 flask-script定制命令(老版本,新版本不用了)

	flask 老版本中,没有命令运行项目,自定制命令
	flask-script 解决了这个问题:flask项目可以通过命令运行,可以定制命令
	
	新版的flask--》官方支持定制命令  click 定制命令,这个模块就弃用了
	flask-migrate 老版本基于flask-script,新版本基于flask-click写的
	
	使用步骤
		-1 pip3 install  Flask-Script==2.0.3
	    -2 pip3 install flask==1.1.4
	    -3 pip3 install markupsafe=1.1.1
		-4 使用
	    from flask_script import Manager
	    manager = Manager(app)
	    if __name__ == '__main__':
	    	manager.run()
	    -5 自定制命令
	    @manager.command
	    def custom(arg):
	        """自定义命令
	        python manage.py custom 123
	        """
	        print(arg)
	        
	    - 6 执行自定制命令
	    python manage.py custom 123

2.新版本定制命令

	from flask import Flask
	import click
	app = Flask(__name__)
	
	自定制命令,通过create-user传入用户名就可以创建一个用户来
	@app.cli.command('create-user')
	@click.argument('name')
	def create_user(name):
	    # from pool import POOL
	    # conn = POOL.connection()
	    # cursor=conn.cursor()
	    # cursor.excute('insert into user (username,password) values (%s,%s)',args=[name,'hello123'])
	    # conn.commit()
	    print(name)
	
	
	命令行中执行
	-flask --app .\Flask定制命令.py:app create-user jack
	-简写成 前提条件式app所在的py文件名叫app.py
	-flask create-user jack
	
	
	@app.route('/')
	def index():
	    return 'index'
	
	
	-运行项目的命令:flask --app .\Flask定制命令.py:app run
	
	if __name__ == '__main__':
	    app.run()

3.Django中自定制命令

	1 app下新建文件夹
		management/commands/
	2 在该文件夹下新建py文件,随便命名(命令名)
	
	3 在py文件中写代码
		from django.core.management.base import BaseCommand
		class Command(BaseCommand):
		    help = '命令提示'
		    def handle(self, *args, **kwargs):
				命令逻辑  
	4 使用命令
		python manage.py  py文件(命令名)

五、Flask-Cache

具体使用可以自寻去官方查看:https://flask-caching.readthedocs.io/en/latest/

	from flask import Flask,render_template
	# 安装 pip install Flask-Caching
	from flask_caching import Cache
	
	config = {
	    "DEBUG": True,
	    "CACHE_TYPE": "SimpleCache",
	    "CACHE_DEFAULT_TIMEOUT": 300
	}
	app = Flask(__name__)
	app.config.from_mapping(config)
	cache = Cache(app)
	
	
	@app.route("/")
	@cache.cached(timeout=50)
	def index():
	    return render_template('index.html')
	
	
	@app.route('/set_cache')
	def set_cache():
	    cache.set('name','xxxx')
	    return 'set_cache'
	
	
	@app.route('/get_cache')
	def get_cache():
	    res = cache.get('name')
	    return res
	
	
	if __name__ == '__main__':
	    app.run()

六、信号

信号量:Semaphore信号量可以理解为多把锁,同时允许多个线程来更改数据
信号:Signal是Flask框架中的信号基于blinker,其主要就是让开发者可是在flask请求过程中定制一些用户行为

1.内置信号

内置信号,只在flask请求过程中,源码中定义的信号,这个无需我们定义和触发,只要写了函数跟它对应,执行到这里就会自动触发函数的执行

	安装信号:pip install blinker
	request_started = _signals.signal('request-started')                # 请求到来前执行
	request_finished = _signals.signal('request-finished')              # 请求结束后执行
	before_render_template = _signals.signal('before-render-template')  # 模板渲染前执行
	template_rendered = _signals.signal('template-rendered')            # 模板渲染后执行
	got_request_exception = _signals.signal('got-request-exception')    # 请求执行出现异常时执行
	request_tearing_down = _signals.signal('request-tearing-down')      # 请求执行完毕后自动执行(无论成功与否)
	appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)
	appcontext_pushed = _signals.signal('appcontext-pushed')            # 应用上下文push时执行
	appcontext_popped = _signals.signal('appcontext-popped')            # 应用上下文pop时执行
	message_flashed = _signals.signal('message-flashed')                # 调用flask在其中添加数据时,自动触发

可用于模版渲染之前执行(before_render_template),可以记录日志

2.内置信号的使用步骤

'以模版渲染之前的信号函数为例'
from flask import Flask,render_template
app = Flask(__name__)

1.写一个函数
def before_render(*args,**kwargs):
    print(args)
    # args 第0个位置就是app对象
    print(kwargs)
    # kwargs 其他都在这里 哪个模板,上下文,请求对象,session
    print('记录日志,模版要渲染了')

2.跟内置信号绑定
from flask.signals import before_render_template
before_render_template.connect(before_render)
3.源码中触发信号执行(我们无需动),等待信号被触发执行
# before_render_template.send()  源码在模版渲染之前,它已经写了
'这样以后,只要有模版都会执行信号函数,不管是哪个函数,它会自己区分'

@app.route('/')
def index():
    return render_template('index.html')


if __name__ == '__main__':
    app.run(debug=True)

3.自定义信号

from flask import Flask,render_template
app = Flask(__name__)

1.自定义信号
from flask.signals import _signals
# 自定义一个数据库保存执行的信号
db_save = _signals.signal('db_save')


2.写一个函数
def db_save_fun(*args,**kwargs):
    print(args)
    # args 没有拿到数据
    print(kwargs)
    # kwargs 拿到其他数据,自定义的都是可以自己定制返回数据
    print('表数据插入了')

3.跟自定义信号绑定
db_save.connect(db_save_fun)

4.在真正的数据库保存的时候执行
import pymysql
from pymysql import Connect

@app.route('/create_article')
def create_article():
    conn = Connect(
        user='root',
        password='1234',
        host='127.0.0.1',
        database='cnblogs',
        port=3306,
    )
    cursor = conn.cursor()
    cursor.execute('INSERT INTO article (title,author) VALUES (%s,%s)',args=['测试标题1','测试作者1'])
    conn.commit()  # 提交
    '在这里需要手动触发信号执行,需要在代码中写,可以传参数,传入的参数,自定义信号绑定的函数可以拿到'
    db_save.send(table_name='article', info={'title', '测试标题', 'author', '测试作者'})  # 这里是自己定制写的
    conn.close()
    return '插入成功'


if __name__ == '__main__':
    app.run(debug=True)

4.Django中信号的使用

Django提供一种信号机制。其实就是观察者模式,又叫发布-订阅(Publish/Subscribe) 。当发生一些动作的时候,发出信号,然后监听了这个信号的函数就会执行。

通俗来讲,就是一些动作发生的时候,信号允许特定的发送者去提醒一些接受者。用于在框架执行操作时解耦。

	利用信号能干啥?
		1 某个位置记录日志
	    2 用户只要一注册---》发个注册邮件通知
	    3 用户只要一异地登录,发个邮件
    使用信号好处?
    	解耦(降低耦合度)

Django中内置信号

Model signals
	-pre_init                    # django的modal执行其构造方法前,自动触发
	-post_init                   # django的modal执行其构造方法后,自动触发
	-pre_save                    # django的modal对象保存前,自动触发
	-post_save                   # django的modal对象保存后,自动触发
	-pre_delete                  # django的modal对象删除前,自动触发
	-post_delete                 # django的modal对象删除后,自动触发
	-m2m_changed                 # django的modal中使用m2m字段操作第三张表(add,remove,clear)前后,自动触发
	-class_prepared              # 程序启动时,检测已注册的app中modal类,对于每一个类,自动触发
Management signals
	-pre_migrate                 # 执行migrate命令前,自动触发
	-post_migrate                # 执行migrate命令后,自动触发
Request/response signals
	-request_started             # 请求到来前,自动触发
	-request_finished            # 请求结束后,自动触发
	-got_request_exception       # 请求异常后,自动触发
Test signals
	-setting_changed             # 使用test测试修改配置文件时,自动触发
	-template_rendered           # 使用test测试渲染模板时,自动触发
Database Wrappers
	-connection_created          # 创建数据库连接时,自动触发

对于Django内置的信号,仅需注册指定信号,当程序执行相应操作时,自动触发注册函数:

使用Django内置信号方式

	from django.core.signals import request_finished
    from django.core.signals import request_started
    from django.core.signals import got_request_exception
    from django.db.models.signals import class_prepared
    from django.db.models.signals import pre_init, post_init
    from django.db.models.signals import pre_save, post_save
    from django.db.models.signals import pre_delete, post_delete
    from django.db.models.signals import m2m_changed
    from django.db.models.signals import pre_migrate, post_migrate
    from django.test.signals import setting_changed
    from django.test.signals import template_rendered
    from django.db.backends.signals import connection_created

使用案例

	'(只要用户一注册,我们就发邮件,伪代码)'
	'views.py'
	from .models import User
	def index(request):
	    User.objects.create(name='jack',password='123')
	    return HttpResponse('注册成功')
    
    '放到__init__.py里面'
    from django.db.models.signals import post_save  # 导入需要的信号
    import logging
	def callBack(sender, **kwargs):
	    print(sender)  # 拿到的是表
	    print(kwargs.get('instance').name)  # 这里就可以拿到对象
	    print('发短信了')
	
	pre_save.connect(callBack)

Django使用自定义信号

	a. 定义信号(一般创建一个py文件)(toppings,size 是接受的参数)
	import django.dispatch
	pizza_done = django.dispatch.Signal(providing_args=["toppings", "size"])
	
	b. 注册信号
	def callback(sender, **kwargs):
	    print("callback")
	    print(sender,kwargs)
	pizza_done.connect(callback)
	
	c. 触发信号
	from 路径 import pizza_done
	pizza_done.send(sender='seven',toppings=123, size=456)
	
	由于内置信号的触发者已经集成到Django中,所以其会自动调用,而对于自定义信号则需要开发者在任意位置触发。 
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐