mirror of https://mirror.osredm.com/root/redis.git
Module unblock on keys: updateStatsOnUnblock is called twice (#13405)
This commit reverts the deletion of the condition `!bc->blocked_on_keys` that was accidentally introduced by https://github.com/redis/redis/pull/12817. In case a blocked-on-keys module client is unblocked both `moduleUnblockClientOnKey` and `moduleHandleBlockedClients` are called which resulted in `updateStatsOnUnblock` being called twice Now, that `moduleHandleBlockedClients` doesn't call `updateStatsOnUnblock` in case of unblocked module key-blocked clients, in the unlikely event that the module decides to call `RM_UnblockClient` on a key-blocked client, we need to call `updateStatsOnUnblock` from within `moduleBlockedClientTimedOut`, but since `moduleBlockedClientTimedOut` is not tread-safe we can't call it directly from withing `RM_UnblockClient`. Added a new flag `blocked_on_keys_explicit_unblock` for that specific case, which will cause `moduleBlockedClientTimedOut` to be called from `moduleHandleBlockedClients` (which is only called from the main thread) --------- Co-authored-by: debing.sun <debing.sun@redis.com>
This commit is contained in:
parent
ffff7fea7c
commit
81440a333d
|
@ -223,7 +223,7 @@ void replyToBlockedClientTimedOut(client *c) {
|
|||
addReplyLongLong(c,server.fsynced_reploff >= c->bstate.reploffset);
|
||||
addReplyLongLong(c,replicationCountAOFAcksByOffset(c->bstate.reploffset));
|
||||
} else if (c->bstate.btype == BLOCKED_MODULE) {
|
||||
moduleBlockedClientTimedOut(c, 0);
|
||||
moduleBlockedClientTimedOut(c);
|
||||
} else {
|
||||
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
|
||||
}
|
||||
|
|
44
src/module.c
44
src/module.c
|
@ -262,6 +262,10 @@ typedef struct RedisModuleBlockedClient {
|
|||
monotime background_timer; /* Timer tracking the start of background work */
|
||||
uint64_t background_duration; /* Current command background time duration.
|
||||
Used for measuring latency of blocking cmds */
|
||||
int blocked_on_keys_explicit_unblock; /* Set to 1 only in the case of an explicit RM_Unblock on
|
||||
* a client that is blocked on keys. In this case we will
|
||||
* call the timeout call back from within
|
||||
* moduleHandleBlockedClients which runs from the main thread */
|
||||
} RedisModuleBlockedClient;
|
||||
|
||||
/* This is a list of Module Auth Contexts. Each time a Module registers a callback, a new ctx is
|
||||
|
@ -7782,7 +7786,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
|
|||
int islua = scriptIsRunning();
|
||||
int ismulti = server.in_exec;
|
||||
|
||||
c->bstate.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient));
|
||||
c->bstate.module_blocked_handle = zcalloc(sizeof(RedisModuleBlockedClient));
|
||||
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle;
|
||||
ctx->module->blocked_clients++;
|
||||
|
||||
|
@ -8261,7 +8265,7 @@ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
|
|||
* argument, but better to be safe than sorry. */
|
||||
if (bc->timeout_callback == NULL) return REDISMODULE_ERR;
|
||||
if (bc->unblocked) return REDISMODULE_OK;
|
||||
if (bc->client) moduleBlockedClientTimedOut(bc->client, 1);
|
||||
if (bc->client) bc->blocked_on_keys_explicit_unblock = 1;
|
||||
}
|
||||
moduleUnblockClientByHandle(bc,privdata);
|
||||
return REDISMODULE_OK;
|
||||
|
@ -8339,6 +8343,10 @@ void moduleHandleBlockedClients(void) {
|
|||
reply_us = elapsedUs(replyTimer);
|
||||
moduleFreeContext(&ctx);
|
||||
}
|
||||
if (c && bc->blocked_on_keys_explicit_unblock) {
|
||||
serverAssert(bc->blocked_on_keys);
|
||||
moduleBlockedClientTimedOut(c);
|
||||
}
|
||||
/* Hold onto the blocked client if module auth is in progress. The reply callback is invoked
|
||||
* when the client is reprocessed. */
|
||||
if (c && clientHasModuleAuthInProgress(c)) {
|
||||
|
@ -8359,11 +8367,12 @@ void moduleHandleBlockedClients(void) {
|
|||
/* Update stats now that we've finished the blocking operation.
|
||||
* This needs to be out of the reply callback above given that a
|
||||
* module might not define any callback and still do blocking ops.
|
||||
*
|
||||
* If the module is blocked on keys updateStatsOnUnblock will be
|
||||
* called from moduleUnblockClientOnKey
|
||||
*/
|
||||
if (c && !clientHasModuleAuthInProgress(c)) {
|
||||
int had_errors = c->deferred_reply_errors ? !!listLength(c->deferred_reply_errors) :
|
||||
(server.stat_total_error_replies != prev_error_replies);
|
||||
updateStatsOnUnblock(c, bc->background_duration, reply_us, had_errors);
|
||||
if (c && !clientHasModuleAuthInProgress(c) && !bc->blocked_on_keys) {
|
||||
updateStatsOnUnblock(c, bc->background_duration, reply_us, server.stat_total_error_replies != prev_error_replies);
|
||||
}
|
||||
|
||||
if (c != NULL) {
|
||||
|
@ -8418,31 +8427,19 @@ int moduleBlockedClientMayTimeout(client *c) {
|
|||
* does not need to do any cleanup. Eventually the module will call the
|
||||
* API to unblock the client and the memory will be released.
|
||||
*
|
||||
* If this function is called from a module, we handle the timeout callback
|
||||
* and the update of the unblock status in a thread-safe manner to avoid race
|
||||
* conditions with the main thread.
|
||||
* If this function is called from the main thread, we must handle the unblocking
|
||||
* This function should only be called from the main thread, we must handle the unblocking
|
||||
* of the client synchronously. This ensures that we can reply to the client before
|
||||
* resetClient() is called. */
|
||||
void moduleBlockedClientTimedOut(client *c, int from_module) {
|
||||
void moduleBlockedClientTimedOut(client *c) {
|
||||
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle;
|
||||
|
||||
/* Protect against re-processing: don't serve clients that are already
|
||||
* in the unblocking list for any reason (including RM_UnblockClient()
|
||||
* explicit call). See #6798. */
|
||||
if (bc->unblocked) return;
|
||||
|
||||
RedisModuleCtx ctx;
|
||||
int flags = REDISMODULE_CTX_BLOCKED_TIMEOUT;
|
||||
if (from_module) flags |= REDISMODULE_CTX_THREAD_SAFE;
|
||||
moduleCreateContext(&ctx, bc->module, flags);
|
||||
moduleCreateContext(&ctx, bc->module, REDISMODULE_CTX_BLOCKED_TIMEOUT);
|
||||
ctx.client = bc->client;
|
||||
ctx.blocked_client = bc;
|
||||
ctx.blocked_privdata = bc->privdata;
|
||||
|
||||
long long prev_error_replies;
|
||||
if (!from_module)
|
||||
prev_error_replies = server.stat_total_error_replies;
|
||||
long long prev_error_replies = server.stat_total_error_replies;
|
||||
|
||||
if (bc->timeout_callback) {
|
||||
/* In theory, the user should always pass the timeout handler as an
|
||||
|
@ -8452,8 +8449,7 @@ void moduleBlockedClientTimedOut(client *c, int from_module) {
|
|||
|
||||
moduleFreeContext(&ctx);
|
||||
|
||||
if (!from_module)
|
||||
updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies);
|
||||
updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies);
|
||||
|
||||
/* For timeout events, we do not want to call the disconnect callback,
|
||||
* because the blocked client will be automatically disconnected in
|
||||
|
|
|
@ -2521,7 +2521,7 @@ void moduleCallCommandUnblockedHandler(client *c);
|
|||
int isModuleClientUnblocked(client *c);
|
||||
void unblockClientFromModule(client *c);
|
||||
void moduleHandleBlockedClients(void);
|
||||
void moduleBlockedClientTimedOut(client *c, int from_module);
|
||||
void moduleBlockedClientTimedOut(client *c);
|
||||
void modulePipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||
size_t moduleCount(void);
|
||||
void moduleAcquireGIL(void);
|
||||
|
|
Loading…
Reference in New Issue