mirror of https://mirror.osredm.com/root/redis.git
Prep to make cluster shards cmd generic
This and the next following commit makes the cluster shards command a generic implementation instead of a specific implementation for each cluster API implementation. This commit simply moves the cluster shards implementation from cluster_legacy.c to cluster.c without changing the implementation at all. The reason for doing so was to help with reviewing the changes in the diff. Signed-off-by: Josh Hershberg <yehoshua@redis.com>
This commit is contained in:
parent
8038eb3147
commit
e3e631f394
124
src/cluster.c
124
src/cluster.c
|
@ -783,6 +783,130 @@ unsigned int countKeysInSlot(unsigned int slot) {
|
||||||
return kvstoreDictSize(server.db->keys, slot);
|
return kvstoreDictSize(server.db->keys, slot);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Add detailed information of a node to the output buffer of the given client. */
|
||||||
|
void addNodeDetailsToShardReply(client *c, clusterNode *node) {
|
||||||
|
int reply_count = 0;
|
||||||
|
void *node_replylen = addReplyDeferredLen(c);
|
||||||
|
addReplyBulkCString(c, "id");
|
||||||
|
addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
|
||||||
|
reply_count++;
|
||||||
|
|
||||||
|
if (node->tcp_port) {
|
||||||
|
addReplyBulkCString(c, "port");
|
||||||
|
addReplyLongLong(c, node->tcp_port);
|
||||||
|
reply_count++;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (node->tls_port) {
|
||||||
|
addReplyBulkCString(c, "tls-port");
|
||||||
|
addReplyLongLong(c, node->tls_port);
|
||||||
|
reply_count++;
|
||||||
|
}
|
||||||
|
|
||||||
|
addReplyBulkCString(c, "ip");
|
||||||
|
addReplyBulkCString(c, node->ip);
|
||||||
|
reply_count++;
|
||||||
|
|
||||||
|
addReplyBulkCString(c, "endpoint");
|
||||||
|
addReplyBulkCString(c, clusterNodePreferredEndpoint(node));
|
||||||
|
reply_count++;
|
||||||
|
|
||||||
|
if (sdslen(node->hostname) != 0) {
|
||||||
|
addReplyBulkCString(c, "hostname");
|
||||||
|
addReplyBulkCBuffer(c, node->hostname, sdslen(node->hostname));
|
||||||
|
reply_count++;
|
||||||
|
}
|
||||||
|
|
||||||
|
long long node_offset;
|
||||||
|
if (node->flags & CLUSTER_NODE_MYSELF) {
|
||||||
|
node_offset = nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
|
||||||
|
} else {
|
||||||
|
node_offset = node->repl_offset;
|
||||||
|
}
|
||||||
|
|
||||||
|
addReplyBulkCString(c, "role");
|
||||||
|
addReplyBulkCString(c, nodeIsSlave(node) ? "replica" : "master");
|
||||||
|
reply_count++;
|
||||||
|
|
||||||
|
addReplyBulkCString(c, "replication-offset");
|
||||||
|
addReplyLongLong(c, node_offset);
|
||||||
|
reply_count++;
|
||||||
|
|
||||||
|
addReplyBulkCString(c, "health");
|
||||||
|
const char *health_msg = NULL;
|
||||||
|
if (nodeFailed(node)) {
|
||||||
|
health_msg = "fail";
|
||||||
|
} else if (nodeIsSlave(node) && node_offset == 0) {
|
||||||
|
health_msg = "loading";
|
||||||
|
} else {
|
||||||
|
health_msg = "online";
|
||||||
|
}
|
||||||
|
addReplyBulkCString(c, health_msg);
|
||||||
|
reply_count++;
|
||||||
|
|
||||||
|
setDeferredMapLen(c, node_replylen, reply_count);
|
||||||
|
}
|
||||||
|
|
||||||
|
static clusterNode *clusterGetMasterFromShard(list *nodes) {
|
||||||
|
clusterNode *n = NULL;
|
||||||
|
listIter li;
|
||||||
|
listNode *ln;
|
||||||
|
listRewind(nodes,&li);
|
||||||
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
clusterNode *node = listNodeValue(ln);
|
||||||
|
if (!nodeFailed(node)) {
|
||||||
|
n = node;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!n) return NULL;
|
||||||
|
return clusterNodeGetMaster(n);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Add the shard reply of a single shard based off the given primary node. */
|
||||||
|
void addShardReplyForClusterShards(client *c, list *nodes) {
|
||||||
|
serverAssert(listLength(nodes) > 0);
|
||||||
|
|
||||||
|
addReplyMapLen(c, 2);
|
||||||
|
addReplyBulkCString(c, "slots");
|
||||||
|
|
||||||
|
/* Use slot_info_pairs from the primary only */
|
||||||
|
clusterNode *n = clusterGetMasterFromShard(nodes);
|
||||||
|
if (n && n->slot_info_pairs != NULL) {
|
||||||
|
serverAssert((n->slot_info_pairs_count % 2) == 0);
|
||||||
|
addReplyArrayLen(c, n->slot_info_pairs_count);
|
||||||
|
for (int i = 0; i < n->slot_info_pairs_count; i++)
|
||||||
|
addReplyLongLong(c, (unsigned long)n->slot_info_pairs[i]);
|
||||||
|
} else {
|
||||||
|
/* If no slot info pair is provided, the node owns no slots */
|
||||||
|
addReplyArrayLen(c, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
addReplyBulkCString(c, "nodes");
|
||||||
|
addReplyArrayLen(c, listLength(nodes));
|
||||||
|
listIter li;
|
||||||
|
listRewind(nodes, &li);
|
||||||
|
for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) {
|
||||||
|
clusterNode *n = listNodeValue(ln);
|
||||||
|
addNodeDetailsToShardReply(c, n);
|
||||||
|
clusterFreeNodesSlotsInfo(n);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Add to the output buffer of the given client, an array of slot (start, end)
|
||||||
|
* pair owned by the shard, also the primary and set of replica(s) along with
|
||||||
|
* information about each node. */
|
||||||
|
void clusterCommandShards(client *c) {
|
||||||
|
addReplyArrayLen(c, dictSize(server.cluster->shards));
|
||||||
|
/* This call will add slot_info_pairs to all nodes */
|
||||||
|
clusterGenNodesSlotsInfo(0);
|
||||||
|
dictIterator *di = dictGetSafeIterator(server.cluster->shards);
|
||||||
|
for(dictEntry *de = dictNext(di); de != NULL; de = dictNext(di)) {
|
||||||
|
addShardReplyForClusterShards(c, dictGetVal(de));
|
||||||
|
}
|
||||||
|
dictReleaseIterator(di);
|
||||||
|
}
|
||||||
|
|
||||||
void clusterCommandHelp(client *c) {
|
void clusterCommandHelp(client *c) {
|
||||||
const char *help[] = {
|
const char *help[] = {
|
||||||
"COUNTKEYSINSLOT <slot>",
|
"COUNTKEYSINSLOT <slot>",
|
||||||
|
|
|
@ -1624,22 +1624,6 @@ void clusterRemoveNodeFromShard(clusterNode *node) {
|
||||||
sdsfree(s);
|
sdsfree(s);
|
||||||
}
|
}
|
||||||
|
|
||||||
static clusterNode *clusterGetMasterFromShard(list *nodes) {
|
|
||||||
clusterNode *n = NULL;
|
|
||||||
listIter li;
|
|
||||||
listNode *ln;
|
|
||||||
listRewind(nodes,&li);
|
|
||||||
while ((ln = listNext(&li)) != NULL) {
|
|
||||||
clusterNode *node = listNodeValue(ln);
|
|
||||||
if (!nodeFailed(node)) {
|
|
||||||
n = node;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!n) return NULL;
|
|
||||||
return clusterNodeGetMaster(n);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* -----------------------------------------------------------------------------
|
/* -----------------------------------------------------------------------------
|
||||||
* CLUSTER config epoch handling
|
* CLUSTER config epoch handling
|
||||||
* -------------------------------------------------------------------------- */
|
* -------------------------------------------------------------------------- */
|
||||||
|
@ -5598,114 +5582,6 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Add detailed information of a node to the output buffer of the given client. */
|
|
||||||
void addNodeDetailsToShardReply(client *c, clusterNode *node) {
|
|
||||||
int reply_count = 0;
|
|
||||||
void *node_replylen = addReplyDeferredLen(c);
|
|
||||||
addReplyBulkCString(c, "id");
|
|
||||||
addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
|
|
||||||
reply_count++;
|
|
||||||
|
|
||||||
if (node->tcp_port) {
|
|
||||||
addReplyBulkCString(c, "port");
|
|
||||||
addReplyLongLong(c, node->tcp_port);
|
|
||||||
reply_count++;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (node->tls_port) {
|
|
||||||
addReplyBulkCString(c, "tls-port");
|
|
||||||
addReplyLongLong(c, node->tls_port);
|
|
||||||
reply_count++;
|
|
||||||
}
|
|
||||||
|
|
||||||
addReplyBulkCString(c, "ip");
|
|
||||||
addReplyBulkCString(c, node->ip);
|
|
||||||
reply_count++;
|
|
||||||
|
|
||||||
addReplyBulkCString(c, "endpoint");
|
|
||||||
addReplyBulkCString(c, clusterNodePreferredEndpoint(node));
|
|
||||||
reply_count++;
|
|
||||||
|
|
||||||
if (sdslen(node->hostname) != 0) {
|
|
||||||
addReplyBulkCString(c, "hostname");
|
|
||||||
addReplyBulkCBuffer(c, node->hostname, sdslen(node->hostname));
|
|
||||||
reply_count++;
|
|
||||||
}
|
|
||||||
|
|
||||||
long long node_offset;
|
|
||||||
if (node->flags & CLUSTER_NODE_MYSELF) {
|
|
||||||
node_offset = nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
|
|
||||||
} else {
|
|
||||||
node_offset = node->repl_offset;
|
|
||||||
}
|
|
||||||
|
|
||||||
addReplyBulkCString(c, "role");
|
|
||||||
addReplyBulkCString(c, nodeIsSlave(node) ? "replica" : "master");
|
|
||||||
reply_count++;
|
|
||||||
|
|
||||||
addReplyBulkCString(c, "replication-offset");
|
|
||||||
addReplyLongLong(c, node_offset);
|
|
||||||
reply_count++;
|
|
||||||
|
|
||||||
addReplyBulkCString(c, "health");
|
|
||||||
const char *health_msg = NULL;
|
|
||||||
if (nodeFailed(node)) {
|
|
||||||
health_msg = "fail";
|
|
||||||
} else if (nodeIsSlave(node) && node_offset == 0) {
|
|
||||||
health_msg = "loading";
|
|
||||||
} else {
|
|
||||||
health_msg = "online";
|
|
||||||
}
|
|
||||||
addReplyBulkCString(c, health_msg);
|
|
||||||
reply_count++;
|
|
||||||
|
|
||||||
setDeferredMapLen(c, node_replylen, reply_count);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Add the shard reply of a single shard based off the given primary node. */
|
|
||||||
void addShardReplyForClusterShards(client *c, list *nodes) {
|
|
||||||
serverAssert(listLength(nodes) > 0);
|
|
||||||
|
|
||||||
addReplyMapLen(c, 2);
|
|
||||||
addReplyBulkCString(c, "slots");
|
|
||||||
|
|
||||||
/* Use slot_info_pairs from the primary only */
|
|
||||||
clusterNode *n = clusterGetMasterFromShard(nodes);
|
|
||||||
if (n && n->slot_info_pairs != NULL) {
|
|
||||||
serverAssert((n->slot_info_pairs_count % 2) == 0);
|
|
||||||
addReplyArrayLen(c, n->slot_info_pairs_count);
|
|
||||||
for (int i = 0; i < n->slot_info_pairs_count; i++)
|
|
||||||
addReplyLongLong(c, (unsigned long)n->slot_info_pairs[i]);
|
|
||||||
} else {
|
|
||||||
/* If no slot info pair is provided, the node owns no slots */
|
|
||||||
addReplyArrayLen(c, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
addReplyBulkCString(c, "nodes");
|
|
||||||
addReplyArrayLen(c, listLength(nodes));
|
|
||||||
listIter li;
|
|
||||||
listRewind(nodes, &li);
|
|
||||||
for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) {
|
|
||||||
clusterNode *n = listNodeValue(ln);
|
|
||||||
addNodeDetailsToShardReply(c, n);
|
|
||||||
clusterFreeNodesSlotsInfo(n);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Add to the output buffer of the given client, an array of slot (start, end)
|
|
||||||
* pair owned by the shard, also the primary and set of replica(s) along with
|
|
||||||
* information about each node. */
|
|
||||||
void clusterCommandShards(client *c) {
|
|
||||||
addReplyArrayLen(c, dictSize(server.cluster->shards));
|
|
||||||
/* This call will add slot_info_pairs to all nodes */
|
|
||||||
clusterGenNodesSlotsInfo(0);
|
|
||||||
dictIterator *di = dictGetSafeIterator(server.cluster->shards);
|
|
||||||
for(dictEntry *de = dictNext(di); de != NULL; de = dictNext(di)) {
|
|
||||||
addShardReplyForClusterShards(c, dictGetVal(de));
|
|
||||||
}
|
|
||||||
dictReleaseIterator(di);
|
|
||||||
}
|
|
||||||
|
|
||||||
sds genClusterInfoString(void) {
|
sds genClusterInfoString(void) {
|
||||||
sds info = sdsempty();
|
sds info = sdsempty();
|
||||||
char *statestr[] = {"ok","fail"};
|
char *statestr[] = {"ok","fail"};
|
||||||
|
|
Loading…
Reference in New Issue