mirror of https://mirror.osredm.com/root/redis.git
Support TLS service when "tls-cluster" is not enabled and persist both plain and TLS port in nodes.conf (#12233)
Originally, when "tls-cluster" is enabled, `port` is set to TLS port. In order to support non-TLS clients, `pport` is used to propagate TCP port across cluster nodes. However when "tls-cluster" is disabled, `port` is set to TCP port, and `pport` is not used, which means the cluster cannot provide TLS service unless "tls-cluster" is on. ``` typedef struct { // ... uint16_t port; /* Latest known clients port (TLS or plain). */ uint16_t pport; /* Latest known clients plaintext port. Only used if the main clients port is for TLS. */ // ... } clusterNode; ``` ``` typedef struct { // ... uint16_t port; /* TCP base port number. */ uint16_t pport; /* Sender TCP plaintext port, if base port is TLS */ // ... } clusterMsg; ``` This PR renames `port` and `pport` in `clusterNode` to `tcp_port` and `tls_port`, to record both ports no matter "tls-cluster" is enabled or disabled. This allows to provide TLS service to clients when "tls-cluster" is disabled: when displaying cluster topology, or giving `MOVED` error, server can provide TLS or TCP port according to client's connection type, no matter what type of connection cluster bus is using. For backwards compatibility, `port` and `pport` in `clusterMsg` are preserved, when "tls-cluster" is enabled, `port` is set to TLS port and `pport` is set to TCP port, when "tls-cluster" is disabled, `port` is set to TCP port and `pport` is set to TLS port (instead of 0). Also, in the nodes.conf file, a new aux field displaying an extra port is added to complete the persisted info. We may have `tls_port=xxxxx` or `tcp_port=xxxxx` in the aux field, to complete the cluster topology, while the other port is stored in the normal `<ip>:<port>` field. The format is shown below. ``` <node-id> <ip>:<tcp_port>@<cport>,<hostname>,shard-id=...,tls-port=6379 myself,master - 0 0 0 connected 0-1000 ``` Or we can switch the position of two ports, both can be correctly resolved. ``` <node-id> <ip>:<tls_port>@<cport>,<hostname>,shard-id=...,tcp-port=6379 myself,master - 0 0 0 connected 0-1000 ```
This commit is contained in:
parent
9600553ef2
commit
22a29935ff
268
src/cluster.c
268
src/cluster.c
|
@ -31,6 +31,7 @@
|
||||||
#include "server.h"
|
#include "server.h"
|
||||||
#include "cluster.h"
|
#include "cluster.h"
|
||||||
#include "endianconv.h"
|
#include "endianconv.h"
|
||||||
|
#include "connection.h"
|
||||||
|
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
|
@ -92,8 +93,30 @@ int auxShardIdPresent(clusterNode *n);
|
||||||
int auxHumanNodenameSetter(clusterNode *n, void *value, int length);
|
int auxHumanNodenameSetter(clusterNode *n, void *value, int length);
|
||||||
sds auxHumanNodenameGetter(clusterNode *n, sds s);
|
sds auxHumanNodenameGetter(clusterNode *n, sds s);
|
||||||
int auxHumanNodenamePresent(clusterNode *n);
|
int auxHumanNodenamePresent(clusterNode *n);
|
||||||
|
int auxTcpPortSetter(clusterNode *n, void *value, int length);
|
||||||
|
sds auxTcpPortGetter(clusterNode *n, sds s);
|
||||||
|
int auxTcpPortPresent(clusterNode *n);
|
||||||
|
int auxTlsPortSetter(clusterNode *n, void *value, int length);
|
||||||
|
sds auxTlsPortGetter(clusterNode *n, sds s);
|
||||||
|
int auxTlsPortPresent(clusterNode *n);
|
||||||
static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen);
|
static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen);
|
||||||
|
|
||||||
|
int getNodeDefaultClientPort(clusterNode *n) {
|
||||||
|
return server.tls_cluster ? n->tls_port : n->tcp_port;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline int getNodeDefaultReplicationPort(clusterNode *n) {
|
||||||
|
return server.tls_replication ? n->tls_port : n->tcp_port;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline int getNodeClientPort(clusterNode *n, int use_tls) {
|
||||||
|
return use_tls ? n->tls_port : n->tcp_port;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline int defaultClientPort(void) {
|
||||||
|
return server.tls_cluster ? server.tls_port : server.port;
|
||||||
|
}
|
||||||
|
|
||||||
/* Links to the next and previous entries for keys in the same slot are stored
|
/* Links to the next and previous entries for keys in the same slot are stored
|
||||||
* in the dict entry metadata. See Slot to Key API below. */
|
* in the dict entry metadata. See Slot to Key API below. */
|
||||||
#define dictEntryNextInSlot(de) \
|
#define dictEntryNextInSlot(de) \
|
||||||
|
@ -176,6 +199,8 @@ typedef struct {
|
||||||
typedef enum {
|
typedef enum {
|
||||||
af_shard_id,
|
af_shard_id,
|
||||||
af_human_nodename,
|
af_human_nodename,
|
||||||
|
af_tcp_port,
|
||||||
|
af_tls_port,
|
||||||
af_count,
|
af_count,
|
||||||
} auxFieldIndex;
|
} auxFieldIndex;
|
||||||
|
|
||||||
|
@ -186,6 +211,8 @@ typedef enum {
|
||||||
auxFieldHandler auxFieldHandlers[] = {
|
auxFieldHandler auxFieldHandlers[] = {
|
||||||
{"shard-id", auxShardIdSetter, auxShardIdGetter, auxShardIdPresent},
|
{"shard-id", auxShardIdSetter, auxShardIdGetter, auxShardIdPresent},
|
||||||
{"nodename", auxHumanNodenameSetter, auxHumanNodenameGetter, auxHumanNodenamePresent},
|
{"nodename", auxHumanNodenameSetter, auxHumanNodenameGetter, auxHumanNodenamePresent},
|
||||||
|
{"tcp-port", auxTcpPortSetter, auxTcpPortGetter, auxTcpPortPresent},
|
||||||
|
{"tls-port", auxTlsPortSetter, auxTlsPortGetter, auxTlsPortPresent},
|
||||||
};
|
};
|
||||||
|
|
||||||
int isValidAuxChar(int c) {
|
int isValidAuxChar(int c) {
|
||||||
|
@ -247,6 +274,44 @@ int auxHumanNodenamePresent(clusterNode *n) {
|
||||||
return sdslen(n->human_nodename);
|
return sdslen(n->human_nodename);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int auxTcpPortSetter(clusterNode *n, void *value, int length) {
|
||||||
|
if (length > 5 || length < 1) {
|
||||||
|
return C_ERR;
|
||||||
|
}
|
||||||
|
char buf[length + 1];
|
||||||
|
memcpy(buf, (char*)value, length);
|
||||||
|
buf[length] = '\0';
|
||||||
|
n->tcp_port = atoi(buf);
|
||||||
|
return (n->tcp_port < 0 || n->tcp_port >= 65536) ? C_ERR : C_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
sds auxTcpPortGetter(clusterNode *n, sds s) {
|
||||||
|
return sdscatprintf(s, "%d", n->tcp_port);
|
||||||
|
}
|
||||||
|
|
||||||
|
int auxTcpPortPresent(clusterNode *n) {
|
||||||
|
return n->tcp_port >= 0 && n->tcp_port < 65536;
|
||||||
|
}
|
||||||
|
|
||||||
|
int auxTlsPortSetter(clusterNode *n, void *value, int length) {
|
||||||
|
if (length > 5 || length < 1) {
|
||||||
|
return C_ERR;
|
||||||
|
}
|
||||||
|
char buf[length + 1];
|
||||||
|
memcpy(buf, (char*)value, length);
|
||||||
|
buf[length] = '\0';
|
||||||
|
n->tls_port = atoi(buf);
|
||||||
|
return (n->tls_port < 0 || n->tls_port >= 65536) ? C_ERR : C_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
sds auxTlsPortGetter(clusterNode *n, sds s) {
|
||||||
|
return sdscatprintf(s, "%d", n->tls_port);
|
||||||
|
}
|
||||||
|
|
||||||
|
int auxTlsPortPresent(clusterNode *n) {
|
||||||
|
return n->tls_port >= 0 && n->tls_port < 65536;
|
||||||
|
}
|
||||||
|
|
||||||
/* clusterLink send queue blocks */
|
/* clusterLink send queue blocks */
|
||||||
typedef struct {
|
typedef struct {
|
||||||
size_t totlen; /* Total length of this block including the message */
|
size_t totlen; /* Total length of this block including the message */
|
||||||
|
@ -376,7 +441,8 @@ int clusterLoadConfig(char *filename) {
|
||||||
* the format of "aux=val" where both aux and val can contain
|
* the format of "aux=val" where both aux and val can contain
|
||||||
* characters that pass the isValidAuxChar check only. The order
|
* characters that pass the isValidAuxChar check only. The order
|
||||||
* of the aux fields is insignificant. */
|
* of the aux fields is insignificant. */
|
||||||
|
int aux_tcp_port = 0;
|
||||||
|
int aux_tls_port = 0;
|
||||||
for (int i = 2; i < aux_argc; i++) {
|
for (int i = 2; i < aux_argc; i++) {
|
||||||
int field_argc;
|
int field_argc;
|
||||||
sds *field_argv;
|
sds *field_argv;
|
||||||
|
@ -407,6 +473,8 @@ int clusterLoadConfig(char *filename) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
field_found = 1;
|
field_found = 1;
|
||||||
|
aux_tcp_port |= j == af_tcp_port;
|
||||||
|
aux_tls_port |= j == af_tls_port;
|
||||||
if (auxFieldHandlers[j].setter(n, field_argv[1], sdslen(field_argv[1])) != C_OK) {
|
if (auxFieldHandlers[j].setter(n, field_argv[1], sdslen(field_argv[1])) != C_OK) {
|
||||||
/* Invalid aux field format */
|
/* Invalid aux field format */
|
||||||
sdsfreesplitres(field_argv, field_argc);
|
sdsfreesplitres(field_argv, field_argc);
|
||||||
|
@ -438,11 +506,23 @@ int clusterLoadConfig(char *filename) {
|
||||||
*busp = '\0';
|
*busp = '\0';
|
||||||
busp++;
|
busp++;
|
||||||
}
|
}
|
||||||
n->port = atoi(port);
|
/* If neither TCP or TLS port is found in aux field, it is considered
|
||||||
|
* an old version of nodes.conf file.*/
|
||||||
|
if (!aux_tcp_port && !aux_tls_port) {
|
||||||
|
if (server.tls_cluster) {
|
||||||
|
n->tls_port = atoi(port);
|
||||||
|
} else {
|
||||||
|
n->tcp_port = atoi(port);
|
||||||
|
}
|
||||||
|
} else if (!aux_tcp_port) {
|
||||||
|
n->tcp_port = atoi(port);
|
||||||
|
} else if (!aux_tls_port) {
|
||||||
|
n->tls_port = atoi(port);
|
||||||
|
}
|
||||||
/* In older versions of nodes.conf the "@busport" part is missing.
|
/* In older versions of nodes.conf the "@busport" part is missing.
|
||||||
* In this case we set it to the default offset of 10000 from the
|
* In this case we set it to the default offset of 10000 from the
|
||||||
* base port. */
|
* base port. */
|
||||||
n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;
|
n->cport = busp ? atoi(busp) : (getNodeDefaultClientPort(n) + CLUSTER_PORT_INCR);
|
||||||
|
|
||||||
/* The plaintext port for client in a TLS cluster (n->pport) is not
|
/* The plaintext port for client in a TLS cluster (n->pport) is not
|
||||||
* stored in nodes.conf. It is received later over the bus protocol. */
|
* stored in nodes.conf. It is received later over the bus protocol. */
|
||||||
|
@ -747,23 +827,20 @@ int clusterLockConfig(char *filename) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Derives our ports to be announced in the cluster bus. */
|
/* Derives our ports to be announced in the cluster bus. */
|
||||||
void deriveAnnouncedPorts(int *announced_port, int *announced_pport,
|
void deriveAnnouncedPorts(int *announced_tcp_port, int *announced_tls_port,
|
||||||
int *announced_cport) {
|
int *announced_cport) {
|
||||||
int port = server.tls_cluster ? server.tls_port : server.port;
|
|
||||||
/* Default announced ports. */
|
|
||||||
*announced_port = port;
|
|
||||||
*announced_pport = server.tls_cluster ? server.port : 0;
|
|
||||||
*announced_cport = server.cluster_port ? server.cluster_port : port + CLUSTER_PORT_INCR;
|
|
||||||
|
|
||||||
/* Config overriding announced ports. */
|
/* Config overriding announced ports. */
|
||||||
if (server.tls_cluster && server.cluster_announce_tls_port) {
|
*announced_tcp_port = server.cluster_announce_port ?
|
||||||
*announced_port = server.cluster_announce_tls_port;
|
server.cluster_announce_port : server.port;
|
||||||
*announced_pport = server.cluster_announce_port;
|
*announced_tls_port = server.cluster_announce_tls_port ?
|
||||||
} else if (server.cluster_announce_port) {
|
server.cluster_announce_tls_port : server.tls_port;
|
||||||
*announced_port = server.cluster_announce_port;
|
/* Derive cluster bus port. */
|
||||||
}
|
|
||||||
if (server.cluster_announce_bus_port) {
|
if (server.cluster_announce_bus_port) {
|
||||||
*announced_cport = server.cluster_announce_bus_port;
|
*announced_cport = server.cluster_announce_bus_port;
|
||||||
|
} else if (server.cluster_port) {
|
||||||
|
*announced_cport = server.cluster_port;
|
||||||
|
} else {
|
||||||
|
*announced_cport = defaultClientPort() + CLUSTER_PORT_INCR;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -790,7 +867,7 @@ void clusterUpdateMyselfFlags(void) {
|
||||||
* The option can be set at runtime via CONFIG SET. */
|
* The option can be set at runtime via CONFIG SET. */
|
||||||
void clusterUpdateMyselfAnnouncedPorts(void) {
|
void clusterUpdateMyselfAnnouncedPorts(void) {
|
||||||
if (!myself) return;
|
if (!myself) return;
|
||||||
deriveAnnouncedPorts(&myself->port,&myself->pport,&myself->cport);
|
deriveAnnouncedPorts(&myself->tcp_port,&myself->tls_port,&myself->cport);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* We want to take myself->ip in sync with the cluster-announce-ip option.
|
/* We want to take myself->ip in sync with the cluster-announce-ip option.
|
||||||
|
@ -938,7 +1015,7 @@ void clusterInit(void) {
|
||||||
/* Port sanity check II
|
/* Port sanity check II
|
||||||
* The other handshake port check is triggered too late to stop
|
* The other handshake port check is triggered too late to stop
|
||||||
* us from trying to use a too-high cluster port number. */
|
* us from trying to use a too-high cluster port number. */
|
||||||
int port = server.tls_cluster ? server.tls_port : server.port;
|
int port = defaultClientPort();
|
||||||
if (!server.cluster_port && port > (65535-CLUSTER_PORT_INCR)) {
|
if (!server.cluster_port && port > (65535-CLUSTER_PORT_INCR)) {
|
||||||
serverLog(LL_WARNING, "Redis port number too high. "
|
serverLog(LL_WARNING, "Redis port number too high. "
|
||||||
"Cluster communication port is 10,000 port "
|
"Cluster communication port is 10,000 port "
|
||||||
|
@ -959,7 +1036,7 @@ void clusterInit(void) {
|
||||||
|
|
||||||
/* Set myself->port/cport/pport to my listening ports, we'll just need to
|
/* Set myself->port/cport/pport to my listening ports, we'll just need to
|
||||||
* discover the IP address via MEET messages. */
|
* discover the IP address via MEET messages. */
|
||||||
deriveAnnouncedPorts(&myself->port, &myself->pport, &myself->cport);
|
deriveAnnouncedPorts(&myself->tcp_port, &myself->tls_port, &myself->cport);
|
||||||
|
|
||||||
server.cluster->mf_end = 0;
|
server.cluster->mf_end = 0;
|
||||||
server.cluster->mf_slave = NULL;
|
server.cluster->mf_slave = NULL;
|
||||||
|
@ -976,7 +1053,7 @@ void clusterInitListeners(void) {
|
||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
int port = server.tls_cluster ? server.tls_port : server.port;
|
int port = defaultClientPort();
|
||||||
connListener *listener = &server.clistener;
|
connListener *listener = &server.clistener;
|
||||||
listener->count = 0;
|
listener->count = 0;
|
||||||
listener->bindaddr = server.bindaddr;
|
listener->bindaddr = server.bindaddr;
|
||||||
|
@ -1306,9 +1383,9 @@ clusterNode *createClusterNode(char *nodename, int flags) {
|
||||||
memset(node->ip,0,sizeof(node->ip));
|
memset(node->ip,0,sizeof(node->ip));
|
||||||
node->hostname = sdsempty();
|
node->hostname = sdsempty();
|
||||||
node->human_nodename = sdsempty();
|
node->human_nodename = sdsempty();
|
||||||
node->port = 0;
|
node->tcp_port = 0;
|
||||||
node->cport = 0;
|
node->cport = 0;
|
||||||
node->pport = 0;
|
node->tls_port = 0;
|
||||||
node->fail_reports = listCreate();
|
node->fail_reports = listCreate();
|
||||||
node->voted_time = 0;
|
node->voted_time = 0;
|
||||||
node->orphaned_time = 0;
|
node->orphaned_time = 0;
|
||||||
|
@ -1927,7 +2004,7 @@ int clusterHandshakeInProgress(char *ip, int port, int cport) {
|
||||||
|
|
||||||
if (!nodeInHandshake(node)) continue;
|
if (!nodeInHandshake(node)) continue;
|
||||||
if (!strcasecmp(node->ip,ip) &&
|
if (!strcasecmp(node->ip,ip) &&
|
||||||
node->port == port &&
|
getNodeDefaultClientPort(node) == port &&
|
||||||
node->cport == cport) break;
|
node->cport == cport) break;
|
||||||
}
|
}
|
||||||
dictReleaseIterator(di);
|
dictReleaseIterator(di);
|
||||||
|
@ -1988,12 +2065,36 @@ int clusterStartHandshake(char *ip, int port, int cport) {
|
||||||
* handshake. */
|
* handshake. */
|
||||||
n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
|
n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
|
||||||
memcpy(n->ip,norm_ip,sizeof(n->ip));
|
memcpy(n->ip,norm_ip,sizeof(n->ip));
|
||||||
n->port = port;
|
if (server.tls_cluster) {
|
||||||
|
n->tls_port = port;
|
||||||
|
} else {
|
||||||
|
n->tcp_port = port;
|
||||||
|
}
|
||||||
n->cport = cport;
|
n->cport = cport;
|
||||||
clusterAddNode(n);
|
clusterAddNode(n);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void getClientPortFromClusterMsg(clusterMsg *hdr, int *tls_port, int *tcp_port) {
|
||||||
|
if (server.tls_cluster) {
|
||||||
|
*tls_port = ntohs(hdr->port);
|
||||||
|
*tcp_port = ntohs(hdr->pport);
|
||||||
|
} else {
|
||||||
|
*tls_port = ntohs(hdr->pport);
|
||||||
|
*tcp_port = ntohs(hdr->port);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void getClientPortFromGossip(clusterMsgDataGossip *g, int *tls_port, int *tcp_port) {
|
||||||
|
if (server.tls_cluster) {
|
||||||
|
*tls_port = ntohs(g->port);
|
||||||
|
*tcp_port = ntohs(g->pport);
|
||||||
|
} else {
|
||||||
|
*tls_port = ntohs(g->pport);
|
||||||
|
*tcp_port = ntohs(g->port);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Process the gossip section of PING or PONG packets.
|
/* Process the gossip section of PING or PONG packets.
|
||||||
* Note that this function assumes that the packet is already sanity-checked
|
* Note that this function assumes that the packet is already sanity-checked
|
||||||
* by the caller, not in the content of the gossip section, but in the
|
* by the caller, not in the content of the gossip section, but in the
|
||||||
|
@ -2019,6 +2120,10 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
|
||||||
sdsfree(ci);
|
sdsfree(ci);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Convert port and pport into TCP port and TLS port. */
|
||||||
|
int msg_tls_port, msg_tcp_port;
|
||||||
|
getClientPortFromGossip(g, &msg_tls_port, &msg_tcp_port);
|
||||||
|
|
||||||
/* Update our state accordingly to the gossip sections */
|
/* Update our state accordingly to the gossip sections */
|
||||||
node = clusterLookupNode(g->nodename, CLUSTER_NAMELEN);
|
node = clusterLookupNode(g->nodename, CLUSTER_NAMELEN);
|
||||||
if (node) {
|
if (node) {
|
||||||
|
@ -2072,13 +2177,14 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
|
||||||
!(flags & CLUSTER_NODE_NOADDR) &&
|
!(flags & CLUSTER_NODE_NOADDR) &&
|
||||||
!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
|
!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
|
||||||
(strcasecmp(node->ip,g->ip) ||
|
(strcasecmp(node->ip,g->ip) ||
|
||||||
node->port != ntohs(g->port) ||
|
node->tls_port != (server.tls_cluster ? ntohs(g->port) : ntohs(g->pport)) ||
|
||||||
|
node->tcp_port != (server.tls_cluster ? ntohs(g->pport) : ntohs(g->port)) ||
|
||||||
node->cport != ntohs(g->cport)))
|
node->cport != ntohs(g->cport)))
|
||||||
{
|
{
|
||||||
if (node->link) freeClusterLink(node->link);
|
if (node->link) freeClusterLink(node->link);
|
||||||
memcpy(node->ip,g->ip,NET_IP_STR_LEN);
|
memcpy(node->ip,g->ip,NET_IP_STR_LEN);
|
||||||
node->port = ntohs(g->port);
|
node->tcp_port = msg_tcp_port;
|
||||||
node->pport = ntohs(g->pport);
|
node->tls_port = msg_tls_port;
|
||||||
node->cport = ntohs(g->cport);
|
node->cport = ntohs(g->cport);
|
||||||
node->flags &= ~CLUSTER_NODE_NOADDR;
|
node->flags &= ~CLUSTER_NODE_NOADDR;
|
||||||
}
|
}
|
||||||
|
@ -2099,8 +2205,8 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
|
||||||
clusterNode *node;
|
clusterNode *node;
|
||||||
node = createClusterNode(g->nodename, flags);
|
node = createClusterNode(g->nodename, flags);
|
||||||
memcpy(node->ip,g->ip,NET_IP_STR_LEN);
|
memcpy(node->ip,g->ip,NET_IP_STR_LEN);
|
||||||
node->port = ntohs(g->port);
|
node->tcp_port = msg_tcp_port;
|
||||||
node->pport = ntohs(g->pport);
|
node->tls_port = msg_tls_port;
|
||||||
node->cport = ntohs(g->cport);
|
node->cport = ntohs(g->cport);
|
||||||
clusterAddNode(node);
|
clusterAddNode(node);
|
||||||
}
|
}
|
||||||
|
@ -2145,9 +2251,9 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
|
||||||
clusterMsg *hdr)
|
clusterMsg *hdr)
|
||||||
{
|
{
|
||||||
char ip[NET_IP_STR_LEN] = {0};
|
char ip[NET_IP_STR_LEN] = {0};
|
||||||
int port = ntohs(hdr->port);
|
|
||||||
int pport = ntohs(hdr->pport);
|
|
||||||
int cport = ntohs(hdr->cport);
|
int cport = ntohs(hdr->cport);
|
||||||
|
int tcp_port, tls_port;
|
||||||
|
getClientPortFromClusterMsg(hdr, &tls_port, &tcp_port);
|
||||||
|
|
||||||
/* We don't proceed if the link is the same as the sender link, as this
|
/* We don't proceed if the link is the same as the sender link, as this
|
||||||
* function is designed to see if the node link is consistent with the
|
* function is designed to see if the node link is consistent with the
|
||||||
|
@ -2162,23 +2268,23 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
|
||||||
* in the next round of PINGs */
|
* in the next round of PINGs */
|
||||||
if (nodeIp2String(ip,link,hdr->myip) == C_ERR) return 0;
|
if (nodeIp2String(ip,link,hdr->myip) == C_ERR) return 0;
|
||||||
|
|
||||||
if (node->port == port && node->cport == cport && node->pport == pport &&
|
if (node->tcp_port == tcp_port && node->cport == cport && node->tls_port == tls_port &&
|
||||||
strcmp(ip,node->ip) == 0) return 0;
|
strcmp(ip,node->ip) == 0) return 0;
|
||||||
|
|
||||||
/* IP / port is different, update it. */
|
/* IP / port is different, update it. */
|
||||||
memcpy(node->ip,ip,sizeof(ip));
|
memcpy(node->ip,ip,sizeof(ip));
|
||||||
node->port = port;
|
node->tcp_port = tcp_port;
|
||||||
node->pport = pport;
|
node->tls_port = tls_port;
|
||||||
node->cport = cport;
|
node->cport = cport;
|
||||||
if (node->link) freeClusterLink(node->link);
|
if (node->link) freeClusterLink(node->link);
|
||||||
node->flags &= ~CLUSTER_NODE_NOADDR;
|
node->flags &= ~CLUSTER_NODE_NOADDR;
|
||||||
serverLog(LL_NOTICE,"Address updated for node %.40s (%s), now %s:%d",
|
serverLog(LL_NOTICE,"Address updated for node %.40s (%s), now %s:%d",
|
||||||
node->name, node->human_nodename, node->ip, node->port);
|
node->name, node->human_nodename, node->ip, getNodeDefaultClientPort(node));
|
||||||
|
|
||||||
/* Check if this is our master and we have to change the
|
/* Check if this is our master and we have to change the
|
||||||
* replication target as well. */
|
* replication target as well. */
|
||||||
if (nodeIsSlave(myself) && myself->slaveof == node)
|
if (nodeIsSlave(myself) && myself->slaveof == node)
|
||||||
replicationSetMaster(node->ip, node->port);
|
replicationSetMaster(node->ip, getNodeDefaultReplicationPort(node));
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2739,8 +2845,7 @@ int clusterProcessPacket(clusterLink *link) {
|
||||||
|
|
||||||
node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
|
node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
|
||||||
serverAssert(nodeIp2String(node->ip,link,hdr->myip) == C_OK);
|
serverAssert(nodeIp2String(node->ip,link,hdr->myip) == C_OK);
|
||||||
node->port = ntohs(hdr->port);
|
getClientPortFromClusterMsg(hdr, &node->tls_port, &node->tcp_port);
|
||||||
node->pport = ntohs(hdr->pport);
|
|
||||||
node->cport = ntohs(hdr->cport);
|
node->cport = ntohs(hdr->cport);
|
||||||
clusterAddNode(node);
|
clusterAddNode(node);
|
||||||
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
|
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
|
||||||
|
@ -2802,8 +2907,8 @@ int clusterProcessPacket(clusterLink *link) {
|
||||||
link->node->flags);
|
link->node->flags);
|
||||||
link->node->flags |= CLUSTER_NODE_NOADDR;
|
link->node->flags |= CLUSTER_NODE_NOADDR;
|
||||||
link->node->ip[0] = '\0';
|
link->node->ip[0] = '\0';
|
||||||
link->node->port = 0;
|
link->node->tcp_port = 0;
|
||||||
link->node->pport = 0;
|
link->node->tls_port = 0;
|
||||||
link->node->cport = 0;
|
link->node->cport = 0;
|
||||||
freeClusterLink(link);
|
freeClusterLink(link);
|
||||||
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
|
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
|
||||||
|
@ -3351,15 +3456,20 @@ static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Handle cluster-announce-[tls-|bus-]port. */
|
/* Handle cluster-announce-[tls-|bus-]port. */
|
||||||
int announced_port, announced_pport, announced_cport;
|
int announced_tcp_port, announced_tls_port, announced_cport;
|
||||||
deriveAnnouncedPorts(&announced_port, &announced_pport, &announced_cport);
|
deriveAnnouncedPorts(&announced_tcp_port, &announced_tls_port, &announced_cport);
|
||||||
|
|
||||||
memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
|
memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
|
||||||
memset(hdr->slaveof,0,CLUSTER_NAMELEN);
|
memset(hdr->slaveof,0,CLUSTER_NAMELEN);
|
||||||
if (myself->slaveof != NULL)
|
if (myself->slaveof != NULL)
|
||||||
memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
|
memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
|
||||||
hdr->port = htons(announced_port);
|
if (server.tls_cluster) {
|
||||||
hdr->pport = htons(announced_pport);
|
hdr->port = htons(announced_tls_port);
|
||||||
|
hdr->pport = htons(announced_tcp_port);
|
||||||
|
} else {
|
||||||
|
hdr->port = htons(announced_tcp_port);
|
||||||
|
hdr->pport = htons(announced_tls_port);
|
||||||
|
}
|
||||||
hdr->cport = htons(announced_cport);
|
hdr->cport = htons(announced_cport);
|
||||||
hdr->flags = htons(myself->flags);
|
hdr->flags = htons(myself->flags);
|
||||||
hdr->state = server.cluster->state;
|
hdr->state = server.cluster->state;
|
||||||
|
@ -3391,10 +3501,15 @@ void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
|
||||||
gossip->ping_sent = htonl(n->ping_sent/1000);
|
gossip->ping_sent = htonl(n->ping_sent/1000);
|
||||||
gossip->pong_received = htonl(n->pong_received/1000);
|
gossip->pong_received = htonl(n->pong_received/1000);
|
||||||
memcpy(gossip->ip,n->ip,sizeof(n->ip));
|
memcpy(gossip->ip,n->ip,sizeof(n->ip));
|
||||||
gossip->port = htons(n->port);
|
if (server.tls_cluster) {
|
||||||
|
gossip->port = htons(n->tls_port);
|
||||||
|
gossip->pport = htons(n->tcp_port);
|
||||||
|
} else {
|
||||||
|
gossip->port = htons(n->tcp_port);
|
||||||
|
gossip->pport = htons(n->tls_port);
|
||||||
|
}
|
||||||
gossip->cport = htons(n->cport);
|
gossip->cport = htons(n->cport);
|
||||||
gossip->flags = htons(n->flags);
|
gossip->flags = htons(n->flags);
|
||||||
gossip->pport = htons(n->pport);
|
|
||||||
gossip->notused1 = 0;
|
gossip->notused1 = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4614,7 +4729,7 @@ void clusterCron(void) {
|
||||||
myself->slaveof &&
|
myself->slaveof &&
|
||||||
nodeHasAddr(myself->slaveof))
|
nodeHasAddr(myself->slaveof))
|
||||||
{
|
{
|
||||||
replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
|
replicationSetMaster(myself->slaveof->ip, getNodeDefaultReplicationPort(myself->slaveof));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Abort a manual failover if the timeout is reached. */
|
/* Abort a manual failover if the timeout is reached. */
|
||||||
|
@ -5017,7 +5132,7 @@ void clusterSetMaster(clusterNode *n) {
|
||||||
myself->slaveof = n;
|
myself->slaveof = n;
|
||||||
updateShardId(myself, n->shard_id);
|
updateShardId(myself, n->shard_id);
|
||||||
clusterNodeAddSlave(n,myself);
|
clusterNodeAddSlave(n,myself);
|
||||||
replicationSetMaster(n->ip, n->port);
|
replicationSetMaster(n->ip, getNodeDefaultReplicationPort(n));
|
||||||
resetManualFailover();
|
resetManualFailover();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5076,10 +5191,10 @@ sds representSlotInfo(sds ci, uint16_t *slot_info_pairs, int slot_info_pairs_cou
|
||||||
* See clusterGenNodesDescription() top comment for more information.
|
* See clusterGenNodesDescription() top comment for more information.
|
||||||
*
|
*
|
||||||
* The function returns the string representation as an SDS string. */
|
* The function returns the string representation as an SDS string. */
|
||||||
sds clusterGenNodeDescription(client *c, clusterNode *node, int use_pport) {
|
sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary) {
|
||||||
int j, start;
|
int j, start;
|
||||||
sds ci;
|
sds ci;
|
||||||
int port = use_pport && node->pport ? node->pport : node->port;
|
int port = getNodeClientPort(node, tls_primary);
|
||||||
|
|
||||||
/* Node coordinates */
|
/* Node coordinates */
|
||||||
ci = sdscatlen(sdsempty(),node->name,CLUSTER_NAMELEN);
|
ci = sdscatlen(sdsempty(),node->name,CLUSTER_NAMELEN);
|
||||||
|
@ -5097,6 +5212,9 @@ sds clusterGenNodeDescription(client *c, clusterNode *node, int use_pport) {
|
||||||
* to be persisted to nodes.conf */
|
* to be persisted to nodes.conf */
|
||||||
if (c == NULL) {
|
if (c == NULL) {
|
||||||
for (int i = af_count-1; i >=0; i--) {
|
for (int i = af_count-1; i >=0; i--) {
|
||||||
|
if ((tls_primary && i == af_tls_port) || (!tls_primary && i == af_tcp_port)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (auxFieldHandlers[i].isPresent(node)) {
|
if (auxFieldHandlers[i].isPresent(node)) {
|
||||||
ci = sdscatprintf(ci, ",%s=", auxFieldHandlers[i].field);
|
ci = sdscatprintf(ci, ",%s=", auxFieldHandlers[i].field);
|
||||||
ci = auxFieldHandlers[i].getter(node, ci);
|
ci = auxFieldHandlers[i].getter(node, ci);
|
||||||
|
@ -5219,13 +5337,13 @@ void clusterFreeNodesSlotsInfo(clusterNode *n) {
|
||||||
* include all the known nodes in the representation, including nodes in
|
* include all the known nodes in the representation, including nodes in
|
||||||
* the HANDSHAKE state.
|
* the HANDSHAKE state.
|
||||||
*
|
*
|
||||||
* Setting use_pport to 1 in a TLS cluster makes the result contain the
|
* Setting tls_primary to 1 to put TLS port in the main <ip>:<port>
|
||||||
* plaintext client port rather then the TLS client port of each node.
|
* field and put TCP port in aux field, instead of the opposite way.
|
||||||
*
|
*
|
||||||
* The representation obtained using this function is used for the output
|
* The representation obtained using this function is used for the output
|
||||||
* of the CLUSTER NODES function, and as format for the cluster
|
* of the CLUSTER NODES function, and as format for the cluster
|
||||||
* configuration file (nodes.conf) for a given node. */
|
* configuration file (nodes.conf) for a given node. */
|
||||||
sds clusterGenNodesDescription(client *c, int filter, int use_pport) {
|
sds clusterGenNodesDescription(client *c, int filter, int tls_primary) {
|
||||||
sds ci = sdsempty(), ni;
|
sds ci = sdsempty(), ni;
|
||||||
dictIterator *di;
|
dictIterator *di;
|
||||||
dictEntry *de;
|
dictEntry *de;
|
||||||
|
@ -5238,7 +5356,7 @@ sds clusterGenNodesDescription(client *c, int filter, int use_pport) {
|
||||||
clusterNode *node = dictGetVal(de);
|
clusterNode *node = dictGetVal(de);
|
||||||
|
|
||||||
if (node->flags & filter) continue;
|
if (node->flags & filter) continue;
|
||||||
ni = clusterGenNodeDescription(c, node, use_pport);
|
ni = clusterGenNodeDescription(c, node, tls_primary);
|
||||||
ci = sdscatsds(ci,ni);
|
ci = sdscatsds(ci,ni);
|
||||||
sdsfree(ni);
|
sdsfree(ni);
|
||||||
ci = sdscatlen(ci,"\n",1);
|
ci = sdscatlen(ci,"\n",1);
|
||||||
|
@ -5426,10 +5544,8 @@ void addNodeToNodeReply(client *c, clusterNode *node) {
|
||||||
serverPanic("Unrecognized preferred endpoint type");
|
serverPanic("Unrecognized preferred endpoint type");
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Report non-TLS ports to non-TLS client in TLS cluster if available. */
|
/* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
|
||||||
int use_pport = (server.tls_cluster &&
|
addReplyLongLong(c, getNodeClientPort(node, connIsTLS(c->conn)));
|
||||||
c->conn && (c->conn->type != connectionTypeTls()));
|
|
||||||
addReplyLongLong(c, use_pport && node->pport ? node->pport : node->port);
|
|
||||||
addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
|
addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
|
||||||
|
|
||||||
/* Add the additional endpoint information, this is all the known networking information
|
/* Add the additional endpoint information, this is all the known networking information
|
||||||
|
@ -5492,19 +5608,15 @@ void addNodeDetailsToShardReply(client *c, clusterNode *node) {
|
||||||
addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
|
addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);
|
||||||
reply_count++;
|
reply_count++;
|
||||||
|
|
||||||
/* We use server.tls_cluster as a proxy for whether or not
|
if (node->tcp_port) {
|
||||||
* the remote port is the tls port or not */
|
|
||||||
int plaintext_port = server.tls_cluster ? node->pport : node->port;
|
|
||||||
int tls_port = server.tls_cluster ? node->port : 0;
|
|
||||||
if (plaintext_port) {
|
|
||||||
addReplyBulkCString(c, "port");
|
addReplyBulkCString(c, "port");
|
||||||
addReplyLongLong(c, plaintext_port);
|
addReplyLongLong(c, node->tcp_port);
|
||||||
reply_count++;
|
reply_count++;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tls_port) {
|
if (node->tls_port) {
|
||||||
addReplyBulkCString(c, "tls-port");
|
addReplyBulkCString(c, "tls-port");
|
||||||
addReplyLongLong(c, tls_port);
|
addReplyLongLong(c, node->tls_port);
|
||||||
reply_count++;
|
reply_count++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5783,14 +5895,14 @@ NULL
|
||||||
long long port, cport;
|
long long port, cport;
|
||||||
|
|
||||||
if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
|
if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
|
||||||
addReplyErrorFormat(c,"Invalid TCP base port specified: %s",
|
addReplyErrorFormat(c,"Invalid base port specified: %s",
|
||||||
(char*)c->argv[3]->ptr);
|
(char*)c->argv[3]->ptr);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (c->argc == 5) {
|
if (c->argc == 5) {
|
||||||
if (getLongLongFromObject(c->argv[4], &cport) != C_OK) {
|
if (getLongLongFromObject(c->argv[4], &cport) != C_OK) {
|
||||||
addReplyErrorFormat(c,"Invalid TCP bus port specified: %s",
|
addReplyErrorFormat(c,"Invalid bus port specified: %s",
|
||||||
(char*)c->argv[4]->ptr);
|
(char*)c->argv[4]->ptr);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -5808,11 +5920,8 @@ NULL
|
||||||
}
|
}
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
|
} else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
|
||||||
/* CLUSTER NODES */
|
/* CLUSTER NODES */
|
||||||
/* Report plaintext ports, only if cluster is TLS but client is known to
|
/* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
|
||||||
* be non-TLS). */
|
sds nodes = clusterGenNodesDescription(c, 0, connIsTLS(c->conn));
|
||||||
int use_pport = (server.tls_cluster &&
|
|
||||||
c->conn && (c->conn->type != connectionTypeTls()));
|
|
||||||
sds nodes = clusterGenNodesDescription(c, 0, use_pport);
|
|
||||||
addReplyVerbatim(c,nodes,sdslen(nodes),"txt");
|
addReplyVerbatim(c,nodes,sdslen(nodes),"txt");
|
||||||
sdsfree(nodes);
|
sdsfree(nodes);
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
|
} else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
|
||||||
|
@ -6174,12 +6283,10 @@ NULL
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Use plaintext port if cluster is TLS but client is non-TLS. */
|
/* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
|
||||||
int use_pport = (server.tls_cluster &&
|
|
||||||
c->conn && (c->conn->type != connectionTypeTls()));
|
|
||||||
addReplyArrayLen(c,n->numslaves);
|
addReplyArrayLen(c,n->numslaves);
|
||||||
for (j = 0; j < n->numslaves; j++) {
|
for (j = 0; j < n->numslaves; j++) {
|
||||||
sds ni = clusterGenNodeDescription(c, n->slaves[j], use_pport);
|
sds ni = clusterGenNodeDescription(c, n->slaves[j], connIsTLS(c->conn));
|
||||||
addReplyBulkCString(c,ni);
|
addReplyBulkCString(c,ni);
|
||||||
sdsfree(ni);
|
sdsfree(ni);
|
||||||
}
|
}
|
||||||
|
@ -7304,11 +7411,8 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
|
||||||
} else if (error_code == CLUSTER_REDIR_MOVED ||
|
} else if (error_code == CLUSTER_REDIR_MOVED ||
|
||||||
error_code == CLUSTER_REDIR_ASK)
|
error_code == CLUSTER_REDIR_ASK)
|
||||||
{
|
{
|
||||||
/* Redirect to IP:port. Include plaintext port if cluster is TLS but
|
/* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
|
||||||
* client is non-TLS. */
|
int port = getNodeClientPort(n, connIsTLS(c->conn));
|
||||||
int use_pport = (server.tls_cluster &&
|
|
||||||
c->conn && (c->conn->type != connectionTypeTls()));
|
|
||||||
int port = use_pport && n->pport ? n->pport : n->port;
|
|
||||||
addReplyErrorSds(c,sdscatprintf(sdsempty(),
|
addReplyErrorSds(c,sdscatprintf(sdsempty(),
|
||||||
"-%s %d %s:%d",
|
"-%s %d %s:%d",
|
||||||
(error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
|
(error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
|
||||||
|
|
|
@ -142,9 +142,8 @@ typedef struct clusterNode {
|
||||||
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
|
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
|
||||||
sds hostname; /* The known hostname for this node */
|
sds hostname; /* The known hostname for this node */
|
||||||
sds human_nodename; /* The known human readable nodename for this node */
|
sds human_nodename; /* The known human readable nodename for this node */
|
||||||
int port; /* Latest known clients port (TLS or plain). */
|
int tcp_port; /* Latest known clients TCP port. */
|
||||||
int pport; /* Latest known clients plaintext port. Only used
|
int tls_port; /* Latest known clients TLS port */
|
||||||
if the main clients port is for TLS. */
|
|
||||||
int cport; /* Latest known cluster port of this node. */
|
int cport; /* Latest known cluster port of this node. */
|
||||||
clusterLink *link; /* TCP/IP link established toward this node */
|
clusterLink *link; /* TCP/IP link established toward this node */
|
||||||
clusterLink *inbound_link; /* TCP/IP link accepted from this node */
|
clusterLink *inbound_link; /* TCP/IP link accepted from this node */
|
||||||
|
@ -226,10 +225,10 @@ typedef struct {
|
||||||
uint32_t ping_sent;
|
uint32_t ping_sent;
|
||||||
uint32_t pong_received;
|
uint32_t pong_received;
|
||||||
char ip[NET_IP_STR_LEN]; /* IP address last time it was seen */
|
char ip[NET_IP_STR_LEN]; /* IP address last time it was seen */
|
||||||
uint16_t port; /* base port last time it was seen */
|
uint16_t port; /* primary port last time it was seen */
|
||||||
uint16_t cport; /* cluster port last time it was seen */
|
uint16_t cport; /* cluster port last time it was seen */
|
||||||
uint16_t flags; /* node->flags copy */
|
uint16_t flags; /* node->flags copy */
|
||||||
uint16_t pport; /* plaintext-port, when base port is TLS */
|
uint16_t pport; /* secondary port last time it was seen */
|
||||||
uint16_t notused1;
|
uint16_t notused1;
|
||||||
} clusterMsgDataGossip;
|
} clusterMsgDataGossip;
|
||||||
|
|
||||||
|
@ -338,7 +337,7 @@ typedef struct {
|
||||||
char sig[4]; /* Signature "RCmb" (Redis Cluster message bus). */
|
char sig[4]; /* Signature "RCmb" (Redis Cluster message bus). */
|
||||||
uint32_t totlen; /* Total length of this message */
|
uint32_t totlen; /* Total length of this message */
|
||||||
uint16_t ver; /* Protocol version, currently set to 1. */
|
uint16_t ver; /* Protocol version, currently set to 1. */
|
||||||
uint16_t port; /* TCP base port number. */
|
uint16_t port; /* Primary port number (TCP or TLS). */
|
||||||
uint16_t type; /* Message type */
|
uint16_t type; /* Message type */
|
||||||
uint16_t count; /* Only used for some kind of messages. */
|
uint16_t count; /* Only used for some kind of messages. */
|
||||||
uint64_t currentEpoch; /* The epoch accordingly to the sending node. */
|
uint64_t currentEpoch; /* The epoch accordingly to the sending node. */
|
||||||
|
@ -353,7 +352,8 @@ typedef struct {
|
||||||
char myip[NET_IP_STR_LEN]; /* Sender IP, if not all zeroed. */
|
char myip[NET_IP_STR_LEN]; /* Sender IP, if not all zeroed. */
|
||||||
uint16_t extensions; /* Number of extensions sent along with this packet. */
|
uint16_t extensions; /* Number of extensions sent along with this packet. */
|
||||||
char notused1[30]; /* 30 bytes reserved for future usage. */
|
char notused1[30]; /* 30 bytes reserved for future usage. */
|
||||||
uint16_t pport; /* Sender TCP plaintext port, if base port is TLS */
|
uint16_t pport; /* Secondary port number: if primary port is TCP port, this is
|
||||||
|
TLS port, and if primary port is TLS port, this is TCP port.*/
|
||||||
uint16_t cport; /* Sender TCP cluster bus port */
|
uint16_t cport; /* Sender TCP cluster bus port */
|
||||||
uint16_t flags; /* Sender node flags */
|
uint16_t flags; /* Sender node flags */
|
||||||
unsigned char state; /* Cluster state from the POV of the sender */
|
unsigned char state; /* Cluster state from the POV of the sender */
|
||||||
|
@ -429,9 +429,11 @@ void slotToChannelAdd(sds channel);
|
||||||
void slotToChannelDel(sds channel);
|
void slotToChannelDel(sds channel);
|
||||||
void clusterUpdateMyselfHostname(void);
|
void clusterUpdateMyselfHostname(void);
|
||||||
void clusterUpdateMyselfAnnouncedPorts(void);
|
void clusterUpdateMyselfAnnouncedPorts(void);
|
||||||
sds clusterGenNodesDescription(client *c, int filter, int use_pport);
|
sds clusterGenNodesDescription(client *c, int filter, int tls_primary);
|
||||||
sds genClusterInfoString(void);
|
sds genClusterInfoString(void);
|
||||||
void freeClusterLink(clusterLink *link);
|
void freeClusterLink(clusterLink *link);
|
||||||
void clusterUpdateMyselfHumanNodename(void);
|
void clusterUpdateMyselfHumanNodename(void);
|
||||||
int isValidAuxString(char *s, unsigned int length);
|
int isValidAuxString(char *s, unsigned int length);
|
||||||
|
int getNodeDefaultClientPort(clusterNode *n);
|
||||||
|
|
||||||
#endif /* __CLUSTER_H */
|
#endif /* __CLUSTER_H */
|
||||||
|
|
|
@ -446,4 +446,9 @@ int RedisRegisterConnectionTypeSocket(void);
|
||||||
int RedisRegisterConnectionTypeUnix(void);
|
int RedisRegisterConnectionTypeUnix(void);
|
||||||
int RedisRegisterConnectionTypeTLS(void);
|
int RedisRegisterConnectionTypeTLS(void);
|
||||||
|
|
||||||
|
/* Return 1 if connection is using TLS protocol, 0 if otherwise. */
|
||||||
|
static inline int connIsTLS(connection *conn) {
|
||||||
|
return conn && conn->type == connectionTypeTls();
|
||||||
|
}
|
||||||
|
|
||||||
#endif /* __REDIS_CONNECTION_H */
|
#endif /* __REDIS_CONNECTION_H */
|
||||||
|
|
|
@ -8944,7 +8944,7 @@ int RM_GetClusterNodeInfo(RedisModuleCtx *ctx, const char *id, char *ip, char *m
|
||||||
else
|
else
|
||||||
memset(master_id,0,REDISMODULE_NODE_ID_LEN);
|
memset(master_id,0,REDISMODULE_NODE_ID_LEN);
|
||||||
}
|
}
|
||||||
if (port) *port = node->port;
|
if (port) *port = getNodeDefaultClientPort(node);
|
||||||
|
|
||||||
/* As usually we have to remap flags for modules, in order to ensure
|
/* As usually we have to remap flags for modules, in order to ensure
|
||||||
* we can provide binary compatibility. */
|
* we can provide binary compatibility. */
|
||||||
|
|
|
@ -70,9 +70,18 @@ proc continuous_slot_allocation {masters} {
|
||||||
# tests run.
|
# tests run.
|
||||||
proc cluster_setup {masters node_count slot_allocator code} {
|
proc cluster_setup {masters node_count slot_allocator code} {
|
||||||
# Have all nodes meet
|
# Have all nodes meet
|
||||||
for {set i 1} {$i < $node_count} {incr i} {
|
if {$::tls} {
|
||||||
R 0 CLUSTER MEET [srv -$i host] [srv -$i port]
|
set tls_cluster [lindex [R 0 CONFIG GET tls-cluster] 1]
|
||||||
}
|
}
|
||||||
|
if {$::tls && !$tls_cluster} {
|
||||||
|
for {set i 1} {$i < $node_count} {incr i} {
|
||||||
|
R 0 CLUSTER MEET [srv -$i host] [srv -$i pport]
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for {set i 1} {$i < $node_count} {incr i} {
|
||||||
|
R 0 CLUSTER MEET [srv -$i host] [srv -$i port]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
$slot_allocator $masters
|
$slot_allocator $masters
|
||||||
|
|
||||||
|
|
|
@ -496,7 +496,8 @@ proc start_server {options {code undefined}} {
|
||||||
# start every server on a different port
|
# start every server on a different port
|
||||||
set port [find_available_port $::baseport $::portcount]
|
set port [find_available_port $::baseport $::portcount]
|
||||||
if {$::tls} {
|
if {$::tls} {
|
||||||
dict set config "port" 0
|
set pport [find_available_port $::baseport $::portcount]
|
||||||
|
dict set config "port" $pport
|
||||||
dict set config "tls-port" $port
|
dict set config "tls-port" $port
|
||||||
dict set config "tls-cluster" "yes"
|
dict set config "tls-cluster" "yes"
|
||||||
dict set config "tls-replication" "yes"
|
dict set config "tls-replication" "yes"
|
||||||
|
@ -567,6 +568,8 @@ proc start_server {options {code undefined}} {
|
||||||
puts "Port $port was already busy, trying another port..."
|
puts "Port $port was already busy, trying another port..."
|
||||||
set port [find_available_port $::baseport $::portcount]
|
set port [find_available_port $::baseport $::portcount]
|
||||||
if {$::tls} {
|
if {$::tls} {
|
||||||
|
set pport [find_available_port $::baseport $::portcount]
|
||||||
|
dict set config port $pport
|
||||||
dict set config "tls-port" $port
|
dict set config "tls-port" $port
|
||||||
} else {
|
} else {
|
||||||
dict set config port $port
|
dict set config port $port
|
||||||
|
@ -615,6 +618,9 @@ proc start_server {options {code undefined}} {
|
||||||
dict set srv "stdout" $stdout
|
dict set srv "stdout" $stdout
|
||||||
dict set srv "stderr" $stderr
|
dict set srv "stderr" $stderr
|
||||||
dict set srv "unixsocket" $unixsocket
|
dict set srv "unixsocket" $unixsocket
|
||||||
|
if {$::tls} {
|
||||||
|
dict set srv "pport" $pport
|
||||||
|
}
|
||||||
|
|
||||||
# if a block of code is supplied, we wait for the server to become
|
# if a block of code is supplied, we wait for the server to become
|
||||||
# available, create a client object and kill the server afterwards
|
# available, create a client object and kill the server afterwards
|
||||||
|
|
|
@ -102,6 +102,7 @@ set ::all_tests {
|
||||||
unit/cluster/multi-slot-operations
|
unit/cluster/multi-slot-operations
|
||||||
unit/cluster/slot-ownership
|
unit/cluster/slot-ownership
|
||||||
unit/cluster/links
|
unit/cluster/links
|
||||||
|
unit/cluster/cluster-response-tls
|
||||||
}
|
}
|
||||||
# Index to the next test to run in the ::all_tests list.
|
# Index to the next test to run in the ::all_tests list.
|
||||||
set ::next_test 0
|
set ::next_test 0
|
||||||
|
|
|
@ -0,0 +1,110 @@
|
||||||
|
source tests/support/cluster.tcl
|
||||||
|
|
||||||
|
proc get_port_from_moved_error {e} {
|
||||||
|
set ip_port [lindex [split $e " "] 2]
|
||||||
|
return [lindex [split $ip_port ":"] 1]
|
||||||
|
}
|
||||||
|
|
||||||
|
proc get_pport_by_port {port} {
|
||||||
|
foreach srv $::servers {
|
||||||
|
set srv_port [dict get $srv port]
|
||||||
|
if {$port == $srv_port} {
|
||||||
|
return [dict get $srv pport]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
proc get_port_from_node_info {line} {
|
||||||
|
set fields [split $line " "]
|
||||||
|
set addr [lindex $fields 1]
|
||||||
|
set ip_port [lindex [split $addr "@"] 0]
|
||||||
|
return [lindex [split $ip_port ":"] 1]
|
||||||
|
}
|
||||||
|
|
||||||
|
proc cluster_response_tls {tls_cluster} {
|
||||||
|
|
||||||
|
test "CLUSTER SLOTS with different connection type -- tls-cluster $tls_cluster" {
|
||||||
|
set slots1 [R 0 cluster slots]
|
||||||
|
set pport [srv 0 pport]
|
||||||
|
set cluster_client [redis_cluster 127.0.0.1:$pport 0]
|
||||||
|
set slots2 [$cluster_client cluster slots]
|
||||||
|
$cluster_client close
|
||||||
|
# Compare the ports in the first row
|
||||||
|
assert_no_match [lindex $slots1 0 2 1] [lindex $slots2 0 2 1]
|
||||||
|
}
|
||||||
|
|
||||||
|
test "CLUSTER NODES return port according to connection type -- tls-cluster $tls_cluster" {
|
||||||
|
set nodes [R 0 cluster nodes]
|
||||||
|
set port1 [get_port_from_node_info [lindex [split $nodes "\r\n"] 0]]
|
||||||
|
set pport [srv 0 pport]
|
||||||
|
set cluster_client [redis_cluster 127.0.0.1:$pport 0]
|
||||||
|
set nodes [$cluster_client cluster nodes]
|
||||||
|
set port2 [get_port_from_node_info [lindex [split $nodes "\r\n"] 0]]
|
||||||
|
$cluster_client close
|
||||||
|
assert_not_equal $port1 $port2
|
||||||
|
}
|
||||||
|
|
||||||
|
set cluster [redis_cluster 127.0.0.1:[srv 0 port]]
|
||||||
|
set cluster_pport [redis_cluster 127.0.0.1:[srv 0 pport] 0]
|
||||||
|
$cluster refresh_nodes_map
|
||||||
|
|
||||||
|
test "Set many keys in the cluster -- tls-cluster $tls_cluster" {
|
||||||
|
for {set i 0} {$i < 5000} {incr i} {
|
||||||
|
$cluster set $i $i
|
||||||
|
assert { [$cluster get $i] eq $i }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
test "Test cluster responses during migration of slot x -- tls-cluster $tls_cluster" {
|
||||||
|
set slot 10
|
||||||
|
array set nodefrom [$cluster masternode_for_slot $slot]
|
||||||
|
array set nodeto [$cluster masternode_notfor_slot $slot]
|
||||||
|
$nodeto(link) cluster setslot $slot importing $nodefrom(id)
|
||||||
|
$nodefrom(link) cluster setslot $slot migrating $nodeto(id)
|
||||||
|
|
||||||
|
# Get a key from that slot
|
||||||
|
set key [$nodefrom(link) cluster GETKEYSINSLOT $slot "1"]
|
||||||
|
# MOVED REPLY
|
||||||
|
catch {$nodeto(link) set $key "newVal"} e_moved1
|
||||||
|
assert_match "*MOVED*" $e_moved1
|
||||||
|
# ASK REPLY
|
||||||
|
catch {$nodefrom(link) set "abc{$key}" "newVal"} e_ask1
|
||||||
|
assert_match "*ASK*" $e_ask1
|
||||||
|
|
||||||
|
# UNSTABLE REPLY
|
||||||
|
assert_error "*TRYAGAIN*" {$nodefrom(link) mset "a{$key}" "newVal" $key "newVal2"}
|
||||||
|
|
||||||
|
# Connecting using another protocol
|
||||||
|
array set nodefrom_pport [$cluster_pport masternode_for_slot $slot]
|
||||||
|
array set nodeto_pport [$cluster_pport masternode_notfor_slot $slot]
|
||||||
|
|
||||||
|
# MOVED REPLY
|
||||||
|
catch {$nodeto_pport(link) set $key "newVal"} e_moved2
|
||||||
|
assert_match "*MOVED*" $e_moved2
|
||||||
|
# ASK REPLY
|
||||||
|
catch {$nodefrom_pport(link) set "abc{$key}" "newVal"} e_ask2
|
||||||
|
assert_match "*ASK*" $e_ask2
|
||||||
|
# Compare MOVED error's port
|
||||||
|
set port1 [get_port_from_moved_error $e_moved1]
|
||||||
|
set port2 [get_port_from_moved_error $e_moved2]
|
||||||
|
assert_not_equal $port1 $port2
|
||||||
|
assert_equal $port1 $nodefrom(port)
|
||||||
|
assert_equal $port2 [get_pport_by_port $nodefrom(port)]
|
||||||
|
# Compare ASK error's port
|
||||||
|
set port1 [get_port_from_moved_error $e_ask1]
|
||||||
|
set port2 [get_port_from_moved_error $e_ask2]
|
||||||
|
assert_not_equal $port1 $port2
|
||||||
|
assert_equal $port1 $nodeto(port)
|
||||||
|
assert_equal $port2 [get_pport_by_port $nodeto(port)]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if {$::tls} {
|
||||||
|
start_cluster 3 3 {tags {external:skip cluster tls} overrides {tls-cluster yes tls-replication yes}} {
|
||||||
|
cluster_response_tls yes
|
||||||
|
}
|
||||||
|
start_cluster 3 3 {tags {external:skip cluster tls} overrides {tls-cluster no tls-replication no}} {
|
||||||
|
cluster_response_tls no
|
||||||
|
}
|
||||||
|
}
|
|
@ -404,13 +404,14 @@ start_server {tags {"other external:skip"}} {
|
||||||
assert_match "*/redis-server" [lindex $cmdline 1]
|
assert_match "*/redis-server" [lindex $cmdline 1]
|
||||||
|
|
||||||
if {$::tls} {
|
if {$::tls} {
|
||||||
set expect_port 0
|
set expect_port [srv 0 pport]
|
||||||
set expect_tls_port [srv 0 port]
|
set expect_tls_port [srv 0 port]
|
||||||
|
set port [srv 0 pport]
|
||||||
} else {
|
} else {
|
||||||
set expect_port [srv 0 port]
|
set expect_port [srv 0 port]
|
||||||
set expect_tls_port 0
|
set expect_tls_port 0
|
||||||
|
set port [srv 0 port]
|
||||||
}
|
}
|
||||||
set port [srv 0 port]
|
|
||||||
|
|
||||||
assert_equal "$::host:$port" [lindex $cmdline 2]
|
assert_equal "$::host:$port" [lindex $cmdline 2]
|
||||||
assert_equal $expect_port [lindex $cmdline 3]
|
assert_equal $expect_port [lindex $cmdline 3]
|
||||||
|
|
Loading…
Reference in New Issue