diff --git a/src/module.c b/src/module.c index a5110c3b8..aacc797f5 100644 --- a/src/module.c +++ b/src/module.c @@ -13959,8 +13959,9 @@ int moduleDefragRaxNode(raxNode **noderef) { /* Defragment a Redis Module Dictionary by scanning its contents and calling a value * callback for each value. * - * The callback gets the current value in the dict, and should return non-NULL with a new pointer, + * The callback gets the current value in the dict, and should update newptr to the new pointer, * if the value was re-allocated to a different address. The callback also gets the key name just as a reference. + * The callback returns 0 when defrag is complete for this node, 1 when node needs more work. * * The API can work incrementally by accepting a seek position to continue from, and * returning the next position to seek to on the next call (or return NULL when the iteration is completed). @@ -13983,13 +13984,15 @@ RedisModuleDict *RM_DefragRedisModuleDict(RedisModuleDefragCtx *ctx, RedisModule raxStart(&ri,dict->rax); if (*seekTo == NULL) { + /* if last seek is NULL, we start new iteration */ + moduleDefragRaxNode(&dict->rax->head); /* assign the iterator node callback before the seek, so that the * initial nodes that are processed till the first item are covered */ ri.node_cb = moduleDefragRaxNode; raxSeek(&ri,"^",NULL,0); } else { - /* if cursor is non-zero, we seek to the static 'last' */ - if (!raxSeek(&ri,">", (*seekTo)->ptr, sdslen((*seekTo)->ptr))) { + /* Seek to the static 'seekTo'. 
*/ + if (!raxSeek(&ri,">=", (*seekTo)->ptr, sdslen((*seekTo)->ptr))) { goto cleanup; } /* assign the iterator node callback after the seek, so that the @@ -13998,12 +14001,20 @@ RedisModuleDict *RM_DefragRedisModuleDict(RedisModuleDefragCtx *ctx, RedisModule } while (raxNext(&ri)) { + int ret = 0; + void *newdata = NULL; + if (valueCB) { - void *newdata = valueCB(ctx, ri.data, ri.key, ri.key_len); + ret = valueCB(ctx, ri.data, ri.key, ri.key_len, &newdata); if (newdata) raxSetData(ri.node, ri.data=newdata); } - if (RM_DefragShouldStop(ctx)) { + + /* Check if we need to interrupt defragmentation. + * - For explicit interruption, use current position + * - For timeout interruption, try to advance to next node if possible */ + if (ret == 1 || RM_DefragShouldStop(ctx)) { + if (ret == 0 && !raxNext(&ri)) goto cleanup; /* Last node and no more work needed. */ if (*seekTo) RM_FreeString(NULL, *seekTo); *seekTo = RM_CreateString(NULL, (const char *)ri.key, ri.key_len); raxStop(&ri); diff --git a/src/redismodule.h b/src/redismodule.h index 3328b49cd..0b395feaf 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -842,7 +842,7 @@ typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_repor typedef void (*RedisModuleDefragFunc)(RedisModuleDefragCtx *ctx); typedef int (*RedisModuleDefragFunc2)(RedisModuleDefragCtx *ctx); typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata); -typedef void *(*RedisModuleDefragDictValueCallback)(RedisModuleDefragCtx *ctx, void *data, unsigned char *key, size_t keylen); +typedef int (*RedisModuleDefragDictValueCallback)(RedisModuleDefragCtx *ctx, void *data, unsigned char *key, size_t keylen, void **newptr); /* ------------------------- End of common defines ------------------------ */ diff --git a/tests/modules/defragtest.c b/tests/modules/defragtest.c index 08a6ee080..cdb548058 100644 --- a/tests/modules/defragtest.c +++ b/tests/modules/defragtest.c @@ -28,6 +28,7 @@ unsigned long int 
defrag_ended = 0; unsigned long int global_strings_attempts = 0; unsigned long int global_strings_defragged = 0; unsigned long int global_dicts_resumes = 0; /* Number of dict defragmentation resumed from a previous break */ +unsigned long int global_subdicts_resumes = 0; /* Number of subdict defragmentation resumed from a previous break */ unsigned long int global_dicts_attempts = 0; /* Number of attempts to defragment dictionary */ unsigned long int global_dicts_defragged = 0; /* Number of dictionaries successfully defragmented */ unsigned long int global_dicts_items_defragged = 0; /* Number of dictionaries items successfully defragmented */ @@ -85,32 +86,55 @@ static void createGlobalDicts(RedisModuleCtx *ctx, unsigned long count) { global_dicts_len = count; global_dicts = RedisModule_Alloc(sizeof(RedisModuleDict *) * count); + /* Create some nested dictionaries: + * - Each main dict contains some subdicts. + * - Each sub-dict contains some strings. */ for (unsigned long i = 0; i < count; i++) { RedisModuleDict *dict = RedisModule_CreateDict(ctx); - for (unsigned long j = 0; j < 10; j ++) { - RedisModuleString *str = RedisModule_CreateStringFromULongLong(ctx, j); - RedisModule_DictSet(dict, str, str); + for (unsigned long j = 0; j < 10; j++) { + /* Create sub dict. 
*/ + RedisModuleDict *subdict = RedisModule_CreateDict(ctx); + for (unsigned long k = 0; k < 10; k++) { + RedisModuleString *str = RedisModule_CreateStringFromULongLong(ctx, k); + RedisModule_DictSet(subdict, str, str); + } + + RedisModuleString *key = RedisModule_CreateStringFromULongLong(ctx, j); + RedisModule_DictSet(dict, key, subdict); + RedisModule_FreeString(ctx, key); } global_dicts[i] = dict; } } +static void freeFragGlobalSubDict(RedisModuleCtx *ctx, RedisModuleDict *subdict) { /* Free every string value stored in subdict, then subdict itself. */ + char *key; + size_t keylen; + RedisModuleString *str; + RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(subdict, "^", NULL, 0); + while ((key = RedisModule_DictNextC(iter, &keylen, (void**)&str))) { + RedisModule_FreeString(ctx, str); + } + RedisModule_DictIteratorStop(iter); /* Release the iterator while the dict it walks is still alive. */ + RedisModule_FreeDict(ctx, subdict); +} + static void createFragGlobalDicts(RedisModuleCtx *ctx) { char *key; size_t keylen; - RedisModuleString *val; + RedisModuleDict *subdict; for (unsigned long i = 0; i < global_dicts_len; i++) { RedisModuleDict *dict = global_dicts[i]; if (!dict) continue; /* Handle dictionaries differently based on their index in global_dicts array: - * 1. For odd indices (i % 2 == 1): Remove the entire dictionary. + * 1. For odd indices (i % 2 == 1): Remove the entire dictionary. * 2. For even indices: Keep the dictionary but remove half of its items. 
*/ if (i % 2 == 1) { RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(dict, "^", NULL, 0); - while ((key = RedisModule_DictNextC(iter, &keylen, (void**)&val))) { - RedisModule_FreeString(ctx, val); + while ((key = RedisModule_DictNextC(iter, &keylen, (void**)&subdict))) { + freeFragGlobalSubDict(ctx, subdict); } RedisModule_FreeDict(ctx, dict); global_dicts[i] = NULL; @@ -118,9 +142,9 @@ static void createFragGlobalDicts(RedisModuleCtx *ctx) { } else { int key_index = 0; RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(dict, "^", NULL, 0); - while ((key = RedisModule_DictNextC(iter, &keylen, (void**)&val))) { + while ((key = RedisModule_DictNextC(iter, &keylen, (void**)&subdict))) { if (key_index++ % 2 == 1) { - RedisModule_FreeString(ctx, val); + freeFragGlobalSubDict(ctx, subdict); RedisModule_DictReplaceC(dict, key, keylen, NULL); } } @@ -129,13 +153,26 @@ static void createFragGlobalDicts(RedisModuleCtx *ctx) { } } -static void *defragGlobalDictValueCB(RedisModuleDefragCtx *ctx, void *data, unsigned char *key, size_t keylen) { +static int defragGlobalSubDictValueCB(RedisModuleDefragCtx *ctx, void *data, unsigned char *key, size_t keylen, void **newptr) { REDISMODULE_NOT_USED(key); REDISMODULE_NOT_USED(keylen); - if (!data) return NULL; - void *new = RedisModule_DefragAlloc(ctx, data); - if (new) global_dicts_items_defragged++; - return new; + if (!data) return 0; + *newptr = RedisModule_DefragAlloc(ctx, data); + return 0; +} + +static int defragGlobalDictValueCB(RedisModuleDefragCtx *ctx, void *data, unsigned char *key, size_t keylen, void **newptr) { + REDISMODULE_NOT_USED(key); + REDISMODULE_NOT_USED(keylen); + static RedisModuleString *seekTo = NULL; + RedisModuleDict *subdict = data; + if (!subdict) return 0; + if (seekTo != NULL) global_subdicts_resumes++; + + *newptr = RedisModule_DefragRedisModuleDict(ctx, subdict, defragGlobalSubDictValueCB, &seekTo); + if (*newptr) global_dicts_items_defragged++; + /* Return 1 if seekTo is not NULL, 
indicating this node needs more defrag work. */ + return seekTo != NULL; } static int defragGlobalDicts(RedisModuleDefragCtx *ctx) { @@ -218,6 +255,7 @@ static void FragInfo(RedisModuleInfoCtx *ctx, int for_crash_report) { RedisModule_InfoAddFieldLongLong(ctx, "global_strings_attempts", global_strings_attempts); RedisModule_InfoAddFieldLongLong(ctx, "global_strings_defragged", global_strings_defragged); RedisModule_InfoAddFieldLongLong(ctx, "global_dicts_resumes", global_dicts_resumes); + RedisModule_InfoAddFieldLongLong(ctx, "global_subdicts_resumes", global_subdicts_resumes); RedisModule_InfoAddFieldLongLong(ctx, "global_dicts_attempts", global_dicts_attempts); RedisModule_InfoAddFieldLongLong(ctx, "global_dicts_defragged", global_dicts_defragged); RedisModule_InfoAddFieldLongLong(ctx, "global_dicts_items_defragged", global_dicts_items_defragged); @@ -251,6 +289,7 @@ static int fragResetStatsCommand(RedisModuleCtx *ctx, RedisModuleString **argv, global_strings_attempts = 0; global_strings_defragged = 0; global_dicts_resumes = 0; + global_subdicts_resumes = 0; global_dicts_attempts = 0; global_dicts_defragged = 0; global_dicts_items_defragged = 0; diff --git a/tests/unit/moduleapi/defrag.tcl b/tests/unit/moduleapi/defrag.tcl index b44ce5830..110b9094d 100644 --- a/tests/unit/moduleapi/defrag.tcl +++ b/tests/unit/moduleapi/defrag.tcl @@ -12,9 +12,22 @@ start_server {tags {"modules"} overrides {{save ""}}} { if {[r config get activedefrag] eq "activedefrag yes"} { test {Module defrag: simple key defrag works} { + r config set activedefrag no + wait_for_condition 100 50 { + [s active_defrag_running] eq 0 + } else { + fail "Unable to wait for active defrag to stop" + } + r frag.create key1 1 1000 0 - after 2000 + r config set activedefrag yes + wait_for_condition 100 50 { + [getInfoProperty [r info defragtest_stats] defragtest_defrag_ended] > 0 + } else { + fail "Unable to wait for a complete defragmentation cycle to finish" + } + set info [r info defragtest_stats] 
assert {[getInfoProperty $info defragtest_datatype_attempts] > 0} assert_equal 0 [getInfoProperty $info defragtest_datatype_resumes] @@ -24,6 +37,13 @@ start_server {tags {"modules"} overrides {{save ""}}} { } test {Module defrag: late defrag with cursor works} { + r config set activedefrag no + wait_for_condition 100 50 { + [s active_defrag_running] eq 0 + } else { + fail "Unable to wait for active defrag to stop" + } + r flushdb r frag.resetstats @@ -31,7 +51,13 @@ start_server {tags {"modules"} overrides {{save ""}}} { # due to maxstep r frag.create key2 10000 100 1000 - after 2000 + r config set activedefrag yes + wait_for_condition 100 50 { + [getInfoProperty [r info defragtest_stats] defragtest_defrag_ended] > 0 + } else { + fail "Unable to wait for a complete defragmentation cycle to finish" + } + set info [r info defragtest_stats] assert {[getInfoProperty $info defragtest_datatype_resumes] > 10} assert_equal 0 [getInfoProperty $info defragtest_datatype_wrong_cursor] @@ -67,6 +93,7 @@ start_server {tags {"modules"} overrides {{save ""}}} { assert_morethan [getInfoProperty $info defragtest_defrag_started] 0 assert_morethan [getInfoProperty $info defragtest_defrag_ended] 0 assert_morethan [getInfoProperty $info defragtest_global_dicts_resumes] [getInfoProperty $info defragtest_defrag_ended] + assert_morethan [getInfoProperty $info defragtest_global_subdicts_resumes] [getInfoProperty $info defragtest_defrag_ended] } } }