diff --git a/src/cluster.c b/src/cluster.c index 017b7f261..e79d61fc5 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -783,6 +783,130 @@ unsigned int countKeysInSlot(unsigned int slot) { return kvstoreDictSize(server.db->keys, slot); } +/* Add detailed information of a node to the output buffer of the given client. */ +void addNodeDetailsToShardReply(client *c, clusterNode *node) { + int reply_count = 0; + void *node_replylen = addReplyDeferredLen(c); + addReplyBulkCString(c, "id"); + addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN); + reply_count++; + + if (node->tcp_port) { + addReplyBulkCString(c, "port"); + addReplyLongLong(c, node->tcp_port); + reply_count++; + } + + if (node->tls_port) { + addReplyBulkCString(c, "tls-port"); + addReplyLongLong(c, node->tls_port); + reply_count++; + } + + addReplyBulkCString(c, "ip"); + addReplyBulkCString(c, node->ip); + reply_count++; + + addReplyBulkCString(c, "endpoint"); + addReplyBulkCString(c, clusterNodePreferredEndpoint(node)); + reply_count++; + + if (sdslen(node->hostname) != 0) { + addReplyBulkCString(c, "hostname"); + addReplyBulkCBuffer(c, node->hostname, sdslen(node->hostname)); + reply_count++; + } + + long long node_offset; + if (node->flags & CLUSTER_NODE_MYSELF) { + node_offset = nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset; + } else { + node_offset = node->repl_offset; + } + + addReplyBulkCString(c, "role"); + addReplyBulkCString(c, nodeIsSlave(node) ? "replica" : "master"); + reply_count++; + + addReplyBulkCString(c, "replication-offset"); + addReplyLongLong(c, node_offset); + reply_count++; + + addReplyBulkCString(c, "health"); + const char *health_msg = NULL; + if (nodeFailed(node)) { + health_msg = "fail"; + } else if (nodeIsSlave(node) && node_offset == 0) { + health_msg = "loading"; + } else { + health_msg = "online"; + } + addReplyBulkCString(c, health_msg); + reply_count++; + + setDeferredMapLen(c, node_replylen, reply_count); +} + +static clusterNode *clusterGetMasterFromShard(list *nodes) { + clusterNode *n = NULL; + listIter li; + listNode *ln; + listRewind(nodes,&li); + while ((ln = listNext(&li)) != NULL) { + clusterNode *node = listNodeValue(ln); + if (!nodeFailed(node)) { + n = node; + break; + } + } + if (!n) return NULL; + return clusterNodeGetMaster(n); +} + +/* Add the shard reply of a single shard based off the given primary node. */ +void addShardReplyForClusterShards(client *c, list *nodes) { + serverAssert(listLength(nodes) > 0); + + addReplyMapLen(c, 2); + addReplyBulkCString(c, "slots"); + + /* Use slot_info_pairs from the primary only */ + clusterNode *n = clusterGetMasterFromShard(nodes); + if (n && n->slot_info_pairs != NULL) { + serverAssert((n->slot_info_pairs_count % 2) == 0); + addReplyArrayLen(c, n->slot_info_pairs_count); + for (int i = 0; i < n->slot_info_pairs_count; i++) + addReplyLongLong(c, (unsigned long)n->slot_info_pairs[i]); + } else { + /* If no slot info pair is provided, the node owns no slots */ + addReplyArrayLen(c, 0); + } + + addReplyBulkCString(c, "nodes"); + addReplyArrayLen(c, listLength(nodes)); + listIter li; + listRewind(nodes, &li); + for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) { + clusterNode *n = listNodeValue(ln); + addNodeDetailsToShardReply(c, n); + clusterFreeNodesSlotsInfo(n); + } +} + +/* Add to the output buffer of the given client, an array of slot (start, end) + * pair owned by the shard, also the primary and set of replica(s) along with + * information about each node. */ +void clusterCommandShards(client *c) { + addReplyArrayLen(c, dictSize(server.cluster->shards)); + /* This call will add slot_info_pairs to all nodes */ + clusterGenNodesSlotsInfo(0); + dictIterator *di = dictGetSafeIterator(server.cluster->shards); + for(dictEntry *de = dictNext(di); de != NULL; de = dictNext(di)) { + addShardReplyForClusterShards(c, dictGetVal(de)); + } + dictReleaseIterator(di); +} + void clusterCommandHelp(client *c) { const char *help[] = { "COUNTKEYSINSLOT ", diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 632ddba6d..f9cce99ae 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1624,22 +1624,6 @@ void clusterRemoveNodeFromShard(clusterNode *node) { sdsfree(s); } -static clusterNode *clusterGetMasterFromShard(list *nodes) { - clusterNode *n = NULL; - listIter li; - listNode *ln; - listRewind(nodes,&li); - while ((ln = listNext(&li)) != NULL) { - clusterNode *node = listNodeValue(ln); - if (!nodeFailed(node)) { - n = node; - break; - } - } - if (!n) return NULL; - return clusterNodeGetMaster(n); -} - /* ----------------------------------------------------------------------------- * CLUSTER config epoch handling * -------------------------------------------------------------------------- */ @@ -5598,114 +5582,6 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) { } } -/* Add detailed information of a node to the output buffer of the given client. */ -void addNodeDetailsToShardReply(client *c, clusterNode *node) { - int reply_count = 0; - void *node_replylen = addReplyDeferredLen(c); - addReplyBulkCString(c, "id"); - addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN); - reply_count++; - - if (node->tcp_port) { - addReplyBulkCString(c, "port"); - addReplyLongLong(c, node->tcp_port); - reply_count++; - } - - if (node->tls_port) { - addReplyBulkCString(c, "tls-port"); - addReplyLongLong(c, node->tls_port); - reply_count++; - } - - addReplyBulkCString(c, "ip"); - addReplyBulkCString(c, node->ip); - reply_count++; - - addReplyBulkCString(c, "endpoint"); - addReplyBulkCString(c, clusterNodePreferredEndpoint(node)); - reply_count++; - - if (sdslen(node->hostname) != 0) { - addReplyBulkCString(c, "hostname"); - addReplyBulkCBuffer(c, node->hostname, sdslen(node->hostname)); - reply_count++; - } - - long long node_offset; - if (node->flags & CLUSTER_NODE_MYSELF) { - node_offset = nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset; - } else { - node_offset = node->repl_offset; - } - - addReplyBulkCString(c, "role"); - addReplyBulkCString(c, nodeIsSlave(node) ? "replica" : "master"); - reply_count++; - - addReplyBulkCString(c, "replication-offset"); - addReplyLongLong(c, node_offset); - reply_count++; - - addReplyBulkCString(c, "health"); - const char *health_msg = NULL; - if (nodeFailed(node)) { - health_msg = "fail"; - } else if (nodeIsSlave(node) && node_offset == 0) { - health_msg = "loading"; - } else { - health_msg = "online"; - } - addReplyBulkCString(c, health_msg); - reply_count++; - - setDeferredMapLen(c, node_replylen, reply_count); -} - -/* Add the shard reply of a single shard based off the given primary node. */ -void addShardReplyForClusterShards(client *c, list *nodes) { - serverAssert(listLength(nodes) > 0); - - addReplyMapLen(c, 2); - addReplyBulkCString(c, "slots"); - - /* Use slot_info_pairs from the primary only */ - clusterNode *n = clusterGetMasterFromShard(nodes); - if (n && n->slot_info_pairs != NULL) { - serverAssert((n->slot_info_pairs_count % 2) == 0); - addReplyArrayLen(c, n->slot_info_pairs_count); - for (int i = 0; i < n->slot_info_pairs_count; i++) - addReplyLongLong(c, (unsigned long)n->slot_info_pairs[i]); - } else { - /* If no slot info pair is provided, the node owns no slots */ - addReplyArrayLen(c, 0); - } - - addReplyBulkCString(c, "nodes"); - addReplyArrayLen(c, listLength(nodes)); - listIter li; - listRewind(nodes, &li); - for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) { - clusterNode *n = listNodeValue(ln); - addNodeDetailsToShardReply(c, n); - clusterFreeNodesSlotsInfo(n); - } -} - -/* Add to the output buffer of the given client, an array of slot (start, end) - * pair owned by the shard, also the primary and set of replica(s) along with - * information about each node. */ -void clusterCommandShards(client *c) { - addReplyArrayLen(c, dictSize(server.cluster->shards)); - /* This call will add slot_info_pairs to all nodes */ - clusterGenNodesSlotsInfo(0); - dictIterator *di = dictGetSafeIterator(server.cluster->shards); - for(dictEntry *de = dictNext(di); de != NULL; de = dictNext(di)) { - addShardReplyForClusterShards(c, dictGetVal(de)); - } - dictReleaseIterator(di); -} - sds genClusterInfoString(void) { sds info = sdsempty(); char *statestr[] = {"ok","fail"};