Fix rax node defragmentaion being skipped (#13847)

First, when we do `raxSeek()` and then call raxNext, we will get the
`RAX_ITER_JUST_SEEKED` flag and return success directly.
We always set the node defrag callback after `raxSeek()`, which means
that when we break from defragmentation, the first node that comes in
again will never be defragged.

In this PR, we save the last as the next node to be processed, not the
last node to be completed.
This way we defrag the next node when we exit to avoid it being skipped
on the next resume.

---------

Co-authored-by: oranagra <oran@redislabs.com>
This commit is contained in:
debing.sun 2025-03-24 08:57:08 +08:00 committed by GitHub
parent 427c36888e
commit 87b7c3ac1a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 15 additions and 5 deletions

View File

@ -653,7 +653,7 @@ int defragRaxNode(raxNode **noderef) {
/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, monotime endtime) {
static unsigned char last[sizeof(streamID)];
static unsigned char next[sizeof(streamID)];
raxIterator ri;
long iterations = 0;
if (ob->type != OBJ_STREAM || ob->encoding != OBJ_ENCODING_STREAM) {
@ -671,8 +671,11 @@ int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, monotime endtime)
ri.node_cb = defragRaxNode;
raxSeek(&ri,"^",NULL,0);
} else {
/* if cursor is non-zero, we seek to the static 'last' */
if (!raxSeek(&ri,">", last, sizeof(last))) {
/* if cursor is non-zero, we seek to the static 'next'.
* Since node_cb is set after seek operation, any node traversed during seek wouldn't
* be defragmented. To prevent this, we advance to next node before exiting previous
* run, ensuring it gets defragmented instead of being skipped during current seek. */
if (!raxSeek(&ri,">=", next, sizeof(next))) {
*cursor = 0;
raxStop(&ri);
return 0;
@ -690,8 +693,15 @@ int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, monotime endtime)
server.stat_active_defrag_scanned++;
if (++iterations > 128) {
if (getMonotonicUs() > endtime) {
serverAssert(ri.key_len==sizeof(last));
memcpy(last,ri.key,ri.key_len);
/* Move to next node. */
if (!raxNext(&ri)) {
/* If we reached the end, we can stop */
*cursor = 0;
raxStop(&ri);
return 0;
}
serverAssert(ri.key_len==sizeof(next));
memcpy(next,ri.key,ri.key_len);
raxStop(&ri);
return 1;
}