背景

主要是为了测试用python进行redis数据迁移

如下
Python3操作redis百万级数据迁移,单机到单机,集群到集群

使用pipeline提交数据,50万各种数据类型键值对,大概7s生成

脚本

内容如下

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 2024/4/24

import random
import string
import redis
from rediscluster import RedisCluster
from datetime import datetime
import time


def generate_random_string(length):
    letters_and_digits = string.ascii_letters + string.digits
    random_string = ''.join((random.choice(letters_and_digits) for i in range(length)))
    return random_string


def redis_pipe_write(redis_client, count, batch_size=100):
    """
    使用pipeline写入测试数据,循环batch_size,提交一次数据
    :param redis_client: redis实例
    :param count: 循环次数
    :param batch_size: 每次pipeline队列数,不要太大
    :return:
    """

    # 危险操作,清空数据库
    # redis_client.flushdb()

    with redis_client.pipeline() as pipe:
        for i in range(count):
            # 创建不同类型的键值对

            ramdom_10 = generate_random_string(10)
            ramdom_20 = generate_random_string(20)
            ramdom_30 = generate_random_string(30)
            ramdom_35 = generate_random_string(35)
            ramdom_40 = generate_random_string(40)
            ramdom_50 = generate_random_string(50)

            # 字符串类型
            string_key = "test2_string_{}".format(i)
            string_value = ramdom_50
            pipe.set(string_key, string_value)

            # 列表类型
            list_key = "Test2_list_{}".format(i)
            list_value1 = ramdom_10
            list_value2 = ramdom_20
            pipe.rpush(list_key, list_value1, list_value2)

            # 集合类型
            set_key = "Test2_set_{}".format(i)
            set_value1 = ramdom_20
            set_value2 = ramdom_30
            pipe.sadd(set_key, set_value1, set_value2)

            # 有序集合类型
            zset_key = "Test2_zset_{}".format(i)
            zset_value1 = ramdom_20
            zset_value2 = ramdom_30
            pipe.zadd(zset_key, {zset_value1: 1, zset_value2: 2})

            # 哈希类型
            hash_key = "Test2_hash_{}".format(i)
            hash_field1 = ramdom_35
            hash_value1 = ramdom_40
            pipe.hset(hash_key, hash_field1, hash_value1)

            # print(f"redis 第{i}次测试数据提交完毕!")

            # 如果数据量较大
            if count >= batch_size:
                if i % batch_size == 0:
                    print(f"redis 第{i}次测试数据提交完毕!")
                    pipe.execute()
                    pipe.reset()

        # 如果不是整除,最后提交一次
        print(f"redis 第{i}次测试数据提交完毕!")
        pipe.execute()
        pipe.reset()

    print(f"redis测试数据写入完毕!")


# redis单节点
pool = redis.ConnectionPool(
    host='192.168.10.2',
    port=6369,
    db=0,
    password='',
    encoding='utf-8',
    decode_responses=True,
    socket_timeout=10,
    max_connections=100
)
r = redis.Redis(connection_pool=pool)

# # Redis集群
# source_node_list = [
#     {"host": "192.168.11.1", "port": "6379"},
#     {"host": "192.168.11.2", "port": "6379"},
#     {"host": "192.168.11.3", "port": "6379"},
#     {"host": "192.168.11.4", "port": "6379"},
#     {"host": "192.168.11.5", "port": "6379"},
#     {"host": "192.168.11.6", "port": "6379"},
# ]
# # 创建RedisCluster的实例
# # decode_responses设置为True会自动将响应数据解码为utf-8编码的字符串
# redis_cluster_source = RedisCluster(
#     startup_nodes=source_node_list,
#     decode_responses=True,
#     password='',
#     max_connections=100
# )


if __name__ == '__main__':
    start_time = time.perf_counter()

    # 单节点,用连接池
    # 下面的1000代表5个数据类型,每个1000键值对,batch_size代表循环100次提交一次pipeline
    # 最终会生成1000*5 = 5000个各类型的键值对测试数据,根据实际情况修改参数
    redis_pipe_write(r, 1000, batch_size=100)

    # 集群
    # redis_pipe_write(redis_cluster_source, 105301, batch_size=3000)

    print("结束时间:", datetime.now())
    end_time = time.perf_counter()

    # 计算执行时间
    execution_time = end_time - start_time
    print(f"代码执行时间: {execution_time} 秒")
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐