From 4ef4c4a686b6edaf825635d942b698349f67bcc6 Mon Sep 17 00:00:00 2001 From: Binbin Date: Wed, 4 Jan 2023 17:13:22 +0800 Subject: [PATCH] Make redis-cli support PSYNC command (#11647) The current redis-cli does not support the real PSYNC command, the older version of redis-cli can support PSYNC is because that we actually issue the SYNC command instead of PSYNC, so it act like SYNC (always full-sync). Noted that in this case we will send the SYNC first (triggered by sendSync), then send the PSYNC (the one in redis-cli input). Didn't bother to find which version that the order changed, we send PSYNC first (the one in redis-cli input), and then send the SYNC (the one triggered by sendSync). So even full-sync is not working anymore, and it will result this output (mentioned in issue #11246): ``` psync dummy 0 Entering replica output mode... (press Ctrl-C to quit) SYNC with master, discarding bytes of bulk transfer until EOF marker... Error reading RDB payload while SYNCing ``` This PR adds PSYNC support to redis-cli, which can handle +FULLRESYNC and +CONTINUE responses, and some examples will follow. Co-authored-by: Oran Agra --- src/redis-cli.c | 96 +++++++++++++++++++++++++++++++++++++------------ 1 file changed, 74 insertions(+), 22 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 147231540..c8576702c 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -284,7 +284,7 @@ static struct pref { static volatile sig_atomic_t force_cancel_loop = 0; static void usage(int err); -static void slaveMode(void); +static void slaveMode(int send_sync); char *redisGitSHA1(void); char *redisGitDirty(void); static int cliConnect(int flags); @@ -1848,7 +1848,7 @@ static int cliSendCommand(int argc, char **argv, long repeat) { if (config.slave_mode) { printf("Entering replica output mode... (press Ctrl-C to quit)\n"); - slaveMode(); + slaveMode(0); config.slave_mode = 0; zfree(argvlen); return REDIS_ERR; /* Error = slaveMode lost connection to master */ @@ -7695,8 +7695,15 @@ static ssize_t readConn(redisContext *c, char *buf, size_t len) /* Sends SYNC and reads the number of bytes in the payload. Used both by * slaveMode() and getRDB(). - * returns 0 in case an EOF marker is used. */ -unsigned long long sendSync(redisContext *c, char *out_eof) { + * + * send_sync if 1 means we will explicitly send SYNC command. If 0 means + * we will not send SYNC command, will send the command that in c->obuf. + * + * Returns the size of the RDB payload to read, or 0 in case an EOF marker is used and the size + * is unknown, also returns 0 in case a PSYNC +CONTINUE was found (no RDB payload). + * + * The out_full_mode parameter if 1 means this is a full sync, if 0 means this is partial mode. */ +unsigned long long sendSync(redisContext *c, int send_sync, char *out_eof, int *out_full_mode) { /* To start we need to send the SYNC command and return the payload. * The hiredis client lib does not understand this part of the protocol * and we don't want to mess with its buffers, so everything is performed @@ -7704,10 +7711,20 @@ unsigned long long sendSync(redisContext *c, char *out_eof) { char buf[4096], *p; ssize_t nread; - /* Send the SYNC command. */ - if (cliWriteConn(c, "SYNC\r\n", 6) != 6) { - fprintf(stderr,"Error writing to master\n"); - exit(1); + if (out_full_mode) *out_full_mode = 1; + + if (send_sync) { + /* Send the SYNC command. */ + if (cliWriteConn(c, "SYNC\r\n", 6) != 6) { + fprintf(stderr,"Error writing to master\n"); + exit(1); + } + } else { + /* We have written the command into c->obuf before. */ + if (cliWriteConn(c, "", 0) != 0) { + fprintf(stderr,"Error writing to master\n"); + exit(1); + } } /* Read $\r\n, making sure to read just up to "\n" */ @@ -7720,12 +7737,41 @@ unsigned long long sendSync(redisContext *c, char *out_eof) { } if (*p == '\n' && p != buf) break; if (*p != '\n') p++; + if (p >= buf + sizeof(buf) - 1) break; /* Go back one more char for null-term. */ } *p = '\0'; if (buf[0] == '-') { fprintf(stderr, "SYNC with master failed: %s\n", buf); exit(1); } + + /* Handling PSYNC responses. + * Read +FULLRESYNC \r\n, after that is the $ or the $EOF:<40 bytes delimiter> + * Read +CONTINUE \r\n or +CONTINUE\r\n, after that is the command stream */ + if (!strncmp(buf, "+FULLRESYNC", 11) || + !strncmp(buf, "+CONTINUE", 9)) + { + int sync_partial = !strncmp(buf, "+CONTINUE", 9); + fprintf(stderr, "PSYNC replied %s\n", buf); + p = buf; + while(1) { + nread = readConn(c,p,1); + if (nread <= 0) { + fprintf(stderr,"Error reading bulk length while PSYNCing\n"); + exit(1); + } + if (*p == '\n' && p != buf) break; + if (*p != '\n') p++; + if (p >= buf + sizeof(buf) - 1) break; /* Go back one more char for null-term. */ + } + *p = '\0'; + + if (sync_partial) { + if (out_full_mode) *out_full_mode = 0; + return 0; + } + } + if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= RDB_EOF_MARK_SIZE) { memcpy(out_eof, buf+5, RDB_EOF_MARK_SIZE); return 0; @@ -7733,33 +7779,39 @@ unsigned long long sendSync(redisContext *c, char *out_eof) { return strtoull(buf+1,NULL,10); } -static void slaveMode(void) { +static void slaveMode(int send_sync) { static char eofmark[RDB_EOF_MARK_SIZE]; static char lastbytes[RDB_EOF_MARK_SIZE]; static int usemark = 0; - unsigned long long payload = sendSync(context,eofmark); + static int out_full_mode; + unsigned long long payload = sendSync(context, send_sync, eofmark, &out_full_mode); char buf[1024]; int original_output = config.output; + char *info = out_full_mode ? "Full resync" : "Partial resync"; - if (payload == 0) { + if (out_full_mode == 1 && payload == 0) { + /* SYNC with EOF marker or PSYNC +FULLRESYNC with EOF marker. */ payload = ULLONG_MAX; memset(lastbytes,0,RDB_EOF_MARK_SIZE); usemark = 1; - fprintf(stderr,"SYNC with master, discarding " - "bytes of bulk transfer until EOF marker...\n"); - } else { - fprintf(stderr,"SYNC with master, discarding %llu " - "bytes of bulk transfer...\n", payload); + fprintf(stderr, "%s with master, discarding " + "bytes of bulk transfer until EOF marker...\n", info); + } else if (out_full_mode == 1 && payload != 0) { + /* SYNC without EOF marker or PSYNC +FULLRESYNC. */ + fprintf(stderr, "%s with master, discarding %llu " + "bytes of bulk transfer...\n", info, payload); + } else if (out_full_mode == 0 && payload == 0) { + /* PSYNC +CONTINUE (no RDB payload). */ + fprintf(stderr, "%s with master...\n", info); } - /* Discard the payload. */ while(payload) { ssize_t nread; nread = readConn(context,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload); if (nread <= 0) { - fprintf(stderr,"Error reading RDB payload while SYNCing\n"); + fprintf(stderr,"Error reading RDB payload while %sing\n", info); exit(1); } payload -= nread; @@ -7780,12 +7832,12 @@ static void slaveMode(void) { if (usemark) { unsigned long long offset = ULLONG_MAX - payload; - fprintf(stderr,"SYNC done after %llu bytes. Logging commands from master.\n", offset); + fprintf(stderr,"%s done after %llu bytes. Logging commands from master.\n", info, offset); /* put the slave online */ sleep(1); sendReplconf("ACK", "0"); } else - fprintf(stderr,"SYNC done. Logging commands from master.\n"); + fprintf(stderr,"%s done. Logging commands from master.\n", info); /* Now we can use hiredis to read the incoming protocol. */ config.output = OUTPUT_CSV; @@ -7814,7 +7866,7 @@ static void getRDB(clusterManagerNode *node) { static char eofmark[RDB_EOF_MARK_SIZE]; static char lastbytes[RDB_EOF_MARK_SIZE]; static int usemark = 0; - unsigned long long payload = sendSync(s, eofmark); + unsigned long long payload = sendSync(s, 1, eofmark, NULL); char buf[4096]; if (payload == 0) { @@ -9027,7 +9079,7 @@ int main(int argc, char **argv) { if (cliConnect(0) == REDIS_ERR) exit(1); sendCapa(); sendReplconf("rdb-filter-only", ""); - slaveMode(); + slaveMode(1); } /* Get RDB/functions mode. */