Only mark the client reprocessing flag when unblocked on keys (#14165)

This PR is based on https://github.com/valkey-io/valkey/pull/2109

When we refactored the blocking framework we introduced the client
reprocessing infrastructure. In cases the client was blocked on keys, it
will attempt to reprocess the command. One challenge was to keep track
of the command timeout, since we are reprocessing and do not want to
re-register the client with a fresh timeout each time. The solution was
to consider the client reprocessing flag when the client is
blockedOnKeys:

```c
    if (!(c->flags & CLIENT_REPROCESSING_COMMAND)) {
        /* If the client is re-processing the command, we do not set the timeout
         * because we need to retain the client's original timeout. */
        c->bstate.timeout = timeout;
    }
```

However, this introduced a new issue. There are cases where the client
will consecutive blocking of different types for example:
```
CLIENT PAUSE 10000 ALL
BZPOPMAX zset 1
```
would have the client blocked on the zset endlessly if nothing will be
written to it.

**Credits to @uriyage for locating this with his fuzzer testing**

The suggested solution is to only flag the client when it is
specifically unblocked on keys.

Signed-off-by: Ran Shidlansik <ranshid@amazon.com>
Co-authored-by: Ran Shidlansik <ranshid@amazon.com>
Co-authored-by: Binbin <binloveplay1314@qq.com>
This commit is contained in:
debing.sun 2025-07-21 20:05:47 +08:00 committed by GitHub
parent a4ff8d6ab6
commit 45c8fcc992
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 43 additions and 14 deletions

View File

@ -371,7 +371,7 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
list *l;
int j;
if (!(c->flags & CLIENT_REPROCESSING_COMMAND)) {
if (!(c->flags & CLIENT_REEXECUTING_COMMAND)) {
/* If the client is re-processing the command, we do not set the timeout
* because we need to retain the client's original timeout. */
c->bstate.timeout = timeout;
@ -657,6 +657,7 @@ static void unblockClientOnKey(client *c, robj *key) {
* we need to re process the command again */
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
c->flags |= CLIENT_REEXECUTING_COMMAND;
/* We want the command processing and the unblock handler (see RM_Call 'K' option)
* to run atomically, this is why we must enter the execution unit here before
* running the command, and exit the execution unit after calling the unblock handler (if exists).
@ -675,6 +676,8 @@ static void unblockClientOnKey(client *c, robj *key) {
}
exitExecutionUnit();
afterCommand(c);
/* Clear the CLIENT_REEXECUTING_COMMAND flag after the proc is executed. */
c->flags &= ~CLIENT_REEXECUTING_COMMAND;
server.current_client = old_client;
}
}

View File

@ -3718,7 +3718,7 @@ void call(client *c, int flags) {
* and a client which is reprocessing command again (after being unblocked).
* Blocked clients can be blocked in different places and not always it means the call() function has been
* called. For example this is required for avoiding double logging to monitors.*/
int reprocessing_command = flags & CMD_CALL_REPROCESSING;
int reprocessing_command = (c->flags & CLIENT_REEXECUTING_COMMAND) ? 1 : 0;
/* Initialization: clear the flags that must be set by the command on
* demand, and initialize the array for additional commands propagation. */
@ -3745,20 +3745,12 @@ void call(client *c, int flags) {
* re-processing and unblock the client.*/
c->flags |= CLIENT_EXECUTING_COMMAND;
/* Setting the CLIENT_REPROCESSING_COMMAND flag so that during the actual
* processing of the command proc, the client is aware that it is being
* re-processed. */
if (reprocessing_command) c->flags |= CLIENT_REPROCESSING_COMMAND;
monotime monotonic_start = 0;
if (monotonicGetType() == MONOTONIC_CLOCK_HW)
monotonic_start = getMonotonicUs();
c->cmd->proc(c);
/* Clear the CLIENT_REPROCESSING_COMMAND flag after the proc is executed. */
if (reprocessing_command) c->flags &= ~CLIENT_REPROCESSING_COMMAND;
exitExecutionUnit();
/* In case client is blocked after trying to execute the command,
@ -4422,7 +4414,6 @@ int processCommand(client *c) {
addReply(c,shared.queued);
} else {
int flags = CMD_CALL_FULL;
if (client_reprocessing_command) flags |= CMD_CALL_REPROCESSING;
call(c,flags);
if (listLength(server.ready_keys) && !isInsideYieldingLongCommand())
handleClientsBlockedOnKeys();

View File

@ -426,7 +426,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
auth had been authenticated from the Module. */
#define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL<<48) /* Module client do not want to propagate to AOF */
#define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */
#define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */
#define CLIENT_REEXECUTING_COMMAND (1ULL<<50) /* The client is re-executing the command. */
#define CLIENT_REPL_RDB_CHANNEL (1ULL<<51) /* Client which is used for rdb delivery as part of rdb channel replication */
#define CLIENT_INTERNAL (1ULL<<52) /* Internal client connection */
@ -686,8 +686,7 @@ typedef enum {
#define CMD_CALL_NONE 0
#define CMD_CALL_PROPAGATE_AOF (1<<0)
#define CMD_CALL_PROPAGATE_REPL (1<<1)
#define CMD_CALL_REPROCESSING (1<<2)
#define CMD_CALL_FROM_MODULE (1<<3) /* From RM_Call */
#define CMD_CALL_FROM_MODULE (1<<2) /* From RM_Call */
#define CMD_CALL_PROPAGATE (CMD_CALL_PROPAGATE_AOF|CMD_CALL_PROPAGATE_REPL)
#define CMD_CALL_FULL (CMD_CALL_PROPAGATE)

View File

@ -1,3 +1,17 @@
#
# Copyright (c) 2009-Present, Redis Ltd.
# All rights reserved.
#
# Copyright (c) 2024-present, Valkey contributors.
# All rights reserved.
#
# Licensed under your choice of (a) the Redis Source Available License 2.0
# (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
# GNU Affero General Public License v3 (AGPLv3).
#
# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
#
# check functionality compression of plain and packed nodes
start_server [list overrides [list save ""] ] {
r config set list-compress-depth 2
@ -2428,4 +2442,26 @@ foreach {pop} {BLPOP BLMPOP_RIGHT} {
close_replication_stream $repl
} {} {needs:repl}
test "Blocking timeout following PAUSE should honor the timeout" {
# cleanup first
r del mylist
# create a test client
set rd [redis_deferring_client]
# first PAUSE all writes for a very long time
r client pause 10000000000000 write
# block a client on the list
$rd BLPOP mylist 1
wait_for_blocked_clients_count 1
# now unpause the writes
r client unpause
# client should time-out
wait_for_blocked_clients_count 0
$rd close
}
} ;# stop servers