部分选自 redis源码分析之跳跃表

一.Redis

         Redis 是完全开源免费的,遵守BSD协议,是一个高性能的key-value数据库。Redis 与其他 key - value 缓存产品有以下三个特点:

  • Redis支持数据的持久化,可以将内存中的数据保存在磁盘中,重启的时候可以再次加载进行使用。
  • Redis不仅仅支持简单的key-value类型的数据,同时还提供list,set,zset,hash等数据结构的存储。
  • Redis支持数据的备份,即master-slave模式的数据备份。

       它可以支持五种基本数据类型,分别是字符串(string),列表(list),集合(set),有序集合(zset)以及哈希(hash),下面具体介绍下它们的特点以及内部实现。

1.1 .RedisObject

typedef struct redisObject{
//类型
unsigned type:4;
//编码
unsigned encoding:4;
//指向底层数据结构的指针
void *ptr;
//引用计数器
int refCount;
//最后一次的访问时间
unsigned lru:
}
  • type:对象的数据类型,string、hash、list、set、zset。
  • encoding:表示redisObject对象的底层编码实现,主要有简单动态字符串,链表,字典,跳跃表,整数集合以及压缩列表,每一个value都是由两种及以上的上述编码所构成。
  • *ptr :指向底层实现的数据的指针。
  • refCount :引用计数器,当创建一个对象的时间便将它的值初始化为1,当它被其它程序引用之时则加1,不再被引用则减1,当它的引用计数值变为0时,对象所占用的内存就会被释放。因此它主要有两个用途,内存回收的标志以及用于对象共享。 
  • 对象共享:当新建的两个或多个键都是整数值并且相同时,则它们的键会共享这一个值对象,这样可以减少内存的分配和回收
  • lru:最后一次访问该对象的时间,可以通过Object idletime查看当前时间距离该键的lru的时间。

1.2 字符串对象

struct sdshdr{
//记录buf数组中已使用字节的长度
int len;
//记录buf数组中剩余空间的长度
int free;
//字节数组,用于存储字符串
char buf[];
};

                                                       int用来存放整型数据,sds存放字节/字符串和浮点型数据 

       当free为0时表示没有任何未使用的空间,len表示字符串的长度,buf中存储每一个字节以及空字符,由于SDS遵循了C字符串以为’\0’结尾的特点,因此它也可以直接重用部分C函数库里的函数。 
相比于C字符串,SDS具有以下特点:获取长度的时间复杂度为O(1),因为len直接保存了当前字符串的长度;避免缓存区溢出,当C字符串进行拼接之时,如果未事先对当前字符串的空间进行调整,则可能会导致当前字符串的数据溢出,导致拼接过来的字符串内容被意外的修改,而SDS在拼接之前会进行自动的调整和扩展;减少内存分配次数,在SDS拼接发生以后,如果此时的len小于1MB则它会多分配和len大小相同的未使用空间,用free表示,如果大于1MB,则会分配1MB的为使用空间;惰性空间释放,当字符串进行缩短的时候,程序并不会立即回收空间(也可以调用API立即释放),而是记录到free之中,以便于后序拼接的使用。

1.3 列表对象

链表提供了节点重排以及节点顺序访问的能力,redis中的列表对象主要是由压缩列表和双端链表实现。

type struct list{
//表头节点
listNode *head;
//表尾节点
listNode *tail;
//包含的节点总数
unsigned long len;
//一些操作函数 dup free match...
}

压缩表:

  • previous_entry_length,记录了压缩列表中前一节点的字节长度,当小于254字节时,它的长度为1字节,当大于254字节时,长度为5字节且后4字节保存真正的长度,用于表尾向表头遍历。
  • content,节点所存储的内容,可以是一个字节数组或者整数。
  • encoding,记录content属性中所保存的数据类型以及长度。

1.4 集合对象

typedef struct intset{
//编码方式
uint32_t encoding;
//元素数量
uint32_t length;
//存储元素的数组
int8_t contents[];
}

     集合对象的编码可以是整数集合(intset)或者字典(hashtable)。 整数集合的每个元素都是contents数组的一个数组项,各个项在数组中按值得大小从小到大有序排列,并且不包含重复的项。contents数组中元素的类型由encoding决定,当新加入元素之时,如果元素的编码大于contents是数组的编码,则会将所有元素的编码升级为新加入元素的编码,然后再插入。编码不会发生降级。

字典:

typedef struct dict{
//类型特定函数
dictType *type;
//哈希表 两个,一个用于实时存储,一个用于rehash
dictht ht[2];
//rehash索引 数据迁移时使用
unsigned rehashidx;
}

       其中键值对都保存在节点dictEntry之中,并且通过拉链法解决哈希冲突,存储时通过MurmurHash算法来计算键的哈希值,能够更好的提供随机分布性且速度也快。扩容时时采用渐进式的rehash,采用分而治之的方法,通过改变rehashidx的值,来一个个将元素移动到ht[1]中,完成以后将ht[1]变为ht[0],原先的ht[0]变为ht[1],同时将redhashidx置为-1。

哈希表:

typedef struct dictht{
//哈希表数组
dictEntry **table;
//哈希表大小
unsigned long size;
//哈希表掩码,总是等于size-1,存储时计算索引值
unsigned long sizemask;
//已有元素数量
unsigned long used;
}

1.5  有序集合

       有序集合的编码可以是压缩列表(ziplist)或者跳跃表(skiplist)。 当元素数量小于128个并且所有元素成员的长度都小于64字节之时使用ziplist编码,否则使用skiplist编码。

1.5.1 跳跃表结构

       如果我们想找到67,需要遍历5个节点,时间复杂度是O(N)。如果我们提取链表中的节点到高层,利用高层大的区间查询节点会加快范围,如下图:

       通过高层的区间指针,我们只需遍历3个节点就能查到数据。如果我们继续扩大区间,将进一步减少需要遍历的节点:

       发现只需遍历2个节点就可以查到数据。

1.5.2 跳跃表结构

数据节点:

/**
 * ZSETs use a specialized version of Skiplists
 * 跳跃表中的数据节点
 */
typedef struct zskiplistNode {
    //存放value值
    sds ele;
    //zset的score值
    double score;
    // 后退指针,只能指向当前节点最底层节点的前一个节点,头节点和第一个节点的backward为NULL
    struct zskiplistNode *backward;
    // 层
    struct zskiplistLevel {
        // 指向当前层级的下一个节点
        struct zskiplistNode *forward;
        /**
         * forward指向的节点与本节点之间的元素个数,span值越大,跳过的节点个数越多
         */
        unsigned long span;
    } level[];
} zskiplistNode;

跳表结构体:

/**
 * 跳跃表结构体
 */
typedef struct zskiplist {
    //头,尾指针,分别指向头尾结点
    struct zskiplistNode *header, *tail;
    //跳表长度,除头节点之外的节点总数
    unsigned long length;
    //跳表高度
    int level;
} zskiplist

有序集合结构体:

/**
 * 有序集合结构体
 */
typedef struct zset {
    /*
     * Redis 会将跳跃表中所有的元素和分值组成 
     * key-value 的形式保存在字典中
     */
    dict *dict;
    /*
     * 底层指向的跳跃表的指针
     */
    zskiplist *zsl;
} zset;

1.5.3 跳跃表初始化

 

server.c:180行
 {"zadd",zaddCommand,-4,"wmF",0,NULL,1,1,1,0,0},
 
 void zaddCommand(client *c) {
    zaddGenericCommand(c,ZADD_NONE);
}

void zaddGenericCommand(client *c, int flags) {
static char *nanerr = "resulting score is not a number (NaN)";
    robj *key = c->argv[1];
    robj *zobj;
    sds ele;
    //中间省略不重要代码
    ...
    //先查看是否存在key
    zobj = lookupKeyWrite(c->db,key);
    if (zobj == NULL) {
        if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
        //zset_max_ziplist_entries 配置文件中默认为128
        //zset_max_ziplist_value默认为64,就是说,你的值长度如果大于64,就默认创建set,否则创建ziplist
        if (server.zset_max_ziplist_entries == 0 ||
            server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
        {
            zobj = createZsetObject();
        } else {
            zobj = createZsetZiplistObject();
        }
        dbAdd(c->db,key,zobj);
    } else {
        if (zobj->type != OBJ_ZSET) {
            addReply(c,shared.wrongtypeerr);
            goto cleanup;
        }
    }
    
    //继续看createZsetObject
    robj *createZsetObject(void) {
        zset *zs = zmalloc(sizeof(*zs));
        robj *o;
    
        zs->dict = dictCreate(&zsetDictType,NULL);
        //创建skiplist
        zs->zsl = zslCreate();
        o = createObject(OBJ_ZSET,zs);
        o->encoding = OBJ_ENCODING_SKIPLIST;
        return o;
    }
    
    //继续看zslCreate
    zskiplist *zslCreate(void) {
        int j;
        zskiplist *zsl;
    
        zsl = zmalloc(sizeof(*zsl));
        zsl->level = 1;
        zsl->length = 0;
        zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
        for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
            zsl->header->level[j].forward = NULL;
            zsl->header->level[j].span = 0;
        }
        zsl->header->backward = NULL;
        zsl->tail = NULL;
        return zsl;
    }
    
    #define ZSKIPLIST_MAXLEVEL 64 /* Should be enough for 2^64 elements */
    
    //创建skpilistNode
    zskiplistNode *zslCreateNode(int level, double score, sds ele) {
        zskiplistNode *zn =
            zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
        zn->score = score;
        zn->ele = ele;
        return zn;
    }

 1.5.4 跳跃表插入节点

 获取随机层高:

//随机函数
int zslRandomLevel(void) {
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}


random()生成一个uint32的数字,random()&0xFFFF 就是把高16位清零,
那低16位取值就落在0x0000-0xFFFF之间,假如random()生成的随机数比较均匀,那
random()&0xFFFF < 0xFFFF/4的概率就是1/4

插入操作过程如下:

  • 查找要插入的位置
  • 调整跳表高度
  • 插入节点
  • 调整backward

1.查找要插入的位置:

 

zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    // update存放需要更新的节点
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    //rank记录当前header节点到update[i]所经历的步长
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    // 第一步,收集需要更新的节点与步长信息
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        // score可以重复,重复时使用ele大小进行排序
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    

查找节点(score=22,level=3)的插入位置,逻辑如下:
1)第一次for循环,i=1。x现在为跳跃表的头节点;
2)现在i的值与zsl->level-1相等,所以rank[1]的值为0;
3)header->level[1].forward存在,并且header->level[1].forward->score(1)小于要插入值的score,所以while循环可以进入,rank[1]=1,x赋值为第一个节点;
4)第一个节点的第1层的forward指向NULL,所以while循环不会再进入,经过第一次for循环,rank[1]=1,x和update[1]都为第一个节点(score=1)。
如下图 :

5)for循环进入第二次,i=0。x为跳跃表第一个节点(score=1)
6)现在i的值与zsl->level-1不相等,所以rank[0]等于rank[1]的值赋值为1;
6)x->level[0]->forward存在,并且x->level[0].foreard->score(13)小于要插入的score,所以while循环可以进入,rank[0]=2,x为第二个节点(score=13)。
7)x->level[0]->forward存在,并且x->level[0].foreard->score(45)大于要插入的score,所以while不会再进入,经过第二次for循环,rank[0]=2,x和update[0]都为第二个节点(score=13),如下图:

 2. 调整跳表高度

    level = zslRandomLevel();
    //更新
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }

  第二步, 获取随机层高,加入随机的层高是3,如下图: 

3.插入节点

 第三步,创建并分层插入节点,同时更新同层前一节点步长信息

x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    level的值为3,所以for循环可以执行三遍,插入过程如下:
    for循环第一遍:
    1)x的level[0]的forward为update[0]的level[0]的forward节点,即x->level[0].forward为score=45的节点;
    2)update[0]的level[0]的下个节点为新插入的节点
    3)rank[0]-rank[0]的值为0,update[0]->level[0].span=1,所以x->level[0].span=1
    update[0]->level[0].span=1。

for (i = 1; i < level; i++) {
    x->level[i].forward = update[i]->level[i].forward;
    update[i]->level[i].forward = x;

    /* update span covered by update[i] as x is inserted here */
    x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
    update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}

for循环第二遍:
1)x的level[1]的forward为update[1]的level[1]的forward节点,即x->level[1].forward为NULL;
2)update[1]的level[1]的下个节点为新插入的节点
3)rank[0]-rank[1]的值为1,update[1]->level[1].span等于2,所以x->level[1].span=1
4)update[1]->level[1].span=2。

 

for (i = 2; i < level; i++) {
    x->level[i].forward = update[i]->level[i].forward;
    update[i]->level[i].forward = x;

    /* update span covered by update[i] as x is inserted here */
    x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
    update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}

 for循环第三遍:
1)x的level[2]的forward为update[2]的level[2]的forward节点,即x->level[2].forward为NULL;
2)update[2]的level[2]的下个节点为新插入的节点;
3)rank[0]-rank[2]的值为2,因为update[2]->level[2].span等于跳跃表的总长度3,所以x->level[2].span=1;
4)update[2]->level[2].span=3。

4.调整 backward

   第四步,更新新增节点未涉及层节点的步长信息,以及跳表相关信息:

5.查找节点

unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *x;
    unsigned long rank = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                sdscmp(x->level[i].forward->ele,ele) <= 0))) {
            rank += x->level[i].span;
            x = x->level[i].forward;
        }

        /* x might be equal to zsl->header, so test if obj is non-NULL */
        if (x->ele && sdscmp(x->ele,ele) == 0) {
            return rank;
        }
    }
    return 0;
}

1.6 哈希

       哈希对象的编码可以是压缩列表(ziplist)或者字典(hashtable),当哈希对象保存的所有键值对的键和值得长度都小于64字节并且元素数量小于512个时使用ziplist,否则使用hashtable。使用ziplist时,是依次将键和值压入链表之中,两者相邻。使用hashtable是是将键值对存于dictEntry之中。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐