This commit is contained in:
Yuan Wang 2025-05-14 01:19:21 +08:00 committed by GitHub
commit 79e175f71a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 97 additions and 4 deletions

View File

@ -1479,7 +1479,7 @@ void freeClientOriginalArgv(client *c) {
static inline void freeClientArgvInternal(client *c, int free_argv) {
int j;
for (j = 0; j < c->argc; j++)
decrRefCount(c->argv[j]);
tryFreeObjectToArgvOjectPool(c->argv[j]);
c->argc = 0;
c->cmd = NULL;
c->iolookedcmd = NULL;
@ -2587,8 +2587,9 @@ int processMultibulkBuffer(client *c) {
sdsclear(c->querybuf);
querybuf_len = sdslen(c->querybuf); /* Update cached length */
} else {
c->argv[c->argc++] =
createStringObject(c->querybuf+c->qb_pos,c->bulklen);
c->argv[c->argc++] = c->running_tid == IOTHREAD_MAIN_THREAD_ID ?
tryAllocObjectFromArgvOjectPool(c->querybuf+c->qb_pos, c->bulklen) :
createStringObject(c->querybuf+c->qb_pos, c->bulklen);
c->argv_len_sum += c->bulklen;
c->qb_pos += c->bulklen+2;
}
@ -2827,7 +2828,6 @@ int processInputBuffer(client *c) {
/* Multibulk processing could see a <= 0 length. */
if (c->argc == 0) {
freeClientArgvInternal(c, 0);
c->reqtype = 0;
c->multibulklen = 0;
c->bulklen = -1;

View File

@ -925,6 +925,85 @@ robj *getDecodedObject(robj *o) {
}
}
/* We create string object pools for client argv object to avoid memory allocation
* and deallocation. The string objects use EMBSTR encoding to avoid memory waste.
* As the header of string object with EMBSTR encoding has the fixed size of 20 bytes,
* and memory allocators are typically aligned to 8 bytes, the size of string object
* with EMBSTR encoding will be 24, 32, 40, 48, 56, or 64 bytes, so the significant
* length for storing data is 4, 12, 20, 28, 36, or 44 bytes. Now we create 6 pools
* to cover all EMBSTR encoding string object with different sizes. */
#define CLIENT_ARGV_OBJECT_POOL_NUM 6
#define CLIENT_ARGV_OBJECT_POOL_INIT_LOG 4 /* 16 */
#define CLIENT_ARGV_OBJECT_POOL_CAPACITY_LOG 8 /* 256, powers of 2 allow fast % with & */
/* Create client argv object pools and initialize them. */
clientArgvOjectPool *createClientArgvObjectPools(void) {
serverAssert((CLIENT_ARGV_OBJECT_POOL_NUM-1)*8+4 == OBJ_ENCODING_EMBSTR_SIZE_LIMIT);
serverAssert(CLIENT_ARGV_OBJECT_POOL_INIT_LOG <= CLIENT_ARGV_OBJECT_POOL_CAPACITY_LOG);
clientArgvOjectPool *pools = zmalloc(sizeof(clientArgvOjectPool) *
CLIENT_ARGV_OBJECT_POOL_NUM);
for (int i = 0; i < CLIENT_ARGV_OBJECT_POOL_NUM; i++) {
clientArgvOjectPool *pool = pools + i;
pool->object_size = 4 + i*8;
pool->count = 1 << CLIENT_ARGV_OBJECT_POOL_INIT_LOG;
pool->capacity = 1 << CLIENT_ARGV_OBJECT_POOL_CAPACITY_LOG;
pool->object_queue = zcalloc(sizeof(robj*) * pool->capacity);
for (int i = 0; i < pool->count; i++) {
pool->object_queue[i] = createStringObject(SDS_NOINIT, pool->object_size);
}
pool->head = 0;
pool->tail = pool->count & (pool->capacity - 1);
}
return pools;
}
/* Try to get a client argv object from the pool, if the length of the string
* is larger than OBJ_ENCODING_EMBSTR_SIZE_LIMIT, create a new string object.
* If the pool is empty, we try to alloc a new object. If the pool has objects,
* just return the object from the pool. */
robj *tryAllocObjectFromArgvOjectPool(const char *ptr, size_t len) {
if (len > OBJ_ENCODING_EMBSTR_SIZE_LIMIT) return createStringObject(ptr, len);
robj* o = NULL;
int pool_index = len <= 4 ? 0 : ((len-5)>>3)+1;
clientArgvOjectPool *pool = &server.client_argv_object_pools[pool_index];
if (likely(pool->count > 0)) {
o = pool->object_queue[pool->head];
pool->head = (pool->head + 1) & (pool->capacity - 1);
pool->count--;
} else {
/* If object queue is empty, create a new object with the specific size,
* so it will be put into the object queue when freeing. */
o = createStringObject(SDS_NOINIT, pool->object_size);
}
struct sdshdr8 *sh = (void*)(o + 1);
sh->len = len;
memcpy(sh->buf, ptr, len);
sh->buf[len] = '\0';
return o;
}
/* Try to return an object to the pool when freeing a client argv object. */
void tryFreeObjectToArgvOjectPool(robj *o) {
if (o->refcount > 1 || o->encoding != OBJ_ENCODING_EMBSTR) {
decrRefCount(o);
return;
}
int alloc_size = sdsalloc(o->ptr);
clientArgvOjectPool *pool = &server.client_argv_object_pools[alloc_size >> 3];
if (pool->object_size != alloc_size || pool->count >= pool->capacity) {
decrRefCount(o);
return;
}
pool->object_queue[pool->tail] = o;
pool->tail = (pool->tail + 1) & (pool->capacity - 1);
pool->count++;
}
/* Compare two string objects via strcmp() or strcoll() depending on flags.
* Note that the objects may be integer-encoded. In such a case we
* use ll2string() to get a string representation of the numbers on the stack

View File

@ -2791,6 +2791,7 @@ void initServer(void) {
server.reply_buffer_peak_reset_time = REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME;
server.reply_buffer_resizing_enabled = 1;
server.client_mem_usage_buckets = NULL;
server.client_argv_object_pools = createClientArgvObjectPools();
resetReplicationBuffer();
/* Make sure the locale is set on startup based on the config file. */

View File

@ -1313,6 +1313,15 @@ typedef struct {
} clientReqResInfo;
#endif
typedef struct {
int head; /* Index of the getting object in pool. */
int tail; /* Index of the putting object in pool. */
int count; /* Number of objects in pool. */
int capacity; /* Capacity of the pool. */
robj **object_queue; /* Queue of objects. */
int object_size; /* Size of the object in this pool. */
} clientArgvOjectPool;
typedef struct client {
uint64_t id; /* Client incremental unique ID. */
uint64_t flags; /* Client flags: CLIENT_* macros. */
@ -1808,6 +1817,7 @@ struct redisServer {
list *slaves, *monitors; /* List of slaves and MONITORs */
client *current_client; /* The client that triggered the command execution (External or AOF). */
client *executing_client; /* The client executing the current command (possibly script or module). */
clientArgvOjectPool *client_argv_object_pools; /* Pools for client argv objects. */
#ifdef LOG_REQ_RES
char *req_res_logfile; /* Path of log file for logging all requests and their replies. If NULL, no logging will be performed */
@ -3034,6 +3044,9 @@ robj *createZsetObject(void);
robj *createZsetListpackObject(void);
robj *createStreamObject(void);
robj *createModuleObject(moduleType *mt, void *value);
clientArgvOjectPool *createClientArgvObjectPools(void);
robj *tryAllocObjectFromArgvOjectPool(const char *ptr, size_t len);
void tryFreeObjectToArgvOjectPool(robj *obj);
int getLongFromObjectOrReply(client *c, robj *o, long *target, const char *msg);
int getPositiveLongFromObjectOrReply(client *c, robj *o, long *target, const char *msg);
int getRangeLongFromObjectOrReply(client *c, robj *o, long min, long max, long *target, const char *msg);