mirror of https://mirror.osredm.com/root/redis.git
Release 7.2.8 (#13965)
CI / test-ubuntu-latest (push) Failing after 0s
Details
CI / test-sanitizer-address (push) Failing after 0s
Details
CI / build-debian-old (push) Failing after 0s
Details
CI / build-32bit (push) Failing after 0s
Details
CI / build-libc-malloc (push) Failing after 0s
Details
CI / build-centos-jemalloc (push) Failing after 0s
Details
CI / build-old-chain-jemalloc (push) Failing after 0s
Details
External Server Tests / test-external-standalone (push) Failing after 0s
Details
External Server Tests / test-external-cluster (push) Failing after 0s
Details
External Server Tests / test-external-nodebug (push) Failing after 0s
Details
Spellcheck / Spellcheck (push) Failing after 1s
Details
CI / build-macos-latest (push) Has been cancelled
Details
CI / test-ubuntu-latest (push) Failing after 0s
Details
CI / test-sanitizer-address (push) Failing after 0s
Details
CI / build-debian-old (push) Failing after 0s
Details
CI / build-32bit (push) Failing after 0s
Details
CI / build-libc-malloc (push) Failing after 0s
Details
CI / build-centos-jemalloc (push) Failing after 0s
Details
CI / build-old-chain-jemalloc (push) Failing after 0s
Details
External Server Tests / test-external-standalone (push) Failing after 0s
Details
External Server Tests / test-external-cluster (push) Failing after 0s
Details
External Server Tests / test-external-nodebug (push) Failing after 0s
Details
Spellcheck / Spellcheck (push) Failing after 1s
Details
CI / build-macos-latest (push) Has been cancelled
Details
This commit is contained in:
commit
31d93c5928
|
@ -91,8 +91,8 @@ jobs:
|
|||
run: |
|
||||
apt-get update
|
||||
apt-get install -y gnupg2
|
||||
echo "deb http://dk.archive.ubuntu.com/ubuntu/ xenial main" >> /etc/apt/sources.list
|
||||
echo "deb http://dk.archive.ubuntu.com/ubuntu/ xenial universe" >> /etc/apt/sources.list
|
||||
echo "deb http://archive.ubuntu.com/ubuntu/ xenial main" >> /etc/apt/sources.list
|
||||
echo "deb http://archive.ubuntu.com/ubuntu/ xenial universe" >> /etc/apt/sources.list
|
||||
apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 40976EAF437D05B5
|
||||
apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 3B4FE6ACC0B21F32
|
||||
apt-get update
|
||||
|
|
|
@ -1116,8 +1116,8 @@ jobs:
|
|||
run: |
|
||||
apt-get update
|
||||
apt-get install -y gnupg2
|
||||
echo "deb http://dk.archive.ubuntu.com/ubuntu/ xenial main" >> /etc/apt/sources.list
|
||||
echo "deb http://dk.archive.ubuntu.com/ubuntu/ xenial universe" >> /etc/apt/sources.list
|
||||
echo "deb http://archive.ubuntu.com/ubuntu/ xenial main" >> /etc/apt/sources.list
|
||||
echo "deb http://archive.ubuntu.com/ubuntu/ xenial universe" >> /etc/apt/sources.list
|
||||
apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 40976EAF437D05B5
|
||||
apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 3B4FE6ACC0B21F32
|
||||
apt-get update
|
||||
|
@ -1164,8 +1164,8 @@ jobs:
|
|||
run: |
|
||||
apt-get update
|
||||
apt-get install -y gnupg2
|
||||
echo "deb http://dk.archive.ubuntu.com/ubuntu/ xenial main" >> /etc/apt/sources.list
|
||||
echo "deb http://dk.archive.ubuntu.com/ubuntu/ xenial universe" >> /etc/apt/sources.list
|
||||
echo "deb http://archive.ubuntu.com/ubuntu/ xenial main" >> /etc/apt/sources.list
|
||||
echo "deb http://archive.ubuntu.com/ubuntu/ xenial universe" >> /etc/apt/sources.list
|
||||
apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 40976EAF437D05B5
|
||||
apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 3B4FE6ACC0B21F32
|
||||
apt-get update
|
||||
|
@ -1218,8 +1218,8 @@ jobs:
|
|||
run: |
|
||||
apt-get update
|
||||
apt-get install -y gnupg2
|
||||
echo "deb http://dk.archive.ubuntu.com/ubuntu/ xenial main" >> /etc/apt/sources.list
|
||||
echo "deb http://dk.archive.ubuntu.com/ubuntu/ xenial universe" >> /etc/apt/sources.list
|
||||
echo "deb http://archive.ubuntu.com/ubuntu/ xenial main" >> /etc/apt/sources.list
|
||||
echo "deb http://archive.ubuntu.com/ubuntu/ xenial universe" >> /etc/apt/sources.list
|
||||
apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 40976EAF437D05B5
|
||||
apt-key adv --keyserver keyserver.ubuntu.com --recv-keys 3B4FE6ACC0B21F32
|
||||
apt-get update
|
||||
|
|
|
@ -26,7 +26,7 @@ jobs:
|
|||
--tags -slow
|
||||
- name: Archive redis log
|
||||
if: ${{ failure() }}
|
||||
uses: actions/upload-artifact@v3
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: test-external-redis-log
|
||||
path: external-redis.log
|
||||
|
@ -53,7 +53,7 @@ jobs:
|
|||
--tags -slow
|
||||
- name: Archive redis log
|
||||
if: ${{ failure() }}
|
||||
uses: actions/upload-artifact@v3
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: test-external-cluster-log
|
||||
path: external-redis.log
|
||||
|
@ -76,7 +76,7 @@ jobs:
|
|||
--tags "-slow -needs:debug"
|
||||
- name: Archive redis log
|
||||
if: ${{ failure() }}
|
||||
uses: actions/upload-artifact@v3
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: test-external-redis-log
|
||||
path: external-redis.log
|
||||
|
|
|
@ -11,6 +11,22 @@ CRITICAL: There is a critical bug affecting MOST USERS. Upgrade ASAP.
|
|||
SECURITY: There are security fixes in the release.
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
================================================================================
|
||||
Redis 7.2.8 Released Wed 23 Apr 2025 12:00:00 IST
|
||||
================================================================================
|
||||
|
||||
Update urgency: `SECURITY`: There are security fixes in the release.
|
||||
|
||||
### Security fixes
|
||||
|
||||
* (CVE-2025-21605) An unauthenticated client can cause an unlimited growth of output buffers
|
||||
|
||||
### Bug fixes
|
||||
|
||||
* #12817, #12905 Fix race condition issues between the main thread and module threads
|
||||
* #13863 `RANDOMKEY` - infinite loop during client pause
|
||||
* #13877 ShardID inconsistency when both primary and replica support it
|
||||
|
||||
|
||||
================================================================================
|
||||
Redis 7.2.7 Released Mon 6 Jan 2025 12:30:00 IDT
|
||||
|
|
|
@ -101,6 +101,9 @@ else
|
|||
ifeq ($(SANITIZER),undefined)
|
||||
MALLOC=libc
|
||||
CFLAGS+=-fsanitize=undefined -fno-sanitize-recover=all -fno-omit-frame-pointer
|
||||
ifeq (clang,$(CLANG))
|
||||
CFLAGS+=-fno-sanitize=function
|
||||
endif
|
||||
LDFLAGS+=-fsanitize=undefined
|
||||
else
|
||||
ifeq ($(SANITIZER),thread)
|
||||
|
|
|
@ -61,6 +61,7 @@ list *clusterGetNodesInMyShard(clusterNode *node);
|
|||
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
|
||||
int clusterAddSlot(clusterNode *n, int slot);
|
||||
int clusterDelSlot(int slot);
|
||||
int clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node);
|
||||
int clusterDelNodeSlots(clusterNode *node);
|
||||
int clusterNodeSetSlotBit(clusterNode *n, int slot);
|
||||
void clusterSetMaster(clusterNode *n);
|
||||
|
@ -963,30 +964,24 @@ clusterNode *clusterNodeGetSlave(clusterNode *node, int slave_idx) {
|
|||
|
||||
static void updateShardId(clusterNode *node, const char *shard_id) {
|
||||
if (shard_id && memcmp(node->shard_id, shard_id, CLUSTER_NAMELEN) != 0) {
|
||||
assignShardIdToNode(node, shard_id, CLUSTER_TODO_SAVE_CONFIG);
|
||||
|
||||
/* If the replica or master does not support shard-id (old version),
|
||||
* we still need to make our best effort to keep their shard-id consistent.
|
||||
/* We always make our best effort to keep the shard-id consistent
|
||||
* between the master and its replicas:
|
||||
*
|
||||
* 1. Master supports but the replica does not.
|
||||
* We might first update the replica's shard-id to the master's randomly
|
||||
* generated shard-id. Then, when the master's shard-id arrives, we must
|
||||
* also update all its replicas.
|
||||
* 2. If the master does not support but the replica does.
|
||||
* We also need to synchronize the master's shard-id with the replica.
|
||||
* 3. If neither of master and replica supports it.
|
||||
* The master will have a randomly generated shard-id and will update
|
||||
* the replica to match the master's shard-id. */
|
||||
* 1. When updating the master's shard-id, we simultaneously update the
|
||||
* shard-id of all its replicas to ensure consistency.
|
||||
* 2. When updating replica's shard-id, if it differs from its master's shard-id,
|
||||
* we discard this replica's shard-id and continue using master's shard-id.
|
||||
* This applies even if the master does not support shard-id, in which
|
||||
* case we rely on the master's randomly generated shard-id. */
|
||||
if (node->slaveof == NULL) {
|
||||
assignShardIdToNode(node, shard_id, CLUSTER_TODO_SAVE_CONFIG);
|
||||
for (int i = 0; i < clusterNodeNumSlaves(node); i++) {
|
||||
clusterNode *slavenode = clusterNodeGetSlave(node, i);
|
||||
if (memcmp(slavenode->shard_id, shard_id, CLUSTER_NAMELEN) != 0)
|
||||
assignShardIdToNode(slavenode, shard_id, CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
|
||||
}
|
||||
} else {
|
||||
clusterNode *masternode = node->slaveof;
|
||||
if (memcmp(masternode->shard_id, shard_id, CLUSTER_NAMELEN) != 0)
|
||||
assignShardIdToNode(masternode, shard_id, CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
|
||||
} else if (memcmp(node->slaveof->shard_id, shard_id, CLUSTER_NAMELEN) == 0) {
|
||||
assignShardIdToNode(node, shard_id, CLUSTER_TODO_SAVE_CONFIG);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3058,7 +3053,50 @@ int clusterProcessPacket(clusterLink *link) {
|
|||
|
||||
if (nodeIsMaster(sender)) {
|
||||
/* Master turned into a slave! Reconfigure the node. */
|
||||
clusterDelNodeSlots(sender);
|
||||
if (master && !memcmp(master->shard_id, sender->shard_id, CLUSTER_NAMELEN)) {
|
||||
/* `sender` was a primary and was in the same shard as `master`, its new primary */
|
||||
if (sender->configEpoch > senderConfigEpoch) {
|
||||
serverLog(LL_NOTICE,
|
||||
"Ignore stale message from %.40s (%s) in shard %.40s;"
|
||||
" gossip config epoch: %llu, current config epoch: %llu",
|
||||
sender->name,
|
||||
sender->human_nodename,
|
||||
sender->shard_id,
|
||||
(unsigned long long)senderConfigEpoch,
|
||||
(unsigned long long)sender->configEpoch);
|
||||
} else {
|
||||
/* A failover occurred in the shard where `sender` belongs to and `sender` is no longer
|
||||
* a primary. Update slot assignment to `master`, which is the new primary in the shard */
|
||||
int slots = clusterMoveNodeSlots(sender, master);
|
||||
/* `master` is still a `slave` in this observer node's view; update its role and configEpoch */
|
||||
clusterSetNodeAsMaster(master);
|
||||
master->configEpoch = senderConfigEpoch;
|
||||
serverLog(LL_NOTICE, "A failover occurred in shard %.40s; node %.40s (%s)"
|
||||
" lost %d slot(s) to node %.40s (%s) with a config epoch of %llu",
|
||||
sender->shard_id,
|
||||
sender->name,
|
||||
sender->human_nodename,
|
||||
slots,
|
||||
master->name,
|
||||
master->human_nodename,
|
||||
(unsigned long long) master->configEpoch);
|
||||
}
|
||||
} else {
|
||||
/* `sender` was moved to another shard and has become a replica, remove its slot assignment */
|
||||
int slots = clusterDelNodeSlots(sender);
|
||||
serverLog(LL_NOTICE, "Node %.40s (%s) is no longer master of shard %.40s;"
|
||||
" removed all %d slot(s) it used to own",
|
||||
sender->name,
|
||||
sender->human_nodename,
|
||||
sender->shard_id,
|
||||
slots);
|
||||
if (master != NULL) {
|
||||
serverLog(LL_NOTICE, "Node %.40s (%s) is now part of shard %.40s",
|
||||
sender->name,
|
||||
sender->human_nodename,
|
||||
master->shard_id);
|
||||
}
|
||||
}
|
||||
sender->flags &= ~(CLUSTER_NODE_MASTER|
|
||||
CLUSTER_NODE_MIGRATE_TO);
|
||||
sender->flags |= CLUSTER_NODE_SLAVE;
|
||||
|
@ -5010,6 +5048,22 @@ int clusterDelSlot(int slot) {
|
|||
return C_OK;
|
||||
}
|
||||
|
||||
/* Transfer slots from `from_node` to `to_node`.
|
||||
* Iterates over all cluster slots, transferring each slot covered by `from_node` to `to_node`.
|
||||
* Counts and returns the number of slots transferred. */
|
||||
int clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node) {
|
||||
int processed = 0;
|
||||
|
||||
for (int j = 0; j < CLUSTER_SLOTS; j++) {
|
||||
if (clusterNodeGetSlotBit(from_node, j)) {
|
||||
clusterDelSlot(j);
|
||||
clusterAddSlot(to_node, j);
|
||||
processed++;
|
||||
}
|
||||
}
|
||||
return processed;
|
||||
}
|
||||
|
||||
/* Delete all the slots associated with the specified node.
|
||||
* The number of deleted slots is returned. */
|
||||
int clusterDelNodeSlots(clusterNode *node) {
|
||||
|
|
2
src/db.c
2
src/db.c
|
@ -333,7 +333,7 @@ robj *dbRandomKey(redisDb *db) {
|
|||
key = dictGetKey(de);
|
||||
keyobj = createStringObject(key,sdslen(key));
|
||||
if (dictFind(db->expires,key)) {
|
||||
if (allvolatile && server.masterhost && --maxtries == 0) {
|
||||
if (allvolatile && (server.masterhost || isPausedActions(PAUSE_ACTION_EXPIRE)) && --maxtries == 0) {
|
||||
/* If the DB is composed only of keys with an expire set,
|
||||
* it could happen that all the keys are already logically
|
||||
* expired in the slave, so the function cannot stop because
|
||||
|
|
|
@ -662,8 +662,9 @@ void defragStream(redisDb *db, dictEntry *kde) {
|
|||
void defragModule(redisDb *db, dictEntry *kde) {
|
||||
robj *obj = dictGetVal(kde);
|
||||
serverAssert(obj->type == OBJ_MODULE);
|
||||
|
||||
if (!moduleDefragValue(dictGetKey(kde), obj, db->id))
|
||||
robj keyobj;
|
||||
initStaticStringObject(keyobj, dictGetKey(kde));
|
||||
if (!moduleDefragValue(&keyobj, obj, db->id))
|
||||
defragLater(db, kde);
|
||||
}
|
||||
|
||||
|
@ -806,7 +807,9 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime, int
|
|||
} else if (ob->type == OBJ_STREAM) {
|
||||
return scanLaterStreamListpacks(ob, cursor, endtime);
|
||||
} else if (ob->type == OBJ_MODULE) {
|
||||
return moduleLateDefrag(dictGetKey(de), ob, cursor, endtime, dbid);
|
||||
robj keyobj;
|
||||
initStaticStringObject(keyobj, dictGetKey(de));
|
||||
return moduleLateDefrag(&keyobj, ob, cursor, endtime, dbid);
|
||||
} else {
|
||||
*cursor = 0; /* object type may have changed since we schedule it for later */
|
||||
}
|
||||
|
|
99
src/module.c
99
src/module.c
|
@ -282,6 +282,10 @@ typedef struct RedisModuleBlockedClient {
|
|||
monotime background_timer; /* Timer tracking the start of background work */
|
||||
uint64_t background_duration; /* Current command background time duration.
|
||||
Used for measuring latency of blocking cmds */
|
||||
int blocked_on_keys_explicit_unblock; /* Set to 1 only in the case of an explicit RM_Unblock on
|
||||
* a client that is blocked on keys. In this case we will
|
||||
* call the timeout call back from within
|
||||
* moduleHandleBlockedClients which runs from the main thread */
|
||||
} RedisModuleBlockedClient;
|
||||
|
||||
/* This is a list of Module Auth Contexts. Each time a Module registers a callback, a new ctx is
|
||||
|
@ -306,7 +310,6 @@ static size_t moduleTempClientMinCount = 0; /* Min client count in pool since
|
|||
* allow thread safe contexts to execute commands at a safe moment. */
|
||||
static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER;
|
||||
|
||||
|
||||
/* Function pointer type for keyspace event notification subscriptions from modules. */
|
||||
typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
|
||||
|
||||
|
@ -2294,7 +2297,10 @@ ustime_t RM_CachedMicroseconds(void) {
|
|||
* Within the same command, you can call multiple times
|
||||
* RM_BlockedClientMeasureTimeStart() and RM_BlockedClientMeasureTimeEnd()
|
||||
* to accumulate independent time intervals to the background duration.
|
||||
* This method always return REDISMODULE_OK. */
|
||||
* This method always return REDISMODULE_OK.
|
||||
*
|
||||
* This function is not thread safe, If used in module thread and blocked callback (possibly main thread)
|
||||
* simultaneously, it's recommended to protect them with lock owned by caller instead of GIL. */
|
||||
int RM_BlockedClientMeasureTimeStart(RedisModuleBlockedClient *bc) {
|
||||
elapsedStart(&(bc->background_timer));
|
||||
return REDISMODULE_OK;
|
||||
|
@ -2304,7 +2310,10 @@ int RM_BlockedClientMeasureTimeStart(RedisModuleBlockedClient *bc) {
|
|||
* to calculate the elapsed execution time.
|
||||
* On success REDISMODULE_OK is returned.
|
||||
* This method only returns REDISMODULE_ERR if no start time was
|
||||
* previously defined ( meaning RM_BlockedClientMeasureTimeStart was not called ). */
|
||||
* previously defined ( meaning RM_BlockedClientMeasureTimeStart was not called ).
|
||||
*
|
||||
* This function is not thread safe, If used in module thread and blocked callback (possibly main thread)
|
||||
* simultaneously, it's recommended to protect them with lock owned by caller instead of GIL. */
|
||||
int RM_BlockedClientMeasureTimeEnd(RedisModuleBlockedClient *bc) {
|
||||
// If the counter is 0 then we haven't called RM_BlockedClientMeasureTimeStart
|
||||
if (!bc->background_timer)
|
||||
|
@ -2363,7 +2372,33 @@ void RM_Yield(RedisModuleCtx *ctx, int flags, const char *busy_reply) {
|
|||
server.busy_module_yield_flags |= BUSY_MODULE_YIELD_CLIENTS;
|
||||
|
||||
/* Let redis process events */
|
||||
processEventsWhileBlocked();
|
||||
if (!pthread_equal(server.main_thread_id, pthread_self())) {
|
||||
/* If we are not in the main thread, we defer event loop processing to the main thread
|
||||
* after the main thread enters acquiring GIL state in order to protect the event
|
||||
* loop (ae.c) and avoid potential race conditions. */
|
||||
|
||||
int acquiring;
|
||||
atomicGet(server.module_gil_acquring, acquiring);
|
||||
if (!acquiring) {
|
||||
/* If the main thread has not yet entered the acquiring GIL state,
|
||||
* we attempt to wake it up and exit without waiting for it to
|
||||
* acquire the GIL. This avoids blocking the caller, allowing them to
|
||||
* continue with unfinished tasks before the next yield.
|
||||
* We assume the caller keeps the GIL locked. */
|
||||
if (write(server.module_pipe[1],"A",1) != 1) {
|
||||
/* Ignore the error, this is best-effort. */
|
||||
}
|
||||
} else {
|
||||
/* Release the GIL, yielding CPU to give the main thread an opportunity to start
|
||||
* event processing, and then acquire the GIL again until the main thread releases it. */
|
||||
moduleReleaseGIL();
|
||||
sched_yield();
|
||||
moduleAcquireGIL();
|
||||
}
|
||||
} else {
|
||||
/* If we are in the main thread, we can safely process events. */
|
||||
processEventsWhileBlocked();
|
||||
}
|
||||
|
||||
server.busy_module_yield_reply = prev_busy_module_yield_reply;
|
||||
/* Possibly restore the previous flags in case of two nested contexts
|
||||
|
@ -2647,7 +2682,10 @@ RedisModuleString *RM_CreateStringFromStreamID(RedisModuleCtx *ctx, const RedisM
|
|||
* pass ctx as NULL when releasing the string (but passing a context will not
|
||||
* create any issue). Strings created with a context should be freed also passing
|
||||
* the context, so if you want to free a string out of context later, make sure
|
||||
* to create it using a NULL context. */
|
||||
* to create it using a NULL context.
|
||||
*
|
||||
* This API is not thread safe, access to these retained strings (if they originated
|
||||
* from a client command arguments) must be done with GIL locked. */
|
||||
void RM_FreeString(RedisModuleCtx *ctx, RedisModuleString *str) {
|
||||
decrRefCount(str);
|
||||
if (ctx != NULL) autoMemoryFreed(ctx,REDISMODULE_AM_STRING,str);
|
||||
|
@ -2684,7 +2722,10 @@ void RM_FreeString(RedisModuleCtx *ctx, RedisModuleString *str) {
|
|||
*
|
||||
* Threaded modules that reference retained strings from other threads *must*
|
||||
* explicitly trim the allocation as soon as the string is retained. Not doing
|
||||
* so may result with automatic trimming which is not thread safe. */
|
||||
* so may result with automatic trimming which is not thread safe.
|
||||
*
|
||||
* This API is not thread safe, access to these retained strings (if they originated
|
||||
* from a client command arguments) must be done with GIL locked. */
|
||||
void RM_RetainString(RedisModuleCtx *ctx, RedisModuleString *str) {
|
||||
if (ctx == NULL || !autoMemoryFreed(ctx,REDISMODULE_AM_STRING,str)) {
|
||||
/* Increment the string reference counting only if we can't
|
||||
|
@ -2726,7 +2767,10 @@ void RM_RetainString(RedisModuleCtx *ctx, RedisModuleString *str) {
|
|||
*
|
||||
* Threaded modules that reference held strings from other threads *must*
|
||||
* explicitly trim the allocation as soon as the string is held. Not doing
|
||||
* so may result with automatic trimming which is not thread safe. */
|
||||
* so may result with automatic trimming which is not thread safe.
|
||||
*
|
||||
* This API is not thread safe, access to these retained strings (if they originated
|
||||
* from a client command arguments) must be done with GIL locked. */
|
||||
RedisModuleString* RM_HoldString(RedisModuleCtx *ctx, RedisModuleString *str) {
|
||||
if (str->refcount == OBJ_STATIC_REFCOUNT) {
|
||||
return RM_CreateStringFromString(ctx, str);
|
||||
|
@ -7680,7 +7724,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
|
|||
int islua = scriptIsRunning();
|
||||
int ismulti = server.in_exec;
|
||||
|
||||
c->bstate.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient));
|
||||
c->bstate.module_blocked_handle = zcalloc(sizeof(RedisModuleBlockedClient));
|
||||
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle;
|
||||
ctx->module->blocked_clients++;
|
||||
|
||||
|
@ -8159,7 +8203,7 @@ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
|
|||
* argument, but better to be safe than sorry. */
|
||||
if (bc->timeout_callback == NULL) return REDISMODULE_ERR;
|
||||
if (bc->unblocked) return REDISMODULE_OK;
|
||||
if (bc->client) moduleBlockedClientTimedOut(bc->client);
|
||||
if (bc->client) bc->blocked_on_keys_explicit_unblock = 1;
|
||||
}
|
||||
moduleUnblockClientByHandle(bc,privdata);
|
||||
return REDISMODULE_OK;
|
||||
|
@ -8237,6 +8281,10 @@ void moduleHandleBlockedClients(void) {
|
|||
reply_us = elapsedUs(replyTimer);
|
||||
moduleFreeContext(&ctx);
|
||||
}
|
||||
if (c && bc->blocked_on_keys_explicit_unblock) {
|
||||
serverAssert(bc->blocked_on_keys);
|
||||
moduleBlockedClientTimedOut(c);
|
||||
}
|
||||
/* Hold onto the blocked client if module auth is in progress. The reply callback is invoked
|
||||
* when the client is reprocessed. */
|
||||
if (c && clientHasModuleAuthInProgress(c)) {
|
||||
|
@ -8257,6 +8305,9 @@ void moduleHandleBlockedClients(void) {
|
|||
/* Update stats now that we've finished the blocking operation.
|
||||
* This needs to be out of the reply callback above given that a
|
||||
* module might not define any callback and still do blocking ops.
|
||||
*
|
||||
* If the module is blocked on keys updateStatsOnUnblock will be
|
||||
* called from moduleUnblockClientOnKey
|
||||
*/
|
||||
if (c && !clientHasModuleAuthInProgress(c) && !bc->blocked_on_keys) {
|
||||
updateStatsOnUnblock(c, bc->background_duration, reply_us, server.stat_total_error_replies != prev_error_replies);
|
||||
|
@ -8277,7 +8328,7 @@ void moduleHandleBlockedClients(void) {
|
|||
* if there are pending replies here. This is needed since
|
||||
* during a non blocking command the client may receive output. */
|
||||
if (!clientHasModuleAuthInProgress(c) && clientHasPendingReplies(c) &&
|
||||
!(c->flags & CLIENT_PENDING_WRITE))
|
||||
!(c->flags & CLIENT_PENDING_WRITE) && c->conn)
|
||||
{
|
||||
c->flags |= CLIENT_PENDING_WRITE;
|
||||
listLinkNodeHead(server.clients_pending_write, &c->clients_pending_write_node);
|
||||
|
@ -8312,23 +8363,25 @@ int moduleBlockedClientMayTimeout(client *c) {
|
|||
/* Called when our client timed out. After this function unblockClient()
|
||||
* is called, and it will invalidate the blocked client. So this function
|
||||
* does not need to do any cleanup. Eventually the module will call the
|
||||
* API to unblock the client and the memory will be released. */
|
||||
* API to unblock the client and the memory will be released.
|
||||
*
|
||||
* This function should only be called from the main thread, we must handle the unblocking
|
||||
* of the client synchronously. This ensures that we can reply to the client before
|
||||
* resetClient() is called. */
|
||||
void moduleBlockedClientTimedOut(client *c) {
|
||||
RedisModuleBlockedClient *bc = c->bstate.module_blocked_handle;
|
||||
|
||||
/* Protect against re-processing: don't serve clients that are already
|
||||
* in the unblocking list for any reason (including RM_UnblockClient()
|
||||
* explicit call). See #6798. */
|
||||
if (bc->unblocked) return;
|
||||
|
||||
RedisModuleCtx ctx;
|
||||
moduleCreateContext(&ctx, bc->module, REDISMODULE_CTX_BLOCKED_TIMEOUT);
|
||||
ctx.client = bc->client;
|
||||
ctx.blocked_client = bc;
|
||||
ctx.blocked_privdata = bc->privdata;
|
||||
|
||||
long long prev_error_replies = server.stat_total_error_replies;
|
||||
|
||||
bc->timeout_callback(&ctx,(void**)c->argv,c->argc);
|
||||
moduleFreeContext(&ctx);
|
||||
|
||||
updateStatsOnUnblock(c, bc->background_duration, 0, server.stat_total_error_replies != prev_error_replies);
|
||||
|
||||
/* For timeout events, we do not want to call the disconnect callback,
|
||||
|
@ -11849,6 +11902,7 @@ void moduleInitModulesSystem(void) {
|
|||
moduleUnblockedClients = listCreate();
|
||||
server.loadmodule_queue = listCreate();
|
||||
server.module_configs_queue = dictCreate(&sdsKeyValueHashDictType);
|
||||
server.module_gil_acquring = 0;
|
||||
modules = dictCreate(&modulesDictType);
|
||||
moduleAuthCallbacks = listCreate();
|
||||
|
||||
|
@ -12204,7 +12258,7 @@ int moduleLoad(const char *path, void **module_argv, int module_argc, int is_loa
|
|||
}
|
||||
|
||||
if (post_load_err) {
|
||||
moduleUnload(ctx.module->name, NULL);
|
||||
serverAssert(moduleUnload(ctx.module->name, NULL, 1) == C_OK);
|
||||
moduleFreeContext(&ctx);
|
||||
return C_ERR;
|
||||
}
|
||||
|
@ -12220,14 +12274,17 @@ int moduleLoad(const char *path, void **module_argv, int module_argc, int is_loa
|
|||
|
||||
/* Unload the module registered with the specified name. On success
|
||||
* C_OK is returned, otherwise C_ERR is returned and errmsg is set
|
||||
* with an appropriate message. */
|
||||
int moduleUnload(sds name, const char **errmsg) {
|
||||
* with an appropriate message.
|
||||
* Only forcefully unload this module, passing forced_unload != 0,
|
||||
* if it is certain that it has not yet been in use (e.g., immediate
|
||||
* unload on failed load). */
|
||||
int moduleUnload(sds name, const char **errmsg, int forced_unload) {
|
||||
struct RedisModule *module = dictFetchValue(modules,name);
|
||||
|
||||
if (module == NULL) {
|
||||
*errmsg = "no such module with that name";
|
||||
return C_ERR;
|
||||
} else if (listLength(module->types)) {
|
||||
} else if (listLength(module->types) && !forced_unload) {
|
||||
*errmsg = "the module exports one or more module-side data "
|
||||
"types, can't unload";
|
||||
return C_ERR;
|
||||
|
@ -13020,7 +13077,7 @@ NULL
|
|||
|
||||
} else if (!strcasecmp(subcmd,"unload") && c->argc == 3) {
|
||||
const char *errmsg = NULL;
|
||||
if (moduleUnload(c->argv[2]->ptr, &errmsg) == C_OK)
|
||||
if (moduleUnload(c->argv[2]->ptr, &errmsg, 0) == C_OK)
|
||||
addReply(c,shared.ok);
|
||||
else {
|
||||
if (errmsg == NULL) errmsg = "operation not possible.";
|
||||
|
|
|
@ -413,8 +413,9 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
|
|||
* to a channel which we are subscribed to, then we wanna postpone that message to be added
|
||||
* after the command's reply (specifically important during multi-exec). the exception is
|
||||
* the SUBSCRIBE command family, which (currently) have a push message instead of a proper reply.
|
||||
* The check for executing_client also avoids affecting push messages that are part of eviction. */
|
||||
if (c == server.current_client && (c->flags & CLIENT_PUSHING) &&
|
||||
* The check for executing_client also avoids affecting push messages that are part of eviction.
|
||||
* Check CLIENT_PUSHING first to avoid race conditions, as it's absent in module's fake client. */
|
||||
if ((c->flags & CLIENT_PUSHING) && c == server.current_client &&
|
||||
server.executing_client && !cmdHasPushAsReply(server.executing_client->cmd))
|
||||
{
|
||||
_addReplyProtoToList(c,server.pending_push_messages,s,len);
|
||||
|
@ -1433,7 +1434,7 @@ void unlinkClient(client *c) {
|
|||
listNode *ln;
|
||||
|
||||
/* If this is marked as current client unset it. */
|
||||
if (server.current_client == c) server.current_client = NULL;
|
||||
if (c->conn && server.current_client == c) server.current_client = NULL;
|
||||
|
||||
/* Certain operations must be done only if the client has an active connection.
|
||||
* If the client was already unlinked or if it's a "fake client" the
|
||||
|
@ -1477,7 +1478,7 @@ void unlinkClient(client *c) {
|
|||
}
|
||||
|
||||
/* Remove from the list of pending reads if needed. */
|
||||
serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
|
||||
serverAssert(!c->conn || io_threads_op == IO_THREADS_OP_IDLE);
|
||||
if (c->pending_read_list_node != NULL) {
|
||||
listDelNode(server.clients_pending_read,c->pending_read_list_node);
|
||||
c->pending_read_list_node = NULL;
|
||||
|
@ -1630,6 +1631,12 @@ void freeClient(client *c) {
|
|||
reqresReset(c, 1);
|
||||
#endif
|
||||
|
||||
/* Remove the contribution that this client gave to our
|
||||
* incrementally computed memory usage. */
|
||||
if (c->conn)
|
||||
server.stat_clients_type_memory[c->last_memory_type] -=
|
||||
c->last_memory_usage;
|
||||
|
||||
/* Unlink the client: this will close the socket, remove the I/O
|
||||
* handlers, and remove references of the client from different
|
||||
* places where active clients may be referenced. */
|
||||
|
@ -1678,10 +1685,6 @@ void freeClient(client *c) {
|
|||
* we lost the connection with the master. */
|
||||
if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();
|
||||
|
||||
/* Remove the contribution that this client gave to our
|
||||
* incrementally computed memory usage. */
|
||||
server.stat_clients_type_memory[c->last_memory_type] -=
|
||||
c->last_memory_usage;
|
||||
/* Remove client from memory usage buckets */
|
||||
if (c->mem_usage_bucket) {
|
||||
c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage;
|
||||
|
@ -2470,7 +2473,7 @@ int processCommandAndResetClient(client *c) {
|
|||
commandProcessed(c);
|
||||
/* Update the client's memory to include output buffer growth following the
|
||||
* processed command. */
|
||||
updateClientMemUsageAndBucket(c);
|
||||
if (c->conn) updateClientMemUsageAndBucket(c);
|
||||
}
|
||||
|
||||
if (server.current_client == NULL) deadclient = 1;
|
||||
|
@ -3855,6 +3858,11 @@ int checkClientOutputBufferLimits(client *c) {
|
|||
int soft = 0, hard = 0, class;
|
||||
unsigned long used_mem = getClientOutputBufferMemoryUsage(c);
|
||||
|
||||
/* For unauthenticated clients the output buffer is limited to prevent
|
||||
* them from abusing it by not reading the replies */
|
||||
if (used_mem > 1024 && authRequired(c))
|
||||
return 1;
|
||||
|
||||
class = getClientType(c);
|
||||
/* For the purpose of output buffer limiting, masters are handled
|
||||
* like normal clients. */
|
||||
|
|
|
@ -872,6 +872,7 @@ static inline clientMemUsageBucket *getMemUsageBucket(size_t mem) {
|
|||
* usage bucket.
|
||||
*/
|
||||
void updateClientMemoryUsage(client *c) {
|
||||
serverAssert(c->conn);
|
||||
size_t mem = getClientMemoryUsage(c, NULL);
|
||||
int type = getClientType(c);
|
||||
/* Now that we have the memory used by the client, remove the old
|
||||
|
@ -884,7 +885,7 @@ void updateClientMemoryUsage(client *c) {
|
|||
}
|
||||
|
||||
int clientEvictionAllowed(client *c) {
|
||||
if (server.maxmemory_clients == 0 || c->flags & CLIENT_NO_EVICT) {
|
||||
if (server.maxmemory_clients == 0 || c->flags & CLIENT_NO_EVICT || !c->conn) {
|
||||
return 0;
|
||||
}
|
||||
int type = getClientType(c);
|
||||
|
@ -924,7 +925,7 @@ void removeClientFromMemUsageBucket(client *c, int allow_eviction) {
|
|||
* returns 1 if client eviction for this client is allowed, 0 otherwise.
|
||||
*/
|
||||
int updateClientMemUsageAndBucket(client *c) {
|
||||
serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
|
||||
serverAssert(io_threads_op == IO_THREADS_OP_IDLE && c->conn);
|
||||
int allow_eviction = clientEvictionAllowed(c);
|
||||
removeClientFromMemUsageBucket(c, allow_eviction);
|
||||
|
||||
|
@ -1793,7 +1794,9 @@ void afterSleep(struct aeEventLoop *eventLoop) {
|
|||
mstime_t latency;
|
||||
latencyStartMonitor(latency);
|
||||
|
||||
atomicSet(server.module_gil_acquring, 1);
|
||||
moduleAcquireGIL();
|
||||
atomicSet(server.module_gil_acquring, 0);
|
||||
moduleFireServerEvent(REDISMODULE_EVENT_EVENTLOOP,
|
||||
REDISMODULE_SUBEVENT_EVENTLOOP_AFTER_SLEEP,
|
||||
NULL);
|
||||
|
|
|
@ -1581,6 +1581,7 @@ struct redisServer {
|
|||
int module_pipe[2]; /* Pipe used to awake the event loop by module threads. */
|
||||
pid_t child_pid; /* PID of current child */
|
||||
int child_type; /* Type of current child */
|
||||
redisAtomic int module_gil_acquring; /* Indicates whether the GIL is being acquiring by the main thread. */
|
||||
/* Networking */
|
||||
int port; /* TCP listening port */
|
||||
int tls_port; /* TLS listening port */
|
||||
|
@ -2468,7 +2469,7 @@ void moduleInitModulesSystem(void);
|
|||
void moduleInitModulesSystemLast(void);
|
||||
void modulesCron(void);
|
||||
int moduleLoad(const char *path, void **argv, int argc, int is_loadex);
|
||||
int moduleUnload(sds name, const char **errmsg);
|
||||
int moduleUnload(sds name, const char **errmsg, int forced_unload);
|
||||
void moduleLoadFromQueue(void);
|
||||
int moduleGetCommandKeysViaAPI(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
|
||||
int moduleGetCommandChannelsViaAPI(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
|
||||
|
|
|
@ -1,2 +1,2 @@
|
|||
#define REDIS_VERSION "7.2.7"
|
||||
#define REDIS_VERSION_NUM 0x00070207
|
||||
#define REDIS_VERSION "7.2.8"
|
||||
#define REDIS_VERSION_NUM 0x00070208
|
||||
|
|
|
@ -102,6 +102,7 @@ typedef struct {
|
|||
|
||||
void *bg_call_worker(void *arg) {
|
||||
bg_call_data *bg = arg;
|
||||
RedisModuleBlockedClient *bc = bg->bc;
|
||||
|
||||
// Get Redis module context
|
||||
RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bg->bc);
|
||||
|
@ -135,6 +136,12 @@ void *bg_call_worker(void *arg) {
|
|||
RedisModuleCallReply *rep = RedisModule_Call(ctx, cmd, format, bg->argv + cmd_pos + 1, bg->argc - cmd_pos - 1);
|
||||
RedisModule_FreeString(NULL, format_redis_str);
|
||||
|
||||
/* Free the arguments within GIL to prevent simultaneous freeing in main thread. */
|
||||
for (int i=0; i<bg->argc; i++)
|
||||
RedisModule_FreeString(ctx, bg->argv[i]);
|
||||
RedisModule_Free(bg->argv);
|
||||
RedisModule_Free(bg);
|
||||
|
||||
// Release GIL
|
||||
RedisModule_ThreadSafeContextUnlock(ctx);
|
||||
|
||||
|
@ -147,13 +154,7 @@ void *bg_call_worker(void *arg) {
|
|||
}
|
||||
|
||||
// Unblock client
|
||||
RedisModule_UnblockClient(bg->bc, NULL);
|
||||
|
||||
/* Free the arguments */
|
||||
for (int i=0; i<bg->argc; i++)
|
||||
RedisModule_FreeString(ctx, bg->argv[i]);
|
||||
RedisModule_Free(bg->argv);
|
||||
RedisModule_Free(bg);
|
||||
RedisModule_UnblockClient(bc, NULL);
|
||||
|
||||
// Free the Redis module context
|
||||
RedisModule_FreeThreadSafeContext(ctx);
|
||||
|
|
|
@ -7,12 +7,41 @@
|
|||
|
||||
#define UNUSED(x) (void)(x)
|
||||
|
||||
typedef struct {
|
||||
/* Mutex for protecting RedisModule_BlockedClientMeasureTime*() API from race
|
||||
* conditions due to timeout callback triggered in the main thread. */
|
||||
pthread_mutex_t measuretime_mutex;
|
||||
int measuretime_completed; /* Indicates that time measure has ended and will not continue further */
|
||||
int myint; /* Used for replying */
|
||||
} BlockPrivdata;
|
||||
|
||||
void blockClientPrivdataInit(RedisModuleBlockedClient *bc) {
|
||||
BlockPrivdata *block_privdata = RedisModule_Calloc(1, sizeof(*block_privdata));
|
||||
block_privdata->measuretime_mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
|
||||
RedisModule_BlockClientSetPrivateData(bc, block_privdata);
|
||||
}
|
||||
|
||||
void blockClientMeasureTimeStart(RedisModuleBlockedClient *bc, BlockPrivdata *block_privdata) {
|
||||
pthread_mutex_lock(&block_privdata->measuretime_mutex);
|
||||
RedisModule_BlockedClientMeasureTimeStart(bc);
|
||||
pthread_mutex_unlock(&block_privdata->measuretime_mutex);
|
||||
}
|
||||
|
||||
void blockClientMeasureTimeEnd(RedisModuleBlockedClient *bc, BlockPrivdata *block_privdata, int completed) {
|
||||
pthread_mutex_lock(&block_privdata->measuretime_mutex);
|
||||
if (!block_privdata->measuretime_completed) {
|
||||
RedisModule_BlockedClientMeasureTimeEnd(bc);
|
||||
if (completed) block_privdata->measuretime_completed = 1;
|
||||
}
|
||||
pthread_mutex_unlock(&block_privdata->measuretime_mutex);
|
||||
}
|
||||
|
||||
/* Reply callback for blocking command BLOCK.DEBUG */
|
||||
int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
UNUSED(argv);
|
||||
UNUSED(argc);
|
||||
int *myint = RedisModule_GetBlockedClientPrivateData(ctx);
|
||||
return RedisModule_ReplyWithLongLong(ctx,*myint);
|
||||
BlockPrivdata *block_privdata = RedisModule_GetBlockedClientPrivateData(ctx);
|
||||
return RedisModule_ReplyWithLongLong(ctx,block_privdata->myint);
|
||||
}
|
||||
|
||||
/* Timeout callback for blocking command BLOCK.DEBUG */
|
||||
|
@ -20,13 +49,16 @@ int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
|||
UNUSED(argv);
|
||||
UNUSED(argc);
|
||||
RedisModuleBlockedClient *bc = RedisModule_GetBlockedClientHandle(ctx);
|
||||
RedisModule_BlockedClientMeasureTimeEnd(bc);
|
||||
BlockPrivdata *block_privdata = RedisModule_GetBlockedClientPrivateData(ctx);
|
||||
blockClientMeasureTimeEnd(bc, block_privdata, 1);
|
||||
return RedisModule_ReplyWithSimpleString(ctx,"Request timedout");
|
||||
}
|
||||
|
||||
/* Private data freeing callback for BLOCK.DEBUG command. */
|
||||
void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) {
|
||||
UNUSED(ctx);
|
||||
BlockPrivdata *block_privdata = privdata;
|
||||
pthread_mutex_destroy(&block_privdata->measuretime_mutex);
|
||||
RedisModule_Free(privdata);
|
||||
}
|
||||
|
||||
|
@ -42,19 +74,20 @@ void *BlockDebug_ThreadMain(void *arg) {
|
|||
RedisModuleBlockedClient *bc = targ[0];
|
||||
long long delay = (unsigned long)targ[1];
|
||||
long long enable_time_track = (unsigned long)targ[2];
|
||||
BlockPrivdata *block_privdata = RedisModule_BlockClientGetPrivateData(bc);
|
||||
|
||||
if (enable_time_track)
|
||||
RedisModule_BlockedClientMeasureTimeStart(bc);
|
||||
blockClientMeasureTimeStart(bc, block_privdata);
|
||||
RedisModule_Free(targ);
|
||||
|
||||
struct timespec ts;
|
||||
ts.tv_sec = delay / 1000;
|
||||
ts.tv_nsec = (delay % 1000) * 1000000;
|
||||
nanosleep(&ts, NULL);
|
||||
int *r = RedisModule_Alloc(sizeof(int));
|
||||
*r = rand();
|
||||
if (enable_time_track)
|
||||
RedisModule_BlockedClientMeasureTimeEnd(bc);
|
||||
RedisModule_UnblockClient(bc,r);
|
||||
blockClientMeasureTimeEnd(bc, block_privdata, 0);
|
||||
block_privdata->myint = rand();
|
||||
RedisModule_UnblockClient(bc,block_privdata);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -64,23 +97,22 @@ void *DoubleBlock_ThreadMain(void *arg) {
|
|||
void **targ = arg;
|
||||
RedisModuleBlockedClient *bc = targ[0];
|
||||
long long delay = (unsigned long)targ[1];
|
||||
RedisModule_BlockedClientMeasureTimeStart(bc);
|
||||
BlockPrivdata *block_privdata = RedisModule_BlockClientGetPrivateData(bc);
|
||||
blockClientMeasureTimeStart(bc, block_privdata);
|
||||
RedisModule_Free(targ);
|
||||
struct timespec ts;
|
||||
ts.tv_sec = delay / 1000;
|
||||
ts.tv_nsec = (delay % 1000) * 1000000;
|
||||
nanosleep(&ts, NULL);
|
||||
int *r = RedisModule_Alloc(sizeof(int));
|
||||
*r = rand();
|
||||
RedisModule_BlockedClientMeasureTimeEnd(bc);
|
||||
blockClientMeasureTimeEnd(bc, block_privdata, 0);
|
||||
/* call again RedisModule_BlockedClientMeasureTimeStart() and
|
||||
* RedisModule_BlockedClientMeasureTimeEnd and ensure that the
|
||||
* total execution time is 2x the delay. */
|
||||
RedisModule_BlockedClientMeasureTimeStart(bc);
|
||||
blockClientMeasureTimeStart(bc, block_privdata);
|
||||
nanosleep(&ts, NULL);
|
||||
RedisModule_BlockedClientMeasureTimeEnd(bc);
|
||||
|
||||
RedisModule_UnblockClient(bc,r);
|
||||
blockClientMeasureTimeEnd(bc, block_privdata, 0);
|
||||
block_privdata->myint = rand();
|
||||
RedisModule_UnblockClient(bc,block_privdata);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -107,6 +139,7 @@ int HelloBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int a
|
|||
|
||||
pthread_t tid;
|
||||
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
|
||||
blockClientPrivdataInit(bc);
|
||||
|
||||
/* Here we set a disconnection handler, however since this module will
|
||||
* block in sleep() in a thread, there is not much we can do in the
|
||||
|
@ -148,6 +181,7 @@ int HelloBlockNoTracking_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **a
|
|||
|
||||
pthread_t tid;
|
||||
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout);
|
||||
blockClientPrivdataInit(bc);
|
||||
|
||||
/* Here we set a disconnection handler, however since this module will
|
||||
* block in sleep() in a thread, there is not much we can do in the
|
||||
|
@ -184,6 +218,7 @@ int HelloDoubleBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
|
|||
|
||||
pthread_t tid;
|
||||
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,0);
|
||||
blockClientPrivdataInit(bc);
|
||||
|
||||
/* Now that we setup a blocking client, we need to pass the control
|
||||
* to the thread. However we need to pass arguments to the thread:
|
||||
|
|
|
@ -312,3 +312,12 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
|||
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
int RedisModule_OnUnload(RedisModuleCtx *ctx) {
|
||||
REDISMODULE_NOT_USED(ctx);
|
||||
if (datatype) {
|
||||
RedisModule_Free(datatype);
|
||||
datatype = NULL;
|
||||
}
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
|
|
@ -141,13 +141,14 @@ size_t FragFreeEffort(RedisModuleString *key, const void *value) {
|
|||
}
|
||||
|
||||
int FragDefrag(RedisModuleDefragCtx *ctx, RedisModuleString *key, void **value) {
|
||||
REDISMODULE_NOT_USED(key);
|
||||
unsigned long i = 0;
|
||||
int steps = 0;
|
||||
|
||||
int dbid = RedisModule_GetDbIdFromDefragCtx(ctx);
|
||||
RedisModule_Assert(dbid != -1);
|
||||
|
||||
RedisModule_Log(NULL, "notice", "Defrag key: %s", RedisModule_StringPtrLen(key, NULL));
|
||||
|
||||
/* Attempt to get cursor, validate it's what we're exepcting */
|
||||
if (RedisModule_DefragCursorGet(ctx, &i) == REDISMODULE_OK) {
|
||||
if (i > 0) datatype_resumes++;
|
||||
|
|
|
@ -115,6 +115,7 @@ typedef struct {
|
|||
|
||||
void *bg_call_worker(void *arg) {
|
||||
bg_call_data *bg = arg;
|
||||
RedisModuleBlockedClient *bc = bg->bc;
|
||||
|
||||
// Get Redis module context
|
||||
RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bg->bc);
|
||||
|
@ -136,6 +137,12 @@ void *bg_call_worker(void *arg) {
|
|||
RedisModuleCallReply *rep = RedisModule_Call(ctx, cmd, format, bg->argv + 3, bg->argc - 3);
|
||||
RedisModule_FreeString(NULL, format_redis_str);
|
||||
|
||||
/* Free the arguments within GIL to prevent simultaneous freeing in main thread. */
|
||||
for (int i=0; i<bg->argc; i++)
|
||||
RedisModule_FreeString(ctx, bg->argv[i]);
|
||||
RedisModule_Free(bg->argv);
|
||||
RedisModule_Free(bg);
|
||||
|
||||
// Release GIL
|
||||
RedisModule_ThreadSafeContextUnlock(ctx);
|
||||
|
||||
|
@ -148,13 +155,7 @@ void *bg_call_worker(void *arg) {
|
|||
}
|
||||
|
||||
// Unblock client
|
||||
RedisModule_UnblockClient(bg->bc, NULL);
|
||||
|
||||
/* Free the arguments */
|
||||
for (int i=0; i<bg->argc; i++)
|
||||
RedisModule_FreeString(ctx, bg->argv[i]);
|
||||
RedisModule_Free(bg->argv);
|
||||
RedisModule_Free(bg);
|
||||
RedisModule_UnblockClient(bc, NULL);
|
||||
|
||||
// Free the Redis module context
|
||||
RedisModule_FreeThreadSafeContext(ctx);
|
||||
|
|
|
@ -45,6 +45,24 @@ start_server {tags {"auth external:skip"} overrides {requirepass foobar}} {
|
|||
assert_match {*unauthenticated bulk length*} $e
|
||||
$rr close
|
||||
}
|
||||
|
||||
test {For unauthenticated clients output buffer is limited} {
|
||||
set rr [redis [srv "host"] [srv "port"] 1 $::tls]
|
||||
$rr SET x 5
|
||||
catch {[$rr read]} e
|
||||
assert_match {*NOAUTH Authentication required*} $e
|
||||
|
||||
# Fill the output buffer in a loop without reading it and make
|
||||
# sure the client disconnected.
|
||||
# Considering the socket eat some of the replies, we are testing
|
||||
# that such client can't consume more than few MB's.
|
||||
catch {
|
||||
for {set j 0} {$j < 1000000} {incr j} {
|
||||
$rr SET x 5
|
||||
}
|
||||
} e
|
||||
assert_match {I/O error reading reply} $e
|
||||
}
|
||||
}
|
||||
|
||||
start_server {tags {"auth_binary_password external:skip"}} {
|
||||
|
|
|
@ -1,6 +1,11 @@
|
|||
set testmodule [file normalize tests/modules/datatype.so]
|
||||
|
||||
start_server {tags {"modules"}} {
|
||||
test {DataType: test loadex with invalid config} {
|
||||
catch { r module loadex $testmodule CONFIG invalid_config 1 } e
|
||||
assert_match {*ERR Error loading the extension*} $e
|
||||
}
|
||||
|
||||
r module load $testmodule
|
||||
|
||||
test {DataType: Test module is sane, GET/SET work.} {
|
||||
|
|
|
@ -359,6 +359,26 @@ start_server {tags {"pause network"}} {
|
|||
} {bar2}
|
||||
}
|
||||
|
||||
test "Test the randomkey command will not cause the server to get into an infinite loop during the client pause write" {
|
||||
r flushall
|
||||
|
||||
r multi
|
||||
r set key value px 3
|
||||
r client pause 10000 write
|
||||
r exec
|
||||
|
||||
after 5
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[r randomkey] == "key"
|
||||
} else {
|
||||
fail "execute randomkey failed, caused by the infinite loop"
|
||||
}
|
||||
|
||||
r client unpause
|
||||
assert_equal [r randomkey] {}
|
||||
}
|
||||
|
||||
# Make sure we unpause at the end
|
||||
r client unpause
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue