diff --git a/src/cluster.c b/src/cluster.c index 71c45d742..82f3e36a8 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -93,9 +93,9 @@ static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen); /* Links to the next and previous entries for keys in the same slot are stored * in the dict entry metadata. See Slot to Key API below. */ #define dictEntryNextInSlot(de) \ - (((clusterDictEntryMetadata *)dictMetadata(de))->next) + (((clusterDictEntryMetadata *)dictEntryMetadata(de))->next) #define dictEntryPrevInSlot(de) \ - (((clusterDictEntryMetadata *)dictMetadata(de))->prev) + (((clusterDictEntryMetadata *)dictEntryMetadata(de))->prev) #define RCVBUF_INIT_LEN 1024 #define RCVBUF_MAX_PREALLOC (1<<20) /* 1MB */ @@ -7288,7 +7288,7 @@ int clusterRedirectBlockedClientIfNeeded(client *c) { * understand if we have keys for a given hash slot. */ void slotToKeyAddEntry(dictEntry *entry, redisDb *db) { - sds key = entry->key; + sds key = dictGetKey(entry); unsigned int hashslot = keyHashSlot(key, sdslen(key)); slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot]; slot_to_keys->count++; @@ -7305,7 +7305,7 @@ void slotToKeyAddEntry(dictEntry *entry, redisDb *db) { } void slotToKeyDelEntry(dictEntry *entry, redisDb *db) { - sds key = entry->key; + sds key = dictGetKey(entry); unsigned int hashslot = keyHashSlot(key, sdslen(key)); slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot]; slot_to_keys->count--; @@ -7327,7 +7327,7 @@ void slotToKeyDelEntry(dictEntry *entry, redisDb *db) { /* Updates neighbour entries when an entry has been replaced (e.g. reallocated * during active defrag). */ -void slotToKeyReplaceEntry(dictEntry *entry, redisDb *db) { +void slotToKeyReplaceEntry(dict *d, dictEntry *entry) { dictEntry *next = dictEntryNextInSlot(entry); dictEntry *prev = dictEntryPrevInSlot(entry); if (next != NULL) { @@ -7337,8 +7337,10 @@ void slotToKeyReplaceEntry(dictEntry *entry, redisDb *db) { dictEntryNextInSlot(prev) = entry; } else { /* The replaced entry was the first in the list. */ - sds key = entry->key; + sds key = dictGetKey(entry); unsigned int hashslot = keyHashSlot(key, sdslen(key)); + clusterDictMetadata *dictmeta = dictMetadata(d); + redisDb *db = dictmeta->db; slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot]; slot_to_keys->head = entry; } @@ -7347,6 +7349,8 @@ void slotToKeyReplaceEntry(dictEntry *entry, redisDb *db) { /* Initialize slots-keys map of given db. */ void slotToKeyInit(redisDb *db) { db->slots_to_keys = zcalloc(sizeof(clusterSlotToKeyMapping)); + clusterDictMetadata *dictmeta = dictMetadata(db->dict); + dictmeta->db = db; } /* Empty slots-keys map of given db. 
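/* Illustrative sketch, not taken from this patch: with slotToKeyReplaceEntry()
 * now recovering the redisDb from the dict's own metadata, the keyspace
 * dictType is expected to report the metadata size and forward entry
 * replacements through the new dictType callbacks declared later in dict.h.
 * The function names below (dbDictMetadataSize, dbDictAfterReplaceEntry) are
 * hypothetical; the real wiring lives in server.c and is not part of this
 * hunk. */
static size_t dbDictMetadataSize(void) {
    /* Only cluster mode stores a back-pointer in the dict metadata. */
    return server.cluster_enabled ? sizeof(clusterDictMetadata) : 0;
}

static void dbDictAfterReplaceEntry(dict *d, dictEntry *de) {
    /* Active defrag moved a dictEntry of the keyspace dict; let the cluster
     * slot-to-keys mapping update its linked-list pointers. */
    if (server.cluster_enabled) slotToKeyReplaceEntry(d, de);
}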
*/ diff --git a/src/cluster.h b/src/cluster.h index 4c93dbc8d..b76a9a7cd 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -169,6 +169,9 @@ typedef struct clusterDictEntryMetadata { dictEntry *next; /* Next entry with key in the same slot */ } clusterDictEntryMetadata; +typedef struct { + redisDb *db; /* A link back to the db this dict belongs to */ +} clusterDictMetadata; typedef struct clusterState { clusterNode *myself; /* This node */ @@ -409,7 +412,7 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded); unsigned int keyHashSlot(char *key, int keylen); void slotToKeyAddEntry(dictEntry *entry, redisDb *db); void slotToKeyDelEntry(dictEntry *entry, redisDb *db); -void slotToKeyReplaceEntry(dictEntry *entry, redisDb *db); +void slotToKeyReplaceEntry(dict *d, dictEntry *entry); void slotToKeyInit(redisDb *db); void slotToKeyFlush(redisDb *db); void slotToKeyDestroy(redisDb *db); diff --git a/src/config.c b/src/config.c index 90baed9ee..9de4ecd41 100644 --- a/src/config.c +++ b/src/config.c @@ -989,8 +989,8 @@ void configGetCommand(client *c) { /* Note that hidden configs require an exact match (not a pattern) */ if (config->flags & HIDDEN_CONFIG) continue; if (dictFind(matches, config->name)) continue; - if (stringmatch(name, de->key, 1)) { - dictAdd(matches, de->key, config); + if (stringmatch(name, dictGetKey(de), 1)) { + dictAdd(matches, dictGetKey(de), config); } } dictReleaseIterator(di); @@ -1000,7 +1000,7 @@ void configGetCommand(client *c) { addReplyMapLen(c, dictSize(matches)); while ((de = dictNext(di)) != NULL) { standardConfig *config = (standardConfig *) dictGetVal(de); - addReplyBulkCString(c, de->key); + addReplyBulkCString(c, dictGetKey(de)); addReplyBulkSds(c, config->interface.get(config)); } dictReleaseIterator(di); @@ -1754,7 +1754,7 @@ int rewriteConfig(char *path, int force_write) { standardConfig *config = dictGetVal(de); /* Only rewrite the primary names */ if (config->flags & ALIAS_CONFIG) continue; - if (config->interface.rewrite) config->interface.rewrite(config, de->key, state); + if (config->interface.rewrite) config->interface.rewrite(config, dictGetKey(de), state); } dictReleaseIterator(di); diff --git a/src/db.c b/src/db.c index aad822630..8511135c6 100644 --- a/src/db.c +++ b/src/db.c @@ -228,7 +228,6 @@ static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite) { dictEntry *de = dictFind(db->dict,key->ptr); serverAssertWithInfo(NULL,key,de != NULL); - dictEntry auxentry = *de; robj *old = dictGetVal(de); if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { val->lru = old->lru; @@ -246,17 +245,15 @@ static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite) { decrRefCount(old); /* Because of RM_StringDMA, old may be changed, so we need get old again */ old = dictGetVal(de); - /* Entry in auxentry may be changed, so we need update auxentry */ - auxentry = *de; } dictSetVal(db->dict, de, val); if (server.lazyfree_lazy_server_del) { freeObjAsync(key,old,db->id); - dictSetVal(db->dict, &auxentry, NULL); + } else { + /* This is just decrRefCount(old); */ + db->dict->type->valDestructor(db->dict, old); } - - dictFreeVal(db->dict, &auxentry); } /* Replace an existing key with a new value, we just replace value and don't @@ -942,7 +939,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) { privdata[0] = keys; privdata[1] = o; do { - cursor = dictScan(ht, cursor, scanCallback, NULL, privdata); + cursor = dictScan(ht, cursor, scanCallback, privdata); } while (cursor && maxiterations-- && 
listLength(keys) < (unsigned long)count); diff --git a/src/debug.c b/src/debug.c index c9495e5d8..fe46b5c62 100644 --- a/src/debug.c +++ b/src/debug.c @@ -868,7 +868,7 @@ NULL sds sizes = sdsempty(); sizes = sdscatprintf(sizes,"bits:%d ",(sizeof(void*) == 8)?64:32); sizes = sdscatprintf(sizes,"robj:%d ",(int)sizeof(robj)); - sizes = sdscatprintf(sizes,"dictentry:%d ",(int)sizeof(dictEntry)); + sizes = sdscatprintf(sizes,"dictentry:%d ",(int)dictEntryMemUsage()); sizes = sdscatprintf(sizes,"sdshdr5:%d ",(int)sizeof(struct sdshdr5)); sizes = sdscatprintf(sizes,"sdshdr8:%d ",(int)sizeof(struct sdshdr8)); sizes = sdscatprintf(sizes,"sdshdr16:%d ",(int)sizeof(struct sdshdr16)); diff --git a/src/defrag.c b/src/defrag.c index dbdf2ab62..b813ab74f 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -45,10 +45,6 @@ * pointers are worthwhile moving and which aren't */ int je_get_defrag_hint(void* ptr); -/* forward declarations*/ -void defragDictBucketCallback(dict *d, dictEntry **bucketref); -dictEntry* replaceSatelliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged); - /* Defrag helper for generic allocations. * * returns NULL in case the allocation wasn't moved. @@ -68,6 +64,7 @@ void* activeDefragAlloc(void *ptr) { newptr = zmalloc_no_tcache(size); memcpy(newptr, ptr, size); zfree_no_tcache(ptr); + server.stat_active_defrag_hits++; return newptr; } @@ -92,7 +89,7 @@ sds activeDefragSds(sds sdsptr) { * returns NULL in case the allocation wasn't moved. * when it returns a non-null value, the old pointer was already released * and should NOT be accessed. */ -robj *activeDefragStringOb(robj* ob, long *defragged) { +robj *activeDefragStringOb(robj* ob) { robj *ret = NULL; if (ob->refcount!=1) return NULL; @@ -101,7 +98,6 @@ robj *activeDefragStringOb(robj* ob, long *defragged) { if (ob->type!=OBJ_STRING || ob->encoding!=OBJ_ENCODING_EMBSTR) { if ((ret = activeDefragAlloc(ob))) { ob = ret; - (*defragged)++; } } @@ -111,7 +107,6 @@ robj *activeDefragStringOb(robj* ob, long *defragged) { sds newsds = activeDefragSds((sds)ob->ptr); if (newsds) { ob->ptr = newsds; - (*defragged)++; } } else if (ob->encoding==OBJ_ENCODING_EMBSTR) { /* The sds is embedded in the object allocation, calculate the @@ -119,7 +114,6 @@ robj *activeDefragStringOb(robj* ob, long *defragged) { long ofs = (intptr_t)ob->ptr - (intptr_t)ob; if ((ret = activeDefragAlloc(ob))) { ret->ptr = (void*)((intptr_t)ret + ofs); - (*defragged)++; } } else if (ob->encoding!=OBJ_ENCODING_INT) { serverPanic("Unknown string encoding"); @@ -133,68 +127,36 @@ robj *activeDefragStringOb(robj* ob, long *defragged) { * returns NULL in case the allocation wasn't moved. * when it returns a non-null value, the old pointer was already released * and should NOT be accessed. */ -luaScript *activeDefragLuaScript(luaScript *script, long *defragged) { +luaScript *activeDefragLuaScript(luaScript *script) { luaScript *ret = NULL; /* try to defrag script struct */ if ((ret = activeDefragAlloc(script))) { script = ret; - (*defragged)++; } /* try to defrag actual script object */ - robj *ob = activeDefragStringOb(script->body, defragged); + robj *ob = activeDefragStringOb(script->body); if (ob) script->body = ob; return ret; } -/* Defrag helper for dictEntries to be used during dict iteration (called on - * each step). Returns a stat of how many pointers were moved. 
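/* A minimal caller sketch for the helpers above: with the old 'defragged'
 * out-parameter gone, a non-NULL return means the allocation moved (the hit
 * was already counted inside activeDefragAlloc) and the old pointer must not
 * be touched, while NULL means nothing changed. 'myObject' and its 'name'
 * field are hypothetical. */
typedef struct { sds name; } myObject;

static void defragMyObject(myObject **ref) {
    myObject *obj = *ref, *newobj;
    if ((newobj = activeDefragAlloc(obj)))
        *ref = obj = newobj;        /* old struct allocation already freed */
    sds newname = activeDefragSds(obj->name);
    if (newname)
        obj->name = newname;        /* old sds already freed */
}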
*/ -long dictIterDefragEntry(dictIterator *iter) { - /* This function is a little bit dirty since it messes with the internals - * of the dict and it's iterator, but the benefit is that it is very easy - * to use, and require no other changes in the dict. */ - long defragged = 0; - /* Handle the next entry (if there is one), and update the pointer in the - * current entry. */ - if (iter->nextEntry) { - dictEntry *newde = activeDefragAlloc(iter->nextEntry); - if (newde) { - defragged++; - iter->nextEntry = newde; - iter->entry->next = newde; - } - } - /* handle the case of the first entry in the hash bucket. */ - if (iter->d->ht_table[iter->table][iter->index] == iter->entry) { - dictEntry *newde = activeDefragAlloc(iter->entry); - if (newde) { - iter->entry = newde; - iter->d->ht_table[iter->table][iter->index] = newde; - defragged++; - } - } - return defragged; -} - /* Defrag helper for dict main allocations (dict struct, and hash tables). * receives a pointer to the dict* and implicitly updates it when the dict * struct itself was moved. Returns a stat of how many pointers were moved. */ -long dictDefragTables(dict* d) { +void dictDefragTables(dict* d) { dictEntry **newtable; - long defragged = 0; /* handle the first hash table */ newtable = activeDefragAlloc(d->ht_table[0]); if (newtable) - defragged++, d->ht_table[0] = newtable; + d->ht_table[0] = newtable; /* handle the second hash table */ if (d->ht_table[1]) { newtable = activeDefragAlloc(d->ht_table[1]); if (newtable) - defragged++, d->ht_table[1] = newtable; + d->ht_table[1] = newtable; } - return defragged; } /* Internal function used by zslDefrag */ @@ -258,19 +220,16 @@ double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) { /* Defrag helper for sorted set. * Defrag a single dict entry key name, and corresponding skiplist struct */ -long activeDefragZsetEntry(zset *zs, dictEntry *de) { +void activeDefragZsetEntry(zset *zs, dictEntry *de) { sds newsds; double* newscore; - long defragged = 0; sds sdsele = dictGetKey(de); if ((newsds = activeDefragSds(sdsele))) - defragged++, de->key = newsds; + dictSetKey(zs->dict, de, newsds); newscore = zslDefrag(zs->zsl, *(double*)dictGetVal(de), sdsele, newsds); if (newscore) { dictSetVal(zs->dict, de, newscore); - defragged++; } - return defragged; } #define DEFRAG_SDS_DICT_NO_VAL 0 @@ -279,43 +238,51 @@ long activeDefragZsetEntry(zset *zs, dictEntry *de) { #define DEFRAG_SDS_DICT_VAL_VOID_PTR 3 #define DEFRAG_SDS_DICT_VAL_LUA_SCRIPT 4 -/* Defrag a dict with sds key and optional value (either ptr, sds or robj string) */ -long activeDefragSdsDict(dict* d, int val_type) { - dictIterator *di; - dictEntry *de; - long defragged = 0; - di = dictGetIterator(d); - while((de = dictNext(di)) != NULL) { - sds sdsele = dictGetKey(de), newsds; +typedef struct { + dict *dict; + int val_type; +} activeDefragSdsDictData; + +void activeDefragSdsDictCallback(void *privdata, const dictEntry *_de) { + dictEntry *de = (dictEntry*)_de; + activeDefragSdsDictData *data = privdata; + dict *d = data->dict; + int val_type = data->val_type; + sds sdsele = dictGetKey(de), newsds; + if ((newsds = activeDefragSds(sdsele))) + dictSetKey(d, de, newsds); + /* defrag the value */ + if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) { + sdsele = dictGetVal(de); if ((newsds = activeDefragSds(sdsele))) - de->key = newsds, defragged++; - /* defrag the value */ - if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) { - sdsele = dictGetVal(de); - if ((newsds = activeDefragSds(sdsele))) - de->v.val = newsds, defragged++; - } else 
if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) { - robj *newele, *ele = dictGetVal(de); - if ((newele = activeDefragStringOb(ele, &defragged))) - de->v.val = newele; - } else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) { - void *newptr, *ptr = dictGetVal(de); - if ((newptr = activeDefragAlloc(ptr))) - de->v.val = newptr, defragged++; - } else if (val_type == DEFRAG_SDS_DICT_VAL_LUA_SCRIPT) { - void *newptr, *ptr = dictGetVal(de); - if ((newptr = activeDefragLuaScript(ptr, &defragged))) - de->v.val = newptr; - } - defragged += dictIterDefragEntry(di); + dictSetVal(d, de, newsds); + } else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) { + robj *newele, *ele = dictGetVal(de); + if ((newele = activeDefragStringOb(ele))) + dictSetVal(d, de, newele); + } else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) { + void *newptr, *ptr = dictGetVal(de); + if ((newptr = activeDefragAlloc(ptr))) + dictSetVal(d, de, newptr); + } else if (val_type == DEFRAG_SDS_DICT_VAL_LUA_SCRIPT) { + void *newptr, *ptr = dictGetVal(de); + if ((newptr = activeDefragLuaScript(ptr))) + dictSetVal(d, de, newptr); } - dictReleaseIterator(di); - return defragged; +} + +/* Defrag a dict with sds key and optional value (either ptr, sds or robj string) */ +void activeDefragSdsDict(dict* d, int val_type) { + activeDefragSdsDictData data = {d, val_type}; + unsigned long cursor = 0; + do { + cursor = dictScanDefrag(d, cursor, activeDefragSdsDictCallback, + activeDefragAlloc, &data); + } while (cursor != 0); } /* Defrag a list of ptr, sds or robj string values */ -long activeDefragList(list *l, int val_type) { - long defragged = 0; +void activeDefragList(list *l, int val_type) { listNode *ln, *newln; for (ln = l->head; ln; ln = ln->next) { if ((newln = activeDefragAlloc(ln))) { @@ -328,107 +295,25 @@ long activeDefragList(list *l, int val_type) { else l->tail = newln; ln = newln; - defragged++; } if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) { sds newsds, sdsele = ln->value; if ((newsds = activeDefragSds(sdsele))) - ln->value = newsds, defragged++; + ln->value = newsds; } else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) { robj *newele, *ele = ln->value; - if ((newele = activeDefragStringOb(ele, &defragged))) + if ((newele = activeDefragStringOb(ele))) ln->value = newele; } else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) { void *newptr, *ptr = ln->value; if ((newptr = activeDefragAlloc(ptr))) - ln->value = newptr, defragged++; + ln->value = newptr; } } - return defragged; } -/* Defrag a list of sds values and a dict with the same sds keys */ -long activeDefragSdsListAndDict(list *l, dict *d, int dict_val_type) { - long defragged = 0; - sds newsds, sdsele; - listNode *ln, *newln; - dictIterator *di; - dictEntry *de; - /* Defrag the list and it's sds values */ - for (ln = l->head; ln; ln = ln->next) { - if ((newln = activeDefragAlloc(ln))) { - if (newln->prev) - newln->prev->next = newln; - else - l->head = newln; - if (newln->next) - newln->next->prev = newln; - else - l->tail = newln; - ln = newln; - defragged++; - } - sdsele = ln->value; - if ((newsds = activeDefragSds(sdsele))) { - /* When defragging an sds value, we need to update the dict key */ - uint64_t hash = dictGetHash(d, newsds); - dictEntry **deref = dictFindEntryRefByPtrAndHash(d, sdsele, hash); - if (deref) - (*deref)->key = newsds; - ln->value = newsds; - defragged++; - } - } - - /* Defrag the dict values (keys were already handled) */ - di = dictGetIterator(d); - while((de = dictNext(di)) != NULL) { - if (dict_val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) { - sds newsds, 
sdsele = dictGetVal(de); - if ((newsds = activeDefragSds(sdsele))) - de->v.val = newsds, defragged++; - } else if (dict_val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) { - robj *newele, *ele = dictGetVal(de); - if ((newele = activeDefragStringOb(ele, &defragged))) - de->v.val = newele; - } else if (dict_val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) { - void *newptr, *ptr = dictGetVal(de); - if ((newptr = activeDefragAlloc(ptr))) - de->v.val = newptr, defragged++; - } - defragged += dictIterDefragEntry(di); - } - dictReleaseIterator(di); - - return defragged; -} - -/* Utility function that replaces an old key pointer in the dictionary with a - * new pointer. Additionally, we try to defrag the dictEntry in that dict. - * Oldkey may be a dead pointer and should not be accessed (we get a - * pre-calculated hash value). Newkey may be null if the key pointer wasn't - * moved. Return value is the dictEntry if found, or NULL if not found. - * NOTE: this is very ugly code, but it let's us avoid the complication of - * doing a scan on another dict. */ -dictEntry* replaceSatelliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged) { - dictEntry **deref = dictFindEntryRefByPtrAndHash(d, oldkey, hash); - if (deref) { - dictEntry *de = *deref; - dictEntry *newde = activeDefragAlloc(de); - if (newde) { - de = *deref = newde; - (*defragged)++; - } - if (newkey) - de->key = newkey; - return de; - } - return NULL; -} - -long activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) { +void activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) { quicklistNode *newnode, *node = *node_ref; - long defragged = 0; unsigned char *newzl; if ((newnode = activeDefragAlloc(node))) { if (newnode->prev) @@ -440,21 +325,17 @@ long activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) { else ql->tail = newnode; *node_ref = node = newnode; - defragged++; } if ((newzl = activeDefragAlloc(node->entry))) - defragged++, node->entry = newzl; - return defragged; + node->entry = newzl; } -long activeDefragQuickListNodes(quicklist *ql) { +void activeDefragQuickListNodes(quicklist *ql) { quicklistNode *node = ql->head; - long defragged = 0; while (node) { - defragged += activeDefragQuickListNode(ql, &node); + activeDefragQuickListNode(ql, &node); node = node->next; } - return defragged; } /* when the value has lots of elements, we want to handle it later and not as @@ -466,7 +347,7 @@ void defragLater(redisDb *db, dictEntry *kde) { } /* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. 
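/* A sketch of the pattern that replaces the removed helper above: when a key
 * sds has been reallocated, the matching entry in a satellite dict (such as
 * db->expires) is located by the dead pointer plus a precomputed hash, then
 * repointed with the new dictSetKey() accessor. 'fixupSatelliteKey' is an
 * illustrative name; the concrete use is in defragKey() further down. */
static void fixupSatelliteKey(dict *primary, dict *satellite, sds oldkey, sds newkey) {
    uint64_t hash = dictGetHash(primary, newkey);
    dictEntry *de = dictFindEntryByPtrAndHash(satellite, oldkey, hash);
    if (de) dictSetKey(satellite, de, newkey);
}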
*/ -long scanLaterList(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) { +long scanLaterList(robj *ob, unsigned long *cursor, long long endtime) { quicklist *ql = ob->ptr; quicklistNode *node; long iterations = 0; @@ -489,7 +370,7 @@ long scanLaterList(robj *ob, unsigned long *cursor, long long endtime, long long (*cursor)++; while (node) { - (*defragged) += activeDefragQuickListNode(ql, &node); + activeDefragQuickListNode(ql, &node); server.stat_active_defrag_scanned++; if (++iterations > 128 && !bookmark_failed) { if (ustime() > endtime) { @@ -511,82 +392,79 @@ long scanLaterList(robj *ob, unsigned long *cursor, long long endtime, long long typedef struct { zset *zs; - long defragged; } scanLaterZsetData; void scanLaterZsetCallback(void *privdata, const dictEntry *_de) { dictEntry *de = (dictEntry*)_de; scanLaterZsetData *data = privdata; - data->defragged += activeDefragZsetEntry(data->zs, de); + activeDefragZsetEntry(data->zs, de); server.stat_active_defrag_scanned++; } -long scanLaterZset(robj *ob, unsigned long *cursor) { +void scanLaterZset(robj *ob, unsigned long *cursor) { if (ob->type != OBJ_ZSET || ob->encoding != OBJ_ENCODING_SKIPLIST) - return 0; + return; zset *zs = (zset*)ob->ptr; dict *d = zs->dict; - scanLaterZsetData data = {zs, 0}; - *cursor = dictScan(d, *cursor, scanLaterZsetCallback, defragDictBucketCallback, &data); - return data.defragged; + scanLaterZsetData data = {zs}; + *cursor = dictScanDefrag(d, *cursor, scanLaterZsetCallback, activeDefragAlloc, &data); } +typedef struct { + dict *dict; +} scanLaterDictData; + void scanLaterSetCallback(void *privdata, const dictEntry *_de) { dictEntry *de = (dictEntry*)_de; - long *defragged = privdata; + scanLaterDictData *data = privdata; sds sdsele = dictGetKey(de), newsds; if ((newsds = activeDefragSds(sdsele))) - (*defragged)++, de->key = newsds; + dictSetKey(data->dict, de, newsds); server.stat_active_defrag_scanned++; } -long scanLaterSet(robj *ob, unsigned long *cursor) { - long defragged = 0; +void scanLaterSet(robj *ob, unsigned long *cursor) { if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HT) - return 0; + return; dict *d = ob->ptr; - *cursor = dictScan(d, *cursor, scanLaterSetCallback, defragDictBucketCallback, &defragged); - return defragged; + scanLaterDictData data = {d}; + *cursor = dictScanDefrag(d, *cursor, scanLaterSetCallback, activeDefragAlloc, &data); } void scanLaterHashCallback(void *privdata, const dictEntry *_de) { dictEntry *de = (dictEntry*)_de; - long *defragged = privdata; + scanLaterDictData *data = privdata; sds sdsele = dictGetKey(de), newsds; if ((newsds = activeDefragSds(sdsele))) - (*defragged)++, de->key = newsds; + dictSetKey(data->dict, de, newsds); sdsele = dictGetVal(de); if ((newsds = activeDefragSds(sdsele))) - (*defragged)++, de->v.val = newsds; + dictSetVal(data->dict, de, newsds); server.stat_active_defrag_scanned++; } -long scanLaterHash(robj *ob, unsigned long *cursor) { - long defragged = 0; +void scanLaterHash(robj *ob, unsigned long *cursor) { if (ob->type != OBJ_HASH || ob->encoding != OBJ_ENCODING_HT) - return 0; + return; dict *d = ob->ptr; - *cursor = dictScan(d, *cursor, scanLaterHashCallback, defragDictBucketCallback, &defragged); - return defragged; + scanLaterDictData data = {d}; + *cursor = dictScanDefrag(d, *cursor, scanLaterHashCallback, activeDefragAlloc, &data); } -long defragQuicklist(redisDb *db, dictEntry *kde) { +void defragQuicklist(redisDb *db, dictEntry *kde) { robj *ob = dictGetVal(kde); - long defragged = 0; quicklist 
*ql = ob->ptr, *newql; serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST); if ((newql = activeDefragAlloc(ql))) - defragged++, ob->ptr = ql = newql; + ob->ptr = ql = newql; if (ql->len > server.active_defrag_max_scan_fields) defragLater(db, kde); else - defragged += activeDefragQuickListNodes(ql); - return defragged; + activeDefragQuickListNodes(ql); } -long defragZsetSkiplist(redisDb *db, dictEntry *kde) { +void defragZsetSkiplist(redisDb *db, dictEntry *kde) { robj *ob = dictGetVal(kde); - long defragged = 0; zset *zs = (zset*)ob->ptr; zset *newzs; zskiplist *newzsl; @@ -595,30 +473,28 @@ long defragZsetSkiplist(redisDb *db, dictEntry *kde) { struct zskiplistNode *newheader; serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST); if ((newzs = activeDefragAlloc(zs))) - defragged++, ob->ptr = zs = newzs; + ob->ptr = zs = newzs; if ((newzsl = activeDefragAlloc(zs->zsl))) - defragged++, zs->zsl = newzsl; + zs->zsl = newzsl; if ((newheader = activeDefragAlloc(zs->zsl->header))) - defragged++, zs->zsl->header = newheader; + zs->zsl->header = newheader; if (dictSize(zs->dict) > server.active_defrag_max_scan_fields) defragLater(db, kde); else { dictIterator *di = dictGetIterator(zs->dict); while((de = dictNext(di)) != NULL) { - defragged += activeDefragZsetEntry(zs, de); + activeDefragZsetEntry(zs, de); } dictReleaseIterator(di); } /* handle the dict struct */ if ((newdict = activeDefragAlloc(zs->dict))) - defragged++, zs->dict = newdict; + zs->dict = newdict; /* defrag the dict tables */ - defragged += dictDefragTables(zs->dict); - return defragged; + dictDefragTables(zs->dict); } -long defragHash(redisDb *db, dictEntry *kde) { - long defragged = 0; +void defragHash(redisDb *db, dictEntry *kde) { robj *ob = dictGetVal(kde); dict *d, *newd; serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT); @@ -626,17 +502,15 @@ long defragHash(redisDb *db, dictEntry *kde) { if (dictSize(d) > server.active_defrag_max_scan_fields) defragLater(db, kde); else - defragged += activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS); + activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS); /* handle the dict struct */ if ((newd = activeDefragAlloc(ob->ptr))) - defragged++, ob->ptr = newd; + ob->ptr = newd; /* defrag the dict tables */ - defragged += dictDefragTables(ob->ptr); - return defragged; + dictDefragTables(ob->ptr); } -long defragSet(redisDb *db, dictEntry *kde) { - long defragged = 0; +void defragSet(redisDb *db, dictEntry *kde) { robj *ob = dictGetVal(kde); dict *d, *newd; serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT); @@ -644,13 +518,12 @@ long defragSet(redisDb *db, dictEntry *kde) { if (dictSize(d) > server.active_defrag_max_scan_fields) defragLater(db, kde); else - defragged += activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL); + activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL); /* handle the dict struct */ if ((newd = activeDefragAlloc(ob->ptr))) - defragged++, ob->ptr = newd; + ob->ptr = newd; /* defrag the dict tables */ - defragged += dictDefragTables(ob->ptr); - return defragged; + dictDefragTables(ob->ptr); } /* Defrag callback for radix tree iterator, called for each node, @@ -665,7 +538,7 @@ int defragRaxNode(raxNode **noderef) { } /* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. 
*/ -int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) { +int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime) { static unsigned char last[sizeof(streamID)]; raxIterator ri; long iterations = 0; @@ -699,7 +572,7 @@ int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime, while (raxNext(&ri)) { void *newdata = activeDefragAlloc(ri.data); if (newdata) - raxSetData(ri.node, ri.data=newdata), (*defragged)++; + raxSetData(ri.node, ri.data=newdata); server.stat_active_defrag_scanned++; if (++iterations > 128) { if (ustime() > endtime) { @@ -717,19 +590,18 @@ int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime, } /* optional callback used defrag each rax element (not including the element pointer itself) */ -typedef void *(raxDefragFunction)(raxIterator *ri, void *privdata, long *defragged); +typedef void *(raxDefragFunction)(raxIterator *ri, void *privdata); /* defrag radix tree including: * 1) rax struct * 2) rax nodes * 3) rax entry data (only if defrag_data is specified) * 4) call a callback per element, and allow the callback to return a new pointer for the element */ -long defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) { - long defragged = 0; +void defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) { raxIterator ri; rax* rax; if ((rax = activeDefragAlloc(*raxref))) - defragged++, *raxref = rax; + *raxref = rax; rax = *raxref; raxStart(&ri,rax); ri.node_cb = defragRaxNode; @@ -738,14 +610,13 @@ long defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_c while (raxNext(&ri)) { void *newdata = NULL; if (element_cb) - newdata = element_cb(&ri, element_cb_data, &defragged); + newdata = element_cb(&ri, element_cb_data); if (defrag_data && !newdata) newdata = activeDefragAlloc(ri.data); if (newdata) - raxSetData(ri.node, ri.data=newdata), defragged++; + raxSetData(ri.node, ri.data=newdata); } raxStop(&ri); - return defragged; } typedef struct { @@ -753,8 +624,7 @@ typedef struct { streamConsumer *c; } PendingEntryContext; -void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata, long *defragged) { - UNUSED(defragged); +void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata) { PendingEntryContext *ctx = privdata; streamNACK *nack = ri->data, *newnack; nack->consumer = ctx->c; /* update nack pointer to consumer */ @@ -764,102 +634,96 @@ void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata, long *de void *prev; raxInsert(ctx->cg->pel, ri->key, ri->key_len, newnack, &prev); serverAssert(prev==nack); - /* note: we don't increment 'defragged' that's done by the caller */ } return newnack; } -void* defragStreamConsumer(raxIterator *ri, void *privdata, long *defragged) { +void* defragStreamConsumer(raxIterator *ri, void *privdata) { streamConsumer *c = ri->data; streamCG *cg = privdata; void *newc = activeDefragAlloc(c); if (newc) { - /* note: we don't increment 'defragged' that's done by the caller */ c = newc; } sds newsds = activeDefragSds(c->name); if (newsds) - (*defragged)++, c->name = newsds; + c->name = newsds; if (c->pel) { PendingEntryContext pel_ctx = {cg, c}; - *defragged += defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, &pel_ctx); + defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, &pel_ctx); } return newc; /* returns NULL if c was not defragged */ } -void* 
defragStreamConsumerGroup(raxIterator *ri, void *privdata, long *defragged) { +void* defragStreamConsumerGroup(raxIterator *ri, void *privdata) { streamCG *cg = ri->data; UNUSED(privdata); if (cg->consumers) - *defragged += defragRadixTree(&cg->consumers, 0, defragStreamConsumer, cg); + defragRadixTree(&cg->consumers, 0, defragStreamConsumer, cg); if (cg->pel) - *defragged += defragRadixTree(&cg->pel, 0, NULL, NULL); + defragRadixTree(&cg->pel, 0, NULL, NULL); return NULL; } -long defragStream(redisDb *db, dictEntry *kde) { - long defragged = 0; +void defragStream(redisDb *db, dictEntry *kde) { robj *ob = dictGetVal(kde); serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM); stream *s = ob->ptr, *news; /* handle the main struct */ if ((news = activeDefragAlloc(s))) - defragged++, ob->ptr = s = news; + ob->ptr = s = news; if (raxSize(s->rax) > server.active_defrag_max_scan_fields) { rax *newrax = activeDefragAlloc(s->rax); if (newrax) - defragged++, s->rax = newrax; + s->rax = newrax; defragLater(db, kde); } else - defragged += defragRadixTree(&s->rax, 1, NULL, NULL); + defragRadixTree(&s->rax, 1, NULL, NULL); if (s->cgroups) - defragged += defragRadixTree(&s->cgroups, 1, defragStreamConsumerGroup, NULL); - return defragged; + defragRadixTree(&s->cgroups, 1, defragStreamConsumerGroup, NULL); } /* Defrag a module key. This is either done immediately or scheduled * for later. Returns then number of pointers defragged. */ -long defragModule(redisDb *db, dictEntry *kde) { +void defragModule(redisDb *db, dictEntry *kde) { robj *obj = dictGetVal(kde); serverAssert(obj->type == OBJ_MODULE); - long defragged = 0; - if (!moduleDefragValue(dictGetKey(kde), obj, &defragged, db->id)) + if (!moduleDefragValue(dictGetKey(kde), obj, db->id)) defragLater(db, kde); - - return defragged; } /* for each key we scan in the main dict, this function will attempt to defrag * all the various pointers it has. Returns a stat of how many pointers were * moved. */ -long defragKey(redisDb *db, dictEntry *de) { +void defragKey(redisDb *db, dictEntry *de) { sds keysds = dictGetKey(de); robj *newob, *ob; unsigned char *newzl; - long defragged = 0; sds newsds; /* Try to defrag the key name. */ newsds = activeDefragSds(keysds); - if (newsds) - defragged++, de->key = newsds; - if (dictSize(db->expires)) { - /* Dirty code: - * I can't search in db->expires for that key after i already released - * the pointer it holds it won't be able to do the string compare */ - uint64_t hash = dictGetHash(db->dict, de->key); - replaceSatelliteDictKeyPtrAndOrDefragDictEntry(db->expires, keysds, newsds, hash, &defragged); + if (newsds) { + dictSetKey(db->dict, de, newsds); + if (dictSize(db->expires)) { + /* We can't search in db->expires for that key after we've released + * the pointer it holds, since it won't be able to do the string + * compare, but we can find the entry using key hash and pointer. */ + uint64_t hash = dictGetHash(db->dict, newsds); + dictEntry *expire_de = dictFindEntryByPtrAndHash(db->expires, keysds, hash); + if (expire_de) dictSetKey(db->expires, expire_de, newsds); + } } /* Try to defrag robj and / or string value. */ ob = dictGetVal(de); - if ((newob = activeDefragStringOb(ob, &defragged))) { - de->v.val = newob; + if ((newob = activeDefragStringOb(ob))) { + dictSetVal(db->dict, de, newob); ob = newob; } @@ -867,78 +731,69 @@ long defragKey(redisDb *db, dictEntry *de) { /* Already handled in activeDefragStringOb. 
*/ } else if (ob->type == OBJ_LIST) { if (ob->encoding == OBJ_ENCODING_QUICKLIST) { - defragged += defragQuicklist(db, de); + defragQuicklist(db, de); } else if (ob->encoding == OBJ_ENCODING_LISTPACK) { if ((newzl = activeDefragAlloc(ob->ptr))) - defragged++, ob->ptr = newzl; + ob->ptr = newzl; } else { serverPanic("Unknown list encoding"); } } else if (ob->type == OBJ_SET) { if (ob->encoding == OBJ_ENCODING_HT) { - defragged += defragSet(db, de); + defragSet(db, de); } else if (ob->encoding == OBJ_ENCODING_INTSET || ob->encoding == OBJ_ENCODING_LISTPACK) { void *newptr, *ptr = ob->ptr; if ((newptr = activeDefragAlloc(ptr))) - defragged++, ob->ptr = newptr; + ob->ptr = newptr; } else { serverPanic("Unknown set encoding"); } } else if (ob->type == OBJ_ZSET) { if (ob->encoding == OBJ_ENCODING_LISTPACK) { if ((newzl = activeDefragAlloc(ob->ptr))) - defragged++, ob->ptr = newzl; + ob->ptr = newzl; } else if (ob->encoding == OBJ_ENCODING_SKIPLIST) { - defragged += defragZsetSkiplist(db, de); + defragZsetSkiplist(db, de); } else { serverPanic("Unknown sorted set encoding"); } } else if (ob->type == OBJ_HASH) { if (ob->encoding == OBJ_ENCODING_LISTPACK) { if ((newzl = activeDefragAlloc(ob->ptr))) - defragged++, ob->ptr = newzl; + ob->ptr = newzl; } else if (ob->encoding == OBJ_ENCODING_HT) { - defragged += defragHash(db, de); + defragHash(db, de); } else { serverPanic("Unknown hash encoding"); } } else if (ob->type == OBJ_STREAM) { - defragged += defragStream(db, de); + defragStream(db, de); } else if (ob->type == OBJ_MODULE) { - defragged += defragModule(db, de); + defragModule(db, de); } else { serverPanic("Unknown object type"); } - return defragged; } /* Defrag scan callback for the main db dictionary. */ void defragScanCallback(void *privdata, const dictEntry *de) { - long defragged = defragKey((redisDb*)privdata, (dictEntry*)de); - server.stat_active_defrag_hits += defragged; - if(defragged) + long long hits_before = server.stat_active_defrag_hits; + defragKey((redisDb*)privdata, (dictEntry*)de); + if (server.stat_active_defrag_hits != hits_before) server.stat_active_defrag_key_hits++; else server.stat_active_defrag_key_misses++; server.stat_active_defrag_scanned++; } -/* Defrag scan callback for each hash table bucket, - * used in order to defrag the dictEntry allocations. */ -void defragDictBucketCallback(dict *d, dictEntry **bucketref) { - while(*bucketref) { - dictEntry *de = *bucketref, *newde; - if ((newde = activeDefragAlloc(de))) { - *bucketref = newde; - if (server.cluster_enabled && d == server.db[0].dict) { - /* Cluster keyspace dict. Update slot-to-entries mapping. */ - slotToKeyReplaceEntry(newde, server.db); - } - } - bucketref = &(*bucketref)->next; - } +/* Dummy scan callback used when defragging the expire dictionary. We only + * defrag the entries, which is done per bucket. */ +void defragExpireScanCallback(void *privdata, const dictEntry *de) { + UNUSED(privdata); + UNUSED(de); + server.stat_active_defrag_scanned++; } /* Utility function to get the fragmentation ratio from jemalloc. @@ -964,15 +819,13 @@ float getAllocatorFragmentation(size_t *out_frag_bytes) { /* We may need to defrag other globals, one small allocation can hold a full allocator run. * so although small, it is still important to defrag these */ -long defragOtherGlobals() { - long defragged = 0; +void defragOtherGlobals() { /* there are many more pointers to defrag (e.g. client argv, output / aof buffers, etc. 
* but we assume most of these are short lived, we only need to defrag allocations * that remain static for a long time */ - defragged += activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT); - defragged += moduleDefragGlobals(); - return defragged; + activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT); + moduleDefragGlobals(); } /* returns 0 more work may or may not be needed (see non-zero cursor), @@ -981,17 +834,17 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime, int if (de) { robj *ob = dictGetVal(de); if (ob->type == OBJ_LIST) { - return scanLaterList(ob, cursor, endtime, &server.stat_active_defrag_hits); + return scanLaterList(ob, cursor, endtime); } else if (ob->type == OBJ_SET) { - server.stat_active_defrag_hits += scanLaterSet(ob, cursor); + scanLaterSet(ob, cursor); } else if (ob->type == OBJ_ZSET) { - server.stat_active_defrag_hits += scanLaterZset(ob, cursor); + scanLaterZset(ob, cursor); } else if (ob->type == OBJ_HASH) { - server.stat_active_defrag_hits += scanLaterHash(ob, cursor); + scanLaterHash(ob, cursor); } else if (ob->type == OBJ_STREAM) { - return scanLaterStreamListpacks(ob, cursor, endtime, &server.stat_active_defrag_hits); + return scanLaterStreamListpacks(ob, cursor, endtime); } else if (ob->type == OBJ_MODULE) { - return moduleLateDefrag(dictGetKey(de), ob, cursor, endtime, &server.stat_active_defrag_hits, dbid); + return moduleLateDefrag(dictGetKey(de), ob, cursor, endtime, dbid); } else { *cursor = 0; /* object type may have changed since we schedule it for later */ } @@ -1106,6 +959,7 @@ void computeDefragCycles() { void activeDefragCycle(void) { static int current_db = -1; static unsigned long cursor = 0; + static unsigned long expires_cursor = 0; static redisDb *db = NULL; static long long start_scan, start_stat; unsigned int iterations = 0; @@ -1151,7 +1005,7 @@ void activeDefragCycle(void) { do { /* if we're not continuing a scan from the last call or loop, start a new one */ - if (!cursor) { + if (!cursor && !expires_cursor) { /* finish any leftovers from previous db before moving to the next one */ if (db && defragLaterStep(db, endtime)) { quit = 1; /* time is up, we didn't finish all the work */ @@ -1161,7 +1015,7 @@ void activeDefragCycle(void) { /* Move on to next database, and stop if we reached the last one. */ if (++current_db >= server.dbnum) { /* defrag other items not part of the db / keys */ - server.stat_active_defrag_hits += defragOtherGlobals(); + defragOtherGlobals(); long long now = ustime(); size_t frag_bytes; @@ -1198,16 +1052,27 @@ void activeDefragCycle(void) { break; /* this will exit the function and we'll continue on the next cycle */ } - cursor = dictScan(db->dict, cursor, defragScanCallback, defragDictBucketCallback, db); + /* Scan the keyspace dict unless we're scanning the expire dict. */ + if (!expires_cursor) + cursor = dictScanDefrag(db->dict, cursor, defragScanCallback, + activeDefragAlloc, db); + + /* When done scanning the keyspace dict, we scan the expire dict. */ + if (!cursor) + expires_cursor = dictScanDefrag(db->expires, expires_cursor, + defragExpireScanCallback, + activeDefragAlloc, NULL); /* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys * (if we have a lot of pointers in one hash bucket or rehashing), * check if we reached the time limit. 
* But regardless, don't start a new db in this loop, this is because after * the last db we call defragOtherGlobals, which must be done in one cycle */ - if (!cursor || (++iterations > 16 || - server.stat_active_defrag_hits - prev_defragged > 512 || - server.stat_active_defrag_scanned - prev_scanned > 64)) { + if (!(cursor || expires_cursor) || + ++iterations > 16 || + server.stat_active_defrag_hits - prev_defragged > 512 || + server.stat_active_defrag_scanned - prev_scanned > 64) + { if (!cursor || ustime() > endtime) { quit = 1; break; @@ -1216,7 +1081,7 @@ void activeDefragCycle(void) { prev_defragged = server.stat_active_defrag_hits; prev_scanned = server.stat_active_defrag_scanned; } - } while(cursor && !quit); + } while((cursor || expires_cursor) && !quit); } while(!quit); latencyEndMonitor(latency); @@ -1243,9 +1108,8 @@ void *activeDefragAlloc(void *ptr) { return NULL; } -robj *activeDefragStringOb(robj *ob, long *defragged) { +robj *activeDefragStringOb(robj *ob) { UNUSED(ob); - UNUSED(defragged); return NULL; } diff --git a/src/dict.c b/src/dict.c index 90fd4f52e..ec2d4dca0 100644 --- a/src/dict.c +++ b/src/dict.c @@ -58,6 +58,22 @@ static dictResizeEnable dict_can_resize = DICT_RESIZE_ENABLE; static unsigned int dict_force_resize_ratio = 5; +/* -------------------------- types ----------------------------------------- */ + +struct dictEntry { + void *key; + union { + void *val; + uint64_t u64; + int64_t s64; + double d; + } v; + struct dictEntry *next; /* Next entry in the same hash bucket. */ + void *metadata[]; /* An arbitrary number of bytes (starting at a + * pointer-aligned address) of size as returned + * by dictType's dictEntryMetadataBytes(). */ +}; + /* -------------------------- private prototypes ---------------------------- */ static int _dictExpandIfNeeded(dict *d); @@ -104,7 +120,11 @@ static void _dictReset(dict *d, int htidx) /* Create a new hash table */ dict *dictCreate(dictType *type) { - dict *d = zmalloc(sizeof(*d)); + size_t metasize = type->dictMetadataBytes ? type->dictMetadataBytes() : 0; + dict *d = zmalloc(sizeof(*d) + metasize); + if (metasize) { + memset(dictMetadata(d), 0, metasize); + } _dictInit(d,type); return d; @@ -303,6 +323,11 @@ static void _dictRehashStep(dict *d) { if (d->pauserehash == 0) dictRehash(d,1); } +/* Return a pointer to the metadata section within the dict. */ +void *dictMetadata(dict *d) { + return &d->metadata; +} + /* Add an element to the target hash table */ int dictAdd(dict *d, void *key, void *val) { @@ -349,10 +374,10 @@ dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing) * system it is more likely that recently added entries are accessed * more frequently. */ htidx = dictIsRehashing(d) ? 1 : 0; - size_t metasize = dictMetadataSize(d); + size_t metasize = dictEntryMetadataSize(d); entry = zmalloc(sizeof(*entry) + metasize); if (metasize > 0) { - memset(dictMetadata(entry), 0, metasize); + memset(dictEntryMetadata(entry), 0, metasize); } entry->next = d->ht_table[htidx][index]; d->ht_table[htidx][index] = entry; @@ -596,6 +621,82 @@ void dictTwoPhaseUnlinkFree(dict *d, dictEntry *he, dictEntry **plink, int table dictResumeRehashing(d); } +void dictSetKey(dict *d, dictEntry* de, void *key) { + if (d->type->keyDup) + de->key = d->type->keyDup(d, key); + else + de->key = key; +} + +void dictSetVal(dict *d, dictEntry *de, void *val) { + de->v.val = d->type->valDup ? 
d->type->valDup(d, val) : val; +} + +void dictSetSignedIntegerVal(dictEntry *de, int64_t val) { + de->v.s64 = val; +} + +void dictSetUnsignedIntegerVal(dictEntry *de, uint64_t val) { + de->v.u64 = val; +} + +void dictSetDoubleVal(dictEntry *de, double val) { + de->v.d = val; +} + +int64_t dictIncrSignedIntegerVal(dictEntry *de, int64_t val) { + return de->v.s64 += val; +} + +uint64_t dictIncrUnsignedIntegerVal(dictEntry *de, uint64_t val) { + return de->v.u64 += val; +} + +double dictIncrDoubleVal(dictEntry *de, double val) { + return de->v.d += val; +} + +/* A pointer to the metadata section within the dict entry. */ +void *dictEntryMetadata(dictEntry *de) { + return &de->metadata; +} + +void *dictGetKey(const dictEntry *de) { + return de->key; +} + +void *dictGetVal(const dictEntry *de) { + return de->v.val; +} + +int64_t dictGetSignedIntegerVal(const dictEntry *de) { + return de->v.s64; +} + +uint64_t dictGetUnsignedIntegerVal(const dictEntry *de) { + return de->v.u64; +} + +double dictGetDoubleVal(const dictEntry *de) { + return de->v.d; +} + +/* Returns a mutable reference to the value as a double within the entry. */ +double *dictGetDoubleValPtr(dictEntry *de) { + return &de->v.d; +} + +/* Returns the memory usage in bytes of the dict, excluding the size of the keys + * and values. */ +size_t dictMemUsage(const dict *d) { + return dictSize(d) * sizeof(dictEntry) + + dictSlots(d) * sizeof(dictEntry*); +} + +size_t dictEntryMemUsage(void) { + return sizeof(dictEntry); +} + /* A fingerprint is a 64 bit number that represents the state of the dictionary * at a given time, it's just a few dict properties xored together. * When an unsafe iterator is initialized, we get the dict fingerprint, and check @@ -846,6 +947,21 @@ unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count) { return stored; } + +/* Reallocate the dictEntry allocations in a bucket using the provided + * allocation function in order to defrag them. */ +static void dictDefragBucket(dict *d, dictEntry **bucketref, dictDefragAllocFunction *allocfn) { + while (bucketref && *bucketref) { + dictEntry *de = *bucketref, *newde; + if ((newde = allocfn(de))) { + *bucketref = newde; + if (d->type->afterReplaceEntry) + d->type->afterReplaceEntry(d, newde); + } + bucketref = &(*bucketref)->next; + } +} + /* This is like dictGetRandomKey() from the POV of the API, but will do more * work to ensure a better distribution of the returned element. * @@ -969,8 +1085,24 @@ static unsigned long rev(unsigned long v) { unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, - dictScanBucketFunction* bucketfn, void *privdata) +{ + return dictScanDefrag(d, v, fn, NULL, privdata); +} + +/* Like dictScan, but additionally reallocates the memory used by the dict + * entries using the provided allocation function. This feature was added for + * the active defrag feature. + * + * The 'defragallocfn' callback is called with a pointer to memory that the callback + * can reallocate. The callback should return a new memory address or NULL, + * where NULL means that no reallocation happened and the old memory is still * valid.
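/* Usage sketch for dictScanDefrag(): the cursor loop is the same as with
 * dictScan(), but passing a defrag allocator lets the dict relocate its own
 * dictEntry allocations before the callback sees each (possibly moved) entry.
 * 'countEntryCallback' and 'defragEntriesAndCount' are illustrative names;
 * activeDefragAlloc is the defrag.c helper shown earlier. */
static void countEntryCallback(void *privdata, const dictEntry *de) {
    UNUSED(de);
    (*(unsigned long *)privdata)++;
}

static unsigned long defragEntriesAndCount(dict *d) {
    unsigned long entries = 0, cursor = 0;
    do {
        cursor = dictScanDefrag(d, cursor, countEntryCallback,
                                activeDefragAlloc, &entries);
    } while (cursor != 0);
    return entries;
}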
*/ +unsigned long dictScanDefrag(dict *d, + unsigned long v, + dictScanFunction *fn, + dictDefragAllocFunction *defragallocfn, + void *privdata) { int htidx0, htidx1; const dictEntry *de, *next; @@ -986,7 +1118,9 @@ unsigned long dictScan(dict *d, m0 = DICTHT_SIZE_MASK(d->ht_size_exp[htidx0]); /* Emit entries at cursor */ - if (bucketfn) bucketfn(d, &d->ht_table[htidx0][v & m0]); + if (defragallocfn) { + dictDefragBucket(d, &d->ht_table[htidx0][v & m0], defragallocfn); + } de = d->ht_table[htidx0][v & m0]; while (de) { next = de->next; @@ -1017,7 +1151,9 @@ unsigned long dictScan(dict *d, m1 = DICTHT_SIZE_MASK(d->ht_size_exp[htidx1]); /* Emit entries at cursor */ - if (bucketfn) bucketfn(d, &d->ht_table[htidx0][v & m0]); + if (defragallocfn) { + dictDefragBucket(d, &d->ht_table[htidx0][v & m0], defragallocfn); + } de = d->ht_table[htidx0][v & m0]; while (de) { next = de->next; @@ -1029,7 +1165,9 @@ unsigned long dictScan(dict *d, * of the index pointed to by the cursor in the smaller table */ do { /* Emit entries at cursor */ - if (bucketfn) bucketfn(d, &d->ht_table[htidx1][v & m1]); + if (defragallocfn) { + dictDefragBucket(d, &d->ht_table[htidx1][v & m1], defragallocfn); + } de = d->ht_table[htidx1][v & m1]; while (de) { next = de->next; @@ -1150,25 +1288,23 @@ uint64_t dictGetHash(dict *d, const void *key) { return dictHashKey(d, key); } -/* Finds the dictEntry reference by using pointer and pre-calculated hash. +/* Finds the dictEntry using pointer and pre-calculated hash. * oldkey is a dead pointer and should not be accessed. * the hash value should be provided using dictGetHash. * no string / key comparison is performed. - * return value is the reference to the dictEntry if found, or NULL if not found. */ -dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash) { - dictEntry *he, **heref; + * return value is a pointer to the dictEntry if found, or NULL if not found. */ +dictEntry *dictFindEntryByPtrAndHash(dict *d, const void *oldptr, uint64_t hash) { + dictEntry *he; unsigned long idx, table; if (dictSize(d) == 0) return NULL; /* dict is empty */ for (table = 0; table <= 1; table++) { idx = hash & DICTHT_SIZE_MASK(d->ht_size_exp[table]); - heref = &d->ht_table[table][idx]; - he = *heref; + he = d->ht_table[table][idx]; while(he) { if (oldptr==he->key) - return heref; - heref = &he->next; - he = *heref; + return he; + he = he->next; } if (!dictIsRehashing(d)) return NULL; } diff --git a/src/dict.h b/src/dict.h index 00b721d3d..a8a76e69c 100644 --- a/src/dict.h +++ b/src/dict.h @@ -44,19 +44,7 @@ #define DICT_OK 0 #define DICT_ERR 1 -typedef struct dictEntry { - void *key; - union { - void *val; - uint64_t u64; - int64_t s64; - double d; - } v; - struct dictEntry *next; /* Next entry in the same hash bucket. */ - void *metadata[]; /* An arbitrary number of bytes (starting at a - * pointer-aligned address) of size as returned - * by dictType's dictEntryMetadataBytes(). */ -} dictEntry; +typedef struct dictEntry dictEntry; /* opaque */ typedef struct dict dict; @@ -68,9 +56,13 @@ typedef struct dictType { void (*keyDestructor)(dict *d, void *key); void (*valDestructor)(dict *d, void *obj); int (*expandAllowed)(size_t moreMem, double usedRatio); - /* Allow a dictEntry to carry extra caller-defined metadata. The - * extra memory is initialized to 0 when a dictEntry is allocated. */ + /* Allow each dict and dictEntry to carry extra caller-defined metadata. The + * extra memory is initialized to 0 when allocated. 
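/* With dictEntry now opaque, call sites use the accessor functions defined
 * above (and declared further down in dict.h) instead of reaching into
 * de->key / de->v.val directly. A minimal illustration, assuming a dict keyed
 * by sds strings ('totalKeyLength' is not an existing function): */
static size_t totalKeyLength(dict *d) {
    size_t total = 0;
    dictIterator *di = dictGetIterator(d);
    dictEntry *de;
    while ((de = dictNext(di)) != NULL)
        total += sdslen(dictGetKey(de));    /* formerly sdslen(de->key) */
    dictReleaseIterator(di);
    return total;
}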
*/ size_t (*dictEntryMetadataBytes)(dict *d); + size_t (*dictMetadataBytes)(void); + /* Optional callback called after an entry has been reallocated (due to + * active defrag). Only called if the entry has metadata. */ + void (*afterReplaceEntry)(dict *d, dictEntry *entry); } dictType; #define DICTHT_SIZE(exp) ((exp) == -1 ? 0 : (unsigned long)1<<(exp)) @@ -87,6 +79,10 @@ struct dict { /* Keep small vars at end for optimal (minimal) struct padding */ int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */ signed char ht_size_exp[2]; /* exponent of size. (size = 1<type->valDestructor) \ - (d)->type->valDestructor((d), (entry)->v.val); \ + (d)->type->valDestructor((d), dictGetVal(entry)); \ } while(0) -#define dictSetVal(d, entry, _val_) do { \ - if ((d)->type->valDup) \ - (entry)->v.val = (d)->type->valDup((d), _val_); \ - else \ - (entry)->v.val = (_val_); \ -} while(0) - -#define dictSetSignedIntegerVal(entry, _val_) \ - do { (entry)->v.s64 = _val_; } while(0) - -#define dictSetUnsignedIntegerVal(entry, _val_) \ - do { (entry)->v.u64 = _val_; } while(0) - -#define dictSetDoubleVal(entry, _val_) \ - do { (entry)->v.d = _val_; } while(0) - -#define dictIncrSignedIntegerVal(entry, _val_) \ - ((entry)->v.s64 += _val_) - -#define dictIncrUnsignedIntegerVal(entry, _val_) \ - ((entry)->v.u64 += _val_) - -#define dictIncrDoubleVal(entry, _val_) \ - ((entry)->v.d += _val_) - #define dictFreeKey(d, entry) \ if ((d)->type->keyDestructor) \ - (d)->type->keyDestructor((d), (entry)->key) - -#define dictSetKey(d, entry, _key_) do { \ - if ((d)->type->keyDup) \ - (entry)->key = (d)->type->keyDup((d), _key_); \ - else \ - (entry)->key = (_key_); \ -} while(0) + (d)->type->keyDestructor((d), dictGetKey(entry)) #define dictCompareKeys(d, key1, key2) \ (((d)->type->keyCompare) ? \ (d)->type->keyCompare((d), key1, key2) : \ (key1) == (key2)) -#define dictMetadata(entry) (&(entry)->metadata) -#define dictMetadataSize(d) ((d)->type->dictEntryMetadataBytes \ - ? (d)->type->dictEntryMetadataBytes(d) : 0) +#define dictEntryMetadataSize(d) ((d)->type->dictEntryMetadataBytes \ + ? (d)->type->dictEntryMetadataBytes(d) : 0) +#define dictMetadataSize(d) ((d)->type->dictMetadataBytes \ + ? 
(d)->type->dictMetadataBytes() : 0) #define dictHashKey(d, key) ((d)->type->hashFunction(key)) -#define dictGetKey(he) ((he)->key) -#define dictGetVal(he) ((he)->v.val) -#define dictGetSignedIntegerVal(he) ((he)->v.s64) -#define dictGetUnsignedIntegerVal(he) ((he)->v.u64) -#define dictGetDoubleVal(he) ((he)->v.d) #define dictSlots(d) (DICTHT_SIZE((d)->ht_size_exp[0])+DICTHT_SIZE((d)->ht_size_exp[1])) #define dictSize(d) ((d)->ht_used[0]+(d)->ht_used[1]) #define dictIsRehashing(d) ((d)->rehashidx != -1) @@ -189,6 +149,7 @@ typedef enum { dict *dictCreate(dictType *type); int dictExpand(dict *d, unsigned long size); int dictTryExpand(dict *d, unsigned long size); +void *dictMetadata(dict *d); int dictAdd(dict *d, void *key, void *val); dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing); dictEntry *dictAddOrFind(dict *d, void *key); @@ -202,6 +163,23 @@ void dictRelease(dict *d); dictEntry * dictFind(dict *d, const void *key); void *dictFetchValue(dict *d, const void *key); int dictResize(dict *d); +void dictSetKey(dict *d, dictEntry* de, void *key); +void dictSetVal(dict *d, dictEntry *de, void *val); +void dictSetSignedIntegerVal(dictEntry *de, int64_t val); +void dictSetUnsignedIntegerVal(dictEntry *de, uint64_t val); +void dictSetDoubleVal(dictEntry *de, double val); +int64_t dictIncrSignedIntegerVal(dictEntry *de, int64_t val); +uint64_t dictIncrUnsignedIntegerVal(dictEntry *de, uint64_t val); +double dictIncrDoubleVal(dictEntry *de, double val); +void *dictEntryMetadata(dictEntry *de); +void *dictGetKey(const dictEntry *de); +void *dictGetVal(const dictEntry *de); +int64_t dictGetSignedIntegerVal(const dictEntry *de); +uint64_t dictGetUnsignedIntegerVal(const dictEntry *de); +double dictGetDoubleVal(const dictEntry *de); +double *dictGetDoubleValPtr(dictEntry *de); +size_t dictMemUsage(const dict *d); +size_t dictEntryMemUsage(void); dictIterator *dictGetIterator(dict *d); dictIterator *dictGetSafeIterator(dict *d); void dictInitIterator(dictIterator *iter, dict *d); @@ -221,9 +199,10 @@ int dictRehash(dict *d, int n); int dictRehashMilliseconds(dict *d, int ms); void dictSetHashFunctionSeed(uint8_t *seed); uint8_t *dictGetHashFunctionSeed(void); -unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, dictScanBucketFunction *bucketfn, void *privdata); +unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *privdata); +unsigned long dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragAllocFunction *allocfn, void *privdata); uint64_t dictGetHash(dict *d, const void *key); -dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash); +dictEntry *dictFindEntryByPtrAndHash(dict *d, const void *oldptr, uint64_t hash); #ifdef REDIS_TEST int dictTest(int argc, char *argv[], int flags); diff --git a/src/eval.c b/src/eval.c index 6eb6ed1d4..1fa08c9dd 100644 --- a/src/eval.c +++ b/src/eval.c @@ -677,8 +677,8 @@ dict* evalScriptsDict() { unsigned long evalScriptsMemory() { return lctx.lua_scripts_mem + - dictSize(lctx.lua_scripts) * (sizeof(dictEntry) + sizeof(luaScript)) + - dictSlots(lctx.lua_scripts) * sizeof(dictEntry*); + dictMemUsage(lctx.lua_scripts) + + dictSize(lctx.lua_scripts) * sizeof(luaScript); } /* --------------------------------------------------------------------------- diff --git a/src/expire.c b/src/expire.c index 7b6b3bc9c..d07e22f66 100644 --- a/src/expire.c +++ b/src/expire.c @@ -110,6 +110,33 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { #define 
ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE 10 /* % of stale keys after which we do extra efforts. */ +/* Data used by the expire dict scan callback. */ +typedef struct { + redisDb *db; + long long now; + unsigned long sampled; /* num keys checked */ + unsigned long expired; /* num keys expired */ + long long ttl_sum; /* sum of ttl for key with ttl not yet expired */ + int ttl_samples; /* num keys with ttl not yet expired */ +} expireScanData; + +void expireScanCallback(void *privdata, const dictEntry *const_de) { + dictEntry *de = (dictEntry *)const_de; + expireScanData *data = privdata; + long long ttl = dictGetSignedIntegerVal(de) - data->now; + if (activeExpireCycleTryExpire(data->db, de, data->now)) { + data->expired++; + /* Propagate the DEL command */ + postExecutionUnitOperations(); + } + if (ttl > 0) { + /* We want the average TTL of keys yet not expired. */ + data->ttl_sum += ttl; + data->ttl_samples++; + } + data->sampled++; +} + void activeExpireCycle(int type) { /* Adjust the running parameters according to the configured expire * effort. The default effort is 1, and the maximum configurable effort @@ -186,10 +213,11 @@ void activeExpireCycle(int type) { serverAssert(server.also_propagate.numops == 0); for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) { - /* Expired and checked in a single loop. */ - unsigned long expired, sampled; + /* Scan callback data including expired and checked count per iteration. */ + expireScanData data; redisDb *db = server.db+(current_db % server.dbnum); + data.db = db; /* Increment the DB now so we are sure if we run out of time * in the current DB we'll restart from the next. This allows to @@ -202,8 +230,6 @@ void activeExpireCycle(int type) { * is not fixed, but depends on the Redis configured "expire effort". */ do { unsigned long num, slots; - long long now, ttl_sum; - int ttl_samples; iteration++; /* If there is nothing to expire try next DB ASAP. */ @@ -212,7 +238,7 @@ void activeExpireCycle(int type) { break; } slots = dictSlots(db->expires); - now = mstime(); + data.now = mstime(); /* When there are less than 1% filled slots, sampling the key * space is expensive, so stop here waiting for better times... @@ -220,12 +246,12 @@ void activeExpireCycle(int type) { if (slots > DICT_HT_INITIAL_SIZE && (num*100/slots < 1)) break; - /* The main collection cycle. Sample random keys among keys + /* The main collection cycle. Scan through keys among keys * with an expire set, checking for expired ones. */ - expired = 0; - sampled = 0; - ttl_sum = 0; - ttl_samples = 0; + data.sampled = 0; + data.expired = 0; + data.ttl_sum = 0; + data.ttl_samples = 0; if (num > config_keys_per_loop) num = config_keys_per_loop; @@ -243,46 +269,17 @@ void activeExpireCycle(int type) { long max_buckets = num*20; long checked_buckets = 0; - while (sampled < num && checked_buckets < max_buckets) { - for (int table = 0; table < 2; table++) { - if (table == 1 && !dictIsRehashing(db->expires)) break; - - unsigned long idx = db->expires_cursor; - idx &= DICTHT_SIZE_MASK(db->expires->ht_size_exp[table]); - dictEntry *de = db->expires->ht_table[table][idx]; - long long ttl; - - /* Scan the current bucket of the current table. */ - checked_buckets++; - while(de) { - /* Get the next entry now since this entry may get - * deleted. 
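/* For context: entries in db->expires reuse the key sds of the main keyspace
 * dict and hold the absolute expire time (unix time in milliseconds) as the
 * entry's signed integer value, which is why the callback above can compute
 * the TTL as dictGetSignedIntegerVal(de) - now. A sketch of how such an entry
 * is written, roughly mirroring setExpire() in db.c (not part of this patch): */
static void setExpireSketch(redisDb *db, robj *key, long long when) {
    dictEntry *kde = dictFind(db->dict, key->ptr);
    dictEntry *de = dictAddOrFind(db->expires, dictGetKey(kde));
    dictSetSignedIntegerVal(de, when);
}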
*/ - dictEntry *e = de; - de = de->next; - - ttl = dictGetSignedIntegerVal(e)-now; - if (activeExpireCycleTryExpire(db,e,now)) { - expired++; - /* Propagate the DEL command */ - postExecutionUnitOperations(); - } - if (ttl > 0) { - /* We want the average TTL of keys yet - * not expired. */ - ttl_sum += ttl; - ttl_samples++; - } - sampled++; - } - } - db->expires_cursor++; + while (data.sampled < num && checked_buckets < max_buckets) { + db->expires_cursor = dictScan(db->expires, db->expires_cursor, + expireScanCallback, &data); + checked_buckets++; } - total_expired += expired; - total_sampled += sampled; + total_expired += data.expired; + total_sampled += data.sampled; /* Update the average TTL stats for this database. */ - if (ttl_samples) { - long long avg_ttl = ttl_sum/ttl_samples; + if (data.ttl_samples) { + long long avg_ttl = data.ttl_sum / data.ttl_samples; /* Do a simple running average with a few samples. * We just use the current estimate with a weight of 2% @@ -305,8 +302,8 @@ void activeExpireCycle(int type) { /* We don't repeat the cycle for the current database if there are * an acceptable amount of stale keys (logically expired but yet * not reclaimed). */ - } while (sampled == 0 || - (expired*100/sampled) > config_cycle_acceptable_stale); + } while (data.sampled == 0 || + (data.expired * 100 / data.sampled) > config_cycle_acceptable_stale); } elapsed = ustime()-start; @@ -444,8 +441,8 @@ void rememberSlaveKeyWithExpire(redisDb *db, robj *key) { * representing the key: we don't want to need to take those keys * in sync with the main DB. The keys will be removed by expireSlaveKeys() * as it scans to find keys to remove. */ - if (de->key == key->ptr) { - de->key = sdsdup(key->ptr); + if (dictGetKey(de) == key->ptr) { + dictSetKey(slaveKeysWithExpire, de, sdsdup(key->ptr)); dictSetUnsignedIntegerVal(de,0); } diff --git a/src/functions.c b/src/functions.c index 3f7b3a7ef..c60d40d3c 100644 --- a/src/functions.c +++ b/src/functions.c @@ -1091,10 +1091,9 @@ unsigned long functionsMemory() { /* Return memory overhead of all the engines combine */ unsigned long functionsMemoryOverhead() { - size_t memory_overhead = dictSize(engines) * sizeof(dictEntry) + - dictSlots(engines) * sizeof(dictEntry*); - memory_overhead += dictSize(curr_functions_lib_ctx->functions) * sizeof(dictEntry) + - dictSlots(curr_functions_lib_ctx->functions) * sizeof(dictEntry*) + sizeof(functionsLibCtx); + size_t memory_overhead = dictMemUsage(engines); + memory_overhead += dictMemUsage(curr_functions_lib_ctx->functions); + memory_overhead += sizeof(functionsLibCtx); memory_overhead += curr_functions_lib_ctx->cache_memory; memory_overhead += engine_cache_memory; diff --git a/src/module.c b/src/module.c index cc6934dca..481553f12 100644 --- a/src/module.c +++ b/src/module.c @@ -10336,7 +10336,7 @@ int RM_Scan(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanC } int ret = 1; ScanCBData data = { ctx, privdata, fn }; - cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, moduleScanCallback, NULL, &data); + cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, moduleScanCallback, &data); if (cursor->cursor == 0) { cursor->done = 1; ret = 0; @@ -10448,7 +10448,7 @@ int RM_ScanKey(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleSc int ret = 1; if (ht) { ScanKeyCBData data = { key, privdata, fn }; - cursor->cursor = dictScan(ht, cursor->cursor, moduleScanKeyCallback, NULL, &data); + cursor->cursor = dictScan(ht, cursor->cursor, moduleScanKeyCallback, &data); if 
(cursor->cursor == 0) { cursor->done = 1; ret = 0; @@ -12562,7 +12562,6 @@ const char *RM_GetCurrentCommandName(RedisModuleCtx *ctx) { * defrag callback. */ struct RedisModuleDefragCtx { - long defragged; long long int endtime; unsigned long *cursor; struct redisObject *key; /* Optional name of key processed, NULL when unknown. */ @@ -12651,11 +12650,8 @@ int RM_DefragCursorGet(RedisModuleDefragCtx *ctx, unsigned long *cursor) { * be used again. */ void *RM_DefragAlloc(RedisModuleDefragCtx *ctx, void *ptr) { - void *newptr = activeDefragAlloc(ptr); - if (newptr) - ctx->defragged++; - - return newptr; + UNUSED(ctx); + return activeDefragAlloc(ptr); } /* Defrag a RedisModuleString previously allocated by RM_Alloc, RM_Calloc, etc. @@ -12669,7 +12665,8 @@ void *RM_DefragAlloc(RedisModuleDefragCtx *ctx, void *ptr) { * on the Redis side is dropped as soon as the command callback returns). */ RedisModuleString *RM_DefragRedisModuleString(RedisModuleDefragCtx *ctx, RedisModuleString *str) { - return activeDefragStringOb(str, &ctx->defragged); + UNUSED(ctx); + return activeDefragStringOb(str); } @@ -12678,11 +12675,11 @@ RedisModuleString *RM_DefragRedisModuleString(RedisModuleDefragCtx *ctx, RedisMo * Returns a zero value (and initializes the cursor) if no more needs to be done, * or a non-zero value otherwise. */ -int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, long long *defragged, int dbid) { +int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, int dbid) { moduleValue *mv = value->ptr; moduleType *mt = mv->type; - RedisModuleDefragCtx defrag_ctx = { 0, endtime, cursor, key, dbid}; + RedisModuleDefragCtx defrag_ctx = { endtime, cursor, key, dbid}; /* Invoke callback. Note that the callback may be missing if the key has been * replaced with a different type since our last visit. @@ -12691,7 +12688,6 @@ int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long en if (mt->defrag) ret = mt->defrag(&defrag_ctx, key, &mv->value); - *defragged += defrag_ctx.defragged; if (!ret) { *cursor = 0; /* No more work to do */ return 0; @@ -12706,7 +12702,7 @@ int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long en * Returns 1 if the operation has been completed or 0 if it needs to * be scheduled for late defrag. 
*/ -int moduleDefragValue(robj *key, robj *value, long *defragged, int dbid) { +int moduleDefragValue(robj *key, robj *value, int dbid) { moduleValue *mv = value->ptr; moduleType *mt = mv->type; @@ -12715,7 +12711,6 @@ int moduleDefragValue(robj *key, robj *value, long *defragged, int dbid) { */ moduleValue *newmv = activeDefragAlloc(mv); if (newmv) { - (*defragged)++; value->ptr = mv = newmv; } @@ -12733,29 +12728,24 @@ int moduleDefragValue(robj *key, robj *value, long *defragged, int dbid) { return 0; /* Defrag later */ } - RedisModuleDefragCtx defrag_ctx = { 0, 0, NULL, key, dbid}; + RedisModuleDefragCtx defrag_ctx = { 0, NULL, key, dbid }; mt->defrag(&defrag_ctx, key, &mv->value); - (*defragged) += defrag_ctx.defragged; return 1; } /* Call registered module API defrag functions */ -long moduleDefragGlobals(void) { +void moduleDefragGlobals(void) { dictIterator *di = dictGetIterator(modules); dictEntry *de; - long defragged = 0; while ((de = dictNext(di)) != NULL) { struct RedisModule *module = dictGetVal(de); if (!module->defrag_cb) continue; - RedisModuleDefragCtx defrag_ctx = { 0, 0, NULL, NULL, -1}; + RedisModuleDefragCtx defrag_ctx = { 0, NULL, NULL, -1}; module->defrag_cb(&defrag_ctx); - defragged += defrag_ctx.defragged; } dictReleaseIterator(di); - - return defragged; } /* Returns the name of the key currently being processed. diff --git a/src/object.c b/src/object.c index 54ed57958..67e372032 100644 --- a/src/object.c +++ b/src/object.c @@ -1029,7 +1029,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) { asize = sizeof(*o)+sizeof(dict)+(sizeof(struct dictEntry*)*dictSlots(d)); while((de = dictNext(di)) != NULL && samples < sample_size) { ele = dictGetKey(de); - elesize += sizeof(struct dictEntry) + sdsZmallocSize(ele); + elesize += dictEntryMemUsage() + sdsZmallocSize(ele); samples++; } dictReleaseIterator(di); @@ -1053,7 +1053,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) { zmalloc_size(zsl->header); while(znode != NULL && samples < sample_size) { elesize += sdsZmallocSize(znode->ele); - elesize += sizeof(struct dictEntry)+zmalloc_size(znode); + elesize += dictEntryMemUsage()+zmalloc_size(znode); samples++; znode = znode->level[0].forward; } @@ -1072,7 +1072,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) { ele = dictGetKey(de); ele2 = dictGetVal(de); elesize += sdsZmallocSize(ele) + sdsZmallocSize(ele2); - elesize += sizeof(struct dictEntry); + elesize += dictEntryMemUsage(); samples++; } dictReleaseIterator(di); @@ -1242,19 +1242,18 @@ struct redisMemOverhead *getMemoryOverheadData(void) { mh->db = zrealloc(mh->db,sizeof(mh->db[0])*(mh->num_dbs+1)); mh->db[mh->num_dbs].dbid = j; - mem = dictSize(db->dict) * sizeof(dictEntry) + - dictSlots(db->dict) * sizeof(dictEntry*) + + mem = dictMemUsage(db->dict) + dictSize(db->dict) * sizeof(robj); mh->db[mh->num_dbs].overhead_ht_main = mem; mem_total+=mem; - mem = dictSize(db->expires) * sizeof(dictEntry) + - dictSlots(db->expires) * sizeof(dictEntry*); + mem = dictMemUsage(db->expires); mh->db[mh->num_dbs].overhead_ht_expires = mem; mem_total+=mem; /* Account for the slot to keys map in cluster mode */ - mem = dictSize(db->dict) * dictMetadataSize(db->dict); + mem = dictSize(db->dict) * dictEntryMetadataSize(db->dict) + + dictMetadataSize(db->dict); mh->db[mh->num_dbs].overhead_ht_slot_to_keys = mem; mem_total+=mem; @@ -1547,7 +1546,7 @@ NULL } size_t usage = objectComputeSize(c->argv[2],dictGetVal(de),samples,c->db->id); usage += 
sdsZmallocSize(dictGetKey(de)); - usage += sizeof(dictEntry); + usage += dictEntryMemUsage(); usage += dictMetadataSize(c->db->dict); addReplyLongLong(c,usage); } else if (!strcasecmp(c->argv[1]->ptr,"stats") && c->argc == 2) { diff --git a/src/pubsub.c b/src/pubsub.c index 2e2522c57..a257a8af3 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -727,10 +727,8 @@ size_t pubsubMemOverhead(client *c) { /* PubSub patterns */ size_t mem = listLength(c->pubsub_patterns) * sizeof(listNode); /* Global PubSub channels */ - mem += dictSize(c->pubsub_channels) * sizeof(dictEntry) + - dictSlots(c->pubsub_channels) * sizeof(dictEntry*); + mem += dictMemUsage(c->pubsub_channels); /* Sharded PubSub channels */ - mem += dictSize(c->pubsubshard_channels) * sizeof(dictEntry) + - dictSlots(c->pubsubshard_channels) * sizeof(dictEntry*); + mem += dictMemUsage(c->pubsubshard_channels); return mem; } diff --git a/src/redis-cli.c b/src/redis-cli.c index c8576702c..b9d84c6d9 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -762,7 +762,7 @@ void cliInitGroupHelpEntries(dict *groups) { for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) { tmp.argc = 1; tmp.argv = zmalloc(sizeof(sds)); - tmp.argv[0] = sdscatprintf(sdsempty(),"@%s",(char *)entry->key); + tmp.argv[0] = sdscatprintf(sdsempty(),"@%s",(char *)dictGetKey(entry)); tmp.full = tmp.argv[0]; tmp.type = CLI_HELP_GROUP; tmp.org.name = NULL; diff --git a/src/server.c b/src/server.c index ae83ec25c..910f299f3 100644 --- a/src/server.c +++ b/src/server.c @@ -399,13 +399,24 @@ int dictExpandAllowed(size_t moreMem, double usedRatio) { /* Returns the size of the DB dict entry metadata in bytes. In cluster mode, the * metadata is used for constructing a doubly linked list of the dict entries * belonging to the same cluster slot. See the Slot to Key API in cluster.c. */ -size_t dictEntryMetadataSize(dict *d) { +size_t dbDictEntryMetadataSize(dict *d) { UNUSED(d); /* NOTICE: this also affects overhead_ht_slot_to_keys in getMemoryOverheadData. * If we ever add non-cluster related data here, that code must be modified too. */ return server.cluster_enabled ? sizeof(clusterDictEntryMetadata) : 0; } +/* Returns the size of the DB dict metadata in bytes. In cluster mode, we store + * a pointer to the db in the main db dict, used for updating the slot-to-key + * mapping when a dictEntry is reallocated. */ +size_t dbDictMetadataSize(void) { + return server.cluster_enabled ? sizeof(clusterDictMetadata) : 0; +} + +void dbDictAfterReplaceEntry(dict *d, dictEntry *de) { + if (server.cluster_enabled) slotToKeyReplaceEntry(d, de); +} + /* Generic hash table type where keys are Redis Objects, Values * dummy pointers. 
*/ dictType objectKeyPointerValueDictType = { @@ -460,7 +471,9 @@ dictType dbDictType = { dictSdsDestructor, /* key destructor */ dictObjectDestructor, /* val destructor */ dictExpandAllowed, /* allow to expand */ - dictEntryMetadataSize /* size of entry metadata in bytes */ + dbDictEntryMetadataSize, /* size of entry metadata in bytes */ + dbDictMetadataSize, /* size of dict metadata in bytes */ + dbDictAfterReplaceEntry /* notify entry moved/reallocated */ }; /* Db->expires */ diff --git a/src/server.h b/src/server.h index 86611c321..bf0a15459 100644 --- a/src/server.h +++ b/src/server.h @@ -2442,9 +2442,9 @@ void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid, int flags); size_t moduleGetFreeEffort(robj *key, robj *val, int dbid); size_t moduleGetMemUsage(robj *key, robj *val, size_t sample_size, int dbid); robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj *value); -int moduleDefragValue(robj *key, robj *obj, long *defragged, int dbid); -int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, long long *defragged, int dbid); -long moduleDefragGlobals(void); +int moduleDefragValue(robj *key, robj *obj, int dbid); +int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, int dbid); +void moduleDefragGlobals(void); void *moduleGetHandleByName(char *modulename); int moduleIsModuleCommand(void *module_handle, struct redisCommand *cmd); @@ -2988,7 +2988,7 @@ void checkChildrenDone(void); int setOOMScoreAdj(int process_class); void rejectCommandFormat(client *c, const char *fmt, ...); void *activeDefragAlloc(void *ptr); -robj *activeDefragStringOb(robj* ob, long *defragged); +robj *activeDefragStringOb(robj* ob); void dismissSds(sds s); void dismissMemory(void* ptr, size_t size_hint); void dismissMemoryInChild(void); diff --git a/src/t_zset.c b/src/t_zset.c index 048759d1f..5dadd58fd 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -1428,7 +1428,7 @@ int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, dou /* Note that we did not removed the original element from * the hash table representing the sorted set, so we just * update the score. */ - dictGetVal(de) = &znode->score; /* Update score ptr. */ + dictSetVal(zs->dict, de, &znode->score); /* Update score ptr. */ *out_flags |= ZADD_OUT_UPDATED; } return 1; @@ -2741,7 +2741,8 @@ void zunionInterDiffGenericCommand(client *c, robj *dstkey, int numkeysIndex, in * Here we access directly the dictEntry double * value inside the union as it is a big speedup * compared to using the getDouble/setDouble API. */ - zunionInterAggregate(&existing->v.d,score,aggregate); + double *existing_score_ptr = dictGetDoubleValPtr(existing); + zunionInterAggregate(existing_score_ptr, score, aggregate); } } zuiClearIterator(&src[i]);
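Taken together, the hunks above replace direct dictEntry field access with a small function API: the dictGetKey/dictGetVal macro family is removed from dict.h and callers such as config.c, expire.c and redis-cli.c switch from de->key to dictGetKey(de). Below is a minimal sketch of the new calling pattern, assuming it is compiled inside the Redis tree; the helper name swapValue and its use are hypothetical, only the dict functions come from the patch.

    #include "dict.h"

    /* Replace the value stored under `key` and hand the old one back to the
     * caller. Every entry field is reached through the accessor functions;
     * the removed de->key / de->v.val style no longer appears in callers. */
    static void *swapValue(dict *d, const void *key, void *newval) {
        dictEntry *de = dictFind(d, key);
        if (de == NULL) return NULL;
        void *oldval = dictGetVal(de);   /* formerly de->v.val */
        dictSetVal(d, de, newval);       /* formerly an assignment into the entry */
        return oldval;                   /* dictGetKey(de) similarly replaces de->key */
    }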
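dictScan() loses its bucket-callback parameter (defrag moves to the new dictScanDefrag()), and expire.c now walks db->expires with a scan callback instead of touching ht_table buckets directly. The sketch below mirrors that pattern under hypothetical names (ttlScanState, countLongLived); only dictScan() and dictGetSignedIntegerVal() are taken from the patch.

    #include "dict.h"

    typedef struct {
        long long now;          /* current time in ms, supplied by the caller */
        long long min_ttl;      /* threshold in ms */
        unsigned long matched;  /* entries whose remaining TTL is at or above it */
    } ttlScanState;

    /* New-style scan callback: (privdata, const dictEntry *). */
    static void ttlScanCallback(void *privdata, const dictEntry *de) {
        ttlScanState *st = privdata;
        long long when = dictGetSignedIntegerVal(de); /* expire time stored in the entry */
        if (when - st->now >= st->min_ttl) st->matched++;
    }

    /* Drive the cursor until it wraps to 0, as activeExpireCycle() now does. */
    static unsigned long countLongLived(dict *expires, long long now, long long min_ttl) {
        ttlScanState st = { now, min_ttl, 0 };
        unsigned long cursor = 0;
        do {
            cursor = dictScan(expires, cursor, ttlScanCallback, &st);
        } while (cursor != 0);
        return st.matched;
    }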
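Besides per-entry metadata, a dict can now carry a per-dict metadata blob: the type reports its size through the dictMetadataBytes callback (dbDictMetadataSize above) and code reaches it with the new dictMetadata() accessor, which is how slotToKeyReplaceEntry() finds the owning redisDb without an extra argument. A hedged sketch of the same back-pointer idea follows; the struct and both helper names are made up, and the dict is assumed to have been created with a dictType whose dictMetadataBytes callback returns sizeof(myDictMetadata).

    #include "dict.h"

    /* Per-dict metadata mirroring clusterDictMetadata: a back-pointer to the
     * structure that owns this dict, readable from callbacks that only get
     * the dict itself (e.g. the afterReplaceEntry notification). */
    typedef struct {
        void *owner;
    } myDictMetadata;

    /* Wired into the dictType as its dictMetadataBytes callback. */
    static size_t myDictMetadataBytes(void) { return sizeof(myDictMetadata); }

    static void attachOwner(dict *d, void *owner) {
        myDictMetadata *meta = dictMetadata(d); /* blob allocated together with the dict */
        meta->owner = owner;
    }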
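Memory accounting in eval.c, functions.c, pubsub.c and object.c stops open-coding sizeof(dictEntry) and table-slot arithmetic, since the entry layout is no longer visible to callers: dictMemUsage() covers the table plus its entries, and dictEntryMemUsage() gives the per-entry size for sampled estimates. A small sketch of the same accounting, with a hypothetical dictOverhead() helper; valsize stands for whatever each value costs on top.

    #include "dict.h"

    /* Overhead of a dict whose values are separately allocated objects of
     * `valsize` bytes each: the table and its entries via dictMemUsage(),
     * the values added on top, as evalScriptsMemory() now does. */
    static size_t dictOverhead(dict *d, size_t valsize) {
        return dictMemUsage(d) + dictSize(d) * valsize;
    }

    /* For sampled per-key estimates (objectComputeSize style), the per-entry
     * cost is dictEntryMemUsage() rather than sizeof(dictEntry). */
    static size_t sampledEntryCost(size_t payload) {
        return dictEntryMemUsage() + payload;
    }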
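Finally, the defrag hooks stop threading a defragged counter through every call: activeDefragAlloc() and activeDefragStringOb() simply return the new pointer (or NULL when nothing moved) and the caller keeps whichever pointer is live, which is why RedisModuleDefragCtx loses its defragged field and moduleDefragGlobals() now returns void. A sketch of the caller-side pattern under that assumption, with a made-up myStruct and helper name; activeDefragAlloc() itself is declared in server.h.

    #include "server.h"

    typedef struct { int payload; } myStruct; /* stand-in for any defraggable allocation */

    /* No counter in, no counter out: just swap to the reallocated pointer when
     * the allocator actually moved it. */
    static myStruct *defragMyStruct(myStruct *p) {
        myStruct *newp = activeDefragAlloc(p); /* NULL when the allocation stayed put */
        return newp ? newp : p;
    }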