zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

Redis源码之——hashtable和rehash

Redis源码 hashtable
2023-09-27 14:25:42 时间

码字不易,转载请附原链,搬砖繁忙回复不及时见谅,技术交流请加QQ群:909211071

概述

       在Redis中,整个数据库所有的key-value、hash的key-value、zset中的value-score、set的key-NULL都是通过哈希结构实现的。哈希类型的内部编码有两种,当哈希类型元素个数少于hash-max-ziplist-entries配置(默认512),同时所有值都小于hash-max-ziplist-value配置(默认64字节)时,redis会使用ziplist作为哈希的内部实现,否则会用hashtable实现,因为如果元素和字段过大时,ziplist的结构会因为频繁重新分配内存导致读写效率下降。

字典内部结构

       hashtable结构为一维数组分桶,二维链地址链表解决hash冲突。一维数组中存储的是二维链表的第一个元素指针。

typedef struct dictEntry {
    void *key;
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    struct dictEntry *next;
} dictEntry;


typedef struct dictType {
    uint64_t (*hashFunction)(const void *key);
    void *(*keyDup)(void *privdata, const void *key);
    void *(*valDup)(void *privdata, const void *obj);
    int (*keyCompare)(void *privdata, const void *key1, const void *key2);
    void (*keyDestructor)(void *privdata, void *key);
    void (*valDestructor)(void *privdata, void *obj);
} dictType;


/* This is our hash table structure. Every dictionary has two of this as we
 * implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
    dictEntry **table;
    unsigned long size;
    unsigned long sizemask;
    unsigned long used;
} dictht;


typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2];
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    unsigned long iterators; /* number of iterators currently running */
} dict;

渐进式rehash

  • 字典内部包含两个hashtable,通常情况下只有一个hashtable是有值的,在字典扩容时需要分配新的hahstable,然后进行渐进式迁移。
  • 在迁移过程中的删除、查找、更新操作,会先在旧的hashtable中查找,其次去新的hashtable中查找,而新增操作只会添加到新的hashtable中,这样保证了旧的hashtable只减不增。
  • 渐进式哈希需要重新申请新的数组,然后将旧字典所有链表中的元素重新挂接到新的数组下面,这是一个O(n)的操作,单线程Redis很难承受,所以使用渐进式rehash小步搬迁。
  • 搬迁操作通过调用 #define dictIsRehashing(d) ((d)->rehashidx != -1)宏定义,触发在当前字典的后续hset、hdel等指令中,当客户端闲下来时,redis还会在定时任务中对字典进行搬迁。
  • 在这个过程中两个hashtable分别存储旧数据和新数据,待搬迁结束后,删除旧的,新的取代旧的,新的大小为旧的2倍。
/* add时的小步搬迁 */
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
    long index;
    dictEntry *entry;
    dictht *ht;
    
    //这里进行小步搬迁
    if( dictIsRehashing(d) )  _dictRehashStep(d);
    
    //如果已存在
    if( (index = _dictKeyIndex(d, key, dictHashKey(d, key), existing)) == -1 )
        return NULL;

    //如果字典处于搬迁中,要将新的元素挂接到新的数组下面
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    entry = zmalloc(sizeof(*entry));
    entry->next = ht->table[index];
    ht->table[index] = entry;
    ht->used++;

    //设置entry值
    dictSetKey(d, entry, key);

    return entry;
}
static void _dictRehashStep(dict *d) {
    if (d->iterators == 0) dictRehash(d,1);
}
int dictRehash(dict *d, int n) {
    int empty_visits = n*10; /* Max number of empty buckets to visit. */
    if (!dictIsRehashing(d)) return 0;

    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;

        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */
        assert(d->ht[0].size > (unsigned long)d->rehashidx);
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        de = d->ht[0].table[d->rehashidx];
        /* Move all the keys in this bucket from the old to the new hash HT */
        while(de) {
            uint64_t h;

            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }

    /* Check if we already rehashed the whole table... */
    if (d->ht[0].used == 0) {
        zfree(d->ht[0].table);
        d->ht[0] = d->ht[1];
        _dictReset(&d->ht[1]);
        d->rehashidx = -1;
        return 0;
    }

    /* More to rehash... */
    return 1;
}

我们可以看到,每次对hash操作时,都会通过_dictRehashStep操作进行一步rehash,而每一次调用rehash会在检测到10个空bucket后退出。并且每一次rehash操作都是以bucket为单位的,将新的hash结构的下一个指针指向旧的hash的当前不为空的bucket。

/* 服务器定时任务 */
void databasesCron()
{
    ...
    if( server.activerehashing ){
        for(j = 0; j < dbs_per_call; j++){
            int work_done = incrementallyRehash(rehash_db);
            if(work_done)
                break
            }else{
                rehash_db++;
                rehash_db %= server.dbnum;
            }
        }
    }   
}
int incrementallyRehash(int dbid) {
    /* Keys dictionary */
    if (dictIsRehashing(server.db[dbid].dict)) {
        dictRehashMilliseconds(server.db[dbid].dict,1);
        return 1; /* already used our millisecond for this loop... */
    }
    /* Expires */
    if (dictIsRehashing(server.db[dbid].expires)) {
        dictRehashMilliseconds(server.db[dbid].expires,1);
        return 1; /* already used our millisecond for this loop... */
    }
    return 0;
}
/* Rehash for an amount of time between ms milliseconds and ms+1 milliseconds */
int dictRehashMilliseconds(dict *d, int ms) {
    long long start = timeInMilliseconds();
    int rehashes = 0;

    while(dictRehash(d,100)) {
        rehashes += 100;
        if (timeInMilliseconds()-start > ms) break;
    }
    return rehashes;
}

从上边我们可以看出,在redis的每个数据库的定时任务中会出发rehash操作,每次rehahs操作会触发1ms的rehash操作,也就是所即便是定时任务的rehash操作,也只会最多阻塞redis其他命令1ms。而在服务器定时任务的rehash调用时,我们发现第二个参数n传的值为100,也就是说数据库定时任务触发的rehash操作,会允许跳过1000个空的bucket。

查找过程

function get(key)
{
    let index = hash_func(key) %size;
    let entry = table[index];
    while(entry != NULL){
        if(entry.key == target){
            return entry.value;
        }
    }   
}

扩容条件

1、当hash表的负载因子(总键值对数 / 槽的个数)大于1时,就会开始扩容,扩容的新数组是原数组大小的2倍。

2、如果redis正在做bgsave,为了减少内存页的过多分离(Copy On Write),redis尽量不去扩容,但是如果hash表已经非常满了,元素的个数已经达到了一维数组长度的5倍,则会强制扩容。

缩容条件

元素个数低于数组长度的10%,缩容不用考虑redis是否正在做bgsave(不需要做迁移)