virFDStreamData: Turn into virObjectLockable

While this is no functional change, it makes the code look a bit
nicer. Moreover, it prepares ground for future work.

Signed-off-by: Michal Privoznik <mprivozn@redhat.com>
Reviewed-by: John Ferlan <jferlan@redhat.com>
This commit is contained in:
Michal Privoznik 2016-06-16 12:07:41 +02:00
parent 58667ddd5b
commit 585eb46920
1 changed files with 57 additions and 40 deletions

View File

@ -53,6 +53,8 @@ VIR_LOG_INIT("fdstream");
typedef struct virFDStreamData virFDStreamData;
typedef virFDStreamData *virFDStreamDataPtr;
struct virFDStreamData {
virObjectLockable parent;
int fd;
int errfd;
virCommandPtr cmd;
@ -77,10 +79,31 @@ struct virFDStreamData {
virFDStreamInternalCloseCb icbCb;
virFDStreamInternalCloseCbFreeOpaque icbFreeOpaque;
void *icbOpaque;
virMutex lock;
};
static virClassPtr virFDStreamDataClass;
static void
virFDStreamDataDispose(void *obj)
{
virFDStreamDataPtr fdst = obj;
VIR_DEBUG("obj=%p", fdst);
}
static int virFDStreamDataOnceInit(void)
{
if (!(virFDStreamDataClass = virClassNew(virClassForObjectLockable(),
"virFDStreamData",
sizeof(virFDStreamData),
virFDStreamDataDispose)))
return -1;
return 0;
}
VIR_ONCE_GLOBAL_INIT(virFDStreamData)
static int virFDStreamRemoveCallback(virStreamPtr stream)
{
@ -93,7 +116,7 @@ static int virFDStreamRemoveCallback(virStreamPtr stream)
return -1;
}
virMutexLock(&fdst->lock);
virObjectLock(fdst);
if (fdst->watch == 0) {
virReportError(VIR_ERR_INTERNAL_ERROR,
"%s", _("stream does not have a callback registered"));
@ -115,7 +138,7 @@ static int virFDStreamRemoveCallback(virStreamPtr stream)
ret = 0;
cleanup:
virMutexUnlock(&fdst->lock);
virObjectUnlock(fdst);
return ret;
}
@ -130,7 +153,7 @@ static int virFDStreamUpdateCallback(virStreamPtr stream, int events)
return -1;
}
virMutexLock(&fdst->lock);
virObjectLock(fdst);
if (fdst->watch == 0) {
virReportError(VIR_ERR_INTERNAL_ERROR,
"%s", _("stream does not have a callback registered"));
@ -143,7 +166,7 @@ static int virFDStreamUpdateCallback(virStreamPtr stream, int events)
ret = 0;
cleanup:
virMutexUnlock(&fdst->lock);
virObjectUnlock(fdst);
return ret;
}
@ -162,9 +185,9 @@ static void virFDStreamEvent(int watch ATTRIBUTE_UNUSED,
if (!fdst)
return;
virMutexLock(&fdst->lock);
virObjectLock(fdst);
if (!fdst->cb) {
virMutexUnlock(&fdst->lock);
virObjectUnlock(fdst);
return;
}
@ -172,21 +195,19 @@ static void virFDStreamEvent(int watch ATTRIBUTE_UNUSED,
cbopaque = fdst->opaque;
ff = fdst->ff;
fdst->dispatching = true;
virMutexUnlock(&fdst->lock);
virObjectUnlock(fdst);
cb(stream, events, cbopaque);
virMutexLock(&fdst->lock);
virObjectLock(fdst);
fdst->dispatching = false;
if (fdst->cbRemoved && ff)
(ff)(cbopaque);
closed = fdst->closed;
virMutexUnlock(&fdst->lock);
virObjectUnlock(fdst);
if (closed) {
virMutexDestroy(&fdst->lock);
VIR_FREE(fdst);
}
if (closed)
virObjectUnref(fdst);
}
static void virFDStreamCallbackFree(void *opaque)
@ -211,7 +232,7 @@ virFDStreamAddCallback(virStreamPtr st,
return -1;
}
virMutexLock(&fdst->lock);
virObjectLock(fdst);
if (fdst->watch != 0) {
virReportError(VIR_ERR_INTERNAL_ERROR,
"%s", _("stream already has a callback registered"));
@ -239,7 +260,7 @@ virFDStreamAddCallback(virStreamPtr st,
ret = 0;
cleanup:
virMutexUnlock(&fdst->lock);
virObjectUnlock(fdst);
return ret;
}
@ -307,7 +328,7 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
if (!st || !(fdst = st->privateData) || fdst->abortCallbackDispatching)
return 0;
virMutexLock(&fdst->lock);
virObjectLock(fdst);
/* aborting the stream, ensure the callback is called if it's
* registered for stream error event */
@ -317,7 +338,7 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
VIR_STREAM_EVENT_WRITABLE))) {
/* don't enter this function accidentally from the callback again */
if (fdst->abortCallbackCalled) {
virMutexUnlock(&fdst->lock);
virObjectUnlock(fdst);
return 0;
}
@ -327,12 +348,12 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
/* cache the pointers */
cb = fdst->cb;
opaque = fdst->opaque;
virMutexUnlock(&fdst->lock);
virObjectUnlock(fdst);
/* call failure callback, poll reports nothing on closed fd */
(cb)(st, VIR_STREAM_EVENT_ERROR, opaque);
virMutexLock(&fdst->lock);
virObjectLock(fdst);
fdst->abortCallbackDispatching = false;
}
@ -356,11 +377,10 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
if (fdst->dispatching) {
fdst->closed = true;
virMutexUnlock(&fdst->lock);
virObjectUnlock(fdst);
} else {
virMutexUnlock(&fdst->lock);
virMutexDestroy(&fdst->lock);
VIR_FREE(fdst);
virObjectUnlock(fdst);
virObjectUnref(fdst);
}
return ret;
@ -395,13 +415,13 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
return -1;
}
virMutexLock(&fdst->lock);
virObjectLock(fdst);
if (fdst->length) {
if (fdst->length == fdst->offset) {
virReportSystemError(ENOSPC, "%s",
_("cannot write to stream"));
virMutexUnlock(&fdst->lock);
virObjectUnlock(fdst);
return -1;
}
@ -427,7 +447,7 @@ static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t nbytes)
fdst->offset += ret;
}
virMutexUnlock(&fdst->lock);
virObjectUnlock(fdst);
return ret;
}
@ -449,11 +469,11 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
return -1;
}
virMutexLock(&fdst->lock);
virObjectLock(fdst);
if (fdst->length) {
if (fdst->length == fdst->offset) {
virMutexUnlock(&fdst->lock);
virObjectUnlock(fdst);
return 0;
}
@ -479,7 +499,7 @@ static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
fdst->offset += ret;
}
virMutexUnlock(&fdst->lock);
virObjectUnlock(fdst);
return ret;
}
@ -505,25 +525,22 @@ static int virFDStreamOpenInternal(virStreamPtr st,
VIR_DEBUG("st=%p fd=%d cmd=%p errfd=%d length=%llu",
st, fd, cmd, errfd, length);
if (virFDStreamDataInitialize() < 0)
return -1;
if ((st->flags & VIR_STREAM_NONBLOCK) &&
virSetNonBlock(fd) < 0) {
virReportSystemError(errno, "%s", _("Unable to set non-blocking mode"));
return -1;
}
if (VIR_ALLOC(fdst) < 0)
if (!(fdst = virObjectLockableNew(virFDStreamDataClass)))
return -1;
fdst->fd = fd;
fdst->cmd = cmd;
fdst->errfd = errfd;
fdst->length = length;
if (virMutexInit(&fdst->lock) < 0) {
VIR_FREE(fdst);
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("Unable to initialize mutex"));
return -1;
}
st->driver = &virFDStreamDrv;
st->privateData = fdst;
@ -821,7 +838,7 @@ int virFDStreamSetInternalCloseCb(virStreamPtr st,
{
virFDStreamDataPtr fdst = st->privateData;
virMutexLock(&fdst->lock);
virObjectLock(fdst);
if (fdst->icbFreeOpaque)
(fdst->icbFreeOpaque)(fdst->icbOpaque);
@ -830,6 +847,6 @@ int virFDStreamSetInternalCloseCb(virStreamPtr st,
fdst->icbOpaque = opaque;
fdst->icbFreeOpaque = fcb;
virMutexUnlock(&fdst->lock);
virObjectUnlock(fdst);
return 0;
}