Python数据库操作【三】—— SQLServer
Python数据库操作之Microsoft SQL Server
SQLServer简介
以下摘自百度百科:
SQL Server是由Microsoft开发和推广的关系数据库管理系统(DBMS),它最初是由Microsoft、Sybase和Ashton-Tate三家公司共同开发的,并于1988年推出了第一个OS/2版本。Microsoft SQL Server近年来不断更新版本,1996年,Microsoft 推出了SQL Server 6.5版本;1998年,SQL Server 7.0版本和用户见面;SQL Server 2000是Microsoft公司于2000年推出,目前最新版本是2019年份推出的SQL SERVER 2019。
SQL Server是微软推出的重量级的数据库,目前有多个版本,如2000、2008、2012等,这些版本名字均为该版本推出的年份,每个版本的差异并不是特别大。本文使用SQL Server2016。
传送门:
本文代码已上传至GitHub,项目地址如下:
https://github.com/XMNHCAS/SQLServerPythonDemo
安装pymssql
python内未集成SQLServer的操作库,需要安装pymssql库。
pip3 install pymssql
安装完成后使用import pymssql导入。
pymssql有官方文档,里面有更详细的使用示例,链接地址如下:
增删改查(CRUD)
数据库远程访问配置
使用pymssql访问SQLServer,需要启用SQLServer的TCP/IP协议。
如下图所示,打开SQLServer配置管理器,选择“SQL Server网络配置”中对应的需要使用的实例,在右侧窗口中启用TCP/IP协议。
接着重启SQL Server服务,就可以通过IP的形式访问数据库了。
服务重启完成后,使用Navicat测试,可以看到连接成功。
新建数据库
新建CreateDatabase.py,使用pymssql创建TestDB数据库。
db.cursor()创建的游标对象,默认返回的是元组,通过设置as_dict=True可以将返回的数据改成字典类型。
如果最后再通过db.commit()来提交事务操作,可能会出现创建数据库失败的情况,故此处设置为autocommit,在游标执行sql的时候就提交操作。
import pymssql
# 打开数据库连接
db = pymssql.connect(server='localhost', user='test', password='123456')
# 创建游标对象,并设置返回数据的类型为字典
cursor = db.cursor(as_dict=True)
# 设置立即操作
db.autocommit(True)
# 查看现有数据库
sql = "SELECT * FROM SYSDATABASES"
cursor.execute(sql)
print("现有数据库:")
while True:
msg = cursor.fetchone()
if msg is None:
break
print(msg['name'])
# 创建数据库
sql = '''
IF NOT EXISTS(SELECT * FROM SYSDATABASES WHERE NAME = 'TestDB')
CREATE DATABASE TestDB;
'''
cursor.execute(sql)
# 查看现有数据库
sql = "SELECT * FROM SYSDATABASES"
cursor.execute(sql)
print("创建后的数据库:")
while True:
msg = cursor.fetchone()
if msg is None:
break
print(msg['name'])
# 关闭游标与数据库连接
cursor.close()
db.close()
运行结果如下:
新建数据表
新建CreateTable.py,使用pymssql在TestDB数据库中新建一个Persons表。
import pymssql
# 打开数据库连接
db = pymssql.connect(server='localhost',
user='test',
password='123456',
database='TestDB')
# 创建一个游标对象
cursor = db.cursor()
# 查看现有表
sql = "SELECT NAME FROM TESTDB.SYS.TABLES"
msg = cursor.execute(sql)
print(cursor.fetchall())
# 创建Person表
sql = """
USE TestDB
CREATE TABLE [Persons](
ID INT PRIMARY KEY IDENTITY(1, 1),
Name VARCHAR(50),
Age INT,
)
"""
cursor.execute(sql)
# 查看现有表
sql = "SELECT NAME FROM TESTDB.SYS.TABLES"
cursor.execute(sql)
print(cursor.fetchall())
# 提交事务
db.commit()
# 关闭游标与数据库连接
cursor.close()
db.close()
运行结果如下:
新增记录(Create)
使用游标执行插入语句,不同于pymysql,pymssql不会返回游标执行后影响的行数。
而且由于编码问题,pymssql查询出来的中文数据,如果不进行转码则会输出乱码,所以后面均采用循环输出数据,同时将中文数据改成gbk的编码。该乱码问题可以通过设置数据库字段的编码格式的方式来解决,但是会稍微影响数据库存储性能,故此处不进行演示。
此方式支持单条数据插入和批量插入。
import pymssql
# 打开数据库连接
db = pymssql.connect(server='localhost',
user='test',
password='123456',
database='TestDB')
# 创建游标对象,并设置返回数据的类型为字典
cursor = db.cursor(as_dict=True)
# 插入单条数据
cursor.execute("INSERT INTO Persons(Name, Age) VALUES ('张三',18 )")
print("插入单条数据后的数据")
cursor.execute("SELECT * FROM Persons")
for item in cursor.fetchall():
item["Name"] = item["Name"].encode("latin1").decode("gbk")
print(item)
# 批量插入数据
sql = "INSERT INTO Persons(Name, Age) VALUES (%s,%s)"
cursor.executemany(sql, [('李四', 20), ('王五', 10)])
print("批量插入后的数据")
cursor.execute("SELECT * FROM Persons")
for item in cursor.fetchall():
item["Name"] = item["Name"].encode("latin1").decode("gbk")
print(item)
# 提交事务
db.commit()
# 关闭游标与数据库连接
cursor.close()
db.close()
运行结果如下:
检索记录(Retrieve)
执行查询语句后,可以使用游标的fetchone方法获取单条数据,也可以使用fetchmany获取指定数量的数据,还可以用fetchall直接获取全部数据。为了结果显示得更加清晰,此处全部使用循环进行数据输出。
import pymssql
# 打开数据库连接
db = pymssql.connect(server='localhost',
user='test',
password='123456',
database='TestDB')
# 创建游标对象,并设置返回数据的类型为字典
cursor = db.cursor(as_dict=True)
# 获取全部记录
cursor.execute("SELECT * FROM Persons")
allData = cursor.fetchall()
print("直接获取全部记录:")
for item in allData:
item["Name"] = item["Name"].encode("latin1").decode("gbk")
print(item)
# 获取前N条记录
cursor.execute("SELECT * FROM Persons")
manyData = cursor.fetchmany(2)
print("获取部分结果:")
for item in manyData:
item["Name"] = item["Name"].encode("latin1").decode("gbk")
print(item)
# 一次读取一条结果,循环获取所有记录
cursor.execute("SELECT * FROM Persons")
print("一次读取一条结果,循环获取所有记录:")
while True:
singleData = cursor.fetchone()
if singleData is None:
break
singleData["Name"] = singleData["Name"].encode("latin1").decode("gbk")
print(singleData)
# 关闭游标
cursor.close()
# 提交事务
db.commit()
# 关闭数据库连接
db.close()
运行结果如下:
更新记录(Update)
使用游标执行update语句。此方法支持单条数据修改及批量修改。
import pymssql
# 打开数据库连接
db = pymssql.connect(server='localhost',
user='test',
password='123456',
database='TestDB')
# 创建游标对象,并设置返回数据的类型为字典
cursor = db.cursor(as_dict=True)
cursor.execute("SELECT * FROM Persons")
print("修改前的数据:")
cursor.execute("SELECT * FROM Persons")
for item in cursor.fetchall():
item["Name"] = item["Name"].encode("latin1").decode("gbk")
print(item)
# 执行单条数据修改
cursor.execute("UPDATE Persons SET Age = 20 WHERE Name = '张三'")
print("修改单条数据后的数据:")
cursor.execute("SELECT * FROM Persons")
for item in cursor.fetchall():
item["Name"] = item["Name"].encode("latin1").decode("gbk")
print(item)
# 执行批量修改
sql = "UPDATE Persons SET Age = %s WHERE Name = %s"
batchUpdate = cursor.executemany(sql, [(25, '李四'), (35, '王五')])
print("批量修改后的数据:")
cursor.execute("SELECT * FROM Persons")
for item in cursor.fetchall():
item["Name"] = item["Name"].encode("latin1").decode("gbk")
print(item)
# 关闭游标
cursor.close()
# 提交事务
db.commit()
# 关闭数据库连接
db.close()
运行结果如下:
删除数据(Delete)
使用游标执行delete语句。此方法也支持单条数据删除以及批量删除。
import pymssql
# 打开数据库连接
db = pymssql.connect(server='localhost',
user='test',
password='123456',
database='TestDB')
# 创建游标对象,并设置返回数据的类型为字典
cursor = db.cursor(as_dict=True)
print("删除前的数据:")
cursor.execute("SELECT * FROM Persons")
data = cursor.fetchall()
if len(data) == 0:
print(data)
else:
for item in data:
item["Name"] = item["Name"].encode("latin1").decode("gbk")
print(item)
# 执行单条数据删除
cursor.execute("DELETE FROM Persons WHERE ID = 1")
print("删除单条数据后的数据:")
cursor.execute("SELECT * FROM Persons")
data = cursor.fetchall()
if len(data) == 0:
print(data)
else:
for item in data:
item["Name"] = item["Name"].encode("latin1").decode("gbk")
print(item)
# 执行批量删除
sql = "DELETE FROM Persons WHERE ID = %s"
cursor.executemany(sql, [('2'), ('3')])
print("批量删除后的的数据:")
cursor.execute("SELECT * FROM Persons")
data = cursor.fetchall()
if len(data) == 0:
print(data)
else:
for item in data:
item["Name"] = item["Name"].encode("latin1").decode("gbk")
print(item)
# 关闭游标
cursor.close()
# 提交事务
db.commit()
# 关闭数据库连接
db.close()
运行结果如下:
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)