Redis五种数据类型内部实现
部分选自 redis源码分析之跳跃表一.RedisRedis 是完全开源免费的,遵守BSD协议,是一个高性能的key-value数据库。Redis 与其他 key - value 缓存产品有以下三个特点:Redis支持数据的持久化,可以将内存中的数据保存在磁盘中,重启的时候可以再次加载进行使用。Redis不仅仅支持简单的key-value类型的数据,同时还提供li...
部分选自 redis源码分析之跳跃表
一.Redis
Redis 是完全开源免费的,遵守BSD协议,是一个高性能的key-value数据库。Redis 与其他 key - value 缓存产品有以下三个特点:
- Redis支持数据的持久化,可以将内存中的数据保存在磁盘中,重启的时候可以再次加载进行使用。
- Redis不仅仅支持简单的key-value类型的数据,同时还提供list,set,zset,hash等数据结构的存储。
- Redis支持数据的备份,即master-slave模式的数据备份。
它可以支持五种基本数据类型,分别是字符串(string),列表(list),集合(set),有序集合(zset)以及哈希(hash),下面具体介绍下它们的特点以及内部实现。
1.1 .RedisObject
typedef struct redisObject{
//类型
unsigned type:4;
//编码
unsigned encoding:4;
//指向底层数据结构的指针
void *ptr;
//引用计数器
int refCount;
//最后一次的访问时间
unsigned lru:
}
- type:对象的数据类型,string、hash、list、set、zset。
- encoding:表示redisObject对象的底层编码实现,主要有简单动态字符串,链表,字典,跳跃表,整数集合以及压缩列表,每一个value都是由两种及以上的上述编码所构成。
- *ptr :指向底层实现的数据的指针。
- refCount :引用计数器,当创建一个对象的时间便将它的值初始化为1,当它被其它程序引用之时则加1,不再被引用则减1,当它的引用计数值变为0时,对象所占用的内存就会被释放。因此它主要有两个用途,内存回收的标志以及用于对象共享。
- 对象共享:当新建的两个或多个键都是整数值并且相同时,则它们的键会共享这一个值对象,这样可以减少内存的分配和回收
- lru:最后一次访问该对象的时间,可以通过Object idletime查看当前时间距离该键的lru的时间。
1.2 字符串对象
struct sdshdr{
//记录buf数组中已使用字节的长度
int len;
//记录buf数组中剩余空间的长度
int free;
//字节数组,用于存储字符串
char buf[];
};
int用来存放整型数据,sds存放字节/字符串和浮点型数据
当free为0时表示没有任何未使用的空间,len表示字符串的长度,buf中存储每一个字节以及空字符,由于SDS遵循了C字符串以为’\0’结尾的特点,因此它也可以直接重用部分C函数库里的函数。
相比于C字符串,SDS具有以下特点:获取长度的时间复杂度为O(1),因为len直接保存了当前字符串的长度;避免缓存区溢出,当C字符串进行拼接之时,如果未事先对当前字符串的空间进行调整,则可能会导致当前字符串的数据溢出,导致拼接过来的字符串内容被意外的修改,而SDS在拼接之前会进行自动的调整和扩展;减少内存分配次数,在SDS拼接发生以后,如果此时的len小于1MB则它会多分配和len大小相同的未使用空间,用free表示,如果大于1MB,则会分配1MB的为使用空间;惰性空间释放,当字符串进行缩短的时候,程序并不会立即回收空间(也可以调用API立即释放),而是记录到free之中,以便于后序拼接的使用。
1.3 列表对象
链表提供了节点重排以及节点顺序访问的能力,redis中的列表对象主要是由压缩列表和双端链表实现。
type struct list{
//表头节点
listNode *head;
//表尾节点
listNode *tail;
//包含的节点总数
unsigned long len;
//一些操作函数 dup free match...
}
压缩表:
- previous_entry_length,记录了压缩列表中前一节点的字节长度,当小于254字节时,它的长度为1字节,当大于254字节时,长度为5字节且后4字节保存真正的长度,用于表尾向表头遍历。
- content,节点所存储的内容,可以是一个字节数组或者整数。
- encoding,记录content属性中所保存的数据类型以及长度。
1.4 集合对象
typedef struct intset{
//编码方式
uint32_t encoding;
//元素数量
uint32_t length;
//存储元素的数组
int8_t contents[];
}
集合对象的编码可以是整数集合(intset)或者字典(hashtable)。 整数集合的每个元素都是contents数组的一个数组项,各个项在数组中按值得大小从小到大有序排列,并且不包含重复的项。contents数组中元素的类型由encoding决定,当新加入元素之时,如果元素的编码大于contents是数组的编码,则会将所有元素的编码升级为新加入元素的编码,然后再插入。编码不会发生降级。
字典:
typedef struct dict{
//类型特定函数
dictType *type;
//哈希表 两个,一个用于实时存储,一个用于rehash
dictht ht[2];
//rehash索引 数据迁移时使用
unsigned rehashidx;
}
其中键值对都保存在节点dictEntry之中,并且通过拉链法解决哈希冲突,存储时通过MurmurHash算法来计算键的哈希值,能够更好的提供随机分布性且速度也快。扩容时时采用渐进式的rehash,采用分而治之的方法,通过改变rehashidx的值,来一个个将元素移动到ht[1]中,完成以后将ht[1]变为ht[0],原先的ht[0]变为ht[1],同时将redhashidx置为-1。
哈希表:
typedef struct dictht{
//哈希表数组
dictEntry **table;
//哈希表大小
unsigned long size;
//哈希表掩码,总是等于size-1,存储时计算索引值
unsigned long sizemask;
//已有元素数量
unsigned long used;
}
1.5 有序集合
有序集合的编码可以是压缩列表(ziplist)或者跳跃表(skiplist)。 当元素数量小于128个并且所有元素成员的长度都小于64字节之时使用ziplist编码,否则使用skiplist编码。
1.5.1 跳跃表结构
如果我们想找到67,需要遍历5个节点,时间复杂度是O(N)。如果我们提取链表中的节点到高层,利用高层大的区间查询节点会加快范围,如下图:
通过高层的区间指针,我们只需遍历3个节点就能查到数据。如果我们继续扩大区间,将进一步减少需要遍历的节点:
发现只需遍历2个节点就可以查到数据。
1.5.2 跳跃表结构
数据节点:
/**
* ZSETs use a specialized version of Skiplists
* 跳跃表中的数据节点
*/
typedef struct zskiplistNode {
//存放value值
sds ele;
//zset的score值
double score;
// 后退指针,只能指向当前节点最底层节点的前一个节点,头节点和第一个节点的backward为NULL
struct zskiplistNode *backward;
// 层
struct zskiplistLevel {
// 指向当前层级的下一个节点
struct zskiplistNode *forward;
/**
* forward指向的节点与本节点之间的元素个数,span值越大,跳过的节点个数越多
*/
unsigned long span;
} level[];
} zskiplistNode;
跳表结构体:
/**
* 跳跃表结构体
*/
typedef struct zskiplist {
//头,尾指针,分别指向头尾结点
struct zskiplistNode *header, *tail;
//跳表长度,除头节点之外的节点总数
unsigned long length;
//跳表高度
int level;
} zskiplist
有序集合结构体:
/**
* 有序集合结构体
*/
typedef struct zset {
/*
* Redis 会将跳跃表中所有的元素和分值组成
* key-value 的形式保存在字典中
*/
dict *dict;
/*
* 底层指向的跳跃表的指针
*/
zskiplist *zsl;
} zset;
1.5.3 跳跃表初始化
server.c:180行
{"zadd",zaddCommand,-4,"wmF",0,NULL,1,1,1,0,0},
void zaddCommand(client *c) {
zaddGenericCommand(c,ZADD_NONE);
}
void zaddGenericCommand(client *c, int flags) {
static char *nanerr = "resulting score is not a number (NaN)";
robj *key = c->argv[1];
robj *zobj;
sds ele;
//中间省略不重要代码
...
//先查看是否存在key
zobj = lookupKeyWrite(c->db,key);
if (zobj == NULL) {
if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
//zset_max_ziplist_entries 配置文件中默认为128
//zset_max_ziplist_value默认为64,就是说,你的值长度如果大于64,就默认创建set,否则创建ziplist
if (server.zset_max_ziplist_entries == 0 ||
server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
{
zobj = createZsetObject();
} else {
zobj = createZsetZiplistObject();
}
dbAdd(c->db,key,zobj);
} else {
if (zobj->type != OBJ_ZSET) {
addReply(c,shared.wrongtypeerr);
goto cleanup;
}
}
//继续看createZsetObject
robj *createZsetObject(void) {
zset *zs = zmalloc(sizeof(*zs));
robj *o;
zs->dict = dictCreate(&zsetDictType,NULL);
//创建skiplist
zs->zsl = zslCreate();
o = createObject(OBJ_ZSET,zs);
o->encoding = OBJ_ENCODING_SKIPLIST;
return o;
}
//继续看zslCreate
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
#define ZSKIPLIST_MAXLEVEL 64 /* Should be enough for 2^64 elements */
//创建skpilistNode
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
zskiplistNode *zn =
zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->ele = ele;
return zn;
}
1.5.4 跳跃表插入节点
获取随机层高:
//随机函数
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
random()生成一个uint32的数字,random()&0xFFFF 就是把高16位清零,
那低16位取值就落在0x0000-0xFFFF之间,假如random()生成的随机数比较均匀,那
random()&0xFFFF < 0xFFFF/4的概率就是1/4
插入操作过程如下:
- 查找要插入的位置
- 调整跳表高度
- 插入节点
- 调整backward
1.查找要插入的位置:
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
// update存放需要更新的节点
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
//rank记录当前header节点到update[i]所经历的步长
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
serverAssert(!isnan(score));
x = zsl->header;
// 第一步,收集需要更新的节点与步长信息
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
// score可以重复,重复时使用ele大小进行排序
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
update[i] = x;
}
查找节点(score=22,level=3)的插入位置,逻辑如下:
1)第一次for循环,i=1。x现在为跳跃表的头节点;
2)现在i的值与zsl->level-1相等,所以rank[1]的值为0;
3)header->level[1].forward存在,并且header->level[1].forward->score(1)小于要插入值的score,所以while循环可以进入,rank[1]=1,x赋值为第一个节点;
4)第一个节点的第1层的forward指向NULL,所以while循环不会再进入,经过第一次for循环,rank[1]=1,x和update[1]都为第一个节点(score=1)。
如下图 :
5)for循环进入第二次,i=0。x为跳跃表第一个节点(score=1)
6)现在i的值与zsl->level-1不相等,所以rank[0]等于rank[1]的值赋值为1;
6)x->level[0]->forward存在,并且x->level[0].foreard->score(13)小于要插入的score,所以while循环可以进入,rank[0]=2,x为第二个节点(score=13)。
7)x->level[0]->forward存在,并且x->level[0].foreard->score(45)大于要插入的score,所以while不会再进入,经过第二次for循环,rank[0]=2,x和update[0]都为第二个节点(score=13),如下图:
2. 调整跳表高度
level = zslRandomLevel();
//更新
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
第二步, 获取随机层高,加入随机的层高是3,如下图:
3.插入节点
第三步,创建并分层插入节点,同时更新同层前一节点步长信息
x = zslCreateNode(level,score,ele);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
level的值为3,所以for循环可以执行三遍,插入过程如下:
for循环第一遍:
1)x的level[0]的forward为update[0]的level[0]的forward节点,即x->level[0].forward为score=45的节点;
2)update[0]的level[0]的下个节点为新插入的节点
3)rank[0]-rank[0]的值为0,update[0]->level[0].span=1,所以x->level[0].span=1
update[0]->level[0].span=1。
for (i = 1; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
for循环第二遍:
1)x的level[1]的forward为update[1]的level[1]的forward节点,即x->level[1].forward为NULL;
2)update[1]的level[1]的下个节点为新插入的节点
3)rank[0]-rank[1]的值为1,update[1]->level[1].span等于2,所以x->level[1].span=1
4)update[1]->level[1].span=2。
for (i = 2; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
for循环第三遍:
1)x的level[2]的forward为update[2]的level[2]的forward节点,即x->level[2].forward为NULL;
2)update[2]的level[2]的下个节点为新插入的节点;
3)rank[0]-rank[2]的值为2,因为update[2]->level[2].span等于跳跃表的总长度3,所以x->level[2].span=1;
4)update[2]->level[2].span=3。
4.调整 backward
第四步,更新新增节点未涉及层节点的步长信息,以及跳表相关信息:
5.查找节点
unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) <= 0))) {
rank += x->level[i].span;
x = x->level[i].forward;
}
/* x might be equal to zsl->header, so test if obj is non-NULL */
if (x->ele && sdscmp(x->ele,ele) == 0) {
return rank;
}
}
return 0;
}
1.6 哈希
哈希对象的编码可以是压缩列表(ziplist)或者字典(hashtable),当哈希对象保存的所有键值对的键和值得长度都小于64字节并且元素数量小于512个时使用ziplist,否则使用hashtable。使用ziplist时,是依次将键和值压入链表之中,两者相邻。使用hashtable是是将键值对存于dictEntry之中。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)