diff --git a/src/blocked.c b/src/blocked.c index 2010805ef..3ba77a184 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -214,6 +214,23 @@ void unblockClient(client *c, int queue_for_reprocessing) { if (queue_for_reprocessing) queueClientForReprocessing(c); } +/* Check if the specified client can be safely timed out using + * unblockClientOnTimeout(). */ +int blockedClientMayTimeout(client *c) { + if (c->bstate.btype == BLOCKED_MODULE) { + return moduleBlockedClientMayTimeout(c); + } + + if (c->bstate.btype == BLOCKED_LIST || + c->bstate.btype == BLOCKED_ZSET || + c->bstate.btype == BLOCKED_STREAM || + c->bstate.btype == BLOCKED_WAIT) + { + return 1; + } + return 0; +} + /* This function gets called when a blocked client timed out in order to * send it a reply of some kind. After this function is called, * unblockClient() will be called with the same client as argument. */ diff --git a/src/networking.c b/src/networking.c index 9c2f1641d..23f1c2183 100644 --- a/src/networking.c +++ b/src/networking.c @@ -3712,11 +3712,12 @@ NULL if (getLongLongFromObjectOrReply(c,c->argv[2],&id,NULL) != C_OK) return; struct client *target = lookupClientByID(id); - /* Note that we never try to unblock a client blocked on a module command, which + /* Note that we never try to unblock a client blocked on a module command, + * or a client blocked by CLIENT PAUSE or some other blocking type which * doesn't have a timeout callback (even in the case of UNBLOCK ERROR). * The reason is that we assume that if a command doesn't expect to be timedout, * it also doesn't expect to be unblocked by CLIENT UNBLOCK */ - if (target && target->flags & CLIENT_BLOCKED && moduleBlockedClientMayTimeout(target)) { + if (target && target->flags & CLIENT_BLOCKED && blockedClientMayTimeout(target)) { if (unblock_error) unblockClientOnError(target, "-UNBLOCKED client unblocked via CLIENT UNBLOCK"); diff --git a/src/server.h b/src/server.h index ab00134f6..88427990f 100644 --- a/src/server.h +++ b/src/server.h @@ -3817,6 +3817,7 @@ void unblockClient(client *c, int queue_for_reprocessing); void unblockClientOnTimeout(client *c); void unblockClientOnError(client *c, const char *err_str); void queueClientForReprocessing(client *c); +int blockedClientMayTimeout(client *c); void replyToBlockedClientTimedOut(client *c); int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit); void disconnectAllBlockedClients(void); diff --git a/tests/unit/pause.tcl b/tests/unit/pause.tcl index 5f4e92cae..5e5dfac05 100644 --- a/tests/unit/pause.tcl +++ b/tests/unit/pause.tcl @@ -1,3 +1,17 @@ +# +# Copyright (c) 2009-Present, Redis Ltd. +# All rights reserved. +# +# Copyright (c) 2024-present, Valkey contributors. +# All rights reserved. +# +# Licensed under your choice of (a) the Redis Source Available License 2.0 +# (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the +# GNU Affero General Public License v3 (AGPLv3). +# +# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. +# + start_server {tags {"pause network"}} { test "Test read commands are not blocked by client pause" { r client PAUSE 100000 WRITE @@ -379,6 +393,36 @@ start_server {tags {"pause network"}} { assert_equal [r randomkey] {} } + test "CLIENT UNBLOCK is not allow to unblock client blocked by CLIENT PAUSE" { + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + $rd1 client id + $rd2 client id + set client_id1 [$rd1 read] + set client_id2 [$rd2 read] + + r del mylist + r client pause 100000 write + $rd1 blpop mylist 0 + $rd2 blpop mylist 0 + wait_for_blocked_clients_count 2 50 100 + + # This used to trigger a panic. + assert_equal 0 [r client unblock $client_id1 timeout] + # THis used to return a UNBLOCKED error. + assert_equal 0 [r client unblock $client_id2 error] + + # After the unpause, it must be able to unblock the client. + r client unpause + assert_equal 1 [r client unblock $client_id1 timeout] + assert_equal 1 [r client unblock $client_id2 error] + assert_equal {} [$rd1 read] + assert_error "UNBLOCKED*" {$rd2 read} + + $rd1 close + $rd2 close + } + # Make sure we unpause at the end r client unpause }