diff --git a/src/module.c b/src/module.c index b50c316a3..95327dfdb 100644 --- a/src/module.c +++ b/src/module.c @@ -857,7 +857,7 @@ void moduleCallCommandUnblockedHandler(client *c) { moduleCreateContext(&ctx, module, REDISMODULE_CTX_TEMP_CLIENT); selectDb(ctx.client, c->db->id); - CallReply *reply = moduleParseReply(c, &ctx); + CallReply *reply = moduleParseReply(c, NULL); module->in_call++; promise->on_unblocked(&ctx, reply, promise->private_data); module->in_call--; diff --git a/tests/modules/blockedclient.c b/tests/modules/blockedclient.c index 9b04fda6a..92060fd33 100644 --- a/tests/modules/blockedclient.c +++ b/tests/modules/blockedclient.c @@ -321,6 +321,62 @@ int do_rm_call_async(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){ return REDISMODULE_OK; } +typedef struct ThreadedAsyncRMCallCtx{ + RedisModuleBlockedClient *bc; + RedisModuleCallReply *reply; +} ThreadedAsyncRMCallCtx; + +void *send_async_reply(void *arg) { + ThreadedAsyncRMCallCtx *ta_rm_call_ctx = arg; + rm_call_async_on_unblocked(NULL, ta_rm_call_ctx->reply, ta_rm_call_ctx->bc); + RedisModule_Free(ta_rm_call_ctx); + return NULL; +} + +/* Called when the command that was blocked on 'RM_Call' gets unblocked + * and schedule a thread to send the reply to the blocked client. */ +static void rm_call_async_reply_on_thread(RedisModuleCtx *ctx, RedisModuleCallReply *reply, void *private_data) { + UNUSED(ctx); + ThreadedAsyncRMCallCtx *ta_rm_call_ctx = RedisModule_Alloc(sizeof(*ta_rm_call_ctx)); + ta_rm_call_ctx->bc = private_data; + ta_rm_call_ctx->reply = reply; + pthread_t tid; + int res = pthread_create(&tid, NULL, send_async_reply, ta_rm_call_ctx); + assert(res == 0); +} + +/* + * Callback for do_rm_call_async_on_thread. + * Gets the command to invoke as the first argument to the command and runs it, + * passing the rest of the arguments to the command invocation. + * If the command got blocked, blocks the client and unblock on a background thread. + * this allows check the K (allow blocking) argument to RM_Call, and make sure that the reply + * that passes to unblock handler is owned by the handler and are not attached to any + * context that might be freed after the callback ends. + */ +int do_rm_call_async_on_thread(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){ + UNUSED(argv); + UNUSED(argc); + + if(argc < 2){ + return RedisModule_WrongArity(ctx); + } + + const char* cmd = RedisModule_StringPtrLen(argv[1], NULL); + + RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "KEv", argv + 2, argc - 2); + + if(RedisModule_CallReplyType(rep) != REDISMODULE_REPLY_PROMISE) { + rm_call_async_send_reply(ctx, rep); + } else { + RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0); + RedisModule_CallReplyPromiseSetUnblockHandler(rep, rm_call_async_reply_on_thread, bc); + RedisModule_FreeCallReply(rep); + } + + return REDISMODULE_OK; +} + /* Private data for wait_and_do_rm_call_async that holds information about: * 1. the block client, to unblock when done. * 2. the arguments, contains the command to run using RM_Call */ @@ -604,6 +660,10 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) "write", 0, 0, 0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "do_rm_call_async_on_thread", do_rm_call_async_on_thread, + "write", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "do_rm_call_async_script_mode", do_rm_call_async, "write", 0, 0, 0) == REDISMODULE_ERR) return REDISMODULE_ERR; diff --git a/tests/unit/moduleapi/async_rm_call.tcl b/tests/unit/moduleapi/async_rm_call.tcl index 142b098f0..4799ea124 100644 --- a/tests/unit/moduleapi/async_rm_call.tcl +++ b/tests/unit/moduleapi/async_rm_call.tcl @@ -15,7 +15,18 @@ start_server {tags {"modules"}} { assert_equal {0} [r llen l] } - foreach cmd {do_rm_call_async do_rm_call_async_script_mode} { + test "Blpop on threaded async RM_Call" { + set rd [redis_deferring_client] + + $rd do_rm_call_async_on_thread blpop l 0 + wait_for_blocked_clients_count 1 + r lpush l a + assert_equal [$rd read] {l a} + wait_for_blocked_clients_count 0 + $rd close + } + + foreach cmd {do_rm_call_async do_rm_call_async_script_mode } { test "Blpop on async RM_Call using $cmd" { set rd [redis_deferring_client] @@ -25,6 +36,7 @@ start_server {tags {"modules"}} { r lpush l a assert_equal [$rd read] {l a} wait_for_blocked_clients_count 0 + $rd close } test "Brpop on async RM_Call using $cmd" { @@ -35,6 +47,7 @@ start_server {tags {"modules"}} { r lpush l a assert_equal [$rd read] {l a} wait_for_blocked_clients_count 0 + $rd close } test "Brpoplpush on async RM_Call using $cmd" { @@ -45,6 +58,7 @@ start_server {tags {"modules"}} { r lpush l1 a assert_equal [$rd read] {a} wait_for_blocked_clients_count 0 + $rd close r lpop l2 } {a} @@ -56,6 +70,7 @@ start_server {tags {"modules"}} { r lpush l1 a assert_equal [$rd read] {a} wait_for_blocked_clients_count 0 + $rd close r lpop l2 } {a} @@ -67,6 +82,7 @@ start_server {tags {"modules"}} { r zadd s 10 foo assert_equal [$rd read] {s foo 10} wait_for_blocked_clients_count 0 + $rd close } test "Bzpopmax on async RM_Call using $cmd" { @@ -77,6 +93,7 @@ start_server {tags {"modules"}} { r zadd s 10 foo assert_equal [$rd read] {s foo 10} wait_for_blocked_clients_count 0 + $rd close } } @@ -88,6 +105,7 @@ start_server {tags {"modules"}} { r lpush l a assert_equal [$rd read] {l a} wait_for_blocked_clients_count 0 + $rd close } test {Test multiple async RM_Call waiting on the same event} { @@ -101,6 +119,8 @@ start_server {tags {"modules"}} { assert_equal [$rd1 read] {l element} assert_equal [$rd2 read] {l element} wait_for_blocked_clients_count 0 + $rd1 close + $rd2 close } test {async RM_Call calls RM_Call} { @@ -136,6 +156,7 @@ start_server {tags {"modules"}} { } wait_for_blocked_clients_count 0 + $rd close } test {Become replica while having async RM_Call running} { @@ -156,6 +177,7 @@ start_server {tags {"modules"}} { r lpush l 1 # make sure the async rm_call was aborted assert_equal [r llen l] {1} + $rd close } test {Pipeline with blocking RM_Call} { @@ -175,6 +197,7 @@ start_server {tags {"modules"}} { assert_equal [$rd read] {PONG} wait_for_blocked_clients_count 0 + $rd close } test {blocking RM_Call abort} { @@ -195,6 +218,7 @@ start_server {tags {"modules"}} { r lpush l 1 # make sure the async rm_call was aborted assert_equal [r llen l] {1} + $rd close } } @@ -220,6 +244,7 @@ start_server {tags {"modules"}} { close_replication_stream $repl wait_for_blocked_clients_count 0 + $rd close } test {Test unblock handler are executed as a unit} { @@ -245,6 +270,7 @@ start_server {tags {"modules"}} { close_replication_stream $repl wait_for_blocked_clients_count 0 + $rd close } test {Test no propagation of blocking command} { @@ -269,6 +295,7 @@ start_server {tags {"modules"}} { close_replication_stream $repl wait_for_blocked_clients_count 0 + $rd close } } @@ -303,6 +330,7 @@ start_server {tags {"modules"}} { close_replication_stream $repl wait_for_blocked_clients_count 0 + $rd close } test {Test unblock handler are executed as a unit with lazy expire} { @@ -355,9 +383,10 @@ start_server {tags {"modules"}} { } close_replication_stream $repl r DEBUG SET-ACTIVE-EXPIRE 1 + + wait_for_blocked_clients_count 0 + $rd close } - - wait_for_blocked_clients_count 0 } start_server {tags {"modules"}} { @@ -376,5 +405,6 @@ start_server {tags {"modules"}} { assert_equal [$rd read] {4} wait_for_blocked_clients_count 0 + $rd close } -} \ No newline at end of file +}