mirror of https://mirror.osredm.com/root/redis.git
memory prefetch
commit c472d7f606
parent a3f1d09a7d
@@ -363,7 +363,7 @@ endif

REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX)
REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX)
REDIS_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
REDIS_SERVER_OBJ=threads_mngr.o memory_prefetch.o adlist.o quicklist.o ae.o anet.o dict.o ebuckets.o eventnotifier.o iothread.o mstr.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX)
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX)

@@ -115,9 +115,11 @@
#endif

#if HAS_BUILTIN_PREFETCH
#define redis_prefetch(addr) __builtin_prefetch(addr, 0, 3) /* Read with high locality */
#define redis_prefetch_read(addr) __builtin_prefetch(addr, 0, 3) /* Read with high locality */
#define redis_prefetch_write(addr) __builtin_prefetch(addr, 1, 3) /* Write with high locality */
#else
#define redis_prefetch(addr) ((void)(addr)) /* No-op if unsupported */
#define redis_prefetch_read(addr) ((void)(addr)) /* No-op if unsupported */
#define redis_prefetch_write(addr) ((void)(addr)) /* No-op if unsupported */
#endif
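
These macros wrap the GCC/Clang builtin __builtin_prefetch(addr, rw, locality): the second argument selects read (0) or write (1) access, and locality 3 asks for the line to be kept in as many cache levels as possible. A minimal sketch of the intended usage pattern; the list type and summing loop below are hypothetical and not part of this commit:

#include "config.h" /* redis_prefetch_read, assuming the Redis source tree */

/* Hypothetical singly linked list, used only to illustrate the macros. */
typedef struct node {
    struct node *next;
    long value;
} node;

static long sum_list(const node *n) {
    long total = 0;
    while (n) {
        /* Start loading the next node while we work on the current one,
         * so the memory latency overlaps with useful computation. */
        if (n->next) redis_prefetch_read(n->next);
        total += n->value;
        n = n->next;
    }
    return total;
}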

@@ -66,7 +66,6 @@ static void _dictShrinkIfNeeded(dict *d);
static void _dictRehashStepIfNeeded(dict *d, uint64_t visitedIdx);
static signed char _dictNextExp(unsigned long size);
static int _dictInit(dict *d, dictType *type);
static dictEntry *dictGetNext(const dictEntry *de);
static dictEntry **dictGetNextRef(dictEntry *de);
static void dictSetNext(dictEntry *de, dictEntry *next);
static int dictDefaultCompare(dict *d, const void *key1, const void *key2);

@@ -960,7 +959,7 @@ double *dictGetDoubleValPtr(dictEntry *de) {

/* Returns the 'next' field of the entry or NULL if the entry doesn't have a
 * 'next' field. */
static dictEntry *dictGetNext(const dictEntry *de) {
dictEntry *dictGetNext(const dictEntry *de) {
    if (entryIsKey(de)) return NULL; /* there's no next */
    if (entryIsNoValue(de)) return decodeEntryNoValue(de)->next;
    return de->next;
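
dictGetNext() loses its static qualifier here so that code outside dict.c (the new memory_prefetch.c below) can walk a bucket's entry chain. A minimal sketch of that usage, assuming only the public dict.h API; the helper name is hypothetical:

#include <stddef.h>
#include "dict.h"

/* Hypothetical helper: count the entries in one bucket chain using the
 * newly exported dictGetNext(). */
static size_t chain_length(const dictEntry *head) {
    size_t n = 0;
    for (const dictEntry *de = head; de != NULL; de = dictGetNext(de)) n++;
    return n;
}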

@@ -185,6 +185,7 @@ typedef struct {
#define dictPauseAutoResize(d) ((d)->pauseAutoResize++)
#define dictResumeAutoResize(d) ((d)->pauseAutoResize--)
#define dictUseStoredKeyApi(d, flag) ((d)->useStoredKeyApi = (flag))
#define dictBucketHashKey(d, key) ((d)->type->hashFunction(key))

/* If our unsigned long type can store a 64 bit number, use a 64 bit PRNG. */
#if ULONG_MAX >= 0xffffffffffffffff
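
The new dictBucketHashKey() macro exposes the dict's hash function so a caller can compute a key's bucket without performing the lookup; memory_prefetch.c below combines it with DICTHT_SIZE_MASK() for exactly that. A minimal sketch of the idea, ignoring rehashing and warming only table 0; the helper name is hypothetical:

#include "dict.h"
#include "config.h" /* redis_prefetch */

/* Hypothetical helper: prefetch the bucket that dictFind() would touch first. */
static void warm_bucket(dict *d, const void *key) {
    uint64_t hash = dictBucketHashKey(d, key);
    unsigned long idx = hash & DICTHT_SIZE_MASK(d->ht_size_exp[0]);
    redis_prefetch(&d->ht_table[0][idx]);
}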

@@ -247,6 +248,7 @@ void dictInitIterator(dictIterator *iter, dict *d);
void dictInitSafeIterator(dictIterator *iter, dict *d);
void dictResetIterator(dictIterator *iter);
dictEntry *dictNext(dictIterator *iter);
dictEntry *dictGetNext(const dictEntry *de);
void dictReleaseIterator(dictIterator *iter);
dictEntry *dictGetRandomKey(dict *d);
dictEntry *dictGetFairRandomKey(dict *d);

@@ -309,6 +309,26 @@ int sendPendingClientsToIOThreads(void) {
    return processed;
}

int prefetchIOThreadCommand(IOThread *t) {
    int len = listLength(mainThreadProcessingClients[t->id]);
    if (len < 2) return 0;

    int iterate = 0;
    int prefetch = len < server.prefetch_batch_max_size*2 ? len :
                   server.prefetch_batch_max_size;

    listIter li;
    listNode *ln;
    listRewind(mainThreadProcessingClients[t->id], &li);
    while((ln = listNext(&li)) && iterate++ < prefetch) {
        client *c = listNodeValue(ln);
        addCommandToBatch(c);
    }
    prefetchCommands();

    return prefetch;
}

extern int ProcessingEventsWhileBlocked;

/* The main thread processes the clients from IO threads, these clients may have

@@ -321,9 +341,23 @@ extern int ProcessingEventsWhileBlocked;
 * process new events, if the clients with fired events from the same io thread,
 * it may call this function reentrantly. */
void processClientsFromIOThread(IOThread *t) {
    /* Get the list of clients to process. */
    pthread_mutex_lock(&mainThreadPendingClientsMutexes[t->id]);
    listJoin(mainThreadProcessingClients[t->id], mainThreadPendingClients[t->id]);
    pthread_mutex_unlock(&mainThreadPendingClientsMutexes[t->id]);
    size_t processed = listLength(mainThreadProcessingClients[t->id]);
    if (processed == 0) return;

    int prefetch = 0;
    listNode *node = NULL;

    while (listLength(mainThreadProcessingClients[t->id])) {

        if (server.prefetch_batch_max_size) {
            if (prefetch <= 0) prefetch = prefetchIOThreadCommand(t);
            if (--prefetch <= 0) resetCommandsBatch();
        }

        /* Each time we pop up only the first client to process to guarantee
         * reentrancy safety. */
        if (node) zfree(node);

@@ -386,6 +420,7 @@ void processClientsFromIOThread(IOThread *t) {
        node = NULL;
    }
    if (node) zfree(node);
    resetCommandsBatch();

    /* Trigger the io thread to handle these clients ASAP to make them processed
     * in parallel.
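
The batch size chosen by prefetchIOThreadCommand() above takes the whole remaining list when it is shorter than twice prefetch-batch-max-size, so the tail of a list is never split into a tiny final batch; processClientsFromIOThread() then counts that number down, one per processed client, and resets the batch state when it reaches zero. A stand-alone illustration of the sizing rule, assuming the default prefetch_batch_max_size of 16 set in initServerConfig() below:

#include <stdio.h>

/* Mirrors the expression in prefetchIOThreadCommand():
 * len < max*2 ? len : max */
static int batch_size(int len, int max) {
    return len < max * 2 ? len : max;
}

int main(void) {
    printf("%d\n", batch_size(20, 16)); /* 20: short tail folded into one batch */
    printf("%d\n", batch_size(24, 16)); /* 24: still below 32, taken in one go */
    printf("%d\n", batch_size(40, 16)); /* 16: one full batch now, the rest later */
    return 0;
}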

@@ -561,6 +596,8 @@ void initThreadedIO(void) {
        exit(1);
    }

    prefetchCommandsBatchInit();

    /* Spawn and initialize the I/O threads. */
    for (int i = 1; i < server.io_threads_num; i++) {
        IOThread *t = &IOThreads[i];

@@ -84,7 +84,7 @@ typedef struct {
/**********************************/

/* Get the dictionary pointer based on dict-index. */
static dict *kvstoreGetDict(kvstore *kvs, int didx) {
dict *kvstoreGetDict(kvstore *kvs, int didx) {
    return kvs->dicts[didx];
}


@@ -101,6 +101,7 @@ void kvstoreDictSetVal(kvstore *kvs, int didx, dictEntry *de, void *val);
dictEntry *kvstoreDictTwoPhaseUnlinkFind(kvstore *kvs, int didx, const void *key, dictEntry ***plink, int *table_index);
void kvstoreDictTwoPhaseUnlinkFree(kvstore *kvs, int didx, dictEntry *he, dictEntry **plink, int table_index);
int kvstoreDictDelete(kvstore *kvs, int didx, const void *key);
dict *kvstoreGetDict(kvstore *kvs, int didx);
kvstoreDictMetadata *kvstoreGetDictMetadata(kvstore *kvs, int didx);
kvstoreMetadata *kvstoreGetMetadata(kvstore *kvs);


@@ -0,0 +1,407 @@
/*
 * Copyright Valkey Contributors.
 * All rights reserved.
 * SPDX-License-Identifier: BSD 3-Clause
 *
 * This file prefetches the keys and data needed by multiple commands in a batch,
 * to improve performance by amortizing memory access costs across multiple operations.
 */

#include "memory_prefetch.h"
#include "server.h"
#include "dict.h"

typedef enum { HT_IDX_FIRST = 0, HT_IDX_SECOND = 1, HT_IDX_INVALID = -1 } HashTableIndex;

typedef enum {
    PREFETCH_BUCKET,     /* Initial state, determines which hash table to use and prefetches the table's bucket */
    PREFETCH_ENTRY,      /* Prefetch entries associated with the given key's hash */
    PREFETCH_VALUE,      /* Prefetch the value object of the entry found in the previous step */
    PREFETCH_VALUE_DATA, /* Prefetch the value object's data (if applicable) */
    PREFETCH_DONE        /* Indicates that prefetching for this key is complete */
} PrefetchState;

/************************* State machine for the prefetch operation *************************
 *
 *   start
 *     │
 *     ▼
 *   PREFETCH_BUCKET ── no more tables ──────────────────► PREFETCH_DONE
 *     │ bucket found
 *     ▼
 *   PREFETCH_ENTRY ── entry not found - goto next table ─► back to PREFETCH_BUCKET
 *     │ entry found
 *     ▼
 *   PREFETCH_VALUE ── value not found - goto next entry ─► back to PREFETCH_ENTRY
 *     │ value found
 *     ▼
 *   PREFETCH_VALUE_DATA ─────────────────────────────────► PREFETCH_DONE
 *
 *********************************************************************************************/

typedef void *(*GetValueDataFunc)(const void *val);

typedef struct KeyPrefetchInfo {
    PrefetchState state; /* Current state of the prefetch operation */
    HashTableIndex ht_idx; /* Index of the current hash table (0 or 1 for rehashing) */
    uint64_t bucket_idx; /* Index of the bucket in the current hash table */
    uint64_t key_hash; /* Hash value of the key being prefetched */
    dictEntry *current_entry; /* Pointer to the current entry being processed */
} KeyPrefetchInfo;

/* PrefetchCommandsBatch structure holds the state of the current batch of client commands being processed. */
typedef struct PrefetchCommandsBatch {
    size_t cur_idx; /* Index of the current key being processed */
    size_t keys_done; /* Number of keys that have been prefetched */
    size_t key_count; /* Number of keys in the current batch */
    size_t client_count; /* Number of clients in the current batch */
    size_t max_prefetch_size; /* Maximum number of keys to prefetch in a batch */
    size_t executed_commands; /* Number of commands executed in the current batch */
    int *slots; /* Array of slots for each key */
    void **keys; /* Array of keys to prefetch in the current batch */
    client **clients; /* Array of clients in the current batch */
    dict **keys_dicts; /* Main dict for each key */
    dict **expire_dicts; /* Expire dict for each key */
    dict **current_dicts; /* Points to either keys_dicts or expire_dicts */
    KeyPrefetchInfo *prefetch_info; /* Prefetch info for each key */
} PrefetchCommandsBatch;

static PrefetchCommandsBatch *batch = NULL;

void freePrefetchCommandsBatch(void) {
    if (batch == NULL) {
        return;
    }

    zfree(batch->clients);
    zfree(batch->keys);
    zfree(batch->keys_dicts);
    zfree(batch->expire_dicts);
    zfree(batch->slots);
    zfree(batch->prefetch_info);
    zfree(batch);
    batch = NULL;
}

void prefetchCommandsBatchInit(void) {
    serverAssert(!batch);
    size_t max_prefetch_size = server.prefetch_batch_max_size * 2;

    if (max_prefetch_size == 0) {
        return;
    }

    batch = zcalloc(sizeof(PrefetchCommandsBatch));
    batch->max_prefetch_size = max_prefetch_size;
    batch->clients = zcalloc(max_prefetch_size * sizeof(client *));
    batch->keys = zcalloc(max_prefetch_size * sizeof(void *));
    batch->keys_dicts = zcalloc(max_prefetch_size * sizeof(dict *));
    batch->expire_dicts = zcalloc(max_prefetch_size * sizeof(dict *));
    batch->slots = zcalloc(max_prefetch_size * sizeof(int));
    batch->prefetch_info = zcalloc(max_prefetch_size * sizeof(KeyPrefetchInfo));
}

void onMaxBatchSizeChange(void) {
    if (batch && batch->client_count > 0) {
        /* We need to process the current batch before updating the size */
        return;
    }

    freePrefetchCommandsBatch();
    prefetchCommandsBatchInit();
}

/* Prefetch the given pointer and move to the next key in the batch. */
static void prefetchAndMoveToNextKey(void *addr) {
    redis_prefetch(addr);
    /* While the prefetch is in progress, we can continue to the next key */
    batch->cur_idx = (batch->cur_idx + 1) % batch->key_count;
}

static void markKeyAsdone(KeyPrefetchInfo *info) {
    info->state = PREFETCH_DONE;
    server.stat_total_prefetch_entries++;
    batch->keys_done++;
}

/* Returns the next KeyPrefetchInfo structure that needs to be processed. */
static KeyPrefetchInfo *getNextPrefetchInfo(void) {
    size_t start_idx = batch->cur_idx;
    do {
        KeyPrefetchInfo *info = &batch->prefetch_info[batch->cur_idx];
        if (info->state != PREFETCH_DONE) return info;
        batch->cur_idx = (batch->cur_idx + 1) % batch->key_count;
    } while (batch->cur_idx != start_idx);
    return NULL;
}

static void initBatchInfo(dict **dicts) {
    batch->current_dicts = dicts;

    /* Initialize the prefetch info */
    for (size_t i = 0; i < batch->key_count; i++) {
        KeyPrefetchInfo *info = &batch->prefetch_info[i];
        if (!batch->current_dicts[i] || dictSize(batch->current_dicts[i]) == 0) {
            info->state = PREFETCH_DONE;
            batch->keys_done++;
            continue;
        }
        info->ht_idx = HT_IDX_INVALID;
        info->current_entry = NULL;
        info->state = PREFETCH_BUCKET;
        info->key_hash = dictBucketHashKey(batch->current_dicts[i], batch->keys[i]);
    }
}

/* Prefetch the bucket of the next hash table index.
 * If no tables are left, move to the PREFETCH_DONE state. */
static void prefetchBucket(KeyPrefetchInfo *info) {
    size_t i = batch->cur_idx;

    /* Determine which hash table to use */
    if (info->ht_idx == HT_IDX_INVALID) {
        info->ht_idx = HT_IDX_FIRST;
    } else if (info->ht_idx == HT_IDX_FIRST && dictIsRehashing(batch->current_dicts[i])) {
        info->ht_idx = HT_IDX_SECOND;
    } else {
        /* No more tables left - mark as done. */
        markKeyAsdone(info);
        return;
    }

    /* Prefetch the bucket */
    info->bucket_idx = info->key_hash & DICTHT_SIZE_MASK(batch->current_dicts[i]->ht_size_exp[info->ht_idx]);
    prefetchAndMoveToNextKey(&batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx]);
    info->current_entry = NULL;
    info->state = PREFETCH_ENTRY;
}

/* Prefetch the next entry in the bucket and move to the PREFETCH_VALUE state.
 * If no more entries in the bucket, move to the PREFETCH_BUCKET state to look at the next table. */
static void prefetchEntry(KeyPrefetchInfo *info) {
    size_t i = batch->cur_idx;

    if (info->current_entry) {
        /* We already found an entry in the bucket - move to the next entry */
        info->current_entry = dictGetNext(info->current_entry);
    } else {
        /* Go to the first entry in the bucket */
        info->current_entry = batch->current_dicts[i]->ht_table[info->ht_idx][info->bucket_idx];
    }

    if (info->current_entry) {
        prefetchAndMoveToNextKey(info->current_entry);
        info->state = PREFETCH_VALUE;
    } else {
        /* No entry found in the bucket - try the bucket in the next table */
        info->state = PREFETCH_BUCKET;
    }
}

/* Prefetch the entry's value. If the value is found, move to the PREFETCH_VALUE_DATA state.
 * If the value is not found, move to the PREFETCH_ENTRY state to look at the next entry in the bucket. */
static void prefetchValue(KeyPrefetchInfo *info) {
    size_t i = batch->cur_idx;
    void *value = dictGetVal(info->current_entry);

    if (dictGetNext(info->current_entry) == NULL && !dictIsRehashing(batch->current_dicts[i])) {
        /* If this is the last element, we assume a hit and don't compare the keys */
        prefetchAndMoveToNextKey(value);
        info->state = PREFETCH_VALUE_DATA;
        return;
    }

    void *current_entry_key = dictGetKey(info->current_entry);
    if (batch->keys[i] == current_entry_key ||
        dictCompareKeys(batch->current_dicts[i], batch->keys[i], current_entry_key)) {
        /* If the key is found, prefetch the value */
        prefetchAndMoveToNextKey(value);
        info->state = PREFETCH_VALUE_DATA;
    } else {
        /* Move to the next entry */
        info->state = PREFETCH_ENTRY;
    }
}

/* Prefetch the value data if available. */
static void prefetchValueData(KeyPrefetchInfo *info, GetValueDataFunc get_val_data_func) {
    if (get_val_data_func) {
        void *value_data = get_val_data_func(dictGetVal(info->current_entry));
        if (value_data) prefetchAndMoveToNextKey(value_data);
    }
    markKeyAsdone(info);
}

/* Prefetch dictionary data for an array of keys.
 *
 * This function takes an array of dictionaries and keys, attempting to bring
 * data closer to the L1 cache that might be needed for dictionary operations
 * on those keys.
 *
 * The dictFind algorithm:
 * 1. Evaluate the hash of the key
 * 2. Access the index in the first table
 * 3. Walk the entries linked list until the key is found
 * If the key hasn't been found and the dictionary is in the middle of rehashing,
 * access the index on the second table and repeat step 3.
 *
 * dictPrefetch executes the same algorithm as dictFind, but one step at a time
 * for each key. Instead of waiting for data to be read from memory, it prefetches
 * the data and then moves on to execute the next prefetch for another key.
 *
 * dicts - An array of dictionaries to prefetch data from.
 * get_val_data_func - A callback function that dictPrefetch can invoke
 * to bring the key's value data closer to the L1 cache as well.
 */
static void dictPrefetch(dict **dicts, GetValueDataFunc get_val_data_func) {
    initBatchInfo(dicts);
    KeyPrefetchInfo *info;
    while ((info = getNextPrefetchInfo())) {
        switch (info->state) {
        case PREFETCH_BUCKET: prefetchBucket(info); break;
        case PREFETCH_ENTRY: prefetchEntry(info); break;
        case PREFETCH_VALUE: prefetchValue(info); break;
        case PREFETCH_VALUE_DATA: prefetchValueData(info, get_val_data_func); break;
        default: serverPanic("Unknown prefetch state %d", info->state);
        }
    }
}

/* Helper function to get the value pointer of an object. */
static void *getObjectValuePtr(const void *val) {
    robj *o = (robj *)val;
    return (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_RAW) ? o->ptr : NULL;
}

void resetCommandsBatch(void) {
    if (!batch) return;
    batch->cur_idx = 0;
    batch->keys_done = 0;
    batch->key_count = 0;
    batch->client_count = 0;
    batch->executed_commands = 0;
}

/* Prefetch command-related data:
 * 1. Prefetch the command arguments allocated by the I/O thread to bring them closer to the L1 cache.
 * 2. Prefetch the keys and values for all commands in the current batch from the main and expires dictionaries. */
void prefetchCommands(void) {
    /* Prefetch argv's for all clients */
    for (size_t i = 0; i < batch->client_count; i++) {
        client *c = batch->clients[i];
        if (!c || c->argc <= 1) continue;
        /* Skip prefetching the first argv (the command name); it was already looked up by the I/O thread. */
        for (int j = 1; j < c->argc; j++) {
            redis_prefetch(c->argv[j]);
        }
    }

    /* Prefetch the argv->ptr if required */
    for (size_t i = 0; i < batch->client_count; i++) {
        client *c = batch->clients[i];
        if (!c || c->argc <= 1) continue;
        for (int j = 1; j < c->argc; j++) {
            if (c->argv[j]->encoding == OBJ_ENCODING_RAW) {
                redis_prefetch(c->argv[j]->ptr);
            }
        }
    }

    /* Get the key ptrs - we do it here after the key objects were prefetched. */
    for (size_t i = 0; i < batch->key_count; i++) {
        batch->keys[i] = ((robj *)batch->keys[i])->ptr;
    }

    /* Prefetch dict keys for all commands. Prefetching is beneficial only if there is more than one key. */
    if (batch->key_count > 1) {
        server.stat_total_prefetch_batches++;
        /* Prefetch keys from the main dict */
        dictPrefetch(batch->keys_dicts, getObjectValuePtr);
        /* Prefetch keys from the expires dict - no value data to prefetch */
        dictPrefetch(batch->expire_dicts, NULL);
    }
}

/* Processes all the prefetched commands in the current batch. */
void processClientsCommandsBatch(void) {
    if (!batch || batch->client_count == 0) return;

    /* If executed_commands is not 0, it means that we are in the middle of
     * processing a batch and this is a recursive call. */
    if (batch->executed_commands == 0) {
        prefetchCommands();
    }

    /* Process the commands */
    for (size_t i = 0; i < batch->client_count; i++) {
        client *c = batch->clients[i];
        if (c == NULL) continue;

        /* Set the client to NULL immediately to avoid accessing it again recursively when ProcessingEventsWhileBlocked */
        batch->clients[i] = NULL;
        batch->executed_commands++;
        if (processPendingCommandAndInputBuffer(c) != C_ERR) beforeNextClient(c);
    }

    resetCommandsBatch();

    /* Handle the case where the max prefetch size has been changed. */
    if (batch->max_prefetch_size != (size_t)server.prefetch_batch_max_size) {
        onMaxBatchSizeChange();
    }
}

/* Adds the client's command to the current batch.
 *
 * Returns C_OK if the command was added successfully, C_ERR otherwise. */
int addCommandToBatch(client *c) {
    if (!batch) return C_ERR;

    /* If the batch is full, process it.
     * We also check the client count to handle cases where
     * no keys exist for the clients' commands. */
    if (batch->client_count == batch->max_prefetch_size || batch->key_count == batch->max_prefetch_size) {
        return C_ERR;
    }

    batch->clients[batch->client_count++] = c;

    /* Get the command's key positions */
    if (c->iolookedcmd) {
        getKeysResult result = GETKEYS_RESULT_INIT;
        int num_keys = getKeysFromCommand(c->iolookedcmd, c->argv, c->argc, &result);
        for (int i = 0; i < num_keys && batch->key_count < batch->max_prefetch_size; i++) {
            batch->keys[batch->key_count] = c->argv[result.keys[i].pos];
            batch->slots[batch->key_count] = c->slot > 0 ? c->slot : 0;
            batch->keys_dicts[batch->key_count] = kvstoreGetDict(c->db->keys, batch->slots[batch->key_count]);
            batch->expire_dicts[batch->key_count] = kvstoreGetDict(c->db->expires, batch->slots[batch->key_count]);
            batch->key_count++;
        }
        getKeysFreeResult(&result);
    }

    return C_OK;
}

/* Removes the given client from the pending prefetch batch, if present. */
void removeClientFromPendingCommandsBatch(client *c) {
    if (!batch) return;

    for (size_t i = 0; i < batch->client_count; i++) {
        if (batch->clients[i] == c) {
            batch->clients[i] = NULL;
            return;
        }
    }
}
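
The core idea of dictPrefetch() above is to interleave the lookup steps of many keys: instead of resolving one key's bucket, entry, and value back to back and stalling on every cache miss, it issues one prefetch per key per round so the misses overlap. A stripped-down, stand-alone sketch of that round-robin pattern over plain linked lists, using the GCC/Clang builtin directly; the types and function are hypothetical, not the commit's code:

#include <stddef.h>

/* Hypothetical chain node, standing in for a dict bucket's entry list. */
typedef struct chain_node {
    struct chain_node *next;
} chain_node;

/* Advance every chain by one hop per round, prefetching the node the next
 * round will touch; the latencies of all chains overlap instead of being
 * paid sequentially. */
static void walk_chains_interleaved(chain_node **cursors, size_t n) {
    size_t remaining = n;
    while (remaining > 0) {
        remaining = 0;
        for (size_t i = 0; i < n; i++) {
            if (cursors[i] == NULL) continue;
            __builtin_prefetch(cursors[i]->next, 0, 3); /* warm the next hop */
            cursors[i] = cursors[i]->next;              /* advance this chain */
            if (cursors[i]) remaining++;
        }
    }
}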

@@ -0,0 +1,12 @@
#ifndef MEMORY_PREFETCH_H
#define MEMORY_PREFETCH_H

struct client;

void prefetchCommandsBatchInit(void);
int addCommandToBatch(struct client *c);
void removeClientFromPendingCommandsBatch(struct client *c);
void resetCommandsBatch(void);
void prefetchCommands(void);

#endif /* MEMORY_PREFETCH_H */
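
Putting this header together with the iothread.c changes above, the per-batch lifecycle is: collect clients, prefetch, execute, reset. A compact sketch of that call order, assuming a caller that already holds the clients handed back by an I/O thread; the wrapper function is hypothetical, and the real driver is processClientsFromIOThread() above:

#include "server.h"
#include "memory_prefetch.h"

/* Hypothetical driver showing the call order wired up in iothread.c. */
static void run_prefetched_batch(client **clients, int count) {
    for (int i = 0; i < count; i++)
        addCommandToBatch(clients[i]); /* record keys, dicts and clients */
    prefetchCommands();                /* issue all prefetches up front */
    /* ... execute each client's pending command here ... */
    resetCommandsBatch();              /* clear per-batch state */
}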

@@ -1544,6 +1544,8 @@ void unlinkClient(client *c) {
        c->client_list_node = NULL;
    }

    removeClientFromPendingCommandsBatch(c);

    /* Check if this is a replica waiting for diskless replication (rdb pipe),
     * in which case it needs to be cleaned from that list */
    if (c->flags & CLIENT_SLAVE &&

@@ -2174,6 +2174,7 @@ void initServerConfig(void) {
    server.page_size = sysconf(_SC_PAGESIZE);
    server.pause_cron = 0;
    server.dict_resizing = 1;
    server.prefetch_batch_max_size = 16;

    server.latency_tracking_info_percentiles_len = 3;
    server.latency_tracking_info_percentiles = zmalloc(sizeof(double)*(server.latency_tracking_info_percentiles_len));

@@ -2663,6 +2664,8 @@ void resetServerStats(void) {
    server.stat_reply_buffer_shrinks = 0;
    server.stat_reply_buffer_expands = 0;
    server.stat_cluster_incompatible_ops = 0;
    server.stat_total_prefetch_batches = 0;
    server.stat_total_prefetch_entries = 0;
    memset(server.duration_stats, 0, sizeof(durationStats) * EL_DURATION_TYPE_NUM);
    server.el_cmd_cnt_max = 0;
    lazyfreeResetStats();

@@ -63,6 +63,7 @@ typedef long long ustime_t; /* microsecond time type. */
#include "rax.h" /* Radix tree */
#include "connection.h" /* Connection abstraction */
#include "eventnotifier.h" /* Event notification */
#include "memory_prefetch.h"

#define REDISMODULE_CORE 1
typedef struct redisObject robj;

@@ -1796,6 +1797,10 @@ struct redisServer {
    int enable_debug_cmd; /* Enable DEBUG commands, see PROTECTED_ACTION_ALLOWED_* */
    int enable_module_cmd; /* Enable MODULE commands, see PROTECTED_ACTION_ALLOWED_* */

    int prefetch_batch_max_size; /* Maximum number of keys to prefetch in a single batch */
    long long stat_total_prefetch_entries; /* Total number of prefetched dict entries */
    long long stat_total_prefetch_batches; /* Total number of prefetched batches */

    /* RDB / AOF loading information */
    volatile sig_atomic_t loading; /* We are loading data from disk if true */
    volatile sig_atomic_t async_loading; /* We are loading data without blocking the db being served */