Make cluster shards cmd implementation generic

This and the previous commit make the cluster
shards command a generic implementation instead of a
specific implementation for each cluster API implementation.
This commit (a) adds functions to the cluster API
and (b) modifies the cluster shards cmd implementation
to use cluster API functions instead of directly
accessing the legacy clustering implementation.

Signed-off-by: Josh Hershberg <yehoshua@redis.com>
This commit is contained in:
Josh Hershberg 2024-07-24 11:47:26 +03:00
parent e3e631f394
commit 6d5d754119
3 changed files with 125 additions and 42 deletions

View File

@ -785,47 +785,51 @@ unsigned int countKeysInSlot(unsigned int slot) {
/* Add detailed information of a node to the output buffer of the given client. */ /* Add detailed information of a node to the output buffer of the given client. */
void addNodeDetailsToShardReply(client *c, clusterNode *node) { void addNodeDetailsToShardReply(client *c, clusterNode *node) {
int reply_count = 0; int reply_count = 0;
char *hostname;
void *node_replylen = addReplyDeferredLen(c); void *node_replylen = addReplyDeferredLen(c);
addReplyBulkCString(c, "id"); addReplyBulkCString(c, "id");
addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN); addReplyBulkCBuffer(c, clusterNodeGetName(node), CLUSTER_NAMELEN);
reply_count++; reply_count++;
if (node->tcp_port) { if (clusterNodeTcpPort(node)) {
addReplyBulkCString(c, "port"); addReplyBulkCString(c, "port");
addReplyLongLong(c, node->tcp_port); addReplyLongLong(c, clusterNodeTcpPort(node));
reply_count++; reply_count++;
} }
if (node->tls_port) { if (clusterNodeTlsPort(node)) {
addReplyBulkCString(c, "tls-port"); addReplyBulkCString(c, "tls-port");
addReplyLongLong(c, node->tls_port); addReplyLongLong(c, clusterNodeTlsPort(node));
reply_count++; reply_count++;
} }
addReplyBulkCString(c, "ip"); addReplyBulkCString(c, "ip");
addReplyBulkCString(c, node->ip); addReplyBulkCString(c, clusterNodeIp(node));
reply_count++; reply_count++;
addReplyBulkCString(c, "endpoint"); addReplyBulkCString(c, "endpoint");
addReplyBulkCString(c, clusterNodePreferredEndpoint(node)); addReplyBulkCString(c, clusterNodePreferredEndpoint(node));
reply_count++; reply_count++;
if (sdslen(node->hostname) != 0) { hostname = clusterNodeHostname(node);
if (hostname != NULL && *hostname != '\0') {
addReplyBulkCString(c, "hostname"); addReplyBulkCString(c, "hostname");
addReplyBulkCBuffer(c, node->hostname, sdslen(node->hostname)); addReplyBulkCString(c, hostname);
reply_count++; reply_count++;
} }
long long node_offset; long long node_offset;
if (node->flags & CLUSTER_NODE_MYSELF) { if (clusterNodeIsMyself(node)) {
node_offset = nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset; node_offset = clusterNodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
} else { } else {
node_offset = node->repl_offset; node_offset = clusterNodeReplOffset(node);
} }
addReplyBulkCString(c, "role"); addReplyBulkCString(c, "role");
addReplyBulkCString(c, nodeIsSlave(node) ? "replica" : "master"); addReplyBulkCString(c, clusterNodeIsSlave(node) ? "replica" : "master");
reply_count++; reply_count++;
addReplyBulkCString(c, "replication-offset"); addReplyBulkCString(c, "replication-offset");
@ -834,9 +838,9 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) {
addReplyBulkCString(c, "health"); addReplyBulkCString(c, "health");
const char *health_msg = NULL; const char *health_msg = NULL;
if (nodeFailed(node)) { if (clusterNodeIsFailing(node)) {
health_msg = "fail"; health_msg = "fail";
} else if (nodeIsSlave(node) && node_offset == 0) { } else if (clusterNodeIsSlave(node) && node_offset == 0) {
health_msg = "loading"; health_msg = "loading";
} else { } else {
health_msg = "online"; health_msg = "online";
@ -847,64 +851,60 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) {
setDeferredMapLen(c, node_replylen, reply_count); setDeferredMapLen(c, node_replylen, reply_count);
} }
static clusterNode *clusterGetMasterFromShard(list *nodes) { static clusterNode *clusterGetMasterFromShard(void *shard_handle) {
clusterNode *n = NULL; clusterNode *n = NULL;
listIter li; void *node_it = clusterShardHandleGetNodeIterator(shard_handle);
listNode *ln; while((n = clusterShardNodeIteratorNext(node_it)) != NULL) {
listRewind(nodes,&li); if (!clusterNodeIsFailing(n)) {
while ((ln = listNext(&li)) != NULL) {
clusterNode *node = listNodeValue(ln);
if (!nodeFailed(node)) {
n = node;
break; break;
} }
} }
clusterShardNodeIteratorFree(node_it);
if (!n) return NULL; if (!n) return NULL;
return clusterNodeGetMaster(n); return clusterNodeGetMaster(n);
} }
/* Add the shard reply of a single shard based off the given primary node. */ /* Add the shard reply of a single shard based off the given primary node. */
void addShardReplyForClusterShards(client *c, list *nodes) { void addShardReplyForClusterShards(client *c, void *shard_handle) {
serverAssert(listLength(nodes) > 0); serverAssert(clusterGetShardNodeCount(shard_handle) > 0);
addReplyMapLen(c, 2); addReplyMapLen(c, 2);
addReplyBulkCString(c, "slots"); addReplyBulkCString(c, "slots");
/* Use slot_info_pairs from the primary only */ /* Use slot_info_pairs from the primary only */
clusterNode *n = clusterGetMasterFromShard(nodes); clusterNode *master_node = clusterGetMasterFromShard(shard_handle);
if (n && n->slot_info_pairs != NULL) {
serverAssert((n->slot_info_pairs_count % 2) == 0); if (master_node && clusterNodeHasSlotInfo(master_node)) {
addReplyArrayLen(c, n->slot_info_pairs_count); serverAssert((clusterNodeSlotInfoCount(master_node) % 2) == 0);
for (int i = 0; i < n->slot_info_pairs_count; i++) addReplyArrayLen(c, clusterNodeSlotInfoCount(master_node));
addReplyLongLong(c, (unsigned long)n->slot_info_pairs[i]); for (int i = 0; i < clusterNodeSlotInfoCount(master_node); i++)
addReplyLongLong(c, (unsigned long)clusterNodeSlotInfoEntry(master_node, i));
} else { } else {
/* If no slot info pair is provided, the node owns no slots */ /* If no slot info pair is provided, the node owns no slots */
addReplyArrayLen(c, 0); addReplyArrayLen(c, 0);
} }
addReplyBulkCString(c, "nodes"); addReplyBulkCString(c, "nodes");
addReplyArrayLen(c, listLength(nodes)); addReplyArrayLen(c, clusterGetShardNodeCount(shard_handle));
listIter li; void *node_it = clusterShardHandleGetNodeIterator(shard_handle);
listRewind(nodes, &li); for (clusterNode *n = clusterShardNodeIteratorNext(node_it); n != NULL; n = clusterShardNodeIteratorNext(node_it)) {
for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) {
clusterNode *n = listNodeValue(ln);
addNodeDetailsToShardReply(c, n); addNodeDetailsToShardReply(c, n);
clusterFreeNodesSlotsInfo(n); clusterFreeNodesSlotsInfo(n);
} }
clusterShardNodeIteratorFree(node_it);
} }
/* Add to the output buffer of the given client, an array of slot (start, end) /* Add to the output buffer of the given client, an array of slot (start, end)
* pair owned by the shard, also the primary and set of replica(s) along with * pair owned by the shard, also the primary and set of replica(s) along with
* information about each node. */ * information about each node. */
void clusterCommandShards(client *c) { void clusterCommandShards(client *c) {
addReplyArrayLen(c, dictSize(server.cluster->shards)); addReplyArrayLen(c, clusterGetShardCount());
/* This call will add slot_info_pairs to all nodes */ /* This call will add slot_info_pairs to all nodes */
clusterGenNodesSlotsInfo(0); clusterGenNodesSlotsInfo(0);
dictIterator *di = dictGetSafeIterator(server.cluster->shards); dictIterator *shard_it = clusterGetShardIterator();
for(dictEntry *de = dictNext(di); de != NULL; de = dictNext(di)) { for(void *shard_handle = clusterNextShardHandle(shard_it); shard_handle != NULL; shard_handle = clusterNextShardHandle(shard_it)) {
addShardReplyForClusterShards(c, dictGetVal(de)); addShardReplyForClusterShards(c, shard_handle);
} }
dictReleaseIterator(di); clusterFreeShardIterator(shard_it);
} }
void clusterCommandHelp(client *c) { void clusterCommandHelp(client *c) {

View File

@ -97,7 +97,7 @@ int clusterManualFailoverTimeLimit(void);
void clusterCommandSlots(client * c); void clusterCommandSlots(client * c);
void clusterCommandMyId(client *c); void clusterCommandMyId(client *c);
void clusterCommandMyShardId(client *c); void clusterCommandMyShardId(client *c);
void clusterCommandShards(client *c);
sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary); sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary);
int clusterNodeCoversSlot(clusterNode *n, int slot); int clusterNodeCoversSlot(clusterNode *n, int slot);
@ -142,4 +142,23 @@ int isValidAuxString(char *s, unsigned int length);
void migrateCommand(client *c); void migrateCommand(client *c);
void clusterCommand(client *c); void clusterCommand(client *c);
ConnectionType *connTypeOfCluster(void); ConnectionType *connTypeOfCluster(void);
void clusterGenNodesSlotsInfo(int filter);
void clusterFreeNodesSlotsInfo(clusterNode *n);
int clusterNodeSlotInfoCount(clusterNode *n);
uint16_t clusterNodeSlotInfoEntry(clusterNode *n, int idx);
int clusterNodeHasSlotInfo(clusterNode *n);
int clusterGetShardCount(void);
void *clusterGetShardIterator(void);
void *clusterNextShardHandle(void *shard_iterator);
void clusterFreeShardIterator(void *shard_iterator);
int clusterGetShardNodeCount(void *shard);
void *clusterShardHandleGetNodeIterator(void *shard);
clusterNode *clusterShardNodeIteratorNext(void *node_iterator);
void clusterShardNodeIteratorFree(void *node_iterator);
clusterNode *clusterShardNodeFirst(void *shard);
int clusterNodeTcpPort(clusterNode *node);
int clusterNodeTlsPort(clusterNode *node);
#endif /* __CLUSTER_H */ #endif /* __CLUSTER_H */

View File

@ -5582,6 +5582,70 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) {
} }
} }
int clusterGetShardCount(void) {
return dictSize(server.cluster->shards);
}
void *clusterGetShardIterator(void) {
return dictGetSafeIterator(server.cluster->shards);
}
void *clusterNextShardHandle(void *shard_iterator) {
dictEntry *de = dictNext(shard_iterator);
if(de == NULL) return NULL;
return dictGetVal(de);
}
void clusterFreeShardIterator(void *shard_iterator) {
dictReleaseIterator(shard_iterator);
}
int clusterNodeHasSlotInfo(clusterNode *n) {
return n->slot_info_pairs != NULL;
}
int clusterNodeSlotInfoCount(clusterNode *n) {
return n->slot_info_pairs_count;
}
uint16_t clusterNodeSlotInfoEntry(clusterNode *n, int idx) {
return n->slot_info_pairs[idx];
}
int clusterGetShardNodeCount(void *shard) {
return listLength((list*)shard);
}
void *clusterShardHandleGetNodeIterator(void *shard) {
listIter *li = zmalloc(sizeof(listIter));
listRewind((list*)shard, li);
return li;
}
void clusterShardNodeIteratorFree(void *node_iterator) {
zfree(node_iterator);
}
clusterNode *clusterShardNodeIteratorNext(void *node_iterator) {
listNode *item = listNext((listIter*)node_iterator);
if (item == NULL) return NULL;
return listNodeValue(item);
}
clusterNode *clusterShardNodeFirst(void *shard) {
listNode *item = listFirst((list*)shard);
if (item == NULL) return NULL;
return listNodeValue(item);
}
int clusterNodeTcpPort(clusterNode *node) {
return node->tcp_port;
}
int clusterNodeTlsPort(clusterNode *node) {
return node->tls_port;
}
sds genClusterInfoString(void) { sds genClusterInfoString(void) {
sds info = sdsempty(); sds info = sdsempty();
char *statestr[] = {"ok","fail"}; char *statestr[] = {"ok","fail"};