From 4145d628b4417310706357a9eb8a13d813f5d712 Mon Sep 17 00:00:00 2001 From: Harkrishn Patro Date: Fri, 27 Oct 2023 12:05:40 -0700 Subject: [PATCH] Reduce dbBuckets operation time complexity from O(N) to O(1) (#12697) As part of #11695 independent dictionaries were introduced per slot. Time complexity to discover total no. of buckets across all dictionaries increased to O(N) with straightforward implementation of iterating over all dictionaries and adding dictBuckets of each. To optimize the time complexity, we could maintain a global counter at db level to keep track of the count of buckets and update it on the start and end of rehashing. --------- Co-authored-by: Roshan Khatri --- src/db.c | 24 +++++++++++++----------- src/dict.c | 18 ++++++++++++++++++ src/dict.h | 5 +++++ src/server.c | 50 ++++++++++++++++++++++++++++++++++++++++++++------ src/server.h | 3 ++- 5 files changed, 82 insertions(+), 18 deletions(-) diff --git a/src/db.c b/src/db.c index fac2eb82c..02ed6578c 100644 --- a/src/db.c +++ b/src/db.c @@ -65,7 +65,7 @@ dict *dbGetDictFromIterator(dbIterator *dbit) { else if (dbit->keyType == DB_EXPIRES) return dbit->db->expires[dbit->slot]; else - serverAssert(0); + serverPanic("Unknown keyType"); } /* Returns next dictionary from the iterator, or NULL if iteration is complete. */ @@ -503,7 +503,7 @@ static inline unsigned long dictSizebySlot(redisDb *db, int slot, dbKeyType keyT else if (keyType == DB_EXPIRES) return dictSize(db->expires[slot]); else - serverAssert(0); + serverPanic("Unknown keyType"); } /* Finds a slot containing target element in a key space ordered by slot id. @@ -1390,14 +1390,16 @@ unsigned long long int dbSize(redisDb *db, dbKeyType keyType) { /* This method proivdes the cumulative sum of all the dictionary buckets * across dictionaries in a database. */ unsigned long dbBuckets(redisDb *db, dbKeyType keyType) { - unsigned long buckets = 0; - dict *d; - dbIterator *dbit = dbIteratorInit(db, keyType); - while ((d = dbIteratorNextDict(dbit))) { - buckets += dictBuckets(d); + if (server.cluster_enabled) { + return db->sub_dict[keyType].bucket_count; + } else { + if (keyType == DB_MAIN) + return dictBuckets(db->dict[0]); + else if (keyType == DB_EXPIRES) + return dictBuckets(db->expires[0]); + else + serverPanic("Unknown keyType"); } - zfree(dbit); - return buckets; } size_t dbMemUsage(redisDb *db, dbKeyType keyType) { @@ -1419,7 +1421,7 @@ dictEntry *dbFind(redisDb *db, void *key, dbKeyType keyType){ else if (keyType == DB_EXPIRES) return dictFind(db->expires[slot], key); else - serverAssert(0); + serverPanic("Unknown keyType"); } /* @@ -1444,7 +1446,7 @@ unsigned long long dbScan(redisDb *db, dbKeyType keyType, unsigned long long v, else if (keyType == DB_EXPIRES) d = db->expires[slot]; else - serverAssert(0); + serverPanic("Unknown keyType"); int is_dict_valid = (dictScanValidFunction == NULL || dictScanValidFunction(d) == C_OK); if (is_dict_valid) { diff --git a/src/dict.c b/src/dict.c index e5214ae37..cc7ea1b53 100644 --- a/src/dict.c +++ b/src/dict.c @@ -259,6 +259,8 @@ int _dictExpand(dict *d, unsigned long size, int* malloc_failed) /* Is this the first initialization? If so it's not really a rehashing * we just set the first hash table so that it can accept keys. */ if (d->ht_table[0] == NULL) { + if (d->type->rehashingStarted) d->type->rehashingStarted(d); + if (d->type->rehashingCompleted) d->type->rehashingCompleted(d); d->ht_size_exp[0] = new_ht_size_exp; d->ht_used[0] = new_ht_used; d->ht_table[0] = new_ht_table; @@ -369,6 +371,7 @@ int dictRehash(dict *d, int n) { /* Check if we already rehashed the whole table... */ if (d->ht_used[0] == 0) { + if (d->type->rehashingCompleted) d->type->rehashingCompleted(d); zfree(d->ht_table[0]); /* Copy the new ht onto the old one */ d->ht_table[0] = d->ht_table[1]; @@ -1504,6 +1507,21 @@ dictEntry *dictFindEntryByPtrAndHash(dict *d, const void *oldptr, uint64_t hash) return NULL; } +/* Provides the old and new ht size for a given dictionary during rehashing. This method + * should only be invoked during initialization/rehashing. */ +void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size) { + /* Expansion during initialization. */ + if (d->ht_size_exp[0] == -1) { + *from_size = DICTHT_SIZE(d->ht_size_exp[0]); + *to_size = DICTHT_SIZE(DICT_HT_INITIAL_EXP); + return; + } + /* Invalid method usage if rehashing isn't ongoing. */ + assert(dictIsRehashing(d)); + *from_size = DICTHT_SIZE(d->ht_size_exp[0]); + *to_size = DICTHT_SIZE(d->ht_size_exp[1]); +} + /* ------------------------------- Debugging ---------------------------------*/ #define DICT_STATS_VECTLEN 50 void dictFreeStats(dictStats *stats) { diff --git a/src/dict.h b/src/dict.h index c016598fa..5b7daf915 100644 --- a/src/dict.h +++ b/src/dict.h @@ -55,7 +55,11 @@ typedef struct dictType { void (*keyDestructor)(dict *d, void *key); void (*valDestructor)(dict *d, void *obj); int (*expandAllowed)(size_t moreMem, double usedRatio); + /* Invoked at the start of dict initialization/rehashing (old and new ht are already created) */ void (*rehashingStarted)(dict *d); + /* Invoked at the end of dict initialization/rehashing of all the entries from old to new ht. Both ht still exists + * and are cleaned up after this callback. */ + void (*rehashingCompleted)(dict *d); /* Flags */ /* The 'no_value' flag, if set, indicates that values are not used, i.e. the * dict is a set. When this flag is set, it's not possible to access the @@ -218,6 +222,7 @@ unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *pri unsigned long dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata); uint64_t dictGetHash(dict *d, const void *key); dictEntry *dictFindEntryByPtrAndHash(dict *d, const void *oldptr, uint64_t hash); +void dictRehashingInfo(dict *d, unsigned long long *from, unsigned long long *to); size_t dictGetStatsMsg(char *buf, size_t bufsize, dictStats *stats, int full); dictStats* dictGetStatsHt(dict *d, int htidx, int full); diff --git a/src/server.c b/src/server.c index 912b5e148..3743fc09e 100644 --- a/src/server.c +++ b/src/server.c @@ -400,17 +400,52 @@ int dictExpandAllowed(size_t moreMem, double usedRatio) { } } -/* Adds dictionary to the rehashing list in cluster mode, which allows us +/* Updates the bucket count in cluster-mode for the given dictionary in a DB. bucket count + * incremented with the new ht size during the rehashing phase. + * And also adds dictionary to the rehashing list in cluster mode, which allows us * to quickly find rehash targets during incremental rehashing. - * In non-cluster mode, we don't need this list as there is only one dictionary per DB. */ + * + * In non-cluster mode, bucket count can be retrieved directly from single dict bucket and + * we don't need this list as there is only one dictionary per DB. */ void dictRehashingStarted(dict *d) { - if (!server.cluster_enabled || !server.activerehashing) return; - listAddNodeTail(server.db[0].sub_dict[DB_MAIN].rehashing, d); + if (!server.cluster_enabled) return; + + unsigned long long from, to; + dictRehashingInfo(d, &from, &to); + server.db[0].sub_dict[DB_MAIN].bucket_count += to; /* Started rehashing (Add the new ht size) */ + if (from == 0) return; /* No entries are to be moved. */ + if (server.activerehashing) { + listAddNodeTail(server.db[0].sub_dict[DB_MAIN].rehashing, d); + } +} + +/* Updates the bucket count for the given dictionary in a DB. It removes + * the old ht size of the dictionary from the total sum of buckets for a DB. */ +void dictRehashingCompleted(dict *d) { + if (!server.cluster_enabled) return; + unsigned long long from, to; + dictRehashingInfo(d, &from, &to); + server.db[0].sub_dict[DB_MAIN].bucket_count -= from; /* Finished rehashing (Remove the old ht size) */ } void dictRehashingStartedForExpires(dict *d) { - if (!server.cluster_enabled || !server.activerehashing) return; - listAddNodeTail(server.db[0].sub_dict[DB_EXPIRES].rehashing, d); + if (!server.cluster_enabled) return; + + unsigned long long from, to; + dictRehashingInfo(d, &from, &to); + server.db[0].sub_dict[DB_EXPIRES].bucket_count += to; /* Started rehashing (Add the new ht size) */ + if (from == 0) return; /* No entries are to be moved. */ + if (server.activerehashing) { + listAddNodeTail(server.db[0].sub_dict[DB_EXPIRES].rehashing, d); + } +} + +void dictRehashingCompletedForExpires(dict *d) { + if (!server.cluster_enabled) return; + + unsigned long long from, to; + dictRehashingInfo(d, &from, &to); + server.db[0].sub_dict[DB_EXPIRES].bucket_count -= from; /* Finished rehashing (Remove the old ht size) */ } /* Generic hash table type where keys are Redis Objects, Values @@ -469,6 +504,7 @@ dictType dbDictType = { dictObjectDestructor, /* val destructor */ dictExpandAllowed, /* allow to expand */ dictRehashingStarted, + dictRehashingCompleted, }; /* Db->expires */ @@ -481,6 +517,7 @@ dictType dbExpiresDictType = { NULL, /* val destructor */ dictExpandAllowed, /* allow to expand */ dictRehashingStartedForExpires, + dictRehashingCompletedForExpires, }; /* Command table. sds string -> command struct pointer. */ @@ -2603,6 +2640,7 @@ void initDbState(redisDb *db){ db->sub_dict[subdict].key_count = 0; db->sub_dict[subdict].resize_cursor = 0; db->sub_dict[subdict].slot_size_index = server.cluster_enabled ? zcalloc(sizeof(unsigned long long) * (CLUSTER_SLOTS + 1)) : NULL; + db->sub_dict[subdict].bucket_count = 0; } } diff --git a/src/server.h b/src/server.h index 852a45c00..5ededdf3f 100644 --- a/src/server.h +++ b/src/server.h @@ -962,8 +962,9 @@ typedef struct replBufBlock { typedef struct dbDictState { list *rehashing; /* List of dictionaries in this DB that are currently rehashing. */ - int resize_cursor; /* Cron job uses this cursor to gradually resize dictionaries. */ + int resize_cursor; /* Cron job uses this cursor to gradually resize dictionaries (only used for cluster-enabled). */ unsigned long long key_count; /* Total number of keys in this DB. */ + unsigned long long bucket_count; /* Total number of buckets in this DB across dictionaries (only used for cluster-enabled). */ unsigned long long *slot_size_index; /* Binary indexed tree (BIT) that describes cumulative key frequencies up until given slot. */ } dbDictState;