Python 之数据库表读取和写入
Python 之数据库表读取和写入
·
pandas
Pandas 中用于与数据库交互的两个关键方法:read_sql 和 to_sql ,本文使用 sqlite 文件数据库进行连接来对这两个方法的使用进行简单演示。
read_sql
read_sql
是 Pandas 提供的用于从数据库读取数据的方法,常用参数为:
- sql:SQL查询语句,必须提供。
- con:数据库连接对象,可以是字符串(表示连接字符串)或SQLAlchemy引擎。
- index_col:指定作为DataFrame索引的列。
- parse_dates:指定需要解析为日期时间的列。
- params:SQL查询中的参数,可以使用字典形式提供。
import pandas as pd
from sqlalchemy import create_engine
engine = create_engine('sqlite:///example.db')
def create_data():
data_date = pd.date_range("2024-05-20", periods=4, freq='D')
data_date = list(map(lambda d: d.strftime("%Y-%m-%d"), data_date))
print(data_date) # ['2024-05-20', '2024-05-21', '2024-05-22', '2024-05-23']
data = {
"age": [18, 30, 25, 40],
"city": ["BeiJing", "ShangHai", "GuangZhou", "ShenZhen"],
"created": data_date
}
index = pd.Index(data=["Tom", "Bob", "Mary", "James"], name="name")
user_info = pd.DataFrame(data=data, index=index)
user_info.to_sql(name='employees', con=engine, if_exists='replace', index=True)
def select_data():
res = pd.read_sql('SELECT * FROM employees', con=engine, parse_dates=["created"])
res.to_csv("employee.csv", header=True, index=False) # 查询结果写入 csv 文件
print(res)
print()
# name age city created
# 0 Tom 18 BeiJing 2024-05-20
# 1 Bob 30 ShangHai 2024-05-21
# 2 Mary 25 GuangZhou 2024-05-22
# 3 James 40 ShenZhen 2024-05-23
res = pd.read_sql('SELECT * FROM employees', con=engine, chunksize=2)
for i, r in enumerate(res):
file_path = f"employee_{i}.csv"
r.to_csv(file_path, header=True, index=False) # 查询结果写入 csv 文件
print(r)
print()
# name age city created
# 0 Tom 18 BeiJing 2024-05-20
# 1 Bob 30 ShangHai 2024-05-21
# name age city created
# 0 Mary 25 GuangZhou 2024-05-22
# 1 James 40 ShenZhen 2024-05-23
if __name__ == '__main__':
create_data()
select_data()
employee.csv
name,age,city,created
Tom,18,BeiJing,2024-05-20
Bob,30,ShangHai,2024-05-21
Mary,25,GuangZhou,2024-05-22
James,40,ShenZhen,2024-05-23
employee_0.csv
name,age,city,created
Tom,18,BeiJing,2024-05-20
Bob,30,ShangHai,2024-05-21
employee_1.csv
name,age,city,created
Mary,25,GuangZhou,2024-05-22
James,40,ShenZhen,2024-05-23
to_sql
to_sql 是Pandas用于将 DataFrame 数据写入数据库的方法。常用参数为:
- name:目标数据库表的名称。
- con:数据库连接对象,可以是字符串(表示连接字符串)或 SQLAlchemy 引擎。
- if_exists:如果表已存在,指定处理方式(‘fail’、‘replace’、‘append’)fail 表示直接报错失败,replace 表示对同名的表进行替换覆盖,append 表示将数据追加到原数据后面。
- index:是否将 DataFrame 的索引写入数据库,为 True 时会将索引字段也写到 数据库。
如果只需要导入指定列,可以用 reindex 重建索引选取指定列生成新的 DataFrame 即可
import pandas as pd
from sqlalchemy import create_engine
engine = create_engine('sqlite:///example.db')
def create_data():
data = {
"age": [18, 30, 25, 40],
"city": ["BeiJing", "ShangHai", "GuangZhou", "ShenZhen"]
}
index = pd.Index(data=["Tom", "Bob", "Mary", "James"], name="name")
user_info = pd.DataFrame(data=data, index=index)
user_info.to_sql(name='employees', con=engine, if_exists='replace', index=True)
# 如果只需要导入指定列,可以重建索引选取指定列生成新的DataFrame
user_info = user_info.reindex(columns=['age', 'city'])
user_info.to_sql(name='employees', con=engine, if_exists='replace', index=False)
def select_data():
res = pd.read_sql('SELECT * FROM employees', con=engine)
print(res)
# name age city
# 0 Tom 18 BeiJing
# 1 Bob 30 ShangHai
# 2 Mary 25 GuangZhou
# 3 James 40 ShenZhen
if __name__ == '__main__':
create_data()
select_data()
数据迁移
还可以利用 pandas 对多数据库的支持实现不同数据库之间的数据迁移。
import pandas as pd
from sqlalchemy import create_engine
engine_db1 = create_engine('sqlite:///example.db')
engine_db2 = create_engine('mysql+pymysql://root:12345678@localhost/mysql')
if __name__ == '__main__':
df_db1 = pd.read_sql('SELECT * FROM employees', con=engine_db1)
#
# 将从第一个数据库查询的数据写入第二个数据库
df_db1.to_sql(name='employees', con=engine_db2, if_exists='replace', index=False)
D:\Users\Administrator>mysql -u root -p
Enter password: ********
mysql> use mysql
Database changed
mysql> select * from employees;
+-------+------+-----------+------------+
| name | age | city | created |
+-------+------+-----------+------------+
| Tom | 18 | BeiJing | 2024-05-20 |
| Bob | 30 | ShangHai | 2024-05-21 |
| Mary | 25 | GuangZhou | 2024-05-22 |
| James | 40 | ShenZhen | 2024-05-23 |
+-------+------+-----------+------------+
4 rows in set (0.00 sec)
pymysql
查询示例
import pymysql
con = pymysql.connect(
host="localhost",
database="mysql",
user="root",
password="12345678"
)
if __name__ == '__main__':
cursor = con.cursor()
sql = 'SELECT * FROM employees'
cursor.execute(sql)
description = cursor.description
header = [x[0] for x in description]
print(header)
# ['name', 'age', 'city', 'created']
rows = cursor.fetchall()
for row in rows:
print(row)
# ('Tom', 18, 'BeiJing', '2024-05-20')
# ('Bob', 30, 'ShangHai', '2024-05-21')
# ('Mary', 25, 'GuangZhou', '2024-05-22')
# ('James', 40, 'ShenZhen', '2024-05-23')
sql = 'select column_name from information_schema.columns where table_schema="mysql" and table_name="employees"'
cursor.execute(sql)
rows = cursor.fetchall()
header = [x[0] for x in rows]
print(header)
# ['name', 'age', 'city', 'created']
# 指定游标类型为字典类型,返回的结果以字典的形式返回
cursor = con.cursor(pymysql.cursors.DictCursor)
sql = 'SELECT * FROM employees limit 4'
cursor.execute(sql)
rows = cursor.fetchall()
for row in rows:
print(row)
# {'name': 'Tom', 'age': 18, 'city': 'BeiJing', 'created': '2024-05-20'}
# {'name': 'Bob', 'age': 30, 'city': 'ShangHai', 'created': '2024-05-21'}
# {'name': 'Mary', 'age': 25, 'city': 'GuangZhou', 'created': '2024-05-22'}
# {'name': 'James', 'age': 40, 'city': 'ShenZhen', 'created': '2024-05-23'}
写入示例
import pymysql
con = pymysql.connect(
host="localhost",
database="mysql",
user="root",
password="12345678"
)
if __name__ == '__main__':
cursor = con.cursor()
sql = 'insert into employees (name, age, city) values ("TTT", 10, "SiChuan")'
cursor.execute(sql)
con.commit()
# sql = 'SELECT * FROM employees'
# cursor.execute(sql)
# rows = cursor.fetchall()
# for row in rows:
# print(row)
sqlalchemy
查询示例
import flask
import sqlalchemy
import flask_sqlalchemy
app = flask.Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql+pymysql://root:12345678@localhost/mysql"
db = flask_sqlalchemy.SQLAlchemy(app)
if __name__ == '__main__':
sql = 'SELECT * FROM employees'
with app.app_context():
rows = db.session.execute(sqlalchemy.text(sql)).fetchall()
for row in rows:
print(row)
# ('Tom', 18, 'BeiJing', '2024-05-20')
# ('Bob', 30, 'ShangHai', '2024-05-21')
# ('Mary', 25, 'GuangZhou', '2024-05-22')
# ('James', 40, 'ShenZhen', '2024-05-23')
# 如果需要 key: value 的结果,可以使用 mappings() 方法返回带列名字段的数据
rows = db.session.execute(sqlalchemy.text(sql)).mappings()
for row in rows:
print(row)
# {'name': 'Tom', 'age': 18, 'city': 'BeiJing', 'created': '2024-05-20'}
# {'name': 'Bob', 'age': 30, 'city': 'ShangHai', 'created': '2024-05-21'}
# {'name': 'Mary', 'age': 25, 'city': 'GuangZhou', 'created': '2024-05-22'}
# {'name': 'James', 'age': 40, 'city': 'ShenZhen', 'created': '2024-05-23'}
写入示例
import flask
import sqlalchemy
import flask_sqlalchemy
app = flask.Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql+pymysql://root:12345678@localhost/mysql"
db = flask_sqlalchemy.SQLAlchemy(app)
if __name__ == '__main__':
sql = 'insert into employees (name, age, city) values ("Looking", 10, "SiChuan")'
with app.app_context():
db.session.execute(sqlalchemy.text(sql))
db.session.commit()
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
已为社区贡献7条内容
所有评论(0)