Redis源码之——hashtable和rehash
码字不易,转载请附原链,搬砖繁忙回复不及时见谅,技术交流请加QQ群:909211071
概述
在Redis中,整个数据库所有的key-value、hash的key-value、zset中的value-score、set的key-NULL都是通过哈希结构实现的。哈希类型的内部编码有两种,当哈希类型元素个数少于hash-max-ziplist-entries配置(默认512),同时所有值都小于hash-max-ziplist-value配置(默认64字节)时,redis会使用ziplist作为哈希的内部实现,否则会用hashtable实现,因为如果元素和字段过大时,ziplist的结构会因为频繁重新分配内存导致读写效率下降。
字典内部结构
hashtable结构为一维数组分桶,二维链地址链表解决hash冲突。一维数组中存储的是二维链表的第一个元素指针。
typedef struct dictEntry {
void *key;
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next;
} dictEntry;
typedef struct dictType {
uint64_t (*hashFunction)(const void *key);
void *(*keyDup)(void *privdata, const void *key);
void *(*valDup)(void *privdata, const void *obj);
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
void (*keyDestructor)(void *privdata, void *key);
void (*valDestructor)(void *privdata, void *obj);
} dictType;
/* This is our hash table structure. Every dictionary has two of this as we
* implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
dictEntry **table;
unsigned long size;
unsigned long sizemask;
unsigned long used;
} dictht;
typedef struct dict {
dictType *type;
void *privdata;
dictht ht[2];
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
unsigned long iterators; /* number of iterators currently running */
} dict;
渐进式rehash
- 字典内部包含两个hashtable,通常情况下只有一个hashtable是有值的,在字典扩容时需要分配新的hahstable,然后进行渐进式迁移。
- 在迁移过程中的删除、查找、更新操作,会先在旧的hashtable中查找,其次去新的hashtable中查找,而新增操作只会添加到新的hashtable中,这样保证了旧的hashtable只减不增。
- 渐进式哈希需要重新申请新的数组,然后将旧字典所有链表中的元素重新挂接到新的数组下面,这是一个O(n)的操作,单线程Redis很难承受,所以使用渐进式rehash小步搬迁。
- 搬迁操作通过调用 #define dictIsRehashing(d) ((d)->rehashidx != -1)宏定义,触发在当前字典的后续hset、hdel等指令中,当客户端闲下来时,redis还会在定时任务中对字典进行搬迁。
- 在这个过程中两个hashtable分别存储旧数据和新数据,待搬迁结束后,删除旧的,新的取代旧的,新的大小为旧的2倍。
/* add时的小步搬迁 */
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
long index;
dictEntry *entry;
dictht *ht;
//这里进行小步搬迁
if( dictIsRehashing(d) ) _dictRehashStep(d);
//如果已存在
if( (index = _dictKeyIndex(d, key, dictHashKey(d, key), existing)) == -1 )
return NULL;
//如果字典处于搬迁中,要将新的元素挂接到新的数组下面
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
entry = zmalloc(sizeof(*entry));
entry->next = ht->table[index];
ht->table[index] = entry;
ht->used++;
//设置entry值
dictSetKey(d, entry, key);
return entry;
}
static void _dictRehashStep(dict *d) {
if (d->iterators == 0) dictRehash(d,1);
}
int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* Max number of empty buckets to visit. */
if (!dictIsRehashing(d)) return 0;
while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;
/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(d->ht[0].size > (unsigned long)d->rehashidx);
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
de = d->ht[0].table[d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
while(de) {
uint64_t h;
nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
d->ht[0].table[d->rehashidx] = NULL;
d->rehashidx++;
}
/* Check if we already rehashed the whole table... */
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
}
/* More to rehash... */
return 1;
}
我们可以看到,每次对hash操作时,都会通过_dictRehashStep操作进行一步rehash,而每一次调用rehash会在检测到10个空bucket后退出。并且每一次rehash操作都是以bucket为单位的,将新的hash结构的下一个指针指向旧的hash的当前不为空的bucket。
/* 服务器定时任务 */
void databasesCron()
{
...
if( server.activerehashing ){
for(j = 0; j < dbs_per_call; j++){
int work_done = incrementallyRehash(rehash_db);
if(work_done)
break
}else{
rehash_db++;
rehash_db %= server.dbnum;
}
}
}
}
int incrementallyRehash(int dbid) {
/* Keys dictionary */
if (dictIsRehashing(server.db[dbid].dict)) {
dictRehashMilliseconds(server.db[dbid].dict,1);
return 1; /* already used our millisecond for this loop... */
}
/* Expires */
if (dictIsRehashing(server.db[dbid].expires)) {
dictRehashMilliseconds(server.db[dbid].expires,1);
return 1; /* already used our millisecond for this loop... */
}
return 0;
}
/* Rehash for an amount of time between ms milliseconds and ms+1 milliseconds */
int dictRehashMilliseconds(dict *d, int ms) {
long long start = timeInMilliseconds();
int rehashes = 0;
while(dictRehash(d,100)) {
rehashes += 100;
if (timeInMilliseconds()-start > ms) break;
}
return rehashes;
}
从上边我们可以看出,在redis的每个数据库的定时任务中会出发rehash操作,每次rehahs操作会触发1ms的rehash操作,也就是所即便是定时任务的rehash操作,也只会最多阻塞redis其他命令1ms。而在服务器定时任务的rehash调用时,我们发现第二个参数n传的值为100,也就是说数据库定时任务触发的rehash操作,会允许跳过1000个空的bucket。
查找过程
function get(key)
{
let index = hash_func(key) %size;
let entry = table[index];
while(entry != NULL){
if(entry.key == target){
return entry.value;
}
}
}
扩容条件
1、当hash表的负载因子(总键值对数 / 槽的个数)大于1时,就会开始扩容,扩容的新数组是原数组大小的2倍。
2、如果redis正在做bgsave,为了减少内存页的过多分离(Copy On Write),redis尽量不去扩容,但是如果hash表已经非常满了,元素的个数已经达到了一维数组长度的5倍,则会强制扩容。
缩容条件
元素个数低于数组长度的10%,缩容不用考虑redis是否正在做bgsave(不需要做迁移)