SQLServer简介

以下摘自百度百科

SQL Server是由Microsoft开发和推广的关系数据库管理系统(DBMS),它最初是由Microsoft、Sybase和Ashton-Tate三家公司共同开发的,并于1988年推出了第一个OS/2版本。Microsoft SQL Server近年来不断更新版本,1996年,Microsoft 推出了SQL Server 6.5版本;1998年,SQL Server 7.0版本和用户见面;SQL Server 2000是Microsoft公司于2000年推出,目前最新版本是2019年份推出的SQL SERVER 2019。

SQL Server是微软推出的重量级的数据库,目前有多个版本,如2000、2008、2012等,这些版本名字均为该版本推出的年份,每个版本的差异并不是特别大。本文使用SQL Server2016。

传送门:

Python数据库操作【一】—— Sqlite

Python数据库操作【二】—— MySQL 

本文代码已上传至GitHub,项目地址如下:

https://github.com/XMNHCAS/SQLServerPythonDemo


安装pymssql

python内未集成SQLServer的操作库,需要安装pymssql库。

pip3 install pymssql

安装完成后使用import pymssql导入。

pymssql有官方文档,里面有更详细的使用示例,链接地址如下:

GitHub

pymssql文档


增删改查(CRUD)

数据库远程访问配置

使用pymssql访问SQLServer,需要启用SQLServer的TCP/IP协议。

如下图所示,打开SQLServer配置管理器,选择“SQL Server网络配置”中对应的需要使用的实例,在右侧窗口中启用TCP/IP协议。

接着重启SQL Server服务,就可以通过IP的形式访问数据库了。

 服务重启完成后,使用Navicat测试,可以看到连接成功。

​ 

新建数据库

新建CreateDatabase.py,使用pymssql创建TestDB数据库。

db.cursor()创建的游标对象,默认返回的是元组,通过设置as_dict=True可以将返回的数据改成字典类型。

如果最后再通过db.commit()来提交事务操作,可能会出现创建数据库失败的情况,故此处设置为autocommit,在游标执行sql的时候就提交操作。

import pymssql

# 打开数据库连接
db = pymssql.connect(server='localhost', user='test', password='123456')

# 创建游标对象,并设置返回数据的类型为字典
cursor = db.cursor(as_dict=True)

# 设置立即操作
db.autocommit(True)

# 查看现有数据库
sql = "SELECT * FROM SYSDATABASES"
cursor.execute(sql)
print("现有数据库:")
while True:
    msg = cursor.fetchone()
    if msg is None:
        break
    print(msg['name'])

# 创建数据库
sql = '''
IF NOT EXISTS(SELECT * FROM SYSDATABASES WHERE NAME = 'TestDB')
CREATE DATABASE TestDB;
'''
cursor.execute(sql)

# 查看现有数据库
sql = "SELECT * FROM SYSDATABASES"
cursor.execute(sql)
print("创建后的数据库:")
while True:
    msg = cursor.fetchone()
    if msg is None:
        break
    print(msg['name'])

# 关闭游标与数据库连接
cursor.close()
db.close()

运行结果如下:

新建数据表 

新建CreateTable.py,使用pymssql在TestDB数据库中新建一个Persons表。

import pymssql

# 打开数据库连接
db = pymssql.connect(server='localhost',
                     user='test',
                     password='123456',
                     database='TestDB')

# 创建一个游标对象
cursor = db.cursor()

# 查看现有表
sql = "SELECT NAME FROM TESTDB.SYS.TABLES"
msg = cursor.execute(sql)
print(cursor.fetchall())

# 创建Person表
sql = """
USE TestDB
CREATE TABLE [Persons](
ID INT PRIMARY KEY IDENTITY(1, 1),
Name VARCHAR(50),
Age INT,
)
"""
cursor.execute(sql)

# 查看现有表
sql = "SELECT NAME FROM TESTDB.SYS.TABLES"
cursor.execute(sql)
print(cursor.fetchall())

# 提交事务
db.commit()

# 关闭游标与数据库连接
cursor.close()
db.close()

运行结果如下:

新增记录(Create)

使用游标执行插入语句,不同于pymysql,pymssql不会返回游标执行后影响的行数。

而且由于编码问题,pymssql查询出来的中文数据,如果不进行转码则会输出乱码,所以后面均采用循环输出数据,同时将中文数据改成gbk的编码。该乱码问题可以通过设置数据库字段的编码格式的方式来解决,但是会稍微影响数据库存储性能,故此处不进行演示。

此方式支持单条数据插入和批量插入。

import pymssql

# 打开数据库连接
db = pymssql.connect(server='localhost',
                     user='test',
                     password='123456',
                     database='TestDB')

# 创建游标对象,并设置返回数据的类型为字典
cursor = db.cursor(as_dict=True)

# 插入单条数据
cursor.execute("INSERT INTO Persons(Name, Age) VALUES ('张三',18 )")

print("插入单条数据后的数据")
cursor.execute("SELECT * FROM Persons")
for item in cursor.fetchall():
    item["Name"] = item["Name"].encode("latin1").decode("gbk")
    print(item)


# 批量插入数据
sql = "INSERT INTO Persons(Name, Age) VALUES (%s,%s)"
cursor.executemany(sql, [('李四', 20), ('王五', 10)])

print("批量插入后的数据")
cursor.execute("SELECT * FROM Persons")
for item in cursor.fetchall():
    item["Name"] = item["Name"].encode("latin1").decode("gbk")
    print(item)

# 提交事务
db.commit()

# 关闭游标与数据库连接
cursor.close()
db.close()

运行结果如下:

检索记录(Retrieve)

执行查询语句后,可以使用游标的fetchone方法获取单条数据,也可以使用fetchmany获取指定数量的数据,还可以用fetchall直接获取全部数据。为了结果显示得更加清晰,此处全部使用循环进行数据输出。

import pymssql

# 打开数据库连接
db = pymssql.connect(server='localhost',
                     user='test',
                     password='123456',
                     database='TestDB')

# 创建游标对象,并设置返回数据的类型为字典
cursor = db.cursor(as_dict=True)

# 获取全部记录
cursor.execute("SELECT * FROM Persons")
allData = cursor.fetchall()
print("直接获取全部记录:")
for item in allData:
    item["Name"] = item["Name"].encode("latin1").decode("gbk")
    print(item)

# 获取前N条记录
cursor.execute("SELECT * FROM Persons")
manyData = cursor.fetchmany(2)
print("获取部分结果:")
for item in manyData:
    item["Name"] = item["Name"].encode("latin1").decode("gbk")
    print(item)

# 一次读取一条结果,循环获取所有记录
cursor.execute("SELECT * FROM Persons")
print("一次读取一条结果,循环获取所有记录:")
while True:
    singleData = cursor.fetchone()
    if singleData is None:
        break
    singleData["Name"] = singleData["Name"].encode("latin1").decode("gbk")
    print(singleData)

# 关闭游标
cursor.close()

# 提交事务
db.commit()

# 关闭数据库连接
db.close()

运行结果如下:

更新记录(Update)

使用游标执行update语句。此方法支持单条数据修改及批量修改。

import pymssql


# 打开数据库连接
db = pymssql.connect(server='localhost',
                     user='test',
                     password='123456',
                     database='TestDB')

# 创建游标对象,并设置返回数据的类型为字典
cursor = db.cursor(as_dict=True)

cursor.execute("SELECT * FROM Persons")
print("修改前的数据:")
cursor.execute("SELECT * FROM Persons")
for item in cursor.fetchall():
    item["Name"] = item["Name"].encode("latin1").decode("gbk")
    print(item)

# 执行单条数据修改
cursor.execute("UPDATE Persons SET Age = 20 WHERE Name = '张三'")

print("修改单条数据后的数据:")
cursor.execute("SELECT * FROM Persons")
for item in cursor.fetchall():
    item["Name"] = item["Name"].encode("latin1").decode("gbk")
    print(item)

# 执行批量修改
sql = "UPDATE Persons SET Age = %s WHERE Name = %s"
batchUpdate = cursor.executemany(sql, [(25, '李四'), (35, '王五')])

print("批量修改后的数据:")
cursor.execute("SELECT * FROM Persons")
for item in cursor.fetchall():
    item["Name"] = item["Name"].encode("latin1").decode("gbk")
    print(item)

# 关闭游标
cursor.close()

# 提交事务
db.commit()

# 关闭数据库连接
db.close()

运行结果如下:

删除数据(Delete)

使用游标执行delete语句。此方法也支持单条数据删除以及批量删除。

import pymssql

# 打开数据库连接
db = pymssql.connect(server='localhost',
                     user='test',
                     password='123456',
                     database='TestDB')

# 创建游标对象,并设置返回数据的类型为字典
cursor = db.cursor(as_dict=True)

print("删除前的数据:")
cursor.execute("SELECT * FROM Persons")
data = cursor.fetchall()
if len(data) == 0:
    print(data)
else:
    for item in data:
        item["Name"] = item["Name"].encode("latin1").decode("gbk")
        print(item)

# 执行单条数据删除
cursor.execute("DELETE FROM Persons WHERE ID = 1")

print("删除单条数据后的数据:")
cursor.execute("SELECT * FROM Persons")
data = cursor.fetchall()
if len(data) == 0:
    print(data)
else:
    for item in data:
        item["Name"] = item["Name"].encode("latin1").decode("gbk")
        print(item)

# 执行批量删除
sql = "DELETE FROM Persons WHERE ID = %s"
cursor.executemany(sql, [('2'), ('3')])

print("批量删除后的的数据:")
cursor.execute("SELECT * FROM Persons")
data = cursor.fetchall()
if len(data) == 0:
    print(data)
else:
    for item in data:
        item["Name"] = item["Name"].encode("latin1").decode("gbk")
        print(item)

# 关闭游标
cursor.close()

# 提交事务
db.commit()

# 关闭数据库连接
db.close()

运行结果如下:

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐