mirror of https://mirror.osredm.com/root/redis.git
Merge unstable into 8.0 (#13622)
This commit is contained in:
commit
b08fd9f989
|
@ -21,4 +21,15 @@ jobs:
|
|||
GH_TOKEN: ${{ steps.generate-token.outputs.token }}
|
||||
RELEASE_NAME: ${{ github.event.release.tag_name }}
|
||||
run: |
|
||||
LATEST_RELEASE=$(
|
||||
curl -Ls \
|
||||
-H "Accept: application/vnd.github+json" \
|
||||
-H "Authorization: Bearer ${GH_TOKEN}" \
|
||||
-H "X-GitHub-Api-Version: 2022-11-28" \
|
||||
https://api.github.com/repos/redis/redis/releases/latest \
|
||||
| jq -r '.tag_name'
|
||||
)
|
||||
|
||||
if [[ "${LATEST_RELEASE}" == "${RELEASE_NAME}" ]]; then
|
||||
gh workflow run -R redis/docs redis_docs_sync.yaml -f release="${RELEASE_NAME}"
|
||||
fi
|
||||
|
|
|
@ -132,6 +132,7 @@ static int bit_tohex(lua_State *L)
|
|||
const char *hexdigits = "0123456789abcdef";
|
||||
char buf[8];
|
||||
int i;
|
||||
if (n == INT32_MIN) n = INT32_MIN+1;
|
||||
if (n < 0) { n = -n; hexdigits = "0123456789ABCDEF"; }
|
||||
if (n > 8) n = 8;
|
||||
for (i = (int)n; --i >= 0; ) { buf[i] = hexdigits[b & 15]; b >>= 4; }
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
SRC_DIR = src
|
||||
MODULE_VERSION = v2.8.2
|
||||
MODULE_VERSION = v7.99.1
|
||||
MODULE_REPO = https://github.com/redisbloom/redisbloom
|
||||
TARGET_MODULE = $(SRC_DIR)/bin/$(FULL_VARIANT)/redisbloom.so
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
SRC_DIR = src
|
||||
MODULE_VERSION = v7.99.0
|
||||
MODULE_VERSION = v7.99.1
|
||||
MODULE_REPO = https://github.com/redisearch/redisearch
|
||||
TARGET_MODULE = $(SRC_DIR)/bin/$(FULL_VARIANT)/coord-oss/redisearch.so
|
||||
TARGET_MODULE = $(SRC_DIR)/bin/$(FULL_VARIANT)/search-community/redisearch.so
|
||||
|
||||
include ../common.mk
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
SRC_DIR = src
|
||||
MODULE_VERSION = v2.8.4
|
||||
MODULE_VERSION = v7.99.1
|
||||
MODULE_REPO = https://github.com/redisjson/redisjson
|
||||
TARGET_MODULE = $(SRC_DIR)/bin/$(FULL_VARIANT)/rejson.so
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
SRC_DIR = src
|
||||
MODULE_VERSION = v1.12.2
|
||||
MODULE_VERSION = v7.99.1
|
||||
MODULE_REPO = https://github.com/redistimeseries/redistimeseries
|
||||
TARGET_MODULE = $(SRC_DIR)/bin/$(FULL_VARIANT)/redistimeseries.so
|
||||
|
||||
|
|
|
@ -1066,7 +1066,7 @@ int ACLSetSelector(aclSelector *selector, const char* op, size_t oplen) {
|
|||
flags |= ACL_READ_PERMISSION;
|
||||
} else if (toupper(op[offset]) == 'W' && !(flags & ACL_WRITE_PERMISSION)) {
|
||||
flags |= ACL_WRITE_PERMISSION;
|
||||
} else if (op[offset] == '~') {
|
||||
} else if (op[offset] == '~' && flags) {
|
||||
offset++;
|
||||
break;
|
||||
} else {
|
||||
|
|
|
@ -81,6 +81,7 @@ static int job_comp_pipe[2]; /* Pipe used to awake the event loop */
|
|||
typedef struct bio_comp_item {
|
||||
comp_fn *func; /* callback after completion job will be processed */
|
||||
uint64_t arg; /* user data to be passed to the function */
|
||||
void *ptr; /* user pointer to be passed to the function */
|
||||
} bio_comp_item;
|
||||
|
||||
/* This structure represents a background Job. It is only used locally to this
|
||||
|
@ -110,6 +111,7 @@ typedef union bio_job {
|
|||
int type; /* header */
|
||||
comp_fn *fn; /* callback. Handover to main thread to cb as notify for job completion */
|
||||
uint64_t arg; /* callback arguments */
|
||||
void *ptr; /* callback pointer */
|
||||
} comp_rq;
|
||||
} bio_job;
|
||||
|
||||
|
@ -200,7 +202,7 @@ void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) {
|
|||
bioSubmitJob(BIO_LAZY_FREE, job);
|
||||
}
|
||||
|
||||
void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_data) {
|
||||
void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_data, void *user_ptr) {
|
||||
int type;
|
||||
switch (assigned_worker) {
|
||||
case BIO_WORKER_CLOSE_FILE:
|
||||
|
@ -219,6 +221,7 @@ void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_
|
|||
bio_job *job = zmalloc(sizeof(*job));
|
||||
job->comp_rq.fn = func;
|
||||
job->comp_rq.arg = user_data;
|
||||
job->comp_rq.ptr = user_ptr;
|
||||
bioSubmitJob(type, job);
|
||||
}
|
||||
|
||||
|
@ -339,6 +342,7 @@ void *bioProcessBackgroundJobs(void *arg) {
|
|||
bio_comp_item *comp_rsp = zmalloc(sizeof(bio_comp_item));
|
||||
comp_rsp->func = job->comp_rq.fn;
|
||||
comp_rsp->arg = job->comp_rq.arg;
|
||||
comp_rsp->ptr = job->comp_rq.ptr;
|
||||
|
||||
/* just write it to completion job responses */
|
||||
pthread_mutex_lock(&bio_mutex_comp);
|
||||
|
@ -432,7 +436,7 @@ void bioPipeReadJobCompList(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||
listNode *ln = listFirst(tmp_list);
|
||||
bio_comp_item *rsp = ln->value;
|
||||
listDelNode(tmp_list, ln);
|
||||
rsp->func(rsp->arg);
|
||||
rsp->func(rsp->arg, rsp->ptr);
|
||||
zfree(rsp);
|
||||
}
|
||||
listRelease(tmp_list);
|
||||
|
|
|
@ -10,7 +10,7 @@
|
|||
#define __BIO_H
|
||||
|
||||
typedef void lazy_free_fn(void *args[]);
|
||||
typedef void comp_fn(uint64_t user_data);
|
||||
typedef void comp_fn(uint64_t user_data, void *user_ptr);
|
||||
|
||||
typedef enum bio_worker_t {
|
||||
BIO_WORKER_CLOSE_FILE = 0,
|
||||
|
@ -40,7 +40,7 @@ void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache);
|
|||
void bioCreateCloseAofJob(int fd, long long offset, int need_reclaim_cache);
|
||||
void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache);
|
||||
void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...);
|
||||
void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_data);
|
||||
void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_data, void *user_ptr);
|
||||
|
||||
|
||||
#endif
|
||||
|
|
39
src/bitops.c
39
src/bitops.c
|
@ -16,18 +16,49 @@
|
|||
/* Count number of bits set in the binary array pointed by 's' and long
|
||||
* 'count' bytes. The implementation of this function is required to
|
||||
* work with an input string length up to 512 MB or more (server.proto_max_bulk_len) */
|
||||
ATTRIBUTE_TARGET_POPCNT
|
||||
long long redisPopcount(void *s, long count) {
|
||||
long long bits = 0;
|
||||
unsigned char *p = s;
|
||||
uint32_t *p4;
|
||||
#if defined(HAVE_POPCNT)
|
||||
int use_popcnt = __builtin_cpu_supports("popcnt"); /* Check if CPU supports POPCNT instruction. */
|
||||
#else
|
||||
int use_popcnt = 0; /* Assume CPU does not support POPCNT if
|
||||
* __builtin_cpu_supports() is not available. */
|
||||
#endif
|
||||
static const unsigned char bitsinbyte[256] = {0,1,1,2,1,2,2,3,1,2,2,3,2,3,3,4,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,1,2,2,3,2,3,3,4,2,3,3,4,3,4,4,5,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,2,3,3,4,3,4,4,5,3,4,4,5,4,5,5,6,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,3,4,4,5,4,5,5,6,4,5,5,6,5,6,6,7,4,5,5,6,5,6,6,7,5,6,6,7,6,7,7,8};
|
||||
|
||||
/* Count initial bytes not aligned to 32 bit. */
|
||||
while((unsigned long)p & 3 && count) {
|
||||
/* Count initial bytes not aligned to 64-bit when using the POPCNT instruction,
|
||||
* otherwise align to 32-bit. */
|
||||
int align = use_popcnt ? 7 : 3;
|
||||
while ((unsigned long)p & align && count) {
|
||||
bits += bitsinbyte[*p++];
|
||||
count--;
|
||||
}
|
||||
|
||||
if (likely(use_popcnt)) {
|
||||
/* Use separate counters to make the CPU think there are no
|
||||
* dependencies between these popcnt operations. */
|
||||
uint64_t cnt[4];
|
||||
memset(cnt, 0, sizeof(cnt));
|
||||
|
||||
/* Count bits 32 bytes at a time by using popcnt.
|
||||
* Unroll the loop to avoid the overhead of a single popcnt per iteration,
|
||||
* allowing the CPU to extract more instruction-level parallelism.
|
||||
* Reference: https://danluu.com/assembly-intrinsics/ */
|
||||
while (count >= 32) {
|
||||
cnt[0] += __builtin_popcountll(*(uint64_t*)(p));
|
||||
cnt[1] += __builtin_popcountll(*(uint64_t*)(p + 8));
|
||||
cnt[2] += __builtin_popcountll(*(uint64_t*)(p + 16));
|
||||
cnt[3] += __builtin_popcountll(*(uint64_t*)(p + 24));
|
||||
count -= 32;
|
||||
p += 32;
|
||||
}
|
||||
bits += cnt[0] + cnt[1] + cnt[2] + cnt[3];
|
||||
goto remain;
|
||||
}
|
||||
|
||||
/* Count bits 28 bytes at a time */
|
||||
p4 = (uint32_t*)p;
|
||||
while(count>=28) {
|
||||
|
@ -64,8 +95,10 @@ long long redisPopcount(void *s, long count) {
|
|||
((aux6 + (aux6 >> 4)) & 0x0F0F0F0F) +
|
||||
((aux7 + (aux7 >> 4)) & 0x0F0F0F0F))* 0x01010101) >> 24;
|
||||
}
|
||||
/* Count the remaining bytes. */
|
||||
p = (unsigned char*)p4;
|
||||
|
||||
remain:
|
||||
/* Count the remaining bytes. */
|
||||
while(count--) bits += bitsinbyte[*p++];
|
||||
return bits;
|
||||
}
|
||||
|
|
120
src/cluster.c
120
src/cluster.c
|
@ -1558,6 +1558,126 @@ void readonlyCommand(client *c) {
|
|||
addReply(c,shared.ok);
|
||||
}
|
||||
|
||||
void replySlotsFlushAndFree(client *c, SlotsFlush *sflush) {
|
||||
addReplyArrayLen(c, sflush->numRanges);
|
||||
for (int i = 0 ; i < sflush->numRanges ; i++) {
|
||||
addReplyArrayLen(c, 2);
|
||||
addReplyLongLong(c, sflush->ranges[i].first);
|
||||
addReplyLongLong(c, sflush->ranges[i].last);
|
||||
}
|
||||
zfree(sflush);
|
||||
}
|
||||
|
||||
/* Partially flush destination DB in a cluster node, based on the slot range.
|
||||
*
|
||||
* Usage: SFLUSH <start-slot> <end slot> [<start-slot> <end slot>]* [SYNC|ASYNC]
|
||||
*
|
||||
* This is an initial implementation of SFLUSH (slots flush) which is limited to
|
||||
* flushing a single shard as a whole, but in the future the same command may be
|
||||
* used to partially flush a shard based on hash slots. Currently only if provided
|
||||
* slots cover entirely the slots of a node, the node will be flushed and the
|
||||
* return value will be pairs of slot ranges. Otherwise, a single empty set will
|
||||
* be returned. If possible, SFLUSH SYNC will be run as blocking ASYNC as an
|
||||
* optimization.
|
||||
*/
|
||||
void sflushCommand(client *c) {
|
||||
int flags = EMPTYDB_NO_FLAGS, argc = c->argc;
|
||||
|
||||
if (server.cluster_enabled == 0) {
|
||||
addReplyError(c,"This instance has cluster support disabled");
|
||||
return;
|
||||
}
|
||||
|
||||
/* check if last argument is SYNC or ASYNC */
|
||||
if (!strcasecmp(c->argv[c->argc-1]->ptr,"sync")) {
|
||||
flags = EMPTYDB_NO_FLAGS;
|
||||
argc--;
|
||||
} else if (!strcasecmp(c->argv[c->argc-1]->ptr,"async")) {
|
||||
flags = EMPTYDB_ASYNC;
|
||||
argc--;
|
||||
} else if (server.lazyfree_lazy_user_flush) {
|
||||
flags = EMPTYDB_ASYNC;
|
||||
}
|
||||
|
||||
/* parse the slot range */
|
||||
if (argc % 2 == 0) {
|
||||
addReplyErrorArity(c);
|
||||
return;
|
||||
}
|
||||
|
||||
/* Verify <first, last> slot pairs are valid and not overlapping */
|
||||
long long j, first, last;
|
||||
unsigned char slotsToFlushRq[CLUSTER_SLOTS] = {0};
|
||||
for (j = 1; j < argc; j += 2) {
|
||||
/* check if the first slot is valid */
|
||||
if (getLongLongFromObject(c->argv[j], &first) != C_OK || first < 0 || first >= CLUSTER_SLOTS) {
|
||||
addReplyError(c,"Invalid or out of range slot");
|
||||
return;
|
||||
}
|
||||
|
||||
/* check if the last slot is valid */
|
||||
if (getLongLongFromObject(c->argv[j+1], &last) != C_OK || last < 0 || last >= CLUSTER_SLOTS) {
|
||||
addReplyError(c,"Invalid or out of range slot");
|
||||
return;
|
||||
}
|
||||
|
||||
if (first > last) {
|
||||
addReplyErrorFormat(c,"start slot number %lld is greater than end slot number %lld", first, last);
|
||||
return;
|
||||
}
|
||||
|
||||
/* Mark the slots in slotsToFlushRq[] */
|
||||
for (int i = first; i <= last; i++) {
|
||||
if (slotsToFlushRq[i]) {
|
||||
addReplyErrorFormat(c, "Slot %d specified multiple times", i);
|
||||
return;
|
||||
}
|
||||
slotsToFlushRq[i] = 1;
|
||||
}
|
||||
}
|
||||
|
||||
/* Verify slotsToFlushRq[] covers ALL slots of myNode. */
|
||||
clusterNode *myNode = getMyClusterNode();
|
||||
/* During iteration trace also the slot range pairs and save in SlotsFlush.
|
||||
* It is allocated on heap since there is a chance that FLUSH SYNC will be
|
||||
* running as blocking ASYNC and only later reply with slot ranges */
|
||||
int capacity = 32; /* Initial capacity */
|
||||
SlotsFlush *sflush = zmalloc(sizeof(SlotsFlush) + sizeof(SlotRange) * capacity);
|
||||
sflush->numRanges = 0;
|
||||
int inSlotRange = 0;
|
||||
for (int i = 0; i < CLUSTER_SLOTS; i++) {
|
||||
if (myNode == getNodeBySlot(i)) {
|
||||
if (!slotsToFlushRq[i]) {
|
||||
addReplySetLen(c, 0); /* Not all slots of mynode got covered. See sflushCommand() comment. */
|
||||
zfree(sflush);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!inSlotRange) { /* If start another slot range */
|
||||
sflush->ranges[sflush->numRanges].first = i;
|
||||
inSlotRange = 1;
|
||||
}
|
||||
} else {
|
||||
if (inSlotRange) { /* If end another slot range */
|
||||
sflush->ranges[sflush->numRanges++].last = i - 1;
|
||||
inSlotRange = 0;
|
||||
/* If reached 'sflush' capacity, double the capacity */
|
||||
if (sflush->numRanges >= capacity) {
|
||||
capacity *= 2;
|
||||
sflush = zrealloc(sflush, sizeof(SlotsFlush) + sizeof(SlotRange) * capacity);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Update last pair if last cluster slot is also end of last range */
|
||||
if (inSlotRange) sflush->ranges[sflush->numRanges++].last = CLUSTER_SLOTS - 1;
|
||||
|
||||
/* Flush selected slots. If not flush as blocking async, then reply immediately */
|
||||
if (flushCommandCommon(c, FLUSH_TYPE_SLOTS, flags, sflush) == 0)
|
||||
replySlotsFlushAndFree(c, sflush);
|
||||
}
|
||||
|
||||
/* The READWRITE command just clears the READONLY command state. */
|
||||
void readwriteCommand(client *c) {
|
||||
if (server.cluster_enabled == 0) {
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
{
|
||||
"SFLUSH": {
|
||||
"summary": "Remove all keys from selected range of slots.",
|
||||
"complexity": "O(N)+O(k) where N is the number of keys and k is the number of slots.",
|
||||
"group": "server",
|
||||
"since": "8.0.0",
|
||||
"arity": -3,
|
||||
"function": "sflushCommand",
|
||||
"command_flags": [
|
||||
"WRITE",
|
||||
"EXPERIMENTAL"
|
||||
],
|
||||
"acl_categories": [
|
||||
"KEYSPACE",
|
||||
"DANGEROUS"
|
||||
],
|
||||
"command_tips": [
|
||||
],
|
||||
"reply_schema": {
|
||||
"description": "List of slot ranges",
|
||||
"type": "array",
|
||||
"minItems": 0,
|
||||
"maxItems": 4294967295,
|
||||
"items": {
|
||||
"type": "array",
|
||||
"minItems": 2,
|
||||
"maxItems": 2,
|
||||
"items": [
|
||||
{
|
||||
"description": "start slot number",
|
||||
"type": "integer"
|
||||
},
|
||||
{
|
||||
"description": "end slot number",
|
||||
"type": "integer"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"arguments": [
|
||||
{
|
||||
"name": "data",
|
||||
"type": "block",
|
||||
"multiple": true,
|
||||
"arguments": [
|
||||
{
|
||||
"name": "slot-start",
|
||||
"type": "integer"
|
||||
},
|
||||
{
|
||||
"name": "slot-last",
|
||||
"type": "integer"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "flush-type",
|
||||
"type": "oneof",
|
||||
"optional": true,
|
||||
"arguments": [
|
||||
{
|
||||
"name": "async",
|
||||
"type": "pure-token",
|
||||
"token": "ASYNC"
|
||||
},
|
||||
{
|
||||
"name": "sync",
|
||||
"type": "pure-token",
|
||||
"token": "SYNC"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
11
src/config.h
11
src/config.h
|
@ -307,4 +307,15 @@ void setcpuaffinity(const char *cpulist);
|
|||
#define HAVE_FADVISE
|
||||
#endif
|
||||
|
||||
#if defined(__x86_64__) && ((defined(__GNUC__) && __GNUC__ > 5) || (defined(__clang__)))
|
||||
#if defined(__has_attribute) && __has_attribute(target)
|
||||
#define HAVE_POPCNT
|
||||
#define ATTRIBUTE_TARGET_POPCNT __attribute__((target("popcnt")))
|
||||
#else
|
||||
#define ATTRIBUTE_TARGET_POPCNT
|
||||
#endif
|
||||
#else
|
||||
#define ATTRIBUTE_TARGET_POPCNT
|
||||
#endif
|
||||
|
||||
#endif
|
||||
|
|
145
src/db.c
145
src/db.c
|
@ -375,7 +375,6 @@ robj *dbRandomKey(redisDb *db) {
|
|||
|
||||
key = dictGetKey(de);
|
||||
keyobj = createStringObject(key,sdslen(key));
|
||||
if (dbFindExpires(db, key)) {
|
||||
if (allvolatile && server.masterhost && --maxtries == 0) {
|
||||
/* If the DB is composed only of keys with an expire set,
|
||||
* it could happen that all the keys are already logically
|
||||
|
@ -391,7 +390,7 @@ robj *dbRandomKey(redisDb *db) {
|
|||
decrRefCount(keyobj);
|
||||
continue; /* search for another key. This expired. */
|
||||
}
|
||||
}
|
||||
|
||||
return keyobj;
|
||||
}
|
||||
}
|
||||
|
@ -730,9 +729,12 @@ void flushAllDataAndResetRDB(int flags) {
|
|||
#endif
|
||||
}
|
||||
|
||||
/* Optimized FLUSHALL\FLUSHDB SYNC command finished to run by lazyfree thread */
|
||||
void flushallSyncBgDone(uint64_t client_id) {
|
||||
|
||||
/* CB function on blocking ASYNC FLUSH completion
|
||||
*
|
||||
* Utilized by commands SFLUSH, FLUSHALL and FLUSHDB.
|
||||
*/
|
||||
void flushallSyncBgDone(uint64_t client_id, void *sflush) {
|
||||
SlotsFlush *slotsFlush = sflush;
|
||||
client *c = lookupClientByID(client_id);
|
||||
|
||||
/* Verify that client still exists */
|
||||
|
@ -745,7 +747,10 @@ void flushallSyncBgDone(uint64_t client_id) {
|
|||
/* Don't update blocked_us since command was processed in bg by lazy_free thread */
|
||||
updateStatsOnUnblock(c, 0 /*blocked_us*/, elapsedUs(c->bstate.lazyfreeStartTime), 0);
|
||||
|
||||
/* lazyfree bg job always succeed */
|
||||
/* Only SFLUSH command pass pointer to `SlotsFlush` */
|
||||
if (slotsFlush)
|
||||
replySlotsFlushAndFree(c, slotsFlush);
|
||||
else
|
||||
addReply(c, shared.ok);
|
||||
|
||||
/* mark client as unblocked */
|
||||
|
@ -761,10 +766,17 @@ void flushallSyncBgDone(uint64_t client_id) {
|
|||
server.current_client = old_client;
|
||||
}
|
||||
|
||||
void flushCommandCommon(client *c, int isFlushAll) {
|
||||
int blocking_async = 0; /* FLUSHALL\FLUSHDB SYNC opt to run as blocking ASYNC */
|
||||
int flags;
|
||||
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
|
||||
/* Common flush command implementation for FLUSHALL and FLUSHDB.
|
||||
*
|
||||
* Return 1 indicates that flush SYNC is actually running in bg as blocking ASYNC
|
||||
* Return 0 otherwise
|
||||
*
|
||||
* sflush - provided only by SFLUSH command, otherwise NULL. Will be used on
|
||||
* completion to reply with the slots flush result. Ownership is passed
|
||||
* to the completion job in case of `blocking_async`.
|
||||
*/
|
||||
int flushCommandCommon(client *c, int type, int flags, SlotsFlush *sflush) {
|
||||
int blocking_async = 0; /* Flush SYNC option to run as blocking ASYNC */
|
||||
|
||||
/* in case of SYNC, check if we can optimize and run it in bg as blocking ASYNC */
|
||||
if ((!(flags & EMPTYDB_ASYNC)) && (!(c->flags & CLIENT_AVOID_BLOCKING_ASYNC_FLUSH))) {
|
||||
|
@ -773,7 +785,7 @@ void flushCommandCommon(client *c, int isFlushAll) {
|
|||
blocking_async = 1;
|
||||
}
|
||||
|
||||
if (isFlushAll)
|
||||
if (type == FLUSH_TYPE_ALL)
|
||||
flushAllDataAndResetRDB(flags | EMPTYDB_NOFUNCTIONS);
|
||||
else
|
||||
server.dirty += emptyData(c->db->id,flags | EMPTYDB_NOFUNCTIONS,NULL);
|
||||
|
@ -791,10 +803,9 @@ void flushCommandCommon(client *c, int isFlushAll) {
|
|||
|
||||
c->bstate.timeout = 0;
|
||||
blockClient(c,BLOCKED_LAZYFREE);
|
||||
bioCreateCompRq(BIO_WORKER_LAZY_FREE, flushallSyncBgDone, c->id);
|
||||
} else {
|
||||
addReply(c, shared.ok);
|
||||
bioCreateCompRq(BIO_WORKER_LAZY_FREE, flushallSyncBgDone, c->id, sflush);
|
||||
}
|
||||
|
||||
#if defined(USE_JEMALLOC)
|
||||
/* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
|
||||
* for large databases, flushdb blocks for long anyway, so a bit more won't
|
||||
|
@ -802,7 +813,7 @@ void flushCommandCommon(client *c, int isFlushAll) {
|
|||
*
|
||||
* Take care purge only FLUSHDB for sync flow. FLUSHALL sync flow already
|
||||
* applied at flushAllDataAndResetRDB. Async flow will apply only later on */
|
||||
if ((!isFlushAll) && (!(flags & EMPTYDB_ASYNC))) {
|
||||
if ((type != FLUSH_TYPE_ALL) && (!(flags & EMPTYDB_ASYNC))) {
|
||||
/* Only clear the current thread cache.
|
||||
* Ignore the return call since this will fail if the tcache is disabled. */
|
||||
je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0);
|
||||
|
@ -810,20 +821,32 @@ void flushCommandCommon(client *c, int isFlushAll) {
|
|||
jemalloc_purge();
|
||||
}
|
||||
#endif
|
||||
return blocking_async;
|
||||
}
|
||||
|
||||
/* FLUSHALL [SYNC|ASYNC]
|
||||
*
|
||||
* Flushes the whole server data set. */
|
||||
void flushallCommand(client *c) {
|
||||
flushCommandCommon(c, 1);
|
||||
int flags;
|
||||
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
|
||||
|
||||
/* If FLUSH SYNC isn't running as blocking async, then reply */
|
||||
if (flushCommandCommon(c, FLUSH_TYPE_ALL, flags, NULL) == 0)
|
||||
addReply(c, shared.ok);
|
||||
}
|
||||
|
||||
/* FLUSHDB [SYNC|ASYNC]
|
||||
*
|
||||
* Flushes the currently SELECTed Redis DB. */
|
||||
void flushdbCommand(client *c) {
|
||||
flushCommandCommon(c, 0);
|
||||
int flags;
|
||||
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
|
||||
|
||||
/* If FLUSH SYNC isn't running as blocking async, then reply */
|
||||
if (flushCommandCommon(c, FLUSH_TYPE_DB,flags, NULL) == 0)
|
||||
addReply(c, shared.ok);
|
||||
|
||||
}
|
||||
|
||||
/* This command implements DEL and UNLINK. */
|
||||
|
@ -1961,17 +1984,72 @@ long long getExpire(redisDb *db, robj *key) {
|
|||
return dictGetSignedIntegerVal(de);
|
||||
}
|
||||
|
||||
/* Delete the specified expired key and propagate expire. */
|
||||
void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) {
|
||||
mstime_t expire_latency;
|
||||
latencyStartMonitor(expire_latency);
|
||||
dbGenericDelete(db,keyobj,server.lazyfree_lazy_expire,DB_FLAG_KEY_EXPIRED);
|
||||
latencyEndMonitor(expire_latency);
|
||||
latencyAddSampleIfNeeded("expire-del",expire_latency);
|
||||
notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id);
|
||||
|
||||
/* Delete the specified expired or evicted key and propagate to replicas.
|
||||
* Currently notify_type can only be NOTIFY_EXPIRED or NOTIFY_EVICTED,
|
||||
* and it affects other aspects like the latency monitor event name and,
|
||||
* which config to look for lazy free, stats var to increment, and so on.
|
||||
*
|
||||
* key_mem_freed is an out parameter which contains the estimated
|
||||
* amount of memory freed due to the trimming (may be NULL) */
|
||||
static void deleteKeyAndPropagate(redisDb *db, robj *keyobj, int notify_type, long long *key_mem_freed) {
|
||||
mstime_t latency;
|
||||
int del_flag = notify_type == NOTIFY_EXPIRED ? DB_FLAG_KEY_EXPIRED : DB_FLAG_KEY_EVICTED;
|
||||
int lazy_flag = notify_type == NOTIFY_EXPIRED ? server.lazyfree_lazy_expire : server.lazyfree_lazy_eviction;
|
||||
char *latency_name = notify_type == NOTIFY_EXPIRED ? "expire-del" : "evict-del";
|
||||
char *notify_name = notify_type == NOTIFY_EXPIRED ? "expired" : "evicted";
|
||||
|
||||
/* The key needs to be converted from static to heap before deleted */
|
||||
int static_key = keyobj->refcount == OBJ_STATIC_REFCOUNT;
|
||||
if (static_key) {
|
||||
keyobj = createStringObject(keyobj->ptr, sdslen(keyobj->ptr));
|
||||
}
|
||||
|
||||
serverLog(LL_DEBUG,"key %s %s: deleting it", (char*)keyobj->ptr, notify_type == NOTIFY_EXPIRED ? "expired" : "evicted");
|
||||
|
||||
/* We compute the amount of memory freed by db*Delete() alone.
|
||||
* It is possible that actually the memory needed to propagate
|
||||
* the DEL in AOF and replication link is greater than the one
|
||||
* we are freeing removing the key, but we can't account for
|
||||
* that otherwise we would never exit the loop.
|
||||
*
|
||||
* Same for CSC invalidation messages generated by signalModifiedKey.
|
||||
*
|
||||
* AOF and Output buffer memory will be freed eventually so
|
||||
* we only care about memory used by the key space.
|
||||
*
|
||||
* The code here used to first propagate and then record delta
|
||||
* using only zmalloc_used_memory but in CRDT we can't do that
|
||||
* so we use freeMemoryGetNotCountedMemory to avoid counting
|
||||
* AOF and slave buffers */
|
||||
if (key_mem_freed) *key_mem_freed = (long long) zmalloc_used_memory() - freeMemoryGetNotCountedMemory();
|
||||
latencyStartMonitor(latency);
|
||||
dbGenericDelete(db, keyobj, lazy_flag, del_flag);
|
||||
latencyEndMonitor(latency);
|
||||
latencyAddSampleIfNeeded(latency_name, latency);
|
||||
if (key_mem_freed) *key_mem_freed -= (long long) zmalloc_used_memory() - freeMemoryGetNotCountedMemory();
|
||||
|
||||
notifyKeyspaceEvent(notify_type, notify_name,keyobj, db->id);
|
||||
signalModifiedKey(NULL, db, keyobj);
|
||||
propagateDeletion(db,keyobj,server.lazyfree_lazy_expire);
|
||||
propagateDeletion(db, keyobj, lazy_flag);
|
||||
|
||||
if (notify_type == NOTIFY_EXPIRED)
|
||||
server.stat_expiredkeys++;
|
||||
else
|
||||
server.stat_evictedkeys++;
|
||||
|
||||
if (static_key)
|
||||
decrRefCount(keyobj);
|
||||
}
|
||||
|
||||
/* Delete the specified expired key and propagate. */
|
||||
void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) {
|
||||
deleteKeyAndPropagate(db, keyobj, NOTIFY_EXPIRED, NULL);
|
||||
}
|
||||
|
||||
/* Delete the specified evicted key and propagate. */
|
||||
void deleteEvictedKeyAndPropagate(redisDb *db, robj *keyobj, long long *key_mem_freed) {
|
||||
deleteKeyAndPropagate(db, keyobj, NOTIFY_EVICTED, key_mem_freed);
|
||||
}
|
||||
|
||||
/* Propagate an implicit key deletion into replicas and the AOF file.
|
||||
|
@ -2061,9 +2139,9 @@ int keyIsExpired(redisDb *db, robj *key) {
|
|||
* The function returns KEY_EXPIRED if the key is expired BUT not deleted,
|
||||
* or returns KEY_DELETED if the key is expired and deleted. */
|
||||
keyStatus expireIfNeeded(redisDb *db, robj *key, int flags) {
|
||||
if ((!keyIsExpired(db,key)) ||
|
||||
(server.lazy_expire_disabled) ||
|
||||
(flags & EXPIRE_ALLOW_ACCESS_EXPIRED))
|
||||
if ((server.allow_access_expired) ||
|
||||
(flags & EXPIRE_ALLOW_ACCESS_EXPIRED) ||
|
||||
(!keyIsExpired(db,key)))
|
||||
return KEY_VALID;
|
||||
|
||||
/* If we are running in the context of a replica, instead of
|
||||
|
@ -2094,16 +2172,9 @@ keyStatus expireIfNeeded(redisDb *db, robj *key, int flags) {
|
|||
* will have failed over and the new primary will send us the expire. */
|
||||
if (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE)) return KEY_EXPIRED;
|
||||
|
||||
/* The key needs to be converted from static to heap before deleted */
|
||||
int static_key = key->refcount == OBJ_STATIC_REFCOUNT;
|
||||
if (static_key) {
|
||||
key = createStringObject(key->ptr, sdslen(key->ptr));
|
||||
}
|
||||
/* Delete the key */
|
||||
deleteExpiredKeyAndPropagate(db,key);
|
||||
if (static_key) {
|
||||
decrRefCount(key);
|
||||
}
|
||||
|
||||
return KEY_DELETED;
|
||||
}
|
||||
|
||||
|
|
37
src/evict.c
37
src/evict.c
|
@ -525,8 +525,7 @@ int performEvictions(void) {
|
|||
int keys_freed = 0;
|
||||
size_t mem_reported, mem_tofree;
|
||||
long long mem_freed; /* May be negative */
|
||||
mstime_t latency, eviction_latency;
|
||||
long long delta;
|
||||
mstime_t latency;
|
||||
int slaves = listLength(server.slaves);
|
||||
int result = EVICT_FAIL;
|
||||
|
||||
|
@ -659,34 +658,18 @@ int performEvictions(void) {
|
|||
|
||||
/* Finally remove the selected key. */
|
||||
if (bestkey) {
|
||||
long long key_mem_freed;
|
||||
db = server.db+bestdbid;
|
||||
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
|
||||
/* We compute the amount of memory freed by db*Delete() alone.
|
||||
* It is possible that actually the memory needed to propagate
|
||||
* the DEL in AOF and replication link is greater than the one
|
||||
* we are freeing removing the key, but we can't account for
|
||||
* that otherwise we would never exit the loop.
|
||||
*
|
||||
* Same for CSC invalidation messages generated by signalModifiedKey.
|
||||
*
|
||||
* AOF and Output buffer memory will be freed eventually so
|
||||
* we only care about memory used by the key space. */
|
||||
|
||||
enterExecutionUnit(1, 0);
|
||||
delta = (long long) zmalloc_used_memory();
|
||||
latencyStartMonitor(eviction_latency);
|
||||
dbGenericDelete(db,keyobj,server.lazyfree_lazy_eviction,DB_FLAG_KEY_EVICTED);
|
||||
latencyEndMonitor(eviction_latency);
|
||||
latencyAddSampleIfNeeded("eviction-del",eviction_latency);
|
||||
delta -= (long long) zmalloc_used_memory();
|
||||
mem_freed += delta;
|
||||
server.stat_evictedkeys++;
|
||||
signalModifiedKey(NULL,db,keyobj);
|
||||
notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
|
||||
keyobj, db->id);
|
||||
propagateDeletion(db,keyobj,server.lazyfree_lazy_eviction);
|
||||
exitExecutionUnit();
|
||||
postExecutionUnitOperations();
|
||||
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
|
||||
deleteEvictedKeyAndPropagate(db, keyobj, &key_mem_freed);
|
||||
decrRefCount(keyobj);
|
||||
exitExecutionUnit();
|
||||
/* Propagate the DEL command */
|
||||
postExecutionUnitOperations();
|
||||
|
||||
mem_freed += key_mem_freed;
|
||||
keys_freed++;
|
||||
|
||||
if (keys_freed % 16 == 0) {
|
||||
|
|
22
src/expire.c
22
src/expire.c
|
@ -36,17 +36,18 @@ static double avg_ttl_factor[16] = {0.98, 0.9604, 0.941192, 0.922368, 0.903921,
|
|||
* to the function to avoid too many gettimeofday() syscalls. */
|
||||
int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
|
||||
long long t = dictGetSignedIntegerVal(de);
|
||||
if (now > t) {
|
||||
if (now < t)
|
||||
return 0;
|
||||
|
||||
enterExecutionUnit(1, 0);
|
||||
sds key = dictGetKey(de);
|
||||
robj *keyobj = createStringObject(key,sdslen(key));
|
||||
deleteExpiredKeyAndPropagate(db,keyobj);
|
||||
decrRefCount(keyobj);
|
||||
exitExecutionUnit();
|
||||
/* Propagate the DEL command */
|
||||
postExecutionUnitOperations();
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* Try to expire a few timed out keys. The algorithm used is adaptive and
|
||||
|
@ -113,8 +114,6 @@ void expireScanCallback(void *privdata, const dictEntry *const_de) {
|
|||
long long ttl = dictGetSignedIntegerVal(de) - data->now;
|
||||
if (activeExpireCycleTryExpire(data->db, de, data->now)) {
|
||||
data->expired++;
|
||||
/* Propagate the DEL command */
|
||||
postExecutionUnitOperations();
|
||||
}
|
||||
if (ttl > 0) {
|
||||
/* We want the average TTL of keys yet not expired. */
|
||||
|
@ -465,20 +464,13 @@ void expireSlaveKeys(void) {
|
|||
if ((dbids & 1) != 0) {
|
||||
redisDb *db = server.db+dbid;
|
||||
dictEntry *expire = dbFindExpires(db, keyname);
|
||||
|
||||
if (expire &&
|
||||
activeExpireCycleTryExpire(server.db+dbid,expire,start))
|
||||
{
|
||||
/* Propagate the DEL (writable replicas do not propagate anything to other replicas,
|
||||
* but they might propagate to AOF) and trigger module hooks. */
|
||||
postExecutionUnitOperations();
|
||||
}
|
||||
int expired = expire && activeExpireCycleTryExpire(server.db+dbid,expire,start);
|
||||
|
||||
/* If the key was not expired in this DB, we need to set the
|
||||
* corresponding bit in the new bitmap we set as value.
|
||||
* At the end of the loop if the bitmap is zero, it means we
|
||||
* no longer need to keep track of this key. */
|
||||
else if (expire) {
|
||||
if (expire && !expired) {
|
||||
noexpire++;
|
||||
new_dbids |= (uint64_t)1 << dbid;
|
||||
}
|
||||
|
|
|
@ -8849,9 +8849,9 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
|
|||
* it will not be notified about it. */
|
||||
int prev_active = sub->active;
|
||||
sub->active = 1;
|
||||
server.lazy_expire_disabled++;
|
||||
server.allow_access_expired++;
|
||||
sub->notify_callback(&ctx, type, event, key);
|
||||
server.lazy_expire_disabled--;
|
||||
server.allow_access_expired--;
|
||||
sub->active = prev_active;
|
||||
moduleFreeContext(&ctx);
|
||||
}
|
||||
|
@ -11890,7 +11890,7 @@ void processModuleLoadingProgressEvent(int is_aof) {
|
|||
/* When a key is deleted (in dbAsyncDelete/dbSyncDelete/setKey), it
|
||||
* will be called to tell the module which key is about to be released. */
|
||||
void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid, int flags) {
|
||||
server.lazy_expire_disabled++;
|
||||
server.allow_access_expired++;
|
||||
int subevent = REDISMODULE_SUBEVENT_KEY_DELETED;
|
||||
if (flags & DB_FLAG_KEY_EXPIRED) {
|
||||
subevent = REDISMODULE_SUBEVENT_KEY_EXPIRED;
|
||||
|
@ -11913,7 +11913,7 @@ void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid, int flags) {
|
|||
mt->unlink(key,mv->value);
|
||||
}
|
||||
}
|
||||
server.lazy_expire_disabled--;
|
||||
server.allow_access_expired--;
|
||||
}
|
||||
|
||||
/* Return the free_effort of the module, it will automatically choose to call
|
||||
|
|
|
@ -2074,7 +2074,7 @@ void initServerConfig(void) {
|
|||
server.bindaddr[j] = zstrdup(default_bindaddr[j]);
|
||||
memset(server.listeners, 0x00, sizeof(server.listeners));
|
||||
server.active_expire_enabled = 1;
|
||||
server.lazy_expire_disabled = 0;
|
||||
server.allow_access_expired = 0;
|
||||
server.skip_checksum_validation = 0;
|
||||
server.loading = 0;
|
||||
server.async_loading = 0;
|
||||
|
|
17
src/server.h
17
src/server.h
|
@ -1744,7 +1744,7 @@ struct redisServer {
|
|||
int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */
|
||||
int active_expire_enabled; /* Can be disabled for testing purposes. */
|
||||
int active_expire_effort; /* From 1 (default) to 10, active effort. */
|
||||
int lazy_expire_disabled; /* If > 0, don't trigger lazy expire */
|
||||
int allow_access_expired; /* If > 0, allow access to logically expired keys */
|
||||
int active_defrag_enabled;
|
||||
int sanitize_dump_payload; /* Enables deep sanitization for ziplist and listpack in RDB and RESTORE. */
|
||||
int skip_checksum_validation; /* Disable checksum validation for RDB and RESTORE payload. */
|
||||
|
@ -3365,6 +3365,7 @@ int setModuleNumericConfig(ModuleConfig *config, long long val, const char **err
|
|||
/* db.c -- Keyspace access API */
|
||||
int removeExpire(redisDb *db, robj *key);
|
||||
void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj);
|
||||
void deleteEvictedKeyAndPropagate(redisDb *db, robj *keyobj, long long *key_mem_freed);
|
||||
void propagateDeletion(redisDb *db, robj *key, int lazy);
|
||||
int keyIsExpired(redisDb *db, robj *key);
|
||||
long long getExpire(redisDb *db, robj *key);
|
||||
|
@ -3411,7 +3412,18 @@ int dbDelete(redisDb *db, robj *key);
|
|||
robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o);
|
||||
robj *dbUnshareStringValueWithDictEntry(redisDb *db, robj *key, robj *o, dictEntry *de);
|
||||
|
||||
|
||||
#define FLUSH_TYPE_ALL 0
|
||||
#define FLUSH_TYPE_DB 1
|
||||
#define FLUSH_TYPE_SLOTS 2
|
||||
typedef struct SlotRange {
|
||||
unsigned short first, last;
|
||||
} SlotRange;
|
||||
typedef struct SlotsFlush {
|
||||
int numRanges;
|
||||
SlotRange ranges[];
|
||||
} SlotsFlush;
|
||||
void replySlotsFlushAndFree(client *c, SlotsFlush *sflush);
|
||||
int flushCommandCommon(client *c, int type, int flags, SlotsFlush *sflush);
|
||||
#define EMPTYDB_NO_FLAGS 0 /* No flags. */
|
||||
#define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */
|
||||
#define EMPTYDB_NOFUNCTIONS (1<<1) /* Indicate not to flush the functions. */
|
||||
|
@ -3782,6 +3794,7 @@ void migrateCommand(client *c);
|
|||
void askingCommand(client *c);
|
||||
void readonlyCommand(client *c);
|
||||
void readwriteCommand(client *c);
|
||||
void sflushCommand(client *c);
|
||||
int verifyDumpPayload(unsigned char *p, size_t len, uint16_t *rdbver_ptr);
|
||||
void dumpCommand(client *c);
|
||||
void objectCommand(client *c);
|
||||
|
|
|
@ -241,8 +241,12 @@ void sortCommandGeneric(client *c, int readonly) {
|
|||
} else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
|
||||
/* If GET is specified with a real pattern, we can't accept it in cluster mode,
|
||||
* unless we can make sure the keys formed by the pattern are in the same slot
|
||||
* as the key to sort. */
|
||||
if (server.cluster_enabled && patternHashSlot(c->argv[j+1]->ptr, sdslen(c->argv[j+1]->ptr)) != getKeySlot(c->argv[1]->ptr)) {
|
||||
* as the key to sort. The pattern # represents the key itself, so just skip
|
||||
* pattern slot check. */
|
||||
if (server.cluster_enabled &&
|
||||
strcmp(c->argv[j+1]->ptr, "#") &&
|
||||
patternHashSlot(c->argv[j+1]->ptr, sdslen(c->argv[j+1]->ptr)) != getKeySlot(c->argv[1]->ptr))
|
||||
{
|
||||
addReplyError(c, "GET option of SORT denied in Cluster mode when "
|
||||
"keys formed by the pattern may be in different slots.");
|
||||
syntax_error++;
|
||||
|
|
|
@ -747,7 +747,7 @@ GetFieldRes hashTypeGetValue(redisDb *db, robj *o, sds field, unsigned char **vs
|
|||
}
|
||||
|
||||
if ((server.loading) ||
|
||||
(server.lazy_expire_disabled) ||
|
||||
(server.allow_access_expired) ||
|
||||
(hfeFlags & HFE_LAZY_AVOID_FIELD_DEL) ||
|
||||
(isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE)))
|
||||
return GETF_EXPIRED;
|
||||
|
@ -1929,7 +1929,7 @@ static int hashTypeExpireIfNeeded(redisDb *db, robj *o) {
|
|||
|
||||
/* Follow expireIfNeeded() conditions of when not lazy-expire */
|
||||
if ( (server.loading) ||
|
||||
(server.lazy_expire_disabled) ||
|
||||
(server.allow_access_expired) ||
|
||||
(server.masterhost) || /* master-client or user-client, don't delete */
|
||||
(isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE)))
|
||||
return 0;
|
||||
|
|
|
@ -56,8 +56,11 @@
|
|||
|
||||
/* Glob-style pattern matching. */
|
||||
static int stringmatchlen_impl(const char *pattern, int patternLen,
|
||||
const char *string, int stringLen, int nocase, int *skipLongerMatches)
|
||||
const char *string, int stringLen, int nocase, int *skipLongerMatches, int nesting)
|
||||
{
|
||||
/* Protection against abusive patterns. */
|
||||
if (nesting > 1000) return 0;
|
||||
|
||||
while(patternLen && stringLen) {
|
||||
switch(pattern[0]) {
|
||||
case '*':
|
||||
|
@ -69,7 +72,7 @@ static int stringmatchlen_impl(const char *pattern, int patternLen,
|
|||
return 1; /* match */
|
||||
while(stringLen) {
|
||||
if (stringmatchlen_impl(pattern+1, patternLen-1,
|
||||
string, stringLen, nocase, skipLongerMatches))
|
||||
string, stringLen, nocase, skipLongerMatches, nesting+1))
|
||||
return 1; /* match */
|
||||
if (*skipLongerMatches)
|
||||
return 0; /* no match */
|
||||
|
@ -191,7 +194,7 @@ static int stringmatchlen_impl(const char *pattern, int patternLen,
|
|||
int stringmatchlen(const char *pattern, int patternLen,
|
||||
const char *string, int stringLen, int nocase) {
|
||||
int skipLongerMatches = 0;
|
||||
return stringmatchlen_impl(pattern,patternLen,string,stringLen,nocase,&skipLongerMatches);
|
||||
return stringmatchlen_impl(pattern,patternLen,string,stringLen,nocase,&skipLongerMatches,0);
|
||||
}
|
||||
|
||||
int stringmatch(const char *pattern, const char *string, int nocase) {
|
||||
|
|
|
@ -6,6 +6,9 @@ if {$::singledb} {
|
|||
set ::dbnum 9
|
||||
}
|
||||
|
||||
file delete ./.rediscli_history_test
|
||||
set ::env(REDISCLI_HISTFILE) ".rediscli_history_test"
|
||||
|
||||
start_server {tags {"cli"}} {
|
||||
proc open_cli {{opts ""} {infile ""}} {
|
||||
if { $opts == "" } {
|
||||
|
@ -68,10 +71,8 @@ start_server {tags {"cli"}} {
|
|||
set _ [format_output [read_cli $fd]]
|
||||
}
|
||||
|
||||
file delete ./.rediscli_history_test
|
||||
proc test_interactive_cli_with_prompt {name code} {
|
||||
set ::env(FAKETTY_WITH_PROMPT) 1
|
||||
set ::env(REDISCLI_HISTFILE) ".rediscli_history_test"
|
||||
test_interactive_cli $name $code
|
||||
unset ::env(FAKETTY_WITH_PROMPT)
|
||||
}
|
||||
|
@ -839,3 +840,4 @@ start_server {tags {"cli external:skip"}} {
|
|||
}
|
||||
}
|
||||
|
||||
file delete ./.rediscli_history_test
|
||||
|
|
|
@ -241,6 +241,11 @@ proc tags_acceptable {tags err_return} {
|
|||
return 0
|
||||
}
|
||||
|
||||
if { [lsearch $tags "experimental"] >=0 && [lsearch $::allowtags "experimental"] == -1 } {
|
||||
set err "experimental test not allowed"
|
||||
return 0
|
||||
}
|
||||
|
||||
return 1
|
||||
}
|
||||
|
||||
|
|
|
@ -506,7 +506,7 @@ proc the_end {} {
|
|||
}
|
||||
}
|
||||
|
||||
# The client is not even driven (the test server is instead) as we just need
|
||||
# The client is not event driven (the test server is instead) as we just need
|
||||
# to read the command, execute, reply... all this in a loop.
|
||||
proc test_client_main server_port {
|
||||
set ::test_server_fd [socket localhost $server_port]
|
||||
|
|
|
@ -107,3 +107,76 @@ test "DELSLOTSRANGE command with several boundary conditions test suite" {
|
|||
assert_match "*9829 11000*12001 12100*12201 13104*" [$master4 CLUSTER SLOTS]
|
||||
}
|
||||
} cluster_allocate_with_continuous_slots_local
|
||||
|
||||
start_cluster 2 0 {tags {external:skip cluster experimental}} {
|
||||
|
||||
set master1 [srv 0 "client"]
|
||||
set master2 [srv -1 "client"]
|
||||
|
||||
test "SFLUSH - Errors and output validation" {
|
||||
assert_match "* 0-8191*" [$master1 CLUSTER NODES]
|
||||
assert_match "* 8192-16383*" [$master2 CLUSTER NODES]
|
||||
assert_match "*0 8191*" [$master1 CLUSTER SLOTS]
|
||||
assert_match "*8192 16383*" [$master2 CLUSTER SLOTS]
|
||||
|
||||
# make master1 non-continuous slots
|
||||
$master1 cluster DELSLOTSRANGE 1000 2000
|
||||
|
||||
# Test SFLUSH errors validation
|
||||
assert_error {ERR wrong number of arguments*} {$master1 SFLUSH 4}
|
||||
assert_error {ERR wrong number of arguments*} {$master1 SFLUSH 4 SYNC}
|
||||
assert_error {ERR Invalid or out of range slot} {$master1 SFLUSH x 4}
|
||||
assert_error {ERR Invalid or out of range slot} {$master1 SFLUSH 0 12x}
|
||||
assert_error {ERR Slot 3 specified multiple times} {$master1 SFLUSH 2 4 3 5}
|
||||
assert_error {ERR start slot number 8 is greater than*} {$master1 SFLUSH 8 4}
|
||||
assert_error {ERR wrong number of arguments*} {$master1 SFLUSH 4 8 10}
|
||||
assert_error {ERR wrong number of arguments*} {$master1 SFLUSH 0 999 2001 8191 ASYNCX}
|
||||
|
||||
# Test SFLUSH output validation
|
||||
assert_match "" [$master1 SFLUSH 2 4]
|
||||
assert_match "" [$master1 SFLUSH 0 4]
|
||||
assert_match "" [$master2 SFLUSH 0 4]
|
||||
assert_match "" [$master1 SFLUSH 1 8191]
|
||||
assert_match "" [$master1 SFLUSH 0 8190]
|
||||
assert_match "" [$master1 SFLUSH 0 998 2001 8191]
|
||||
assert_match "" [$master1 SFLUSH 1 999 2001 8191]
|
||||
assert_match "" [$master1 SFLUSH 0 999 2001 8190]
|
||||
assert_match "" [$master1 SFLUSH 0 999 2002 8191]
|
||||
assert_match "{0 999} {2001 8191}" [$master1 SFLUSH 0 999 2001 8191]
|
||||
assert_match "{0 999} {2001 8191}" [$master1 SFLUSH 0 8191]
|
||||
assert_match "{0 999} {2001 8191}" [$master1 SFLUSH 0 4000 4001 8191]
|
||||
assert_match "" [$master2 SFLUSH 8193 16383]
|
||||
assert_match "" [$master2 SFLUSH 8192 16382]
|
||||
assert_match "{8192 16383}" [$master2 SFLUSH 8192 16383]
|
||||
assert_match "{8192 16383}" [$master2 SFLUSH 8192 16383 SYNC]
|
||||
assert_match "{8192 16383}" [$master2 SFLUSH 8192 16383 ASYNC]
|
||||
assert_match "{8192 16383}" [$master2 SFLUSH 8192 9000 9001 16383]
|
||||
assert_match "{8192 16383}" [$master2 SFLUSH 8192 9000 9001 16383 SYNC]
|
||||
assert_match "{8192 16383}" [$master2 SFLUSH 8192 9000 9001 16383 ASYNC]
|
||||
|
||||
# restore master1 continuous slots
|
||||
$master1 cluster ADDSLOTSRANGE 1000 2000
|
||||
}
|
||||
|
||||
test "SFLUSH - Deletes the keys with argument <NONE>/SYNC/ASYNC" {
|
||||
foreach op {"" "SYNC" "ASYNC"} {
|
||||
for {set i 0} {$i < 100} {incr i} {
|
||||
catch {$master1 SET key$i val$i}
|
||||
catch {$master2 SET key$i val$i}
|
||||
}
|
||||
|
||||
assert {[$master1 DBSIZE] > 0}
|
||||
assert {[$master2 DBSIZE] > 0}
|
||||
if {$op eq ""} {
|
||||
assert_match "{0 8191}" [ $master1 SFLUSH 0 8191]
|
||||
} else {
|
||||
assert_match "{0 8191}" [ $master1 SFLUSH 0 8191 $op]
|
||||
}
|
||||
assert {[$master1 DBSIZE] == 0}
|
||||
assert {[$master2 DBSIZE] > 0}
|
||||
assert_match "{8192 16383}" [ $master2 SFLUSH 8192 16383]
|
||||
assert {[$master2 DBSIZE] == 0}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -509,6 +509,12 @@ foreach {type large} [array get largevalue] {
|
|||
r KEYS "a*a*a*a*a*a*a*a*a*a*a*a*a*a*a*a*a*a*a*a*b"
|
||||
} {}
|
||||
|
||||
test {Regression for pattern matching very long nested loops} {
|
||||
r flushdb
|
||||
r SET [string repeat "a" 50000] 1
|
||||
r KEYS [string repeat "*?" 50000]
|
||||
} {}
|
||||
|
||||
test {Coverage: basic SWAPDB test and unhappy path} {
|
||||
r flushall
|
||||
r select 0
|
||||
|
|
|
@ -691,6 +691,12 @@ start_server {tags {"scripting"}} {
|
|||
set e
|
||||
} {ERR *Attempt to modify a readonly table*}
|
||||
|
||||
test {lua bit.tohex bug} {
|
||||
set res [run_script {return bit.tohex(65535, -2147483648)} 0]
|
||||
r ping
|
||||
set res
|
||||
} {0000FFFF}
|
||||
|
||||
test {Test an example script DECR_IF_GT} {
|
||||
set decr_if_gt {
|
||||
local current
|
||||
|
|
|
@ -73,7 +73,7 @@ start_server {
|
|||
set result [create_random_dataset 16 lpush]
|
||||
test "SORT GET #" {
|
||||
assert_equal [lsort -integer $result] [r sort tosort GET #]
|
||||
} {} {cluster:skip}
|
||||
}
|
||||
|
||||
foreach command {SORT SORT_RO} {
|
||||
test "$command GET <const>" {
|
||||
|
@ -393,6 +393,11 @@ start_cluster 1 0 {tags {"external:skip cluster sort"}} {
|
|||
r sort "{a}mylist" by "{a}by*" get "{a}get*"
|
||||
} {30 200 100}
|
||||
|
||||
test "sort get # in cluster mode" {
|
||||
assert_equal [r sort "{a}mylist" by "{a}by*" get # ] {3 1 2}
|
||||
r sort "{a}mylist" by "{a}by*" get "{a}get*" get #
|
||||
} {30 3 200 1 100 2}
|
||||
|
||||
test "sort_ro by in cluster mode" {
|
||||
catch {r sort_ro "{a}mylist" by by*} e
|
||||
assert_match {ERR BY option of SORT denied in Cluster mode when *} $e
|
||||
|
@ -404,4 +409,9 @@ start_cluster 1 0 {tags {"external:skip cluster sort"}} {
|
|||
assert_match {ERR GET option of SORT denied in Cluster mode when *} $e
|
||||
r sort_ro "{a}mylist" by "{a}by*" get "{a}get*"
|
||||
} {30 200 100}
|
||||
|
||||
test "sort_ro get # in cluster mode" {
|
||||
assert_equal [r sort_ro "{a}mylist" by "{a}by*" get # ] {3 1 2}
|
||||
r sort_ro "{a}mylist" by "{a}by*" get "{a}get*" get #
|
||||
} {30 3 200 1 100 2}
|
||||
}
|
||||
|
|
|
@ -1454,6 +1454,8 @@ start_server {
|
|||
assert_equal [dict get $group entries-read] 3
|
||||
assert_equal [dict get $group lag] 0
|
||||
|
||||
wait_for_ofs_sync $master $replica
|
||||
|
||||
set reply [$replica XINFO STREAM mystream FULL]
|
||||
set group [lindex [dict get $reply groups] 0]
|
||||
assert_equal [dict get $group entries-read] 3
|
||||
|
|
|
@ -517,6 +517,11 @@ class Subcommand(Command):
|
|||
|
||||
|
||||
def create_command(name, desc):
|
||||
flags = desc.get("command_flags")
|
||||
if flags and "EXPERIMENTAL" in flags:
|
||||
print("Command %s is experimental, skipping..." % name)
|
||||
return
|
||||
|
||||
if desc.get("container"):
|
||||
cmd = Subcommand(name.upper(), desc)
|
||||
subcommands.setdefault(desc["container"].upper(), {})[name] = cmd
|
||||
|
|
Loading…
Reference in New Issue