diff --git a/block/nbd.c b/block/nbd.c index 9d661c1a92..3f693e3ffa 100644 --- a/block/nbd.c +++ b/block/nbd.c @@ -46,6 +46,10 @@ #define logout(fmt, ...) ((void)0) #endif +#define MAX_NBD_REQUESTS 16 +#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs)) +#define INDEX_TO_HANDLE(bs, index) ((index) ^ ((uint64_t)(intptr_t)bs)) + typedef struct BDRVNBDState { int sock; uint32_t nbdflags; @@ -53,9 +57,12 @@ typedef struct BDRVNBDState { size_t blocksize; char *export_name; /* An NBD server may export several devices */ - CoMutex mutex; - Coroutine *coroutine; + CoMutex send_mutex; + CoMutex free_sema; + Coroutine *send_coroutine; + int in_flight; + Coroutine *recv_coroutine[MAX_NBD_REQUESTS]; struct nbd_reply reply; /* If it begins with '/', this is a UNIX domain socket. Otherwise, @@ -112,41 +119,68 @@ out: static void nbd_coroutine_start(BDRVNBDState *s, struct nbd_request *request) { - qemu_co_mutex_lock(&s->mutex); - s->coroutine = qemu_coroutine_self(); - request->handle = (uint64_t)(intptr_t)s; + int i; + + /* Poor man semaphore. The free_sema is locked when no other request + * can be accepted, and unlocked after receiving one reply. */ + if (s->in_flight >= MAX_NBD_REQUESTS - 1) { + qemu_co_mutex_lock(&s->free_sema); + assert(s->in_flight < MAX_NBD_REQUESTS); + } + s->in_flight++; + + for (i = 0; i < MAX_NBD_REQUESTS; i++) { + if (s->recv_coroutine[i] == NULL) { + s->recv_coroutine[i] = qemu_coroutine_self(); + break; + } + } + + assert(i < MAX_NBD_REQUESTS); + request->handle = INDEX_TO_HANDLE(s, i); } static int nbd_have_request(void *opaque) { BDRVNBDState *s = opaque; - return !!s->coroutine; + return s->in_flight > 0; } static void nbd_reply_ready(void *opaque) { BDRVNBDState *s = opaque; + int i; if (s->reply.handle == 0) { /* No reply already in flight. Fetch a header. */ if (nbd_receive_reply(s->sock, &s->reply) < 0) { s->reply.handle = 0; + goto fail; } } /* There's no need for a mutex on the receive side, because the * handler acts as a synchronization point and ensures that only * one coroutine is called until the reply finishes. */ - if (s->coroutine) { - qemu_coroutine_enter(s->coroutine, NULL); + i = HANDLE_TO_INDEX(s, s->reply.handle); + if (s->recv_coroutine[i]) { + qemu_coroutine_enter(s->recv_coroutine[i], NULL); + return; + } + +fail: + for (i = 0; i < MAX_NBD_REQUESTS; i++) { + if (s->recv_coroutine[i]) { + qemu_coroutine_enter(s->recv_coroutine[i], NULL); + } } } static void nbd_restart_write(void *opaque) { BDRVNBDState *s = opaque; - qemu_coroutine_enter(s->coroutine, NULL); + qemu_coroutine_enter(s->send_coroutine, NULL); } static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request, @@ -154,6 +188,8 @@ static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request, { int rc, ret; + qemu_co_mutex_lock(&s->send_mutex); + s->send_coroutine = qemu_coroutine_self(); qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, nbd_restart_write, nbd_have_request, NULL, s); rc = nbd_send_request(s->sock, request); @@ -166,6 +202,8 @@ static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request, } qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL, nbd_have_request, NULL, s); + s->send_coroutine = NULL; + qemu_co_mutex_unlock(&s->send_mutex); return rc; } @@ -175,7 +213,8 @@ static void nbd_co_receive_reply(BDRVNBDState *s, struct nbd_request *request, { int ret; - /* Wait until we're woken up by the read handler. */ + /* Wait until we're woken up by the read handler. TODO: perhaps + * peek at the next reply and avoid yielding if it's ours? */ qemu_coroutine_yield(); *reply = s->reply; if (reply->handle != request->handle) { @@ -195,8 +234,11 @@ static void nbd_co_receive_reply(BDRVNBDState *s, struct nbd_request *request, static void nbd_coroutine_end(BDRVNBDState *s, struct nbd_request *request) { - s->coroutine = NULL; - qemu_co_mutex_unlock(&s->mutex); + int i = HANDLE_TO_INDEX(s, request->handle); + s->recv_coroutine[i] = NULL; + if (s->in_flight-- == MAX_NBD_REQUESTS) { + qemu_co_mutex_unlock(&s->free_sema); + } } static int nbd_establish_connection(BlockDriverState *bs) @@ -261,7 +303,8 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags) BDRVNBDState *s = bs->opaque; int result; - qemu_co_mutex_init(&s->mutex); + qemu_co_mutex_init(&s->send_mutex); + qemu_co_mutex_init(&s->free_sema); /* Pop the config into our state object. Exit if invalid. */ result = nbd_config(s, filename, flags);