Fix defrag when type/encoding changes during scan (#13883)
CI / build-32bit (push) Failing after 32s Details
CI / test-sanitizer-address (push) Failing after 32s Details
CI / build-libc-malloc (push) Failing after 32s Details
CI / build-debian-old (push) Failing after 32s Details
CI / build-centos-jemalloc (push) Failing after 31s Details
CI / build-old-chain-jemalloc (push) Failing after 31s Details
Codecov / code-coverage (push) Failing after 31s Details
CI / test-ubuntu-latest (push) Failing after 2m7s Details
Spellcheck / Spellcheck (push) Failing after 31s Details
CI / build-macos-latest (push) Has been cancelled Details
External Server Tests / test-external-standalone (push) Failing after 31s Details
Coverity Scan / coverity (push) Has been skipped Details
External Server Tests / test-external-cluster (push) Failing after 32s Details
External Server Tests / test-external-nodebug (push) Failing after 31s Details

This PR is based on: https://github.com/valkey-io/valkey/pull/1801

[SoftlyRaining](https://github.com/SoftlyRaining) was hunting for defrag
bugs with Jim and found a couple of improvements to make. Jim pointed
out that in several of the callbacks, if the encoding were to change it
simply returns without doing anything to `cursor` to make it reach 0,
meaning that it would continue no-op working on that item without making
any progress. Type and encoding can change while the defrag scan is in
progress if the value is mutated or replaced by something else with the
same key.

---------

Signed-off-by: Rain Valentine <rsg000@gmail.com>
Co-authored-by: Rain Valentine <rsg000@gmail.com>
This commit is contained in:
debing.sun 2025-03-27 08:58:57 +08:00 committed by GitHub
parent a0da8390a2
commit 87d8e71708
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 11 additions and 18 deletions

View File

@ -482,8 +482,7 @@ long scanLaterList(robj *ob, unsigned long *cursor, monotime endtime) {
quicklistNode *node;
long iterations = 0;
int bookmark_failed = 0;
if (ob->type != OBJ_LIST || ob->encoding != OBJ_ENCODING_QUICKLIST)
return 0;
serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST);
if (*cursor == 0) {
/* if cursor is 0, we start new iteration */
@ -532,8 +531,7 @@ void scanLaterZsetCallback(void *privdata, const dictEntry *_de) {
}
void scanLaterZset(robj *ob, unsigned long *cursor) {
if (ob->type != OBJ_ZSET || ob->encoding != OBJ_ENCODING_SKIPLIST)
return;
serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST);
zset *zs = (zset*)ob->ptr;
dict *d = zs->dict;
scanLaterZsetData data = {zs};
@ -549,8 +547,7 @@ void scanCallbackCountScanned(void *privdata, const dictEntry *de) {
}
void scanLaterSet(robj *ob, unsigned long *cursor) {
if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HT)
return;
serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT);
dict *d = ob->ptr;
dictDefragFunctions defragfns = {
.defragAlloc = activeDefragAlloc,
@ -560,8 +557,7 @@ void scanLaterSet(robj *ob, unsigned long *cursor) {
}
void scanLaterHash(robj *ob, unsigned long *cursor) {
if (ob->type != OBJ_HASH || ob->encoding != OBJ_ENCODING_HT)
return;
serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT);
dict *d = ob->ptr;
dictDefragFunctions defragfns = {
.defragAlloc = activeDefragAlloc,
@ -656,10 +652,7 @@ int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, monotime endtime)
static unsigned char next[sizeof(streamID)];
raxIterator ri;
long iterations = 0;
if (ob->type != OBJ_STREAM || ob->encoding != OBJ_ENCODING_STREAM) {
*cursor = 0;
return 0;
}
serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM);
stream *s = ob->ptr;
raxStart(&ri,s->rax);
@ -1005,22 +998,22 @@ void defragPubsubScanCallback(void *privdata, const dictEntry *de) {
int defragLaterItem(dictEntry *de, unsigned long *cursor, monotime endtime, int dbid) {
if (de) {
robj *ob = dictGetVal(de);
if (ob->type == OBJ_LIST) {
if (ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST) {
return scanLaterList(ob, cursor, endtime);
} else if (ob->type == OBJ_SET) {
} else if (ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT) {
scanLaterSet(ob, cursor);
} else if (ob->type == OBJ_ZSET) {
} else if (ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST) {
scanLaterZset(ob, cursor);
} else if (ob->type == OBJ_HASH) {
} else if (ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT) {
scanLaterHash(ob, cursor);
} else if (ob->type == OBJ_STREAM) {
} else if (ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM) {
return scanLaterStreamListpacks(ob, cursor, endtime);
} else if (ob->type == OBJ_MODULE) {
robj keyobj;
initStaticStringObject(keyobj, dictGetKey(de));
return moduleLateDefrag(&keyobj, ob, cursor, endtime, dbid);
} else {
*cursor = 0; /* object type may have changed since we schedule it for later */
*cursor = 0; /* object type/encoding may have changed since we schedule it for later */
}
} else {
*cursor = 0; /* object may have been deleted already */