mirror of https://mirror.osredm.com/root/redis.git
Cluster refactor: Make clusterNode private
Move clusterNode into cluster_legacy.h. In order to achieve this some accessor methods were added and also a refactor of how debugCommand handles cluster related subcommands. Signed-off-by: Josh Hershberg <yehoshua@redis.com>
This commit is contained in:
parent
98a6c44b75
commit
d9a0478599
|
@ -23,7 +23,8 @@
|
|||
#define CLUSTER_REDIR_DOWN_UNBOUND 6 /* -CLUSTERDOWN, unbound slot. */
|
||||
#define CLUSTER_REDIR_DOWN_RO_STATE 7 /* -CLUSTERDOWN, allow reads. */
|
||||
|
||||
struct clusterNode;
|
||||
typedef struct _clusterNode clusterNode;
|
||||
struct clusterState;
|
||||
|
||||
/* clusterLink encapsulates everything needed to talk with a remote node. */
|
||||
typedef struct clusterLink {
|
||||
|
@ -35,7 +36,7 @@ typedef struct clusterLink {
|
|||
char *rcvbuf; /* Packet reception buffer */
|
||||
size_t rcvbuf_len; /* Used size of rcvbuf */
|
||||
size_t rcvbuf_alloc; /* Allocated size of rcvbuf */
|
||||
struct clusterNode *node; /* Node related to this link. Initialized to NULL when unknown */
|
||||
clusterNode *node; /* Node related to this link. Initialized to NULL when unknown */
|
||||
int inbound; /* 1 if this link is an inbound link accepted from the related node */
|
||||
} clusterLink;
|
||||
|
||||
|
@ -52,7 +53,6 @@ typedef struct clusterLink {
|
|||
#define CLUSTER_NODE_NOFAILOVER 512 /* Slave will not try to failover. */
|
||||
#define CLUSTER_NODE_NULL_NAME "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
|
||||
|
||||
#define nodeIsMaster(n) ((n)->flags & CLUSTER_NODE_MASTER)
|
||||
#define nodeIsSlave(n) ((n)->flags & CLUSTER_NODE_SLAVE)
|
||||
#define nodeInHandshake(n) ((n)->flags & CLUSTER_NODE_HANDSHAKE)
|
||||
#define nodeHasAddr(n) (!((n)->flags & CLUSTER_NODE_NOADDR))
|
||||
|
@ -89,47 +89,10 @@ typedef struct clusterLink {
|
|||
|
||||
/* This structure represent elements of node->fail_reports. */
|
||||
typedef struct clusterNodeFailReport {
|
||||
struct clusterNode *node; /* Node reporting the failure condition. */
|
||||
clusterNode *node; /* Node reporting the failure condition. */
|
||||
mstime_t time; /* Time of the last report from this node. */
|
||||
} clusterNodeFailReport;
|
||||
|
||||
typedef struct clusterNode {
|
||||
mstime_t ctime; /* Node object creation time. */
|
||||
char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
|
||||
char shard_id[CLUSTER_NAMELEN]; /* shard id, hex string, sha1-size */
|
||||
int flags; /* CLUSTER_NODE_... */
|
||||
uint64_t configEpoch; /* Last configEpoch observed for this node */
|
||||
unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
|
||||
uint16_t *slot_info_pairs; /* Slots info represented as (start/end) pair (consecutive index). */
|
||||
int slot_info_pairs_count; /* Used number of slots in slot_info_pairs */
|
||||
int numslots; /* Number of slots handled by this node */
|
||||
int numslaves; /* Number of slave nodes, if this is a master */
|
||||
struct clusterNode **slaves; /* pointers to slave nodes */
|
||||
struct clusterNode *slaveof; /* pointer to the master node. Note that it
|
||||
may be NULL even if the node is a slave
|
||||
if we don't have the master node in our
|
||||
tables. */
|
||||
unsigned long long last_in_ping_gossip; /* The number of the last carried in the ping gossip section */
|
||||
mstime_t ping_sent; /* Unix time we sent latest ping */
|
||||
mstime_t pong_received; /* Unix time we received the pong */
|
||||
mstime_t data_received; /* Unix time we received any data */
|
||||
mstime_t fail_time; /* Unix time when FAIL flag was set */
|
||||
mstime_t voted_time; /* Last time we voted for a slave of this master */
|
||||
mstime_t repl_offset_time; /* Unix time we received offset for this node */
|
||||
mstime_t orphaned_time; /* Starting time of orphaned master condition */
|
||||
long long repl_offset; /* Last known repl offset for this node. */
|
||||
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
|
||||
sds hostname; /* The known hostname for this node */
|
||||
sds human_nodename; /* The known human readable nodename for this node */
|
||||
int tcp_port; /* Latest known clients TCP port. */
|
||||
int tls_port; /* Latest known clients TLS port */
|
||||
int cport; /* Latest known cluster port of this node. */
|
||||
clusterLink *link; /* TCP/IP link established toward this node */
|
||||
clusterLink *inbound_link; /* TCP/IP link accepted from this node */
|
||||
list *fail_reports; /* List of nodes signaling this as failing */
|
||||
} clusterNode;
|
||||
|
||||
struct clusterState;
|
||||
|
||||
|
||||
/* ---------------------- API exported outside cluster.c -------------------- */
|
||||
|
@ -168,5 +131,16 @@ int clusterManualFailoverTimeLimit(void);
|
|||
char* getMyClusterId(void);
|
||||
int getClusterSize(void);
|
||||
char** getClusterNodesList(size_t *numnodes);
|
||||
int nodeIsMaster(clusterNode *n);
|
||||
int handleDebugClusterCommand(client *c);
|
||||
int clusterNodeConfirmedReachable(clusterNode *node);
|
||||
char* clusterNodeIp(clusterNode *node);
|
||||
int clusterNodeIsSlave(clusterNode *node);
|
||||
clusterNode *clusterNodeGetSlaveof(clusterNode *node);
|
||||
char* clusterNodeGetName(clusterNode *node);
|
||||
int clusterNodeTimedOut(clusterNode *node);
|
||||
int clusterNodeIsFailing(clusterNode *node);
|
||||
int clusterNodeIsNoFailover(clusterNode *node);
|
||||
|
||||
char **clusterDebugCommandHelp(void);
|
||||
#endif /* __CLUSTER_H */
|
||||
|
|
|
@ -7696,3 +7696,84 @@ char** getClusterNodesList(size_t *numnodes) {
|
|||
dictReleaseIterator(di);
|
||||
return ids;
|
||||
}
|
||||
|
||||
int nodeIsMaster(clusterNode *n) {
|
||||
return n->flags & CLUSTER_NODE_MASTER;
|
||||
}
|
||||
|
||||
int handleDebugClusterCommand(client *c) {
|
||||
if (strcasecmp(c->argv[1]->ptr, "CLUSTERLINK") ||
|
||||
strcasecmp(c->argv[2]->ptr, "KILL") ||
|
||||
c->argc != 5) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!server.cluster_enabled) {
|
||||
addReplyError(c, "Debug option only available for cluster mode enabled setup!");
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Find the node. */
|
||||
clusterNode *n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr));
|
||||
if (!n) {
|
||||
addReplyErrorFormat(c, "Unknown node %s", (char *) c->argv[4]->ptr);
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Terminate the link based on the direction or all. */
|
||||
if (!strcasecmp(c->argv[3]->ptr, "from")) {
|
||||
freeClusterLink(n->inbound_link);
|
||||
} else if (!strcasecmp(c->argv[3]->ptr, "to")) {
|
||||
freeClusterLink(n->link);
|
||||
} else if (!strcasecmp(c->argv[3]->ptr, "all")) {
|
||||
freeClusterLink(n->link);
|
||||
freeClusterLink(n->inbound_link);
|
||||
} else {
|
||||
addReplyErrorFormat(c, "Unknown direction %s", (char *) c->argv[3]->ptr);
|
||||
}
|
||||
addReply(c, shared.ok);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
int clusterNodeConfirmedReachable(clusterNode *node) {
|
||||
return !(node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE));
|
||||
}
|
||||
|
||||
char* clusterNodeIp(clusterNode *node) {
|
||||
return node->ip;
|
||||
}
|
||||
|
||||
int clusterNodeIsSlave(clusterNode *node) {
|
||||
return !nodeIsMaster(node);
|
||||
}
|
||||
|
||||
clusterNode *clusterNodeGetSlaveof(clusterNode *node) {
|
||||
return node->slaveof;
|
||||
}
|
||||
|
||||
char* clusterNodeGetName(clusterNode *node) {
|
||||
return node->name;
|
||||
}
|
||||
|
||||
int clusterNodeTimedOut(clusterNode *node) {
|
||||
return nodeTimedOut(node);
|
||||
}
|
||||
|
||||
int clusterNodeIsFailing(clusterNode *node) {
|
||||
return nodeFailed(node);
|
||||
}
|
||||
|
||||
int clusterNodeIsNoFailover(clusterNode *node) {
|
||||
return node->flags & CLUSTER_NODE_NOFAILOVER;
|
||||
}
|
||||
|
||||
char **clusterDebugCommandHelp(void) {
|
||||
const char *help[] = {
|
||||
"CLUSTERLINK KILL <to|from|all> <node-id>",
|
||||
" Kills the link based on the direction to/from (both) with the provided node." ,
|
||||
NULL
|
||||
};
|
||||
|
||||
return help;
|
||||
}
|
||||
|
|
|
@ -211,6 +211,42 @@ static_assert(offsetof(clusterMsg, data) == 2256, "unexpected field offset");
|
|||
master is up. */
|
||||
#define CLUSTERMSG_FLAG0_EXT_DATA (1<<2) /* Message contains extension data */
|
||||
|
||||
struct _clusterNode {
|
||||
mstime_t ctime; /* Node object creation time. */
|
||||
char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
|
||||
char shard_id[CLUSTER_NAMELEN]; /* shard id, hex string, sha1-size */
|
||||
int flags; /* CLUSTER_NODE_... */
|
||||
uint64_t configEpoch; /* Last configEpoch observed for this node */
|
||||
unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
|
||||
uint16_t *slot_info_pairs; /* Slots info represented as (start/end) pair (consecutive index). */
|
||||
int slot_info_pairs_count; /* Used number of slots in slot_info_pairs */
|
||||
int numslots; /* Number of slots handled by this node */
|
||||
int numslaves; /* Number of slave nodes, if this is a master */
|
||||
clusterNode **slaves; /* pointers to slave nodes */
|
||||
clusterNode *slaveof; /* pointer to the master node. Note that it
|
||||
may be NULL even if the node is a slave
|
||||
if we don't have the master node in our
|
||||
tables. */
|
||||
unsigned long long last_in_ping_gossip; /* The number of the last carried in the ping gossip section */
|
||||
mstime_t ping_sent; /* Unix time we sent latest ping */
|
||||
mstime_t pong_received; /* Unix time we received the pong */
|
||||
mstime_t data_received; /* Unix time we received any data */
|
||||
mstime_t fail_time; /* Unix time when FAIL flag was set */
|
||||
mstime_t voted_time; /* Last time we voted for a slave of this master */
|
||||
mstime_t repl_offset_time; /* Unix time we received offset for this node */
|
||||
mstime_t orphaned_time; /* Starting time of orphaned master condition */
|
||||
long long repl_offset; /* Last known repl offset for this node. */
|
||||
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
|
||||
sds hostname; /* The known hostname for this node */
|
||||
sds human_nodename; /* The known human readable nodename for this node */
|
||||
int tcp_port; /* Latest known clients TCP port. */
|
||||
int tls_port; /* Latest known clients TLS port */
|
||||
int cport; /* Latest known cluster port of this node. */
|
||||
clusterLink *link; /* TCP/IP link established toward this node */
|
||||
clusterLink *inbound_link; /* TCP/IP link accepted from this node */
|
||||
list *fail_reports; /* List of nodes signaling this as failing */
|
||||
};
|
||||
|
||||
struct clusterState {
|
||||
clusterNode *myself; /* This node */
|
||||
uint64_t currentEpoch;
|
||||
|
|
33
src/debug.c
33
src/debug.c
|
@ -496,11 +496,9 @@ void debugCommand(client *c) {
|
|||
" In case RESET is provided the peak reset time will be restored to the default value",
|
||||
"REPLYBUFFER RESIZING <0|1>",
|
||||
" Enable or disable the reply buffer resize cron job",
|
||||
"CLUSTERLINK KILL <to|from|all> <node-id>",
|
||||
" Kills the link based on the direction to/from (both) with the provided node." ,
|
||||
NULL
|
||||
};
|
||||
addReplyHelp(c, help);
|
||||
addExtendedReplyHelp(c, help, clusterDebugCommandHelp());
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
|
||||
/* Compiler gives warnings about writing to a random address
|
||||
* e.g "*((char*)-1) = 'x';". As a workaround, we map a read-only area
|
||||
|
@ -1018,34 +1016,7 @@ NULL
|
|||
return;
|
||||
}
|
||||
addReply(c, shared.ok);
|
||||
} else if(!strcasecmp(c->argv[1]->ptr,"CLUSTERLINK") &&
|
||||
!strcasecmp(c->argv[2]->ptr,"KILL") &&
|
||||
c->argc == 5) {
|
||||
if (!server.cluster_enabled) {
|
||||
addReplyError(c, "Debug option only available for cluster mode enabled setup!");
|
||||
return;
|
||||
}
|
||||
|
||||
/* Find the node. */
|
||||
clusterNode *n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr));
|
||||
if (!n) {
|
||||
addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[4]->ptr);
|
||||
return;
|
||||
}
|
||||
|
||||
/* Terminate the link based on the direction or all. */
|
||||
if (!strcasecmp(c->argv[3]->ptr,"from")) {
|
||||
freeClusterLink(n->inbound_link);
|
||||
} else if (!strcasecmp(c->argv[3]->ptr,"to")) {
|
||||
freeClusterLink(n->link);
|
||||
} else if (!strcasecmp(c->argv[3]->ptr,"all")) {
|
||||
freeClusterLink(n->link);
|
||||
freeClusterLink(n->inbound_link);
|
||||
} else {
|
||||
addReplyErrorFormat(c, "Unknown direction %s", (char*) c->argv[3]->ptr);
|
||||
}
|
||||
addReply(c,shared.ok);
|
||||
} else {
|
||||
} else if(!handleDebugClusterCommand(c)) {
|
||||
addReplySubcommandSyntaxError(c);
|
||||
return;
|
||||
}
|
||||
|
|
21
src/module.c
21
src/module.c
|
@ -8967,20 +8967,19 @@ int RM_GetClusterNodeInfo(RedisModuleCtx *ctx, const char *id, char *ip, char *m
|
|||
UNUSED(ctx);
|
||||
|
||||
clusterNode *node = clusterLookupNode(id, strlen(id));
|
||||
if (node == NULL ||
|
||||
node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
|
||||
if (node == NULL || !clusterNodeConfirmedReachable(node))
|
||||
{
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
if (ip) redis_strlcpy(ip,node->ip,NET_IP_STR_LEN);
|
||||
if (ip) redis_strlcpy(ip, clusterNodeIp(node),NET_IP_STR_LEN);
|
||||
|
||||
if (master_id) {
|
||||
/* If the information is not available, the function will set the
|
||||
* field to zero bytes, so that when the field can't be populated the
|
||||
* function kinda remains predictable. */
|
||||
if (node->flags & CLUSTER_NODE_SLAVE && node->slaveof)
|
||||
memcpy(master_id,node->slaveof->name,REDISMODULE_NODE_ID_LEN);
|
||||
if (clusterNodeIsSlave(node) && clusterNodeGetSlaveof(node))
|
||||
memcpy(master_id, clusterNodeGetName(clusterNodeGetSlaveof(node)) ,REDISMODULE_NODE_ID_LEN);
|
||||
else
|
||||
memset(master_id,0,REDISMODULE_NODE_ID_LEN);
|
||||
}
|
||||
|
@ -8990,12 +8989,12 @@ int RM_GetClusterNodeInfo(RedisModuleCtx *ctx, const char *id, char *ip, char *m
|
|||
* we can provide binary compatibility. */
|
||||
if (flags) {
|
||||
*flags = 0;
|
||||
if (node->flags & CLUSTER_NODE_MYSELF) *flags |= REDISMODULE_NODE_MYSELF;
|
||||
if (node->flags & CLUSTER_NODE_MASTER) *flags |= REDISMODULE_NODE_MASTER;
|
||||
if (node->flags & CLUSTER_NODE_SLAVE) *flags |= REDISMODULE_NODE_SLAVE;
|
||||
if (node->flags & CLUSTER_NODE_PFAIL) *flags |= REDISMODULE_NODE_PFAIL;
|
||||
if (node->flags & CLUSTER_NODE_FAIL) *flags |= REDISMODULE_NODE_FAIL;
|
||||
if (node->flags & CLUSTER_NODE_NOFAILOVER) *flags |= REDISMODULE_NODE_NOFAILOVER;
|
||||
if (clusterNodeIsMyself(node)) *flags |= REDISMODULE_NODE_MYSELF;
|
||||
if (nodeIsMaster(node)) *flags |= REDISMODULE_NODE_MASTER;
|
||||
if (clusterNodeIsSlave(node)) *flags |= REDISMODULE_NODE_SLAVE;
|
||||
if (clusterNodeTimedOut(node)) *flags |= REDISMODULE_NODE_PFAIL;
|
||||
if (clusterNodeIsFailing(node)) *flags |= REDISMODULE_NODE_FAIL;
|
||||
if (clusterNodeIsNoFailover(node)) *flags |= REDISMODULE_NODE_NOFAILOVER;
|
||||
}
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
|
|
@ -1117,14 +1117,18 @@ void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) {
|
|||
}
|
||||
}
|
||||
|
||||
/* Add an array of C strings as status replies with a heading.
|
||||
* This function is typically invoked by from commands that support
|
||||
* subcommands in response to the 'help' subcommand. The help array
|
||||
* is terminated by NULL sentinel. */
|
||||
void addReplyHelp(client *c, const char **help) {
|
||||
/* This function is similar to the addReplyHelp function but adds the
|
||||
* ability to pass in two arrays of strings. Some commands have
|
||||
* some additional subcommands based on the specific feature implementation
|
||||
* Redis is compiled with (currently just clustering). This function allows
|
||||
* to pass is the common subcommands in `help` and any implementation
|
||||
* specific subcommands in `extended_help`.
|
||||
*/
|
||||
void addExtendedReplyHelp(client *c, const char **help, const char **extended_help) {
|
||||
sds cmd = sdsnew((char*) c->argv[0]->ptr);
|
||||
void *blenp = addReplyDeferredLen(c);
|
||||
int blen = 0;
|
||||
int idx = 0;
|
||||
|
||||
sdstoupper(cmd);
|
||||
addReplyStatusFormat(c,
|
||||
|
@ -1132,6 +1136,10 @@ void addReplyHelp(client *c, const char **help) {
|
|||
sdsfree(cmd);
|
||||
|
||||
while (help[blen]) addReplyStatus(c,help[blen++]);
|
||||
if (extended_help) {
|
||||
while (extended_help[idx]) addReplyStatus(c,extended_help[idx++]);
|
||||
}
|
||||
blen += idx;
|
||||
|
||||
addReplyStatus(c,"HELP");
|
||||
addReplyStatus(c," Print this help.");
|
||||
|
@ -1141,6 +1149,14 @@ void addReplyHelp(client *c, const char **help) {
|
|||
setDeferredArrayLen(c,blenp,blen);
|
||||
}
|
||||
|
||||
/* Add an array of C strings as status replies with a heading.
|
||||
* This function is typically invoked by commands that support
|
||||
* subcommands in response to the 'help' subcommand. The help array
|
||||
* is terminated by NULL sentinel. */
|
||||
void addReplyHelp(client *c, const char **help) {
|
||||
addExtendedReplyHelp(c, help, NULL);
|
||||
}
|
||||
|
||||
/* Add a suggestive error reply.
|
||||
* This function is typically invoked by from commands that support
|
||||
* subcommands in response to an unknown subcommand or argument error. */
|
||||
|
|
|
@ -2627,6 +2627,7 @@ void addReplySetLen(client *c, long length);
|
|||
void addReplyAttributeLen(client *c, long length);
|
||||
void addReplyPushLen(client *c, long length);
|
||||
void addReplyHelp(client *c, const char **help);
|
||||
void addExtendedReplyHelp(client *c, const char **help, const char **extended_help);
|
||||
void addReplySubcommandSyntaxError(client *c);
|
||||
void addReplyLoadedModules(client *c);
|
||||
void copyReplicaOutputBuffer(client *dst, client *src);
|
||||
|
|
Loading…
Reference in New Issue