mirror of https://mirror.osredm.com/root/redis.git
Merge remote-tracking branch 'upstream/unstable' into HEAD
This commit is contained in:
commit
452b5b8a3b
15
redis.conf
15
redis.conf
|
@ -1783,6 +1783,21 @@ aof-timestamp-enabled no
|
|||
#
|
||||
# cluster-preferred-endpoint-type ip
|
||||
|
||||
# This configuration defines the sampling ratio (0-100) for checking command
|
||||
# compatibility in cluster mode. When a command is executed, it is sampled at
|
||||
# the specified ratio to determine if it complies with Redis cluster constraints,
|
||||
# such as cross-slot restrictions.
|
||||
#
|
||||
# - A value of 0 means no commands are sampled for compatibility checks.
|
||||
# - A value of 100 means all commands are checked.
|
||||
# - Intermediate values (e.g., 10) mean that approximately 10% of the commands
|
||||
# are randomly selected for compatibility verification.
|
||||
#
|
||||
# Higher sampling ratios may introduce additional performance overhead, especially
|
||||
# under high QPS. The default value is 0 (no sampling).
|
||||
#
|
||||
# cluster-compatibility-sample-ratio 0
|
||||
|
||||
# In order to setup your cluster make sure to read the documentation
|
||||
# available at https://redis.io web site.
|
||||
|
||||
|
|
|
@ -270,6 +270,7 @@ void disconnectAllBlockedClients(void) {
|
|||
|
||||
if (c->bstate.btype == BLOCKED_LAZYFREE) {
|
||||
addReply(c, shared.ok); /* No reason lazy-free to fail */
|
||||
updateStatsOnUnblock(c, 0, 0, 0);
|
||||
c->flags &= ~CLIENT_PENDING_COMMAND;
|
||||
unblockClient(c, 1);
|
||||
} else {
|
||||
|
|
|
@ -3197,6 +3197,7 @@ standardConfig static_configs[] = {
|
|||
createIntConfig("watchdog-period", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.watchdog_period, 0, INTEGER_CONFIG, NULL, updateWatchdogPeriod),
|
||||
createIntConfig("shutdown-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.shutdown_timeout, 10, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("repl-diskless-sync-max-replicas", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_diskless_sync_max_replicas, 0, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("cluster-compatibility-sample-ratio", NULL, MODIFIABLE_CONFIG, 0, 100, server.cluster_compatibility_sample_ratio, 0, INTEGER_CONFIG, NULL, NULL),
|
||||
|
||||
/* Unsigned int configs */
|
||||
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
|
||||
|
|
32
src/db.c
32
src/db.c
|
@ -446,7 +446,7 @@ robj *dbRandomKey(redisDb *db) {
|
|||
|
||||
key = dictGetKey(de);
|
||||
keyobj = createStringObject(key,sdslen(key));
|
||||
if (allvolatile && server.masterhost && --maxtries == 0) {
|
||||
if (allvolatile && (server.masterhost || isPausedActions(PAUSE_ACTION_EXPIRE)) && --maxtries == 0) {
|
||||
/* If the DB is composed only of keys with an expire set,
|
||||
* it could happen that all the keys are already logically
|
||||
* expired in the slave, so the function cannot stop because
|
||||
|
@ -812,8 +812,8 @@ void flushallSyncBgDone(uint64_t client_id, void *sflush) {
|
|||
SlotsFlush *slotsFlush = sflush;
|
||||
client *c = lookupClientByID(client_id);
|
||||
|
||||
/* Verify that client still exists */
|
||||
if (!c) {
|
||||
/* Verify that client still exists and being blocked. */
|
||||
if (!(c && c->flags & CLIENT_BLOCKED)) {
|
||||
zfree(sflush);
|
||||
return;
|
||||
}
|
||||
|
@ -834,8 +834,13 @@ void flushallSyncBgDone(uint64_t client_id, void *sflush) {
|
|||
/* mark client as unblocked */
|
||||
unblockClient(c, 1);
|
||||
|
||||
/* FLUSH command is finished. resetClient() and update replication offset. */
|
||||
commandProcessed(c);
|
||||
if (c->flags & CLIENT_PENDING_COMMAND) {
|
||||
c->flags &= ~CLIENT_PENDING_COMMAND;
|
||||
/* The FLUSH command won't be reprocessed, FLUSH command is finished, but
|
||||
* we still need to complete its full processing flow, including updating
|
||||
* the replication offset. */
|
||||
commandProcessed(c);
|
||||
}
|
||||
|
||||
/* On flush completion, update the client's memory */
|
||||
updateClientMemUsageAndBucket(c);
|
||||
|
@ -880,6 +885,10 @@ int flushCommandCommon(client *c, int type, int flags, SlotsFlush *sflush) {
|
|||
elapsedStart(&c->bstate.lazyfreeStartTime);
|
||||
|
||||
c->bstate.timeout = 0;
|
||||
/* We still need to perform cleanup operations for the command, including
|
||||
* updating the replication offset, so mark this command as pending to
|
||||
* avoid command from being reset during unblock. */
|
||||
c->flags |= CLIENT_PENDING_COMMAND;
|
||||
blockClient(c,BLOCKED_LAZYFREE);
|
||||
bioCreateCompRq(BIO_WORKER_LAZY_FREE, flushallSyncBgDone, c->id, sflush);
|
||||
}
|
||||
|
@ -977,6 +986,11 @@ void selectCommand(client *c) {
|
|||
addReplyError(c,"SELECT is not allowed in cluster mode");
|
||||
return;
|
||||
}
|
||||
|
||||
if (id != 0) {
|
||||
server.stat_cluster_incompatible_ops++;
|
||||
}
|
||||
|
||||
if (selectDb(c,id) == C_ERR) {
|
||||
addReplyError(c,"DB index is out of range");
|
||||
} else {
|
||||
|
@ -1689,6 +1703,9 @@ void moveCommand(client *c) {
|
|||
return;
|
||||
}
|
||||
|
||||
/* Record incompatible operations in cluster mode */
|
||||
server.stat_cluster_incompatible_ops++;
|
||||
|
||||
/* Check if the element exists and get a reference */
|
||||
o = lookupKeyWrite(c->db,c->argv[1]);
|
||||
if (!o) {
|
||||
|
@ -1782,6 +1799,10 @@ void copyCommand(client *c) {
|
|||
return;
|
||||
}
|
||||
|
||||
if (srcid != 0 || dbid != 0) {
|
||||
server.stat_cluster_incompatible_ops++;
|
||||
}
|
||||
|
||||
/* Check if the element exists and get a reference */
|
||||
o = lookupKeyRead(c->db, key);
|
||||
if (!o) {
|
||||
|
@ -2020,6 +2041,7 @@ void swapdbCommand(client *c) {
|
|||
RedisModuleSwapDbInfo si = {REDISMODULE_SWAPDBINFO_VERSION,id1,id2};
|
||||
moduleFireServerEvent(REDISMODULE_EVENT_SWAPDB,0,&si);
|
||||
server.dirty++;
|
||||
server.stat_cluster_incompatible_ops++;
|
||||
addReply(c,shared.ok);
|
||||
}
|
||||
}
|
||||
|
|
49
src/defrag.c
49
src/defrag.c
|
@ -482,8 +482,7 @@ long scanLaterList(robj *ob, unsigned long *cursor, monotime endtime) {
|
|||
quicklistNode *node;
|
||||
long iterations = 0;
|
||||
int bookmark_failed = 0;
|
||||
if (ob->type != OBJ_LIST || ob->encoding != OBJ_ENCODING_QUICKLIST)
|
||||
return 0;
|
||||
serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST);
|
||||
|
||||
if (*cursor == 0) {
|
||||
/* if cursor is 0, we start new iteration */
|
||||
|
@ -532,8 +531,7 @@ void scanLaterZsetCallback(void *privdata, const dictEntry *_de) {
|
|||
}
|
||||
|
||||
void scanLaterZset(robj *ob, unsigned long *cursor) {
|
||||
if (ob->type != OBJ_ZSET || ob->encoding != OBJ_ENCODING_SKIPLIST)
|
||||
return;
|
||||
serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST);
|
||||
zset *zs = (zset*)ob->ptr;
|
||||
dict *d = zs->dict;
|
||||
scanLaterZsetData data = {zs};
|
||||
|
@ -549,8 +547,7 @@ void scanCallbackCountScanned(void *privdata, const dictEntry *de) {
|
|||
}
|
||||
|
||||
void scanLaterSet(robj *ob, unsigned long *cursor) {
|
||||
if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HT)
|
||||
return;
|
||||
serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT);
|
||||
dict *d = ob->ptr;
|
||||
dictDefragFunctions defragfns = {
|
||||
.defragAlloc = activeDefragAlloc,
|
||||
|
@ -560,8 +557,7 @@ void scanLaterSet(robj *ob, unsigned long *cursor) {
|
|||
}
|
||||
|
||||
void scanLaterHash(robj *ob, unsigned long *cursor) {
|
||||
if (ob->type != OBJ_HASH || ob->encoding != OBJ_ENCODING_HT)
|
||||
return;
|
||||
serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT);
|
||||
dict *d = ob->ptr;
|
||||
dictDefragFunctions defragfns = {
|
||||
.defragAlloc = activeDefragAlloc,
|
||||
|
@ -653,13 +649,10 @@ int defragRaxNode(raxNode **noderef) {
|
|||
|
||||
/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
|
||||
int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, monotime endtime) {
|
||||
static unsigned char last[sizeof(streamID)];
|
||||
static unsigned char next[sizeof(streamID)];
|
||||
raxIterator ri;
|
||||
long iterations = 0;
|
||||
if (ob->type != OBJ_STREAM || ob->encoding != OBJ_ENCODING_STREAM) {
|
||||
*cursor = 0;
|
||||
return 0;
|
||||
}
|
||||
serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM);
|
||||
|
||||
stream *s = ob->ptr;
|
||||
raxStart(&ri,s->rax);
|
||||
|
@ -671,8 +664,11 @@ int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, monotime endtime)
|
|||
ri.node_cb = defragRaxNode;
|
||||
raxSeek(&ri,"^",NULL,0);
|
||||
} else {
|
||||
/* if cursor is non-zero, we seek to the static 'last' */
|
||||
if (!raxSeek(&ri,">", last, sizeof(last))) {
|
||||
/* if cursor is non-zero, we seek to the static 'next'.
|
||||
* Since node_cb is set after seek operation, any node traversed during seek wouldn't
|
||||
* be defragmented. To prevent this, we advance to next node before exiting previous
|
||||
* run, ensuring it gets defragmented instead of being skipped during current seek. */
|
||||
if (!raxSeek(&ri,">=", next, sizeof(next))) {
|
||||
*cursor = 0;
|
||||
raxStop(&ri);
|
||||
return 0;
|
||||
|
@ -690,8 +686,15 @@ int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, monotime endtime)
|
|||
server.stat_active_defrag_scanned++;
|
||||
if (++iterations > 128) {
|
||||
if (getMonotonicUs() > endtime) {
|
||||
serverAssert(ri.key_len==sizeof(last));
|
||||
memcpy(last,ri.key,ri.key_len);
|
||||
/* Move to next node. */
|
||||
if (!raxNext(&ri)) {
|
||||
/* If we reached the end, we can stop */
|
||||
*cursor = 0;
|
||||
raxStop(&ri);
|
||||
return 0;
|
||||
}
|
||||
serverAssert(ri.key_len==sizeof(next));
|
||||
memcpy(next,ri.key,ri.key_len);
|
||||
raxStop(&ri);
|
||||
return 1;
|
||||
}
|
||||
|
@ -995,22 +998,22 @@ void defragPubsubScanCallback(void *privdata, const dictEntry *de) {
|
|||
int defragLaterItem(dictEntry *de, unsigned long *cursor, monotime endtime, int dbid) {
|
||||
if (de) {
|
||||
robj *ob = dictGetVal(de);
|
||||
if (ob->type == OBJ_LIST) {
|
||||
if (ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST) {
|
||||
return scanLaterList(ob, cursor, endtime);
|
||||
} else if (ob->type == OBJ_SET) {
|
||||
} else if (ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT) {
|
||||
scanLaterSet(ob, cursor);
|
||||
} else if (ob->type == OBJ_ZSET) {
|
||||
} else if (ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST) {
|
||||
scanLaterZset(ob, cursor);
|
||||
} else if (ob->type == OBJ_HASH) {
|
||||
} else if (ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT) {
|
||||
scanLaterHash(ob, cursor);
|
||||
} else if (ob->type == OBJ_STREAM) {
|
||||
} else if (ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM) {
|
||||
return scanLaterStreamListpacks(ob, cursor, endtime);
|
||||
} else if (ob->type == OBJ_MODULE) {
|
||||
robj keyobj;
|
||||
initStaticStringObject(keyobj, dictGetKey(de));
|
||||
return moduleLateDefrag(&keyobj, ob, cursor, endtime, dbid);
|
||||
} else {
|
||||
*cursor = 0; /* object type may have changed since we schedule it for later */
|
||||
*cursor = 0; /* object type/encoding may have changed since we schedule it for later */
|
||||
}
|
||||
} else {
|
||||
*cursor = 0; /* object may have been deleted already */
|
||||
|
|
|
@ -1273,6 +1273,10 @@ int RM_CreateCommand(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc c
|
|||
if ((flags & CMD_MODULE_NO_CLUSTER) && server.cluster_enabled)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
/* We will encounter an error as above if cluster is enable */
|
||||
if (flags & CMD_MODULE_NO_CLUSTER)
|
||||
server.stat_cluster_incompatible_ops++;
|
||||
|
||||
/* Check if the command name is valid. */
|
||||
if (!isCommandNameValid(name))
|
||||
return REDISMODULE_ERR;
|
||||
|
@ -1400,6 +1404,10 @@ int RM_CreateSubcommand(RedisModuleCommand *parent, const char *name, RedisModul
|
|||
if ((flags & CMD_MODULE_NO_CLUSTER) && server.cluster_enabled)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
/* We will encounter an error as above if cluster is enable */
|
||||
if (flags & CMD_MODULE_NO_CLUSTER)
|
||||
server.stat_cluster_incompatible_ops++;
|
||||
|
||||
struct redisCommand *parent_cmd = parent->rediscmd;
|
||||
|
||||
if (parent_cmd->parent)
|
||||
|
|
|
@ -172,6 +172,7 @@ client *createClient(connection *conn) {
|
|||
c->io_flags = CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED;
|
||||
c->read_error = 0;
|
||||
c->slot = -1;
|
||||
c->cluster_compatibility_check_slot = -2;
|
||||
c->ctime = c->lastinteraction = server.unixtime;
|
||||
c->duration = 0;
|
||||
clientSetDefaultAuth(c);
|
||||
|
@ -2238,6 +2239,7 @@ static inline void resetClientInternal(client *c, int free_argv) {
|
|||
c->multibulklen = 0;
|
||||
c->bulklen = -1;
|
||||
c->slot = -1;
|
||||
c->cluster_compatibility_check_slot = -2;
|
||||
c->flags &= ~CLIENT_EXECUTING_COMMAND;
|
||||
|
||||
/* Make sure the duration has been recorded to some command. */
|
||||
|
|
|
@ -2014,8 +2014,6 @@ void readSyncBulkPayload(connection *conn) {
|
|||
char buf[PROTO_IOBUF_LEN];
|
||||
ssize_t nread, readlen, nwritten;
|
||||
int use_diskless_load = useDisklessLoad();
|
||||
redisDb *diskless_load_tempDb = NULL;
|
||||
functionsLibCtx* temp_functions_lib_ctx = NULL;
|
||||
int rdbchannel = (conn == server.repl_rdb_transfer_s);
|
||||
int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC :
|
||||
EMPTYDB_NO_FLAGS;
|
||||
|
@ -2208,17 +2206,9 @@ void readSyncBulkPayload(connection *conn) {
|
|||
killRDBChild();
|
||||
}
|
||||
|
||||
if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
|
||||
/* Initialize empty tempDb dictionaries. */
|
||||
diskless_load_tempDb = disklessLoadInitTempDb();
|
||||
temp_functions_lib_ctx = functionsLibCtxCreate();
|
||||
|
||||
moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD,
|
||||
REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED,
|
||||
NULL);
|
||||
} else {
|
||||
/* Attach to the new master immediately if we are not using swapdb. */
|
||||
if (!use_diskless_load || server.repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB)
|
||||
replicationAttachToNewMaster();
|
||||
}
|
||||
|
||||
/* Before loading the DB into memory we need to delete the readable
|
||||
* handler, otherwise it will get called recursively since
|
||||
|
@ -2235,6 +2225,9 @@ void readSyncBulkPayload(connection *conn) {
|
|||
int asyncLoading = 0;
|
||||
|
||||
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
|
||||
moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD,
|
||||
REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED,
|
||||
NULL);
|
||||
/* Async loading means we continue serving read commands during full resync, and
|
||||
* "swap" the new db with the old db only when loading is done.
|
||||
* It is enabled only on SWAPDB diskless replication when master replication ID hasn't changed,
|
||||
|
@ -2243,15 +2236,9 @@ void readSyncBulkPayload(connection *conn) {
|
|||
if (memcmp(server.replid, server.master_replid, CONFIG_RUN_ID_SIZE) == 0) {
|
||||
asyncLoading = 1;
|
||||
}
|
||||
dbarray = diskless_load_tempDb;
|
||||
functions_lib_ctx = temp_functions_lib_ctx;
|
||||
} else {
|
||||
dbarray = server.db;
|
||||
functions_lib_ctx = functionsLibCtxGetCurrent();
|
||||
functionsLibCtxClear(functions_lib_ctx);
|
||||
}
|
||||
|
||||
disklessLoadingRio = &rdb;
|
||||
/* Empty db */
|
||||
loadingSetFlags(NULL, server.repl_transfer_size, asyncLoading);
|
||||
if (server.repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB) {
|
||||
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
|
||||
|
@ -2262,7 +2249,17 @@ void readSyncBulkPayload(connection *conn) {
|
|||
}
|
||||
loadingFireEvent(RDBFLAGS_REPLICATION);
|
||||
|
||||
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
|
||||
dbarray = disklessLoadInitTempDb();
|
||||
functions_lib_ctx = functionsLibCtxCreate();
|
||||
} else {
|
||||
dbarray = server.db;
|
||||
functions_lib_ctx = functionsLibCtxGetCurrent();
|
||||
functionsLibCtxClear(functions_lib_ctx);
|
||||
}
|
||||
|
||||
rioInitWithConn(&rdb,conn,server.repl_transfer_size);
|
||||
disklessLoadingRio = &rdb;
|
||||
|
||||
/* Put the socket in blocking mode to simplify RDB transfer.
|
||||
* We'll restore it when the RDB is received. */
|
||||
|
@ -2297,8 +2294,8 @@ void readSyncBulkPayload(connection *conn) {
|
|||
REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_ABORTED,
|
||||
NULL);
|
||||
|
||||
disklessLoadDiscardTempDb(diskless_load_tempDb);
|
||||
functionsLibCtxFree(temp_functions_lib_ctx);
|
||||
disklessLoadDiscardTempDb(dbarray);
|
||||
functionsLibCtxFree(functions_lib_ctx);
|
||||
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Discarding temporary DB in background");
|
||||
} else {
|
||||
/* Remove the half-loaded data in case we started with an empty replica. */
|
||||
|
@ -2328,17 +2325,17 @@ void readSyncBulkPayload(connection *conn) {
|
|||
replicationAttachToNewMaster();
|
||||
|
||||
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Swapping active DB with loaded DB");
|
||||
swapMainDbWithTempDb(diskless_load_tempDb);
|
||||
swapMainDbWithTempDb(dbarray);
|
||||
|
||||
/* swap existing functions ctx with the temporary one */
|
||||
functionsLibCtxSwapWithCurrent(temp_functions_lib_ctx);
|
||||
functionsLibCtxSwapWithCurrent(functions_lib_ctx);
|
||||
|
||||
moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD,
|
||||
REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED,
|
||||
NULL);
|
||||
|
||||
/* Delete the old db as it's useless now. */
|
||||
disklessLoadDiscardTempDb(diskless_load_tempDb);
|
||||
disklessLoadDiscardTempDb(dbarray);
|
||||
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Discarding old DB in background");
|
||||
}
|
||||
|
||||
|
|
36
src/script.c
36
src/script.c
|
@ -182,6 +182,11 @@ int scriptPrepareForRun(scriptRunCtx *run_ctx, client *engine_client, client *ca
|
|||
return C_ERR;
|
||||
}
|
||||
|
||||
/* Can't run script with 'non-cluster' flag as above when cluster is enabled. */
|
||||
if (script_flags & SCRIPT_FLAG_NO_CLUSTER) {
|
||||
server.stat_cluster_incompatible_ops++;
|
||||
}
|
||||
|
||||
if (running_stale && !(script_flags & SCRIPT_FLAG_ALLOW_STALE)) {
|
||||
addReplyError(caller, "-MASTERDOWN Link with MASTER is down, "
|
||||
"replica-serve-stale-data is set to 'no' "
|
||||
|
@ -249,6 +254,7 @@ int scriptPrepareForRun(scriptRunCtx *run_ctx, client *engine_client, client *ca
|
|||
run_ctx->original_client = caller;
|
||||
run_ctx->funcname = funcname;
|
||||
run_ctx->slot = caller->slot;
|
||||
run_ctx->cluster_compatibility_check_slot = caller->cluster_compatibility_check_slot;
|
||||
|
||||
client *script_client = run_ctx->c;
|
||||
client *curr_client = run_ctx->original_client;
|
||||
|
@ -303,6 +309,7 @@ void scriptResetRun(scriptRunCtx *run_ctx) {
|
|||
}
|
||||
|
||||
run_ctx->slot = -1;
|
||||
run_ctx->cluster_compatibility_check_slot = -2;
|
||||
|
||||
preventCommandPropagation(run_ctx->original_client);
|
||||
|
||||
|
@ -521,6 +528,33 @@ static int scriptVerifyClusterState(scriptRunCtx *run_ctx, client *c, client *or
|
|||
return C_OK;
|
||||
}
|
||||
|
||||
static void scriptCheckClusterCompatibility(scriptRunCtx *run_ctx, client *c) {
|
||||
int hashslot = -1;
|
||||
|
||||
/* If we don't need to detect for this script or slot violation already
|
||||
* detected and reported for this script, exit */
|
||||
if (run_ctx->cluster_compatibility_check_slot == -2) return;
|
||||
|
||||
if (!areCommandKeysInSameSlot(c, &hashslot)) {
|
||||
server.stat_cluster_incompatible_ops++;
|
||||
/* Already found cross slot usage, skip the check for the rest of the script */
|
||||
run_ctx->cluster_compatibility_check_slot = -2;
|
||||
} else {
|
||||
/* Check whether the declared keys and the accessed keys belong to the same slot.
|
||||
* If having SCRIPT_ALLOW_CROSS_SLOT flag, skip this check since it's allowed
|
||||
* in cluster mode, but it may fail when the slot doesn't belong to the node. */
|
||||
if (hashslot != -1 && !(run_ctx->flags & SCRIPT_ALLOW_CROSS_SLOT)) {
|
||||
if (run_ctx->cluster_compatibility_check_slot == -1) {
|
||||
run_ctx->cluster_compatibility_check_slot = hashslot;
|
||||
} else if (run_ctx->cluster_compatibility_check_slot != hashslot) {
|
||||
server.stat_cluster_incompatible_ops++;
|
||||
/* Already found cross slot usage, skip the check for the rest of the script */
|
||||
run_ctx->cluster_compatibility_check_slot = -2;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* set RESP for a given run_ctx */
|
||||
int scriptSetResp(scriptRunCtx *run_ctx, int resp) {
|
||||
if (resp != 2 && resp != 3) {
|
||||
|
@ -618,6 +652,8 @@ void scriptCall(scriptRunCtx *run_ctx, sds *err) {
|
|||
goto error;
|
||||
}
|
||||
|
||||
scriptCheckClusterCompatibility(run_ctx, c);
|
||||
|
||||
int call_flags = CMD_CALL_NONE;
|
||||
if (run_ctx->repl_flags & PROPAGATE_AOF) {
|
||||
call_flags |= CMD_CALL_PROPAGATE_AOF;
|
||||
|
|
|
@ -54,6 +54,7 @@ struct scriptRunCtx {
|
|||
int repl_flags;
|
||||
monotime start_time;
|
||||
int slot;
|
||||
int cluster_compatibility_check_slot;
|
||||
};
|
||||
|
||||
/* Scripts flags */
|
||||
|
|
59
src/server.c
59
src/server.c
|
@ -2660,6 +2660,7 @@ void resetServerStats(void) {
|
|||
server.aof_delayed_fsync = 0;
|
||||
server.stat_reply_buffer_shrinks = 0;
|
||||
server.stat_reply_buffer_expands = 0;
|
||||
server.stat_cluster_incompatible_ops = 0;
|
||||
memset(server.duration_stats, 0, sizeof(durationStats) * EL_DURATION_TYPE_NUM);
|
||||
server.el_cmd_cnt_max = 0;
|
||||
lazyfreeResetStats();
|
||||
|
@ -4124,6 +4125,21 @@ int processCommand(client *c) {
|
|||
}
|
||||
}
|
||||
|
||||
/* Check if the command keys are all in the same slot for cluster compatibility */
|
||||
if (server.cluster_compatibility_sample_ratio && !server.cluster_enabled &&
|
||||
!(!(c->cmd->flags&CMD_MOVABLE_KEYS) && c->cmd->key_specs_num == 0 &&
|
||||
c->cmd->proc != execCommand) && SHOULD_CLUSTER_COMPATIBILITY_SAMPLE())
|
||||
{
|
||||
c->cluster_compatibility_check_slot = -1;
|
||||
if (!areCommandKeysInSameSlot(c, &c->cluster_compatibility_check_slot)) {
|
||||
server.stat_cluster_incompatible_ops++;
|
||||
/* If we find cross slot keys, reset slot to -2 to indicate we won't
|
||||
* check this command again. That is useful for script, since we need
|
||||
* this variable to decide if we continue checking accessing keys. */
|
||||
c->cluster_compatibility_check_slot = -2;
|
||||
}
|
||||
}
|
||||
|
||||
/* Disconnect some clients if total clients memory is too high. We do this
|
||||
* before key eviction, after the last command was executed and consumed
|
||||
* some client output buffer memory. */
|
||||
|
@ -4316,6 +4332,46 @@ int processCommand(client *c) {
|
|||
return C_OK;
|
||||
}
|
||||
|
||||
/* Checks if all keys in a command (or a MULTI-EXEC) belong to the same hash slot.
|
||||
* If yes, return 1, otherwise 0. If hashslot is not NULL, it will be set to the
|
||||
* slot of the keys. */
|
||||
int areCommandKeysInSameSlot(client *c, int *hashslot) {
|
||||
int slot = -1;
|
||||
multiState *ms = NULL;
|
||||
|
||||
if (c->cmd->proc == execCommand) {
|
||||
if (!(c->flags & CLIENT_MULTI)) return 1;
|
||||
else ms = &c->mstate;
|
||||
}
|
||||
|
||||
/* If client is in multi-exec, we need to check the slot of all keys
|
||||
* in the transaction. */
|
||||
for (int i = 0; i < (ms ? ms->count : 1); i++) {
|
||||
struct redisCommand *cmd = ms ? ms->commands[i].cmd : c->cmd;
|
||||
robj **argv = ms ? ms->commands[i].argv : c->argv;
|
||||
int argc = ms ? ms->commands[i].argc : c->argc;
|
||||
|
||||
getKeysResult result = GETKEYS_RESULT_INIT;
|
||||
int numkeys = getKeysFromCommand(cmd, argv, argc, &result);
|
||||
keyReference *keyindex = result.keys;
|
||||
|
||||
/* Check if all keys have the same slots, increment the metric if not */
|
||||
for (int j = 0; j < numkeys; j++) {
|
||||
robj *thiskey = argv[keyindex[j].pos];
|
||||
int thisslot = keyHashSlot((char*)thiskey->ptr, sdslen(thiskey->ptr));
|
||||
if (slot == -1) {
|
||||
slot = thisslot;
|
||||
} else if (slot != thisslot) {
|
||||
getKeysFreeResult(&result);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
getKeysFreeResult(&result);
|
||||
}
|
||||
if (hashslot) *hashslot = slot;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* ====================== Error lookup and execution ===================== */
|
||||
|
||||
/* Users who abuse lua error_reply will generate a new error object on each
|
||||
|
@ -6083,6 +6139,9 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
|
|||
"instantaneous_eventloop_cycles_per_sec:%llu\r\n", getInstantaneousMetric(STATS_METRIC_EL_CYCLE),
|
||||
"instantaneous_eventloop_duration_usec:%llu\r\n", getInstantaneousMetric(STATS_METRIC_EL_DURATION)));
|
||||
info = genRedisInfoStringACLStats(info);
|
||||
if (!server.cluster_enabled && server.cluster_compatibility_sample_ratio) {
|
||||
info = sdscatprintf(info, "cluster_incompatible_ops:%lld\r\n", server.stat_cluster_incompatible_ops);
|
||||
}
|
||||
}
|
||||
|
||||
/* Replication */
|
||||
|
|
13
src/server.h
13
src/server.h
|
@ -1239,6 +1239,10 @@ typedef struct {
|
|||
size_t mem_usage_sum;
|
||||
} clientMemUsageBucket;
|
||||
|
||||
#define SHOULD_CLUSTER_COMPATIBILITY_SAMPLE() \
|
||||
(server.cluster_compatibility_sample_ratio == 100 || \
|
||||
(double)rand()/RAND_MAX * 100 < server.cluster_compatibility_sample_ratio)
|
||||
|
||||
#ifdef LOG_REQ_RES
|
||||
/* Structure used to log client's requests and their
|
||||
* responses (see logreqres.c) */
|
||||
|
@ -1305,6 +1309,11 @@ typedef struct client {
|
|||
time_t ctime; /* Client creation time. */
|
||||
long duration; /* Current command duration. Used for measuring latency of blocking/non-blocking cmds */
|
||||
int slot; /* The slot the client is executing against. Set to -1 if no slot is being used */
|
||||
int cluster_compatibility_check_slot; /* The slot the client is executing against for cluster compatibility check.
|
||||
* -2 means we don't need to check slot violation, or we already found
|
||||
* a violation, reported it and don't need to continue checking.
|
||||
* -1 means we're looking for the slot number and didn't find it yet.
|
||||
* any positive number means we found a slot and no violation yet. */
|
||||
dictEntry *cur_script; /* Cached pointer to the dictEntry of the script being executed. */
|
||||
time_t lastinteraction; /* Time of the last interaction, used for timeout */
|
||||
time_t obuf_soft_limit_reached_time;
|
||||
|
@ -1848,6 +1857,7 @@ struct redisServer {
|
|||
redisAtomic long long stat_io_writes_processed[IO_THREADS_MAX_NUM]; /* Number of write events processed by IO / Main threads */
|
||||
redisAtomic long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */
|
||||
long long stat_client_outbuf_limit_disconnections; /* Total number of clients reached output buf length limit */
|
||||
long long stat_cluster_incompatible_ops; /* Number of operations that are incompatible with cluster mode */
|
||||
/* The following two are used to track instantaneous metrics, like
|
||||
* number of operations per second, network traffic. */
|
||||
struct {
|
||||
|
@ -1903,6 +1913,8 @@ struct redisServer {
|
|||
int latency_tracking_info_percentiles_len;
|
||||
unsigned int max_new_tls_conns_per_cycle; /* The maximum number of tls connections that will be accepted during each invocation of the event loop. */
|
||||
unsigned int max_new_conns_per_cycle; /* The maximum number of tcp connections that will be accepted during each invocation of the event loop. */
|
||||
int cluster_compatibility_sample_ratio; /* Sampling ratio for cluster mode incompatible commands. */
|
||||
|
||||
/* AOF persistence */
|
||||
int aof_enabled; /* AOF configuration */
|
||||
int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */
|
||||
|
@ -3232,6 +3244,7 @@ int processCommand(client *c);
|
|||
void commandProcessed(client *c);
|
||||
int processPendingCommandAndInputBuffer(client *c);
|
||||
int processCommandAndResetClient(client *c);
|
||||
int areCommandKeysInSameSlot(client *c, int *hashslot);
|
||||
void setupSignalHandlers(void);
|
||||
int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler);
|
||||
connListener *listenerByType(const char *typename);
|
||||
|
|
23
src/sort.c
23
src/sort.c
|
@ -230,6 +230,17 @@ void sortCommandGeneric(client *c, int readonly) {
|
|||
syntax_error++;
|
||||
break;
|
||||
}
|
||||
|
||||
/* If the BY pattern slot is not equal with the slot of keys, we will record
|
||||
* an incompatible behavior as above comments. */
|
||||
if (server.cluster_compatibility_sample_ratio && !server.cluster_enabled &&
|
||||
SHOULD_CLUSTER_COMPATIBILITY_SAMPLE())
|
||||
{
|
||||
if (patternHashSlot(sortby->ptr, sdslen(sortby->ptr)) !=
|
||||
(int)keyHashSlot(c->argv[1]->ptr, sdslen(c->argv[1]->ptr)))
|
||||
server.stat_cluster_incompatible_ops++;
|
||||
}
|
||||
|
||||
/* If BY is specified with a real pattern, we can't accept
|
||||
* it if no full ACL key access is applied for this command. */
|
||||
if (!user_has_full_key_access) {
|
||||
|
@ -253,6 +264,18 @@ void sortCommandGeneric(client *c, int readonly) {
|
|||
syntax_error++;
|
||||
break;
|
||||
}
|
||||
|
||||
/* If the GET pattern slot is not equal with the slot of keys, we will record
|
||||
* an incompatible behavior as above comments. */
|
||||
if (server.cluster_compatibility_sample_ratio && !server.cluster_enabled &&
|
||||
strcmp(c->argv[j+1]->ptr, "#") &&
|
||||
SHOULD_CLUSTER_COMPATIBILITY_SAMPLE())
|
||||
{
|
||||
if (patternHashSlot(c->argv[j+1]->ptr, sdslen(c->argv[j+1]->ptr)) !=
|
||||
(int)keyHashSlot(c->argv[1]->ptr, sdslen(c->argv[1]->ptr)))
|
||||
server.stat_cluster_incompatible_ops++;
|
||||
}
|
||||
|
||||
if (!user_has_full_key_access) {
|
||||
addReplyError(c,"GET option of SORT denied due to insufficient ACL permissions.");
|
||||
syntax_error++;
|
||||
|
|
|
@ -612,10 +612,10 @@ void incrDecrCommand(client *c, long long incr) {
|
|||
dbAdd(c->db,c->argv[1],new);
|
||||
}
|
||||
}
|
||||
addReplyLongLongFromStr(c,new);
|
||||
signalModifiedKey(c,c->db,c->argv[1]);
|
||||
notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id);
|
||||
server.dirty++;
|
||||
addReplyLongLongFromStr(c,new);
|
||||
}
|
||||
|
||||
void incrCommand(client *c) {
|
||||
|
|
75
src/util.c
75
src/util.c
|
@ -660,13 +660,22 @@ int string2ld(const char *s, size_t slen, long double *dp) {
|
|||
int string2d(const char *s, size_t slen, double *dp) {
|
||||
errno = 0;
|
||||
char *eptr;
|
||||
/* Fast path to reject empty strings, or strings starting by space explicitly */
|
||||
if (unlikely(slen == 0 ||
|
||||
isspace(((const char*)s)[0])))
|
||||
return 0;
|
||||
*dp = fast_float_strtod(s, &eptr);
|
||||
if (slen == 0 ||
|
||||
isspace(((const char*)s)[0]) ||
|
||||
(size_t)(eptr-(char*)s) != slen ||
|
||||
/* If `fast_float_strtod` didn't consume full input, try `strtod`
|
||||
* Given fast_float does not support hexadecimal strings representation */
|
||||
if (unlikely((size_t)(eptr - (char*)s) != slen)) {
|
||||
char *fallback_eptr;
|
||||
*dp = strtod(s, &fallback_eptr);
|
||||
if ((size_t)(fallback_eptr - (char*)s) != slen) return 0;
|
||||
}
|
||||
if (unlikely(errno == EINVAL ||
|
||||
(errno == ERANGE &&
|
||||
(*dp == HUGE_VAL || *dp == -HUGE_VAL || fpclassify(*dp) == FP_ZERO)) ||
|
||||
isnan(*dp))
|
||||
isnan(*dp)))
|
||||
return 0;
|
||||
return 1;
|
||||
}
|
||||
|
@ -1535,6 +1544,63 @@ static void test_string2l(void) {
|
|||
#endif
|
||||
}
|
||||
|
||||
static void test_string2d(void) {
|
||||
char buf[1024];
|
||||
double v;
|
||||
|
||||
/* Valid hexadecimal value. */
|
||||
redis_strlcpy(buf,"0x0p+0",sizeof(buf));
|
||||
assert(string2d(buf,strlen(buf),&v) == 1);
|
||||
assert(v == 0.0);
|
||||
|
||||
redis_strlcpy(buf,"0x1p+0",sizeof(buf));
|
||||
assert(string2d(buf,strlen(buf),&v) == 1);
|
||||
assert(v == 1.0);
|
||||
|
||||
/* Valid floating-point numbers */
|
||||
redis_strlcpy(buf, "1.5", sizeof(buf));
|
||||
assert(string2d(buf, strlen(buf), &v) == 1);
|
||||
assert(v == 1.5);
|
||||
|
||||
redis_strlcpy(buf, "-3.14", sizeof(buf));
|
||||
assert(string2d(buf, strlen(buf), &v) == 1);
|
||||
assert(v == -3.14);
|
||||
|
||||
redis_strlcpy(buf, "2.0e10", sizeof(buf));
|
||||
assert(string2d(buf, strlen(buf), &v) == 1);
|
||||
assert(v == 2.0e10);
|
||||
|
||||
redis_strlcpy(buf, "1e-3", sizeof(buf));
|
||||
assert(string2d(buf, strlen(buf), &v) == 1);
|
||||
assert(v == 0.001);
|
||||
|
||||
/* Valid integer */
|
||||
redis_strlcpy(buf, "42", sizeof(buf));
|
||||
assert(string2d(buf, strlen(buf), &v) == 1);
|
||||
assert(v == 42.0);
|
||||
|
||||
/* Invalid cases */
|
||||
/* Empty. */
|
||||
redis_strlcpy(buf, "", sizeof(buf));
|
||||
assert(string2d(buf, strlen(buf), &v) == 0);
|
||||
|
||||
/* Starting by space. */
|
||||
redis_strlcpy(buf, " 1.23", sizeof(buf));
|
||||
assert(string2d(buf, strlen(buf), &v) == 0);
|
||||
|
||||
/* Invalid hexadecimal format. */
|
||||
redis_strlcpy(buf, "0x1.2g", sizeof(buf));
|
||||
assert(string2d(buf, strlen(buf), &v) == 0);
|
||||
|
||||
/* Hexadecimal NaN */
|
||||
redis_strlcpy(buf, "0xNan", sizeof(buf));
|
||||
assert(string2d(buf, strlen(buf), &v) == 0);
|
||||
|
||||
/* overflow. */
|
||||
redis_strlcpy(buf,"23456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789",sizeof(buf));
|
||||
assert(string2d(buf,strlen(buf),&v) == 0);
|
||||
}
|
||||
|
||||
static void test_ll2string(void) {
|
||||
char buf[32];
|
||||
long long v;
|
||||
|
@ -1693,6 +1759,7 @@ int utilTest(int argc, char **argv, int flags) {
|
|||
|
||||
test_string2ll();
|
||||
test_string2l();
|
||||
test_string2d();
|
||||
test_ll2string();
|
||||
test_ld2string();
|
||||
test_fixedpoint_d2string();
|
||||
|
|
|
@ -1619,3 +1619,32 @@ start_server {tags {"repl external:skip"}} {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
start_server {tags {"repl external:skip"}} {
|
||||
set replica [srv 0 client]
|
||||
start_server {} {
|
||||
set master [srv 0 client]
|
||||
set master_host [srv 0 host]
|
||||
set master_port [srv 0 port]
|
||||
|
||||
test "Test replication with functions when repl-diskless-load is set to on-empty-db" {
|
||||
$replica config set repl-diskless-load on-empty-db
|
||||
|
||||
populate 10 master 10
|
||||
$master function load {#!lua name=test
|
||||
redis.register_function{function_name='func1', callback=function() return 'hello' end, flags={'no-writes'}}
|
||||
}
|
||||
|
||||
$replica replicaof $master_host $master_port
|
||||
|
||||
# Wait until replication is completed
|
||||
wait_replica_online $master 0 1000 100
|
||||
wait_for_ofs_sync $master $replica
|
||||
|
||||
# Sanity check
|
||||
assert_equal [$replica fcall func1 0] "hello"
|
||||
assert_morethan [$replica dbsize] 0
|
||||
assert_equal [$master debug digest] [$replica debug digest]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -540,6 +540,14 @@ int test_keyslot(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
|||
return RedisModule_ReplyWithLongLong(ctx, slot);
|
||||
}
|
||||
|
||||
int only_reply_ok(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
|
||||
RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
|
@ -607,6 +615,13 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
|||
return REDISMODULE_ERR;
|
||||
if (RedisModule_CreateCommand(ctx, "test.keyslot", test_keyslot, "", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
if (RedisModule_CreateCommand(ctx, "test.incompatible_cluster_cmd", only_reply_ok, "", 1, -1, 2) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
if (RedisModule_CreateCommand(ctx, "test.no_cluster_cmd", NULL, "no-cluster", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
RedisModuleCommand *parent = RedisModule_GetCommand(ctx, "test.no_cluster_cmd");
|
||||
if (RedisModule_CreateSubcommand(parent, "set", only_reply_ok, "no-cluster", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
|
|
@ -59,15 +59,31 @@ start_server {tags {"dump"}} {
|
|||
assert_equal [r get foo] {bar}
|
||||
r config set maxmemory-policy noeviction
|
||||
} {OK} {needs:config-maxmemory}
|
||||
|
||||
|
||||
test {RESTORE can set LFU} {
|
||||
r set foo bar
|
||||
set encoded [r dump foo]
|
||||
r del foo
|
||||
r config set maxmemory-policy allkeys-lfu
|
||||
r restore foo 0 $encoded freq 100
|
||||
|
||||
# We need to determine whether the `object` operation happens within the same minute or crosses into a new one
|
||||
# This will help us verify if the freq remains 100 or decays due to a minute transition
|
||||
set start [clock format [clock seconds] -format %M]
|
||||
set freq [r object freq foo]
|
||||
assert {$freq == 100}
|
||||
set end [clock format [clock seconds] -format %M]
|
||||
|
||||
if { $start == $end } {
|
||||
# If the minutes haven't changed (i.e., the restore and object happened within the same minute),
|
||||
# the freq should remain 100 as no decay has occurred yet.
|
||||
assert {$freq == 100}
|
||||
} else {
|
||||
# If the object operation crosses into a new minute, freq may have already decayed by 1 (99),
|
||||
# or it may still be 100 if the minute update hasn't been applied yet when the operation is performed.
|
||||
# The decay might only take effect after the operation completes and the minute is updated.
|
||||
assert {($freq == 100) || ($freq == 99)}
|
||||
}
|
||||
|
||||
r get foo
|
||||
assert_equal [r get foo] {bar}
|
||||
r config set maxmemory-policy noeviction
|
||||
|
|
|
@ -174,4 +174,18 @@ start_server {tags {"lazyfree"}} {
|
|||
assert_equal [s lazyfreed_objects] 2
|
||||
$rd close
|
||||
}
|
||||
|
||||
test "Unblocks client blocked on lazyfree via REPLICAOF command" {
|
||||
set rd [redis_deferring_client]
|
||||
|
||||
populate 50000 ;# Just to make flushdb async slower
|
||||
$rd flushdb
|
||||
wait_for_blocked_client
|
||||
# Test that slaveof command unblocks clients without assertion failure
|
||||
r slaveof 127.0.0.1 0
|
||||
assert_equal [$rd read] {OK}
|
||||
$rd close
|
||||
r ping
|
||||
r slaveof no one
|
||||
} {OK} {external:skip}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ start_server {tags {"modules"} overrides {{save ""}}} {
|
|||
r frag.create key1 1 1000 0
|
||||
|
||||
r config set activedefrag yes
|
||||
wait_for_condition 100 50 {
|
||||
wait_for_condition 200 50 {
|
||||
[getInfoProperty [r info defragtest_stats] defragtest_defrag_ended] > 0
|
||||
} else {
|
||||
fail "Unable to wait for a complete defragmentation cycle to finish"
|
||||
|
@ -52,7 +52,7 @@ start_server {tags {"modules"} overrides {{save ""}}} {
|
|||
r frag.create key2 10000 100 1000
|
||||
|
||||
r config set activedefrag yes
|
||||
wait_for_condition 100 50 {
|
||||
wait_for_condition 200 50 {
|
||||
[getInfoProperty [r info defragtest_stats] defragtest_defrag_ended] > 0
|
||||
} else {
|
||||
fail "Unable to wait for a complete defragmentation cycle to finish"
|
||||
|
@ -79,7 +79,7 @@ start_server {tags {"modules"} overrides {{save ""}}} {
|
|||
r frag.create_frag_global
|
||||
r config set activedefrag yes
|
||||
|
||||
wait_for_condition 100 50 {
|
||||
wait_for_condition 200 50 {
|
||||
[getInfoProperty [r info defragtest_stats] defragtest_defrag_ended] > 0
|
||||
} else {
|
||||
fail "Unable to wait for a complete defragmentation cycle to finish"
|
||||
|
|
|
@ -3,6 +3,10 @@ set testmodule [file normalize tests/modules/keyspace_events.so]
|
|||
tags "modules" {
|
||||
start_server [list overrides [list loadmodule "$testmodule"]] {
|
||||
|
||||
# avoid using shared integers, to increase the chance of detection heap issues
|
||||
r config set maxmemory-policy allkeys-lru
|
||||
r config set maxmemory 1gb
|
||||
|
||||
test {Test loaded key space event} {
|
||||
r set x 1
|
||||
r hset y f v
|
||||
|
|
|
@ -561,3 +561,27 @@ if {[string match {*jemalloc*} [s mem_allocator]]} {
|
|||
assert_equal {OK} [r module unload misc]
|
||||
}
|
||||
}
|
||||
|
||||
start_server {tags {"modules"}} {
|
||||
test {Detect incompatible operations in cluster mode for module} {
|
||||
r config set cluster-compatibility-sample-ratio 100
|
||||
set incompatible_ops [s cluster_incompatible_ops]
|
||||
|
||||
# since test.no_cluster_cmd and its subcommand have 'no-cluster' flag,
|
||||
# they should not be counted as incompatible ops, increment the counter by 2
|
||||
r module load $testmodule
|
||||
assert_equal [expr $incompatible_ops+2] [s cluster_incompatible_ops]
|
||||
|
||||
# incompatible_cluster_cmd is similar with MSET, check if it is counted as
|
||||
# incompatible ops with different number of keys
|
||||
# only 1 key, should not increment the counter
|
||||
r test.incompatible_cluster_cmd foo bar
|
||||
assert_equal [expr $incompatible_ops+2] [s cluster_incompatible_ops]
|
||||
# 2 cross slot keys, should increment the counter
|
||||
r test.incompatible_cluster_cmd foo bar bar foo
|
||||
assert_equal [expr $incompatible_ops+3] [s cluster_incompatible_ops]
|
||||
# 2 non cross slot keys, should not increment the counter
|
||||
r test.incompatible_cluster_cmd foo bar bar{foo} bar
|
||||
assert_equal [expr $incompatible_ops+3] [s cluster_incompatible_ops]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -530,3 +530,204 @@ start_server {tags {"other external:skip"}} {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
start_server {tags {"other external:skip"} overrides {cluster-compatibility-sample-ratio 100}} {
|
||||
test {Cross DB command is incompatible with cluster mode} {
|
||||
set incompatible_ops [s cluster_incompatible_ops]
|
||||
|
||||
# SELECT with 0 is compatible command in cluster mode
|
||||
assert_equal {OK} [r select 0]
|
||||
assert_equal $incompatible_ops [s cluster_incompatible_ops]
|
||||
|
||||
# SELECT with nonzero is incompatible command in cluster mode
|
||||
assert_equal {OK} [r select 1]
|
||||
assert_equal [expr $incompatible_ops + 1] [s cluster_incompatible_ops]
|
||||
|
||||
# SWAPDB is incompatible command in cluster mode
|
||||
assert_equal {OK} [r swapdb 0 1]
|
||||
assert_equal [expr $incompatible_ops + 2] [s cluster_incompatible_ops]
|
||||
|
||||
|
||||
# If destination db in COPY command is equal to source db, it is compatible
|
||||
# with cluster mode, otherwise it is incompatible.
|
||||
r select 0
|
||||
r set key1 value1
|
||||
set incompatible_ops [s cluster_incompatible_ops]
|
||||
assert_equal {1} [r copy key1 key2{key1}] ;# destination db is equal to source db
|
||||
assert_equal $incompatible_ops [s cluster_incompatible_ops]
|
||||
assert_equal {1} [r copy key2{key1} key1 db 1] ;# destination db is not equal to source db
|
||||
assert_equal [expr $incompatible_ops + 1] [s cluster_incompatible_ops]
|
||||
|
||||
# If destination db in MOVE command is not equal to source db, it is incompatible
|
||||
# with cluster mode.
|
||||
r set key3 value3
|
||||
assert_equal {1} [r move key3 1]
|
||||
assert_equal [expr $incompatible_ops + 2] [s cluster_incompatible_ops]
|
||||
} {} {cluster:skip}
|
||||
|
||||
test {Function no-cluster flag is incompatible with cluster mode} {
|
||||
set incompatible_ops [s cluster_incompatible_ops]
|
||||
|
||||
# no-cluster flag is incompatible with cluster mode
|
||||
r function load {#!lua name=test
|
||||
redis.register_function{function_name='f1', callback=function() return 'hello' end, flags={'no-cluster'}}
|
||||
}
|
||||
r fcall f1 0
|
||||
assert_equal [expr $incompatible_ops + 1] [s cluster_incompatible_ops]
|
||||
|
||||
# It is compatible without no-cluster flag, should not increase the cluster_incompatible_ops
|
||||
r function load {#!lua name=test2
|
||||
redis.register_function{function_name='f2', callback=function() return 'hello' end}
|
||||
}
|
||||
r fcall f2 0
|
||||
assert_equal [expr $incompatible_ops + 1] [s cluster_incompatible_ops]
|
||||
} {} {cluster:skip}
|
||||
|
||||
test {Script no-cluster flag is incompatible with cluster mode} {
|
||||
set incompatible_ops [s cluster_incompatible_ops]
|
||||
|
||||
# no-cluster flag is incompatible with cluster mode
|
||||
r eval {#!lua flags=no-cluster
|
||||
return 1
|
||||
} 0
|
||||
assert_equal [expr $incompatible_ops + 1] [s cluster_incompatible_ops]
|
||||
|
||||
# It is compatible without no-cluster flag, should not increase the cluster_incompatible_ops
|
||||
r eval {#!lua
|
||||
return 1
|
||||
} 0
|
||||
assert_equal [expr $incompatible_ops + 1] [s cluster_incompatible_ops]
|
||||
} {} {cluster:skip}
|
||||
|
||||
test {SORT command incompatible operations with cluster mode} {
|
||||
set incompatible_ops [s cluster_incompatible_ops]
|
||||
|
||||
# If the BY pattern slot is not equal with the slot of keys, we consider
|
||||
# an incompatible behavior, otherwise it is compatible, should not increase
|
||||
# the cluster_incompatible_ops
|
||||
r lpush mylist 1 2 3
|
||||
for {set i 1} {$i < 4} {incr i} {
|
||||
r set weight_$i [expr 4 - $i]
|
||||
}
|
||||
assert_equal {3 2 1} [r sort mylist BY weight_*]
|
||||
assert_equal [expr $incompatible_ops + 1] [s cluster_incompatible_ops]
|
||||
# weight{mylist}_* and mylist have the same slot
|
||||
for {set i 1} {$i < 4} {incr i} {
|
||||
r set weight{mylist}_$i [expr 4 - $i]
|
||||
}
|
||||
assert_equal {3 2 1} [r sort mylist BY weight{mylist}_*]
|
||||
assert_equal [expr $incompatible_ops + 1] [s cluster_incompatible_ops]
|
||||
|
||||
# If the GET pattern slot is not equal with the slot of keys, we consider
|
||||
# an incompatible behavior, otherwise it is compatible, should not increase
|
||||
# the cluster_incompatible_ops
|
||||
for {set i 1} {$i < 4} {incr i} {
|
||||
r set object_$i o_$i
|
||||
}
|
||||
assert_equal {o_3 o_2 o_1} [r sort mylist BY weight{mylist}_* GET object_*]
|
||||
assert_equal [expr $incompatible_ops + 2] [s cluster_incompatible_ops]
|
||||
# object{mylist}_*, weight{mylist}_* and mylist have the same slot
|
||||
for {set i 1} {$i < 4} {incr i} {
|
||||
r set object{mylist}_$i o_$i
|
||||
}
|
||||
assert_equal {o_3 o_2 o_1} [r sort mylist BY weight{mylist}_* GET object{mylist}_*]
|
||||
assert_equal [expr $incompatible_ops + 2] [s cluster_incompatible_ops]
|
||||
} {} {cluster:skip}
|
||||
|
||||
test {Normal cross slot commands are incompatible with cluster mode} {
|
||||
# Normal cross slot command
|
||||
set incompatible_ops [s cluster_incompatible_ops]
|
||||
r mset foo bar bar foo
|
||||
r del foo bar
|
||||
assert_equal [expr $incompatible_ops + 2] [s cluster_incompatible_ops]
|
||||
} {} {cluster:skip}
|
||||
|
||||
test {Transaction is incompatible with cluster mode} {
|
||||
set incompatible_ops [s cluster_incompatible_ops]
|
||||
|
||||
# Incomplete transaction
|
||||
catch {r EXEC}
|
||||
r multi
|
||||
r exec
|
||||
assert_equal $incompatible_ops [s cluster_incompatible_ops]
|
||||
|
||||
# Transaction, SET and DEL have keys with different slots
|
||||
r multi
|
||||
r set foo bar
|
||||
r del bar
|
||||
r exec
|
||||
assert_equal [expr $incompatible_ops + 1] [s cluster_incompatible_ops]
|
||||
} {} {cluster:skip}
|
||||
|
||||
test {Lua scripts are incompatible with cluster mode} {
|
||||
# Lua script, declared keys have different slots, it is not a compatible operation
|
||||
set incompatible_ops [s cluster_incompatible_ops]
|
||||
r eval {#!lua
|
||||
redis.call('mset', KEYS[1], 0, KEYS[2], 0)
|
||||
} 2 foo bar
|
||||
assert_equal [expr $incompatible_ops + 1] [s cluster_incompatible_ops]
|
||||
|
||||
# Lua script, no declared keys, but accessing keys have different slots,
|
||||
# it is not a compatible operation
|
||||
set incompatible_ops [s cluster_incompatible_ops]
|
||||
r eval {#!lua
|
||||
redis.call('mset', 'foo', 0, 'bar', 0)
|
||||
} 0
|
||||
assert_equal [expr $incompatible_ops + 1] [s cluster_incompatible_ops]
|
||||
|
||||
# Lua script, declared keys have the same slot, but accessing keys
|
||||
# have different slots in one command, even with flag 'allow-cross-slot-keys',
|
||||
# it still is not a compatible operation
|
||||
set incompatible_ops [s cluster_incompatible_ops]
|
||||
r eval {#!lua flags=allow-cross-slot-keys
|
||||
redis.call('mset', 'foo', 0, 'bar', 0)
|
||||
redis.call('mset', KEYS[1], 0, KEYS[2], 0)
|
||||
} 2 foo bar{foo}
|
||||
assert_equal [expr $incompatible_ops + 1] [s cluster_incompatible_ops]
|
||||
|
||||
# Lua script, declared keys have the same slot, but accessing keys have different slots
|
||||
# in multiple commands, and with flag 'allow-cross-slot-keys', it is a compatible operation
|
||||
set incompatible_ops [s cluster_incompatible_ops]
|
||||
r eval {#!lua flags=allow-cross-slot-keys
|
||||
redis.call('set', 'foo', 0)
|
||||
redis.call('set', 'bar', 0)
|
||||
redis.call('mset', KEYS[1], 0, KEYS[2], 0)
|
||||
} 2 foo bar{foo}
|
||||
assert_equal $incompatible_ops [s cluster_incompatible_ops]
|
||||
} {} {cluster:skip}
|
||||
|
||||
test {Shard subscribe commands are incompatible with cluster mode} {
|
||||
set rd1 [redis_deferring_client]
|
||||
set incompatible_ops [s cluster_incompatible_ops]
|
||||
assert_equal {1 2} [ssubscribe $rd1 {foo bar}]
|
||||
assert_equal [expr $incompatible_ops + 1] [s cluster_incompatible_ops]
|
||||
} {} {cluster:skip}
|
||||
|
||||
test {cluster-compatibility-sample-ratio configuration can work} {
|
||||
# Disable cluster compatibility sampling, no increase in cluster_incompatible_ops
|
||||
set incompatible_ops [s cluster_incompatible_ops]
|
||||
r config set cluster-compatibility-sample-ratio 0
|
||||
for {set i 0} {$i < 100} {incr i} {
|
||||
r mset foo bar$i bar foo$i
|
||||
}
|
||||
# Enable cluster compatibility sampling again to show the metric
|
||||
r config set cluster-compatibility-sample-ratio 1
|
||||
assert_equal $incompatible_ops [s cluster_incompatible_ops]
|
||||
|
||||
# 100% sample ratio, all operations should increase cluster_incompatible_ops
|
||||
set incompatible_ops [s cluster_incompatible_ops]
|
||||
r config set cluster-compatibility-sample-ratio 100
|
||||
for {set i 0} {$i < 100} {incr i} {
|
||||
r mset foo bar$i bar foo$i
|
||||
}
|
||||
assert_equal [expr $incompatible_ops + 100] [s cluster_incompatible_ops]
|
||||
|
||||
# 30% sample ratio, cluster_incompatible_ops should increase between 20% and 40%
|
||||
set incompatible_ops [s cluster_incompatible_ops]
|
||||
r config set cluster-compatibility-sample-ratio 30
|
||||
for {set i 0} {$i < 1000} {incr i} {
|
||||
r mset foo bar$i bar foo$i
|
||||
}
|
||||
assert_range [s cluster_incompatible_ops] [expr $incompatible_ops + 200] [expr $incompatible_ops + 400]
|
||||
} {} {cluster:skip}
|
||||
}
|
||||
|
|
|
@ -359,6 +359,26 @@ start_server {tags {"pause network"}} {
|
|||
} {bar2}
|
||||
}
|
||||
|
||||
test "Test the randomkey command will not cause the server to get into an infinite loop during the client pause write" {
|
||||
r flushall
|
||||
|
||||
r multi
|
||||
r set key value px 3
|
||||
r client pause 10000 write
|
||||
r exec
|
||||
|
||||
after 5
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[r randomkey] == "key"
|
||||
} else {
|
||||
fail "execute randomkey failed, caused by the infinite loop"
|
||||
}
|
||||
|
||||
r client unpause
|
||||
assert_equal [r randomkey] {}
|
||||
}
|
||||
|
||||
# Make sure we unpause at the end
|
||||
r client unpause
|
||||
}
|
||||
|
|
|
@ -325,7 +325,7 @@ start_server {tags {"external:skip needs:debug"}} {
|
|||
r hset myhash field1 value1
|
||||
|
||||
r hexpireat myhash [expr {[clock seconds] + 2}] NX FIELDS 1 field1
|
||||
assert_range [r hpttl myhash FIELDS 1 field1] 1000 2000
|
||||
assert_range [r hpttl myhash FIELDS 1 field1] 500 2000
|
||||
assert_range [r httl myhash FIELDS 1 field1] 1 2
|
||||
|
||||
r hexpireat myhash [expr {[clock seconds] + 5}] XX FIELDS 1 field1
|
||||
|
|
|
@ -308,10 +308,28 @@ start_server {tags {"zset"}} {
|
|||
assert_error "*NaN*" {r zincrby myzset -inf abc}
|
||||
}
|
||||
|
||||
test "ZINCRBY accepts hexadecimal inputs - $encoding" {
|
||||
r del zhexa
|
||||
|
||||
# Add some hexadecimal values to the sorted set 'zhexa'
|
||||
r zadd zhexa 0x0p+0 "zero"
|
||||
r zadd zhexa 0x1p+0 "one"
|
||||
|
||||
# Increment them
|
||||
# 0 + 0 = 0
|
||||
r zincrby zhexa 0x0p+0 "zero"
|
||||
# 1 + 1 = 2
|
||||
r zincrby zhexa 0x1p+0 "one"
|
||||
|
||||
assert_equal 0 [r zscore zhexa "zero"]
|
||||
assert_equal 2 [r zscore zhexa "one"]
|
||||
}
|
||||
|
||||
test "ZINCRBY against invalid incr value - $encoding" {
|
||||
r del zincr
|
||||
r zadd zincr 1 "one"
|
||||
assert_error "*value is not a valid*" {r zincrby zincr v "one"}
|
||||
assert_error "*value is not a valid float" {r zincrby zincr 23456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789123456789 "one"}
|
||||
}
|
||||
|
||||
test "ZADD - Variadic version base case - $encoding" {
|
||||
|
|
Loading…
Reference in New Issue