diff --git a/src/replication.c b/src/replication.c index 611913c92..c2cc96605 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2014,8 +2014,6 @@ void readSyncBulkPayload(connection *conn) { char buf[PROTO_IOBUF_LEN]; ssize_t nread, readlen, nwritten; int use_diskless_load = useDisklessLoad(); - redisDb *diskless_load_tempDb = NULL; - functionsLibCtx* temp_functions_lib_ctx = NULL; int rdbchannel = (conn == server.repl_rdb_transfer_s); int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; @@ -2208,17 +2206,9 @@ void readSyncBulkPayload(connection *conn) { killRDBChild(); } - if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { - /* Initialize empty tempDb dictionaries. */ - diskless_load_tempDb = disklessLoadInitTempDb(); - temp_functions_lib_ctx = functionsLibCtxCreate(); - - moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD, - REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED, - NULL); - } else { + /* Attach to the new master immediately if we are not using swapdb. */ + if (!use_diskless_load || server.repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB) replicationAttachToNewMaster(); - } /* Before loading the DB into memory we need to delete the readable * handler, otherwise it will get called recursively since @@ -2235,6 +2225,9 @@ void readSyncBulkPayload(connection *conn) { int asyncLoading = 0; if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { + moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD, + REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED, + NULL); /* Async loading means we continue serving read commands during full resync, and * "swap" the new db with the old db only when loading is done. * It is enabled only on SWAPDB diskless replication when master replication ID hasn't changed, @@ -2243,15 +2236,9 @@ void readSyncBulkPayload(connection *conn) { if (memcmp(server.replid, server.master_replid, CONFIG_RUN_ID_SIZE) == 0) { asyncLoading = 1; } - dbarray = diskless_load_tempDb; - functions_lib_ctx = temp_functions_lib_ctx; - } else { - dbarray = server.db; - functions_lib_ctx = functionsLibCtxGetCurrent(); - functionsLibCtxClear(functions_lib_ctx); } - disklessLoadingRio = &rdb; + /* Empty db */ loadingSetFlags(NULL, server.repl_transfer_size, asyncLoading); if (server.repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB) { serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); @@ -2262,7 +2249,17 @@ void readSyncBulkPayload(connection *conn) { } loadingFireEvent(RDBFLAGS_REPLICATION); + if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { + dbarray = disklessLoadInitTempDb(); + functions_lib_ctx = functionsLibCtxCreate(); + } else { + dbarray = server.db; + functions_lib_ctx = functionsLibCtxGetCurrent(); + functionsLibCtxClear(functions_lib_ctx); + } + rioInitWithConn(&rdb,conn,server.repl_transfer_size); + disklessLoadingRio = &rdb; /* Put the socket in blocking mode to simplify RDB transfer. * We'll restore it when the RDB is received. */ @@ -2297,8 +2294,8 @@ void readSyncBulkPayload(connection *conn) { REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_ABORTED, NULL); - disklessLoadDiscardTempDb(diskless_load_tempDb); - functionsLibCtxFree(temp_functions_lib_ctx); + disklessLoadDiscardTempDb(dbarray); + functionsLibCtxFree(functions_lib_ctx); serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Discarding temporary DB in background"); } else { /* Remove the half-loaded data in case we started with an empty replica. */ @@ -2328,17 +2325,17 @@ void readSyncBulkPayload(connection *conn) { replicationAttachToNewMaster(); serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Swapping active DB with loaded DB"); - swapMainDbWithTempDb(diskless_load_tempDb); + swapMainDbWithTempDb(dbarray); /* swap existing functions ctx with the temporary one */ - functionsLibCtxSwapWithCurrent(temp_functions_lib_ctx); + functionsLibCtxSwapWithCurrent(functions_lib_ctx); moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD, REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED, NULL); /* Delete the old db as it's useless now. */ - disklessLoadDiscardTempDb(diskless_load_tempDb); + disklessLoadDiscardTempDb(dbarray); serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Discarding old DB in background"); } diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 0914c0e55..7ba6e2f59 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -1619,3 +1619,32 @@ start_server {tags {"repl external:skip"}} { } } } + +start_server {tags {"repl external:skip"}} { + set replica [srv 0 client] + start_server {} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + test "Test replication with functions when repl-diskless-load is set to on-empty-db" { + $replica config set repl-diskless-load on-empty-db + + populate 10 master 10 + $master function load {#!lua name=test + redis.register_function{function_name='func1', callback=function() return 'hello' end, flags={'no-writes'}} + } + + $replica replicaof $master_host $master_port + + # Wait until replication is completed + wait_replica_online $master 0 1000 100 + wait_for_ofs_sync $master $replica + + # Sanity check + assert_equal [$replica fcall func1 0] "hello" + assert_morethan [$replica dbsize] 0 + assert_equal [$master debug digest] [$replica debug digest] + } + } +}