Make HNSW CAS commit atomic.

This way we don't need to mess with node->value at a latter time
where an explicit lock would be required. Now we have:

1. Prepare context (neighbors).
2. Commit, and set the associated value.
This commit is contained in:
antirez 2025-03-27 11:05:04 +01:00
parent c61c535c32
commit 29c27bc13e
4 changed files with 17 additions and 19 deletions

20
hnsw.c
View File

@ -375,7 +375,7 @@ void hnsw_normalize_vector(float *x, float *l2ptr, uint32_t dim) {
}
/* Helper function to generate random level. */
uint32_t random_level() {
uint32_t random_level(void) {
static const int threshold = HNSW_P * RAND_MAX;
uint32_t level = 0;
@ -1656,7 +1656,7 @@ struct InsertContext {
* See hnsw_node_new() for information about 'vector' and 'qvector'
* arguments, and which one to pass. */
InsertContext *hnsw_prepare_insert_nolock(HNSW *index, const float *vector,
const int8_t *qvector, float qrange, uint64_t id, void *value,
const int8_t *qvector, float qrange, uint64_t id,
int slot, int ef)
{
InsertContext *ctx = hmalloc(sizeof(*ctx));
@ -1673,7 +1673,6 @@ InsertContext *hnsw_prepare_insert_nolock(HNSW *index, const float *vector,
hfree(ctx);
return NULL;
}
ctx->node->value = value;
hnswNode *curr_ep = index->enter_point;
@ -1707,12 +1706,12 @@ InsertContext *hnsw_prepare_insert_nolock(HNSW *index, const float *vector,
/* External API for hnsw_prepare_insert_nolock(), handling locking. */
InsertContext *hnsw_prepare_insert(HNSW *index, const float *vector,
const int8_t *qvector, float qrange, uint64_t id, void *value,
const int8_t *qvector, float qrange, uint64_t id,
int ef)
{
InsertContext *ctx;
int slot = hnsw_acquire_read_slot(index);
ctx = hnsw_prepare_insert_nolock(index,vector,qvector,qrange,id,value,slot,ef);
ctx = hnsw_prepare_insert_nolock(index,vector,qvector,qrange,id,slot,ef);
hnsw_release_read_slot(index,slot);
return ctx;
}
@ -1738,8 +1737,9 @@ void hnsw_free_insert_context(InsertContext *ctx) {
* just inserted node. Out of memory is not possible since no critical
* allocation is never performed in this code path: we populate links
* on already allocated nodes. */
hnswNode *hnsw_commit_insert_nolock(HNSW *index, InsertContext *ctx) {
hnswNode *hnsw_commit_insert_nolock(HNSW *index, InsertContext *ctx, void *value) {
hnswNode *node = ctx->node;
node->value = value;
/* Handle first node case. */
if (index->enter_point == NULL) {
@ -1798,7 +1798,7 @@ hnswNode *hnsw_commit_insert_nolock(HNSW *index, InsertContext *ctx) {
* index and return its pointer. Otherwise NULL is returned and the operation
* should be either performed with the blocking API hnsw_insert() or attempted
* again. */
hnswNode *hnsw_try_commit_insert(HNSW *index, InsertContext *ctx) {
hnswNode *hnsw_try_commit_insert(HNSW *index, InsertContext *ctx, void *value) {
/* Check if the version changed since preparation. Note that we
* should access index->version under the write lock in order to
* be sure we can safely commit the write: this is just a fast-path
@ -1824,7 +1824,7 @@ hnswNode *hnsw_try_commit_insert(HNSW *index, InsertContext *ctx) {
/* Commit the change: note that it's up to hnsw_commit_insert_nolock()
* to free the insertion context. */
hnswNode *node = hnsw_commit_insert_nolock(index, ctx);
hnswNode *node = hnsw_commit_insert_nolock(index, ctx, value);
/* Release the write lock. */
pthread_rwlock_unlock(&index->global_lock);
@ -1848,14 +1848,14 @@ hnswNode *hnsw_insert(HNSW *index, const float *vector, const int8_t *qvector, f
// Prepare the insertion - note we pass slot 0 since we're single threaded.
InsertContext *ctx = hnsw_prepare_insert_nolock(index, vector, qvector,
qrange, id, value, 0, ef);
qrange, id, 0, ef);
if (!ctx) {
pthread_rwlock_unlock(&index->global_lock);
return NULL;
}
// Commit the prepared insertion without version checking.
hnswNode *node = hnsw_commit_insert_nolock(index, ctx);
hnswNode *node = hnsw_commit_insert_nolock(index, ctx, value);
// Release write lock and return our node pointer.
pthread_rwlock_unlock(&index->global_lock);

4
hnsw.h
View File

@ -144,8 +144,8 @@ int hnsw_acquire_read_slot(HNSW *index);
void hnsw_release_read_slot(HNSW *index, int slot);
/* Optimistic insertion API. */
InsertContext *hnsw_prepare_insert(HNSW *index, const float *vector, const int8_t *qvector, float qrange, uint64_t id, void *value, int ef);
hnswNode *hnsw_try_commit_insert(HNSW *index, InsertContext *ctx);
InsertContext *hnsw_prepare_insert(HNSW *index, const float *vector, const int8_t *qvector, float qrange, uint64_t id, int ef);
hnswNode *hnsw_try_commit_insert(HNSW *index, InsertContext *ctx, void *value);
void hnsw_free_insert_context(InsertContext *ctx);
/* Serialization. */

8
vset.c
View File

@ -339,7 +339,7 @@ float *parseVector(RedisModuleString **argv, int argc, int start_idx,
/* Now parse the vector format as before. */
float *vec = NULL;
char *vec_format = RedisModule_StringPtrLen(argv[start_idx],NULL);
const char *vec_format = RedisModule_StringPtrLen(argv[start_idx],NULL);
if (!strcasecmp(vec_format,"FP32")) {
if (argc < start_idx + 2) return NULL; // Need FP32 + vector + value.
@ -404,7 +404,7 @@ void *VADD_thread(void *arg) {
int ef = (uint64_t)targ[6];
/* Look for candidates... */
InsertContext *ic = hnsw_prepare_insert(vset->hnsw, vec, NULL, 0, 0, NULL, ef);
InsertContext *ic = hnsw_prepare_insert(vset->hnsw, vec, NULL, 0, 0, ef);
targ[5] = ic; // Pass the context to the reply callback.
/* Unblock the client so that our read reply will be invoked. */
@ -473,15 +473,13 @@ int VADD_CASReply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
* locking failure (likely impossible in practical terms). */
hnswNode *newnode;
if (ic == NULL ||
(newnode = hnsw_try_commit_insert(vset->hnsw, ic)) == NULL)
(newnode = hnsw_try_commit_insert(vset->hnsw, ic, nv)) == NULL)
{
/* If we are here, the CAS insert failed. We need to insert
* again with full locking for neighbors selection and
* actual insertion. This time we can't fail: */
newnode = hnsw_insert(vset->hnsw, vec, NULL, 0, 0, nv, ef);
RedisModule_Assert(newnode != NULL);
} else {
newnode->value = nv;
}
RedisModule_DictSet(vset->dict,val,newnode);
val = NULL; // Don't free it later.

4
w2v.c
View File

@ -344,8 +344,8 @@ void *threaded_insert(void *ctxptr) {
// applies the check if the graph wasn't modified.
InsertContext *ic;
uint64_t next_id = ctx->id++;
ic = hnsw_prepare_insert(ctx->index, v, NULL, 0, next_id, word, 200);
if (hnsw_try_commit_insert(ctx->index, ic) == NULL) {
ic = hnsw_prepare_insert(ctx->index, v, NULL, 0, next_id, 200);
if (hnsw_try_commit_insert(ctx->index, ic, word) == NULL) {
// This time try locking since the start.
hnsw_insert(ctx->index, v, NULL, 0, next_id, word, 200);
}