debing.sun 2025-05-14 09:09:16 +08:00 committed by GitHub
commit 9ac880b0d0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 388 additions and 114 deletions

View File

@@ -117,6 +117,13 @@ typedef struct {
 } defragKeysCtx;
 static_assert(offsetof(defragKeysCtx, kvstate) == 0, "defragStageKvstoreHelper requires this");
 
+/* Context for hexpires */
+typedef struct {
+    int dbid;
+    ebuckets hexpires;
+    unsigned long cursor;
+} defragHExpiresCtx;
+
 /* Context for pubsub kvstores */
 typedef dict *(*getClientChannelsFn)(client *);
 typedef struct {
@@ -200,6 +207,28 @@ hfield activeDefragHfield(hfield hf) {
     return NULL;
 }
 
+/* Defrag helper for hfield strings and update the reference in the dict.
+ *
+ * returns NULL in case the allocation wasn't moved.
+ * when it returns a non-null value, the old pointer was already released
+ * and should NOT be accessed. */
+void *activeDefragHfieldAndUpdateRef(void *ptr, void *privdata) {
+    dict *d = privdata;
+    dictEntryLink link;
+
+    /* Before the key is released, obtain the link to
+     * ensure we can safely access and update the key. */
+    dictUseStoredKeyApi(d, 1);
+    link = dictFindLink(d, ptr, NULL);
+    serverAssert(link);
+    dictUseStoredKeyApi(d, 0);
+
+    hfield newhf = activeDefragHfield(ptr);
+    if (newhf)
+        dictSetKeyAtLink(d, newhf, &link, 0);
+    return newhf;
+}
+
 /* Defrag helper for robj and/or string objects with expected refcount.
  *
  * Like activeDefragStringOb, but it requires the caller to pass in the expected
@@ -384,18 +413,15 @@ void activeDefragSdsDictCallback(void *privdata, const dictEntry *de, dictEntryL
 void activeDefragHfieldDictCallback(void *privdata, const dictEntry *de, dictEntryLink plink) {
     UNUSED(plink);
     dict *d = privdata;
-    hfield newhf, hf = dictGetKey(de);
+    hfield newhf = NULL, hf = dictGetKey(de);
 
+    /* If the hfield does not have TTL, we directly defrag it.
+     * Fields with TTL are skipped here and will be defragmented later
+     * during the hash expiry ebuckets defragmentation phase. */
     if (hfieldGetExpireTime(hf) == EB_EXPIRE_TIME_INVALID) {
-        /* If the hfield does not have TTL, we directly defrag it. */
-        newhf = activeDefragHfield(hf);
-    } else {
-        /* Update its reference in the ebucket while defragging it. */
-        ebuckets *eb = hashTypeGetDictMetaHFE(d);
-        newhf = ebDefragItem(eb, &hashFieldExpireBucketsType, hf, (ebDefragFunction *)activeDefragHfield);
+        if ((newhf = activeDefragHfield(hf)))
+            dictSetKey(d, (dictEntry *)de, newhf);
     }
-    if (newhf) dictSetKey(d, (dictEntry *) de, newhf);
 }
 
 /* Defrag a dict with sds key and optional value (either ptr, sds or robj string) */
@@ -428,6 +454,19 @@ void activeDefragHfieldDict(dict *d) {
         cursor = dictScanDefrag(d, cursor, activeDefragHfieldDictCallback,
                                 &defragfns, d);
     } while (cursor != 0);
+
+    /* Continue with defragmentation of hash fields that have TTL.
+     * During the dictionary defragmentation above, we skipped fields with TTL.
+     * Now we continue to defrag those fields by using the expiry buckets. */
+    if (d->type == &mstrHashDictTypeWithHFE) {
+        cursor = 0;
+        ebDefragFunctions eb_defragfns = {
+            .defragAlloc = activeDefragAlloc,
+            .defragItem = activeDefragHfieldAndUpdateRef
+        };
+        ebuckets *eb = hashTypeGetDictMetaHFE(d);
+        while (ebScanDefrag(eb, &hashFieldExpireBucketsType, &cursor, &eb_defragfns, d)) {}
+    }
 }
 
 /* Defrag a list of ptr, sds or robj string values */
@ -555,12 +594,46 @@ void scanLaterSet(robj *ob, unsigned long *cursor) {
void scanLaterHash(robj *ob, unsigned long *cursor) { void scanLaterHash(robj *ob, unsigned long *cursor) {
serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT); serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT);
dict *d = ob->ptr; dict *d = ob->ptr;
dictDefragFunctions defragfns = {
.defragAlloc = activeDefragAlloc, typedef enum {
.defragKey = NULL, /* Will be defragmented in activeDefragHfieldDictCallback. */ HASH_DEFRAG_NONE = 0,
.defragVal = (dictDefragAllocFunction *)activeDefragSds HASH_DEFRAG_DICT = 1,
}; HASH_DEFRAG_EBUCKETS = 2
*cursor = dictScanDefrag(d, *cursor, activeDefragHfieldDictCallback, &defragfns, d); } hashDefragPhase;
static hashDefragPhase defrag_phase = HASH_DEFRAG_NONE;
/* Start a new hash defrag. */
if (!*cursor || defrag_phase == HASH_DEFRAG_NONE)
defrag_phase = HASH_DEFRAG_DICT;
/* Defrag hash dictionary but skip TTL fields. */
if (defrag_phase == HASH_DEFRAG_DICT) {
dictDefragFunctions defragfns = {
.defragAlloc = activeDefragAlloc,
.defragKey = NULL, /* Will be defragmented in activeDefragHfieldDictCallback. */
.defragVal = (dictDefragAllocFunction *)activeDefragSds
};
*cursor = dictScanDefrag(d, *cursor, activeDefragHfieldDictCallback, &defragfns, d);
/* Move to next phase. */
if (!*cursor) defrag_phase = HASH_DEFRAG_EBUCKETS;
}
/* Defrag ebuckets and TTL fields. */
if (defrag_phase == HASH_DEFRAG_EBUCKETS) {
if (d->type == &mstrHashDictTypeWithHFE) {
ebDefragFunctions eb_defragfns = {
.defragAlloc = activeDefragAlloc,
.defragItem = activeDefragHfieldAndUpdateRef
};
ebuckets *eb = hashTypeGetDictMetaHFE(d);
ebScanDefrag(eb, &hashFieldExpireBucketsType, cursor, &eb_defragfns, d);
} else {
/* Finish defragmentation if this dict doesn't have expired fields. */
*cursor = 0;
}
if (!*cursor) defrag_phase = HASH_DEFRAG_NONE;
}
} }
void defragQuicklist(defragKeysCtx *ctx, kvobj *kv) { void defragQuicklist(defragKeysCtx *ctx, kvobj *kv) {
@@ -630,7 +703,8 @@ void defragSet(defragKeysCtx *ctx, kvobj *ob) {
 
 /* Defrag callback for radix tree iterator, called for each node,
  * used in order to defrag the nodes allocations. */
-int defragRaxNode(raxNode **noderef) {
+int defragRaxNode(raxNode **noderef, void *privdata) {
+    UNUSED(privdata);
     raxNode *newnode = activeDefragAlloc(*noderef);
     if (newnode) {
         *noderef = newnode;
@@ -650,7 +724,7 @@ int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, monotime endtime)
     raxStart(&ri,s->rax);
     if (*cursor == 0) {
         /* if cursor is 0, we start new iteration */
-        defragRaxNode(&s->rax->head);
+        defragRaxNode(&s->rax->head, NULL);
         /* assign the iterator node callback before the seek, so that the
          * initial nodes that are processed till the first item are covered */
         ri.node_cb = defragRaxNode;
@@ -714,7 +788,7 @@ void defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_c
     rax = *raxref;
     raxStart(&ri,rax);
     ri.node_cb = defragRaxNode;
-    defragRaxNode(&rax->head);
+    defragRaxNode(&rax->head, NULL);
     raxSeek(&ri,"^",NULL,0);
     while (raxNext(&ri)) {
         void *newdata = NULL;
@@ -808,8 +882,9 @@ void defragModule(defragKeysCtx *ctx, redisDb *db, kvobj *kv) {
 /* for each key we scan in the main dict, this function will attempt to defrag
  * all the various pointers it has. */
 void defragKey(defragKeysCtx *ctx, dictEntry *de, dictEntryLink link) {
+    UNUSED(link);
     dictEntryLink exlink = NULL;
-    kvobj *kvnew, *ob = dictGetKV(de);
+    kvobj *kvnew = NULL, *ob = dictGetKV(de);
     redisDb *db = &server.db[ctx->dbid];
     int slot = ctx->kvstate.slot;
     unsigned char *newzl;
@@ -823,12 +898,9 @@ void defragKey(defragKeysCtx *ctx, dictEntry *de, dictEntryLink link) {
         serverAssert(exlink != NULL);
     }
 
-    /* Try to defrag robj and / or string value. */
-    if (unlikely(ob->type == OBJ_HASH && hashTypeGetMinExpire(ob, 0) != EB_EXPIRE_TIME_INVALID)) {
-        /* Update its reference in the ebucket while defragging it. */
-        kvnew = ebDefragItem(&db->hexpires, &hashExpireBucketsType, ob,
-                             (ebDefragFunction *)activeDefragStringOb);
-    } else {
+    /* Try to defrag robj and/or string value. For hash objects with HFEs,
+     * defer defragmentation until processing db's hexpires. */
+    if (!(ob->type == OBJ_HASH && hashTypeGetMinExpire(ob, 0) != EB_EXPIRE_TIME_INVALID)) {
         /* If the dict doesn't have metadata, we directly defrag it. */
         kvnew = activeDefragStringOb(ob);
     }
@@ -1183,6 +1255,61 @@ static doneStatus defragStageExpiresKvstore(void *ctx, monotime endtime) {
                                     scanCallbackCountScanned, NULL, &defragfns);
 }
 
+/* Defragment hash object with HFE and update its reference in the DB keys. */
+void *activeDefragHExpiresOB(void *ptr, void *privdata) {
+    redisDb *db = privdata;
+    dictEntryLink link, exlink = NULL;
+    kvobj *kvobj = ptr;
+    sds keystr = kvobjGetKey(kvobj);
+    unsigned int slot = calculateKeySlot(keystr);
+    serverAssert(kvobj->type == OBJ_HASH);
+
+    long long expire = kvobjGetExpire(kvobj);
+    /* We can't search in db->expires for that KV after we've released
+     * the pointer it holds, since it won't be able to do the string
+     * compare. Search it before, if needed. */
+    if (expire != -1) {
+        exlink = kvstoreDictFindLink(db->expires, slot, kvobjGetKey(kvobj), NULL);
+        serverAssert(exlink != NULL);
+    }
+
+    if ((kvobj = activeDefragAlloc(kvobj))) {
+        /* Update its reference in the DB keys. */
+        link = kvstoreDictFindLink(db->keys, slot, keystr, NULL);
+        serverAssert(link != NULL);
+        kvstoreDictSetAtLink(db->keys, slot, kvobj, &link, 0);
+
+        if (expire != -1)
+            kvstoreDictSetAtLink(db->expires, slot, kvobj, &exlink, 0);
+    }
+    return kvobj;
+}
+
+static doneStatus defragStageHExpires(void *ctx, monotime endtime) {
+    unsigned int iterations = 0;
+    defragHExpiresCtx *defrag_hexpires_ctx = ctx;
+    redisDb *db = &server.db[defrag_hexpires_ctx->dbid];
+    if (db->hexpires != defrag_hexpires_ctx->hexpires) {
+        /* There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage. */
+        return DEFRAG_DONE;
+    }
+
+    ebDefragFunctions eb_defragfns = {
+        .defragAlloc = activeDefragAlloc,
+        .defragItem = activeDefragHExpiresOB
+    };
+
+    while (1) {
+        if (!ebScanDefrag(&db->hexpires, &hashExpireBucketsType, &defrag_hexpires_ctx->cursor, &eb_defragfns, db))
+            return DEFRAG_DONE;
+
+        if (++iterations > 16) {
+            if (getMonotonicUs() >= endtime) break;
+            iterations = 0;
+        }
+    }
+    return DEFRAG_NOT_DONE;
+}
+
 static doneStatus defragStagePubsubKvstore(void *ctx, monotime endtime) {
     static dictDefragFunctions defragfns = {
         .defragAlloc = activeDefragAlloc,
@@ -1510,6 +1637,12 @@ static void beginDefragCycle(void) {
         defrag_expires_ctx->kvstate = INIT_KVSTORE_STATE(db->expires);
         defrag_expires_ctx->dbid = dbid;
         addDefragStage(defragStageExpiresKvstore, freeDefragKeysContext, defrag_expires_ctx);
+
+        /* Add stage for hexpires. */
+        defragHExpiresCtx *defrag_hexpires_ctx = zcalloc(sizeof(defragHExpiresCtx));
+        defrag_hexpires_ctx->hexpires = db->hexpires;
+        defrag_hexpires_ctx->dbid = dbid;
+        addDefragStage(defragStageHExpires, zfree, defrag_hexpires_ctx);
     }
 
     /* Add stage for pubsub channels. */
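A minimal sketch of how a stage registered with addDefragStage() ends up being driven, assuming a hypothetical driver function and per-call budget (the real cycle that invokes stages lives elsewhere in this file). It shows the contract defragStageHExpires implements: do a bounded amount of work, return DEFRAG_NOT_DONE to be called again, or DEFRAG_DONE when finished or aborted because db->hexpires changed.

/* Illustrative sketch only; 'runStageToCompletion' is not a function in this patch. */
static void runStageToCompletion(doneStatus (*stage)(void *ctx, monotime endtime), void *ctx) {
    for (;;) {
        monotime endtime = getMonotonicUs() + 500; /* assumed per-call time budget */
        if (stage(ctx, endtime) == DEFRAG_DONE) break;
        /* In the real server the event loop runs here before the next call. */
    }
}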

View File

@@ -10,6 +10,7 @@
 #include <stddef.h>
 #include <stdlib.h>
 #include <inttypes.h>
+#include <string.h>
 #include "zmalloc.h"
 #include "redisassert.h"
 #include "config.h"
@@ -1807,66 +1808,182 @@ void ebValidate(ebuckets eb, EbucketsType *type) {
     ebValidateRax(ebGetRaxPtr(eb), type);
 }
 
-/* Reallocates the memory used by the item using the provided allocation function.
- * This feature was added for the active defrag feature.
- *
- * The 'defragfn' callbacks are called with a pointer to memory that callback
- * can reallocate. The callbacks should return a new memory address or NULL,
- * where NULL means that no reallocation happened and the old memory is still valid.
- *
- * Note: It is the caller's responsibility to ensure that the item has a valid expire time. */
-eItem ebDefragItem(ebuckets *eb, EbucketsType *type, eItem item, ebDefragFunction *defragfn) {
-    assert(!ebIsEmpty(*eb));
-    if (ebIsList(*eb)) {
-        ExpireMeta *prevem = NULL;
-        eItem curitem = ebGetListPtr(type, *eb);
-        while (curitem != NULL) {
-            if (curitem == item) {
-                if ((curitem = defragfn(curitem))) {
-                    if (prevem)
-                        prevem->next = curitem;
-                    else
-                        *eb = ebMarkAsList(curitem);
-                }
-                return curitem;
-            }
-
-            /* Move to the next item in the list. */
-            prevem = type->getExpireMeta(curitem);
-            curitem = prevem->next;
-        }
-    } else {
-        CommonSegHdr *currHdr;
-        ExpireMeta *mIter = type->getExpireMeta(item);
-        assert(mIter->trash != 1);
-        while (mIter->lastInSegment == 0)
-            mIter = type->getExpireMeta(mIter->next);
-
-        if (mIter->lastItemBucket)
-            currHdr = (CommonSegHdr *) mIter->next;
-        else
-            currHdr = (CommonSegHdr *) ((NextSegHdr *) mIter->next)->prevSeg;
-
-        /* If the item is the first in the segment, then update the segment header */
-        if (currHdr->head == item) {
-            if ((item = defragfn(item))) {
-                currHdr->head = item;
-            }
-            return item;
-        }
-
-        /* Iterate over all items in the segment until the next is 'item' */
-        ExpireMeta *mHead = type->getExpireMeta(currHdr->head);
-        mIter = mHead;
-        while (mIter->next != item)
-            mIter = type->getExpireMeta(mIter->next);
-        assert(mIter->next == item);
-
-        if ((item = defragfn(item))) {
-            mIter->next = item;
-        }
-        return item;
-    }
-    redis_unreachable();
+/* Defrag callback for radix tree iterator, called for each node,
+ * used in order to defrag the nodes allocations. */
+int ebDefragRaxNode(raxNode **noderef, void *privdata) {
+    ebDefragFunctions *defragfns = privdata;
+    raxNode *newnode = defragfns->defragAlloc(*noderef);
+    if (newnode) {
+        *noderef = newnode;
+        return 1;
+    }
+    return 0;
+}
+
+/* Defragments items in list-based bucket. */
+void ebDefragList(ebuckets *eb, EbucketsType *type, ebDefragFunctions *defragfns, void *privdata) {
+    ExpireMeta *previtem = NULL;
+    eItem newitem, curitem = ebGetListPtr(type, *eb);
+    while (curitem != NULL) {
+        if ((newitem = defragfns->defragItem(curitem, privdata))) {
+            curitem = newitem;
+            if (previtem) {
+                previtem->next = curitem;
+            } else {
+                *eb = ebMarkAsList(curitem);
+            }
+        }
+        /* Move to the next item in the list. */
+        previtem = type->getExpireMeta(curitem);
+        curitem = previtem->next;
+    }
+}
+
+/* Defragments a single bucket in rax, including its segments and items. */
+void ebDefragRaxBucket(EbucketsType *type, raxIterator *ri,
+                       ebDefragFunctions *defragfns, void *privdata)
+{
+    CommonSegHdr *currentSegHdr = ri->data;
+    eItem iter = ((FirstSegHdr*)currentSegHdr)->head;
+    ExpireMeta *mHead = type->getExpireMeta(iter);
+    ExpireMeta *prevSegLastItem = NULL; /* The last item of the previous segment */
+
+    while (1) {
+        unsigned int numItems = mHead->numItems;
+        assert(numItems); /* Avoid compiler warning with old build chain. */
+        ExpireMeta *prevIter = NULL;
+        ExpireMeta *mIter = NULL;
+
+        for (unsigned int i = 0; i < numItems; ++i) {
+            eItem newiter = defragfns->defragItem(iter, privdata);
+            if (newiter) {
+                iter = newiter;
+
+                if (prevIter == NULL) {
+                    /* If this is the first item in the segment, update the segment
+                     * header to point to the new item location. */
+                    currentSegHdr->head = iter;
+                } else {
+                    /* Update the previous item's next pointer to point to the newly defragmented item */
+                    prevIter->next = iter;
+                }
+            }
+            mIter = type->getExpireMeta(iter);
+            prevIter = mIter;
+            iter = mIter->next;
+        }
+
+        /* Try to defragment the current segment. */
+        CommonSegHdr *newSegHdr = defragfns->defragAlloc(currentSegHdr);
+        if (newSegHdr) {
+            if (currentSegHdr == ri->data) {
+                /* If it's the first segment, update the rax data pointer. */
+                raxSetData(ri->node, ri->data=newSegHdr);
+            } else {
+                /* For non-first segments, update the previous segment's next
+                 * item to new pointer. */
+                prevSegLastItem->next = newSegHdr;
+            }
+            currentSegHdr = newSegHdr;
+        }
+
+        /* Remember last item in this segment for next iteration */
+        prevSegLastItem = mIter;
+        if (mIter->lastItemBucket) {
+            /* The last eitem needs to point back to the segment. */
+            if (newSegHdr) mIter->next = currentSegHdr;
+            break;
+        }
+
+        NextSegHdr *nextSegHdr = mIter->next;
+        if (newSegHdr) {
+            /* Update next segment's prev to point to the defragmented segment. */
+            nextSegHdr->prevSeg = newSegHdr;
+        }
+
+        /* Update pointers for next segment iteration */
+        iter = nextSegHdr->head;
+        mHead = type->getExpireMeta(iter);
+        currentSegHdr = (CommonSegHdr *)nextSegHdr;
+    }
+}
+
+/* Defragments items in rax-based bucket.
+ * returns 0 if no more work needs to be done, and 1 if more work is needed. */
+int ebDefragRax(ebuckets *eb, EbucketsType *type, unsigned long *cursor,
+                ebDefragFunctions *defragfns, void *privdata)
+{
+    rax *rax = ebGetRaxPtr(*eb);
+    raxIterator ri;
+    static unsigned char next[EB_KEY_SIZE];
+    raxStart(&ri,rax);
+    if (!*cursor) {
+        ebDefragRaxNode(&rax->head, defragfns);
+        /* assign the iterator node callback before the seek, so that the
+         * initial nodes that are processed till the first item are covered */
+        ri.node_cb = ebDefragRaxNode;
+        ri.privdata = defragfns;
+        raxSeek(&ri, "^", NULL, 0);
+    } else {
+        /* if cursor is non-zero, we seek to the static 'next'.
+         * Since node_cb is set after seek operation, any node traversed during seek wouldn't
+         * be defragmented. To prevent this, we advance to next node before exiting previous
+         * run, ensuring it gets defragmented instead of being skipped during current seek. */
+        if (!raxSeek(&ri, ">=", next, EB_KEY_SIZE)) {
+            *cursor = 0;
+            raxStop(&ri);
+            return 0;
+        }
+        /* assign the iterator node callback after the seek, so that the
+         * initial nodes that are processed till now aren't covered */
+        ri.node_cb = ebDefragRaxNode;
+        ri.privdata = defragfns;
+    }
+
+    /* Defrag the bucket in the rax node. */
+    assert(raxNext(&ri));
+    ebDefragRaxBucket(type, &ri, defragfns, privdata);
+
+    /* Move to next node. */
+    if (!raxNext(&ri)) {
+        /* If we reached the end, we can stop. */
+        *cursor = 0;
+        raxStop(&ri);
+        return 0;
+    }
+
+    (*cursor)++;
+    assert(ri.key_len == sizeof(next));
+    memcpy(next, ri.key, ri.key_len);
+    raxStop(&ri);
+    return 1;
+}
+
+/* Reallocates the memory used by ebucket components (segments and items)
+ * using the provided allocation functions. This feature was added for
+ * the active defrag feature.
+ *
+ * The 'defragfns' callbacks are called with a pointer to memory that callback
+ * can reallocate. The callbacks should return a new memory address or NULL,
+ * where NULL means that no reallocation happened and the old memory is still
+ * valid. */
+int ebScanDefrag(ebuckets *eb, EbucketsType *type, unsigned long *cursor,
+                 ebDefragFunctions *defragfns, void *privdata)
+{
+    if (ebIsEmpty(*eb)) {
+        *cursor = 0;
+        return 0;
+    }
+
+    if (ebIsList(*eb)) {
+        ebDefragList(eb, type, defragfns, privdata);
+        *cursor = 0;
+        return 0;
+    } else {
+        return ebDefragRax(eb, type, cursor, defragfns, privdata);
+    }
 }
 
 /* Retrieves the expiration time associated with the given item. If associated
@@ -2166,11 +2283,21 @@ void distributeTest(int lowestTime,
 #define UNUSED(x) (void)(x)
 #define ARRAY_SIZE(arr) (sizeof(arr) / sizeof((arr)[0]))
 
-eItem defragCallback(const eItem item) {
-    size_t size = zmalloc_usable_size(item);
-    eItem newitem = zmalloc(size);
-    memcpy(newitem, item, size);
-    zfree(item);
+void *defragCallback(void *ptr) {
+    size_t size = zmalloc_usable_size(ptr);
+    void *newitem = zmalloc(size);
+    memcpy(newitem, ptr, size);
+    zfree(ptr);
+    return newitem;
+}
+
+void *defragItemCallback(void *ptr, void *privdata) {
+    MyItem *item = ptr;
+    MyItem **items = privdata;
+    int index = item->index;
+
+    void *newitem = defragCallback(ptr);
+    if (newitem)
+        items[index] = newitem;
     return newitem;
 }
@@ -2561,10 +2688,12 @@ int ebucketsTest(int argc, char **argv, int flags) {
             }
             assert((s <= EB_LIST_MAX_ITEMS) ? ebIsList(eb) : !ebIsList(eb));
 
             /* Defrag all the items. */
-            for (int i = 0; i < s; i++) {
-                MyItem *newitem = ebDefragItem(&eb, &myEbucketsType, items[i], defragCallback);
-                if (newitem) items[i] = newitem;
-            }
+            unsigned long cursor = 0;
+            ebDefragFunctions defragfns = {
+                .defragAlloc = defragCallback,
+                .defragItem = defragItemCallback,
+            };
+            while (ebScanDefrag(&eb, &myEbucketsType, &cursor, &defragfns, items)) {}
 
             /* Verify that the data is not corrupted. */
             ebValidate(eb, &myEbucketsType);
             for (int i = 0; i < s; i++)
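A usage note on the cursor contract introduced above, with a hedged sketch (the helper name is hypothetical): each ebScanDefrag() call defragments one rax bucket, or the entire structure when it is list-encoded, and returns 1 while more buckets remain, so incremental callers keep the cursor across calls as scanLaterHash() and defragStageHExpires() do in defrag.c. Note that ebDefragRax() keeps its resume key in a function-level static buffer, so only one incremental rax scan should be in flight at a time.

/* Hypothetical run-to-completion wrapper over the new API; an incremental
 * caller would instead return after each ebScanDefrag() call and resume
 * later with the same cursor value. */
static void defragWholeEbuckets(ebuckets *eb, EbucketsType *type,
                                ebDefragFunctions *fns, void *privdata) {
    unsigned long cursor = 0;
    while (ebScanDefrag(eb, type, &cursor, fns, privdata)) {
        /* more rax buckets remain; keep going */
    }
}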

View File

@@ -270,6 +270,13 @@ typedef struct EbucketsIterator {
     uint64_t itemsCurrBucket;  /* Number of items in current bucket. */
 } EbucketsIterator;
 
+typedef void *(ebDefragAllocFunction)(void *ptr);
+typedef void *(ebDefragAllocItemFunction)(void *ptr, void *privdata);
+typedef struct {
+    ebDefragAllocFunction *defragAlloc;      /* Used for rax nodes, segment etc. */
+    ebDefragAllocItemFunction *defragItem;   /* Defrag-realloc eitem */
+} ebDefragFunctions;
+
 /* ebuckets API */
 
 static inline ebuckets ebCreate(void) { return NULL; } /* Empty ebuckets */
@@ -304,8 +311,8 @@ int ebNext(EbucketsIterator *iter);
 
 int ebNextBucket(EbucketsIterator *iter);
 
-typedef eItem (ebDefragFunction)(const eItem item);
-eItem ebDefragItem(ebuckets *eb, EbucketsType *type, eItem item, ebDefragFunction *fn);
+int ebScanDefrag(ebuckets *eb, EbucketsType *type, unsigned long *cursor,
+                 ebDefragFunctions *defragfns, void *privdata);
 
 static inline uint64_t ebGetMetaExpTime(ExpireMeta *expMeta) {
     return (((uint64_t)(expMeta)->expireTimeHi << 32) | (expMeta)->expireTimeLo);
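A minimal sketch of the two callback roles declared above, modeled on the unit-test helpers in ebuckets.c (the names and the always-copy behavior are illustrative, not part of the API): defragAlloc relocates internal allocations such as rax nodes and segment headers, while defragItem relocates user items and receives the privdata passed to ebScanDefrag() so the caller can re-point its own references.

static void *sketchDefragAlloc(void *ptr) {
    /* Copy to a fresh allocation and free the old one; returning NULL would
     * mean "this allocation was not moved". */
    size_t size = zmalloc_usable_size(ptr);
    void *copy = zmalloc(size);
    memcpy(copy, ptr, size);
    zfree(ptr);
    return copy;
}

static void *sketchDefragItem(void *ptr, void *privdata) {
    /* Here privdata is assumed to be a single external reference that must
     * keep tracking the item after it moves. */
    void **owner_ref = privdata;
    void *copy = sketchDefragAlloc(ptr);
    if (copy) *owner_ref = copy;
    return copy;
}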

View File

@@ -13996,7 +13996,8 @@ RedisModuleString *RM_DefragRedisModuleString(RedisModuleDefragCtx *ctx, RedisMo
 
 /* Defrag callback for radix tree iterator, called for each node,
  * used in order to defrag the nodes allocations. */
-int moduleDefragRaxNode(raxNode **noderef) {
+int moduleDefragRaxNode(raxNode **noderef, void *privdata) {
+    UNUSED(privdata);
     raxNode *newnode = activeDefragAlloc(*noderef);
     if (newnode) {
         *noderef = newnode;
@@ -14034,7 +14035,7 @@ RedisModuleDict *RM_DefragRedisModuleDict(RedisModuleDefragCtx *ctx, RedisModule
     raxStart(&ri,dict->rax);
     if (*seekTo == NULL) {
         /* if last seek is NULL, we start new iteration */
-        moduleDefragRaxNode(&dict->rax->head);
+        moduleDefragRaxNode(&dict->rax->head, NULL);
         /* assign the iterator node callback before the seek, so that the
          * initial nodes that are processed till the first item are covered */
         ri.node_cb = moduleDefragRaxNode;

View File

@@ -1270,6 +1270,7 @@ void raxStart(raxIterator *it, rax *rt) {
     it->key_max = RAX_ITER_STATIC_LEN;
     it->data = NULL;
     it->node_cb = NULL;
+    it->privdata = NULL;
     raxStackInit(&it->stack);
 }
@@ -1346,7 +1347,7 @@ int raxIteratorNextStep(raxIterator *it, int noup) {
             memcpy(&it->node,cp,sizeof(it->node));
             /* Call the node callback if any, and replace the node pointer
              * if the callback returns true. */
-            if (it->node_cb && it->node_cb(&it->node))
+            if (it->node_cb && it->node_cb(&it->node, it->privdata))
                 memcpy(cp,&it->node,sizeof(it->node));
             /* For "next" step, stop every time we find a key along the
              * way, since the key is lexicographically smaller compared to
@@ -1402,7 +1403,7 @@ int raxIteratorNextStep(raxIterator *it, int noup) {
                 memcpy(&it->node,cp,sizeof(it->node));
                 /* Call the node callback if any, and replace the node
                  * pointer if the callback returns true. */
-                if (it->node_cb && it->node_cb(&it->node))
+                if (it->node_cb && it->node_cb(&it->node, it->privdata))
                     memcpy(cp,&it->node,sizeof(it->node));
                 if (it->node->iskey) {
                     it->data = raxGetData(it->node);

View File

@@ -143,7 +143,7 @@ typedef struct raxStack {
  * Redis application for this callback).
  *
  * This is currently only supported in forward iterations (raxNext) */
-typedef int (*raxNodeCallback)(raxNode **noderef);
+typedef int (*raxNodeCallback)(raxNode **noderef, void *privdata);
 
 /* Radix tree iterator state is encapsulated into this data structure. */
 #define RAX_ITER_STATIC_LEN 128
@@ -164,6 +164,7 @@ typedef struct raxIterator {
     raxNode *node;           /* Current node. Only for unsafe iteration. */
     raxStack stack;          /* Stack used for unsafe iteration. */
     raxNodeCallback node_cb; /* Optional node callback. Normally set to NULL. */
+    void *privdata;          /* Optional private data for node callback. */
 } raxIterator;
 
 /* Exported API. */
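A small sketch of the extended callback signature in use, assuming an existing rax 'rt' and purely illustrative callback and state names: the iterator passes it->privdata to node_cb on every node it visits, and a return value of 1 tells the iterator the node pointer was replaced so it rewrites the parent's reference.

typedef struct { long visited; } myCbState;     /* illustrative state */

static int myNodeCb(raxNode **noderef, void *privdata) {
    myCbState *st = privdata;
    st->visited++;
    (void)noderef; /* a defrag callback would relocate *noderef and return 1 */
    return 0;
}

/* Usage:
 *     raxIterator ri;
 *     myCbState st = {0};
 *     raxStart(&ri, rt);
 *     ri.node_cb = myNodeCb;
 *     ri.privdata = &st;
 *     raxSeek(&ri, "^", NULL, 0);
 *     while (raxNext(&ri)) { }
 *     raxStop(&ri);
 */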

View File

@@ -297,7 +297,11 @@ run_solo {defrag} {
             r config set maxmemory 0
             r config set list-max-ziplist-size 5 ;# list of 10k items will have 2000 quicklist nodes
             r config set stream-node-max-entries 5
-            r hmset hash h1 v1 h2 v2 h3 v3
+            r config set hash-max-listpack-entries 10
+            r hmset hash_lp h1 v1 h2 v2 h3 v3
+            assert_encoding listpack hash_lp
+            r hmset hash_ht h1 v1 h2 v2 h3 v3 h4 v4 h5 v5 h6 v6 h7 v7 h8 v8 h9 v9 h10 v10 h11 v11
+            assert_encoding hashtable hash_ht
             r lpush list a b c d
             r zadd zset 0 a 1 b 2 c 3 d
             r sadd set a b c d
@@ -347,7 +351,7 @@ run_solo {defrag} {
             for {set j 0} {$j < 500000} {incr j} {
                 $rd read ; # Discard replies
             }
-            assert_equal [r dbsize] 500015
+            assert_equal [r dbsize] 500016
 
             # create some fragmentation
             for {set j 0} {$j < 500000} {incr j 2} {
@@ -356,7 +360,7 @@ run_solo {defrag} {
             for {set j 0} {$j < 500000} {incr j 2} {
                 $rd read ; # Discard replies
             }
-            assert_equal [r dbsize] 250015
+            assert_equal [r dbsize] 250016
 
             # start defrag
             after 120 ;# serverCron only updates the info once in 100ms
@@ -512,15 +516,14 @@ run_solo {defrag} {
             $rd_pubsub close
         }
 
-        test "Active Defrag HFE: $type" {
+        foreach {eb_container fields n} {eblist 16 3000 ebrax 300 160} {
+        test "Active Defrag HFE with $eb_container: $type" {
             r flushdb
             r config set hz 100
             r config set activedefrag no
             wait_for_defrag_stop 500 100
             r config resetstat
-            # TODO: Lower the threshold after defraging the ebuckets.
-            # Now just to ensure that the reference is updated correctly.
-            r config set active-defrag-threshold-lower 12
+            r config set active-defrag-threshold-lower 8
             r config set active-defrag-cycle-min 65
             r config set active-defrag-cycle-max 75
             r config set active-defrag-ignore-bytes 1500kb
@@ -529,14 +532,12 @@ run_solo {defrag} {
             r config set hash-max-listpack-entries 10
 
             # Populate memory with interleaving hash field of same size
-            set n 3000
-            set fields 16 ;# make all the fields in an eblist.
             set dummy_field "[string repeat x 400]"
             set rd [redis_deferring_client]
             for {set i 0} {$i < $n} {incr i} {
                 for {set j 0} {$j < $fields} {incr j} {
-                    $rd hset h$i f$j $dummy_field
-                    $rd hexpire h$i 9999999 FIELDS 1 f$j
+                    $rd hset h$i $dummy_field$j v
+                    $rd hexpire h$i 9999999 FIELDS 1 $dummy_field$j
                     $rd set "k$i$j" $dummy_field
                 }
             }
@@ -547,8 +548,8 @@ run_solo {defrag} {
             }
 
             # Coverage for listpackex.
-            r hset h_lpex f0 $dummy_field
-            r hexpire h_lpex 9999999 FIELDS 1 f0
+            r hset h_lpex $dummy_field v
+            r hexpire h_lpex 9999999 FIELDS 1 $dummy_field
             assert_encoding listpackex h_lpex
 
             after 120 ;# serverCron only updates the info once in 100ms
@@ -591,7 +592,7 @@ run_solo {defrag} {
             }
 
             # wait for the active defrag to stop working
-            wait_for_defrag_stop 500 100 1.5
+            wait_for_defrag_stop 500 100 1.08
 
             # test the fragmentation is lower
             after 120 ;# serverCron only updates the info once in 100ms
@@ -603,6 +604,7 @@ run_solo {defrag} {
                 }
             }
         }
+        } ;# end of foreach
 
         test "Active defrag for argv retained by the main thread from IO thread: $type" {
             r flushdb