diff --git a/vset.c b/vset.c index 68933c3e7..fea614df6 100644 --- a/vset.c +++ b/vset.c @@ -3,6 +3,60 @@ * * Copyright(C) 2024-Present, Redis Ltd. All Rights Reserved. * Originally authored by: Salvatore Sanfilippo. + * + * ======================== Understand threading model ========================= + * This code implements threaded operarations for two of the commands: + * + * 1. VSIM, by default. + * 2. VADD, if the CAS option is specified. + * + * Note that even if the second operation, VADD, is a write operation, only + * the neighbors collection for the new node is performed in a thread: then, + * the actual insert is performed in the reply callback VADD_CASReply(), + * which is executed in the main thread. + * + * Threaded operations need us to protect various operations with mutexes, + * even if a certain degree of protection is already provided by the HNSW + * library. Here are a few very important things about this implementation + * and the way locking is performed. + * + * 1. All the write operations are performed in the main Redis thread: + * this also include VADD_CASReply() callback, that is called by Redis + * internals only in the context of the main thread. However the HNSW + * library allows background threads in hnsw_search() (VSIM) to modify + * nodes metadata to speedup search (to understand if a node was already + * visited), but this only happens after acquiring a specific lock + * for a given "read slot". + * + * 2. We use a global lock for each Vector Set object, called "in_use". This + * lock is a read-write lock, and is acquired in read mode by all the + * threads that perform reads in the background. It is only acquired in + * write mode by vectorSetWaitAllBackgroundClients(): the function acquires + * the lock and immediately releases it, with the effect of waiting all the + * background threads still running from ending their execution. + * + * Note that no ther thread can be spawned, since we only call + * vectorSetWaitAllBackgroundClients() from the main Redis thread, that + * is also the only thread spawning other threads. + * + * vectorSetWaitAllBackgroundClients() is used in two ways: + * A) When we need to delete a vector set because of (DEL) or other + * operations destroying the object, we need to wait that all the + * background threads working with this object finished their work. + * B) When we modify the HNSW nodes bypassing the normal locking + * provided by the HNSW library. This only happens when we update + * an existing node attribute so far, in VSETATTR and when we call + * VADD to update a node with the SETATTR option. + * + * 3. Often during read operations performed by Redis commands in the + * main thread (VCARD, VEMB, VRANDMEMBER, ...) we don't acquire any + * lock at all. The commands run in the main Redis thread, we can only + * have, at the same time, background reads against the same data + * structure. Note that VSIM_thread() and VADD_thread() still modify the + * read slot metadata, that is node->visited_epoch[slot], but as long as + * our read commands running in the main thread don't need to use + * hnsw_search() or other HNSW functions using the visited epochs slots + * we are safe. */ #define _DEFAULT_SOURCE @@ -240,9 +294,9 @@ int vectorSetInsert(struct vsetObject *o, float *vec, int8_t *qvec, float qrange nv->attrib = attrib; node = hnsw_insert(o->hnsw,vec,qvec,qrange,0,nv,ef); if (node == NULL) { - // XXX Technically in Redis-land we don't have out of memories as we - // crash. However the HNSW library may fail for error in the locking - // libc call. Probably impossible in practical terms. + // XXX Technically in Redis-land we don't have out of memory, as we + // crash on OOM. However the HNSW library may fail for error in the + // locking libc call. Probably impossible in practical terms. RedisModule_Free(nv); return 0; } @@ -261,26 +315,29 @@ int vectorSetInsert(struct vsetObject *o, float *vec, int8_t *qvec, float qrange float *parseVector(RedisModuleString **argv, int argc, int start_idx, size_t *dim, uint32_t *reduce_dim, int *consumed_args) { - int consumed = 0; // Argumnets consumed + int consumed = 0; // Argumnets consumed. - /* Check for REDUCE option first */ + /* Check for REDUCE option first. */ if (reduce_dim) *reduce_dim = 0; if (reduce_dim && argc > start_idx + 2 && !strcasecmp(RedisModule_StringPtrLen(argv[start_idx],NULL),"REDUCE")) { long long rdim; - if (RedisModule_StringToLongLong(argv[start_idx+1],&rdim) != REDISMODULE_OK - || rdim <= 0) return NULL; + if (RedisModule_StringToLongLong(argv[start_idx+1],&rdim) + != REDISMODULE_OK || rdim <= 0) + { + return NULL; + } if (reduce_dim) *reduce_dim = rdim; - start_idx += 2; // Skip REDUCE and its argument + start_idx += 2; // Skip REDUCE and its argument. consumed += 2; } - /* Now parse the vector format as before */ + /* Now parse the vector format as before. */ float *vec = NULL; if (!strcasecmp(RedisModule_StringPtrLen(argv[start_idx],NULL),"FP32")) { - if (argc < start_idx + 2) return NULL; // Need FP32 + vector + value + if (argc < start_idx + 2) return NULL; // Need FP32 + vector + value. size_t vec_raw_len; const char *blob = RedisModule_StringPtrLen(argv[start_idx+1],&vec_raw_len); if (vec_raw_len % 4 || vec_raw_len < 4) return NULL; @@ -290,7 +347,7 @@ float *parseVector(RedisModuleString **argv, int argc, int start_idx, memcpy(vec,blob,vec_raw_len); consumed += 2; } else if (!strcasecmp(RedisModule_StringPtrLen(argv[start_idx],NULL),"VALUES")) { - if (argc < start_idx + 2) return NULL; // Need at least dimension. + if (argc < start_idx + 2) return NULL; // Need at least dimension.. long long vdim; if (RedisModule_StringToLongLong(argv[start_idx+1],&vdim) != REDISMODULE_OK || vdim < 1) return NULL;