Document threading model in a top comment.

This commit is contained in:
antirez 2025-03-27 08:31:15 +01:00
parent 3d31fc3bee
commit 63057253d8
1 changed files with 68 additions and 11 deletions

79
vset.c
View File

@ -3,6 +3,60 @@
*
* Copyright(C) 2024-Present, Redis Ltd. All Rights Reserved.
* Originally authored by: Salvatore Sanfilippo.
*
* ======================== Understand threading model =========================
* This code implements threaded operarations for two of the commands:
*
* 1. VSIM, by default.
* 2. VADD, if the CAS option is specified.
*
* Note that even if the second operation, VADD, is a write operation, only
* the neighbors collection for the new node is performed in a thread: then,
* the actual insert is performed in the reply callback VADD_CASReply(),
* which is executed in the main thread.
*
* Threaded operations need us to protect various operations with mutexes,
* even if a certain degree of protection is already provided by the HNSW
* library. Here are a few very important things about this implementation
* and the way locking is performed.
*
* 1. All the write operations are performed in the main Redis thread:
* this also include VADD_CASReply() callback, that is called by Redis
* internals only in the context of the main thread. However the HNSW
* library allows background threads in hnsw_search() (VSIM) to modify
* nodes metadata to speedup search (to understand if a node was already
* visited), but this only happens after acquiring a specific lock
* for a given "read slot".
*
* 2. We use a global lock for each Vector Set object, called "in_use". This
* lock is a read-write lock, and is acquired in read mode by all the
* threads that perform reads in the background. It is only acquired in
* write mode by vectorSetWaitAllBackgroundClients(): the function acquires
* the lock and immediately releases it, with the effect of waiting all the
* background threads still running from ending their execution.
*
* Note that no ther thread can be spawned, since we only call
* vectorSetWaitAllBackgroundClients() from the main Redis thread, that
* is also the only thread spawning other threads.
*
* vectorSetWaitAllBackgroundClients() is used in two ways:
* A) When we need to delete a vector set because of (DEL) or other
* operations destroying the object, we need to wait that all the
* background threads working with this object finished their work.
* B) When we modify the HNSW nodes bypassing the normal locking
* provided by the HNSW library. This only happens when we update
* an existing node attribute so far, in VSETATTR and when we call
* VADD to update a node with the SETATTR option.
*
* 3. Often during read operations performed by Redis commands in the
* main thread (VCARD, VEMB, VRANDMEMBER, ...) we don't acquire any
* lock at all. The commands run in the main Redis thread, we can only
* have, at the same time, background reads against the same data
* structure. Note that VSIM_thread() and VADD_thread() still modify the
* read slot metadata, that is node->visited_epoch[slot], but as long as
* our read commands running in the main thread don't need to use
* hnsw_search() or other HNSW functions using the visited epochs slots
* we are safe.
*/
#define _DEFAULT_SOURCE
@ -240,9 +294,9 @@ int vectorSetInsert(struct vsetObject *o, float *vec, int8_t *qvec, float qrange
nv->attrib = attrib;
node = hnsw_insert(o->hnsw,vec,qvec,qrange,0,nv,ef);
if (node == NULL) {
// XXX Technically in Redis-land we don't have out of memories as we
// crash. However the HNSW library may fail for error in the locking
// libc call. Probably impossible in practical terms.
// XXX Technically in Redis-land we don't have out of memory, as we
// crash on OOM. However the HNSW library may fail for error in the
// locking libc call. Probably impossible in practical terms.
RedisModule_Free(nv);
return 0;
}
@ -261,26 +315,29 @@ int vectorSetInsert(struct vsetObject *o, float *vec, int8_t *qvec, float qrange
float *parseVector(RedisModuleString **argv, int argc, int start_idx,
size_t *dim, uint32_t *reduce_dim, int *consumed_args)
{
int consumed = 0; // Argumnets consumed
int consumed = 0; // Argumnets consumed.
/* Check for REDUCE option first */
/* Check for REDUCE option first. */
if (reduce_dim) *reduce_dim = 0;
if (reduce_dim && argc > start_idx + 2 &&
!strcasecmp(RedisModule_StringPtrLen(argv[start_idx],NULL),"REDUCE"))
{
long long rdim;
if (RedisModule_StringToLongLong(argv[start_idx+1],&rdim) != REDISMODULE_OK
|| rdim <= 0) return NULL;
if (RedisModule_StringToLongLong(argv[start_idx+1],&rdim)
!= REDISMODULE_OK || rdim <= 0)
{
return NULL;
}
if (reduce_dim) *reduce_dim = rdim;
start_idx += 2; // Skip REDUCE and its argument
start_idx += 2; // Skip REDUCE and its argument.
consumed += 2;
}
/* Now parse the vector format as before */
/* Now parse the vector format as before. */
float *vec = NULL;
if (!strcasecmp(RedisModule_StringPtrLen(argv[start_idx],NULL),"FP32")) {
if (argc < start_idx + 2) return NULL; // Need FP32 + vector + value
if (argc < start_idx + 2) return NULL; // Need FP32 + vector + value.
size_t vec_raw_len;
const char *blob = RedisModule_StringPtrLen(argv[start_idx+1],&vec_raw_len);
if (vec_raw_len % 4 || vec_raw_len < 4) return NULL;
@ -290,7 +347,7 @@ float *parseVector(RedisModuleString **argv, int argc, int start_idx,
memcpy(vec,blob,vec_raw_len);
consumed += 2;
} else if (!strcasecmp(RedisModule_StringPtrLen(argv[start_idx],NULL),"VALUES")) {
if (argc < start_idx + 2) return NULL; // Need at least dimension.
if (argc < start_idx + 2) return NULL; // Need at least dimension..
long long vdim;
if (RedisModule_StringToLongLong(argv[start_idx+1],&vdim) != REDISMODULE_OK
|| vdim < 1) return NULL;