mirror of https://mirror.osredm.com/root/redis.git
Optimize client memory usage tracking operation while client eviction is disabled (#11348)
## Issue

During client input/output buffer processing, memory usage is updated incrementally so that clients crossing the `maxmemory-clients` threshold can be evicted. This extra tracking wastes CPU cycles when no client eviction is required, which happens in two cases:

* `maxmemory-clients` is set to `0`, which disables client eviction for all clients.
* The `CLIENT NO-EVICT` flag is set to `ON`, which exempts that particular client from eviction.

## Solution

* Disable client memory usage tracking in the read/write flow when `maxmemory-clients` is `0` or `CLIENT NO-EVICT` is `on`. Memory usage is then tracked only from `clientsCron`, i.e. it is updated periodically.
* Remove clients from the memory usage buckets when client eviction is disabled.
* When the `maxmemory-clients` config is enabled or disabled at runtime, immediately update the memory usage buckets for all clients (in testing, scanning 80000 clients took about 20ms).

Benchmarks showed that this can improve performance by about 5% in certain situations.

Co-authored-by: Oran Agra <oran@redislabs.com>
parent 8a315fc285
commit c0267b3fa5
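At the core of the change is a new gate that lets the hot read/write path skip all per-command bucket bookkeeping when eviction cannot apply to a client. The snippet below is abridged from the new src/server.c code shown further down (the bucket-placement details are elided):

```c
/* Abridged from the src/server.c hunks below; not a complete listing. */
int clientEvictionAllowed(client *c) {
    if (server.maxmemory_clients == 0 || c->flags & CLIENT_NO_EVICT) {
        return 0;
    }
    int type = getClientType(c);
    return (type == CLIENT_TYPE_NORMAL || type == CLIENT_TYPE_PUBSUB);
}

int updateClientMemUsageAndBucket(client *c) {
    serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
    int allow_eviction = clientEvictionAllowed(c);
    removeClientFromMemUsageBucket(c, allow_eviction);

    if (!allow_eviction) {
        return 0;                /* eviction disabled: skip per-command tracking */
    }

    updateClientMemoryUsage(c);  /* refresh per-type memory stats */
    /* ... place the client into the matching memory usage bucket ... */
    return 1;
}
```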
src/config.c (33 changed lines)

```diff
@@ -2976,6 +2976,37 @@ void rewriteConfigLatencyTrackingInfoPercentilesOutputOption(standardConfig *con
     rewriteConfigRewriteLine(state,name,line,1);
 }
 
+static int applyClientMaxMemoryUsage(const char **err) {
+    UNUSED(err);
+    listIter li;
+    listNode *ln;
+
+    /* server.client_mem_usage_buckets is an indication that the previous config
+     * was non-zero, in which case we can exit and no apply is needed. */
+    if(server.maxmemory_clients !=0 && server.client_mem_usage_buckets)
+        return 1;
+    if (server.maxmemory_clients != 0)
+        initServerClientMemUsageBuckets();
+
+    /* When client eviction is enabled update memory buckets for all clients.
+     * When disabled, clear that data structure. */
+    listRewind(server.clients, &li);
+    while ((ln = listNext(&li)) != NULL) {
+        client *c = listNodeValue(ln);
+        if (server.maxmemory_clients == 0) {
+            /* Remove client from memory usage bucket. */
+            removeClientFromMemUsageBucket(c, 0);
+        } else {
+            /* Update each client(s) memory usage and add to appropriate bucket. */
+            updateClientMemUsageAndBucket(c);
+        }
+    }
+
+    if (server.maxmemory_clients == 0)
+        freeServerClientMemUsageBuckets();
+    return 1;
+}
+
 standardConfig static_configs[] = {
     /* Bool configs */
     createBoolConfig("rdbchecksum", NULL, IMMUTABLE_CONFIG, server.rdb_checksum, 1, NULL, NULL),
@@ -3146,7 +3177,7 @@ standardConfig static_configs[] = {
     createSizeTConfig("hll-sparse-max-bytes", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.hll_sparse_max_bytes, 3000, MEMORY_CONFIG, NULL, NULL),
     createSizeTConfig("tracking-table-max-keys", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.tracking_table_max_keys, 1000000, INTEGER_CONFIG, NULL, NULL), /* Default: 1 million keys max. */
     createSizeTConfig("client-query-buffer-limit", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 1024*1024, LONG_MAX, server.client_max_querybuf_len, 1024*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Default: 1GB max query buffer. */
-    createSSizeTConfig("maxmemory-clients", NULL, MODIFIABLE_CONFIG, -100, SSIZE_MAX, server.maxmemory_clients, 0, MEMORY_CONFIG | PERCENT_CONFIG, NULL, NULL),
+    createSSizeTConfig("maxmemory-clients", NULL, MODIFIABLE_CONFIG, -100, SSIZE_MAX, server.maxmemory_clients, 0, MEMORY_CONFIG | PERCENT_CONFIG, NULL, applyClientMaxMemoryUsage),
 
     /* Other configs */
     createTimeTConfig("repl-backlog-ttl", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.repl_backlog_time_limit, 60*60, INTEGER_CONFIG, NULL, NULL), /* Default: 1 hour */
```
Other source files:

```diff
@@ -947,6 +947,10 @@ NULL
         else
             addReply(c, shared.ok);
     } else if(!strcasecmp(c->argv[1]->ptr,"client-eviction") && c->argc == 2) {
+        if (!server.client_mem_usage_buckets) {
+            addReplyError(c,"maxmemory-clients is disabled.");
+            return;
+        }
         sds bucket_info = sdsempty();
         for (int j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++) {
             if (j == 0)
@@ -1970,7 +1970,7 @@ int writeToClient(client *c, int handler_installed) {
      * Since this isn't thread safe we do this conditionally. In case of threaded writes this is done in
      * handleClientsWithPendingWritesUsingThreads(). */
    if (io_threads_op == IO_THREADS_OP_IDLE)
-        updateClientMemUsage(c);
+        updateClientMemUsageAndBucket(c);
    return C_OK;
}

@@ -2420,7 +2420,7 @@ int processCommandAndResetClient(client *c) {
        commandProcessed(c);
        /* Update the client's memory to include output buffer growth following the
         * processed command. */
-        updateClientMemUsage(c);
+        updateClientMemUsageAndBucket(c);
    }

    if (server.current_client == NULL) deadclient = 1;
@@ -2557,7 +2557,7 @@ int processInputBuffer(client *c) {
     * important in case the query buffer is big and wasn't drained during
     * the above loop (because of partially sent big commands). */
    if (io_threads_op == IO_THREADS_OP_IDLE)
-        updateClientMemUsage(c);
+        updateClientMemUsageAndBucket(c);

    return C_OK;
}
@@ -2995,9 +2995,11 @@ NULL
        /* CLIENT NO-EVICT ON|OFF */
        if (!strcasecmp(c->argv[2]->ptr,"on")) {
            c->flags |= CLIENT_NO_EVICT;
+            removeClientFromMemUsageBucket(c, 0);
            addReply(c,shared.ok);
        } else if (!strcasecmp(c->argv[2]->ptr,"off")) {
            c->flags &= ~CLIENT_NO_EVICT;
+            updateClientMemUsageAndBucket(c);
            addReply(c,shared.ok);
        } else {
            addReplyErrorObject(c,shared.syntaxerr);
@@ -4228,7 +4230,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
        client *c = listNodeValue(ln);

        /* Update the client in the mem usage after we're done processing it in the io-threads */
-        updateClientMemUsage(c);
+        updateClientMemUsageAndBucket(c);

        /* Install the write handler if there are pending writes in some
         * of the clients. */
@@ -4337,7 +4339,7 @@ int handleClientsWithPendingReadsUsingThreads(void) {
        }

        /* Once io-threads are idle we can update the client in the mem usage */
-        updateClientMemUsage(c);
+        updateClientMemUsageAndBucket(c);

        if (processPendingCommandAndInputBuffer(c) == C_ERR) {
            /* If the client is no longer valid, we avoid
@@ -4384,6 +4386,8 @@ size_t getClientEvictionLimit(void) {
}

void evictClients(void) {
+    if (!server.client_mem_usage_buckets)
+        return;
    /* Start eviction from topmost bucket (largest clients) */
    int curr_bucket = CLIENT_MEM_USAGE_BUCKETS-1;
    listIter bucket_iter;
@@ -1208,7 +1208,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) {

    /* Computing the memory used by the clients would be O(N) if done
     * here online. We use our values computed incrementally by
-     * updateClientMemUsage(). */
+     * updateClientMemoryUsage(). */
    mh->clients_normal = server.stat_clients_type_memory[CLIENT_TYPE_MASTER]+
                         server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB]+
                         server.stat_clients_type_memory[CLIENT_TYPE_NORMAL];
@@ -465,7 +465,7 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type)
        while ((ln = listNext(&li)) != NULL) {
            client *c = ln->value;
            addReplyPubsubMessage(c,channel,message,*type.messageBulk);
-            updateClientMemUsage(c);
+            updateClientMemUsageAndBucket(c);
            receivers++;
        }
    }
@@ -491,7 +491,7 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type)
            while ((ln = listNext(&li)) != NULL) {
                client *c = listNodeValue(ln);
                addReplyPubsubPatMessage(c,pattern,channel,message);
-                updateClientMemUsage(c);
+                updateClientMemUsageAndBucket(c);
                receivers++;
            }
        }
@@ -593,7 +593,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
    while((ln = listNext(&li))) {
        client *monitor = ln->value;
        addReply(monitor,cmdobj);
-        updateClientMemUsage(c);
+        updateClientMemUsageAndBucket(c);
    }
    decrRefCount(cmdobj);
}
```
src/server.c (135 changed lines)

```diff
@@ -839,37 +839,44 @@ static inline clientMemUsageBucket *getMemUsageBucket(size_t mem) {
     return &server.client_mem_usage_buckets[bucket_idx];
 }
 
-/* This is called both on explicit clients when something changed their buffers,
- * so we can track clients' memory and enforce clients' maxmemory in real time,
- * and also from the clientsCron. We call it from the cron so we have updated
- * stats for non CLIENT_TYPE_NORMAL/PUBSUB clients and in case a configuration
- * change requires us to evict a non-active client.
- *
- * This also adds the client to the correct memory usage bucket. Each bucket contains
- * all clients with roughly the same amount of memory. This way we group
- * together clients consuming about the same amount of memory and can quickly
- * free them in case we reach maxmemory-clients (client eviction).
+/*
+ * This method updates the client memory usage and update the
+ * server stats for client type.
+ *
+ * This method is called from the clientsCron to have updated
+ * stats for non CLIENT_TYPE_NORMAL/PUBSUB clients to accurately
+ * provide information around clients memory usage.
+ *
+ * It is also used in updateClientMemUsageAndBucket to have latest
+ * client memory usage information to place it into appropriate client memory
+ * usage bucket.
  */
-int updateClientMemUsage(client *c) {
-    serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
+void updateClientMemoryUsage(client *c) {
     size_t mem = getClientMemoryUsage(c, NULL);
     int type = getClientType(c);
+    /* Now that we have the memory used by the client, remove the old
+     * value from the old category, and add it back. */
+    server.stat_clients_type_memory[c->last_memory_type] -= c->last_memory_usage;
+    server.stat_clients_type_memory[type] += mem;
+    /* Remember what we added and where, to remove it next time. */
+    c->last_memory_type = type;
+    c->last_memory_usage = mem;
+}
 
-    /* Remove the old value of the memory used by the client from the old
-     * category, and add it back. */
-    if (type != c->last_memory_type) {
-        server.stat_clients_type_memory[c->last_memory_type] -= c->last_memory_usage;
-        server.stat_clients_type_memory[type] += mem;
-        c->last_memory_type = type;
-    } else {
-        server.stat_clients_type_memory[type] += mem - c->last_memory_usage;
-    }
-
-    int allow_eviction =
-        (type == CLIENT_TYPE_NORMAL || type == CLIENT_TYPE_PUBSUB) &&
-        !(c->flags & CLIENT_NO_EVICT);
-
-    /* Update the client in the mem usage buckets */
+int clientEvictionAllowed(client *c) {
+    if (server.maxmemory_clients == 0 || c->flags & CLIENT_NO_EVICT) {
+        return 0;
+    }
+    int type = getClientType(c);
+    return (type == CLIENT_TYPE_NORMAL || type == CLIENT_TYPE_PUBSUB);
+}
+
+/* This function is used to cleanup the client's previously tracked memory usage.
+ * This is called during incremental client memory usage tracking as well as
+ * used to reset when client to bucket allocation is not required when
+ * client eviction is disabled. */
+void removeClientFromMemUsageBucket(client *c, int allow_eviction) {
     if (c->mem_usage_bucket) {
         c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage;
         /* If this client can't be evicted then remove it from the mem usage
@@ -880,23 +887,42 @@ int updateClientMemUsage(client *c) {
             c->mem_usage_bucket_node = NULL;
         }
     }
-    if (allow_eviction) {
-        clientMemUsageBucket *bucket = getMemUsageBucket(mem);
-        bucket->mem_usage_sum += mem;
-        if (bucket != c->mem_usage_bucket) {
-            if (c->mem_usage_bucket)
-                listDelNode(c->mem_usage_bucket->clients,
-                            c->mem_usage_bucket_node);
-            c->mem_usage_bucket = bucket;
-            listAddNodeTail(bucket->clients, c);
-            c->mem_usage_bucket_node = listLast(bucket->clients);
-        }
+}
+
+/* This is called only if explicit clients when something changed their buffers,
+ * so we can track clients' memory and enforce clients' maxmemory in real time.
+ *
+ * This also adds the client to the correct memory usage bucket. Each bucket contains
+ * all clients with roughly the same amount of memory. This way we group
+ * together clients consuming about the same amount of memory and can quickly
+ * free them in case we reach maxmemory-clients (client eviction).
+ *
+ * returns 1 if client eviction for this client is allowed, 0 otherwise.
+ */
+int updateClientMemUsageAndBucket(client *c) {
+    serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
+    int allow_eviction = clientEvictionAllowed(c);
+    removeClientFromMemUsageBucket(c, allow_eviction);
+
+    if (!allow_eviction) {
+        return 0;
     }
 
-    /* Remember what we added, to remove it next time. */
-    c->last_memory_usage = mem;
+    /* Update client memory usage. */
+    updateClientMemoryUsage(c);
 
-    return 0;
+    /* Update the client in the mem usage buckets */
+    clientMemUsageBucket *bucket = getMemUsageBucket(c->last_memory_usage);
+    bucket->mem_usage_sum += c->last_memory_usage;
+    if (bucket != c->mem_usage_bucket) {
+        if (c->mem_usage_bucket)
+            listDelNode(c->mem_usage_bucket->clients,
+                        c->mem_usage_bucket_node);
+        c->mem_usage_bucket = bucket;
+        listAddNodeTail(bucket->clients, c);
+        c->mem_usage_bucket_node = listLast(bucket->clients);
+    }
+    return 1;
 }
 
 /* Return the max samples in the memory usage of clients tracked by
@@ -984,8 +1010,11 @@ void clientsCron(void) {
          * in turn would make the INFO command too slow. So we perform this
          * computation incrementally and track the (not instantaneous but updated
          * to the second) total memory used by clients using clientsCron() in
-         * a more incremental way (depending on server.hz). */
-        if (updateClientMemUsage(c)) continue;
+         * a more incremental way (depending on server.hz).
+         * If client eviction is enabled, update the bucket as well. */
+        if (!updateClientMemUsageAndBucket(c))
+            updateClientMemoryUsage(c);
 
         if (closeClientOnOutputBufferLimitReached(c, 0)) continue;
     }
 }
@@ -1865,6 +1894,25 @@ void createSharedObjects(void) {
     shared.maxstring = sdsnew("maxstring");
 }
 
+void initServerClientMemUsageBuckets() {
+    if (server.client_mem_usage_buckets)
+        return;
+    server.client_mem_usage_buckets = zmalloc(sizeof(clientMemUsageBucket)*CLIENT_MEM_USAGE_BUCKETS);
+    for (int j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++) {
+        server.client_mem_usage_buckets[j].mem_usage_sum = 0;
+        server.client_mem_usage_buckets[j].clients = listCreate();
+    }
+}
+
+void freeServerClientMemUsageBuckets() {
+    if (!server.client_mem_usage_buckets)
+        return;
+    for (int j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++)
+        listRelease(server.client_mem_usage_buckets[j].clients);
+    zfree(server.client_mem_usage_buckets);
+    server.client_mem_usage_buckets = NULL;
+}
+
 void initServerConfig(void) {
     int j;
     char *default_bindaddr[CONFIG_DEFAULT_BINDADDR_COUNT] = CONFIG_DEFAULT_BINDADDR;
@@ -2461,6 +2509,7 @@ void initServer(void) {
     server.cluster_drop_packet_filter = -1;
     server.reply_buffer_peak_reset_time = REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME;
     server.reply_buffer_resizing_enabled = 1;
+    server.client_mem_usage_buckets = NULL;
     resetReplicationBuffer();
 
     /* Make sure the locale is set on startup based on the config file. */
@@ -2469,11 +2518,6 @@ void initServer(void) {
         exit(1);
     }
 
-    for (j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++) {
-        server.client_mem_usage_buckets[j].mem_usage_sum = 0;
-        server.client_mem_usage_buckets[j].clients = listCreate();
-    }
-
     createSharedObjects();
     adjustOpenFilesLimit();
     const char *clk_msg = monotonicInit();
@@ -2606,6 +2650,9 @@ void initServer(void) {
     ACLUpdateDefaultUserPassword(server.requirepass);
 
     applyWatchdogPeriod();
+
+    if (server.maxmemory_clients != 0)
+        initServerClientMemUsageBuckets();
 }
 
 void initListeners() {
```
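Taken together with the networking, pubsub and replication hunks earlier, the call sites now follow a single pattern; the fragment below merely restates the two call shapes from the hunks above for clarity and is not additional code from the patch:

```c
/* Hot path (e.g. writeToClient(), processInputBuffer(), io-thread handlers):
 * one call that returns early when this client cannot be evicted. */
if (io_threads_op == IO_THREADS_OP_IDLE)
    updateClientMemUsageAndBucket(c);

/* clientsCron(): if bucketing was skipped, still refresh the per-type
 * memory stats so INFO / MEMORY STATS stay accurate. */
if (!updateClientMemUsageAndBucket(c))
    updateClientMemoryUsage(c);
```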
src/server.h (10 changed lines)

```diff
@@ -1197,7 +1197,7 @@ typedef struct client {
     rax *client_tracking_prefixes; /* A dictionary of prefixes we are already
                                       subscribed to in BCAST mode, in the
                                       context of client side caching. */
-    /* In updateClientMemUsage() we track the memory usage of
+    /* In updateClientMemoryUsage() we track the memory usage of
      * each client and add it to the sum of all the clients of a given type,
      * however we need to remember what was the old contribution of each
      * client, and in which category the client was, in order to remove it
@@ -1551,7 +1551,7 @@ struct redisServer {
     client *current_client; /* Current client executing the command. */
 
     /* Stuff for client mem eviction */
-    clientMemUsageBucket client_mem_usage_buckets[CLIENT_MEM_USAGE_BUCKETS];
+    clientMemUsageBucket* client_mem_usage_buckets;
 
     rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */
     int in_nested_call;  /* If > 0, in a nested call of a call */
@@ -2577,8 +2577,8 @@ int handleClientsWithPendingReadsUsingThreads(void);
 int stopThreadedIOIfNeeded(void);
 int clientHasPendingReplies(client *c);
 int islocalClient(client *c);
-int updateClientMemUsage(client *c);
-void updateClientMemUsageBucket(client *c);
+int updateClientMemUsageAndBucket(client *c);
+void removeClientFromMemUsageBucket(client *c, int allow_eviction);
 void unlinkClient(client *c);
 int writeToClient(client *c, int handler_installed);
 void linkClient(client *c);
@@ -3117,6 +3117,8 @@ void initConfigValues();
 void removeConfig(sds name);
 sds getConfigDebugInfo();
 int allowProtectedAction(int config, client *c);
+void initServerClientMemUsageBuckets();
+void freeServerClientMemUsageBuckets();
 
 /* Module Configuration */
 typedef struct ModuleConfig ModuleConfig;
```
Other source and test files:

```diff
@@ -311,7 +311,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
         addReplyArrayLen(c,1);
         addReplyBulkCBuffer(c,keyname,keylen);
     }
-    updateClientMemUsage(c);
+    updateClientMemUsageAndBucket(c);
 }
 
 /* This function is called when a key is modified in Redis and in the case
@@ -140,7 +140,7 @@ start_server {} {
         set temp_maxmemory_clients 200000
         r config set maxmemory-clients $temp_maxmemory_clients
 
-        # Append watched keys until list maxes out maxmemroy clients and causes client eviction
+        # Append watched keys until list maxes out maxmemory clients and causes client eviction
         catch {
             for {set j 0} {$j < $temp_maxmemory_clients} {incr j} {
                 $rr watch $j
@@ -467,7 +467,7 @@ start_server {} {
 start_server {} {
     test "evict clients in right order (large to small)" {
         # Note that each size step needs to be at least x2 larger than previous step
-        # because of how the client-eviction size bucktting works
+        # because of how the client-eviction size bucketing works
         set sizes [list [kb 128] [mb 1] [mb 3]]
         set clients_per_size 3
         r client setname control
@@ -531,5 +531,52 @@ start_server {} {
     } {} {needs:debug}
 }
 
+start_server {} {
+    foreach type {"client no-evict" "maxmemory-clients disabled"} {
+        r flushall
+        r client no-evict on
+        r config set maxmemory-clients 0
+
+        test "client total memory grows during $type" {
+            r setrange k [mb 1] v
+            set rr [redis_client]
+            $rr client setname test_client
+            if {$type eq "client no-evict"} {
+                $rr client no-evict on
+                r config set maxmemory-clients 1
+            }
+            $rr deferred 1
+
+            # Fill output buffer in loop without reading it and make sure
+            # the tot-mem of client has increased (OS buffers didn't swallow it)
+            # and eviction not occurring.
+            while {true} {
+                $rr get k
+                $rr flush
+                after 10
+                if {[client_field test_client tot-mem] > [mb 10]} {
+                    break
+                }
+            }
+
+            # Trigger the client eviction, by flipping the no-evict flag to off
+            if {$type eq "client no-evict"} {
+                $rr client no-evict off
+            } else {
+                r config set maxmemory-clients 1
+            }
+
+            # wait for the client to be disconnected
+            wait_for_condition 5000 50 {
+                ![client_exists test_client]
+            } else {
+                puts [r client list]
+                fail "client was not disconnected"
+            }
+            $rr close
+        }
+    }
+}
+
 } ;# tags
```