From 2ba8de9d5ead4c9bb70d38caad99a2b40b50de79 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 28 May 2023 10:10:52 +0300 Subject: [PATCH] Fix WAIT for clients being blocked in a module command (#12220) So far clients being blocked and unblocked by a module command would update the c->woff variable and so WAIT was ineffective and got released without waiting for the command actions to propagate. This seems to have existed since forever, but not for RM_BlockClientOnKeys. It is problematic though to know if the module did or didn't propagate anything in that command, so for now, instead of adding an API, we'll just update the woff to the latest offset when unblocking, this will cause the client to possibly wait excessively, but that's not that bad. (cherry picked from commit 6117f2882286e972d934fd652b69433528f46e3f) --- src/module.c | 5 ++++ tests/modules/blockonkeys.c | 22 ++++++++++++++++ tests/support/util.tcl | 22 +++++++++++++--- tests/unit/moduleapi/blockedclient.tcl | 24 ++++++++++++++++++ tests/unit/moduleapi/blockonkeys.tcl | 35 ++++++++++++++++++++++++++ 5 files changed, 104 insertions(+), 4 deletions(-) diff --git a/src/module.c b/src/module.c index 11d5a61f9..b5db0ca55 100644 --- a/src/module.c +++ b/src/module.c @@ -7559,6 +7559,11 @@ void moduleHandleBlockedClients(void) { * properly unblocked by the module. */ bc->disconnect_callback = NULL; unblockClient(c); + + /* Update the wait offset, we don't know if this blocked client propagated anything, + * currently we rather not add any API for that, so we just assume it did. */ + c->woff = server.master_repl_offset; + /* Put the client in the list of clients that need to write * if there are pending replies here. This is needed since * during a non blocking command the client may receive output. */ diff --git a/tests/modules/blockonkeys.c b/tests/modules/blockonkeys.c index 1aa576489..ce0c67cba 100644 --- a/tests/modules/blockonkeys.c +++ b/tests/modules/blockonkeys.c @@ -113,6 +113,8 @@ int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { fsl->list[fsl->length++] = ele; RedisModule_SignalKeyAsReady(ctx, argv[1]); + RedisModule_ReplicateVerbatim(ctx); + return RedisModule_ReplyWithSimpleString(ctx, "OK"); } @@ -126,6 +128,9 @@ int bpop_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + + /* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */ + RedisModule_ReplicateVerbatim(ctx); return REDISMODULE_OK; } @@ -161,6 +166,8 @@ int fsl_bpop(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { NULL, timeout, &argv[1], 1, NULL); } else { RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + /* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */ + RedisModule_ReplicateVerbatim(ctx); } return REDISMODULE_OK; @@ -180,6 +187,8 @@ int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int arg return REDISMODULE_ERR; RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + /* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */ + RedisModule_ReplicateVerbatim(ctx); return REDISMODULE_OK; } @@ -220,6 +229,8 @@ int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { bpopgt_free_privdata, timeout, &argv[1], 1, pgt); } else { RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + /* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */ + RedisModule_ReplicateVerbatim(ctx); } return REDISMODULE_OK; @@ -242,6 +253,8 @@ int bpoppush_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int a long long ele = src->list[--src->length]; dst->list[dst->length++] = ele; RedisModule_SignalKeyAsReady(ctx, dst_keyname); + /* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */ + RedisModule_ReplicateVerbatim(ctx); return RedisModule_ReplyWithLongLong(ctx, ele); } @@ -284,6 +297,8 @@ int fsl_bpoppush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { dst->list[dst->length++] = ele; RedisModule_SignalKeyAsReady(ctx, argv[2]); RedisModule_ReplyWithLongLong(ctx, ele); + /* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */ + RedisModule_ReplicateVerbatim(ctx); } return REDISMODULE_OK; @@ -320,6 +335,8 @@ int blockonkeys_popall_reply_callback(RedisModuleCtx *ctx, RedisModuleString **a RedisModule_ReplyWithString(ctx, elem); RedisModule_FreeString(ctx, elem); } + /* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */ + RedisModule_ReplicateVerbatim(ctx); RedisModule_ReplySetArrayLength(ctx, len); } else { RedisModule_ReplyWithError(ctx, "ERR Not a list"); @@ -385,6 +402,7 @@ int blockonkeys_lpush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (!strncasecmp(str, "blockonkeys.lpush_unblock", len)) { RedisModule_SignalKeyAsReady(ctx, argv[1]); } + RedisModule_ReplicateVerbatim(ctx); return RedisModule_ReplyWithSimpleString(ctx, "OK"); } @@ -403,6 +421,8 @@ int blockonkeys_blpopn_reply_callback(RedisModuleCtx *ctx, RedisModuleString **a RedisModule_ReplyWithString(ctx, elem); RedisModule_FreeString(ctx, elem); } + /* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */ + RedisModule_ReplicateVerbatim(ctx); result = REDISMODULE_OK; } else if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST || RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY) { @@ -446,6 +466,8 @@ int blockonkeys_blpopn(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) RedisModule_ReplyWithString(ctx, elem); RedisModule_FreeString(ctx, elem); } + /* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */ + RedisModule_ReplicateVerbatim(ctx); } else { RedisModule_BlockClientOnKeys(ctx, blockonkeys_blpopn_reply_callback, blockonkeys_blpopn_timeout_callback, diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 1ed2621cb..27dfcd5ca 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -642,6 +642,20 @@ proc process_is_alive pid { } } +proc pause_process pid { + exec kill -SIGSTOP $pid + wait_for_condition 50 100 { + [string match {*T*} [lindex [exec ps j $pid] 16]] + } else { + puts [exec ps j $pid] + fail "process didn't stop" + } +} + +proc resume_process pid { + exec kill -SIGCONT $pid +} + proc cmdrstat {cmd r} { if {[regexp "\r\ncmdstat_$cmd:(.*?)\r\n" [$r info commandstats] _ value]} { set _ $value @@ -877,17 +891,17 @@ proc debug_digest {{level 0}} { r $level debug digest } -proc wait_for_blocked_client {} { +proc wait_for_blocked_client {{idx 0}} { wait_for_condition 50 100 { - [s blocked_clients] ne 0 + [s $idx blocked_clients] ne 0 } else { fail "no blocked clients" } } -proc wait_for_blocked_clients_count {count {maxtries 100} {delay 10}} { +proc wait_for_blocked_clients_count {count {maxtries 100} {delay 10} {idx 0}} { wait_for_condition $maxtries $delay { - [s blocked_clients] == $count + [s $idx blocked_clients] == $count } else { fail "Timeout waiting for blocked clients" } diff --git a/tests/unit/moduleapi/blockedclient.tcl b/tests/unit/moduleapi/blockedclient.tcl index 2cb44788e..bdcef96b1 100644 --- a/tests/unit/moduleapi/blockedclient.tcl +++ b/tests/unit/moduleapi/blockedclient.tcl @@ -247,6 +247,30 @@ foreach call_type {nested normal} { assert_match {*calls=2,*,rejected_calls=0,failed_calls=2} [cmdrstat do_bg_rm_call r] } + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + start_server [list overrides [list loadmodule "$testmodule"]] { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + + # Start the replication process... + $replica replicaof $master_host $master_port + wait_for_sync $replica + + test {WAIT command on module blocked client} { + pause_process [srv 0 pid] + + $master do_bg_rm_call_format ! hset bk1 foo bar + + assert_equal [$master wait 1 1000] 0 + resume_process [srv 0 pid] + assert_equal [$master wait 1 1000] 1 + assert_equal [$replica hget bk1 foo] bar + } + } + test "Unload the module - blockedclient" { assert_equal {OK} [r module unload blockedclient] } diff --git a/tests/unit/moduleapi/blockonkeys.tcl b/tests/unit/moduleapi/blockonkeys.tcl index 094bcc0c0..5e916c516 100644 --- a/tests/unit/moduleapi/blockonkeys.tcl +++ b/tests/unit/moduleapi/blockonkeys.tcl @@ -268,4 +268,39 @@ start_server {tags {"modules"}} { assert_equal {gg ff ee dd cc} [$rd read] $rd close } + + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + start_server [list overrides [list loadmodule "$testmodule"]] { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + + # Start the replication process... + $replica replicaof $master_host $master_port + wait_for_sync $replica + + test {WAIT command on module blocked client on keys} { + set rd [redis_deferring_client -1] + $rd set x y + $rd read + + pause_process [srv 0 pid] + + $master del k + $rd fsl.bpop k 0 + wait_for_blocked_client -1 + $master fsl.push k 34 + $master fsl.push k 35 + assert_equal {34} [$rd read] + + assert_equal [$master wait 1 1000] 0 + resume_process [srv 0 pid] + assert_equal [$master wait 1 1000] 1 + $rd close + assert_equal {35} [$replica fsl.getall k] + } + } + }