From 8a5cf17cb2cd7cf3c515370b7eab79f3090a7fd5 Mon Sep 17 00:00:00 2001 From: antirez Date: Sat, 15 Mar 2025 23:31:24 +0100 Subject: [PATCH] HNSW: cursor fixes and thread safety. --- hnsw.c | 36 +++++++++++++++++++++++++++++------- hnsw.h | 9 +++++++-- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/hnsw.c b/hnsw.c index f09e0497a..1c29b75c1 100644 --- a/hnsw.c +++ b/hnsw.c @@ -2237,36 +2237,58 @@ int hnsw_deserialize_index(HNSW *index) { * * The function returns NULL on out of memory. */ hnswCursor *hnsw_cursor_init(HNSW *index) { + if (pthread_rwlock_wrlock(&index->global_lock) != 0) return NULL; hnswCursor *cursor = hmalloc(sizeof(*cursor)); - if (cursor == NULL) return NULL; + if (cursor == NULL) { + pthread_rwlock_unlock(&index->global_lock); + return NULL; + } + cursor->index = index; cursor->next = index->cursors; cursor->current = index->head; index->cursors = cursor; + pthread_rwlock_unlock(&index->global_lock); return cursor; } /* Free the cursor. Can be called both at the end of the iteration, when * hnsw_cursor_next() returned NULL, or before. */ -void hnsw_cursor_free(HNSW *index, hnswCursor *cursor) { - hnswCursor *x = index->cursors; +void hnsw_cursor_free(hnswCursor *cursor) { + pthread_rwlock_wrlock(&cursor->index->global_lock); // Best effort. + hnswCursor *x = cursor->index->cursors; hnswCursor *prev = NULL; while(x) { if (x == cursor) { if (prev) prev->next = cursor->next; else - index->cursors = cursor->next; + cursor->index->cursors = cursor->next; hfree(cursor); - return; + break; } x = x->next; } + pthread_rwlock_unlock(&cursor->index->global_lock); +} + +/* Acquire a lock to use the cursor. Returns 1 if the lock was acquired + * with success, otherwise zero is returned. The returned element is + * protected after calling hnsw_cursor_next() for all the time required to + * access it, then hnsw_cursor_release_lock() should be called in order + * to unlock the HNSW index. */ +int hnsw_cursor_acquire_lock(hnswCursor *cursor) { + return pthread_rwlock_rdlock(&cursor->index->global_lock) == 0; +} + +/* Release the cursor lock, see hnsw_cursor_acquire_lock() top comment + * for more information. */ +void hnsw_cursor_release_lock(hnswCursor *cursor) { + pthread_rwlock_unlock(&cursor->index->global_lock); } /* Return the next element of the HNSW. See hnsw_cursor_init() for * the guarantees of the function. */ -hnswNode *hnsw_cursor_next(HNSW *index, hnswCursor *cursor) { - (void)index; // Unused but future proof to have. +hnswNode *hnsw_cursor_next(hnswCursor *cursor) { hnswNode *ret = cursor->current; if (ret) cursor->current = ret->next; return ret; diff --git a/hnsw.h b/hnsw.h index 69c3153b3..48af67d78 100644 --- a/hnsw.h +++ b/hnsw.h @@ -65,12 +65,15 @@ typedef struct hnswNode { hnswNodeLayer layers[]; } hnswNode; +struct HNSW; + /* It is possible to navigate an HNSW with a cursor that guarantees * visiting all the elements that remain in the HNSW from the start to the * end of the process (but not the new ones, so that the process will * eventually finish). Check hnsw_cursor_init(), hnsw_cursor_next() and * hnsw_cursor_free(). */ typedef struct hnswCursor { + struct HNSW *index; // Reference to the index of this cursor. hnswNode *current; // Element to report when hnsw_cursor_next() is called. struct hnswCursor *next; // Next cursor active. } hnswCursor; @@ -156,8 +159,10 @@ uint32_t hnsw_quants_bytes(HNSW *index); /* Cursors. */ hnswCursor *hnsw_cursor_init(HNSW *index); -void hnsw_cursor_free(HNSW *index, hnswCursor *cursor); -hnswNode *hnsw_cursor_next(HNSW *index, hnswCursor *cursor); +void hnsw_cursor_free(hnswCursor *cursor); +hnswNode *hnsw_cursor_next(hnswCursor *cursor); +int hnsw_cursor_acquire_lock(hnswCursor *cursor); +void hnsw_cursor_release_lock(hnswCursor *cursor); /* Allocator selection. */ void hnsw_set_allocator(void (*free_ptr)(void*), void *(*malloc_ptr)(size_t),