From cc3874ab87865d9f7fbb86f20c52ca96275b1f70 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 12 Mar 2025 16:57:03 +0100 Subject: [PATCH] VADD CAS: fallback when thread creation fails. --- vset.c | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/vset.c b/vset.c index 310ef462e..f51ea0ee9 100644 --- a/vset.c +++ b/vset.c @@ -520,24 +520,15 @@ int VADD_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { } } - /* Don't do CAS updates. For how things work now, the CAS state would - * be invalidated by the detetion before adding back. */ + /* For existing keys don't do CAS updates. For how things work now, the + * CAS state would be invalidated by the detetion before adding back. */ if (cas && RedisModule_DictGet(vset->dict,val,NULL) != NULL) cas = 0; /* Here depending on the CAS option we directly insert in a blocking * way, or use a thread to do candidate neighbors selection and only * later, in the reply callback, actually add the element. */ - - if (!cas) { - /* Insert vector synchronously. */ - int added = vectorSetInsert(vset,vec,NULL,0,val,attrib,1,ef); - RedisModule_Free(vec); - - RedisModule_ReplyWithLongLong(ctx,added); - if (added) RedisModule_ReplicateVerbatim(ctx); - return REDISMODULE_OK; - } else { + if (cas) { /* Make sure the key does not get deleted during the background * operation. See VSIM implementation for more information. */ pthread_rwlock_rdlock(&vset->in_use_lock); @@ -558,13 +549,22 @@ int VADD_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (pthread_create(&tid,NULL,VADD_thread,targ) != 0) { pthread_rwlock_unlock(&vset->in_use_lock); RedisModule_AbortBlock(bc); - RedisModule_FreeString(ctx, val); - RedisModule_Free(vec); RedisModule_Free(targ); - return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread"); + + // Fall back to synchronous insert, see later in the code. + } else { + return REDISMODULE_OK; } - return REDISMODULE_OK; } + + /* Insert vector synchronously: we reach this place even + * if cas was true but thread creation failed. */ + int added = vectorSetInsert(vset,vec,NULL,0,val,attrib,1,ef); + RedisModule_Free(vec); + + RedisModule_ReplyWithLongLong(ctx,added); + if (added) RedisModule_ReplicateVerbatim(ctx); + return REDISMODULE_OK; } /* HNSW callback to filter items according to a predicate function