diff --git a/src/Makefile b/src/Makefile index 40079b9f3..92ca7ca16 100644 --- a/src/Makefile +++ b/src/Makefile @@ -363,7 +363,7 @@ endif REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX) REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX) -REDIS_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o +REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX) REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX) diff --git a/src/config.h b/src/config.h index 47338554a..b7ed30884 100644 --- a/src/config.h +++ b/src/config.h @@ -115,9 +115,11 @@ #endif #if HAS_BUILTIN_PREFETCH +#define redis_prefetch(addr) __builtin_prefetch(addr, 0, 3) /* Read with high locality */ #define redis_prefetch_read(addr) __builtin_prefetch(addr, 0, 3) /* Read with high locality */ #define redis_prefetch_write(addr) __builtin_prefetch(addr, 1, 3) /* Write with high locality */ #else +#define redis_prefetch(addr) ((void)(addr)) /* No-op if unsupported */ #define redis_prefetch_read(addr) ((void)(addr)) /* No-op if unsupported */ #define redis_prefetch_write(addr) ((void)(addr)) /* No-op if unsupported */ #endif diff --git a/src/dict.c b/src/dict.c index 3f60bd165..aaac8506e 100644 --- a/src/dict.c +++ b/src/dict.c @@ -66,7 +66,6 @@ static void _dictShrinkIfNeeded(dict *d); static void _dictRehashStepIfNeeded(dict *d, uint64_t visitedIdx); static signed char 
_dictNextExp(unsigned long size); static int _dictInit(dict *d, dictType *type); -static dictEntry *dictGetNext(const dictEntry *de); static dictEntry **dictGetNextRef(dictEntry *de); static void dictSetNext(dictEntry *de, dictEntry *next); static int dictDefaultCompare(dict *d, const void *key1, const void *key2); @@ -960,7 +959,7 @@ double *dictGetDoubleValPtr(dictEntry *de) { /* Returns the 'next' field of the entry or NULL if the entry doesn't have a * 'next' field. */ -static dictEntry *dictGetNext(const dictEntry *de) { +dictEntry *dictGetNext(const dictEntry *de) { if (entryIsKey(de)) return NULL; /* there's no next */ if (entryIsNoValue(de)) return decodeEntryNoValue(de)->next; return de->next; diff --git a/src/dict.h b/src/dict.h index b4bf581b9..af00901d9 100644 --- a/src/dict.h +++ b/src/dict.h @@ -185,6 +185,7 @@ typedef struct { #define dictPauseAutoResize(d) ((d)->pauseAutoResize++) #define dictResumeAutoResize(d) ((d)->pauseAutoResize--) #define dictUseStoredKeyApi(d, flag) ((d)->useStoredKeyApi = (flag)) +#define dictBucketHashKey(d, key) ((d)->type->hashFunction(key)) /* If our unsigned long type can store a 64 bit number, use a 64 bit PRNG. */ #if ULONG_MAX >= 0xffffffffffffffff @@ -247,6 +248,7 @@ void dictInitIterator(dictIterator *iter, dict *d); void dictInitSafeIterator(dictIterator *iter, dict *d); void dictResetIterator(dictIterator *iter); dictEntry *dictNext(dictIterator *iter); +dictEntry *dictGetNext(const dictEntry *de); void dictReleaseIterator(dictIterator *iter); dictEntry *dictGetRandomKey(dict *d); dictEntry *dictGetFairRandomKey(dict *d); diff --git a/src/iothread.c b/src/iothread.c index d643905d6..bb9b591b1 100644 --- a/src/iothread.c +++ b/src/iothread.c @@ -309,6 +309,26 @@ int sendPendingClientsToIOThreads(void) { return processed; } +int prefetchIOThreadCommand(IOThread *t) { + int len = listLength(mainThreadProcessingClients[t->id]); + if (len < 2) return 0; + + int iterate = 0; + int prefetch = len < server.prefetch_batch_max_size*2 ? len : + server.prefetch_batch_max_size; + + listIter li; + listNode *ln; + listRewind(mainThreadProcessingClients[t->id], &li); + while((ln = listNext(&li)) && iterate++ < prefetch) { + client *c = listNodeValue(ln); + addCommandToBatch(c); + } + prefetchCommands(); + + return prefetch; +} + extern int ProcessingEventsWhileBlocked; /* The main thread processes the clients from IO threads, these clients may have @@ -321,9 +341,23 @@ extern int ProcessingEventsWhileBlocked; * process new events, if the clients with fired events from the same io thread, * it may call this function reentrantly. */ void processClientsFromIOThread(IOThread *t) { + /* Get the list of clients to process. */ + pthread_mutex_lock(&mainThreadPendingClientsMutexes[t->id]); + listJoin(mainThreadProcessingClients[t->id], mainThreadPendingClients[t->id]); + pthread_mutex_unlock(&mainThreadPendingClientsMutexes[t->id]); + size_t processed = listLength(mainThreadProcessingClients[t->id]); + if (processed == 0) return; + + int prefetch = 0; listNode *node = NULL; while (listLength(mainThreadProcessingClients[t->id])) { + + if (server.prefetch_batch_max_size) { + if (prefetch <= 0) prefetch = prefetchIOThreadCommand(t); + if (--prefetch <= 0) resetCommandsBatch(); + } + /* Each time we pop up only the first client to process to guarantee * reentrancy safety. 
*/ if (node) zfree(node); @@ -386,6 +420,7 @@ void processClientsFromIOThread(IOThread *t) { node = NULL; } if (node) zfree(node); + resetCommandsBatch(); /* Trigger the io thread to handle these clients ASAP to make them processed * in parallel. @@ -561,6 +596,8 @@ void initThreadedIO(void) { exit(1); } + prefetchCommandsBatchInit(); + /* Spawn and initialize the I/O threads. */ for (int i = 1; i < server.io_threads_num; i++) { IOThread *t = &IOThreads[i]; diff --git a/src/kvstore.c b/src/kvstore.c index ef29bd4d0..718f473d2 100644 --- a/src/kvstore.c +++ b/src/kvstore.c @@ -84,7 +84,7 @@ typedef struct { /**********************************/ /* Get the dictionary pointer based on dict-index. */ -static dict *kvstoreGetDict(kvstore *kvs, int didx) { +dict *kvstoreGetDict(kvstore *kvs, int didx) { return kvs->dicts[didx]; } diff --git a/src/kvstore.h b/src/kvstore.h index e2431a94e..de67f54ee 100644 --- a/src/kvstore.h +++ b/src/kvstore.h @@ -101,6 +101,7 @@ void kvstoreDictSetVal(kvstore *kvs, int didx, dictEntry *de, void *val); dictEntry *kvstoreDictTwoPhaseUnlinkFind(kvstore *kvs, int didx, const void *key, dictEntry ***plink, int *table_index); void kvstoreDictTwoPhaseUnlinkFree(kvstore *kvs, int didx, dictEntry *he, dictEntry **plink, int table_index); int kvstoreDictDelete(kvstore *kvs, int didx, const void *key); +dict *kvstoreGetDict(kvstore *kvs, int didx); kvstoreDictMetadata *kvstoreGetDictMetadata(kvstore *kvs, int didx); kvstoreMetadata *kvstoreGetMetadata(kvstore *kvs); diff --git a/src/memory_prefetch.c b/src/memory_prefetch.c new file mode 100644 index 000000000..85b6ba416 --- /dev/null +++ b/src/memory_prefetch.c @@ -0,0 +1,407 @@ +/* +* Copyright Valkey Contributors. +* All rights reserved. +* SPDX-License-Identifier: BSD 3-Clause +* +* This file utilizes prefetching keys and data for multiple commands in a batch, +* to improve performance by amortizing memory access costs across multiple operations. +*/ + +#include "memory_prefetch.h" +#include "server.h" +#include "dict.h" + +typedef enum { HT_IDX_FIRST = 0, HT_IDX_SECOND = 1, HT_IDX_INVALID = -1 } HashTableIndex; + +typedef enum { + PREFETCH_BUCKET, /* Initial state, determines which hash table to use and prefetch the table's bucket */ + PREFETCH_ENTRY, /* prefetch entries associated with the given key's hash */ + PREFETCH_VALUE, /* prefetch the value object of the entry found in the previous step */ + PREFETCH_VALUE_DATA, /* prefetch the value object's data (if applicable) */ + PREFETCH_DONE /* Indicates that prefetching for this key is complete */ +} PrefetchState; + + +/************************************ State machine diagram for the prefetch operation. 
******************************** + │ + start + │ + ┌────────▼─────────┐ + ┌─────────►│ PREFETCH_BUCKET ├────►────────┐ + │ └────────┬─────────┘ no more tables -> done + | bucket|found | + │ | │ + entry not found - goto next table ┌────────▼────────┐ │ + └────◄─────┤ PREFETCH_ENTRY | ▼ + ┌────────────►└────────┬────────┘ │ + | Entry│found │ + │ | │ + value not found - goto next entry ┌───────▼────────┐ | + └───────◄──────┤ PREFETCH_VALUE | ▼ + └───────┬────────┘ │ + Value│found │ + | | + ┌───────────▼──────────────┐ │ + │ PREFETCH_VALUE_DATA │ ▼ + └───────────┬──────────────┘ │ + | │ + ┌───────-─▼─────────────┐ │ + │ PREFETCH_DONE │◄────────┘ + └───────────────────────┘ +**********************************************************************************************************************/ + +typedef void *(*GetValueDataFunc)(const void *val); + +typedef struct KeyPrefetchInfo { + PrefetchState state; /* Current state of the prefetch operation */ + HashTableIndex ht_idx; /* Index of the current hash table (0 or 1 for rehashing) */ + uint64_t bucket_idx; /* Index of the bucket in the current hash table */ + uint64_t key_hash; /* Hash value of the key being prefetched */ + dictEntry *current_entry; /* Pointer to the current entry being processed */ +} KeyPrefetchInfo; + +/* PrefetchCommandsBatch structure holds the state of the current batch of client commands being processed. */ +typedef struct PrefetchCommandsBatch { + size_t cur_idx; /* Index of the current key being processed */ + size_t keys_done; /* Number of keys that have been prefetched */ + size_t key_count; /* Number of keys in the current batch */ + size_t client_count; /* Number of clients in the current batch */ + size_t max_prefetch_size; /* Maximum number of keys to prefetch in a batch */ + size_t executed_commands; /* Number of commands executed in the current batch */ + int *slots; /* Array of slots for each key */ + void **keys; /* Array of keys to prefetch in the current batch */ + client **clients; /* Array of clients in the current batch */ + dict **keys_dicts; /* Main dict for each key */ + dict **expire_dicts; /* Expire dict for each key */ + dict **current_dicts; /* Points to either keys_dicts or expire_dicts */ + KeyPrefetchInfo *prefetch_info; /* Prefetch info for each key */ +} PrefetchCommandsBatch; + +static PrefetchCommandsBatch *batch = NULL; + +void freePrefetchCommandsBatch(void) { + if (batch == NULL) { + return; + } + + zfree(batch->clients); + zfree(batch->keys); + zfree(batch->keys_dicts); + zfree(batch->expire_dicts); + zfree(batch->slots); + zfree(batch->prefetch_info); + zfree(batch); + batch = NULL; +} + +void prefetchCommandsBatchInit(void) { + serverAssert(!batch); + size_t max_prefetch_size = server.prefetch_batch_max_size * 2; + + if (max_prefetch_size == 0) { + return; + } + + batch = zcalloc(sizeof(PrefetchCommandsBatch)); + batch->max_prefetch_size = max_prefetch_size; + batch->clients = zcalloc(max_prefetch_size * sizeof(client *)); + batch->keys = zcalloc(max_prefetch_size * sizeof(void *)); + batch->keys_dicts = zcalloc(max_prefetch_size * sizeof(dict *)); + batch->expire_dicts = zcalloc(max_prefetch_size * sizeof(dict *)); + batch->slots = zcalloc(max_prefetch_size * sizeof(int)); + batch->prefetch_info = zcalloc(max_prefetch_size * sizeof(KeyPrefetchInfo)); +} + +void onMaxBatchSizeChange(void) { + if (batch && batch->client_count > 0) { + /* We need to process the current batch before updating the size */ + return; + } + + freePrefetchCommandsBatch(); + prefetchCommandsBatchInit(); +} + 
+/* Prefetch the given pointer and move to the next key in the batch. */ +static void prefetchAndMoveToNextKey(void *addr) { + redis_prefetch(addr); + /* While the prefetch is in progress, we can continue to the next key */ + batch->cur_idx = (batch->cur_idx + 1) % batch->key_count; +} + +static void markKeyAsdone(KeyPrefetchInfo *info) { + info->state = PREFETCH_DONE; + server.stat_total_prefetch_entries++; + batch->keys_done++; +} + +/* Returns the next KeyPrefetchInfo structure that needs to be processed. */ +static KeyPrefetchInfo *getNextPrefetchInfo(void) { + size_t start_idx = batch->cur_idx; + do { + KeyPrefetchInfo *info = &batch->prefetch_info[batch->cur_idx]; + if (info->state != PREFETCH_DONE) return info; + batch->cur_idx = (batch->cur_idx + 1) % batch->key_count; + } while (batch->cur_idx != start_idx); + return NULL; +} + +static void initBatchInfo(dict **dicts) { + batch->current_dicts = dicts; + + /* Initialize the prefetch info */ + for (size_t i = 0; i < batch->key_count; i++) { + KeyPrefetchInfo *info = &batch->prefetch_info[i]; + if (!batch->current_dicts[i] || dictSize(batch->current_dicts[i]) == 0) { + info->state = PREFETCH_DONE; + batch->keys_done++; + continue; + } + info->ht_idx = HT_IDX_INVALID; + info->current_entry = NULL; + info->state = PREFETCH_BUCKET; + info->key_hash = dictBucketHashKey(batch->current_dicts[i], batch->keys[i]); + } +} + +/* Prefetch the bucket of the next hash table index. +* If no tables are left, move to the PREFETCH_DONE state. */ +static void prefetchBucket(KeyPrefetchInfo *info) { + size_t i = batch->cur_idx; + + /* Determine which hash table to use */ + if (info->ht_idx == HT_IDX_INVALID) { + info->ht_idx = HT_IDX_FIRST; + } else if (info->ht_idx == HT_IDX_FIRST && dictIsRehashing(batch->current_dicts[i])) { + info->ht_idx = HT_IDX_SECOND; + } else { + /* No more tables left - mark as done. */ + markKeyAsdone(info); + return; + } + + /* Prefetch the bucket */ + info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(batch->current_dicts[i]->ht_size_exp[info->ht_idx]); + prefetchAndMoveToNextKey(&batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]); + info->current_entry = NULL; + info->state = PREFETCH_ENTRY; +} + +/* Prefetch the next entry in the bucket and move to the PREFETCH_VALUE state. +* If no more entries in the bucket, move to the PREFETCH_BUCKET state to look at the next table. */ +static void prefetchEntry(KeyPrefetchInfo *info) { + size_t i = batch->cur_idx; + + if (info->current_entry) { + /* We already found an entry in the bucket - move to the next entry */ + info->current_entry = dictGetNext(info->current_entry); + } else { + /* Go to the first entry in the bucket */ + info->current_entry = batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]; + } + + if (info->current_entry) { + prefetchAndMoveToNextKey(info->current_entry); + info->state = PREFETCH_VALUE; + } else { + /* No entry found in the bucket - try the bucket in the next table */ + info->state = PREFETCH_BUCKET; + } +} + +/* Prefetch the entry's value. If the value is found, move to the PREFETCH_VALUE_DATA state. +* If the value is not found, move to the PREFETCH_ENTRY state to look at the next entry in the bucket. 
*/ +static void prefetchValue(KeyPrefetchInfo *info) { + size_t i = batch->cur_idx; + void *value = dictGetVal(info->current_entry); + + if (dictGetNext(info->current_entry) == NULL && !dictIsRehashing(batch->current_dicts[i])) { + /* If this is the last element, we assume a hit and don't compare the keys */ + prefetchAndMoveToNextKey(value); + info->state = PREFETCH_VALUE_DATA; + return; + } + + void *current_entry_key = dictGetKey(info->current_entry); + if (batch->keys[i] == current_entry_key || + dictCompareKeys(batch->current_dicts[i], batch->keys[i], current_entry_key)) { + /* If the key is found, prefetch the value */ + prefetchAndMoveToNextKey(value); + info->state = PREFETCH_VALUE_DATA; + } else { + /* Move to the next entry */ + info->state = PREFETCH_ENTRY; + } +} + +/* Prefetch the value data if available. */ +static void prefetchValueData(KeyPrefetchInfo *info, GetValueDataFunc get_val_data_func) { + if (get_val_data_func) { + void *value_data = get_val_data_func(dictGetVal(info->current_entry)); + if (value_data) prefetchAndMoveToNextKey(value_data); + } + markKeyAsdone(info); +} + +/* Prefetch dictionary data for an array of keys. +* +* This function takes an array of dictionaries and keys, attempting to bring +* data closer to the L1 cache that might be needed for dictionary operations +* on those keys. +* +* The dictFind algorithm: +* 1. Evaluate the hash of the key +* 2. Access the index in the first table +* 3. Walk the entries linked list until the key is found +* If the key hasn't been found and the dictionary is in the middle of rehashing, +* access the index on the second table and repeat step 3 +* +* dictPrefetch executes the same algorithm as dictFind, but one step at a time +* for each key. Instead of waiting for data to be read from memory, it prefetches +* the data and then moves on to execute the next prefetch for another key. +* +* dicts - An array of dictionaries to prefetch data from. +* get_val_data_func - A callback function that dictPrefetch can invoke +* to bring the key's value data closer to the L1 cache as well. +*/ +static void dictPrefetch(dict **dicts, GetValueDataFunc get_val_data_func) { + initBatchInfo(dicts); + KeyPrefetchInfo *info; + while ((info = getNextPrefetchInfo())) { + switch (info->state) { + case PREFETCH_BUCKET: prefetchBucket(info); break; + case PREFETCH_ENTRY: prefetchEntry(info); break; + case PREFETCH_VALUE: prefetchValue(info); break; + case PREFETCH_VALUE_DATA: prefetchValueData(info, get_val_data_func); break; + default: serverPanic("Unknown prefetch state %d", info->state); + } + } +} + +/* Helper function to get the value pointer of an object. */ +static void *getObjectValuePtr(const void *val) { + robj *o = (robj *)val; + return (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_RAW) ? o->ptr : NULL; +} + +void resetCommandsBatch(void) { + if (!batch) return; + batch->cur_idx = 0; + batch->keys_done = 0; + batch->key_count = 0; + batch->client_count = 0; + batch->executed_commands = 0; +} + +/* Prefetch command-related data: +* 1. Prefetch the command arguments allocated by the I/O thread to bring them closer to the L1 cache. +* 2. Prefetch the keys and values for all commands in the current batch from the main and expires dictionaries. */ +void prefetchCommands(void) { + /* Prefetch argv's for all clients */ + for (size_t i = 0; i < batch->client_count; i++) { + client *c = batch->clients[i]; + if (!c || c->argc <= 1) continue; + /* Skip prefetching first argv (cmd name) it was already looked up by the I/O thread. 
*/ + for (int j = 1; j < c->argc; j++) { + redis_prefetch(c->argv[j]); + } + } + + /* Prefetch the argv->ptr if required */ + for (size_t i = 0; i < batch->client_count; i++) { + client *c = batch->clients[i]; + if (!c || c->argc <= 1) continue; + for (int j = 1; j < c->argc; j++) { + if (c->argv[j]->encoding == OBJ_ENCODING_RAW) { + redis_prefetch(c->argv[j]->ptr); + } + } + } + + /* Get the keys ptrs - we do it here after the key obj was prefetched. */ + for (size_t i = 0; i < batch->key_count; i++) { + batch->keys[i] = ((robj *)batch->keys[i])->ptr; + } + + /* Prefetch dict keys for all commands. Prefetching is beneficial only if there are more than one key. */ + if (batch->key_count > 1) { + server.stat_total_prefetch_batches++; + /* Prefetch keys from the main dict */ + dictPrefetch(batch->keys_dicts, getObjectValuePtr); + /* Prefetch keys from the expires dict - no value data to prefetch */ + dictPrefetch(batch->expire_dicts, NULL); + } +} + +/* Processes all the prefetched commands in the current batch. */ +void processClientsCommandsBatch(void) { + if (!batch || batch->client_count == 0) return; + + /* If executed_commands is not 0, + * it means that we are in the middle of processing a batch and this is a recursive call */ + if (batch->executed_commands == 0) { + prefetchCommands(); + } + + /* Process the commands */ + for (size_t i = 0; i < batch->client_count; i++) { + client *c = batch->clients[i]; + if (c == NULL) continue; + + /* Set the client to null immediately to avoid accessing it again recursively when ProcessingEventsWhileBlocked */ + batch->clients[i] = NULL; + batch->executed_commands++; + if (processPendingCommandAndInputBuffer(c) != C_ERR) beforeNextClient(c); + } + + resetCommandsBatch(); + + /* Handle the case where the max prefetch size has been changed. */ + if (batch->max_prefetch_size != (size_t)server.prefetch_batch_max_size) { + onMaxBatchSizeChange(); + } +} + +/* Adds the client's command to the current batch. + * + * Returns C_OK if the command was added successfully, C_ERR otherwise. */ +int addCommandToBatch(client *c) { + if (!batch) return C_ERR; + + /* If the batch is full, process it. + * We also check the client count to handle cases where + * no keys exist for the clients' commands. */ + if (batch->client_count == batch->max_prefetch_size || batch->key_count == batch->max_prefetch_size) { + return C_ERR; + } + + batch->clients[batch->client_count++] = c; + + /* Get command's keys positions */ + if (c->iolookedcmd) { + getKeysResult result = GETKEYS_RESULT_INIT; + int num_keys = getKeysFromCommand(c->iolookedcmd, c->argv, c->argc, &result); + for (int i = 0; i < num_keys && batch->key_count < batch->max_prefetch_size; i++) { + batch->keys[batch->key_count] = c->argv[result.keys[i].pos]; + batch->slots[batch->key_count] = c->slot > 0 ? c->slot : 0; + batch->keys_dicts[batch->key_count] = kvstoreGetDict(c->db->keys, batch->slots[batch->key_count]); + batch->expire_dicts[batch->key_count] = kvstoreGetDict(c->db->expires, batch->slots[batch->key_count]); + batch->key_count++; + } + getKeysFreeResult(&result); + } + + return C_OK; +} + +/* Removes the given client from the pending prefetch batch, if present. 
*/ +void removeClientFromPendingCommandsBatch(client *c) { + if (!batch) return; + + for (size_t i = 0; i < batch->client_count; i++) { + if (batch->clients[i] == c) { + batch->clients[i] = NULL; + return; + } + } +} diff --git a/src/memory_prefetch.h b/src/memory_prefetch.h new file mode 100644 index 000000000..5ab5b3c5a --- /dev/null +++ b/src/memory_prefetch.h @@ -0,0 +1,12 @@ +#ifndef MEMORY_PREFETCH_H +#define MEMORY_PREFETCH_H + +struct client; + +void prefetchCommandsBatchInit(void); +int addCommandToBatch(struct client *c); +void removeClientFromPendingCommandsBatch(struct client *c); +void resetCommandsBatch(void); +void prefetchCommands(void); + +#endif /* MEMORY_PREFETCH_H */ diff --git a/src/networking.c b/src/networking.c index eb0b389eb..72c96ec76 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1544,6 +1544,8 @@ void unlinkClient(client *c) { c->client_list_node = NULL; } + removeClientFromPendingCommandsBatch(c); + /* Check if this is a replica waiting for diskless replication (rdb pipe), * in which case it needs to be cleaned from that list */ if (c->flags & CLIENT_SLAVE && diff --git a/src/server.c b/src/server.c index 364227bd6..3abffa0a5 100644 --- a/src/server.c +++ b/src/server.c @@ -2174,6 +2174,7 @@ void initServerConfig(void) { server.page_size = sysconf(_SC_PAGESIZE); server.pause_cron = 0; server.dict_resizing = 1; + server.prefetch_batch_max_size = 16; server.latency_tracking_info_percentiles_len = 3; server.latency_tracking_info_percentiles = zmalloc(sizeof(double)*(server.latency_tracking_info_percentiles_len)); @@ -2663,6 +2664,8 @@ void resetServerStats(void) { server.stat_reply_buffer_shrinks = 0; server.stat_reply_buffer_expands = 0; server.stat_cluster_incompatible_ops = 0; + server.stat_total_prefetch_batches = 0; + server.stat_total_prefetch_entries = 0; memset(server.duration_stats, 0, sizeof(durationStats) * EL_DURATION_TYPE_NUM); server.el_cmd_cnt_max = 0; lazyfreeResetStats(); diff --git a/src/server.h b/src/server.h index 1fb999c7a..7807a2af4 100644 --- a/src/server.h +++ b/src/server.h @@ -63,6 +63,7 @@ typedef long long ustime_t; /* microsecond time type. */ #include "rax.h" /* Radix tree */ #include "connection.h" /* Connection abstraction */ #include "eventnotifier.h" /* Event notification */ +#include "memory_prefetch.h" #define REDISMODULE_CORE 1 typedef struct redisObject robj; @@ -1796,6 +1797,10 @@ struct redisServer { int enable_debug_cmd; /* Enable DEBUG commands, see PROTECTED_ACTION_ALLOWED_* */ int enable_module_cmd; /* Enable MODULE commands, see PROTECTED_ACTION_ALLOWED_* */ + int prefetch_batch_max_size; /* Maximum number of keys to prefetch in a single batch */ + long long stat_total_prefetch_entries; /* Total number of prefetched dict entries */ + long long stat_total_prefetch_batches; /* Total number of prefetched batches */ + /* RDB / AOF loading information */ volatile sig_atomic_t loading; /* We are loading data from disk if true */ volatile sig_atomic_t async_loading; /* We are loading data without blocking the db being served */
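Note on the new `redis_prefetch` macro added to config.h: it wraps the GCC/Clang builtin `__builtin_prefetch(addr, rw, locality)`, where `rw` is 0 for an expected read and 1 for an expected write, and `locality` 0-3 says how strongly the line should be kept in cache (3 = high temporal locality), matching the `(addr, 0, 3)` / `(addr, 1, 3)` forms in the patch. Below is a minimal standalone sketch of the same pattern; `my_prefetch` and the array-summing loop are illustrative only (they are not code from this patch), and the only assumption is a GCC/Clang-compatible compiler.

```c
#include <stddef.h>

/* Mirror of the config.h pattern: prefetch for read with high temporal
 * locality when the builtin exists, otherwise compile to a no-op. */
#if defined(__GNUC__) || defined(__clang__)
#define my_prefetch(addr) __builtin_prefetch((addr), 0, 3)
#else
#define my_prefetch(addr) ((void)(addr))
#endif

/* Illustrative use: start loading the next element while working on the
 * current one, so the memory access overlaps with useful work. */
long sum_with_prefetch(const long *v, size_t n) {
    long sum = 0;
    for (size_t i = 0; i < n; i++) {
        if (i + 1 < n) my_prefetch(&v[i + 1]);
        sum += v[i];
    }
    return sum;
}
```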
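The core of memory_prefetch.c is the per-key state machine driven round-robin by getNextPrefetchInfo(): each key advances one step (bucket, then entry, then value, then value data), issues a single prefetch, and yields to the next key, so the cache misses of the whole batch are in flight concurrently instead of being paid one after another as back-to-back dictFind() calls would pay them. The following is a much-simplified, self-contained sketch of that interleaving over a hypothetical chained hash table; `toy_dict`, `toy_entry`, `toy_step`, and the fixed batch size are illustrative stand-ins, not the dict.c structures, and unlike the real code this sketch walks whole chains rather than stopping at a key match or handling a rehashing table's second hash table.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical chained hash table, standing in for the real dict. */
typedef struct toy_entry { void *key, *val; struct toy_entry *next; } toy_entry;
typedef struct { toy_entry **buckets; uint64_t mask; } toy_dict;

typedef enum { ST_BUCKET, ST_ENTRY, ST_VALUE, ST_DONE } toy_state;
typedef struct { toy_state state; uint64_t hash; toy_entry *cur; } toy_info;

/* One prefetch step for one key; returns 1 while the key still has work. */
static int toy_step(toy_dict *d, toy_info *ki) {
    switch (ki->state) {
    case ST_BUCKET: /* Prefetch the bucket slot that holds the chain head. */
        __builtin_prefetch(&d->buckets[ki->hash & d->mask], 0, 3);
        ki->state = ST_ENTRY;
        return 1;
    case ST_ENTRY: /* Bucket (or previous entry) should be cached by now. */
        ki->cur = ki->cur ? ki->cur->next : d->buckets[ki->hash & d->mask];
        if (!ki->cur) { ki->state = ST_DONE; return 0; }
        __builtin_prefetch(ki->cur, 0, 3);
        ki->state = ST_VALUE;
        return 1;
    case ST_VALUE: /* Entry should be cached by now; prefetch its value. */
        __builtin_prefetch(ki->cur->val, 0, 3);
        ki->state = ST_ENTRY;
        return 1;
    default:
        return 0;
    }
}

/* Round-robin across up to 16 keys whose hashes were computed up front
 * (as the patch does with dictBucketHashKey). Each pass issues one
 * prefetch per key, so the misses of the whole batch overlap; nothing is
 * returned, this is purely a cache-warming pass before the real lookups. */
void toy_prefetch_batch(toy_dict *d, const uint64_t *hashes, size_t n) {
    toy_info info[16];
    if (n > 16) n = 16;
    size_t pending = n;
    for (size_t i = 0; i < n; i++)
        info[i] = (toy_info){ .state = ST_BUCKET, .hash = hashes[i], .cur = NULL };
    while (pending) {
        for (size_t i = 0; i < n; i++) {
            if (info[i].state != ST_DONE && !toy_step(d, &info[i])) pending--;
        }
    }
}
```

Keeping the batch bounded (the patch defaults server.prefetch_batch_max_size to 16) reflects the usual trade-off: too few keys leaves little latency to hide, while very large batches risk prefetched lines being evicted before the commands actually use them.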