mirror of https://mirror.osredm.com/root/redis.git
Make RM_DefragRedisModuleDict API support incremental defragmentation for dict leaf (#13840)
After https://github.com/redis/redis/pull/13816, we make a new API to defrag RedisModuleDict. Currently, we only support incremental defragmentation of the dictionary itself, but the defragmentation of values is still not incremental. If the values are very large, it could lead to significant blocking. Therefore, in this PR, we have added incremental defragmentation for the values. The main change is to the `RedisModuleDefragDictValueCallback`, we modified the return value of this callback. When the callback returns 1, we will save the `seekTo` as the key of the current unfinished node, and the next time we enter, we will continue defragmenting this node. When the return value is 0, we will proceed to the next node. ## Test Since each dictionary in the global dict originally contained only 10 strings, but now it has been changed to a nested dictionary, each dictionary now has 10 sub-dictionaries, with each sub-dictionary containing 10 strings, this has led to a corresponding reduction in the defragmentation time obtained from other tests. Therefore, the other tests have been modified to always wait for defragmentation to be turned off before the test begins, then start it after creating fragmentation, ensuring that they can always run for a full defragmentation cycle. --------- Co-authored-by: ephraimfeldblum <ephraim.feldblum@redis.com>
This commit is contained in:
parent
7939ba031d
commit
f364dcca2d
21
src/module.c
21
src/module.c
|
@ -13959,8 +13959,9 @@ int moduleDefragRaxNode(raxNode **noderef) {
|
|||
/* Defragment a Redis Module Dictionary by scanning its contents and calling a value
|
||||
* callback for each value.
|
||||
*
|
||||
* The callback gets the current value in the dict, and should return non-NULL with a new pointer,
|
||||
* The callback gets the current value in the dict, and should update newptr to the new pointer,
|
||||
* if the value was re-allocated to a different address. The callback also gets the key name just as a reference.
|
||||
* The callback returns 0 when defrag is complete for this node, 1 when node needs more work.
|
||||
*
|
||||
* The API can work incrementally by accepting a seek position to continue from, and
|
||||
* returning the next position to seek to on the next call (or return NULL when the iteration is completed).
|
||||
|
@ -13983,13 +13984,15 @@ RedisModuleDict *RM_DefragRedisModuleDict(RedisModuleDefragCtx *ctx, RedisModule
|
|||
|
||||
raxStart(&ri,dict->rax);
|
||||
if (*seekTo == NULL) {
|
||||
/* if last seek is NULL, we start new iteration */
|
||||
moduleDefragRaxNode(&dict->rax->head);
|
||||
/* assign the iterator node callback before the seek, so that the
|
||||
* initial nodes that are processed till the first item are covered */
|
||||
ri.node_cb = moduleDefragRaxNode;
|
||||
raxSeek(&ri,"^",NULL,0);
|
||||
} else {
|
||||
/* if cursor is non-zero, we seek to the static 'last' */
|
||||
if (!raxSeek(&ri,">", (*seekTo)->ptr, sdslen((*seekTo)->ptr))) {
|
||||
/* Seek to the static 'seekTo'. */
|
||||
if (!raxSeek(&ri,">=", (*seekTo)->ptr, sdslen((*seekTo)->ptr))) {
|
||||
goto cleanup;
|
||||
}
|
||||
/* assign the iterator node callback after the seek, so that the
|
||||
|
@ -13998,12 +14001,20 @@ RedisModuleDict *RM_DefragRedisModuleDict(RedisModuleDefragCtx *ctx, RedisModule
|
|||
}
|
||||
|
||||
while (raxNext(&ri)) {
|
||||
int ret = 0;
|
||||
void *newdata = NULL;
|
||||
|
||||
if (valueCB) {
|
||||
void *newdata = valueCB(ctx, ri.data, ri.key, ri.key_len);
|
||||
ret = valueCB(ctx, ri.data, ri.key, ri.key_len, &newdata);
|
||||
if (newdata)
|
||||
raxSetData(ri.node, ri.data=newdata);
|
||||
}
|
||||
if (RM_DefragShouldStop(ctx)) {
|
||||
|
||||
/* Check if we need to interrupt defragmentation.
|
||||
* - For explicit interruption, use current position
|
||||
* - For timeout interruption, try to advance to next node if possible */
|
||||
if (ret == 1 || RM_DefragShouldStop(ctx)) {
|
||||
if (ret == 0 && !raxNext(&ri)) goto cleanup; /* Last node and no more work needed. */
|
||||
if (*seekTo) RM_FreeString(NULL, *seekTo);
|
||||
*seekTo = RM_CreateString(NULL, (const char *)ri.key, ri.key_len);
|
||||
raxStop(&ri);
|
||||
|
|
|
@ -842,7 +842,7 @@ typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_repor
|
|||
typedef void (*RedisModuleDefragFunc)(RedisModuleDefragCtx *ctx);
|
||||
typedef int (*RedisModuleDefragFunc2)(RedisModuleDefragCtx *ctx);
|
||||
typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata);
|
||||
typedef void *(*RedisModuleDefragDictValueCallback)(RedisModuleDefragCtx *ctx, void *data, unsigned char *key, size_t keylen);
|
||||
typedef int (*RedisModuleDefragDictValueCallback)(RedisModuleDefragCtx *ctx, void *data, unsigned char *key, size_t keylen, void **newptr);
|
||||
|
||||
/* ------------------------- End of common defines ------------------------ */
|
||||
|
||||
|
|
|
@ -28,6 +28,7 @@ unsigned long int defrag_ended = 0;
|
|||
unsigned long int global_strings_attempts = 0;
|
||||
unsigned long int global_strings_defragged = 0;
|
||||
unsigned long int global_dicts_resumes = 0; /* Number of dict defragmentation resumed from a previous break */
|
||||
unsigned long int global_subdicts_resumes = 0; /* Number of subdict defragmentation resumed from a previous break */
|
||||
unsigned long int global_dicts_attempts = 0; /* Number of attempts to defragment dictionary */
|
||||
unsigned long int global_dicts_defragged = 0; /* Number of dictionaries successfully defragmented */
|
||||
unsigned long int global_dicts_items_defragged = 0; /* Number of dictionaries items successfully defragmented */
|
||||
|
@ -85,32 +86,55 @@ static void createGlobalDicts(RedisModuleCtx *ctx, unsigned long count) {
|
|||
global_dicts_len = count;
|
||||
global_dicts = RedisModule_Alloc(sizeof(RedisModuleDict *) * count);
|
||||
|
||||
/* Create some nested dictionaries:
|
||||
* - Each main dict contains some subdicts.
|
||||
* - Each sub-dict contains some strings. */
|
||||
for (unsigned long i = 0; i < count; i++) {
|
||||
RedisModuleDict *dict = RedisModule_CreateDict(ctx);
|
||||
for (unsigned long j = 0; j < 10; j ++) {
|
||||
RedisModuleString *str = RedisModule_CreateStringFromULongLong(ctx, j);
|
||||
RedisModule_DictSet(dict, str, str);
|
||||
for (unsigned long j = 0; j < 10; j++) {
|
||||
/* Create sub dict. */
|
||||
RedisModuleDict *subdict = RedisModule_CreateDict(ctx);
|
||||
for (unsigned long k = 0; k < 10; k++) {
|
||||
RedisModuleString *str = RedisModule_CreateStringFromULongLong(ctx, k);
|
||||
RedisModule_DictSet(subdict, str, str);
|
||||
}
|
||||
|
||||
RedisModuleString *key = RedisModule_CreateStringFromULongLong(ctx, j);
|
||||
RedisModule_DictSet(dict, key, subdict);
|
||||
RedisModule_FreeString(ctx, key);
|
||||
}
|
||||
global_dicts[i] = dict;
|
||||
}
|
||||
}
|
||||
|
||||
static void freeFragGlobalSubDict(RedisModuleCtx *ctx, RedisModuleDict *subdict) {
|
||||
char *key;
|
||||
size_t keylen;
|
||||
RedisModuleString *str;
|
||||
RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(subdict, "^", NULL, 0);
|
||||
while ((key = RedisModule_DictNextC(iter, &keylen, (void**)&str))) {
|
||||
RedisModule_FreeString(ctx, str);
|
||||
}
|
||||
RedisModule_FreeDict(ctx, subdict);
|
||||
RedisModule_DictIteratorStop(iter);
|
||||
}
|
||||
|
||||
static void createFragGlobalDicts(RedisModuleCtx *ctx) {
|
||||
char *key;
|
||||
size_t keylen;
|
||||
RedisModuleString *val;
|
||||
RedisModuleDict *subdict;
|
||||
|
||||
for (unsigned long i = 0; i < global_dicts_len; i++) {
|
||||
RedisModuleDict *dict = global_dicts[i];
|
||||
if (!dict) continue;
|
||||
|
||||
/* Handle dictionaries differently based on their index in global_dicts array:
|
||||
* 1. For odd indices (i % 2 == 1): Remove the entire dictionary.
|
||||
* 1. For odd indices (i % 2 == 1): Remove the entire dictionary.
|
||||
* 2. For even indices: Keep the dictionary but remove half of its items. */
|
||||
if (i % 2 == 1) {
|
||||
RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(dict, "^", NULL, 0);
|
||||
while ((key = RedisModule_DictNextC(iter, &keylen, (void**)&val))) {
|
||||
RedisModule_FreeString(ctx, val);
|
||||
while ((key = RedisModule_DictNextC(iter, &keylen, (void**)&subdict))) {
|
||||
freeFragGlobalSubDict(ctx, subdict);
|
||||
}
|
||||
RedisModule_FreeDict(ctx, dict);
|
||||
global_dicts[i] = NULL;
|
||||
|
@ -118,9 +142,9 @@ static void createFragGlobalDicts(RedisModuleCtx *ctx) {
|
|||
} else {
|
||||
int key_index = 0;
|
||||
RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(dict, "^", NULL, 0);
|
||||
while ((key = RedisModule_DictNextC(iter, &keylen, (void**)&val))) {
|
||||
while ((key = RedisModule_DictNextC(iter, &keylen, (void**)&subdict))) {
|
||||
if (key_index++ % 2 == 1) {
|
||||
RedisModule_FreeString(ctx, val);
|
||||
freeFragGlobalSubDict(ctx, subdict);
|
||||
RedisModule_DictReplaceC(dict, key, keylen, NULL);
|
||||
}
|
||||
}
|
||||
|
@ -129,13 +153,26 @@ static void createFragGlobalDicts(RedisModuleCtx *ctx) {
|
|||
}
|
||||
}
|
||||
|
||||
static void *defragGlobalDictValueCB(RedisModuleDefragCtx *ctx, void *data, unsigned char *key, size_t keylen) {
|
||||
static int defragGlobalSubDictValueCB(RedisModuleDefragCtx *ctx, void *data, unsigned char *key, size_t keylen, void **newptr) {
|
||||
REDISMODULE_NOT_USED(key);
|
||||
REDISMODULE_NOT_USED(keylen);
|
||||
if (!data) return NULL;
|
||||
void *new = RedisModule_DefragAlloc(ctx, data);
|
||||
if (new) global_dicts_items_defragged++;
|
||||
return new;
|
||||
if (!data) return 0;
|
||||
*newptr = RedisModule_DefragAlloc(ctx, data);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int defragGlobalDictValueCB(RedisModuleDefragCtx *ctx, void *data, unsigned char *key, size_t keylen, void **newptr) {
|
||||
REDISMODULE_NOT_USED(key);
|
||||
REDISMODULE_NOT_USED(keylen);
|
||||
static RedisModuleString *seekTo = NULL;
|
||||
RedisModuleDict *subdict = data;
|
||||
if (!subdict) return 0;
|
||||
if (seekTo != NULL) global_subdicts_resumes++;
|
||||
|
||||
*newptr = RedisModule_DefragRedisModuleDict(ctx, subdict, defragGlobalSubDictValueCB, &seekTo);
|
||||
if (*newptr) global_dicts_items_defragged++;
|
||||
/* Return 1 if seekTo is not NULL, indicating this node needs more defrag work. */
|
||||
return seekTo != NULL;
|
||||
}
|
||||
|
||||
static int defragGlobalDicts(RedisModuleDefragCtx *ctx) {
|
||||
|
@ -218,6 +255,7 @@ static void FragInfo(RedisModuleInfoCtx *ctx, int for_crash_report) {
|
|||
RedisModule_InfoAddFieldLongLong(ctx, "global_strings_attempts", global_strings_attempts);
|
||||
RedisModule_InfoAddFieldLongLong(ctx, "global_strings_defragged", global_strings_defragged);
|
||||
RedisModule_InfoAddFieldLongLong(ctx, "global_dicts_resumes", global_dicts_resumes);
|
||||
RedisModule_InfoAddFieldLongLong(ctx, "global_subdicts_resumes", global_subdicts_resumes);
|
||||
RedisModule_InfoAddFieldLongLong(ctx, "global_dicts_attempts", global_dicts_attempts);
|
||||
RedisModule_InfoAddFieldLongLong(ctx, "global_dicts_defragged", global_dicts_defragged);
|
||||
RedisModule_InfoAddFieldLongLong(ctx, "global_dicts_items_defragged", global_dicts_items_defragged);
|
||||
|
@ -251,6 +289,7 @@ static int fragResetStatsCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
|
|||
global_strings_attempts = 0;
|
||||
global_strings_defragged = 0;
|
||||
global_dicts_resumes = 0;
|
||||
global_subdicts_resumes = 0;
|
||||
global_dicts_attempts = 0;
|
||||
global_dicts_defragged = 0;
|
||||
global_dicts_items_defragged = 0;
|
||||
|
|
|
@ -12,9 +12,22 @@ start_server {tags {"modules"} overrides {{save ""}}} {
|
|||
if {[r config get activedefrag] eq "activedefrag yes"} {
|
||||
|
||||
test {Module defrag: simple key defrag works} {
|
||||
r config set activedefrag no
|
||||
wait_for_condition 100 50 {
|
||||
[s active_defrag_running] eq 0
|
||||
} else {
|
||||
fail "Unable to wait for active defrag to stop"
|
||||
}
|
||||
|
||||
r frag.create key1 1 1000 0
|
||||
|
||||
after 2000
|
||||
r config set activedefrag yes
|
||||
wait_for_condition 100 50 {
|
||||
[getInfoProperty [r info defragtest_stats] defragtest_defrag_ended] > 0
|
||||
} else {
|
||||
fail "Unable to wait for a complete defragmentation cycle to finish"
|
||||
}
|
||||
|
||||
set info [r info defragtest_stats]
|
||||
assert {[getInfoProperty $info defragtest_datatype_attempts] > 0}
|
||||
assert_equal 0 [getInfoProperty $info defragtest_datatype_resumes]
|
||||
|
@ -24,6 +37,13 @@ start_server {tags {"modules"} overrides {{save ""}}} {
|
|||
}
|
||||
|
||||
test {Module defrag: late defrag with cursor works} {
|
||||
r config set activedefrag no
|
||||
wait_for_condition 100 50 {
|
||||
[s active_defrag_running] eq 0
|
||||
} else {
|
||||
fail "Unable to wait for active defrag to stop"
|
||||
}
|
||||
|
||||
r flushdb
|
||||
r frag.resetstats
|
||||
|
||||
|
@ -31,7 +51,13 @@ start_server {tags {"modules"} overrides {{save ""}}} {
|
|||
# due to maxstep
|
||||
r frag.create key2 10000 100 1000
|
||||
|
||||
after 2000
|
||||
r config set activedefrag yes
|
||||
wait_for_condition 100 50 {
|
||||
[getInfoProperty [r info defragtest_stats] defragtest_defrag_ended] > 0
|
||||
} else {
|
||||
fail "Unable to wait for a complete defragmentation cycle to finish"
|
||||
}
|
||||
|
||||
set info [r info defragtest_stats]
|
||||
assert {[getInfoProperty $info defragtest_datatype_resumes] > 10}
|
||||
assert_equal 0 [getInfoProperty $info defragtest_datatype_wrong_cursor]
|
||||
|
@ -67,6 +93,7 @@ start_server {tags {"modules"} overrides {{save ""}}} {
|
|||
assert_morethan [getInfoProperty $info defragtest_defrag_started] 0
|
||||
assert_morethan [getInfoProperty $info defragtest_defrag_ended] 0
|
||||
assert_morethan [getInfoProperty $info defragtest_global_dicts_resumes] [getInfoProperty $info defragtest_defrag_ended]
|
||||
assert_morethan [getInfoProperty $info defragtest_global_subdicts_resumes] [getInfoProperty $info defragtest_defrag_ended]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue