mirror of https://gitee.com/openkylin/libvirt.git
Support callbacks on virStream APIs in remote driver client
The current remote driver code for streams only supports blocking I/O mode. This is fine for the usage with migration but is a problem for more general use cases, in particular bi-directional streams. This adds supported for the stream callbacks and non-blocking I/O. with the minor caveat is that it doesn't actually do non-blocking I/O for sending stream data, only receiving it. A future patch will try to do non-blocking sends, but this is quite tricky to get right. * src/remote/remote_driver.c: Allow non-blocking I/O for streams and support callbacks
This commit is contained in:
parent
2fbec00203
commit
5126926548
|
@ -133,6 +133,13 @@ struct private_stream_data {
|
|||
unsigned int serial;
|
||||
unsigned int proc_nr;
|
||||
|
||||
virStreamEventCallback cb;
|
||||
void *cbOpaque;
|
||||
virFreeCallback cbFree;
|
||||
int cbEvents;
|
||||
int cbTimer;
|
||||
int cbDispatch;
|
||||
|
||||
/* XXX this is potentially unbounded if the client
|
||||
* app has domain events registered, since packets
|
||||
* may be read off wire, while app isn't ready to
|
||||
|
@ -201,9 +208,10 @@ struct private_data {
|
|||
};
|
||||
|
||||
enum {
|
||||
REMOTE_CALL_IN_OPEN = (1 << 0),
|
||||
REMOTE_CALL_IN_OPEN = (1 << 0),
|
||||
REMOTE_CALL_QUIET_MISSING_RPC = (1 << 1),
|
||||
REMOTE_QEMU_CALL = (1 << 2),
|
||||
REMOTE_CALL_QEMU = (1 << 2),
|
||||
REMOTE_CALL_NONBLOCK = (1 << 3),
|
||||
};
|
||||
|
||||
|
||||
|
@ -8138,6 +8146,20 @@ remoteStreamOpen(virStreamPtr st,
|
|||
}
|
||||
|
||||
|
||||
static void
|
||||
remoteStreamEventTimerUpdate(struct private_stream_data *privst)
|
||||
{
|
||||
if (!privst->cb)
|
||||
return;
|
||||
|
||||
if (!privst->cbEvents)
|
||||
virEventUpdateTimeout(privst->cbTimer, -1);
|
||||
else if (privst->incoming &&
|
||||
(privst->cbEvents & VIR_STREAM_EVENT_READABLE))
|
||||
virEventUpdateTimeout(privst->cbTimer, 0);
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
remoteStreamPacket(virStreamPtr st,
|
||||
int status,
|
||||
|
@ -8332,6 +8354,12 @@ remoteStreamRecv(virStreamPtr st,
|
|||
struct remote_thread_call *thiscall;
|
||||
int ret;
|
||||
|
||||
if (st->flags & VIR_STREAM_NONBLOCK) {
|
||||
DEBUG0("Non-blocking mode and no data available");
|
||||
rv = -2;
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (VIR_ALLOC(thiscall) < 0) {
|
||||
virReportOOMError();
|
||||
goto cleanup;
|
||||
|
@ -8375,6 +8403,8 @@ remoteStreamRecv(virStreamPtr st,
|
|||
rv = 0;
|
||||
}
|
||||
|
||||
remoteStreamEventTimerUpdate(privst);
|
||||
|
||||
DEBUG("Done %d", rv);
|
||||
|
||||
cleanup:
|
||||
|
@ -8385,28 +8415,153 @@ cleanup:
|
|||
return rv;
|
||||
}
|
||||
|
||||
static int
|
||||
remoteStreamEventAddCallback(virStreamPtr stream ATTRIBUTE_UNUSED,
|
||||
int events ATTRIBUTE_UNUSED,
|
||||
virStreamEventCallback cb ATTRIBUTE_UNUSED,
|
||||
void *opaque ATTRIBUTE_UNUSED,
|
||||
virFreeCallback ff ATTRIBUTE_UNUSED)
|
||||
|
||||
static void
|
||||
remoteStreamEventTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
|
||||
{
|
||||
return -1;
|
||||
virStreamPtr st = opaque;
|
||||
struct private_data *priv = st->conn->privateData;
|
||||
struct private_stream_data *privst = st->privateData;
|
||||
|
||||
remoteDriverLock(priv);
|
||||
if (privst->cb &&
|
||||
(privst->cbEvents & VIR_STREAM_EVENT_READABLE) &&
|
||||
privst->incomingOffset) {
|
||||
virStreamEventCallback cb = privst->cb;
|
||||
void *cbOpaque = privst->cbOpaque;
|
||||
virFreeCallback cbFree = privst->cbFree;
|
||||
|
||||
privst->cbDispatch = 1;
|
||||
remoteDriverUnlock(priv);
|
||||
(cb)(st, VIR_STREAM_EVENT_READABLE, cbOpaque);
|
||||
remoteDriverLock(priv);
|
||||
privst->cbDispatch = 0;
|
||||
|
||||
if (!privst->cb && cbFree)
|
||||
(cbFree)(cbOpaque);
|
||||
}
|
||||
remoteDriverUnlock(priv);
|
||||
}
|
||||
|
||||
static int
|
||||
remoteStreamEventUpdateCallback(virStreamPtr stream ATTRIBUTE_UNUSED,
|
||||
int events ATTRIBUTE_UNUSED)
|
||||
|
||||
static void
|
||||
remoteStreamEventTimerFree(void *opaque)
|
||||
{
|
||||
return -1;
|
||||
virStreamPtr st = opaque;
|
||||
virUnrefStream(st);
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
remoteStreamEventRemoveCallback(virStreamPtr stream ATTRIBUTE_UNUSED)
|
||||
remoteStreamEventAddCallback(virStreamPtr st,
|
||||
int events,
|
||||
virStreamEventCallback cb,
|
||||
void *opaque,
|
||||
virFreeCallback ff)
|
||||
{
|
||||
return -1;
|
||||
struct private_data *priv = st->conn->privateData;
|
||||
struct private_stream_data *privst = st->privateData;
|
||||
int ret = -1;
|
||||
|
||||
remoteDriverLock(priv);
|
||||
|
||||
if (events & ~VIR_STREAM_EVENT_READABLE) {
|
||||
remoteError(VIR_ERR_INTERNAL_ERROR,
|
||||
_("unsupported stream events %d"), events);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (privst->cb) {
|
||||
remoteError(VIR_ERR_INTERNAL_ERROR,
|
||||
_("multiple stream callbacks not supported"));
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
virStreamRef(st);
|
||||
if ((privst->cbTimer =
|
||||
virEventAddTimeout(-1,
|
||||
remoteStreamEventTimer,
|
||||
st,
|
||||
remoteStreamEventTimerFree)) < 0) {
|
||||
virUnrefStream(st);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
privst->cb = cb;
|
||||
privst->cbOpaque = opaque;
|
||||
privst->cbFree = ff;
|
||||
privst->cbEvents = events;
|
||||
|
||||
ret = 0;
|
||||
|
||||
cleanup:
|
||||
remoteDriverUnlock(priv);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int
|
||||
remoteStreamEventUpdateCallback(virStreamPtr st,
|
||||
int events)
|
||||
{
|
||||
struct private_data *priv = st->conn->privateData;
|
||||
struct private_stream_data *privst = st->privateData;
|
||||
int ret = -1;
|
||||
|
||||
remoteDriverLock(priv);
|
||||
|
||||
if (events & ~VIR_STREAM_EVENT_READABLE) {
|
||||
remoteError(VIR_ERR_INTERNAL_ERROR,
|
||||
_("unsupported stream events %d"), events);
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (!privst->cb) {
|
||||
remoteError(VIR_ERR_INTERNAL_ERROR,
|
||||
_("no stream callback registered"));
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
privst->cbEvents = events;
|
||||
|
||||
remoteStreamEventTimerUpdate(privst);
|
||||
|
||||
ret = 0;
|
||||
|
||||
cleanup:
|
||||
remoteDriverUnlock(priv);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
remoteStreamEventRemoveCallback(virStreamPtr st)
|
||||
{
|
||||
struct private_data *priv = st->conn->privateData;
|
||||
struct private_stream_data *privst = st->privateData;
|
||||
int ret = -1;
|
||||
|
||||
remoteDriverLock(priv);
|
||||
|
||||
if (!privst->cb) {
|
||||
remoteError(VIR_ERR_INTERNAL_ERROR,
|
||||
_("no stream callback registered"));
|
||||
goto cleanup;
|
||||
}
|
||||
|
||||
if (!privst->cbDispatch &&
|
||||
privst->cbFree)
|
||||
(privst->cbFree)(privst->cbOpaque);
|
||||
privst->cb = NULL;
|
||||
privst->cbOpaque = NULL;
|
||||
privst->cbFree = NULL;
|
||||
privst->cbEvents = 0;
|
||||
virEventRemoveTimeout(privst->cbTimer);
|
||||
|
||||
ret = 0;
|
||||
|
||||
cleanup:
|
||||
remoteDriverUnlock(priv);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static int
|
||||
|
@ -9059,7 +9214,7 @@ remoteQemuDomainMonitorCommand (virDomainPtr domain, const char *cmd,
|
|||
args.flags = flags;
|
||||
|
||||
memset (&ret, 0, sizeof ret);
|
||||
if (call (domain->conn, priv, REMOTE_QEMU_CALL, QEMU_PROC_MONITOR_COMMAND,
|
||||
if (call (domain->conn, priv, REMOTE_CALL_QEMU, QEMU_PROC_MONITOR_COMMAND,
|
||||
(xdrproc_t) xdr_qemu_monitor_command_args, (char *) &args,
|
||||
(xdrproc_t) xdr_qemu_monitor_command_ret, (char *) &ret) == -1)
|
||||
goto done;
|
||||
|
@ -9113,7 +9268,7 @@ prepareCall(struct private_data *priv,
|
|||
rv->ret = ret;
|
||||
rv->want_reply = 1;
|
||||
|
||||
if (flags & REMOTE_QEMU_CALL) {
|
||||
if (flags & REMOTE_CALL_QEMU) {
|
||||
hdr.prog = QEMU_PROGRAM;
|
||||
hdr.vers = QEMU_PROTOCOL_VERSION;
|
||||
}
|
||||
|
@ -9506,7 +9661,7 @@ processCallDispatch(virConnectPtr conn, struct private_data *priv,
|
|||
|
||||
expectedprog = REMOTE_PROGRAM;
|
||||
expectedvers = REMOTE_PROTOCOL_VERSION;
|
||||
if (flags & REMOTE_QEMU_CALL) {
|
||||
if (flags & REMOTE_CALL_QEMU) {
|
||||
expectedprog = QEMU_PROGRAM;
|
||||
expectedvers = QEMU_PROTOCOL_VERSION;
|
||||
}
|
||||
|
@ -9732,6 +9887,7 @@ processCallDispatchStream(virConnectPtr conn ATTRIBUTE_UNUSED,
|
|||
thecall->mode = REMOTE_MODE_COMPLETE;
|
||||
} else {
|
||||
VIR_WARN("Got aysnc data packet offset=%d", privst->incomingOffset);
|
||||
remoteStreamEventTimerUpdate(privst);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue