Add CLUSTER SLOT-STATS command

Add CLUSTER SLOT-STATS command for key count, cpu time and
network IO per slot currently.

The command has the following syntax

    CLUSTER SLOT-STATS SLOTSRANGE start-slot end-slot

or

    CLUSTER SLOT-STATS ORDERBY metric [LIMIT limit] [ASC/DESC]

where metric can currently be one of the following

    key-count -- Number of keys in a given slot
    cpu-usec -- Amount of CPU time (in microseconds) spent on a given slot
    network-bytes-in -- Amount of network ingress (in bytes) received for given slot
    network-bytes-out -- Amount of network egress (in bytes) sent out for given slot

This PR is based on:
    valkey-io/valkey#351
    valkey-io/valkey#709
    valkey-io/valkey#710

Co-authored-by: Kyle Kim <kimkyle@amazon.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
This commit is contained in:
Slavomir Kaslev 2025-05-07 18:49:15 +03:00
parent a0b22576b8
commit 8f3f9cd629
21 changed files with 1727 additions and 8 deletions

View File

@ -1798,6 +1798,16 @@ aof-timestamp-enabled no
# #
# cluster-compatibility-sample-ratio 0 # cluster-compatibility-sample-ratio 0
# Clusters can be configured to track per-slot resource statistics,
# which are accessible by the CLUSTER SLOT-STATS command.
#
# By default, the 'cluster-slot-stats-enabled' is disabled, and only 'key-count' is captured.
# By enabling the 'cluster-slot-stats-enabled' config, the cluster will begin to capture advanced statistics.
# These statistics can be leveraged to assess general slot usage trends, identify hot / cold slots,
# migrate slots for a balanced cluster workload, and / or re-write application logic to better utilize slots.
#
# cluster-slot-stats-enabled no
# In order to setup your cluster make sure to read the documentation # In order to setup your cluster make sure to read the documentation
# available at https://redis.io web site. # available at https://redis.io web site.

View File

@ -375,7 +375,7 @@ endif
REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX) REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX)
REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX) REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX)
REDIS_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o REDIS_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX) REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX)
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX) REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX)

View File

@ -49,6 +49,7 @@
#include "slowlog.h" #include "slowlog.h"
#include "latency.h" #include "latency.h"
#include "monotonic.h" #include "monotonic.h"
#include "cluster_slot_stats.h"
/* forward declarations */ /* forward declarations */
static void unblockClientWaitingData(client *c); static void unblockClientWaitingData(client *c);
@ -91,6 +92,7 @@ void blockClient(client *c, int btype) {
* This function will make updates to the commandstats, slowlog and monitors.*/ * This function will make updates to the commandstats, slowlog and monitors.*/
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors){ void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors){
const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us; const ustime_t total_cmd_duration = c->duration + blocked_us + reply_us;
clusterSlotStatsAddCpuDuration(c, total_cmd_duration);
c->lastcmd->microseconds += total_cmd_duration; c->lastcmd->microseconds += total_cmd_duration;
c->lastcmd->calls++; c->lastcmd->calls++;
c->commands_processed++; c->commands_processed++;

View File

@ -2,6 +2,9 @@
* Copyright (c) 2009-Present, Redis Ltd. * Copyright (c) 2009-Present, Redis Ltd.
* All rights reserved. * All rights reserved.
* *
* Copyright (c) 2024-present, Valkey contributors.
* All rights reserved.
*
* Licensed under your choice of (a) the Redis Source Available License 2.0 * Licensed under your choice of (a) the Redis Source Available License 2.0
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
* GNU Affero General Public License v3 (AGPLv3). * GNU Affero General Public License v3 (AGPLv3).
@ -17,6 +20,7 @@
#include "server.h" #include "server.h"
#include "cluster.h" #include "cluster.h"
#include "cluster_slot_stats.h"
#include <ctype.h> #include <ctype.h>
@ -930,6 +934,8 @@ void clusterCommandHelp(client *c) {
"SLOTS", "SLOTS",
" Return information about slots range mappings. Each range is made of:", " Return information about slots range mappings. Each range is made of:",
" start, end, master and replicas IP addresses, ports and ids", " start, end, master and replicas IP addresses, ports and ids",
"SLOT-STATS",
" Return an array of slot usage statistics for slots assigned to the current node.",
"SHARDS", "SHARDS",
" Return information about slot range mappings and the nodes associated with them.", " Return information about slot range mappings and the nodes associated with them.",
NULL NULL
@ -1688,3 +1694,12 @@ void readwriteCommand(client *c) {
c->flags &= ~CLIENT_READONLY; c->flags &= ~CLIENT_READONLY;
addReply(c,shared.ok); addReply(c,shared.ok);
} }
/* Resets transient cluster stats that we expose via INFO or other means that we want
* to reset via CONFIG RESETSTAT. The function is also used in order to
* initialize these fields in clusterInit() at server startup. */
void resetClusterStats(void) {
if (!server.cluster_enabled) return;
clusterSlotStatResetAll();
}

View File

@ -1,3 +1,17 @@
/*
* Copyright (c) 2009-Present, Redis Ltd.
* All rights reserved.
*
* Copyright (c) 2024-present, Valkey contributors.
* All rights reserved.
*
* Licensed under your choice of (a) the Redis Source Available License 2.0
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
* GNU Affero General Public License v3 (AGPLv3).
*
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
*/
#ifndef __CLUSTER_H #ifndef __CLUSTER_H
#define __CLUSTER_H #define __CLUSTER_H
@ -130,6 +144,8 @@ const char *clusterNodePreferredEndpoint(clusterNode *n);
long long clusterNodeReplOffset(clusterNode *node); long long clusterNodeReplOffset(clusterNode *node);
clusterNode *clusterLookupNode(const char *name, int length); clusterNode *clusterLookupNode(const char *name, int length);
const char *clusterGetSecret(size_t *len); const char *clusterGetSecret(size_t *len);
unsigned int countKeysInSlot(unsigned int slot);
int getSlotOrReply(client *c, robj *o);
/* functions with shared implementations */ /* functions with shared implementations */
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, uint64_t cmd_flags, int *error_code); clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, uint64_t cmd_flags, int *error_code);
@ -147,6 +163,7 @@ void clusterFreeNodesSlotsInfo(clusterNode *n);
int clusterNodeSlotInfoCount(clusterNode *n); int clusterNodeSlotInfoCount(clusterNode *n);
uint16_t clusterNodeSlotInfoEntry(clusterNode *n, int idx); uint16_t clusterNodeSlotInfoEntry(clusterNode *n, int idx);
int clusterNodeHasSlotInfo(clusterNode *n); int clusterNodeHasSlotInfo(clusterNode *n);
void resetClusterStats(void);
int clusterGetShardCount(void); int clusterGetShardCount(void);
void *clusterGetShardIterator(void); void *clusterGetShardIterator(void);

View File

@ -20,6 +20,7 @@
#include "server.h" #include "server.h"
#include "cluster.h" #include "cluster.h"
#include "cluster_legacy.h" #include "cluster_legacy.h"
#include "cluster_slot_stats.h"
#include "endianconv.h" #include "endianconv.h"
#include "connection.h" #include "connection.h"
@ -1025,6 +1026,7 @@ void clusterInit(void) {
clusterUpdateMyselfIp(); clusterUpdateMyselfIp();
clusterUpdateMyselfHostname(); clusterUpdateMyselfHostname();
clusterUpdateMyselfHumanNodename(); clusterUpdateMyselfHumanNodename();
resetClusterStats();
getRandomHexChars(server.cluster->internal_secret, CLUSTER_INTERNALSECRETLEN); getRandomHexChars(server.cluster->internal_secret, CLUSTER_INTERNALSECRETLEN);
} }
@ -4989,6 +4991,9 @@ int clusterAddSlot(clusterNode *n, int slot) {
if (server.cluster->slots[slot]) return C_ERR; if (server.cluster->slots[slot]) return C_ERR;
clusterNodeSetSlotBit(n,slot); clusterNodeSetSlotBit(n,slot);
server.cluster->slots[slot] = n; server.cluster->slots[slot] = n;
/* Make owner_not_claiming_slot flag consistent with slot ownership information. */
bitmapClearBit(server.cluster->owner_not_claiming_slot, slot);
clusterSlotStatReset(slot);
return C_OK; return C_OK;
} }
@ -5007,6 +5012,7 @@ int clusterDelSlot(int slot) {
server.cluster->slots[slot] = NULL; server.cluster->slots[slot] = NULL;
/* Make owner_not_claiming_slot flag consistent with slot ownership information. */ /* Make owner_not_claiming_slot flag consistent with slot ownership information. */
bitmapClearBit(server.cluster->owner_not_claiming_slot, slot); bitmapClearBit(server.cluster->owner_not_claiming_slot, slot);
clusterSlotStatReset(slot);
return C_OK; return C_OK;
} }

View File

@ -68,6 +68,7 @@ typedef struct clusterLink {
#define CLUSTER_NODE_EXTENSIONS_SUPPORTED 1024 /* This node supports extensions. */ #define CLUSTER_NODE_EXTENSIONS_SUPPORTED 1024 /* This node supports extensions. */
#define CLUSTER_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" #define CLUSTER_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
#define nodeIsMaster(n) ((n)->flags & CLUSTER_NODE_MASTER)
#define nodeIsSlave(n) ((n)->flags & CLUSTER_NODE_SLAVE) #define nodeIsSlave(n) ((n)->flags & CLUSTER_NODE_SLAVE)
#define nodeInHandshake(n) ((n)->flags & CLUSTER_NODE_HANDSHAKE) #define nodeInHandshake(n) ((n)->flags & CLUSTER_NODE_HANDSHAKE)
#define nodeHasAddr(n) (!((n)->flags & CLUSTER_NODE_NOADDR)) #define nodeHasAddr(n) (!((n)->flags & CLUSTER_NODE_NOADDR))
@ -330,6 +331,13 @@ struct _clusterNode {
list *fail_reports; /* List of nodes signaling this as failing */ list *fail_reports; /* List of nodes signaling this as failing */
}; };
/* Struct used for storing slot statistics. */
typedef struct slotStat {
uint64_t cpu_usec;
uint64_t network_bytes_in;
uint64_t network_bytes_out;
} slotStat;
struct clusterState { struct clusterState {
clusterNode *myself; /* This node */ clusterNode *myself; /* This node */
uint64_t currentEpoch; uint64_t currentEpoch;
@ -377,6 +385,8 @@ struct clusterState {
* stops claiming the slot. This prevents spreading incorrect information (that * stops claiming the slot. This prevents spreading incorrect information (that
* source still owns the slot) using UPDATE messages. */ * source still owns the slot) using UPDATE messages. */
unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8]; unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8];
/* Struct used for storing slot statistics, for all slots owned by the current shard. */
slotStat slot_stats[CLUSTER_SLOTS];
}; };

337
src/cluster_slot_stats.c Normal file
View File

@ -0,0 +1,337 @@
/*
* Copyright (c) 2009-Present, Redis Ltd.
* All rights reserved.
*
* Copyright (c) 2024-present, Valkey contributors.
* All rights reserved.
*
* Licensed under your choice of (a) the Redis Source Available License 2.0
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
* GNU Affero General Public License v3 (AGPLv3).
*
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
*/
#include "cluster_slot_stats.h"
typedef enum {
KEY_COUNT,
CPU_USEC,
NETWORK_BYTES_IN,
NETWORK_BYTES_OUT,
SLOT_STAT_COUNT,
INVALID
} slotStatType;
/* -----------------------------------------------------------------------------
* CLUSTER SLOT-STATS command
* -------------------------------------------------------------------------- */
/* Struct used to temporarily hold slot statistics for sorting. */
typedef struct {
int slot;
uint64_t stat;
} slotStatForSort;
static int markSlotsAssignedToMyShard(unsigned char *assigned_slots, int start_slot, int end_slot) {
clusterNode *primary = clusterNodeGetMaster(getMyClusterNode());
int assigned_slots_count = 0;
for (int slot = start_slot; slot <= end_slot; slot++) {
if (!clusterNodeCoversSlot(primary, slot)) continue;
assigned_slots[slot]++;
assigned_slots_count++;
}
return assigned_slots_count;
}
static uint64_t getSlotStat(int slot, slotStatType stat_type) {
switch (stat_type) {
case KEY_COUNT: return countKeysInSlot(slot);
case CPU_USEC: return server.cluster->slot_stats[slot].cpu_usec;
case NETWORK_BYTES_IN: return server.cluster->slot_stats[slot].network_bytes_in;
case NETWORK_BYTES_OUT: return server.cluster->slot_stats[slot].network_bytes_out;
default: serverPanic("Invalid slot stat type %d was found.", stat_type);
}
}
/* Compare by stat in ascending order. If stat is the same, compare by slot in ascending order. */
static int slotStatForSortAscCmp(const void *a, const void *b) {
const slotStatForSort *entry_a = a;
const slotStatForSort *entry_b = b;
if (entry_a->stat == entry_b->stat) {
return entry_a->slot - entry_b->slot;
}
return entry_a->stat - entry_b->stat;
}
/* Compare by stat in descending order. If stat is the same, compare by slot in ascending order. */
static int slotStatForSortDescCmp(const void *a, const void *b) {
const slotStatForSort *entry_a = a;
const slotStatForSort *entry_b = b;
if (entry_b->stat == entry_a->stat) {
return entry_a->slot - entry_b->slot;
}
return entry_b->stat - entry_a->stat;
}
static void collectAndSortSlotStats(slotStatForSort slot_stats[], slotStatType order_by, int desc) {
clusterNode *primary = clusterNodeGetMaster(getMyClusterNode());
int i = 0;
for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
if (!clusterNodeCoversSlot(primary, slot)) continue;
slot_stats[i].slot = slot;
slot_stats[i].stat = getSlotStat(slot, order_by);
i++;
}
qsort(slot_stats, i, sizeof(slotStatForSort), desc ? slotStatForSortDescCmp : slotStatForSortAscCmp);
}
static void addReplySlotStat(client *c, int slot) {
addReplyArrayLen(c, 2); /* Array of size 2, where 0th index represents (int) slot,
* and 1st index represents (map) usage statistics. */
addReplyLongLong(c, slot);
addReplyMapLen(c, (server.cluster_slot_stats_enabled) ? SLOT_STAT_COUNT
: 1); /* Nested map representing slot usage statistics. */
addReplyBulkCString(c, "key-count");
addReplyLongLong(c, countKeysInSlot(slot));
/* Any additional metrics aside from key-count come with a performance trade-off,
* and are aggregated and returned based on its server config. */
if (server.cluster_slot_stats_enabled) {
addReplyBulkCString(c, "cpu-usec");
addReplyLongLong(c, server.cluster->slot_stats[slot].cpu_usec);
addReplyBulkCString(c, "network-bytes-in");
addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_in);
addReplyBulkCString(c, "network-bytes-out");
addReplyLongLong(c, server.cluster->slot_stats[slot].network_bytes_out);
}
}
/* Adds reply for the SLOTSRANGE variant.
* Response is ordered in ascending slot number. */
static void addReplySlotsRange(client *c, unsigned char *assigned_slots, int start_slot, int end_slot, int len) {
addReplyArrayLen(c, len); /* Top level RESP reply format is defined as an array, due to ordering invariance. */
for (int slot = start_slot; slot <= end_slot; slot++) {
if (assigned_slots[slot]) addReplySlotStat(c, slot);
}
}
static void addReplySortedSlotStats(client *c, slotStatForSort slot_stats[], long limit) {
int num_slots_assigned = getMyShardSlotCount();
int len = min(limit, num_slots_assigned);
addReplyArrayLen(c, len); /* Top level RESP reply format is defined as an array, due to ordering invariance. */
for (int i = 0; i < len; i++) {
addReplySlotStat(c, slot_stats[i].slot);
}
}
static int canAddNetworkBytesOut(client *c) {
return server.cluster_slot_stats_enabled && server.cluster_enabled && c->slot != -1;
}
/* Accumulates egress bytes upon sending RESP responses back to user clients. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) {
if (!canAddNetworkBytesOut(c)) return;
serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
}
/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */
static void clusterSlotStatsUpdateNetworkBytesOutForReplication(long long len) {
client *c = server.current_client;
if (c == NULL || !canAddNetworkBytesOut(c)) return;
/* We multiply the bytes len by the number of replicas to account for us broadcasting to multiple replicas at once. */
len *= (long long)listLength(server.slaves);
serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
serverAssert(nodeIsMaster(server.cluster->myself));
/* We sometimes want to adjust the counter downwards (for example when we want to undo accounting for
* SELECT commands that don't belong to any slot) so let's make sure we don't underflow the counter. */
serverAssert(len >= 0 || server.cluster->slot_stats[c->slot].network_bytes_out >= (uint64_t)-len);
server.cluster->slot_stats[c->slot].network_bytes_out += len;
}
/* Increment network bytes out for replication stream. This method will increment `len` value times the active replica
* count. */
void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len) {
clusterSlotStatsUpdateNetworkBytesOutForReplication(len);
}
/* Decrement network bytes out for replication stream.
* This is used to remove accounting of data which doesn't belong to any particular slots e.g. SELECT command.
* This will decrement `len` value times the active replica count. */
void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len) {
clusterSlotStatsUpdateNetworkBytesOutForReplication(-len);
}
/* Upon SPUBLISH, two egress events are triggered.
* 1) Internal propagation, for clients that are subscribed to the current node.
* 2) External propagation, for other nodes within the same shard (could either be a primary or replica).
* This type is not aggregated, to stay consistent with server.stat_net_output_bytes aggregation.
* This function covers the internal propagation component. */
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot) {
/* For a blocked client, c->slot could be pre-filled.
* Thus c->slot is backed-up for restoration after aggregation is completed. */
int save_slot = c->slot;
c->slot = slot;
if (canAddNetworkBytesOut(c)) {
serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].network_bytes_out += c->net_output_bytes_curr_cmd;
}
/* For sharded pubsub, the client's network bytes metrics must be reset here,
* as resetClient() is not called until subscription ends. */
c->net_output_bytes_curr_cmd = 0;
c->slot = save_slot;
}
/* Adds reply for the ORDERBY variant.
* Response is ordered based on the sort result. */
static void addReplyOrderBy(client *c, slotStatType order_by, long limit, int desc) {
slotStatForSort slot_stats[CLUSTER_SLOTS];
collectAndSortSlotStats(slot_stats, order_by, desc);
addReplySortedSlotStats(c, slot_stats, limit);
}
/* Resets applicable slot statistics. */
void clusterSlotStatReset(int slot) {
/* key-count is exempt, as it is queried separately through `countKeysInSlot()`. */
memset(&server.cluster->slot_stats[slot], 0, sizeof(slotStat));
}
void clusterSlotStatResetAll(void) {
memset(server.cluster->slot_stats, 0, sizeof(server.cluster->slot_stats));
}
/* For cpu-usec accumulation, nested commands within EXEC, EVAL, FCALL are skipped.
* This is due to their unique callstack, where the c->duration for
* EXEC, EVAL and FCALL already includes all of its nested commands.
* Meaning, the accumulation of cpu-usec for these nested commands
* would equate to repeating the same calculation twice.
*/
static int canAddCpuDuration(client *c) {
return server.cluster_slot_stats_enabled && /* Config should be enabled. */
server.cluster_enabled && /* Cluster mode should be enabled. */
c->slot != -1 && /* Command should be slot specific. */
(!server.execution_nesting || /* Either command should not be nested, */
(c->realcmd->flags & CMD_BLOCKING)); /* or it must be due to unblocking. */
}
void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration) {
if (!canAddCpuDuration(c)) return;
serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
server.cluster->slot_stats[c->slot].cpu_usec += duration;
}
/* For cross-slot scripting, its caller client's slot must be invalidated,
* such that its slot-stats aggregation is bypassed. */
void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx) {
if (!(ctx->flags & SCRIPT_ALLOW_CROSS_SLOT)) return;
ctx->original_client->slot = -1;
}
static int canAddNetworkBytesIn(client *c) {
/* First, cluster mode must be enabled.
* Second, command should target a specific slot.
* Third, blocked client is not aggregated, to avoid duplicate aggregation upon unblocking.
* Fourth, the server is not under a MULTI/EXEC transaction, to avoid duplicate aggregation of
* EXEC's 14 bytes RESP upon nested call()'s afterCommand(). */
return server.cluster_enabled && server.cluster_slot_stats_enabled &&
c->slot != -1 && !(c->flags & CLIENT_BLOCKED) && !server.in_exec;
}
/* Adds network ingress bytes of the current command in execution,
* calculated earlier within networking.c layer.
*
* Note: Below function should only be called once c->slot is parsed.
* Otherwise, the aggregation will be skipped due to canAddNetworkBytesIn() check failure.
* */
void clusterSlotStatsAddNetworkBytesInForUserClient(client *c) {
if (!canAddNetworkBytesIn(c)) return;
if (c->cmd->proc == execCommand) {
/* Accumulate its corresponding MULTI RESP; *1\r\n$5\r\nmulti\r\n */
c->net_input_bytes_curr_cmd += 15;
}
server.cluster->slot_stats[c->slot].network_bytes_in += c->net_input_bytes_curr_cmd;
}
void clusterSlotStatsCommand(client *c) {
if (!server.cluster_enabled) {
addReplyError(c, "This instance has cluster support disabled");
return;
}
/* Parse additional arguments. */
if (c->argc == 5 && !strcasecmp(c->argv[2]->ptr, "slotsrange")) {
/* CLUSTER SLOT-STATS SLOTSRANGE start-slot end-slot */
int start_slot, end_slot;
if ((start_slot = getSlotOrReply(c, c->argv[3])) == -1 ||
(end_slot = getSlotOrReply(c, c->argv[4])) == -1) {
return;
}
if (start_slot > end_slot) {
addReplyErrorFormat(c, "Start slot number %d is greater than end slot number %d", start_slot, end_slot);
return;
}
/* Initialize slot assignment array. */
unsigned char assigned_slots[CLUSTER_SLOTS] = {0};
int assigned_slots_count = markSlotsAssignedToMyShard(assigned_slots, start_slot, end_slot);
addReplySlotsRange(c, assigned_slots, start_slot, end_slot, assigned_slots_count);
} else if (c->argc >= 4 && !strcasecmp(c->argv[2]->ptr, "orderby")) {
/* CLUSTER SLOT-STATS ORDERBY metric [LIMIT limit] [ASC | DESC] */
int desc = 1;
slotStatType order_by = INVALID;
if (!strcasecmp(c->argv[3]->ptr, "key-count")) {
order_by = KEY_COUNT;
} else if (!strcasecmp(c->argv[3]->ptr, "cpu-usec") && server.cluster_slot_stats_enabled) {
order_by = CPU_USEC;
} else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-in") && server.cluster_slot_stats_enabled) {
order_by = NETWORK_BYTES_IN;
} else if (!strcasecmp(c->argv[3]->ptr, "network-bytes-out") && server.cluster_slot_stats_enabled) {
order_by = NETWORK_BYTES_OUT;
} else {
addReplyError(c, "Unrecognized sort metric for ORDERBY.");
return;
}
int i = 4; /* Next argument index, following ORDERBY */
int limit_counter = 0, asc_desc_counter = 0;
long limit = CLUSTER_SLOTS;
while (i < c->argc) {
int moreargs = c->argc > i + 1;
if (!strcasecmp(c->argv[i]->ptr, "limit") && moreargs) {
if (getRangeLongFromObjectOrReply(
c, c->argv[i + 1], 1, CLUSTER_SLOTS, &limit,
"Limit has to lie in between 1 and 16384 (maximum number of slots).") != C_OK) {
return;
}
i++;
limit_counter++;
} else if (!strcasecmp(c->argv[i]->ptr, "asc")) {
desc = 0;
asc_desc_counter++;
} else if (!strcasecmp(c->argv[i]->ptr, "desc")) {
desc = 1;
asc_desc_counter++;
} else {
addReplyErrorObject(c, shared.syntaxerr);
return;
}
if (limit_counter > 1 || asc_desc_counter > 1) {
addReplyError(c, "Multiple filters of the same type are disallowed.");
return;
}
i++;
}
addReplyOrderBy(c, order_by, limit, desc);
} else {
addReplySubcommandSyntaxError(c);
}
}

35
src/cluster_slot_stats.h Normal file
View File

@ -0,0 +1,35 @@
/*
* Copyright (c) 2009-Present, Redis Ltd.
* All rights reserved.
*
* Copyright (c) 2024-present, Valkey contributors.
* All rights reserved.
*
* Licensed under your choice of (a) the Redis Source Available License 2.0
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
* GNU Affero General Public License v3 (AGPLv3).
*
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
*/
#include "server.h"
#include "cluster.h"
#include "script.h"
#include "cluster_legacy.h"
/* General use-cases. */
void clusterSlotStatReset(int slot);
void clusterSlotStatResetAll(void);
/* cpu-usec metric. */
void clusterSlotStatsAddCpuDuration(client *c, ustime_t duration);
void clusterSlotStatsInvalidateSlotIfApplicable(scriptRunCtx *ctx);
/* network-bytes-in metric. */
void clusterSlotStatsAddNetworkBytesInForUserClient(client *c);
/* network-bytes-out metric. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c);
void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len);
void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len);
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot);

View File

@ -921,6 +921,56 @@ struct COMMAND_ARG CLUSTER_SLAVES_Args[] = {
{MAKE_ARG("node-id",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)}, {MAKE_ARG("node-id",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
}; };
/********** CLUSTER SLOT_STATS ********************/
#ifndef SKIP_CMD_HISTORY_TABLE
/* CLUSTER SLOT_STATS history */
#define CLUSTER_SLOT_STATS_History NULL
#endif
#ifndef SKIP_CMD_TIPS_TABLE
/* CLUSTER SLOT_STATS tips */
const char *CLUSTER_SLOT_STATS_Tips[] = {
"nondeterministic_output",
"request_policy:all_shards",
};
#endif
#ifndef SKIP_CMD_KEY_SPECS_TABLE
/* CLUSTER SLOT_STATS key specs */
#define CLUSTER_SLOT_STATS_Keyspecs NULL
#endif
/* CLUSTER SLOT_STATS filter slotsrange argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_filter_slotsrange_Subargs[] = {
{MAKE_ARG("start-slot",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("end-slot",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
};
/* CLUSTER SLOT_STATS filter orderby order argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_filter_orderby_order_Subargs[] = {
{MAKE_ARG("asc",ARG_TYPE_PURE_TOKEN,-1,"ASC",NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("desc",ARG_TYPE_PURE_TOKEN,-1,"DESC",NULL,NULL,CMD_ARG_NONE,0,NULL)},
};
/* CLUSTER SLOT_STATS filter orderby argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_filter_orderby_Subargs[] = {
{MAKE_ARG("metric",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("limit",ARG_TYPE_INTEGER,-1,"LIMIT",NULL,NULL,CMD_ARG_OPTIONAL,0,NULL)},
{MAKE_ARG("order",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=CLUSTER_SLOT_STATS_filter_orderby_order_Subargs},
};
/* CLUSTER SLOT_STATS filter argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_filter_Subargs[] = {
{MAKE_ARG("slotsrange",ARG_TYPE_BLOCK,-1,"SLOTSRANGE",NULL,NULL,CMD_ARG_NONE,2,NULL),.subargs=CLUSTER_SLOT_STATS_filter_slotsrange_Subargs},
{MAKE_ARG("orderby",ARG_TYPE_BLOCK,-1,"ORDERBY",NULL,NULL,CMD_ARG_NONE,3,NULL),.subargs=CLUSTER_SLOT_STATS_filter_orderby_Subargs},
};
/* CLUSTER SLOT_STATS argument table */
struct COMMAND_ARG CLUSTER_SLOT_STATS_Args[] = {
{MAKE_ARG("filter",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,2,NULL),.subargs=CLUSTER_SLOT_STATS_filter_Subargs},
};
/********** CLUSTER SLOTS ********************/ /********** CLUSTER SLOTS ********************/
#ifndef SKIP_CMD_HISTORY_TABLE #ifndef SKIP_CMD_HISTORY_TABLE
@ -972,6 +1022,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = {
{MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,0,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,2),.args=CLUSTER_SETSLOT_Args}, {MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,0,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,2),.args=CLUSTER_SETSLOT_Args},
{MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,0)}, {MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,0)},
{MAKE_CMD("slaves","Lists the replica nodes of a master node.","O(N) where N is the number of replicas.","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER REPLICAS`","5.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLAVES_History,0,CLUSTER_SLAVES_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_SLAVES_Keyspecs,0,NULL,1),.args=CLUSTER_SLAVES_Args}, {MAKE_CMD("slaves","Lists the replica nodes of a master node.","O(N) where N is the number of replicas.","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER REPLICAS`","5.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLAVES_History,0,CLUSTER_SLAVES_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_SLAVES_Keyspecs,0,NULL,1),.args=CLUSTER_SLAVES_Args},
{MAKE_CMD("slot-stats","Return an array of slot usage statistics for slots assigned to the current node.","O(N) where N is the total number of slots based on arguments. O(N*log(N)) with ORDERBY subcommand.","8.2.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOT_STATS_History,0,CLUSTER_SLOT_STATS_Tips,2,clusterSlotStatsCommand,-4,CMD_STALE|CMD_LOADING,0,CLUSTER_SLOT_STATS_Keyspecs,0,NULL,1),.args=CLUSTER_SLOT_STATS_Args},
{MAKE_CMD("slots","Returns the mapping of cluster slots to nodes.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER SHARDS`","7.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTS_History,2,CLUSTER_SLOTS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SLOTS_Keyspecs,0,NULL,0)}, {MAKE_CMD("slots","Returns the mapping of cluster slots to nodes.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER SHARDS`","7.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTS_History,2,CLUSTER_SLOTS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SLOTS_Keyspecs,0,NULL,0)},
{0} {0}
}; };

View File

@ -0,0 +1,111 @@
{
"SLOT-STATS": {
"summary": "Return an array of slot usage statistics for slots assigned to the current node.",
"complexity": "O(N) where N is the total number of slots based on arguments. O(N*log(N)) with ORDERBY subcommand.",
"group": "cluster",
"since": "8.2.0",
"arity": -4,
"container": "CLUSTER",
"function": "clusterSlotStatsCommand",
"command_flags": [
"STALE",
"LOADING"
],
"command_tips": [
"NONDETERMINISTIC_OUTPUT",
"REQUEST_POLICY:ALL_SHARDS"
],
"reply_schema": {
"type": "array",
"description": "Array of nested arrays, where the inner array element represents a slot and its respective usage statistics.",
"items": {
"type": "array",
"description": "Array of size 2, where 0th index represents (int) slot and 1st index represents (map) usage statistics.",
"minItems": 2,
"maxItems": 2,
"items": [
{
"description": "Slot Number.",
"type": "integer"
},
{
"type": "object",
"description": "Map of slot usage statistics.",
"additionalProperties": false,
"properties": {
"key-count": {
"type": "integer"
},
"cpu-usec": {
"type": "integer"
},
"network-bytes-in": {
"type": "integer"
},
"network-bytes-out": {
"type": "integer"
}
}
}
]
}
},
"arguments": [
{
"name": "filter",
"type": "oneof",
"arguments": [
{
"token": "SLOTSRANGE",
"name": "slotsrange",
"type": "block",
"arguments": [
{
"name": "start-slot",
"type": "integer"
},
{
"name": "end-slot",
"type": "integer"
}
]
},
{
"token": "ORDERBY",
"name": "orderby",
"type": "block",
"arguments": [
{
"name": "metric",
"type": "string"
},
{
"token": "LIMIT",
"name": "limit",
"type": "integer",
"optional": true
},
{
"name": "order",
"type": "oneof",
"optional": true,
"arguments": [
{
"name": "asc",
"type": "pure-token",
"token": "ASC"
},
{
"name": "desc",
"type": "pure-token",
"token": "DESC"
}
]
}
]
}
]
}
]
}
}

View File

@ -3114,6 +3114,7 @@ standardConfig static_configs[] = {
createBoolConfig("aof-disable-auto-gc", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, server.aof_disable_auto_gc, 0, NULL, updateAofAutoGCEnabled), createBoolConfig("aof-disable-auto-gc", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, server.aof_disable_auto_gc, 0, NULL, updateAofAutoGCEnabled),
createBoolConfig("replica-ignore-disk-write-errors", NULL, MODIFIABLE_CONFIG, server.repl_ignore_disk_write_error, 0, NULL, NULL), createBoolConfig("replica-ignore-disk-write-errors", NULL, MODIFIABLE_CONFIG, server.repl_ignore_disk_write_error, 0, NULL, NULL),
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 0, NULL, NULL), createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 0, NULL, NULL),
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),
/* String Configs */ /* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL), createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
@ -3465,6 +3466,7 @@ NULL
void configResetStatCommand(client *c) { void configResetStatCommand(client *c) {
resetServerStats(); resetServerStats();
resetClusterStats();
resetCommandTableStats(server.commands); resetCommandTableStats(server.commands);
resetErrorTableStats(); resetErrorTableStats();
addReply(c,shared.ok); addReply(c,shared.ok);

View File

@ -8,6 +8,8 @@
* Licensed under your choice of (a) the Redis Source Available License 2.0 * Licensed under your choice of (a) the Redis Source Available License 2.0
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
* GNU Affero General Public License v3 (AGPLv3). * GNU Affero General Public License v3 (AGPLv3).
*
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
*/ */
#include "server.h" #include "server.h"
@ -311,17 +313,32 @@ int calculateKeySlot(sds key) {
/* Return slot-specific dictionary for key based on key's hash slot when cluster mode is enabled, else 0.*/ /* Return slot-specific dictionary for key based on key's hash slot when cluster mode is enabled, else 0.*/
int getKeySlot(sds key) { int getKeySlot(sds key) {
if (!server.cluster_enabled) return 0;
/* This is performance optimization that uses pre-set slot id from the current command, /* This is performance optimization that uses pre-set slot id from the current command,
* in order to avoid calculation of the key hash. * in order to avoid calculation of the key hash.
*
* This optimization is only used when current_client flag `CLIENT_EXECUTING_COMMAND` is set. * This optimization is only used when current_client flag `CLIENT_EXECUTING_COMMAND` is set.
* It only gets set during the execution of command under `call` method. Other flows requesting * It only gets set during the execution of command under `call` method. Other flows requesting
* the key slot would fallback to calculateKeySlot. * the key slot would fallback to calculateKeySlot.
*
* Modules and scripts executed on the primary may get replicated as multi-execs that operate on multiple slots,
* so we must always recompute the slot for commands coming from the primary.
*/ */
if (server.current_client && server.current_client->slot >= 0 && server.current_client->flags & CLIENT_EXECUTING_COMMAND) { if (server.current_client && server.current_client->slot >= 0 && server.current_client->flags & CLIENT_EXECUTING_COMMAND &&
debugServerAssertWithInfo(server.current_client, NULL, calculateKeySlot(key)==server.current_client->slot); !(server.current_client->flags & CLIENT_MASTER))
{
debugServerAssertWithInfo(server.current_client, NULL,
(int)keyHashSlot(key, (int)sdslen(key)) == server.current_client->slot);
return server.current_client->slot; return server.current_client->slot;
} }
return calculateKeySlot(key); int slot = keyHashSlot(key, (int)sdslen(key));
/* For the case of replicated commands from primary, getNodeByQuery() never gets called,
* and thus c->slot never gets populated. That said, if this command ends up accessing a key,
* we are able to backfill c->slot here, where the key's hash calculation is made. */
if (server.current_client && (server.current_client->flags & CLIENT_MASTER)) {
server.current_client->slot = slot;
}
return slot;
} }
/* This is a special version of dbAdd() that is used only when loading /* This is a special version of dbAdd() that is used only when loading

View File

@ -15,6 +15,7 @@
#include "server.h" #include "server.h"
#include "atomicvar.h" #include "atomicvar.h"
#include "cluster.h" #include "cluster.h"
#include "cluster_slot_stats.h"
#include "script.h" #include "script.h"
#include "fpconv_dtoa.h" #include "fpconv_dtoa.h"
#include "fmtargs.h" #include "fmtargs.h"
@ -220,6 +221,8 @@ client *createClient(connection *conn) {
listInitNode(&c->clients_pending_write_node, c); listInitNode(&c->clients_pending_write_node, c);
c->mem_usage_bucket = NULL; c->mem_usage_bucket = NULL;
c->mem_usage_bucket_node = NULL; c->mem_usage_bucket_node = NULL;
c->net_input_bytes_curr_cmd = 0;
c->net_output_bytes_curr_cmd = 0;
if (conn) linkClient(c); if (conn) linkClient(c);
initClientMultiState(c); initClientMultiState(c);
c->net_input_bytes = 0; c->net_input_bytes = 0;
@ -398,6 +401,7 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
return; return;
} }
c->net_output_bytes_curr_cmd += len;
/* We call it here because this function may affect the reply /* We call it here because this function may affect the reply
* buffer offset (see function comment) */ * buffer offset (see function comment) */
reqresSaveClientReplyOffset(c); reqresSaveClientReplyOffset(c);
@ -789,6 +793,7 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) {
if (len_to_copy > length) if (len_to_copy > length)
len_to_copy = length; len_to_copy = length;
memcpy(prev->buf + prev->used, s, len_to_copy); memcpy(prev->buf + prev->used, s, len_to_copy);
c->net_output_bytes_curr_cmd += len_to_copy;
prev->used += len_to_copy; prev->used += len_to_copy;
length -= len_to_copy; length -= len_to_copy;
if (length == 0) { if (length == 0) {
@ -804,6 +809,7 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) {
{ {
memmove(next->buf + length, next->buf, next->used); memmove(next->buf + length, next->buf, next->used);
memcpy(next->buf, s, length); memcpy(next->buf, s, length);
c->net_output_bytes_curr_cmd += length;
next->used += length; next->used += length;
listDelNode(c->reply,ln); listDelNode(c->reply,ln);
} else { } else {
@ -814,6 +820,7 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) {
buf->size = usable_size - sizeof(clientReplyBlock); buf->size = usable_size - sizeof(clientReplyBlock);
buf->used = length; buf->used = length;
memcpy(buf->buf, s, length); memcpy(buf->buf, s, length);
c->net_output_bytes_curr_cmd += length;
listNodeValue(ln) = buf; listNodeValue(ln) = buf;
c->reply_bytes += buf->size; c->reply_bytes += buf->size;
@ -2276,6 +2283,9 @@ static inline void resetClientInternal(client *c, int free_argv) {
c->flags |= CLIENT_REPLY_SKIP; c->flags |= CLIENT_REPLY_SKIP;
c->flags &= ~CLIENT_REPLY_SKIP_NEXT; c->flags &= ~CLIENT_REPLY_SKIP_NEXT;
} }
c->net_input_bytes_curr_cmd = 0;
c->net_output_bytes_curr_cmd = 0;
} }
/* resetClient prepare the client to process the next command */ /* resetClient prepare the client to process the next command */
@ -2394,6 +2404,22 @@ int processInlineBuffer(client *c) {
c->argv_len_sum += sdslen(argv[j]); c->argv_len_sum += sdslen(argv[j]);
} }
zfree(argv); zfree(argv);
/* Per-slot network bytes-in calculation.
*
* We calculate and store the current command's ingress bytes under
* c->net_input_bytes_curr_cmd, for which its per-slot aggregation is deferred
* until c->slot is parsed later within processCommand().
*
* Calculation: For inline buffer, every whitespace is of length 1,
* with the exception of the trailing '\r\n' being length 2.
*
* For example;
* Command) SET key value
* Inline) SET key value\r\n
*/
c->net_input_bytes_curr_cmd = (c->argv_len_sum + (c->argc - 1) + 2);
return C_OK; return C_OK;
} }
@ -2467,6 +2493,7 @@ int processMultibulkBuffer(client *c) {
/* We know for sure there is a whole line since newline != NULL, /* We know for sure there is a whole line since newline != NULL,
* so go ahead and find out the multi bulk length. */ * so go ahead and find out the multi bulk length. */
serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*'); serverAssertWithInfo(c,NULL,c->querybuf[c->qb_pos] == '*');
size_t multibulklen_slen = newline - (c->querybuf + 1 + c->qb_pos);
ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll); ok = string2ll(c->querybuf+1+c->qb_pos,newline-(c->querybuf+1+c->qb_pos),&ll);
if (!ok || ll > INT_MAX) { if (!ok || ll > INT_MAX) {
c->read_error = CLIENT_READ_INVALID_MULTIBUCK_LENGTH; c->read_error = CLIENT_READ_INVALID_MULTIBUCK_LENGTH;
@ -2494,6 +2521,39 @@ int processMultibulkBuffer(client *c) {
c->argv = zmalloc(sizeof(robj*)*c->argv_len); c->argv = zmalloc(sizeof(robj*)*c->argv_len);
} }
c->argv_len_sum = 0; c->argv_len_sum = 0;
/* Per-slot network bytes-in calculation.
*
* We calculate and store the current command's ingress bytes under
* c->net_input_bytes_curr_cmd, for which its per-slot aggregation is deferred
* until c->slot is parsed later within processCommand().
*
* Calculation: For multi bulk buffer, we accumulate four factors, namely;
*
* 1) multibulklen_slen + 3
* Cumulative string length (and not the value of) of multibulklen,
* including the first "*" byte and last "\r\n" 2 bytes from RESP.
* 2) bulklen_slen + 3
* Cumulative string length (and not the value of) of bulklen,
* including +3 from RESP first "$" byte and last "\r\n" 2 bytes per argument count.
* 3) c->argv_len_sum
* Cumulative string length of all argument vectors.
* 4) c->argc * 2
* Cumulative string length of the arguments' white-spaces, for which there exists a total of
* "\r\n" 2 bytes per argument.
*
* For example;
* Command) SET key value
* RESP) *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
*
* 1) String length of "*3\r\n" is 4, obtained from (multibulklen_slen + 3).
* 2) String length of "$3\r\n" "$3\r\n" "$5\r\n" is 12, obtained from (bulklen_slen + 3).
* 3) String length of "SET" "key" "value" is 11, obtained from (c->argv_len_sum).
* 4) String length of the 3 arguments' white-spaces "\r\n" is 6, obtained from (c->argc * 2).
*
* The 1st component is calculated within the below line.
* */
c->net_input_bytes_curr_cmd += (multibulklen_slen + 3);
} }
serverAssertWithInfo(c,NULL,c->multibulklen > 0); serverAssertWithInfo(c,NULL,c->multibulklen > 0);
@ -2518,6 +2578,7 @@ int processMultibulkBuffer(client *c) {
return C_ERR; return C_ERR;
} }
size_t bulklen_slen = newline - (c->querybuf + c->qb_pos + 1);
ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll); ok = string2ll(c->querybuf+c->qb_pos+1,newline-(c->querybuf+c->qb_pos+1),&ll);
if (!ok || ll < 0 || if (!ok || ll < 0 ||
(!(c->flags & CLIENT_MASTER) && ll > server.proto_max_bulk_len)) { (!(c->flags & CLIENT_MASTER) && ll > server.proto_max_bulk_len)) {
@ -2557,6 +2618,8 @@ int processMultibulkBuffer(client *c) {
} }
} }
c->bulklen = ll; c->bulklen = ll;
/* Per-slot network bytes-in calculation, 2nd component. */
c->net_input_bytes_curr_cmd += (bulklen_slen + 3);
} }
/* Read bulk argument */ /* Read bulk argument */
@ -2598,7 +2661,11 @@ int processMultibulkBuffer(client *c) {
} }
/* We're done when c->multibulk == 0 */ /* We're done when c->multibulk == 0 */
if (c->multibulklen == 0) return C_OK; if (c->multibulklen == 0) {
/* Per-slot network bytes-in calculation, 3rd and 4th components. */
c->net_input_bytes_curr_cmd += (c->argv_len_sum + (c->argc * 2));
return C_OK;
}
/* Still not ready to process the command */ /* Still not ready to process the command */
return C_ERR; return C_ERR;
@ -2620,6 +2687,7 @@ void commandProcessed(client *c) {
if (c->flags & CLIENT_BLOCKED) return; if (c->flags & CLIENT_BLOCKED) return;
reqresAppendResponse(c); reqresAppendResponse(c);
clusterSlotStatsAddNetworkBytesInForUserClient(c);
resetClientInternal(c, 0); resetClientInternal(c, 0);
long long prev_offset = c->reploff; long long prev_offset = c->reploff;

View File

@ -2,13 +2,19 @@
* Copyright (c) 2009-Present, Redis Ltd. * Copyright (c) 2009-Present, Redis Ltd.
* All rights reserved. * All rights reserved.
* *
* Copyright (c) 2024-present, Valkey contributors.
* All rights reserved.
*
* Licensed under your choice of (a) the Redis Source Available License 2.0 * Licensed under your choice of (a) the Redis Source Available License 2.0
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
* GNU Affero General Public License v3 (AGPLv3). * GNU Affero General Public License v3 (AGPLv3).
*
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
*/ */
#include "server.h" #include "server.h"
#include "cluster.h" #include "cluster.h"
#include "cluster_slot_stats.h"
/* Structure to hold the pubsub related metadata. Currently used /* Structure to hold the pubsub related metadata. Currently used
* for pubsub and pubsubshard feature. */ * for pubsub and pubsubshard feature. */
@ -470,6 +476,7 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type)
while ((entry = dictNext(iter)) != NULL) { while ((entry = dictNext(iter)) != NULL) {
client *c = dictGetKey(entry); client *c = dictGetKey(entry);
addReplyPubsubMessage(c,channel,message,*type.messageBulk); addReplyPubsubMessage(c,channel,message,*type.messageBulk);
clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(c, slot);
updateClientMemUsageAndBucket(c); updateClientMemUsageAndBucket(c);
receivers++; receivers++;
} }

View File

@ -28,6 +28,7 @@
#include "server.h" #include "server.h"
#include "cluster.h" #include "cluster.h"
#include "cluster_slot_stats.h"
#include "bio.h" #include "bio.h"
#include "functions.h" #include "functions.h"
#include "connection.h" #include "connection.h"
@ -392,6 +393,8 @@ void feedReplicationBuffer(char *s, size_t len) {
if (server.repl_backlog == NULL) return; if (server.repl_backlog == NULL) return;
clusterSlotStatsIncrNetworkBytesOutForReplication(len);
while(len > 0) { while(len > 0) {
size_t start_pos = 0; /* The position of referenced block to start sending. */ size_t start_pos = 0; /* The position of referenced block to start sending. */
listNode *start_node = NULL; /* Replica/backlog starts referenced node. */ listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
@ -547,6 +550,11 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
feedReplicationBufferWithObject(selectcmd); feedReplicationBufferWithObject(selectcmd);
/* Although the SELECT command is not associated with any slot,
* its per-slot network-bytes-out accumulation is made by the above function call.
* To cancel-out this accumulation, below adjustment is made. */
clusterSlotStatsDecrNetworkBytesOutForReplication(sdslen(selectcmd->ptr));
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
decrRefCount(selectcmd); decrRefCount(selectcmd);
@ -4126,6 +4134,9 @@ void replicationSendAck(void) {
addReplyBulkLongLong(c,server.fsynced_reploff); addReplyBulkLongLong(c,server.fsynced_reploff);
} }
c->flags &= ~CLIENT_MASTER_FORCE_REPLY; c->flags &= ~CLIENT_MASTER_FORCE_REPLY;
/* Accumulation from above replies must be reset back to 0 manually,
* as this subroutine does not invoke resetClient(). */
c->net_output_bytes_curr_cmd = 0;
} }
} }

View File

@ -2,14 +2,20 @@
* Copyright (c) 2009-Present, Redis Ltd. * Copyright (c) 2009-Present, Redis Ltd.
* All rights reserved. * All rights reserved.
* *
* Copyright (c) 2024-present, Valkey contributors.
* All rights reserved.
*
* Licensed under your choice of (a) the Redis Source Available License 2.0 * Licensed under your choice of (a) the Redis Source Available License 2.0
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
* GNU Affero General Public License v3 (AGPLv3). * GNU Affero General Public License v3 (AGPLv3).
*
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
*/ */
#include "server.h" #include "server.h"
#include "script.h" #include "script.h"
#include "cluster.h" #include "cluster.h"
#include "cluster_slot_stats.h"
#include <lua.h> #include <lua.h>
#include <lauxlib.h> #include <lauxlib.h>
@ -664,6 +670,7 @@ void scriptCall(scriptRunCtx *run_ctx, sds *err) {
} }
call(c, call_flags); call(c, call_flags);
serverAssert((c->flags & CLIENT_BLOCKED) == 0); serverAssert((c->flags & CLIENT_BLOCKED) == 0);
clusterSlotStatsInvalidateSlotIfApplicable(run_ctx);
return; return;
error: error:

View File

@ -15,6 +15,7 @@
#include "server.h" #include "server.h"
#include "monotonic.h" #include "monotonic.h"
#include "cluster.h" #include "cluster.h"
#include "cluster_slot_stats.h"
#include "slowlog.h" #include "slowlog.h"
#include "bio.h" #include "bio.h"
#include "latency.h" #include "latency.h"
@ -3803,13 +3804,14 @@ void call(client *c, int flags) {
if (!(c->flags & CLIENT_BLOCKED)) if (!(c->flags & CLIENT_BLOCKED))
freeClientOriginalArgv(c); freeClientOriginalArgv(c);
/* populate the per-command statistics that we show in INFO commandstats. /* Populate the per-command and per-slot statistics that we show in INFO commandstats and CLUSTER SLOT-STATS,
* If the client is blocked we will handle latency stats and duration when it is unblocked. */ * respectively. If the client is blocked we will handle latency stats and duration when it is unblocked. */
if (update_command_stats && !(c->flags & CLIENT_BLOCKED)) { if (update_command_stats && !(c->flags & CLIENT_BLOCKED)) {
real_cmd->calls++; real_cmd->calls++;
real_cmd->microseconds += c->duration; real_cmd->microseconds += c->duration;
if (server.latency_tracking_enabled && !(c->flags & CLIENT_BLOCKED)) if (server.latency_tracking_enabled && !(c->flags & CLIENT_BLOCKED))
updateCommandLatencyHistogram(&(real_cmd->latency_histogram), c->duration*1000); updateCommandLatencyHistogram(&(real_cmd->latency_histogram), c->duration*1000);
clusterSlotStatsAddCpuDuration(c, c->duration);
} }
/* The duration needs to be reset after each call except for a blocked command, /* The duration needs to be reset after each call except for a blocked command,
@ -3963,6 +3965,8 @@ void afterCommand(client *c) {
/* Flush pending tracking invalidations. */ /* Flush pending tracking invalidations. */
trackingHandlePendingKeyInvalidations(); trackingHandlePendingKeyInvalidations();
clusterSlotStatsAddNetworkBytesOutForUserClient(c);
/* Flush other pending push messages. only when we are not in nested call. /* Flush other pending push messages. only when we are not in nested call.
* So the messages are not interleaved with transaction response. */ * So the messages are not interleaved with transaction response. */
if (!server.execution_nesting) if (!server.execution_nesting)

View File

@ -1438,6 +1438,11 @@ typedef struct client {
/* list node in clients_pending_write list */ /* list node in clients_pending_write list */
listNode clients_pending_write_node; listNode clients_pending_write_node;
/* Statistics and metrics */
size_t net_input_bytes_curr_cmd; /* Total network input bytes read for the
* execution of this client's current command. */
size_t net_output_bytes_curr_cmd; /* Total network output bytes sent to this
* client, by the current command. */
/* Response buffer */ /* Response buffer */
size_t buf_peak; /* Peak used size of buffer in last 5 sec interval. */ size_t buf_peak; /* Peak used size of buffer in last 5 sec interval. */
mstime_t buf_peak_last_reset_time; /* keeps the last time the buffer peak value was reset */ mstime_t buf_peak_last_reset_time; /* keeps the last time the buffer peak value was reset */
@ -2231,6 +2236,7 @@ struct redisServer {
unsigned long long cluster_link_msg_queue_limit_bytes; /* Memory usage limit on individual link msg queue */ unsigned long long cluster_link_msg_queue_limit_bytes; /* Memory usage limit on individual link msg queue */
int cluster_drop_packet_filter; /* Debug config that allows tactically int cluster_drop_packet_filter; /* Debug config that allows tactically
* dropping packets of a specific type */ * dropping packets of a specific type */
int cluster_slot_stats_enabled; /* Cluster slot usage statistics tracking enabled. */
/* Scripting */ /* Scripting */
unsigned int lua_arena; /* eval lua arena used in jemalloc. */ unsigned int lua_arena; /* eval lua arena used in jemalloc. */
mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */ mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */
@ -4024,6 +4030,7 @@ void sunsubscribeCommand(client *c);
void watchCommand(client *c); void watchCommand(client *c);
void unwatchCommand(client *c); void unwatchCommand(client *c);
void clusterCommand(client *c); void clusterCommand(client *c);
void clusterSlotStatsCommand(client *c);
void restoreCommand(client *c); void restoreCommand(client *c);
void migrateCommand(client *c); void migrateCommand(client *c);
void askingCommand(client *c); void askingCommand(client *c);

View File

@ -1,3 +1,17 @@
#
# Copyright (c) 2009-Present, Redis Ltd.
# All rights reserved.
#
# Copyright (c) 2024-present, Valkey contributors.
# All rights reserved.
#
# Licensed under your choice of (a) the Redis Source Available License 2.0
# (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
# GNU Affero General Public License v3 (AGPLv3).
#
# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
#
# Cluster helper functions # Cluster helper functions
# Check if cluster configuration is consistent. # Check if cluster configuration is consistent.
@ -112,7 +126,7 @@ proc start_cluster {masters replicas options code {slot_allocator continuous_slo
# Configure the starting of multiple servers. Set cluster node timeout # Configure the starting of multiple servers. Set cluster node timeout
# aggressively since many tests depend on ping/pong messages. # aggressively since many tests depend on ping/pong messages.
set cluster_options [list overrides [list cluster-enabled yes cluster-ping-interval 100 cluster-node-timeout 3000]] set cluster_options [list overrides [list cluster-enabled yes cluster-ping-interval 100 cluster-node-timeout 3000 cluster-slot-stats-enabled yes]]
set options [concat $cluster_options $options] set options [concat $cluster_options $options]
# Cluster mode only supports a single database, so before executing the tests # Cluster mode only supports a single database, so before executing the tests

View File

@ -0,0 +1,988 @@
#
# Copyright (c) 2009-Present, Redis Ltd.
# All rights reserved.
#
# Copyright (c) 2024-present, Valkey contributors.
# All rights reserved.
#
# Licensed under your choice of (a) the Redis Source Available License 2.0
# (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
# GNU Affero General Public License v3 (AGPLv3).
#
# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
#
# Integration tests for CLUSTER SLOT-STATS command.
# -----------------------------------------------------------------------------
# Helper functions for CLUSTER SLOT-STATS test cases.
# -----------------------------------------------------------------------------
# Converts array RESP response into a dict.
# This is useful for many test cases, where unnecessary nesting is removed.
proc convert_array_into_dict {slot_stats} {
set res [dict create]
foreach slot_stat $slot_stats {
# slot_stat is an array of size 2, where 0th index represents (int) slot,
# and 1st index represents (map) usage statistics.
dict set res [lindex $slot_stat 0] [lindex $slot_stat 1]
}
return $res
}
proc get_cmdstat_usec {cmd r} {
set cmdstatline [cmdrstat $cmd r]
regexp "usec=(.*?),usec_per_call=(.*?),rejected_calls=0,failed_calls=0" $cmdstatline -> usec _
return $usec
}
proc initialize_expected_slots_dict {} {
set expected_slots [dict create]
for {set i 0} {$i < 16384} {incr i 1} {
dict set expected_slots $i 0
}
return $expected_slots
}
proc initialize_expected_slots_dict_with_range {start_slot end_slot} {
assert {$start_slot <= $end_slot}
set expected_slots [dict create]
for {set i $start_slot} {$i <= $end_slot} {incr i 1} {
dict set expected_slots $i 0
}
return $expected_slots
}
proc assert_empty_slot_stats {slot_stats metrics_to_assert} {
set slot_stats [convert_array_into_dict $slot_stats]
dict for {slot stats} $slot_stats {
foreach metric_name $metrics_to_assert {
set metric_value [dict get $stats $metric_name]
assert {$metric_value == 0}
}
}
}
proc assert_empty_slot_stats_with_exception {slot_stats exception_slots metrics_to_assert} {
set slot_stats [convert_array_into_dict $slot_stats]
dict for {slot stats} $exception_slots {
assert {[dict exists $slot_stats $slot]} ;# slot_stats must contain the expected slots.
}
dict for {slot stats} $slot_stats {
if {[dict exists $exception_slots $slot]} {
foreach metric_name $metrics_to_assert {
set metric_value [dict get $exception_slots $slot $metric_name]
assert {[dict get $stats $metric_name] == $metric_value}
}
} else {
dict for {metric value} $stats {
assert {$value == 0}
}
}
}
}
proc assert_equal_slot_stats {slot_stats_1 slot_stats_2 deterministic_metrics non_deterministic_metrics} {
set slot_stats_1 [convert_array_into_dict $slot_stats_1]
set slot_stats_2 [convert_array_into_dict $slot_stats_2]
assert {[dict size $slot_stats_1] == [dict size $slot_stats_2]}
dict for {slot stats_1} $slot_stats_1 {
assert {[dict exists $slot_stats_2 $slot]}
set stats_2 [dict get $slot_stats_2 $slot]
# For deterministic metrics, we assert their equality.
foreach metric $deterministic_metrics {
assert {[dict get $stats_1 $metric] == [dict get $stats_2 $metric]}
}
# For non-deterministic metrics, we assert their non-zeroness as a best-effort.
foreach metric $non_deterministic_metrics {
assert {([dict get $stats_1 $metric] == 0 && [dict get $stats_2 $metric] == 0) || \
([dict get $stats_1 $metric] != 0 && [dict get $stats_2 $metric] != 0)}
}
}
}
proc assert_all_slots_have_been_seen {expected_slots} {
dict for {k v} $expected_slots {
assert {$v == 1}
}
}
proc assert_slot_visibility {slot_stats expected_slots} {
set slot_stats [convert_array_into_dict $slot_stats]
dict for {slot _} $slot_stats {
assert {[dict exists $expected_slots $slot]}
dict set expected_slots $slot 1
}
assert_all_slots_have_been_seen $expected_slots
}
proc assert_slot_stats_monotonic_order {slot_stats orderby is_desc} {
# For Tcl dict, the order of iteration is the order in which the keys were inserted into the dictionary
# Thus, the response ordering is preserved upon calling 'convert_array_into_dict()'.
# Source: https://www.tcl.tk/man/tcl8.6.11/TclCmd/dict.htm
set slot_stats [convert_array_into_dict $slot_stats]
set prev_metric -1
dict for {_ stats} $slot_stats {
set curr_metric [dict get $stats $orderby]
if {$prev_metric != -1} {
if {$is_desc == 1} {
assert {$prev_metric >= $curr_metric}
} else {
assert {$prev_metric <= $curr_metric}
}
}
set prev_metric $curr_metric
}
}
proc assert_slot_stats_monotonic_descent {slot_stats orderby} {
assert_slot_stats_monotonic_order $slot_stats $orderby 1
}
proc assert_slot_stats_monotonic_ascent {slot_stats orderby} {
assert_slot_stats_monotonic_order $slot_stats $orderby 0
}
proc wait_for_replica_key_exists {key key_count} {
wait_for_condition 1000 50 {
[R 1 exists $key] eq "$key_count"
} else {
fail "Test key was not replicated"
}
}
# -----------------------------------------------------------------------------
# Test cases for CLUSTER SLOT-STATS cpu-usec metric correctness.
# -----------------------------------------------------------------------------
start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
# Define shared variables.
set key "FOO"
set key_slot [R 0 cluster keyslot $key]
set key_secondary "FOO2"
set key_secondary_slot [R 0 cluster keyslot $key_secondary]
set metrics_to_assert [list cpu-usec]
test "CLUSTER SLOT-STATS cpu-usec reset upon CONFIG RESETSTAT." {
R 0 SET $key VALUE
R 0 DEL $key
R 0 CONFIG RESETSTAT
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
test "CLUSTER SLOT-STATS cpu-usec reset upon slot migration." {
R 0 SET $key VALUE
R 0 CLUSTER DELSLOTS $key_slot
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats $metrics_to_assert
R 0 CLUSTER ADDSLOTS $key_slot
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
test "CLUSTER SLOT-STATS cpu-usec for non-slot specific commands." {
R 0 INFO
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
test "CLUSTER SLOT-STATS cpu-usec for slot specific commands." {
R 0 SET $key VALUE
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set usec [get_cmdstat_usec set r]
set expected_slot_stats [
dict create $key_slot [
dict create cpu-usec $usec
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
test "CLUSTER SLOT-STATS cpu-usec for blocking commands, unblocked on keyspace update." {
# Blocking command with no timeout. Only keyspace update can unblock this client.
set rd [redis_deferring_client]
$rd BLPOP $key 0
wait_for_blocked_clients_count 1
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
# When the client is blocked, no accumulation is made. This behaviour is identical to INFO COMMANDSTATS.
assert_empty_slot_stats $slot_stats $metrics_to_assert
# Unblocking command.
R 0 LPUSH $key value
wait_for_blocked_clients_count 0
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set lpush_usec [get_cmdstat_usec lpush r]
set blpop_usec [get_cmdstat_usec blpop r]
# Assert that both blocking and non-blocking command times have been accumulated.
set expected_slot_stats [
dict create $key_slot [
dict create cpu-usec [expr $lpush_usec + $blpop_usec]
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
test "CLUSTER SLOT-STATS cpu-usec for blocking commands, unblocked on timeout." {
# Blocking command with 0.5 seconds timeout.
set rd [redis_deferring_client]
$rd BLPOP $key 0.5
# Confirm that the client is blocked, then unblocked within 1 second.
wait_for_blocked_clients_count 1
wait_for_blocked_clients_count 0
# Assert that the blocking command time has been accumulated.
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set blpop_usec [get_cmdstat_usec blpop r]
set expected_slot_stats [
dict create $key_slot [
dict create cpu-usec $blpop_usec
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
test "CLUSTER SLOT-STATS cpu-usec for transactions." {
set r1 [redis_client]
$r1 MULTI
$r1 SET $key value
$r1 GET $key
# CPU metric is not accumulated until EXEC is reached. This behaviour is identical to INFO COMMANDSTATS.
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats $metrics_to_assert
# Execute transaction, and assert that all nested command times have been accumulated.
$r1 EXEC
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set exec_usec [get_cmdstat_usec exec r]
set expected_slot_stats [
dict create $key_slot [
dict create cpu-usec $exec_usec
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
test "CLUSTER SLOT-STATS cpu-usec for lua-scripts, without cross-slot keys." {
r eval [format "#!lua
redis.call('set', '%s', 'bar'); redis.call('get', '%s')" $key $key] 0
set eval_usec [get_cmdstat_usec eval r]
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set expected_slot_stats [
dict create $key_slot [
dict create cpu-usec $eval_usec
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
test "CLUSTER SLOT-STATS cpu-usec for lua-scripts, with cross-slot keys." {
r eval [format "#!lua flags=allow-cross-slot-keys
redis.call('set', '%s', 'bar'); redis.call('get', '%s');
" $key $key_secondary] 0
# For cross-slot, we do not accumulate at all.
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
test "CLUSTER SLOT-STATS cpu-usec for functions, without cross-slot keys." {
set function_str [format "#!lua name=f1
redis.register_function{
function_name='f1',
callback=function() redis.call('set', '%s', '1') redis.call('get', '%s') end
}" $key $key]
r function load replace $function_str
r fcall f1 0
set fcall_usec [get_cmdstat_usec fcall r]
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set expected_slot_stats [
dict create $key_slot [
dict create cpu-usec $fcall_usec
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
test "CLUSTER SLOT-STATS cpu-usec for functions, with cross-slot keys." {
set function_str [format "#!lua name=f1
redis.register_function{
function_name='f1',
callback=function() redis.call('set', '%s', '1') redis.call('get', '%s') end,
flags={'allow-cross-slot-keys'}
}" $key $key_secondary]
r function load replace $function_str
r fcall f1 0
# For cross-slot, we do not accumulate at all.
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
}
# -----------------------------------------------------------------------------
# Test cases for CLUSTER SLOT-STATS network-bytes-in.
# -----------------------------------------------------------------------------
start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
# Define shared variables.
set key "key"
set key_slot [R 0 cluster keyslot $key]
set metrics_to_assert [list network-bytes-in]
test "CLUSTER SLOT-STATS network-bytes-in, multi bulk buffer processing." {
# *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 33 bytes.
R 0 SET $key value
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set expected_slot_stats [
dict create $key_slot [
dict create network-bytes-in 33
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
test "CLUSTER SLOT-STATS network-bytes-in, in-line buffer processing." {
set rd [redis_deferring_client]
# SET key value\r\n --> 15 bytes.
$rd write "SET $key value\r\n"
$rd flush
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set expected_slot_stats [
dict create $key_slot [
dict create network-bytes-in 15
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
test "CLUSTER SLOT-STATS network-bytes-in, blocking command." {
set rd [redis_deferring_client]
# *3\r\n$5\r\nblpop\r\n$3\r\nkey\r\n$1\r\n0\r\n --> 31 bytes.
$rd BLPOP $key 0
wait_for_blocked_clients_count 1
# Slot-stats must be empty here, as the client is yet to be unblocked.
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats $metrics_to_assert
# *3\r\n$5\r\nlpush\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 35 bytes.
R 0 LPUSH $key value
wait_for_blocked_clients_count 0
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set expected_slot_stats [
dict create $key_slot [
dict create network-bytes-in 66 ;# 31 + 35 bytes.
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
test "CLUSTER SLOT-STATS network-bytes-in, multi-exec transaction." {
set r [redis_client]
# *1\r\n$5\r\nmulti\r\n --> 15 bytes.
$r MULTI
# *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 33 bytes.
assert {[$r SET $key value] eq {QUEUED}}
# *1\r\n$4\r\nexec\r\n --> 14 bytes.
assert {[$r EXEC] eq {OK}}
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set expected_slot_stats [
dict create $key_slot [
dict create network-bytes-in 62 ;# 15 + 33 + 14 bytes.
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
test "CLUSTER SLOT-STATS network-bytes-in, non slot specific command." {
R 0 INFO
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
test "CLUSTER SLOT-STATS network-bytes-in, pub/sub." {
# PUB/SUB does not get accumulated at per-slot basis,
# as it is cluster-wide and is not slot specific.
set rd [redis_deferring_client]
$rd subscribe channel
R 0 publish channel message
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
}
start_cluster 1 1 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
set channel "channel"
set key_slot [R 0 cluster keyslot $channel]
set metrics_to_assert [list network-bytes-in]
# Setup replication.
assert {[s -1 role] eq {slave}}
wait_for_condition 1000 50 {
[s -1 master_link_status] eq {up}
} else {
fail "Instance #1 master link status is not up"
}
R 1 readonly
test "CLUSTER SLOT-STATS network-bytes-in, sharded pub/sub." {
set slot [R 0 cluster keyslot $channel]
set primary [Rn 0]
set replica [Rn 1]
set replica_subcriber [redis_deferring_client -1]
$replica_subcriber SSUBSCRIBE $channel
# *2\r\n$10\r\nssubscribe\r\n$7\r\nchannel\r\n --> 34 bytes.
$primary SPUBLISH $channel hello
# *3\r\n$8\r\nspublish\r\n$7\r\nchannel\r\n$5\r\nhello\r\n --> 42 bytes.
set slot_stats [$primary CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set expected_slot_stats [
dict create $key_slot [
dict create network-bytes-in 42
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
set slot_stats [$replica CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set expected_slot_stats [
dict create $key_slot [
dict create network-bytes-in 34
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
}
# -----------------------------------------------------------------------------
# Test cases for CLUSTER SLOT-STATS network-bytes-out correctness.
# -----------------------------------------------------------------------------
start_cluster 1 0 {tags {external:skip cluster}} {
# Define shared variables.
set key "FOO"
set key_slot [R 0 cluster keyslot $key]
set expected_slots_to_key_count [dict create $key_slot 1]
set metrics_to_assert [list network-bytes-out]
R 0 CONFIG SET cluster-slot-stats-enabled yes
test "CLUSTER SLOT-STATS network-bytes-out, for non-slot specific commands." {
R 0 INFO
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
test "CLUSTER SLOT-STATS network-bytes-out, for slot specific commands." {
R 0 SET $key value
# +OK\r\n --> 5 bytes
set expected_slot_stats [
dict create $key_slot [
dict create network-bytes-out 5
]
]
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
test "CLUSTER SLOT-STATS network-bytes-out, blocking commands." {
set rd [redis_deferring_client]
$rd BLPOP $key 0
wait_for_blocked_clients_count 1
# Assert empty slot stats here, since COB is yet to be flushed due to the block.
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats $metrics_to_assert
# Unblock the command.
# LPUSH client) :1\r\n --> 4 bytes.
# BLPOP client) *2\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 24 bytes, upon unblocking.
R 0 LPUSH $key value
wait_for_blocked_clients_count 0
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set expected_slot_stats [
dict create $key_slot [
dict create network-bytes-out 28 ;# 4 + 24 bytes.
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
R 0 CONFIG RESETSTAT
R 0 FLUSHALL
}
start_cluster 1 1 {tags {external:skip cluster}} {
# Define shared variables.
set key "FOO"
set key_slot [R 0 CLUSTER KEYSLOT $key]
set metrics_to_assert [list network-bytes-out]
R 0 CONFIG SET cluster-slot-stats-enabled yes
# Setup replication.
assert {[s -1 role] eq {slave}}
wait_for_condition 1000 50 {
[s -1 master_link_status] eq {up}
} else {
fail "Instance #1 master link status is not up"
}
R 1 readonly
test "CLUSTER SLOT-STATS network-bytes-out, replication stream egress." {
assert_equal [R 0 SET $key VALUE] {OK}
# Local client) +OK\r\n --> 5 bytes.
# Replication stream) *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 33 bytes.
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set expected_slot_stats [
dict create $key_slot [
dict create network-bytes-out 38 ;# 5 + 33 bytes.
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
}
start_cluster 1 1 {tags {external:skip cluster}} {
# Define shared variables.
set channel "channel"
set key_slot [R 0 cluster keyslot $channel]
set channel_secondary "channel2"
set key_slot_secondary [R 0 cluster keyslot $channel_secondary]
set metrics_to_assert [list network-bytes-out]
R 0 CONFIG SET cluster-slot-stats-enabled yes
test "CLUSTER SLOT-STATS network-bytes-out, sharded pub/sub, single channel." {
set slot [R 0 cluster keyslot $channel]
set publisher [Rn 0]
set subscriber [redis_client]
set replica [redis_deferring_client -1]
# Subscriber client) *3\r\n$10\r\nssubscribe\r\n$7\r\nchannel\r\n:1\r\n --> 38 bytes
$subscriber SSUBSCRIBE $channel
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set expected_slot_stats [
dict create $key_slot [
dict create network-bytes-out 38
]
]
R 0 CONFIG RESETSTAT
# Publisher client) :1\r\n --> 4 bytes.
# Subscriber client) *3\r\n$8\r\nsmessage\r\n$7\r\nchannel\r\n$5\r\nhello\r\n --> 42 bytes.
assert_equal 1 [$publisher SPUBLISH $channel hello]
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set expected_slot_stats [
dict create $key_slot [
dict create network-bytes-out 46 ;# 4 + 42 bytes.
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
$subscriber QUIT
R 0 FLUSHALL
R 0 CONFIG RESETSTAT
test "CLUSTER SLOT-STATS network-bytes-out, sharded pub/sub, cross-slot channels." {
set slot [R 0 cluster keyslot $channel]
set publisher [Rn 0]
set subscriber [redis_client]
set replica [redis_deferring_client -1]
# Stack multi-slot subscriptions against a single client.
# For primary channel;
# Subscriber client) *3\r\n$10\r\nssubscribe\r\n$7\r\nchannel\r\n:1\r\n --> 38 bytes
# For secondary channel;
# Subscriber client) *3\r\n$10\r\nssubscribe\r\n$8\r\nchannel2\r\n:1\r\n --> 39 bytes
$subscriber SSUBSCRIBE $channel
$subscriber SSUBSCRIBE $channel_secondary
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set expected_slot_stats [
dict create \
$key_slot [ \
dict create network-bytes-out 38
] \
$key_slot_secondary [ \
dict create network-bytes-out 39
]
]
R 0 CONFIG RESETSTAT
# For primary channel;
# Publisher client) :1\r\n --> 4 bytes.
# Subscriber client) *3\r\n$8\r\nsmessage\r\n$7\r\nchannel\r\n$5\r\nhello\r\n --> 42 bytes.
# For secondary channel;
# Publisher client) :1\r\n --> 4 bytes.
# Subscriber client) *3\r\n$8\r\nsmessage\r\n$8\r\nchannel2\r\n$5\r\nhello\r\n --> 43 bytes.
assert_equal 1 [$publisher SPUBLISH $channel hello]
assert_equal 1 [$publisher SPUBLISH $channel_secondary hello]
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
set expected_slot_stats [
dict create \
$key_slot [ \
dict create network-bytes-out 46 ;# 4 + 42 bytes.
] \
$key_slot_secondary [ \
dict create network-bytes-out 47 ;# 4 + 43 bytes.
]
]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
}
# -----------------------------------------------------------------------------
# Test cases for CLUSTER SLOT-STATS key-count metric correctness.
# -----------------------------------------------------------------------------
start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
# Define shared variables.
set key "FOO"
set key_slot [R 0 cluster keyslot $key]
set metrics_to_assert [list key-count]
set expected_slot_stats [
dict create $key_slot [
dict create key-count 1
]
]
test "CLUSTER SLOT-STATS contains default value upon redis-server startup" {
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats $metrics_to_assert
}
test "CLUSTER SLOT-STATS contains correct metrics upon key introduction" {
R 0 SET $key TEST
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
test "CLUSTER SLOT-STATS contains correct metrics upon key mutation" {
R 0 SET $key NEW_VALUE
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats_with_exception $slot_stats $expected_slot_stats $metrics_to_assert
}
test "CLUSTER SLOT-STATS contains correct metrics upon key deletion" {
R 0 DEL $key
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats $slot_stats $metrics_to_assert
}
test "CLUSTER SLOT-STATS slot visibility based on slot ownership changes" {
R 0 CONFIG SET cluster-require-full-coverage no
R 0 CLUSTER DELSLOTS $key_slot
set expected_slots [initialize_expected_slots_dict]
dict unset expected_slots $key_slot
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert {[dict size $expected_slots] == 16383}
assert_slot_visibility $slot_stats $expected_slots
R 0 CLUSTER ADDSLOTS $key_slot
set expected_slots [initialize_expected_slots_dict]
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert {[dict size $expected_slots] == 16384}
assert_slot_visibility $slot_stats $expected_slots
}
}
# -----------------------------------------------------------------------------
# Test cases for CLUSTER SLOT-STATS SLOTSRANGE sub-argument.
# -----------------------------------------------------------------------------
start_cluster 1 0 {tags {external:skip cluster}} {
test "CLUSTER SLOT-STATS SLOTSRANGE all slots present" {
set start_slot 100
set end_slot 102
set expected_slots [initialize_expected_slots_dict_with_range $start_slot $end_slot]
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE $start_slot $end_slot]
assert_slot_visibility $slot_stats $expected_slots
}
test "CLUSTER SLOT-STATS SLOTSRANGE some slots missing" {
set start_slot 100
set end_slot 102
set expected_slots [initialize_expected_slots_dict_with_range $start_slot $end_slot]
R 0 CLUSTER DELSLOTS $start_slot
dict unset expected_slots $start_slot
set slot_stats [R 0 CLUSTER SLOT-STATS SLOTSRANGE $start_slot $end_slot]
assert_slot_visibility $slot_stats $expected_slots
}
}
# -----------------------------------------------------------------------------
# Test cases for CLUSTER SLOT-STATS ORDERBY sub-argument.
# -----------------------------------------------------------------------------
start_cluster 1 0 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
set metrics [list "key-count" "cpu-usec" "network-bytes-in" "network-bytes-out"]
# SET keys for target hashslots, to encourage ordering.
set hash_tags [list 0 1 2 3 4]
set num_keys 1
foreach hash_tag $hash_tags {
for {set i 0} {$i < $num_keys} {incr i 1} {
R 0 SET "$i{$hash_tag}" VALUE
}
incr num_keys 1
}
# SET keys for random hashslots, for random noise.
set num_keys 0
while {$num_keys < 1000} {
set random_key [randomInt 16384]
R 0 SET $random_key VALUE
incr num_keys 1
}
test "CLUSTER SLOT-STATS ORDERBY DESC correct ordering" {
foreach orderby $metrics {
set slot_stats [R 0 CLUSTER SLOT-STATS ORDERBY $orderby DESC]
assert_slot_stats_monotonic_descent $slot_stats $orderby
}
}
test "CLUSTER SLOT-STATS ORDERBY ASC correct ordering" {
foreach orderby $metrics {
set slot_stats [R 0 CLUSTER SLOT-STATS ORDERBY $orderby ASC]
assert_slot_stats_monotonic_ascent $slot_stats $orderby
}
}
test "CLUSTER SLOT-STATS ORDERBY LIMIT correct response pagination, where limit is less than number of assigned slots" {
R 0 FLUSHALL SYNC
R 0 CONFIG RESETSTAT
foreach orderby $metrics {
set limit 5
set slot_stats_desc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit DESC]
set slot_stats_asc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit ASC]
set slot_stats_desc_length [llength $slot_stats_desc]
set slot_stats_asc_length [llength $slot_stats_asc]
assert {$limit == $slot_stats_desc_length && $limit == $slot_stats_asc_length}
# All slot statistics have been reset to 0, so we will order by slot in ascending order.
set expected_slots [dict create 0 0 1 0 2 0 3 0 4 0]
assert_slot_visibility $slot_stats_desc $expected_slots
assert_slot_visibility $slot_stats_asc $expected_slots
}
}
test "CLUSTER SLOT-STATS ORDERBY LIMIT correct response pagination, where limit is greater than number of assigned slots" {
R 0 CONFIG SET cluster-require-full-coverage no
R 0 FLUSHALL SYNC
R 0 CLUSTER FLUSHSLOTS
R 0 CLUSTER ADDSLOTS 100 101
foreach orderby $metrics {
set num_assigned_slots 2
set limit 5
set slot_stats_desc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit DESC]
set slot_stats_asc [R 0 CLUSTER SLOT-STATS ORDERBY $orderby LIMIT $limit ASC]
set slot_stats_desc_length [llength $slot_stats_desc]
set slot_stats_asc_length [llength $slot_stats_asc]
set expected_response_length [expr min($num_assigned_slots, $limit)]
assert {$expected_response_length == $slot_stats_desc_length && $expected_response_length == $slot_stats_asc_length}
set expected_slots [dict create 100 0 101 0]
assert_slot_visibility $slot_stats_desc $expected_slots
assert_slot_visibility $slot_stats_asc $expected_slots
}
}
test "CLUSTER SLOT-STATS ORDERBY arg sanity check." {
# Non-existent argument.
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY key-count non-existent-arg}
# Negative LIMIT.
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY key-count DESC LIMIT -1}
# Non-existent ORDERBY metric.
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY non-existent-metric}
# When cluster-slot-stats-enabled config is disabled, you cannot sort using advanced metrics.
R 0 CONFIG SET cluster-slot-stats-enabled no
set orderby "cpu-usec"
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby}
set orderby "network-bytes-in"
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby}
set orderby "network-bytes-out"
assert_error "ERR*" {R 0 CLUSTER SLOT-STATS ORDERBY $orderby}
}
}
# -----------------------------------------------------------------------------
# Test cases for CLUSTER SLOT-STATS replication.
# -----------------------------------------------------------------------------
start_cluster 1 1 {tags {external:skip cluster} overrides {cluster-slot-stats-enabled yes}} {
# Define shared variables.
set key "key"
set key_slot [R 0 CLUSTER KEYSLOT $key]
set primary [Rn 0]
set replica [Rn 1]
# For replication, assertions are split between deterministic and non-deterministic metrics.
# * For deterministic metrics, strict equality assertions are made.
# * For non-deterministic metrics, non-zeroness assertions are made.
# Non-zeroness as in, both primary and replica should either have some value, or no value at all.
#
# * key-count is deterministic between primary and its replica.
# * cpu-usec is non-deterministic between primary and its replica.
# * network-bytes-in is deterministic between primary and its replica.
# * network-bytes-out will remain empty in the replica, since primary client do not receive replies, unless for replicationSendAck().
set deterministic_metrics [list key-count network-bytes-in]
set non_deterministic_metrics [list cpu-usec]
set empty_metrics [list network-bytes-out]
# Setup replication.
assert {[s -1 role] eq {slave}}
wait_for_condition 1000 50 {
[s -1 master_link_status] eq {up}
} else {
fail "Instance #1 master link status is not up"
}
R 1 readonly
test "CLUSTER SLOT-STATS metrics replication for new keys" {
# *3\r\n$3\r\nset\r\n$3\r\nkey\r\n$5\r\nvalue\r\n --> 33 bytes.
R 0 SET $key VALUE
set expected_slot_stats [
dict create $key_slot [
dict create key-count 1 network-bytes-in 33
]
]
set slot_stats_master [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats_with_exception $slot_stats_master $expected_slot_stats $deterministic_metrics
wait_for_condition 500 10 {
[string match {*calls=1,*} [cmdrstat set $replica]]
} else {
fail "Replica did not receive the command."
}
set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_equal_slot_stats $slot_stats_master $slot_stats_replica $deterministic_metrics $non_deterministic_metrics
assert_empty_slot_stats $slot_stats_replica $empty_metrics
}
R 0 CONFIG RESETSTAT
R 1 CONFIG RESETSTAT
test "CLUSTER SLOT-STATS metrics replication for existing keys" {
# *3\r\n$3\r\nset\r\n$3\r\nkey\r\n$13\r\nvalue_updated\r\n --> 42 bytes.
R 0 SET $key VALUE_UPDATED
set expected_slot_stats [
dict create $key_slot [
dict create key-count 1 network-bytes-in 42
]
]
set slot_stats_master [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats_with_exception $slot_stats_master $expected_slot_stats $deterministic_metrics
wait_for_condition 500 10 {
[string match {*calls=1,*} [cmdrstat set $replica]]
} else {
fail "Replica did not receive the command."
}
set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_equal_slot_stats $slot_stats_master $slot_stats_replica $deterministic_metrics $non_deterministic_metrics
assert_empty_slot_stats $slot_stats_replica $empty_metrics
}
R 0 CONFIG RESETSTAT
R 1 CONFIG RESETSTAT
test "CLUSTER SLOT-STATS metrics replication for deleting keys" {
# *2\r\n$3\r\ndel\r\n$3\r\nkey\r\n --> 22 bytes.
R 0 DEL $key
set expected_slot_stats [
dict create $key_slot [
dict create key-count 0 network-bytes-in 22
]
]
set slot_stats_master [R 0 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_empty_slot_stats_with_exception $slot_stats_master $expected_slot_stats $deterministic_metrics
wait_for_condition 500 10 {
[string match {*calls=1,*} [cmdrstat del $replica]]
} else {
fail "Replica did not receive the command."
}
set slot_stats_replica [R 1 CLUSTER SLOT-STATS SLOTSRANGE 0 16383]
assert_equal_slot_stats $slot_stats_master $slot_stats_replica $deterministic_metrics $non_deterministic_metrics
assert_empty_slot_stats $slot_stats_replica $empty_metrics
}
R 0 CONFIG RESETSTAT
R 1 CONFIG RESETSTAT
}