redis数据存储结构

 redis的内部整体的存储结构就是一个大的hashmap,内部实现是数组实现hash,冲突通过挂链去实现,然后每个dictEntry就是一个key/value对象。dictEntry的key指向set key value命令中的key对应的对象,dictEntry的v指向set key value命令中的value对应的对象。

dictEntry 内部包含数据存储的key和v变量,同时包含一个dictEntry的next指针连接落入同一个hash桶的对象。dictEntry当中的key和v的指针指向的是redisObject。

/*
 * 哈希表节点
 */
typedef struct dictEntry {
    
    // 键
    void *key;

    // 值
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
    } v;

    // 指向下个哈希表节点,形成链表
    struct dictEntry *next;

} dictEntry;

redisObject是redis server存储最原子数据的数据结构,其中的void *ptr会指向真正的存储数据结构,我们set key value中的key和value其实由ptr指向真正保存的位置。

typedef struct redisObject {

    // 类型
    unsigned type:4;

    // 编码
    unsigned encoding:4;

    // 对象最后一次被访问的时间
    unsigned lru:REDIS_LRU_BITS; /* lru time (relative to server.lruclock) */

    // 引用计数
    int refcount;

    // 指向实际值的指针
    void *ptr;

} robj;

redis string类型转换

 我们可能以为redis在内部存储string都是用sds的数据结构实现的,其实在整个redis的数据存储过程中为了提高性能,内部做了很多优化。字符串对象的内部编码(encoding)有 3 种 :int、raw和 embstr

如果一个字符串对象保存的是整数值,并且这个整数值可以用long类型来表示,那么字符串对象会将整数值保存在字符串对象结构的ptr属性里面(将void*转换成 long),并将字符串对象的编码设置为int

如果字符串对象保存的是一个字符串,并且这个字符申的长度小于等于 32 字节(redis 2.+版本),那么字符串对象将使用一个简单动态字符串(SDS)来保存这个字符串,并将对象的编码设置为embstr, embstr编码是专门用于保存短字符串的一种优化编码方式:

如果字符串对象保存的是一个字符串,并且这个字符串的长度大于 32 字节(redis 2.+版本),那么字符串对象将使用一个简单动态字符串(SDS)来保存这个字符串,并将对象的编码设置为raw

注意,embstr 编码和 raw 编码的边界在 redis 不同版本中是不一样的:

  • redis 2.+ 是 32 字节
  • redis 3.0-4.0 是 39 字节
  • redis 5.0 是 44 字节

可以看到embstrraw编码都会使用SDS来保存值,但不同之处在于embstr会通过一次内存分配函数来分配一块连续的内存空间来保存redisObjectSDS,而raw编码会通过调用两次内存分配函数来分别分配两块空间来保存redisObjectSDS。Redis这样做会有很多好处:

  • 整数,存储字符串长度小于21且能够转化为整数的字符串。

  • EmbeddedString,存储字符串长度小于39的字符串(REDIS_ENCODING_EMBSTR_SIZE_LIMIT)。

  • SDS,剩余情况使用sds进行存储。(即raw)

embstr和sds的区别在于内存的申请和回收

  • embstr的创建只需分配一次内存,而raw为两次(一次为sds分配对象,另一次为redisObject分配对象,embstr省去了第一次)。相对地,释放内存的次数也由两次变为一次。

  • embstr的redisObject和sds放在一起,更好地利用缓存带来的优势

  • 缺点:redis并未提供任何修改embstr的方式,即embstr是只读的形式。对embstr的修改实际上是先转换为raw再进行修改。

string编码转换源码分析

 通过redis 内部的命令映射表我们找到set对应的处理函数为setCommand,相当于这个是处理set命令的入口函数,关注下tryObjectEncoding,内部对其实对Object进行转换。

/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */
void setCommand(redisClient *c) {
    int j;
    robj *expire = NULL;
    int unit = UNIT_SECONDS;
    int flags = REDIS_SET_NO_FLAGS;

    // 设置选项参数
    for (j = 3; j < c->argc; j++) {
        char *a = c->argv[j]->ptr;
        robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];

        if ((a[0] == 'n' || a[0] == 'N') &&
            (a[1] == 'x' || a[1] == 'X') && a[2] == '\0') {
            flags |= REDIS_SET_NX;
        } else if ((a[0] == 'x' || a[0] == 'X') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0') {
            flags |= REDIS_SET_XX;
        } else if ((a[0] == 'e' || a[0] == 'E') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && next) {
            unit = UNIT_SECONDS;
            expire = next;
            j++;
        } else if ((a[0] == 'p' || a[0] == 'P') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && next) {
            unit = UNIT_MILLISECONDS;
            expire = next;
            j++;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

    // 尝试对值对象进行编码
    c->argv[2] = tryObjectEncoding(c->argv[2]);

    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}


 整个尝试编码转换的逻辑过程通过代码的注释应该是比较清楚了,过程如下:

  • 只对长度小于或等于 21 字节,并且可以被解释为整数的字符串进行编码,使用整数存储
  • 尝试将 RAW 编码的字符串编码为 EMBSTR 编码,使用EMBSTR 编码
  • 这个对象没办法进行编码,尝试从 SDS 中移除所有空余空间,使用SDS编码
/* Try to encode a string object in order to save space */
// 尝试对字符串对象进行编码,以节约内存。
robj *tryObjectEncoding(robj *o) {
    long value;

    sds s = o->ptr;
    size_t len;
    redisAssertWithInfo(NULL,o,o->type == REDIS_STRING);

    // 只在字符串的编码为 RAW 或者 EMBSTR 时尝试进行编码
    if (!sdsEncodedObject(o)) return o;

     // 不对共享对象进行编码
     if (o->refcount > 1) return o;

    // 对字符串进行检查
    // 只对长度小于或等于 21 字节,并且可以被解释为整数的字符串进行编码
    len = sdslen(s);
    if (len <= 21 && string2l(s,len,&value)) {
        if (server.maxmemory == 0 &&
            value >= 0 &&
            value < REDIS_SHARED_INTEGERS)
        {
            decrRefCount(o);
            incrRefCount(shared.integers[value]);
            return shared.integers[value];
        } else {
            if (o->encoding == REDIS_ENCODING_RAW) sdsfree(o->ptr);
            o->encoding = REDIS_ENCODING_INT;
            o->ptr = (void*) value;
            return o;
        }
    }

    // 尝试将 RAW 编码的字符串编码为 EMBSTR 编码
    if (len <= REDIS_ENCODING_EMBSTR_SIZE_LIMIT) {
        robj *emb;

        if (o->encoding == REDIS_ENCODING_EMBSTR) return o;
        emb = createEmbeddedStringObject(s,sdslen(s));
        decrRefCount(o);
        return emb;
    }

    // 这个对象没办法进行编码,尝试从 SDS 中移除所有空余空间
    if (o->encoding == REDIS_ENCODING_RAW &&
        sdsavail(s) > len/10)
    {
        o->ptr = sdsRemoveFreeSpace(o->ptr);
    }

    /* Return the original object. */
    return o;
}

redis sds的介绍

 在C语言中,字符串可以用'\0'结尾的char数组标示。这种简单的字符串表示,在大多数情况下都能满足要求,但是不能高效的计算length和append数据。所以Redis自己实现了SDS(简单动态字符串)的抽象类型。
 SDS的数据结构如下,len表示sdshdr中数据的长度,free表示sdshdr中剩余的空间,buf表示实际存储数据的空间。
 sdslen的函数有一个细节需要我们注意,那就是通过(s-(sizeof(struct sdshdr)))来计算偏移量,之所以需要这么计算是因为sds的指针指向的是char buf[]位置,所以我们需要访问sdshdr的首地址的时候需要减去偏移量。

/*
 * 保存字符串对象的结构
 */
struct sdshdr {
    
    // buf 中已占用空间的长度
    int len;

    // buf 中剩余可用空间的长度
    int free;

    // 数据空间
    char buf[];
};

/*
 * 返回 sds 实际保存的字符串的长度
 *
 * T = O(1)
 */
static inline size_t sdslen(const sds s) {
    struct sdshdr *sh = (void*)(s-(sizeof(struct sdshdr)));
    return sh->len;
}

/*
 * 返回 sds 可用空间的长度
 *
 * T = O(1)
 */
static inline size_t sdsavail(const sds s) {
    struct sdshdr *sh = (void*)(s-(sizeof(struct sdshdr)));
    return sh->free;
}

sds对象创建

 在创建sds对象的时候,我们上面提到过的涉及两次内存分配的过程,从下面的代码可以看出来:

  • sds对象创建sdsnewlen分配了一次内存。
  • robj对象的创建又分配了一次内存。
    整个sds对象的创建其实就是分配内存并初始化len和free字段。

/* Create a string object with encoding REDIS_ENCODING_RAW, that is a plain
 * string object where o->ptr points to a proper sds string. */
// 创建一个 REDIS_ENCODING_RAW 编码的字符对象
// 对象的指针指向一个 sds 结构
robj *createRawStringObject(char *ptr, size_t len) {
    return createObject(REDIS_STRING, sdsnewlen(ptr,len));
}
/*
 * 根据给定的初始化字符串 init 和字符串长度 initlen
 * 创建一个新的 sds
 *
 * 参数
 *  init :初始化字符串指针
 *  initlen :初始化字符串的长度
 *
 * 返回值
 *  sds :创建成功返回 sdshdr 相对应的 sds
 *        创建失败返回 NULL
 *
 * 复杂度
 *  T = O(N)
 */
sds sdsnewlen(const void *init, size_t initlen) {

    struct sdshdr *sh;

    // 根据是否有初始化内容,选择适当的内存分配方式
    // T = O(N)
    if (init) {
        // zmalloc 不初始化所分配的内存
        sh = zmalloc(sizeof(struct sdshdr)+initlen+1);
    } else {
        // zcalloc 将分配的内存全部初始化为 0
        sh = zcalloc(sizeof(struct sdshdr)+initlen+1);
    }

    // 内存分配失败,返回
    if (sh == NULL) return NULL;

    // 设置初始化长度  
    sh->len = initlen;
    // 新 sds 不预留任何空间
    sh->free = 0;
    // 如果有指定初始化内容,将它们复制到 sdshdr 的 buf 中
    // T = O(N)
    if (initlen && init)
        memcpy(sh->buf, init, initlen);
    // 以 \0 结尾
    sh->buf[initlen] = '\0';

    // 返回 buf 部分,而不是整个 sdshdr
    return (char*)sh->buf;
}
robj *createObject(int type, void *ptr) {

    robj *o = zmalloc(sizeof(*o));

    o->type = type;
    o->encoding = REDIS_ENCODING_RAW;
    o->ptr = ptr;
    o->refcount = 1;

    /* Set the LRU to the current lruclock (minutes resolution). */
    o->lru = LRU_CLOCK();
    return o;
}

sds内存扩容

 当字符串长度小于SDS_MAX_PREALLOC (1024*1024),那么就以2倍的速度扩容,当字符串长度大于SDS_MAX_PREALLOC,那么就以+SDS_MAX_PREALLOC的速度扩容(动态分配内存)。

/*
 * 对 sds 中 buf 的长度进行扩展,确保在函数执行之后,
 * buf 至少会有 addlen + 1 长度的空余空间
 * (额外的 1 字节是为 \0 准备的)
 *
 * 返回值
 *  sds :扩展成功返回扩展后的 sds
 *        扩展失败返回 NULL
 *
 * 复杂度
 *  T = O(N)
 */
sds sdsMakeRoomFor(sds s, size_t addlen) {

    struct sdshdr *sh, *newsh;

    // 获取 s 目前的空余空间长度
    size_t free = sdsavail(s);

    size_t len, newlen;

    // s 目前的空余空间已经足够,无须再进行扩展,直接返回
    if (free >= addlen) return s;

    // 获取 s 目前已占用空间的长度
    len = sdslen(s);
    sh = (void*) (s-(sizeof(struct sdshdr)));

    // s 最少需要的长度
    newlen = (len+addlen);

    // 根据新长度,为 s 分配新空间所需的大小
    if (newlen < SDS_MAX_PREALLOC)
        // 如果新长度小于 SDS_MAX_PREALLOC 
        // 那么为它分配两倍于所需长度的空间
        newlen *= 2;
    else
        // 否则,分配长度为目前长度加上 SDS_MAX_PREALLOC
        newlen += SDS_MAX_PREALLOC;
    // T = O(N)
    newsh = zrealloc(sh, sizeof(struct sdshdr)+newlen+1);

    // 内存不足,分配失败,返回
    if (newsh == NULL) return NULL;

    // 更新 sds 的空余长度
    newsh->free = newlen - len;

    // 返回 sds
    return newsh->buf;
}

sds内存缩容

 释放内存的过程中修改len和free字段,并不释放实际占用内存(惰性缩容。不释放空间,留给到期释放等机制释放。)。

/*
 * 在不释放 SDS 的字符串空间的情况下,
 * 重置 SDS 所保存的字符串为空字符串。
 *
 * 复杂度
 *  T = O(1)
 */
void sdsclear(sds s) {

    // 取出 sdshdr
    struct sdshdr *sh = (void*) (s-(sizeof(struct sdshdr)));

    // 重新计算属性
    sh->free += sh->len;
    sh->len = 0;

    // 将结束符放到最前面(相当于惰性地删除 buf中的内容)
    sh->buf[0] = '\0';
}

作者:晴天哥_374
链接:https://www.jianshu.com/p/160fb0f73841
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。]

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐