python生产redis大量测试数据-百万级
测试用python进行redis数据迁移
·
背景
主要是为了测试用python进行redis数据迁移
如下
Python3操作redis百万级数据迁移,单机到单机,集群到集群
使用pipeline提交数据,50万各种数据类型键值对,大概7s生成
脚本
内容如下
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 2024/4/24
import random
import string
import redis
from rediscluster import RedisCluster
from datetime import datetime
import time
def generate_random_string(length):
letters_and_digits = string.ascii_letters + string.digits
random_string = ''.join((random.choice(letters_and_digits) for i in range(length)))
return random_string
def redis_pipe_write(redis_client, count, batch_size=100):
"""
使用pipeline写入测试数据,循环batch_size,提交一次数据
:param redis_client: redis实例
:param count: 循环次数
:param batch_size: 每次pipeline队列数,不要太大
:return:
"""
# 危险操作,清空数据库
# redis_client.flushdb()
with redis_client.pipeline() as pipe:
for i in range(count):
# 创建不同类型的键值对
ramdom_10 = generate_random_string(10)
ramdom_20 = generate_random_string(20)
ramdom_30 = generate_random_string(30)
ramdom_35 = generate_random_string(35)
ramdom_40 = generate_random_string(40)
ramdom_50 = generate_random_string(50)
# 字符串类型
string_key = "test2_string_{}".format(i)
string_value = ramdom_50
pipe.set(string_key, string_value)
# 列表类型
list_key = "Test2_list_{}".format(i)
list_value1 = ramdom_10
list_value2 = ramdom_20
pipe.rpush(list_key, list_value1, list_value2)
# 集合类型
set_key = "Test2_set_{}".format(i)
set_value1 = ramdom_20
set_value2 = ramdom_30
pipe.sadd(set_key, set_value1, set_value2)
# 有序集合类型
zset_key = "Test2_zset_{}".format(i)
zset_value1 = ramdom_20
zset_value2 = ramdom_30
pipe.zadd(zset_key, {zset_value1: 1, zset_value2: 2})
# 哈希类型
hash_key = "Test2_hash_{}".format(i)
hash_field1 = ramdom_35
hash_value1 = ramdom_40
pipe.hset(hash_key, hash_field1, hash_value1)
# print(f"redis 第{i}次测试数据提交完毕!")
# 如果数据量较大
if count >= batch_size:
if i % batch_size == 0:
print(f"redis 第{i}次测试数据提交完毕!")
pipe.execute()
pipe.reset()
# 如果不是整除,最后提交一次
print(f"redis 第{i}次测试数据提交完毕!")
pipe.execute()
pipe.reset()
print(f"redis测试数据写入完毕!")
# redis单节点
pool = redis.ConnectionPool(
host='192.168.10.2',
port=6369,
db=0,
password='',
encoding='utf-8',
decode_responses=True,
socket_timeout=10,
max_connections=100
)
r = redis.Redis(connection_pool=pool)
# # Redis集群
# source_node_list = [
# {"host": "192.168.11.1", "port": "6379"},
# {"host": "192.168.11.2", "port": "6379"},
# {"host": "192.168.11.3", "port": "6379"},
# {"host": "192.168.11.4", "port": "6379"},
# {"host": "192.168.11.5", "port": "6379"},
# {"host": "192.168.11.6", "port": "6379"},
# ]
# # 创建RedisCluster的实例
# # decode_responses设置为True会自动将响应数据解码为utf-8编码的字符串
# redis_cluster_source = RedisCluster(
# startup_nodes=source_node_list,
# decode_responses=True,
# password='',
# max_connections=100
# )
if __name__ == '__main__':
start_time = time.perf_counter()
# 单节点,用连接池
# 下面的1000代表5个数据类型,每个1000键值对,batch_size代表循环100次提交一次pipeline
# 最终会生成1000*5 = 5000个各类型的键值对测试数据,根据实际情况修改参数
redis_pipe_write(r, 1000, batch_size=100)
# 集群
# redis_pipe_write(redis_cluster_source, 105301, batch_size=3000)
print("结束时间:", datetime.now())
end_time = time.perf_counter()
# 计算执行时间
execution_time = end_time - start_time
print(f"代码执行时间: {execution_time} 秒")
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
已为社区贡献2条内容
所有评论(0)