Convert daemon/events.c to use virMutex and virThread

The code currently uses pthreads APIs directly. This is not
portable to Win32 threads. Switch it over to use the portability
APIs. Also add a wrapper for pipe() which is subtely different
on Win32

* daemon/event.c: Switch to use virMutex & virThread.
This commit is contained in:
Daniel P. Berrange 2010-11-02 15:56:44 +00:00
parent 64d6750709
commit 77960c0e9d
1 changed files with 42 additions and 45 deletions

View File

@ -69,9 +69,9 @@ struct virEventTimeout {
/* State for the main event loop */ /* State for the main event loop */
struct virEventLoop { struct virEventLoop {
pthread_mutex_t lock; virMutex lock;
int running; int running;
pthread_t leader; virThread leader;
int wakeupfd[2]; int wakeupfd[2];
int handlesCount; int handlesCount;
int handlesAlloc; int handlesAlloc;
@ -90,16 +90,6 @@ static int nextWatch = 1;
/* Unique ID for the next timer to be registered */ /* Unique ID for the next timer to be registered */
static int nextTimer = 1; static int nextTimer = 1;
static void virEventLock(void)
{
pthread_mutex_lock(&eventLoop.lock);
}
static void virEventUnlock(void)
{
pthread_mutex_unlock(&eventLoop.lock);
}
/* /*
* Register a callback for monitoring file handle events. * Register a callback for monitoring file handle events.
* NB, it *must* be safe to call this from within a callback * NB, it *must* be safe to call this from within a callback
@ -111,13 +101,13 @@ int virEventAddHandleImpl(int fd, int events,
virFreeCallback ff) { virFreeCallback ff) {
int watch; int watch;
EVENT_DEBUG("Add handle fd=%d events=%d cb=%p opaque=%p", fd, events, cb, opaque); EVENT_DEBUG("Add handle fd=%d events=%d cb=%p opaque=%p", fd, events, cb, opaque);
virEventLock(); virMutexLock(&eventLoop.lock);
if (eventLoop.handlesCount == eventLoop.handlesAlloc) { if (eventLoop.handlesCount == eventLoop.handlesAlloc) {
EVENT_DEBUG("Used %d handle slots, adding %d more", EVENT_DEBUG("Used %d handle slots, adding %d more",
eventLoop.handlesAlloc, EVENT_ALLOC_EXTENT); eventLoop.handlesAlloc, EVENT_ALLOC_EXTENT);
if (VIR_REALLOC_N(eventLoop.handles, if (VIR_REALLOC_N(eventLoop.handles,
(eventLoop.handlesAlloc + EVENT_ALLOC_EXTENT)) < 0) { (eventLoop.handlesAlloc + EVENT_ALLOC_EXTENT)) < 0) {
virEventUnlock(); virMutexUnlock(&eventLoop.lock);
return -1; return -1;
} }
eventLoop.handlesAlloc += EVENT_ALLOC_EXTENT; eventLoop.handlesAlloc += EVENT_ALLOC_EXTENT;
@ -137,7 +127,7 @@ int virEventAddHandleImpl(int fd, int events,
eventLoop.handlesCount++; eventLoop.handlesCount++;
virEventInterruptLocked(); virEventInterruptLocked();
virEventUnlock(); virMutexUnlock(&eventLoop.lock);
return watch; return watch;
} }
@ -151,7 +141,7 @@ void virEventUpdateHandleImpl(int watch, int events) {
return; return;
} }
virEventLock(); virMutexLock(&eventLoop.lock);
for (i = 0 ; i < eventLoop.handlesCount ; i++) { for (i = 0 ; i < eventLoop.handlesCount ; i++) {
if (eventLoop.handles[i].watch == watch) { if (eventLoop.handles[i].watch == watch) {
eventLoop.handles[i].events = eventLoop.handles[i].events =
@ -160,7 +150,7 @@ void virEventUpdateHandleImpl(int watch, int events) {
break; break;
} }
} }
virEventUnlock(); virMutexUnlock(&eventLoop.lock);
} }
/* /*
@ -178,7 +168,7 @@ int virEventRemoveHandleImpl(int watch) {
return -1; return -1;
} }
virEventLock(); virMutexLock(&eventLoop.lock);
for (i = 0 ; i < eventLoop.handlesCount ; i++) { for (i = 0 ; i < eventLoop.handlesCount ; i++) {
if (eventLoop.handles[i].deleted) if (eventLoop.handles[i].deleted)
continue; continue;
@ -187,11 +177,11 @@ int virEventRemoveHandleImpl(int watch) {
EVENT_DEBUG("mark delete %d %d", i, eventLoop.handles[i].fd); EVENT_DEBUG("mark delete %d %d", i, eventLoop.handles[i].fd);
eventLoop.handles[i].deleted = 1; eventLoop.handles[i].deleted = 1;
virEventInterruptLocked(); virEventInterruptLocked();
virEventUnlock(); virMutexUnlock(&eventLoop.lock);
return 0; return 0;
} }
} }
virEventUnlock(); virMutexUnlock(&eventLoop.lock);
return -1; return -1;
} }
@ -212,13 +202,13 @@ int virEventAddTimeoutImpl(int frequency,
return -1; return -1;
} }
virEventLock(); virMutexLock(&eventLoop.lock);
if (eventLoop.timeoutsCount == eventLoop.timeoutsAlloc) { if (eventLoop.timeoutsCount == eventLoop.timeoutsAlloc) {
EVENT_DEBUG("Used %d timeout slots, adding %d more", EVENT_DEBUG("Used %d timeout slots, adding %d more",
eventLoop.timeoutsAlloc, EVENT_ALLOC_EXTENT); eventLoop.timeoutsAlloc, EVENT_ALLOC_EXTENT);
if (VIR_REALLOC_N(eventLoop.timeouts, if (VIR_REALLOC_N(eventLoop.timeouts,
(eventLoop.timeoutsAlloc + EVENT_ALLOC_EXTENT)) < 0) { (eventLoop.timeoutsAlloc + EVENT_ALLOC_EXTENT)) < 0) {
virEventUnlock(); virMutexUnlock(&eventLoop.lock);
return -1; return -1;
} }
eventLoop.timeoutsAlloc += EVENT_ALLOC_EXTENT; eventLoop.timeoutsAlloc += EVENT_ALLOC_EXTENT;
@ -238,7 +228,7 @@ int virEventAddTimeoutImpl(int frequency,
eventLoop.timeoutsCount++; eventLoop.timeoutsCount++;
ret = nextTimer-1; ret = nextTimer-1;
virEventInterruptLocked(); virEventInterruptLocked();
virEventUnlock(); virMutexUnlock(&eventLoop.lock);
return ret; return ret;
} }
@ -256,7 +246,7 @@ void virEventUpdateTimeoutImpl(int timer, int frequency) {
return; return;
} }
virEventLock(); virMutexLock(&eventLoop.lock);
for (i = 0 ; i < eventLoop.timeoutsCount ; i++) { for (i = 0 ; i < eventLoop.timeoutsCount ; i++) {
if (eventLoop.timeouts[i].timer == timer) { if (eventLoop.timeouts[i].timer == timer) {
eventLoop.timeouts[i].frequency = frequency; eventLoop.timeouts[i].frequency = frequency;
@ -268,7 +258,7 @@ void virEventUpdateTimeoutImpl(int timer, int frequency) {
break; break;
} }
} }
virEventUnlock(); virMutexUnlock(&eventLoop.lock);
} }
/* /*
@ -286,7 +276,7 @@ int virEventRemoveTimeoutImpl(int timer) {
return -1; return -1;
} }
virEventLock(); virMutexLock(&eventLoop.lock);
for (i = 0 ; i < eventLoop.timeoutsCount ; i++) { for (i = 0 ; i < eventLoop.timeoutsCount ; i++) {
if (eventLoop.timeouts[i].deleted) if (eventLoop.timeouts[i].deleted)
continue; continue;
@ -294,11 +284,11 @@ int virEventRemoveTimeoutImpl(int timer) {
if (eventLoop.timeouts[i].timer == timer) { if (eventLoop.timeouts[i].timer == timer) {
eventLoop.timeouts[i].deleted = 1; eventLoop.timeouts[i].deleted = 1;
virEventInterruptLocked(); virEventInterruptLocked();
virEventUnlock(); virMutexUnlock(&eventLoop.lock);
return 0; return 0;
} }
} }
virEventUnlock(); virMutexUnlock(&eventLoop.lock);
return -1; return -1;
} }
@ -426,9 +416,9 @@ static int virEventDispatchTimeouts(void) {
eventLoop.timeouts[i].expiresAt = eventLoop.timeouts[i].expiresAt =
now + eventLoop.timeouts[i].frequency; now + eventLoop.timeouts[i].frequency;
virEventUnlock(); virMutexUnlock(&eventLoop.lock);
(cb)(timer, opaque); (cb)(timer, opaque);
virEventLock(); virMutexLock(&eventLoop.lock);
} }
} }
return 0; return 0;
@ -475,10 +465,10 @@ static int virEventDispatchHandles(int nfds, struct pollfd *fds) {
EVENT_DEBUG("Dispatch n=%d f=%d w=%d e=%d %p", i, EVENT_DEBUG("Dispatch n=%d f=%d w=%d e=%d %p", i,
fds[n].fd, eventLoop.handles[i].watch, fds[n].fd, eventLoop.handles[i].watch,
fds[n].revents, eventLoop.handles[i].opaque); fds[n].revents, eventLoop.handles[i].opaque);
virEventUnlock(); virMutexUnlock(&eventLoop.lock);
(cb)(eventLoop.handles[i].watch, (cb)(eventLoop.handles[i].watch,
fds[n].fd, hEvents, opaque); fds[n].fd, hEvents, opaque);
virEventLock(); virMutexLock(&eventLoop.lock);
} }
} }
@ -575,9 +565,9 @@ int virEventRunOnce(void) {
struct pollfd *fds = NULL; struct pollfd *fds = NULL;
int ret, timeout, nfds; int ret, timeout, nfds;
virEventLock(); virMutexLock(&eventLoop.lock);
eventLoop.running = 1; eventLoop.running = 1;
eventLoop.leader = pthread_self(); virThreadSelf(&eventLoop.leader);
if (virEventCleanupTimeouts() < 0 || if (virEventCleanupTimeouts() < 0 ||
virEventCleanupHandles() < 0) virEventCleanupHandles() < 0)
@ -587,7 +577,7 @@ int virEventRunOnce(void) {
virEventCalculateTimeout(&timeout) < 0) virEventCalculateTimeout(&timeout) < 0)
goto error; goto error;
virEventUnlock(); virMutexUnlock(&eventLoop.lock);
retry: retry:
EVENT_DEBUG("Poll on %d handles %p timeout %d", nfds, fds, timeout); EVENT_DEBUG("Poll on %d handles %p timeout %d", nfds, fds, timeout);
@ -600,7 +590,7 @@ int virEventRunOnce(void) {
goto error_unlocked; goto error_unlocked;
} }
virEventLock(); virMutexLock(&eventLoop.lock);
if (virEventDispatchTimeouts() < 0) if (virEventDispatchTimeouts() < 0)
goto error; goto error;
@ -613,31 +603,38 @@ int virEventRunOnce(void) {
goto error; goto error;
eventLoop.running = 0; eventLoop.running = 0;
virEventUnlock(); virMutexUnlock(&eventLoop.lock);
VIR_FREE(fds); VIR_FREE(fds);
return 0; return 0;
error: error:
virEventUnlock(); virMutexUnlock(&eventLoop.lock);
error_unlocked: error_unlocked:
VIR_FREE(fds); VIR_FREE(fds);
return -1; return -1;
} }
static void virEventHandleWakeup(int watch ATTRIBUTE_UNUSED, static void virEventHandleWakeup(int watch ATTRIBUTE_UNUSED,
int fd, int fd,
int events ATTRIBUTE_UNUSED, int events ATTRIBUTE_UNUSED,
void *opaque ATTRIBUTE_UNUSED) void *opaque ATTRIBUTE_UNUSED)
{ {
char c; char c;
virEventLock(); virMutexLock(&eventLoop.lock);
ignore_value(saferead(fd, &c, sizeof(c))); ignore_value(saferead(fd, &c, sizeof(c)));
virEventUnlock(); virMutexUnlock(&eventLoop.lock);
} }
#ifdef WIN32
static inline int pipe(int fd[2]) {
return _pipe(fd, 4096, 0);
}
#endif
int virEventInit(void) int virEventInit(void)
{ {
if (pthread_mutex_init(&eventLoop.lock, NULL) != 0) if (virMutexInit(&eventLoop.lock) < 0)
return -1; return -1;
if (pipe(eventLoop.wakeupfd) < 0 || if (pipe(eventLoop.wakeupfd) < 0 ||
@ -660,8 +657,8 @@ static int virEventInterruptLocked(void)
char c = '\0'; char c = '\0';
if (!eventLoop.running || if (!eventLoop.running ||
pthread_self() == eventLoop.leader) { virThreadIsSelf(&eventLoop.leader)) {
VIR_DEBUG("Skip interrupt, %d %d", eventLoop.running, (int)eventLoop.leader); VIR_DEBUG("Skip interrupt, %d %d", eventLoop.running, (int)eventLoop.leader.thread);
return 0; return 0;
} }
@ -674,9 +671,9 @@ static int virEventInterruptLocked(void)
int virEventInterrupt(void) int virEventInterrupt(void)
{ {
int ret; int ret;
virEventLock(); virMutexLock(&eventLoop.lock);
ret = virEventInterruptLocked(); ret = virEventInterruptLocked();
virEventUnlock(); virMutexUnlock(&eventLoop.lock);
return ret; return ret;
} }