diff --git a/src/cluster.c b/src/cluster.c index feb8f8d21..7389de710 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -2626,9 +2626,9 @@ int clusterProcessPacket(clusterLink *link) { resetManualFailover(); server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT; server.cluster->mf_slave = sender; - pauseClients(PAUSE_DURING_FAILOVER, + pauseActions(PAUSE_DURING_FAILOVER, now + (CLUSTER_MF_TIMEOUT * CLUSTER_MF_PAUSE_MULT), - CLIENT_PAUSE_WRITE); + PAUSE_ACTIONS_CLIENT_WRITE_SET); serverLog(LL_WARNING,"Manual failover requested by replica %.40s.", sender->name); /* We need to send a ping message to the replica, as it would carry @@ -3932,9 +3932,9 @@ void clusterHandleSlaveMigration(int max_slaves) { * startup or to abort a manual failover in progress. */ void resetManualFailover(void) { if (server.cluster->mf_slave) { - /* We were a master failing over, so we paused clients. Regardless - * of the outcome we unpause now to allow traffic again. */ - unpauseClients(PAUSE_DURING_FAILOVER); + /* We were a master failing over, so we paused clients and related actions. + * Regardless of the outcome we unpause now to allow traffic again. */ + unpauseActions(PAUSE_DURING_FAILOVER); } server.cluster->mf_end = 0; /* No manual failover in progress. */ server.cluster->mf_can_start = 0; diff --git a/src/db.c b/src/db.c index 58febe0d9..a560ce052 100644 --- a/src/db.c +++ b/src/db.c @@ -1680,11 +1680,10 @@ int expireIfNeeded(redisDb *db, robj *key, int flags) { if (flags & EXPIRE_AVOID_DELETE_EXPIRED) return 1; - /* If clients are paused, we keep the current dataset constant, - * but return to the client what we believe is the right state. Typically, - * at the end of the pause we will properly expire the key OR we will - * have failed over and the new primary will send us the expire. */ - if (checkClientPauseTimeoutAndReturnIfPaused()) return 1; + /* If 'expire' action is paused, for whatever reason, then don't expire any key. 
+ * Typically, at the end of the pause we will properly expire the key OR we + * will have failed over and the new primary will send us the expire. */ + if (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE)) return 1; /* Delete the key */ deleteExpiredKeyAndPropagate(db,key); diff --git a/src/evict.c b/src/evict.c index 45ec95f1f..637a8b6c7 100644 --- a/src/evict.c +++ b/src/evict.c @@ -487,10 +487,8 @@ static int isSafeToPerformEvictions(void) { * and just be masters exact copies. */ if (server.masterhost && server.repl_slave_ignore_maxmemory) return 0; - /* When clients are paused the dataset should be static not just from the - * POV of clients not being able to write, but also from the POV of - * expires and evictions of keys not being performed. */ - if (checkClientPauseTimeoutAndReturnIfPaused()) return 0; + /* If 'evict' action is paused, for whatever reason, then return false */ + if (isPausedActionsWithUpdate(PAUSE_ACTION_EVICT)) return 0; return 1; } diff --git a/src/expire.c b/src/expire.c index 852c5f71a..8220f80cf 100644 --- a/src/expire.c +++ b/src/expire.c @@ -135,10 +135,10 @@ void activeExpireCycle(int type) { int dbs_per_call = CRON_DBS_PER_CALL; long long start = ustime(), timelimit, elapsed; - /* When clients are paused the dataset should be static not just from the - * POV of clients not being able to write, but also from the POV of - * expires and evictions of keys not being performed. */ - if (checkClientPauseTimeoutAndReturnIfPaused()) return; + /* If 'expire' action is paused, for whatever reason, then don't expire any key. + * Typically, at the end of the pause we will properly expire the key OR we + * will have failed over and the new primary will send us the expire. 
*/ + if (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE)) return; if (type == ACTIVE_EXPIRE_CYCLE_FAST) { /* Don't start a fast cycle if the previous cycle did not exit diff --git a/src/module.c b/src/module.c index 9dbaa3ca2..dc17b40df 100644 --- a/src/module.c +++ b/src/module.c @@ -3654,7 +3654,7 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) { * periodically in timer callbacks or other periodic callbacks. */ int RM_AvoidReplicaTraffic() { - return checkClientPauseTimeoutAndReturnIfPaused(); + return !!(isPausedActionsWithUpdate(PAUSE_ACTION_REPLICA)); } /* Change the currently selected DB. Returns an error if the id diff --git a/src/networking.c b/src/networking.c index 3a4dc3816..fdd7fc3b7 100644 --- a/src/networking.c +++ b/src/networking.c @@ -38,6 +38,7 @@ #include static void setProtocolError(const char *errstr, client *c); +static void pauseClientsByClient(mstime_t end, int isPauseClientAll); int postponeClientRead(client *c); int ProcessingEventsWhileBlocked = 0; /* See processEventsWhileBlocked(). 
*/ @@ -3148,20 +3149,18 @@ NULL addReplyNull(c); } else if (!strcasecmp(c->argv[1]->ptr,"unpause") && c->argc == 2) { /* CLIENT UNPAUSE */ - unpauseClients(PAUSE_BY_CLIENT_COMMAND); + unpauseActions(PAUSE_BY_CLIENT_COMMAND); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"pause") && (c->argc == 3 || c->argc == 4)) { /* CLIENT PAUSE TIMEOUT [WRITE|ALL] */ mstime_t end; - int type = CLIENT_PAUSE_ALL; + int isPauseClientAll = 1; if (c->argc == 4) { if (!strcasecmp(c->argv[3]->ptr,"write")) { - type = CLIENT_PAUSE_WRITE; - } else if (!strcasecmp(c->argv[3]->ptr,"all")) { - type = CLIENT_PAUSE_ALL; - } else { + isPauseClientAll = 0; + } else if (strcasecmp(c->argv[3]->ptr,"all")) { addReplyError(c, "CLIENT PAUSE mode must be WRITE or ALL"); return; @@ -3170,7 +3169,7 @@ NULL if (getTimeoutFromObjectOrReply(c,c->argv[2],&end, UNIT_MILLISECONDS) != C_OK) return; - pauseClients(PAUSE_BY_CLIENT_COMMAND, end, type); + pauseClientsByClient(end, isPauseClientAll); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"tracking") && c->argc >= 3) { /* CLIENT TRACKING (on|off) [REDIRECT ] [BCAST] [PREFIX first] @@ -3816,38 +3815,26 @@ void flushSlavesOutputBuffers(void) { } } -/* Compute current most restrictive pause type and its end time, aggregated for +/* Compute the bitmask of currently paused actions, aggregated for * all pause purposes. */ -static void updateClientPauseTypeAndEndTime(void) { - pause_type old_type = server.client_pause_type; - pause_type type = CLIENT_PAUSE_OFF; - mstime_t end = 0; +void updatePausedActions(void) { + uint32_t prev_paused_actions = server.paused_actions; + server.paused_actions = 0; + for (int i = 0; i < NUM_PAUSE_PURPOSES; i++) { - pause_event *p = server.client_pause_per_purpose[i]; - if (p == NULL) { - /* Nothing to do. */ - } else if (p->end < server.mstime) { - /* This one expired.
*/ - zfree(p); - server.client_pause_per_purpose[i] = NULL; - } else if (p->type > type) { - /* This type is the most restrictive so far. */ - type = p->type; + pause_event *p = &(server.client_pause_per_purpose[i]); + if (p->end > server.mstime) + server.paused_actions |= p->paused_actions; + else { + p->paused_actions = 0; + p->end = 0; } } - /* Find the furthest end time among the pause purposes of the most - * restrictive type */ - for (int i = 0; i < NUM_PAUSE_PURPOSES; i++) { - pause_event *p = server.client_pause_per_purpose[i]; - if (p != NULL && p->type == type && p->end > end) end = p->end; - } - server.client_pause_type = type; - server.client_pause_end_time = end; - /* If the pause type is less restrictive than before, we unblock all clients * so they are reprocessed (may get re-paused). */ - if (type < old_type) { + uint32_t mask_cli = (PAUSE_ACTION_CLIENT_WRITE|PAUSE_ACTION_CLIENT_ALL); + if ((server.paused_actions & mask_cli) < (prev_paused_actions & mask_cli)) { unblockPostponedClients(); } } @@ -3864,7 +3851,26 @@ void unblockPostponedClients() { } } -/* Pause clients up to the specified unixtime (in ms) for a given type of +/* Set pause-client end-time and restricted action. If already paused, then: + * 1. Keep higher end-time value between configured and the new one + * 2. 
Keep most restrictive action between configured and the new one */ +static void pauseClientsByClient(mstime_t endTime, int isPauseClientAll) { + uint32_t actions; + pause_event *p = &server.client_pause_per_purpose[PAUSE_BY_CLIENT_COMMAND]; + + if (isPauseClientAll) + actions = PAUSE_ACTIONS_CLIENT_ALL_SET; + else { + actions = PAUSE_ACTIONS_CLIENT_WRITE_SET; + /* If currently configured most restrictive client pause, then keep it */ + if (p->paused_actions & PAUSE_ACTION_CLIENT_ALL) + actions = PAUSE_ACTIONS_CLIENT_ALL_SET; + } + + pauseActions(PAUSE_BY_CLIENT_COMMAND, endTime, actions); +} + +/* Pause actions up to the specified unixtime (in ms) for a given type of * commands. * * A main use case of this function is to allow pausing replication traffic @@ -3875,20 +3881,17 @@ void unblockPostponedClients() { * failover procedure implemented by CLUSTER FAILOVER. * * The function always succeed, even if there is already a pause in progress. - * In such a case, the duration is set to the maximum and new end time and the - * type is set to the more restrictive type of pause. */ -void pauseClients(pause_purpose purpose, mstime_t end, pause_type type) { + * The new paused_actions of a given 'purpose' will override the old ones and + * end time will be updated if new end time is bigger than currently configured */ +void pauseActions(pause_purpose purpose, mstime_t end, uint32_t actions) { /* Manage pause type and end time per pause purpose. 
*/ - if (server.client_pause_per_purpose[purpose] == NULL) { - server.client_pause_per_purpose[purpose] = zmalloc(sizeof(pause_event)); - server.client_pause_per_purpose[purpose]->type = type; - server.client_pause_per_purpose[purpose]->end = end; - } else { - pause_event *p = server.client_pause_per_purpose[purpose]; - p->type = max(p->type, type); - p->end = max(p->end, end); - } - updateClientPauseTypeAndEndTime(); + server.client_pause_per_purpose[purpose].paused_actions = actions; + + /* If currently configured end time bigger than new one, then keep it */ + if (server.client_pause_per_purpose[purpose].end < end) + server.client_pause_per_purpose[purpose].end = end; + + updatePausedActions(); /* We allow write commands that were queued * up before and after to execute. We need @@ -3899,29 +3902,23 @@ void pauseClients(pause_purpose purpose, mstime_t end, pause_type type) { } } -/* Unpause clients and queue them for reprocessing. */ -void unpauseClients(pause_purpose purpose) { - if (server.client_pause_per_purpose[purpose] == NULL) return; - zfree(server.client_pause_per_purpose[purpose]); - server.client_pause_per_purpose[purpose] = NULL; - updateClientPauseTypeAndEndTime(); +/* Unpause actions and queue them for reprocessing. */ +void unpauseActions(pause_purpose purpose) { + server.client_pause_per_purpose[purpose].end = 0; + server.client_pause_per_purpose[purpose].paused_actions = 0; + updatePausedActions(); } -/* Returns true if clients are paused and false otherwise. */ -int areClientsPaused(void) { - return server.client_pause_type != CLIENT_PAUSE_OFF; +/* Returns bitmask of paused actions */ +uint32_t isPausedActions(uint32_t actions_bitmask) { + return (server.paused_actions & actions_bitmask); } -/* Checks if the current client pause has elapsed and unpause clients - * if it has. Also returns true if clients are now paused and false - * otherwise. 
*/ -int checkClientPauseTimeoutAndReturnIfPaused(void) { - if (!areClientsPaused()) - return 0; - if (server.client_pause_end_time < server.mstime) { - updateClientPauseTypeAndEndTime(); - } - return areClientsPaused(); +/* Same as isPausedActions(), but first calls updatePausedActions() so that pauses whose end time has passed are dropped; returns the subset of 'actions_bitmask' that is still paused (0 if none) */ +uint32_t isPausedActionsWithUpdate(uint32_t actions_bitmask) { + if (!(server.paused_actions & actions_bitmask)) return 0; + updatePausedActions(); + return (server.paused_actions & actions_bitmask); } /* This function is called by Redis in order to process a few events from diff --git a/src/replication.c b/src/replication.c index ad05d175a..3674bea05 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3627,7 +3627,7 @@ void replicationCron(void) { ((server.cluster_enabled && server.cluster->mf_end) || server.failover_end_time) && - checkClientPauseTimeoutAndReturnIfPaused(); + isPausedActionsWithUpdate(PAUSE_ACTION_REPLICA); if (!manual_failover_in_progress) { ping_argv[0] = shared.ping; @@ -3865,7 +3865,7 @@ void clearFailoverState() { server.target_replica_host = NULL; server.target_replica_port = 0; server.failover_state = NO_FAILOVER; - unpauseClients(PAUSE_DURING_FAILOVER); + unpauseActions(PAUSE_DURING_FAILOVER); } /* Abort an ongoing failover if one is going on. */ @@ -4014,7 +4014,9 @@ void failoverCommand(client *c) { server.force_failover = force_flag; server.failover_state = FAILOVER_WAIT_FOR_SYNC; /* Cluster failover will unpause eventually */ - pauseClients(PAUSE_DURING_FAILOVER, LLONG_MAX, CLIENT_PAUSE_WRITE); + pauseActions(PAUSE_DURING_FAILOVER, + LLONG_MAX, + PAUSE_ACTIONS_CLIENT_WRITE_SET); addReply(c,shared.ok); } diff --git a/src/server.c b/src/server.c index 11e3fd332..47886ba71 100644 --- a/src/server.c +++ b/src/server.c @@ -1377,8 +1377,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } } - /* Clear the paused clients state if needed. */ - checkClientPauseTimeoutAndReturnIfPaused(); + /* Clear the paused actions state if needed.
*/ + updatePausedActions(); /* Replication cron function -- used to reconnect to master, * detect transfer failures, start background RDB transfers and so forth. @@ -1615,7 +1615,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { * We also don't send the ACKs while clients are paused, since it can * increment the replication backlog, they'll be sent after the pause * if we are still the master. */ - if (server.get_ack_from_slaves && !checkClientPauseTimeoutAndReturnIfPaused()) { + if (server.get_ack_from_slaves && !isPausedActionsWithUpdate(PAUSE_ACTION_REPLICA)) { sendGetackToReplicas(); server.get_ack_from_slaves = 0; } @@ -2448,8 +2448,7 @@ void initServer(void) { server.tracking_pending_keys = listCreate(); server.clients_waiting_acks = listCreate(); server.get_ack_from_slaves = 0; - server.client_pause_type = CLIENT_PAUSE_OFF; - server.client_pause_end_time = 0; + server.paused_actions = 0; memset(server.client_pause_per_purpose, 0, sizeof(server.client_pause_per_purpose)); server.postponed_clients = listCreate(); @@ -3135,9 +3134,10 @@ static void propagateNow(int dbid, robj **argv, int argc, int target) { if (!shouldPropagate(target)) return; - /* This needs to be unreachable since the dataset should be fixed during - * client pause, otherwise data may be lost during a failover. */ - serverAssert(!(areClientsPaused() && !server.client_pause_in_transaction)); + /* This needs to be unreachable since the dataset should be fixed during + * replica pause (otherwise data may be lost during a failover) */ + serverAssert(!(isPausedActions(PAUSE_ACTION_REPLICA) && + (!server.client_pause_in_transaction))); if (server.aof_state != AOF_OFF && target & PROPAGATE_AOF) feedAppendOnlyFile(dbid,argv,argc); @@ -3959,8 +3959,8 @@ int processCommand(client *c) { /* If the server is paused, block the client until * the pause has ended. Replicas are never paused. 
*/ if (!(c->flags & CLIENT_SLAVE) && - ((server.client_pause_type == CLIENT_PAUSE_ALL) || - (server.client_pause_type == CLIENT_PAUSE_WRITE && is_may_replicate_command))) + ((isPausedActions(PAUSE_ACTION_CLIENT_ALL)) || + ((isPausedActions(PAUSE_ACTION_CLIENT_WRITE)) && is_may_replicate_command))) { c->bpop.timeout = 0; blockClient(c,BLOCKED_POSTPONE); @@ -4072,8 +4072,10 @@ int prepareForShutdown(int flags) { !isReadyToShutdown()) { server.shutdown_mstime = server.mstime + server.shutdown_timeout * 1000; - if (!areClientsPaused()) sendGetackToReplicas(); - pauseClients(PAUSE_DURING_SHUTDOWN, LLONG_MAX, CLIENT_PAUSE_WRITE); + if (!isPausedActions(PAUSE_ACTION_REPLICA)) sendGetackToReplicas(); + pauseActions(PAUSE_DURING_SHUTDOWN, + LLONG_MAX, + PAUSE_ACTIONS_CLIENT_WRITE_SET); serverLog(LL_NOTICE, "Waiting for replicas before shutting down."); return C_ERR; } @@ -4107,7 +4109,7 @@ static void cancelShutdown(void) { server.shutdown_mstime = 0; server.last_sig_received = 0; replyToClientsBlockedOnShutdown(); - unpauseClients(PAUSE_DURING_SHUTDOWN); + unpauseActions(PAUSE_DURING_SHUTDOWN); } /* Returns C_OK if shutdown was aborted and C_ERR if shutdown wasn't ongoing. */ diff --git a/src/server.h b/src/server.h index 80e07fb9d..32b24afc4 100644 --- a/src/server.h +++ b/src/server.h @@ -568,13 +568,22 @@ typedef enum { #define PROPAGATE_AOF 1 #define PROPAGATE_REPL 2 -/* Client pause types, larger types are more restrictive - * pause types than smaller pause types. 
*/ -typedef enum { - CLIENT_PAUSE_OFF = 0, /* Pause no commands */ - CLIENT_PAUSE_WRITE, /* Pause write commands */ - CLIENT_PAUSE_ALL /* Pause all commands */ -} pause_type; +/* Actions pause types */ +#define PAUSE_ACTION_CLIENT_WRITE (1<<0) +#define PAUSE_ACTION_CLIENT_ALL (1<<1) /* must be bigger than PAUSE_ACTION_CLIENT_WRITE */ +#define PAUSE_ACTION_EXPIRE (1<<2) +#define PAUSE_ACTION_EVICT (1<<3) +#define PAUSE_ACTION_REPLICA (1<<4) /* pause replica traffic */ + +/* common sets of actions to pause/unpause */ +#define PAUSE_ACTIONS_CLIENT_WRITE_SET (PAUSE_ACTION_CLIENT_WRITE|\ + PAUSE_ACTION_EXPIRE|\ + PAUSE_ACTION_EVICT|\ + PAUSE_ACTION_REPLICA) +#define PAUSE_ACTIONS_CLIENT_ALL_SET (PAUSE_ACTION_CLIENT_ALL|\ + PAUSE_ACTION_EXPIRE|\ + PAUSE_ACTION_EVICT|\ + PAUSE_ACTION_REPLICA) /* Client pause purposes. Each purpose has its own end time and pause type. */ typedef enum { @@ -585,7 +594,7 @@ typedef enum { } pause_purpose; typedef struct { - pause_type type; + uint32_t paused_actions; /* Bitmask of actions */ mstime_t end; } pause_event; @@ -1531,10 +1540,9 @@ struct redisServer { rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */ int in_nested_call; /* If > 0, in a nested call of a call */ rax *clients_index; /* Active clients dictionary by client ID. */ - pause_type client_pause_type; /* True if clients are currently paused */ + uint32_t paused_actions; /* Bitmask of actions that are currently paused */ list *postponed_clients; /* List of postponed clients */ - mstime_t client_pause_end_time; /* Time when we undo clients_paused */ - pause_event *client_pause_per_purpose[NUM_PAUSE_PURPOSES]; + pause_event client_pause_per_purpose[NUM_PAUSE_PURPOSES]; char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ dict *migrate_cached_sockets;/* MIGRATE cached sockets */ redisAtomic uint64_t next_client_id; /* Next client unique ID. Incremental. 
*/ @@ -2528,10 +2536,11 @@ void flushSlavesOutputBuffers(void); void disconnectSlaves(void); void evictClients(void); int listenToPort(connListener *fds); -void pauseClients(pause_purpose purpose, mstime_t end, pause_type type); -void unpauseClients(pause_purpose purpose); -int areClientsPaused(void); -int checkClientPauseTimeoutAndReturnIfPaused(void); +void pauseActions(pause_purpose purpose, mstime_t end, uint32_t actions_bitmask); +void unpauseActions(pause_purpose purpose); +uint32_t isPausedActions(uint32_t action_bitmask); +uint32_t isPausedActionsWithUpdate(uint32_t action_bitmask); +void updatePausedActions(void); void unblockPostponedClients(); void processEventsWhileBlocked(void); void whileBlockedCron(); diff --git a/tests/unit/pause.tcl b/tests/unit/pause.tcl index 3440c5f9b..2a851bbac 100644 --- a/tests/unit/pause.tcl +++ b/tests/unit/pause.tcl @@ -10,6 +10,45 @@ start_server {tags {"pause network"}} { $rd close } + test "Test old pause-all takes precedence over new pause-write (less restrictive)" { + # Scenario: + # 1. Run 'PAUSE ALL' for 200msec + # 2. Run 'PAUSE WRITE' for 20msec + # 3. Wait 50msec + # 4. 'GET FOO'. + # Expected that: + # - Although the second 'PAUSE' is shorter than the first 'PAUSE', + # pause-client feature will stick to the longer one, i.e, will be paused + # up to 200msec. + # - The GET command will be postponed ~200msec, even though last command + # paused only WRITE. This is because the first 'PAUSE ALL' command is + # more restrictive than the second 'PAUSE WRITE' and pause-client feature + # preserves the most restrictive configuration among multiple settings.
+ set rd [redis_deferring_client] + $rd SET FOO BAR + + set test_start_time [clock milliseconds] + r client PAUSE 200 ALL + r client PAUSE 20 WRITE + after 50 + $rd get FOO + set elapsed [expr {[clock milliseconds]-$test_start_time}] + assert_lessthan 200 $elapsed + } + + test "Test new pause time is smaller than old one, then old time preserved" { + r client PAUSE 60000 WRITE + r client PAUSE 10 WRITE + after 100 + set rd [redis_deferring_client] + $rd SET FOO BAR + wait_for_blocked_clients_count 1 100 10 + + r client unpause + assert_match "OK" [$rd read] + $rd close + } + test "Test write commands are paused by RO" { r client PAUSE 60000 WRITE