Merge "adb: move adb to poll."

This commit is contained in:
Yabin Cui 2015-09-15 18:40:51 +00:00 committed by Gerrit Code Review
commit fa965d9639
3 changed files with 283 additions and 597 deletions

View File

@ -177,6 +177,7 @@ static void setup_trace_mask() {
{"jdwp", TRACE_JDWP},
{"services", TRACE_SERVICES},
{"auth", TRACE_AUTH},
{"fdevent", TRACE_FDEVENT},
{"shell", TRACE_SHELL}};
std::vector<std::string> elements = android::base::Split(trace_setting, " ");

View File

@ -20,59 +20,30 @@
#include "sysdeps.h"
#include "fdevent.h"
#include <errno.h>
#include <fcntl.h>
#include <stdarg.h>
#include <stddef.h>
#include <stdio.h>
#include <poll.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <list>
#include <unordered_map>
#include <vector>
#include <base/logging.h>
#include <base/stringprintf.h>
#include "adb_io.h"
#include "adb_trace.h"
/* !!! Do not enable DEBUG for the adb that will run as the server:
** both stdout and stderr are used to communicate between the client
** and server. Any extra output will cause failures.
*/
#define DEBUG 0 /* non-0 will break adb server */
#if !ADB_HOST
// This socket is used when a subproc shell service exists.
// It wakes up the fdevent_loop() and cause the correct handling
// of the shell's pseudo-tty master. I.e. force close it.
#if !ADB_HOST
int SHELL_EXIT_NOTIFY_FD = -1;
#endif // !ADB_HOST
static void fatal(const char *fn, const char *fmt, ...)
{
va_list ap;
va_start(ap, fmt);
fprintf(stderr, "%s:", fn);
vfprintf(stderr, fmt, ap);
va_end(ap);
abort();
}
#define FATAL(x...) fatal(__FUNCTION__, x)
#if DEBUG
static void dump_fde(fdevent *fde, const char *info)
{
adb_mutex_lock(&D_lock);
fprintf(stderr,"FDE #%03d %c%c%c %s\n", fde->fd,
fde->state & FDE_READ ? 'R' : ' ',
fde->state & FDE_WRITE ? 'W' : ' ',
fde->state & FDE_ERROR ? 'E' : ' ',
info);
adb_mutex_unlock(&D_lock);
}
#else
#define dump_fde(fde, info) do { } while(0)
#endif
#define FDE_EVENTMASK 0x00ff
#define FDE_STATEMASK 0xff00
@ -80,515 +51,47 @@ static void dump_fde(fdevent *fde, const char *info)
#define FDE_PENDING 0x0200
#define FDE_CREATED 0x0400
static void fdevent_plist_enqueue(fdevent *node);
static void fdevent_plist_remove(fdevent *node);
static fdevent *fdevent_plist_dequeue(void);
struct PollNode {
fdevent* fde;
pollfd pollfd;
static fdevent list_pending = {
.next = &list_pending,
.prev = &list_pending,
.fd = -1,
.force_eof = 0,
.state = 0,
.events = 0,
.func = nullptr,
.arg = nullptr,
PollNode(fdevent* fde) : fde(fde) {
memset(&pollfd, 0, sizeof(pollfd));
pollfd.fd = fde->fd;
}
};
static fdevent **fd_table = 0;
static int fd_table_max = 0;
// All operations to fdevent should happen only in the main thread.
// That's why we don't need a lock for fdevent.
static std::unordered_map<int, PollNode> g_poll_node_map;
static std::list<fdevent*> g_pending_list;
#ifdef CRAPTASTIC
//HAVE_EPOLL
#include <sys/epoll.h>
static int epoll_fd = -1;
static void fdevent_init()
{
/* XXX: what's a good size for the passed in hint? */
epoll_fd = epoll_create(256);
if(epoll_fd < 0) {
perror("epoll_create() failed");
exit(1);
static std::string dump_fde(const fdevent* fde) {
std::string state;
if (fde->state & FDE_ACTIVE) {
state += "A";
}
/* mark for close-on-exec */
fcntl(epoll_fd, F_SETFD, FD_CLOEXEC);
if (fde->state & FDE_PENDING) {
state += "P";
}
if (fde->state & FDE_CREATED) {
state += "C";
}
if (fde->state & FDE_READ) {
state += "R";
}
if (fde->state & FDE_WRITE) {
state += "W";
}
if (fde->state & FDE_ERROR) {
state += "E";
}
if (fde->state & FDE_DONT_CLOSE) {
state += "D";
}
return android::base::StringPrintf("(fdevent %d %s)", fde->fd, state.c_str());
}
static void fdevent_connect(fdevent *fde)
{
struct epoll_event ev;
memset(&ev, 0, sizeof(ev));
ev.events = 0;
ev.data.ptr = fde;
#if 0
if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fde->fd, &ev)) {
perror("epoll_ctl() failed\n");
exit(1);
}
#endif
}
static void fdevent_disconnect(fdevent *fde)
{
struct epoll_event ev;
memset(&ev, 0, sizeof(ev));
ev.events = 0;
ev.data.ptr = fde;
/* technically we only need to delete if we
** were actively monitoring events, but let's
** be aggressive and do it anyway, just in case
** something's out of sync
*/
epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fde->fd, &ev);
}
static void fdevent_update(fdevent *fde, unsigned events)
{
struct epoll_event ev;
int active;
active = (fde->state & FDE_EVENTMASK) != 0;
memset(&ev, 0, sizeof(ev));
ev.events = 0;
ev.data.ptr = fde;
if(events & FDE_READ) ev.events |= EPOLLIN;
if(events & FDE_WRITE) ev.events |= EPOLLOUT;
if(events & FDE_ERROR) ev.events |= (EPOLLERR | EPOLLHUP);
fde->state = (fde->state & FDE_STATEMASK) | events;
if(active) {
/* we're already active. if we're changing to *no*
** events being monitored, we need to delete, otherwise
** we need to just modify
*/
if(ev.events) {
if(epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fde->fd, &ev)) {
perror("epoll_ctl() failed\n");
exit(1);
}
} else {
if(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fde->fd, &ev)) {
perror("epoll_ctl() failed\n");
exit(1);
}
}
} else {
/* we're not active. if we're watching events, we need
** to add, otherwise we can just do nothing
*/
if(ev.events) {
if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fde->fd, &ev)) {
perror("epoll_ctl() failed\n");
exit(1);
}
}
}
}
static void fdevent_process()
{
struct epoll_event events[256];
fdevent *fde;
int i, n;
n = epoll_wait(epoll_fd, events, 256, -1);
if (n < 0) {
if (errno == EINTR) return;
perror("epoll_wait");
exit(1);
}
for(i = 0; i < n; i++) {
struct epoll_event *ev = events + i;
fde = ev->data.ptr;
if(ev->events & EPOLLIN) {
fde->events |= FDE_READ;
}
if(ev->events & EPOLLOUT) {
fde->events |= FDE_WRITE;
}
if(ev->events & (EPOLLERR | EPOLLHUP)) {
fde->events |= FDE_ERROR;
}
if(fde->events) {
if(fde->state & FDE_PENDING) continue;
fde->state |= FDE_PENDING;
fdevent_plist_enqueue(fde);
}
}
}
#else /* USE_SELECT */
#if defined(_WIN32)
#include <winsock2.h>
#else
#include <sys/select.h>
#endif
static fd_set read_fds;
static fd_set write_fds;
static fd_set error_fds;
static int select_n = 0;
static void fdevent_init(void)
{
FD_ZERO(&read_fds);
FD_ZERO(&write_fds);
FD_ZERO(&error_fds);
}
static void fdevent_connect(fdevent *fde)
{
if(fde->fd >= select_n) {
select_n = fde->fd + 1;
}
}
static void fdevent_disconnect(fdevent *fde)
{
int i, n;
FD_CLR(fde->fd, &read_fds);
FD_CLR(fde->fd, &write_fds);
FD_CLR(fde->fd, &error_fds);
for(n = 0, i = 0; i < select_n; i++) {
if(fd_table[i] != 0) n = i;
}
select_n = n + 1;
}
static void fdevent_update(fdevent *fde, unsigned events)
{
if(events & FDE_READ) {
FD_SET(fde->fd, &read_fds);
} else {
FD_CLR(fde->fd, &read_fds);
}
if(events & FDE_WRITE) {
FD_SET(fde->fd, &write_fds);
} else {
FD_CLR(fde->fd, &write_fds);
}
if(events & FDE_ERROR) {
FD_SET(fde->fd, &error_fds);
} else {
FD_CLR(fde->fd, &error_fds);
}
fde->state = (fde->state & FDE_STATEMASK) | events;
}
/* Looks at fd_table[] for bad FDs and sets bit in fds.
** Returns the number of bad FDs.
*/
static int fdevent_fd_check(fd_set *fds)
{
int i, n = 0;
fdevent *fde;
for(i = 0; i < select_n; i++) {
fde = fd_table[i];
if(fde == 0) continue;
if(fcntl(i, F_GETFL, NULL) < 0) {
FD_SET(i, fds);
n++;
// fde->state |= FDE_DONT_CLOSE;
}
}
return n;
}
#if !DEBUG
static inline void dump_all_fds(const char* /* extra_msg */) {}
#else
static void dump_all_fds(const char *extra_msg)
{
int i;
fdevent *fde;
// per fd: 4 digits (but really: log10(FD_SETSIZE)), 1 staus, 1 blank
char msg_buff[FD_SETSIZE*6 + 1], *pb=msg_buff;
size_t max_chars = FD_SETSIZE * 6 + 1;
int printed_out;
#define SAFE_SPRINTF(...) \
do { \
printed_out = snprintf(pb, max_chars, __VA_ARGS__); \
if (printed_out <= 0) { \
D("... snprintf failed."); \
return; \
} \
if (max_chars < (unsigned int)printed_out) { \
D("... snprintf out of space."); \
return; \
} \
pb += printed_out; \
max_chars -= printed_out; \
} while(0)
for(i = 0; i < select_n; i++) {
fde = fd_table[i];
SAFE_SPRINTF("%d", i);
if(fde == 0) {
SAFE_SPRINTF("? ");
continue;
}
if(fcntl(i, F_GETFL, NULL) < 0) {
SAFE_SPRINTF("b");
}
SAFE_SPRINTF(" ");
}
D("%s fd_table[]->fd = {%s}", extra_msg, msg_buff);
}
#endif
static void fdevent_process()
{
int i, n;
fdevent *fde;
unsigned events;
fd_set rfd, wfd, efd;
memcpy(&rfd, &read_fds, sizeof(fd_set));
memcpy(&wfd, &write_fds, sizeof(fd_set));
memcpy(&efd, &error_fds, sizeof(fd_set));
dump_all_fds("pre select()");
n = select(select_n, &rfd, &wfd, &efd, NULL);
int saved_errno = errno;
D("select() returned n=%d, errno=%d", n, n<0?saved_errno:0);
dump_all_fds("post select()");
if(n < 0) {
switch(saved_errno) {
case EINTR: return;
case EBADF:
// Can't trust the FD sets after an error.
FD_ZERO(&wfd);
FD_ZERO(&efd);
FD_ZERO(&rfd);
break;
default:
D("Unexpected select() error=%d", saved_errno);
return;
}
}
if(n <= 0) {
// We fake a read, as the rest of the code assumes
// that errors will be detected at that point.
n = fdevent_fd_check(&rfd);
}
for(i = 0; (i < select_n) && (n > 0); i++) {
events = 0;
if(FD_ISSET(i, &rfd)) { events |= FDE_READ; n--; }
if(FD_ISSET(i, &wfd)) { events |= FDE_WRITE; n--; }
if(FD_ISSET(i, &efd)) { events |= FDE_ERROR; n--; }
if(events) {
fde = fd_table[i];
if(fde == 0)
FATAL("missing fde for fd %d\n", i);
fde->events |= events;
D("got events fde->fd=%d events=%04x, state=%04x",
fde->fd, fde->events, fde->state);
if(fde->state & FDE_PENDING) continue;
fde->state |= FDE_PENDING;
fdevent_plist_enqueue(fde);
}
}
}
#endif
static void fdevent_register(fdevent *fde)
{
if(fde->fd < 0) {
FATAL("bogus negative fd (%d)\n", fde->fd);
}
if(fde->fd >= fd_table_max) {
int oldmax = fd_table_max;
if(fde->fd > 32000) {
FATAL("bogus huuuuge fd (%d)\n", fde->fd);
}
if(fd_table_max == 0) {
fdevent_init();
fd_table_max = 256;
}
while(fd_table_max <= fde->fd) {
fd_table_max *= 2;
}
fd_table = reinterpret_cast<fdevent**>(
realloc(fd_table, sizeof(fdevent*) * fd_table_max));
if(fd_table == 0) {
FATAL("could not expand fd_table to %d entries\n", fd_table_max);
}
memset(fd_table + oldmax, 0, sizeof(int) * (fd_table_max - oldmax));
}
fd_table[fde->fd] = fde;
}
static void fdevent_unregister(fdevent *fde)
{
if((fde->fd < 0) || (fde->fd >= fd_table_max)) {
FATAL("fd out of range (%d)\n", fde->fd);
}
if(fd_table[fde->fd] != fde) {
FATAL("fd_table out of sync [%d]\n", fde->fd);
}
fd_table[fde->fd] = 0;
if(!(fde->state & FDE_DONT_CLOSE)) {
dump_fde(fde, "close");
adb_close(fde->fd);
}
}
static void fdevent_plist_enqueue(fdevent *node)
{
fdevent *list = &list_pending;
node->next = list;
node->prev = list->prev;
node->prev->next = node;
list->prev = node;
}
static void fdevent_plist_remove(fdevent *node)
{
node->prev->next = node->next;
node->next->prev = node->prev;
node->next = 0;
node->prev = 0;
}
static fdevent *fdevent_plist_dequeue(void)
{
fdevent *list = &list_pending;
fdevent *node = list->next;
if(node == list) return 0;
list->next = node->next;
list->next->prev = list;
node->next = 0;
node->prev = 0;
return node;
}
static void fdevent_call_fdfunc(fdevent* fde)
{
unsigned events = fde->events;
fde->events = 0;
if(!(fde->state & FDE_PENDING)) return;
fde->state &= (~FDE_PENDING);
dump_fde(fde, "callback");
fde->func(fde->fd, events, fde->arg);
}
#if !ADB_HOST
static void fdevent_subproc_event_func(int fd, unsigned ev,
void* /* userdata */)
{
D("subproc handling on fd=%d ev=%04x", fd, ev);
// Hook oneself back into the fde's suitable for select() on read.
if((fd < 0) || (fd >= fd_table_max)) {
FATAL("fd %d out of range for fd_table \n", fd);
}
fdevent *fde = fd_table[fd];
fdevent_add(fde, FDE_READ);
if(ev & FDE_READ){
int subproc_fd;
if(!ReadFdExactly(fd, &subproc_fd, sizeof(subproc_fd))) {
FATAL("Failed to read the subproc's fd from fd=%d\n", fd);
}
if((subproc_fd < 0) || (subproc_fd >= fd_table_max)) {
D("subproc_fd %d out of range 0, fd_table_max=%d",
subproc_fd, fd_table_max);
return;
}
fdevent *subproc_fde = fd_table[subproc_fd];
if(!subproc_fde) {
D("subproc_fd %d cleared from fd_table", subproc_fd);
return;
}
if(subproc_fde->fd != subproc_fd) {
// Already reallocated?
D("subproc_fd %d != fd_table[].fd %d", subproc_fd, subproc_fde->fd);
return;
}
subproc_fde->force_eof = 1;
int rcount = 0;
ioctl(subproc_fd, FIONREAD, &rcount);
D("subproc with fd=%d has rcount=%d err=%d",
subproc_fd, rcount, errno);
if(rcount) {
// If there is data left, it will show up in the select().
// This works because there is no other thread reading that
// data when in this fd_func().
return;
}
D("subproc_fde.state=%04x", subproc_fde->state);
subproc_fde->events |= FDE_READ;
if(subproc_fde->state & FDE_PENDING) {
return;
}
subproc_fde->state |= FDE_PENDING;
fdevent_call_fdfunc(subproc_fde);
}
}
void fdevent_subproc_setup()
{
int s[2];
if(adb_socketpair(s)) {
FATAL("cannot create shell-exit socket-pair\n");
}
D("socketpair: (%d,%d)", s[0], s[1]);
SHELL_EXIT_NOTIFY_FD = s[0];
fdevent *fde;
fde = fdevent_create(s[1], fdevent_subproc_event_func, NULL);
if(!fde)
FATAL("cannot create fdevent for shell-exit handler\n");
fdevent_add(fde, FDE_READ);
}
#endif // !ADB_HOST
fdevent *fdevent_create(int fd, fd_func func, void *arg)
{
fdevent *fde = (fdevent*) malloc(sizeof(fdevent));
@ -602,98 +105,234 @@ void fdevent_destroy(fdevent *fde)
{
if(fde == 0) return;
if(!(fde->state & FDE_CREATED)) {
FATAL("fde %p not created by fdevent_create()\n", fde);
LOG(FATAL) << "destroying fde not created by fdevent_create(): " << dump_fde(fde);
}
fdevent_remove(fde);
free(fde);
}
void fdevent_install(fdevent *fde, int fd, fd_func func, void *arg)
{
void fdevent_install(fdevent* fde, int fd, fd_func func, void* arg) {
CHECK_GE(fd, 0);
memset(fde, 0, sizeof(fdevent));
fde->state = FDE_ACTIVE;
fde->fd = fd;
fde->force_eof = 0;
fde->func = func;
fde->arg = arg;
#if !defined(_WIN32)
fcntl(fd, F_SETFL, O_NONBLOCK);
#endif
fdevent_register(fde);
dump_fde(fde, "connect");
fdevent_connect(fde);
fde->state |= FDE_ACTIVE;
if (fcntl(fd, F_SETFL, O_NONBLOCK) != 0) {
// Here is not proper to handle the error. If it fails here, some error is
// likely to be detected by poll(), then we can let the callback function
// to handle it.
LOG(ERROR) << "failed to fcntl(" << fd << ") to be nonblock";
}
auto pair = g_poll_node_map.emplace(fde->fd, PollNode(fde));
CHECK(pair.second) << "install existing fd " << fd;
D("fdevent_install %s", dump_fde(fde).c_str());
}
void fdevent_remove(fdevent *fde)
{
if(fde->state & FDE_PENDING) {
fdevent_plist_remove(fde);
void fdevent_remove(fdevent* fde) {
D("fdevent_remove %s", dump_fde(fde).c_str());
if (fde->state & FDE_ACTIVE) {
g_poll_node_map.erase(fde->fd);
if (fde->state & FDE_PENDING) {
g_pending_list.remove(fde);
}
if (!(fde->state & FDE_DONT_CLOSE)) {
adb_close(fde->fd);
fde->fd = -1;
}
fde->state = 0;
fde->events = 0;
}
if(fde->state & FDE_ACTIVE) {
fdevent_disconnect(fde);
dump_fde(fde, "disconnect");
fdevent_unregister(fde);
}
fde->state = 0;
fde->events = 0;
}
void fdevent_set(fdevent *fde, unsigned events)
{
events &= FDE_EVENTMASK;
if((fde->state & FDE_EVENTMASK) == events) return;
if(fde->state & FDE_ACTIVE) {
fdevent_update(fde, events);
dump_fde(fde, "update");
static void fdevent_update(fdevent* fde, unsigned events) {
auto it = g_poll_node_map.find(fde->fd);
CHECK(it != g_poll_node_map.end());
PollNode& node = it->second;
if (events & FDE_READ) {
node.pollfd.events |= POLLIN;
} else {
node.pollfd.events &= ~POLLIN;
}
if (events & FDE_WRITE) {
node.pollfd.events |= POLLOUT;
} else {
node.pollfd.events &= ~POLLOUT;
}
fde->state = (fde->state & FDE_STATEMASK) | events;
}
if(fde->state & FDE_PENDING) {
/* if we're pending, make sure
** we don't signal an event that
** is no longer wanted.
*/
fde->events &= (~events);
if(fde->events == 0) {
fdevent_plist_remove(fde);
fde->state &= (~FDE_PENDING);
void fdevent_set(fdevent* fde, unsigned events) {
events &= FDE_EVENTMASK;
if ((fde->state & FDE_EVENTMASK) == events) {
return;
}
if (fde->state & FDE_ACTIVE) {
fdevent_update(fde, events);
D("fdevent_set: %s, events = %u", dump_fde(fde).c_str(), events);
if (fde->state & FDE_PENDING) {
// If we are pending, make sure we don't signal an event that is no longer wanted.
fde->events &= ~events;
if (fde->events == 0) {
g_pending_list.remove(fde);
fde->state &= ~FDE_PENDING;
}
}
}
}
void fdevent_add(fdevent *fde, unsigned events)
{
fdevent_set(
fde, (fde->state & FDE_EVENTMASK) | (events & FDE_EVENTMASK));
void fdevent_add(fdevent* fde, unsigned events) {
fdevent_set(fde, (fde->state & FDE_EVENTMASK) | events);
}
void fdevent_del(fdevent *fde, unsigned events)
{
fdevent_set(
fde, (fde->state & FDE_EVENTMASK) & (~(events & FDE_EVENTMASK)));
void fdevent_del(fdevent* fde, unsigned events) {
fdevent_set(fde, (fde->state & FDE_EVENTMASK) & ~events);
}
static std::string dump_pollfds(const std::vector<pollfd>& pollfds) {
std::string result;
for (auto& pollfd : pollfds) {
std::string op;
if (pollfd.events & POLLIN) {
op += "R";
}
if (pollfd.events & POLLOUT) {
op += "W";
}
android::base::StringAppendF(&result, " %d(%s)", pollfd.fd, op.c_str());
}
return result;
}
static void fdevent_process() {
std::vector<pollfd> pollfds;
for (auto it = g_poll_node_map.begin(); it != g_poll_node_map.end(); ++it) {
pollfds.push_back(it->second.pollfd);
}
CHECK_GT(pollfds.size(), 0u);
D("poll(), pollfds = %s", dump_pollfds(pollfds).c_str());
int ret = TEMP_FAILURE_RETRY(poll(&pollfds[0], pollfds.size(), -1));
if (ret == -1) {
PLOG(ERROR) << "poll(), ret = " << ret;
return;
}
for (auto& pollfd : pollfds) {
unsigned events = 0;
if (pollfd.revents & POLLIN) {
events |= FDE_READ;
}
if (pollfd.revents & POLLOUT) {
events |= FDE_WRITE;
}
if (pollfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
// We fake a read, as the rest of the code assumes that errors will
// be detected at that point.
events |= FDE_READ | FDE_ERROR;
}
if (events != 0) {
auto it = g_poll_node_map.find(pollfd.fd);
CHECK(it != g_poll_node_map.end());
fdevent* fde = it->second.fde;
CHECK_EQ(fde->fd, pollfd.fd);
fde->events |= events;
D("%s got events %x", dump_fde(fde).c_str(), events);
fde->state |= FDE_PENDING;
g_pending_list.push_back(fde);
}
}
}
static void fdevent_call_fdfunc(fdevent* fde)
{
unsigned events = fde->events;
fde->events = 0;
if(!(fde->state & FDE_PENDING)) return;
fde->state &= (~FDE_PENDING);
D("fdevent_call_fdfunc %s", dump_fde(fde).c_str());
fde->func(fde->fd, events, fde->arg);
}
#if !ADB_HOST
static void fdevent_subproc_event_func(int fd, unsigned ev,
void* /* userdata */)
{
D("subproc handling on fd = %d, ev = %x", fd, ev);
CHECK_GE(fd, 0);
if (ev & FDE_READ) {
int subproc_fd;
if(!ReadFdExactly(fd, &subproc_fd, sizeof(subproc_fd))) {
LOG(FATAL) << "Failed to read the subproc's fd from " << fd;
}
auto it = g_poll_node_map.find(subproc_fd);
if (it == g_poll_node_map.end()) {
D("subproc_fd %d cleared from fd_table", subproc_fd);
return;
}
fdevent* subproc_fde = it->second.fde;
if(subproc_fde->fd != subproc_fd) {
// Already reallocated?
D("subproc_fd(%d) != subproc_fde->fd(%d)", subproc_fd, subproc_fde->fd);
return;
}
subproc_fde->force_eof = 1;
int rcount = 0;
ioctl(subproc_fd, FIONREAD, &rcount);
D("subproc with fd %d has rcount=%d, err=%d", subproc_fd, rcount, errno);
if (rcount != 0) {
// If there is data left, it will show up in the select().
// This works because there is no other thread reading that
// data when in this fd_func().
return;
}
D("subproc_fde %s", dump_fde(subproc_fde).c_str());
subproc_fde->events |= FDE_READ;
if(subproc_fde->state & FDE_PENDING) {
return;
}
subproc_fde->state |= FDE_PENDING;
fdevent_call_fdfunc(subproc_fde);
}
}
void fdevent_subproc_setup()
{
int s[2];
if(adb_socketpair(s)) {
PLOG(FATAL) << "cannot create shell-exit socket-pair";
}
D("fdevent_subproc: socket pair (%d, %d)", s[0], s[1]);
SHELL_EXIT_NOTIFY_FD = s[0];
fdevent *fde = fdevent_create(s[1], fdevent_subproc_event_func, NULL);
CHECK(fde != nullptr) << "cannot create fdevent for shell-exit handler";
fdevent_add(fde, FDE_READ);
}
#endif // !ADB_HOST
void fdevent_loop()
{
fdevent *fde;
#if !ADB_HOST
fdevent_subproc_setup();
#endif // !ADB_HOST
while (true) {
D("--- ---- waiting for events");
D("--- --- waiting for events");
fdevent_process();
while((fde = fdevent_plist_dequeue())) {
while (!g_pending_list.empty()) {
fdevent* fde = g_pending_list.front();
g_pending_list.pop_front();
fdevent_call_fdfunc(fde);
}
}

View File

@ -18,6 +18,7 @@
#include <gtest/gtest.h>
#include <limits>
#include <queue>
#include <string>
#include <vector>
@ -50,9 +51,13 @@ class FdHandler {
public:
FdHandler(int read_fd, int write_fd) : read_fd_(read_fd), write_fd_(write_fd) {
fdevent_install(&read_fde_, read_fd_, FdEventCallback, this);
fdevent_add(&read_fde_, FDE_READ | FDE_ERROR);
fdevent_add(&read_fde_, FDE_READ);
fdevent_install(&write_fde_, write_fd_, FdEventCallback, this);
fdevent_add(&write_fde_, FDE_ERROR);
}
~FdHandler() {
fdevent_remove(&read_fde_);
fdevent_remove(&write_fde_);
}
private:
@ -150,3 +155,44 @@ TEST(fdevent, smoke) {
ASSERT_EQ(0, pthread_kill(thread, SIGUSR1));
ASSERT_EQ(0, pthread_join(thread, nullptr));
}
struct InvalidFdArg {
fdevent fde;
unsigned expected_events;
size_t* happened_event_count;
};
static void InvalidFdEventCallback(int fd, unsigned events, void* userdata) {
InvalidFdArg* arg = reinterpret_cast<InvalidFdArg*>(userdata);
ASSERT_EQ(arg->expected_events, events);
fdevent_remove(&arg->fde);
if (++*(arg->happened_event_count) == 2) {
pthread_exit(nullptr);
}
}
void InvalidFdThreadFunc(void*) {
const int INVALID_READ_FD = std::numeric_limits<int>::max() - 1;
size_t happened_event_count = 0;
InvalidFdArg read_arg;
read_arg.expected_events = FDE_READ | FDE_ERROR;
read_arg.happened_event_count = &happened_event_count;
fdevent_install(&read_arg.fde, INVALID_READ_FD, InvalidFdEventCallback, &read_arg);
fdevent_add(&read_arg.fde, FDE_READ);
const int INVALID_WRITE_FD = std::numeric_limits<int>::max();
InvalidFdArg write_arg;
write_arg.expected_events = FDE_READ | FDE_ERROR;
write_arg.happened_event_count = &happened_event_count;
fdevent_install(&write_arg.fde, INVALID_WRITE_FD, InvalidFdEventCallback, &write_arg);
fdevent_add(&write_arg.fde, FDE_WRITE);
fdevent_loop();
}
TEST(fdevent, invalid_fd) {
pthread_t thread;
ASSERT_EQ(0, pthread_create(&thread, nullptr,
reinterpret_cast<void* (*)(void*)>(InvalidFdThreadFunc),
nullptr));
ASSERT_EQ(0, pthread_join(thread, nullptr));
}