This commit is contained in:
debing.sun 2025-07-17 21:55:03 +07:00 committed by GitHub
commit d9c7911c27
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 65 additions and 2 deletions

View File

@ -214,6 +214,23 @@ void unblockClient(client *c, int queue_for_reprocessing) {
if (queue_for_reprocessing) queueClientForReprocessing(c);
}
/* Check if the specified client can be safely timed out using
* unblockClientOnTimeout(). */
int blockedClientMayTimeout(client *c) {
if (c->bstate.btype == BLOCKED_MODULE) {
return moduleBlockedClientMayTimeout(c);
}
if (c->bstate.btype == BLOCKED_LIST ||
c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM ||
c->bstate.btype == BLOCKED_WAIT)
{
return 1;
}
return 0;
}
/* This function gets called when a blocked client timed out in order to
* send it a reply of some kind. After this function is called,
* unblockClient() will be called with the same client as argument. */

View File

@ -3712,11 +3712,12 @@ NULL
if (getLongLongFromObjectOrReply(c,c->argv[2],&id,NULL)
!= C_OK) return;
struct client *target = lookupClientByID(id);
/* Note that we never try to unblock a client blocked on a module command, which
/* Note that we never try to unblock a client blocked on a module command,
* or a client blocked by CLIENT PAUSE or some other blocking type which
* doesn't have a timeout callback (even in the case of UNBLOCK ERROR).
* The reason is that we assume that if a command doesn't expect to be timedout,
* it also doesn't expect to be unblocked by CLIENT UNBLOCK */
if (target && target->flags & CLIENT_BLOCKED && moduleBlockedClientMayTimeout(target)) {
if (target && target->flags & CLIENT_BLOCKED && blockedClientMayTimeout(target)) {
if (unblock_error)
unblockClientOnError(target,
"-UNBLOCKED client unblocked via CLIENT UNBLOCK");

View File

@ -3817,6 +3817,7 @@ void unblockClient(client *c, int queue_for_reprocessing);
void unblockClientOnTimeout(client *c);
void unblockClientOnError(client *c, const char *err_str);
void queueClientForReprocessing(client *c);
int blockedClientMayTimeout(client *c);
void replyToBlockedClientTimedOut(client *c);
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit);
void disconnectAllBlockedClients(void);

View File

@ -1,3 +1,17 @@
#
# Copyright (c) 2009-Present, Redis Ltd.
# All rights reserved.
#
# Copyright (c) 2024-present, Valkey contributors.
# All rights reserved.
#
# Licensed under your choice of (a) the Redis Source Available License 2.0
# (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
# GNU Affero General Public License v3 (AGPLv3).
#
# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
#
start_server {tags {"pause network"}} {
test "Test read commands are not blocked by client pause" {
r client PAUSE 100000 WRITE
@ -379,6 +393,36 @@ start_server {tags {"pause network"}} {
assert_equal [r randomkey] {}
}
test "CLIENT UNBLOCK is not allow to unblock client blocked by CLIENT PAUSE" {
set rd1 [redis_deferring_client]
set rd2 [redis_deferring_client]
$rd1 client id
$rd2 client id
set client_id1 [$rd1 read]
set client_id2 [$rd2 read]
r del mylist
r client pause 100000 write
$rd1 blpop mylist 0
$rd2 blpop mylist 0
wait_for_blocked_clients_count 2 50 100
# This used to trigger a panic.
assert_equal 0 [r client unblock $client_id1 timeout]
# THis used to return a UNBLOCKED error.
assert_equal 0 [r client unblock $client_id2 error]
# After the unpause, it must be able to unblock the client.
r client unpause
assert_equal 1 [r client unblock $client_id1 timeout]
assert_equal 1 [r client unblock $client_id2 error]
assert_equal {} [$rd1 read]
assert_error "UNBLOCKED*" {$rd2 read}
$rd1 close
$rd2 close
}
# Make sure we unpause at the end
r client unpause
}