finish memory prefetch

This commit is contained in:
Yuan Wang 2025-04-30 23:28:49 +08:00
parent c472d7f606
commit f573e7e6c8
12 changed files with 363 additions and 129 deletions

View File

@ -1334,6 +1334,18 @@ lazyfree-lazy-user-flush no
# --threads option to match the number of Redis threads, otherwise you'll not
# be able to notice the improvements.
# When multiple commands are parsed by the I/O threads and ready for execution,
# we take advantage of knowing the next set of commands and prefetch their
# required dictionary entries in a batch. This reduces memory access costs.
#
# The optimal batch size depends on the specific workflow of the user.
# The default batch size is 16, which can be modified using the
# 'prefetch-batch-max-size' config.
#
# When the config is set to 0, prefetching is disabled.
#
# prefetch-batch-max-size 16
############################ KERNEL OOM CONTROL ##############################
# On Linux, it is possible to hint the kernel OOM killer on what processes

View File

@ -3168,6 +3168,7 @@ standardConfig static_configs[] = {
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL),
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, 128, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */
createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL),
createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_slave_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* Slave max data age factor. */
createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL),

View File

@ -115,7 +115,7 @@
#endif
#if HAS_BUILTIN_PREFETCH
#define redis_prefetch(addr) __builtin_prefetch(addr, 0, 3) /* Read with high locality */
#define redis_prefetch(addr) __builtin_prefetch(addr)
#define redis_prefetch_read(addr) __builtin_prefetch(addr, 0, 3) /* Read with high locality */
#define redis_prefetch_write(addr) __builtin_prefetch(addr, 1, 3) /* Write with high locality */
#else

View File

@ -300,6 +300,24 @@ int getKeySlot(sds key) {
return calculateKeySlot(key);
}
/* Return the slot of the key in the command. IO threads use this function
* to calculate slot to reduce main-thread load */
int getSlotFromCommand(struct redisCommand *cmd, robj **argv, int argc) {
int slot = -1;
if (!cmd || !server.cluster_enabled) return slot;
/* Get the keys from the command */
getKeysResult result = GETKEYS_RESULT_INIT;
int numkeys = getKeysFromCommand(cmd, argv, argc, &result);
if (numkeys > 0) {
/* Get the slot of the first key */
robj *first = argv[result.keys[0].pos];
slot = keyHashSlot(first->ptr, (int)sdslen(first->ptr));
}
getKeysFreeResult(&result);
return slot;
}
/* This is a special version of dbAdd() that is used only when loading
* keys from the RDB file: the key is passed as an SDS string that is
* retained by the function (and not freed by the caller).

View File

@ -44,9 +44,9 @@
/* Everything below this line is automatically generated by
* generate-fmtargs.py. Do not manually edit. */
#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, N, ...) N
#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, _121, _122, _123, _124, _125, _126, _127, _128, _129, _130, _131, _132, _133, _134, _135, _136, _137, _138, _139, _140, _141, _142, _143, _144, _145, _146, _147, _148, _149, _150, _151, _152, _153, _154, _155, _156, _157, _158, _159, _160, _161, _162, _163, _164, _165, _166, _167, _168, _169, _170, _171, _172, _173, _174, _175, _176, _177, _178, _179, _180, _181, _182, _183, _184, _185, _186, _187, _188, _189, _190, _191, _192, _193, _194, _195, _196, _197, _198, _199, _200, N, ...) N
#define RSEQ_N() 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0
#define RSEQ_N() 200, 199, 198, 197, 196, 195, 194, 193, 192, 191, 190, 189, 188, 187, 186, 185, 184, 183, 182, 181, 180, 179, 178, 177, 176, 175, 174, 173, 172, 171, 170, 169, 168, 167, 166, 165, 164, 163, 162, 161, 160, 159, 158, 157, 156, 155, 154, 153, 152, 151, 150, 149, 148, 147, 146, 145, 144, 143, 142, 141, 140, 139, 138, 137, 136, 135, 134, 133, 132, 131, 130, 129, 128, 127, 126, 125, 124, 123, 122, 121, 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0
#define COMPACT_FMT_2(fmt, value) fmt
#define COMPACT_FMT_4(fmt, value, ...) fmt COMPACT_FMT_2(__VA_ARGS__)
@ -108,6 +108,46 @@
#define COMPACT_FMT_116(fmt, value, ...) fmt COMPACT_FMT_114(__VA_ARGS__)
#define COMPACT_FMT_118(fmt, value, ...) fmt COMPACT_FMT_116(__VA_ARGS__)
#define COMPACT_FMT_120(fmt, value, ...) fmt COMPACT_FMT_118(__VA_ARGS__)
#define COMPACT_FMT_122(fmt, value, ...) fmt COMPACT_FMT_120(__VA_ARGS__)
#define COMPACT_FMT_124(fmt, value, ...) fmt COMPACT_FMT_122(__VA_ARGS__)
#define COMPACT_FMT_126(fmt, value, ...) fmt COMPACT_FMT_124(__VA_ARGS__)
#define COMPACT_FMT_128(fmt, value, ...) fmt COMPACT_FMT_126(__VA_ARGS__)
#define COMPACT_FMT_130(fmt, value, ...) fmt COMPACT_FMT_128(__VA_ARGS__)
#define COMPACT_FMT_132(fmt, value, ...) fmt COMPACT_FMT_130(__VA_ARGS__)
#define COMPACT_FMT_134(fmt, value, ...) fmt COMPACT_FMT_132(__VA_ARGS__)
#define COMPACT_FMT_136(fmt, value, ...) fmt COMPACT_FMT_134(__VA_ARGS__)
#define COMPACT_FMT_138(fmt, value, ...) fmt COMPACT_FMT_136(__VA_ARGS__)
#define COMPACT_FMT_140(fmt, value, ...) fmt COMPACT_FMT_138(__VA_ARGS__)
#define COMPACT_FMT_142(fmt, value, ...) fmt COMPACT_FMT_140(__VA_ARGS__)
#define COMPACT_FMT_144(fmt, value, ...) fmt COMPACT_FMT_142(__VA_ARGS__)
#define COMPACT_FMT_146(fmt, value, ...) fmt COMPACT_FMT_144(__VA_ARGS__)
#define COMPACT_FMT_148(fmt, value, ...) fmt COMPACT_FMT_146(__VA_ARGS__)
#define COMPACT_FMT_150(fmt, value, ...) fmt COMPACT_FMT_148(__VA_ARGS__)
#define COMPACT_FMT_152(fmt, value, ...) fmt COMPACT_FMT_150(__VA_ARGS__)
#define COMPACT_FMT_154(fmt, value, ...) fmt COMPACT_FMT_152(__VA_ARGS__)
#define COMPACT_FMT_156(fmt, value, ...) fmt COMPACT_FMT_154(__VA_ARGS__)
#define COMPACT_FMT_158(fmt, value, ...) fmt COMPACT_FMT_156(__VA_ARGS__)
#define COMPACT_FMT_160(fmt, value, ...) fmt COMPACT_FMT_158(__VA_ARGS__)
#define COMPACT_FMT_162(fmt, value, ...) fmt COMPACT_FMT_160(__VA_ARGS__)
#define COMPACT_FMT_164(fmt, value, ...) fmt COMPACT_FMT_162(__VA_ARGS__)
#define COMPACT_FMT_166(fmt, value, ...) fmt COMPACT_FMT_164(__VA_ARGS__)
#define COMPACT_FMT_168(fmt, value, ...) fmt COMPACT_FMT_166(__VA_ARGS__)
#define COMPACT_FMT_170(fmt, value, ...) fmt COMPACT_FMT_168(__VA_ARGS__)
#define COMPACT_FMT_172(fmt, value, ...) fmt COMPACT_FMT_170(__VA_ARGS__)
#define COMPACT_FMT_174(fmt, value, ...) fmt COMPACT_FMT_172(__VA_ARGS__)
#define COMPACT_FMT_176(fmt, value, ...) fmt COMPACT_FMT_174(__VA_ARGS__)
#define COMPACT_FMT_178(fmt, value, ...) fmt COMPACT_FMT_176(__VA_ARGS__)
#define COMPACT_FMT_180(fmt, value, ...) fmt COMPACT_FMT_178(__VA_ARGS__)
#define COMPACT_FMT_182(fmt, value, ...) fmt COMPACT_FMT_180(__VA_ARGS__)
#define COMPACT_FMT_184(fmt, value, ...) fmt COMPACT_FMT_182(__VA_ARGS__)
#define COMPACT_FMT_186(fmt, value, ...) fmt COMPACT_FMT_184(__VA_ARGS__)
#define COMPACT_FMT_188(fmt, value, ...) fmt COMPACT_FMT_186(__VA_ARGS__)
#define COMPACT_FMT_190(fmt, value, ...) fmt COMPACT_FMT_188(__VA_ARGS__)
#define COMPACT_FMT_192(fmt, value, ...) fmt COMPACT_FMT_190(__VA_ARGS__)
#define COMPACT_FMT_194(fmt, value, ...) fmt COMPACT_FMT_192(__VA_ARGS__)
#define COMPACT_FMT_196(fmt, value, ...) fmt COMPACT_FMT_194(__VA_ARGS__)
#define COMPACT_FMT_198(fmt, value, ...) fmt COMPACT_FMT_196(__VA_ARGS__)
#define COMPACT_FMT_200(fmt, value, ...) fmt COMPACT_FMT_198(__VA_ARGS__)
#define COMPACT_VALUES_2(fmt, value) value
#define COMPACT_VALUES_4(fmt, value, ...) value, COMPACT_VALUES_2(__VA_ARGS__)
@ -169,5 +209,45 @@
#define COMPACT_VALUES_116(fmt, value, ...) value, COMPACT_VALUES_114(__VA_ARGS__)
#define COMPACT_VALUES_118(fmt, value, ...) value, COMPACT_VALUES_116(__VA_ARGS__)
#define COMPACT_VALUES_120(fmt, value, ...) value, COMPACT_VALUES_118(__VA_ARGS__)
#define COMPACT_VALUES_122(fmt, value, ...) value, COMPACT_VALUES_120(__VA_ARGS__)
#define COMPACT_VALUES_124(fmt, value, ...) value, COMPACT_VALUES_122(__VA_ARGS__)
#define COMPACT_VALUES_126(fmt, value, ...) value, COMPACT_VALUES_124(__VA_ARGS__)
#define COMPACT_VALUES_128(fmt, value, ...) value, COMPACT_VALUES_126(__VA_ARGS__)
#define COMPACT_VALUES_130(fmt, value, ...) value, COMPACT_VALUES_128(__VA_ARGS__)
#define COMPACT_VALUES_132(fmt, value, ...) value, COMPACT_VALUES_130(__VA_ARGS__)
#define COMPACT_VALUES_134(fmt, value, ...) value, COMPACT_VALUES_132(__VA_ARGS__)
#define COMPACT_VALUES_136(fmt, value, ...) value, COMPACT_VALUES_134(__VA_ARGS__)
#define COMPACT_VALUES_138(fmt, value, ...) value, COMPACT_VALUES_136(__VA_ARGS__)
#define COMPACT_VALUES_140(fmt, value, ...) value, COMPACT_VALUES_138(__VA_ARGS__)
#define COMPACT_VALUES_142(fmt, value, ...) value, COMPACT_VALUES_140(__VA_ARGS__)
#define COMPACT_VALUES_144(fmt, value, ...) value, COMPACT_VALUES_142(__VA_ARGS__)
#define COMPACT_VALUES_146(fmt, value, ...) value, COMPACT_VALUES_144(__VA_ARGS__)
#define COMPACT_VALUES_148(fmt, value, ...) value, COMPACT_VALUES_146(__VA_ARGS__)
#define COMPACT_VALUES_150(fmt, value, ...) value, COMPACT_VALUES_148(__VA_ARGS__)
#define COMPACT_VALUES_152(fmt, value, ...) value, COMPACT_VALUES_150(__VA_ARGS__)
#define COMPACT_VALUES_154(fmt, value, ...) value, COMPACT_VALUES_152(__VA_ARGS__)
#define COMPACT_VALUES_156(fmt, value, ...) value, COMPACT_VALUES_154(__VA_ARGS__)
#define COMPACT_VALUES_158(fmt, value, ...) value, COMPACT_VALUES_156(__VA_ARGS__)
#define COMPACT_VALUES_160(fmt, value, ...) value, COMPACT_VALUES_158(__VA_ARGS__)
#define COMPACT_VALUES_162(fmt, value, ...) value, COMPACT_VALUES_160(__VA_ARGS__)
#define COMPACT_VALUES_164(fmt, value, ...) value, COMPACT_VALUES_162(__VA_ARGS__)
#define COMPACT_VALUES_166(fmt, value, ...) value, COMPACT_VALUES_164(__VA_ARGS__)
#define COMPACT_VALUES_168(fmt, value, ...) value, COMPACT_VALUES_166(__VA_ARGS__)
#define COMPACT_VALUES_170(fmt, value, ...) value, COMPACT_VALUES_168(__VA_ARGS__)
#define COMPACT_VALUES_172(fmt, value, ...) value, COMPACT_VALUES_170(__VA_ARGS__)
#define COMPACT_VALUES_174(fmt, value, ...) value, COMPACT_VALUES_172(__VA_ARGS__)
#define COMPACT_VALUES_176(fmt, value, ...) value, COMPACT_VALUES_174(__VA_ARGS__)
#define COMPACT_VALUES_178(fmt, value, ...) value, COMPACT_VALUES_176(__VA_ARGS__)
#define COMPACT_VALUES_180(fmt, value, ...) value, COMPACT_VALUES_178(__VA_ARGS__)
#define COMPACT_VALUES_182(fmt, value, ...) value, COMPACT_VALUES_180(__VA_ARGS__)
#define COMPACT_VALUES_184(fmt, value, ...) value, COMPACT_VALUES_182(__VA_ARGS__)
#define COMPACT_VALUES_186(fmt, value, ...) value, COMPACT_VALUES_184(__VA_ARGS__)
#define COMPACT_VALUES_188(fmt, value, ...) value, COMPACT_VALUES_186(__VA_ARGS__)
#define COMPACT_VALUES_190(fmt, value, ...) value, COMPACT_VALUES_188(__VA_ARGS__)
#define COMPACT_VALUES_192(fmt, value, ...) value, COMPACT_VALUES_190(__VA_ARGS__)
#define COMPACT_VALUES_194(fmt, value, ...) value, COMPACT_VALUES_192(__VA_ARGS__)
#define COMPACT_VALUES_196(fmt, value, ...) value, COMPACT_VALUES_194(__VA_ARGS__)
#define COMPACT_VALUES_198(fmt, value, ...) value, COMPACT_VALUES_196(__VA_ARGS__)
#define COMPACT_VALUES_200(fmt, value, ...) value, COMPACT_VALUES_198(__VA_ARGS__)
#endif

View File

@ -309,24 +309,30 @@ int sendPendingClientsToIOThreads(void) {
return processed;
}
int prefetchIOThreadCommand(IOThread *t) {
/* Prefetch the commands from the IO thread. The return value is the number
* of clients that have been prefetched. */
int prefetchIOThreadCommands(IOThread *t) {
/* Since small batch prefetching is not much effective, so if the remaining
* is small (less than twice the max batch size), prefetch all of it. */
int len = listLength(mainThreadProcessingClients[t->id]);
if (len < 2) return 0;
int iterate = 0;
int prefetch = len < server.prefetch_batch_max_size*2 ? len :
server.prefetch_batch_max_size;
int config_size = getConfigPrefetchBatchSize();
int to_prefetch = len < config_size*2 ? len : config_size;
if (to_prefetch == 0) return 0;
int clients = 0;
listIter li;
listNode *ln;
listRewind(mainThreadProcessingClients[t->id], &li);
while((ln = listNext(&li)) && iterate++ < prefetch) {
while((ln = listNext(&li)) && clients++ < to_prefetch) {
client *c = listNodeValue(ln);
addCommandToBatch(c);
/* One command may have several keys, the batch may be full,
* so we stop prefetching if failed. */
if (addCommandToBatch(c) == C_ERR) break;
}
prefetchCommands();
return prefetch;
/* Prefetch the commands in the batch. */
prefetchCommands();
return clients;
}
extern int ProcessingEventsWhileBlocked;
@ -341,22 +347,19 @@ extern int ProcessingEventsWhileBlocked;
* process new events, if the clients with fired events from the same io thread,
* it may call this function reentrantly. */
void processClientsFromIOThread(IOThread *t) {
/* Get the list of clients to process. */
pthread_mutex_lock(&mainThreadPendingClientsMutexes[t->id]);
listJoin(mainThreadProcessingClients[t->id], mainThreadPendingClients[t->id]);
pthread_mutex_unlock(&mainThreadPendingClientsMutexes[t->id]);
size_t processed = listLength(mainThreadProcessingClients[t->id]);
if (processed == 0) return;
int prefetch_clients = 0;
/* We may call processClientsFromIOThread reentrantly, so we need to
* reset the prefetching batch, besides, users may change the config
* of prefetch batch size, so we need to reset the prefetching batch*/
resetCommandsBatch();
int prefetch = 0;
listNode *node = NULL;
while (listLength(mainThreadProcessingClients[t->id])) {
if (server.prefetch_batch_max_size) {
if (prefetch <= 0) prefetch = prefetchIOThreadCommand(t);
if (--prefetch <= 0) resetCommandsBatch();
}
/* Prefetch the commands if no clients in the batch. */
if (prefetch_clients <= 0) prefetch_clients = prefetchIOThreadCommands(t);
/* Reset the prefetching batch if we have processed all clients. */
if (--prefetch_clients <= 0) resetCommandsBatch();
/* Each time we pop up only the first client to process to guarantee
* reentrancy safety. */
@ -420,7 +423,6 @@ void processClientsFromIOThread(IOThread *t) {
node = NULL;
}
if (node) zfree(node);
resetCommandsBatch();
/* Trigger the io thread to handle these clients ASAP to make them processed
* in parallel.

View File

@ -1,11 +1,11 @@
/*
* Copyright Valkey Contributors.
* All rights reserved.
* SPDX-License-Identifier: BSD 3-Clause
*
* This file utilizes prefetching keys and data for multiple commands in a batch,
* to improve performance by amortizing memory access costs across multiple operations.
*/
* Copyright Valkey Contributors.
* All rights reserved.
* SPDX-License-Identifier: BSD 3-Clause
*
* This file utilizes prefetching keys and data for multiple commands in a batch,
* to improve performance by amortizing memory access costs across multiple operations.
*/
#include "memory_prefetch.h"
#include "server.h"
@ -23,31 +23,31 @@ typedef enum {
/************************************ State machine diagram for the prefetch operation. ********************************
start
PREFETCH_BUCKET
no more tables -> done
| bucket|found |
|
start
PREFETCH_BUCKET
no more tables -> done
| bucket|found |
|
entry not found - goto next table
PREFETCH_ENTRY |
PREFETCH_ENTRY |
| Entryfound
|
value not found - goto next entry |
value not found - goto next entry |
PREFETCH_VALUE |
Valuefound
| |
PREFETCH_VALUE_DATA
|
-
PREFETCH_DONE
Valuefound
| |
PREFETCH_VALUE_DATA
|
-
PREFETCH_DONE
**********************************************************************************************************************/
typedef void *(*GetValueDataFunc)(const void *val);
@ -96,6 +96,10 @@ void freePrefetchCommandsBatch(void) {
void prefetchCommandsBatchInit(void) {
serverAssert(!batch);
/* To avoid prefetching small batches, we set the max size to twice
* the configured size, so if not exceeding twice the limit, we can
* prefetch all of it. */
size_t max_prefetch_size = server.prefetch_batch_max_size * 2;
if (max_prefetch_size == 0) {
@ -165,7 +169,7 @@ static void initBatchInfo(dict **dicts) {
}
/* Prefetch the bucket of the next hash table index.
* If no tables are left, move to the PREFETCH_DONE state. */
* If no tables are left, move to the PREFETCH_DONE state. */
static void prefetchBucket(KeyPrefetchInfo *info) {
size_t i = batch->cur_idx;
@ -188,7 +192,7 @@ static void prefetchBucket(KeyPrefetchInfo *info) {
}
/* Prefetch the next entry in the bucket and move to the PREFETCH_VALUE state.
* If no more entries in the bucket, move to the PREFETCH_BUCKET state to look at the next table. */
* If no more entries in the bucket, move to the PREFETCH_BUCKET state to look at the next table. */
static void prefetchEntry(KeyPrefetchInfo *info) {
size_t i = batch->cur_idx;
@ -210,7 +214,7 @@ static void prefetchEntry(KeyPrefetchInfo *info) {
}
/* Prefetch the entry's value. If the value is found, move to the PREFETCH_VALUE_DATA state.
* If the value is not found, move to the PREFETCH_ENTRY state to look at the next entry in the bucket. */
* If the value is not found, move to the PREFETCH_ENTRY state to look at the next entry in the bucket. */
static void prefetchValue(KeyPrefetchInfo *info) {
size_t i = batch->cur_idx;
void *value = dictGetVal(info->current_entry);
@ -244,26 +248,26 @@ static void prefetchValueData(KeyPrefetchInfo *info, GetValueDataFunc get_val_da
}
/* Prefetch dictionary data for an array of keys.
*
* This function takes an array of dictionaries and keys, attempting to bring
* data closer to the L1 cache that might be needed for dictionary operations
* on those keys.
*
* The dictFind algorithm:
* 1. Evaluate the hash of the key
* 2. Access the index in the first table
* 3. Walk the entries linked list until the key is found
* If the key hasn't been found and the dictionary is in the middle of rehashing,
* access the index on the second table and repeat step 3
*
* dictPrefetch executes the same algorithm as dictFind, but one step at a time
* for each key. Instead of waiting for data to be read from memory, it prefetches
* the data and then moves on to execute the next prefetch for another key.
*
* dicts - An array of dictionaries to prefetch data from.
* get_val_data_func - A callback function that dictPrefetch can invoke
* to bring the key's value data closer to the L1 cache as well.
*/
*
* This function takes an array of dictionaries and keys, attempting to bring
* data closer to the L1 cache that might be needed for dictionary operations
* on those keys.
*
* The dictFind algorithm:
* 1. Evaluate the hash of the key
* 2. Access the index in the first table
* 3. Walk the entries linked list until the key is found
* If the key hasn't been found and the dictionary is in the middle of rehashing,
* access the index on the second table and repeat step 3
*
* dictPrefetch executes the same algorithm as dictFind, but one step at a time
* for each key. Instead of waiting for data to be read from memory, it prefetches
* the data and then moves on to execute the next prefetch for another key.
*
* dicts - An array of dictionaries to prefetch data from.
* get_val_data_func - A callback function that dictPrefetch can invoke
* to bring the key's value data closer to the L1 cache as well.
*/
static void dictPrefetch(dict **dicts, GetValueDataFunc get_val_data_func) {
initBatchInfo(dicts);
KeyPrefetchInfo *info;
@ -285,18 +289,37 @@ static void *getObjectValuePtr(const void *val) {
}
void resetCommandsBatch(void) {
if (!batch) return;
if (!batch) {
/* Handle the case where prefetching becomes enabled from disabled. */
if (server.prefetch_batch_max_size > 0)
prefetchCommandsBatchInit();
return;
}
batch->cur_idx = 0;
batch->keys_done = 0;
batch->key_count = 0;
batch->client_count = 0;
batch->executed_commands = 0;
/* Handle the case where the max prefetch size has been changed. */
if (batch->max_prefetch_size != (size_t)server.prefetch_batch_max_size * 2) {
onMaxBatchSizeChange();
}
}
int getConfigPrefetchBatchSize(void) {
if (!batch) return 0;
/* We double the size when initializing the batch, so divide it by 2. */
return batch->max_prefetch_size / 2;
}
/* Prefetch command-related data:
* 1. Prefetch the command arguments allocated by the I/O thread to bring them closer to the L1 cache.
* 2. Prefetch the keys and values for all commands in the current batch from the main and expires dictionaries. */
* 1. Prefetch the command arguments allocated by the I/O thread to bring them closer to the L1 cache.
* 2. Prefetch the keys and values for all commands in the current batch from the main and expires dictionaries. */
void prefetchCommands(void) {
if (!batch) return;
/* Prefetch argv's for all clients */
for (size_t i = 0; i < batch->client_count; i++) {
client *c = batch->clients[i];
@ -333,35 +356,6 @@ void prefetchCommands(void) {
}
}
/* Processes all the prefetched commands in the current batch. */
void processClientsCommandsBatch(void) {
if (!batch || batch->client_count == 0) return;
/* If executed_commands is not 0,
* it means that we are in the middle of processing a batch and this is a recursive call */
if (batch->executed_commands == 0) {
prefetchCommands();
}
/* Process the commands */
for (size_t i = 0; i < batch->client_count; i++) {
client *c = batch->clients[i];
if (c == NULL) continue;
/* Set the client to null immediately to avoid accessing it again recursively when ProcessingEventsWhileBlocked */
batch->clients[i] = NULL;
batch->executed_commands++;
if (processPendingCommandAndInputBuffer(c) != C_ERR) beforeNextClient(c);
}
resetCommandsBatch();
/* Handle the case where the max prefetch size has been changed. */
if (batch->max_prefetch_size != (size_t)server.prefetch_batch_max_size) {
onMaxBatchSizeChange();
}
}
/* Adds the client's command to the current batch.
*
* Returns C_OK if the command was added successfully, C_ERR otherwise. */
@ -393,15 +387,3 @@ int addCommandToBatch(client *c) {
return C_OK;
}
/* Removes the given client from the pending prefetch batch, if present. */
void removeClientFromPendingCommandsBatch(client *c) {
if (!batch) return;
for (size_t i = 0; i < batch->client_count; i++) {
if (batch->clients[i] == c) {
batch->clients[i] = NULL;
return;
}
}
}

View File

@ -4,8 +4,8 @@
struct client;
void prefetchCommandsBatchInit(void);
int addCommandToBatch(struct client *c);
void removeClientFromPendingCommandsBatch(struct client *c);
int getConfigPrefetchBatchSize(void);
int addCommandToBatch(struct client *c);
void resetCommandsBatch(void);
void prefetchCommands(void);

View File

@ -1544,8 +1544,6 @@ void unlinkClient(client *c) {
c->client_list_node = NULL;
}
removeClientFromPendingCommandsBatch(c);
/* Check if this is a replica waiting for diskless replication (rdb pipe),
* in which case it needs to be cleaned from that list */
if (c->flags & CLIENT_SLAVE &&
@ -2836,6 +2834,7 @@ int processInputBuffer(client *c) {
if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) {
c->io_flags |= CLIENT_IO_PENDING_COMMAND;
c->iolookedcmd = lookupCommand(c->argv, c->argc);
c->slot = getSlotFromCommand(c->iolookedcmd, c->argv, c->argc);
enqueuePendingClientsToMainThread(c, 0);
break;
}

View File

@ -2174,7 +2174,6 @@ void initServerConfig(void) {
server.page_size = sysconf(_SC_PAGESIZE);
server.pause_cron = 0;
server.dict_resizing = 1;
server.prefetch_batch_max_size = 16;
server.latency_tracking_info_percentiles_len = 3;
server.latency_tracking_info_percentiles = zmalloc(sizeof(double)*(server.latency_tracking_info_percentiles_len));
@ -6134,6 +6133,8 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
"total_writes_processed:%lld\r\n", stat_total_writes_processed,
"io_threaded_reads_processed:%lld\r\n", stat_io_reads_processed,
"io_threaded_writes_processed:%lld\r\n", stat_io_writes_processed,
"io_threaded_total_prefetch_batches:%lld\r\n", server.stat_total_prefetch_batches,
"io_threaded_total_prefetch_entries:%lld\r\n", server.stat_total_prefetch_entries,
"client_query_buffer_limit_disconnections:%lld\r\n", stat_client_qbuf_limit_disconnections,
"client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections,
"reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks,

View File

@ -1792,15 +1792,12 @@ struct redisServer {
int io_threads_clients_num[IO_THREADS_MAX_NUM]; /* Number of clients assigned to each IO thread. */
int io_threads_do_reads; /* Read and parse from IO threads? */
int io_threads_active; /* Is IO threads currently active? */
int prefetch_batch_max_size;/* Maximum number of keys to prefetch in a single batch */
long long events_processed_while_blocked; /* processEventsWhileBlocked() */
int enable_protected_configs; /* Enable the modification of protected configs, see PROTECTED_ACTION_ALLOWED_* */
int enable_debug_cmd; /* Enable DEBUG commands, see PROTECTED_ACTION_ALLOWED_* */
int enable_module_cmd; /* Enable MODULE commands, see PROTECTED_ACTION_ALLOWED_* */
int prefetch_batch_max_size; /* Maximum number of keys to prefetch in a single batch */
long long stat_total_prefetch_entries; /* Total number of prefetched dict entries */
long long stat_total_prefetch_batches; /* Total number of prefetched batches */
/* RDB / AOF loading information */
volatile sig_atomic_t loading; /* We are loading data from disk if true */
volatile sig_atomic_t async_loading; /* We are loading data without blocking the db being served */
@ -1871,6 +1868,8 @@ struct redisServer {
redisAtomic long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */
long long stat_client_outbuf_limit_disconnections; /* Total number of clients reached output buf length limit */
long long stat_cluster_incompatible_ops; /* Number of operations that are incompatible with cluster mode */
long long stat_total_prefetch_entries; /* Total number of prefetched dict entries */
long long stat_total_prefetch_batches; /* Total number of prefetched batches */
/* The following two are used to track instantaneous metrics, like
* number of operations per second, network traffic. */
struct {
@ -3650,6 +3649,7 @@ void freeReplicationBacklogRefMemAsync(list *blocks, rax *index);
int getKeysFromCommandWithSpecs(struct redisCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result);
keyReference *getKeysPrepareResult(getKeysResult *result, int numkeys);
int getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
int getSlotFromCommand(struct redisCommand *cmd, robj **argv, int argc);
int doesCommandHaveKeys(struct redisCommand *cmd);
int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
int doesCommandHaveChannelsWithFlags(struct redisCommand *cmd, int flags);

View File

@ -170,3 +170,142 @@ start_server {config "minimal.conf" tags {"external:skip"}} {
}
}
}
start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-debug-command {yes}}} {
set server_pid [s process_id]
# Skip if non io-threads mode - as it is relevant only for io-threads mode
if {[r config get io-threads] eq "io-threads 2"} {
test {prefetch works as expected when killing a client from the middle of prefetch commands batch} {
# Create 16 (prefetch batch size) +1 clients
for {set i 0} {$i < 16} {incr i} {
set rd$i [redis_deferring_client]
}
# set a key that will be later be prefetch
r set a 0
# Get the client ID of rd4
$rd4 client id
set rd4_id [$rd4 read]
# Create a batch of commands by suspending the server for a while
# before responding to the first command
pause_process $server_pid
# The first client will kill the fourth client
$rd0 client kill id $rd4_id
# Send set commands for all clients except the first
for {set i 1} {$i < 16} {incr i} {
[set rd$i] set a $i
[set rd$i] flush
}
# Resume the server
resume_process $server_pid
# Read the results
assert_equal {1} [$rd0 read]
catch {$rd4 read} err
assert_match {I/O error reading reply} $err
# verify the prefetch stats are as expected
set info [r info stats]
set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]
assert_range $prefetch_entries 2 15; # With slower machines, the number of prefetch entries can be lower
set prefetch_batches [getInfoProperty $info io_threaded_total_prefetch_batches]
assert_range $prefetch_batches 1 7; # With slower machines, the number of batches can be higher
# Verify the final state
$rd15 get a
assert_equal {OK} [$rd15 read]
assert_equal {15} [$rd15 read]
}
test {prefetch works as expected when changing the batch size while executing the commands batch} {
# Create 16 (default prefetch batch size) clients
for {set i 0} {$i < 16} {incr i} {
set rd$i [redis_deferring_client]
}
# Create a batch of commands by suspending the server for a while
# before responding to the first command
pause_process $server_pid
# Send set commands for all clients the 5th client will change the prefetch batch size
for {set i 0} {$i < 16} {incr i} {
if {$i == 4} {
[set rd$i] config set prefetch-batch-max-size 1
}
[set rd$i] set a $i
[set rd$i] flush
}
# Resume the server
resume_process $server_pid
# Read the results
for {set i 0} {$i < 16} {incr i} {
assert_equal {OK} [[set rd$i] read]
[set rd$i] close
}
# assert the configured prefetch batch size was changed
assert {[r config get prefetch-batch-max-size] eq "prefetch-batch-max-size 1"}
}
proc do_prefetch_batch {server_pid batch_size} {
# Create clients
for {set i 0} {$i < $batch_size} {incr i} {
set rd$i [redis_deferring_client]
}
# Suspend the server to batch the commands
pause_process $server_pid
# Send commands from all clients
for {set i 0} {$i < $batch_size} {incr i} {
[set rd$i] set a $i
[set rd$i] flush
}
# Resume the server to process the batch
resume_process $server_pid
# Verify responses
for {set i 0} {$i < $batch_size} {incr i} {
assert_equal {OK} [[set rd$i] read]
[set rd$i] close
}
}
test {no prefetch when the batch size is set to 0} {
# set the batch size to 0
r config set prefetch-batch-max-size 0
# save the current value of prefetch entries
set info [r info stats]
set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]
do_prefetch_batch $server_pid 16
# assert the prefetch entries did not change
set info [r info stats]
set new_prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]
assert_equal $prefetch_entries $new_prefetch_entries
}
test {Prefetch can resume working when the configuration option is set to a non-zero value} {
# save the current value of prefetch entries
set info [r info stats]
set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]
# set the batch size to 0
r config set prefetch-batch-max-size 16
do_prefetch_batch $server_pid 16
# assert the prefetch entries did not change
set info [r info stats]
set new_prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]
# With slower machines, the number of prefetch entries can be lower
assert_range $new_prefetch_entries [expr {$prefetch_entries + 2}] [expr {$prefetch_entries + 16}]
}
}
}