Fixes ADB crash on Windows due to large number of connections.

The reason for the crash is that ADB on Windows uses WaitForMultipleObjects to
wait on connection events. When number of connections exceeds 64, ADB crashes,
because WaitForMultipleObjects API doesn't support more than 64 handles. This
CL contains a fixer routine that allows waiting on an arbitrary number of
handles.

Change-Id: I83f712e552018df308318154c27df184015a16ee
This commit is contained in:
Vladimir Chtchetkine 2011-11-30 10:20:27 -08:00
parent 5c9263d819
commit ac52833e48
1 changed files with 190 additions and 15 deletions

View File

@ -352,7 +352,7 @@ int adb_open(const char* path, int options)
return -1;
}
}
snprintf( f->name, sizeof(f->name), "%d(%s)", _fh_to_int(f), path );
D( "adb_open: '%s' => fd %d\n", path, _fh_to_int(f) );
return _fh_to_int(f);
@ -837,7 +837,7 @@ static void bip_dump_hex( const unsigned char* ptr, size_t len )
if (len2 > 8) len2 = 8;
for (nn = 0; nn < len2; nn++)
for (nn = 0; nn < len2; nn++)
printf("%02x", ptr[nn]);
printf(" ");
@ -994,7 +994,7 @@ Exit:
SetEvent( bip->evt_read );
}
BIPD(( "bip_buffer_write: exit %d->%d count %d (as=%d ae=%d be=%d cw=%d cr=%d\n",
BIPD(( "bip_buffer_write: exit %d->%d count %d (as=%d ae=%d be=%d cw=%d cr=%d\n",
bip->fdin, bip->fdout, count, bip->a_start, bip->a_end, bip->b_end, bip->can_write, bip->can_read ));
LeaveCriticalSection( &bip->lock );
@ -1018,7 +1018,7 @@ bip_buffer_read( BipBuffer bip, void* dst, int len )
LeaveCriticalSection( &bip->lock );
errno = EAGAIN;
return -1;
#else
#else
int ret;
LeaveCriticalSection( &bip->lock );
@ -1087,14 +1087,14 @@ Exit:
}
BIPDUMP( (const unsigned char*)dst - count, count );
BIPD(( "bip_buffer_read: exit %d->%d count %d (as=%d ae=%d be=%d cw=%d cr=%d\n",
BIPD(( "bip_buffer_read: exit %d->%d count %d (as=%d ae=%d be=%d cw=%d cr=%d\n",
bip->fdin, bip->fdout, count, bip->a_start, bip->a_end, bip->b_end, bip->can_write, bip->can_read ));
LeaveCriticalSection( &bip->lock );
return count;
}
typedef struct SocketPairRec_
typedef struct SocketPairRec_
{
BipBufferRec a2b_bip;
BipBufferRec b2a_bip;
@ -1400,7 +1400,7 @@ event_looper_hook( EventLooper looper, int fd, int events )
f->clazz->_fh_hook( f, events & ~node->wanted, node );
node->wanted |= events;
} else {
D("event_looper_hook: ignoring events %x for %d wanted=%x)\n",
D("event_looper_hook: ignoring events %x for %d wanted=%x)\n",
events, fd, node->wanted);
}
}
@ -1426,6 +1426,180 @@ event_looper_unhook( EventLooper looper, int fd, int events )
}
}
/*
* A fixer for WaitForMultipleObjects on condition that there are more than 64
* handles to wait on.
*
* In cetain cases DDMS may establish more than 64 connections with ADB. For
* instance, this may happen if there are more than 64 processes running on a
* device, or there are multiple devices connected (including the emulator) with
* the combined number of running processes greater than 64. In this case using
* WaitForMultipleObjects to wait on connection events simply wouldn't cut,
* because of the API limitations (64 handles max). So, we need to provide a way
* to scale WaitForMultipleObjects to accept an arbitrary number of handles. The
* easiest (and "Microsoft recommended") way to do that would be dividing the
* handle array into chunks with the chunk size less than 64, and fire up as many
* waiting threads as there are chunks. Then each thread would wait on a chunk of
* handles, and will report back to the caller which handle has been set.
* Here is the implementation of that algorithm.
*/
/* Number of handles to wait on in each wating thread. */
#define WAIT_ALL_CHUNK_SIZE 63
/* Descriptor for a wating thread */
typedef struct WaitForAllParam {
/* A handle to an event to signal when waiting is over. This handle is shared
* accross all the waiting threads, so each waiting thread knows when any
* other thread has exited, so it can exit too. */
HANDLE main_event;
/* Upon exit from a waiting thread contains the index of the handle that has
* been signaled. The index is an absolute index of the signaled handle in
* the original array. This pointer is shared accross all the waiting threads
* and it's not guaranteed (due to a race condition) that when all the
* waiting threads exit, the value contained here would indicate the first
* handle that was signaled. This is fine, because the caller cares only
* about any handle being signaled. It doesn't care about the order, nor
* about the whole list of handles that were signaled. */
LONG volatile *signaled_index;
/* Array of handles to wait on in a waiting thread. */
HANDLE* handles;
/* Number of handles in 'handles' array to wait on. */
int handles_count;
/* Index inside the main array of the first handle in the 'handles' array. */
int first_handle_index;
/* Waiting thread handle. */
HANDLE thread;
} WaitForAllParam;
/* Waiting thread routine. */
static unsigned __stdcall
_in_waiter_thread(void* arg)
{
HANDLE wait_on[WAIT_ALL_CHUNK_SIZE + 1];
int res;
WaitForAllParam* const param = (WaitForAllParam*)arg;
/* We have to wait on the main_event in order to be notified when any of the
* sibling threads is exiting. */
wait_on[0] = param->main_event;
/* The rest of the handles go behind the main event handle. */
memcpy(wait_on + 1, param->handles, param->handles_count * sizeof(HANDLE));
res = WaitForMultipleObjects(param->handles_count + 1, wait_on, FALSE, INFINITE);
if (res > 0 && res < (param->handles_count + 1)) {
/* One of the original handles got signaled. Save its absolute index into
* the output variable. */
InterlockedCompareExchange(param->signaled_index,
res - 1L + param->first_handle_index, -1L);
}
/* Notify the caller (and the siblings) that the wait is over. */
SetEvent(param->main_event);
_endthreadex(0);
return 0;
}
/* WaitForMultipeObjects fixer routine.
* Param:
* handles Array of handles to wait on.
* handles_count Number of handles in the array.
* Return:
* (>= 0 && < handles_count) - Index of the signaled handle in the array, or
* WAIT_FAILED on an error.
*/
static int
_wait_for_all(HANDLE* handles, int handles_count)
{
WaitForAllParam* threads;
HANDLE main_event;
int chunks, chunk, remains;
/* This variable is going to be accessed by several threads at the same time,
* this is bound to fail randomly when the core is run on multi-core machines.
* To solve this, we need to do the following (1 _and_ 2):
* 1. Use the "volatile" qualifier to ensure the compiler doesn't optimize
* out the reads/writes in this function unexpectedly.
* 2. Ensure correct memory ordering. The "simple" way to do that is to wrap
* all accesses inside a critical section. But we can also use
* InterlockedCompareExchange() which always provide a full memory barrier
* on Win32.
*/
volatile LONG sig_index = -1;
/* Calculate number of chunks, and allocate thread param array. */
chunks = handles_count / WAIT_ALL_CHUNK_SIZE;
remains = handles_count % WAIT_ALL_CHUNK_SIZE;
threads = (WaitForAllParam*)malloc((chunks + (remains ? 1 : 0)) *
sizeof(WaitForAllParam));
if (threads == NULL) {
D("Unable to allocate thread array for %d handles.", handles_count);
return (int)WAIT_FAILED;
}
/* Create main event to wait on for all waiting threads. This is a "manualy
* reset" event that will remain set once it was set. */
main_event = CreateEvent(NULL, TRUE, FALSE, NULL);
if (main_event == NULL) {
D("Unable to create main event. Error: %d", GetLastError());
free(threads);
return (int)WAIT_FAILED;
}
/*
* Initialize waiting thread parameters.
*/
for (chunk = 0; chunk < chunks; chunk++) {
threads[chunk].main_event = main_event;
threads[chunk].signaled_index = &sig_index;
threads[chunk].first_handle_index = WAIT_ALL_CHUNK_SIZE * chunk;
threads[chunk].handles = handles + threads[chunk].first_handle_index;
threads[chunk].handles_count = WAIT_ALL_CHUNK_SIZE;
}
if (remains) {
threads[chunk].main_event = main_event;
threads[chunk].signaled_index = &sig_index;
threads[chunk].first_handle_index = WAIT_ALL_CHUNK_SIZE * chunk;
threads[chunk].handles = handles + threads[chunk].first_handle_index;
threads[chunk].handles_count = remains;
chunks++;
}
/* Start the waiting threads. */
for (chunk = 0; chunk < chunks; chunk++) {
/* Note that using adb_thread_create is not appropriate here, since we
* need a handle to wait on for thread termination. */
threads[chunk].thread = (HANDLE)_beginthreadex(NULL, 0, _in_waiter_thread,
&threads[chunk], 0, NULL);
if (threads[chunk].thread == NULL) {
/* Unable to create a waiter thread. Collapse. */
D("Unable to create a waiting thread %d of %d. errno=%d",
chunk, chunks, errno);
chunks = chunk;
SetEvent(main_event);
break;
}
}
/* Wait on any of the threads to get signaled. */
WaitForSingleObject(main_event, INFINITE);
/* Wait on all the waiting threads to exit. */
for (chunk = 0; chunk < chunks; chunk++) {
WaitForSingleObject(threads[chunk].thread, INFINITE);
CloseHandle(threads[chunk].thread);
}
CloseHandle(main_event);
free(threads);
const int ret = (int)InterlockedCompareExchange(&sig_index, -1, -1);
return (ret >= 0) ? ret : (int)WAIT_FAILED;
}
static EventLooperRec win32_looper;
static void fdevent_init(void)
@ -1494,7 +1668,7 @@ static void fdevent_process()
{
looper->htab_count = 0;
for (hook = looper->hooks; hook; hook = hook->next)
for (hook = looper->hooks; hook; hook = hook->next)
{
if (hook->start && !hook->start(hook)) {
D( "fdevent_process: error when starting a hook\n" );
@ -1525,10 +1699,11 @@ static void fdevent_process()
D( "adb_win32: waiting for %d events\n", looper->htab_count );
if (looper->htab_count > MAXIMUM_WAIT_OBJECTS) {
D("handle count %d exceeds MAXIMUM_WAIT_OBJECTS, aborting!\n", looper->htab_count);
abort();
D("handle count %d exceeds MAXIMUM_WAIT_OBJECTS.\n", looper->htab_count);
wait_ret = _wait_for_all(looper->htab, looper->htab_count);
} else {
wait_ret = WaitForMultipleObjects( looper->htab_count, looper->htab, FALSE, INFINITE );
}
wait_ret = WaitForMultipleObjects( looper->htab_count, looper->htab, FALSE, INFINITE );
if (wait_ret == (int)WAIT_FAILED) {
D( "adb_win32: wait failed, error %ld\n", GetLastError() );
} else {
@ -1669,7 +1844,7 @@ void fdevent_destroy(fdevent *fde)
fdevent_remove(fde);
}
void fdevent_install(fdevent *fde, int fd, fd_func func, void *arg)
void fdevent_install(fdevent *fde, int fd, fd_func func, void *arg)
{
memset(fde, 0, sizeof(fdevent));
fde->state = FDE_ACTIVE;
@ -1691,7 +1866,7 @@ void fdevent_remove(fdevent *fde)
if(fde->state & FDE_ACTIVE) {
fdevent_disconnect(fde);
dump_fde(fde, "disconnect");
dump_fde(fde, "disconnect");
fdevent_unregister(fde);
}
@ -1917,7 +2092,7 @@ static void _event_socketpair_prepare( EventHook hook )
if (hook->wanted & FDE_READ && rbip->can_read)
hook->ready |= FDE_READ;
if (hook->wanted & FDE_WRITE && wbip->can_write)
if (hook->wanted & FDE_WRITE && wbip->can_write)
hook->ready |= FDE_WRITE;
}
@ -1938,7 +2113,7 @@ static void _event_socketpair_prepare( EventHook hook )
D("_event_socketpair_start: can't handle FDE_READ+FDE_WRITE\n" );
return 0;
}
D( "_event_socketpair_start: hook %s for %x wanted=%x\n",
D( "_event_socketpair_start: hook %s for %x wanted=%x\n",
hook->fh->name, _fh_to_int(fh), hook->wanted);
return 1;
}