diff --git a/src/db.c b/src/db.c
index fac2eb82c..02ed6578c 100644
--- a/src/db.c
+++ b/src/db.c
@@ -65,7 +65,7 @@ dict *dbGetDictFromIterator(dbIterator *dbit) {
     else if (dbit->keyType == DB_EXPIRES)
         return dbit->db->expires[dbit->slot];
     else
-        serverAssert(0);
+        serverPanic("Unknown keyType");
 }
 
 /* Returns next dictionary from the iterator, or NULL if iteration is complete. */
@@ -503,7 +503,7 @@ static inline unsigned long dictSizebySlot(redisDb *db, int slot, dbKeyType keyT
     else if (keyType == DB_EXPIRES)
         return dictSize(db->expires[slot]);
     else
-        serverAssert(0);
+        serverPanic("Unknown keyType");
 }
 
 /* Finds a slot containing target element in a key space ordered by slot id.
@@ -1390,14 +1390,18 @@ unsigned long long int dbSize(redisDb *db, dbKeyType keyType) {
-/* This method proivdes the cumulative sum of all the dictionary buckets
- * across dictionaries in a database. */
+/* This method provides the cumulative sum of all the dictionary buckets
+ * across dictionaries in a database. In cluster mode the sum is maintained
+ * incrementally by the rehashing callbacks; otherwise there is a single
+ * dictionary per key type and its bucket count is read directly. */
 unsigned long dbBuckets(redisDb *db, dbKeyType keyType) {
-    unsigned long buckets = 0;
-    dict *d;
-    dbIterator *dbit = dbIteratorInit(db, keyType);
-    while ((d = dbIteratorNextDict(dbit))) {
-        buckets += dictBuckets(d);
+    if (server.cluster_enabled) {
+        return db->sub_dict[keyType].bucket_count;
+    } else {
+        if (keyType == DB_MAIN)
+            return dictBuckets(db->dict[0]);
+        else if (keyType == DB_EXPIRES)
+            return dictBuckets(db->expires[0]);
+        else
+            serverPanic("Unknown keyType");
     }
-    zfree(dbit);
-    return buckets;
 }
 
 size_t dbMemUsage(redisDb *db, dbKeyType keyType) {
@@ -1419,7 +1423,7 @@ dictEntry *dbFind(redisDb *db, void *key, dbKeyType keyType){
     else if (keyType == DB_EXPIRES)
         return dictFind(db->expires[slot], key);
     else
-        serverAssert(0);
+        serverPanic("Unknown keyType");
 }
 
 /*
@@ -1444,7 +1448,7 @@ unsigned long long dbScan(redisDb *db, dbKeyType keyType, unsigned long long v,
     else if (keyType == DB_EXPIRES)
         d = db->expires[slot];
     else
-        serverAssert(0);
+        serverPanic("Unknown keyType");
 
     int is_dict_valid = (dictScanValidFunction == NULL || dictScanValidFunction(d) == C_OK);
     if (is_dict_valid) {
diff --git a/src/dict.c b/src/dict.c
index e5214ae37..cc7ea1b53 100644
--- a/src/dict.c
+++ b/src/dict.c
@@ -259,6 +259,14 @@ int _dictExpand(dict *d, unsigned long size, int* malloc_failed)
     /* Is this the first initialization? If so it's not really a rehashing
      * we just set the first hash table so that it can accept keys. */
     if (d->ht_table[0] == NULL) {
+        /* Nothing is installed in ht[1] here, so temporarily publish the size of
+         * the table being set up through ht_size_exp[1]: the callbacks rely on
+         * dictRehashingInfo() to learn the real target size, which is larger than
+         * the initial one when the caller requested a specific capacity. */
+        d->ht_size_exp[1] = new_ht_size_exp;
+        if (d->type->rehashingStarted) d->type->rehashingStarted(d);
+        if (d->type->rehashingCompleted) d->type->rehashingCompleted(d);
+        d->ht_size_exp[1] = -1;
         d->ht_size_exp[0] = new_ht_size_exp;
         d->ht_used[0] = new_ht_used;
         d->ht_table[0] = new_ht_table;
@@ -369,6 +377,7 @@ int dictRehash(dict *d, int n) {
 
     /* Check if we already rehashed the whole table... */
     if (d->ht_used[0] == 0) {
+        if (d->type->rehashingCompleted) d->type->rehashingCompleted(d);
         zfree(d->ht_table[0]);
         /* Copy the new ht onto the old one */
         d->ht_table[0] = d->ht_table[1];
@@ -1504,6 +1513,19 @@ dictEntry *dictFindEntryByPtrAndHash(dict *d, const void *oldptr, uint64_t hash)
     return NULL;
 }
 
+/* Provides the old and new ht size for a given dictionary during rehashing. This method
+ * should only be invoked during initialization/rehashing.
+ *
+ * On the first initialization there is no old table: 'from_size' is 0 and 'to_size'
+ * is the size _dictExpand published in ht_size_exp[1], i.e. the real size of the
+ * table being installed rather than an assumed DICT_HT_INITIAL_EXP. */
+void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size) {
+    /* Invalid method usage if the dict is neither being initialized nor rehashing. */
+    assert(d->ht_size_exp[0] == -1 || dictIsRehashing(d));
+    *from_size = DICTHT_SIZE(d->ht_size_exp[0]);
+    *to_size = DICTHT_SIZE(d->ht_size_exp[1]);
+}
+
 /* ------------------------------- Debugging ---------------------------------*/
 #define DICT_STATS_VECTLEN 50
 void dictFreeStats(dictStats *stats) {
diff --git a/src/dict.h b/src/dict.h
index c016598fa..5b7daf915 100644
--- a/src/dict.h
+++ b/src/dict.h
@@ -55,7 +55,11 @@ typedef struct dictType {
     void (*keyDestructor)(dict *d, void *key);
     void (*valDestructor)(dict *d, void *obj);
     int (*expandAllowed)(size_t moreMem, double usedRatio);
+    /* Invoked at the start of dict initialization/rehashing (the new ht size is already known, see dictRehashingInfo) */
     void (*rehashingStarted)(dict *d);
+    /* Invoked at the end of dict initialization/rehashing, once all the entries were moved from the old to the new ht.
+     * On rehashing both hts still exist and are cleaned up after this callback. */
+    void (*rehashingCompleted)(dict *d);
     /* Flags */
     /* The 'no_value' flag, if set, indicates that values are not used, i.e. the
      * dict is a set. When this flag is set, it's not possible to access the
@@ -218,6 +222,7 @@ unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *pri
 unsigned long dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata);
 uint64_t dictGetHash(dict *d, const void *key);
 dictEntry *dictFindEntryByPtrAndHash(dict *d, const void *oldptr, uint64_t hash);
+void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size);
 
 size_t dictGetStatsMsg(char *buf, size_t bufsize, dictStats *stats, int full);
 dictStats* dictGetStatsHt(dict *d, int htidx, int full);
diff --git a/src/server.c b/src/server.c
index 912b5e148..3743fc09e 100644
--- a/src/server.c
+++ b/src/server.c
@@ -400,17 +400,52 @@ int dictExpandAllowed(size_t moreMem, double usedRatio) {
     }
 }
 
-/* Adds dictionary to the rehashing list in cluster mode, which allows us
+/* Updates the bucket count in cluster mode for the given dictionary in a DB: the
+ * bucket count is incremented by the new ht size when rehashing starts.
+ * It also adds the dictionary to the rehashing list in cluster mode, which allows us
  * to quickly find rehash targets during incremental rehashing.
- * In non-cluster mode, we don't need this list as there is only one dictionary per DB. */
+ *
+ * In non-cluster mode, the bucket count can be retrieved directly from the single dict and
+ * we don't need this list as there is only one dictionary per DB. */
 void dictRehashingStarted(dict *d) {
-    if (!server.cluster_enabled || !server.activerehashing) return;
-    listAddNodeTail(server.db[0].sub_dict[DB_MAIN].rehashing, d);
+    if (!server.cluster_enabled) return;
+
+    unsigned long long from, to;
+    dictRehashingInfo(d, &from, &to);
+    server.db[0].sub_dict[DB_MAIN].bucket_count += to; /* Started rehashing (Add the new ht size) */
+    if (from == 0) return; /* No entries are to be moved. */
+    if (server.activerehashing) {
+        listAddNodeTail(server.db[0].sub_dict[DB_MAIN].rehashing, d);
+    }
+}
+
+/* Updates the bucket count for the given dictionary in a DB. It removes
+ * the old ht size of the dictionary from the total sum of buckets for a DB. */
+void dictRehashingCompleted(dict *d) {
+    if (!server.cluster_enabled) return;
+    unsigned long long from, to;
+    dictRehashingInfo(d, &from, &to);
+    server.db[0].sub_dict[DB_MAIN].bucket_count -= from; /* Finished rehashing (Remove the old ht size) */
 }
 
 void dictRehashingStartedForExpires(dict *d) {
-    if (!server.cluster_enabled || !server.activerehashing) return;
-    listAddNodeTail(server.db[0].sub_dict[DB_EXPIRES].rehashing, d);
+    if (!server.cluster_enabled) return;
+
+    unsigned long long from, to;
+    dictRehashingInfo(d, &from, &to);
+    server.db[0].sub_dict[DB_EXPIRES].bucket_count += to; /* Started rehashing (Add the new ht size) */
+    if (from == 0) return; /* No entries are to be moved. */
+    if (server.activerehashing) {
+        listAddNodeTail(server.db[0].sub_dict[DB_EXPIRES].rehashing, d);
+    }
+}
+
+void dictRehashingCompletedForExpires(dict *d) {
+    if (!server.cluster_enabled) return;
+
+    unsigned long long from, to;
+    dictRehashingInfo(d, &from, &to);
+    server.db[0].sub_dict[DB_EXPIRES].bucket_count -= from; /* Finished rehashing (Remove the old ht size) */
 }
 
 /* Generic hash table type where keys are Redis Objects, Values
@@ -469,6 +504,7 @@ dictType dbDictType = {
     dictObjectDestructor,       /* val destructor */
     dictExpandAllowed,          /* allow to expand */
     dictRehashingStarted,
+    dictRehashingCompleted,
 };
 
 /* Db->expires */
@@ -481,6 +517,7 @@ dictType dbExpiresDictType = {
     NULL,                       /* val destructor */
     dictExpandAllowed,          /* allow to expand */
     dictRehashingStartedForExpires,
+    dictRehashingCompletedForExpires,
 };
 
 /* Command table. sds string -> command struct pointer. */
@@ -2603,6 +2640,7 @@ void initDbState(redisDb *db){
         db->sub_dict[subdict].key_count = 0;
         db->sub_dict[subdict].resize_cursor = 0;
         db->sub_dict[subdict].slot_size_index = server.cluster_enabled ? zcalloc(sizeof(unsigned long long) * (CLUSTER_SLOTS + 1)) : NULL;
+        db->sub_dict[subdict].bucket_count = 0;
     }
 }
 
diff --git a/src/server.h b/src/server.h
index 852a45c00..5ededdf3f 100644
--- a/src/server.h
+++ b/src/server.h
@@ -962,8 +962,9 @@ typedef struct replBufBlock {
 
 typedef struct dbDictState {
     list *rehashing;                       /* List of dictionaries in this DB that are currently rehashing. */
-    int resize_cursor;                     /* Cron job uses this cursor to gradually resize dictionaries. */
+    int resize_cursor;                     /* Cron job uses this cursor to gradually resize dictionaries (only used for cluster-enabled). */
     unsigned long long key_count;          /* Total number of keys in this DB. */
+    unsigned long long bucket_count;       /* Total number of buckets in this DB across dictionaries (only used for cluster-enabled). */
     unsigned long long *slot_size_index;   /* Binary indexed tree (BIT) that describes cumulative key frequencies up until given slot. */
 } dbDictState;
 