mirror of https://mirror.osredm.com/root/redis.git
Make redis-cli support PSYNC command (#11647)
The current redis-cli does not support the real PSYNC command, the older version of redis-cli can support PSYNC is because that we actually issue the SYNC command instead of PSYNC, so it act like SYNC (always full-sync). Noted that in this case we will send the SYNC first (triggered by sendSync), then send the PSYNC (the one in redis-cli input). Didn't bother to find which version that the order changed, we send PSYNC first (the one in redis-cli input), and then send the SYNC (the one triggered by sendSync). So even full-sync is not working anymore, and it will result this output (mentioned in issue #11246): ``` psync dummy 0 Entering replica output mode... (press Ctrl-C to quit) SYNC with master, discarding bytes of bulk transfer until EOF marker... Error reading RDB payload while SYNCing ``` This PR adds PSYNC support to redis-cli, which can handle +FULLRESYNC and +CONTINUE responses, and some examples will follow. Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
parent
c8052122a2
commit
4ef4c4a686
|
@ -284,7 +284,7 @@ static struct pref {
|
|||
|
||||
static volatile sig_atomic_t force_cancel_loop = 0;
|
||||
static void usage(int err);
|
||||
static void slaveMode(void);
|
||||
static void slaveMode(int send_sync);
|
||||
char *redisGitSHA1(void);
|
||||
char *redisGitDirty(void);
|
||||
static int cliConnect(int flags);
|
||||
|
@ -1848,7 +1848,7 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
|
|||
|
||||
if (config.slave_mode) {
|
||||
printf("Entering replica output mode... (press Ctrl-C to quit)\n");
|
||||
slaveMode();
|
||||
slaveMode(0);
|
||||
config.slave_mode = 0;
|
||||
zfree(argvlen);
|
||||
return REDIS_ERR; /* Error = slaveMode lost connection to master */
|
||||
|
@ -7695,8 +7695,15 @@ static ssize_t readConn(redisContext *c, char *buf, size_t len)
|
|||
|
||||
/* Sends SYNC and reads the number of bytes in the payload. Used both by
|
||||
* slaveMode() and getRDB().
|
||||
* returns 0 in case an EOF marker is used. */
|
||||
unsigned long long sendSync(redisContext *c, char *out_eof) {
|
||||
*
|
||||
* send_sync if 1 means we will explicitly send SYNC command. If 0 means
|
||||
* we will not send SYNC command, will send the command that in c->obuf.
|
||||
*
|
||||
* Returns the size of the RDB payload to read, or 0 in case an EOF marker is used and the size
|
||||
* is unknown, also returns 0 in case a PSYNC +CONTINUE was found (no RDB payload).
|
||||
*
|
||||
* The out_full_mode parameter if 1 means this is a full sync, if 0 means this is partial mode. */
|
||||
unsigned long long sendSync(redisContext *c, int send_sync, char *out_eof, int *out_full_mode) {
|
||||
/* To start we need to send the SYNC command and return the payload.
|
||||
* The hiredis client lib does not understand this part of the protocol
|
||||
* and we don't want to mess with its buffers, so everything is performed
|
||||
|
@ -7704,10 +7711,20 @@ unsigned long long sendSync(redisContext *c, char *out_eof) {
|
|||
char buf[4096], *p;
|
||||
ssize_t nread;
|
||||
|
||||
/* Send the SYNC command. */
|
||||
if (cliWriteConn(c, "SYNC\r\n", 6) != 6) {
|
||||
fprintf(stderr,"Error writing to master\n");
|
||||
exit(1);
|
||||
if (out_full_mode) *out_full_mode = 1;
|
||||
|
||||
if (send_sync) {
|
||||
/* Send the SYNC command. */
|
||||
if (cliWriteConn(c, "SYNC\r\n", 6) != 6) {
|
||||
fprintf(stderr,"Error writing to master\n");
|
||||
exit(1);
|
||||
}
|
||||
} else {
|
||||
/* We have written the command into c->obuf before. */
|
||||
if (cliWriteConn(c, "", 0) != 0) {
|
||||
fprintf(stderr,"Error writing to master\n");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
/* Read $<payload>\r\n, making sure to read just up to "\n" */
|
||||
|
@ -7720,12 +7737,41 @@ unsigned long long sendSync(redisContext *c, char *out_eof) {
|
|||
}
|
||||
if (*p == '\n' && p != buf) break;
|
||||
if (*p != '\n') p++;
|
||||
if (p >= buf + sizeof(buf) - 1) break; /* Go back one more char for null-term. */
|
||||
}
|
||||
*p = '\0';
|
||||
if (buf[0] == '-') {
|
||||
fprintf(stderr, "SYNC with master failed: %s\n", buf);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
/* Handling PSYNC responses.
|
||||
* Read +FULLRESYNC <replid> <offset>\r\n, after that is the $<payload> or the $EOF:<40 bytes delimiter>
|
||||
* Read +CONTINUE <replid>\r\n or +CONTINUE\r\n, after that is the command stream */
|
||||
if (!strncmp(buf, "+FULLRESYNC", 11) ||
|
||||
!strncmp(buf, "+CONTINUE", 9))
|
||||
{
|
||||
int sync_partial = !strncmp(buf, "+CONTINUE", 9);
|
||||
fprintf(stderr, "PSYNC replied %s\n", buf);
|
||||
p = buf;
|
||||
while(1) {
|
||||
nread = readConn(c,p,1);
|
||||
if (nread <= 0) {
|
||||
fprintf(stderr,"Error reading bulk length while PSYNCing\n");
|
||||
exit(1);
|
||||
}
|
||||
if (*p == '\n' && p != buf) break;
|
||||
if (*p != '\n') p++;
|
||||
if (p >= buf + sizeof(buf) - 1) break; /* Go back one more char for null-term. */
|
||||
}
|
||||
*p = '\0';
|
||||
|
||||
if (sync_partial) {
|
||||
if (out_full_mode) *out_full_mode = 0;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= RDB_EOF_MARK_SIZE) {
|
||||
memcpy(out_eof, buf+5, RDB_EOF_MARK_SIZE);
|
||||
return 0;
|
||||
|
@ -7733,33 +7779,39 @@ unsigned long long sendSync(redisContext *c, char *out_eof) {
|
|||
return strtoull(buf+1,NULL,10);
|
||||
}
|
||||
|
||||
static void slaveMode(void) {
|
||||
static void slaveMode(int send_sync) {
|
||||
static char eofmark[RDB_EOF_MARK_SIZE];
|
||||
static char lastbytes[RDB_EOF_MARK_SIZE];
|
||||
static int usemark = 0;
|
||||
unsigned long long payload = sendSync(context,eofmark);
|
||||
static int out_full_mode;
|
||||
unsigned long long payload = sendSync(context, send_sync, eofmark, &out_full_mode);
|
||||
char buf[1024];
|
||||
int original_output = config.output;
|
||||
char *info = out_full_mode ? "Full resync" : "Partial resync";
|
||||
|
||||
if (payload == 0) {
|
||||
if (out_full_mode == 1 && payload == 0) {
|
||||
/* SYNC with EOF marker or PSYNC +FULLRESYNC with EOF marker. */
|
||||
payload = ULLONG_MAX;
|
||||
memset(lastbytes,0,RDB_EOF_MARK_SIZE);
|
||||
usemark = 1;
|
||||
fprintf(stderr,"SYNC with master, discarding "
|
||||
"bytes of bulk transfer until EOF marker...\n");
|
||||
} else {
|
||||
fprintf(stderr,"SYNC with master, discarding %llu "
|
||||
"bytes of bulk transfer...\n", payload);
|
||||
fprintf(stderr, "%s with master, discarding "
|
||||
"bytes of bulk transfer until EOF marker...\n", info);
|
||||
} else if (out_full_mode == 1 && payload != 0) {
|
||||
/* SYNC without EOF marker or PSYNC +FULLRESYNC. */
|
||||
fprintf(stderr, "%s with master, discarding %llu "
|
||||
"bytes of bulk transfer...\n", info, payload);
|
||||
} else if (out_full_mode == 0 && payload == 0) {
|
||||
/* PSYNC +CONTINUE (no RDB payload). */
|
||||
fprintf(stderr, "%s with master...\n", info);
|
||||
}
|
||||
|
||||
|
||||
/* Discard the payload. */
|
||||
while(payload) {
|
||||
ssize_t nread;
|
||||
|
||||
nread = readConn(context,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload);
|
||||
if (nread <= 0) {
|
||||
fprintf(stderr,"Error reading RDB payload while SYNCing\n");
|
||||
fprintf(stderr,"Error reading RDB payload while %sing\n", info);
|
||||
exit(1);
|
||||
}
|
||||
payload -= nread;
|
||||
|
@ -7780,12 +7832,12 @@ static void slaveMode(void) {
|
|||
|
||||
if (usemark) {
|
||||
unsigned long long offset = ULLONG_MAX - payload;
|
||||
fprintf(stderr,"SYNC done after %llu bytes. Logging commands from master.\n", offset);
|
||||
fprintf(stderr,"%s done after %llu bytes. Logging commands from master.\n", info, offset);
|
||||
/* put the slave online */
|
||||
sleep(1);
|
||||
sendReplconf("ACK", "0");
|
||||
} else
|
||||
fprintf(stderr,"SYNC done. Logging commands from master.\n");
|
||||
fprintf(stderr,"%s done. Logging commands from master.\n", info);
|
||||
|
||||
/* Now we can use hiredis to read the incoming protocol. */
|
||||
config.output = OUTPUT_CSV;
|
||||
|
@ -7814,7 +7866,7 @@ static void getRDB(clusterManagerNode *node) {
|
|||
static char eofmark[RDB_EOF_MARK_SIZE];
|
||||
static char lastbytes[RDB_EOF_MARK_SIZE];
|
||||
static int usemark = 0;
|
||||
unsigned long long payload = sendSync(s, eofmark);
|
||||
unsigned long long payload = sendSync(s, 1, eofmark, NULL);
|
||||
char buf[4096];
|
||||
|
||||
if (payload == 0) {
|
||||
|
@ -9027,7 +9079,7 @@ int main(int argc, char **argv) {
|
|||
if (cliConnect(0) == REDIS_ERR) exit(1);
|
||||
sendCapa();
|
||||
sendReplconf("rdb-filter-only", "");
|
||||
slaveMode();
|
||||
slaveMode(1);
|
||||
}
|
||||
|
||||
/* Get RDB/functions mode. */
|
||||
|
|
Loading…
Reference in New Issue