diff --git a/.github/workflows/redis_docs_sync.yaml b/.github/workflows/redis_docs_sync.yaml index 2d6ef96cf..508b8839d 100644 --- a/.github/workflows/redis_docs_sync.yaml +++ b/.github/workflows/redis_docs_sync.yaml @@ -21,4 +21,15 @@ jobs: GH_TOKEN: ${{ steps.generate-token.outputs.token }} RELEASE_NAME: ${{ github.event.release.tag_name }} run: | - gh workflow run -R redis/docs redis_docs_sync.yaml -f release="${RELEASE_NAME}" \ No newline at end of file + LATEST_RELEASE=$( + curl -Ls \ + -H "Accept: application/vnd.github+json" \ + -H "Authorization: Bearer ${GH_TOKEN}" \ + -H "X-GitHub-Api-Version: 2022-11-28" \ + https://api.github.com/repos/redis/redis/releases/latest \ + | jq -r '.tag_name' + ) + + if [[ "${LATEST_RELEASE}" == "${RELEASE_NAME}" ]]; then + gh workflow run -R redis/docs redis_docs_sync.yaml -f release="${RELEASE_NAME}" + fi diff --git a/deps/lua/src/lua_bit.c b/deps/lua/src/lua_bit.c index 9f83b8594..7e43faea4 100644 --- a/deps/lua/src/lua_bit.c +++ b/deps/lua/src/lua_bit.c @@ -132,6 +132,7 @@ static int bit_tohex(lua_State *L) const char *hexdigits = "0123456789abcdef"; char buf[8]; int i; + if (n == INT32_MIN) n = INT32_MIN+1; if (n < 0) { n = -n; hexdigits = "0123456789ABCDEF"; } if (n > 8) n = 8; for (i = (int)n; --i >= 0; ) { buf[i] = hexdigits[b & 15]; b >>= 4; } diff --git a/modules/redisbloom/Makefile b/modules/redisbloom/Makefile index bb10b8308..02f52eaf1 100644 --- a/modules/redisbloom/Makefile +++ b/modules/redisbloom/Makefile @@ -1,5 +1,5 @@ SRC_DIR = src -MODULE_VERSION = v2.8.2 +MODULE_VERSION = v7.99.1 MODULE_REPO = https://github.com/redisbloom/redisbloom TARGET_MODULE = $(SRC_DIR)/bin/$(FULL_VARIANT)/redisbloom.so diff --git a/modules/redisearch/Makefile b/modules/redisearch/Makefile index 70452fb63..4fd69c7bd 100644 --- a/modules/redisearch/Makefile +++ b/modules/redisearch/Makefile @@ -1,7 +1,7 @@ SRC_DIR = src -MODULE_VERSION = v7.99.0 +MODULE_VERSION = v7.99.1 MODULE_REPO = https://github.com/redisearch/redisearch -TARGET_MODULE = $(SRC_DIR)/bin/$(FULL_VARIANT)/coord-oss/redisearch.so +TARGET_MODULE = $(SRC_DIR)/bin/$(FULL_VARIANT)/search-community/redisearch.so include ../common.mk diff --git a/modules/redisjson/Makefile b/modules/redisjson/Makefile index 098e1e961..90b11e8da 100644 --- a/modules/redisjson/Makefile +++ b/modules/redisjson/Makefile @@ -1,5 +1,5 @@ SRC_DIR = src -MODULE_VERSION = v2.8.4 +MODULE_VERSION = v7.99.1 MODULE_REPO = https://github.com/redisjson/redisjson TARGET_MODULE = $(SRC_DIR)/bin/$(FULL_VARIANT)/rejson.so diff --git a/modules/redistimeseries/Makefile b/modules/redistimeseries/Makefile index c7c7d3f31..43c548af5 100644 --- a/modules/redistimeseries/Makefile +++ b/modules/redistimeseries/Makefile @@ -1,5 +1,5 @@ SRC_DIR = src -MODULE_VERSION = v1.12.2 +MODULE_VERSION = v7.99.1 MODULE_REPO = https://github.com/redistimeseries/redistimeseries TARGET_MODULE = $(SRC_DIR)/bin/$(FULL_VARIANT)/redistimeseries.so diff --git a/src/acl.c b/src/acl.c index 6d698427f..699a3808a 100644 --- a/src/acl.c +++ b/src/acl.c @@ -1066,7 +1066,7 @@ int ACLSetSelector(aclSelector *selector, const char* op, size_t oplen) { flags |= ACL_READ_PERMISSION; } else if (toupper(op[offset]) == 'W' && !(flags & ACL_WRITE_PERMISSION)) { flags |= ACL_WRITE_PERMISSION; - } else if (op[offset] == '~') { + } else if (op[offset] == '~' && flags) { offset++; break; } else { diff --git a/src/bio.c b/src/bio.c index 6f96ef709..0bddf8a6b 100644 --- a/src/bio.c +++ b/src/bio.c @@ -81,6 +81,7 @@ static int job_comp_pipe[2]; /* Pipe used to awake the event loop */ typedef struct bio_comp_item { comp_fn *func; /* callback after completion job will be processed */ uint64_t arg; /* user data to be passed to the function */ + void *ptr; /* user pointer to be passed to the function */ } bio_comp_item; /* This structure represents a background Job. It is only used locally to this @@ -110,6 +111,7 @@ typedef union bio_job { int type; /* header */ comp_fn *fn; /* callback. Handover to main thread to cb as notify for job completion */ uint64_t arg; /* callback arguments */ + void *ptr; /* callback pointer */ } comp_rq; } bio_job; @@ -200,7 +202,7 @@ void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) { bioSubmitJob(BIO_LAZY_FREE, job); } -void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_data) { +void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_data, void *user_ptr) { int type; switch (assigned_worker) { case BIO_WORKER_CLOSE_FILE: @@ -219,6 +221,7 @@ void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_ bio_job *job = zmalloc(sizeof(*job)); job->comp_rq.fn = func; job->comp_rq.arg = user_data; + job->comp_rq.ptr = user_ptr; bioSubmitJob(type, job); } @@ -339,6 +342,7 @@ void *bioProcessBackgroundJobs(void *arg) { bio_comp_item *comp_rsp = zmalloc(sizeof(bio_comp_item)); comp_rsp->func = job->comp_rq.fn; comp_rsp->arg = job->comp_rq.arg; + comp_rsp->ptr = job->comp_rq.ptr; /* just write it to completion job responses */ pthread_mutex_lock(&bio_mutex_comp); @@ -432,7 +436,7 @@ void bioPipeReadJobCompList(aeEventLoop *el, int fd, void *privdata, int mask) { listNode *ln = listFirst(tmp_list); bio_comp_item *rsp = ln->value; listDelNode(tmp_list, ln); - rsp->func(rsp->arg); + rsp->func(rsp->arg, rsp->ptr); zfree(rsp); } listRelease(tmp_list); diff --git a/src/bio.h b/src/bio.h index 2679a2bf5..614736c20 100644 --- a/src/bio.h +++ b/src/bio.h @@ -10,7 +10,7 @@ #define __BIO_H typedef void lazy_free_fn(void *args[]); -typedef void comp_fn(uint64_t user_data); +typedef void comp_fn(uint64_t user_data, void *user_ptr); typedef enum bio_worker_t { BIO_WORKER_CLOSE_FILE = 0, @@ -40,7 +40,7 @@ void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache); void bioCreateCloseAofJob(int fd, long long offset, int need_reclaim_cache); void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache); void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...); -void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_data); +void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_data, void *user_ptr); #endif diff --git a/src/bitops.c b/src/bitops.c index c0388a15d..2222c05ea 100644 --- a/src/bitops.c +++ b/src/bitops.c @@ -16,18 +16,49 @@ /* Count number of bits set in the binary array pointed by 's' and long * 'count' bytes. The implementation of this function is required to * work with an input string length up to 512 MB or more (server.proto_max_bulk_len) */ +ATTRIBUTE_TARGET_POPCNT long long redisPopcount(void *s, long count) { long long bits = 0; unsigned char *p = s; uint32_t *p4; +#if defined(HAVE_POPCNT) + int use_popcnt = __builtin_cpu_supports("popcnt"); /* Check if CPU supports POPCNT instruction. */ +#else + int use_popcnt = 0; /* Assume CPU does not support POPCNT if + * __builtin_cpu_supports() is not available. */ +#endif static const unsigned char bitsinbyte[256] = {0,1,1,2,1,2,2,3,1,2,2,3,2,3,3,4,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,4,5,5,6,5,6,6,7,5,6,6,7,6,7,7,8}; - - /* Count initial bytes not aligned to 32 bit. */ - while((unsigned long)p & 3 && count) { + + /* Count initial bytes not aligned to 64-bit when using the POPCNT instruction, + * otherwise align to 32-bit. */ + int align = use_popcnt ? 7 : 3; + while ((unsigned long)p & align && count) { bits += bitsinbyte[*p++]; count--; } + if (likely(use_popcnt)) { + /* Use separate counters to make the CPU think there are no + * dependencies between these popcnt operations. */ + uint64_t cnt[4]; + memset(cnt, 0, sizeof(cnt)); + + /* Count bits 32 bytes at a time by using popcnt. + * Unroll the loop to avoid the overhead of a single popcnt per iteration, + * allowing the CPU to extract more instruction-level parallelism. + * Reference: https://danluu.com/assembly-intrinsics/ */ + while (count >= 32) { + cnt[0] += __builtin_popcountll(*(uint64_t*)(p)); + cnt[1] += __builtin_popcountll(*(uint64_t*)(p + 8)); + cnt[2] += __builtin_popcountll(*(uint64_t*)(p + 16)); + cnt[3] += __builtin_popcountll(*(uint64_t*)(p + 24)); + count -= 32; + p += 32; + } + bits += cnt[0] + cnt[1] + cnt[2] + cnt[3]; + goto remain; + } + /* Count bits 28 bytes at a time */ p4 = (uint32_t*)p; while(count>=28) { @@ -64,8 +95,10 @@ long long redisPopcount(void *s, long count) { ((aux6 + (aux6 >> 4)) & 0x0F0F0F0F) + ((aux7 + (aux7 >> 4)) & 0x0F0F0F0F))* 0x01010101) >> 24; } - /* Count the remaining bytes. */ p = (unsigned char*)p4; + +remain: + /* Count the remaining bytes. */ while(count--) bits += bitsinbyte[*p++]; return bits; } diff --git a/src/cluster.c b/src/cluster.c index 3d8c716bd..876b1327f 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1558,6 +1558,126 @@ void readonlyCommand(client *c) { addReply(c,shared.ok); } +void replySlotsFlushAndFree(client *c, SlotsFlush *sflush) { + addReplyArrayLen(c, sflush->numRanges); + for (int i = 0 ; i < sflush->numRanges ; i++) { + addReplyArrayLen(c, 2); + addReplyLongLong(c, sflush->ranges[i].first); + addReplyLongLong(c, sflush->ranges[i].last); + } + zfree(sflush); +} + +/* Partially flush destination DB in a cluster node, based on the slot range. + * + * Usage: SFLUSH [ ]* [SYNC|ASYNC] + * + * This is an initial implementation of SFLUSH (slots flush) which is limited to + * flushing a single shard as a whole, but in the future the same command may be + * used to partially flush a shard based on hash slots. Currently only if provided + * slots cover entirely the slots of a node, the node will be flushed and the + * return value will be pairs of slot ranges. Otherwise, a single empty set will + * be returned. If possible, SFLUSH SYNC will be run as blocking ASYNC as an + * optimization. + */ +void sflushCommand(client *c) { + int flags = EMPTYDB_NO_FLAGS, argc = c->argc; + + if (server.cluster_enabled == 0) { + addReplyError(c,"This instance has cluster support disabled"); + return; + } + + /* check if last argument is SYNC or ASYNC */ + if (!strcasecmp(c->argv[c->argc-1]->ptr,"sync")) { + flags = EMPTYDB_NO_FLAGS; + argc--; + } else if (!strcasecmp(c->argv[c->argc-1]->ptr,"async")) { + flags = EMPTYDB_ASYNC; + argc--; + } else if (server.lazyfree_lazy_user_flush) { + flags = EMPTYDB_ASYNC; + } + + /* parse the slot range */ + if (argc % 2 == 0) { + addReplyErrorArity(c); + return; + } + + /* Verify slot pairs are valid and not overlapping */ + long long j, first, last; + unsigned char slotsToFlushRq[CLUSTER_SLOTS] = {0}; + for (j = 1; j < argc; j += 2) { + /* check if the first slot is valid */ + if (getLongLongFromObject(c->argv[j], &first) != C_OK || first < 0 || first >= CLUSTER_SLOTS) { + addReplyError(c,"Invalid or out of range slot"); + return; + } + + /* check if the last slot is valid */ + if (getLongLongFromObject(c->argv[j+1], &last) != C_OK || last < 0 || last >= CLUSTER_SLOTS) { + addReplyError(c,"Invalid or out of range slot"); + return; + } + + if (first > last) { + addReplyErrorFormat(c,"start slot number %lld is greater than end slot number %lld", first, last); + return; + } + + /* Mark the slots in slotsToFlushRq[] */ + for (int i = first; i <= last; i++) { + if (slotsToFlushRq[i]) { + addReplyErrorFormat(c, "Slot %d specified multiple times", i); + return; + } + slotsToFlushRq[i] = 1; + } + } + + /* Verify slotsToFlushRq[] covers ALL slots of myNode. */ + clusterNode *myNode = getMyClusterNode(); + /* During iteration trace also the slot range pairs and save in SlotsFlush. + * It is allocated on heap since there is a chance that FLUSH SYNC will be + * running as blocking ASYNC and only later reply with slot ranges */ + int capacity = 32; /* Initial capacity */ + SlotsFlush *sflush = zmalloc(sizeof(SlotsFlush) + sizeof(SlotRange) * capacity); + sflush->numRanges = 0; + int inSlotRange = 0; + for (int i = 0; i < CLUSTER_SLOTS; i++) { + if (myNode == getNodeBySlot(i)) { + if (!slotsToFlushRq[i]) { + addReplySetLen(c, 0); /* Not all slots of mynode got covered. See sflushCommand() comment. */ + zfree(sflush); + return; + } + + if (!inSlotRange) { /* If start another slot range */ + sflush->ranges[sflush->numRanges].first = i; + inSlotRange = 1; + } + } else { + if (inSlotRange) { /* If end another slot range */ + sflush->ranges[sflush->numRanges++].last = i - 1; + inSlotRange = 0; + /* If reached 'sflush' capacity, double the capacity */ + if (sflush->numRanges >= capacity) { + capacity *= 2; + sflush = zrealloc(sflush, sizeof(SlotsFlush) + sizeof(SlotRange) * capacity); + } + } + } + } + + /* Update last pair if last cluster slot is also end of last range */ + if (inSlotRange) sflush->ranges[sflush->numRanges++].last = CLUSTER_SLOTS - 1; + + /* Flush selected slots. If not flush as blocking async, then reply immediately */ + if (flushCommandCommon(c, FLUSH_TYPE_SLOTS, flags, sflush) == 0) + replySlotsFlushAndFree(c, sflush); +} + /* The READWRITE command just clears the READONLY command state. */ void readwriteCommand(client *c) { if (server.cluster_enabled == 0) { diff --git a/src/commands/sflush.json b/src/commands/sflush.json new file mode 100644 index 000000000..bac27ad2d --- /dev/null +++ b/src/commands/sflush.json @@ -0,0 +1,75 @@ +{ + "SFLUSH": { + "summary": "Remove all keys from selected range of slots.", + "complexity": "O(N)+O(k) where N is the number of keys and k is the number of slots.", + "group": "server", + "since": "8.0.0", + "arity": -3, + "function": "sflushCommand", + "command_flags": [ + "WRITE", + "EXPERIMENTAL" + ], + "acl_categories": [ + "KEYSPACE", + "DANGEROUS" + ], + "command_tips": [ + ], + "reply_schema": { + "description": "List of slot ranges", + "type": "array", + "minItems": 0, + "maxItems": 4294967295, + "items": { + "type": "array", + "minItems": 2, + "maxItems": 2, + "items": [ + { + "description": "start slot number", + "type": "integer" + }, + { + "description": "end slot number", + "type": "integer" + } + ] + } + }, + "arguments": [ + { + "name": "data", + "type": "block", + "multiple": true, + "arguments": [ + { + "name": "slot-start", + "type": "integer" + }, + { + "name": "slot-last", + "type": "integer" + } + ] + }, + { + "name": "flush-type", + "type": "oneof", + "optional": true, + "arguments": [ + { + "name": "async", + "type": "pure-token", + "token": "ASYNC" + }, + { + "name": "sync", + "type": "pure-token", + "token": "SYNC" + } + ] + } + ] + } +} diff --git a/src/config.h b/src/config.h index 6910951f8..ae072c9df 100644 --- a/src/config.h +++ b/src/config.h @@ -307,4 +307,15 @@ void setcpuaffinity(const char *cpulist); #define HAVE_FADVISE #endif +#if defined(__x86_64__) && ((defined(__GNUC__) && __GNUC__ > 5) || (defined(__clang__))) + #if defined(__has_attribute) && __has_attribute(target) + #define HAVE_POPCNT + #define ATTRIBUTE_TARGET_POPCNT __attribute__((target("popcnt"))) + #else + #define ATTRIBUTE_TARGET_POPCNT + #endif +#else + #define ATTRIBUTE_TARGET_POPCNT +#endif + #endif diff --git a/src/db.c b/src/db.c index b0f25f262..49c374fb1 100644 --- a/src/db.c +++ b/src/db.c @@ -375,23 +375,22 @@ robj *dbRandomKey(redisDb *db) { key = dictGetKey(de); keyobj = createStringObject(key,sdslen(key)); - if (dbFindExpires(db, key)) { - if (allvolatile && server.masterhost && --maxtries == 0) { - /* If the DB is composed only of keys with an expire set, - * it could happen that all the keys are already logically - * expired in the slave, so the function cannot stop because - * expireIfNeeded() is false, nor it can stop because - * dictGetFairRandomKey() returns NULL (there are keys to return). - * To prevent the infinite loop we do some tries, but if there - * are the conditions for an infinite loop, eventually we - * return a key name that may be already expired. */ - return keyobj; - } - if (expireIfNeeded(db,keyobj,0) != KEY_VALID) { - decrRefCount(keyobj); - continue; /* search for another key. This expired. */ - } + if (allvolatile && server.masterhost && --maxtries == 0) { + /* If the DB is composed only of keys with an expire set, + * it could happen that all the keys are already logically + * expired in the slave, so the function cannot stop because + * expireIfNeeded() is false, nor it can stop because + * dictGetFairRandomKey() returns NULL (there are keys to return). + * To prevent the infinite loop we do some tries, but if there + * are the conditions for an infinite loop, eventually we + * return a key name that may be already expired. */ + return keyobj; } + if (expireIfNeeded(db,keyobj,0) != KEY_VALID) { + decrRefCount(keyobj); + continue; /* search for another key. This expired. */ + } + return keyobj; } } @@ -730,9 +729,12 @@ void flushAllDataAndResetRDB(int flags) { #endif } -/* Optimized FLUSHALL\FLUSHDB SYNC command finished to run by lazyfree thread */ -void flushallSyncBgDone(uint64_t client_id) { - +/* CB function on blocking ASYNC FLUSH completion + * + * Utilized by commands SFLUSH, FLUSHALL and FLUSHDB. + */ +void flushallSyncBgDone(uint64_t client_id, void *sflush) { + SlotsFlush *slotsFlush = sflush; client *c = lookupClientByID(client_id); /* Verify that client still exists */ @@ -745,8 +747,11 @@ void flushallSyncBgDone(uint64_t client_id) { /* Don't update blocked_us since command was processed in bg by lazy_free thread */ updateStatsOnUnblock(c, 0 /*blocked_us*/, elapsedUs(c->bstate.lazyfreeStartTime), 0); - /* lazyfree bg job always succeed */ - addReply(c, shared.ok); + /* Only SFLUSH command pass pointer to `SlotsFlush` */ + if (slotsFlush) + replySlotsFlushAndFree(c, slotsFlush); + else + addReply(c, shared.ok); /* mark client as unblocked */ unblockClient(c, 1); @@ -761,10 +766,17 @@ void flushallSyncBgDone(uint64_t client_id) { server.current_client = old_client; } -void flushCommandCommon(client *c, int isFlushAll) { - int blocking_async = 0; /* FLUSHALL\FLUSHDB SYNC opt to run as blocking ASYNC */ - int flags; - if (getFlushCommandFlags(c,&flags) == C_ERR) return; +/* Common flush command implementation for FLUSHALL and FLUSHDB. + * + * Return 1 indicates that flush SYNC is actually running in bg as blocking ASYNC + * Return 0 otherwise + * + * sflush - provided only by SFLUSH command, otherwise NULL. Will be used on + * completion to reply with the slots flush result. Ownership is passed + * to the completion job in case of `blocking_async`. + */ +int flushCommandCommon(client *c, int type, int flags, SlotsFlush *sflush) { + int blocking_async = 0; /* Flush SYNC option to run as blocking ASYNC */ /* in case of SYNC, check if we can optimize and run it in bg as blocking ASYNC */ if ((!(flags & EMPTYDB_ASYNC)) && (!(c->flags & CLIENT_AVOID_BLOCKING_ASYNC_FLUSH))) { @@ -773,7 +785,7 @@ void flushCommandCommon(client *c, int isFlushAll) { blocking_async = 1; } - if (isFlushAll) + if (type == FLUSH_TYPE_ALL) flushAllDataAndResetRDB(flags | EMPTYDB_NOFUNCTIONS); else server.dirty += emptyData(c->db->id,flags | EMPTYDB_NOFUNCTIONS,NULL); @@ -791,10 +803,9 @@ void flushCommandCommon(client *c, int isFlushAll) { c->bstate.timeout = 0; blockClient(c,BLOCKED_LAZYFREE); - bioCreateCompRq(BIO_WORKER_LAZY_FREE, flushallSyncBgDone, c->id); - } else { - addReply(c, shared.ok); + bioCreateCompRq(BIO_WORKER_LAZY_FREE, flushallSyncBgDone, c->id, sflush); } + #if defined(USE_JEMALLOC) /* jemalloc 5 doesn't release pages back to the OS when there's no traffic. * for large databases, flushdb blocks for long anyway, so a bit more won't @@ -802,7 +813,7 @@ void flushCommandCommon(client *c, int isFlushAll) { * * Take care purge only FLUSHDB for sync flow. FLUSHALL sync flow already * applied at flushAllDataAndResetRDB. Async flow will apply only later on */ - if ((!isFlushAll) && (!(flags & EMPTYDB_ASYNC))) { + if ((type != FLUSH_TYPE_ALL) && (!(flags & EMPTYDB_ASYNC))) { /* Only clear the current thread cache. * Ignore the return call since this will fail if the tcache is disabled. */ je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0); @@ -810,20 +821,32 @@ void flushCommandCommon(client *c, int isFlushAll) { jemalloc_purge(); } #endif + return blocking_async; } /* FLUSHALL [SYNC|ASYNC] * * Flushes the whole server data set. */ void flushallCommand(client *c) { - flushCommandCommon(c, 1); + int flags; + if (getFlushCommandFlags(c,&flags) == C_ERR) return; + + /* If FLUSH SYNC isn't running as blocking async, then reply */ + if (flushCommandCommon(c, FLUSH_TYPE_ALL, flags, NULL) == 0) + addReply(c, shared.ok); } /* FLUSHDB [SYNC|ASYNC] * * Flushes the currently SELECTed Redis DB. */ void flushdbCommand(client *c) { - flushCommandCommon(c, 0); + int flags; + if (getFlushCommandFlags(c,&flags) == C_ERR) return; + + /* If FLUSH SYNC isn't running as blocking async, then reply */ + if (flushCommandCommon(c, FLUSH_TYPE_DB,flags, NULL) == 0) + addReply(c, shared.ok); + } /* This command implements DEL and UNLINK. */ @@ -1961,17 +1984,72 @@ long long getExpire(redisDb *db, robj *key) { return dictGetSignedIntegerVal(de); } -/* Delete the specified expired key and propagate expire. */ -void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) { - mstime_t expire_latency; - latencyStartMonitor(expire_latency); - dbGenericDelete(db,keyobj,server.lazyfree_lazy_expire,DB_FLAG_KEY_EXPIRED); - latencyEndMonitor(expire_latency); - latencyAddSampleIfNeeded("expire-del",expire_latency); - notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id); + +/* Delete the specified expired or evicted key and propagate to replicas. + * Currently notify_type can only be NOTIFY_EXPIRED or NOTIFY_EVICTED, + * and it affects other aspects like the latency monitor event name and, + * which config to look for lazy free, stats var to increment, and so on. + * + * key_mem_freed is an out parameter which contains the estimated + * amount of memory freed due to the trimming (may be NULL) */ +static void deleteKeyAndPropagate(redisDb *db, robj *keyobj, int notify_type, long long *key_mem_freed) { + mstime_t latency; + int del_flag = notify_type == NOTIFY_EXPIRED ? DB_FLAG_KEY_EXPIRED : DB_FLAG_KEY_EVICTED; + int lazy_flag = notify_type == NOTIFY_EXPIRED ? server.lazyfree_lazy_expire : server.lazyfree_lazy_eviction; + char *latency_name = notify_type == NOTIFY_EXPIRED ? "expire-del" : "evict-del"; + char *notify_name = notify_type == NOTIFY_EXPIRED ? "expired" : "evicted"; + + /* The key needs to be converted from static to heap before deleted */ + int static_key = keyobj->refcount == OBJ_STATIC_REFCOUNT; + if (static_key) { + keyobj = createStringObject(keyobj->ptr, sdslen(keyobj->ptr)); + } + + serverLog(LL_DEBUG,"key %s %s: deleting it", (char*)keyobj->ptr, notify_type == NOTIFY_EXPIRED ? "expired" : "evicted"); + + /* We compute the amount of memory freed by db*Delete() alone. + * It is possible that actually the memory needed to propagate + * the DEL in AOF and replication link is greater than the one + * we are freeing removing the key, but we can't account for + * that otherwise we would never exit the loop. + * + * Same for CSC invalidation messages generated by signalModifiedKey. + * + * AOF and Output buffer memory will be freed eventually so + * we only care about memory used by the key space. + * + * The code here used to first propagate and then record delta + * using only zmalloc_used_memory but in CRDT we can't do that + * so we use freeMemoryGetNotCountedMemory to avoid counting + * AOF and slave buffers */ + if (key_mem_freed) *key_mem_freed = (long long) zmalloc_used_memory() - freeMemoryGetNotCountedMemory(); + latencyStartMonitor(latency); + dbGenericDelete(db, keyobj, lazy_flag, del_flag); + latencyEndMonitor(latency); + latencyAddSampleIfNeeded(latency_name, latency); + if (key_mem_freed) *key_mem_freed -= (long long) zmalloc_used_memory() - freeMemoryGetNotCountedMemory(); + + notifyKeyspaceEvent(notify_type, notify_name,keyobj, db->id); signalModifiedKey(NULL, db, keyobj); - propagateDeletion(db,keyobj,server.lazyfree_lazy_expire); - server.stat_expiredkeys++; + propagateDeletion(db, keyobj, lazy_flag); + + if (notify_type == NOTIFY_EXPIRED) + server.stat_expiredkeys++; + else + server.stat_evictedkeys++; + + if (static_key) + decrRefCount(keyobj); +} + +/* Delete the specified expired key and propagate. */ +void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) { + deleteKeyAndPropagate(db, keyobj, NOTIFY_EXPIRED, NULL); +} + +/* Delete the specified evicted key and propagate. */ +void deleteEvictedKeyAndPropagate(redisDb *db, robj *keyobj, long long *key_mem_freed) { + deleteKeyAndPropagate(db, keyobj, NOTIFY_EVICTED, key_mem_freed); } /* Propagate an implicit key deletion into replicas and the AOF file. @@ -2061,9 +2139,9 @@ int keyIsExpired(redisDb *db, robj *key) { * The function returns KEY_EXPIRED if the key is expired BUT not deleted, * or returns KEY_DELETED if the key is expired and deleted. */ keyStatus expireIfNeeded(redisDb *db, robj *key, int flags) { - if ((!keyIsExpired(db,key)) || - (server.lazy_expire_disabled) || - (flags & EXPIRE_ALLOW_ACCESS_EXPIRED)) + if ((server.allow_access_expired) || + (flags & EXPIRE_ALLOW_ACCESS_EXPIRED) || + (!keyIsExpired(db,key))) return KEY_VALID; /* If we are running in the context of a replica, instead of @@ -2094,16 +2172,9 @@ keyStatus expireIfNeeded(redisDb *db, robj *key, int flags) { * will have failed over and the new primary will send us the expire. */ if (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE)) return KEY_EXPIRED; - /* The key needs to be converted from static to heap before deleted */ - int static_key = key->refcount == OBJ_STATIC_REFCOUNT; - if (static_key) { - key = createStringObject(key->ptr, sdslen(key->ptr)); - } /* Delete the key */ deleteExpiredKeyAndPropagate(db,key); - if (static_key) { - decrRefCount(key); - } + return KEY_DELETED; } diff --git a/src/evict.c b/src/evict.c index 890a845d5..059e82fe3 100644 --- a/src/evict.c +++ b/src/evict.c @@ -525,8 +525,7 @@ int performEvictions(void) { int keys_freed = 0; size_t mem_reported, mem_tofree; long long mem_freed; /* May be negative */ - mstime_t latency, eviction_latency; - long long delta; + mstime_t latency; int slaves = listLength(server.slaves); int result = EVICT_FAIL; @@ -659,34 +658,18 @@ int performEvictions(void) { /* Finally remove the selected key. */ if (bestkey) { + long long key_mem_freed; db = server.db+bestdbid; - robj *keyobj = createStringObject(bestkey,sdslen(bestkey)); - /* We compute the amount of memory freed by db*Delete() alone. - * It is possible that actually the memory needed to propagate - * the DEL in AOF and replication link is greater than the one - * we are freeing removing the key, but we can't account for - * that otherwise we would never exit the loop. - * - * Same for CSC invalidation messages generated by signalModifiedKey. - * - * AOF and Output buffer memory will be freed eventually so - * we only care about memory used by the key space. */ + enterExecutionUnit(1, 0); - delta = (long long) zmalloc_used_memory(); - latencyStartMonitor(eviction_latency); - dbGenericDelete(db,keyobj,server.lazyfree_lazy_eviction,DB_FLAG_KEY_EVICTED); - latencyEndMonitor(eviction_latency); - latencyAddSampleIfNeeded("eviction-del",eviction_latency); - delta -= (long long) zmalloc_used_memory(); - mem_freed += delta; - server.stat_evictedkeys++; - signalModifiedKey(NULL,db,keyobj); - notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted", - keyobj, db->id); - propagateDeletion(db,keyobj,server.lazyfree_lazy_eviction); - exitExecutionUnit(); - postExecutionUnitOperations(); + robj *keyobj = createStringObject(bestkey,sdslen(bestkey)); + deleteEvictedKeyAndPropagate(db, keyobj, &key_mem_freed); decrRefCount(keyobj); + exitExecutionUnit(); + /* Propagate the DEL command */ + postExecutionUnitOperations(); + + mem_freed += key_mem_freed; keys_freed++; if (keys_freed % 16 == 0) { diff --git a/src/expire.c b/src/expire.c index 1a49f5384..dee55f178 100644 --- a/src/expire.c +++ b/src/expire.c @@ -36,17 +36,18 @@ static double avg_ttl_factor[16] = {0.98, 0.9604, 0.941192, 0.922368, 0.903921, * to the function to avoid too many gettimeofday() syscalls. */ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { long long t = dictGetSignedIntegerVal(de); - if (now > t) { - enterExecutionUnit(1, 0); - sds key = dictGetKey(de); - robj *keyobj = createStringObject(key,sdslen(key)); - deleteExpiredKeyAndPropagate(db,keyobj); - decrRefCount(keyobj); - exitExecutionUnit(); - return 1; - } else { + if (now < t) return 0; - } + + enterExecutionUnit(1, 0); + sds key = dictGetKey(de); + robj *keyobj = createStringObject(key,sdslen(key)); + deleteExpiredKeyAndPropagate(db,keyobj); + decrRefCount(keyobj); + exitExecutionUnit(); + /* Propagate the DEL command */ + postExecutionUnitOperations(); + return 1; } /* Try to expire a few timed out keys. The algorithm used is adaptive and @@ -113,8 +114,6 @@ void expireScanCallback(void *privdata, const dictEntry *const_de) { long long ttl = dictGetSignedIntegerVal(de) - data->now; if (activeExpireCycleTryExpire(data->db, de, data->now)) { data->expired++; - /* Propagate the DEL command */ - postExecutionUnitOperations(); } if (ttl > 0) { /* We want the average TTL of keys yet not expired. */ @@ -465,20 +464,13 @@ void expireSlaveKeys(void) { if ((dbids & 1) != 0) { redisDb *db = server.db+dbid; dictEntry *expire = dbFindExpires(db, keyname); - - if (expire && - activeExpireCycleTryExpire(server.db+dbid,expire,start)) - { - /* Propagate the DEL (writable replicas do not propagate anything to other replicas, - * but they might propagate to AOF) and trigger module hooks. */ - postExecutionUnitOperations(); - } + int expired = expire && activeExpireCycleTryExpire(server.db+dbid,expire,start); /* If the key was not expired in this DB, we need to set the * corresponding bit in the new bitmap we set as value. * At the end of the loop if the bitmap is zero, it means we * no longer need to keep track of this key. */ - else if (expire) { + if (expire && !expired) { noexpire++; new_dbids |= (uint64_t)1 << dbid; } diff --git a/src/module.c b/src/module.c index b686df13c..5238408c7 100644 --- a/src/module.c +++ b/src/module.c @@ -8849,9 +8849,9 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) * it will not be notified about it. */ int prev_active = sub->active; sub->active = 1; - server.lazy_expire_disabled++; + server.allow_access_expired++; sub->notify_callback(&ctx, type, event, key); - server.lazy_expire_disabled--; + server.allow_access_expired--; sub->active = prev_active; moduleFreeContext(&ctx); } @@ -11890,7 +11890,7 @@ void processModuleLoadingProgressEvent(int is_aof) { /* When a key is deleted (in dbAsyncDelete/dbSyncDelete/setKey), it * will be called to tell the module which key is about to be released. */ void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid, int flags) { - server.lazy_expire_disabled++; + server.allow_access_expired++; int subevent = REDISMODULE_SUBEVENT_KEY_DELETED; if (flags & DB_FLAG_KEY_EXPIRED) { subevent = REDISMODULE_SUBEVENT_KEY_EXPIRED; @@ -11913,7 +11913,7 @@ void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid, int flags) { mt->unlink(key,mv->value); } } - server.lazy_expire_disabled--; + server.allow_access_expired--; } /* Return the free_effort of the module, it will automatically choose to call diff --git a/src/server.c b/src/server.c index f4248740f..72208c7e2 100644 --- a/src/server.c +++ b/src/server.c @@ -2074,7 +2074,7 @@ void initServerConfig(void) { server.bindaddr[j] = zstrdup(default_bindaddr[j]); memset(server.listeners, 0x00, sizeof(server.listeners)); server.active_expire_enabled = 1; - server.lazy_expire_disabled = 0; + server.allow_access_expired = 0; server.skip_checksum_validation = 0; server.loading = 0; server.async_loading = 0; diff --git a/src/server.h b/src/server.h index 0328c9975..b650f2699 100644 --- a/src/server.h +++ b/src/server.h @@ -1744,7 +1744,7 @@ struct redisServer { int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */ int active_expire_enabled; /* Can be disabled for testing purposes. */ int active_expire_effort; /* From 1 (default) to 10, active effort. */ - int lazy_expire_disabled; /* If > 0, don't trigger lazy expire */ + int allow_access_expired; /* If > 0, allow access to logically expired keys */ int active_defrag_enabled; int sanitize_dump_payload; /* Enables deep sanitization for ziplist and listpack in RDB and RESTORE. */ int skip_checksum_validation; /* Disable checksum validation for RDB and RESTORE payload. */ @@ -3365,6 +3365,7 @@ int setModuleNumericConfig(ModuleConfig *config, long long val, const char **err /* db.c -- Keyspace access API */ int removeExpire(redisDb *db, robj *key); void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj); +void deleteEvictedKeyAndPropagate(redisDb *db, robj *keyobj, long long *key_mem_freed); void propagateDeletion(redisDb *db, robj *key, int lazy); int keyIsExpired(redisDb *db, robj *key); long long getExpire(redisDb *db, robj *key); @@ -3411,7 +3412,18 @@ int dbDelete(redisDb *db, robj *key); robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o); robj *dbUnshareStringValueWithDictEntry(redisDb *db, robj *key, robj *o, dictEntry *de); - +#define FLUSH_TYPE_ALL 0 +#define FLUSH_TYPE_DB 1 +#define FLUSH_TYPE_SLOTS 2 +typedef struct SlotRange { + unsigned short first, last; +} SlotRange; +typedef struct SlotsFlush { + int numRanges; + SlotRange ranges[]; +} SlotsFlush; +void replySlotsFlushAndFree(client *c, SlotsFlush *sflush); +int flushCommandCommon(client *c, int type, int flags, SlotsFlush *sflush); #define EMPTYDB_NO_FLAGS 0 /* No flags. */ #define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */ #define EMPTYDB_NOFUNCTIONS (1<<1) /* Indicate not to flush the functions. */ @@ -3782,6 +3794,7 @@ void migrateCommand(client *c); void askingCommand(client *c); void readonlyCommand(client *c); void readwriteCommand(client *c); +void sflushCommand(client *c); int verifyDumpPayload(unsigned char *p, size_t len, uint16_t *rdbver_ptr); void dumpCommand(client *c); void objectCommand(client *c); diff --git a/src/sort.c b/src/sort.c index de4bc17ba..01035e218 100644 --- a/src/sort.c +++ b/src/sort.c @@ -241,8 +241,12 @@ void sortCommandGeneric(client *c, int readonly) { } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) { /* If GET is specified with a real pattern, we can't accept it in cluster mode, * unless we can make sure the keys formed by the pattern are in the same slot - * as the key to sort. */ - if (server.cluster_enabled && patternHashSlot(c->argv[j+1]->ptr, sdslen(c->argv[j+1]->ptr)) != getKeySlot(c->argv[1]->ptr)) { + * as the key to sort. The pattern # represents the key itself, so just skip + * pattern slot check. */ + if (server.cluster_enabled && + strcmp(c->argv[j+1]->ptr, "#") && + patternHashSlot(c->argv[j+1]->ptr, sdslen(c->argv[j+1]->ptr)) != getKeySlot(c->argv[1]->ptr)) + { addReplyError(c, "GET option of SORT denied in Cluster mode when " "keys formed by the pattern may be in different slots."); syntax_error++; diff --git a/src/t_hash.c b/src/t_hash.c index dc4ddf53f..1625513eb 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -747,7 +747,7 @@ GetFieldRes hashTypeGetValue(redisDb *db, robj *o, sds field, unsigned char **vs } if ((server.loading) || - (server.lazy_expire_disabled) || + (server.allow_access_expired) || (hfeFlags & HFE_LAZY_AVOID_FIELD_DEL) || (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE))) return GETF_EXPIRED; @@ -1929,7 +1929,7 @@ static int hashTypeExpireIfNeeded(redisDb *db, robj *o) { /* Follow expireIfNeeded() conditions of when not lazy-expire */ if ( (server.loading) || - (server.lazy_expire_disabled) || + (server.allow_access_expired) || (server.masterhost) || /* master-client or user-client, don't delete */ (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE))) return 0; diff --git a/src/util.c b/src/util.c index 115863b22..c0e69af4d 100644 --- a/src/util.c +++ b/src/util.c @@ -56,8 +56,11 @@ /* Glob-style pattern matching. */ static int stringmatchlen_impl(const char *pattern, int patternLen, - const char *string, int stringLen, int nocase, int *skipLongerMatches) + const char *string, int stringLen, int nocase, int *skipLongerMatches, int nesting) { + /* Protection against abusive patterns. */ + if (nesting > 1000) return 0; + while(patternLen && stringLen) { switch(pattern[0]) { case '*': @@ -69,7 +72,7 @@ static int stringmatchlen_impl(const char *pattern, int patternLen, return 1; /* match */ while(stringLen) { if (stringmatchlen_impl(pattern+1, patternLen-1, - string, stringLen, nocase, skipLongerMatches)) + string, stringLen, nocase, skipLongerMatches, nesting+1)) return 1; /* match */ if (*skipLongerMatches) return 0; /* no match */ @@ -191,7 +194,7 @@ static int stringmatchlen_impl(const char *pattern, int patternLen, int stringmatchlen(const char *pattern, int patternLen, const char *string, int stringLen, int nocase) { int skipLongerMatches = 0; - return stringmatchlen_impl(pattern,patternLen,string,stringLen,nocase,&skipLongerMatches); + return stringmatchlen_impl(pattern,patternLen,string,stringLen,nocase,&skipLongerMatches,0); } int stringmatch(const char *pattern, const char *string, int nocase) { diff --git a/tests/integration/redis-cli.tcl b/tests/integration/redis-cli.tcl index 7d7b24f88..590c5711f 100644 --- a/tests/integration/redis-cli.tcl +++ b/tests/integration/redis-cli.tcl @@ -6,6 +6,9 @@ if {$::singledb} { set ::dbnum 9 } +file delete ./.rediscli_history_test +set ::env(REDISCLI_HISTFILE) ".rediscli_history_test" + start_server {tags {"cli"}} { proc open_cli {{opts ""} {infile ""}} { if { $opts == "" } { @@ -68,10 +71,8 @@ start_server {tags {"cli"}} { set _ [format_output [read_cli $fd]] } - file delete ./.rediscli_history_test proc test_interactive_cli_with_prompt {name code} { set ::env(FAKETTY_WITH_PROMPT) 1 - set ::env(REDISCLI_HISTFILE) ".rediscli_history_test" test_interactive_cli $name $code unset ::env(FAKETTY_WITH_PROMPT) } @@ -839,3 +840,4 @@ start_server {tags {"cli external:skip"}} { } } +file delete ./.rediscli_history_test diff --git a/tests/support/server.tcl b/tests/support/server.tcl index 43b74d061..0db72cbfe 100644 --- a/tests/support/server.tcl +++ b/tests/support/server.tcl @@ -241,6 +241,11 @@ proc tags_acceptable {tags err_return} { return 0 } + if { [lsearch $tags "experimental"] >=0 && [lsearch $::allowtags "experimental"] == -1 } { + set err "experimental test not allowed" + return 0 + } + return 1 } diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 93bd62e2e..a5e5859a4 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -506,7 +506,7 @@ proc the_end {} { } } -# The client is not even driven (the test server is instead) as we just need +# The client is not event driven (the test server is instead) as we just need # to read the command, execute, reply... all this in a loop. proc test_client_main server_port { set ::test_server_fd [socket localhost $server_port] diff --git a/tests/unit/cluster/multi-slot-operations.tcl b/tests/unit/cluster/multi-slot-operations.tcl index cc7bb7ae0..5d2d03e85 100644 --- a/tests/unit/cluster/multi-slot-operations.tcl +++ b/tests/unit/cluster/multi-slot-operations.tcl @@ -107,3 +107,76 @@ test "DELSLOTSRANGE command with several boundary conditions test suite" { assert_match "*9829 11000*12001 12100*12201 13104*" [$master4 CLUSTER SLOTS] } } cluster_allocate_with_continuous_slots_local + +start_cluster 2 0 {tags {external:skip cluster experimental}} { + +set master1 [srv 0 "client"] +set master2 [srv -1 "client"] + +test "SFLUSH - Errors and output validation" { + assert_match "* 0-8191*" [$master1 CLUSTER NODES] + assert_match "* 8192-16383*" [$master2 CLUSTER NODES] + assert_match "*0 8191*" [$master1 CLUSTER SLOTS] + assert_match "*8192 16383*" [$master2 CLUSTER SLOTS] + + # make master1 non-continuous slots + $master1 cluster DELSLOTSRANGE 1000 2000 + + # Test SFLUSH errors validation + assert_error {ERR wrong number of arguments*} {$master1 SFLUSH 4} + assert_error {ERR wrong number of arguments*} {$master1 SFLUSH 4 SYNC} + assert_error {ERR Invalid or out of range slot} {$master1 SFLUSH x 4} + assert_error {ERR Invalid or out of range slot} {$master1 SFLUSH 0 12x} + assert_error {ERR Slot 3 specified multiple times} {$master1 SFLUSH 2 4 3 5} + assert_error {ERR start slot number 8 is greater than*} {$master1 SFLUSH 8 4} + assert_error {ERR wrong number of arguments*} {$master1 SFLUSH 4 8 10} + assert_error {ERR wrong number of arguments*} {$master1 SFLUSH 0 999 2001 8191 ASYNCX} + + # Test SFLUSH output validation + assert_match "" [$master1 SFLUSH 2 4] + assert_match "" [$master1 SFLUSH 0 4] + assert_match "" [$master2 SFLUSH 0 4] + assert_match "" [$master1 SFLUSH 1 8191] + assert_match "" [$master1 SFLUSH 0 8190] + assert_match "" [$master1 SFLUSH 0 998 2001 8191] + assert_match "" [$master1 SFLUSH 1 999 2001 8191] + assert_match "" [$master1 SFLUSH 0 999 2001 8190] + assert_match "" [$master1 SFLUSH 0 999 2002 8191] + assert_match "{0 999} {2001 8191}" [$master1 SFLUSH 0 999 2001 8191] + assert_match "{0 999} {2001 8191}" [$master1 SFLUSH 0 8191] + assert_match "{0 999} {2001 8191}" [$master1 SFLUSH 0 4000 4001 8191] + assert_match "" [$master2 SFLUSH 8193 16383] + assert_match "" [$master2 SFLUSH 8192 16382] + assert_match "{8192 16383}" [$master2 SFLUSH 8192 16383] + assert_match "{8192 16383}" [$master2 SFLUSH 8192 16383 SYNC] + assert_match "{8192 16383}" [$master2 SFLUSH 8192 16383 ASYNC] + assert_match "{8192 16383}" [$master2 SFLUSH 8192 9000 9001 16383] + assert_match "{8192 16383}" [$master2 SFLUSH 8192 9000 9001 16383 SYNC] + assert_match "{8192 16383}" [$master2 SFLUSH 8192 9000 9001 16383 ASYNC] + + # restore master1 continuous slots + $master1 cluster ADDSLOTSRANGE 1000 2000 +} + +test "SFLUSH - Deletes the keys with argument /SYNC/ASYNC" { + foreach op {"" "SYNC" "ASYNC"} { + for {set i 0} {$i < 100} {incr i} { + catch {$master1 SET key$i val$i} + catch {$master2 SET key$i val$i} + } + + assert {[$master1 DBSIZE] > 0} + assert {[$master2 DBSIZE] > 0} + if {$op eq ""} { + assert_match "{0 8191}" [ $master1 SFLUSH 0 8191] + } else { + assert_match "{0 8191}" [ $master1 SFLUSH 0 8191 $op] + } + assert {[$master1 DBSIZE] == 0} + assert {[$master2 DBSIZE] > 0} + assert_match "{8192 16383}" [ $master2 SFLUSH 8192 16383] + assert {[$master2 DBSIZE] == 0} + } +} + +} diff --git a/tests/unit/keyspace.tcl b/tests/unit/keyspace.tcl index d11cf8365..d7c14e429 100644 --- a/tests/unit/keyspace.tcl +++ b/tests/unit/keyspace.tcl @@ -509,6 +509,12 @@ foreach {type large} [array get largevalue] { r KEYS "a*a*a*a*a*a*a*a*a*a*a*a*a*a*a*a*a*a*a*a*b" } {} + test {Regression for pattern matching very long nested loops} { + r flushdb + r SET [string repeat "a" 50000] 1 + r KEYS [string repeat "*?" 50000] + } {} + test {Coverage: basic SWAPDB test and unhappy path} { r flushall r select 0 diff --git a/tests/unit/scripting.tcl b/tests/unit/scripting.tcl index edc03f4aa..023a26c6b 100644 --- a/tests/unit/scripting.tcl +++ b/tests/unit/scripting.tcl @@ -691,6 +691,12 @@ start_server {tags {"scripting"}} { set e } {ERR *Attempt to modify a readonly table*} + test {lua bit.tohex bug} { + set res [run_script {return bit.tohex(65535, -2147483648)} 0] + r ping + set res + } {0000FFFF} + test {Test an example script DECR_IF_GT} { set decr_if_gt { local current diff --git a/tests/unit/sort.tcl b/tests/unit/sort.tcl index a46f77cf9..35ec1606e 100644 --- a/tests/unit/sort.tcl +++ b/tests/unit/sort.tcl @@ -73,7 +73,7 @@ start_server { set result [create_random_dataset 16 lpush] test "SORT GET #" { assert_equal [lsort -integer $result] [r sort tosort GET #] - } {} {cluster:skip} + } foreach command {SORT SORT_RO} { test "$command GET " { @@ -393,6 +393,11 @@ start_cluster 1 0 {tags {"external:skip cluster sort"}} { r sort "{a}mylist" by "{a}by*" get "{a}get*" } {30 200 100} + test "sort get # in cluster mode" { + assert_equal [r sort "{a}mylist" by "{a}by*" get # ] {3 1 2} + r sort "{a}mylist" by "{a}by*" get "{a}get*" get # + } {30 3 200 1 100 2} + test "sort_ro by in cluster mode" { catch {r sort_ro "{a}mylist" by by*} e assert_match {ERR BY option of SORT denied in Cluster mode when *} $e @@ -404,4 +409,9 @@ start_cluster 1 0 {tags {"external:skip cluster sort"}} { assert_match {ERR GET option of SORT denied in Cluster mode when *} $e r sort_ro "{a}mylist" by "{a}by*" get "{a}get*" } {30 200 100} + + test "sort_ro get # in cluster mode" { + assert_equal [r sort_ro "{a}mylist" by "{a}by*" get # ] {3 1 2} + r sort_ro "{a}mylist" by "{a}by*" get "{a}get*" get # + } {30 3 200 1 100 2} } diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 8df8f9928..3a5c77b7e 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -1454,6 +1454,8 @@ start_server { assert_equal [dict get $group entries-read] 3 assert_equal [dict get $group lag] 0 + wait_for_ofs_sync $master $replica + set reply [$replica XINFO STREAM mystream FULL] set group [lindex [dict get $reply groups] 0] assert_equal [dict get $group entries-read] 3 diff --git a/utils/generate-command-code.py b/utils/generate-command-code.py index 2d7cc5b0d..76c8c3b15 100755 --- a/utils/generate-command-code.py +++ b/utils/generate-command-code.py @@ -517,6 +517,11 @@ class Subcommand(Command): def create_command(name, desc): + flags = desc.get("command_flags") + if flags and "EXPERIMENTAL" in flags: + print("Command %s is experimental, skipping..." % name) + return + if desc.get("container"): cmd = Subcommand(name.upper(), desc) subcommands.setdefault(desc["container"].upper(), {})[name] = cmd