mirror of https://mirror.osredm.com/root/redis.git
Keep cluster shards command implementation generic (#13440)
Make the clusterCommandShards function use only cluster API functions instead of accessing cluster implementation details. This way the cluster API implementation doesn't have to have intimate knowledge of the command reply format, and doesn't need to interact with the client directly (the addReply function family). The PR has two commits, one moves the function from cluster_legacy.c to cluster.c, and the other modifies it's implementation. **better merge without squashing.**
This commit is contained in:
commit
e4ddc34463
124
src/cluster.c
124
src/cluster.c
|
@ -783,6 +783,130 @@ unsigned int countKeysInSlot(unsigned int slot) {
|
|||
return kvstoreDictSize(server.db->keys, slot);
|
||||
}
|
||||
|
||||
/* Add detailed information of a node to the output buffer of the given client. */
|
||||
void addNodeDetailsToShardReply(client *c, clusterNode *node) {
|
||||
|
||||
int reply_count = 0;
|
||||
char *hostname;
|
||||
void *node_replylen = addReplyDeferredLen(c);
|
||||
|
||||
addReplyBulkCString(c, "id");
|
||||
addReplyBulkCBuffer(c, clusterNodeGetName(node), CLUSTER_NAMELEN);
|
||||
reply_count++;
|
||||
|
||||
if (clusterNodeTcpPort(node)) {
|
||||
addReplyBulkCString(c, "port");
|
||||
addReplyLongLong(c, clusterNodeTcpPort(node));
|
||||
reply_count++;
|
||||
}
|
||||
|
||||
if (clusterNodeTlsPort(node)) {
|
||||
addReplyBulkCString(c, "tls-port");
|
||||
addReplyLongLong(c, clusterNodeTlsPort(node));
|
||||
reply_count++;
|
||||
}
|
||||
|
||||
addReplyBulkCString(c, "ip");
|
||||
addReplyBulkCString(c, clusterNodeIp(node));
|
||||
reply_count++;
|
||||
|
||||
addReplyBulkCString(c, "endpoint");
|
||||
addReplyBulkCString(c, clusterNodePreferredEndpoint(node));
|
||||
reply_count++;
|
||||
|
||||
hostname = clusterNodeHostname(node);
|
||||
if (hostname != NULL && *hostname != '\0') {
|
||||
addReplyBulkCString(c, "hostname");
|
||||
addReplyBulkCString(c, hostname);
|
||||
reply_count++;
|
||||
}
|
||||
|
||||
long long node_offset;
|
||||
if (clusterNodeIsMyself(node)) {
|
||||
node_offset = clusterNodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
|
||||
} else {
|
||||
node_offset = clusterNodeReplOffset(node);
|
||||
}
|
||||
|
||||
addReplyBulkCString(c, "role");
|
||||
addReplyBulkCString(c, clusterNodeIsSlave(node) ? "replica" : "master");
|
||||
reply_count++;
|
||||
|
||||
addReplyBulkCString(c, "replication-offset");
|
||||
addReplyLongLong(c, node_offset);
|
||||
reply_count++;
|
||||
|
||||
addReplyBulkCString(c, "health");
|
||||
const char *health_msg = NULL;
|
||||
if (clusterNodeIsFailing(node)) {
|
||||
health_msg = "fail";
|
||||
} else if (clusterNodeIsSlave(node) && node_offset == 0) {
|
||||
health_msg = "loading";
|
||||
} else {
|
||||
health_msg = "online";
|
||||
}
|
||||
addReplyBulkCString(c, health_msg);
|
||||
reply_count++;
|
||||
|
||||
setDeferredMapLen(c, node_replylen, reply_count);
|
||||
}
|
||||
|
||||
static clusterNode *clusterGetMasterFromShard(void *shard_handle) {
|
||||
clusterNode *n = NULL;
|
||||
void *node_it = clusterShardHandleGetNodeIterator(shard_handle);
|
||||
while((n = clusterShardNodeIteratorNext(node_it)) != NULL) {
|
||||
if (!clusterNodeIsFailing(n)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
clusterShardNodeIteratorFree(node_it);
|
||||
if (!n) return NULL;
|
||||
return clusterNodeGetMaster(n);
|
||||
}
|
||||
|
||||
/* Add the shard reply of a single shard based off the given primary node. */
|
||||
void addShardReplyForClusterShards(client *c, void *shard_handle) {
|
||||
serverAssert(clusterGetShardNodeCount(shard_handle) > 0);
|
||||
addReplyMapLen(c, 2);
|
||||
addReplyBulkCString(c, "slots");
|
||||
|
||||
/* Use slot_info_pairs from the primary only */
|
||||
clusterNode *master_node = clusterGetMasterFromShard(shard_handle);
|
||||
|
||||
if (master_node && clusterNodeHasSlotInfo(master_node)) {
|
||||
serverAssert((clusterNodeSlotInfoCount(master_node) % 2) == 0);
|
||||
addReplyArrayLen(c, clusterNodeSlotInfoCount(master_node));
|
||||
for (int i = 0; i < clusterNodeSlotInfoCount(master_node); i++)
|
||||
addReplyLongLong(c, (unsigned long)clusterNodeSlotInfoEntry(master_node, i));
|
||||
} else {
|
||||
/* If no slot info pair is provided, the node owns no slots */
|
||||
addReplyArrayLen(c, 0);
|
||||
}
|
||||
|
||||
addReplyBulkCString(c, "nodes");
|
||||
addReplyArrayLen(c, clusterGetShardNodeCount(shard_handle));
|
||||
void *node_it = clusterShardHandleGetNodeIterator(shard_handle);
|
||||
for (clusterNode *n = clusterShardNodeIteratorNext(node_it); n != NULL; n = clusterShardNodeIteratorNext(node_it)) {
|
||||
addNodeDetailsToShardReply(c, n);
|
||||
clusterFreeNodesSlotsInfo(n);
|
||||
}
|
||||
clusterShardNodeIteratorFree(node_it);
|
||||
}
|
||||
|
||||
/* Add to the output buffer of the given client, an array of slot (start, end)
|
||||
* pair owned by the shard, also the primary and set of replica(s) along with
|
||||
* information about each node. */
|
||||
void clusterCommandShards(client *c) {
|
||||
addReplyArrayLen(c, clusterGetShardCount());
|
||||
/* This call will add slot_info_pairs to all nodes */
|
||||
clusterGenNodesSlotsInfo(0);
|
||||
dictIterator *shard_it = clusterGetShardIterator();
|
||||
for(void *shard_handle = clusterNextShardHandle(shard_it); shard_handle != NULL; shard_handle = clusterNextShardHandle(shard_it)) {
|
||||
addShardReplyForClusterShards(c, shard_handle);
|
||||
}
|
||||
clusterFreeShardIterator(shard_it);
|
||||
}
|
||||
|
||||
void clusterCommandHelp(client *c) {
|
||||
const char *help[] = {
|
||||
"COUNTKEYSINSLOT <slot>",
|
||||
|
|
|
@ -97,7 +97,7 @@ int clusterManualFailoverTimeLimit(void);
|
|||
void clusterCommandSlots(client * c);
|
||||
void clusterCommandMyId(client *c);
|
||||
void clusterCommandMyShardId(client *c);
|
||||
void clusterCommandShards(client *c);
|
||||
|
||||
sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary);
|
||||
|
||||
int clusterNodeCoversSlot(clusterNode *n, int slot);
|
||||
|
@ -142,4 +142,23 @@ int isValidAuxString(char *s, unsigned int length);
|
|||
void migrateCommand(client *c);
|
||||
void clusterCommand(client *c);
|
||||
ConnectionType *connTypeOfCluster(void);
|
||||
|
||||
void clusterGenNodesSlotsInfo(int filter);
|
||||
void clusterFreeNodesSlotsInfo(clusterNode *n);
|
||||
int clusterNodeSlotInfoCount(clusterNode *n);
|
||||
uint16_t clusterNodeSlotInfoEntry(clusterNode *n, int idx);
|
||||
int clusterNodeHasSlotInfo(clusterNode *n);
|
||||
|
||||
int clusterGetShardCount(void);
|
||||
void *clusterGetShardIterator(void);
|
||||
void *clusterNextShardHandle(void *shard_iterator);
|
||||
void clusterFreeShardIterator(void *shard_iterator);
|
||||
int clusterGetShardNodeCount(void *shard);
|
||||
void *clusterShardHandleGetNodeIterator(void *shard);
|
||||
clusterNode *clusterShardNodeIteratorNext(void *node_iterator);
|
||||
void clusterShardNodeIteratorFree(void *node_iterator);
|
||||
clusterNode *clusterShardNodeFirst(void *shard);
|
||||
|
||||
int clusterNodeTcpPort(clusterNode *node);
|
||||
int clusterNodeTlsPort(clusterNode *node);
|
||||
#endif /* __CLUSTER_H */
|
||||
|
|
|
@ -1624,22 +1624,6 @@ void clusterRemoveNodeFromShard(clusterNode *node) {
|
|||
sdsfree(s);
|
||||
}
|
||||
|
||||
static clusterNode *clusterGetMasterFromShard(list *nodes) {
|
||||
clusterNode *n = NULL;
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(nodes,&li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
clusterNode *node = listNodeValue(ln);
|
||||
if (!nodeFailed(node)) {
|
||||
n = node;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!n) return NULL;
|
||||
return clusterNodeGetMaster(n);
|
||||
}
|
||||
|
||||
/* -----------------------------------------------------------------------------
|
||||
* CLUSTER config epoch handling
|
||||
* -------------------------------------------------------------------------- */
|
||||
|
@ -5598,112 +5582,68 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) {
|
|||
}
|
||||
}
|
||||
|
||||
/* Add detailed information of a node to the output buffer of the given client. */
|
||||
void addNodeDetailsToShardReply(client *c, clusterNode *node) {
|
||||
int reply_count = 0;
|
||||
void *node_replylen = addReplyDeferredLen(c);
|
||||
addReplyBulkCString(c, "id");
|
||||
addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
|
||||
reply_count++;
|
||||
|
||||
if (node->tcp_port) {
|
||||
addReplyBulkCString(c, "port");
|
||||
addReplyLongLong(c, node->tcp_port);
|
||||
reply_count++;
|
||||
}
|
||||
|
||||
if (node->tls_port) {
|
||||
addReplyBulkCString(c, "tls-port");
|
||||
addReplyLongLong(c, node->tls_port);
|
||||
reply_count++;
|
||||
}
|
||||
|
||||
addReplyBulkCString(c, "ip");
|
||||
addReplyBulkCString(c, node->ip);
|
||||
reply_count++;
|
||||
|
||||
addReplyBulkCString(c, "endpoint");
|
||||
addReplyBulkCString(c, clusterNodePreferredEndpoint(node));
|
||||
reply_count++;
|
||||
|
||||
if (sdslen(node->hostname) != 0) {
|
||||
addReplyBulkCString(c, "hostname");
|
||||
addReplyBulkCBuffer(c, node->hostname, sdslen(node->hostname));
|
||||
reply_count++;
|
||||
}
|
||||
|
||||
long long node_offset;
|
||||
if (node->flags & CLUSTER_NODE_MYSELF) {
|
||||
node_offset = nodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
|
||||
} else {
|
||||
node_offset = node->repl_offset;
|
||||
}
|
||||
|
||||
addReplyBulkCString(c, "role");
|
||||
addReplyBulkCString(c, nodeIsSlave(node) ? "replica" : "master");
|
||||
reply_count++;
|
||||
|
||||
addReplyBulkCString(c, "replication-offset");
|
||||
addReplyLongLong(c, node_offset);
|
||||
reply_count++;
|
||||
|
||||
addReplyBulkCString(c, "health");
|
||||
const char *health_msg = NULL;
|
||||
if (nodeFailed(node)) {
|
||||
health_msg = "fail";
|
||||
} else if (nodeIsSlave(node) && node_offset == 0) {
|
||||
health_msg = "loading";
|
||||
} else {
|
||||
health_msg = "online";
|
||||
}
|
||||
addReplyBulkCString(c, health_msg);
|
||||
reply_count++;
|
||||
|
||||
setDeferredMapLen(c, node_replylen, reply_count);
|
||||
int clusterGetShardCount(void) {
|
||||
return dictSize(server.cluster->shards);
|
||||
}
|
||||
|
||||
/* Add the shard reply of a single shard based off the given primary node. */
|
||||
void addShardReplyForClusterShards(client *c, list *nodes) {
|
||||
serverAssert(listLength(nodes) > 0);
|
||||
|
||||
addReplyMapLen(c, 2);
|
||||
addReplyBulkCString(c, "slots");
|
||||
|
||||
/* Use slot_info_pairs from the primary only */
|
||||
clusterNode *n = clusterGetMasterFromShard(nodes);
|
||||
if (n && n->slot_info_pairs != NULL) {
|
||||
serverAssert((n->slot_info_pairs_count % 2) == 0);
|
||||
addReplyArrayLen(c, n->slot_info_pairs_count);
|
||||
for (int i = 0; i < n->slot_info_pairs_count; i++)
|
||||
addReplyLongLong(c, (unsigned long)n->slot_info_pairs[i]);
|
||||
} else {
|
||||
/* If no slot info pair is provided, the node owns no slots */
|
||||
addReplyArrayLen(c, 0);
|
||||
}
|
||||
|
||||
addReplyBulkCString(c, "nodes");
|
||||
addReplyArrayLen(c, listLength(nodes));
|
||||
listIter li;
|
||||
listRewind(nodes, &li);
|
||||
for (listNode *ln = listNext(&li); ln != NULL; ln = listNext(&li)) {
|
||||
clusterNode *n = listNodeValue(ln);
|
||||
addNodeDetailsToShardReply(c, n);
|
||||
clusterFreeNodesSlotsInfo(n);
|
||||
}
|
||||
void *clusterGetShardIterator(void) {
|
||||
return dictGetSafeIterator(server.cluster->shards);
|
||||
}
|
||||
|
||||
/* Add to the output buffer of the given client, an array of slot (start, end)
|
||||
* pair owned by the shard, also the primary and set of replica(s) along with
|
||||
* information about each node. */
|
||||
void clusterCommandShards(client *c) {
|
||||
addReplyArrayLen(c, dictSize(server.cluster->shards));
|
||||
/* This call will add slot_info_pairs to all nodes */
|
||||
clusterGenNodesSlotsInfo(0);
|
||||
dictIterator *di = dictGetSafeIterator(server.cluster->shards);
|
||||
for(dictEntry *de = dictNext(di); de != NULL; de = dictNext(di)) {
|
||||
addShardReplyForClusterShards(c, dictGetVal(de));
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
void *clusterNextShardHandle(void *shard_iterator) {
|
||||
dictEntry *de = dictNext(shard_iterator);
|
||||
if(de == NULL) return NULL;
|
||||
return dictGetVal(de);
|
||||
}
|
||||
|
||||
void clusterFreeShardIterator(void *shard_iterator) {
|
||||
dictReleaseIterator(shard_iterator);
|
||||
}
|
||||
|
||||
int clusterNodeHasSlotInfo(clusterNode *n) {
|
||||
return n->slot_info_pairs != NULL;
|
||||
}
|
||||
|
||||
int clusterNodeSlotInfoCount(clusterNode *n) {
|
||||
return n->slot_info_pairs_count;
|
||||
}
|
||||
|
||||
uint16_t clusterNodeSlotInfoEntry(clusterNode *n, int idx) {
|
||||
return n->slot_info_pairs[idx];
|
||||
}
|
||||
|
||||
int clusterGetShardNodeCount(void *shard) {
|
||||
return listLength((list*)shard);
|
||||
}
|
||||
|
||||
void *clusterShardHandleGetNodeIterator(void *shard) {
|
||||
listIter *li = zmalloc(sizeof(listIter));
|
||||
listRewind((list*)shard, li);
|
||||
return li;
|
||||
}
|
||||
|
||||
void clusterShardNodeIteratorFree(void *node_iterator) {
|
||||
zfree(node_iterator);
|
||||
}
|
||||
|
||||
clusterNode *clusterShardNodeIteratorNext(void *node_iterator) {
|
||||
listNode *item = listNext((listIter*)node_iterator);
|
||||
if (item == NULL) return NULL;
|
||||
return listNodeValue(item);
|
||||
}
|
||||
|
||||
clusterNode *clusterShardNodeFirst(void *shard) {
|
||||
listNode *item = listFirst((list*)shard);
|
||||
if (item == NULL) return NULL;
|
||||
return listNodeValue(item);
|
||||
}
|
||||
|
||||
int clusterNodeTcpPort(clusterNode *node) {
|
||||
return node->tcp_port;
|
||||
}
|
||||
|
||||
int clusterNodeTlsPort(clusterNode *node) {
|
||||
return node->tls_port;
|
||||
}
|
||||
|
||||
sds genClusterInfoString(void) {
|
||||
|
|
Loading…
Reference in New Issue