Rdb channel replication (#13732)

This PR is based on:

https://github.com/redis/redis/pull/12109
https://github.com/valkey-io/valkey/pull/60

Closes: https://github.com/redis/redis/issues/11678

**Motivation**

During a full sync, when master is delivering RDB to the replica,
incoming write commands are kept in a replication buffer in order to be
sent to the replica once RDB delivery is completed. If RDB delivery
takes a long time, it might create memory pressure on master. Also, once
a replica connection accumulates replication data which is larger than
output buffer limits, master will kill replica connection. This may
cause a replication failure.

The main benefit of the rdb channel replication is streaming incoming
commands in parallel to the RDB delivery. This approach shifts
replication stream buffering to the replica and reduces load on master.
We do this by opening another connection for RDB delivery. The main
channel on replica will be receiving replication stream while rdb
channel is receiving the RDB.

This feature also helps to reduce master's main process CPU load. By
opening a dedicated connection for the RDB transfer, the bgsave process
has access to the new connection and it will stream RDB directly to the
replicas. Before this change, due to TLS connection restriction, the
bgsave process was writing RDB bytes to a pipe and the main process was
forwarding
it to the replica. This is no longer necessary, the main process can
avoid these expensive socket read/write syscalls. It also means RDB
delivery to replica will be faster as it avoids this step.

In summary, replication will be faster and master's performance during
full syncs will improve.


**Implementation steps**

1. When replica connects to the master, it sends 'rdb-channel-repl' as
part of capability exchange to let master to know replica supports rdb
channel.
2. When replica lacks sufficient data for PSYNC, master sends
+RDBCHANNELSYNC reply with replica's client id. As the next step, the
replica opens a new connection (rdb-channel) and configures it against
the master with the appropriate capabilities and requirements. It also
sends given client id back to master over rdbchannel, so that master can
associate these channels. (initial replica connection will be referred
as main-channel) Then, replica requests fullsync using the RDB channel.
3. Prior to forking, master attaches the replica's main channel to the
replication backlog to deliver replication stream starting at the
snapshot end offset.
4. The master main process sends replication stream via the main
channel, while the bgsave process sends the RDB directly to the replica
via the rdb-channel. Replica accumulates replication stream in a local
buffer, while the RDB is being loaded into the memory.
5. Once the replica completes loading the rdb, it drops the rdb channel
and streams the accumulated replication stream into the db. Sync is
completed.

**Some details**
- Currently, rdbchannel replication is supported only if
`repl-diskless-sync` is enabled on master. Otherwise, replication will
happen over a single connection as in before.
- On replica, there is a limit to replication stream buffering. Replica
uses a new config `replica-full-sync-buffer-limit` to limit number of
bytes to accumulate. If it is not set, replica inherits
`client-output-buffer-limit <replica>` hard limit config. If we reach
this limit, replica stops accumulating. This is not a failure scenario
though. Further accumulation will happen on master side. Depending on
the configured limits on master, master may kill the replica connection.

**API changes in INFO output:**

1. New replica state: `send_bulk_and_stream`. Indicates full sync is
still in progress for this replica. It is receiving replication stream
and rdb in parallel.
```
slave0:ip=127.0.0.1,port=5002,state=send_bulk_and_stream,offset=0,lag=0
```
Replica state changes in steps:
- First, replica sends psync and receives +RDBCHANNELSYNC
:`state=wait_bgsave`
- After replica connects with rdbchannel and delivery starts:
`state=send_bulk_and_stream`
 - After full sync: `state=online`

2. On replica side, replication stream buffering metrics:
- replica_full_sync_buffer_size: Currently accumulated replication
stream data in bytes.
- replica_full_sync_buffer_peak: Peak number of bytes that this instance
accumulated in the lifetime of the process.

```
replica_full_sync_buffer_size:20485             
replica_full_sync_buffer_peak:1048560
```

**API changes in CLIENT LIST**

In `client list` output, rdbchannel clients will have 'C' flag in
addition to 'S' replica flag:
```
id=11 addr=127.0.0.1:39108 laddr=127.0.0.1:5001 fd=14 name= age=5 idle=5 flags=SC db=0 sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=0 qbuf-free=0 argv-mem=0 multi-mem=0 rbs=1024 rbp=0 obl=0 oll=0 omem=0 tot-mem=1920 events=r cmd=psync user=default redir=-1 resp=2 lib-name= lib-ver= io-thread=0
```

**Config changes:**
- `replica-full-sync-buffer-limit`: Controls how much replication data
replica can accumulate during rdbchannel replication. If it is not set,
a value of 0 means replica will inherit `client-output-buffer-limit
<replica>` hard limit config to limit accumulated data.
- `repl-rdb-channel` config is added as a hidden config. This is mostly
for testing as we need to support both rdbchannel replication and the
older single connection replication (to keep compatibility with older
versions and rdbchannel replication will not be enabled if
repl-diskless-sync is not enabled). it affects both the master (not to
respond to rdb channel requests), and the replica (not to declare
capability)

**Internal API changes:**
Changes that were introduced to Redis replication:
- New replication capability is added to replconf command: `capa
rdb-channel-repl`. Indicates replica is capable of rdb channel
replication. Replica sends it when it connects to master along with
other capabilities.
- If replica needs fullsync, master replies `+RDBCHANNELSYNC
<client-id>` to the replica's PSYNC request.
- When replica opens rdbchannel connection, as part of replconf command,
it sends `rdb-channel 1` to let master know this is rdb channel. Also,
it sends `main-ch-client-id <client-id>` as part of replconf command so
master can associate channels.
  
**Testing:**
As rdbchannel replication is enabled by default, we run whole test suite
with it. Though, as we need to support both rdbchannel and single
connection replication, we'll be running some tests twice with
`repl-rdb-channel yes/no` config.

**Replica state diagram**
```
* * Replica state machine *
 *
 * Main channel state
 * ┌───────────────────┐
 * │RECEIVE_PING_REPLY │
 * └────────┬──────────┘
 *          │ +PONG
 * ┌────────▼──────────┐
 * │SEND_HANDSHAKE     │                     RDB channel state
 * └────────┬──────────┘            ┌───────────────────────────────┐
 *          │+OK                ┌───► RDB_CH_SEND_HANDSHAKE         │
 * ┌────────▼──────────┐        │   └──────────────┬────────────────┘
 * │RECEIVE_AUTH_REPLY │        │    REPLCONF main-ch-client-id <clientid>
 * └────────┬──────────┘        │   ┌──────────────▼────────────────┐
 *          │+OK                │   │ RDB_CH_RECEIVE_AUTH_REPLY     │
 * ┌────────▼──────────┐        │   └──────────────┬────────────────┘
 * │RECEIVE_PORT_REPLY │        │                  │ +OK
 * └────────┬──────────┘        │   ┌──────────────▼────────────────┐
 *          │+OK                │   │  RDB_CH_RECEIVE_REPLCONF_REPLY│
 * ┌────────▼──────────┐        │   └──────────────┬────────────────┘
 * │RECEIVE_IP_REPLY   │        │                  │ +OK
 * └────────┬──────────┘        │   ┌──────────────▼────────────────┐
 *          │+OK                │   │ RDB_CH_RECEIVE_FULLRESYNC     │
 * ┌────────▼──────────┐        │   └──────────────┬────────────────┘
 * │RECEIVE_CAPA_REPLY │        │                  │+FULLRESYNC
 * └────────┬──────────┘        │                  │Rdb delivery
 *          │                   │   ┌──────────────▼────────────────┐
 * ┌────────▼──────────┐        │   │ RDB_CH_RDB_LOADING            │
 * │SEND_PSYNC         │        │   └──────────────┬────────────────┘
 * └─┬─────────────────┘        │                  │ Done loading
 *   │PSYNC (use cached-master) │                  │
 * ┌─▼─────────────────┐        │                  │
 * │RECEIVE_PSYNC_REPLY│        │    ┌────────────►│ Replica streams replication
 * └─┬─────────────────┘        │    │             │ buffer into memory
 *   │                          │    │             │
 *   │+RDBCHANNELSYNC client-id │    │             │
 *   ├──────┬───────────────────┘    │             │
 *   │      │ Main channel           │             │
 *   │      │ accumulates repl data  │             │
 *   │   ┌──▼────────────────┐       │     ┌───────▼───────────┐
 *   │   │ REPL_TRANSFER     ├───────┘     │    CONNECTED      │
 *   │   └───────────────────┘             └────▲───▲──────────┘
 *   │                                          │   │
 *   │                                          │   │
 *   │  +FULLRESYNC    ┌───────────────────┐    │   │
 *   ├────────────────► REPL_TRANSFER      ├────┘   │
 *   │                 └───────────────────┘        │
 *   │  +CONTINUE                                   │
 *   └──────────────────────────────────────────────┘
 */
 ```
 -----
 This PR also contains changes and ideas from: 
https://github.com/valkey-io/valkey/pull/837
https://github.com/valkey-io/valkey/pull/1173
https://github.com/valkey-io/valkey/pull/804
https://github.com/valkey-io/valkey/pull/945
https://github.com/valkey-io/valkey/pull/989
---------

Co-authored-by: Yuan Wang <wangyuancode@163.com>
Co-authored-by: debing.sun <debing.sun@redis.com>
Co-authored-by: Moti Cohen <moticless@gmail.com>
Co-authored-by: naglera <anagler123@gmail.com>
Co-authored-by: Amit Nagler <58042354+naglera@users.noreply.github.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: Binbin <binloveplay1314@qq.com>
Co-authored-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Co-authored-by: Ping Xie <pingxie@outlook.com>
Co-authored-by: Ran Shidlansik <ranshid@amazon.com>
Co-authored-by: ranshid <88133677+ranshid@users.noreply.github.com>
Co-authored-by: xbasel <103044017+xbasel@users.noreply.github.com>
This commit is contained in:
Ozan Tezcan 2025-01-13 15:09:52 +03:00 committed by GitHub
parent dc0ee51cb1
commit 73a9b916c9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 2204 additions and 212 deletions

View File

@ -727,6 +727,24 @@ repl-disable-tcp-nodelay no
#
# repl-backlog-ttl 3600
# During a fullsync, the master may decide to send both the RDB file and the
# replication stream to the replica in parallel. This approach shifts the
# responsibility of buffering the replication stream to the replica during the
# fullsync process. The replica accumulates the replication stream data until
# the RDB file is fully loaded. Once the RDB delivery is completed and
# successfully loaded, the replica begins processing and applying the
# accumulated replication data to the db. The configuration below controls how
# much replication data the replica can accumulate during a fullsync.
#
# When the replica reaches this limit, it will stop accumulating further data.
# At this point, additional data accumulation may occur on the master side
# depending on the 'client-output-buffer-limit <replica>' config of master.
#
# A value of 0 means replica inherits hard limit of
# 'client-output-buffer-limit <replica>' config to limit accumulation size.
#
# replica-full-sync-buffer-limit 0
# The replica priority is an integer number published by Redis in the INFO
# output. It is used by Redis Sentinel in order to select a replica to promote
# into a master if the master is no longer working correctly.

View File

@ -3,6 +3,9 @@
* Copyright (c) 2009-Present, Redis Ltd.
* All rights reserved.
*
* Copyright (c) 2024-present, Valkey contributors.
* All rights reserved.
*
* Licensed under your choice of the Redis Source Available License 2.0
* (RSALv2) or the Server Side Public License v1 (SSPLv1).
*
@ -3076,6 +3079,7 @@ standardConfig static_configs[] = {
createBoolConfig("lazyfree-lazy-user-flush", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.lazyfree_lazy_user_flush , 0, NULL, NULL),
createBoolConfig("repl-disable-tcp-nodelay", NULL, MODIFIABLE_CONFIG, server.repl_disable_tcp_nodelay, 0, NULL, NULL),
createBoolConfig("repl-diskless-sync", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.repl_diskless_sync, 1, NULL, NULL),
createBoolConfig("repl-rdb-channel", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, server.repl_rdb_channel, 1, NULL, NULL),
createBoolConfig("aof-rewrite-incremental-fsync", NULL, MODIFIABLE_CONFIG, server.aof_rewrite_incremental_fsync, 1, NULL, NULL),
createBoolConfig("no-appendfsync-on-rewrite", NULL, MODIFIABLE_CONFIG, server.aof_no_fsync_on_rewrite, 0, NULL, NULL),
createBoolConfig("cluster-require-full-coverage", NULL, MODIFIABLE_CONFIG, server.cluster_require_full_coverage, 1, NULL, NULL),
@ -3218,6 +3222,7 @@ standardConfig static_configs[] = {
createLongLongConfig("proto-max-bulk-len", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 1024*1024, LONG_MAX, server.proto_max_bulk_len, 512ll*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Bulk request max size */
createLongLongConfig("stream-node-max-entries", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.stream_node_max_entries, 100, INTEGER_CONFIG, NULL, NULL),
createLongLongConfig("repl-backlog-size", NULL, MODIFIABLE_CONFIG, 1, LLONG_MAX, server.repl_backlog_size, 1024*1024, MEMORY_CONFIG, NULL, updateReplBacklogSize), /* Default: 1mb */
createLongLongConfig("replica-full-sync-buffer-limit", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.repl_full_sync_buffer_limit, 0, MEMORY_CONFIG, NULL, NULL), /* Default: Inherits 'client-output-buffer-limit <replica>' */
/* Unsigned Long Long configs */
createULongLongConfig("maxmemory", NULL, MODIFIABLE_CONFIG, 0, ULLONG_MAX, server.maxmemory, 0, MEMORY_CONFIG, NULL, updateMaxmemory),

View File

@ -2,6 +2,9 @@
* Copyright (c) 2009-Present, Redis Ltd.
* All rights reserved.
*
* Copyright (c) 2024-present, Valkey contributors.
* All rights reserved.
*
* Licensed under your choice of the Redis Source Available License 2.0
* (RSALv2) or the Server Side Public License v1 (SSPLv1).
*
@ -483,6 +486,8 @@ void debugCommand(client *c) {
" In case RESET is provided the peak reset time will be restored to the default value",
"REPLYBUFFER RESIZING <0|1>",
" Enable or disable the reply buffer resize cron job",
"REPL-PAUSE <clear|after-fork|before-rdb-channel|on-streaming-repl-buf>",
" Pause the server's main process during various replication steps.",
"DICT-RESIZING <0|1>",
" Enable or disable the main dict and expire dict resizing.",
"SCRIPT <LIST|<sha>>",
@ -1018,6 +1023,20 @@ NULL
return;
}
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "repl-pause") && c->argc == 3) {
if (!strcasecmp(c->argv[2]->ptr, "clear")) {
server.repl_debug_pause = REPL_DEBUG_PAUSE_NONE;
} else if (!strcasecmp(c->argv[2]->ptr,"after-fork")) {
server.repl_debug_pause |= REPL_DEBUG_AFTER_FORK;
} else if (!strcasecmp(c->argv[2]->ptr,"before-rdb-channel")) {
server.repl_debug_pause |= REPL_DEBUG_BEFORE_RDB_CHANNEL;
} else if (!strcasecmp(c->argv[2]->ptr, "on-streaming-repl-buf")) {
server.repl_debug_pause |= REPL_DEBUG_ON_STREAMING_REPL_BUF;
} else {
addReplySubcommandSyntaxError(c);
return;
}
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "dict-resizing") && c->argc == 3) {
server.dict_resizing = atoi(c->argv[2]->ptr);
addReply(c, shared.ok);
@ -2583,6 +2602,12 @@ void applyWatchdogPeriod(void) {
}
}
void debugPauseProcess(void) {
serverLog(LL_NOTICE, "Process is about to stop.");
raise(SIGSTOP);
serverLog(LL_NOTICE, "Process has been continued.");
}
/* Positive input is sleep time in microseconds. Negative input is fractions
* of microseconds, i.e. -10 means 100 nanoseconds. */
void debugDelay(int usec) {

View File

@ -188,6 +188,7 @@ client *createClient(connection *conn) {
c->slave_addr = NULL;
c->slave_capa = SLAVE_CAPA_NONE;
c->slave_req = SLAVE_REQ_NONE;
c->main_ch_client_id = 0;
c->reply = listCreate();
c->deferred_reply_errors = NULL;
c->reply_bytes = 0;
@ -252,6 +253,7 @@ void putClientInPendingWriteQueue(client *c) {
* writes at this stage. */
if (!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
c->replstate == SLAVE_STATE_SEND_BULK_AND_STREAM ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_start_cmd_stream_on_ack)))
{
/* Here instead of installing the write handler, we just flag the
@ -1556,7 +1558,16 @@ void unlinkClient(client *c) {
}
}
/* Only use shutdown when the fork is active and we are the parent. */
if (server.child_type) connShutdown(c->conn);
if (server.child_type) {
/* connShutdown() may access TLS state. If this is a rdbchannel
* client, bgsave fork is writing to the connection and TLS state in
* the main process is stale. SSL_shutdown() involves a handshake,
* and it may block the caller when used with stale TLS state.*/
if (c->flags & CLIENT_REPL_RDB_CHANNEL)
shutdown(c->conn->fd, SHUT_RDWR);
else
connShutdown(c->conn);
}
connClose(c->conn);
c->conn = NULL;
}
@ -1725,7 +1736,8 @@ void freeClient(client *c) {
/* Log link disconnection with slave */
if (clientTypeIsSlave(c)) {
serverLog(LL_NOTICE,"Connection with replica %s lost.",
const char *type = c->flags & CLIENT_REPL_RDB_CHANNEL ? " (rdbchannel)" : "";
serverLog(LL_NOTICE,"Connection with replica%s %s lost.", type,
replicationGetSlaveName(c));
}
@ -3086,6 +3098,7 @@ sds catClientInfoString(sds s, client *client) {
if (client->flags & CLIENT_READONLY) *p++ = 'r';
if (client->flags & CLIENT_NO_EVICT) *p++ = 'e';
if (client->flags & CLIENT_NO_TOUCH) *p++ = 'T';
if (client->flags & CLIENT_REPL_RDB_CHANNEL) *p++ = 'C';
if (p == flags) *p++ = 'N';
*p++ = '\0';
@ -4309,7 +4322,7 @@ void flushSlavesOutputBuffers(void) {
*
* 3. Obviously if the slave is not ONLINE.
*/
if (slave->replstate == SLAVE_STATE_ONLINE &&
if ((slave->replstate == SLAVE_STATE_ONLINE || slave->replstate == SLAVE_STATE_SEND_BULK_AND_STREAM) &&
!(slave->flags & CLIENT_CLOSE_ASAP) &&
can_receive_writes &&
!slave->repl_start_cmd_stream_on_ack &&

137
src/rdb.c
View File

@ -2,8 +2,13 @@
* Copyright (c) 2009-Present, Redis Ltd.
* All rights reserved.
*
* Copyright (c) 2024-present, Valkey contributors.
* All rights reserved.
*
* Licensed under your choice of the Redis Source Available License 2.0
* (RSALv2) or the Server Side Public License v1 (SSPLv1).
*
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
*/
#include "server.h"
@ -3810,8 +3815,10 @@ static void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) {
}
if (server.rdb_child_exit_pipe!=-1)
close(server.rdb_child_exit_pipe);
aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
close(server.rdb_pipe_read);
if (server.rdb_pipe_read != -1) {
aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
close(server.rdb_pipe_read);
}
server.rdb_child_exit_pipe = -1;
server.rdb_pipe_read = -1;
zfree(server.rdb_pipe_conns);
@ -3875,7 +3882,8 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) {
listNode *ln;
listIter li;
pid_t childpid;
int pipefds[2], rdb_pipe_write, safe_to_exit_pipe;
int pipefds[2], rdb_pipe_write = 0, safe_to_exit_pipe = 0;
int rdb_channel = (req & SLAVE_REQ_RDB_CHANNEL);
if (hasActiveChildProcess()) return C_ERR;
@ -3883,29 +3891,30 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) {
* drained the pipe. */
if (server.rdb_pipe_conns) return C_ERR;
/* Before to fork, create a pipe that is used to transfer the rdb bytes to
* the parent, we can't let it write directly to the sockets, since in case
* of TLS we must let the parent handle a continuous TLS state when the
* child terminates and parent takes over. */
if (anetPipe(pipefds, O_NONBLOCK, 0) == -1) return C_ERR;
server.rdb_pipe_read = pipefds[0]; /* read end */
rdb_pipe_write = pipefds[1]; /* write end */
if (!rdb_channel) {
/* Before to fork, create a pipe that is used to transfer the rdb bytes to
* the parent, we can't let it write directly to the sockets, since in case
* of TLS we must let the parent handle a continuous TLS state when the
* child terminates and parent takes over. */
if (anetPipe(pipefds, O_NONBLOCK, 0) == -1) return C_ERR;
server.rdb_pipe_read = pipefds[0]; /* read end */
rdb_pipe_write = pipefds[1]; /* write end */
/* create another pipe that is used by the parent to signal to the child
* that it can exit. */
if (anetPipe(pipefds, 0, 0) == -1) {
close(rdb_pipe_write);
close(server.rdb_pipe_read);
return C_ERR;
/* create another pipe that is used by the parent to signal to the child
* that it can exit. */
if (anetPipe(pipefds, 0, 0) == -1) {
close(rdb_pipe_write);
close(server.rdb_pipe_read);
return C_ERR;
}
safe_to_exit_pipe = pipefds[0]; /* read end */
server.rdb_child_exit_pipe = pipefds[1]; /* write end */
}
safe_to_exit_pipe = pipefds[0]; /* read end */
server.rdb_child_exit_pipe = pipefds[1]; /* write end */
/* Collect the connections of the replicas we want to transfer
* the RDB to, which are i WAIT_BGSAVE_START state. */
server.rdb_pipe_conns = zmalloc(sizeof(connection *)*listLength(server.slaves));
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
* the RDB to, which are in WAIT_BGSAVE_START state. */
int numconns = 0;
connection **conns = zmalloc(sizeof(*conns) * listLength(server.slaves));
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
@ -3913,22 +3922,36 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) {
/* Check slave has the exact requirements */
if (slave->slave_req != req)
continue;
server.rdb_pipe_conns[server.rdb_pipe_numconns++] = slave->conn;
replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset());
replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset());
conns[numconns++] = slave->conn;
if (rdb_channel) {
/* Put the socket in blocking mode to simplify RDB transfer. */
connSendTimeout(slave->conn, server.repl_timeout * 1000);
connBlock(slave->conn);
}
}
}
if (!rdb_channel) {
server.rdb_pipe_conns = conns;
server.rdb_pipe_numconns = numconns;
server.rdb_pipe_numconns_writing = 0;
}
/* Create the child process. */
if ((childpid = redisFork(CHILD_TYPE_RDB)) == 0) {
/* Child */
int retval, dummy;
rio rdb;
rioInitWithFd(&rdb,rdb_pipe_write);
/* Close the reading part, so that if the parent crashes, the child will
* get a write error and exit. */
close(server.rdb_pipe_read);
if (rdb_channel) {
rioInitWithConnset(&rdb, conns, numconns);
} else {
rioInitWithFd(&rdb,rdb_pipe_write);
/* Close the reading part, so that if the parent crashes, the child
* will get a write error and exit. */
close(server.rdb_pipe_read);
}
redisSetProcTitle("redis-rdb-to-slaves");
redisSetCpuAffinity(server.bgsave_cpulist);
@ -3941,14 +3964,19 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) {
sendChildCowInfo(CHILD_INFO_TYPE_RDB_COW_SIZE, "RDB");
}
rioFreeFd(&rdb);
/* wake up the reader, tell it we're done. */
close(rdb_pipe_write);
close(server.rdb_child_exit_pipe); /* close write end so that we can detect the close on the parent. */
/* hold exit until the parent tells us it's safe. we're not expecting
* to read anything, just get the error when the pipe is closed. */
dummy = read(safe_to_exit_pipe, pipefds, 1);
UNUSED(dummy);
if (rdb_channel) {
rioFreeConnset(&rdb);
} else {
rioFreeFd(&rdb);
/* wake up the reader, tell it we're done. */
close(rdb_pipe_write);
close(server.rdb_child_exit_pipe); /* close write end so that we can detect the close on the parent. */
/* hold exit until the parent tells us it's safe. we're not expecting
* to read anything, just get the error when the pipe is closed. */
dummy = read(safe_to_exit_pipe, pipefds, 1);
UNUSED(dummy);
}
zfree(conns);
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
/* Parent */
@ -3966,24 +3994,33 @@ int rdbSaveToSlavesSockets(int req, rdbSaveInfo *rsi) {
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
}
}
close(rdb_pipe_write);
close(server.rdb_pipe_read);
close(server.rdb_child_exit_pipe);
zfree(server.rdb_pipe_conns);
server.rdb_pipe_conns = NULL;
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
if (!rdb_channel) {
close(rdb_pipe_write);
close(server.rdb_pipe_read);
close(server.rdb_child_exit_pipe);
zfree(server.rdb_pipe_conns);
server.rdb_pipe_conns = NULL;
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
}
} else {
serverLog(LL_NOTICE,"Background RDB transfer started by pid %ld",
(long) childpid);
serverLog(LL_NOTICE, "Background RDB transfer started by pid %ld to %s", (long)childpid,
rdb_channel ? "replica socket" : "parent process pipe");
server.rdb_save_time_start = time(NULL);
server.rdb_child_type = RDB_CHILD_TYPE_SOCKET;
close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
if (!rdb_channel) {
close(rdb_pipe_write); /* close write in parent so that it can detect the close on the child. */
if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
}
}
}
close(safe_to_exit_pipe);
if (rdb_channel)
zfree(conns);
else
close(safe_to_exit_pipe);
return (childpid == -1) ? C_ERR : C_OK;
}
return C_OK; /* Unreached. */

File diff suppressed because it is too large Load Diff

174
src/rio.c
View File

@ -1,3 +1,16 @@
/*
* Copyright (c) 2009-Present, Redis Ltd.
* All rights reserved.
*
* Copyright (c) 2024-present, Valkey contributors.
* All rights reserved.
*
* Licensed under your choice of the Redis Source Available License 2.0
* (RSALv2) or the Server Side Public License v1 (SSPLv1).
*
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
*/
/* rio.c is a simple stream-oriented I/O abstraction that provides an interface
* to write code that can consume/produce data using different concrete input
* and output devices. For instance the same rdb.c code using the rio
@ -14,34 +27,6 @@
* for the current checksum.
*
* ----------------------------------------------------------------------------
*
* Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
* Copyright (c) 2009-current, Redis Ltd.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
@ -429,6 +414,139 @@ void rioFreeFd(rio *r) {
sdsfree(r->io.fd.buf);
}
/* ------------------- Connection set implementation ------------------
* This target is used to write the RDB file to a set of replica connections as
* part of rdb channel replication. */
/* Returns 1 for success, 0 for failure.
* The function returns success as long as we are able to correctly write
* to at least one file descriptor.
*
* When buf is NULL or len is 0, the function performs a flush operation if
* there is some pending buffer, so this function is also used in order to
* implement rioConnsetFlush(). */
static size_t rioConnsetWrite(rio *r, const void *buf, size_t len) {
const size_t pre_flush_size = 256 * 1024;
unsigned char *p = (unsigned char*) buf;
size_t buflen = len;
size_t failed = 0; /* number of connections that write() returned error. */
/* For small writes, we rather keep the data in user-space buffer, and flush
* it only when it grows. however for larger writes, we prefer to flush
* any pre-existing buffer, and write the new one directly without reallocs
* and memory copying. */
if (len > pre_flush_size) {
rioConnsetWrite(r, NULL, 0);
} else {
if (buf && len) {
r->io.connset.buf = sdscatlen(r->io.connset.buf, buf, len);
if (sdslen(r->io.connset.buf) <= PROTO_IOBUF_LEN)
return 1;
}
p = (unsigned char *)r->io.connset.buf;
buflen = sdslen(r->io.connset.buf);
}
while (buflen > 0) {
/* Write in little chunks so that when there are big writes we
* parallelize while the kernel is sending data in background to the
* TCP socket. */
size_t limit = PROTO_IOBUF_LEN * 2;
size_t count = buflen < limit ? buflen : limit;
for (size_t i = 0; i < r->io.connset.n_dst; i++) {
size_t n_written = 0;
if (r->io.connset.dst[i].failed != 0) {
failed++;
continue; /* Skip failed connections. */
}
do {
ssize_t ret;
connection *c = r->io.connset.dst[i].conn;
ret = connWrite(c, p + n_written, count - n_written);
if (ret <= 0) {
if (errno == 0)
errno = EIO;
/* With blocking sockets, which is the sole user of this
* rio target, EWOULDBLOCK is returned only because of
* the SO_SNDTIMEO socket option, so we translate the error
* into one more recognizable by the user. */
if (ret == -1 && errno == EWOULDBLOCK)
errno = ETIMEDOUT;
r->io.connset.dst[i].failed = 1;
break;
}
n_written += ret;
} while (n_written != count);
}
if (failed == r->io.connset.n_dst)
return 0; /* All the connections have failed. */
p += count;
buflen -= count;
r->io.connset.pos += count;
}
sdsclear(r->io.connset.buf);
return 1;
}
/* Returns 1 or 0 for success/failure. */
static size_t rioConnsetRead(rio *r, void *buf, size_t len) {
UNUSED(r);
UNUSED(buf);
UNUSED(len);
return 0; /* Error, this target does not support reading. */
}
/* Returns the number of sent bytes. */
static off_t rioConnsetTell(rio *r) {
return r->io.connset.pos;
}
/* Flushes any buffer to target device if applicable. Returns 1 on success
* and 0 on failures. */
static int rioConnsetFlush(rio *r) {
/* Our flush is implemented by the write method, that recognizes a
* buffer set to NULL with a count of zero as a flush request. */
return rioConnsetWrite(r, NULL, 0);
}
static const rio rioConnsetIO = {
rioConnsetRead,
rioConnsetWrite,
rioConnsetTell,
rioConnsetFlush,
NULL, /* update_checksum */
0, /* current checksum */
0, /* flags */
0, /* bytes read or written */
0, /* read/write chunk size */
{ { NULL, 0 } } /* union for io-specific vars */
};
void rioInitWithConnset(rio *r, connection **conns, size_t n_conns) {
*r = rioConnsetIO;
r->io.connset.dst = zcalloc(sizeof(*r->io.connset.dst) * n_conns);
r->io.connset.n_dst = n_conns;
r->io.connset.pos = 0;
r->io.connset.buf = sdsempty();
for (size_t i = 0; i < n_conns; i++)
r->io.connset.dst[i].conn = conns[i];
}
/* release the rio stream. */
void rioFreeConnset(rio *r) {
zfree(r->io.connset.dst);
sdsfree(r->io.connset.buf);
}
/* ---------------------------- Generic functions ---------------------------- */
/* This function can be installed both in memory and file streams when checksum

View File

@ -1,31 +1,14 @@
/*
* Copyright (c) 2009-2012, Pieter Noordhuis <pcnoordhuis at gmail dot com>
* Copyright (c) 2009-current, Redis Ltd.
* Copyright (c) 2009-Present, Redis Ltd.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* Copyright (c) 2024-present, Valkey contributors.
* All rights reserved.
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
* Licensed under your choice of the Redis Source Available License 2.0
* (RSALv2) or the Server Side Public License v1 (SSPLv1).
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
*/
@ -39,6 +22,7 @@
#define RIO_FLAG_READ_ERROR (1<<0)
#define RIO_FLAG_WRITE_ERROR (1<<1)
#define RIO_FLAG_ABORT (1<<2)
#define RIO_TYPE_FILE (1<<0)
#define RIO_TYPE_BUFFER (1<<1)
@ -97,6 +81,17 @@ struct _rio {
off_t pos;
sds buf;
} fd;
/* Multiple connections target (used to write to N sockets). */
struct {
struct {
connection *conn; /* Connection */
int failed; /* If write failed on this connection. */
} *dst;
size_t n_dst; /* Number of connections */
off_t pos; /* Number of sent bytes */
sds buf;
} connset;
} io;
};
@ -107,7 +102,7 @@ typedef struct _rio rio;
* if needed. */
static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
if (r->flags & RIO_FLAG_WRITE_ERROR) return 0;
if (r->flags & (RIO_FLAG_WRITE_ERROR | RIO_FLAG_ABORT)) return 0;
while (len) {
size_t bytes_to_write = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
@ -123,7 +118,7 @@ static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
}
static inline size_t rioRead(rio *r, void *buf, size_t len) {
if (r->flags & RIO_FLAG_READ_ERROR) return 0;
if (r->flags & (RIO_FLAG_READ_ERROR | RIO_FLAG_ABORT)) return 0;
while (len) {
size_t bytes_to_read = (r->max_processing_chunk && r->max_processing_chunk < len) ? r->max_processing_chunk : len;
if (r->read(r,buf,bytes_to_read) == 0) {
@ -146,6 +141,10 @@ static inline int rioFlush(rio *r) {
return r->flush(r);
}
static inline void rioAbort(rio *r) {
r->flags |= RIO_FLAG_ABORT;
}
/* This function allows to know if there was a read error in any past
* operation, since the rio stream was created or since the last call
* to rioClearError(). */
@ -159,16 +158,18 @@ static inline int rioGetWriteError(rio *r) {
}
static inline void rioClearErrors(rio *r) {
r->flags &= ~(RIO_FLAG_READ_ERROR|RIO_FLAG_WRITE_ERROR);
r->flags &= ~(RIO_FLAG_READ_ERROR|RIO_FLAG_WRITE_ERROR|RIO_FLAG_ABORT);
}
void rioInitWithFile(rio *r, FILE *fp);
void rioInitWithBuffer(rio *r, sds s);
void rioInitWithConn(rio *r, connection *conn, size_t read_limit);
void rioInitWithFd(rio *r, int fd);
void rioInitWithConnset(rio *r, connection **conns, size_t n_conns);
void rioFreeFd(rio *r);
void rioFreeConn(rio *r, sds* out_remainingBufferedData);
void rioFreeConnset(rio *r);
size_t rioWriteBulkCount(rio *r, char prefix, long count);
size_t rioWriteBulkString(rio *r, const char *buf, size_t len);

View File

@ -1444,7 +1444,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
serverLog(LL_DEBUG,
"%lu clients connected (%lu replicas), %zu bytes in use",
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
replicationLogicalReplicaCount(),
zmalloc_used_memory());
}
}
@ -2183,6 +2183,8 @@ void initServerConfig(void) {
server.cached_master = NULL;
server.master_initial_offset = -1;
server.repl_state = REPL_STATE_NONE;
server.repl_rdb_ch_state = REPL_RDB_CH_STATE_NONE;
server.repl_full_sync_buffer = (struct replDataBuf) {0};
server.repl_transfer_tmpfile = NULL;
server.repl_transfer_fd = -1;
server.repl_transfer_s = NULL;
@ -2679,6 +2681,8 @@ void initServer(void) {
server.hz = server.config_hz;
server.pid = getpid();
server.in_fork_child = CHILD_TYPE_NONE;
server.rdb_pipe_read = -1;
server.rdb_child_exit_pipe = -1;
server.main_thread_id = pthread_self();
server.current_client = NULL;
server.errors = raxNew();
@ -5458,7 +5462,10 @@ const char *replstateToString(int replstate) {
switch (replstate) {
case SLAVE_STATE_WAIT_BGSAVE_START:
case SLAVE_STATE_WAIT_BGSAVE_END:
case SLAVE_STATE_WAIT_RDB_CHANNEL:
return "wait_bgsave";
case SLAVE_STATE_SEND_BULK_AND_STREAM:
return "send_bulk_and_stream";
case SLAVE_STATE_SEND_BULK:
return "send_bulk";
case SLAVE_STATE_ONLINE:
@ -6056,7 +6063,9 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
"master_last_io_seconds_ago:%d\r\n", server.master ? ((int)(server.unixtime-server.master->lastinteraction)) : -1,
"master_sync_in_progress:%d\r\n", server.repl_state == REPL_STATE_TRANSFER,
"slave_read_repl_offset:%lld\r\n", slave_read_repl_offset,
"slave_repl_offset:%lld\r\n", slave_repl_offset));
"slave_repl_offset:%lld\r\n", slave_repl_offset,
"replica_full_sync_buffer_size:%zu\r\n", server.repl_full_sync_buffer.size,
"replica_full_sync_buffer_peak:%zu\r\n", server.repl_full_sync_buffer.peak));
if (server.repl_state == REPL_STATE_TRANSFER) {
double perc = 0;
@ -6085,7 +6094,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
info = sdscatprintf(info,
"connected_slaves:%lu\r\n",
listLength(server.slaves));
replicationLogicalReplicaCount());
/* If min-slaves-to-write is active, write the number of slaves
* currently considered 'good'. */
@ -6108,6 +6117,14 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
int port;
long lag = 0;
/* During rdbchannel replication, replica opens two connections.
* These are distinct slaves in server.slaves list from master
* POV. We don't want to list these separately. If a rdbchannel
* replica has an associated main-channel replica in
* server.slaves list, we'll list main channel replica only. */
if (replicationCheckHasMainChannel(slave))
continue;
if (!slaveip) {
if (connAddrPeerName(slave->conn,ip,sizeof(ip),&port) == -1)
continue;

View File

@ -92,6 +92,7 @@ struct hdr_histogram;
/* Error codes */
#define C_OK 0
#define C_ERR -1
#define C_RETRY -2
/* Static server configuration */
#define CONFIG_DEFAULT_HZ 10 /* Time interrupt calls/sec. */
@ -394,6 +395,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL<<48) /* Module client do not want to propagate to AOF */
#define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */
#define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */
#define CLIENT_REPL_RDB_CHANNEL (1ULL<<51) /* Client which is used for rdb delivery as part of rdb channel replication */
/* Any flag that does not let optimize FLUSH SYNC to run it in bg as blocking client ASYNC */
#define CLIENT_AVOID_BLOCKING_ASYNC_FLUSH (CLIENT_DENY_BLOCKING|CLIENT_MULTI|CLIENT_LUA_DEBUG|CLIENT_LUA_DEBUG_SYNC|CLIENT_MODULE)
@ -473,6 +475,24 @@ typedef enum {
REPL_STATE_CONNECTED, /* Connected to master */
} repl_state;
/* Replica rdb channel replication state. Used in server.repl_rdb_ch_state for
* replicas to remember what to do next. */
typedef enum {
REPL_RDB_CH_STATE_CLOSE_ASAP = -1, /* Async error state */
REPL_RDB_CH_STATE_NONE = 0, /* No active rdb channel sync */
REPL_RDB_CH_SEND_HANDSHAKE, /* Send handshake sequence to master */
REPL_RDB_CH_RECEIVE_AUTH_REPLY, /* Wait for AUTH reply */
REPL_RDB_CH_RECEIVE_REPLCONF_REPLY, /* Wait for REPLCONF reply */
REPL_RDB_CH_RECEIVE_FULLRESYNC, /* Wait for +FULLRESYNC reply */
REPL_RDB_CH_RDB_LOADING, /* Loading rdb using rdb channel */
} repl_rdb_channel_state;
/* Replication debug flags for testing. */
#define REPL_DEBUG_PAUSE_NONE (1 << 0)
#define REPL_DEBUG_AFTER_FORK (1 << 1)
#define REPL_DEBUG_BEFORE_RDB_CHANNEL (1 << 2)
#define REPL_DEBUG_ON_STREAMING_REPL_BUF (1 << 3)
/* The state of an in progress coordinated failover */
typedef enum {
NO_FAILOVER = 0, /* No failover in progress */
@ -491,16 +511,22 @@ typedef enum {
#define SLAVE_STATE_ONLINE 9 /* RDB file transmitted, sending just updates. */
#define SLAVE_STATE_RDB_TRANSMITTED 10 /* RDB file transmitted - This state is used only for
* a replica that only wants RDB without replication buffer */
#define SLAVE_STATE_WAIT_RDB_CHANNEL 11 /* Main channel of replica is connected,
* we are waiting rdbchannel connection to start delivery.*/
#define SLAVE_STATE_SEND_BULK_AND_STREAM 12 /* Main channel of a replica which uses rdb channel replication.
* Sending RDB file and replication stream in parallel. */
/* Slave capabilities. */
#define SLAVE_CAPA_NONE 0
#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
#define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */
#define SLAVE_CAPA_NONE 0
#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
#define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */
#define SLAVE_CAPA_RDB_CHANNEL_REPL (1<<2) /* Supports rdb channel replication during full sync */
/* Slave requirements */
#define SLAVE_REQ_NONE 0
#define SLAVE_REQ_RDB_EXCLUDE_DATA (1 << 0) /* Exclude data from RDB */
#define SLAVE_REQ_NONE 0
#define SLAVE_REQ_RDB_EXCLUDE_DATA (1 << 0) /* Exclude data from RDB */
#define SLAVE_REQ_RDB_EXCLUDE_FUNCTIONS (1 << 1) /* Exclude functions from RDB */
#define SLAVE_REQ_RDB_CHANNEL (1 << 2) /* Use rdb channel replication */
/* Mask of all bits in the slave requirements bitfield that represent non-standard (filtered) RDB requirements */
#define SLAVE_REQ_RDB_MASK (SLAVE_REQ_RDB_EXCLUDE_DATA | SLAVE_REQ_RDB_EXCLUDE_FUNCTIONS)
@ -1162,6 +1188,23 @@ typedef struct replBacklog {
* byte in the replication backlog buffer.*/
} replBacklog;
/* Used by replDataBuf during rdb channel replication to accumulate replication
* stream on replica side. */
typedef struct replDataBufBlock {
size_t used; /* Used bytes in the buf */
size_t size; /* Size of the buf */
char buf[]; /* Replication data */
} replDataBufBlock;
/* Linked list of replDataBufBlock structs, holds replication stream during
* rdb channel replication on replica side. */
typedef struct replDataBuf {
list *blocks; /* List of replDataBufBlock */
size_t size; /* Total number of bytes available in all blocks. */
size_t used; /* Total number of bytes actually used in all blocks. */
size_t peak; /* Peak number of bytes stored in all blocks. */
} replDataBuf;
typedef struct {
list *clients;
size_t mem_usage_sum;
@ -1258,6 +1301,7 @@ typedef struct client {
char *slave_addr; /* Optionally given by REPLCONF ip-address */
int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
int slave_req; /* Slave requirements: SLAVE_REQ_* */
uint64_t main_ch_client_id; /* The client id of this replica's main channel */
multiState mstate; /* MULTI/EXEC state */
blockingState bstate; /* blocking state */
long long woff; /* Last write global replication offset. */
@ -1936,6 +1980,8 @@ struct redisServer {
int repl_ping_slave_period; /* Master pings the slave every N seconds */
replBacklog *repl_backlog; /* Replication backlog for partial syncs */
long long repl_backlog_size; /* Backlog circular buffer size */
long long repl_full_sync_buffer_limit; /* Accumulated repl data limit during rdb channel replication */
replDataBuf repl_full_sync_buffer; /* Accumulated replication data for rdb channel replication */
time_t repl_backlog_time_limit; /* Time without slaves after the backlog
gets released. */
time_t repl_no_slaves_since; /* We have no slaves since that time.
@ -1949,6 +1995,9 @@ struct redisServer {
int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */
int repl_diskless_sync_max_replicas;/* Max replicas for diskless repl BGSAVE
* delay (start sooner if they all connect). */
int repl_rdb_channel; /* Config used to determine if the replica should
* use rdb channel replication for full syncs. */
int repl_debug_pause; /* Debug config to force the main process to pause. */
size_t repl_buffer_mem; /* The memory of replication buffer. */
list *repl_buffer_blocks; /* Replication buffers blocks list
* (serving replica clients and repl backlog) */
@ -1962,10 +2011,13 @@ struct redisServer {
client *cached_master; /* Cached master to be reused for PSYNC. */
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
int repl_state; /* Replication status if the instance is a slave */
int repl_rdb_ch_state; /* State of the replica's rdb channel during rdb channel replication */
uint64_t repl_main_ch_client_id; /* Main channel client id received in +RDBCHANNELSYNC reply. */
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
connection *repl_transfer_s; /* Slave -> Master SYNC connection */
connection *repl_rdb_transfer_s; /* Slave -> Master FULL SYNC connection (RDB download) */
int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */
char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */
time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */
@ -2948,6 +3000,8 @@ void clearFailoverState(void);
void updateFailoverStatus(void);
void abortFailover(const char *err);
const char *getFailoverStateString(void);
int replicationCheckHasMainChannel(client *slave);
unsigned long replicationLogicalReplicaCount(void);
/* Generic persistence functions */
void startLoadingFile(size_t size, char* filename, int rdbflags);
@ -3972,6 +4026,7 @@ void killThreads(void);
void makeThreadKillable(void);
void swapMainDbWithTempDb(redisDb *tempDb);
sds getVersion(void);
void debugPauseProcess(void);
/* Use macro for checking log level to avoid evaluating arguments in cases log
* should be ignored due to low level. */

View File

@ -46,7 +46,7 @@ test "Resharding all the master #0 slots away from it" {
}
test "Master #0 who lost all slots should turn into a replica without replicas" {
wait_for_condition 1000 50 {
wait_for_condition 2000 50 {
[RI 0 role] == "slave" && [RI 0 connected_slaves] == 0
} else {
puts [R 0 info replication]

View File

@ -1,18 +1,37 @@
#
# Copyright (c) 2009-Present, Redis Ltd.
# All rights reserved.
#
# Copyright (c) 2024-present, Valkey contributors.
# All rights reserved.
#
# Licensed under your choice of the Redis Source Available License 2.0
# (RSALv2) or the Server Side Public License v1 (SSPLv1).
#
# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
#
source tests/support/redis.tcl
set ::tlsdir "tests/tls"
proc gen_write_load {host port seconds tls} {
# Continuously sends SET commands to the server. If key is omitted, a random key
# is used for every SET command. The value is always random.
proc gen_write_load {host port seconds tls {key ""}} {
set start_time [clock seconds]
set r [redis $host $port 1 $tls]
$r client setname LOAD_HANDLER
$r select 9
while 1 {
$r set [expr rand()] [expr rand()]
if {$key == ""} {
$r set [expr rand()] [expr rand()]
} else {
$r set $key [expr rand()]
}
if {[clock seconds]-$start_time > $seconds} {
exit 0
}
}
}
gen_write_load [lindex $argv 0] [lindex $argv 1] [lindex $argv 2] [lindex $argv 3]
gen_write_load [lindex $argv 0] [lindex $argv 1] [lindex $argv 2] [lindex $argv 3] [lindex $argv 4]

View File

@ -1,6 +1,20 @@
#
# Copyright (c) 2009-Present, Redis Ltd.
# All rights reserved.
#
# Copyright (c) 2024-present, Valkey contributors.
# All rights reserved.
#
# Licensed under your choice of the Redis Source Available License 2.0
# (RSALv2) or the Server Side Public License v1 (SSPLv1).
#
# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
#
# This test group aims to test that all replicas share one global replication buffer,
# two replicas don't make replication buffer size double, and when there is no replica,
# replica buffer will shrink.
foreach rdbchannel {"yes" "no"} {
start_server {tags {"repl external:skip"}} {
start_server {} {
start_server {} {
@ -9,6 +23,10 @@ start_server {} {
set replica2 [srv -2 client]
set replica3 [srv -1 client]
$replica1 config set repl-rdb-channel $rdbchannel
$replica2 config set repl-rdb-channel $rdbchannel
$replica3 config set repl-rdb-channel $rdbchannel
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
@ -18,6 +36,7 @@ start_server {} {
$master config set repl-diskless-sync-delay 5
$master config set repl-diskless-sync-max-replicas 1
$master config set client-output-buffer-limit "replica 0 0 0"
$master config set repl-rdb-channel $rdbchannel
# Make sure replica3 is synchronized with master
$replica3 replicaof $master_host $master_port
@ -39,7 +58,7 @@ start_server {} {
fail "fail to sync with replicas"
}
test {All replicas share one global replication buffer} {
test "All replicas share one global replication buffer rdbchannel=$rdbchannel" {
set before_used [s used_memory]
populate 1024 "" 1024 ; # Write extra 1M data
# New data uses 1M memory, but all replicas use only one
@ -47,7 +66,13 @@ start_server {} {
# more than double of replication buffer.
set repl_buf_mem [s mem_total_replication_buffers]
set extra_mem [expr {[s used_memory]-$before_used-1024*1024}]
assert {$extra_mem < 2*$repl_buf_mem}
if {$rdbchannel == "yes"} {
# master's replication buffers should not grow
assert {$extra_mem < 1024*1024}
assert {$repl_buf_mem < 1024*1024}
} else {
assert {$extra_mem < 2*$repl_buf_mem}
}
# Kill replica1, replication_buffer will not become smaller
catch {$replica1 shutdown nosave}
@ -59,7 +84,7 @@ start_server {} {
assert_equal $repl_buf_mem [s mem_total_replication_buffers]
}
test {Replication buffer will become smaller when no replica uses} {
test "Replication buffer will become smaller when no replica uses rdbchannel=$rdbchannel" {
# Make sure replica3 catch up with the master
wait_for_ofs_sync $master $replica3
@ -71,12 +96,18 @@ start_server {} {
} else {
fail "replica2 doesn't disconnect with master"
}
assert {[expr $repl_buf_mem - 1024*1024] > [s mem_total_replication_buffers]}
if {$rdbchannel == "yes"} {
# master's replication buffers should not grow
assert {1024*512 > [s mem_total_replication_buffers]}
} else {
assert {[expr $repl_buf_mem - 1024*1024] > [s mem_total_replication_buffers]}
}
}
}
}
}
}
}
# This test group aims to test replication backlog size can outgrow the backlog
# limit config if there is a slow replica which keep massive replication buffers,
@ -84,6 +115,7 @@ start_server {} {
# partial re-synchronization. Of course, replication backlog memory also can
# become smaller when master disconnects with slow replicas since output buffer
# limit is reached.
foreach rdbchannel {"yes" "no"} {
start_server {tags {"repl external:skip"}} {
start_server {} {
start_server {} {
@ -98,6 +130,7 @@ start_server {} {
$master config set save ""
$master config set repl-backlog-size 16384
$master config set repl-rdb-channel $rdbchannel
$master config set client-output-buffer-limit "replica 0 0 0"
# Executing 'debug digest' on master which has many keys costs much time
@ -105,12 +138,16 @@ start_server {} {
# with master.
$master config set repl-timeout 1000
$replica1 config set repl-timeout 1000
$replica1 config set repl-rdb-channel $rdbchannel
$replica1 config set client-output-buffer-limit "replica 1024 0 0"
$replica2 config set repl-timeout 1000
$replica2 config set client-output-buffer-limit "replica 1024 0 0"
$replica2 config set repl-rdb-channel $rdbchannel
$replica1 replicaof $master_host $master_port
wait_for_sync $replica1
test {Replication backlog size can outgrow the backlog limit config} {
test "Replication backlog size can outgrow the backlog limit config rdbchannel=$rdbchannel" {
# Generating RDB will take 1000 seconds
$master config set rdb-key-save-delay 1000000
populate 1000 master 10000
@ -124,7 +161,7 @@ start_server {} {
}
# Replication actual backlog grow more than backlog setting since
# the slow replica2 kept replication buffer.
populate 10000 master 10000
populate 20000 master 10000
assert {[s repl_backlog_histlen] > [expr 10000*10000]}
}
@ -135,7 +172,7 @@ start_server {} {
fail "Replica offset didn't catch up with the master after too long time"
}
test {Replica could use replication buffer (beyond backlog config) for partial resynchronization} {
test "Replica could use replication buffer (beyond backlog config) for partial resynchronization rdbchannel=$rdbchannel" {
# replica1 disconnects with master
$replica1 replicaof [srv -1 host] [srv -1 port]
# Write a mass of data that exceeds repl-backlog-size
@ -155,7 +192,7 @@ start_server {} {
assert_equal [$master debug digest] [$replica1 debug digest]
}
test {Replication backlog memory will become smaller if disconnecting with replica} {
test "Replication backlog memory will become smaller if disconnecting with replica rdbchannel=$rdbchannel" {
assert {[s repl_backlog_histlen] > [expr 2*10000*10000]}
assert_equal [s connected_slaves] {2}
@ -165,8 +202,11 @@ start_server {} {
r set key [string repeat A [expr 64*1024]]
# master will close replica2's connection since replica2's output
# buffer limit is reached, so there only is replica1.
# In case of rdbchannel=yes, main channel will be disconnected only.
wait_for_condition 100 100 {
[s connected_slaves] eq {1}
[s connected_slaves] eq {1} ||
([s connected_slaves] eq {2} &&
[string match {*slave*state=wait_bgsave*} [$master info]])
} else {
fail "master didn't disconnect with replica2"
}
@ -185,15 +225,19 @@ start_server {} {
}
}
}
}
test {Partial resynchronization is successful even client-output-buffer-limit is less than repl-backlog-size} {
foreach rdbchannel {"yes" "no"} {
test "Partial resynchronization is successful even client-output-buffer-limit is less than repl-backlog-size rdbchannel=$rdbchannel" {
start_server {tags {"repl external:skip"}} {
start_server {} {
r config set save ""
r config set repl-backlog-size 100mb
r config set client-output-buffer-limit "replica 512k 0 0"
r config set repl-rdb-channel $rdbchannel
set replica [srv -1 client]
$replica config set repl-rdb-channel $rdbchannel
$replica replicaof [srv 0 host] [srv 0 port]
wait_for_sync $replica
@ -231,7 +275,7 @@ test {Partial resynchronization is successful even client-output-buffer-limit is
}
# This test was added to make sure big keys added to the backlog do not trigger psync loop.
test {Replica client-output-buffer size is limited to backlog_limit/16 when no replication data is pending} {
test "Replica client-output-buffer size is limited to backlog_limit/16 when no replication data is pending rdbchannel=$rdbchannel" {
proc client_field {r type f} {
set client [$r client list type $type]
if {![regexp $f=(\[a-zA-Z0-9-\]+) $client - res]} {
@ -252,6 +296,8 @@ test {Replica client-output-buffer size is limited to backlog_limit/16 when no r
$master config set repl-backlog-size 16384
$master config set client-output-buffer-limit "replica 32768 32768 60"
$master config set repl-rdb-channel $rdbchannel
$replica config set repl-rdb-channel $rdbchannel
# Key has has to be larger than replica client-output-buffer limit.
set keysize [expr 256*1024]
@ -304,4 +350,5 @@ test {Replica client-output-buffer size is limited to backlog_limit/16 when no r
}
}
}
}

View File

@ -1,3 +1,16 @@
#
# Copyright (c) 2009-Present, Redis Ltd.
# All rights reserved.
#
# Copyright (c) 2024-present, Valkey contributors.
# All rights reserved.
#
# Licensed under your choice of the Redis Source Available License 2.0
# (RSALv2) or the Server Side Public License v1 (SSPLv1).
#
# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
#
# Creates a master-slave pair and breaks the link continuously to force
# partial resyncs attempts, all this while flooding the master with
# write queries.
@ -8,7 +21,7 @@
# If reconnect is > 0, the test actually try to break the connection and
# reconnect with the master, otherwise just the initial synchronization is
# checked for consistency.
proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect} {
proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect rdbchannel} {
start_server {tags {"repl"} overrides {save {}}} {
start_server {overrides {save {}}} {
@ -21,7 +34,9 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reco
$master config set repl-backlog-ttl $backlog_ttl
$master config set repl-diskless-sync $mdl
$master config set repl-diskless-sync-delay 1
$master config set repl-rdb-channel $rdbchannel
$slave config set repl-diskless-load $sdl
$slave config set repl-rdb-channel $rdbchannel
set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000]
set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000]
@ -46,7 +61,7 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reco
}
}
test "Test replication partial resync: $descr (diskless: $mdl, $sdl, reconnect: $reconnect)" {
test "Test replication partial resync: $descr (diskless: $mdl, $sdl, reconnect: $reconnect, rdbchannel: $rdbchannel)" {
# Now while the clients are writing data, break the maste-slave
# link multiple times.
if ($reconnect) {
@ -120,24 +135,31 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reco
tags {"external:skip"} {
foreach mdl {no yes} {
foreach sdl {disabled swapdb} {
test_psync {no reconnection, just sync} 6 1000000 3600 0 {
} $mdl $sdl 0
foreach rdbchannel {yes no} {
if {$rdbchannel == "yes" && $mdl == "no"} {
# rdbchannel replication requires repl-diskless-sync enabled
continue
}
test_psync {ok psync} 6 100000000 3600 0 {
assert {[s -1 sync_partial_ok] > 0}
} $mdl $sdl 1
test_psync {no reconnection, just sync} 6 1000000 3600 0 {
} $mdl $sdl 0 $rdbchannel
test_psync {no backlog} 6 100 3600 0.5 {
assert {[s -1 sync_partial_err] > 0}
} $mdl $sdl 1
test_psync {ok psync} 6 100000000 3600 0 {
assert {[s -1 sync_partial_ok] > 0}
} $mdl $sdl 1 $rdbchannel
test_psync {ok after delay} 3 100000000 3600 3 {
assert {[s -1 sync_partial_ok] > 0}
} $mdl $sdl 1
test_psync {no backlog} 6 100 3600 0.5 {
assert {[s -1 sync_partial_err] > 0}
} $mdl $sdl 1 $rdbchannel
test_psync {backlog expired} 3 100000000 1 3 {
assert {[s -1 sync_partial_err] > 0}
} $mdl $sdl 1
test_psync {ok after delay} 3 100000000 3600 3 {
assert {[s -1 sync_partial_ok] > 0}
} $mdl $sdl 1 $rdbchannel
test_psync {backlog expired} 3 100000000 1 3 {
assert {[s -1 sync_partial_err] > 0}
} $mdl $sdl 1 $rdbchannel
}
}
}
}

View File

@ -0,0 +1,795 @@
#
# Copyright (c) 2009-Present, Redis Ltd.
# All rights reserved.
#
# Copyright (c) 2024-present, Valkey contributors.
# All rights reserved.
#
# Licensed under your choice of the Redis Source Available License 2.0
# (RSALv2) or the Server Side Public License v1 (SSPLv1).
#
# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
#
# Returns either main or rdbchannel client id
# Assumes there is one replica with two channels
proc get_replica_client_id {master rdbchannel} {
set input [$master client list type replica]
foreach line [split $input "\n"] {
if {[regexp {id=(\d+).*flags=(\S+)} $line match id flags]} {
if {$rdbchannel == "yes"} {
# rdbchannel will have C flag
if {[string match *C* $flags]} {
return $id
}
} else {
return $id
}
}
}
error "Replica not found"
}
start_server {tags {"repl external:skip"}} {
set replica1 [srv 0 client]
start_server {} {
set replica2 [srv 0 client]
start_server {} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
$master config set repl-diskless-sync yes
$master config set repl-rdb-channel yes
populate 1000 master 10
test "Test replication with multiple replicas (rdbchannel enabled on both)" {
$replica1 config set repl-rdb-channel yes
$replica1 replicaof $master_host $master_port
$replica2 config set repl-rdb-channel yes
$replica2 replicaof $master_host $master_port
wait_replica_online $master 0
wait_replica_online $master 1
$master set x 1
# Wait until replicas catch master
wait_for_ofs_sync $master $replica1
wait_for_ofs_sync $master $replica2
# Verify db's are identical
assert_morethan [$master dbsize] 0
assert_equal [$master get x] 1
assert_equal [$master debug digest] [$replica1 debug digest]
assert_equal [$master debug digest] [$replica2 debug digest]
}
test "Test replication with multiple replicas (rdbchannel enabled on one of them)" {
# Allow both replicas to ask for sync
$master config set repl-diskless-sync-delay 5
$replica1 replicaof no one
$replica2 replicaof no one
$replica1 config set repl-rdb-channel yes
$replica2 config set repl-rdb-channel no
set prev_forks [s 0 total_forks]
$master set x 2
# There will be two forks subsequently, one for rdbchannel
# replica another for the replica without rdbchannel config.
$replica1 replicaof $master_host $master_port
$replica2 replicaof $master_host $master_port
set res [wait_for_log_messages 0 {"*Starting BGSAVE* replicas sockets (rdb-channel)*"} 0 2000 10]
set loglines [lindex $res 1]
wait_for_log_messages 0 {"*Starting BGSAVE* replicas sockets*"} $loglines 2000 10
wait_replica_online $master 0 100 100
wait_replica_online $master 1 100 100
# Verify two new forks.
assert_equal [s 0 total_forks] [expr $prev_forks + 2]
wait_for_ofs_sync $master $replica1
wait_for_ofs_sync $master $replica2
# Verify db's are identical
assert_equal [$replica1 get x] 2
assert_equal [$replica2 get x] 2
assert_equal [$master debug digest] [$replica1 debug digest]
assert_equal [$master debug digest] [$replica2 debug digest]
}
test "Test rdbchannel is not used if repl-diskless-sync config is disabled on master" {
$replica1 replicaof no one
$replica2 replicaof no one
$master config set repl-diskless-sync-delay 0
$master config set repl-diskless-sync no
$master set x 3
$replica1 replicaof $master_host $master_port
# Verify log message does not mention rdbchannel
wait_for_log_messages 0 {"*Starting BGSAVE for SYNC with target: disk*"} 0 2000 1
wait_replica_online $master 0
wait_for_ofs_sync $master $replica1
# Verify db's are identical
assert_equal [$replica1 get x] 3
assert_equal [$master debug digest] [$replica1 debug digest]
}
}
}
}
start_server {tags {"repl external:skip"}} {
set replica [srv 0 client]
set replica_pid [srv 0 pid]
start_server {} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
$master config set repl-rdb-channel yes
$replica config set repl-rdb-channel yes
# Reuse this test to verify large key delivery
$master config set rdbcompression no
$master config set rdb-key-save-delay 3000
populate 1000 prefix1 10
populate 5 prefix2 3000000
populate 5 prefix3 2000000
populate 5 prefix4 1000000
# On master info output, we should see state transition in this order:
# 1. wait_bgsave: Replica receives psync error (+RDBCHANNELSYNC)
# 2. send_bulk_and_stream: Replica opens rdbchannel and delivery started
# 3. online: Sync is completed
test "Test replica state should start with wait_bgsave" {
$replica config set key-load-delay 100000
# Pause replica before opening rdb channel conn
$replica debug repl-pause before-rdb-channel
$replica replicaof $master_host $master_port
wait_for_condition 50 200 {
[s 0 connected_slaves] == 1 &&
[string match "*wait_bgsave*" [s 0 slave0]]
} else {
fail "replica failed"
}
}
test "Test replica state advances to send_bulk_and_stream when rdbchannel connects" {
$master set x 1
resume_process $replica_pid
wait_for_condition 50 200 {
[s 0 connected_slaves] == 1 &&
[s 0 rdb_bgsave_in_progress] == 1 &&
[string match "*send_bulk_and_stream*" [s 0 slave0]]
} else {
fail "replica failed"
}
}
test "Test replica rdbchannel client has SC flag on client list output" {
set input [$master client list type replica]
# There will two replicas, second one should be rdbchannel
set trimmed_input [string trimright $input]
set lines [split $trimmed_input "\n"]
if {[llength $lines] < 2} {
error "There is no second line in the input: $input"
}
set second_line [lindex $lines 1]
# Check if 'flags=SC' exists in the second line
if {![regexp {flags=SC} $second_line]} {
error "Flags are not 'SC' in the second line: $second_line"
}
}
test "Test replica state advances to online when fullsync is completed" {
# Speed up loading
$replica config set key-load-delay 0
wait_replica_online $master 0 100 1000
wait_for_ofs_sync $master $replica
wait_for_condition 50 200 {
[s 0 rdb_bgsave_in_progress] == 0 &&
[s 0 connected_slaves] == 1 &&
[string match "*online*" [s 0 slave0]]
} else {
fail "replica failed"
}
wait_replica_online $master 0 100 1000
wait_for_ofs_sync $master $replica
# Verify db's are identical
assert_morethan [$master dbsize] 0
assert_equal [$master debug digest] [$replica debug digest]
}
}
}
start_server {tags {"repl external:skip"}} {
set replica [srv 0 client]
start_server {} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
$master config set repl-rdb-channel yes
$replica config set repl-rdb-channel yes
test "Test master memory does not increase during replication" {
# Put some delay to rdb generation. If master doesn't forward
# incoming traffic to replica, master's replication buffer will grow
$master config set rdb-key-save-delay 200
$master config set repl-backlog-size 5mb
populate 10000 master 10000
# Start write traffic
set load_handle [start_write_load $master_host $master_port 100 "key1"]
set prev_used [s 0 used_memory]
$replica replicaof $master_host $master_port
set backlog_size [lindex [$master config get repl-backlog-size] 1]
# Verify used_memory stays low
set max_retry 1000
set prev_buf_size 0
while {$max_retry} {
assert_lessthan [expr [s 0 used_memory] - $prev_used] 20000000
assert_lessthan_equal [s 0 mem_total_replication_buffers] [expr {$backlog_size + 1000000}]
# Check replica state
if {[string match *slave0*state=online* [$master info]] &&
[s -1 master_link_status] == "up"} {
break
} else {
incr max_retry -1
after 10
}
}
if {$max_retry == 0} {
error "assertion:Replica not in sync after 10 seconds"
}
stop_write_load $load_handle
}
}
}
start_server {tags {"repl external:skip"}} {
set replica [srv 0 client]
start_server {} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
$master config set repl-rdb-channel yes
$replica config set repl-rdb-channel yes
test "Test replication stream buffer becomes full on replica" {
# For replication stream accumulation, replica inherits slave output
# buffer limit as the size limit. In this test, we create traffic to
# fill the buffer fully. Once the limit is reached, accumulation
# will stop. This is not a failure scenario though. From that point,
# further accumulation may occur on master side. Replication should
# be completed successfully.
# Create some artificial delay for rdb delivery and load. We'll
# generate some traffic to fill the replication buffer.
$master config set rdb-key-save-delay 1000
$replica config set key-load-delay 1000
$replica config set client-output-buffer-limit "replica 64kb 64kb 0"
populate 2000 master 1
set prev_sync_full [s 0 sync_full]
$replica replicaof $master_host $master_port
# Wait for replica to establish psync using main channel
wait_for_condition 500 1000 {
[string match "*state=send_bulk_and_stream*" [s 0 slave0]]
} else {
fail "replica didn't start sync"
}
# Create some traffic on replication stream
populate 100 master 100000
# Wait for replica's buffer limit reached
wait_for_log_messages -1 {"*Replication buffer limit has been reached*"} 0 1000 10
# Speed up loading
$replica config set key-load-delay 0
# Wait until sync is successful
wait_for_condition 200 200 {
[status $master master_repl_offset] eq [status $replica master_repl_offset] &&
[status $master master_repl_offset] eq [status $replica slave_repl_offset]
} else {
fail "replica offsets didn't match in time"
}
# Verify sync was not interrupted.
assert_equal [s 0 sync_full] [expr $prev_sync_full + 1]
# Verify db's are identical
assert_morethan [$master dbsize] 0
assert_equal [$master debug digest] [$replica debug digest]
}
test "Test replication stream buffer config replica-full-sync-buffer-limit" {
# By default, replica inherits client-output-buffer-limit of replica
# to limit accumulated repl data during rdbchannel sync.
# replica-full-sync-buffer-limit should override it if it is set.
$replica replicaof no one
# Create some artificial delay for rdb delivery and load. We'll
# generate some traffic to fill the replication buffer.
$master config set rdb-key-save-delay 1000
$replica config set key-load-delay 1000
$replica config set client-output-buffer-limit "replica 1024 1024 0"
$replica config set replica-full-sync-buffer-limit 20mb
populate 2000 master 1
$replica replicaof $master_host $master_port
# Wait until replication starts
wait_for_condition 500 1000 {
[string match "*state=send_bulk_and_stream*" [s 0 slave0]]
} else {
fail "replica didn't start sync"
}
# Create some traffic on replication stream
populate 100 master 100000
# Make sure config is used, we accumulated more than
# client-output-buffer-limit
assert_morethan [s -1 replica_full_sync_buffer_size] 1024
}
}
}
start_server {tags {"repl external:skip"}} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
set master_pid [srv 0 pid]
set loglines [count_log_lines 0]
$master config set repl-diskless-sync yes
$master config set repl-rdb-channel yes
$master config set repl-backlog-size 1mb
$master config set client-output-buffer-limit "replica 100k 0 0"
$master config set loglevel debug
$master config set repl-diskless-sync-delay 3
start_server {} {
set replica [srv 0 client]
set replica_pid [srv 0 pid]
$replica config set repl-rdb-channel yes
$replica config set loglevel debug
$replica config set repl-timeout 10
$replica config set key-load-delay 10000
$replica config set loading-process-events-interval-bytes 1024
test "Test master disconnects replica when output buffer limit is reached" {
populate 20000 master 100 -1
$replica replicaof $master_host $master_port
wait_for_condition 50 200 {
[s 0 loading] == 1
} else {
fail "[s 0 loading] sdsdad"
}
# Generate some traffic for backlog ~2mb
populate 20 master 1000000 -1
set res [wait_for_log_messages -1 {"*Client * closed * for overcoming of output buffer limits.*"} $loglines 1000 10]
set loglines [lindex $res 1]
$replica config set key-load-delay 0
# Wait until replica loads RDB
wait_for_log_messages 0 {"*Done loading RDB*"} 0 1000 10
}
test "Test replication recovers after output buffer failures" {
# Verify system is operational
$master set x 1
# Wait until replica catches up
wait_replica_online $master 0 1000 100
wait_for_ofs_sync $master $replica
# Verify db's are identical
assert_morethan [$master dbsize] 0
assert_equal [$replica get x] 1
assert_equal [$master debug digest] [$replica debug digest]
}
}
}
start_server {tags {"repl external:skip"}} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
$master config set repl-diskless-sync yes
$master config set repl-rdb-channel yes
$master config set rdb-key-save-delay 300
$master config set client-output-buffer-limit "replica 0 0 0"
$master config set repl-diskless-sync-delay 5
$master config set loglevel debug
populate 10000 master 1
start_server {} {
set replica1 [srv 0 client]
$replica1 config set repl-rdb-channel yes
$replica1 config set loglevel debug
start_server {} {
set replica2 [srv 0 client]
$replica2 config set repl-rdb-channel yes
$replica2 config set loglevel debug
set load_handle [start_write_load $master_host $master_port 100 "key"]
test "Test master continues RDB delivery if not all replicas are dropped" {
$replica1 replicaof $master_host $master_port
$replica2 replicaof $master_host $master_port
wait_for_condition 50 200 {
[s -2 rdb_bgsave_in_progress] == 1
} else {
fail "Sync did not start"
}
# Wait for both replicas main conns to establish psync
wait_for_condition 500 100 {
[s -2 connected_slaves] == 2
} else {
fail "Replicas didn't establish psync:
sync_partial_ok: [s -2 sync_partial_ok]"
}
# kill one of the replicas
catch {$replica1 shutdown nosave}
# Wait until replica completes full sync
# Verify there is no other full sync attempt
wait_for_condition 50 1000 {
[s 0 master_link_status] == "up" &&
[s -2 sync_full] == 2 &&
[s -2 connected_slaves] == 1
} else {
fail "Sync session did not continue
master_link_status: [s 0 master_link_status]
sync_full:[s -2 sync_full]
connected_slaves: [s -2 connected_slaves]"
}
}
test "Test master aborts rdb delivery if all replicas are dropped" {
$replica2 replicaof no one
# Start replication
$replica2 replicaof $master_host $master_port
wait_for_condition 50 1000 {
[s -2 rdb_bgsave_in_progress] == 1
} else {
fail "Sync did not start"
}
set loglines [count_log_lines -2]
# kill replica
catch {$replica2 shutdown nosave}
# Verify master aborts rdb save
wait_for_condition 50 1000 {
[s -2 rdb_bgsave_in_progress] == 0 &&
[s -2 connected_slaves] == 0
} else {
fail "Master should abort the sync
rdb_bgsave_in_progress:[s -2 rdb_bgsave_in_progress]
connected_slaves: [s -2 connected_slaves]"
}
wait_for_log_messages -2 {"*Background transfer error*"} $loglines 1000 50
}
stop_write_load $load_handle
}
}
}
start_server {tags {"repl external:skip"}} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
$master config set repl-diskless-sync yes
$master config set repl-rdb-channel yes
$master config set loglevel debug
$master config set rdb-key-save-delay 1000
populate 3000 prefix1 1
populate 100 prefix2 100000
start_server {} {
set replica [srv 0 client]
set replica_pid [srv 0 pid]
$replica config set repl-rdb-channel yes
$replica config set loglevel debug
$replica config set repl-timeout 10
set load_handle [start_write_load $master_host $master_port 100 "key"]
test "Test replica recovers when rdb channel connection is killed" {
$replica replicaof $master_host $master_port
# Wait for sync session to start
wait_for_condition 500 200 {
[string match "*state=send_bulk_and_stream*" [s -1 slave0]] &&
[s -1 rdb_bgsave_in_progress] eq 1
} else {
fail "replica didn't start sync session in time"
}
set loglines [count_log_lines -1]
# Kill rdb channel client
set id [get_replica_client_id $master yes]
$master client kill id $id
wait_for_log_messages -1 {"*Background transfer error*"} $loglines 1000 10
# Verify master rejects main-ch-client-id after connection is killed
assert_error {*Unrecognized*} {$master replconf main-ch-client-id $id}
# Replica should retry
wait_for_condition 500 200 {
[string match "*state=send_bulk_and_stream*" [s -1 slave0]] &&
[s -1 rdb_bgsave_in_progress] eq 1
} else {
fail "replica didn't retry after connection close"
}
}
test "Test replica recovers when main channel connection is killed" {
set loglines [count_log_lines -1]
# Kill main channel client
set id [get_replica_client_id $master yes]
$master client kill id $id
wait_for_log_messages -1 {"*Background transfer error*"} $loglines 1000 20
# Replica should retry
wait_for_condition 500 2000 {
[string match "*state=send_bulk_and_stream*" [s -1 slave0]] &&
[s -1 rdb_bgsave_in_progress] eq 1
} else {
fail "replica didn't retry after connection close"
}
}
stop_write_load $load_handle
test "Test replica recovers connection failures" {
# Wait until replica catches up
wait_replica_online $master 0 1000 100
wait_for_ofs_sync $master $replica
# Verify db's are identical
assert_morethan [$master dbsize] 0
assert_equal [$master debug digest] [$replica debug digest]
}
}
}
start_server {tags {"repl external:skip"}} {
set replica [srv 0 client]
set replica_pid [srv 0 pid]
start_server {} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
test "Test master connection drops while streaming repl buffer into the db" {
# Just after replica loads RDB, it will stream repl buffer into the
# db. During streaming, we kill the master connection. Replica
# will abort streaming and then try another psync with master.
$master config set rdb-key-save-delay 1000
$master config set repl-rdb-channel yes
$master config set repl-diskless-sync yes
$replica config set repl-rdb-channel yes
$replica config set loading-process-events-interval-bytes 1024
# Populate db and start write traffic
populate 2000 master 1000
set load_handle [start_write_load $master_host $master_port 100 "key1"]
# Replica will pause in the loop of repl buffer streaming
$replica debug repl-pause on-streaming-repl-buf
$replica replicaof $master_host $master_port
# Check if repl stream accumulation is started.
wait_for_condition 50 1000 {
[s -1 replica_full_sync_buffer_size] > 0
} else {
fail "repl stream accumulation not started"
}
# Wait until replica starts streaming repl buffer
wait_for_log_messages -1 {"*Starting to stream replication buffer*"} 0 2000 10
stop_write_load $load_handle
$master config set rdb-key-save-delay 0
# Kill master connection and resume the process
$replica deferred 1
$replica client kill type master
$replica debug repl-pause clear
resume_process $replica_pid
$replica read
$replica read
$replica deferred 0
wait_for_log_messages -1 {"*Master client was freed while streaming*"} 0 500 10
# Quick check for stats test coverage
assert_morethan_equal [s -1 replica_full_sync_buffer_peak] [s -1 replica_full_sync_buffer_size]
# Wait until replica recovers and verify db's are identical
wait_replica_online $master 0 1000 10
wait_for_ofs_sync $master $replica
assert_morethan [$master dbsize] 0
assert_equal [$master debug digest] [$replica debug digest]
}
}
}
start_server {tags {"repl external:skip"}} {
set replica [srv 0 client]
set replica_pid [srv 0 pid]
start_server {} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
test "Test main channel connection drops while loading rdb (disk based)" {
# While loading rdb, we kill main channel connection.
# We expect replica to complete loading RDB and then try psync
# with the master.
$master config set repl-rdb-channel yes
$replica config set repl-rdb-channel yes
$replica config set repl-diskless-load disabled
$replica config set key-load-delay 10000
$replica config set loading-process-events-interval-bytes 1024
# Populate db and start write traffic
populate 10000 master 100
$replica replicaof $master_host $master_port
# Wait until replica starts loading
wait_for_condition 50 200 {
[s -1 loading] == 1
} else {
fail "replica did not start loading"
}
# Kill replica connections
$master client kill type replica
$master set x 1
# At this point, we expect replica to complete loading RDB. Then,
# it will try psync with master.
wait_for_log_messages -1 {"*Aborting rdb channel sync while loading the RDB*"} 0 2000 10
wait_for_log_messages -1 {"*After loading RDB, replica will try psync with master*"} 0 2000 10
# Speed up loading
$replica config set key-load-delay 0
# Wait until replica becomes online
wait_replica_online $master 0 100 100
# Verify there is another successful psync and no other full sync
wait_for_condition 50 200 {
[s 0 sync_full] == 1 &&
[s 0 sync_partial_ok] == 1
} else {
fail "psync was not successful [s 0 sync_full] [s 0 sync_partial_ok]"
}
# Verify db's are identical after recovery
wait_for_ofs_sync $master $replica
assert_morethan [$master dbsize] 0
assert_equal [$master debug digest] [$replica debug digest]
}
}
}
start_server {tags {"repl external:skip"}} {
set replica [srv 0 client]
set replica_pid [srv 0 pid]
start_server {} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
test "Test main channel connection drops while loading rdb (diskless)" {
# While loading rdb, kill both main and rdbchannel connections.
# We expect replica to abort sync and later retry again.
$master config set repl-rdb-channel yes
$replica config set repl-rdb-channel yes
$replica config set repl-diskless-load swapdb
$replica config set key-load-delay 10000
$replica config set loading-process-events-interval-bytes 1024
# Populate db and start write traffic
populate 10000 master 100
$replica replicaof $master_host $master_port
# Wait until replica starts loading
wait_for_condition 50 200 {
[s -1 loading] == 1
} else {
fail "replica did not start loading"
}
# Kill replica connections
$master client kill type replica
$master set x 1
# At this point, we expect replica to abort loading RDB.
wait_for_log_messages -1 {"*Aborting rdb channel sync while loading the RDB*"} 0 2000 10
wait_for_log_messages -1 {"*Failed trying to load the MASTER synchronization DB from socket*"} 0 2000 10
# Speed up loading
$replica config set key-load-delay 0
stop_write_load $load_handle
# Wait until replica recovers and becomes online
wait_replica_online $master 0 100 100
# Verify replica attempts another full sync
wait_for_condition 50 200 {
[s 0 sync_full] == 2 &&
[s 0 sync_partial_ok] == 0
} else {
fail "sync was not successful [s 0 sync_full] [s 0 sync_partial_ok]"
}
# Verify db's are identical after recovery
wait_for_ofs_sync $master $replica
assert_morethan [$master dbsize] 0
assert_equal [$master debug digest] [$replica debug digest]
}
}
}

View File

@ -1,3 +1,16 @@
#
# Copyright (c) 2009-Present, Redis Ltd.
# All rights reserved.
#
# Copyright (c) 2024-present, Valkey contributors.
# All rights reserved.
#
# Licensed under your choice of the Redis Source Available License 2.0
# (RSALv2) or the Server Side Public License v1 (SSPLv1).
#
# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
#
proc log_file_matches {log pattern} {
set fp [open $log r]
set content [read $fp]
@ -303,7 +316,7 @@ start_server {tags {"repl external:skip"}} {
}
}
foreach mdl {no yes} {
foreach mdl {no yes} rdbchannel {no yes} {
foreach sdl {disabled swapdb} {
start_server {tags {"repl external:skip"} overrides {save {}}} {
set master [srv 0 client]
@ -319,7 +332,13 @@ foreach mdl {no yes} {
lappend slaves [srv 0 client]
start_server {overrides {save {}}} {
lappend slaves [srv 0 client]
test "Connect multiple replicas at the same time (issue #141), master diskless=$mdl, replica diskless=$sdl" {
test "Connect multiple replicas at the same time (issue #141), master diskless=$mdl, replica diskless=$sdl, rdbchannel=$rdbchannel" {
$master config set repl-rdb-channel $rdbchannel
[lindex $slaves 0] config set repl-rdb-channel $rdbchannel
[lindex $slaves 1] config set repl-rdb-channel $rdbchannel
[lindex $slaves 2] config set repl-rdb-channel $rdbchannel
# start load handles only inside the test, so that the test can be skipped
set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000000]
set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000000]
@ -438,7 +457,7 @@ start_server {tags {"repl external:skip"} overrides {save {}}} {
}
# Diskless load swapdb when NOT async_loading (different master replid)
foreach testType {Successful Aborted} {
foreach testType {Successful Aborted} rdbchannel {yes no} {
start_server {tags {"repl external:skip"}} {
set replica [srv 0 client]
set replica_host [srv 0 host]
@ -453,6 +472,7 @@ foreach testType {Successful Aborted} {
$master config set repl-diskless-sync yes
$master config set repl-diskless-sync-delay 0
$master config set save ""
$master config set repl-rdb-channel $rdbchannel
$replica config set repl-diskless-load swapdb
$replica config set save ""
@ -474,7 +494,7 @@ foreach testType {Successful Aborted} {
# Start the replication process
$replica replicaof $master_host $master_port
test {Diskless load swapdb (different replid): replica enter loading} {
test "Diskless load swapdb (different replid): replica enter loading rdbchannel=$rdbchannel" {
# Wait for the replica to start reading the rdb
wait_for_condition 100 100 {
[s -1 loading] eq 1
@ -498,7 +518,7 @@ foreach testType {Successful Aborted} {
fail "Replica didn't disconnect"
}
test {Diskless load swapdb (different replid): old database is exposed after replication fails} {
test "Diskless load swapdb (different replid): old database is exposed after replication fails rdbchannel=$rdbchannel" {
# Ensure we see old values from replica
assert_equal [$replica get mykey] "myvalue"
@ -590,8 +610,8 @@ foreach testType {Successful Aborted} {
if {$testType == "Aborted"} {
# Set master with a slow rdb generation, so that we can easily intercept loading
# 10ms per key, with 2000 keys is 20 seconds
$master config set rdb-key-save-delay 10000
# 20ms per key, with 2000 keys is 40 seconds
$master config set rdb-key-save-delay 20000
}
# Force the replica to try another full sync (this time it will have matching master replid)
@ -862,6 +882,7 @@ start_server {tags {"repl external:skip"} overrides {save ""}} {
# we also need the replica to process requests during transfer (which it does only once in 2mb)
$master debug populate 20000 test 10000
$master config set rdbcompression no
$master config set repl-rdb-channel no
# If running on Linux, we also measure utime/stime to detect possible I/O handling issues
set os [catch {exec uname}]
set measure_time [expr {$os == "Linux"} ? 1 : 0]
@ -1009,6 +1030,7 @@ test "diskless replication child being killed is collected" {
set master_pid [srv 0 pid]
$master config set repl-diskless-sync yes
$master config set repl-diskless-sync-delay 0
$master config set repl-rdb-channel no
# put enough data in the db that the rdb file will be bigger than the socket buffers
$master debug populate 20000 test 10000
$master config set rdbcompression no
@ -1269,7 +1291,8 @@ start_server {tags {"repl external:skip"}} {
r slaveof $master2_host $master2_port
wait_for_condition 50 100 {
([s -2 rdb_bgsave_in_progress] == 1) &&
([string match "*wait_bgsave*" [s -2 slave0]])
([string match "*wait_bgsave*" [s -2 slave0]] ||
[string match "*send_bulk_and_stream*" [s -2 slave0]])
} else {
fail "full sync didn't start"
}

View File

@ -1,3 +1,16 @@
#
# Copyright (c) 2009-Present, Redis Ltd.
# All rights reserved.
#
# Copyright (c) 2024-present, Valkey contributors.
# All rights reserved.
#
# Licensed under your choice of the Redis Source Available License 2.0
# (RSALv2) or the Server Side Public License v1 (SSPLv1).
#
# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
#
proc randstring {min max {type binary}} {
set len [expr {$min+int(rand()*($max-$min+1))}]
set output {}
@ -118,11 +131,11 @@ proc wait_for_sync r {
}
}
proc wait_replica_online r {
wait_for_condition 50 100 {
[string match "*slave0:*,state=online*" [$r info replication]]
proc wait_replica_online {r {replica_id 0} {maxtries 50} {delay 100}} {
wait_for_condition $maxtries $delay {
[string match "*slave$replica_id:*,state=online*" [$r info replication]]
} else {
fail "replica didn't online in time"
fail "replica $replica_id did not become online in time"
}
}
@ -565,10 +578,11 @@ proc find_valgrind_errors {stderr on_termination} {
}
# Execute a background process writing random data for the specified number
# of seconds to the specified Redis instance.
proc start_write_load {host port seconds} {
# of seconds to the specified Redis instance. If key is omitted, a random key
# is used for every SET command.
proc start_write_load {host port seconds {key ""}} {
set tclsh [info nameofexecutable]
exec $tclsh tests/helpers/gen_write_load.tcl $host $port $seconds $::tls &
exec $tclsh tests/helpers/gen_write_load.tcl $host $port $seconds $::tls $key &
}
# Stop a process generating write load executed with start_write_load.
@ -677,6 +691,12 @@ proc pause_process pid {
}
proc resume_process pid {
wait_for_condition 50 1000 {
[string match "T*" [exec ps -o state= -p $pid]]
} else {
puts [exec ps j $pid]
fail "process was not stopped"
}
exec kill -SIGCONT $pid
}

View File

@ -1,3 +1,16 @@
#
# Copyright (c) 2009-Present, Redis Ltd.
# All rights reserved.
#
# Copyright (c) 2024-present, Valkey contributors.
# All rights reserved.
#
# Licensed under your choice of the Redis Source Available License 2.0
# (RSALv2) or the Server Side Public License v1 (SSPLv1).
#
# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
#
start_server {tags {"auth external:skip"}} {
test {AUTH fails if there is no password configured server side} {
catch {r auth foo} err
@ -65,24 +78,29 @@ start_server {tags {"auth_binary_password external:skip"}} {
set master_port [srv -1 port]
set slave [srv 0 client]
test {MASTERAUTH test with binary password} {
$master config set requirepass "abc\x00def"
foreach rdbchannel {yes no} {
test "MASTERAUTH test with binary password rdbchannel=$rdbchannel" {
$slave slaveof no one
$master config set requirepass "abc\x00def"
$master config set repl-rdb-channel $rdbchannel
# Configure the replica with masterauth
set loglines [count_log_lines 0]
$slave config set masterauth "abc"
$slave slaveof $master_host $master_port
# Configure the replica with masterauth
set loglines [count_log_lines 0]
$slave config set masterauth "abc"
$slave config set repl-rdb-channel $rdbchannel
$slave slaveof $master_host $master_port
# Verify replica is not able to sync with master
wait_for_log_messages 0 {"*Unable to AUTH to MASTER*"} $loglines 1000 10
assert_equal {down} [s 0 master_link_status]
# Test replica with the correct masterauth
$slave config set masterauth "abc\x00def"
wait_for_condition 50 100 {
[s 0 master_link_status] eq {up}
} else {
fail "Can't turn the instance into a replica"
# Verify replica is not able to sync with master
wait_for_log_messages 0 {"*Unable to AUTH to MASTER*"} $loglines 1000 10
assert_equal {down} [s 0 master_link_status]
# Test replica with the correct masterauth
$slave config set masterauth "abc\x00def"
wait_for_condition 50 100 {
[s 0 master_link_status] eq {up}
} else {
fail "Can't turn the instance into a replica"
}
}
}
}