pymysql封装总结

前言

由于在小工具地开发中频繁使用到pymysql,所以一直想要对PyMySQL进行完整封装,而不是简单的对方法的重复使用。但信心满满地完成一次次封装后,总是并不满意。意识到当前水平不足后,尝试过寻找相关博客,和请教大佬,但一直没有好的方案。

庆幸的是,在学习pymysql源码底层时,发现一篇非常优秀的文章,所以根据大佬的思路,重演了整个封装过程,并记录下这一篇笔记。

链接

python操作mysql之只看这篇就够了 - 简书 (jianshu.com)

关于PooledDB使用autocommit的方法_月之影·影之海的技术博客_51CTO博客_autocommit 如何设置

准备

创建数据库

在mysql上创建一个people表

CREATE TABLE `people` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(20) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

mysql配置

由于连接mysql的配置需要重复使用,所以创建一个文件mysql_config.py,用于存储mysql相关配置

host = ""
port = 3306
user = ""
password = ""
db = "test"

在封装类直接引用即可

from mysql_config import host,port,user,password,db

依赖

封装类使用到了两个依赖库,分别是pymysql和DBUtils,其中pymysql是python操作mysql的常用库,另一个DBUtils是数据库连接池,可以通过以下命令进行安装

pip install pymysql
pip install DBUtils

使用

简单使用

首先对pymysql的简单使用,测试是否可以正常使用;这也是平时常常的使用方式。

代码
#! /usr/bin/python
# -*- coding: UTF-8 -*-
# 引入pymysql依赖
import pymysql
# mysql相关配置
from config import host,port,user,password,db
# 连接数据库
def get_connection():
    mysql_connect = pymysql.connect(host=host, port=port, db=db, user=user, password=password)
    return mysql_connect
# 使用连接对mysql进行操作
def get_people_count():
    mysql_connect = get_connection()
    cursor = mysql_connect.cursor(pymysql.cursors.DictCursor)
    cursor.execute("select count(id) as total from people")
    data = cursor.fetchone()
    print("查询结果为:",data)
    cursor.close()
    mysql_connect.close()
if __name__ == '__main__':
    get_people_count()
优缺点

这样使用的优点是非常简单,缺点是需要对游标和连接进行关闭。单个使用还好,但如果如果需要多个操作方法,则需要多个关闭。

使用with对代码进行优化

with介绍

python的with代码块的处理非常友好,以前常常使用with进行文件操作,如下:

def read_content_list(file_path, sep = '\t'):
    '''read content to list by file,and set the separator,default is \t

    Parameters:
        file_path - str, file path
        sep - str, separator, default \t

    Returns:
        None
    '''
    res = []
    with open(file_path,"r",encoding="utf-8") as file:
        for line in  file:
            line = line.replace("\n","")
            res.append(line.split(sep))
    return res

但其实也可以使用with完成pymysql的封装,如下:

基础代码
#! /usr/bin/python
# -*- coding: UTF-8 -*-
# 引入pymysql依赖
import pymysql
# mysql相关配置
from config import host,port,user,password,db

def get_connection(host=host,port=port,user=user,password=password,db=db):
    mysql_connect = pymysql.connect(host=host, port=port, db=db, user=user, password=password)
    return mysql_connect

class DB_MySQL():
    def __init__(self,host=host,port=port,user=user,password=password,db=db) -> None:
        self.__host = host
        self.__port = port
        self.__user = user
        self.__password = password
        self.__db = db
    def __enter__(self):
        connect = get_connection(host=self.__host, port=self.__port, db=self.__db, user=self.__user, password=self.__password)
        cursor = connect.cursor(pymysql.cursors.DictCursor)
        connect.autocommit = False

        self._connect = connect
        self._cursor = cursor
        return self
    def __exit__(self, *exc_info):
        self._connect.commit()
        self._cursor.close()
        self._connect.close()
    @property
    def cursor(self):
        return self._cursor
增删改查

使用上面的封装类,进行增删改查操作

#! /usr/bin/python
# -*- coding: UTF-8 -*-
from db_mysql import DB_MySQL

def get_people_count(cursor):
    cursor.execute("select count(id) as total from people")
    data = cursor.fetchone()
    return data
def insert_people(cursor,name,age):
    sql = "insert into people(name,age) values('{}',{})".format(name,age)
    res = cursor.execute(sql)
    return res
def delete_people_by_name(cursor,name):
    sql = "delete from people where name = '{}'".format(name)
    res = cursor.execute(sql)
    return res
def update_people_by_name(cursor,old_name,new_name):
    sql = "update people set name = '{}' where name = '{}'".format(new_name,old_name)
    res = cursor.execute(sql)
    return res
def select_all(cursor):
    sql = "select * from people"
    cursor.execute(sql)
    res = cursor.fetchall()
    return res
if __name__ == "__main__":
    with DB_MySQL() as db:
        res = insert_people(db.cursor,"zhangsan",21)
        res = insert_people(db.cursor,"zhj1121",21)
        print("成功插入条数:",res)
        res = get_people_count(db.cursor)
        print("当前记录数:",res)
        res = delete_people_by_name(db.cursor,"zhj1121")
        print("成功删除条数:",res)
        res = update_people_by_name(db.cursor,"zhangsan","zhj1121")
        print("成功修改记录条数:",res)
        res = select_all(db.cursor)
        print("获取的结果为:",res)

再次封装

可以将上面的基础代码和使用操作进行封装,如下:

#! /usr/bin/python
# -*- coding: UTF-8 -*-
# 引入pymysql依赖
import pymysql
# mysql相关配置
from config import host,port,user,password,db

def get_connection(host=host,port=port,user=user,password=password,db=db):
    mysql_connect = pymysql.connect(host=host, port=port, db=db, user=user, password=password)
    return mysql_connect

class DB_MySQL():
    def __init__(self,host=host,port=port,user=user,password=password,db=db) -> None:
        self.__host = host
        self.__port = port
        self.__user = user
        self.__password = password
        self.__db = db
    def __enter__(self):
        connect = get_connection(host=self.__host, port=self.__port, db=self.__db, user=self.__user, password=self.__password)
        cursor = connect.cursor(pymysql.cursors.DictCursor)
        connect.autocommit = False
        self._connect = connect
        self._cursor = cursor
        return self
    def __exit__(self, *exc_info):
        self._connect.commit()
        self._cursor.close()
        self._connect.close()
    def get_people_count(self):
        self.cursor.execute("select count(id) as total from people")
        data = self.cursor.fetchone()
        return data
    def insert_people(self,name,age):
        sql = "insert into people(name,age) values('{}',{})".format(name,age)
        res = self.cursor.execute(sql)
        return res
    def delete_people_by_name(self,name):
        sql = "delete from people where name = '{}'".format(name)
        res = self.cursor.execute(sql)
        return res
    def update_people_by_name(self,old_name,new_name):
        sql = "update people set name = '{}' where name = '{}'".format(new_name,old_name)
        res = self.cursor.execute(sql)
        return res
    def select_all(self):
        sql = "select * from people"
        self.cursor.execute(sql)
        res = self.cursor.fetchall()
        return res
    @property
    def cursor(self):
        return self._cursor

if __name__ == "__main__":
    with DB_MySQL() as db:
        res = db.insert_people("zhangsan",21)
        res = db.insert_people("zhj1121",21)
        print("成功插入条数:",res)
        res = db.get_people_count()
        print("当前记录数:",res)
        res = db.delete_people_by_name("zhj1121")
        print("成功删除条数:",res)
        res = db.update_people_by_name("zhangsan","zhj1121")
        print("成功修改记录条数:",res)
        res = db.select_all()
        print("获取的结果为:",res)

PyMySQL与连接池

连接池对编码操作数据库的作用不言而喻,可以使用DBUtils实现连接池。如下:

#! /usr/bin/python
# -*- coding: UTF-8 -*-
from mysql_config import host, port, user, password, db
import pymysql
from dbutils.pooled_db import PooledDB

class DB_MySQL_Pool():
    __pool = None
    __MAX_CONNECTIONS = 100  # 创建连接池的最大数量
    __MIN_CACHED = 10  # 连接池中空闲连接的初始数量
    __MAX_CACHED = 20  # 连接池中空闲连接的最大数量
    __MAX_SHARED = 10  # 共享连接的最大数量
    __BLOCK = True  # 超过最大连接数量时候的表现,为True等待连接数量下降,为false直接报错处理
    __MAX_USAGE = 100  # 单个连接的最大重复使用次数
    __CHARSET = 'UTF8'
    '''
        setsession: optional list of SQL commands that may serve to prepare
            the session, e.g. ["set datestyle to ...", "set time zone ..."]
        reset: how connections should be reset when returned to the pool
            (False or None to rollback transcations started with begin(),
            True to always issue a rollback for safety's sake)
    '''
    __RESET = True
    __SET_SESSION = ['SET AUTOCOMMIT = 1']  # 设置自动提交

    def __init__(self, host, port, user, password, database):
        if not self.__pool:
            self.__class__.__pool = PooledDB(creator=pymysql, host=host, port=port, user=user, password=password, database=database,
                                             maxconnections=self.__MAX_CONNECTIONS,
                                             mincached=self.__MIN_CACHED,
                                             maxcached=self.__MAX_CACHED,
                                             maxshared=self.__MAX_SHARED,
                                             blocking=self.__BLOCK,
                                             maxusage=self.__MAX_USAGE,
                                             setsession=self.__SET_SESSION,
                                             reset=self.__RESET,
                                             charset=self.__CHARSET)

    def get_connect(self):
        return self.__pool.connection()


class DB_MySQL():
    def __init__(self, host=host, port=port, user=user, password=password, db=db) -> None:
        self.__host = host
        self.__port = port
        self.__user = user
        self.__password = password
        self.__database = db
        self.connects_pool = DB_MySQL_Pool(
            host=self.__host, port=self.__port, user=self.__user, password=self.__password, database=self.__database)

    def __enter__(self):
        connect = self.connects_pool.get_connect()
        cursor = connect.cursor(pymysql.cursors.DictCursor)
        # https://blog.51cto.com/abyss/1736844
        # connect.autocommit = False # 如果使用连接池 则不能在取出后设置 而应该在创建线程池时设置
        self._connect = connect
        self._cursor = cursor
        return self

    def __exit__(self, *exc_info):
        self._connect.commit()
        self._cursor.close()
        self._connect.close()

    def get_people_count(self):
        self.cursor.execute("select count(id) as total from people")
        data = self.cursor.fetchone()
        return data

    def insert_people(self, name, age):
        sql = "insert into people(name,age) values('{}',{})".format(name, age)
        res = self.cursor.execute(sql)
        return res

    def delete_people_by_name(self, name):
        sql = "delete from people where name = '{}'".format(name)
        res = self.cursor.execute(sql)
        return res

    def update_people_by_name(self, old_name, new_name):
        sql = "update people set name = '{}' where name = '{}'".format(
            new_name, old_name)
        res = self.cursor.execute(sql)
        return res

    def select_all(self):
        sql = "select * from people"
        self.cursor.execute(sql)
        res = self.cursor.fetchall()
        return res

    @property
    def cursor(self):
        return self._cursor


if __name__ == "__main__":
    with DB_MySQL() as db:
        res = db.insert_people("zhangsan", 21)
        print("成功插入条数:", res)
        res = db.insert_people("zhj1121", 21)
        print("成功插入条数:", res)
        res = db.get_people_count()
        print("当前记录数:", res)
        res = db.delete_people_by_name("zhj1121")
        print("成功删除条数:", res)
        res = db.update_people_by_name("zhangsan", "zhj1121")
        print("成功修改记录条数:", res)
        res = db.select_all()
        print("获取的结果为:", res)

总结

这一篇笔记是站在巨人的肩膀上完成的,非常感谢大佬小肥爬爬 的文章python操作mysql之只看这篇就够了 - 简书 (jianshu.com)

在以后的使用中,也会在这篇的基础上进行补充与完善。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐