mirror of https://mirror.osredm.com/root/redis.git
Merge 73f510222b
into d86cf66101
This commit is contained in:
commit
abc5962e15
636
src/db.c
636
src/db.c
|
@ -1286,9 +1286,16 @@ void keysCommand(client *c) {
|
||||||
setDeferredArrayLen(c,replylen,numkeys);
|
setDeferredArrayLen(c,replylen,numkeys);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#define SCAN_KEYS_INITIAL_CAPACITY 64
|
||||||
|
|
||||||
/* Data used by the dict scan callback. */
|
/* Data used by the dict scan callback. */
|
||||||
typedef struct {
|
typedef struct {
|
||||||
list *keys; /* elements that collect from dict */
|
/* Dynamic array for all scan optimizations */
|
||||||
|
void **array_items; /* pointer to items array (stack or heap allocated) */
|
||||||
|
int array_count; /* number of items collected */
|
||||||
|
int array_capacity; /* current capacity of items array */
|
||||||
|
void **array_stack_buffer; /* reference to original stack buffer for cleanup check */
|
||||||
|
int array_stores_pairs; /* 1 if storing key-value pairs, 0 if single items */
|
||||||
robj *o; /* o must be a hash/set/zset object, NULL means current db */
|
robj *o; /* o must be a hash/set/zset object, NULL means current db */
|
||||||
long long type; /* the particular type when scan the db */
|
long long type; /* the particular type when scan the db */
|
||||||
sds pattern; /* pattern string, NULL means no pattern */
|
sds pattern; /* pattern string, NULL means no pattern */
|
||||||
|
@ -1299,6 +1306,28 @@ typedef struct {
|
||||||
redisDb *db; /* database reference for expiration checks */
|
redisDb *db; /* database reference for expiration checks */
|
||||||
} scanData;
|
} scanData;
|
||||||
|
|
||||||
|
/* Helper function to add an item to the scan results (for all scan commands) */
|
||||||
|
static void scanItemsPush(scanData *data, void *item) {
|
||||||
|
if (unlikely(data->array_count >= data->array_capacity)) {
|
||||||
|
/* Need to grow the array */
|
||||||
|
int new_capacity = data->array_capacity * 2;
|
||||||
|
|
||||||
|
if (data->array_items == data->array_stack_buffer) {
|
||||||
|
/* First time growing from stack buffer - need to allocate and copy */
|
||||||
|
void **new_items = zmalloc(new_capacity * sizeof(void*));
|
||||||
|
memcpy(new_items, data->array_items, data->array_count * sizeof(void*));
|
||||||
|
data->array_items = new_items;
|
||||||
|
} else {
|
||||||
|
/* Already heap allocated - use zrealloc */
|
||||||
|
data->array_items = zrealloc(data->array_items, new_capacity * sizeof(void*));
|
||||||
|
}
|
||||||
|
|
||||||
|
data->array_capacity = new_capacity;
|
||||||
|
}
|
||||||
|
|
||||||
|
data->array_items[data->array_count++] = item;
|
||||||
|
}
|
||||||
|
|
||||||
/* Helper function to compare key type in scan commands */
|
/* Helper function to compare key type in scan commands */
|
||||||
int objectTypeCompare(robj *o, long long target) {
|
int objectTypeCompare(robj *o, long long target) {
|
||||||
if (o->type != OBJ_MODULE) {
|
if (o->type != OBJ_MODULE) {
|
||||||
|
@ -1319,7 +1348,6 @@ int objectTypeCompare(robj *o, long long target) {
|
||||||
void scanCallback(void *privdata, const dictEntry *de, dictEntryLink plink) {
|
void scanCallback(void *privdata, const dictEntry *de, dictEntryLink plink) {
|
||||||
UNUSED(plink);
|
UNUSED(plink);
|
||||||
scanData *data = (scanData *)privdata;
|
scanData *data = (scanData *)privdata;
|
||||||
list *keys = data->keys;
|
|
||||||
robj *o = data->o;
|
robj *o = data->o;
|
||||||
sds val = NULL;
|
sds val = NULL;
|
||||||
void *key = NULL; /* if OBJ_HASH then key is of type `hfield`. Otherwise, `sds` */
|
void *key = NULL; /* if OBJ_HASH then key is of type `hfield`. Otherwise, `sds` */
|
||||||
|
@ -1360,28 +1388,47 @@ void scanCallback(void *privdata, const dictEntry *de, dictEntryLink plink) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (o == NULL) {
|
if (o == NULL) {
|
||||||
key = keyStr;
|
/* SCAN command - single keys */
|
||||||
|
data->array_stores_pairs = 0;
|
||||||
|
scanItemsPush(data, keyStr);
|
||||||
|
return;
|
||||||
} else if (o->type == OBJ_SET) {
|
} else if (o->type == OBJ_SET) {
|
||||||
key = keyStr;
|
/* SSCAN command - single members */
|
||||||
|
data->array_stores_pairs = 0;
|
||||||
|
scanItemsPush(data, keyStr);
|
||||||
|
return;
|
||||||
} else if (o->type == OBJ_HASH) {
|
} else if (o->type == OBJ_HASH) {
|
||||||
key = keyStr;
|
key = keyStr;
|
||||||
val = dictGetVal(de);
|
val = dictGetVal(de);
|
||||||
|
|
||||||
/* If field is expired, then ignore */
|
/* HSCAN command - field-value pairs */
|
||||||
if (hfieldIsExpired(key))
|
if (hfieldIsExpired(keyStr))
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
if (data->no_values) {
|
||||||
|
data->array_stores_pairs = 0;
|
||||||
|
scanItemsPush(data, key);
|
||||||
|
} else {
|
||||||
|
data->array_stores_pairs = 1;
|
||||||
|
scanItemsPush(data, key);
|
||||||
|
scanItemsPush(data, val);
|
||||||
|
}
|
||||||
|
return;
|
||||||
} else if (o->type == OBJ_ZSET) {
|
} else if (o->type == OBJ_ZSET) {
|
||||||
|
/* ZSCAN command - member-score pairs */
|
||||||
char buf[MAX_LONG_DOUBLE_CHARS];
|
char buf[MAX_LONG_DOUBLE_CHARS];
|
||||||
int len = ld2string(buf, sizeof(buf), *(double *)dictGetVal(de), LD_STR_AUTO);
|
int len = ld2string(buf, sizeof(buf), *(double *)dictGetVal(de), LD_STR_AUTO);
|
||||||
|
|
||||||
key = sdsdup(keyStr);
|
key = sdsdup(keyStr);
|
||||||
val = sdsnewlen(buf, len);
|
val = sdsnewlen(buf, len);
|
||||||
|
|
||||||
|
data->array_stores_pairs = 1;
|
||||||
|
scanItemsPush(data, key);
|
||||||
|
scanItemsPush(data, val);
|
||||||
|
return;
|
||||||
} else {
|
} else {
|
||||||
serverPanic("Type not handled in SCAN callback.");
|
serverPanic("Type not handled in SCAN callback.");
|
||||||
}
|
}
|
||||||
|
|
||||||
listAddNodeTail(keys, key);
|
|
||||||
if (val && !data->no_values) listAddNodeTail(keys, val);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Try to parse a SCAN cursor stored at object 'o':
|
/* Try to parse a SCAN cursor stored at object 'o':
|
||||||
|
@ -1436,330 +1483,309 @@ char *getObjectTypeName(robj *o) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This command implements SCAN, HSCAN and SSCAN commands.
|
/* Parse SCAN command options (COUNT, MATCH, TYPE, NOVALUES).
|
||||||
* If object 'o' is passed, then it must be a Hash, Set or Zset object, otherwise
|
* Does NOT parse cursor - that should be done separately.
|
||||||
* if 'o' is NULL the command will operate on the dictionary associated with
|
* Returns C_OK on success, C_ERR on error (reply already sent). */
|
||||||
* the current database.
|
int parseScanOptionsOrReply(client *c, robj *o, int start_argc, scanOptions *opts) {
|
||||||
*
|
int i = start_argc;
|
||||||
* When 'o' is not NULL the function assumes that the first argument in
|
int j;
|
||||||
* the client arguments vector is a key so it skips it before iterating
|
|
||||||
* in order to parse options.
|
|
||||||
*
|
|
||||||
* In the case of a Hash object the function returns both the field and value
|
|
||||||
* of every element on the Hash. */
|
|
||||||
void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
|
|
||||||
int isKeysHfield = 0;
|
|
||||||
int i, j;
|
|
||||||
listNode *node;
|
|
||||||
long count = 10;
|
|
||||||
sds pat = NULL;
|
|
||||||
sds typename = NULL;
|
|
||||||
long long type = LLONG_MAX;
|
|
||||||
int patlen = 0, use_pattern = 0, no_values = 0;
|
|
||||||
dict *ht;
|
|
||||||
|
|
||||||
/* Object must be NULL (to iterate keys names), or the type of the object
|
/* Initialize option defaults (cursor should already be set) */
|
||||||
* must be Set, Sorted Set, or Hash. */
|
opts->count = 10;
|
||||||
serverAssert(o == NULL || o->type == OBJ_SET || o->type == OBJ_HASH ||
|
opts->pattern = NULL;
|
||||||
o->type == OBJ_ZSET);
|
opts->patlen = 0;
|
||||||
|
opts->use_pattern = 0;
|
||||||
|
opts->type = LLONG_MAX;
|
||||||
|
opts->typename = NULL;
|
||||||
|
opts->no_values = 0;
|
||||||
|
|
||||||
/* Set i to the first option argument. The previous one is the cursor. */
|
/* Parse options starting from start_argc */
|
||||||
i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */
|
|
||||||
|
|
||||||
/* Step 1: Parse options. */
|
|
||||||
while (i < c->argc) {
|
while (i < c->argc) {
|
||||||
j = c->argc - i;
|
j = c->argc - i;
|
||||||
if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) {
|
if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) {
|
||||||
if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL)
|
if (getLongFromObjectOrReply(c, c->argv[i+1], &opts->count, NULL) != C_OK) {
|
||||||
!= C_OK)
|
return C_ERR;
|
||||||
{
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
if (opts->count < 1) {
|
||||||
if (count < 1) {
|
addReplyErrorObject(c, shared.syntaxerr);
|
||||||
addReplyErrorObject(c,shared.syntaxerr);
|
return C_ERR;
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
i += 2;
|
i += 2;
|
||||||
} else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) {
|
} else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) {
|
||||||
pat = c->argv[i+1]->ptr;
|
opts->pattern = c->argv[i+1]->ptr;
|
||||||
patlen = sdslen(pat);
|
opts->patlen = sdslen(opts->pattern);
|
||||||
|
|
||||||
/* The pattern always matches if it is exactly "*", so it is
|
/* The pattern always matches if it is exactly "*", so it is
|
||||||
* equivalent to disabling it. */
|
* equivalent to disabling it. */
|
||||||
use_pattern = !(patlen == 1 && pat[0] == '*');
|
opts->use_pattern = !(opts->patlen == 1 && opts->pattern[0] == '*');
|
||||||
|
|
||||||
i += 2;
|
i += 2;
|
||||||
} else if (!strcasecmp(c->argv[i]->ptr, "type") && o == NULL && j >= 2) {
|
} else if (!strcasecmp(c->argv[i]->ptr, "type") && o == NULL && j >= 2) {
|
||||||
/* SCAN for a particular type only applies to the db dict */
|
/* SCAN for a particular type only applies to the db dict */
|
||||||
typename = c->argv[i+1]->ptr;
|
opts->typename = c->argv[i+1]->ptr;
|
||||||
type = getObjectTypeByName(typename);
|
opts->type = getObjectTypeByName(opts->typename);
|
||||||
if (type == LLONG_MAX) {
|
if (opts->type == LLONG_MAX) {
|
||||||
/* TODO: uncomment in redis 8.0
|
/* TODO: uncomment in redis 8.0
|
||||||
addReplyErrorFormat(c, "unknown type name '%s'", typename);
|
addReplyErrorFormat(c, "unknown type name '%s'", opts->typename);
|
||||||
return; */
|
return C_ERR; */
|
||||||
}
|
}
|
||||||
i+= 2;
|
i += 2;
|
||||||
} else if (!strcasecmp(c->argv[i]->ptr, "novalues")) {
|
} else if (!strcasecmp(c->argv[i]->ptr, "novalues")) {
|
||||||
if (!o || o->type != OBJ_HASH) {
|
if (!o || o->type != OBJ_HASH) {
|
||||||
addReplyError(c, "NOVALUES option can only be used in HSCAN");
|
addReplyError(c, "NOVALUES option can only be used in HSCAN");
|
||||||
return;
|
return C_ERR;
|
||||||
}
|
}
|
||||||
no_values = 1;
|
opts->no_values = 1;
|
||||||
i++;
|
i++;
|
||||||
} else {
|
} else {
|
||||||
addReplyErrorObject(c,shared.syntaxerr);
|
addReplyErrorObject(c, shared.syntaxerr);
|
||||||
return;
|
return C_ERR;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Step 2: Iterate the collection.
|
return C_OK;
|
||||||
*
|
|
||||||
* Note that if the object is encoded with a listpack, intset, or any other
|
|
||||||
* representation that is not a hash table, we are sure that it is also
|
|
||||||
* composed of a small number of elements. So to avoid taking state we
|
|
||||||
* just return everything inside the object in a single call, setting the
|
|
||||||
* cursor to zero to signal the end of the iteration. */
|
|
||||||
|
|
||||||
/* Handle the case of a hash table. */
|
|
||||||
ht = NULL;
|
|
||||||
if (o == NULL) {
|
|
||||||
ht = NULL;
|
|
||||||
} else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
|
|
||||||
ht = o->ptr;
|
|
||||||
} else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
|
|
||||||
isKeysHfield = 1;
|
|
||||||
ht = o->ptr;
|
|
||||||
} else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
|
|
||||||
zset *zs = o->ptr;
|
|
||||||
ht = zs->dict;
|
|
||||||
}
|
|
||||||
|
|
||||||
list *keys = listCreate();
|
|
||||||
/* Set a free callback for the contents of the collected keys list.
|
|
||||||
* For the main keyspace dict, and when we scan a key that's dict encoded
|
|
||||||
* (we have 'ht'), we don't need to define free method because the strings
|
|
||||||
* in the list are just a shallow copy from the pointer in the dictEntry.
|
|
||||||
* When scanning a key with other encodings (e.g. listpack), we need to
|
|
||||||
* free the temporary strings we add to that list.
|
|
||||||
* The exception to the above is ZSET, where we do allocate temporary
|
|
||||||
* strings even when scanning a dict. */
|
|
||||||
if (o && (!ht || o->type == OBJ_ZSET)) {
|
|
||||||
listSetFreeMethod(keys, sdsfreegeneric);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* For main dictionary scan or data structure using hashtable. */
|
|
||||||
if (!o || ht) {
|
|
||||||
/* We set the max number of iterations to ten times the specified
|
|
||||||
* COUNT, so if the hash table is in a pathological state (very
|
|
||||||
* sparsely populated) we avoid to block too much time at the cost
|
|
||||||
* of returning no or very few elements. */
|
|
||||||
long maxiterations = count*10;
|
|
||||||
|
|
||||||
/* We pass scanData which have three pointers to the callback:
|
|
||||||
* 1. data.keys: the list to which it will add new elements;
|
|
||||||
* 2. data.o: the object containing the dictionary so that
|
|
||||||
* it is possible to fetch more data in a type-dependent way;
|
|
||||||
* 3. data.type: the specified type scan in the db, LLONG_MAX means
|
|
||||||
* type matching is no needed;
|
|
||||||
* 4. data.pattern: the pattern string;
|
|
||||||
* 5. data.sampled: the maxiteration limit is there in case we're
|
|
||||||
* working on an empty dict, one with a lot of empty buckets, and
|
|
||||||
* for the buckets are not empty, we need to limit the spampled number
|
|
||||||
* to prevent a long hang time caused by filtering too many keys;
|
|
||||||
* 6. data.no_values: to control whether values will be returned or
|
|
||||||
* only keys are returned. */
|
|
||||||
scanData data = {
|
|
||||||
.keys = keys,
|
|
||||||
.o = o,
|
|
||||||
.type = type,
|
|
||||||
.pattern = use_pattern ? pat : NULL,
|
|
||||||
.sampled = 0,
|
|
||||||
.no_values = no_values,
|
|
||||||
.strlen = (isKeysHfield) ? hfieldlen : sdslen,
|
|
||||||
.typename = typename,
|
|
||||||
.db = c->db,
|
|
||||||
};
|
|
||||||
|
|
||||||
/* A pattern may restrict all matching keys to one cluster slot. */
|
|
||||||
int onlydidx = -1;
|
|
||||||
if (o == NULL && use_pattern && server.cluster_enabled) {
|
|
||||||
onlydidx = patternHashSlot(pat, patlen);
|
|
||||||
}
|
|
||||||
do {
|
|
||||||
/* In cluster mode there is a separate dictionary for each slot.
|
|
||||||
* If cursor is empty, we should try exploring next non-empty slot. */
|
|
||||||
if (o == NULL) {
|
|
||||||
cursor = kvstoreScan(c->db->keys, cursor, onlydidx, scanCallback, NULL, &data);
|
|
||||||
} else {
|
|
||||||
cursor = dictScan(ht, cursor, scanCallback, &data);
|
|
||||||
}
|
|
||||||
} while (cursor && maxiterations-- && data.sampled < count);
|
|
||||||
} else if (o->type == OBJ_SET) {
|
|
||||||
unsigned long array_reply_len = 0;
|
|
||||||
void *replylen = NULL;
|
|
||||||
listRelease(keys);
|
|
||||||
char *str;
|
|
||||||
char buf[LONG_STR_SIZE];
|
|
||||||
size_t len;
|
|
||||||
int64_t llele;
|
|
||||||
/* Reply to the client. */
|
|
||||||
addReplyArrayLen(c, 2);
|
|
||||||
/* Cursor is always 0 given we iterate over all set */
|
|
||||||
addReplyBulkLongLong(c,0);
|
|
||||||
/* If there is no pattern the length is the entire set size, otherwise we defer the reply size */
|
|
||||||
if (use_pattern)
|
|
||||||
replylen = addReplyDeferredLen(c);
|
|
||||||
else {
|
|
||||||
array_reply_len = setTypeSize(o);
|
|
||||||
addReplyArrayLen(c, array_reply_len);
|
|
||||||
}
|
|
||||||
|
|
||||||
setTypeIterator *si = setTypeInitIterator(o);
|
|
||||||
unsigned long cur_length = 0;
|
|
||||||
while (setTypeNext(si, &str, &len, &llele) != -1) {
|
|
||||||
if (str == NULL) {
|
|
||||||
len = ll2string(buf, sizeof(buf), llele);
|
|
||||||
}
|
|
||||||
char *key = str ? str : buf;
|
|
||||||
if (use_pattern && !stringmatchlen(pat, patlen, key, len, 0)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
addReplyBulkCBuffer(c, key, len);
|
|
||||||
cur_length++;
|
|
||||||
}
|
|
||||||
setTypeReleaseIterator(si);
|
|
||||||
if (use_pattern)
|
|
||||||
setDeferredArrayLen(c,replylen,cur_length);
|
|
||||||
else
|
|
||||||
serverAssert(cur_length == array_reply_len); /* fail on corrupt data */
|
|
||||||
return;
|
|
||||||
} else if ((o->type == OBJ_HASH || o->type == OBJ_ZSET) &&
|
|
||||||
o->encoding == OBJ_ENCODING_LISTPACK)
|
|
||||||
{
|
|
||||||
unsigned char *p = lpFirst(o->ptr);
|
|
||||||
unsigned char *str;
|
|
||||||
int64_t len;
|
|
||||||
unsigned long array_reply_len = 0;
|
|
||||||
unsigned char intbuf[LP_INTBUF_SIZE];
|
|
||||||
void *replylen = NULL;
|
|
||||||
listRelease(keys);
|
|
||||||
|
|
||||||
/* Reply to the client. */
|
|
||||||
addReplyArrayLen(c, 2);
|
|
||||||
/* Cursor is always 0 given we iterate over all set */
|
|
||||||
addReplyBulkLongLong(c,0);
|
|
||||||
/* If there is no pattern the length is the entire set size, otherwise we defer the reply size */
|
|
||||||
if (use_pattern)
|
|
||||||
replylen = addReplyDeferredLen(c);
|
|
||||||
else {
|
|
||||||
array_reply_len = o->type == OBJ_HASH ? hashTypeLength(o, 0) : zsetLength(o);
|
|
||||||
if (!no_values) {
|
|
||||||
array_reply_len *= 2;
|
|
||||||
}
|
|
||||||
addReplyArrayLen(c, array_reply_len);
|
|
||||||
}
|
|
||||||
unsigned long cur_length = 0;
|
|
||||||
while(p) {
|
|
||||||
str = lpGet(p, &len, intbuf);
|
|
||||||
/* point to the value */
|
|
||||||
p = lpNext(o->ptr, p);
|
|
||||||
if (use_pattern && !stringmatchlen(pat, patlen, (char *)str, len, 0)) {
|
|
||||||
/* jump to the next key/val pair */
|
|
||||||
p = lpNext(o->ptr, p);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
/* add key object */
|
|
||||||
addReplyBulkCBuffer(c, str, len);
|
|
||||||
cur_length++;
|
|
||||||
/* add value object */
|
|
||||||
if (!no_values) {
|
|
||||||
str = lpGet(p, &len, intbuf);
|
|
||||||
addReplyBulkCBuffer(c, str, len);
|
|
||||||
cur_length++;
|
|
||||||
}
|
|
||||||
p = lpNext(o->ptr, p);
|
|
||||||
}
|
|
||||||
if (use_pattern)
|
|
||||||
setDeferredArrayLen(c,replylen,cur_length);
|
|
||||||
else
|
|
||||||
serverAssert(cur_length == array_reply_len); /* fail on corrupt data */
|
|
||||||
return;
|
|
||||||
} else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_LISTPACK_EX) {
|
|
||||||
int64_t len;
|
|
||||||
long long expire_at;
|
|
||||||
unsigned char *lp = hashTypeListpackGetLp(o);
|
|
||||||
unsigned char *p = lpFirst(lp);
|
|
||||||
unsigned char *str, *val;
|
|
||||||
unsigned char intbuf[LP_INTBUF_SIZE];
|
|
||||||
void *replylen = NULL;
|
|
||||||
|
|
||||||
listRelease(keys);
|
|
||||||
/* Reply to the client. */
|
|
||||||
addReplyArrayLen(c, 2);
|
|
||||||
/* Cursor is always 0 given we iterate over all set */
|
|
||||||
addReplyBulkLongLong(c,0);
|
|
||||||
/* In the case of OBJ_ENCODING_LISTPACK_EX we always defer the reply size given some fields might be expired */
|
|
||||||
replylen = addReplyDeferredLen(c);
|
|
||||||
unsigned long cur_length = 0;
|
|
||||||
|
|
||||||
while (p) {
|
|
||||||
str = lpGet(p, &len, intbuf);
|
|
||||||
p = lpNext(lp, p);
|
|
||||||
val = p; /* Keep pointer to value */
|
|
||||||
|
|
||||||
p = lpNext(lp, p);
|
|
||||||
serverAssert(p && lpGetIntegerValue(p, &expire_at));
|
|
||||||
|
|
||||||
if (hashTypeIsExpired(o, expire_at) ||
|
|
||||||
(use_pattern && !stringmatchlen(pat, patlen, (char *)str, len, 0)))
|
|
||||||
{
|
|
||||||
/* jump to the next key/val pair */
|
|
||||||
p = lpNext(lp, p);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* add key object */
|
|
||||||
addReplyBulkCBuffer(c, str, len);
|
|
||||||
cur_length++;
|
|
||||||
/* add value object */
|
|
||||||
if (!no_values) {
|
|
||||||
str = lpGet(val, &len, intbuf);
|
|
||||||
addReplyBulkCBuffer(c, str, len);
|
|
||||||
cur_length++;
|
|
||||||
}
|
|
||||||
p = lpNext(lp, p);
|
|
||||||
}
|
|
||||||
setDeferredArrayLen(c,replylen,cur_length);
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
serverPanic("Not handled encoding in SCAN.");
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Step 3: Reply to the client. */
|
|
||||||
addReplyArrayLen(c, 2);
|
|
||||||
addReplyBulkLongLong(c,cursor);
|
|
||||||
|
|
||||||
unsigned long long idx = 0;
|
|
||||||
addReplyArrayLen(c, listLength(keys));
|
|
||||||
while ((node = listFirst(keys)) != NULL) {
|
|
||||||
void *key = listNodeValue(node);
|
|
||||||
/* For HSCAN, list will contain keys value pairs unless no_values arg
|
|
||||||
* was given. We should call mstrlen for the keys only. */
|
|
||||||
int hfieldkey = isKeysHfield && (no_values || (idx++ % 2 == 0));
|
|
||||||
addReplyBulkCBuffer(c, key, hfieldkey ? mstrlen(key) : sdslen(key));
|
|
||||||
listDelNode(keys, node);
|
|
||||||
}
|
|
||||||
|
|
||||||
listRelease(keys);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* The SCAN command completely relies on scanGenericCommand. */
|
/* Scan hash table (used by SCAN, SSCAN, HSCAN, ZSCAN with hash table encodings) */
|
||||||
|
void scanHashTable(client *c, robj *o, dict *ht, scanOptions *opts, int isKeysHfield) {
|
||||||
|
scanData data;
|
||||||
|
void *items_stack_buffer[SCAN_KEYS_INITIAL_CAPACITY];
|
||||||
|
|
||||||
|
/* We set the max number of iterations to ten times the specified
|
||||||
|
* COUNT, so if the hash table is in a pathological state (very
|
||||||
|
* sparsely populated) we avoid to block too much time at the cost
|
||||||
|
* of returning no or very few elements. */
|
||||||
|
long maxiterations = opts->count * 10;
|
||||||
|
|
||||||
|
/* We pass scanData to the callback with:
|
||||||
|
* 1. data.array_items: the dynamic array to collect scan results;
|
||||||
|
* 2. data.o: the object containing the dictionary so that
|
||||||
|
* it is possible to fetch more data in a type-dependent way;
|
||||||
|
* 3. data.type: the specified type scan in the db, LLONG_MAX means
|
||||||
|
* type matching is no needed;
|
||||||
|
* 4. data.pattern: the pattern string;
|
||||||
|
* 5. data.sampled: the maxiteration limit is there in case we're
|
||||||
|
* working on an empty dict, one with a lot of empty buckets, and
|
||||||
|
* for the buckets are not empty, we need to limit the spampled number
|
||||||
|
* to prevent a long hang time caused by filtering too many keys;
|
||||||
|
* 6. data.no_values: to control whether values will be returned or
|
||||||
|
* only keys are returned. */
|
||||||
|
data = (scanData) {
|
||||||
|
.array_items = items_stack_buffer,
|
||||||
|
.array_capacity = SCAN_KEYS_INITIAL_CAPACITY,
|
||||||
|
.array_stack_buffer = items_stack_buffer,
|
||||||
|
.array_count = 0,
|
||||||
|
.array_stores_pairs = 0, /* Will be set by scanCallback */
|
||||||
|
.o = o,
|
||||||
|
.type = opts->type,
|
||||||
|
.pattern = opts->use_pattern ? opts->pattern : NULL,
|
||||||
|
.sampled = 0,
|
||||||
|
.no_values = opts->no_values,
|
||||||
|
.strlen = (isKeysHfield) ? hfieldlen : sdslen,
|
||||||
|
.typename = opts->typename,
|
||||||
|
.db = c->db,
|
||||||
|
};
|
||||||
|
|
||||||
|
/* A pattern may restrict all matching keys to one cluster slot. */
|
||||||
|
int onlydidx = -1;
|
||||||
|
if (o == NULL && opts->use_pattern && server.cluster_enabled) {
|
||||||
|
onlydidx = patternHashSlot(opts->pattern, opts->patlen);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Scan the hash table */
|
||||||
|
do {
|
||||||
|
if (o == NULL) {
|
||||||
|
opts->cursor = kvstoreScan(c->db->keys, opts->cursor, onlydidx, scanCallback, NULL, &data);
|
||||||
|
} else {
|
||||||
|
opts->cursor = dictScan(ht, opts->cursor, scanCallback, &data);
|
||||||
|
}
|
||||||
|
} while (opts->cursor && maxiterations-- && data.sampled < opts->count);
|
||||||
|
|
||||||
|
/* Reply to the client. */
|
||||||
|
addReplyArrayLen(c, 2);
|
||||||
|
addReplyBulkLongLong(c, opts->cursor);
|
||||||
|
|
||||||
|
/* Generate response from array (unified for all scan commands) */
|
||||||
|
addReplyArrayLen(c, data.array_count);
|
||||||
|
|
||||||
|
for (int i = 0; i < data.array_count; i++) {
|
||||||
|
void *item = data.array_items[i];
|
||||||
|
|
||||||
|
/* Special case: HSCAN fields need mstrlen, everything else uses sdslen */
|
||||||
|
if (o && o->type == OBJ_HASH &&
|
||||||
|
(!data.array_stores_pairs || (i % 2 == 0))) {
|
||||||
|
/* HSCAN field or single field (NOVALUES) */
|
||||||
|
addReplyBulkCBuffer(c, item, mstrlen((char*)item));
|
||||||
|
} else {
|
||||||
|
/* Everything else: SCAN keys, SSCAN members, ZSCAN members/scores, HSCAN values */
|
||||||
|
addReplyBulkCBuffer(c, item, sdslen((sds)item));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Cleanup: free allocated values for ZSCAN */
|
||||||
|
if (o && o->type == OBJ_ZSET) {
|
||||||
|
for (int i = 0; i < data.array_count; i++) {
|
||||||
|
sdsfree((sds)data.array_items[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Cleanup: free heap-allocated array if needed */
|
||||||
|
if (data.array_items != items_stack_buffer) {
|
||||||
|
zfree(data.array_items);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Scan set (used by SSCAN with intset encoding) */
|
||||||
|
void scanIntSet(client *c, robj *o, scanOptions *opts) {
|
||||||
|
unsigned long array_reply_len = 0;
|
||||||
|
void *replylen = NULL;
|
||||||
|
char *str;
|
||||||
|
char buf[LONG_STR_SIZE];
|
||||||
|
size_t len;
|
||||||
|
int64_t llele;
|
||||||
|
|
||||||
|
/* Reply to the client. */
|
||||||
|
addReplyArrayLen(c, 2);
|
||||||
|
/* Cursor is always 0 given we iterate over all set */
|
||||||
|
addReplyBulkLongLong(c, 0);
|
||||||
|
|
||||||
|
/* If there is no pattern the length is the entire set size, otherwise we defer the reply size */
|
||||||
|
if (opts->use_pattern)
|
||||||
|
replylen = addReplyDeferredLen(c);
|
||||||
|
else {
|
||||||
|
array_reply_len = setTypeSize(o);
|
||||||
|
addReplyArrayLen(c, array_reply_len);
|
||||||
|
}
|
||||||
|
|
||||||
|
setTypeIterator *si = setTypeInitIterator(o);
|
||||||
|
unsigned long cur_length = 0;
|
||||||
|
while (setTypeNext(si, &str, &len, &llele) != -1) {
|
||||||
|
if (str == NULL) {
|
||||||
|
len = ll2string(buf, sizeof(buf), llele);
|
||||||
|
}
|
||||||
|
char *key = str ? str : buf;
|
||||||
|
if (opts->use_pattern && !stringmatchlen(opts->pattern, opts->patlen, key, len, 0)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
addReplyBulkCBuffer(c, key, len);
|
||||||
|
cur_length++;
|
||||||
|
}
|
||||||
|
setTypeReleaseIterator(si);
|
||||||
|
|
||||||
|
if (opts->use_pattern)
|
||||||
|
setDeferredArrayLen(c, replylen, cur_length);
|
||||||
|
else
|
||||||
|
serverAssert(cur_length == array_reply_len); /* fail on corrupt data */
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Scan listpack (used by HSCAN, ZSCAN with listpack encodings) */
|
||||||
|
void scanListpack(client *c, robj *o, scanOptions *opts) {
|
||||||
|
unsigned char *p = lpFirst(o->ptr);
|
||||||
|
unsigned char *str;
|
||||||
|
int64_t len;
|
||||||
|
unsigned long array_reply_len = 0;
|
||||||
|
unsigned char intbuf[LP_INTBUF_SIZE];
|
||||||
|
void *replylen = NULL;
|
||||||
|
|
||||||
|
/* Reply to the client. */
|
||||||
|
addReplyArrayLen(c, 2);
|
||||||
|
/* Cursor is always 0 given we iterate over all listpack */
|
||||||
|
addReplyBulkLongLong(c, 0);
|
||||||
|
|
||||||
|
/* If there is no pattern the length is the entire collection size, otherwise we defer the reply size */
|
||||||
|
if (opts->use_pattern)
|
||||||
|
replylen = addReplyDeferredLen(c);
|
||||||
|
else {
|
||||||
|
array_reply_len = o->type == OBJ_HASH ? hashTypeLength(o, 0) : zsetLength(o);
|
||||||
|
if (!opts->no_values) {
|
||||||
|
array_reply_len *= 2;
|
||||||
|
}
|
||||||
|
addReplyArrayLen(c, array_reply_len);
|
||||||
|
}
|
||||||
|
|
||||||
|
unsigned long cur_length = 0;
|
||||||
|
while(p) {
|
||||||
|
str = lpGet(p, &len, intbuf);
|
||||||
|
/* point to the value */
|
||||||
|
p = lpNext(o->ptr, p);
|
||||||
|
if (opts->use_pattern && !stringmatchlen(opts->pattern, opts->patlen, (char *)str, len, 0)) {
|
||||||
|
/* jump to the next key/val pair */
|
||||||
|
p = lpNext(o->ptr, p);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
/* add key object */
|
||||||
|
addReplyBulkCBuffer(c, str, len);
|
||||||
|
cur_length++;
|
||||||
|
/* add value object */
|
||||||
|
if (!opts->no_values) {
|
||||||
|
str = lpGet(p, &len, intbuf);
|
||||||
|
addReplyBulkCBuffer(c, str, len);
|
||||||
|
cur_length++;
|
||||||
|
}
|
||||||
|
p = lpNext(o->ptr, p);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (opts->use_pattern)
|
||||||
|
setDeferredArrayLen(c, replylen, cur_length);
|
||||||
|
else
|
||||||
|
serverAssert(cur_length == array_reply_len); /* fail on corrupt data */
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Scan listpack with expiration (used by HSCAN with listpack_ex encoding) */
|
||||||
|
void scanListpackEx(client *c, robj *o, scanOptions *opts) {
|
||||||
|
int64_t len;
|
||||||
|
long long expire_at;
|
||||||
|
unsigned char *lp = hashTypeListpackGetLp(o);
|
||||||
|
unsigned char *p = lpFirst(lp);
|
||||||
|
unsigned char *str, *val;
|
||||||
|
unsigned char intbuf[LP_INTBUF_SIZE];
|
||||||
|
void *replylen = NULL;
|
||||||
|
|
||||||
|
/* Reply to the client. */
|
||||||
|
addReplyArrayLen(c, 2);
|
||||||
|
/* Cursor is always 0 given we iterate over all listpack */
|
||||||
|
addReplyBulkLongLong(c, 0);
|
||||||
|
/* In the case of OBJ_ENCODING_LISTPACK_EX we always defer the reply size given some fields might be expired */
|
||||||
|
replylen = addReplyDeferredLen(c);
|
||||||
|
unsigned long cur_length = 0;
|
||||||
|
|
||||||
|
while (p) {
|
||||||
|
str = lpGet(p, &len, intbuf);
|
||||||
|
p = lpNext(lp, p);
|
||||||
|
val = p; /* Keep pointer to value */
|
||||||
|
|
||||||
|
p = lpNext(lp, p);
|
||||||
|
serverAssert(p && lpGetIntegerValue(p, &expire_at));
|
||||||
|
|
||||||
|
if (hashTypeIsExpired(o, expire_at) ||
|
||||||
|
(opts->use_pattern && !stringmatchlen(opts->pattern, opts->patlen, (char *)str, len, 0)))
|
||||||
|
{
|
||||||
|
/* jump to the next key/val pair */
|
||||||
|
p = lpNext(lp, p);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* add key object */
|
||||||
|
addReplyBulkCBuffer(c, str, len);
|
||||||
|
cur_length++;
|
||||||
|
/* add value object */
|
||||||
|
if (!opts->no_values) {
|
||||||
|
str = lpGet(val, &len, intbuf);
|
||||||
|
addReplyBulkCBuffer(c, str, len);
|
||||||
|
cur_length++;
|
||||||
|
}
|
||||||
|
p = lpNext(lp, p);
|
||||||
|
}
|
||||||
|
setDeferredArrayLen(c, replylen, cur_length);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* The SCAN command directly uses scanHashTable for database keys. */
|
||||||
void scanCommand(client *c) {
|
void scanCommand(client *c) {
|
||||||
unsigned long long cursor;
|
scanOptions opts;
|
||||||
if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return;
|
|
||||||
scanGenericCommand(c,NULL,cursor);
|
if (parseScanCursorOrReply(c, c->argv[1], &opts.cursor) == C_ERR) return;
|
||||||
|
|
||||||
|
if (parseScanOptionsOrReply(c, NULL, 2, &opts) == C_ERR) return;
|
||||||
|
|
||||||
|
scanHashTable(c, NULL, NULL, &opts, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dbsizeCommand(client *c) {
|
void dbsizeCommand(client *c) {
|
||||||
|
|
17
src/server.h
17
src/server.h
|
@ -3715,12 +3715,27 @@ long long dbTotalServerKeyCount(void);
|
||||||
redisDb *initTempDb(void);
|
redisDb *initTempDb(void);
|
||||||
void discardTempDb(redisDb *tempDb);
|
void discardTempDb(redisDb *tempDb);
|
||||||
|
|
||||||
|
/* Options for SCAN commands (SCAN, HSCAN, SSCAN, ZSCAN) */
|
||||||
|
typedef struct {
|
||||||
|
unsigned long long cursor; /* Cursor position */
|
||||||
|
long count; /* COUNT option value */
|
||||||
|
sds pattern; /* MATCH pattern string */
|
||||||
|
int patlen; /* Pattern length */
|
||||||
|
int use_pattern; /* Whether to use pattern matching */
|
||||||
|
long long type; /* TYPE filter for SCAN command */
|
||||||
|
sds typename; /* TYPE name string */
|
||||||
|
int no_values; /* NOVALUES option for HSCAN */
|
||||||
|
} scanOptions;
|
||||||
|
|
||||||
int selectDb(client *c, int id);
|
int selectDb(client *c, int id);
|
||||||
void signalModifiedKey(client *c, redisDb *db, robj *key);
|
void signalModifiedKey(client *c, redisDb *db, robj *key);
|
||||||
void signalFlushedDb(int dbid, int async);
|
void signalFlushedDb(int dbid, int async);
|
||||||
void scanGenericCommand(client *c, robj *o, unsigned long long cursor);
|
void scanHashTable(client *c, robj *o, dict *ht, scanOptions *opts, int isKeysHfield);
|
||||||
|
void scanListpack(client *c, robj *o, scanOptions *opts);
|
||||||
|
void scanListpackEx(client *c, robj *o, scanOptions *opts);
|
||||||
|
void scanIntSet(client *c, robj *o, scanOptions *opts);
|
||||||
int parseScanCursorOrReply(client *c, robj *o, unsigned long long *cursor);
|
int parseScanCursorOrReply(client *c, robj *o, unsigned long long *cursor);
|
||||||
|
int parseScanOptionsOrReply(client *c, robj *o, int start_argc, scanOptions *opts);
|
||||||
int dbAsyncDelete(redisDb *db, robj *key);
|
int dbAsyncDelete(redisDb *db, robj *key);
|
||||||
void emptyDbAsync(redisDb *db);
|
void emptyDbAsync(redisDb *db);
|
||||||
size_t lazyfreeGetPendingObjectsCount(void);
|
size_t lazyfreeGetPendingObjectsCount(void);
|
||||||
|
|
23
src/t_hash.c
23
src/t_hash.c
|
@ -3033,14 +3033,25 @@ void hexistsCommand(client *c) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void hscanCommand(client *c) {
|
void hscanCommand(client *c) {
|
||||||
kvobj *o;
|
robj *o;
|
||||||
unsigned long long cursor;
|
scanOptions opts;
|
||||||
|
|
||||||
if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return;
|
if (parseScanCursorOrReply(c, c->argv[2], &opts.cursor) == C_ERR) return;
|
||||||
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL ||
|
if ((o = lookupKeyReadOrReply(c, c->argv[1], shared.emptyscan)) == NULL ||
|
||||||
checkType(c,o,OBJ_HASH)) return;
|
checkType(c, o, OBJ_HASH)) return;
|
||||||
|
|
||||||
scanGenericCommand(c,o,cursor);
|
if (parseScanOptionsOrReply(c, o, 3, &opts) == C_ERR) return;
|
||||||
|
|
||||||
|
/* Handle hash encoding-specific scanning */
|
||||||
|
if (o->encoding == OBJ_ENCODING_HT) {
|
||||||
|
scanHashTable(c, o, o->ptr, &opts, 1);
|
||||||
|
} else if (o->encoding == OBJ_ENCODING_LISTPACK) {
|
||||||
|
scanListpack(c, o, &opts);
|
||||||
|
} else if (o->encoding == OBJ_ENCODING_LISTPACK_EX) {
|
||||||
|
scanListpackEx(c, o, &opts);
|
||||||
|
} else {
|
||||||
|
serverPanic("Not handled encoding in HSCAN.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void hrandfieldReplyWithListpack(client *c, unsigned int count, listpackEntry *keys, listpackEntry *vals) {
|
static void hrandfieldReplyWithListpack(client *c, unsigned int count, listpackEntry *keys, listpackEntry *vals) {
|
||||||
|
|
19
src/t_set.c
19
src/t_set.c
|
@ -1731,11 +1731,18 @@ void sdiffstoreCommand(client *c) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void sscanCommand(client *c) {
|
void sscanCommand(client *c) {
|
||||||
kvobj *set;
|
robj *set;
|
||||||
unsigned long long cursor;
|
scanOptions opts;
|
||||||
|
|
||||||
if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return;
|
if (parseScanCursorOrReply(c, c->argv[2], &opts.cursor) == C_ERR) return;
|
||||||
if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL ||
|
if ((set = lookupKeyReadOrReply(c, c->argv[1], shared.emptyscan)) == NULL ||
|
||||||
checkType(c,set,OBJ_SET)) return;
|
checkType(c, set, OBJ_SET)) return;
|
||||||
scanGenericCommand(c,set,cursor);
|
|
||||||
|
if (parseScanOptionsOrReply(c, set, 3, &opts) == C_ERR) return;
|
||||||
|
|
||||||
|
if(set->encoding == OBJ_ENCODING_HT) {
|
||||||
|
scanHashTable(c, set, set->ptr, &opts, 0);
|
||||||
|
} else {
|
||||||
|
scanIntSet(c,set, &opts);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
20
src/t_zset.c
20
src/t_zset.c
|
@ -3900,13 +3900,21 @@ void zrevrankCommand(client *c) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void zscanCommand(client *c) {
|
void zscanCommand(client *c) {
|
||||||
kvobj *o;
|
robj *o;
|
||||||
unsigned long long cursor;
|
scanOptions opts;
|
||||||
|
|
||||||
if (parseScanCursorOrReply(c,c->argv[2],&cursor) == C_ERR) return;
|
if (parseScanCursorOrReply(c, c->argv[2], &opts.cursor) == C_ERR) return;
|
||||||
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL ||
|
if ((o = lookupKeyReadOrReply(c, c->argv[1], shared.emptyscan)) == NULL ||
|
||||||
checkType(c,o,OBJ_ZSET)) return;
|
checkType(c, o, OBJ_ZSET)) return;
|
||||||
scanGenericCommand(c,o,cursor);
|
|
||||||
|
if (parseScanOptionsOrReply(c, o, 3, &opts) == C_ERR) return;
|
||||||
|
|
||||||
|
if(o->encoding == OBJ_ENCODING_SKIPLIST) {
|
||||||
|
zset *zs = o->ptr;
|
||||||
|
scanHashTable(c, o, zs->dict, &opts, 0);
|
||||||
|
} else {
|
||||||
|
scanListpack(c, o, &opts);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This command implements the generic zpop operation, used by:
|
/* This command implements the generic zpop operation, used by:
|
||||||
|
|
Loading…
Reference in New Issue