From 153f8f082e96da08a2d1763a8e45db1bcd1209ff Mon Sep 17 00:00:00 2001 From: "Meir Shpilraien (Spielrein)" Date: Sun, 25 Jun 2023 14:12:27 +0300 Subject: [PATCH] Fix use after free on blocking RM_Call. (#12342) blocking RM_Call was introduced on: #11568, It allows a module to perform blocking commands and get the reply asynchronously.If the command gets block, a special promise CallReply is returned that allow to set the unblock handler. The unblock handler will be called when the command invocation finish and it gets, as input, the command real reply. The issue was that the real CallReply was created using a stack allocated RedisModuleCtx which is no longer available after the unblock handler finishes. So if the module keeps the CallReply after the unblock handler finished, the CallReply holds a pointer to invalid memory and will try to access it when the CallReply will be released. The solution is to create the CallReply with a NULL context to make it totally detached and can be freed freely when the module wants. Test was added to cover this case, running the test with valgrind before the fix shows the use after free error. With the fix, there are no valgrind errors. unrelated: adding a missing `$rd close` in many tests in that file. --- src/module.c | 2 +- tests/modules/blockedclient.c | 60 ++++++++++++++++++++++++++ tests/unit/moduleapi/async_rm_call.tcl | 38 ++++++++++++++-- 3 files changed, 95 insertions(+), 5 deletions(-) diff --git a/src/module.c b/src/module.c index b50c316a3..95327dfdb 100644 --- a/src/module.c +++ b/src/module.c @@ -857,7 +857,7 @@ void moduleCallCommandUnblockedHandler(client *c) { moduleCreateContext(&ctx, module, REDISMODULE_CTX_TEMP_CLIENT); selectDb(ctx.client, c->db->id); - CallReply *reply = moduleParseReply(c, &ctx); + CallReply *reply = moduleParseReply(c, NULL); module->in_call++; promise->on_unblocked(&ctx, reply, promise->private_data); module->in_call--; diff --git a/tests/modules/blockedclient.c b/tests/modules/blockedclient.c index 9b04fda6a..92060fd33 100644 --- a/tests/modules/blockedclient.c +++ b/tests/modules/blockedclient.c @@ -321,6 +321,62 @@ int do_rm_call_async(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){ return REDISMODULE_OK; } +typedef struct ThreadedAsyncRMCallCtx{ + RedisModuleBlockedClient *bc; + RedisModuleCallReply *reply; +} ThreadedAsyncRMCallCtx; + +void *send_async_reply(void *arg) { + ThreadedAsyncRMCallCtx *ta_rm_call_ctx = arg; + rm_call_async_on_unblocked(NULL, ta_rm_call_ctx->reply, ta_rm_call_ctx->bc); + RedisModule_Free(ta_rm_call_ctx); + return NULL; +} + +/* Called when the command that was blocked on 'RM_Call' gets unblocked + * and schedule a thread to send the reply to the blocked client. */ +static void rm_call_async_reply_on_thread(RedisModuleCtx *ctx, RedisModuleCallReply *reply, void *private_data) { + UNUSED(ctx); + ThreadedAsyncRMCallCtx *ta_rm_call_ctx = RedisModule_Alloc(sizeof(*ta_rm_call_ctx)); + ta_rm_call_ctx->bc = private_data; + ta_rm_call_ctx->reply = reply; + pthread_t tid; + int res = pthread_create(&tid, NULL, send_async_reply, ta_rm_call_ctx); + assert(res == 0); +} + +/* + * Callback for do_rm_call_async_on_thread. + * Gets the command to invoke as the first argument to the command and runs it, + * passing the rest of the arguments to the command invocation. + * If the command got blocked, blocks the client and unblock on a background thread. + * this allows check the K (allow blocking) argument to RM_Call, and make sure that the reply + * that passes to unblock handler is owned by the handler and are not attached to any + * context that might be freed after the callback ends. + */ +int do_rm_call_async_on_thread(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){ + UNUSED(argv); + UNUSED(argc); + + if(argc < 2){ + return RedisModule_WrongArity(ctx); + } + + const char* cmd = RedisModule_StringPtrLen(argv[1], NULL); + + RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "KEv", argv + 2, argc - 2); + + if(RedisModule_CallReplyType(rep) != REDISMODULE_REPLY_PROMISE) { + rm_call_async_send_reply(ctx, rep); + } else { + RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0); + RedisModule_CallReplyPromiseSetUnblockHandler(rep, rm_call_async_reply_on_thread, bc); + RedisModule_FreeCallReply(rep); + } + + return REDISMODULE_OK; +} + /* Private data for wait_and_do_rm_call_async that holds information about: * 1. the block client, to unblock when done. * 2. the arguments, contains the command to run using RM_Call */ @@ -604,6 +660,10 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) "write", 0, 0, 0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "do_rm_call_async_on_thread", do_rm_call_async_on_thread, + "write", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "do_rm_call_async_script_mode", do_rm_call_async, "write", 0, 0, 0) == REDISMODULE_ERR) return REDISMODULE_ERR; diff --git a/tests/unit/moduleapi/async_rm_call.tcl b/tests/unit/moduleapi/async_rm_call.tcl index 142b098f0..4799ea124 100644 --- a/tests/unit/moduleapi/async_rm_call.tcl +++ b/tests/unit/moduleapi/async_rm_call.tcl @@ -15,7 +15,18 @@ start_server {tags {"modules"}} { assert_equal {0} [r llen l] } - foreach cmd {do_rm_call_async do_rm_call_async_script_mode} { + test "Blpop on threaded async RM_Call" { + set rd [redis_deferring_client] + + $rd do_rm_call_async_on_thread blpop l 0 + wait_for_blocked_clients_count 1 + r lpush l a + assert_equal [$rd read] {l a} + wait_for_blocked_clients_count 0 + $rd close + } + + foreach cmd {do_rm_call_async do_rm_call_async_script_mode } { test "Blpop on async RM_Call using $cmd" { set rd [redis_deferring_client] @@ -25,6 +36,7 @@ start_server {tags {"modules"}} { r lpush l a assert_equal [$rd read] {l a} wait_for_blocked_clients_count 0 + $rd close } test "Brpop on async RM_Call using $cmd" { @@ -35,6 +47,7 @@ start_server {tags {"modules"}} { r lpush l a assert_equal [$rd read] {l a} wait_for_blocked_clients_count 0 + $rd close } test "Brpoplpush on async RM_Call using $cmd" { @@ -45,6 +58,7 @@ start_server {tags {"modules"}} { r lpush l1 a assert_equal [$rd read] {a} wait_for_blocked_clients_count 0 + $rd close r lpop l2 } {a} @@ -56,6 +70,7 @@ start_server {tags {"modules"}} { r lpush l1 a assert_equal [$rd read] {a} wait_for_blocked_clients_count 0 + $rd close r lpop l2 } {a} @@ -67,6 +82,7 @@ start_server {tags {"modules"}} { r zadd s 10 foo assert_equal [$rd read] {s foo 10} wait_for_blocked_clients_count 0 + $rd close } test "Bzpopmax on async RM_Call using $cmd" { @@ -77,6 +93,7 @@ start_server {tags {"modules"}} { r zadd s 10 foo assert_equal [$rd read] {s foo 10} wait_for_blocked_clients_count 0 + $rd close } } @@ -88,6 +105,7 @@ start_server {tags {"modules"}} { r lpush l a assert_equal [$rd read] {l a} wait_for_blocked_clients_count 0 + $rd close } test {Test multiple async RM_Call waiting on the same event} { @@ -101,6 +119,8 @@ start_server {tags {"modules"}} { assert_equal [$rd1 read] {l element} assert_equal [$rd2 read] {l element} wait_for_blocked_clients_count 0 + $rd1 close + $rd2 close } test {async RM_Call calls RM_Call} { @@ -136,6 +156,7 @@ start_server {tags {"modules"}} { } wait_for_blocked_clients_count 0 + $rd close } test {Become replica while having async RM_Call running} { @@ -156,6 +177,7 @@ start_server {tags {"modules"}} { r lpush l 1 # make sure the async rm_call was aborted assert_equal [r llen l] {1} + $rd close } test {Pipeline with blocking RM_Call} { @@ -175,6 +197,7 @@ start_server {tags {"modules"}} { assert_equal [$rd read] {PONG} wait_for_blocked_clients_count 0 + $rd close } test {blocking RM_Call abort} { @@ -195,6 +218,7 @@ start_server {tags {"modules"}} { r lpush l 1 # make sure the async rm_call was aborted assert_equal [r llen l] {1} + $rd close } } @@ -220,6 +244,7 @@ start_server {tags {"modules"}} { close_replication_stream $repl wait_for_blocked_clients_count 0 + $rd close } test {Test unblock handler are executed as a unit} { @@ -245,6 +270,7 @@ start_server {tags {"modules"}} { close_replication_stream $repl wait_for_blocked_clients_count 0 + $rd close } test {Test no propagation of blocking command} { @@ -269,6 +295,7 @@ start_server {tags {"modules"}} { close_replication_stream $repl wait_for_blocked_clients_count 0 + $rd close } } @@ -303,6 +330,7 @@ start_server {tags {"modules"}} { close_replication_stream $repl wait_for_blocked_clients_count 0 + $rd close } test {Test unblock handler are executed as a unit with lazy expire} { @@ -355,9 +383,10 @@ start_server {tags {"modules"}} { } close_replication_stream $repl r DEBUG SET-ACTIVE-EXPIRE 1 + + wait_for_blocked_clients_count 0 + $rd close } - - wait_for_blocked_clients_count 0 } start_server {tags {"modules"}} { @@ -376,5 +405,6 @@ start_server {tags {"modules"}} { assert_equal [$rd read] {4} wait_for_blocked_clients_count 0 + $rd close } -} \ No newline at end of file +}