mirror of https://mirror.osredm.com/root/redis.git
Merge 0540a08e2e
into 232f2fb077
This commit is contained in:
commit
d3da013b7b
12
redis.conf
12
redis.conf
|
@ -1334,6 +1334,18 @@ lazyfree-lazy-user-flush no
|
|||
# --threads option to match the number of Redis threads, otherwise you'll not
|
||||
# be able to notice the improvements.
|
||||
|
||||
# When multiple commands are parsed by the I/O threads and ready for execution,
|
||||
# we take advantage of knowing the next set of commands and prefetch their
|
||||
# required dictionary entries in a batch. This reduces memory access costs.
|
||||
#
|
||||
# The optimal batch size depends on the specific workflow of the user.
|
||||
# The default batch size is 16, which can be modified using the
|
||||
# 'prefetch-batch-max-size' config.
|
||||
#
|
||||
# When the config is set to 0, prefetching is disabled.
|
||||
#
|
||||
# prefetch-batch-max-size 16
|
||||
|
||||
############################ KERNEL OOM CONTROL ##############################
|
||||
|
||||
# On Linux, it is possible to hint the kernel OOM killer on what processes
|
||||
|
|
|
@ -375,7 +375,7 @@ endif
|
|||
|
||||
REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX)
|
||||
REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX)
|
||||
REDIS_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
|
||||
REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
|
||||
REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX)
|
||||
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
|
||||
REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX)
|
||||
|
|
|
@ -3168,6 +3168,7 @@ standardConfig static_configs[] = {
|
|||
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
|
||||
createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, 128, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */
|
||||
createIntConfig("prefetch-batch-max-size", NULL, MODIFIABLE_CONFIG, 0, 128, server.prefetch_batch_max_size, 16, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL),
|
||||
createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_slave_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* Slave max data age factor. */
|
||||
createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL),
|
||||
|
|
|
@ -115,9 +115,11 @@
|
|||
#endif
|
||||
|
||||
#if HAS_BUILTIN_PREFETCH
|
||||
#define redis_prefetch(addr) __builtin_prefetch(addr)
|
||||
#define redis_prefetch_read(addr) __builtin_prefetch(addr, 0, 3) /* Read with high locality */
|
||||
#define redis_prefetch_write(addr) __builtin_prefetch(addr, 1, 3) /* Write with high locality */
|
||||
#else
|
||||
#define redis_prefetch(addr) ((void)(addr)) /* No-op if unsupported */
|
||||
#define redis_prefetch_read(addr) ((void)(addr)) /* No-op if unsupported */
|
||||
#define redis_prefetch_write(addr) ((void)(addr)) /* No-op if unsupported */
|
||||
#endif
|
||||
|
|
18
src/db.c
18
src/db.c
|
@ -324,6 +324,24 @@ int getKeySlot(sds key) {
|
|||
return calculateKeySlot(key);
|
||||
}
|
||||
|
||||
/* Return the slot of the key in the command. IO threads use this function
|
||||
* to calculate slot to reduce main-thread load */
|
||||
int getSlotFromCommand(struct redisCommand *cmd, robj **argv, int argc) {
|
||||
int slot = -1;
|
||||
if (!cmd || !server.cluster_enabled) return slot;
|
||||
|
||||
/* Get the keys from the command */
|
||||
getKeysResult result = GETKEYS_RESULT_INIT;
|
||||
int numkeys = getKeysFromCommand(cmd, argv, argc, &result);
|
||||
if (numkeys > 0) {
|
||||
/* Get the slot of the first key */
|
||||
robj *first = argv[result.keys[0].pos];
|
||||
slot = keyHashSlot(first->ptr, (int)sdslen(first->ptr));
|
||||
}
|
||||
getKeysFreeResult(&result);
|
||||
return slot;
|
||||
}
|
||||
|
||||
/* This is a special version of dbAdd() that is used only when loading
|
||||
* keys from the RDB file: the key is passed as an SDS string that is
|
||||
* copied by the function and freed by the caller.
|
||||
|
|
13
src/dict.c
13
src/dict.c
|
@ -66,7 +66,6 @@ static void _dictShrinkIfNeeded(dict *d);
|
|||
static void _dictRehashStepIfNeeded(dict *d, uint64_t visitedIdx);
|
||||
static signed char _dictNextExp(unsigned long size);
|
||||
static int _dictInit(dict *d, dictType *type);
|
||||
static dictEntry *dictGetNext(const dictEntry *de);
|
||||
static dictEntryLink dictGetNextLink(dictEntry *de);
|
||||
static void dictSetNext(dictEntry *de, dictEntry *next);
|
||||
static int dictDefaultCompare(dictCmpCache *cache, const void *key1, const void *key2);
|
||||
|
@ -499,6 +498,12 @@ int dictAdd(dict *d, void *key, void *val)
|
|||
return DICT_OK;
|
||||
}
|
||||
|
||||
int dictCompareKeys(dict *d, const void *key1, const void *key2) {
|
||||
dictCmpCache cache = {0};
|
||||
keyCmpFunc cmpFunc = dictGetCmpFunc(d);
|
||||
return cmpFunc(&cache, key1, key2);
|
||||
}
|
||||
|
||||
/* Low level add or find:
|
||||
* This function adds the entry but instead of setting a value returns the
|
||||
* dictEntry structure to the user, that will make sure to fill the value
|
||||
|
@ -1007,6 +1012,10 @@ double dictIncrDoubleVal(dictEntry *de, double val) {
|
|||
return de->v.d += val;
|
||||
}
|
||||
|
||||
int dictEntryIsKey(const dictEntry *de) {
|
||||
return entryIsKey(de);
|
||||
}
|
||||
|
||||
void *dictGetKey(const dictEntry *de) {
|
||||
/* if entryIsKey() */
|
||||
if ((uintptr_t)de & ENTRY_PTR_IS_ODD_KEY) return (void *) de;
|
||||
|
@ -1044,7 +1053,7 @@ double *dictGetDoubleValPtr(dictEntry *de) {
|
|||
|
||||
/* Returns the 'next' field of the entry or NULL if the entry doesn't have a
|
||||
* 'next' field. */
|
||||
static dictEntry *dictGetNext(const dictEntry *de) {
|
||||
dictEntry *dictGetNext(const dictEntry *de) {
|
||||
if (entryIsKey(de)) return NULL; /* there's no next */
|
||||
if (entryIsNoValue(de)) return decodeEntryNoValue(de)->next;
|
||||
return de->next;
|
||||
|
|
|
@ -181,11 +181,6 @@ typedef struct {
|
|||
if ((d)->type->keyDestructor) \
|
||||
(d)->type->keyDestructor((d), dictGetKey(entry))
|
||||
|
||||
#define dictCompareKeys(d, key1, key2) \
|
||||
(((d)->type->keyCompare) ? \
|
||||
(d)->type->keyCompare((d), key1, key2) : \
|
||||
(key1) == (key2))
|
||||
|
||||
#define dictMetadata(d) (&(d)->metadata)
|
||||
#define dictMetadataSize(d) ((d)->type->dictMetadataBytes \
|
||||
? (d)->type->dictMetadataBytes(d) : 0)
|
||||
|
@ -234,6 +229,8 @@ dictEntry * dictFind(dict *d, const void *key);
|
|||
int dictShrinkIfNeeded(dict *d);
|
||||
int dictExpandIfNeeded(dict *d);
|
||||
void *dictGetKey(const dictEntry *de);
|
||||
int dictEntryIsKey(const dictEntry *de);
|
||||
int dictCompareKeys(dict *d, const void *key1, const void *key2);
|
||||
size_t dictMemUsage(const dict *d);
|
||||
size_t dictEntryMemUsage(int noValueDict);
|
||||
dictIterator *dictGetIterator(dict *d);
|
||||
|
@ -242,6 +239,7 @@ void dictInitIterator(dictIterator *iter, dict *d);
|
|||
void dictInitSafeIterator(dictIterator *iter, dict *d);
|
||||
void dictResetIterator(dictIterator *iter);
|
||||
dictEntry *dictNext(dictIterator *iter);
|
||||
dictEntry *dictGetNext(const dictEntry *de);
|
||||
void dictReleaseIterator(dictIterator *iter);
|
||||
dictEntry *dictGetRandomKey(dict *d);
|
||||
dictEntry *dictGetFairRandomKey(dict *d);
|
||||
|
|
|
@ -44,9 +44,9 @@
|
|||
/* Everything below this line is automatically generated by
|
||||
* generate-fmtargs.py. Do not manually edit. */
|
||||
|
||||
#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, N, ...) N
|
||||
#define ARG_N(_1, _2, _3, _4, _5, _6, _7, _8, _9, _10, _11, _12, _13, _14, _15, _16, _17, _18, _19, _20, _21, _22, _23, _24, _25, _26, _27, _28, _29, _30, _31, _32, _33, _34, _35, _36, _37, _38, _39, _40, _41, _42, _43, _44, _45, _46, _47, _48, _49, _50, _51, _52, _53, _54, _55, _56, _57, _58, _59, _60, _61, _62, _63, _64, _65, _66, _67, _68, _69, _70, _71, _72, _73, _74, _75, _76, _77, _78, _79, _80, _81, _82, _83, _84, _85, _86, _87, _88, _89, _90, _91, _92, _93, _94, _95, _96, _97, _98, _99, _100, _101, _102, _103, _104, _105, _106, _107, _108, _109, _110, _111, _112, _113, _114, _115, _116, _117, _118, _119, _120, _121, _122, _123, _124, _125, _126, _127, _128, _129, _130, _131, _132, _133, _134, _135, _136, _137, _138, _139, _140, _141, _142, _143, _144, _145, _146, _147, _148, _149, _150, _151, _152, _153, _154, _155, _156, _157, _158, _159, _160, N, ...) N
|
||||
|
||||
#define RSEQ_N() 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0
|
||||
#define RSEQ_N() 160, 159, 158, 157, 156, 155, 154, 153, 152, 151, 150, 149, 148, 147, 146, 145, 144, 143, 142, 141, 140, 139, 138, 137, 136, 135, 134, 133, 132, 131, 130, 129, 128, 127, 126, 125, 124, 123, 122, 121, 120, 119, 118, 117, 116, 115, 114, 113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101, 100, 99, 98, 97, 96, 95, 94, 93, 92, 91, 90, 89, 88, 87, 86, 85, 84, 83, 82, 81, 80, 79, 78, 77, 76, 75, 74, 73, 72, 71, 70, 69, 68, 67, 66, 65, 64, 63, 62, 61, 60, 59, 58, 57, 56, 55, 54, 53, 52, 51, 50, 49, 48, 47, 46, 45, 44, 43, 42, 41, 40, 39, 38, 37, 36, 35, 34, 33, 32, 31, 30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0
|
||||
|
||||
#define COMPACT_FMT_2(fmt, value) fmt
|
||||
#define COMPACT_FMT_4(fmt, value, ...) fmt COMPACT_FMT_2(__VA_ARGS__)
|
||||
|
@ -108,6 +108,26 @@
|
|||
#define COMPACT_FMT_116(fmt, value, ...) fmt COMPACT_FMT_114(__VA_ARGS__)
|
||||
#define COMPACT_FMT_118(fmt, value, ...) fmt COMPACT_FMT_116(__VA_ARGS__)
|
||||
#define COMPACT_FMT_120(fmt, value, ...) fmt COMPACT_FMT_118(__VA_ARGS__)
|
||||
#define COMPACT_FMT_122(fmt, value, ...) fmt COMPACT_FMT_120(__VA_ARGS__)
|
||||
#define COMPACT_FMT_124(fmt, value, ...) fmt COMPACT_FMT_122(__VA_ARGS__)
|
||||
#define COMPACT_FMT_126(fmt, value, ...) fmt COMPACT_FMT_124(__VA_ARGS__)
|
||||
#define COMPACT_FMT_128(fmt, value, ...) fmt COMPACT_FMT_126(__VA_ARGS__)
|
||||
#define COMPACT_FMT_130(fmt, value, ...) fmt COMPACT_FMT_128(__VA_ARGS__)
|
||||
#define COMPACT_FMT_132(fmt, value, ...) fmt COMPACT_FMT_130(__VA_ARGS__)
|
||||
#define COMPACT_FMT_134(fmt, value, ...) fmt COMPACT_FMT_132(__VA_ARGS__)
|
||||
#define COMPACT_FMT_136(fmt, value, ...) fmt COMPACT_FMT_134(__VA_ARGS__)
|
||||
#define COMPACT_FMT_138(fmt, value, ...) fmt COMPACT_FMT_136(__VA_ARGS__)
|
||||
#define COMPACT_FMT_140(fmt, value, ...) fmt COMPACT_FMT_138(__VA_ARGS__)
|
||||
#define COMPACT_FMT_142(fmt, value, ...) fmt COMPACT_FMT_140(__VA_ARGS__)
|
||||
#define COMPACT_FMT_144(fmt, value, ...) fmt COMPACT_FMT_142(__VA_ARGS__)
|
||||
#define COMPACT_FMT_146(fmt, value, ...) fmt COMPACT_FMT_144(__VA_ARGS__)
|
||||
#define COMPACT_FMT_148(fmt, value, ...) fmt COMPACT_FMT_146(__VA_ARGS__)
|
||||
#define COMPACT_FMT_150(fmt, value, ...) fmt COMPACT_FMT_148(__VA_ARGS__)
|
||||
#define COMPACT_FMT_152(fmt, value, ...) fmt COMPACT_FMT_150(__VA_ARGS__)
|
||||
#define COMPACT_FMT_154(fmt, value, ...) fmt COMPACT_FMT_152(__VA_ARGS__)
|
||||
#define COMPACT_FMT_156(fmt, value, ...) fmt COMPACT_FMT_154(__VA_ARGS__)
|
||||
#define COMPACT_FMT_158(fmt, value, ...) fmt COMPACT_FMT_156(__VA_ARGS__)
|
||||
#define COMPACT_FMT_160(fmt, value, ...) fmt COMPACT_FMT_158(__VA_ARGS__)
|
||||
|
||||
#define COMPACT_VALUES_2(fmt, value) value
|
||||
#define COMPACT_VALUES_4(fmt, value, ...) value, COMPACT_VALUES_2(__VA_ARGS__)
|
||||
|
@ -169,5 +189,25 @@
|
|||
#define COMPACT_VALUES_116(fmt, value, ...) value, COMPACT_VALUES_114(__VA_ARGS__)
|
||||
#define COMPACT_VALUES_118(fmt, value, ...) value, COMPACT_VALUES_116(__VA_ARGS__)
|
||||
#define COMPACT_VALUES_120(fmt, value, ...) value, COMPACT_VALUES_118(__VA_ARGS__)
|
||||
#define COMPACT_VALUES_122(fmt, value, ...) value, COMPACT_VALUES_120(__VA_ARGS__)
|
||||
#define COMPACT_VALUES_124(fmt, value, ...) value, COMPACT_VALUES_122(__VA_ARGS__)
|
||||
#define COMPACT_VALUES_126(fmt, value, ...) value, COMPACT_VALUES_124(__VA_ARGS__)
|
||||
#define COMPACT_VALUES_128(fmt, value, ...) value, COMPACT_VALUES_126(__VA_ARGS__)
|
||||
#define COMPACT_VALUES_130(fmt, value, ...) value, COMPACT_VALUES_128(__VA_ARGS__)
|
||||
#define COMPACT_VALUES_132(fmt, value, ...) value, COMPACT_VALUES_130(__VA_ARGS__)
|
||||
#define COMPACT_VALUES_134(fmt, value, ...) value, COMPACT_VALUES_132(__VA_ARGS__)
|
||||
#define COMPACT_VALUES_136(fmt, value, ...) value, COMPACT_VALUES_134(__VA_ARGS__)
|
||||
#define COMPACT_VALUES_138(fmt, value, ...) value, COMPACT_VALUES_136(__VA_ARGS__)
|
||||
#define COMPACT_VALUES_140(fmt, value, ...) value, COMPACT_VALUES_138(__VA_ARGS__)
|
||||
#define COMPACT_VALUES_142(fmt, value, ...) value, COMPACT_VALUES_140(__VA_ARGS__)
|
||||
#define COMPACT_VALUES_144(fmt, value, ...) value, COMPACT_VALUES_142(__VA_ARGS__)
|
||||
#define COMPACT_VALUES_146(fmt, value, ...) value, COMPACT_VALUES_144(__VA_ARGS__)
|
||||
#define COMPACT_VALUES_148(fmt, value, ...) value, COMPACT_VALUES_146(__VA_ARGS__)
|
||||
#define COMPACT_VALUES_150(fmt, value, ...) value, COMPACT_VALUES_148(__VA_ARGS__)
|
||||
#define COMPACT_VALUES_152(fmt, value, ...) value, COMPACT_VALUES_150(__VA_ARGS__)
|
||||
#define COMPACT_VALUES_154(fmt, value, ...) value, COMPACT_VALUES_152(__VA_ARGS__)
|
||||
#define COMPACT_VALUES_156(fmt, value, ...) value, COMPACT_VALUES_154(__VA_ARGS__)
|
||||
#define COMPACT_VALUES_158(fmt, value, ...) value, COMPACT_VALUES_156(__VA_ARGS__)
|
||||
#define COMPACT_VALUES_160(fmt, value, ...) value, COMPACT_VALUES_158(__VA_ARGS__)
|
||||
|
||||
#endif
|
||||
|
|
|
@ -337,6 +337,32 @@ int sendPendingClientsToIOThreads(void) {
|
|||
return processed;
|
||||
}
|
||||
|
||||
/* Prefetch the commands from the IO thread. The return value is the number
|
||||
* of clients that have been prefetched. */
|
||||
int prefetchIOThreadCommands(IOThread *t) {
|
||||
/* Since small batch prefetching is not much effective, so if the remaining
|
||||
* is small (less than twice the max batch size), prefetch all of it. */
|
||||
int len = listLength(mainThreadProcessingClients[t->id]);
|
||||
int config_size = getConfigPrefetchBatchSize();
|
||||
int to_prefetch = len < config_size*2 ? len : config_size;
|
||||
if (to_prefetch == 0) return 0;
|
||||
|
||||
int clients = 0;
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(mainThreadProcessingClients[t->id], &li);
|
||||
while((ln = listNext(&li)) && clients++ < to_prefetch) {
|
||||
client *c = listNodeValue(ln);
|
||||
/* One command may have several keys, the batch may be full,
|
||||
* so we stop prefetching if failed. */
|
||||
if (addCommandToBatch(c) == C_ERR) break;
|
||||
}
|
||||
|
||||
/* Prefetch the commands in the batch. */
|
||||
prefetchCommands();
|
||||
return clients;
|
||||
}
|
||||
|
||||
extern int ProcessingEventsWhileBlocked;
|
||||
|
||||
/* Send the pending clients to the IO thread if the number of pending clients
|
||||
|
@ -385,8 +411,19 @@ int processClientsFromIOThread(IOThread *t) {
|
|||
size_t processed = listLength(mainThreadProcessingClients[t->id]);
|
||||
if (processed == 0) return 0;
|
||||
|
||||
int prefetch_clients = 0;
|
||||
/* We may call processClientsFromIOThread reentrantly, so we need to
|
||||
* reset the prefetching batch, besides, users may change the config
|
||||
* of prefetch batch size, so we need to reset the prefetching batch. */
|
||||
resetCommandsBatch();
|
||||
|
||||
listNode *node = NULL;
|
||||
while (listLength(mainThreadProcessingClients[t->id])) {
|
||||
/* Prefetch the commands if no clients in the batch. */
|
||||
if (prefetch_clients <= 0) prefetch_clients = prefetchIOThreadCommands(t);
|
||||
/* Reset the prefetching batch if we have processed all clients. */
|
||||
if (--prefetch_clients <= 0) resetCommandsBatch();
|
||||
|
||||
/* Each time we pop up only the first client to process to guarantee
|
||||
* reentrancy safety. */
|
||||
if (node) zfree(node);
|
||||
|
@ -640,6 +677,8 @@ void initThreadedIO(void) {
|
|||
exit(1);
|
||||
}
|
||||
|
||||
prefetchCommandsBatchInit();
|
||||
|
||||
/* Spawn and initialize the I/O threads. */
|
||||
for (int i = 1; i < server.io_threads_num; i++) {
|
||||
IOThread *t = &IOThreads[i];
|
||||
|
|
|
@ -83,7 +83,7 @@ typedef struct {
|
|||
/**********************************/
|
||||
|
||||
/* Get the dictionary pointer based on dict-index. */
|
||||
static dict *kvstoreGetDict(kvstore *kvs, int didx) {
|
||||
dict *kvstoreGetDict(kvstore *kvs, int didx) {
|
||||
return kvs->dicts[didx];
|
||||
}
|
||||
|
||||
|
|
|
@ -97,6 +97,7 @@ dictEntry *kvstoreDictAddRaw(kvstore *kvs, int didx, void *key, dictEntry **exis
|
|||
dictEntryLink kvstoreDictTwoPhaseUnlinkFind(kvstore *kvs, int didx, const void *key, int *table_index);
|
||||
void kvstoreDictTwoPhaseUnlinkFree(kvstore *kvs, int didx, dictEntryLink plink, int table_index);
|
||||
int kvstoreDictDelete(kvstore *kvs, int didx, const void *key);
|
||||
dict *kvstoreGetDict(kvstore *kvs, int didx);
|
||||
kvstoreDictMetadata *kvstoreGetDictMetadata(kvstore *kvs, int didx);
|
||||
kvstoreMetadata *kvstoreGetMetadata(kvstore *kvs);
|
||||
|
||||
|
|
|
@ -0,0 +1,401 @@
|
|||
/*
|
||||
* This file utilizes prefetching keys and data for multiple commands in a batch,
|
||||
* to improve performance by amortizing memory access costs across multiple operations.
|
||||
*
|
||||
* Copyright (c) 2025-Present, Redis Ltd. and contributors.
|
||||
* All rights reserved.
|
||||
*
|
||||
* Copyright (c) 2024-present, Valkey contributors.
|
||||
* All rights reserved.
|
||||
*
|
||||
* Licensed under your choice of (a) the Redis Source Available License 2.0
|
||||
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
|
||||
* GNU Affero General Public License v3 (AGPLv3).
|
||||
*
|
||||
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
|
||||
*/
|
||||
|
||||
#include "memory_prefetch.h"
|
||||
#include "server.h"
|
||||
#include "dict.h"
|
||||
|
||||
typedef enum { HT_IDX_FIRST = 0, HT_IDX_SECOND = 1, HT_IDX_INVALID = -1 } HashTableIndex;
|
||||
|
||||
typedef enum {
|
||||
PREFETCH_BUCKET, /* Initial state, determines which hash table to use and prefetch the table's bucket */
|
||||
PREFETCH_ENTRY, /* prefetch entries associated with the given key's hash */
|
||||
PREFETCH_KVOBJ, /* prefetch the kv object of the entry found in the previous step */
|
||||
PREFETCH_VALDATA, /* prefetch the value data of the kv object found in the previous step */
|
||||
PREFETCH_DONE /* Indicates that prefetching for this key is complete */
|
||||
} PrefetchState;
|
||||
|
||||
|
||||
/************************************ State machine diagram for the prefetch operation. ********************************
|
||||
│
|
||||
start
|
||||
│
|
||||
┌────────▼─────────┐
|
||||
┌─────────►│ PREFETCH_BUCKET ├────►────────┐
|
||||
│ └────────┬─────────┘ no more tables -> done
|
||||
| bucket|found |
|
||||
│ | │
|
||||
entry not found - goto next table ┌────────▼────────┐ │
|
||||
└────◄─────┤ PREFETCH_ENTRY | ▼
|
||||
┌────────────►└────────┬────────┘ │
|
||||
| Entry│found │
|
||||
│ | │
|
||||
| ┌───────▼────────┐ │
|
||||
│ | PREFETCH_KVOBJ | ▼
|
||||
│ └───────┬────────┘ │
|
||||
kvobj not found - goto next entry | |
|
||||
│ ┌───────────▼────────────┐ │
|
||||
└──────◄───│ PREFETCH_VALDATA │ ▼
|
||||
└───────────┬────────────┘ │
|
||||
| │
|
||||
┌───────-─▼─────────────┐ │
|
||||
│ PREFETCH_DONE │◄────────┘
|
||||
└───────────────────────┘
|
||||
**********************************************************************************************************************/
|
||||
|
||||
typedef void *(*GetValueDataFunc)(const void *val);
|
||||
|
||||
typedef struct KeyPrefetchInfo {
|
||||
PrefetchState state; /* Current state of the prefetch operation */
|
||||
HashTableIndex ht_idx; /* Index of the current hash table (0 or 1 for rehashing) */
|
||||
uint64_t bucket_idx; /* Index of the bucket in the current hash table */
|
||||
uint64_t key_hash; /* Hash value of the key being prefetched */
|
||||
dictEntry *current_entry; /* Pointer to the current entry being processed */
|
||||
kvobj *current_kv; /* Pointer to the kv object being prefetched */
|
||||
} KeyPrefetchInfo;
|
||||
|
||||
/* PrefetchCommandsBatch structure holds the state of the current batch of client commands being processed. */
|
||||
typedef struct PrefetchCommandsBatch {
|
||||
size_t cur_idx; /* Index of the current key being processed */
|
||||
size_t keys_done; /* Number of keys that have been prefetched */
|
||||
size_t key_count; /* Number of keys in the current batch */
|
||||
size_t client_count; /* Number of clients in the current batch */
|
||||
size_t max_prefetch_size; /* Maximum number of keys to prefetch in a batch */
|
||||
size_t executed_commands; /* Number of commands executed in the current batch */
|
||||
int *slots; /* Array of slots for each key */
|
||||
void **keys; /* Array of keys to prefetch in the current batch */
|
||||
client **clients; /* Array of clients in the current batch */
|
||||
dict **keys_dicts; /* Main dict for each key */
|
||||
dict **current_dicts; /* Points to dict to prefetch from */
|
||||
KeyPrefetchInfo *prefetch_info; /* Prefetch info for each key */
|
||||
GetValueDataFunc get_value_data_func; /* Function to get the value data */
|
||||
} PrefetchCommandsBatch;
|
||||
|
||||
static PrefetchCommandsBatch *batch = NULL;
|
||||
|
||||
void freePrefetchCommandsBatch(void) {
|
||||
if (batch == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
zfree(batch->clients);
|
||||
zfree(batch->keys);
|
||||
zfree(batch->keys_dicts);
|
||||
zfree(batch->slots);
|
||||
zfree(batch->prefetch_info);
|
||||
zfree(batch);
|
||||
batch = NULL;
|
||||
}
|
||||
|
||||
void prefetchCommandsBatchInit(void) {
|
||||
serverAssert(!batch);
|
||||
|
||||
/* To avoid prefetching small batches, we set the max size to twice
|
||||
* the configured size, so if not exceeding twice the limit, we can
|
||||
* prefetch all of it. */
|
||||
size_t max_prefetch_size = server.prefetch_batch_max_size * 2;
|
||||
|
||||
if (max_prefetch_size == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
batch = zcalloc(sizeof(PrefetchCommandsBatch));
|
||||
batch->max_prefetch_size = max_prefetch_size;
|
||||
batch->clients = zcalloc(max_prefetch_size * sizeof(client *));
|
||||
batch->keys = zcalloc(max_prefetch_size * sizeof(void *));
|
||||
batch->keys_dicts = zcalloc(max_prefetch_size * sizeof(dict *));
|
||||
batch->slots = zcalloc(max_prefetch_size * sizeof(int));
|
||||
batch->prefetch_info = zcalloc(max_prefetch_size * sizeof(KeyPrefetchInfo));
|
||||
}
|
||||
|
||||
void onMaxBatchSizeChange(void) {
|
||||
if (batch && batch->client_count > 0) {
|
||||
/* We need to process the current batch before updating the size */
|
||||
return;
|
||||
}
|
||||
|
||||
freePrefetchCommandsBatch();
|
||||
prefetchCommandsBatchInit();
|
||||
}
|
||||
|
||||
/* Prefetch the given pointer and move to the next key in the batch. */
|
||||
static inline void prefetchAndMoveToNextKey(void *addr) {
|
||||
redis_prefetch(addr);
|
||||
/* While the prefetch is in progress, we can continue to the next key */
|
||||
batch->cur_idx = (batch->cur_idx + 1) % batch->key_count;
|
||||
}
|
||||
|
||||
static inline void markKeyAsdone(KeyPrefetchInfo *info) {
|
||||
info->state = PREFETCH_DONE;
|
||||
server.stat_total_prefetch_entries++;
|
||||
batch->keys_done++;
|
||||
}
|
||||
|
||||
/* Returns the next KeyPrefetchInfo structure that needs to be processed. */
|
||||
static KeyPrefetchInfo *getNextPrefetchInfo(void) {
|
||||
size_t start_idx = batch->cur_idx;
|
||||
do {
|
||||
KeyPrefetchInfo *info = &batch->prefetch_info[batch->cur_idx];
|
||||
if (info->state != PREFETCH_DONE) return info;
|
||||
batch->cur_idx = (batch->cur_idx + 1) % batch->key_count;
|
||||
} while (batch->cur_idx != start_idx);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void initBatchInfo(dict **dicts, GetValueDataFunc func) {
|
||||
batch->current_dicts = dicts;
|
||||
batch->get_value_data_func = func;
|
||||
|
||||
/* Initialize the prefetch info */
|
||||
for (size_t i = 0; i < batch->key_count; i++) {
|
||||
KeyPrefetchInfo *info = &batch->prefetch_info[i];
|
||||
if (!batch->current_dicts[i] || dictSize(batch->current_dicts[i]) == 0) {
|
||||
info->state = PREFETCH_DONE;
|
||||
batch->keys_done++;
|
||||
continue;
|
||||
}
|
||||
info->ht_idx = HT_IDX_INVALID;
|
||||
info->current_entry = NULL;
|
||||
info->current_kv = NULL;
|
||||
info->state = PREFETCH_BUCKET;
|
||||
info->key_hash = dictGetHash(batch->current_dicts[i], batch->keys[i]);
|
||||
}
|
||||
}
|
||||
|
||||
/* Prefetch the bucket of the next hash table index.
|
||||
* If no tables are left, move to the PREFETCH_DONE state. */
|
||||
static void prefetchBucket(KeyPrefetchInfo *info) {
|
||||
size_t i = batch->cur_idx;
|
||||
|
||||
/* Determine which hash table to use */
|
||||
if (info->ht_idx == HT_IDX_INVALID) {
|
||||
info->ht_idx = HT_IDX_FIRST;
|
||||
} else if (info->ht_idx == HT_IDX_FIRST && dictIsRehashing(batch->current_dicts[i])) {
|
||||
info->ht_idx = HT_IDX_SECOND;
|
||||
} else {
|
||||
/* No more tables left - mark as done. */
|
||||
markKeyAsdone(info);
|
||||
return;
|
||||
}
|
||||
|
||||
/* Prefetch the bucket */
|
||||
info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(batch->current_dicts[i]->ht_size_exp[info->ht_idx]);
|
||||
prefetchAndMoveToNextKey(&batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]);
|
||||
info->current_entry = NULL;
|
||||
info->state = PREFETCH_ENTRY;
|
||||
}
|
||||
|
||||
/* Prefetch the entry in the bucket and move to the PREFETCH_KVOBJ state.
|
||||
* If no more entries in the bucket, move to the PREFETCH_BUCKET state to look at the next table. */
|
||||
static void prefetchEntry(KeyPrefetchInfo *info) {
|
||||
size_t i = batch->cur_idx;
|
||||
|
||||
if (info->current_entry) {
|
||||
/* We already found an entry in the bucket - move to the next entry */
|
||||
info->current_entry = dictGetNext(info->current_entry);
|
||||
} else {
|
||||
/* Go to the first entry in the bucket */
|
||||
info->current_entry = batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx];
|
||||
}
|
||||
|
||||
if (info->current_entry) {
|
||||
prefetchAndMoveToNextKey(info->current_entry);
|
||||
info->current_kv = NULL;
|
||||
info->state = PREFETCH_KVOBJ;
|
||||
} else {
|
||||
/* No entry found in the bucket - try the bucket in the next table */
|
||||
info->state = PREFETCH_BUCKET;
|
||||
}
|
||||
}
|
||||
|
||||
/* Prefetch the kv object in the dict entry, and to the PREFETCH_VALDATA state. */
|
||||
static inline void prefetchKVOject(KeyPrefetchInfo *info) {
|
||||
kvobj *kv = dictGetKey(info->current_entry);
|
||||
int is_kv = dictEntryIsKey(info->current_entry);
|
||||
|
||||
info->current_kv = kv;
|
||||
info->state = PREFETCH_VALDATA;
|
||||
/* If the entry is a pointer of kv object, we don't need to prefetch it */
|
||||
if (!is_kv) prefetchAndMoveToNextKey(kv);
|
||||
}
|
||||
|
||||
/* Prefetch the value data of the kv object found in dict entry. */
|
||||
static void prefetchValueData(KeyPrefetchInfo *info) {
|
||||
size_t i = batch->cur_idx;
|
||||
kvobj *kv = info->current_kv;
|
||||
|
||||
/* 1. If this is the last element, we assume a hit and don't compare the keys
|
||||
* 2. This kv object is the target of the lookup. */
|
||||
if ((!dictGetNext(info->current_entry) && !dictIsRehashing(batch->current_dicts[i])) ||
|
||||
dictCompareKeys(batch->current_dicts[i], batch->keys[i], kv))
|
||||
{
|
||||
if (batch->get_value_data_func) {
|
||||
void *value_data = batch->get_value_data_func(kv);
|
||||
if (value_data) prefetchAndMoveToNextKey(value_data);
|
||||
}
|
||||
markKeyAsdone(info);
|
||||
} else {
|
||||
/* Not found in the current entry, move to the next entry */
|
||||
info->state = PREFETCH_ENTRY;
|
||||
}
|
||||
}
|
||||
|
||||
/* Prefetch dictionary data for an array of keys.
 *
 * This function takes an array of dictionaries and keys, attempting to bring
 * data closer to the L1 cache that might be needed for dictionary operations
 * on those keys.
 *
 * The dictFind algorithm:
 * 1. Evaluate the hash of the key
 * 2. Access the index in the first table
 * 3. Walk the entries linked list until the key is found
 *    If the key hasn't been found and the dictionary is in the middle of rehashing,
 *    access the index on the second table and repeat step 3
 *
 * dictPrefetch executes the same algorithm as dictFind, but one step at a time
 * for each key. Instead of waiting for data to be read from memory, it prefetches
 * the data and then moves on to execute the next prefetch for another key.
 *
 * dicts - An array of dictionaries to prefetch data from.
 * get_val_data_func - A callback function that dictPrefetch can invoke
 * to bring the key's value data closer to the L1 cache as well.
 */
static void dictPrefetch(dict **dicts, GetValueDataFunc get_val_data_func) {
    initBatchInfo(dicts, get_val_data_func);
    KeyPrefetchInfo *info;
    /* Round-robin over the keys: getNextPrefetchInfo() returns the next key
     * that still has work to do, until every key is marked done. Each call
     * advances that key by exactly one state-machine step. */
    while ((info = getNextPrefetchInfo())) {
        switch (info->state) {
            case PREFETCH_BUCKET: prefetchBucket(info); break;
            case PREFETCH_ENTRY: prefetchEntry(info); break;
            /* NOTE(review): 'prefetchKVOject' looks like a typo for
             * 'prefetchKVObject'; renaming requires touching its definition,
             * which is outside this hunk. */
            case PREFETCH_KVOBJ: prefetchKVOject(info); break;
            case PREFETCH_VALDATA: prefetchValueData(info); break;
            default: serverPanic("Unknown prefetch state %d", info->state);
        }
    }
}
|
||||
|
||||
/* Helper function to get the value pointer of a kv object. */
|
||||
static void *getObjectValuePtr(const void *value) {
|
||||
kvobj *kv = (kvobj *)value;
|
||||
return (kv->type == OBJ_STRING && kv->encoding == OBJ_ENCODING_RAW) ? kv->ptr : NULL;
|
||||
}
|
||||
|
||||
/* Reset the global commands-prefetch batch so a new batch can be collected.
 * Also reconciles the batch with the current server configuration:
 * allocates the batch when prefetching was just enabled, and resizes it
 * when prefetch-batch-max-size was changed at runtime. */
void resetCommandsBatch(void) {
    if (batch == NULL) {
        /* Handle the case where prefetching becomes enabled from disabled. */
        if (server.prefetch_batch_max_size) prefetchCommandsBatchInit();
        return;
    }

    batch->cur_idx = 0;
    batch->keys_done = 0;
    batch->key_count = 0;
    batch->client_count = 0;
    batch->executed_commands = 0;

    /* Handle the case where the max prefetch size has been changed.
     * The batch stores double the configured size (see
     * getConfigPrefetchBatchSize), hence the * 2 in the comparison. */
    if (batch->max_prefetch_size != (size_t)server.prefetch_batch_max_size * 2) {
        onMaxBatchSizeChange();
    }
}
|
||||
|
||||
/* The config of max prefetch size may be changed during running, the function
|
||||
* can get the size when initializing the batch. */
|
||||
int getConfigPrefetchBatchSize(void) {
|
||||
if (!batch) return 0;
|
||||
/* We double the size when initializing the batch, so divide it by 2. */
|
||||
return batch->max_prefetch_size / 2;
|
||||
}
|
||||
|
||||
/* Prefetch command-related data:
 * 1. Prefetch the command arguments allocated by the I/O thread to bring them
 *    closer to the L1 cache.
 * 2. Prefetch the keys and values for all commands in the current batch from
 *    the main dictionaries. */
void prefetchCommands(void) {
    if (!batch) return;

    /* Prefetch argv's for all clients */
    for (size_t i = 0; i < batch->client_count; i++) {
        client *c = batch->clients[i];
        if (!c || c->argc <= 1) continue;
        /* Skip prefetching first argv (cmd name) it was already looked up by
         * the I/O thread. */
        for (int j = 1; j < c->argc; j++) {
            redis_prefetch(c->argv[j]);
        }
    }

    /* Prefetch the argv->ptr if required.
     * This is a second, separate pass on purpose: reading ->encoding below
     * dereferences the robj headers that the first pass just prefetched. */
    for (size_t i = 0; i < batch->client_count; i++) {
        client *c = batch->clients[i];
        if (!c || c->argc <= 1) continue;
        for (int j = 1; j < c->argc; j++) {
            if (c->argv[j]->encoding == OBJ_ENCODING_RAW) {
                redis_prefetch(c->argv[j]->ptr);
            }
        }
    }

    /* Get the keys ptrs - we do it here after the key obj was prefetched.
     * NOTE: after this loop batch->keys[] holds sds pointers, not robj*. */
    for (size_t i = 0; i < batch->key_count; i++) {
        batch->keys[i] = ((robj *)batch->keys[i])->ptr;
    }

    /* Prefetch dict keys for all commands.
     * Prefetching is beneficial only if there are more than one key. */
    if (batch->key_count > 1) {
        server.stat_total_prefetch_batches++;
        /* Prefetch keys from the main dict */
        dictPrefetch(batch->keys_dicts, getObjectValuePtr);
    }
}
|
||||
|
||||
/* Adds the client's command to the current batch.
 *
 * The client is always recorded; its keys are collected only when the I/O
 * thread already looked up the command (c->iolookedcmd), since key positions
 * depend on the command table entry.
 *
 * Returns C_OK if the command was added successfully, C_ERR otherwise. */
int addCommandToBatch(client *c) {
    if (unlikely(!batch)) return C_ERR;

    /* If the batch is full, process it.
     * We also check the client count to handle cases where
     * no keys exist for the clients' commands. */
    if (batch->client_count == batch->max_prefetch_size ||
        batch->key_count == batch->max_prefetch_size)
    {
        return C_ERR;
    }

    batch->clients[batch->client_count++] = c;

    if (likely(c->iolookedcmd)) {
        /* Get command's keys positions */
        getKeysResult result = GETKEYS_RESULT_INIT;
        int num_keys = getKeysFromCommand(c->iolookedcmd, c->argv, c->argc, &result);
        /* Stop early if the batch key array fills before all keys are added;
         * the remaining keys simply won't be prefetched. */
        for (int i = 0; i < num_keys && batch->key_count < batch->max_prefetch_size; i++) {
            batch->keys[batch->key_count] = c->argv[result.keys[i].pos];
            /* c->slot is negative when no slot applies; fall back to slot 0. */
            batch->slots[batch->key_count] = c->slot > 0 ? c->slot : 0;
            batch->keys_dicts[batch->key_count] =
                kvstoreGetDict(c->db->keys, batch->slots[batch->key_count]);
            batch->key_count++;
        }
        getKeysFreeResult(&result);
    }

    return C_OK;
}
|
|
@ -0,0 +1,26 @@
|
|||
/*
|
||||
* Copyright (c) 2025-Present, Redis Ltd.
|
||||
* All rights reserved.
|
||||
*
|
||||
* Copyright (c) 2024-present, Valkey contributors.
|
||||
* All rights reserved.
|
||||
*
|
||||
* Licensed under your choice of (a) the Redis Source Available License 2.0
|
||||
* (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
|
||||
* GNU Affero General Public License v3 (AGPLv3).
|
||||
*
|
||||
* Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
|
||||
*/
|
||||
|
||||
#ifndef MEMORY_PREFETCH_H
#define MEMORY_PREFETCH_H

struct client;

/* Allocate/initialize the global commands-prefetch batch. */
void prefetchCommandsBatchInit(void);
/* Configured max batch size; 0 when the batch is not allocated
 * (prefetching disabled). */
int getConfigPrefetchBatchSize(void);
/* Add client c's parsed command (and its keys) to the current batch.
 * Returns C_OK on success, C_ERR when the batch is full or uninitialized. */
int addCommandToBatch(struct client *c);
/* Reset per-batch state; also handles runtime enable/resize of the batch. */
void resetCommandsBatch(void);
/* Prefetch argv, keys and values for all commands in the current batch. */
void prefetchCommands(void);

#endif /* MEMORY_PREFETCH_H */
|
|
@ -2838,6 +2838,7 @@ int processInputBuffer(client *c) {
|
|||
if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) {
|
||||
c->io_flags |= CLIENT_IO_PENDING_COMMAND;
|
||||
c->iolookedcmd = lookupCommand(c->argv, c->argc);
|
||||
c->slot = getSlotFromCommand(c->iolookedcmd, c->argv, c->argc);
|
||||
enqueuePendingClientsToMainThread(c, 0);
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -2722,6 +2722,8 @@ void resetServerStats(void) {
|
|||
server.stat_reply_buffer_shrinks = 0;
|
||||
server.stat_reply_buffer_expands = 0;
|
||||
server.stat_cluster_incompatible_ops = 0;
|
||||
server.stat_total_prefetch_batches = 0;
|
||||
server.stat_total_prefetch_entries = 0;
|
||||
memset(server.duration_stats, 0, sizeof(durationStats) * EL_DURATION_TYPE_NUM);
|
||||
server.el_cmd_cnt_max = 0;
|
||||
lazyfreeResetStats();
|
||||
|
@ -6197,6 +6199,8 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
|
|||
"total_writes_processed:%lld\r\n", stat_total_writes_processed,
|
||||
"io_threaded_reads_processed:%lld\r\n", stat_io_reads_processed,
|
||||
"io_threaded_writes_processed:%lld\r\n", stat_io_writes_processed,
|
||||
"io_threaded_total_prefetch_batches:%lld\r\n", server.stat_total_prefetch_batches,
|
||||
"io_threaded_total_prefetch_entries:%lld\r\n", server.stat_total_prefetch_entries,
|
||||
"client_query_buffer_limit_disconnections:%lld\r\n", stat_client_qbuf_limit_disconnections,
|
||||
"client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections,
|
||||
"reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks,
|
||||
|
|
|
@ -63,6 +63,7 @@ typedef long long ustime_t; /* microsecond time type. */
|
|||
#include "rax.h" /* Radix tree */
|
||||
#include "connection.h" /* Connection abstraction */
|
||||
#include "eventnotifier.h" /* Event notification */
|
||||
#include "memory_prefetch.h"
|
||||
|
||||
#define REDISMODULE_CORE 1
|
||||
typedef struct redisObject robj;
|
||||
|
@ -1833,6 +1834,7 @@ struct redisServer {
|
|||
int io_threads_clients_num[IO_THREADS_MAX_NUM]; /* Number of clients assigned to each IO thread. */
|
||||
int io_threads_do_reads; /* Read and parse from IO threads? */
|
||||
int io_threads_active; /* Is IO threads currently active? */
|
||||
int prefetch_batch_max_size;/* Maximum number of keys to prefetch in a single batch */
|
||||
long long events_processed_while_blocked; /* processEventsWhileBlocked() */
|
||||
int enable_protected_configs; /* Enable the modification of protected configs, see PROTECTED_ACTION_ALLOWED_* */
|
||||
int enable_debug_cmd; /* Enable DEBUG commands, see PROTECTED_ACTION_ALLOWED_* */
|
||||
|
@ -1908,6 +1910,8 @@ struct redisServer {
|
|||
redisAtomic long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */
|
||||
long long stat_client_outbuf_limit_disconnections; /* Total number of clients reached output buf length limit */
|
||||
long long stat_cluster_incompatible_ops; /* Number of operations that are incompatible with cluster mode */
|
||||
long long stat_total_prefetch_entries; /* Total number of prefetched dict entries */
|
||||
long long stat_total_prefetch_batches; /* Total number of prefetched batches */
|
||||
/* The following two are used to track instantaneous metrics, like
|
||||
* number of operations per second, network traffic. */
|
||||
struct {
|
||||
|
@ -3686,6 +3690,7 @@ void freeReplicationBacklogRefMemAsync(list *blocks, rax *index);
|
|||
int getKeysFromCommandWithSpecs(struct redisCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result);
|
||||
keyReference *getKeysPrepareResult(getKeysResult *result, int numkeys);
|
||||
int getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
|
||||
int getSlotFromCommand(struct redisCommand *cmd, robj **argv, int argc);
|
||||
int doesCommandHaveKeys(struct redisCommand *cmd);
|
||||
int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
|
||||
int doesCommandHaveChannelsWithFlags(struct redisCommand *cmd, int flags);
|
||||
|
|
|
@ -170,3 +170,143 @@ start_server {config "minimal.conf" tags {"external:skip"}} {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-debug-command {yes} io-threads 2}} {
|
||||
set server_pid [s process_id]
|
||||
# Since each thread may perform memory prefetch independently, this test is
|
||||
# only run when the number of IO threads is 2 to ensure deterministic results.
|
||||
if {[r config get io-threads] eq "io-threads 2"} {
|
||||
test {prefetch works as expected when killing a client from the middle of prefetch commands batch} {
    # Create 16 (the prefetch batch size) deferring clients
    for {set i 0} {$i < 16} {incr i} {
        set rd$i [redis_deferring_client]
    }

    # set a key that will be later be prefetch
    r set a 0

    # Get the client ID of rd4
    $rd4 client id
    set rd4_id [$rd4 read]

    # Create a batch of commands by suspending the server for a while
    # before responding to the first command
    pause_process $server_pid

    # The first client will kill the fourth client
    $rd0 client kill id $rd4_id

    # Send set commands for all clients except the first
    for {set i 1} {$i < 16} {incr i} {
        [set rd$i] set a $i
        [set rd$i] flush
    }

    # Resume the server
    resume_process $server_pid

    # Read the results
    assert_equal {1} [$rd0 read]
    catch {$rd4 read} err
    assert_match {I/O error reading reply} $err

    # verify the prefetch stats are as expected
    set info [r info stats]
    set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]
    assert_range $prefetch_entries 2 15; # With slower machines, the number of prefetch entries can be lower
    set prefetch_batches [getInfoProperty $info io_threaded_total_prefetch_batches]
    assert_range $prefetch_batches 1 7; # With slower machines, the number of batches can be higher

    # Verify the final state: the first read returns the still-unread OK
    # from rd15's earlier SET, the second read returns the GET result.
    $rd15 get a
    assert_equal {OK} [$rd15 read]
    assert_equal {15} [$rd15 read]
}
|
||||
|
||||
test {prefetch works as expected when changing the batch size while executing the commands batch} {
    # Create 16 (default prefetch batch size) clients
    for {set i 0} {$i < 16} {incr i} {
        set rd$i [redis_deferring_client]
    }

    # Create a batch of commands by suspending the server for a while
    # before responding to the first command
    pause_process $server_pid

    # Send set commands for all clients; the 5th client additionally changes
    # the prefetch batch size mid-batch
    for {set i 0} {$i < 16} {incr i} {
        if {$i == 4} {
            [set rd$i] config set prefetch-batch-max-size 1
        }
        [set rd$i] set a $i
        [set rd$i] flush
    }
    # Resume the server
    resume_process $server_pid
    # Read the results (one OK per client; rd4's second reply is discarded
    # by the close below)
    for {set i 0} {$i < 16} {incr i} {
        assert_equal {OK} [[set rd$i] read]
        [set rd$i] close
    }

    # assert the configured prefetch batch size was changed
    assert {[r config get prefetch-batch-max-size] eq "prefetch-batch-max-size 1"}
}
|
||||
|
||||
# Helper: open $batch_size deferring clients, pause the server so their SET
# commands accumulate into one batch, resume, and verify every reply is OK.
proc do_prefetch_batch {server_pid batch_size} {
    # Create clients
    for {set i 0} {$i < $batch_size} {incr i} {
        set rd$i [redis_deferring_client]
    }

    # Suspend the server to batch the commands
    pause_process $server_pid

    # Send commands from all clients
    for {set i 0} {$i < $batch_size} {incr i} {
        [set rd$i] set a $i
        [set rd$i] flush
    }

    # Resume the server to process the batch
    resume_process $server_pid

    # Verify responses
    for {set i 0} {$i < $batch_size} {incr i} {
        assert_equal {OK} [[set rd$i] read]
        [set rd$i] close
    }
}
|
||||
|
||||
test {no prefetch when the batch size is set to 0} {
    # set the batch size to 0 (disables prefetching)
    r config set prefetch-batch-max-size 0
    # save the current value of prefetch entries
    set info [r info stats]
    set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]

    do_prefetch_batch $server_pid 16

    # assert the prefetch entries did not change
    set info [r info stats]
    set new_prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]
    assert_equal $prefetch_entries $new_prefetch_entries
}
|
||||
|
||||
test {Prefetch can resume working when the configuration option is set to a non-zero value} {
    # save the current value of prefetch entries
    set info [r info stats]
    set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]
    # re-enable prefetching by restoring the batch size to 16
    r config set prefetch-batch-max-size 16

    do_prefetch_batch $server_pid 16

    # assert the prefetch entries increased now that prefetching is back on
    set info [r info stats]
    set new_prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]
    # With slower machines, the number of prefetch entries can be lower
    assert_range $new_prefetch_entries [expr {$prefetch_entries + 2}] [expr {$prefetch_entries + 16}]
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
# Outputs the generated part of src/fmtargs.h
|
||||
MAX_ARGS = 120
|
||||
MAX_ARGS = 160
|
||||
|
||||
import os
|
||||
print("/* Everything below this line is automatically generated by")
|
||||
|
|
Loading…
Reference in New Issue