mirror of https://mirror.osredm.com/root/redis.git
VADD CAS: fallback when thread creation fails.
This commit is contained in:
parent
f05912dea2
commit
cc3874ab87
32
vset.c
32
vset.c
|
@ -520,24 +520,15 @@ int VADD_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Don't do CAS updates. For how things work now, the CAS state would
|
/* For existing keys don't do CAS updates. For how things work now, the
|
||||||
* be invalidated by the detetion before adding back. */
|
* CAS state would be invalidated by the detetion before adding back. */
|
||||||
if (cas && RedisModule_DictGet(vset->dict,val,NULL) != NULL)
|
if (cas && RedisModule_DictGet(vset->dict,val,NULL) != NULL)
|
||||||
cas = 0;
|
cas = 0;
|
||||||
|
|
||||||
/* Here depending on the CAS option we directly insert in a blocking
|
/* Here depending on the CAS option we directly insert in a blocking
|
||||||
* way, or use a thread to do candidate neighbors selection and only
|
* way, or use a thread to do candidate neighbors selection and only
|
||||||
* later, in the reply callback, actually add the element. */
|
* later, in the reply callback, actually add the element. */
|
||||||
|
if (cas) {
|
||||||
if (!cas) {
|
|
||||||
/* Insert vector synchronously. */
|
|
||||||
int added = vectorSetInsert(vset,vec,NULL,0,val,attrib,1,ef);
|
|
||||||
RedisModule_Free(vec);
|
|
||||||
|
|
||||||
RedisModule_ReplyWithLongLong(ctx,added);
|
|
||||||
if (added) RedisModule_ReplicateVerbatim(ctx);
|
|
||||||
return REDISMODULE_OK;
|
|
||||||
} else {
|
|
||||||
/* Make sure the key does not get deleted during the background
|
/* Make sure the key does not get deleted during the background
|
||||||
* operation. See VSIM implementation for more information. */
|
* operation. See VSIM implementation for more information. */
|
||||||
pthread_rwlock_rdlock(&vset->in_use_lock);
|
pthread_rwlock_rdlock(&vset->in_use_lock);
|
||||||
|
@ -558,13 +549,22 @@ int VADD_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
if (pthread_create(&tid,NULL,VADD_thread,targ) != 0) {
|
if (pthread_create(&tid,NULL,VADD_thread,targ) != 0) {
|
||||||
pthread_rwlock_unlock(&vset->in_use_lock);
|
pthread_rwlock_unlock(&vset->in_use_lock);
|
||||||
RedisModule_AbortBlock(bc);
|
RedisModule_AbortBlock(bc);
|
||||||
RedisModule_FreeString(ctx, val);
|
|
||||||
RedisModule_Free(vec);
|
|
||||||
RedisModule_Free(targ);
|
RedisModule_Free(targ);
|
||||||
return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
|
|
||||||
|
// Fall back to synchronous insert, see later in the code.
|
||||||
|
} else {
|
||||||
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
return REDISMODULE_OK;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Insert vector synchronously: we reach this place even
|
||||||
|
* if cas was true but thread creation failed. */
|
||||||
|
int added = vectorSetInsert(vset,vec,NULL,0,val,attrib,1,ef);
|
||||||
|
RedisModule_Free(vec);
|
||||||
|
|
||||||
|
RedisModule_ReplyWithLongLong(ctx,added);
|
||||||
|
if (added) RedisModule_ReplicateVerbatim(ctx);
|
||||||
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* HNSW callback to filter items according to a predicate function
|
/* HNSW callback to filter items according to a predicate function
|
||||||
|
|
Loading…
Reference in New Issue