diff --git a/qemud/qemud.c b/qemud/qemud.c index 42bc00eeb8..e393db464a 100644 --- a/qemud/qemud.c +++ b/qemud/qemud.c @@ -1457,8 +1457,7 @@ static void *qemudWorker(void *data) /* This function drops the lock during dispatch, * and re-acquires it before returning */ - if (remoteDecodeClientMessageHeader(msg) < 0 || - remoteDispatchClientRequest (server, client, msg) < 0) { + if (remoteDispatchClientRequest (server, client, msg) < 0) { VIR_FREE(msg); qemudDispatchClientFailure(client); client->refs--; @@ -1705,9 +1704,30 @@ readmore: waiting for us */ goto readmore; } else { + /* Grab the completed message */ + struct qemud_client_message *msg = qemudClientMessageQueueServe(&client->rx); + struct qemud_client_filter *filter; + + /* Decode the header so we can use it for routing decisions */ + if (remoteDecodeClientMessageHeader(msg) < 0) { + VIR_FREE(msg); + qemudDispatchClientFailure(client); + } + + /* Check if any filters match this message */ + filter = client->filters; + while (filter) { + if ((filter->query)(msg, filter->opaque)) { + qemudClientMessageQueuePush(&filter->dx, msg); + msg = NULL; + break; + } + filter = filter->next; + } + /* Move completed message to the end of the dispatch queue */ - qemudClientMessageQueuePush(&client->dx, client->rx); - client->rx = NULL; + if (msg) + qemudClientMessageQueuePush(&client->dx, msg); client->nrequests++; /* Possibly need to create another receive buffer */ diff --git a/qemud/qemud.h b/qemud/qemud.h index 86b893da62..abacbbbb69 100644 --- a/qemud/qemud.h +++ b/qemud/qemud.h @@ -90,6 +90,19 @@ struct qemud_client_message { struct qemud_client_message *next; }; +/* Allow for filtering of incoming messages to a custom + * dispatch processing queue, instead of client->dx. + */ +typedef int (*qemud_client_filter_func)(struct qemud_client_message *msg, void *opaque); +struct qemud_client_filter { + qemud_client_filter_func query; + void *opaque; + + struct qemud_client_message *dx; + + struct qemud_client_filter *next; +}; + /* Stores the per-client connection state */ struct qemud_client { virMutex lock; @@ -134,6 +147,9 @@ struct qemud_client { /* Zero or many messages waiting for transmit * back to client, including async events */ struct qemud_client_message *tx; + /* Filters to capture messages that would otherwise + * end up on the 'dx' queue */ + struct qemud_client_filter *filters; /* This is only valid if a remote open call has been made on this * connection, otherwise it will be NULL. Also if remote close is