mirror of https://mirror.osredm.com/root/redis.git
Have consistent behavior of SPUBLISH within multi/exec like regular command (#13276)
This PR is based on the commits from PR #12944. Allow SPUBLISH command within multi/exec on replica Behavior on unstable: ``` 127.0.0.1:6380> CLUSTER NODES 39ce8aa20f1f0d91f1a88d976ee1926dfefcdf1a 127.0.0.1:6380@16380 myself,slave 8b0feb120b68aac489d6a5af9c77dc40d71bc792 0 0 0 connected 8b0feb120b68aac489d6a5af9c77dc40d71bc792 127.0.0.1:6379@16379 master - 0 1705091681202 0 connected 0-16383 127.0.0.1:6380> SPUBLISH hello world (integer) 0 127.0.0.1:6380> MULTI OK 127.0.0.1:6380(TX)> SPUBLISH hello world QUEUED 127.0.0.1:6380(TX)> EXEC (error) MOVED 866 127.0.0.1:6379 ``` With this change: ``` 127.0.0.1:6380> SPUBLISH hello world (integer) 0 127.0.0.1:6380> MULTI OK 127.0.0.1:6380(TX)> SPUBLISH hello world QUEUED 127.0.0.1:6380(TX)> EXEC 1) (integer) 0 ``` --------- Co-authored-by: Harkrishn Patro <harkrisp@amazon.com> Co-authored-by: oranagra <oran@redislabs.com>
This commit is contained in:
parent
f1b0212917
commit
9ffc35c98e
|
@ -4,6 +4,8 @@
|
||||||
*
|
*
|
||||||
* Licensed under your choice of the Redis Source Available License 2.0
|
* Licensed under your choice of the Redis Source Available License 2.0
|
||||||
* (RSALv2) or the Server Side Public License v1 (SSPLv1).
|
* (RSALv2) or the Server Side Public License v1 (SSPLv1).
|
||||||
|
*
|
||||||
|
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -978,6 +980,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
|
||||||
multiCmd mc;
|
multiCmd mc;
|
||||||
int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0,
|
int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0,
|
||||||
existing_keys = 0;
|
existing_keys = 0;
|
||||||
|
int pubsubshard_included = 0; /* Flag to indicate if a pubsub shard cmd is included. */
|
||||||
|
|
||||||
/* Allow any key to be set if a module disabled cluster redirections. */
|
/* Allow any key to be set if a module disabled cluster redirections. */
|
||||||
if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
|
if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
|
||||||
|
@ -1009,10 +1012,6 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
|
||||||
mc.cmd = cmd;
|
mc.cmd = cmd;
|
||||||
}
|
}
|
||||||
|
|
||||||
int is_pubsubshard = cmd->proc == ssubscribeCommand ||
|
|
||||||
cmd->proc == sunsubscribeCommand ||
|
|
||||||
cmd->proc == spublishCommand;
|
|
||||||
|
|
||||||
/* Check that all the keys are in the same hash slot, and obtain this
|
/* Check that all the keys are in the same hash slot, and obtain this
|
||||||
* slot and the node associated. */
|
* slot and the node associated. */
|
||||||
for (i = 0; i < ms->count; i++) {
|
for (i = 0; i < ms->count; i++) {
|
||||||
|
@ -1025,6 +1024,13 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
|
||||||
margc = ms->commands[i].argc;
|
margc = ms->commands[i].argc;
|
||||||
margv = ms->commands[i].argv;
|
margv = ms->commands[i].argv;
|
||||||
|
|
||||||
|
/* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */
|
||||||
|
if (!pubsubshard_included &&
|
||||||
|
doesCommandHaveChannelsWithFlags(mcmd, CMD_CHANNEL_PUBLISH | CMD_CHANNEL_SUBSCRIBE))
|
||||||
|
{
|
||||||
|
pubsubshard_included = 1;
|
||||||
|
}
|
||||||
|
|
||||||
getKeysResult result = GETKEYS_RESULT_INIT;
|
getKeysResult result = GETKEYS_RESULT_INIT;
|
||||||
numkeys = getKeysFromCommand(mcmd,margv,margc,&result);
|
numkeys = getKeysFromCommand(mcmd,margv,margc,&result);
|
||||||
keyindex = result.keys;
|
keyindex = result.keys;
|
||||||
|
@ -1088,7 +1094,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
|
||||||
* node until the migration completes with CLUSTER SETSLOT <slot>
|
* node until the migration completes with CLUSTER SETSLOT <slot>
|
||||||
* NODE <node-id>. */
|
* NODE <node-id>. */
|
||||||
int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE;
|
int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE;
|
||||||
if ((migrating_slot || importing_slot) && !is_pubsubshard)
|
if ((migrating_slot || importing_slot) && !pubsubshard_included)
|
||||||
{
|
{
|
||||||
if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) missing_keys++;
|
if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) missing_keys++;
|
||||||
else existing_keys++;
|
else existing_keys++;
|
||||||
|
@ -1105,7 +1111,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
|
||||||
/* Cluster is globally down but we got keys? We only serve the request
|
/* Cluster is globally down but we got keys? We only serve the request
|
||||||
* if it is a read command and when allow_reads_when_down is enabled. */
|
* if it is a read command and when allow_reads_when_down is enabled. */
|
||||||
if (!isClusterHealthy()) {
|
if (!isClusterHealthy()) {
|
||||||
if (is_pubsubshard) {
|
if (pubsubshard_included) {
|
||||||
if (!server.cluster_allow_pubsubshard_when_down) {
|
if (!server.cluster_allow_pubsubshard_when_down) {
|
||||||
if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
|
if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -1168,7 +1174,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
|
||||||
* is serving, we can reply without redirection. */
|
* is serving, we can reply without redirection. */
|
||||||
int is_write_command = (cmd_flags & CMD_WRITE) ||
|
int is_write_command = (cmd_flags & CMD_WRITE) ||
|
||||||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
|
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
|
||||||
if (((c->flags & CLIENT_READONLY) || is_pubsubshard) &&
|
if (((c->flags & CLIENT_READONLY) || pubsubshard_included) &&
|
||||||
!is_write_command &&
|
!is_write_command &&
|
||||||
clusterNodeIsSlave(myself) &&
|
clusterNodeIsSlave(myself) &&
|
||||||
clusterNodeGetSlaveof(myself) == n)
|
clusterNodeGetSlaveof(myself) == n)
|
||||||
|
|
|
@ -109,6 +109,7 @@ set ::all_tests {
|
||||||
unit/cluster/links
|
unit/cluster/links
|
||||||
unit/cluster/cluster-response-tls
|
unit/cluster/cluster-response-tls
|
||||||
unit/cluster/failure-marking
|
unit/cluster/failure-marking
|
||||||
|
unit/cluster/sharded-pubsub
|
||||||
}
|
}
|
||||||
# Index to the next test to run in the ::all_tests list.
|
# Index to the next test to run in the ::all_tests list.
|
||||||
set ::next_test 0
|
set ::next_test 0
|
||||||
|
|
|
@ -0,0 +1,66 @@
|
||||||
|
#
|
||||||
|
# Copyright (c) 2009-Present, Redis Ltd.
|
||||||
|
# All rights reserved.
|
||||||
|
#
|
||||||
|
# Licensed under your choice of the Redis Source Available License 2.0
|
||||||
|
# (RSALv2) or the Server Side Public License v1 (SSPLv1).
|
||||||
|
#
|
||||||
|
# Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
|
||||||
|
#
|
||||||
|
|
||||||
|
start_cluster 1 1 {tags {external:skip cluster}} {
|
||||||
|
set primary_id 0
|
||||||
|
set replica1_id 1
|
||||||
|
|
||||||
|
set primary [Rn $primary_id]
|
||||||
|
set replica [Rn $replica1_id]
|
||||||
|
|
||||||
|
test "Sharded pubsub publish behavior within multi/exec" {
|
||||||
|
foreach {node} {primary replica} {
|
||||||
|
set node [set $node]
|
||||||
|
$node MULTI
|
||||||
|
$node SPUBLISH ch1 "hello"
|
||||||
|
$node EXEC
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
test "Sharded pubsub within multi/exec with cross slot operation" {
|
||||||
|
$primary MULTI
|
||||||
|
$primary SPUBLISH ch1 "hello"
|
||||||
|
$primary GET foo
|
||||||
|
catch {[$primary EXEC]} err
|
||||||
|
assert_match {CROSSSLOT*} $err
|
||||||
|
}
|
||||||
|
|
||||||
|
test "Sharded pubsub publish behavior within multi/exec with read operation on primary" {
|
||||||
|
$primary MULTI
|
||||||
|
$primary SPUBLISH foo "hello"
|
||||||
|
$primary GET foo
|
||||||
|
$primary EXEC
|
||||||
|
} {0 {}}
|
||||||
|
|
||||||
|
test "Sharded pubsub publish behavior within multi/exec with read operation on replica" {
|
||||||
|
$replica MULTI
|
||||||
|
$replica SPUBLISH foo "hello"
|
||||||
|
catch {[$replica GET foo]} err
|
||||||
|
assert_match {MOVED*} $err
|
||||||
|
catch {[$replica EXEC]} err
|
||||||
|
assert_match {EXECABORT*} $err
|
||||||
|
}
|
||||||
|
|
||||||
|
test "Sharded pubsub publish behavior within multi/exec with write operation on primary" {
|
||||||
|
$primary MULTI
|
||||||
|
$primary SPUBLISH foo "hello"
|
||||||
|
$primary SET foo bar
|
||||||
|
$primary EXEC
|
||||||
|
} {0 OK}
|
||||||
|
|
||||||
|
test "Sharded pubsub publish behavior within multi/exec with write operation on replica" {
|
||||||
|
$replica MULTI
|
||||||
|
$replica SPUBLISH foo "hello"
|
||||||
|
catch {[$replica SET foo bar]} err
|
||||||
|
assert_match {MOVED*} $err
|
||||||
|
catch {[$replica EXEC]} err
|
||||||
|
assert_match {EXECABORT*} $err
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue