mirror of https://mirror.osredm.com/root/redis.git
Make dictEntry opaque (#11465)
This PR refactors the dictEntry abstraction by making it opaque. This enables future optimizations of the dict implementation without affecting the code that uses it. The PR contains 5 commits; more detailed commit messages are found in each commit.

* Make dictEntry opaque
* Let active expire cycle use dictScan instead of messing with internals
* activeDefragSdsDict use scan instead of iterator and drop dictSetNext
* Remove the bucket-cb from dictScan and move dictEntry defrag to dictScanDefrag
* Move stat_active_defrag_hits increment to activeDefragAlloc
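To make the refactor concrete, here is a minimal, self-contained sketch of the opaque-struct pattern the PR applies to dictEntry. The names below (entry, entryGetKey, entrySetVal, ...) are hypothetical and not part of Redis; the real accessors used throughout the diff are dictGetKey, dictGetVal, dictSetKey and dictSetVal.

/* Sketch only: hypothetical names, not Redis code. Callers see just a
 * forward declaration, so they cannot dereference fields such as e->key;
 * every access goes through a function, and the implementation is free to
 * change the struct layout without touching any call site. */
#include <stdlib.h>
#include <string.h>

typedef struct entry entry;               /* opaque to callers */

entry *entryCreate(const char *key, long val);
const char *entryGetKey(const entry *e);  /* instead of e->key   */
long entryGetVal(const entry *e);         /* instead of e->v.val */
void entrySetVal(entry *e, long val);
void entryFree(entry *e);

/* The only translation unit that knows the layout. */
struct entry { char *key; long val; };

entry *entryCreate(const char *key, long val) {
    entry *e = malloc(sizeof(*e));
    if (!e) return NULL;
    e->key = strdup(key);
    e->val = val;
    return e;
}
const char *entryGetKey(const entry *e) { return e->key; }
long entryGetVal(const entry *e) { return e->val; }
void entrySetVal(entry *e, long val) { e->val = val; }
void entryFree(entry *e) { free(e->key); free(e); }

In the diff below, the same switch shows up as direct field writes like de->key = newsds and de->v.val = newptr being replaced with calls such as dictSetKey(d, de, newsds) and dictSetVal(d, de, newptr).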
commit 12826fa38f
@ -93,9 +93,9 @@ static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen);
|
||||||
/* Links to the next and previous entries for keys in the same slot are stored
|
/* Links to the next and previous entries for keys in the same slot are stored
|
||||||
* in the dict entry metadata. See Slot to Key API below. */
|
* in the dict entry metadata. See Slot to Key API below. */
|
||||||
#define dictEntryNextInSlot(de) \
|
#define dictEntryNextInSlot(de) \
|
||||||
(((clusterDictEntryMetadata *)dictMetadata(de))->next)
|
(((clusterDictEntryMetadata *)dictEntryMetadata(de))->next)
|
||||||
#define dictEntryPrevInSlot(de) \
|
#define dictEntryPrevInSlot(de) \
|
||||||
(((clusterDictEntryMetadata *)dictMetadata(de))->prev)
|
(((clusterDictEntryMetadata *)dictEntryMetadata(de))->prev)
|
||||||
|
|
||||||
#define RCVBUF_INIT_LEN 1024
|
#define RCVBUF_INIT_LEN 1024
|
||||||
#define RCVBUF_MAX_PREALLOC (1<<20) /* 1MB */
|
#define RCVBUF_MAX_PREALLOC (1<<20) /* 1MB */
|
||||||
|
@ -7288,7 +7288,7 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {
|
||||||
* understand if we have keys for a given hash slot. */
|
* understand if we have keys for a given hash slot. */
|
||||||
|
|
||||||
void slotToKeyAddEntry(dictEntry *entry, redisDb *db) {
|
void slotToKeyAddEntry(dictEntry *entry, redisDb *db) {
|
||||||
sds key = entry->key;
|
sds key = dictGetKey(entry);
|
||||||
unsigned int hashslot = keyHashSlot(key, sdslen(key));
|
unsigned int hashslot = keyHashSlot(key, sdslen(key));
|
||||||
slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
|
slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
|
||||||
slot_to_keys->count++;
|
slot_to_keys->count++;
|
||||||
|
@ -7305,7 +7305,7 @@ void slotToKeyAddEntry(dictEntry *entry, redisDb *db) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void slotToKeyDelEntry(dictEntry *entry, redisDb *db) {
|
void slotToKeyDelEntry(dictEntry *entry, redisDb *db) {
|
||||||
sds key = entry->key;
|
sds key = dictGetKey(entry);
|
||||||
unsigned int hashslot = keyHashSlot(key, sdslen(key));
|
unsigned int hashslot = keyHashSlot(key, sdslen(key));
|
||||||
slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
|
slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
|
||||||
slot_to_keys->count--;
|
slot_to_keys->count--;
|
||||||
|
@ -7327,7 +7327,7 @@ void slotToKeyDelEntry(dictEntry *entry, redisDb *db) {
|
||||||
|
|
||||||
/* Updates neighbour entries when an entry has been replaced (e.g. reallocated
|
/* Updates neighbour entries when an entry has been replaced (e.g. reallocated
|
||||||
* during active defrag). */
|
* during active defrag). */
|
||||||
void slotToKeyReplaceEntry(dictEntry *entry, redisDb *db) {
|
void slotToKeyReplaceEntry(dict *d, dictEntry *entry) {
|
||||||
dictEntry *next = dictEntryNextInSlot(entry);
|
dictEntry *next = dictEntryNextInSlot(entry);
|
||||||
dictEntry *prev = dictEntryPrevInSlot(entry);
|
dictEntry *prev = dictEntryPrevInSlot(entry);
|
||||||
if (next != NULL) {
|
if (next != NULL) {
|
||||||
|
@ -7337,8 +7337,10 @@ void slotToKeyReplaceEntry(dictEntry *entry, redisDb *db) {
|
||||||
dictEntryNextInSlot(prev) = entry;
|
dictEntryNextInSlot(prev) = entry;
|
||||||
} else {
|
} else {
|
||||||
/* The replaced entry was the first in the list. */
|
/* The replaced entry was the first in the list. */
|
||||||
sds key = entry->key;
|
sds key = dictGetKey(entry);
|
||||||
unsigned int hashslot = keyHashSlot(key, sdslen(key));
|
unsigned int hashslot = keyHashSlot(key, sdslen(key));
|
||||||
|
clusterDictMetadata *dictmeta = dictMetadata(d);
|
||||||
|
redisDb *db = dictmeta->db;
|
||||||
slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
|
slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
|
||||||
slot_to_keys->head = entry;
|
slot_to_keys->head = entry;
|
||||||
}
|
}
|
||||||
|
@ -7347,6 +7349,8 @@ void slotToKeyReplaceEntry(dictEntry *entry, redisDb *db) {
|
||||||
/* Initialize slots-keys map of given db. */
|
/* Initialize slots-keys map of given db. */
|
||||||
void slotToKeyInit(redisDb *db) {
|
void slotToKeyInit(redisDb *db) {
|
||||||
db->slots_to_keys = zcalloc(sizeof(clusterSlotToKeyMapping));
|
db->slots_to_keys = zcalloc(sizeof(clusterSlotToKeyMapping));
|
||||||
|
clusterDictMetadata *dictmeta = dictMetadata(db->dict);
|
||||||
|
dictmeta->db = db;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Empty slots-keys map of given db. */
|
/* Empty slots-keys map of given db. */
|
||||||
|
|
|
@ -169,6 +169,9 @@ typedef struct clusterDictEntryMetadata {
|
||||||
dictEntry *next; /* Next entry with key in the same slot */
|
dictEntry *next; /* Next entry with key in the same slot */
|
||||||
} clusterDictEntryMetadata;
|
} clusterDictEntryMetadata;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
redisDb *db; /* A link back to the db this dict belongs to */
|
||||||
|
} clusterDictMetadata;
|
||||||
|
|
||||||
typedef struct clusterState {
|
typedef struct clusterState {
|
||||||
clusterNode *myself; /* This node */
|
clusterNode *myself; /* This node */
|
||||||
|
@ -409,7 +412,7 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded);
|
||||||
unsigned int keyHashSlot(char *key, int keylen);
|
unsigned int keyHashSlot(char *key, int keylen);
|
||||||
void slotToKeyAddEntry(dictEntry *entry, redisDb *db);
|
void slotToKeyAddEntry(dictEntry *entry, redisDb *db);
|
||||||
void slotToKeyDelEntry(dictEntry *entry, redisDb *db);
|
void slotToKeyDelEntry(dictEntry *entry, redisDb *db);
|
||||||
void slotToKeyReplaceEntry(dictEntry *entry, redisDb *db);
|
void slotToKeyReplaceEntry(dict *d, dictEntry *entry);
|
||||||
void slotToKeyInit(redisDb *db);
|
void slotToKeyInit(redisDb *db);
|
||||||
void slotToKeyFlush(redisDb *db);
|
void slotToKeyFlush(redisDb *db);
|
||||||
void slotToKeyDestroy(redisDb *db);
|
void slotToKeyDestroy(redisDb *db);
|
||||||
|
|
|
@ -989,8 +989,8 @@ void configGetCommand(client *c) {
|
||||||
/* Note that hidden configs require an exact match (not a pattern) */
|
/* Note that hidden configs require an exact match (not a pattern) */
|
||||||
if (config->flags & HIDDEN_CONFIG) continue;
|
if (config->flags & HIDDEN_CONFIG) continue;
|
||||||
if (dictFind(matches, config->name)) continue;
|
if (dictFind(matches, config->name)) continue;
|
||||||
if (stringmatch(name, de->key, 1)) {
|
if (stringmatch(name, dictGetKey(de), 1)) {
|
||||||
dictAdd(matches, de->key, config);
|
dictAdd(matches, dictGetKey(de), config);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
dictReleaseIterator(di);
|
dictReleaseIterator(di);
|
||||||
|
@ -1000,7 +1000,7 @@ void configGetCommand(client *c) {
|
||||||
addReplyMapLen(c, dictSize(matches));
|
addReplyMapLen(c, dictSize(matches));
|
||||||
while ((de = dictNext(di)) != NULL) {
|
while ((de = dictNext(di)) != NULL) {
|
||||||
standardConfig *config = (standardConfig *) dictGetVal(de);
|
standardConfig *config = (standardConfig *) dictGetVal(de);
|
||||||
addReplyBulkCString(c, de->key);
|
addReplyBulkCString(c, dictGetKey(de));
|
||||||
addReplyBulkSds(c, config->interface.get(config));
|
addReplyBulkSds(c, config->interface.get(config));
|
||||||
}
|
}
|
||||||
dictReleaseIterator(di);
|
dictReleaseIterator(di);
|
||||||
|
@ -1754,7 +1754,7 @@ int rewriteConfig(char *path, int force_write) {
|
||||||
standardConfig *config = dictGetVal(de);
|
standardConfig *config = dictGetVal(de);
|
||||||
/* Only rewrite the primary names */
|
/* Only rewrite the primary names */
|
||||||
if (config->flags & ALIAS_CONFIG) continue;
|
if (config->flags & ALIAS_CONFIG) continue;
|
||||||
if (config->interface.rewrite) config->interface.rewrite(config, de->key, state);
|
if (config->interface.rewrite) config->interface.rewrite(config, dictGetKey(de), state);
|
||||||
}
|
}
|
||||||
dictReleaseIterator(di);
|
dictReleaseIterator(di);
|
||||||
|
|
||||||
|
|
11
src/db.c
11
src/db.c
|
@ -228,7 +228,6 @@ static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite) {
|
||||||
dictEntry *de = dictFind(db->dict,key->ptr);
|
dictEntry *de = dictFind(db->dict,key->ptr);
|
||||||
|
|
||||||
serverAssertWithInfo(NULL,key,de != NULL);
|
serverAssertWithInfo(NULL,key,de != NULL);
|
||||||
dictEntry auxentry = *de;
|
|
||||||
robj *old = dictGetVal(de);
|
robj *old = dictGetVal(de);
|
||||||
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
|
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
|
||||||
val->lru = old->lru;
|
val->lru = old->lru;
|
||||||
|
@ -246,17 +245,15 @@ static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite) {
|
||||||
decrRefCount(old);
|
decrRefCount(old);
|
||||||
/* Because of RM_StringDMA, old may be changed, so we need get old again */
|
/* Because of RM_StringDMA, old may be changed, so we need get old again */
|
||||||
old = dictGetVal(de);
|
old = dictGetVal(de);
|
||||||
/* Entry in auxentry may be changed, so we need update auxentry */
|
|
||||||
auxentry = *de;
|
|
||||||
}
|
}
|
||||||
dictSetVal(db->dict, de, val);
|
dictSetVal(db->dict, de, val);
|
||||||
|
|
||||||
if (server.lazyfree_lazy_server_del) {
|
if (server.lazyfree_lazy_server_del) {
|
||||||
freeObjAsync(key,old,db->id);
|
freeObjAsync(key,old,db->id);
|
||||||
dictSetVal(db->dict, &auxentry, NULL);
|
} else {
|
||||||
|
/* This is just decrRefCount(old); */
|
||||||
|
db->dict->type->valDestructor(db->dict, old);
|
||||||
}
|
}
|
||||||
|
|
||||||
dictFreeVal(db->dict, &auxentry);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Replace an existing key with a new value, we just replace value and don't
|
/* Replace an existing key with a new value, we just replace value and don't
|
||||||
|
@ -942,7 +939,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
|
||||||
privdata[0] = keys;
|
privdata[0] = keys;
|
||||||
privdata[1] = o;
|
privdata[1] = o;
|
||||||
do {
|
do {
|
||||||
cursor = dictScan(ht, cursor, scanCallback, NULL, privdata);
|
cursor = dictScan(ht, cursor, scanCallback, privdata);
|
||||||
} while (cursor &&
|
} while (cursor &&
|
||||||
maxiterations-- &&
|
maxiterations-- &&
|
||||||
listLength(keys) < (unsigned long)count);
|
listLength(keys) < (unsigned long)count);
|
||||||
|
|
|
@ -868,7 +868,7 @@ NULL
|
||||||
sds sizes = sdsempty();
|
sds sizes = sdsempty();
|
||||||
sizes = sdscatprintf(sizes,"bits:%d ",(sizeof(void*) == 8)?64:32);
|
sizes = sdscatprintf(sizes,"bits:%d ",(sizeof(void*) == 8)?64:32);
|
||||||
sizes = sdscatprintf(sizes,"robj:%d ",(int)sizeof(robj));
|
sizes = sdscatprintf(sizes,"robj:%d ",(int)sizeof(robj));
|
||||||
sizes = sdscatprintf(sizes,"dictentry:%d ",(int)sizeof(dictEntry));
|
sizes = sdscatprintf(sizes,"dictentry:%d ",(int)dictEntryMemUsage());
|
||||||
sizes = sdscatprintf(sizes,"sdshdr5:%d ",(int)sizeof(struct sdshdr5));
|
sizes = sdscatprintf(sizes,"sdshdr5:%d ",(int)sizeof(struct sdshdr5));
|
||||||
sizes = sdscatprintf(sizes,"sdshdr8:%d ",(int)sizeof(struct sdshdr8));
|
sizes = sdscatprintf(sizes,"sdshdr8:%d ",(int)sizeof(struct sdshdr8));
|
||||||
sizes = sdscatprintf(sizes,"sdshdr16:%d ",(int)sizeof(struct sdshdr16));
|
sizes = sdscatprintf(sizes,"sdshdr16:%d ",(int)sizeof(struct sdshdr16));
|
||||||
|
|
468
src/defrag.c
468
src/defrag.c
|
@ -45,10 +45,6 @@
|
||||||
* pointers are worthwhile moving and which aren't */
|
* pointers are worthwhile moving and which aren't */
|
||||||
int je_get_defrag_hint(void* ptr);
|
int je_get_defrag_hint(void* ptr);
|
||||||
|
|
||||||
/* forward declarations*/
|
|
||||||
void defragDictBucketCallback(dict *d, dictEntry **bucketref);
|
|
||||||
dictEntry* replaceSatelliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged);
|
|
||||||
|
|
||||||
/* Defrag helper for generic allocations.
|
/* Defrag helper for generic allocations.
|
||||||
*
|
*
|
||||||
* returns NULL in case the allocation wasn't moved.
|
* returns NULL in case the allocation wasn't moved.
|
||||||
|
@ -68,6 +64,7 @@ void* activeDefragAlloc(void *ptr) {
|
||||||
newptr = zmalloc_no_tcache(size);
|
newptr = zmalloc_no_tcache(size);
|
||||||
memcpy(newptr, ptr, size);
|
memcpy(newptr, ptr, size);
|
||||||
zfree_no_tcache(ptr);
|
zfree_no_tcache(ptr);
|
||||||
|
server.stat_active_defrag_hits++;
|
||||||
return newptr;
|
return newptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -92,7 +89,7 @@ sds activeDefragSds(sds sdsptr) {
|
||||||
* returns NULL in case the allocation wasn't moved.
|
* returns NULL in case the allocation wasn't moved.
|
||||||
* when it returns a non-null value, the old pointer was already released
|
* when it returns a non-null value, the old pointer was already released
|
||||||
* and should NOT be accessed. */
|
* and should NOT be accessed. */
|
||||||
robj *activeDefragStringOb(robj* ob, long *defragged) {
|
robj *activeDefragStringOb(robj* ob) {
|
||||||
robj *ret = NULL;
|
robj *ret = NULL;
|
||||||
if (ob->refcount!=1)
|
if (ob->refcount!=1)
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -101,7 +98,6 @@ robj *activeDefragStringOb(robj* ob, long *defragged) {
|
||||||
if (ob->type!=OBJ_STRING || ob->encoding!=OBJ_ENCODING_EMBSTR) {
|
if (ob->type!=OBJ_STRING || ob->encoding!=OBJ_ENCODING_EMBSTR) {
|
||||||
if ((ret = activeDefragAlloc(ob))) {
|
if ((ret = activeDefragAlloc(ob))) {
|
||||||
ob = ret;
|
ob = ret;
|
||||||
(*defragged)++;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,7 +107,6 @@ robj *activeDefragStringOb(robj* ob, long *defragged) {
|
||||||
sds newsds = activeDefragSds((sds)ob->ptr);
|
sds newsds = activeDefragSds((sds)ob->ptr);
|
||||||
if (newsds) {
|
if (newsds) {
|
||||||
ob->ptr = newsds;
|
ob->ptr = newsds;
|
||||||
(*defragged)++;
|
|
||||||
}
|
}
|
||||||
} else if (ob->encoding==OBJ_ENCODING_EMBSTR) {
|
} else if (ob->encoding==OBJ_ENCODING_EMBSTR) {
|
||||||
/* The sds is embedded in the object allocation, calculate the
|
/* The sds is embedded in the object allocation, calculate the
|
||||||
|
@ -119,7 +114,6 @@ robj *activeDefragStringOb(robj* ob, long *defragged) {
|
||||||
long ofs = (intptr_t)ob->ptr - (intptr_t)ob;
|
long ofs = (intptr_t)ob->ptr - (intptr_t)ob;
|
||||||
if ((ret = activeDefragAlloc(ob))) {
|
if ((ret = activeDefragAlloc(ob))) {
|
||||||
ret->ptr = (void*)((intptr_t)ret + ofs);
|
ret->ptr = (void*)((intptr_t)ret + ofs);
|
||||||
(*defragged)++;
|
|
||||||
}
|
}
|
||||||
} else if (ob->encoding!=OBJ_ENCODING_INT) {
|
} else if (ob->encoding!=OBJ_ENCODING_INT) {
|
||||||
serverPanic("Unknown string encoding");
|
serverPanic("Unknown string encoding");
|
||||||
|
@ -133,68 +127,36 @@ robj *activeDefragStringOb(robj* ob, long *defragged) {
|
||||||
* returns NULL in case the allocation wasn't moved.
|
* returns NULL in case the allocation wasn't moved.
|
||||||
* when it returns a non-null value, the old pointer was already released
|
* when it returns a non-null value, the old pointer was already released
|
||||||
* and should NOT be accessed. */
|
* and should NOT be accessed. */
|
||||||
luaScript *activeDefragLuaScript(luaScript *script, long *defragged) {
|
luaScript *activeDefragLuaScript(luaScript *script) {
|
||||||
luaScript *ret = NULL;
|
luaScript *ret = NULL;
|
||||||
|
|
||||||
/* try to defrag script struct */
|
/* try to defrag script struct */
|
||||||
if ((ret = activeDefragAlloc(script))) {
|
if ((ret = activeDefragAlloc(script))) {
|
||||||
script = ret;
|
script = ret;
|
||||||
(*defragged)++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* try to defrag actual script object */
|
/* try to defrag actual script object */
|
||||||
robj *ob = activeDefragStringOb(script->body, defragged);
|
robj *ob = activeDefragStringOb(script->body);
|
||||||
if (ob) script->body = ob;
|
if (ob) script->body = ob;
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Defrag helper for dictEntries to be used during dict iteration (called on
|
|
||||||
* each step). Returns a stat of how many pointers were moved. */
|
|
||||||
long dictIterDefragEntry(dictIterator *iter) {
|
|
||||||
/* This function is a little bit dirty since it messes with the internals
|
|
||||||
* of the dict and it's iterator, but the benefit is that it is very easy
|
|
||||||
* to use, and require no other changes in the dict. */
|
|
||||||
long defragged = 0;
|
|
||||||
/* Handle the next entry (if there is one), and update the pointer in the
|
|
||||||
* current entry. */
|
|
||||||
if (iter->nextEntry) {
|
|
||||||
dictEntry *newde = activeDefragAlloc(iter->nextEntry);
|
|
||||||
if (newde) {
|
|
||||||
defragged++;
|
|
||||||
iter->nextEntry = newde;
|
|
||||||
iter->entry->next = newde;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/* handle the case of the first entry in the hash bucket. */
|
|
||||||
if (iter->d->ht_table[iter->table][iter->index] == iter->entry) {
|
|
||||||
dictEntry *newde = activeDefragAlloc(iter->entry);
|
|
||||||
if (newde) {
|
|
||||||
iter->entry = newde;
|
|
||||||
iter->d->ht_table[iter->table][iter->index] = newde;
|
|
||||||
defragged++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return defragged;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Defrag helper for dict main allocations (dict struct, and hash tables).
|
/* Defrag helper for dict main allocations (dict struct, and hash tables).
|
||||||
* receives a pointer to the dict* and implicitly updates it when the dict
|
* receives a pointer to the dict* and implicitly updates it when the dict
|
||||||
* struct itself was moved. Returns a stat of how many pointers were moved. */
|
* struct itself was moved. Returns a stat of how many pointers were moved. */
|
||||||
long dictDefragTables(dict* d) {
|
void dictDefragTables(dict* d) {
|
||||||
dictEntry **newtable;
|
dictEntry **newtable;
|
||||||
long defragged = 0;
|
|
||||||
/* handle the first hash table */
|
/* handle the first hash table */
|
||||||
newtable = activeDefragAlloc(d->ht_table[0]);
|
newtable = activeDefragAlloc(d->ht_table[0]);
|
||||||
if (newtable)
|
if (newtable)
|
||||||
defragged++, d->ht_table[0] = newtable;
|
d->ht_table[0] = newtable;
|
||||||
/* handle the second hash table */
|
/* handle the second hash table */
|
||||||
if (d->ht_table[1]) {
|
if (d->ht_table[1]) {
|
||||||
newtable = activeDefragAlloc(d->ht_table[1]);
|
newtable = activeDefragAlloc(d->ht_table[1]);
|
||||||
if (newtable)
|
if (newtable)
|
||||||
defragged++, d->ht_table[1] = newtable;
|
d->ht_table[1] = newtable;
|
||||||
}
|
}
|
||||||
return defragged;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Internal function used by zslDefrag */
|
/* Internal function used by zslDefrag */
|
||||||
|
@ -258,19 +220,16 @@ double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) {
|
||||||
|
|
||||||
/* Defrag helper for sorted set.
|
/* Defrag helper for sorted set.
|
||||||
* Defrag a single dict entry key name, and corresponding skiplist struct */
|
* Defrag a single dict entry key name, and corresponding skiplist struct */
|
||||||
long activeDefragZsetEntry(zset *zs, dictEntry *de) {
|
void activeDefragZsetEntry(zset *zs, dictEntry *de) {
|
||||||
sds newsds;
|
sds newsds;
|
||||||
double* newscore;
|
double* newscore;
|
||||||
long defragged = 0;
|
|
||||||
sds sdsele = dictGetKey(de);
|
sds sdsele = dictGetKey(de);
|
||||||
if ((newsds = activeDefragSds(sdsele)))
|
if ((newsds = activeDefragSds(sdsele)))
|
||||||
defragged++, de->key = newsds;
|
dictSetKey(zs->dict, de, newsds);
|
||||||
newscore = zslDefrag(zs->zsl, *(double*)dictGetVal(de), sdsele, newsds);
|
newscore = zslDefrag(zs->zsl, *(double*)dictGetVal(de), sdsele, newsds);
|
||||||
if (newscore) {
|
if (newscore) {
|
||||||
dictSetVal(zs->dict, de, newscore);
|
dictSetVal(zs->dict, de, newscore);
|
||||||
defragged++;
|
|
||||||
}
|
}
|
||||||
return defragged;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#define DEFRAG_SDS_DICT_NO_VAL 0
|
#define DEFRAG_SDS_DICT_NO_VAL 0
|
||||||
|
@ -279,43 +238,51 @@ long activeDefragZsetEntry(zset *zs, dictEntry *de) {
|
||||||
#define DEFRAG_SDS_DICT_VAL_VOID_PTR 3
|
#define DEFRAG_SDS_DICT_VAL_VOID_PTR 3
|
||||||
#define DEFRAG_SDS_DICT_VAL_LUA_SCRIPT 4
|
#define DEFRAG_SDS_DICT_VAL_LUA_SCRIPT 4
|
||||||
|
|
||||||
/* Defrag a dict with sds key and optional value (either ptr, sds or robj string) */
|
typedef struct {
|
||||||
long activeDefragSdsDict(dict* d, int val_type) {
|
dict *dict;
|
||||||
dictIterator *di;
|
int val_type;
|
||||||
dictEntry *de;
|
} activeDefragSdsDictData;
|
||||||
long defragged = 0;
|
|
||||||
di = dictGetIterator(d);
|
void activeDefragSdsDictCallback(void *privdata, const dictEntry *_de) {
|
||||||
while((de = dictNext(di)) != NULL) {
|
dictEntry *de = (dictEntry*)_de;
|
||||||
|
activeDefragSdsDictData *data = privdata;
|
||||||
|
dict *d = data->dict;
|
||||||
|
int val_type = data->val_type;
|
||||||
sds sdsele = dictGetKey(de), newsds;
|
sds sdsele = dictGetKey(de), newsds;
|
||||||
if ((newsds = activeDefragSds(sdsele)))
|
if ((newsds = activeDefragSds(sdsele)))
|
||||||
de->key = newsds, defragged++;
|
dictSetKey(d, de, newsds);
|
||||||
/* defrag the value */
|
/* defrag the value */
|
||||||
if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) {
|
if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) {
|
||||||
sdsele = dictGetVal(de);
|
sdsele = dictGetVal(de);
|
||||||
if ((newsds = activeDefragSds(sdsele)))
|
if ((newsds = activeDefragSds(sdsele)))
|
||||||
de->v.val = newsds, defragged++;
|
dictSetVal(d, de, newsds);
|
||||||
} else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) {
|
} else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) {
|
||||||
robj *newele, *ele = dictGetVal(de);
|
robj *newele, *ele = dictGetVal(de);
|
||||||
if ((newele = activeDefragStringOb(ele, &defragged)))
|
if ((newele = activeDefragStringOb(ele)))
|
||||||
de->v.val = newele;
|
dictSetVal(d, de, newele);
|
||||||
} else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) {
|
} else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) {
|
||||||
void *newptr, *ptr = dictGetVal(de);
|
void *newptr, *ptr = dictGetVal(de);
|
||||||
if ((newptr = activeDefragAlloc(ptr)))
|
if ((newptr = activeDefragAlloc(ptr)))
|
||||||
de->v.val = newptr, defragged++;
|
dictSetVal(d, de, newptr);
|
||||||
} else if (val_type == DEFRAG_SDS_DICT_VAL_LUA_SCRIPT) {
|
} else if (val_type == DEFRAG_SDS_DICT_VAL_LUA_SCRIPT) {
|
||||||
void *newptr, *ptr = dictGetVal(de);
|
void *newptr, *ptr = dictGetVal(de);
|
||||||
if ((newptr = activeDefragLuaScript(ptr, &defragged)))
|
if ((newptr = activeDefragLuaScript(ptr)))
|
||||||
de->v.val = newptr;
|
dictSetVal(d, de, newptr);
|
||||||
}
|
}
|
||||||
defragged += dictIterDefragEntry(di);
|
}
|
||||||
}
|
|
||||||
dictReleaseIterator(di);
|
/* Defrag a dict with sds key and optional value (either ptr, sds or robj string) */
|
||||||
return defragged;
|
void activeDefragSdsDict(dict* d, int val_type) {
|
||||||
|
activeDefragSdsDictData data = {d, val_type};
|
||||||
|
unsigned long cursor = 0;
|
||||||
|
do {
|
||||||
|
cursor = dictScanDefrag(d, cursor, activeDefragSdsDictCallback,
|
||||||
|
activeDefragAlloc, &data);
|
||||||
|
} while (cursor != 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Defrag a list of ptr, sds or robj string values */
|
/* Defrag a list of ptr, sds or robj string values */
|
||||||
long activeDefragList(list *l, int val_type) {
|
void activeDefragList(list *l, int val_type) {
|
||||||
long defragged = 0;
|
|
||||||
listNode *ln, *newln;
|
listNode *ln, *newln;
|
||||||
for (ln = l->head; ln; ln = ln->next) {
|
for (ln = l->head; ln; ln = ln->next) {
|
||||||
if ((newln = activeDefragAlloc(ln))) {
|
if ((newln = activeDefragAlloc(ln))) {
|
||||||
|
@ -328,107 +295,25 @@ long activeDefragList(list *l, int val_type) {
|
||||||
else
|
else
|
||||||
l->tail = newln;
|
l->tail = newln;
|
||||||
ln = newln;
|
ln = newln;
|
||||||
defragged++;
|
|
||||||
}
|
}
|
||||||
if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) {
|
if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) {
|
||||||
sds newsds, sdsele = ln->value;
|
sds newsds, sdsele = ln->value;
|
||||||
if ((newsds = activeDefragSds(sdsele)))
|
if ((newsds = activeDefragSds(sdsele)))
|
||||||
ln->value = newsds, defragged++;
|
ln->value = newsds;
|
||||||
} else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) {
|
} else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) {
|
||||||
robj *newele, *ele = ln->value;
|
robj *newele, *ele = ln->value;
|
||||||
if ((newele = activeDefragStringOb(ele, &defragged)))
|
if ((newele = activeDefragStringOb(ele)))
|
||||||
ln->value = newele;
|
ln->value = newele;
|
||||||
} else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) {
|
} else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) {
|
||||||
void *newptr, *ptr = ln->value;
|
void *newptr, *ptr = ln->value;
|
||||||
if ((newptr = activeDefragAlloc(ptr)))
|
if ((newptr = activeDefragAlloc(ptr)))
|
||||||
ln->value = newptr, defragged++;
|
ln->value = newptr;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return defragged;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Defrag a list of sds values and a dict with the same sds keys */
|
void activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
|
||||||
long activeDefragSdsListAndDict(list *l, dict *d, int dict_val_type) {
|
|
||||||
long defragged = 0;
|
|
||||||
sds newsds, sdsele;
|
|
||||||
listNode *ln, *newln;
|
|
||||||
dictIterator *di;
|
|
||||||
dictEntry *de;
|
|
||||||
/* Defrag the list and it's sds values */
|
|
||||||
for (ln = l->head; ln; ln = ln->next) {
|
|
||||||
if ((newln = activeDefragAlloc(ln))) {
|
|
||||||
if (newln->prev)
|
|
||||||
newln->prev->next = newln;
|
|
||||||
else
|
|
||||||
l->head = newln;
|
|
||||||
if (newln->next)
|
|
||||||
newln->next->prev = newln;
|
|
||||||
else
|
|
||||||
l->tail = newln;
|
|
||||||
ln = newln;
|
|
||||||
defragged++;
|
|
||||||
}
|
|
||||||
sdsele = ln->value;
|
|
||||||
if ((newsds = activeDefragSds(sdsele))) {
|
|
||||||
/* When defragging an sds value, we need to update the dict key */
|
|
||||||
uint64_t hash = dictGetHash(d, newsds);
|
|
||||||
dictEntry **deref = dictFindEntryRefByPtrAndHash(d, sdsele, hash);
|
|
||||||
if (deref)
|
|
||||||
(*deref)->key = newsds;
|
|
||||||
ln->value = newsds;
|
|
||||||
defragged++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Defrag the dict values (keys were already handled) */
|
|
||||||
di = dictGetIterator(d);
|
|
||||||
while((de = dictNext(di)) != NULL) {
|
|
||||||
if (dict_val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) {
|
|
||||||
sds newsds, sdsele = dictGetVal(de);
|
|
||||||
if ((newsds = activeDefragSds(sdsele)))
|
|
||||||
de->v.val = newsds, defragged++;
|
|
||||||
} else if (dict_val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) {
|
|
||||||
robj *newele, *ele = dictGetVal(de);
|
|
||||||
if ((newele = activeDefragStringOb(ele, &defragged)))
|
|
||||||
de->v.val = newele;
|
|
||||||
} else if (dict_val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) {
|
|
||||||
void *newptr, *ptr = dictGetVal(de);
|
|
||||||
if ((newptr = activeDefragAlloc(ptr)))
|
|
||||||
de->v.val = newptr, defragged++;
|
|
||||||
}
|
|
||||||
defragged += dictIterDefragEntry(di);
|
|
||||||
}
|
|
||||||
dictReleaseIterator(di);
|
|
||||||
|
|
||||||
return defragged;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Utility function that replaces an old key pointer in the dictionary with a
|
|
||||||
* new pointer. Additionally, we try to defrag the dictEntry in that dict.
|
|
||||||
* Oldkey may be a dead pointer and should not be accessed (we get a
|
|
||||||
* pre-calculated hash value). Newkey may be null if the key pointer wasn't
|
|
||||||
* moved. Return value is the dictEntry if found, or NULL if not found.
|
|
||||||
* NOTE: this is very ugly code, but it let's us avoid the complication of
|
|
||||||
* doing a scan on another dict. */
|
|
||||||
dictEntry* replaceSatelliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged) {
|
|
||||||
dictEntry **deref = dictFindEntryRefByPtrAndHash(d, oldkey, hash);
|
|
||||||
if (deref) {
|
|
||||||
dictEntry *de = *deref;
|
|
||||||
dictEntry *newde = activeDefragAlloc(de);
|
|
||||||
if (newde) {
|
|
||||||
de = *deref = newde;
|
|
||||||
(*defragged)++;
|
|
||||||
}
|
|
||||||
if (newkey)
|
|
||||||
de->key = newkey;
|
|
||||||
return de;
|
|
||||||
}
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
long activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
|
|
||||||
quicklistNode *newnode, *node = *node_ref;
|
quicklistNode *newnode, *node = *node_ref;
|
||||||
long defragged = 0;
|
|
||||||
unsigned char *newzl;
|
unsigned char *newzl;
|
||||||
if ((newnode = activeDefragAlloc(node))) {
|
if ((newnode = activeDefragAlloc(node))) {
|
||||||
if (newnode->prev)
|
if (newnode->prev)
|
||||||
|
@ -440,21 +325,17 @@ long activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
|
||||||
else
|
else
|
||||||
ql->tail = newnode;
|
ql->tail = newnode;
|
||||||
*node_ref = node = newnode;
|
*node_ref = node = newnode;
|
||||||
defragged++;
|
|
||||||
}
|
}
|
||||||
if ((newzl = activeDefragAlloc(node->entry)))
|
if ((newzl = activeDefragAlloc(node->entry)))
|
||||||
defragged++, node->entry = newzl;
|
node->entry = newzl;
|
||||||
return defragged;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
long activeDefragQuickListNodes(quicklist *ql) {
|
void activeDefragQuickListNodes(quicklist *ql) {
|
||||||
quicklistNode *node = ql->head;
|
quicklistNode *node = ql->head;
|
||||||
long defragged = 0;
|
|
||||||
while (node) {
|
while (node) {
|
||||||
defragged += activeDefragQuickListNode(ql, &node);
|
activeDefragQuickListNode(ql, &node);
|
||||||
node = node->next;
|
node = node->next;
|
||||||
}
|
}
|
||||||
return defragged;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* when the value has lots of elements, we want to handle it later and not as
|
/* when the value has lots of elements, we want to handle it later and not as
|
||||||
|
@ -466,7 +347,7 @@ void defragLater(redisDb *db, dictEntry *kde) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
|
/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
|
||||||
long scanLaterList(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) {
|
long scanLaterList(robj *ob, unsigned long *cursor, long long endtime) {
|
||||||
quicklist *ql = ob->ptr;
|
quicklist *ql = ob->ptr;
|
||||||
quicklistNode *node;
|
quicklistNode *node;
|
||||||
long iterations = 0;
|
long iterations = 0;
|
||||||
|
@ -489,7 +370,7 @@ long scanLaterList(robj *ob, unsigned long *cursor, long long endtime, long long
|
||||||
|
|
||||||
(*cursor)++;
|
(*cursor)++;
|
||||||
while (node) {
|
while (node) {
|
||||||
(*defragged) += activeDefragQuickListNode(ql, &node);
|
activeDefragQuickListNode(ql, &node);
|
||||||
server.stat_active_defrag_scanned++;
|
server.stat_active_defrag_scanned++;
|
||||||
if (++iterations > 128 && !bookmark_failed) {
|
if (++iterations > 128 && !bookmark_failed) {
|
||||||
if (ustime() > endtime) {
|
if (ustime() > endtime) {
|
||||||
|
@ -511,82 +392,79 @@ long scanLaterList(robj *ob, unsigned long *cursor, long long endtime, long long
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
zset *zs;
|
zset *zs;
|
||||||
long defragged;
|
|
||||||
} scanLaterZsetData;
|
} scanLaterZsetData;
|
||||||
|
|
||||||
void scanLaterZsetCallback(void *privdata, const dictEntry *_de) {
|
void scanLaterZsetCallback(void *privdata, const dictEntry *_de) {
|
||||||
dictEntry *de = (dictEntry*)_de;
|
dictEntry *de = (dictEntry*)_de;
|
||||||
scanLaterZsetData *data = privdata;
|
scanLaterZsetData *data = privdata;
|
||||||
data->defragged += activeDefragZsetEntry(data->zs, de);
|
activeDefragZsetEntry(data->zs, de);
|
||||||
server.stat_active_defrag_scanned++;
|
server.stat_active_defrag_scanned++;
|
||||||
}
|
}
|
||||||
|
|
||||||
long scanLaterZset(robj *ob, unsigned long *cursor) {
|
void scanLaterZset(robj *ob, unsigned long *cursor) {
|
||||||
if (ob->type != OBJ_ZSET || ob->encoding != OBJ_ENCODING_SKIPLIST)
|
if (ob->type != OBJ_ZSET || ob->encoding != OBJ_ENCODING_SKIPLIST)
|
||||||
return 0;
|
return;
|
||||||
zset *zs = (zset*)ob->ptr;
|
zset *zs = (zset*)ob->ptr;
|
||||||
dict *d = zs->dict;
|
dict *d = zs->dict;
|
||||||
scanLaterZsetData data = {zs, 0};
|
scanLaterZsetData data = {zs};
|
||||||
*cursor = dictScan(d, *cursor, scanLaterZsetCallback, defragDictBucketCallback, &data);
|
*cursor = dictScanDefrag(d, *cursor, scanLaterZsetCallback, activeDefragAlloc, &data);
|
||||||
return data.defragged;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
dict *dict;
|
||||||
|
} scanLaterDictData;
|
||||||
|
|
||||||
void scanLaterSetCallback(void *privdata, const dictEntry *_de) {
|
void scanLaterSetCallback(void *privdata, const dictEntry *_de) {
|
||||||
dictEntry *de = (dictEntry*)_de;
|
dictEntry *de = (dictEntry*)_de;
|
||||||
long *defragged = privdata;
|
scanLaterDictData *data = privdata;
|
||||||
sds sdsele = dictGetKey(de), newsds;
|
sds sdsele = dictGetKey(de), newsds;
|
||||||
if ((newsds = activeDefragSds(sdsele)))
|
if ((newsds = activeDefragSds(sdsele)))
|
||||||
(*defragged)++, de->key = newsds;
|
dictSetKey(data->dict, de, newsds);
|
||||||
server.stat_active_defrag_scanned++;
|
server.stat_active_defrag_scanned++;
|
||||||
}
|
}
|
||||||
|
|
||||||
long scanLaterSet(robj *ob, unsigned long *cursor) {
|
void scanLaterSet(robj *ob, unsigned long *cursor) {
|
||||||
long defragged = 0;
|
|
||||||
if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HT)
|
if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HT)
|
||||||
return 0;
|
return;
|
||||||
dict *d = ob->ptr;
|
dict *d = ob->ptr;
|
||||||
*cursor = dictScan(d, *cursor, scanLaterSetCallback, defragDictBucketCallback, &defragged);
|
scanLaterDictData data = {d};
|
||||||
return defragged;
|
*cursor = dictScanDefrag(d, *cursor, scanLaterSetCallback, activeDefragAlloc, &data);
|
||||||
}
|
}
|
||||||
|
|
||||||
void scanLaterHashCallback(void *privdata, const dictEntry *_de) {
|
void scanLaterHashCallback(void *privdata, const dictEntry *_de) {
|
||||||
dictEntry *de = (dictEntry*)_de;
|
dictEntry *de = (dictEntry*)_de;
|
||||||
long *defragged = privdata;
|
scanLaterDictData *data = privdata;
|
||||||
sds sdsele = dictGetKey(de), newsds;
|
sds sdsele = dictGetKey(de), newsds;
|
||||||
if ((newsds = activeDefragSds(sdsele)))
|
if ((newsds = activeDefragSds(sdsele)))
|
||||||
(*defragged)++, de->key = newsds;
|
dictSetKey(data->dict, de, newsds);
|
||||||
sdsele = dictGetVal(de);
|
sdsele = dictGetVal(de);
|
||||||
if ((newsds = activeDefragSds(sdsele)))
|
if ((newsds = activeDefragSds(sdsele)))
|
||||||
(*defragged)++, de->v.val = newsds;
|
dictSetVal(data->dict, de, newsds);
|
||||||
server.stat_active_defrag_scanned++;
|
server.stat_active_defrag_scanned++;
|
||||||
}
|
}
|
||||||
|
|
||||||
long scanLaterHash(robj *ob, unsigned long *cursor) {
|
void scanLaterHash(robj *ob, unsigned long *cursor) {
|
||||||
long defragged = 0;
|
|
||||||
if (ob->type != OBJ_HASH || ob->encoding != OBJ_ENCODING_HT)
|
if (ob->type != OBJ_HASH || ob->encoding != OBJ_ENCODING_HT)
|
||||||
return 0;
|
return;
|
||||||
dict *d = ob->ptr;
|
dict *d = ob->ptr;
|
||||||
*cursor = dictScan(d, *cursor, scanLaterHashCallback, defragDictBucketCallback, &defragged);
|
scanLaterDictData data = {d};
|
||||||
return defragged;
|
*cursor = dictScanDefrag(d, *cursor, scanLaterHashCallback, activeDefragAlloc, &data);
|
||||||
}
|
}
|
||||||
|
|
||||||
long defragQuicklist(redisDb *db, dictEntry *kde) {
|
void defragQuicklist(redisDb *db, dictEntry *kde) {
|
||||||
robj *ob = dictGetVal(kde);
|
robj *ob = dictGetVal(kde);
|
||||||
long defragged = 0;
|
|
||||||
quicklist *ql = ob->ptr, *newql;
|
quicklist *ql = ob->ptr, *newql;
|
||||||
serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST);
|
serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST);
|
||||||
if ((newql = activeDefragAlloc(ql)))
|
if ((newql = activeDefragAlloc(ql)))
|
||||||
defragged++, ob->ptr = ql = newql;
|
ob->ptr = ql = newql;
|
||||||
if (ql->len > server.active_defrag_max_scan_fields)
|
if (ql->len > server.active_defrag_max_scan_fields)
|
||||||
defragLater(db, kde);
|
defragLater(db, kde);
|
||||||
else
|
else
|
||||||
defragged += activeDefragQuickListNodes(ql);
|
activeDefragQuickListNodes(ql);
|
||||||
return defragged;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
long defragZsetSkiplist(redisDb *db, dictEntry *kde) {
|
void defragZsetSkiplist(redisDb *db, dictEntry *kde) {
|
||||||
robj *ob = dictGetVal(kde);
|
robj *ob = dictGetVal(kde);
|
||||||
long defragged = 0;
|
|
||||||
zset *zs = (zset*)ob->ptr;
|
zset *zs = (zset*)ob->ptr;
|
||||||
zset *newzs;
|
zset *newzs;
|
||||||
zskiplist *newzsl;
|
zskiplist *newzsl;
|
||||||
|
@ -595,30 +473,28 @@ long defragZsetSkiplist(redisDb *db, dictEntry *kde) {
|
||||||
struct zskiplistNode *newheader;
|
struct zskiplistNode *newheader;
|
||||||
serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST);
|
serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST);
|
||||||
if ((newzs = activeDefragAlloc(zs)))
|
if ((newzs = activeDefragAlloc(zs)))
|
||||||
defragged++, ob->ptr = zs = newzs;
|
ob->ptr = zs = newzs;
|
||||||
if ((newzsl = activeDefragAlloc(zs->zsl)))
|
if ((newzsl = activeDefragAlloc(zs->zsl)))
|
||||||
defragged++, zs->zsl = newzsl;
|
zs->zsl = newzsl;
|
||||||
if ((newheader = activeDefragAlloc(zs->zsl->header)))
|
if ((newheader = activeDefragAlloc(zs->zsl->header)))
|
||||||
defragged++, zs->zsl->header = newheader;
|
zs->zsl->header = newheader;
|
||||||
if (dictSize(zs->dict) > server.active_defrag_max_scan_fields)
|
if (dictSize(zs->dict) > server.active_defrag_max_scan_fields)
|
||||||
defragLater(db, kde);
|
defragLater(db, kde);
|
||||||
else {
|
else {
|
||||||
dictIterator *di = dictGetIterator(zs->dict);
|
dictIterator *di = dictGetIterator(zs->dict);
|
||||||
while((de = dictNext(di)) != NULL) {
|
while((de = dictNext(di)) != NULL) {
|
||||||
defragged += activeDefragZsetEntry(zs, de);
|
activeDefragZsetEntry(zs, de);
|
||||||
}
|
}
|
||||||
dictReleaseIterator(di);
|
dictReleaseIterator(di);
|
||||||
}
|
}
|
||||||
/* handle the dict struct */
|
/* handle the dict struct */
|
||||||
if ((newdict = activeDefragAlloc(zs->dict)))
|
if ((newdict = activeDefragAlloc(zs->dict)))
|
||||||
defragged++, zs->dict = newdict;
|
zs->dict = newdict;
|
||||||
/* defrag the dict tables */
|
/* defrag the dict tables */
|
||||||
defragged += dictDefragTables(zs->dict);
|
dictDefragTables(zs->dict);
|
||||||
return defragged;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
long defragHash(redisDb *db, dictEntry *kde) {
|
void defragHash(redisDb *db, dictEntry *kde) {
|
||||||
long defragged = 0;
|
|
||||||
robj *ob = dictGetVal(kde);
|
robj *ob = dictGetVal(kde);
|
||||||
dict *d, *newd;
|
dict *d, *newd;
|
||||||
serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT);
|
serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT);
|
||||||
|
@ -626,17 +502,15 @@ long defragHash(redisDb *db, dictEntry *kde) {
|
||||||
if (dictSize(d) > server.active_defrag_max_scan_fields)
|
if (dictSize(d) > server.active_defrag_max_scan_fields)
|
||||||
defragLater(db, kde);
|
defragLater(db, kde);
|
||||||
else
|
else
|
||||||
defragged += activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS);
|
activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS);
|
||||||
/* handle the dict struct */
|
/* handle the dict struct */
|
||||||
if ((newd = activeDefragAlloc(ob->ptr)))
|
if ((newd = activeDefragAlloc(ob->ptr)))
|
||||||
defragged++, ob->ptr = newd;
|
ob->ptr = newd;
|
||||||
/* defrag the dict tables */
|
/* defrag the dict tables */
|
||||||
defragged += dictDefragTables(ob->ptr);
|
dictDefragTables(ob->ptr);
|
||||||
return defragged;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
long defragSet(redisDb *db, dictEntry *kde) {
|
void defragSet(redisDb *db, dictEntry *kde) {
|
||||||
long defragged = 0;
|
|
||||||
robj *ob = dictGetVal(kde);
|
robj *ob = dictGetVal(kde);
|
||||||
dict *d, *newd;
|
dict *d, *newd;
|
||||||
serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT);
|
serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT);
|
||||||
|
@ -644,13 +518,12 @@ long defragSet(redisDb *db, dictEntry *kde) {
|
||||||
if (dictSize(d) > server.active_defrag_max_scan_fields)
|
if (dictSize(d) > server.active_defrag_max_scan_fields)
|
||||||
defragLater(db, kde);
|
defragLater(db, kde);
|
||||||
else
|
else
|
||||||
defragged += activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL);
|
activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL);
|
||||||
/* handle the dict struct */
|
/* handle the dict struct */
|
||||||
if ((newd = activeDefragAlloc(ob->ptr)))
|
if ((newd = activeDefragAlloc(ob->ptr)))
|
||||||
defragged++, ob->ptr = newd;
|
ob->ptr = newd;
|
||||||
/* defrag the dict tables */
|
/* defrag the dict tables */
|
||||||
defragged += dictDefragTables(ob->ptr);
|
dictDefragTables(ob->ptr);
|
||||||
return defragged;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Defrag callback for radix tree iterator, called for each node,
|
/* Defrag callback for radix tree iterator, called for each node,
|
||||||
|
@ -665,7 +538,7 @@ int defragRaxNode(raxNode **noderef) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
|
/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
|
||||||
int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) {
|
int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime) {
|
||||||
static unsigned char last[sizeof(streamID)];
|
static unsigned char last[sizeof(streamID)];
|
||||||
raxIterator ri;
|
raxIterator ri;
|
||||||
long iterations = 0;
|
long iterations = 0;
|
||||||
|
@ -699,7 +572,7 @@ int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime,
|
||||||
while (raxNext(&ri)) {
|
while (raxNext(&ri)) {
|
||||||
void *newdata = activeDefragAlloc(ri.data);
|
void *newdata = activeDefragAlloc(ri.data);
|
||||||
if (newdata)
|
if (newdata)
|
||||||
raxSetData(ri.node, ri.data=newdata), (*defragged)++;
|
raxSetData(ri.node, ri.data=newdata);
|
||||||
server.stat_active_defrag_scanned++;
|
server.stat_active_defrag_scanned++;
|
||||||
if (++iterations > 128) {
|
if (++iterations > 128) {
|
||||||
if (ustime() > endtime) {
|
if (ustime() > endtime) {
|
||||||
|
@ -717,19 +590,18 @@ int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime,
|
||||||
}
|
}
|
||||||
|
|
||||||
/* optional callback used defrag each rax element (not including the element pointer itself) */
|
/* optional callback used defrag each rax element (not including the element pointer itself) */
|
||||||
typedef void *(raxDefragFunction)(raxIterator *ri, void *privdata, long *defragged);
|
typedef void *(raxDefragFunction)(raxIterator *ri, void *privdata);
|
||||||
|
|
||||||
/* defrag radix tree including:
|
/* defrag radix tree including:
|
||||||
* 1) rax struct
|
* 1) rax struct
|
||||||
* 2) rax nodes
|
* 2) rax nodes
|
||||||
* 3) rax entry data (only if defrag_data is specified)
|
* 3) rax entry data (only if defrag_data is specified)
|
||||||
* 4) call a callback per element, and allow the callback to return a new pointer for the element */
|
* 4) call a callback per element, and allow the callback to return a new pointer for the element */
|
||||||
long defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) {
|
void defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) {
|
||||||
long defragged = 0;
|
|
||||||
raxIterator ri;
|
raxIterator ri;
|
||||||
rax* rax;
|
rax* rax;
|
||||||
if ((rax = activeDefragAlloc(*raxref)))
|
if ((rax = activeDefragAlloc(*raxref)))
|
||||||
defragged++, *raxref = rax;
|
*raxref = rax;
|
||||||
rax = *raxref;
|
rax = *raxref;
|
||||||
raxStart(&ri,rax);
|
raxStart(&ri,rax);
|
||||||
ri.node_cb = defragRaxNode;
|
ri.node_cb = defragRaxNode;
|
||||||
|
@ -738,14 +610,13 @@ long defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_c
|
||||||
while (raxNext(&ri)) {
|
while (raxNext(&ri)) {
|
||||||
void *newdata = NULL;
|
void *newdata = NULL;
|
||||||
if (element_cb)
|
if (element_cb)
|
||||||
newdata = element_cb(&ri, element_cb_data, &defragged);
|
newdata = element_cb(&ri, element_cb_data);
|
||||||
if (defrag_data && !newdata)
|
if (defrag_data && !newdata)
|
||||||
newdata = activeDefragAlloc(ri.data);
|
newdata = activeDefragAlloc(ri.data);
|
||||||
if (newdata)
|
if (newdata)
|
||||||
raxSetData(ri.node, ri.data=newdata), defragged++;
|
raxSetData(ri.node, ri.data=newdata);
|
||||||
}
|
}
|
||||||
raxStop(&ri);
|
raxStop(&ri);
|
||||||
return defragged;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -753,8 +624,7 @@ typedef struct {
|
||||||
streamConsumer *c;
|
streamConsumer *c;
|
||||||
} PendingEntryContext;
|
} PendingEntryContext;
|
||||||
|
|
||||||
void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata, long *defragged) {
|
void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata) {
|
||||||
UNUSED(defragged);
|
|
||||||
PendingEntryContext *ctx = privdata;
|
PendingEntryContext *ctx = privdata;
|
||||||
streamNACK *nack = ri->data, *newnack;
|
streamNACK *nack = ri->data, *newnack;
|
||||||
nack->consumer = ctx->c; /* update nack pointer to consumer */
|
nack->consumer = ctx->c; /* update nack pointer to consumer */
|
||||||
|
@ -764,102 +634,96 @@ void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata, long *de
|
||||||
void *prev;
|
void *prev;
|
||||||
raxInsert(ctx->cg->pel, ri->key, ri->key_len, newnack, &prev);
|
raxInsert(ctx->cg->pel, ri->key, ri->key_len, newnack, &prev);
|
||||||
serverAssert(prev==nack);
|
serverAssert(prev==nack);
|
||||||
/* note: we don't increment 'defragged' that's done by the caller */
|
|
||||||
}
|
}
|
||||||
return newnack;
|
return newnack;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* defragStreamConsumer(raxIterator *ri, void *privdata, long *defragged) {
|
void* defragStreamConsumer(raxIterator *ri, void *privdata) {
|
||||||
streamConsumer *c = ri->data;
|
streamConsumer *c = ri->data;
|
||||||
streamCG *cg = privdata;
|
streamCG *cg = privdata;
|
||||||
void *newc = activeDefragAlloc(c);
|
void *newc = activeDefragAlloc(c);
|
||||||
if (newc) {
|
if (newc) {
|
||||||
/* note: we don't increment 'defragged' that's done by the caller */
|
|
||||||
c = newc;
|
c = newc;
|
||||||
}
|
}
|
||||||
sds newsds = activeDefragSds(c->name);
|
sds newsds = activeDefragSds(c->name);
|
||||||
if (newsds)
|
if (newsds)
|
||||||
(*defragged)++, c->name = newsds;
|
c->name = newsds;
|
||||||
if (c->pel) {
|
if (c->pel) {
|
||||||
PendingEntryContext pel_ctx = {cg, c};
|
PendingEntryContext pel_ctx = {cg, c};
|
||||||
*defragged += defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, &pel_ctx);
|
defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, &pel_ctx);
|
||||||
}
|
}
|
||||||
return newc; /* returns NULL if c was not defragged */
|
return newc; /* returns NULL if c was not defragged */
|
||||||
}
|
}
|
||||||
|
|
||||||
void* defragStreamConsumerGroup(raxIterator *ri, void *privdata, long *defragged) {
|
void* defragStreamConsumerGroup(raxIterator *ri, void *privdata) {
|
||||||
streamCG *cg = ri->data;
|
streamCG *cg = ri->data;
|
||||||
UNUSED(privdata);
|
UNUSED(privdata);
|
||||||
if (cg->consumers)
|
if (cg->consumers)
|
||||||
*defragged += defragRadixTree(&cg->consumers, 0, defragStreamConsumer, cg);
|
defragRadixTree(&cg->consumers, 0, defragStreamConsumer, cg);
|
||||||
if (cg->pel)
|
if (cg->pel)
|
||||||
*defragged += defragRadixTree(&cg->pel, 0, NULL, NULL);
|
defragRadixTree(&cg->pel, 0, NULL, NULL);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
long defragStream(redisDb *db, dictEntry *kde) {
|
void defragStream(redisDb *db, dictEntry *kde) {
|
||||||
long defragged = 0;
|
|
||||||
robj *ob = dictGetVal(kde);
|
robj *ob = dictGetVal(kde);
|
||||||
serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM);
|
serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM);
|
||||||
stream *s = ob->ptr, *news;
|
stream *s = ob->ptr, *news;
|
||||||
|
|
||||||
/* handle the main struct */
|
/* handle the main struct */
|
||||||
if ((news = activeDefragAlloc(s)))
|
if ((news = activeDefragAlloc(s)))
|
||||||
defragged++, ob->ptr = s = news;
|
ob->ptr = s = news;
|
||||||
|
|
||||||
if (raxSize(s->rax) > server.active_defrag_max_scan_fields) {
|
if (raxSize(s->rax) > server.active_defrag_max_scan_fields) {
|
||||||
rax *newrax = activeDefragAlloc(s->rax);
|
rax *newrax = activeDefragAlloc(s->rax);
|
||||||
if (newrax)
|
if (newrax)
|
||||||
defragged++, s->rax = newrax;
|
s->rax = newrax;
|
||||||
defragLater(db, kde);
|
defragLater(db, kde);
|
||||||
} else
|
} else
|
||||||
defragged += defragRadixTree(&s->rax, 1, NULL, NULL);
|
defragRadixTree(&s->rax, 1, NULL, NULL);
|
||||||
|
|
||||||
if (s->cgroups)
|
if (s->cgroups)
|
||||||
defragged += defragRadixTree(&s->cgroups, 1, defragStreamConsumerGroup, NULL);
|
defragRadixTree(&s->cgroups, 1, defragStreamConsumerGroup, NULL);
|
||||||
return defragged;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Defrag a module key. This is either done immediately or scheduled
|
/* Defrag a module key. This is either done immediately or scheduled
|
||||||
* for later. Returns then number of pointers defragged.
|
* for later. Returns then number of pointers defragged.
|
||||||
*/
|
*/
|
||||||
long defragModule(redisDb *db, dictEntry *kde) {
|
void defragModule(redisDb *db, dictEntry *kde) {
|
||||||
robj *obj = dictGetVal(kde);
|
robj *obj = dictGetVal(kde);
|
||||||
serverAssert(obj->type == OBJ_MODULE);
|
serverAssert(obj->type == OBJ_MODULE);
|
||||||
long defragged = 0;
|
|
||||||
|
|
||||||
if (!moduleDefragValue(dictGetKey(kde), obj, &defragged, db->id))
|
if (!moduleDefragValue(dictGetKey(kde), obj, db->id))
|
||||||
defragLater(db, kde);
|
defragLater(db, kde);
|
||||||
|
|
||||||
return defragged;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* for each key we scan in the main dict, this function will attempt to defrag
|
/* for each key we scan in the main dict, this function will attempt to defrag
|
||||||
* all the various pointers it has. Returns a stat of how many pointers were
|
* all the various pointers it has. Returns a stat of how many pointers were
|
||||||
* moved. */
|
* moved. */
|
||||||
long defragKey(redisDb *db, dictEntry *de) {
|
void defragKey(redisDb *db, dictEntry *de) {
|
||||||
sds keysds = dictGetKey(de);
|
sds keysds = dictGetKey(de);
|
||||||
robj *newob, *ob;
|
robj *newob, *ob;
|
||||||
unsigned char *newzl;
|
unsigned char *newzl;
|
||||||
long defragged = 0;
|
|
||||||
sds newsds;
|
sds newsds;
|
||||||
|
|
||||||
/* Try to defrag the key name. */
|
/* Try to defrag the key name. */
|
||||||
newsds = activeDefragSds(keysds);
|
newsds = activeDefragSds(keysds);
|
||||||
if (newsds)
|
if (newsds) {
|
||||||
defragged++, de->key = newsds;
|
dictSetKey(db->dict, de, newsds);
|
||||||
if (dictSize(db->expires)) {
|
if (dictSize(db->expires)) {
|
||||||
/* Dirty code:
|
/* We can't search in db->expires for that key after we've released
|
||||||
* I can't search in db->expires for that key after i already released
|
* the pointer it holds, since it won't be able to do the string
|
||||||
* the pointer it holds it won't be able to do the string compare */
|
* compare, but we can find the entry using key hash and pointer. */
|
||||||
uint64_t hash = dictGetHash(db->dict, de->key);
|
uint64_t hash = dictGetHash(db->dict, newsds);
|
||||||
replaceSatelliteDictKeyPtrAndOrDefragDictEntry(db->expires, keysds, newsds, hash, &defragged);
|
dictEntry *expire_de = dictFindEntryByPtrAndHash(db->expires, keysds, hash);
|
||||||
|
if (expire_de) dictSetKey(db->expires, expire_de, newsds);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Try to defrag robj and / or string value. */
|
/* Try to defrag robj and / or string value. */
|
||||||
ob = dictGetVal(de);
|
ob = dictGetVal(de);
|
||||||
if ((newob = activeDefragStringOb(ob, &defragged))) {
|
if ((newob = activeDefragStringOb(ob))) {
|
||||||
de->v.val = newob;
|
dictSetVal(db->dict, de, newob);
|
||||||
ob = newob;
|
ob = newob;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -867,78 +731,69 @@ long defragKey(redisDb *db, dictEntry *de) {
|
||||||
/* Already handled in activeDefragStringOb. */
|
/* Already handled in activeDefragStringOb. */
|
||||||
} else if (ob->type == OBJ_LIST) {
|
} else if (ob->type == OBJ_LIST) {
|
||||||
if (ob->encoding == OBJ_ENCODING_QUICKLIST) {
|
if (ob->encoding == OBJ_ENCODING_QUICKLIST) {
|
||||||
defragged += defragQuicklist(db, de);
|
defragQuicklist(db, de);
|
||||||
} else if (ob->encoding == OBJ_ENCODING_LISTPACK) {
|
} else if (ob->encoding == OBJ_ENCODING_LISTPACK) {
|
||||||
if ((newzl = activeDefragAlloc(ob->ptr)))
|
if ((newzl = activeDefragAlloc(ob->ptr)))
|
||||||
defragged++, ob->ptr = newzl;
|
ob->ptr = newzl;
|
||||||
} else {
|
} else {
|
||||||
serverPanic("Unknown list encoding");
|
serverPanic("Unknown list encoding");
|
||||||
}
|
}
|
||||||
} else if (ob->type == OBJ_SET) {
|
} else if (ob->type == OBJ_SET) {
|
||||||
if (ob->encoding == OBJ_ENCODING_HT) {
|
if (ob->encoding == OBJ_ENCODING_HT) {
|
||||||
defragged += defragSet(db, de);
|
defragSet(db, de);
|
||||||
} else if (ob->encoding == OBJ_ENCODING_INTSET ||
|
} else if (ob->encoding == OBJ_ENCODING_INTSET ||
|
||||||
ob->encoding == OBJ_ENCODING_LISTPACK)
|
ob->encoding == OBJ_ENCODING_LISTPACK)
|
||||||
{
|
{
|
||||||
void *newptr, *ptr = ob->ptr;
|
void *newptr, *ptr = ob->ptr;
|
||||||
if ((newptr = activeDefragAlloc(ptr)))
|
if ((newptr = activeDefragAlloc(ptr)))
|
||||||
defragged++, ob->ptr = newptr;
|
ob->ptr = newptr;
|
||||||
} else {
|
} else {
|
||||||
serverPanic("Unknown set encoding");
|
serverPanic("Unknown set encoding");
|
||||||
}
|
}
|
||||||
} else if (ob->type == OBJ_ZSET) {
|
} else if (ob->type == OBJ_ZSET) {
|
||||||
if (ob->encoding == OBJ_ENCODING_LISTPACK) {
|
if (ob->encoding == OBJ_ENCODING_LISTPACK) {
|
||||||
if ((newzl = activeDefragAlloc(ob->ptr)))
|
if ((newzl = activeDefragAlloc(ob->ptr)))
|
||||||
defragged++, ob->ptr = newzl;
|
ob->ptr = newzl;
|
||||||
} else if (ob->encoding == OBJ_ENCODING_SKIPLIST) {
|
} else if (ob->encoding == OBJ_ENCODING_SKIPLIST) {
|
||||||
defragged += defragZsetSkiplist(db, de);
|
defragZsetSkiplist(db, de);
|
||||||
} else {
|
} else {
|
||||||
serverPanic("Unknown sorted set encoding");
|
serverPanic("Unknown sorted set encoding");
|
||||||
}
|
}
|
||||||
} else if (ob->type == OBJ_HASH) {
|
} else if (ob->type == OBJ_HASH) {
|
||||||
if (ob->encoding == OBJ_ENCODING_LISTPACK) {
|
if (ob->encoding == OBJ_ENCODING_LISTPACK) {
|
||||||
if ((newzl = activeDefragAlloc(ob->ptr)))
|
if ((newzl = activeDefragAlloc(ob->ptr)))
|
||||||
defragged++, ob->ptr = newzl;
|
ob->ptr = newzl;
|
||||||
} else if (ob->encoding == OBJ_ENCODING_HT) {
|
} else if (ob->encoding == OBJ_ENCODING_HT) {
|
||||||
defragged += defragHash(db, de);
|
defragHash(db, de);
|
||||||
} else {
|
} else {
|
||||||
serverPanic("Unknown hash encoding");
|
serverPanic("Unknown hash encoding");
|
||||||
}
|
}
|
||||||
} else if (ob->type == OBJ_STREAM) {
|
} else if (ob->type == OBJ_STREAM) {
|
||||||
defragged += defragStream(db, de);
|
defragStream(db, de);
|
||||||
} else if (ob->type == OBJ_MODULE) {
|
} else if (ob->type == OBJ_MODULE) {
|
||||||
defragged += defragModule(db, de);
|
defragModule(db, de);
|
||||||
} else {
|
} else {
|
||||||
serverPanic("Unknown object type");
|
serverPanic("Unknown object type");
|
||||||
}
|
}
|
||||||
return defragged;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
 /* Defrag scan callback for the main db dictionary. */
 void defragScanCallback(void *privdata, const dictEntry *de) {
-    long defragged = defragKey((redisDb*)privdata, (dictEntry*)de);
-    server.stat_active_defrag_hits += defragged;
-    if(defragged)
+    long long hits_before = server.stat_active_defrag_hits;
+    defragKey((redisDb*)privdata, (dictEntry*)de);
+    if (server.stat_active_defrag_hits != hits_before)
         server.stat_active_defrag_key_hits++;
     else
         server.stat_active_defrag_key_misses++;
     server.stat_active_defrag_scanned++;
 }
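defragScanCallback now classifies a key as a defrag hit or miss by comparing server.stat_active_defrag_hits before and after defragKey(), since the per-allocation increment moved into activeDefragAlloc(). A minimal standalone sketch of that counter-delta pattern (illustrative only, not Redis code; do_defrag_work is a made-up stand-in for defragKey):

/* Sketch: classify work as hit/miss by watching a shared counter move. */
static long long hits;  /* stands in for server.stat_active_defrag_hits */

static void do_defrag_work(void) {
    hits++;  /* any successful reallocation bumps the shared counter */
}

static void scan_one_key(long long *key_hits, long long *key_misses) {
    long long before = hits;
    do_defrag_work();
    if (hits != before)
        (*key_hits)++;
    else
        (*key_misses)++;
}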
-/* Defrag scan callback for each hash table bucket,
- * used in order to defrag the dictEntry allocations. */
-void defragDictBucketCallback(dict *d, dictEntry **bucketref) {
-    while(*bucketref) {
-        dictEntry *de = *bucketref, *newde;
-        if ((newde = activeDefragAlloc(de))) {
-            *bucketref = newde;
-            if (server.cluster_enabled && d == server.db[0].dict) {
-                /* Cluster keyspace dict. Update slot-to-entries mapping. */
-                slotToKeyReplaceEntry(newde, server.db);
-            }
-        }
-        bucketref = &(*bucketref)->next;
-    }
-}
+/* Dummy scan callback used when defragging the expire dictionary. We only
+ * defrag the entries, which is done per bucket. */
+void defragExpireScanCallback(void *privdata, const dictEntry *de) {
+    UNUSED(privdata);
+    UNUSED(de);
+    server.stat_active_defrag_scanned++;
+}

 /* Utility function to get the fragmentation ratio from jemalloc.

@@ -964,15 +819,13 @@ float getAllocatorFragmentation(size_t *out_frag_bytes) {

 /* We may need to defrag other globals, one small allocation can hold a full allocator run.
  * so although small, it is still important to defrag these */
-long defragOtherGlobals() {
-    long defragged = 0;
+void defragOtherGlobals() {

     /* there are many more pointers to defrag (e.g. client argv, output / aof buffers, etc.
      * but we assume most of these are short lived, we only need to defrag allocations
      * that remain static for a long time */
-    defragged += activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT);
-    defragged += moduleDefragGlobals();
-    return defragged;
+    activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT);
+    moduleDefragGlobals();
 }

 /* returns 0 more work may or may not be needed (see non-zero cursor),

@@ -981,17 +834,17 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime, int
     if (de) {
         robj *ob = dictGetVal(de);
         if (ob->type == OBJ_LIST) {
-            return scanLaterList(ob, cursor, endtime, &server.stat_active_defrag_hits);
+            return scanLaterList(ob, cursor, endtime);
         } else if (ob->type == OBJ_SET) {
-            server.stat_active_defrag_hits += scanLaterSet(ob, cursor);
+            scanLaterSet(ob, cursor);
         } else if (ob->type == OBJ_ZSET) {
-            server.stat_active_defrag_hits += scanLaterZset(ob, cursor);
+            scanLaterZset(ob, cursor);
         } else if (ob->type == OBJ_HASH) {
-            server.stat_active_defrag_hits += scanLaterHash(ob, cursor);
+            scanLaterHash(ob, cursor);
         } else if (ob->type == OBJ_STREAM) {
-            return scanLaterStreamListpacks(ob, cursor, endtime, &server.stat_active_defrag_hits);
+            return scanLaterStreamListpacks(ob, cursor, endtime);
         } else if (ob->type == OBJ_MODULE) {
-            return moduleLateDefrag(dictGetKey(de), ob, cursor, endtime, &server.stat_active_defrag_hits, dbid);
+            return moduleLateDefrag(dictGetKey(de), ob, cursor, endtime, dbid);
         } else {
             *cursor = 0; /* object type may have changed since we schedule it for later */
         }
@@ -1106,6 +959,7 @@ void computeDefragCycles() {
 void activeDefragCycle(void) {
     static int current_db = -1;
     static unsigned long cursor = 0;
+    static unsigned long expires_cursor = 0;
     static redisDb *db = NULL;
     static long long start_scan, start_stat;
     unsigned int iterations = 0;

@@ -1151,7 +1005,7 @@ void activeDefragCycle(void) {

     do {
         /* if we're not continuing a scan from the last call or loop, start a new one */
-        if (!cursor) {
+        if (!cursor && !expires_cursor) {
             /* finish any leftovers from previous db before moving to the next one */
             if (db && defragLaterStep(db, endtime)) {
                 quit = 1; /* time is up, we didn't finish all the work */

@@ -1161,7 +1015,7 @@ void activeDefragCycle(void) {
             /* Move on to next database, and stop if we reached the last one. */
             if (++current_db >= server.dbnum) {
                 /* defrag other items not part of the db / keys */
-                server.stat_active_defrag_hits += defragOtherGlobals();
+                defragOtherGlobals();

                 long long now = ustime();
                 size_t frag_bytes;

@@ -1198,16 +1052,27 @@ void activeDefragCycle(void) {
             break; /* this will exit the function and we'll continue on the next cycle */
         }

-        cursor = dictScan(db->dict, cursor, defragScanCallback, defragDictBucketCallback, db);
+        /* Scan the keyspace dict unless we're scanning the expire dict. */
+        if (!expires_cursor)
+            cursor = dictScanDefrag(db->dict, cursor, defragScanCallback,
+                                    activeDefragAlloc, db);
+
+        /* When done scanning the keyspace dict, we scan the expire dict. */
+        if (!cursor)
+            expires_cursor = dictScanDefrag(db->expires, expires_cursor,
+                                            defragExpireScanCallback,
+                                            activeDefragAlloc, NULL);

         /* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys
          * (if we have a lot of pointers in one hash bucket or rehashing),
          * check if we reached the time limit.
          * But regardless, don't start a new db in this loop, this is because after
          * the last db we call defragOtherGlobals, which must be done in one cycle */
-        if (!cursor || (++iterations > 16 ||
+        if (!(cursor || expires_cursor) ||
+            ++iterations > 16 ||
             server.stat_active_defrag_hits - prev_defragged > 512 ||
-            server.stat_active_defrag_scanned - prev_scanned > 64)) {
+            server.stat_active_defrag_scanned - prev_scanned > 64)
+        {
            if (!cursor || ustime() > endtime) {
                quit = 1;
                break;

@@ -1216,7 +1081,7 @@ void activeDefragCycle(void) {
            prev_defragged = server.stat_active_defrag_hits;
            prev_scanned = server.stat_active_defrag_scanned;
        }
-    } while(cursor && !quit);
+    } while((cursor || expires_cursor) && !quit);
 } while(!quit);

     latencyEndMonitor(latency);

@@ -1243,9 +1108,8 @@ void *activeDefragAlloc(void *ptr) {
     return NULL;
 }

-robj *activeDefragStringOb(robj *ob, long *defragged) {
+robj *activeDefragStringOb(robj *ob) {
     UNUSED(ob);
-    UNUSED(defragged);
     return NULL;
 }
src/dict.c (168 changed lines)

@@ -58,6 +58,22 @@
 static dictResizeEnable dict_can_resize = DICT_RESIZE_ENABLE;
 static unsigned int dict_force_resize_ratio = 5;

+/* -------------------------- types ----------------------------------------- */
+
+struct dictEntry {
+    void *key;
+    union {
+        void *val;
+        uint64_t u64;
+        int64_t s64;
+        double d;
+    } v;
+    struct dictEntry *next;     /* Next entry in the same hash bucket. */
+    void *metadata[];           /* An arbitrary number of bytes (starting at a
+                                 * pointer-aligned address) of size as returned
+                                 * by dictType's dictEntryMetadataBytes(). */
+};
+
 /* -------------------------- private prototypes ---------------------------- */

 static int _dictExpandIfNeeded(dict *d);

@@ -104,7 +120,11 @@ static void _dictReset(dict *d, int htidx)
 /* Create a new hash table */
 dict *dictCreate(dictType *type)
 {
-    dict *d = zmalloc(sizeof(*d));
+    size_t metasize = type->dictMetadataBytes ? type->dictMetadataBytes() : 0;
+    dict *d = zmalloc(sizeof(*d) + metasize);
+    if (metasize) {
+        memset(dictMetadata(d), 0, metasize);
+    }

     _dictInit(d,type);
     return d;

@@ -303,6 +323,11 @@ static void _dictRehashStep(dict *d) {
     if (d->pauserehash == 0) dictRehash(d,1);
 }

+/* Return a pointer to the metadata section within the dict. */
+void *dictMetadata(dict *d) {
+    return &d->metadata;
+}
+
 /* Add an element to the target hash table */
 int dictAdd(dict *d, void *key, void *val)
 {

@@ -349,10 +374,10 @@ dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
      * system it is more likely that recently added entries are accessed
      * more frequently. */
     htidx = dictIsRehashing(d) ? 1 : 0;
-    size_t metasize = dictMetadataSize(d);
+    size_t metasize = dictEntryMetadataSize(d);
     entry = zmalloc(sizeof(*entry) + metasize);
     if (metasize > 0) {
-        memset(dictMetadata(entry), 0, metasize);
+        memset(dictEntryMetadata(entry), 0, metasize);
     }
     entry->next = d->ht_table[htidx][index];
     d->ht_table[htidx][index] = entry;
@@ -596,6 +621,82 @@ void dictTwoPhaseUnlinkFree(dict *d, dictEntry *he, dictEntry **plink, int table
     dictResumeRehashing(d);
 }

+void dictSetKey(dict *d, dictEntry* de, void *key) {
+    if (d->type->keyDup)
+        de->key = d->type->keyDup(d, key);
+    else
+        de->key = key;
+}
+
+void dictSetVal(dict *d, dictEntry *de, void *val) {
+    de->v.val = d->type->valDup ? d->type->valDup(d, val) : val;
+}
+
+void dictSetSignedIntegerVal(dictEntry *de, int64_t val) {
+    de->v.s64 = val;
+}
+
+void dictSetUnsignedIntegerVal(dictEntry *de, uint64_t val) {
+    de->v.u64 = val;
+}
+
+void dictSetDoubleVal(dictEntry *de, double val) {
+    de->v.d = val;
+}
+
+int64_t dictIncrSignedIntegerVal(dictEntry *de, int64_t val) {
+    return de->v.s64 += val;
+}
+
+uint64_t dictIncrUnsignedIntegerVal(dictEntry *de, uint64_t val) {
+    return de->v.u64 += val;
+}
+
+double dictIncrDoubleVal(dictEntry *de, double val) {
+    return de->v.d += val;
+}
+
+/* A pointer to the metadata section within the dict entry. */
+void *dictEntryMetadata(dictEntry *de) {
+    return &de->metadata;
+}
+
+void *dictGetKey(const dictEntry *de) {
+    return de->key;
+}
+
+void *dictGetVal(const dictEntry *de) {
+    return de->v.val;
+}
+
+int64_t dictGetSignedIntegerVal(const dictEntry *de) {
+    return de->v.s64;
+}
+
+uint64_t dictGetUnsignedIntegerVal(const dictEntry *de) {
+    return de->v.u64;
+}
+
+double dictGetDoubleVal(const dictEntry *de) {
+    return de->v.d;
+}
+
+/* Returns a mutable reference to the value as a double within the entry. */
+double *dictGetDoubleValPtr(dictEntry *de) {
+    return &de->v.d;
+}
+
+/* Returns the memory usage in bytes of the dict, excluding the size of the keys
+ * and values. */
+size_t dictMemUsage(const dict *d) {
+    return dictSize(d) * sizeof(dictEntry) +
+           dictSlots(d) * sizeof(dictEntry*);
+}
+
+size_t dictEntryMemUsage(void) {
+    return sizeof(dictEntry);
+}
+
 /* A fingerprint is a 64 bit number that represents the state of the dictionary
  * at a given time, it's just a few dict properties xored together.
  * When an unsafe iterator is initialized, we get the dict fingerprint, and check
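With dictEntry opaque outside dict.c, callers manipulate entries only through the accessors above. A hedged usage sketch against dict.h as changed in this PR; the hash and compare callbacks are illustrative, not taken from Redis:

#include <stdint.h>
#include <string.h>
#include "dict.h"

static uint64_t exampleHash(const void *key) {
    const unsigned char *s = key;
    uint64_t h = 5381;
    while (*s) h = (h << 5) + h + *s++;
    return h;
}
static int exampleCompare(dict *d, const void *a, const void *b) {
    (void)d;
    return strcmp(a, b) == 0;
}
static dictType exampleType = {
    .hashFunction = exampleHash,
    .keyCompare = exampleCompare,
};

void accessorExample(void) {
    dict *d = dictCreate(&exampleType);
    dictAdd(d, (void*)"answer", NULL);
    dictEntry *de = dictFind(d, "answer");
    dictSetSignedIntegerVal(de, 42);         /* was (entry)->v.s64 = 42 via macro */
    int64_t v = dictGetSignedIntegerVal(de); /* instead of reading de->v.s64 */
    (void)v;
    dictRelease(d);
}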
@@ -846,6 +947,21 @@ unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count) {
     return stored;
 }

+/* Reallocate the dictEntry allocations in a bucket using the provided
+ * allocation function in order to defrag them. */
+static void dictDefragBucket(dict *d, dictEntry **bucketref, dictDefragAllocFunction *allocfn) {
+    while (bucketref && *bucketref) {
+        dictEntry *de = *bucketref, *newde;
+        if ((newde = allocfn(de))) {
+            *bucketref = newde;
+            if (d->type->afterReplaceEntry)
+                d->type->afterReplaceEntry(d, newde);
+        }
+        bucketref = &(*bucketref)->next;
+    }
+}
+
 /* This is like dictGetRandomKey() from the POV of the API, but will do more
  * work to ensure a better distribution of the returned element.
 *

@@ -969,7 +1085,23 @@ static unsigned long rev(unsigned long v) {
 unsigned long dictScan(dict *d,
                        unsigned long v,
                        dictScanFunction *fn,
-                       dictScanBucketFunction* bucketfn,
+                       void *privdata)
+{
+    return dictScanDefrag(d, v, fn, NULL, privdata);
+}
+
+/* Like dictScan, but additionally reallocates the memory used by the dict
+ * entries using the provided allocation function. This feature was added for
+ * the active defrag feature.
+ *
+ * The 'defracallocfn' callback is called with a pointer to memory that callback
+ * can reallocate. The callback should return a new memory address or NULL,
+ * where NULL means that no reallocation happened and the old memory is still
+ * valid. */
+unsigned long dictScanDefrag(dict *d,
+                             unsigned long v,
+                             dictScanFunction *fn,
+                             dictDefragAllocFunction *defragallocfn,
                        void *privdata)
 {
     int htidx0, htidx1;

@@ -986,7 +1118,9 @@ unsigned long dictScan(dict *d,
     m0 = DICTHT_SIZE_MASK(d->ht_size_exp[htidx0]);

     /* Emit entries at cursor */
-    if (bucketfn) bucketfn(d, &d->ht_table[htidx0][v & m0]);
+    if (defragallocfn) {
+        dictDefragBucket(d, &d->ht_table[htidx0][v & m0], defragallocfn);
+    }
     de = d->ht_table[htidx0][v & m0];
     while (de) {
         next = de->next;

@@ -1017,7 +1151,9 @@ unsigned long dictScan(dict *d,
     m1 = DICTHT_SIZE_MASK(d->ht_size_exp[htidx1]);

     /* Emit entries at cursor */
-    if (bucketfn) bucketfn(d, &d->ht_table[htidx0][v & m0]);
+    if (defragallocfn) {
+        dictDefragBucket(d, &d->ht_table[htidx0][v & m0], defragallocfn);
+    }
     de = d->ht_table[htidx0][v & m0];
     while (de) {
         next = de->next;

@@ -1029,7 +1165,9 @@ unsigned long dictScan(dict *d,
      * of the index pointed to by the cursor in the smaller table */
     do {
         /* Emit entries at cursor */
-        if (bucketfn) bucketfn(d, &d->ht_table[htidx1][v & m1]);
+        if (defragallocfn) {
+            dictDefragBucket(d, &d->ht_table[htidx1][v & m1], defragallocfn);
+        }
         de = d->ht_table[htidx1][v & m1];
         while (de) {
             next = de->next;
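dictScanDefrag() keeps dictScan()'s cursor contract but additionally lets the caller pass an allocator that may move each dictEntry; active defrag passes activeDefragAlloc. A hedged caller-side sketch (myDefragAlloc and myScanCallback are illustrative placeholders):

#include "dict.h"

/* Return a new address if the entry was moved, or NULL to keep the old one. */
static void *myDefragAlloc(void *ptr) {
    (void)ptr;
    return NULL; /* a real allocator would reallocate, copy and free here */
}

static void myScanCallback(void *privdata, const dictEntry *de) {
    (void)privdata;
    (void)de; /* inspect the entry via dictGetKey()/dictGetVal() */
}

void defragWholeDict(dict *d) {
    unsigned long cursor = 0;
    do {
        cursor = dictScanDefrag(d, cursor, myScanCallback, myDefragAlloc, NULL);
    } while (cursor != 0);
}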
@@ -1150,25 +1288,23 @@ uint64_t dictGetHash(dict *d, const void *key) {
     return dictHashKey(d, key);
 }

-/* Finds the dictEntry reference by using pointer and pre-calculated hash.
+/* Finds the dictEntry using pointer and pre-calculated hash.
  * oldkey is a dead pointer and should not be accessed.
  * the hash value should be provided using dictGetHash.
  * no string / key comparison is performed.
- * return value is the reference to the dictEntry if found, or NULL if not found. */
-dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash) {
-    dictEntry *he, **heref;
+ * return value is a pointer to the dictEntry if found, or NULL if not found. */
+dictEntry *dictFindEntryByPtrAndHash(dict *d, const void *oldptr, uint64_t hash) {
+    dictEntry *he;
     unsigned long idx, table;

     if (dictSize(d) == 0) return NULL; /* dict is empty */
     for (table = 0; table <= 1; table++) {
         idx = hash & DICTHT_SIZE_MASK(d->ht_size_exp[table]);
-        heref = &d->ht_table[table][idx];
-        he = *heref;
+        he = d->ht_table[table][idx];
         while(he) {
             if (oldptr==he->key)
-                return heref;
-            heref = &he->next;
-            he = *heref;
+                return he;
+            he = he->next;
         }
         if (!dictIsRehashing(d)) return NULL;
     }

src/dict.h (97 changed lines)

@@ -44,19 +44,7 @@
 #define DICT_OK 0
 #define DICT_ERR 1

-typedef struct dictEntry {
-    void *key;
-    union {
-        void *val;
-        uint64_t u64;
-        int64_t s64;
-        double d;
-    } v;
-    struct dictEntry *next;     /* Next entry in the same hash bucket. */
-    void *metadata[];           /* An arbitrary number of bytes (starting at a
-                                 * pointer-aligned address) of size as returned
-                                 * by dictType's dictEntryMetadataBytes(). */
-} dictEntry;
+typedef struct dictEntry dictEntry; /* opaque */

 typedef struct dict dict;
@@ -68,9 +56,13 @@ typedef struct dictType {
     void (*keyDestructor)(dict *d, void *key);
     void (*valDestructor)(dict *d, void *obj);
     int (*expandAllowed)(size_t moreMem, double usedRatio);
-    /* Allow a dictEntry to carry extra caller-defined metadata. The
-     * extra memory is initialized to 0 when a dictEntry is allocated. */
+    /* Allow each dict and dictEntry to carry extra caller-defined metadata. The
+     * extra memory is initialized to 0 when allocated. */
     size_t (*dictEntryMetadataBytes)(dict *d);
+    size_t (*dictMetadataBytes)(void);
+    /* Optional callback called after an entry has been reallocated (due to
+     * active defrag). Only called if the entry has metadata. */
+    void (*afterReplaceEntry)(dict *d, dictEntry *entry);
 } dictType;

 #define DICTHT_SIZE(exp) ((exp) == -1 ? 0 : (unsigned long)1<<(exp))
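A dictType that wants per-entry or per-dict metadata now supplies both size callbacks, and can set afterReplaceEntry to be told when a scan-with-defrag reallocates an entry (the db dict uses this to repair its slot-to-key links). A hedged sketch with made-up names; only the field names come from dict.h:

#include <stdint.h>
#include "dict.h"

typedef struct { void *owner; } myDictMeta;        /* illustrative per-dict metadata */
typedef struct { long last_touch; } myEntryMeta;   /* illustrative per-entry metadata */

static size_t myEntryMetaBytes(dict *d) { (void)d; return sizeof(myEntryMeta); }
static size_t myDictMetaBytes(void) { return sizeof(myDictMeta); }
static uint64_t myHash(const void *key) { return (uint64_t)(uintptr_t)key; }

/* Called after an entry was reallocated so external pointers can be fixed. */
static void myAfterReplaceEntry(dict *d, dictEntry *de) {
    myDictMeta *dm = dictMetadata(d);
    myEntryMeta *em = dictEntryMetadata(de);
    (void)dm; (void)em; /* patch references to the old dictEntry here */
}

dictType myMetaDictType = {
    .hashFunction = myHash,
    .dictEntryMetadataBytes = myEntryMetaBytes,
    .dictMetadataBytes = myDictMetaBytes,
    .afterReplaceEntry = myAfterReplaceEntry,
};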
@@ -87,6 +79,10 @@ struct dict {
     /* Keep small vars at end for optimal (minimal) struct padding */
     int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
     signed char ht_size_exp[2]; /* exponent of size. (size = 1<<exp) */
+
+    void *metadata[];           /* An arbitrary number of bytes (starting at a
+                                 * pointer-aligned address) of size as defined
+                                 * by dictType's dictEntryBytes. */
 };

 /* If safe is set to 1 this is a safe iterator, that means, you can call

@@ -103,7 +99,7 @@ typedef struct dictIterator {
 } dictIterator;

 typedef void (dictScanFunction)(void *privdata, const dictEntry *de);
-typedef void (dictScanBucketFunction)(dict *d, dictEntry **bucketref);
+typedef void *(dictDefragAllocFunction)(void *ptr);

 /* This is the initial size of every hash table */
 #define DICT_HT_INITIAL_EXP      2
@@ -112,60 +108,24 @@ typedef void (dictScanBucketFunction)(dict *d, dictEntry **bucketref);
 /* ------------------------------- Macros ------------------------------------*/
 #define dictFreeVal(d, entry) do { \
     if ((d)->type->valDestructor) \
-        (d)->type->valDestructor((d), (entry)->v.val); \
+        (d)->type->valDestructor((d), dictGetVal(entry)); \
    } while(0)

-#define dictSetVal(d, entry, _val_) do { \
-    if ((d)->type->valDup) \
-        (entry)->v.val = (d)->type->valDup((d), _val_); \
-    else \
-        (entry)->v.val = (_val_); \
-} while(0)
-
-#define dictSetSignedIntegerVal(entry, _val_) \
-    do { (entry)->v.s64 = _val_; } while(0)
-
-#define dictSetUnsignedIntegerVal(entry, _val_) \
-    do { (entry)->v.u64 = _val_; } while(0)
-
-#define dictSetDoubleVal(entry, _val_) \
-    do { (entry)->v.d = _val_; } while(0)
-
-#define dictIncrSignedIntegerVal(entry, _val_) \
-    ((entry)->v.s64 += _val_)
-
-#define dictIncrUnsignedIntegerVal(entry, _val_) \
-    ((entry)->v.u64 += _val_)
-
-#define dictIncrDoubleVal(entry, _val_) \
-    ((entry)->v.d += _val_)
-
 #define dictFreeKey(d, entry) \
     if ((d)->type->keyDestructor) \
-        (d)->type->keyDestructor((d), (entry)->key)
+        (d)->type->keyDestructor((d), dictGetKey(entry))

-#define dictSetKey(d, entry, _key_) do { \
-    if ((d)->type->keyDup) \
-        (entry)->key = (d)->type->keyDup((d), _key_); \
-    else \
-        (entry)->key = (_key_); \
-} while(0)
-
 #define dictCompareKeys(d, key1, key2) \
     (((d)->type->keyCompare) ? \
         (d)->type->keyCompare((d), key1, key2) : \
         (key1) == (key2))

-#define dictMetadata(entry) (&(entry)->metadata)
-#define dictMetadataSize(d) ((d)->type->dictEntryMetadataBytes \
+#define dictEntryMetadataSize(d) ((d)->type->dictEntryMetadataBytes \
                              ? (d)->type->dictEntryMetadataBytes(d) : 0)
+#define dictMetadataSize(d) ((d)->type->dictMetadataBytes \
+                             ? (d)->type->dictMetadataBytes() : 0)

 #define dictHashKey(d, key) ((d)->type->hashFunction(key))
-#define dictGetKey(he) ((he)->key)
-#define dictGetVal(he) ((he)->v.val)
-#define dictGetSignedIntegerVal(he) ((he)->v.s64)
-#define dictGetUnsignedIntegerVal(he) ((he)->v.u64)
-#define dictGetDoubleVal(he) ((he)->v.d)
 #define dictSlots(d) (DICTHT_SIZE((d)->ht_size_exp[0])+DICTHT_SIZE((d)->ht_size_exp[1]))
 #define dictSize(d) ((d)->ht_used[0]+(d)->ht_used[1])
 #define dictIsRehashing(d) ((d)->rehashidx != -1)
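Call sites that used the removed value macros keep the same spellings, now as functions. For instance, storing an absolute expire time as an entry's signed integer value and pushing it back later could look like this (hedged sketch; the key handling is illustrative):

#include "dict.h"

void setAndExtendExpire(dict *expires, void *key, long long when_ms, long long extra_ms) {
    dictEntry *de = dictAddOrFind(expires, key);
    dictSetSignedIntegerVal(de, when_ms);                       /* formerly wrote de->v.s64 */
    long long newwhen = dictIncrSignedIntegerVal(de, extra_ms); /* formerly a macro */
    (void)newwhen;
}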
@@ -189,6 +149,7 @@ typedef enum {
 dict *dictCreate(dictType *type);
 int dictExpand(dict *d, unsigned long size);
 int dictTryExpand(dict *d, unsigned long size);
+void *dictMetadata(dict *d);
 int dictAdd(dict *d, void *key, void *val);
 dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing);
 dictEntry *dictAddOrFind(dict *d, void *key);

@@ -202,6 +163,23 @@ void dictRelease(dict *d);
 dictEntry * dictFind(dict *d, const void *key);
 void *dictFetchValue(dict *d, const void *key);
 int dictResize(dict *d);
+void dictSetKey(dict *d, dictEntry* de, void *key);
+void dictSetVal(dict *d, dictEntry *de, void *val);
+void dictSetSignedIntegerVal(dictEntry *de, int64_t val);
+void dictSetUnsignedIntegerVal(dictEntry *de, uint64_t val);
+void dictSetDoubleVal(dictEntry *de, double val);
+int64_t dictIncrSignedIntegerVal(dictEntry *de, int64_t val);
+uint64_t dictIncrUnsignedIntegerVal(dictEntry *de, uint64_t val);
+double dictIncrDoubleVal(dictEntry *de, double val);
+void *dictEntryMetadata(dictEntry *de);
+void *dictGetKey(const dictEntry *de);
+void *dictGetVal(const dictEntry *de);
+int64_t dictGetSignedIntegerVal(const dictEntry *de);
+uint64_t dictGetUnsignedIntegerVal(const dictEntry *de);
+double dictGetDoubleVal(const dictEntry *de);
+double *dictGetDoubleValPtr(dictEntry *de);
+size_t dictMemUsage(const dict *d);
+size_t dictEntryMemUsage(void);
 dictIterator *dictGetIterator(dict *d);
 dictIterator *dictGetSafeIterator(dict *d);
 void dictInitIterator(dictIterator *iter, dict *d);

@@ -221,9 +199,10 @@ int dictRehash(dict *d, int n);
 int dictRehashMilliseconds(dict *d, int ms);
 void dictSetHashFunctionSeed(uint8_t *seed);
 uint8_t *dictGetHashFunctionSeed(void);
-unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, dictScanBucketFunction *bucketfn, void *privdata);
+unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *privdata);
+unsigned long dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragAllocFunction *allocfn, void *privdata);
 uint64_t dictGetHash(dict *d, const void *key);
-dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash);
+dictEntry *dictFindEntryByPtrAndHash(dict *d, const void *oldptr, uint64_t hash);

 #ifdef REDIS_TEST
 int dictTest(int argc, char *argv[], int flags);
@@ -677,8 +677,8 @@ dict* evalScriptsDict() {

 unsigned long evalScriptsMemory() {
     return lctx.lua_scripts_mem +
-           dictSize(lctx.lua_scripts) * (sizeof(dictEntry) + sizeof(luaScript)) +
-           dictSlots(lctx.lua_scripts) * sizeof(dictEntry*);
+           dictMemUsage(lctx.lua_scripts) +
+           dictSize(lctx.lua_scripts) * sizeof(luaScript);
 }

 /* ---------------------------------------------------------------------------
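Since sizeof(dictEntry) is no longer visible outside dict.c, memory accounting switches to dictMemUsage()/dictEntryMemUsage(), which cover exactly the per-entry and bucket-array overhead the old expression spelled out. A small sketch of that identity, valid for any dict given the definitions added in dict.c above:

#include <assert.h>
#include "dict.h"

void checkDictOverheadIdentity(dict *d) {
    size_t expected = dictSize(d) * dictEntryMemUsage() +
                      dictSlots(d) * sizeof(dictEntry*);
    assert(dictMemUsage(d) == expected);
}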
src/expire.c (97 changed lines)

@@ -110,6 +110,33 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
 #define ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE 10 /* % of stale keys after which
                                                    we do extra efforts. */

+/* Data used by the expire dict scan callback. */
+typedef struct {
+    redisDb *db;
+    long long now;
+    unsigned long sampled;     /* num keys checked */
+    unsigned long expired;     /* num keys expired */
+    long long ttl_sum;         /* sum of ttl for key with ttl not yet expired */
+    int ttl_samples;           /* num keys with ttl not yet expired */
+} expireScanData;
+
+void expireScanCallback(void *privdata, const dictEntry *const_de) {
+    dictEntry *de = (dictEntry *)const_de;
+    expireScanData *data = privdata;
+    long long ttl = dictGetSignedIntegerVal(de) - data->now;
+    if (activeExpireCycleTryExpire(data->db, de, data->now)) {
+        data->expired++;
+        /* Propagate the DEL command */
+        postExecutionUnitOperations();
+    }
+    if (ttl > 0) {
+        /* We want the average TTL of keys yet not expired. */
+        data->ttl_sum += ttl;
+        data->ttl_samples++;
+    }
+    data->sampled++;
+}
+
 void activeExpireCycle(int type) {
     /* Adjust the running parameters according to the configured expire
      * effort. The default effort is 1, and the maximum configurable effort
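activeExpireCycle() now drives sampling through dictScan() with a per-iteration expireScanData passed as privdata, instead of walking hash buckets by hand. The same pattern in miniature, with an illustrative counter struct:

#include "dict.h"

typedef struct { unsigned long visited; } countScanData; /* illustrative */

static void countScanCallback(void *privdata, const dictEntry *de) {
    countScanData *data = privdata;
    (void)de;
    data->visited++;
}

/* One dictScan() step visits every entry of the bucket(s) at the cursor. */
unsigned long countSomeEntries(dict *d, unsigned long *cursor) {
    countScanData data = {0};
    *cursor = dictScan(d, *cursor, countScanCallback, &data);
    return data.visited;
}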
@@ -186,10 +213,11 @@ void activeExpireCycle(int type) {
     serverAssert(server.also_propagate.numops == 0);

     for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
-        /* Expired and checked in a single loop. */
-        unsigned long expired, sampled;
+        /* Scan callback data including expired and checked count per iteration. */
+        expireScanData data;

         redisDb *db = server.db+(current_db % server.dbnum);
+        data.db = db;

         /* Increment the DB now so we are sure if we run out of time
          * in the current DB we'll restart from the next. This allows to

@@ -202,8 +230,6 @@ void activeExpireCycle(int type) {
          * is not fixed, but depends on the Redis configured "expire effort". */
         do {
             unsigned long num, slots;
-            long long now, ttl_sum;
-            int ttl_samples;
             iteration++;

             /* If there is nothing to expire try next DB ASAP. */

@@ -212,7 +238,7 @@ void activeExpireCycle(int type) {
                 break;
             }
             slots = dictSlots(db->expires);
-            now = mstime();
+            data.now = mstime();

             /* When there are less than 1% filled slots, sampling the key
              * space is expensive, so stop here waiting for better times...

@@ -220,12 +246,12 @@ void activeExpireCycle(int type) {
             if (slots > DICT_HT_INITIAL_SIZE &&
                 (num*100/slots < 1)) break;

-            /* The main collection cycle. Sample random keys among keys
+            /* The main collection cycle. Scan through keys among keys
              * with an expire set, checking for expired ones. */
-            expired = 0;
-            sampled = 0;
-            ttl_sum = 0;
-            ttl_samples = 0;
+            data.sampled = 0;
+            data.expired = 0;
+            data.ttl_sum = 0;
+            data.ttl_samples = 0;

             if (num > config_keys_per_loop)
                 num = config_keys_per_loop;
@@ -243,46 +269,17 @@ void activeExpireCycle(int type) {
             long max_buckets = num*20;
             long checked_buckets = 0;

-            while (sampled < num && checked_buckets < max_buckets) {
-                for (int table = 0; table < 2; table++) {
-                    if (table == 1 && !dictIsRehashing(db->expires)) break;
-
-                    unsigned long idx = db->expires_cursor;
-                    idx &= DICTHT_SIZE_MASK(db->expires->ht_size_exp[table]);
-                    dictEntry *de = db->expires->ht_table[table][idx];
-                    long long ttl;
-
-                    /* Scan the current bucket of the current table. */
+            while (data.sampled < num && checked_buckets < max_buckets) {
+                db->expires_cursor = dictScan(db->expires, db->expires_cursor,
+                                              expireScanCallback, &data);
                 checked_buckets++;
-                    while(de) {
-                        /* Get the next entry now since this entry may get
-                         * deleted. */
-                        dictEntry *e = de;
-                        de = de->next;
-
-                        ttl = dictGetSignedIntegerVal(e)-now;
-                        if (activeExpireCycleTryExpire(db,e,now)) {
-                            expired++;
-                            /* Propagate the DEL command */
-                            postExecutionUnitOperations();
-                        }
-                        if (ttl > 0) {
-                            /* We want the average TTL of keys yet
-                             * not expired. */
-                            ttl_sum += ttl;
-                            ttl_samples++;
-                        }
-                        sampled++;
-                    }
-                }
-                db->expires_cursor++;
             }
-            total_expired += expired;
-            total_sampled += sampled;
+            total_expired += data.expired;
+            total_sampled += data.sampled;

             /* Update the average TTL stats for this database. */
-            if (ttl_samples) {
-                long long avg_ttl = ttl_sum/ttl_samples;
+            if (data.ttl_samples) {
+                long long avg_ttl = data.ttl_sum / data.ttl_samples;

                 /* Do a simple running average with a few samples.
                  * We just use the current estimate with a weight of 2%

@@ -305,8 +302,8 @@ void activeExpireCycle(int type) {
             /* We don't repeat the cycle for the current database if there are
              * an acceptable amount of stale keys (logically expired but yet
              * not reclaimed). */
-        } while (sampled == 0 ||
-                 (expired*100/sampled) > config_cycle_acceptable_stale);
+        } while (data.sampled == 0 ||
+                 (data.expired * 100 / data.sampled) > config_cycle_acceptable_stale);
     }

     elapsed = ustime()-start;

@@ -444,8 +441,8 @@ void rememberSlaveKeyWithExpire(redisDb *db, robj *key) {
          * representing the key: we don't want to need to take those keys
          * in sync with the main DB. The keys will be removed by expireSlaveKeys()
          * as it scans to find keys to remove. */
-        if (de->key == key->ptr) {
-            de->key = sdsdup(key->ptr);
+        if (dictGetKey(de) == key->ptr) {
+            dictSetKey(slaveKeysWithExpire, de, sdsdup(key->ptr));
             dictSetUnsignedIntegerVal(de,0);
         }
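Because entry fields can no longer be written directly, swapping an entry's key for a privately owned copy goes through dictGetKey()/dictSetKey(), as rememberSlaveKeyWithExpire() now does with an sds duplicate. The shape of that operation in isolation (hedged sketch; assumes C-string keys and that the previous key was not owned by the dict, so nothing needs freeing):

#include <string.h>
#include "dict.h"

void ownEntryKey(dict *d, dictEntry *de) {
    char *copy = strdup(dictGetKey(de));  /* stands in for sdsdup() */
    dictSetKey(d, de, copy);              /* was: de->key = copy */
}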
@@ -1091,10 +1091,9 @@ unsigned long functionsMemory() {

 /* Return memory overhead of all the engines combine */
 unsigned long functionsMemoryOverhead() {
-    size_t memory_overhead = dictSize(engines) * sizeof(dictEntry) +
-                             dictSlots(engines) * sizeof(dictEntry*);
-    memory_overhead += dictSize(curr_functions_lib_ctx->functions) * sizeof(dictEntry) +
-                       dictSlots(curr_functions_lib_ctx->functions) * sizeof(dictEntry*) + sizeof(functionsLibCtx);
+    size_t memory_overhead = dictMemUsage(engines);
+    memory_overhead += dictMemUsage(curr_functions_lib_ctx->functions);
+    memory_overhead += sizeof(functionsLibCtx);
     memory_overhead += curr_functions_lib_ctx->cache_memory;
     memory_overhead += engine_cache_memory;

src/module.c (34 changed lines)

@@ -10336,7 +10336,7 @@ int RM_Scan(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanC
     }
     int ret = 1;
     ScanCBData data = { ctx, privdata, fn };
-    cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, moduleScanCallback, NULL, &data);
+    cursor->cursor = dictScan(ctx->client->db->dict, cursor->cursor, moduleScanCallback, &data);
     if (cursor->cursor == 0) {
         cursor->done = 1;
         ret = 0;

@@ -10448,7 +10448,7 @@ int RM_ScanKey(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleSc
     int ret = 1;
     if (ht) {
         ScanKeyCBData data = { key, privdata, fn };
-        cursor->cursor = dictScan(ht, cursor->cursor, moduleScanKeyCallback, NULL, &data);
+        cursor->cursor = dictScan(ht, cursor->cursor, moduleScanKeyCallback, &data);
         if (cursor->cursor == 0) {
             cursor->done = 1;
             ret = 0;

@@ -12562,7 +12562,6 @@ const char *RM_GetCurrentCommandName(RedisModuleCtx *ctx) {
  * defrag callback.
  */
 struct RedisModuleDefragCtx {
-    long defragged;
     long long int endtime;
     unsigned long *cursor;
     struct redisObject *key; /* Optional name of key processed, NULL when unknown. */
@@ -12651,11 +12650,8 @@ int RM_DefragCursorGet(RedisModuleDefragCtx *ctx, unsigned long *cursor) {
  * be used again.
  */
 void *RM_DefragAlloc(RedisModuleDefragCtx *ctx, void *ptr) {
-    void *newptr = activeDefragAlloc(ptr);
-    if (newptr)
-        ctx->defragged++;
-
-    return newptr;
+    UNUSED(ctx);
+    return activeDefragAlloc(ptr);
 }

 /* Defrag a RedisModuleString previously allocated by RM_Alloc, RM_Calloc, etc.
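From a module's perspective nothing changes: RedisModule_DefragAlloc() still returns the moved pointer or NULL, and hit accounting now happens inside the server. A hedged sketch of a data-type defrag callback for a hypothetical value struct (MyType and its field are made up; wiring it up through RedisModuleTypeMethods.defrag is assumed per the module type API):

#include "redismodule.h"

typedef struct { char *payload; } MyType; /* hypothetical module value */

int MyTypeDefrag(RedisModuleDefragCtx *ctx, RedisModuleString *key, void **value) {
    REDISMODULE_NOT_USED(key);
    MyType *o = *value, *newo;
    if ((newo = RedisModule_DefragAlloc(ctx, o)) != NULL)
        *value = o = newo;                 /* the container itself was moved */
    char *newp = RedisModule_DefragAlloc(ctx, o->payload);
    if (newp) o->payload = newp;           /* an inner buffer was moved */
    return 0;                              /* 0: fully defragged, no late pass needed */
}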
@@ -12669,7 +12665,8 @@ void *RM_DefragAlloc(RedisModuleDefragCtx *ctx, void *ptr) {
  * on the Redis side is dropped as soon as the command callback returns).
  */
 RedisModuleString *RM_DefragRedisModuleString(RedisModuleDefragCtx *ctx, RedisModuleString *str) {
-    return activeDefragStringOb(str, &ctx->defragged);
+    UNUSED(ctx);
+    return activeDefragStringOb(str);
 }

@@ -12678,11 +12675,11 @@ RedisModuleString *RM_DefragRedisModuleString(RedisModuleDefragCtx *ctx, RedisMo
  * Returns a zero value (and initializes the cursor) if no more needs to be done,
  * or a non-zero value otherwise.
  */
-int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, long long *defragged, int dbid) {
+int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, int dbid) {
     moduleValue *mv = value->ptr;
     moduleType *mt = mv->type;

-    RedisModuleDefragCtx defrag_ctx = { 0, endtime, cursor, key, dbid};
+    RedisModuleDefragCtx defrag_ctx = { endtime, cursor, key, dbid};

     /* Invoke callback. Note that the callback may be missing if the key has been
      * replaced with a different type since our last visit.

@@ -12691,7 +12688,6 @@
     if (mt->defrag)
         ret = mt->defrag(&defrag_ctx, key, &mv->value);

-    *defragged += defrag_ctx.defragged;
     if (!ret) {
         *cursor = 0; /* No more work to do */
         return 0;

@@ -12706,7 +12702,7 @@
  * Returns 1 if the operation has been completed or 0 if it needs to
  * be scheduled for late defrag.
  */
-int moduleDefragValue(robj *key, robj *value, long *defragged, int dbid) {
+int moduleDefragValue(robj *key, robj *value, int dbid) {
     moduleValue *mv = value->ptr;
     moduleType *mt = mv->type;

@@ -12715,7 +12711,6 @@ int moduleDefragValue(robj *key, robj *value, long *defragged, int dbid) {
  */
     moduleValue *newmv = activeDefragAlloc(mv);
     if (newmv) {
-        (*defragged)++;
         value->ptr = mv = newmv;
     }

@@ -12733,29 +12728,24 @@ int moduleDefragValue(robj *key, robj *value, long *defragged, int dbid) {
         return 0; /* Defrag later */
     }

-    RedisModuleDefragCtx defrag_ctx = { 0, 0, NULL, key, dbid};
+    RedisModuleDefragCtx defrag_ctx = { 0, NULL, key, dbid };
     mt->defrag(&defrag_ctx, key, &mv->value);
-    (*defragged) += defrag_ctx.defragged;
     return 1;
 }

 /* Call registered module API defrag functions */
-long moduleDefragGlobals(void) {
+void moduleDefragGlobals(void) {
     dictIterator *di = dictGetIterator(modules);
     dictEntry *de;
-    long defragged = 0;

     while ((de = dictNext(di)) != NULL) {
         struct RedisModule *module = dictGetVal(de);
         if (!module->defrag_cb)
             continue;
-        RedisModuleDefragCtx defrag_ctx = { 0, 0, NULL, NULL, -1};
+        RedisModuleDefragCtx defrag_ctx = { 0, NULL, NULL, -1};
         module->defrag_cb(&defrag_ctx);
-        defragged += defrag_ctx.defragged;
     }
     dictReleaseIterator(di);
-
-    return defragged;
 }

 /* Returns the name of the key currently being processed.
src/object.c (17 changed lines)

@@ -1029,7 +1029,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
         asize = sizeof(*o)+sizeof(dict)+(sizeof(struct dictEntry*)*dictSlots(d));
         while((de = dictNext(di)) != NULL && samples < sample_size) {
             ele = dictGetKey(de);
-            elesize += sizeof(struct dictEntry) + sdsZmallocSize(ele);
+            elesize += dictEntryMemUsage() + sdsZmallocSize(ele);
             samples++;
         }
         dictReleaseIterator(di);

@@ -1053,7 +1053,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
             zmalloc_size(zsl->header);
         while(znode != NULL && samples < sample_size) {
             elesize += sdsZmallocSize(znode->ele);
-            elesize += sizeof(struct dictEntry)+zmalloc_size(znode);
+            elesize += dictEntryMemUsage()+zmalloc_size(znode);
             samples++;
             znode = znode->level[0].forward;
         }

@@ -1072,7 +1072,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
             ele = dictGetKey(de);
             ele2 = dictGetVal(de);
             elesize += sdsZmallocSize(ele) + sdsZmallocSize(ele2);
-            elesize += sizeof(struct dictEntry);
+            elesize += dictEntryMemUsage();
             samples++;
         }
         dictReleaseIterator(di);

@@ -1242,19 +1242,18 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
         mh->db = zrealloc(mh->db,sizeof(mh->db[0])*(mh->num_dbs+1));
         mh->db[mh->num_dbs].dbid = j;

-        mem = dictSize(db->dict) * sizeof(dictEntry) +
-              dictSlots(db->dict) * sizeof(dictEntry*) +
+        mem = dictMemUsage(db->dict) +
               dictSize(db->dict) * sizeof(robj);
         mh->db[mh->num_dbs].overhead_ht_main = mem;
         mem_total+=mem;

-        mem = dictSize(db->expires) * sizeof(dictEntry) +
-              dictSlots(db->expires) * sizeof(dictEntry*);
+        mem = dictMemUsage(db->expires);
         mh->db[mh->num_dbs].overhead_ht_expires = mem;
         mem_total+=mem;

         /* Account for the slot to keys map in cluster mode */
-        mem = dictSize(db->dict) * dictMetadataSize(db->dict);
+        mem = dictSize(db->dict) * dictEntryMetadataSize(db->dict) +
+              dictMetadataSize(db->dict);
         mh->db[mh->num_dbs].overhead_ht_slot_to_keys = mem;
         mem_total+=mem;

@@ -1547,7 +1546,7 @@ NULL
         }
         size_t usage = objectComputeSize(c->argv[2],dictGetVal(de),samples,c->db->id);
         usage += sdsZmallocSize(dictGetKey(de));
-        usage += sizeof(dictEntry);
+        usage += dictEntryMemUsage();
         usage += dictMetadataSize(c->db->dict);
         addReplyLongLong(c,usage);
     } else if (!strcasecmp(c->argv[1]->ptr,"stats") && c->argc == 2) {

@@ -727,10 +727,8 @@ size_t pubsubMemOverhead(client *c) {
     /* PubSub patterns */
     size_t mem = listLength(c->pubsub_patterns) * sizeof(listNode);
     /* Global PubSub channels */
-    mem += dictSize(c->pubsub_channels) * sizeof(dictEntry) +
-           dictSlots(c->pubsub_channels) * sizeof(dictEntry*);
+    mem += dictMemUsage(c->pubsub_channels);
     /* Sharded PubSub channels */
-    mem += dictSize(c->pubsubshard_channels) * sizeof(dictEntry) +
-           dictSlots(c->pubsubshard_channels) * sizeof(dictEntry*);
+    mem += dictMemUsage(c->pubsubshard_channels);
     return mem;
 }

@@ -762,7 +762,7 @@ void cliInitGroupHelpEntries(dict *groups) {
     for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
         tmp.argc = 1;
         tmp.argv = zmalloc(sizeof(sds));
-        tmp.argv[0] = sdscatprintf(sdsempty(),"@%s",(char *)entry->key);
+        tmp.argv[0] = sdscatprintf(sdsempty(),"@%s",(char *)dictGetKey(entry));
         tmp.full = tmp.argv[0];
         tmp.type = CLI_HELP_GROUP;
         tmp.org.name = NULL;
17
src/server.c
17
src/server.c
|
@@ -399,13 +399,24 @@ int dictExpandAllowed(size_t moreMem, double usedRatio) {
 /* Returns the size of the DB dict entry metadata in bytes. In cluster mode, the
  * metadata is used for constructing a doubly linked list of the dict entries
  * belonging to the same cluster slot. See the Slot to Key API in cluster.c. */
-size_t dictEntryMetadataSize(dict *d) {
+size_t dbDictEntryMetadataSize(dict *d) {
     UNUSED(d);
     /* NOTICE: this also affects overhead_ht_slot_to_keys in getMemoryOverheadData.
      * If we ever add non-cluster related data here, that code must be modified too. */
     return server.cluster_enabled ? sizeof(clusterDictEntryMetadata) : 0;
 }
 
+/* Returns the size of the DB dict metadata in bytes. In cluster mode, we store
+ * a pointer to the db in the main db dict, used for updating the slot-to-key
+ * mapping when a dictEntry is reallocated. */
+size_t dbDictMetadataSize(void) {
+    return server.cluster_enabled ? sizeof(clusterDictMetadata) : 0;
+}
+
+void dbDictAfterReplaceEntry(dict *d, dictEntry *de) {
+    if (server.cluster_enabled) slotToKeyReplaceEntry(d, de);
+}
+
 /* Generic hash table type where keys are Redis Objects, Values
  * dummy pointers. */
 dictType objectKeyPointerValueDictType = {
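The new comment in this hunk says that, in cluster mode, the main db dict's metadata holds a pointer to the owning db so the slot-to-key mapping can be repaired when an entry is reallocated. A plausible shape for that metadata, assuming it carries nothing beyond the back-pointer (the struct body below is hypothetical; only the type name appears in the diff):

/* Hypothetical layout: the dict-level metadata only needs to find the db
 * that owns the dict, so slotToKeyReplaceEntry() can reach slots_to_keys. */
typedef struct clusterDictMetadata {
    redisDb *db;   /* back-pointer to the database owning this dict */
} clusterDictMetadata;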
@@ -460,7 +471,9 @@ dictType dbDictType = {
     dictSdsDestructor,          /* key destructor */
     dictObjectDestructor,       /* val destructor */
     dictExpandAllowed,          /* allow to expand */
-    dictEntryMetadataSize       /* size of entry metadata in bytes */
+    dbDictEntryMetadataSize,    /* size of entry metadata in bytes */
+    dbDictMetadataSize,         /* size of dict metadata in bytes */
+    dbDictAfterReplaceEntry     /* notify entry moved/reallocated */
 };
 
 /* Db->expires */
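dbDictType gains two callbacks: one reporting dict-level metadata size and one notifying that an entry was moved or reallocated. A sketch of how the dict side might invoke the latter, assuming the dictType field is called afterReplaceEntry (the field name is an assumption; only the db-side handler appears above):

/* Illustrative only: after the dict reallocates an entry (e.g. during active
 * defrag), it tells the owner so external pointers can be repaired. */
static void dictNotifyEntryMoved(dict *d, dictEntry *de) {
    if (d->type->afterReplaceEntry)      /* assumed callback field name */
        d->type->afterReplaceEntry(d, de);
}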
@@ -2442,9 +2442,9 @@ void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid, int flags);
 size_t moduleGetFreeEffort(robj *key, robj *val, int dbid);
 size_t moduleGetMemUsage(robj *key, robj *val, size_t sample_size, int dbid);
 robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj *value);
-int moduleDefragValue(robj *key, robj *obj, long *defragged, int dbid);
-int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, long long *defragged, int dbid);
-long moduleDefragGlobals(void);
+int moduleDefragValue(robj *key, robj *obj, int dbid);
+int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, int dbid);
+void moduleDefragGlobals(void);
 void *moduleGetHandleByName(char *modulename);
 int moduleIsModuleCommand(void *module_handle, struct redisCommand *cmd);
@@ -2988,7 +2988,7 @@ void checkChildrenDone(void);
 int setOOMScoreAdj(int process_class);
 void rejectCommandFormat(client *c, const char *fmt, ...);
 void *activeDefragAlloc(void *ptr);
-robj *activeDefragStringOb(robj* ob, long *defragged);
+robj *activeDefragStringOb(robj* ob);
 void dismissSds(sds s);
 void dismissMemory(void* ptr, size_t size_hint);
 void dismissMemoryInChild(void);
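activeDefragStringOb (and the defrag module hooks above) lose their long *defragged out-parameters, which suggests the hit statistic is now maintained centrally rather than threaded through every caller. A sketch of activeDefragAlloc under that assumption, for a jemalloc build (not necessarily the exact defrag.c body):

/* Sketch, not the real defrag.c: when the allocator suggests moving an
 * allocation, copy it, free the old block, and count the hit centrally.
 * Assumes jemalloc helpers je_get_defrag_hint, zmalloc_no_tcache and
 * zfree_no_tcache are available, plus <string.h> for memcpy. */
void *activeDefragAlloc(void *ptr) {
    size_t size;
    void *newptr;
    if (!je_get_defrag_hint(ptr)) {
        server.stat_active_defrag_misses++;   /* not worth moving */
        return NULL;
    }
    size = zmalloc_size(ptr);
    newptr = zmalloc_no_tcache(size);
    memcpy(newptr, ptr, size);
    zfree_no_tcache(ptr);
    server.stat_active_defrag_hits++;         /* counted here, once */
    return newptr;
}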
@@ -1428,7 +1428,7 @@ int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, dou
             /* Note that we did not removed the original element from
              * the hash table representing the sorted set, so we just
              * update the score. */
-            dictGetVal(de) = &znode->score; /* Update score ptr. */
+            dictSetVal(zs->dict, de, &znode->score); /* Update score ptr. */
             *out_flags |= ZADD_OUT_UPDATED;
         }
         return 1;
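zsetAdd stops assigning through the dictGetVal() lvalue macro and uses the dictSetVal() setter, which also honors an optional valDup hook in the dict type. A sketch of the setter's semantics, written as it could appear inside dict.c where the struct is visible (illustrative body):

/* Sketch of the setter: duplicate the value if the type asks for it,
 * otherwise store the pointer as-is. */
void dictSetVal(dict *d, dictEntry *de, void *val) {
    de->v.val = d->type->valDup ? d->type->valDup(d, val) : val;
}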
@@ -2741,7 +2741,8 @@ void zunionInterDiffGenericCommand(client *c, robj *dstkey, int numkeysIndex, in
                      * Here we access directly the dictEntry double
                      * value inside the union as it is a big speedup
                      * compared to using the getDouble/setDouble API. */
-                    zunionInterAggregate(&existing->v.d,score,aggregate);
+                    double *existing_score_ptr = dictGetDoubleValPtr(existing);
+                    zunionInterAggregate(existing_score_ptr, score, aggregate);
                 }
             }
             zuiClearIterator(&src[i]);
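The aggregation fast path keeps updating the score in place, but now obtains the address of the entry's embedded double through dictGetDoubleValPtr() instead of touching existing->v.d. A sketch of such an accessor, assuming the value union layout the old code relied on:

/* Sketch: dict.c can hand out a pointer into the entry's value union so
 * callers keep updating the double in place. */
double *dictGetDoubleValPtr(dictEntry *de) {
    return &de->v.d;
}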