From f573e7e6c801a51de84295b11668c7f9f214e041 Mon Sep 17 00:00:00 2001 From: Yuan Wang Date: Wed, 30 Apr 2025 23:28:49 +0800 Subject: [PATCH] finish memory prefetch --- redis.conf | 12 +++ src/config.c | 1 + src/config.h | 2 +- src/db.c | 18 ++++ src/fmtargs.h | 84 ++++++++++++++++++- src/iothread.c | 48 +++++------ src/memory_prefetch.c | 170 +++++++++++++++++--------------------- src/memory_prefetch.h | 4 +- src/networking.c | 3 +- src/server.c | 3 +- src/server.h | 8 +- tests/unit/networking.tcl | 139 +++++++++++++++++++++++++++++++ 12 files changed, 363 insertions(+), 129 deletions(-) diff --git a/redis.conf b/redis.conf index 62cec06de..26b41dde4 100644 --- a/redis.conf +++ b/redis.conf @@ -1334,6 +1334,18 @@ lazyfree-lazy-user-flush no # --threads option to match the number of Redis threads, otherwise you'll not # be able to notice the improvements. +# When multiple commands are parsed by the I/O threads and ready for execution, +# we take advantage of knowing the next set of commands and prefetch their +# required dictionary entries in a batch. This reduces memory access costs. +# +# The optimal batch size depends on the specific workflow of the user. +# The default batch size is 16, which can be modified using the +# 'prefetch-batch-max-size' config. +# +# When the config is set to 0, prefetching is disabled. +# +# prefetch-batch-max-size 16 + ############################ KERNEL OOM CONTROL ############################## # On Linux, it is possible to hint the kernel OOM killer on what processes diff --git a/src/config.c b/src/config.c index a5ddad6b7..50963e4d0 100644 --- a/src/config.c +++ b/src/config.c @@ -3168,6 +3168,7 @@ standardConfig static_configs[] = { createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL), createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */ createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, 128, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */ + createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL), createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL), createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_slave_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* Slave max data age factor. */ createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL), diff --git a/src/config.h b/src/config.h index b7ed30884..ce7da028a 100644 --- a/src/config.h +++ b/src/config.h @@ -115,7 +115,7 @@ #endif #if HAS_BUILTIN_PREFETCH -#define redis_prefetch(addr) __builtin_prefetch(addr, 0, 3) /* Read with high locality */ +#define redis_prefetch(addr) __builtin_prefetch(addr) #define redis_prefetch_read(addr) __builtin_prefetch(addr, 0, 3) /* Read with high locality */ #define redis_prefetch_write(addr) __builtin_prefetch(addr, 1, 3) /* Write with high locality */ #else diff --git a/src/db.c b/src/db.c index 508c342fb..030861128 100644 --- a/src/db.c +++ b/src/db.c @@ -300,6 +300,24 @@ int getKeySlot(sds key) { return calculateKeySlot(key); } +/* Return the slot of the key in the command. IO threads use this function + * to calculate slot to reduce main-thread load */ +int getSlotFromCommand(struct redisCommand *cmd, robj **argv, int argc) { + int slot = -1; + if (!cmd || !server.cluster_enabled) return slot; + + /* Get the keys from the command */ + getKeysResult result = GETKEYS_RESULT_INIT; + int numkeys = getKeysFromCommand(cmd, argv, argc, &result); + if (numkeys > 0) { + /* Get the slot of the first key */ + robj *first = argv[result.keys[0].pos]; + slot = keyHashSlot(first->ptr, (int)sdslen(first->ptr)); + } + getKeysFreeResult(&result); + return slot; +} + /* This is a special version of dbAdd() that is used only when loading * keys from the RDB file: the key is passed as an SDS string that is * retained by the function (and not freed by the caller). diff --git a/src/fmtargs.h b/src/fmtargs.h index e52d3b99c..1fbd02ed8 100644 --- a/src/fmtargs.h +++ b/src/fmtargs.h @@ -44,9 +44,9 @@ /* Everything below this line is automatically generated by * generate-fmtargs.py. Do not manually edit. */ -#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, N, ...) N +#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, _121, _122, _123, _124, _125, _126, _127, _128, _129, _130, _131, _132, _133, _134, _135, _136, _137, _138, _139, _140, _141, _142, _143, _144, _145, _146, _147, _148, _149, _150, _151, _152, _153, _154, _155, _156, _157, _158, _159, _160, _161, _162, _163, _164, _165, _166, _167, _168, _169, _170, _171, _172, _173, _174, _175, _176, _177, _178, _179, _180, _181, _182, _183, _184, _185, _186, _187, _188, _189, _190, _191, _192, _193, _194, _195, _196, _197, _198, _199, _200, N, ...) N -#define RSEQ_N() 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 +#define RSEQ_N() 200, 199, 198, 197, 196, 195, 194, 193, 192, 191, 190, 189, 188, 187, 186, 185, 184, 183, 182, 181, 180, 179, 178, 177, 176, 175, 174, 173, 172, 171, 170, 169, 168, 167, 166, 165, 164, 163, 162, 161, 160, 159, 158, 157, 156, 155, 154, 153, 152, 151, 150, 149, 148, 147, 146, 145, 144, 143, 142, 141, 140, 139, 138, 137, 136, 135, 134, 133, 132, 131, 130, 129, 128, 127, 126, 125, 124, 123, 122, 121, 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 #define COMPACT_FMT_2(fmt, value) fmt #define COMPACT_FMT_4(fmt, value, ...) fmt COMPACT_FMT_2(__VA_ARGS__) @@ -108,6 +108,46 @@ #define COMPACT_FMT_116(fmt, value, ...) fmt COMPACT_FMT_114(__VA_ARGS__) #define COMPACT_FMT_118(fmt, value, ...) fmt COMPACT_FMT_116(__VA_ARGS__) #define COMPACT_FMT_120(fmt, value, ...) fmt COMPACT_FMT_118(__VA_ARGS__) +#define COMPACT_FMT_122(fmt, value, ...) fmt COMPACT_FMT_120(__VA_ARGS__) +#define COMPACT_FMT_124(fmt, value, ...) fmt COMPACT_FMT_122(__VA_ARGS__) +#define COMPACT_FMT_126(fmt, value, ...) fmt COMPACT_FMT_124(__VA_ARGS__) +#define COMPACT_FMT_128(fmt, value, ...) fmt COMPACT_FMT_126(__VA_ARGS__) +#define COMPACT_FMT_130(fmt, value, ...) fmt COMPACT_FMT_128(__VA_ARGS__) +#define COMPACT_FMT_132(fmt, value, ...) fmt COMPACT_FMT_130(__VA_ARGS__) +#define COMPACT_FMT_134(fmt, value, ...) fmt COMPACT_FMT_132(__VA_ARGS__) +#define COMPACT_FMT_136(fmt, value, ...) fmt COMPACT_FMT_134(__VA_ARGS__) +#define COMPACT_FMT_138(fmt, value, ...) fmt COMPACT_FMT_136(__VA_ARGS__) +#define COMPACT_FMT_140(fmt, value, ...) fmt COMPACT_FMT_138(__VA_ARGS__) +#define COMPACT_FMT_142(fmt, value, ...) fmt COMPACT_FMT_140(__VA_ARGS__) +#define COMPACT_FMT_144(fmt, value, ...) fmt COMPACT_FMT_142(__VA_ARGS__) +#define COMPACT_FMT_146(fmt, value, ...) fmt COMPACT_FMT_144(__VA_ARGS__) +#define COMPACT_FMT_148(fmt, value, ...) fmt COMPACT_FMT_146(__VA_ARGS__) +#define COMPACT_FMT_150(fmt, value, ...) fmt COMPACT_FMT_148(__VA_ARGS__) +#define COMPACT_FMT_152(fmt, value, ...) fmt COMPACT_FMT_150(__VA_ARGS__) +#define COMPACT_FMT_154(fmt, value, ...) fmt COMPACT_FMT_152(__VA_ARGS__) +#define COMPACT_FMT_156(fmt, value, ...) fmt COMPACT_FMT_154(__VA_ARGS__) +#define COMPACT_FMT_158(fmt, value, ...) fmt COMPACT_FMT_156(__VA_ARGS__) +#define COMPACT_FMT_160(fmt, value, ...) fmt COMPACT_FMT_158(__VA_ARGS__) +#define COMPACT_FMT_162(fmt, value, ...) fmt COMPACT_FMT_160(__VA_ARGS__) +#define COMPACT_FMT_164(fmt, value, ...) fmt COMPACT_FMT_162(__VA_ARGS__) +#define COMPACT_FMT_166(fmt, value, ...) fmt COMPACT_FMT_164(__VA_ARGS__) +#define COMPACT_FMT_168(fmt, value, ...) fmt COMPACT_FMT_166(__VA_ARGS__) +#define COMPACT_FMT_170(fmt, value, ...) fmt COMPACT_FMT_168(__VA_ARGS__) +#define COMPACT_FMT_172(fmt, value, ...) fmt COMPACT_FMT_170(__VA_ARGS__) +#define COMPACT_FMT_174(fmt, value, ...) fmt COMPACT_FMT_172(__VA_ARGS__) +#define COMPACT_FMT_176(fmt, value, ...) fmt COMPACT_FMT_174(__VA_ARGS__) +#define COMPACT_FMT_178(fmt, value, ...) fmt COMPACT_FMT_176(__VA_ARGS__) +#define COMPACT_FMT_180(fmt, value, ...) fmt COMPACT_FMT_178(__VA_ARGS__) +#define COMPACT_FMT_182(fmt, value, ...) fmt COMPACT_FMT_180(__VA_ARGS__) +#define COMPACT_FMT_184(fmt, value, ...) fmt COMPACT_FMT_182(__VA_ARGS__) +#define COMPACT_FMT_186(fmt, value, ...) fmt COMPACT_FMT_184(__VA_ARGS__) +#define COMPACT_FMT_188(fmt, value, ...) fmt COMPACT_FMT_186(__VA_ARGS__) +#define COMPACT_FMT_190(fmt, value, ...) fmt COMPACT_FMT_188(__VA_ARGS__) +#define COMPACT_FMT_192(fmt, value, ...) fmt COMPACT_FMT_190(__VA_ARGS__) +#define COMPACT_FMT_194(fmt, value, ...) fmt COMPACT_FMT_192(__VA_ARGS__) +#define COMPACT_FMT_196(fmt, value, ...) fmt COMPACT_FMT_194(__VA_ARGS__) +#define COMPACT_FMT_198(fmt, value, ...) fmt COMPACT_FMT_196(__VA_ARGS__) +#define COMPACT_FMT_200(fmt, value, ...) fmt COMPACT_FMT_198(__VA_ARGS__) #define COMPACT_VALUES_2(fmt, value) value #define COMPACT_VALUES_4(fmt, value, ...) value, COMPACT_VALUES_2(__VA_ARGS__) @@ -169,5 +209,45 @@ #define COMPACT_VALUES_116(fmt, value, ...) value, COMPACT_VALUES_114(__VA_ARGS__) #define COMPACT_VALUES_118(fmt, value, ...) value, COMPACT_VALUES_116(__VA_ARGS__) #define COMPACT_VALUES_120(fmt, value, ...) value, COMPACT_VALUES_118(__VA_ARGS__) +#define COMPACT_VALUES_122(fmt, value, ...) value, COMPACT_VALUES_120(__VA_ARGS__) +#define COMPACT_VALUES_124(fmt, value, ...) value, COMPACT_VALUES_122(__VA_ARGS__) +#define COMPACT_VALUES_126(fmt, value, ...) value, COMPACT_VALUES_124(__VA_ARGS__) +#define COMPACT_VALUES_128(fmt, value, ...) value, COMPACT_VALUES_126(__VA_ARGS__) +#define COMPACT_VALUES_130(fmt, value, ...) value, COMPACT_VALUES_128(__VA_ARGS__) +#define COMPACT_VALUES_132(fmt, value, ...) value, COMPACT_VALUES_130(__VA_ARGS__) +#define COMPACT_VALUES_134(fmt, value, ...) value, COMPACT_VALUES_132(__VA_ARGS__) +#define COMPACT_VALUES_136(fmt, value, ...) value, COMPACT_VALUES_134(__VA_ARGS__) +#define COMPACT_VALUES_138(fmt, value, ...) value, COMPACT_VALUES_136(__VA_ARGS__) +#define COMPACT_VALUES_140(fmt, value, ...) value, COMPACT_VALUES_138(__VA_ARGS__) +#define COMPACT_VALUES_142(fmt, value, ...) value, COMPACT_VALUES_140(__VA_ARGS__) +#define COMPACT_VALUES_144(fmt, value, ...) value, COMPACT_VALUES_142(__VA_ARGS__) +#define COMPACT_VALUES_146(fmt, value, ...) value, COMPACT_VALUES_144(__VA_ARGS__) +#define COMPACT_VALUES_148(fmt, value, ...) value, COMPACT_VALUES_146(__VA_ARGS__) +#define COMPACT_VALUES_150(fmt, value, ...) value, COMPACT_VALUES_148(__VA_ARGS__) +#define COMPACT_VALUES_152(fmt, value, ...) value, COMPACT_VALUES_150(__VA_ARGS__) +#define COMPACT_VALUES_154(fmt, value, ...) value, COMPACT_VALUES_152(__VA_ARGS__) +#define COMPACT_VALUES_156(fmt, value, ...) value, COMPACT_VALUES_154(__VA_ARGS__) +#define COMPACT_VALUES_158(fmt, value, ...) value, COMPACT_VALUES_156(__VA_ARGS__) +#define COMPACT_VALUES_160(fmt, value, ...) value, COMPACT_VALUES_158(__VA_ARGS__) +#define COMPACT_VALUES_162(fmt, value, ...) value, COMPACT_VALUES_160(__VA_ARGS__) +#define COMPACT_VALUES_164(fmt, value, ...) value, COMPACT_VALUES_162(__VA_ARGS__) +#define COMPACT_VALUES_166(fmt, value, ...) value, COMPACT_VALUES_164(__VA_ARGS__) +#define COMPACT_VALUES_168(fmt, value, ...) value, COMPACT_VALUES_166(__VA_ARGS__) +#define COMPACT_VALUES_170(fmt, value, ...) value, COMPACT_VALUES_168(__VA_ARGS__) +#define COMPACT_VALUES_172(fmt, value, ...) value, COMPACT_VALUES_170(__VA_ARGS__) +#define COMPACT_VALUES_174(fmt, value, ...) value, COMPACT_VALUES_172(__VA_ARGS__) +#define COMPACT_VALUES_176(fmt, value, ...) value, COMPACT_VALUES_174(__VA_ARGS__) +#define COMPACT_VALUES_178(fmt, value, ...) value, COMPACT_VALUES_176(__VA_ARGS__) +#define COMPACT_VALUES_180(fmt, value, ...) value, COMPACT_VALUES_178(__VA_ARGS__) +#define COMPACT_VALUES_182(fmt, value, ...) value, COMPACT_VALUES_180(__VA_ARGS__) +#define COMPACT_VALUES_184(fmt, value, ...) value, COMPACT_VALUES_182(__VA_ARGS__) +#define COMPACT_VALUES_186(fmt, value, ...) value, COMPACT_VALUES_184(__VA_ARGS__) +#define COMPACT_VALUES_188(fmt, value, ...) value, COMPACT_VALUES_186(__VA_ARGS__) +#define COMPACT_VALUES_190(fmt, value, ...) value, COMPACT_VALUES_188(__VA_ARGS__) +#define COMPACT_VALUES_192(fmt, value, ...) value, COMPACT_VALUES_190(__VA_ARGS__) +#define COMPACT_VALUES_194(fmt, value, ...) value, COMPACT_VALUES_192(__VA_ARGS__) +#define COMPACT_VALUES_196(fmt, value, ...) value, COMPACT_VALUES_194(__VA_ARGS__) +#define COMPACT_VALUES_198(fmt, value, ...) value, COMPACT_VALUES_196(__VA_ARGS__) +#define COMPACT_VALUES_200(fmt, value, ...) value, COMPACT_VALUES_198(__VA_ARGS__) #endif diff --git a/src/iothread.c b/src/iothread.c index bb9b591b1..83656e253 100644 --- a/src/iothread.c +++ b/src/iothread.c @@ -309,24 +309,30 @@ int sendPendingClientsToIOThreads(void) { return processed; } -int prefetchIOThreadCommand(IOThread *t) { +/* Prefetch the commands from the IO thread. The return value is the number + * of clients that have been prefetched. */ +int prefetchIOThreadCommands(IOThread *t) { + /* Since small batch prefetching is not much effective, so if the remaining + * is small (less than twice the max batch size), prefetch all of it. */ int len = listLength(mainThreadProcessingClients[t->id]); - if (len < 2) return 0; - - int iterate = 0; - int prefetch = len < server.prefetch_batch_max_size*2 ? len : - server.prefetch_batch_max_size; + int config_size = getConfigPrefetchBatchSize(); + int to_prefetch = len < config_size*2 ? len : config_size; + if (to_prefetch == 0) return 0; + int clients = 0; listIter li; listNode *ln; listRewind(mainThreadProcessingClients[t->id], &li); - while((ln = listNext(&li)) && iterate++ < prefetch) { + while((ln = listNext(&li)) && clients++ < to_prefetch) { client *c = listNodeValue(ln); - addCommandToBatch(c); + /* One command may have several keys, the batch may be full, + * so we stop prefetching if failed. */ + if (addCommandToBatch(c) == C_ERR) break; } - prefetchCommands(); - return prefetch; + /* Prefetch the commands in the batch. */ + prefetchCommands(); + return clients; } extern int ProcessingEventsWhileBlocked; @@ -341,22 +347,19 @@ extern int ProcessingEventsWhileBlocked; * process new events, if the clients with fired events from the same io thread, * it may call this function reentrantly. */ void processClientsFromIOThread(IOThread *t) { - /* Get the list of clients to process. */ - pthread_mutex_lock(&mainThreadPendingClientsMutexes[t->id]); - listJoin(mainThreadProcessingClients[t->id], mainThreadPendingClients[t->id]); - pthread_mutex_unlock(&mainThreadPendingClientsMutexes[t->id]); - size_t processed = listLength(mainThreadProcessingClients[t->id]); - if (processed == 0) return; + int prefetch_clients = 0; + /* We may call processClientsFromIOThread reentrantly, so we need to + * reset the prefetching batch, besides, users may change the config + * of prefetch batch size, so we need to reset the prefetching batch*/ + resetCommandsBatch(); - int prefetch = 0; listNode *node = NULL; while (listLength(mainThreadProcessingClients[t->id])) { - - if (server.prefetch_batch_max_size) { - if (prefetch <= 0) prefetch = prefetchIOThreadCommand(t); - if (--prefetch <= 0) resetCommandsBatch(); - } + /* Prefetch the commands if no clients in the batch. */ + if (prefetch_clients <= 0) prefetch_clients = prefetchIOThreadCommands(t); + /* Reset the prefetching batch if we have processed all clients. */ + if (--prefetch_clients <= 0) resetCommandsBatch(); /* Each time we pop up only the first client to process to guarantee * reentrancy safety. */ @@ -420,7 +423,6 @@ void processClientsFromIOThread(IOThread *t) { node = NULL; } if (node) zfree(node); - resetCommandsBatch(); /* Trigger the io thread to handle these clients ASAP to make them processed * in parallel. diff --git a/src/memory_prefetch.c b/src/memory_prefetch.c index 85b6ba416..9c963f24b 100644 --- a/src/memory_prefetch.c +++ b/src/memory_prefetch.c @@ -1,11 +1,11 @@ /* -* Copyright Valkey Contributors. -* All rights reserved. -* SPDX-License-Identifier: BSD 3-Clause -* -* This file utilizes prefetching keys and data for multiple commands in a batch, -* to improve performance by amortizing memory access costs across multiple operations. -*/ + * Copyright Valkey Contributors. + * All rights reserved. + * SPDX-License-Identifier: BSD 3-Clause + * + * This file utilizes prefetching keys and data for multiple commands in a batch, + * to improve performance by amortizing memory access costs across multiple operations. + */ #include "memory_prefetch.h" #include "server.h" @@ -23,31 +23,31 @@ typedef enum { /************************************ State machine diagram for the prefetch operation. ******************************** - │ - start - │ - ┌────────▼─────────┐ - ┌─────────►│ PREFETCH_BUCKET ├────►────────┐ - │ └────────┬─────────┘ no more tables -> done - | bucket|found | - │ | │ + │ + start + │ + ┌────────▼─────────┐ + ┌─────────►│ PREFETCH_BUCKET ├────►────────┐ + │ └────────┬─────────┘ no more tables -> done + | bucket|found | + │ | │ entry not found - goto next table ┌────────▼────────┐ │ - └────◄─────┤ PREFETCH_ENTRY | ▼ + └────◄─────┤ PREFETCH_ENTRY | ▼ ┌────────────►└────────┬────────┘ │ | Entry│found │ │ | │ - value not found - goto next entry ┌───────▼────────┐ | + value not found - goto next entry ┌───────▼────────┐ | └───────◄──────┤ PREFETCH_VALUE | ▼ - └───────┬────────┘ │ - Value│found │ - | | - ┌───────────▼──────────────┐ │ - │ PREFETCH_VALUE_DATA │ ▼ - └───────────┬──────────────┘ │ - | │ - ┌───────-─▼─────────────┐ │ - │ PREFETCH_DONE │◄────────┘ - └───────────────────────┘ + └───────┬────────┘ │ + Value│found │ + | | + ┌───────────▼──────────────┐ │ + │ PREFETCH_VALUE_DATA │ ▼ + └───────────┬──────────────┘ │ + | │ + ┌───────-─▼─────────────┐ │ + │ PREFETCH_DONE │◄────────┘ + └───────────────────────┘ **********************************************************************************************************************/ typedef void *(*GetValueDataFunc)(const void *val); @@ -96,6 +96,10 @@ void freePrefetchCommandsBatch(void) { void prefetchCommandsBatchInit(void) { serverAssert(!batch); + + /* To avoid prefetching small batches, we set the max size to twice + * the configured size, so if not exceeding twice the limit, we can + * prefetch all of it. */ size_t max_prefetch_size = server.prefetch_batch_max_size * 2; if (max_prefetch_size == 0) { @@ -165,7 +169,7 @@ static void initBatchInfo(dict **dicts) { } /* Prefetch the bucket of the next hash table index. -* If no tables are left, move to the PREFETCH_DONE state. */ + * If no tables are left, move to the PREFETCH_DONE state. */ static void prefetchBucket(KeyPrefetchInfo *info) { size_t i = batch->cur_idx; @@ -188,7 +192,7 @@ static void prefetchBucket(KeyPrefetchInfo *info) { } /* Prefetch the next entry in the bucket and move to the PREFETCH_VALUE state. -* If no more entries in the bucket, move to the PREFETCH_BUCKET state to look at the next table. */ + * If no more entries in the bucket, move to the PREFETCH_BUCKET state to look at the next table. */ static void prefetchEntry(KeyPrefetchInfo *info) { size_t i = batch->cur_idx; @@ -210,7 +214,7 @@ static void prefetchEntry(KeyPrefetchInfo *info) { } /* Prefetch the entry's value. If the value is found, move to the PREFETCH_VALUE_DATA state. -* If the value is not found, move to the PREFETCH_ENTRY state to look at the next entry in the bucket. */ + * If the value is not found, move to the PREFETCH_ENTRY state to look at the next entry in the bucket. */ static void prefetchValue(KeyPrefetchInfo *info) { size_t i = batch->cur_idx; void *value = dictGetVal(info->current_entry); @@ -244,26 +248,26 @@ static void prefetchValueData(KeyPrefetchInfo *info, GetValueDataFunc get_val_da } /* Prefetch dictionary data for an array of keys. -* -* This function takes an array of dictionaries and keys, attempting to bring -* data closer to the L1 cache that might be needed for dictionary operations -* on those keys. -* -* The dictFind algorithm: -* 1. Evaluate the hash of the key -* 2. Access the index in the first table -* 3. Walk the entries linked list until the key is found -* If the key hasn't been found and the dictionary is in the middle of rehashing, -* access the index on the second table and repeat step 3 -* -* dictPrefetch executes the same algorithm as dictFind, but one step at a time -* for each key. Instead of waiting for data to be read from memory, it prefetches -* the data and then moves on to execute the next prefetch for another key. -* -* dicts - An array of dictionaries to prefetch data from. -* get_val_data_func - A callback function that dictPrefetch can invoke -* to bring the key's value data closer to the L1 cache as well. -*/ + * + * This function takes an array of dictionaries and keys, attempting to bring + * data closer to the L1 cache that might be needed for dictionary operations + * on those keys. + * + * The dictFind algorithm: + * 1. Evaluate the hash of the key + * 2. Access the index in the first table + * 3. Walk the entries linked list until the key is found + * If the key hasn't been found and the dictionary is in the middle of rehashing, + * access the index on the second table and repeat step 3 + * + * dictPrefetch executes the same algorithm as dictFind, but one step at a time + * for each key. Instead of waiting for data to be read from memory, it prefetches + * the data and then moves on to execute the next prefetch for another key. + * + * dicts - An array of dictionaries to prefetch data from. + * get_val_data_func - A callback function that dictPrefetch can invoke + * to bring the key's value data closer to the L1 cache as well. + */ static void dictPrefetch(dict **dicts, GetValueDataFunc get_val_data_func) { initBatchInfo(dicts); KeyPrefetchInfo *info; @@ -285,18 +289,37 @@ static void *getObjectValuePtr(const void *val) { } void resetCommandsBatch(void) { - if (!batch) return; + if (!batch) { + /* Handle the case where prefetching becomes enabled from disabled. */ + if (server.prefetch_batch_max_size > 0) + prefetchCommandsBatchInit(); + return; + } + batch->cur_idx = 0; batch->keys_done = 0; batch->key_count = 0; batch->client_count = 0; batch->executed_commands = 0; + + /* Handle the case where the max prefetch size has been changed. */ + if (batch->max_prefetch_size != (size_t)server.prefetch_batch_max_size * 2) { + onMaxBatchSizeChange(); + } +} + +int getConfigPrefetchBatchSize(void) { + if (!batch) return 0; + /* We double the size when initializing the batch, so divide it by 2. */ + return batch->max_prefetch_size / 2; } /* Prefetch command-related data: -* 1. Prefetch the command arguments allocated by the I/O thread to bring them closer to the L1 cache. -* 2. Prefetch the keys and values for all commands in the current batch from the main and expires dictionaries. */ + * 1. Prefetch the command arguments allocated by the I/O thread to bring them closer to the L1 cache. + * 2. Prefetch the keys and values for all commands in the current batch from the main and expires dictionaries. */ void prefetchCommands(void) { + if (!batch) return; + /* Prefetch argv's for all clients */ for (size_t i = 0; i < batch->client_count; i++) { client *c = batch->clients[i]; @@ -333,35 +356,6 @@ void prefetchCommands(void) { } } -/* Processes all the prefetched commands in the current batch. */ -void processClientsCommandsBatch(void) { - if (!batch || batch->client_count == 0) return; - - /* If executed_commands is not 0, - * it means that we are in the middle of processing a batch and this is a recursive call */ - if (batch->executed_commands == 0) { - prefetchCommands(); - } - - /* Process the commands */ - for (size_t i = 0; i < batch->client_count; i++) { - client *c = batch->clients[i]; - if (c == NULL) continue; - - /* Set the client to null immediately to avoid accessing it again recursively when ProcessingEventsWhileBlocked */ - batch->clients[i] = NULL; - batch->executed_commands++; - if (processPendingCommandAndInputBuffer(c) != C_ERR) beforeNextClient(c); - } - - resetCommandsBatch(); - - /* Handle the case where the max prefetch size has been changed. */ - if (batch->max_prefetch_size != (size_t)server.prefetch_batch_max_size) { - onMaxBatchSizeChange(); - } -} - /* Adds the client's command to the current batch. * * Returns C_OK if the command was added successfully, C_ERR otherwise. */ @@ -393,15 +387,3 @@ int addCommandToBatch(client *c) { return C_OK; } - -/* Removes the given client from the pending prefetch batch, if present. */ -void removeClientFromPendingCommandsBatch(client *c) { - if (!batch) return; - - for (size_t i = 0; i < batch->client_count; i++) { - if (batch->clients[i] == c) { - batch->clients[i] = NULL; - return; - } - } -} diff --git a/src/memory_prefetch.h b/src/memory_prefetch.h index 5ab5b3c5a..b83b18505 100644 --- a/src/memory_prefetch.h +++ b/src/memory_prefetch.h @@ -4,8 +4,8 @@ struct client; void prefetchCommandsBatchInit(void); -int addCommandToBatch(struct client *c); -void removeClientFromPendingCommandsBatch(struct client *c); +int getConfigPrefetchBatchSize(void); +int addCommandToBatch(struct client *c); void resetCommandsBatch(void); void prefetchCommands(void); diff --git a/src/networking.c b/src/networking.c index 72c96ec76..e03d0275e 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1544,8 +1544,6 @@ void unlinkClient(client *c) { c->client_list_node = NULL; } - removeClientFromPendingCommandsBatch(c); - /* Check if this is a replica waiting for diskless replication (rdb pipe), * in which case it needs to be cleaned from that list */ if (c->flags & CLIENT_SLAVE && @@ -2836,6 +2834,7 @@ int processInputBuffer(client *c) { if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) { c->io_flags |= CLIENT_IO_PENDING_COMMAND; c->iolookedcmd = lookupCommand(c->argv, c->argc); + c->slot = getSlotFromCommand(c->iolookedcmd, c->argv, c->argc); enqueuePendingClientsToMainThread(c, 0); break; } diff --git a/src/server.c b/src/server.c index 3abffa0a5..c0298bb62 100644 --- a/src/server.c +++ b/src/server.c @@ -2174,7 +2174,6 @@ void initServerConfig(void) { server.page_size = sysconf(_SC_PAGESIZE); server.pause_cron = 0; server.dict_resizing = 1; - server.prefetch_batch_max_size = 16; server.latency_tracking_info_percentiles_len = 3; server.latency_tracking_info_percentiles = zmalloc(sizeof(double)*(server.latency_tracking_info_percentiles_len)); @@ -6134,6 +6133,8 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { "total_writes_processed:%lld\r\n", stat_total_writes_processed, "io_threaded_reads_processed:%lld\r\n", stat_io_reads_processed, "io_threaded_writes_processed:%lld\r\n", stat_io_writes_processed, + "io_threaded_total_prefetch_batches:%lld\r\n", server.stat_total_prefetch_batches, + "io_threaded_total_prefetch_entries:%lld\r\n", server.stat_total_prefetch_entries, "client_query_buffer_limit_disconnections:%lld\r\n", stat_client_qbuf_limit_disconnections, "client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections, "reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks, diff --git a/src/server.h b/src/server.h index 7807a2af4..df9839b20 100644 --- a/src/server.h +++ b/src/server.h @@ -1792,15 +1792,12 @@ struct redisServer { int io_threads_clients_num[IO_THREADS_MAX_NUM]; /* Number of clients assigned to each IO thread. */ int io_threads_do_reads; /* Read and parse from IO threads? */ int io_threads_active; /* Is IO threads currently active? */ + int prefetch_batch_max_size;/* Maximum number of keys to prefetch in a single batch */ long long events_processed_while_blocked; /* processEventsWhileBlocked() */ int enable_protected_configs; /* Enable the modification of protected configs, see PROTECTED_ACTION_ALLOWED_* */ int enable_debug_cmd; /* Enable DEBUG commands, see PROTECTED_ACTION_ALLOWED_* */ int enable_module_cmd; /* Enable MODULE commands, see PROTECTED_ACTION_ALLOWED_* */ - int prefetch_batch_max_size; /* Maximum number of keys to prefetch in a single batch */ - long long stat_total_prefetch_entries; /* Total number of prefetched dict entries */ - long long stat_total_prefetch_batches; /* Total number of prefetched batches */ - /* RDB / AOF loading information */ volatile sig_atomic_t loading; /* We are loading data from disk if true */ volatile sig_atomic_t async_loading; /* We are loading data without blocking the db being served */ @@ -1871,6 +1868,8 @@ struct redisServer { redisAtomic long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */ long long stat_client_outbuf_limit_disconnections; /* Total number of clients reached output buf length limit */ long long stat_cluster_incompatible_ops; /* Number of operations that are incompatible with cluster mode */ + long long stat_total_prefetch_entries; /* Total number of prefetched dict entries */ + long long stat_total_prefetch_batches; /* Total number of prefetched batches */ /* The following two are used to track instantaneous metrics, like * number of operations per second, network traffic. */ struct { @@ -3650,6 +3649,7 @@ void freeReplicationBacklogRefMemAsync(list *blocks, rax *index); int getKeysFromCommandWithSpecs(struct redisCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result); keyReference *getKeysPrepareResult(getKeysResult *result, int numkeys); int getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); +int getSlotFromCommand(struct redisCommand *cmd, robj **argv, int argc); int doesCommandHaveKeys(struct redisCommand *cmd); int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result); int doesCommandHaveChannelsWithFlags(struct redisCommand *cmd, int flags); diff --git a/tests/unit/networking.tcl b/tests/unit/networking.tcl index 79d6e399d..600c67203 100644 --- a/tests/unit/networking.tcl +++ b/tests/unit/networking.tcl @@ -170,3 +170,142 @@ start_server {config "minimal.conf" tags {"external:skip"}} { } } } + +start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-debug-command {yes}}} { + set server_pid [s process_id] + # Skip if non io-threads mode - as it is relevant only for io-threads mode + if {[r config get io-threads] eq "io-threads 2"} { + test {prefetch works as expected when killing a client from the middle of prefetch commands batch} { + # Create 16 (prefetch batch size) +1 clients + for {set i 0} {$i < 16} {incr i} { + set rd$i [redis_deferring_client] + } + + # set a key that will be later be prefetch + r set a 0 + + # Get the client ID of rd4 + $rd4 client id + set rd4_id [$rd4 read] + + # Create a batch of commands by suspending the server for a while + # before responding to the first command + pause_process $server_pid + + # The first client will kill the fourth client + $rd0 client kill id $rd4_id + + # Send set commands for all clients except the first + for {set i 1} {$i < 16} {incr i} { + [set rd$i] set a $i + [set rd$i] flush + } + + # Resume the server + resume_process $server_pid + + # Read the results + assert_equal {1} [$rd0 read] + catch {$rd4 read} err + assert_match {I/O error reading reply} $err + + # verify the prefetch stats are as expected + set info [r info stats] + set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries] + assert_range $prefetch_entries 2 15; # With slower machines, the number of prefetch entries can be lower + set prefetch_batches [getInfoProperty $info io_threaded_total_prefetch_batches] + assert_range $prefetch_batches 1 7; # With slower machines, the number of batches can be higher + + # Verify the final state + $rd15 get a + assert_equal {OK} [$rd15 read] + assert_equal {15} [$rd15 read] + } + + test {prefetch works as expected when changing the batch size while executing the commands batch} { + # Create 16 (default prefetch batch size) clients + for {set i 0} {$i < 16} {incr i} { + set rd$i [redis_deferring_client] + } + + # Create a batch of commands by suspending the server for a while + # before responding to the first command + pause_process $server_pid + + # Send set commands for all clients the 5th client will change the prefetch batch size + for {set i 0} {$i < 16} {incr i} { + if {$i == 4} { + [set rd$i] config set prefetch-batch-max-size 1 + } + [set rd$i] set a $i + [set rd$i] flush + } + # Resume the server + resume_process $server_pid + # Read the results + for {set i 0} {$i < 16} {incr i} { + assert_equal {OK} [[set rd$i] read] + [set rd$i] close + } + + # assert the configured prefetch batch size was changed + assert {[r config get prefetch-batch-max-size] eq "prefetch-batch-max-size 1"} + } + + proc do_prefetch_batch {server_pid batch_size} { + # Create clients + for {set i 0} {$i < $batch_size} {incr i} { + set rd$i [redis_deferring_client] + } + + # Suspend the server to batch the commands + pause_process $server_pid + + # Send commands from all clients + for {set i 0} {$i < $batch_size} {incr i} { + [set rd$i] set a $i + [set rd$i] flush + } + + # Resume the server to process the batch + resume_process $server_pid + + # Verify responses + for {set i 0} {$i < $batch_size} {incr i} { + assert_equal {OK} [[set rd$i] read] + [set rd$i] close + } + } + + test {no prefetch when the batch size is set to 0} { + # set the batch size to 0 + r config set prefetch-batch-max-size 0 + # save the current value of prefetch entries + set info [r info stats] + set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries] + + do_prefetch_batch $server_pid 16 + + # assert the prefetch entries did not change + set info [r info stats] + set new_prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries] + assert_equal $prefetch_entries $new_prefetch_entries + } + + test {Prefetch can resume working when the configuration option is set to a non-zero value} { + # save the current value of prefetch entries + set info [r info stats] + set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries] + # set the batch size to 0 + r config set prefetch-batch-max-size 16 + + do_prefetch_batch $server_pid 16 + + # assert the prefetch entries did not change + set info [r info stats] + set new_prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries] + # With slower machines, the number of prefetch entries can be lower + assert_range $new_prefetch_entries [expr {$prefetch_entries + 2}] [expr {$prefetch_entries + 16}] + } + } +}