mirror of https://mirror.osredm.com/root/redis.git
Move stat_active_defrag_hits increment to activeDefragAlloc
instead of passing it around to every defrag function
This commit is contained in:
parent
b60d33c91e
commit
2bbc89196a
268
src/defrag.c
268
src/defrag.c
|
@ -64,6 +64,7 @@ void* activeDefragAlloc(void *ptr) {
|
|||
newptr = zmalloc_no_tcache(size);
|
||||
memcpy(newptr, ptr, size);
|
||||
zfree_no_tcache(ptr);
|
||||
server.stat_active_defrag_hits++;
|
||||
return newptr;
|
||||
}
|
||||
|
||||
|
@ -88,7 +89,7 @@ sds activeDefragSds(sds sdsptr) {
|
|||
* returns NULL in case the allocation wasn't moved.
|
||||
* when it returns a non-null value, the old pointer was already released
|
||||
* and should NOT be accessed. */
|
||||
robj *activeDefragStringOb(robj* ob, long *defragged) {
|
||||
robj *activeDefragStringOb(robj* ob) {
|
||||
robj *ret = NULL;
|
||||
if (ob->refcount!=1)
|
||||
return NULL;
|
||||
|
@ -97,7 +98,6 @@ robj *activeDefragStringOb(robj* ob, long *defragged) {
|
|||
if (ob->type!=OBJ_STRING || ob->encoding!=OBJ_ENCODING_EMBSTR) {
|
||||
if ((ret = activeDefragAlloc(ob))) {
|
||||
ob = ret;
|
||||
(*defragged)++;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -107,7 +107,6 @@ robj *activeDefragStringOb(robj* ob, long *defragged) {
|
|||
sds newsds = activeDefragSds((sds)ob->ptr);
|
||||
if (newsds) {
|
||||
ob->ptr = newsds;
|
||||
(*defragged)++;
|
||||
}
|
||||
} else if (ob->encoding==OBJ_ENCODING_EMBSTR) {
|
||||
/* The sds is embedded in the object allocation, calculate the
|
||||
|
@ -115,7 +114,6 @@ robj *activeDefragStringOb(robj* ob, long *defragged) {
|
|||
long ofs = (intptr_t)ob->ptr - (intptr_t)ob;
|
||||
if ((ret = activeDefragAlloc(ob))) {
|
||||
ret->ptr = (void*)((intptr_t)ret + ofs);
|
||||
(*defragged)++;
|
||||
}
|
||||
} else if (ob->encoding!=OBJ_ENCODING_INT) {
|
||||
serverPanic("Unknown string encoding");
|
||||
|
@ -129,17 +127,16 @@ robj *activeDefragStringOb(robj* ob, long *defragged) {
|
|||
* returns NULL in case the allocation wasn't moved.
|
||||
* when it returns a non-null value, the old pointer was already released
|
||||
* and should NOT be accessed. */
|
||||
luaScript *activeDefragLuaScript(luaScript *script, long *defragged) {
|
||||
luaScript *activeDefragLuaScript(luaScript *script) {
|
||||
luaScript *ret = NULL;
|
||||
|
||||
/* try to defrag script struct */
|
||||
if ((ret = activeDefragAlloc(script))) {
|
||||
script = ret;
|
||||
(*defragged)++;
|
||||
}
|
||||
|
||||
/* try to defrag actual script object */
|
||||
robj *ob = activeDefragStringOb(script->body, defragged);
|
||||
robj *ob = activeDefragStringOb(script->body);
|
||||
if (ob) script->body = ob;
|
||||
|
||||
return ret;
|
||||
|
@ -148,20 +145,18 @@ luaScript *activeDefragLuaScript(luaScript *script, long *defragged) {
|
|||
/* Defrag helper for dict main allocations (dict struct, and hash tables).
|
||||
* receives a pointer to the dict* and implicitly updates it when the dict
|
||||
* struct itself was moved. Returns a stat of how many pointers were moved. */
|
||||
long dictDefragTables(dict* d) {
|
||||
void dictDefragTables(dict* d) {
|
||||
dictEntry **newtable;
|
||||
long defragged = 0;
|
||||
/* handle the first hash table */
|
||||
newtable = activeDefragAlloc(d->ht_table[0]);
|
||||
if (newtable)
|
||||
defragged++, d->ht_table[0] = newtable;
|
||||
d->ht_table[0] = newtable;
|
||||
/* handle the second hash table */
|
||||
if (d->ht_table[1]) {
|
||||
newtable = activeDefragAlloc(d->ht_table[1]);
|
||||
if (newtable)
|
||||
defragged++, d->ht_table[1] = newtable;
|
||||
d->ht_table[1] = newtable;
|
||||
}
|
||||
return defragged;
|
||||
}
|
||||
|
||||
/* Internal function used by zslDefrag */
|
||||
|
@ -225,19 +220,16 @@ double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) {
|
|||
|
||||
/* Defrag helper for sorted set.
|
||||
* Defrag a single dict entry key name, and corresponding skiplist struct */
|
||||
long activeDefragZsetEntry(zset *zs, dictEntry *de) {
|
||||
void activeDefragZsetEntry(zset *zs, dictEntry *de) {
|
||||
sds newsds;
|
||||
double* newscore;
|
||||
long defragged = 0;
|
||||
sds sdsele = dictGetKey(de);
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
defragged++, dictSetKey(zs->dict, de, newsds);
|
||||
dictSetKey(zs->dict, de, newsds);
|
||||
newscore = zslDefrag(zs->zsl, *(double*)dictGetVal(de), sdsele, newsds);
|
||||
if (newscore) {
|
||||
dictSetVal(zs->dict, de, newscore);
|
||||
defragged++;
|
||||
}
|
||||
return defragged;
|
||||
}
|
||||
|
||||
#define DEFRAG_SDS_DICT_NO_VAL 0
|
||||
|
@ -249,7 +241,6 @@ long activeDefragZsetEntry(zset *zs, dictEntry *de) {
|
|||
typedef struct {
|
||||
dict *dict;
|
||||
int val_type;
|
||||
long defragged;
|
||||
} activeDefragSdsDictData;
|
||||
|
||||
void activeDefragSdsDictCallback(void *privdata, const dictEntry *_de) {
|
||||
|
@ -259,41 +250,39 @@ void activeDefragSdsDictCallback(void *privdata, const dictEntry *_de) {
|
|||
int val_type = data->val_type;
|
||||
sds sdsele = dictGetKey(de), newsds;
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
dictSetKey(d, de, newsds), data->defragged++;
|
||||
dictSetKey(d, de, newsds);
|
||||
/* defrag the value */
|
||||
if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) {
|
||||
sdsele = dictGetVal(de);
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
dictSetVal(d, de, newsds), data->defragged++;
|
||||
dictSetVal(d, de, newsds);
|
||||
} else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) {
|
||||
robj *newele, *ele = dictGetVal(de);
|
||||
if ((newele = activeDefragStringOb(ele, &data->defragged)))
|
||||
if ((newele = activeDefragStringOb(ele)))
|
||||
dictSetVal(d, de, newele);
|
||||
} else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) {
|
||||
void *newptr, *ptr = dictGetVal(de);
|
||||
if ((newptr = activeDefragAlloc(ptr)))
|
||||
dictSetVal(d, de, newptr), data->defragged++;
|
||||
dictSetVal(d, de, newptr);
|
||||
} else if (val_type == DEFRAG_SDS_DICT_VAL_LUA_SCRIPT) {
|
||||
void *newptr, *ptr = dictGetVal(de);
|
||||
if ((newptr = activeDefragLuaScript(ptr, &data->defragged)))
|
||||
if ((newptr = activeDefragLuaScript(ptr)))
|
||||
dictSetVal(d, de, newptr);
|
||||
}
|
||||
}
|
||||
|
||||
/* Defrag a dict with sds key and optional value (either ptr, sds or robj string) */
|
||||
long activeDefragSdsDict(dict* d, int val_type) {
|
||||
activeDefragSdsDictData data = {d, val_type, 0};
|
||||
void activeDefragSdsDict(dict* d, int val_type) {
|
||||
activeDefragSdsDictData data = {d, val_type};
|
||||
unsigned long cursor = 0;
|
||||
do {
|
||||
cursor = dictScanDefrag(d, cursor, activeDefragSdsDictCallback,
|
||||
activeDefragAlloc, &data);
|
||||
} while (cursor != 0);
|
||||
return data.defragged;
|
||||
}
|
||||
|
||||
/* Defrag a list of ptr, sds or robj string values */
|
||||
long activeDefragList(list *l, int val_type) {
|
||||
long defragged = 0;
|
||||
void activeDefragList(list *l, int val_type) {
|
||||
listNode *ln, *newln;
|
||||
for (ln = l->head; ln; ln = ln->next) {
|
||||
if ((newln = activeDefragAlloc(ln))) {
|
||||
|
@ -306,28 +295,25 @@ long activeDefragList(list *l, int val_type) {
|
|||
else
|
||||
l->tail = newln;
|
||||
ln = newln;
|
||||
defragged++;
|
||||
}
|
||||
if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) {
|
||||
sds newsds, sdsele = ln->value;
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
ln->value = newsds, defragged++;
|
||||
ln->value = newsds;
|
||||
} else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) {
|
||||
robj *newele, *ele = ln->value;
|
||||
if ((newele = activeDefragStringOb(ele, &defragged)))
|
||||
if ((newele = activeDefragStringOb(ele)))
|
||||
ln->value = newele;
|
||||
} else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) {
|
||||
void *newptr, *ptr = ln->value;
|
||||
if ((newptr = activeDefragAlloc(ptr)))
|
||||
ln->value = newptr, defragged++;
|
||||
ln->value = newptr;
|
||||
}
|
||||
}
|
||||
return defragged;
|
||||
}
|
||||
|
||||
long activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
|
||||
void activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
|
||||
quicklistNode *newnode, *node = *node_ref;
|
||||
long defragged = 0;
|
||||
unsigned char *newzl;
|
||||
if ((newnode = activeDefragAlloc(node))) {
|
||||
if (newnode->prev)
|
||||
|
@ -339,21 +325,17 @@ long activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
|
|||
else
|
||||
ql->tail = newnode;
|
||||
*node_ref = node = newnode;
|
||||
defragged++;
|
||||
}
|
||||
if ((newzl = activeDefragAlloc(node->entry)))
|
||||
defragged++, node->entry = newzl;
|
||||
return defragged;
|
||||
node->entry = newzl;
|
||||
}
|
||||
|
||||
long activeDefragQuickListNodes(quicklist *ql) {
|
||||
void activeDefragQuickListNodes(quicklist *ql) {
|
||||
quicklistNode *node = ql->head;
|
||||
long defragged = 0;
|
||||
while (node) {
|
||||
defragged += activeDefragQuickListNode(ql, &node);
|
||||
activeDefragQuickListNode(ql, &node);
|
||||
node = node->next;
|
||||
}
|
||||
return defragged;
|
||||
}
|
||||
|
||||
/* when the value has lots of elements, we want to handle it later and not as
|
||||
|
@ -365,7 +347,7 @@ void defragLater(redisDb *db, dictEntry *kde) {
|
|||
}
|
||||
|
||||
/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
|
||||
long scanLaterList(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) {
|
||||
long scanLaterList(robj *ob, unsigned long *cursor, long long endtime) {
|
||||
quicklist *ql = ob->ptr;
|
||||
quicklistNode *node;
|
||||
long iterations = 0;
|
||||
|
@ -388,7 +370,7 @@ long scanLaterList(robj *ob, unsigned long *cursor, long long endtime, long long
|
|||
|
||||
(*cursor)++;
|
||||
while (node) {
|
||||
(*defragged) += activeDefragQuickListNode(ql, &node);
|
||||
activeDefragQuickListNode(ql, &node);
|
||||
server.stat_active_defrag_scanned++;
|
||||
if (++iterations > 128 && !bookmark_failed) {
|
||||
if (ustime() > endtime) {
|
||||
|
@ -410,29 +392,26 @@ long scanLaterList(robj *ob, unsigned long *cursor, long long endtime, long long
|
|||
|
||||
typedef struct {
|
||||
zset *zs;
|
||||
long defragged;
|
||||
} scanLaterZsetData;
|
||||
|
||||
void scanLaterZsetCallback(void *privdata, const dictEntry *_de) {
|
||||
dictEntry *de = (dictEntry*)_de;
|
||||
scanLaterZsetData *data = privdata;
|
||||
data->defragged += activeDefragZsetEntry(data->zs, de);
|
||||
activeDefragZsetEntry(data->zs, de);
|
||||
server.stat_active_defrag_scanned++;
|
||||
}
|
||||
|
||||
long scanLaterZset(robj *ob, unsigned long *cursor) {
|
||||
void scanLaterZset(robj *ob, unsigned long *cursor) {
|
||||
if (ob->type != OBJ_ZSET || ob->encoding != OBJ_ENCODING_SKIPLIST)
|
||||
return 0;
|
||||
return;
|
||||
zset *zs = (zset*)ob->ptr;
|
||||
dict *d = zs->dict;
|
||||
scanLaterZsetData data = {zs, 0};
|
||||
scanLaterZsetData data = {zs};
|
||||
*cursor = dictScanDefrag(d, *cursor, scanLaterZsetCallback, activeDefragAlloc, &data);
|
||||
return data.defragged;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
dict *dict;
|
||||
long defragged;
|
||||
} scanLaterDictData;
|
||||
|
||||
void scanLaterSetCallback(void *privdata, const dictEntry *_de) {
|
||||
|
@ -440,17 +419,16 @@ void scanLaterSetCallback(void *privdata, const dictEntry *_de) {
|
|||
scanLaterDictData *data = privdata;
|
||||
sds sdsele = dictGetKey(de), newsds;
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
data->defragged++, dictSetKey(data->dict, de, newsds);
|
||||
dictSetKey(data->dict, de, newsds);
|
||||
server.stat_active_defrag_scanned++;
|
||||
}
|
||||
|
||||
long scanLaterSet(robj *ob, unsigned long *cursor) {
|
||||
void scanLaterSet(robj *ob, unsigned long *cursor) {
|
||||
if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HT)
|
||||
return 0;
|
||||
return;
|
||||
dict *d = ob->ptr;
|
||||
scanLaterDictData data = {d, 0};
|
||||
scanLaterDictData data = {d};
|
||||
*cursor = dictScanDefrag(d, *cursor, scanLaterSetCallback, activeDefragAlloc, &data);
|
||||
return data.defragged;
|
||||
}
|
||||
|
||||
void scanLaterHashCallback(void *privdata, const dictEntry *_de) {
|
||||
|
@ -458,39 +436,35 @@ void scanLaterHashCallback(void *privdata, const dictEntry *_de) {
|
|||
scanLaterDictData *data = privdata;
|
||||
sds sdsele = dictGetKey(de), newsds;
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
data->defragged++, dictSetKey(data->dict, de, newsds);
|
||||
dictSetKey(data->dict, de, newsds);
|
||||
sdsele = dictGetVal(de);
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
data->defragged++, dictSetVal(data->dict, de, newsds);
|
||||
dictSetVal(data->dict, de, newsds);
|
||||
server.stat_active_defrag_scanned++;
|
||||
}
|
||||
|
||||
long scanLaterHash(robj *ob, unsigned long *cursor) {
|
||||
void scanLaterHash(robj *ob, unsigned long *cursor) {
|
||||
if (ob->type != OBJ_HASH || ob->encoding != OBJ_ENCODING_HT)
|
||||
return 0;
|
||||
return;
|
||||
dict *d = ob->ptr;
|
||||
scanLaterDictData data = {d, 0};
|
||||
scanLaterDictData data = {d};
|
||||
*cursor = dictScanDefrag(d, *cursor, scanLaterHashCallback, activeDefragAlloc, &data);
|
||||
return data.defragged;
|
||||
}
|
||||
|
||||
long defragQuicklist(redisDb *db, dictEntry *kde) {
|
||||
void defragQuicklist(redisDb *db, dictEntry *kde) {
|
||||
robj *ob = dictGetVal(kde);
|
||||
long defragged = 0;
|
||||
quicklist *ql = ob->ptr, *newql;
|
||||
serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST);
|
||||
if ((newql = activeDefragAlloc(ql)))
|
||||
defragged++, ob->ptr = ql = newql;
|
||||
ob->ptr = ql = newql;
|
||||
if (ql->len > server.active_defrag_max_scan_fields)
|
||||
defragLater(db, kde);
|
||||
else
|
||||
defragged += activeDefragQuickListNodes(ql);
|
||||
return defragged;
|
||||
activeDefragQuickListNodes(ql);
|
||||
}
|
||||
|
||||
long defragZsetSkiplist(redisDb *db, dictEntry *kde) {
|
||||
void defragZsetSkiplist(redisDb *db, dictEntry *kde) {
|
||||
robj *ob = dictGetVal(kde);
|
||||
long defragged = 0;
|
||||
zset *zs = (zset*)ob->ptr;
|
||||
zset *newzs;
|
||||
zskiplist *newzsl;
|
||||
|
@ -499,30 +473,28 @@ long defragZsetSkiplist(redisDb *db, dictEntry *kde) {
|
|||
struct zskiplistNode *newheader;
|
||||
serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST);
|
||||
if ((newzs = activeDefragAlloc(zs)))
|
||||
defragged++, ob->ptr = zs = newzs;
|
||||
ob->ptr = zs = newzs;
|
||||
if ((newzsl = activeDefragAlloc(zs->zsl)))
|
||||
defragged++, zs->zsl = newzsl;
|
||||
zs->zsl = newzsl;
|
||||
if ((newheader = activeDefragAlloc(zs->zsl->header)))
|
||||
defragged++, zs->zsl->header = newheader;
|
||||
zs->zsl->header = newheader;
|
||||
if (dictSize(zs->dict) > server.active_defrag_max_scan_fields)
|
||||
defragLater(db, kde);
|
||||
else {
|
||||
dictIterator *di = dictGetIterator(zs->dict);
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
defragged += activeDefragZsetEntry(zs, de);
|
||||
activeDefragZsetEntry(zs, de);
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
}
|
||||
/* handle the dict struct */
|
||||
if ((newdict = activeDefragAlloc(zs->dict)))
|
||||
defragged++, zs->dict = newdict;
|
||||
zs->dict = newdict;
|
||||
/* defrag the dict tables */
|
||||
defragged += dictDefragTables(zs->dict);
|
||||
return defragged;
|
||||
dictDefragTables(zs->dict);
|
||||
}
|
||||
|
||||
long defragHash(redisDb *db, dictEntry *kde) {
|
||||
long defragged = 0;
|
||||
void defragHash(redisDb *db, dictEntry *kde) {
|
||||
robj *ob = dictGetVal(kde);
|
||||
dict *d, *newd;
|
||||
serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT);
|
||||
|
@ -530,17 +502,15 @@ long defragHash(redisDb *db, dictEntry *kde) {
|
|||
if (dictSize(d) > server.active_defrag_max_scan_fields)
|
||||
defragLater(db, kde);
|
||||
else
|
||||
defragged += activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS);
|
||||
activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS);
|
||||
/* handle the dict struct */
|
||||
if ((newd = activeDefragAlloc(ob->ptr)))
|
||||
defragged++, ob->ptr = newd;
|
||||
ob->ptr = newd;
|
||||
/* defrag the dict tables */
|
||||
defragged += dictDefragTables(ob->ptr);
|
||||
return defragged;
|
||||
dictDefragTables(ob->ptr);
|
||||
}
|
||||
|
||||
long defragSet(redisDb *db, dictEntry *kde) {
|
||||
long defragged = 0;
|
||||
void defragSet(redisDb *db, dictEntry *kde) {
|
||||
robj *ob = dictGetVal(kde);
|
||||
dict *d, *newd;
|
||||
serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT);
|
||||
|
@ -548,13 +518,12 @@ long defragSet(redisDb *db, dictEntry *kde) {
|
|||
if (dictSize(d) > server.active_defrag_max_scan_fields)
|
||||
defragLater(db, kde);
|
||||
else
|
||||
defragged += activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL);
|
||||
activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL);
|
||||
/* handle the dict struct */
|
||||
if ((newd = activeDefragAlloc(ob->ptr)))
|
||||
defragged++, ob->ptr = newd;
|
||||
ob->ptr = newd;
|
||||
/* defrag the dict tables */
|
||||
defragged += dictDefragTables(ob->ptr);
|
||||
return defragged;
|
||||
dictDefragTables(ob->ptr);
|
||||
}
|
||||
|
||||
/* Defrag callback for radix tree iterator, called for each node,
|
||||
|
@ -569,7 +538,7 @@ int defragRaxNode(raxNode **noderef) {
|
|||
}
|
||||
|
||||
/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
|
||||
int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime, long long *defragged) {
|
||||
int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime) {
|
||||
static unsigned char last[sizeof(streamID)];
|
||||
raxIterator ri;
|
||||
long iterations = 0;
|
||||
|
@ -603,7 +572,7 @@ int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime,
|
|||
while (raxNext(&ri)) {
|
||||
void *newdata = activeDefragAlloc(ri.data);
|
||||
if (newdata)
|
||||
raxSetData(ri.node, ri.data=newdata), (*defragged)++;
|
||||
raxSetData(ri.node, ri.data=newdata);
|
||||
server.stat_active_defrag_scanned++;
|
||||
if (++iterations > 128) {
|
||||
if (ustime() > endtime) {
|
||||
|
@ -621,19 +590,18 @@ int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, long long endtime,
|
|||
}
|
||||
|
||||
/* optional callback used defrag each rax element (not including the element pointer itself) */
|
||||
typedef void *(raxDefragFunction)(raxIterator *ri, void *privdata, long *defragged);
|
||||
typedef void *(raxDefragFunction)(raxIterator *ri, void *privdata);
|
||||
|
||||
/* defrag radix tree including:
|
||||
* 1) rax struct
|
||||
* 2) rax nodes
|
||||
* 3) rax entry data (only if defrag_data is specified)
|
||||
* 4) call a callback per element, and allow the callback to return a new pointer for the element */
|
||||
long defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) {
|
||||
long defragged = 0;
|
||||
void defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) {
|
||||
raxIterator ri;
|
||||
rax* rax;
|
||||
if ((rax = activeDefragAlloc(*raxref)))
|
||||
defragged++, *raxref = rax;
|
||||
*raxref = rax;
|
||||
rax = *raxref;
|
||||
raxStart(&ri,rax);
|
||||
ri.node_cb = defragRaxNode;
|
||||
|
@ -642,14 +610,13 @@ long defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_c
|
|||
while (raxNext(&ri)) {
|
||||
void *newdata = NULL;
|
||||
if (element_cb)
|
||||
newdata = element_cb(&ri, element_cb_data, &defragged);
|
||||
newdata = element_cb(&ri, element_cb_data);
|
||||
if (defrag_data && !newdata)
|
||||
newdata = activeDefragAlloc(ri.data);
|
||||
if (newdata)
|
||||
raxSetData(ri.node, ri.data=newdata), defragged++;
|
||||
raxSetData(ri.node, ri.data=newdata);
|
||||
}
|
||||
raxStop(&ri);
|
||||
return defragged;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
|
@ -657,8 +624,7 @@ typedef struct {
|
|||
streamConsumer *c;
|
||||
} PendingEntryContext;
|
||||
|
||||
void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata, long *defragged) {
|
||||
UNUSED(defragged);
|
||||
void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata) {
|
||||
PendingEntryContext *ctx = privdata;
|
||||
streamNACK *nack = ri->data, *newnack;
|
||||
nack->consumer = ctx->c; /* update nack pointer to consumer */
|
||||
|
@ -668,90 +634,81 @@ void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata, long *de
|
|||
void *prev;
|
||||
raxInsert(ctx->cg->pel, ri->key, ri->key_len, newnack, &prev);
|
||||
serverAssert(prev==nack);
|
||||
/* note: we don't increment 'defragged' that's done by the caller */
|
||||
}
|
||||
return newnack;
|
||||
}
|
||||
|
||||
void* defragStreamConsumer(raxIterator *ri, void *privdata, long *defragged) {
|
||||
void* defragStreamConsumer(raxIterator *ri, void *privdata) {
|
||||
streamConsumer *c = ri->data;
|
||||
streamCG *cg = privdata;
|
||||
void *newc = activeDefragAlloc(c);
|
||||
if (newc) {
|
||||
/* note: we don't increment 'defragged' that's done by the caller */
|
||||
c = newc;
|
||||
}
|
||||
sds newsds = activeDefragSds(c->name);
|
||||
if (newsds)
|
||||
(*defragged)++, c->name = newsds;
|
||||
c->name = newsds;
|
||||
if (c->pel) {
|
||||
PendingEntryContext pel_ctx = {cg, c};
|
||||
*defragged += defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, &pel_ctx);
|
||||
defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, &pel_ctx);
|
||||
}
|
||||
return newc; /* returns NULL if c was not defragged */
|
||||
}
|
||||
|
||||
void* defragStreamConsumerGroup(raxIterator *ri, void *privdata, long *defragged) {
|
||||
void* defragStreamConsumerGroup(raxIterator *ri, void *privdata) {
|
||||
streamCG *cg = ri->data;
|
||||
UNUSED(privdata);
|
||||
if (cg->consumers)
|
||||
*defragged += defragRadixTree(&cg->consumers, 0, defragStreamConsumer, cg);
|
||||
defragRadixTree(&cg->consumers, 0, defragStreamConsumer, cg);
|
||||
if (cg->pel)
|
||||
*defragged += defragRadixTree(&cg->pel, 0, NULL, NULL);
|
||||
defragRadixTree(&cg->pel, 0, NULL, NULL);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
long defragStream(redisDb *db, dictEntry *kde) {
|
||||
long defragged = 0;
|
||||
void defragStream(redisDb *db, dictEntry *kde) {
|
||||
robj *ob = dictGetVal(kde);
|
||||
serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM);
|
||||
stream *s = ob->ptr, *news;
|
||||
|
||||
/* handle the main struct */
|
||||
if ((news = activeDefragAlloc(s)))
|
||||
defragged++, ob->ptr = s = news;
|
||||
ob->ptr = s = news;
|
||||
|
||||
if (raxSize(s->rax) > server.active_defrag_max_scan_fields) {
|
||||
rax *newrax = activeDefragAlloc(s->rax);
|
||||
if (newrax)
|
||||
defragged++, s->rax = newrax;
|
||||
s->rax = newrax;
|
||||
defragLater(db, kde);
|
||||
} else
|
||||
defragged += defragRadixTree(&s->rax, 1, NULL, NULL);
|
||||
defragRadixTree(&s->rax, 1, NULL, NULL);
|
||||
|
||||
if (s->cgroups)
|
||||
defragged += defragRadixTree(&s->cgroups, 1, defragStreamConsumerGroup, NULL);
|
||||
return defragged;
|
||||
defragRadixTree(&s->cgroups, 1, defragStreamConsumerGroup, NULL);
|
||||
}
|
||||
|
||||
/* Defrag a module key. This is either done immediately or scheduled
|
||||
* for later. Returns then number of pointers defragged.
|
||||
*/
|
||||
long defragModule(redisDb *db, dictEntry *kde) {
|
||||
void defragModule(redisDb *db, dictEntry *kde) {
|
||||
robj *obj = dictGetVal(kde);
|
||||
serverAssert(obj->type == OBJ_MODULE);
|
||||
long defragged = 0;
|
||||
|
||||
if (!moduleDefragValue(dictGetKey(kde), obj, &defragged, db->id))
|
||||
if (!moduleDefragValue(dictGetKey(kde), obj, db->id))
|
||||
defragLater(db, kde);
|
||||
|
||||
return defragged;
|
||||
}
|
||||
|
||||
/* for each key we scan in the main dict, this function will attempt to defrag
|
||||
* all the various pointers it has. Returns a stat of how many pointers were
|
||||
* moved. */
|
||||
long defragKey(redisDb *db, dictEntry *de) {
|
||||
void defragKey(redisDb *db, dictEntry *de) {
|
||||
sds keysds = dictGetKey(de);
|
||||
robj *newob, *ob;
|
||||
unsigned char *newzl;
|
||||
long defragged = 0;
|
||||
sds newsds;
|
||||
|
||||
/* Try to defrag the key name. */
|
||||
newsds = activeDefragSds(keysds);
|
||||
if (newsds) {
|
||||
defragged++;
|
||||
dictSetKey(db->dict, de, newsds);
|
||||
if (dictSize(db->expires)) {
|
||||
/* We can't search in db->expires for that key after we've released
|
||||
|
@ -765,7 +722,7 @@ long defragKey(redisDb *db, dictEntry *de) {
|
|||
|
||||
/* Try to defrag robj and / or string value. */
|
||||
ob = dictGetVal(de);
|
||||
if ((newob = activeDefragStringOb(ob, &defragged))) {
|
||||
if ((newob = activeDefragStringOb(ob))) {
|
||||
dictSetVal(db->dict, de, newob);
|
||||
ob = newob;
|
||||
}
|
||||
|
@ -774,58 +731,57 @@ long defragKey(redisDb *db, dictEntry *de) {
|
|||
/* Already handled in activeDefragStringOb. */
|
||||
} else if (ob->type == OBJ_LIST) {
|
||||
if (ob->encoding == OBJ_ENCODING_QUICKLIST) {
|
||||
defragged += defragQuicklist(db, de);
|
||||
defragQuicklist(db, de);
|
||||
} else if (ob->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
if ((newzl = activeDefragAlloc(ob->ptr)))
|
||||
defragged++, ob->ptr = newzl;
|
||||
ob->ptr = newzl;
|
||||
} else {
|
||||
serverPanic("Unknown list encoding");
|
||||
}
|
||||
} else if (ob->type == OBJ_SET) {
|
||||
if (ob->encoding == OBJ_ENCODING_HT) {
|
||||
defragged += defragSet(db, de);
|
||||
defragSet(db, de);
|
||||
} else if (ob->encoding == OBJ_ENCODING_INTSET ||
|
||||
ob->encoding == OBJ_ENCODING_LISTPACK)
|
||||
{
|
||||
void *newptr, *ptr = ob->ptr;
|
||||
if ((newptr = activeDefragAlloc(ptr)))
|
||||
defragged++, ob->ptr = newptr;
|
||||
ob->ptr = newptr;
|
||||
} else {
|
||||
serverPanic("Unknown set encoding");
|
||||
}
|
||||
} else if (ob->type == OBJ_ZSET) {
|
||||
if (ob->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
if ((newzl = activeDefragAlloc(ob->ptr)))
|
||||
defragged++, ob->ptr = newzl;
|
||||
ob->ptr = newzl;
|
||||
} else if (ob->encoding == OBJ_ENCODING_SKIPLIST) {
|
||||
defragged += defragZsetSkiplist(db, de);
|
||||
defragZsetSkiplist(db, de);
|
||||
} else {
|
||||
serverPanic("Unknown sorted set encoding");
|
||||
}
|
||||
} else if (ob->type == OBJ_HASH) {
|
||||
if (ob->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
if ((newzl = activeDefragAlloc(ob->ptr)))
|
||||
defragged++, ob->ptr = newzl;
|
||||
ob->ptr = newzl;
|
||||
} else if (ob->encoding == OBJ_ENCODING_HT) {
|
||||
defragged += defragHash(db, de);
|
||||
defragHash(db, de);
|
||||
} else {
|
||||
serverPanic("Unknown hash encoding");
|
||||
}
|
||||
} else if (ob->type == OBJ_STREAM) {
|
||||
defragged += defragStream(db, de);
|
||||
defragStream(db, de);
|
||||
} else if (ob->type == OBJ_MODULE) {
|
||||
defragged += defragModule(db, de);
|
||||
defragModule(db, de);
|
||||
} else {
|
||||
serverPanic("Unknown object type");
|
||||
}
|
||||
return defragged;
|
||||
}
|
||||
|
||||
/* Defrag scan callback for the main db dictionary. */
|
||||
void defragScanCallback(void *privdata, const dictEntry *de) {
|
||||
long defragged = defragKey((redisDb*)privdata, (dictEntry*)de);
|
||||
server.stat_active_defrag_hits += defragged;
|
||||
if(defragged)
|
||||
long long hits_before = server.stat_active_defrag_hits;
|
||||
defragKey((redisDb*)privdata, (dictEntry*)de);
|
||||
if (server.stat_active_defrag_hits != hits_before)
|
||||
server.stat_active_defrag_key_hits++;
|
||||
else
|
||||
server.stat_active_defrag_key_misses++;
|
||||
|
@ -863,15 +819,13 @@ float getAllocatorFragmentation(size_t *out_frag_bytes) {
|
|||
|
||||
/* We may need to defrag other globals, one small allocation can hold a full allocator run.
|
||||
* so although small, it is still important to defrag these */
|
||||
long defragOtherGlobals() {
|
||||
long defragged = 0;
|
||||
void defragOtherGlobals() {
|
||||
|
||||
/* there are many more pointers to defrag (e.g. client argv, output / aof buffers, etc.
|
||||
* but we assume most of these are short lived, we only need to defrag allocations
|
||||
* that remain static for a long time */
|
||||
defragged += activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT);
|
||||
defragged += moduleDefragGlobals();
|
||||
return defragged;
|
||||
activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT);
|
||||
moduleDefragGlobals();
|
||||
}
|
||||
|
||||
/* returns 0 more work may or may not be needed (see non-zero cursor),
|
||||
|
@ -880,17 +834,17 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime, int
|
|||
if (de) {
|
||||
robj *ob = dictGetVal(de);
|
||||
if (ob->type == OBJ_LIST) {
|
||||
return scanLaterList(ob, cursor, endtime, &server.stat_active_defrag_hits);
|
||||
return scanLaterList(ob, cursor, endtime);
|
||||
} else if (ob->type == OBJ_SET) {
|
||||
server.stat_active_defrag_hits += scanLaterSet(ob, cursor);
|
||||
scanLaterSet(ob, cursor);
|
||||
} else if (ob->type == OBJ_ZSET) {
|
||||
server.stat_active_defrag_hits += scanLaterZset(ob, cursor);
|
||||
scanLaterZset(ob, cursor);
|
||||
} else if (ob->type == OBJ_HASH) {
|
||||
server.stat_active_defrag_hits += scanLaterHash(ob, cursor);
|
||||
scanLaterHash(ob, cursor);
|
||||
} else if (ob->type == OBJ_STREAM) {
|
||||
return scanLaterStreamListpacks(ob, cursor, endtime, &server.stat_active_defrag_hits);
|
||||
return scanLaterStreamListpacks(ob, cursor, endtime);
|
||||
} else if (ob->type == OBJ_MODULE) {
|
||||
return moduleLateDefrag(dictGetKey(de), ob, cursor, endtime, &server.stat_active_defrag_hits, dbid);
|
||||
return moduleLateDefrag(dictGetKey(de), ob, cursor, endtime, dbid);
|
||||
} else {
|
||||
*cursor = 0; /* object type may have changed since we schedule it for later */
|
||||
}
|
||||
|
@ -1061,7 +1015,7 @@ void activeDefragCycle(void) {
|
|||
/* Move on to next database, and stop if we reached the last one. */
|
||||
if (++current_db >= server.dbnum) {
|
||||
/* defrag other items not part of the db / keys */
|
||||
server.stat_active_defrag_hits += defragOtherGlobals();
|
||||
defragOtherGlobals();
|
||||
|
||||
long long now = ustime();
|
||||
size_t frag_bytes;
|
||||
|
@ -1100,19 +1054,14 @@ void activeDefragCycle(void) {
|
|||
|
||||
/* Scan the keyspace dict unless we're scanning the expire dict. */
|
||||
if (!expires_cursor)
|
||||
cursor = dictScanDefrag(db->dict,
|
||||
cursor,
|
||||
defragScanCallback,
|
||||
activeDefragAlloc,
|
||||
db);
|
||||
cursor = dictScanDefrag(db->dict, cursor, defragScanCallback,
|
||||
activeDefragAlloc, db);
|
||||
|
||||
/* When done scanning the keyspace dict, we scan the expire dict. */
|
||||
if (!cursor)
|
||||
expires_cursor = dictScanDefrag(db->expires,
|
||||
expires_cursor,
|
||||
expires_cursor = dictScanDefrag(db->expires, expires_cursor,
|
||||
defragExpireScanCallback,
|
||||
activeDefragAlloc,
|
||||
NULL);
|
||||
activeDefragAlloc, NULL);
|
||||
|
||||
/* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys
|
||||
* (if we have a lot of pointers in one hash bucket or rehashing),
|
||||
|
@ -1159,9 +1108,8 @@ void *activeDefragAlloc(void *ptr) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
robj *activeDefragStringOb(robj *ob, long *defragged) {
|
||||
robj *activeDefragStringOb(robj *ob) {
|
||||
UNUSED(ob);
|
||||
UNUSED(defragged);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
30
src/module.c
30
src/module.c
|
@ -12562,7 +12562,6 @@ const char *RM_GetCurrentCommandName(RedisModuleCtx *ctx) {
|
|||
* defrag callback.
|
||||
*/
|
||||
struct RedisModuleDefragCtx {
|
||||
long defragged;
|
||||
long long int endtime;
|
||||
unsigned long *cursor;
|
||||
struct redisObject *key; /* Optional name of key processed, NULL when unknown. */
|
||||
|
@ -12651,11 +12650,8 @@ int RM_DefragCursorGet(RedisModuleDefragCtx *ctx, unsigned long *cursor) {
|
|||
* be used again.
|
||||
*/
|
||||
void *RM_DefragAlloc(RedisModuleDefragCtx *ctx, void *ptr) {
|
||||
void *newptr = activeDefragAlloc(ptr);
|
||||
if (newptr)
|
||||
ctx->defragged++;
|
||||
|
||||
return newptr;
|
||||
UNUSED(ctx);
|
||||
return activeDefragAlloc(ptr);
|
||||
}
|
||||
|
||||
/* Defrag a RedisModuleString previously allocated by RM_Alloc, RM_Calloc, etc.
|
||||
|
@ -12669,7 +12665,8 @@ void *RM_DefragAlloc(RedisModuleDefragCtx *ctx, void *ptr) {
|
|||
* on the Redis side is dropped as soon as the command callback returns).
|
||||
*/
|
||||
RedisModuleString *RM_DefragRedisModuleString(RedisModuleDefragCtx *ctx, RedisModuleString *str) {
|
||||
return activeDefragStringOb(str, &ctx->defragged);
|
||||
UNUSED(ctx);
|
||||
return activeDefragStringOb(str);
|
||||
}
|
||||
|
||||
|
||||
|
@ -12678,11 +12675,11 @@ RedisModuleString *RM_DefragRedisModuleString(RedisModuleDefragCtx *ctx, RedisMo
|
|||
* Returns a zero value (and initializes the cursor) if no more needs to be done,
|
||||
* or a non-zero value otherwise.
|
||||
*/
|
||||
int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, long long *defragged, int dbid) {
|
||||
int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, int dbid) {
|
||||
moduleValue *mv = value->ptr;
|
||||
moduleType *mt = mv->type;
|
||||
|
||||
RedisModuleDefragCtx defrag_ctx = { 0, endtime, cursor, key, dbid};
|
||||
RedisModuleDefragCtx defrag_ctx = { endtime, cursor, key, dbid};
|
||||
|
||||
/* Invoke callback. Note that the callback may be missing if the key has been
|
||||
* replaced with a different type since our last visit.
|
||||
|
@ -12691,7 +12688,6 @@ int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long en
|
|||
if (mt->defrag)
|
||||
ret = mt->defrag(&defrag_ctx, key, &mv->value);
|
||||
|
||||
*defragged += defrag_ctx.defragged;
|
||||
if (!ret) {
|
||||
*cursor = 0; /* No more work to do */
|
||||
return 0;
|
||||
|
@ -12706,7 +12702,7 @@ int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long en
|
|||
* Returns 1 if the operation has been completed or 0 if it needs to
|
||||
* be scheduled for late defrag.
|
||||
*/
|
||||
int moduleDefragValue(robj *key, robj *value, long *defragged, int dbid) {
|
||||
int moduleDefragValue(robj *key, robj *value, int dbid) {
|
||||
moduleValue *mv = value->ptr;
|
||||
moduleType *mt = mv->type;
|
||||
|
||||
|
@ -12715,7 +12711,6 @@ int moduleDefragValue(robj *key, robj *value, long *defragged, int dbid) {
|
|||
*/
|
||||
moduleValue *newmv = activeDefragAlloc(mv);
|
||||
if (newmv) {
|
||||
(*defragged)++;
|
||||
value->ptr = mv = newmv;
|
||||
}
|
||||
|
||||
|
@ -12733,29 +12728,24 @@ int moduleDefragValue(robj *key, robj *value, long *defragged, int dbid) {
|
|||
return 0; /* Defrag later */
|
||||
}
|
||||
|
||||
RedisModuleDefragCtx defrag_ctx = { 0, 0, NULL, key, dbid};
|
||||
RedisModuleDefragCtx defrag_ctx = { 0, NULL, key, dbid };
|
||||
mt->defrag(&defrag_ctx, key, &mv->value);
|
||||
(*defragged) += defrag_ctx.defragged;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Call registered module API defrag functions */
|
||||
long moduleDefragGlobals(void) {
|
||||
void moduleDefragGlobals(void) {
|
||||
dictIterator *di = dictGetIterator(modules);
|
||||
dictEntry *de;
|
||||
long defragged = 0;
|
||||
|
||||
while ((de = dictNext(di)) != NULL) {
|
||||
struct RedisModule *module = dictGetVal(de);
|
||||
if (!module->defrag_cb)
|
||||
continue;
|
||||
RedisModuleDefragCtx defrag_ctx = { 0, 0, NULL, NULL, -1};
|
||||
RedisModuleDefragCtx defrag_ctx = { 0, NULL, NULL, -1};
|
||||
module->defrag_cb(&defrag_ctx);
|
||||
defragged += defrag_ctx.defragged;
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
|
||||
return defragged;
|
||||
}
|
||||
|
||||
/* Returns the name of the key currently being processed.
|
||||
|
|
|
@ -2442,9 +2442,9 @@ void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid, int flags);
|
|||
size_t moduleGetFreeEffort(robj *key, robj *val, int dbid);
|
||||
size_t moduleGetMemUsage(robj *key, robj *val, size_t sample_size, int dbid);
|
||||
robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj *value);
|
||||
int moduleDefragValue(robj *key, robj *obj, long *defragged, int dbid);
|
||||
int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, long long *defragged, int dbid);
|
||||
long moduleDefragGlobals(void);
|
||||
int moduleDefragValue(robj *key, robj *obj, int dbid);
|
||||
int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, int dbid);
|
||||
void moduleDefragGlobals(void);
|
||||
void *moduleGetHandleByName(char *modulename);
|
||||
int moduleIsModuleCommand(void *module_handle, struct redisCommand *cmd);
|
||||
|
||||
|
@ -2988,7 +2988,7 @@ void checkChildrenDone(void);
|
|||
int setOOMScoreAdj(int process_class);
|
||||
void rejectCommandFormat(client *c, const char *fmt, ...);
|
||||
void *activeDefragAlloc(void *ptr);
|
||||
robj *activeDefragStringOb(robj* ob, long *defragged);
|
||||
robj *activeDefragStringOb(robj* ob);
|
||||
void dismissSds(sds s);
|
||||
void dismissMemory(void* ptr, size_t size_hint);
|
||||
void dismissMemoryInChild(void);
|
||||
|
|
Loading…
Reference in New Issue