pymysql封装总结
pymysql封装总结前言由于在小工具地开发中频繁使用到pymysql,所以一直想要对PyMySQL进行完整封装,而不是简单的对方法的重复使用。但信心满满地完成一次次封装后,总是并不满意。意识到当前水平不足后,尝试过寻找相关博客,和请教大佬,但一直没有好的方案。庆幸的是,在学习pymysql源码底层时,发现一篇非常优秀的文章,所以根据大佬的思路,重演了整个封装过程,并记录下这一篇笔记。链接pyth
pymysql封装总结
前言
由于在小工具地开发中频繁使用到pymysql,所以一直想要对PyMySQL进行完整封装,而不是简单的对方法的重复使用。但信心满满地完成一次次封装后,总是并不满意。意识到当前水平不足后,尝试过寻找相关博客,和请教大佬,但一直没有好的方案。
庆幸的是,在学习pymysql源码底层时,发现一篇非常优秀的文章,所以根据大佬的思路,重演了整个封装过程,并记录下这一篇笔记。
链接
python操作mysql之只看这篇就够了 - 简书 (jianshu.com)
关于PooledDB使用autocommit的方法_月之影·影之海的技术博客_51CTO博客_autocommit 如何设置
准备
创建数据库
在mysql上创建一个people表
CREATE TABLE `people` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(20) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
mysql配置
由于连接mysql的配置需要重复使用,所以创建一个文件mysql_config.py,用于存储mysql相关配置
host = ""
port = 3306
user = ""
password = ""
db = "test"
在封装类直接引用即可
from mysql_config import host,port,user,password,db
依赖
封装类使用到了两个依赖库,分别是pymysql和DBUtils,其中pymysql是python操作mysql的常用库,另一个DBUtils是数据库连接池,可以通过以下命令进行安装
pip install pymysql
pip install DBUtils
使用
简单使用
首先对pymysql的简单使用,测试是否可以正常使用;这也是平时常常的使用方式。
代码
#! /usr/bin/python
# -*- coding: UTF-8 -*-
# 引入pymysql依赖
import pymysql
# mysql相关配置
from config import host,port,user,password,db
# 连接数据库
def get_connection():
mysql_connect = pymysql.connect(host=host, port=port, db=db, user=user, password=password)
return mysql_connect
# 使用连接对mysql进行操作
def get_people_count():
mysql_connect = get_connection()
cursor = mysql_connect.cursor(pymysql.cursors.DictCursor)
cursor.execute("select count(id) as total from people")
data = cursor.fetchone()
print("查询结果为:",data)
cursor.close()
mysql_connect.close()
if __name__ == '__main__':
get_people_count()
优缺点
这样使用的优点是非常简单,缺点是需要对游标和连接进行关闭。单个使用还好,但如果如果需要多个操作方法,则需要多个关闭。
使用with对代码进行优化
with介绍
python的with代码块的处理非常友好,以前常常使用with进行文件操作,如下:
def read_content_list(file_path, sep = '\t'):
'''read content to list by file,and set the separator,default is \t
Parameters:
file_path - str, file path
sep - str, separator, default \t
Returns:
None
'''
res = []
with open(file_path,"r",encoding="utf-8") as file:
for line in file:
line = line.replace("\n","")
res.append(line.split(sep))
return res
但其实也可以使用with完成pymysql的封装,如下:
基础代码
#! /usr/bin/python
# -*- coding: UTF-8 -*-
# 引入pymysql依赖
import pymysql
# mysql相关配置
from config import host,port,user,password,db
def get_connection(host=host,port=port,user=user,password=password,db=db):
mysql_connect = pymysql.connect(host=host, port=port, db=db, user=user, password=password)
return mysql_connect
class DB_MySQL():
def __init__(self,host=host,port=port,user=user,password=password,db=db) -> None:
self.__host = host
self.__port = port
self.__user = user
self.__password = password
self.__db = db
def __enter__(self):
connect = get_connection(host=self.__host, port=self.__port, db=self.__db, user=self.__user, password=self.__password)
cursor = connect.cursor(pymysql.cursors.DictCursor)
connect.autocommit = False
self._connect = connect
self._cursor = cursor
return self
def __exit__(self, *exc_info):
self._connect.commit()
self._cursor.close()
self._connect.close()
@property
def cursor(self):
return self._cursor
增删改查
使用上面的封装类,进行增删改查操作
#! /usr/bin/python
# -*- coding: UTF-8 -*-
from db_mysql import DB_MySQL
def get_people_count(cursor):
cursor.execute("select count(id) as total from people")
data = cursor.fetchone()
return data
def insert_people(cursor,name,age):
sql = "insert into people(name,age) values('{}',{})".format(name,age)
res = cursor.execute(sql)
return res
def delete_people_by_name(cursor,name):
sql = "delete from people where name = '{}'".format(name)
res = cursor.execute(sql)
return res
def update_people_by_name(cursor,old_name,new_name):
sql = "update people set name = '{}' where name = '{}'".format(new_name,old_name)
res = cursor.execute(sql)
return res
def select_all(cursor):
sql = "select * from people"
cursor.execute(sql)
res = cursor.fetchall()
return res
if __name__ == "__main__":
with DB_MySQL() as db:
res = insert_people(db.cursor,"zhangsan",21)
res = insert_people(db.cursor,"zhj1121",21)
print("成功插入条数:",res)
res = get_people_count(db.cursor)
print("当前记录数:",res)
res = delete_people_by_name(db.cursor,"zhj1121")
print("成功删除条数:",res)
res = update_people_by_name(db.cursor,"zhangsan","zhj1121")
print("成功修改记录条数:",res)
res = select_all(db.cursor)
print("获取的结果为:",res)
再次封装
可以将上面的基础代码和使用操作进行封装,如下:
#! /usr/bin/python
# -*- coding: UTF-8 -*-
# 引入pymysql依赖
import pymysql
# mysql相关配置
from config import host,port,user,password,db
def get_connection(host=host,port=port,user=user,password=password,db=db):
mysql_connect = pymysql.connect(host=host, port=port, db=db, user=user, password=password)
return mysql_connect
class DB_MySQL():
def __init__(self,host=host,port=port,user=user,password=password,db=db) -> None:
self.__host = host
self.__port = port
self.__user = user
self.__password = password
self.__db = db
def __enter__(self):
connect = get_connection(host=self.__host, port=self.__port, db=self.__db, user=self.__user, password=self.__password)
cursor = connect.cursor(pymysql.cursors.DictCursor)
connect.autocommit = False
self._connect = connect
self._cursor = cursor
return self
def __exit__(self, *exc_info):
self._connect.commit()
self._cursor.close()
self._connect.close()
def get_people_count(self):
self.cursor.execute("select count(id) as total from people")
data = self.cursor.fetchone()
return data
def insert_people(self,name,age):
sql = "insert into people(name,age) values('{}',{})".format(name,age)
res = self.cursor.execute(sql)
return res
def delete_people_by_name(self,name):
sql = "delete from people where name = '{}'".format(name)
res = self.cursor.execute(sql)
return res
def update_people_by_name(self,old_name,new_name):
sql = "update people set name = '{}' where name = '{}'".format(new_name,old_name)
res = self.cursor.execute(sql)
return res
def select_all(self):
sql = "select * from people"
self.cursor.execute(sql)
res = self.cursor.fetchall()
return res
@property
def cursor(self):
return self._cursor
if __name__ == "__main__":
with DB_MySQL() as db:
res = db.insert_people("zhangsan",21)
res = db.insert_people("zhj1121",21)
print("成功插入条数:",res)
res = db.get_people_count()
print("当前记录数:",res)
res = db.delete_people_by_name("zhj1121")
print("成功删除条数:",res)
res = db.update_people_by_name("zhangsan","zhj1121")
print("成功修改记录条数:",res)
res = db.select_all()
print("获取的结果为:",res)
PyMySQL与连接池
连接池对编码操作数据库的作用不言而喻,可以使用DBUtils实现连接池。如下:
#! /usr/bin/python
# -*- coding: UTF-8 -*-
from mysql_config import host, port, user, password, db
import pymysql
from dbutils.pooled_db import PooledDB
class DB_MySQL_Pool():
__pool = None
__MAX_CONNECTIONS = 100 # 创建连接池的最大数量
__MIN_CACHED = 10 # 连接池中空闲连接的初始数量
__MAX_CACHED = 20 # 连接池中空闲连接的最大数量
__MAX_SHARED = 10 # 共享连接的最大数量
__BLOCK = True # 超过最大连接数量时候的表现,为True等待连接数量下降,为false直接报错处理
__MAX_USAGE = 100 # 单个连接的最大重复使用次数
__CHARSET = 'UTF8'
'''
setsession: optional list of SQL commands that may serve to prepare
the session, e.g. ["set datestyle to ...", "set time zone ..."]
reset: how connections should be reset when returned to the pool
(False or None to rollback transcations started with begin(),
True to always issue a rollback for safety's sake)
'''
__RESET = True
__SET_SESSION = ['SET AUTOCOMMIT = 1'] # 设置自动提交
def __init__(self, host, port, user, password, database):
if not self.__pool:
self.__class__.__pool = PooledDB(creator=pymysql, host=host, port=port, user=user, password=password, database=database,
maxconnections=self.__MAX_CONNECTIONS,
mincached=self.__MIN_CACHED,
maxcached=self.__MAX_CACHED,
maxshared=self.__MAX_SHARED,
blocking=self.__BLOCK,
maxusage=self.__MAX_USAGE,
setsession=self.__SET_SESSION,
reset=self.__RESET,
charset=self.__CHARSET)
def get_connect(self):
return self.__pool.connection()
class DB_MySQL():
def __init__(self, host=host, port=port, user=user, password=password, db=db) -> None:
self.__host = host
self.__port = port
self.__user = user
self.__password = password
self.__database = db
self.connects_pool = DB_MySQL_Pool(
host=self.__host, port=self.__port, user=self.__user, password=self.__password, database=self.__database)
def __enter__(self):
connect = self.connects_pool.get_connect()
cursor = connect.cursor(pymysql.cursors.DictCursor)
# https://blog.51cto.com/abyss/1736844
# connect.autocommit = False # 如果使用连接池 则不能在取出后设置 而应该在创建线程池时设置
self._connect = connect
self._cursor = cursor
return self
def __exit__(self, *exc_info):
self._connect.commit()
self._cursor.close()
self._connect.close()
def get_people_count(self):
self.cursor.execute("select count(id) as total from people")
data = self.cursor.fetchone()
return data
def insert_people(self, name, age):
sql = "insert into people(name,age) values('{}',{})".format(name, age)
res = self.cursor.execute(sql)
return res
def delete_people_by_name(self, name):
sql = "delete from people where name = '{}'".format(name)
res = self.cursor.execute(sql)
return res
def update_people_by_name(self, old_name, new_name):
sql = "update people set name = '{}' where name = '{}'".format(
new_name, old_name)
res = self.cursor.execute(sql)
return res
def select_all(self):
sql = "select * from people"
self.cursor.execute(sql)
res = self.cursor.fetchall()
return res
@property
def cursor(self):
return self._cursor
if __name__ == "__main__":
with DB_MySQL() as db:
res = db.insert_people("zhangsan", 21)
print("成功插入条数:", res)
res = db.insert_people("zhj1121", 21)
print("成功插入条数:", res)
res = db.get_people_count()
print("当前记录数:", res)
res = db.delete_people_by_name("zhj1121")
print("成功删除条数:", res)
res = db.update_people_by_name("zhangsan", "zhj1121")
print("成功修改记录条数:", res)
res = db.select_all()
print("获取的结果为:", res)
总结
这一篇笔记是站在巨人的肩膀上完成的,非常感谢大佬小肥爬爬 的文章python操作mysql之只看这篇就够了 - 简书 (jianshu.com)。
在以后的使用中,也会在这篇的基础上进行补充与完善。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)