diff --git a/src/blocked.c b/src/blocked.c index 009e2557b..5ec29eb00 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -223,7 +223,7 @@ void replyToBlockedClientTimedOut(client *c) { addReplyLongLong(c,server.fsynced_reploff >= c->bstate.reploffset); addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate.reploffset)); } else if (c->bstate.btype == BLOCKED_MODULE) { - moduleBlockedClientTimedOut(c, 0); + moduleBlockedClientTimedOut(c); } else { serverPanic("Unknown btype in replyToBlockedClientTimedOut()."); } diff --git a/src/module.c b/src/module.c index a1fa6599c..ced079f2f 100644 --- a/src/module.c +++ b/src/module.c @@ -262,6 +262,10 @@ typedef struct RedisModuleBlockedClient { monotime background_timer; /* Timer tracking the start of background work */ uint64_t background_duration; /* Current command background time duration. Used for measuring latency of blocking cmds */ + int blocked_on_keys_explicit_unblock; /* Set to 1 only in the case of an explicit RM_Unblock on + * a client that is blocked on keys. In this case we will + * call the timeout call back from within + * moduleHandleBlockedClients which runs from the main thread */ } RedisModuleBlockedClient; /* This is a list of Module Auth Contexts. Each time a Module registers a callback, a new ctx is @@ -7782,7 +7786,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF int islua = scriptIsRunning(); int ismulti = server.in_exec; - c->bstate.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient)); + c->bstate.module_blocked_handle = zcalloc(sizeof(RedisModuleBlockedClient)); RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle; ctx->module->blocked_clients++; @@ -8261,7 +8265,7 @@ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) { * argument, but better to be safe than sorry. */ if (bc->timeout_callback == NULL) return REDISMODULE_ERR; if (bc->unblocked) return REDISMODULE_OK; - if (bc->client) moduleBlockedClientTimedOut(bc->client, 1); + if (bc->client) bc->blocked_on_keys_explicit_unblock = 1; } moduleUnblockClientByHandle(bc,privdata); return REDISMODULE_OK; @@ -8339,6 +8343,10 @@ void moduleHandleBlockedClients(void) { reply_us = elapsedUs(replyTimer); moduleFreeContext(&ctx); } + if (c && bc->blocked_on_keys_explicit_unblock) { + serverAssert(bc->blocked_on_keys); + moduleBlockedClientTimedOut(c); + } /* Hold onto the blocked client if module auth is in progress. The reply callback is invoked * when the client is reprocessed. */ if (c && clientHasModuleAuthInProgress(c)) { @@ -8359,11 +8367,12 @@ void moduleHandleBlockedClients(void) { /* Update stats now that we've finished the blocking operation. * This needs to be out of the reply callback above given that a * module might not define any callback and still do blocking ops. + * + * If the module is blocked on keys updateStatsOnUnblock will be + * called from moduleUnblockClientOnKey */ - if (c && !clientHasModuleAuthInProgress(c)) { - int had_errors = c->deferred_reply_errors ? !!listLength(c->deferred_reply_errors) : - (server.stat_total_error_replies != prev_error_replies); - updateStatsOnUnblock(c, bc->background_duration, reply_us, had_errors); + if (c && !clientHasModuleAuthInProgress(c) && !bc->blocked_on_keys) { + updateStatsOnUnblock(c, bc->background_duration, reply_us, server.stat_total_error_replies != prev_error_replies); } if (c != NULL) { @@ -8418,31 +8427,19 @@ int moduleBlockedClientMayTimeout(client *c) { * does not need to do any cleanup. Eventually the module will call the * API to unblock the client and the memory will be released. * - * If this function is called from a module, we handle the timeout callback - * and the update of the unblock status in a thread-safe manner to avoid race - * conditions with the main thread. - * If this function is called from the main thread, we must handle the unblocking + * This function should only be called from the main thread, we must handle the unblocking * of the client synchronously. This ensures that we can reply to the client before * resetClient() is called. */ -void moduleBlockedClientTimedOut(client *c, int from_module) { +void moduleBlockedClientTimedOut(client *c) { RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle; - /* Protect against re-processing: don't serve clients that are already - * in the unblocking list for any reason (including RM_UnblockClient() - * explicit call). See #6798. */ - if (bc->unblocked) return; - RedisModuleCtx ctx; - int flags = REDISMODULE_CTX_BLOCKED_TIMEOUT; - if (from_module) flags |= REDISMODULE_CTX_THREAD_SAFE; - moduleCreateContext(&ctx, bc->module, flags); + moduleCreateContext(&ctx, bc->module, REDISMODULE_CTX_BLOCKED_TIMEOUT); ctx.client = bc->client; ctx.blocked_client = bc; ctx.blocked_privdata = bc->privdata; - long long prev_error_replies; - if (!from_module) - prev_error_replies = server.stat_total_error_replies; + long long prev_error_replies = server.stat_total_error_replies; if (bc->timeout_callback) { /* In theory, the user should always pass the timeout handler as an @@ -8452,8 +8449,7 @@ void moduleBlockedClientTimedOut(client *c, int from_module) { moduleFreeContext(&ctx); - if (!from_module) - updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies); + updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies); /* For timeout events, we do not want to call the disconnect callback, * because the blocked client will be automatically disconnected in diff --git a/src/server.h b/src/server.h index a0e8ee556..de42dcd7d 100644 --- a/src/server.h +++ b/src/server.h @@ -2521,7 +2521,7 @@ void moduleCallCommandUnblockedHandler(client *c); int isModuleClientUnblocked(client *c); void unblockClientFromModule(client *c); void moduleHandleBlockedClients(void); -void moduleBlockedClientTimedOut(client *c, int from_module); +void moduleBlockedClientTimedOut(client *c); void modulePipeReadable(aeEventLoop *el, int fd, void *privdata, int mask); size_t moduleCount(void); void moduleAcquireGIL(void);