diff --git a/adb/daemon/shell_service.cpp b/adb/daemon/shell_service.cpp index 0fb14c42e..f62032d06 100644 --- a/adb/daemon/shell_service.cpp +++ b/adb/daemon/shell_service.cpp @@ -85,7 +85,6 @@ #include #include #include -#include #include #include @@ -141,6 +140,20 @@ bool CreateSocketpair(unique_fd* fd1, unique_fd* fd2) { return true; } +struct SubprocessPollfds { + adb_pollfd pfds[3]; + + adb_pollfd* data() { return pfds; } + size_t size() { return 3; } + + adb_pollfd* begin() { return pfds; } + adb_pollfd* end() { return pfds + size(); } + + adb_pollfd& stdinout_pfd() { return pfds[0]; } + adb_pollfd& stderr_pfd() { return pfds[1]; } + adb_pollfd& protocol_pfd() { return pfds[2]; } +}; + class Subprocess { public: Subprocess(std::string command, const char* terminal_type, SubprocessType type, @@ -176,8 +189,7 @@ class Subprocess { void PassDataStreams(); void WaitForExit(); - unique_fd* SelectLoop(fd_set* master_read_set_ptr, - fd_set* master_write_set_ptr); + unique_fd* PollLoop(SubprocessPollfds* pfds); // Input/output stream handlers. Success returns nullptr, failure returns // a pointer to the failed FD. @@ -545,23 +557,23 @@ void Subprocess::PassDataStreams() { } // Start by trying to read from the protocol FD, stdout, and stderr. - fd_set master_read_set, master_write_set; - FD_ZERO(&master_read_set); - FD_ZERO(&master_write_set); - for (unique_fd* sfd : {&protocol_sfd_, &stdinout_sfd_, &stderr_sfd_}) { - if (*sfd != -1) { - FD_SET(sfd->get(), &master_read_set); - } - } + SubprocessPollfds pfds; + pfds.stdinout_pfd() = {.fd = stdinout_sfd_.get(), .events = POLLIN}; + pfds.stderr_pfd() = {.fd = stderr_sfd_.get(), .events = POLLIN}; + pfds.protocol_pfd() = {.fd = protocol_sfd_.get(), .events = POLLIN}; // Pass data until the protocol FD or both the subprocess pipes die, at // which point we can't pass any more data. while (protocol_sfd_ != -1 && (stdinout_sfd_ != -1 || stderr_sfd_ != -1)) { - unique_fd* dead_sfd = SelectLoop(&master_read_set, &master_write_set); + unique_fd* dead_sfd = PollLoop(&pfds); if (dead_sfd) { D("closing FD %d", dead_sfd->get()); - FD_CLR(dead_sfd->get(), &master_read_set); - FD_CLR(dead_sfd->get(), &master_write_set); + auto it = std::find_if(pfds.begin(), pfds.end(), [=](const adb_pollfd& pfd) { + return pfd.fd == dead_sfd->get(); + }); + CHECK(it != pfds.end()); + it->fd = -1; + it->events = 0; if (dead_sfd == &protocol_sfd_) { // Using SIGHUP is a decent general way to indicate that the // controlling process is going away. If specific signals are @@ -583,30 +595,19 @@ void Subprocess::PassDataStreams() { } } -namespace { - -inline bool ValidAndInSet(const unique_fd& sfd, fd_set* set) { - return sfd != -1 && FD_ISSET(sfd.get(), set); -} - -} // namespace - -unique_fd* Subprocess::SelectLoop(fd_set* master_read_set_ptr, - fd_set* master_write_set_ptr) { - fd_set read_set, write_set; - int select_n = - std::max(std::max(protocol_sfd_.get(), stdinout_sfd_.get()), stderr_sfd_.get()) + 1; +unique_fd* Subprocess::PollLoop(SubprocessPollfds* pfds) { unique_fd* dead_sfd = nullptr; + adb_pollfd& stdinout_pfd = pfds->stdinout_pfd(); + adb_pollfd& stderr_pfd = pfds->stderr_pfd(); + adb_pollfd& protocol_pfd = pfds->protocol_pfd(); - // Keep calling select() and passing data until an FD closes/errors. + // Keep calling poll() and passing data until an FD closes/errors. while (!dead_sfd) { - memcpy(&read_set, master_read_set_ptr, sizeof(read_set)); - memcpy(&write_set, master_write_set_ptr, sizeof(write_set)); - if (select(select_n, &read_set, &write_set, nullptr, nullptr) < 0) { + if (adb_poll(pfds->data(), pfds->size(), -1) < 0) { if (errno == EINTR) { continue; } else { - PLOG(ERROR) << "select failed, closing subprocess pipes"; + PLOG(ERROR) << "poll failed, closing subprocess pipes"; stdinout_sfd_.reset(-1); stderr_sfd_.reset(-1); return nullptr; @@ -614,34 +615,47 @@ unique_fd* Subprocess::SelectLoop(fd_set* master_read_set_ptr, } // Read stdout, write to protocol FD. - if (ValidAndInSet(stdinout_sfd_, &read_set)) { + if (stdinout_pfd.fd != -1 && (stdinout_pfd.revents & POLLIN)) { dead_sfd = PassOutput(&stdinout_sfd_, ShellProtocol::kIdStdout); } // Read stderr, write to protocol FD. - if (!dead_sfd && ValidAndInSet(stderr_sfd_, &read_set)) { + if (!dead_sfd && stderr_pfd.fd != 1 && (stderr_pfd.revents & POLLIN)) { dead_sfd = PassOutput(&stderr_sfd_, ShellProtocol::kIdStderr); } // Read protocol FD, write to stdin. - if (!dead_sfd && ValidAndInSet(protocol_sfd_, &read_set)) { + if (!dead_sfd && protocol_pfd.fd != -1 && (protocol_pfd.revents & POLLIN)) { dead_sfd = PassInput(); // If we didn't finish writing, block on stdin write. if (input_bytes_left_) { - FD_CLR(protocol_sfd_.get(), master_read_set_ptr); - FD_SET(stdinout_sfd_.get(), master_write_set_ptr); + protocol_pfd.events &= ~POLLIN; + stdinout_pfd.events |= POLLOUT; } } // Continue writing to stdin; only happens if a previous write blocked. - if (!dead_sfd && ValidAndInSet(stdinout_sfd_, &write_set)) { + if (!dead_sfd && stdinout_pfd.fd != -1 && (stdinout_pfd.revents & POLLOUT)) { dead_sfd = PassInput(); // If we finished writing, go back to blocking on protocol read. if (!input_bytes_left_) { - FD_SET(protocol_sfd_.get(), master_read_set_ptr); - FD_CLR(stdinout_sfd_.get(), master_write_set_ptr); + protocol_pfd.events |= POLLIN; + stdinout_pfd.events &= ~POLLOUT; } } + + // After handling all of the events we've received, check to see if any fds have died. + if (stdinout_pfd.revents & (POLLHUP | POLLRDHUP | POLLERR | POLLNVAL)) { + return &stdinout_sfd_; + } + + if (stderr_pfd.revents & (POLLHUP | POLLRDHUP | POLLERR | POLLNVAL)) { + return &stderr_sfd_; + } + + if (protocol_pfd.revents & (POLLHUP | POLLRDHUP | POLLERR | POLLNVAL)) { + return &protocol_sfd_; + } } // while (!dead_sfd) return dead_sfd; diff --git a/adb/test_device.py b/adb/test_device.py index 57925e8b2..083adce4d 100755 --- a/adb/test_device.py +++ b/adb/test_device.py @@ -536,6 +536,36 @@ class ShellTest(DeviceTest): for i, success in result.iteritems(): self.assertTrue(success) + def disabled_test_parallel(self): + """Spawn a bunch of `adb shell` instances in parallel. + + This was broken historically due to the use of select, which only works + for fds that are numerically less than 1024. + + Bug: http://b/141955761""" + + n_procs = 2048 + procs = dict() + for i in xrange(0, n_procs): + procs[i] = subprocess.Popen( + ['adb', 'shell', 'read foo; echo $foo; read rc; exit $rc'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE + ) + + for i in xrange(0, n_procs): + procs[i].stdin.write("%d\n" % i) + + for i in xrange(0, n_procs): + response = procs[i].stdout.readline() + assert(response == "%d\n" % i) + + for i in xrange(0, n_procs): + procs[i].stdin.write("%d\n" % (i % 256)) + + for i in xrange(0, n_procs): + assert(procs[i].wait() == i % 256) + class ArgumentEscapingTest(DeviceTest): def test_shell_escaping(self):