diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1107221b7..1d5847389 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -91,8 +91,8 @@ jobs: run: | apt-get update apt-get install -y gnupg2 - echo "deb http://dk.archive.ubuntu.com/ubuntu/ xenial main" >> /etc/apt/sources.list - echo "deb http://dk.archive.ubuntu.com/ubuntu/ xenial universe" >> /etc/apt/sources.list + echo "deb http://archive.ubuntu.com/ubuntu/ xenial main" >> /etc/apt/sources.list + echo "deb http://archive.ubuntu.com/ubuntu/ xenial universe" >> /etc/apt/sources.list apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 40976EAF437D05B5 apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 3B4FE6ACC0B21F32 apt-get update diff --git a/.github/workflows/daily.yml b/.github/workflows/daily.yml index 20c6cf55e..7f88ebd89 100644 --- a/.github/workflows/daily.yml +++ b/.github/workflows/daily.yml @@ -1116,8 +1116,8 @@ jobs: run: | apt-get update apt-get install -y gnupg2 - echo "deb http://dk.archive.ubuntu.com/ubuntu/ xenial main" >> /etc/apt/sources.list - echo "deb http://dk.archive.ubuntu.com/ubuntu/ xenial universe" >> /etc/apt/sources.list + echo "deb http://archive.ubuntu.com/ubuntu/ xenial main" >> /etc/apt/sources.list + echo "deb http://archive.ubuntu.com/ubuntu/ xenial universe" >> /etc/apt/sources.list apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 40976EAF437D05B5 apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 3B4FE6ACC0B21F32 apt-get update @@ -1164,8 +1164,8 @@ jobs: run: | apt-get update apt-get install -y gnupg2 - echo "deb http://dk.archive.ubuntu.com/ubuntu/ xenial main" >> /etc/apt/sources.list - echo "deb http://dk.archive.ubuntu.com/ubuntu/ xenial universe" >> /etc/apt/sources.list + echo "deb http://archive.ubuntu.com/ubuntu/ xenial main" >> /etc/apt/sources.list + echo "deb http://archive.ubuntu.com/ubuntu/ xenial universe" >> /etc/apt/sources.list apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 40976EAF437D05B5 apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 3B4FE6ACC0B21F32 apt-get update @@ -1218,8 +1218,8 @@ jobs: run: | apt-get update apt-get install -y gnupg2 - echo "deb http://dk.archive.ubuntu.com/ubuntu/ xenial main" >> /etc/apt/sources.list - echo "deb http://dk.archive.ubuntu.com/ubuntu/ xenial universe" >> /etc/apt/sources.list + echo "deb http://archive.ubuntu.com/ubuntu/ xenial main" >> /etc/apt/sources.list + echo "deb http://archive.ubuntu.com/ubuntu/ xenial universe" >> /etc/apt/sources.list apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 40976EAF437D05B5 apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 3B4FE6ACC0B21F32 apt-get update diff --git a/.github/workflows/external.yml b/.github/workflows/external.yml index 3cc03198f..da993f668 100644 --- a/.github/workflows/external.yml +++ b/.github/workflows/external.yml @@ -26,7 +26,7 @@ jobs: --tags -slow - name: Archive redis log if: ${{ failure() }} - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: test-external-redis-log path: external-redis.log @@ -53,7 +53,7 @@ jobs: --tags -slow - name: Archive redis log if: ${{ failure() }} - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: test-external-cluster-log path: external-redis.log @@ -76,7 +76,7 @@ jobs: --tags "-slow -needs:debug" - name: Archive redis log if: ${{ failure() }} - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: test-external-redis-log path: external-redis.log diff --git 
a/00-RELEASENOTES b/00-RELEASENOTES index 569e0cb28..1d14c0415 100644 --- a/00-RELEASENOTES +++ b/00-RELEASENOTES @@ -11,6 +11,22 @@ CRITICAL: There is a critical bug affecting MOST USERS. Upgrade ASAP. SECURITY: There are security fixes in the release. -------------------------------------------------------------------------------- +================================================================================ +Redis 7.2.8 Released Wed 23 Apr 2025 12:00:00 IST +================================================================================ + +Update urgency: `SECURITY`: There are security fixes in the release. + +### Security fixes + +* (CVE-2025-21605) An unauthenticated client can cause an unlimited growth of output buffers + +### Bug fixes + +* #12817, #12905 Fix race condition issues between the main thread and module threads +* #13863 `RANDOMKEY` - infinite loop during client pause +* #13877 ShardID inconsistency when both primary and replica support it + ================================================================================ Redis 7.2.7 Released Mon 6 Jan 2025 12:30:00 IDT diff --git a/src/Makefile b/src/Makefile index ecbd2753d..6a8790941 100644 --- a/src/Makefile +++ b/src/Makefile @@ -101,6 +101,9 @@ else ifeq ($(SANITIZER),undefined) MALLOC=libc CFLAGS+=-fsanitize=undefined -fno-sanitize-recover=all -fno-omit-frame-pointer + ifeq (clang,$(CLANG)) + CFLAGS+=-fno-sanitize=function + endif LDFLAGS+=-fsanitize=undefined else ifeq ($(SANITIZER),thread) diff --git a/src/cluster.c b/src/cluster.c index da0301718..765958a0c 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -61,6 +61,7 @@ list *clusterGetNodesInMyShard(clusterNode *node); int clusterNodeAddSlave(clusterNode *master, clusterNode *slave); int clusterAddSlot(clusterNode *n, int slot); int clusterDelSlot(int slot); +int clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node); int clusterDelNodeSlots(clusterNode *node); int clusterNodeSetSlotBit(clusterNode *n, int slot); void clusterSetMaster(clusterNode *n); @@ -963,30 +964,24 @@ clusterNode *clusterNodeGetSlave(clusterNode *node, int slave_idx) { static void updateShardId(clusterNode *node, const char *shard_id) { if (shard_id && memcmp(node->shard_id, shard_id, CLUSTER_NAMELEN) != 0) { - assignShardIdToNode(node, shard_id, CLUSTER_TODO_SAVE_CONFIG); - - /* If the replica or master does not support shard-id (old version), - * we still need to make our best effort to keep their shard-id consistent. + /* We always make our best effort to keep the shard-id consistent + * between the master and its replicas: * - * 1. Master supports but the replica does not. - * We might first update the replica's shard-id to the master's randomly - * generated shard-id. Then, when the master's shard-id arrives, we must - * also update all its replicas. - * 2. If the master does not support but the replica does. - * We also need to synchronize the master's shard-id with the replica. - * 3. If neither of master and replica supports it. - * The master will have a randomly generated shard-id and will update - * the replica to match the master's shard-id. */ + * 1. When updating the master's shard-id, we simultaneously update the + * shard-id of all its replicas to ensure consistency. + * 2. When updating replica's shard-id, if it differs from its master's shard-id, + * we discard this replica's shard-id and continue using master's shard-id. + * This applies even if the master does not support shard-id, in which + * case we rely on the master's randomly generated shard-id. 
*/ if (node->slaveof == NULL) { + assignShardIdToNode(node, shard_id, CLUSTER_TODO_SAVE_CONFIG); for (int i = 0; i < clusterNodeNumSlaves(node); i++) { clusterNode *slavenode = clusterNodeGetSlave(node, i); if (memcmp(slavenode->shard_id, shard_id, CLUSTER_NAMELEN) != 0) assignShardIdToNode(slavenode, shard_id, CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG); } - } else { - clusterNode *masternode = node->slaveof; - if (memcmp(masternode->shard_id, shard_id, CLUSTER_NAMELEN) != 0) - assignShardIdToNode(masternode, shard_id, CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG); + } else if (memcmp(node->slaveof->shard_id, shard_id, CLUSTER_NAMELEN) == 0) { + assignShardIdToNode(node, shard_id, CLUSTER_TODO_SAVE_CONFIG); } } } @@ -3058,7 +3053,50 @@ int clusterProcessPacket(clusterLink *link) { if (nodeIsMaster(sender)) { /* Master turned into a slave! Reconfigure the node. */ - clusterDelNodeSlots(sender); + if (master && !memcmp(master->shard_id, sender->shard_id, CLUSTER_NAMELEN)) { + /* `sender` was a primary and was in the same shard as `master`, its new primary */ + if (sender->configEpoch > senderConfigEpoch) { + serverLog(LL_NOTICE, + "Ignore stale message from %.40s (%s) in shard %.40s;" + " gossip config epoch: %llu, current config epoch: %llu", + sender->name, + sender->human_nodename, + sender->shard_id, + (unsigned long long)senderConfigEpoch, + (unsigned long long)sender->configEpoch); + } else { + /* A failover occurred in the shard where `sender` belongs to and `sender` is no longer + * a primary. Update slot assignment to `master`, which is the new primary in the shard */ + int slots = clusterMoveNodeSlots(sender, master); + /* `master` is still a `slave` in this observer node's view; update its role and configEpoch */ + clusterSetNodeAsMaster(master); + master->configEpoch = senderConfigEpoch; + serverLog(LL_NOTICE, "A failover occurred in shard %.40s; node %.40s (%s)" + " lost %d slot(s) to node %.40s (%s) with a config epoch of %llu", + sender->shard_id, + sender->name, + sender->human_nodename, + slots, + master->name, + master->human_nodename, + (unsigned long long) master->configEpoch); + } + } else { + /* `sender` was moved to another shard and has become a replica, remove its slot assignment */ + int slots = clusterDelNodeSlots(sender); + serverLog(LL_NOTICE, "Node %.40s (%s) is no longer master of shard %.40s;" + " removed all %d slot(s) it used to own", + sender->name, + sender->human_nodename, + sender->shard_id, + slots); + if (master != NULL) { + serverLog(LL_NOTICE, "Node %.40s (%s) is now part of shard %.40s", + sender->name, + sender->human_nodename, + master->shard_id); + } + } sender->flags &= ~(CLUSTER_NODE_MASTER| CLUSTER_NODE_MIGRATE_TO); sender->flags |= CLUSTER_NODE_SLAVE; @@ -5010,6 +5048,22 @@ int clusterDelSlot(int slot) { return C_OK; } +/* Transfer slots from `from_node` to `to_node`. + * Iterates over all cluster slots, transferring each slot covered by `from_node` to `to_node`. + * Counts and returns the number of slots transferred. */ +int clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node) { + int processed = 0; + + for (int j = 0; j < CLUSTER_SLOTS; j++) { + if (clusterNodeGetSlotBit(from_node, j)) { + clusterDelSlot(j); + clusterAddSlot(to_node, j); + processed++; + } + } + return processed; +} + /* Delete all the slots associated with the specified node. * The number of deleted slots is returned. 
*/ int clusterDelNodeSlots(clusterNode *node) { diff --git a/src/db.c b/src/db.c index 8f9a436b4..58aa2eab5 100644 --- a/src/db.c +++ b/src/db.c @@ -333,7 +333,7 @@ robj *dbRandomKey(redisDb *db) { key = dictGetKey(de); keyobj = createStringObject(key,sdslen(key)); if (dictFind(db->expires,key)) { - if (allvolatile && server.masterhost && --maxtries == 0) { + if (allvolatile && (server.masterhost || isPausedActions(PAUSE_ACTION_EXPIRE)) && --maxtries == 0) { /* If the DB is composed only of keys with an expire set, * it could happen that all the keys are already logically * expired in the slave, so the function cannot stop because diff --git a/src/defrag.c b/src/defrag.c index 25b4ee45a..1a1ca1bf7 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -662,8 +662,9 @@ void defragStream(redisDb *db, dictEntry *kde) { void defragModule(redisDb *db, dictEntry *kde) { robj *obj = dictGetVal(kde); serverAssert(obj->type == OBJ_MODULE); - - if (!moduleDefragValue(dictGetKey(kde), obj, db->id)) + robj keyobj; + initStaticStringObject(keyobj, dictGetKey(kde)); + if (!moduleDefragValue(&keyobj, obj, db->id)) defragLater(db, kde); } @@ -806,7 +807,9 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime, int } else if (ob->type == OBJ_STREAM) { return scanLaterStreamListpacks(ob, cursor, endtime); } else if (ob->type == OBJ_MODULE) { - return moduleLateDefrag(dictGetKey(de), ob, cursor, endtime, dbid); + robj keyobj; + initStaticStringObject(keyobj, dictGetKey(de)); + return moduleLateDefrag(&keyobj, ob, cursor, endtime, dbid); } else { *cursor = 0; /* object type may have changed since we schedule it for later */ } diff --git a/src/module.c b/src/module.c index 1cb418c9b..d58918201 100644 --- a/src/module.c +++ b/src/module.c @@ -282,6 +282,10 @@ typedef struct RedisModuleBlockedClient { monotime background_timer; /* Timer tracking the start of background work */ uint64_t background_duration; /* Current command background time duration. Used for measuring latency of blocking cmds */ + int blocked_on_keys_explicit_unblock; /* Set to 1 only in the case of an explicit RM_Unblock on + * a client that is blocked on keys. In this case we will + * call the timeout call back from within + * moduleHandleBlockedClients which runs from the main thread */ } RedisModuleBlockedClient; /* This is a list of Module Auth Contexts. Each time a Module registers a callback, a new ctx is @@ -306,7 +310,6 @@ static size_t moduleTempClientMinCount = 0; /* Min client count in pool since * allow thread safe contexts to execute commands at a safe moment. */ static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER; - /* Function pointer type for keyspace event notification subscriptions from modules. */ typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key); @@ -2294,7 +2297,10 @@ ustime_t RM_CachedMicroseconds(void) { * Within the same command, you can call multiple times * RM_BlockedClientMeasureTimeStart() and RM_BlockedClientMeasureTimeEnd() * to accumulate independent time intervals to the background duration. - * This method always return REDISMODULE_OK. */ + * This method always return REDISMODULE_OK. + * + * This function is not thread safe, If used in module thread and blocked callback (possibly main thread) + * simultaneously, it's recommended to protect them with lock owned by caller instead of GIL. 
*/ int RM_BlockedClientMeasureTimeStart(RedisModuleBlockedClient *bc) { elapsedStart(&(bc->background_timer)); return REDISMODULE_OK; @@ -2304,7 +2310,10 @@ int RM_BlockedClientMeasureTimeStart(RedisModuleBlockedClient *bc) { * to calculate the elapsed execution time. * On success REDISMODULE_OK is returned. * This method only returns REDISMODULE_ERR if no start time was - * previously defined ( meaning RM_BlockedClientMeasureTimeStart was not called ). */ + * previously defined ( meaning RM_BlockedClientMeasureTimeStart was not called ). + * + * This function is not thread safe, If used in module thread and blocked callback (possibly main thread) + * simultaneously, it's recommended to protect them with lock owned by caller instead of GIL. */ int RM_BlockedClientMeasureTimeEnd(RedisModuleBlockedClient *bc) { // If the counter is 0 then we haven't called RM_BlockedClientMeasureTimeStart if (!bc->background_timer) @@ -2363,7 +2372,33 @@ void RM_Yield(RedisModuleCtx *ctx, int flags, const char *busy_reply) { server.busy_module_yield_flags |= BUSY_MODULE_YIELD_CLIENTS; /* Let redis process events */ - processEventsWhileBlocked(); + if (!pthread_equal(server.main_thread_id, pthread_self())) { + /* If we are not in the main thread, we defer event loop processing to the main thread + * after the main thread enters acquiring GIL state in order to protect the event + * loop (ae.c) and avoid potential race conditions. */ + + int acquiring; + atomicGet(server.module_gil_acquring, acquiring); + if (!acquiring) { + /* If the main thread has not yet entered the acquiring GIL state, + * we attempt to wake it up and exit without waiting for it to + * acquire the GIL. This avoids blocking the caller, allowing them to + * continue with unfinished tasks before the next yield. + * We assume the caller keeps the GIL locked. */ + if (write(server.module_pipe[1],"A",1) != 1) { + /* Ignore the error, this is best-effort. */ + } + } else { + /* Release the GIL, yielding CPU to give the main thread an opportunity to start + * event processing, and then acquire the GIL again until the main thread releases it. */ + moduleReleaseGIL(); + sched_yield(); + moduleAcquireGIL(); + } + } else { + /* If we are in the main thread, we can safely process events. */ + processEventsWhileBlocked(); + } server.busy_module_yield_reply = prev_busy_module_yield_reply; /* Possibly restore the previous flags in case of two nested contexts @@ -2647,7 +2682,10 @@ RedisModuleString *RM_CreateStringFromStreamID(RedisModuleCtx *ctx, const RedisM * pass ctx as NULL when releasing the string (but passing a context will not * create any issue). Strings created with a context should be freed also passing * the context, so if you want to free a string out of context later, make sure - * to create it using a NULL context. */ + * to create it using a NULL context. + * + * This API is not thread safe, access to these retained strings (if they originated + * from a client command arguments) must be done with GIL locked. */ void RM_FreeString(RedisModuleCtx *ctx, RedisModuleString *str) { decrRefCount(str); if (ctx != NULL) autoMemoryFreed(ctx,REDISMODULE_AM_STRING,str); @@ -2684,7 +2722,10 @@ void RM_FreeString(RedisModuleCtx *ctx, RedisModuleString *str) { * * Threaded modules that reference retained strings from other threads *must* * explicitly trim the allocation as soon as the string is retained. Not doing - * so may result with automatic trimming which is not thread safe. */ + * so may result with automatic trimming which is not thread safe. 
+ * + * This API is not thread safe, access to these retained strings (if they originated + * from a client command arguments) must be done with GIL locked. */ void RM_RetainString(RedisModuleCtx *ctx, RedisModuleString *str) { if (ctx == NULL || !autoMemoryFreed(ctx,REDISMODULE_AM_STRING,str)) { /* Increment the string reference counting only if we can't @@ -2726,7 +2767,10 @@ void RM_RetainString(RedisModuleCtx *ctx, RedisModuleString *str) { * * Threaded modules that reference held strings from other threads *must* * explicitly trim the allocation as soon as the string is held. Not doing - * so may result with automatic trimming which is not thread safe. */ + * so may result with automatic trimming which is not thread safe. + * + * This API is not thread safe, access to these retained strings (if they originated + * from a client command arguments) must be done with GIL locked. */ RedisModuleString* RM_HoldString(RedisModuleCtx *ctx, RedisModuleString *str) { if (str->refcount == OBJ_STATIC_REFCOUNT) { return RM_CreateStringFromString(ctx, str); @@ -7680,7 +7724,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF int islua = scriptIsRunning(); int ismulti = server.in_exec; - c->bstate.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient)); + c->bstate.module_blocked_handle = zcalloc(sizeof(RedisModuleBlockedClient)); RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle; ctx->module->blocked_clients++; @@ -8159,7 +8203,7 @@ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) { * argument, but better to be safe than sorry. */ if (bc->timeout_callback == NULL) return REDISMODULE_ERR; if (bc->unblocked) return REDISMODULE_OK; - if (bc->client) moduleBlockedClientTimedOut(bc->client); + if (bc->client) bc->blocked_on_keys_explicit_unblock = 1; } moduleUnblockClientByHandle(bc,privdata); return REDISMODULE_OK; @@ -8237,6 +8281,10 @@ void moduleHandleBlockedClients(void) { reply_us = elapsedUs(replyTimer); moduleFreeContext(&ctx); } + if (c && bc->blocked_on_keys_explicit_unblock) { + serverAssert(bc->blocked_on_keys); + moduleBlockedClientTimedOut(c); + } /* Hold onto the blocked client if module auth is in progress. The reply callback is invoked * when the client is reprocessed. */ if (c && clientHasModuleAuthInProgress(c)) { @@ -8257,6 +8305,9 @@ void moduleHandleBlockedClients(void) { /* Update stats now that we've finished the blocking operation. * This needs to be out of the reply callback above given that a * module might not define any callback and still do blocking ops. + * + * If the module is blocked on keys updateStatsOnUnblock will be + * called from moduleUnblockClientOnKey */ if (c && !clientHasModuleAuthInProgress(c) && !bc->blocked_on_keys) { updateStatsOnUnblock(c, bc->background_duration, reply_us, server.stat_total_error_replies != prev_error_replies); @@ -8277,7 +8328,7 @@ void moduleHandleBlockedClients(void) { * if there are pending replies here. This is needed since * during a non blocking command the client may receive output. */ if (!clientHasModuleAuthInProgress(c) && clientHasPendingReplies(c) && - !(c->flags & CLIENT_PENDING_WRITE)) + !(c->flags & CLIENT_PENDING_WRITE) && c->conn) { c->flags |= CLIENT_PENDING_WRITE; listLinkNodeHead(server.clients_pending_write, &c->clients_pending_write_node); @@ -8312,23 +8363,25 @@ int moduleBlockedClientMayTimeout(client *c) { /* Called when our client timed out. After this function unblockClient() * is called, and it will invalidate the blocked client. 
So this function * does not need to do any cleanup. Eventually the module will call the - * API to unblock the client and the memory will be released. */ + * API to unblock the client and the memory will be released. + * + * This function should only be called from the main thread, we must handle the unblocking + * of the client synchronously. This ensures that we can reply to the client before + * resetClient() is called. */ void moduleBlockedClientTimedOut(client *c) { RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle; - /* Protect against re-processing: don't serve clients that are already - * in the unblocking list for any reason (including RM_UnblockClient() - * explicit call). See #6798. */ - if (bc->unblocked) return; - RedisModuleCtx ctx; moduleCreateContext(&ctx, bc->module, REDISMODULE_CTX_BLOCKED_TIMEOUT); ctx.client = bc->client; ctx.blocked_client = bc; ctx.blocked_privdata = bc->privdata; + long long prev_error_replies = server.stat_total_error_replies; + bc->timeout_callback(&ctx,(void**)c->argv,c->argc); moduleFreeContext(&ctx); + updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies); /* For timeout events, we do not want to call the disconnect callback, @@ -11849,6 +11902,7 @@ void moduleInitModulesSystem(void) { moduleUnblockedClients = listCreate(); server.loadmodule_queue = listCreate(); server.module_configs_queue = dictCreate(&sdsKeyValueHashDictType); + server.module_gil_acquring = 0; modules = dictCreate(&modulesDictType); moduleAuthCallbacks = listCreate(); @@ -12204,7 +12258,7 @@ int moduleLoad(const char *path, void **module_argv, int module_argc, int is_loa } if (post_load_err) { - moduleUnload(ctx.module->name, NULL); + serverAssert(moduleUnload(ctx.module->name, NULL, 1) == C_OK); moduleFreeContext(&ctx); return C_ERR; } @@ -12220,14 +12274,17 @@ int moduleLoad(const char *path, void **module_argv, int module_argc, int is_loa /* Unload the module registered with the specified name. On success * C_OK is returned, otherwise C_ERR is returned and errmsg is set - * with an appropriate message. */ -int moduleUnload(sds name, const char **errmsg) { + * with an appropriate message. + * Only forcefully unload this module, passing forced_unload != 0, + * if it is certain that it has not yet been in use (e.g., immediate + * unload on failed load). */ +int moduleUnload(sds name, const char **errmsg, int forced_unload) { struct RedisModule *module = dictFetchValue(modules,name); if (module == NULL) { *errmsg = "no such module with that name"; return C_ERR; - } else if (listLength(module->types)) { + } else if (listLength(module->types) && !forced_unload) { *errmsg = "the module exports one or more module-side data " "types, can't unload"; return C_ERR; @@ -13020,7 +13077,7 @@ NULL } else if (!strcasecmp(subcmd,"unload") && c->argc == 3) { const char *errmsg = NULL; - if (moduleUnload(c->argv[2]->ptr, &errmsg) == C_OK) + if (moduleUnload(c->argv[2]->ptr, &errmsg, 0) == C_OK) addReply(c,shared.ok); else { if (errmsg == NULL) errmsg = "operation not possible."; diff --git a/src/networking.c b/src/networking.c index 9674526be..bb1a72b9f 100644 --- a/src/networking.c +++ b/src/networking.c @@ -413,8 +413,9 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) { * to a channel which we are subscribed to, then we wanna postpone that message to be added * after the command's reply (specifically important during multi-exec). 
the exception is * the SUBSCRIBE command family, which (currently) have a push message instead of a proper reply. - * The check for executing_client also avoids affecting push messages that are part of eviction. */ - if (c == server.current_client && (c->flags & CLIENT_PUSHING) && + * The check for executing_client also avoids affecting push messages that are part of eviction. + * Check CLIENT_PUSHING first to avoid race conditions, as it's absent in module's fake client. */ + if ((c->flags & CLIENT_PUSHING) && c == server.current_client && server.executing_client && !cmdHasPushAsReply(server.executing_client->cmd)) { _addReplyProtoToList(c,server.pending_push_messages,s,len); @@ -1433,7 +1434,7 @@ void unlinkClient(client *c) { listNode *ln; /* If this is marked as current client unset it. */ - if (server.current_client == c) server.current_client = NULL; + if (c->conn && server.current_client == c) server.current_client = NULL; /* Certain operations must be done only if the client has an active connection. * If the client was already unlinked or if it's a "fake client" the @@ -1477,7 +1478,7 @@ void unlinkClient(client *c) { } /* Remove from the list of pending reads if needed. */ - serverAssert(io_threads_op == IO_THREADS_OP_IDLE); + serverAssert(!c->conn || io_threads_op == IO_THREADS_OP_IDLE); if (c->pending_read_list_node != NULL) { listDelNode(server.clients_pending_read,c->pending_read_list_node); c->pending_read_list_node = NULL; @@ -1630,6 +1631,12 @@ void freeClient(client *c) { reqresReset(c, 1); #endif + /* Remove the contribution that this client gave to our + * incrementally computed memory usage. */ + if (c->conn) + server.stat_clients_type_memory[c->last_memory_type] -= + c->last_memory_usage; + /* Unlink the client: this will close the socket, remove the I/O * handlers, and remove references of the client from different * places where active clients may be referenced. */ @@ -1678,10 +1685,6 @@ void freeClient(client *c) { * we lost the connection with the master. */ if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection(); - /* Remove the contribution that this client gave to our - * incrementally computed memory usage. */ - server.stat_clients_type_memory[c->last_memory_type] -= - c->last_memory_usage; /* Remove client from memory usage buckets */ if (c->mem_usage_bucket) { c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage; @@ -2470,7 +2473,7 @@ int processCommandAndResetClient(client *c) { commandProcessed(c); /* Update the client's memory to include output buffer growth following the * processed command. */ - updateClientMemUsageAndBucket(c); + if (c->conn) updateClientMemUsageAndBucket(c); } if (server.current_client == NULL) deadclient = 1; @@ -3855,6 +3858,11 @@ int checkClientOutputBufferLimits(client *c) { int soft = 0, hard = 0, class; unsigned long used_mem = getClientOutputBufferMemoryUsage(c); + /* For unauthenticated clients the output buffer is limited to prevent + * them from abusing it by not reading the replies */ + if (used_mem > 1024 && authRequired(c)) + return 1; + class = getClientType(c); /* For the purpose of output buffer limiting, masters are handled * like normal clients. */ diff --git a/src/server.c b/src/server.c index 5c9870f0e..213942441 100644 --- a/src/server.c +++ b/src/server.c @@ -872,6 +872,7 @@ static inline clientMemUsageBucket *getMemUsageBucket(size_t mem) { * usage bucket. 
*/ void updateClientMemoryUsage(client *c) { + serverAssert(c->conn); size_t mem = getClientMemoryUsage(c, NULL); int type = getClientType(c); /* Now that we have the memory used by the client, remove the old @@ -884,7 +885,7 @@ void updateClientMemoryUsage(client *c) { } int clientEvictionAllowed(client *c) { - if (server.maxmemory_clients == 0 || c->flags & CLIENT_NO_EVICT) { + if (server.maxmemory_clients == 0 || c->flags & CLIENT_NO_EVICT || !c->conn) { return 0; } int type = getClientType(c); @@ -924,7 +925,7 @@ void removeClientFromMemUsageBucket(client *c, int allow_eviction) { * returns 1 if client eviction for this client is allowed, 0 otherwise. */ int updateClientMemUsageAndBucket(client *c) { - serverAssert(io_threads_op == IO_THREADS_OP_IDLE); + serverAssert(io_threads_op == IO_THREADS_OP_IDLE && c->conn); int allow_eviction = clientEvictionAllowed(c); removeClientFromMemUsageBucket(c, allow_eviction); @@ -1793,7 +1794,9 @@ void afterSleep(struct aeEventLoop *eventLoop) { mstime_t latency; latencyStartMonitor(latency); + atomicSet(server.module_gil_acquring, 1); moduleAcquireGIL(); + atomicSet(server.module_gil_acquring, 0); moduleFireServerEvent(REDISMODULE_EVENT_EVENTLOOP, REDISMODULE_SUBEVENT_EVENTLOOP_AFTER_SLEEP, NULL); diff --git a/src/server.h b/src/server.h index b1fa542fd..586711e22 100644 --- a/src/server.h +++ b/src/server.h @@ -1581,6 +1581,7 @@ struct redisServer { int module_pipe[2]; /* Pipe used to awake the event loop by module threads. */ pid_t child_pid; /* PID of current child */ int child_type; /* Type of current child */ + redisAtomic int module_gil_acquring; /* Indicates whether the GIL is being acquiring by the main thread. */ /* Networking */ int port; /* TCP listening port */ int tls_port; /* TLS listening port */ @@ -2468,7 +2469,7 @@ void moduleInitModulesSystem(void); void moduleInitModulesSystemLast(void); void modulesCron(void); int moduleLoad(const char *path, void **argv, int argc, int is_loadex); -int moduleUnload(sds name, const char **errmsg); +int moduleUnload(sds name, const char **errmsg, int forced_unload); void moduleLoadFromQueue(void); int moduleGetCommandKeysViaAPI(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int moduleGetCommandChannelsViaAPI(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); diff --git a/src/version.h b/src/version.h index 6958e84ca..9d41c5ab1 100644 --- a/src/version.h +++ b/src/version.h @@ -1,2 +1,2 @@ -#define REDIS_VERSION "7.2.7" -#define REDIS_VERSION_NUM 0x00070207 +#define REDIS_VERSION "7.2.8" +#define REDIS_VERSION_NUM 0x00070208 diff --git a/tests/modules/blockedclient.c b/tests/modules/blockedclient.c index 92060fd33..23030cef4 100644 --- a/tests/modules/blockedclient.c +++ b/tests/modules/blockedclient.c @@ -102,6 +102,7 @@ typedef struct { void *bg_call_worker(void *arg) { bg_call_data *bg = arg; + RedisModuleBlockedClient *bc = bg->bc; // Get Redis module context RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bg->bc); @@ -135,6 +136,12 @@ void *bg_call_worker(void *arg) { RedisModuleCallReply *rep = RedisModule_Call(ctx, cmd, format, bg->argv + cmd_pos + 1, bg->argc - cmd_pos - 1); RedisModule_FreeString(NULL, format_redis_str); + /* Free the arguments within GIL to prevent simultaneous freeing in main thread. 
*/ + for (int i=0; iargc; i++) + RedisModule_FreeString(ctx, bg->argv[i]); + RedisModule_Free(bg->argv); + RedisModule_Free(bg); + // Release GIL RedisModule_ThreadSafeContextUnlock(ctx); @@ -147,13 +154,7 @@ void *bg_call_worker(void *arg) { } // Unblock client - RedisModule_UnblockClient(bg->bc, NULL); - - /* Free the arguments */ - for (int i=0; iargc; i++) - RedisModule_FreeString(ctx, bg->argv[i]); - RedisModule_Free(bg->argv); - RedisModule_Free(bg); + RedisModule_UnblockClient(bc, NULL); // Free the Redis module context RedisModule_FreeThreadSafeContext(ctx); diff --git a/tests/modules/blockonbackground.c b/tests/modules/blockonbackground.c index 2e3b1a557..e068e20d9 100644 --- a/tests/modules/blockonbackground.c +++ b/tests/modules/blockonbackground.c @@ -7,12 +7,41 @@ #define UNUSED(x) (void)(x) +typedef struct { + /* Mutex for protecting RedisModule_BlockedClientMeasureTime*() API from race + * conditions due to timeout callback triggered in the main thread. */ + pthread_mutex_t measuretime_mutex; + int measuretime_completed; /* Indicates that time measure has ended and will not continue further */ + int myint; /* Used for replying */ +} BlockPrivdata; + +void blockClientPrivdataInit(RedisModuleBlockedClient *bc) { + BlockPrivdata *block_privdata = RedisModule_Calloc(1, sizeof(*block_privdata)); + block_privdata->measuretime_mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER; + RedisModule_BlockClientSetPrivateData(bc, block_privdata); +} + +void blockClientMeasureTimeStart(RedisModuleBlockedClient *bc, BlockPrivdata *block_privdata) { + pthread_mutex_lock(&block_privdata->measuretime_mutex); + RedisModule_BlockedClientMeasureTimeStart(bc); + pthread_mutex_unlock(&block_privdata->measuretime_mutex); +} + +void blockClientMeasureTimeEnd(RedisModuleBlockedClient *bc, BlockPrivdata *block_privdata, int completed) { + pthread_mutex_lock(&block_privdata->measuretime_mutex); + if (!block_privdata->measuretime_completed) { + RedisModule_BlockedClientMeasureTimeEnd(bc); + if (completed) block_privdata->measuretime_completed = 1; + } + pthread_mutex_unlock(&block_privdata->measuretime_mutex); +} + /* Reply callback for blocking command BLOCK.DEBUG */ int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { UNUSED(argv); UNUSED(argc); - int *myint = RedisModule_GetBlockedClientPrivateData(ctx); - return RedisModule_ReplyWithLongLong(ctx,*myint); + BlockPrivdata *block_privdata = RedisModule_GetBlockedClientPrivateData(ctx); + return RedisModule_ReplyWithLongLong(ctx,block_privdata->myint); } /* Timeout callback for blocking command BLOCK.DEBUG */ @@ -20,13 +49,16 @@ int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) UNUSED(argv); UNUSED(argc); RedisModuleBlockedClient *bc = RedisModule_GetBlockedClientHandle(ctx); - RedisModule_BlockedClientMeasureTimeEnd(bc); + BlockPrivdata *block_privdata = RedisModule_GetBlockedClientPrivateData(ctx); + blockClientMeasureTimeEnd(bc, block_privdata, 1); return RedisModule_ReplyWithSimpleString(ctx,"Request timedout"); } /* Private data freeing callback for BLOCK.DEBUG command. 
*/ void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) { UNUSED(ctx); + BlockPrivdata *block_privdata = privdata; + pthread_mutex_destroy(&block_privdata->measuretime_mutex); RedisModule_Free(privdata); } @@ -42,19 +74,20 @@ void *BlockDebug_ThreadMain(void *arg) { RedisModuleBlockedClient *bc = targ[0]; long long delay = (unsigned long)targ[1]; long long enable_time_track = (unsigned long)targ[2]; + BlockPrivdata *block_privdata = RedisModule_BlockClientGetPrivateData(bc); + if (enable_time_track) - RedisModule_BlockedClientMeasureTimeStart(bc); + blockClientMeasureTimeStart(bc, block_privdata); RedisModule_Free(targ); struct timespec ts; ts.tv_sec = delay / 1000; ts.tv_nsec = (delay % 1000) * 1000000; nanosleep(&ts, NULL); - int *r = RedisModule_Alloc(sizeof(int)); - *r = rand(); if (enable_time_track) - RedisModule_BlockedClientMeasureTimeEnd(bc); - RedisModule_UnblockClient(bc,r); + blockClientMeasureTimeEnd(bc, block_privdata, 0); + block_privdata->myint = rand(); + RedisModule_UnblockClient(bc,block_privdata); return NULL; } @@ -64,23 +97,22 @@ void *DoubleBlock_ThreadMain(void *arg) { void **targ = arg; RedisModuleBlockedClient *bc = targ[0]; long long delay = (unsigned long)targ[1]; - RedisModule_BlockedClientMeasureTimeStart(bc); + BlockPrivdata *block_privdata = RedisModule_BlockClientGetPrivateData(bc); + blockClientMeasureTimeStart(bc, block_privdata); RedisModule_Free(targ); struct timespec ts; ts.tv_sec = delay / 1000; ts.tv_nsec = (delay % 1000) * 1000000; nanosleep(&ts, NULL); - int *r = RedisModule_Alloc(sizeof(int)); - *r = rand(); - RedisModule_BlockedClientMeasureTimeEnd(bc); + blockClientMeasureTimeEnd(bc, block_privdata, 0); /* call again RedisModule_BlockedClientMeasureTimeStart() and * RedisModule_BlockedClientMeasureTimeEnd and ensure that the * total execution time is 2x the delay. */ - RedisModule_BlockedClientMeasureTimeStart(bc); + blockClientMeasureTimeStart(bc, block_privdata); nanosleep(&ts, NULL); - RedisModule_BlockedClientMeasureTimeEnd(bc); - - RedisModule_UnblockClient(bc,r); + blockClientMeasureTimeEnd(bc, block_privdata, 0); + block_privdata->myint = rand(); + RedisModule_UnblockClient(bc,block_privdata); return NULL; } @@ -107,6 +139,7 @@ int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int a pthread_t tid; RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout); + blockClientPrivdataInit(bc); /* Here we set a disconnection handler, however since this module will * block in sleep() in a thread, there is not much we can do in the @@ -148,6 +181,7 @@ int HelloBlockNoTracking_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **a pthread_t tid; RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout); + blockClientPrivdataInit(bc); /* Here we set a disconnection handler, however since this module will * block in sleep() in a thread, there is not much we can do in the @@ -184,6 +218,7 @@ int HelloDoubleBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, pthread_t tid; RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,0); + blockClientPrivdataInit(bc); /* Now that we setup a blocking client, we need to pass the control * to the thread. 
However we need to pass arguments to the thread: diff --git a/tests/modules/datatype.c b/tests/modules/datatype.c index 408d1a526..05cf2337c 100644 --- a/tests/modules/datatype.c +++ b/tests/modules/datatype.c @@ -312,3 +312,12 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_OK; } + +int RedisModule_OnUnload(RedisModuleCtx *ctx) { + REDISMODULE_NOT_USED(ctx); + if (datatype) { + RedisModule_Free(datatype); + datatype = NULL; + } + return REDISMODULE_OK; +} diff --git a/tests/modules/defragtest.c b/tests/modules/defragtest.c index 6a02a059f..22f304924 100644 --- a/tests/modules/defragtest.c +++ b/tests/modules/defragtest.c @@ -141,13 +141,14 @@ size_t FragFreeEffort(RedisModuleString *key, const void *value) { } int FragDefrag(RedisModuleDefragCtx *ctx, RedisModuleString *key, void **value) { - REDISMODULE_NOT_USED(key); unsigned long i = 0; int steps = 0; int dbid = RedisModule_GetDbIdFromDefragCtx(ctx); RedisModule_Assert(dbid != -1); + RedisModule_Log(NULL, "notice", "Defrag key: %s", RedisModule_StringPtrLen(key, NULL)); + /* Attempt to get cursor, validate it's what we're exepcting */ if (RedisModule_DefragCursorGet(ctx, &i) == REDISMODULE_OK) { if (i > 0) datatype_resumes++; diff --git a/tests/modules/usercall.c b/tests/modules/usercall.c index 6b23974d4..316de1eea 100644 --- a/tests/modules/usercall.c +++ b/tests/modules/usercall.c @@ -115,6 +115,7 @@ typedef struct { void *bg_call_worker(void *arg) { bg_call_data *bg = arg; + RedisModuleBlockedClient *bc = bg->bc; // Get Redis module context RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bg->bc); @@ -136,6 +137,12 @@ void *bg_call_worker(void *arg) { RedisModuleCallReply *rep = RedisModule_Call(ctx, cmd, format, bg->argv + 3, bg->argc - 3); RedisModule_FreeString(NULL, format_redis_str); + /* Free the arguments within GIL to prevent simultaneous freeing in main thread. */ + for (int i=0; iargc; i++) + RedisModule_FreeString(ctx, bg->argv[i]); + RedisModule_Free(bg->argv); + RedisModule_Free(bg); + // Release GIL RedisModule_ThreadSafeContextUnlock(ctx); @@ -148,13 +155,7 @@ void *bg_call_worker(void *arg) { } // Unblock client - RedisModule_UnblockClient(bg->bc, NULL); - - /* Free the arguments */ - for (int i=0; iargc; i++) - RedisModule_FreeString(ctx, bg->argv[i]); - RedisModule_Free(bg->argv); - RedisModule_Free(bg); + RedisModule_UnblockClient(bc, NULL); // Free the Redis module context RedisModule_FreeThreadSafeContext(ctx); diff --git a/tests/unit/auth.tcl b/tests/unit/auth.tcl index 9532e0bd2..0d25f5dea 100644 --- a/tests/unit/auth.tcl +++ b/tests/unit/auth.tcl @@ -45,6 +45,24 @@ start_server {tags {"auth external:skip"} overrides {requirepass foobar}} { assert_match {*unauthenticated bulk length*} $e $rr close } + + test {For unauthenticated clients output buffer is limited} { + set rr [redis [srv "host"] [srv "port"] 1 $::tls] + $rr SET x 5 + catch {[$rr read]} e + assert_match {*NOAUTH Authentication required*} $e + + # Fill the output buffer in a loop without reading it and make + # sure the client disconnected. + # Considering the socket eat some of the replies, we are testing + # that such client can't consume more than few MB's. 
+ catch { + for {set j 0} {$j < 1000000} {incr j} { + $rr SET x 5 + } + } e + assert_match {I/O error reading reply} $e + } } start_server {tags {"auth_binary_password external:skip"}} { diff --git a/tests/unit/moduleapi/datatype.tcl b/tests/unit/moduleapi/datatype.tcl index 951c060e7..5d1722caa 100644 --- a/tests/unit/moduleapi/datatype.tcl +++ b/tests/unit/moduleapi/datatype.tcl @@ -1,6 +1,11 @@ set testmodule [file normalize tests/modules/datatype.so] start_server {tags {"modules"}} { + test {DataType: test loadex with invalid config} { + catch { r module loadex $testmodule CONFIG invalid_config 1 } e + assert_match {*ERR Error loading the extension*} $e + } + r module load $testmodule test {DataType: Test module is sane, GET/SET work.} { diff --git a/tests/unit/pause.tcl b/tests/unit/pause.tcl index e30f922e6..5f4e92cae 100644 --- a/tests/unit/pause.tcl +++ b/tests/unit/pause.tcl @@ -359,6 +359,26 @@ start_server {tags {"pause network"}} { } {bar2} } + test "Test the randomkey command will not cause the server to get into an infinite loop during the client pause write" { + r flushall + + r multi + r set key value px 3 + r client pause 10000 write + r exec + + after 5 + + wait_for_condition 50 100 { + [r randomkey] == "key" + } else { + fail "execute randomkey failed, caused by the infinite loop" + } + + r client unpause + assert_equal [r randomkey] {} + } + # Make sure we unpause at the end r client unpause }
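
The thread-safety notes added above to RM_FreeString/RM_RetainString/RM_HoldString, together with the reordering in tests/modules/blockedclient.c and tests/modules/usercall.c, reduce to one pattern: strings retained from client command arguments must be released while the GIL is held, and the blocked-client handle must be copied out before the per-job structure is freed. Below is a minimal sketch of that pattern, not part of the patch; the module name "bgfree", the command "bgfree.run", and the bg_job/bg_worker helpers are hypothetical names used only for illustration.

/* Hypothetical sketch: retain arguments for a background thread, free them
 * only while the GIL is held, and unblock via a saved handle. */
#include "redismodule.h"
#include <pthread.h>

typedef struct {
    RedisModuleBlockedClient *bc;
    RedisModuleString **argv;
    int argc;
} bg_job;

static void *bg_worker(void *arg) {
    bg_job *job = arg;
    RedisModuleBlockedClient *bc = job->bc; /* keep a copy; job is freed below */
    RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);

    RedisModule_ThreadSafeContextLock(ctx); /* acquire the GIL */
    /* ... RedisModule_Call() or other work on the retained strings ... */

    /* Free the retained strings while the GIL is held, so the main thread
     * cannot free them concurrently (see the RM_FreeString note above). */
    for (int i = 0; i < job->argc; i++)
        RedisModule_FreeString(ctx, job->argv[i]);
    RedisModule_Free(job->argv);
    RedisModule_Free(job);
    RedisModule_ThreadSafeContextUnlock(ctx); /* release the GIL */

    /* Unblock via the saved handle, not job->bc (already freed). */
    RedisModule_UnblockClient(bc, NULL);
    RedisModule_FreeThreadSafeContext(ctx);
    return NULL;
}

static int BgFree_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    bg_job *job = RedisModule_Alloc(sizeof(*job));
    job->bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
    job->argc = argc;
    job->argv = RedisModule_Alloc(sizeof(RedisModuleString *) * argc);
    for (int i = 0; i < argc; i++)
        job->argv[i] = RedisModule_HoldString(NULL, argv[i]); /* retain for the thread */

    pthread_t tid;
    if (pthread_create(&tid, NULL, bg_worker, job) != 0) {
        RedisModule_AbortBlock(job->bc);
        for (int i = 0; i < argc; i++) RedisModule_FreeString(NULL, job->argv[i]);
        RedisModule_Free(job->argv);
        RedisModule_Free(job);
        return RedisModule_ReplyWithError(ctx, "ERR can't start thread");
    }
    pthread_detach(tid);
    return REDISMODULE_OK;
}

int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argv);
    REDISMODULE_NOT_USED(argc);
    if (RedisModule_Init(ctx, "bgfree", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR)
        return REDISMODULE_ERR;
    if (RedisModule_CreateCommand(ctx, "bgfree.run", BgFree_RedisCommand,
                                  "", 0, 0, 0) == REDISMODULE_ERR)
        return REDISMODULE_ERR;
    return REDISMODULE_OK;
}

The ordering is the whole point: in the patched test modules the free loop was moved before RedisModule_ThreadSafeContextUnlock() precisely so that the decrement of the string refcounts can never race with the main thread, and the blocked-client pointer is read out of the job struct before that struct is released.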