From dff6d809fb6c851244ea07afd07f580d7320cc7a Mon Sep 17 00:00:00 2001 From: "Daniel P. Berrange" Date: Thu, 15 Mar 2012 18:18:07 +0000 Subject: [PATCH] Allow RPC server to run single threaded Refactor the RPC server dispatcher code so that if 'max_workers==0' the entire server will run single threaded. This is useful for use cases where there will only ever be 1 client connected which serializes its requests Signed-off-by: Daniel P. Berrange --- src/rpc/virnetserver.c | 123 ++++++++++++++++++++++++----------------- 1 file changed, 73 insertions(+), 50 deletions(-) diff --git a/src/rpc/virnetserver.c b/src/rpc/virnetserver.c index 358666d248..4a02aabc59 100644 --- a/src/rpc/virnetserver.c +++ b/src/rpc/virnetserver.c @@ -127,6 +127,49 @@ static void virNetServerUnlock(virNetServerPtr srv) } +static int virNetServerProcessMsg(virNetServerPtr srv, + virNetServerClientPtr client, + virNetServerProgramPtr prog, + virNetMessagePtr msg) +{ + int ret = -1; + if (!prog) { + /* Only send back an error for type == CALL. Other + * message types are not expecting replies, so we + * must just log it & drop them + */ + if (msg->header.type == VIR_NET_CALL || + msg->header.type == VIR_NET_CALL_WITH_FDS) { + if (virNetServerProgramUnknownError(client, + msg, + &msg->header) < 0) + goto cleanup; + } else { + VIR_INFO("Dropping client mesage, unknown program %d version %d type %d proc %d", + msg->header.prog, msg->header.vers, + msg->header.type, msg->header.proc); + /* Send a dummy reply to free up 'msg' & unblock client rx */ + virNetMessageClear(msg); + msg->header.type = VIR_NET_REPLY; + if (virNetServerClientSendMessage(client, msg) < 0) + goto cleanup; + } + goto done; + } + + if (virNetServerProgramDispatch(prog, + srv, + client, + msg) < 0) + goto cleanup; + +done: + ret = 0; + +cleanup: + return ret; +} + static void virNetServerHandleJob(void *jobOpaque, void *opaque) { virNetServerPtr srv = opaque; @@ -135,41 +178,13 @@ static void virNetServerHandleJob(void *jobOpaque, void *opaque) VIR_DEBUG("server=%p client=%p message=%p prog=%p", srv, job->client, job->msg, job->prog); - if (!job->prog) { - /* Only send back an error for type == CALL. Other - * message types are not expecting replies, so we - * must just log it & drop them - */ - if (job->msg->header.type == VIR_NET_CALL || - job->msg->header.type == VIR_NET_CALL_WITH_FDS) { - if (virNetServerProgramUnknownError(job->client, - job->msg, - &job->msg->header) < 0) - goto error; - } else { - VIR_INFO("Dropping client mesage, unknown program %d version %d type %d proc %d", - job->msg->header.prog, job->msg->header.vers, - job->msg->header.type, job->msg->header.proc); - /* Send a dummy reply to free up 'msg' & unblock client rx */ - virNetMessageClear(job->msg); - job->msg->header.type = VIR_NET_REPLY; - if (virNetServerClientSendMessage(job->client, job->msg) < 0) - goto error; - } - goto cleanup; - } - - if (virNetServerProgramDispatch(job->prog, - srv, - job->client, - job->msg) < 0) + if (virNetServerProcessMsg(srv, job->client, job->prog, job->msg) < 0) goto error; virNetServerLock(srv); virNetServerProgramFree(job->prog); virNetServerUnlock(srv); -cleanup: virNetServerClientFree(job->client); VIR_FREE(job); return; @@ -187,7 +202,6 @@ static int virNetServerDispatchNewMessage(virNetServerClientPtr client, void *opaque) { virNetServerPtr srv = opaque; - virNetServerJobPtr job; virNetServerProgramPtr prog = NULL; unsigned int priority = 0; size_t i; @@ -196,34 +210,42 @@ static int virNetServerDispatchNewMessage(virNetServerClientPtr client, VIR_DEBUG("server=%p client=%p message=%p", srv, client, msg); - if (VIR_ALLOC(job) < 0) { - virReportOOMError(); - return -1; - } - - job->client = client; - job->msg = msg; - virNetServerLock(srv); for (i = 0 ; i < srv->nprograms ; i++) { - if (virNetServerProgramMatches(srv->programs[i], job->msg)) { + if (virNetServerProgramMatches(srv->programs[i], msg)) { prog = srv->programs[i]; break; } } - if (prog) { - virNetServerProgramRef(prog); - job->prog = prog; - priority = virNetServerProgramGetPriority(prog, msg->header.proc); + if (srv->workers) { + virNetServerJobPtr job; + + if (VIR_ALLOC(job) < 0) { + virReportOOMError(); + goto cleanup; + } + + job->client = client; + job->msg = msg; + + if (prog) { + virNetServerProgramRef(prog); + job->prog = prog; + priority = virNetServerProgramGetPriority(prog, msg->header.proc); + } + + ret = virThreadPoolSendJob(srv->workers, priority, job); + + if (ret < 0) { + VIR_FREE(job); + virNetServerProgramFree(prog); + } + } else { + ret = virNetServerProcessMsg(srv, client, prog, msg); } - ret = virThreadPoolSendJob(srv->workers, priority, job); - - if (ret < 0) { - VIR_FREE(job); - virNetServerProgramFree(prog); - } +cleanup: virNetServerUnlock(srv); return ret; @@ -324,7 +346,8 @@ virNetServerPtr virNetServerNew(size_t min_workers, srv->refs = 1; - if (!(srv->workers = virThreadPoolNew(min_workers, max_workers, + if (max_workers && + !(srv->workers = virThreadPoolNew(min_workers, max_workers, priority_workers, virNetServerHandleJob, srv)))