pandas

Pandas 中用于与数据库交互的两个关键方法:read_sql 和 to_sql ,本文使用 sqlite 文件数据库进行连接来对这两个方法的使用进行简单演示。

read_sql

read_sql 是 Pandas 提供的用于从数据库读取数据的方法,常用参数为:

  • sql:SQL查询语句,必须提供。
  • con:数据库连接对象,可以是字符串(表示连接字符串)或SQLAlchemy引擎。
  • index_col:指定作为DataFrame索引的列。
  • parse_dates:指定需要解析为日期时间的列。
  • params:SQL查询中的参数,可以使用字典形式提供。
import pandas as pd

from sqlalchemy import create_engine

engine = create_engine('sqlite:///example.db')


def create_data():
    data_date = pd.date_range("2024-05-20", periods=4, freq='D')
    data_date = list(map(lambda d: d.strftime("%Y-%m-%d"), data_date))
    print(data_date)  # ['2024-05-20', '2024-05-21', '2024-05-22', '2024-05-23']
    data = {
        "age": [18, 30, 25, 40],
        "city": ["BeiJing", "ShangHai", "GuangZhou", "ShenZhen"],
        "created": data_date
    }
    index = pd.Index(data=["Tom", "Bob", "Mary", "James"], name="name")
    user_info = pd.DataFrame(data=data, index=index)
    user_info.to_sql(name='employees', con=engine, if_exists='replace', index=True)


def select_data():
    res = pd.read_sql('SELECT * FROM employees', con=engine, parse_dates=["created"])
    res.to_csv("employee.csv", header=True, index=False)  # 查询结果写入 csv 文件
    print(res)
    print()
    #     name  age       city    created
    # 0    Tom   18    BeiJing 2024-05-20
    # 1    Bob   30   ShangHai 2024-05-21
    # 2   Mary   25  GuangZhou 2024-05-22
    # 3  James   40   ShenZhen 2024-05-23
    res = pd.read_sql('SELECT * FROM employees', con=engine, chunksize=2)
    for i, r in enumerate(res):
        file_path = f"employee_{i}.csv"
        r.to_csv(file_path, header=True, index=False)  # 查询结果写入 csv 文件
        print(r)
        print()
        #   name  age      city     created
        # 0  Tom   18   BeiJing  2024-05-20
        # 1  Bob   30  ShangHai  2024-05-21

        #     name  age       city     created
        # 0   Mary   25  GuangZhou  2024-05-22
        # 1  James   40   ShenZhen  2024-05-23


if __name__ == '__main__':
    create_data()
    select_data()

employee.csv

name,age,city,created
Tom,18,BeiJing,2024-05-20
Bob,30,ShangHai,2024-05-21
Mary,25,GuangZhou,2024-05-22
James,40,ShenZhen,2024-05-23

employee_0.csv

name,age,city,created
Tom,18,BeiJing,2024-05-20
Bob,30,ShangHai,2024-05-21

employee_1.csv

name,age,city,created
Mary,25,GuangZhou,2024-05-22
James,40,ShenZhen,2024-05-23

to_sql

to_sql 是Pandas用于将 DataFrame 数据写入数据库的方法。常用参数为:

  • name:目标数据库表的名称。
  • con:数据库连接对象,可以是字符串(表示连接字符串)或 SQLAlchemy 引擎。
  • if_exists:如果表已存在,指定处理方式(‘fail’、‘replace’、‘append’)fail 表示直接报错失败,replace 表示对同名的表进行替换覆盖,append 表示将数据追加到原数据后面。
  • index:是否将 DataFrame 的索引写入数据库,为 True 时会将索引字段也写到 数据库。
如果只需要导入指定列,可以用 reindex 重建索引选取指定列生成新的 DataFrame 即可
import pandas as pd

from sqlalchemy import create_engine

engine = create_engine('sqlite:///example.db')


def create_data():
    data = {
        "age": [18, 30, 25, 40],
        "city": ["BeiJing", "ShangHai", "GuangZhou", "ShenZhen"]
    }
    index = pd.Index(data=["Tom", "Bob", "Mary", "James"], name="name")
    user_info = pd.DataFrame(data=data, index=index)
    user_info.to_sql(name='employees', con=engine, if_exists='replace', index=True)

    # 如果只需要导入指定列,可以重建索引选取指定列生成新的DataFrame
    user_info = user_info.reindex(columns=['age', 'city'])
    user_info.to_sql(name='employees', con=engine, if_exists='replace', index=False)


def select_data():
    res = pd.read_sql('SELECT * FROM employees', con=engine)
    print(res)
    #     name  age       city
    # 0    Tom   18    BeiJing
    # 1    Bob   30   ShangHai
    # 2   Mary   25  GuangZhou
    # 3  James   40   ShenZhen


if __name__ == '__main__':
    create_data()
    select_data()

数据迁移

还可以利用 pandas 对多数据库的支持实现不同数据库之间的数据迁移。

import pandas as pd

from sqlalchemy import create_engine

engine_db1 = create_engine('sqlite:///example.db')
engine_db2 = create_engine('mysql+pymysql://root:12345678@localhost/mysql')

if __name__ == '__main__':
    df_db1 = pd.read_sql('SELECT * FROM employees', con=engine_db1)
    #
    # 将从第一个数据库查询的数据写入第二个数据库
    df_db1.to_sql(name='employees', con=engine_db2, if_exists='replace', index=False)
D:\Users\Administrator>mysql -u root -p
Enter password: ********
mysql> use mysql
Database changed
mysql> select * from employees;
+-------+------+-----------+------------+
| name  | age  | city      | created    |
+-------+------+-----------+------------+
| Tom   |   18 | BeiJing   | 2024-05-20 |
| Bob   |   30 | ShangHai  | 2024-05-21 |
| Mary  |   25 | GuangZhou | 2024-05-22 |
| James |   40 | ShenZhen  | 2024-05-23 |
+-------+------+-----------+------------+
4 rows in set (0.00 sec)

pymysql

查询示例 

import pymysql

con = pymysql.connect(
    host="localhost",
    database="mysql",
    user="root",
    password="12345678"
)

if __name__ == '__main__':
    cursor = con.cursor()
    sql = 'SELECT * FROM employees'
    cursor.execute(sql)
    description = cursor.description
    header = [x[0] for x in description]
    print(header)
    # ['name', 'age', 'city', 'created']
    rows = cursor.fetchall()
    for row in rows:
        print(row)
        # ('Tom', 18, 'BeiJing', '2024-05-20')
        # ('Bob', 30, 'ShangHai', '2024-05-21')
        # ('Mary', 25, 'GuangZhou', '2024-05-22')
        # ('James', 40, 'ShenZhen', '2024-05-23')

    sql = 'select column_name from information_schema.columns where table_schema="mysql" and table_name="employees"'
    cursor.execute(sql)
    rows = cursor.fetchall()
    header = [x[0] for x in rows]
    print(header)
    # ['name', 'age', 'city', 'created']
    # 指定游标类型为字典类型,返回的结果以字典的形式返回
    cursor = con.cursor(pymysql.cursors.DictCursor)
    sql = 'SELECT * FROM employees limit 4'
    cursor.execute(sql)
    rows = cursor.fetchall()
    for row in rows:
        print(row)
        # {'name': 'Tom', 'age': 18, 'city': 'BeiJing', 'created': '2024-05-20'}
        # {'name': 'Bob', 'age': 30, 'city': 'ShangHai', 'created': '2024-05-21'}
        # {'name': 'Mary', 'age': 25, 'city': 'GuangZhou', 'created': '2024-05-22'}
        # {'name': 'James', 'age': 40, 'city': 'ShenZhen', 'created': '2024-05-23'}

写入示例

import pymysql

con = pymysql.connect(
    host="localhost",
    database="mysql",
    user="root",
    password="12345678"
)

if __name__ == '__main__':
    cursor = con.cursor()
    sql = 'insert into employees (name, age, city) values ("TTT", 10, "SiChuan")'
    cursor.execute(sql)
    con.commit()
    # sql = 'SELECT * FROM employees'
    # cursor.execute(sql)
    # rows = cursor.fetchall()
    # for row in rows:
    #     print(row)

sqlalchemy

查询示例

import flask
import sqlalchemy
import flask_sqlalchemy

app = flask.Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql+pymysql://root:12345678@localhost/mysql"
db = flask_sqlalchemy.SQLAlchemy(app)

if __name__ == '__main__':
    sql = 'SELECT * FROM employees'
    with app.app_context():
        rows = db.session.execute(sqlalchemy.text(sql)).fetchall()
        for row in rows:
            print(row)
            # ('Tom', 18, 'BeiJing', '2024-05-20')
            # ('Bob', 30, 'ShangHai', '2024-05-21')
            # ('Mary', 25, 'GuangZhou', '2024-05-22')
            # ('James', 40, 'ShenZhen', '2024-05-23')
        # 如果需要 key: value 的结果,可以使用 mappings() 方法返回带列名字段的数据
        rows = db.session.execute(sqlalchemy.text(sql)).mappings() 
        for row in rows:
            print(row)
            # {'name': 'Tom', 'age': 18, 'city': 'BeiJing', 'created': '2024-05-20'}
            # {'name': 'Bob', 'age': 30, 'city': 'ShangHai', 'created': '2024-05-21'}
            # {'name': 'Mary', 'age': 25, 'city': 'GuangZhou', 'created': '2024-05-22'}
            # {'name': 'James', 'age': 40, 'city': 'ShenZhen', 'created': '2024-05-23'}

写入示例

import flask
import sqlalchemy
import flask_sqlalchemy

app = flask.Flask(__name__)
app.config["SQLALCHEMY_DATABASE_URI"] = "mysql+pymysql://root:12345678@localhost/mysql"
db = flask_sqlalchemy.SQLAlchemy(app)

if __name__ == '__main__':
    sql = 'insert into employees (name, age, city) values ("Looking", 10, "SiChuan")'
    with app.app_context():
        db.session.execute(sqlalchemy.text(sql))
        db.session.commit()
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐