Don't use cross-thread unlocking.

This commit is contained in:
antirez 2025-04-01 20:51:37 +02:00
parent 3dd48b5b45
commit c6db0a7c20
2 changed files with 76 additions and 30 deletions

View File

@ -17,7 +17,7 @@ endif
endif
endif
CFLAGS = -O2 -Wall -Wextra -g -ffast-math $(SAN)
CFLAGS = -O2 -Wall -Wextra -g $(SAN) -std=c11
LDFLAGS = -lm $(SAN)
# Detect OS
@ -26,7 +26,7 @@ uname_M := $(shell sh -c 'uname -m 2>/dev/null || echo not')
# Shared library compile flags for linux / osx
ifeq ($(uname_S),Linux)
SHOBJ_CFLAGS ?= -W -Wall -fno-common -g -ggdb -std=c99 -O2
SHOBJ_CFLAGS ?= -W -Wall -fno-common -g -ggdb -std=c11 -O2
SHOBJ_LDFLAGS ?= -shared
ifneq (,$(findstring armv,$(uname_M)))
SHOBJ_LDFLAGS += -latomic
@ -35,7 +35,7 @@ ifneq (,$(findstring aarch64,$(uname_M)))
SHOBJ_LDFLAGS += -latomic
endif
else
SHOBJ_CFLAGS ?= -W -Wall -dynamic -fno-common -g -ggdb -std=c99 -Ofast -ffast-math
SHOBJ_CFLAGS ?= -W -Wall -dynamic -fno-common -g -ggdb -std=c11 -O3
SHOBJ_LDFLAGS ?= -bundle -undefined dynamic_lookup
endif

100
vset.c
View File

@ -57,6 +57,43 @@
* our read commands running in the main thread don't need to use
* hnsw_search() or other HNSW functions using the visited epochs slots
* we are safe.
*
* 4. There is a race from the moment we create a thread, passing the
* vector set object, to the moment the thread can actually lock the
* result win the in_use_lock mutex: as the thread starts, in the meanwhile
* a DEL/expire could trigger and remove the object. For this reason
* we use an atomic counter that protects our object for this small
* time in vectorSetWaitAllBackgroundClients(). This prevents removal
* of objects that are about to be taken by threads.
*
* Note that other competing soltuions could be used to fix the problem
* but have their set of issues, however they are worth documenting here
* and evaluating in the future:
*
* A. Using a conditional variable we could "wait" for the thread to
* acquire the lock. However this means waiting before returning
* to the event loop, and would make the command execution slower.
* B. We could use again an atomic variable, like we did, but this time
* as a refcount for the object, with a vsetAcquire() vsetRelease().
* In this case, the command could retain the object in the main thread
* before starting the thread, and the thread, after the work is done,
* could release it. This way sometimes the object would be freed by
* the thread, and it's while now can be safe to do the kind of resource
* deallocation that vectorSetReleaseObject() does, given that the
* Redis Modules API is not always thread safe this solution may not
* be future-proof. However there is to evaluate it better in the
* future.
* C. We could use the "B" solution but instead of freeing the object
* in the thread, in this specific case we could just put it into a
* list and defer it for later freeing (for instance in the reply
* callback), so that the object is always freed in the main thread.
* This would require a list of objects to free.
*
* However the current solution only disadvantage is the potential busy
* loop, but this busy loop in practical terms will almost never do
* much: to trigger it, a number of circumnstances must happen: deleting
* Vector Set keys while using them, hitting the small window needed to
* start the thread and read-lock the mutex.
*/
#define _DEFAULT_SOURCE
@ -72,6 +109,7 @@
#include <stdint.h>
#include <math.h>
#include <pthread.h>
#include <stdatomic.h>
#include "hnsw.h"
// We inline directly the expression implementation here so that building
@ -107,6 +145,8 @@ struct vsetObject {
uint64_t id; // Unique ID used by threaded VADD to know the
// object is still the same.
uint64_t numattribs; // Number of nodes associated with an attribute.
atomic_int thread_creation_pending; // Number of threads that are currently
// pending to lock the object.
};
/* Each node has two associated values: the associated string (the item
@ -200,6 +240,7 @@ struct vsetObject *createVectorSetObject(unsigned int dim, uint32_t quant_type,
o->proj_matrix = NULL;
o->proj_input_size = 0;
o->numattribs = 0;
o->thread_creation_pending = 0;
RedisModule_Assert(pthread_rwlock_init(&o->in_use_lock,NULL) == 0);
return o;
}
@ -223,8 +264,19 @@ void vectorSetReleaseObject(struct vsetObject *o) {
/* Wait for all the threads performing operations on this
* index to terminate their work (locking for write will
* wait for all the other threads). */
void vectorSetWaitAllBackgroundClients(struct vsetObject *vset) {
* wait for all the other threads).
*
* if 'for_del' is set to 1, we also wait for all the pending threads
* that still didn't acquire the lock to finish their work. This
* is useful only if we are going to call this function to delete
* the object, and not if we want to just to modify it. */
void vectorSetWaitAllBackgroundClients(struct vsetObject *vset, int for_del) {
if (for_del) {
// If we are going to destroy the object, after this call, let's
// wait for threads that are being created and still didn't had
// a chance to acquire the lock.
while (vset->thread_creation_pending > 0);
}
RedisModule_Assert(pthread_rwlock_wrlock(&vset->in_use_lock) == 0);
pthread_rwlock_unlock(&vset->in_use_lock);
}
@ -252,7 +304,7 @@ int vectorSetInsert(struct vsetObject *o, float *vec, int8_t *qvec, float qrange
/* Wait for clients in the background: background VSIM
* operations touch the nodes attributes we are going
* to touch. */
vectorSetWaitAllBackgroundClients(o);
vectorSetWaitAllBackgroundClients(o,0);
struct vsetNodeVal *nv = node->value;
/* Pass NULL as value-free function. We want to reuse
@ -403,6 +455,11 @@ void *VADD_thread(void *arg) {
float *vec = targ[3];
int ef = (uint64_t)targ[6];
/* Lock the object and signal that we are no longer pending
* the lock acquisition. */
RedisModule_Assert(pthread_rwlock_rdlock(&vset->in_use_lock) == 0);
vset->thread_creation_pending--;
/* Look for candidates... */
InsertContext *ic = hnsw_prepare_insert(vset->hnsw, vec, NULL, 0, 0, ef);
targ[5] = ic; // Pass the context to the reply callback.
@ -680,10 +737,6 @@ int VADD_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
* way, or use a thread to do candidate neighbors selection and only
* later, in the reply callback, actually add the element. */
if (cas) {
/* Make sure the key does not get deleted during the background
* operation. See VSIM implementation for more information. */
RedisModule_Assert(pthread_rwlock_rdlock(&vset->in_use_lock) == 0);
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,VADD_CASReply,NULL,NULL,0);
pthread_t tid;
void **targ = RedisModule_Alloc(sizeof(void*)*8);
@ -698,8 +751,9 @@ int VADD_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_RetainString(ctx,val);
if (attrib) RedisModule_RetainString(ctx,attrib);
RedisModule_BlockedClientMeasureTimeStart(bc);
vset->thread_creation_pending++;
if (pthread_create(&tid,NULL,VADD_thread,targ) != 0) {
pthread_rwlock_unlock(&vset->in_use_lock);
vset->thread_creation_pending--;
RedisModule_AbortBlock(bc);
RedisModule_Free(targ);
RedisModule_FreeString(ctx,val);
@ -823,15 +877,20 @@ void *VSIM_thread(void *arg) {
RedisModule_Free(targ[4]);
RedisModule_Free(targ);
/* Lock the object and signal that we are no longer pending
* the lock acquisition. */
RedisModule_Assert(pthread_rwlock_rdlock(&vset->in_use_lock) == 0);
vset->thread_creation_pending--;
// Accumulate reply in a thread safe context: no contention.
RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);
// Run the query.
VSIM_execute(ctx, vset, vec, count, epsilon, withscores, ef, filter_expr, filter_ef, ground_truth);
pthread_rwlock_unlock(&vset->in_use_lock);
// Cleanup.
RedisModule_FreeThreadSafeContext(ctx);
pthread_rwlock_unlock(&vset->in_use_lock);
RedisModule_BlockedClientMeasureTimeEnd(bc);
RedisModule_UnblockClient(bc,NULL);
return NULL;
@ -1022,26 +1081,12 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
}
if (threaded_request) {
/* Spawn the thread serving the request:
* Acquire the lock here so that the object will not be
* destroyed while we work with it in the thread.
*
* This lock should never block, since:
* 1. If we are in the main thread, the key exists (we looked it up)
* and so there is no deletion in progress.
* 2. If the write lock is taken while destroying the object, another
* command or operation (expire?) from the main thread acquired
* it to delete the object, so *it* will block if there are still
* operations in progress on this key.
*
* Note: even if we create one thread per request, the underlying
/* Note: even if we create one thread per request, the underlying
* HNSW library has a fixed number of slots for the threads, as it's
* defined in HNSW_MAX_THREADS (beware that if you increase it,
* every node will use more memory). This means that while this request
* is threaded, and will NOT block Redis, it may end waiting for a
* free slot if all the HNSW_MAX_THREADS slots are used. */
RedisModule_Assert(pthread_rwlock_rdlock(&vset->in_use_lock) == 0);
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0);
pthread_t tid;
void **targ = RedisModule_Alloc(sizeof(void*)*10);
@ -1057,8 +1102,9 @@ int VSIM_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
targ[8] = (void*)(unsigned long)filter_ef;
targ[9] = (void*)(unsigned long)ground_truth;
RedisModule_BlockedClientMeasureTimeStart(bc);
vset->thread_creation_pending++;
if (pthread_create(&tid,NULL,VSIM_thread,targ) != 0) {
pthread_rwlock_unlock(&vset->in_use_lock);
vset->thread_creation_pending--;
RedisModule_AbortBlock(bc);
RedisModule_Free(targ[4]);
RedisModule_Free(targ);
@ -1259,7 +1305,7 @@ int VSETATTR_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int arg
/* Background VSIM operations use the node attributes, so
* wait for background operations before messing with them. */
vectorSetWaitAllBackgroundClients(vset);
vectorSetWaitAllBackgroundClients(vset,0);
/* Set or delete the attribute based on the fact it's an empty
* string or not. */
@ -1817,7 +1863,7 @@ size_t VectorSetMemUsage(const void *value) {
void VectorSetFree(void *value) {
struct vsetObject *vset = value;
vectorSetWaitAllBackgroundClients(vset);
vectorSetWaitAllBackgroundClients(vset,1);
vectorSetReleaseObject(value);
}