From caaa1b8f13288b6560a65963420a635ca1c2be28 Mon Sep 17 00:00:00 2001 From: "Daniel P. Berrange" Date: Fri, 10 Jul 2009 12:58:22 +0100 Subject: [PATCH] Define an API for registering incoming message dispatch filters All incoming messages currently get routed to the generic method remoteDispatchClientRequest() for processing. To allow incoming data stream messages to bypass this and be routed to a specific location, a concept of dispatch filters is introduced. * qemud/qemud.h: Add a qemud_client_filter struct and a callback qemud_client_filter_func. Maintain a list of filters on every struct qemud_client * qemud/qemud.c: Move remoteDecodeClientMessageHeader() out of qemudWorker() into qemudDispatchClientRead(). Check registered message filters in qemudDispatchClientRead() to decide where to send incoming messages for dispatch. --- qemud/qemud.c | 28 ++++++++++++++++++++++++---- qemud/qemud.h | 16 ++++++++++++++++ 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/qemud/qemud.c b/qemud/qemud.c index 42bc00eeb8..e393db464a 100644 --- a/qemud/qemud.c +++ b/qemud/qemud.c @@ -1457,8 +1457,7 @@ static void *qemudWorker(void *data) /* This function drops the lock during dispatch, * and re-acquires it before returning */ - if (remoteDecodeClientMessageHeader(msg) < 0 || - remoteDispatchClientRequest (server, client, msg) < 0) { + if (remoteDispatchClientRequest (server, client, msg) < 0) { VIR_FREE(msg); qemudDispatchClientFailure(client); client->refs--; @@ -1705,9 +1704,30 @@ readmore: waiting for us */ goto readmore; } else { + /* Grab the completed message */ + struct qemud_client_message *msg = qemudClientMessageQueueServe(&client->rx); + struct qemud_client_filter *filter; + + /* Decode the header so we can use it for routing decisions */ + if (remoteDecodeClientMessageHeader(msg) < 0) { + VIR_FREE(msg); + qemudDispatchClientFailure(client); + } + + /* Check if any filters match this message */ + filter = client->filters; + while (filter) { + if ((filter->query)(msg, filter->opaque)) { + qemudClientMessageQueuePush(&filter->dx, msg); + msg = NULL; + break; + } + filter = filter->next; + } + /* Move completed message to the end of the dispatch queue */ - qemudClientMessageQueuePush(&client->dx, client->rx); - client->rx = NULL; + if (msg) + qemudClientMessageQueuePush(&client->dx, msg); client->nrequests++; /* Possibly need to create another receive buffer */ diff --git a/qemud/qemud.h b/qemud/qemud.h index 86b893da62..abacbbbb69 100644 --- a/qemud/qemud.h +++ b/qemud/qemud.h @@ -90,6 +90,19 @@ struct qemud_client_message { struct qemud_client_message *next; }; +/* Allow for filtering of incoming messages to a custom + * dispatch processing queue, instead of client->dx. + */ +typedef int (*qemud_client_filter_func)(struct qemud_client_message *msg, void *opaque); +struct qemud_client_filter { + qemud_client_filter_func query; + void *opaque; + + struct qemud_client_message *dx; + + struct qemud_client_filter *next; +}; + /* Stores the per-client connection state */ struct qemud_client { virMutex lock; @@ -134,6 +147,9 @@ struct qemud_client { /* Zero or many messages waiting for transmit * back to client, including async events */ struct qemud_client_message *tx; + /* Filters to capture messages that would otherwise + * end up on the 'dx' queue */ + struct qemud_client_filter *filters; /* This is only valid if a remote open call has been made on this * connection, otherwise it will be NULL. Also if remote close is