Merge remote-tracking branch 'upstream/unstable' into HEAD

This commit is contained in:
YaacovHazan 2025-03-09 10:08:03 +02:00
commit a39ffc1fe9
4 changed files with 99 additions and 22 deletions

View File

@ -13959,8 +13959,9 @@ int moduleDefragRaxNode(raxNode **noderef) {
/* Defragment a Redis Module Dictionary by scanning its contents and calling a value /* Defragment a Redis Module Dictionary by scanning its contents and calling a value
* callback for each value. * callback for each value.
* *
* The callback gets the current value in the dict, and should return non-NULL with a new pointer, * The callback gets the current value in the dict, and should update newptr to the new pointer,
* if the value was re-allocated to a different address. The callback also gets the key name just as a reference. * if the value was re-allocated to a different address. The callback also gets the key name just as a reference.
* The callback returns 0 when defrag is complete for this node, 1 when node needs more work.
* *
* The API can work incrementally by accepting a seek position to continue from, and * The API can work incrementally by accepting a seek position to continue from, and
* returning the next position to seek to on the next call (or return NULL when the iteration is completed). * returning the next position to seek to on the next call (or return NULL when the iteration is completed).
@ -13983,13 +13984,15 @@ RedisModuleDict *RM_DefragRedisModuleDict(RedisModuleDefragCtx *ctx, RedisModule
raxStart(&ri,dict->rax); raxStart(&ri,dict->rax);
if (*seekTo == NULL) { if (*seekTo == NULL) {
/* if last seek is NULL, we start new iteration */
moduleDefragRaxNode(&dict->rax->head);
/* assign the iterator node callback before the seek, so that the /* assign the iterator node callback before the seek, so that the
* initial nodes that are processed till the first item are covered */ * initial nodes that are processed till the first item are covered */
ri.node_cb = moduleDefragRaxNode; ri.node_cb = moduleDefragRaxNode;
raxSeek(&ri,"^",NULL,0); raxSeek(&ri,"^",NULL,0);
} else { } else {
/* if cursor is non-zero, we seek to the static 'last' */ /* Seek to the static 'seekTo'. */
if (!raxSeek(&ri,">", (*seekTo)->ptr, sdslen((*seekTo)->ptr))) { if (!raxSeek(&ri,">=", (*seekTo)->ptr, sdslen((*seekTo)->ptr))) {
goto cleanup; goto cleanup;
} }
/* assign the iterator node callback after the seek, so that the /* assign the iterator node callback after the seek, so that the
@ -13998,12 +14001,20 @@ RedisModuleDict *RM_DefragRedisModuleDict(RedisModuleDefragCtx *ctx, RedisModule
} }
while (raxNext(&ri)) { while (raxNext(&ri)) {
int ret = 0;
void *newdata = NULL;
if (valueCB) { if (valueCB) {
void *newdata = valueCB(ctx, ri.data, ri.key, ri.key_len); ret = valueCB(ctx, ri.data, ri.key, ri.key_len, &newdata);
if (newdata) if (newdata)
raxSetData(ri.node, ri.data=newdata); raxSetData(ri.node, ri.data=newdata);
} }
if (RM_DefragShouldStop(ctx)) {
/* Check if we need to interrupt defragmentation.
* - For explicit interruption, use current position
* - For timeout interruption, try to advance to next node if possible */
if (ret == 1 || RM_DefragShouldStop(ctx)) {
if (ret == 0 && !raxNext(&ri)) goto cleanup; /* Last node and no more work needed. */
if (*seekTo) RM_FreeString(NULL, *seekTo); if (*seekTo) RM_FreeString(NULL, *seekTo);
*seekTo = RM_CreateString(NULL, (const char *)ri.key, ri.key_len); *seekTo = RM_CreateString(NULL, (const char *)ri.key, ri.key_len);
raxStop(&ri); raxStop(&ri);

View File

@ -842,7 +842,7 @@ typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_repor
typedef void (*RedisModuleDefragFunc)(RedisModuleDefragCtx *ctx); typedef void (*RedisModuleDefragFunc)(RedisModuleDefragCtx *ctx);
typedef int (*RedisModuleDefragFunc2)(RedisModuleDefragCtx *ctx); typedef int (*RedisModuleDefragFunc2)(RedisModuleDefragCtx *ctx);
typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata); typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata);
typedef void *(*RedisModuleDefragDictValueCallback)(RedisModuleDefragCtx *ctx, void *data, unsigned char *key, size_t keylen); typedef int (*RedisModuleDefragDictValueCallback)(RedisModuleDefragCtx *ctx, void *data, unsigned char *key, size_t keylen, void **newptr);
/* ------------------------- End of common defines ------------------------ */ /* ------------------------- End of common defines ------------------------ */

View File

@ -28,6 +28,7 @@ unsigned long int defrag_ended = 0;
unsigned long int global_strings_attempts = 0; unsigned long int global_strings_attempts = 0;
unsigned long int global_strings_defragged = 0; unsigned long int global_strings_defragged = 0;
unsigned long int global_dicts_resumes = 0; /* Number of dict defragmentation resumed from a previous break */ unsigned long int global_dicts_resumes = 0; /* Number of dict defragmentation resumed from a previous break */
unsigned long int global_subdicts_resumes = 0; /* Number of subdict defragmentation resumed from a previous break */
unsigned long int global_dicts_attempts = 0; /* Number of attempts to defragment dictionary */ unsigned long int global_dicts_attempts = 0; /* Number of attempts to defragment dictionary */
unsigned long int global_dicts_defragged = 0; /* Number of dictionaries successfully defragmented */ unsigned long int global_dicts_defragged = 0; /* Number of dictionaries successfully defragmented */
unsigned long int global_dicts_items_defragged = 0; /* Number of dictionaries items successfully defragmented */ unsigned long int global_dicts_items_defragged = 0; /* Number of dictionaries items successfully defragmented */
@ -85,20 +86,43 @@ static void createGlobalDicts(RedisModuleCtx *ctx, unsigned long count) {
global_dicts_len = count; global_dicts_len = count;
global_dicts = RedisModule_Alloc(sizeof(RedisModuleDict *) * count); global_dicts = RedisModule_Alloc(sizeof(RedisModuleDict *) * count);
/* Create some nested dictionaries:
* - Each main dict contains some subdicts.
* - Each sub-dict contains some strings. */
for (unsigned long i = 0; i < count; i++) { for (unsigned long i = 0; i < count; i++) {
RedisModuleDict *dict = RedisModule_CreateDict(ctx); RedisModuleDict *dict = RedisModule_CreateDict(ctx);
for (unsigned long j = 0; j < 10; j ++) { for (unsigned long j = 0; j < 10; j++) {
RedisModuleString *str = RedisModule_CreateStringFromULongLong(ctx, j); /* Create sub dict. */
RedisModule_DictSet(dict, str, str); RedisModuleDict *subdict = RedisModule_CreateDict(ctx);
for (unsigned long k = 0; k < 10; k++) {
RedisModuleString *str = RedisModule_CreateStringFromULongLong(ctx, k);
RedisModule_DictSet(subdict, str, str);
}
RedisModuleString *key = RedisModule_CreateStringFromULongLong(ctx, j);
RedisModule_DictSet(dict, key, subdict);
RedisModule_FreeString(ctx, key);
} }
global_dicts[i] = dict; global_dicts[i] = dict;
} }
} }
static void freeFragGlobalSubDict(RedisModuleCtx *ctx, RedisModuleDict *subdict) {
char *key;
size_t keylen;
RedisModuleString *str;
RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(subdict, "^", NULL, 0);
while ((key = RedisModule_DictNextC(iter, &keylen, (void**)&str))) {
RedisModule_FreeString(ctx, str);
}
RedisModule_FreeDict(ctx, subdict);
RedisModule_DictIteratorStop(iter);
}
static void createFragGlobalDicts(RedisModuleCtx *ctx) { static void createFragGlobalDicts(RedisModuleCtx *ctx) {
char *key; char *key;
size_t keylen; size_t keylen;
RedisModuleString *val; RedisModuleDict *subdict;
for (unsigned long i = 0; i < global_dicts_len; i++) { for (unsigned long i = 0; i < global_dicts_len; i++) {
RedisModuleDict *dict = global_dicts[i]; RedisModuleDict *dict = global_dicts[i];
@ -109,8 +133,8 @@ static void createFragGlobalDicts(RedisModuleCtx *ctx) {
* 2. For even indices: Keep the dictionary but remove half of its items. */ * 2. For even indices: Keep the dictionary but remove half of its items. */
if (i % 2 == 1) { if (i % 2 == 1) {
RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(dict, "^", NULL, 0); RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(dict, "^", NULL, 0);
while ((key = RedisModule_DictNextC(iter, &keylen, (void**)&val))) { while ((key = RedisModule_DictNextC(iter, &keylen, (void**)&subdict))) {
RedisModule_FreeString(ctx, val); freeFragGlobalSubDict(ctx, subdict);
} }
RedisModule_FreeDict(ctx, dict); RedisModule_FreeDict(ctx, dict);
global_dicts[i] = NULL; global_dicts[i] = NULL;
@ -118,9 +142,9 @@ static void createFragGlobalDicts(RedisModuleCtx *ctx) {
} else { } else {
int key_index = 0; int key_index = 0;
RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(dict, "^", NULL, 0); RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(dict, "^", NULL, 0);
while ((key = RedisModule_DictNextC(iter, &keylen, (void**)&val))) { while ((key = RedisModule_DictNextC(iter, &keylen, (void**)&subdict))) {
if (key_index++ % 2 == 1) { if (key_index++ % 2 == 1) {
RedisModule_FreeString(ctx, val); freeFragGlobalSubDict(ctx, subdict);
RedisModule_DictReplaceC(dict, key, keylen, NULL); RedisModule_DictReplaceC(dict, key, keylen, NULL);
} }
} }
@ -129,13 +153,26 @@ static void createFragGlobalDicts(RedisModuleCtx *ctx) {
} }
} }
static void *defragGlobalDictValueCB(RedisModuleDefragCtx *ctx, void *data, unsigned char *key, size_t keylen) { static int defragGlobalSubDictValueCB(RedisModuleDefragCtx *ctx, void *data, unsigned char *key, size_t keylen, void **newptr) {
REDISMODULE_NOT_USED(key); REDISMODULE_NOT_USED(key);
REDISMODULE_NOT_USED(keylen); REDISMODULE_NOT_USED(keylen);
if (!data) return NULL; if (!data) return 0;
void *new = RedisModule_DefragAlloc(ctx, data); *newptr = RedisModule_DefragAlloc(ctx, data);
if (new) global_dicts_items_defragged++; return 0;
return new; }
static int defragGlobalDictValueCB(RedisModuleDefragCtx *ctx, void *data, unsigned char *key, size_t keylen, void **newptr) {
REDISMODULE_NOT_USED(key);
REDISMODULE_NOT_USED(keylen);
static RedisModuleString *seekTo = NULL;
RedisModuleDict *subdict = data;
if (!subdict) return 0;
if (seekTo != NULL) global_subdicts_resumes++;
*newptr = RedisModule_DefragRedisModuleDict(ctx, subdict, defragGlobalSubDictValueCB, &seekTo);
if (*newptr) global_dicts_items_defragged++;
/* Return 1 if seekTo is not NULL, indicating this node needs more defrag work. */
return seekTo != NULL;
} }
static int defragGlobalDicts(RedisModuleDefragCtx *ctx) { static int defragGlobalDicts(RedisModuleDefragCtx *ctx) {
@ -218,6 +255,7 @@ static void FragInfo(RedisModuleInfoCtx *ctx, int for_crash_report) {
RedisModule_InfoAddFieldLongLong(ctx, "global_strings_attempts", global_strings_attempts); RedisModule_InfoAddFieldLongLong(ctx, "global_strings_attempts", global_strings_attempts);
RedisModule_InfoAddFieldLongLong(ctx, "global_strings_defragged", global_strings_defragged); RedisModule_InfoAddFieldLongLong(ctx, "global_strings_defragged", global_strings_defragged);
RedisModule_InfoAddFieldLongLong(ctx, "global_dicts_resumes", global_dicts_resumes); RedisModule_InfoAddFieldLongLong(ctx, "global_dicts_resumes", global_dicts_resumes);
RedisModule_InfoAddFieldLongLong(ctx, "global_subdicts_resumes", global_subdicts_resumes);
RedisModule_InfoAddFieldLongLong(ctx, "global_dicts_attempts", global_dicts_attempts); RedisModule_InfoAddFieldLongLong(ctx, "global_dicts_attempts", global_dicts_attempts);
RedisModule_InfoAddFieldLongLong(ctx, "global_dicts_defragged", global_dicts_defragged); RedisModule_InfoAddFieldLongLong(ctx, "global_dicts_defragged", global_dicts_defragged);
RedisModule_InfoAddFieldLongLong(ctx, "global_dicts_items_defragged", global_dicts_items_defragged); RedisModule_InfoAddFieldLongLong(ctx, "global_dicts_items_defragged", global_dicts_items_defragged);
@ -251,6 +289,7 @@ static int fragResetStatsCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
global_strings_attempts = 0; global_strings_attempts = 0;
global_strings_defragged = 0; global_strings_defragged = 0;
global_dicts_resumes = 0; global_dicts_resumes = 0;
global_subdicts_resumes = 0;
global_dicts_attempts = 0; global_dicts_attempts = 0;
global_dicts_defragged = 0; global_dicts_defragged = 0;
global_dicts_items_defragged = 0; global_dicts_items_defragged = 0;

View File

@ -12,9 +12,22 @@ start_server {tags {"modules"} overrides {{save ""}}} {
if {[r config get activedefrag] eq "activedefrag yes"} { if {[r config get activedefrag] eq "activedefrag yes"} {
test {Module defrag: simple key defrag works} { test {Module defrag: simple key defrag works} {
r config set activedefrag no
wait_for_condition 100 50 {
[s active_defrag_running] eq 0
} else {
fail "Unable to wait for active defrag to stop"
}
r frag.create key1 1 1000 0 r frag.create key1 1 1000 0
after 2000 r config set activedefrag yes
wait_for_condition 100 50 {
[getInfoProperty [r info defragtest_stats] defragtest_defrag_ended] > 0
} else {
fail "Unable to wait for a complete defragmentation cycle to finish"
}
set info [r info defragtest_stats] set info [r info defragtest_stats]
assert {[getInfoProperty $info defragtest_datatype_attempts] > 0} assert {[getInfoProperty $info defragtest_datatype_attempts] > 0}
assert_equal 0 [getInfoProperty $info defragtest_datatype_resumes] assert_equal 0 [getInfoProperty $info defragtest_datatype_resumes]
@ -24,6 +37,13 @@ start_server {tags {"modules"} overrides {{save ""}}} {
} }
test {Module defrag: late defrag with cursor works} { test {Module defrag: late defrag with cursor works} {
r config set activedefrag no
wait_for_condition 100 50 {
[s active_defrag_running] eq 0
} else {
fail "Unable to wait for active defrag to stop"
}
r flushdb r flushdb
r frag.resetstats r frag.resetstats
@ -31,7 +51,13 @@ start_server {tags {"modules"} overrides {{save ""}}} {
# due to maxstep # due to maxstep
r frag.create key2 10000 100 1000 r frag.create key2 10000 100 1000
after 2000 r config set activedefrag yes
wait_for_condition 100 50 {
[getInfoProperty [r info defragtest_stats] defragtest_defrag_ended] > 0
} else {
fail "Unable to wait for a complete defragmentation cycle to finish"
}
set info [r info defragtest_stats] set info [r info defragtest_stats]
assert {[getInfoProperty $info defragtest_datatype_resumes] > 10} assert {[getInfoProperty $info defragtest_datatype_resumes] > 10}
assert_equal 0 [getInfoProperty $info defragtest_datatype_wrong_cursor] assert_equal 0 [getInfoProperty $info defragtest_datatype_wrong_cursor]
@ -67,6 +93,7 @@ start_server {tags {"modules"} overrides {{save ""}}} {
assert_morethan [getInfoProperty $info defragtest_defrag_started] 0 assert_morethan [getInfoProperty $info defragtest_defrag_started] 0
assert_morethan [getInfoProperty $info defragtest_defrag_ended] 0 assert_morethan [getInfoProperty $info defragtest_defrag_ended] 0
assert_morethan [getInfoProperty $info defragtest_global_dicts_resumes] [getInfoProperty $info defragtest_defrag_ended] assert_morethan [getInfoProperty $info defragtest_global_dicts_resumes] [getInfoProperty $info defragtest_defrag_ended]
assert_morethan [getInfoProperty $info defragtest_global_subdicts_resumes] [getInfoProperty $info defragtest_defrag_ended]
} }
} }
} }