From 4c936397c1dc8b127d7e27a9ee02ff7d0deef785 Mon Sep 17 00:00:00 2001 From: Josh Gao Date: Wed, 3 May 2017 14:10:39 -0700 Subject: [PATCH 1/3] adb: add fdevent_run_on_main_thread. Add a function to run a function on the main thread, to allow fdevents that depend on a blocking function to be registered. Bug: http://b/37869663 Test: adb_test on linux Change-Id: I84a0b372360420b7647057297b8f437e8afa874e --- adb/fdevent.cpp | 85 ++++++++++++++++++++++++++++++++++++-------- adb/fdevent.h | 5 +++ adb/fdevent_test.cpp | 21 +++++++++++ adb/fdevent_test.h | 8 ++--- 4 files changed, 101 insertions(+), 18 deletions(-) diff --git a/adb/fdevent.cpp b/adb/fdevent.cpp index 72c9eef42..b28de4b52 100644 --- a/adb/fdevent.cpp +++ b/adb/fdevent.cpp @@ -26,15 +26,19 @@ #include #include +#include #include +#include #include #include #include #include +#include #include "adb_io.h" #include "adb_trace.h" +#include "adb_unique_fd.h" #include "adb_utils.h" #if !ADB_HOST @@ -75,6 +79,10 @@ static std::atomic terminate_loop(false); static bool main_thread_valid; static unsigned long main_thread_id; +static auto& run_queue_notify_fd = *new unique_fd(); +static auto& run_queue_mutex = *new std::mutex(); +static auto& run_queue GUARDED_BY(run_queue_mutex) = *new std::vector>(); + void check_main_thread() { if (main_thread_valid) { CHECK_EQ(main_thread_id, adb_thread_id()); @@ -112,8 +120,7 @@ static std::string dump_fde(const fdevent* fde) { return android::base::StringPrintf("(fdevent %d %s)", fde->fd, state.c_str()); } -fdevent *fdevent_create(int fd, fd_func func, void *arg) -{ +fdevent* fdevent_create(int fd, fd_func func, void* arg) { check_main_thread(); fdevent *fde = (fdevent*) malloc(sizeof(fdevent)); if(fde == 0) return 0; @@ -122,8 +129,7 @@ fdevent *fdevent_create(int fd, fd_func func, void *arg) return fde; } -void fdevent_destroy(fdevent *fde) -{ +void fdevent_destroy(fdevent* fde) { check_main_thread(); if(fde == 0) return; if(!(fde->state & FDE_CREATED)) { @@ -278,8 +284,7 @@ static void fdevent_process() { } } -static void fdevent_call_fdfunc(fdevent* fde) -{ +static void fdevent_call_fdfunc(fdevent* fde) { unsigned events = fde->events; fde->events = 0; CHECK(fde->state & FDE_PENDING); @@ -292,10 +297,7 @@ static void fdevent_call_fdfunc(fdevent* fde) #include -static void fdevent_subproc_event_func(int fd, unsigned ev, - void* /* userdata */) -{ - +static void fdevent_subproc_event_func(int fd, unsigned ev, void* /* userdata */) { D("subproc handling on fd = %d, ev = %x", fd, ev); CHECK_GE(fd, 0); @@ -342,8 +344,7 @@ static void fdevent_subproc_event_func(int fd, unsigned ev, } } -void fdevent_subproc_setup() -{ +static void fdevent_subproc_setup() { int s[2]; if(adb_socketpair(s)) { @@ -358,12 +359,63 @@ void fdevent_subproc_setup() } #endif // !ADB_HOST -void fdevent_loop() -{ +static void fdevent_run_flush() REQUIRES(run_queue_mutex) { + for (auto& f : run_queue) { + f(); + } + run_queue.clear(); +} + +static void fdevent_run_func(int fd, unsigned ev, void* /* userdata */) { + CHECK_GE(fd, 0); + CHECK(ev & FDE_READ); + + char buf[1024]; + + // Empty the fd. + if (adb_read(fd, buf, sizeof(buf)) == -1) { + PLOG(FATAL) << "failed to empty run queue notify fd"; + } + + std::lock_guard lock(run_queue_mutex); + fdevent_run_flush(); +} + +static void fdevent_run_setup() { + std::lock_guard lock(run_queue_mutex); + CHECK(run_queue_notify_fd.get() == -1); + int s[2]; + if (adb_socketpair(s) != 0) { + PLOG(FATAL) << "failed to create run queue notify socketpair"; + } + + run_queue_notify_fd.reset(s[0]); + fdevent* fde = fdevent_create(s[1], fdevent_run_func, nullptr); + CHECK(fde != nullptr); + fdevent_add(fde, FDE_READ); + + fdevent_run_flush(); +} + +void fdevent_run_on_main_thread(std::function fn) { + std::lock_guard lock(run_queue_mutex); + run_queue.push_back(std::move(fn)); + + // run_queue_notify_fd could still be -1 if we're called before fdevent has finished setting up. + // In that case, rely on the setup code to flush the queue without a notification being needed. + if (run_queue_notify_fd != -1) { + if (adb_write(run_queue_notify_fd.get(), "", 1) != 1) { + PLOG(FATAL) << "failed to write to run queue notify fd"; + } + } +} + +void fdevent_loop() { set_main_thread(); #if !ADB_HOST fdevent_subproc_setup(); #endif // !ADB_HOST + fdevent_run_setup(); while (true) { if (terminate_loop) { @@ -393,6 +445,11 @@ size_t fdevent_installed_count() { void fdevent_reset() { g_poll_node_map.clear(); g_pending_list.clear(); + + std::lock_guard lock(run_queue_mutex); + run_queue_notify_fd.reset(); + run_queue.clear(); + main_thread_valid = false; terminate_loop = false; } diff --git a/adb/fdevent.h b/adb/fdevent.h index e32845afc..896400ad5 100644 --- a/adb/fdevent.h +++ b/adb/fdevent.h @@ -20,6 +20,8 @@ #include #include /* for int64_t */ +#include + /* events that may be observed */ #define FDE_READ 0x0001 #define FDE_WRITE 0x0002 @@ -78,6 +80,9 @@ void fdevent_loop(); void check_main_thread(); +// Queue an operation to run on the main thread. +void fdevent_run_on_main_thread(std::function fn); + // The following functions are used only for tests. void fdevent_terminate_loop(); size_t fdevent_installed_count(); diff --git a/adb/fdevent_test.cpp b/adb/fdevent_test.cpp index bdb973a01..86e020957 100644 --- a/adb/fdevent_test.cpp +++ b/adb/fdevent_test.cpp @@ -173,3 +173,24 @@ TEST_F(FdeventTest, invalid_fd) { std::thread thread(InvalidFdThreadFunc); thread.join(); } + +TEST_F(FdeventTest, run_on_main_thread) { + std::vector vec; + + PrepareThread(); + std::thread thread(fdevent_loop); + + for (int i = 0; i < 100; ++i) { + fdevent_run_on_main_thread([i, &vec]() { + check_main_thread(); + vec.push_back(i); + }); + } + + TerminateThread(thread); + + ASSERT_EQ(100u, vec.size()); + for (int i = 0; i < 100; ++i) { + ASSERT_EQ(i, vec[i]); + } +} diff --git a/adb/fdevent_test.h b/adb/fdevent_test.h index f4215ae19..5ca49ac08 100644 --- a/adb/fdevent_test.h +++ b/adb/fdevent_test.h @@ -53,11 +53,11 @@ class FdeventTest : public ::testing::Test { size_t GetAdditionalLocalSocketCount() { #if ADB_HOST - // dummy socket installed in PrepareThread() - return 1; -#else - // dummy socket and one more socket installed in fdevent_subproc_setup() + // dummy socket installed in PrepareThread() + fdevent_run_on_main_thread socket return 2; +#else + // dummy socket + fdevent_run_on_main_thread + fdevent_subproc_setup() sockets + return 3; #endif } From 6f46e6b912b2cd30a699757c3f4bbf9b679e2b79 Mon Sep 17 00:00:00 2001 From: Josh Gao Date: Wed, 3 May 2017 14:23:09 -0700 Subject: [PATCH 2/3] adb: initialize mDNS asynchronously. Use fdevent_run_on_main_thread to initialize mDNS in a thread and register an fdevent from the main thread upon success. This reduces the startup time of `adb server` by ~3 seconds when mDNS can't be successfully started. With an already running adb server, `time adb server nodaemon` goes from: adb server nodaemon 0.00s user 0.16s system 4% cpu 3.817 total to: adb server nodaemon 0.00s user 0.01s system 1% cpu 0.665 total Bug: http://b/37869663 Test: `adb server nodaemon` with an existing adb server Change-Id: Ia5a1a2a138610f3bf6792400050ca68f95ae3734 --- adb/transport_mdns.cpp | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/adb/transport_mdns.cpp b/adb/transport_mdns.cpp index e49b1c668..3603f0974 100644 --- a/adb/transport_mdns.cpp +++ b/adb/transport_mdns.cpp @@ -24,6 +24,8 @@ #include #endif +#include + #include #include @@ -262,19 +264,22 @@ static void DNSSD_API register_mdns_transport(DNSServiceRef sdRef, } } -void init_mdns_transport_discovery(void) { - DNSServiceErrorType errorCode = - DNSServiceBrowse(&service_ref, 0, 0, kADBServiceType, nullptr, - register_mdns_transport, nullptr); +void init_mdns_transport_discovery_thread(void) { + DNSServiceErrorType errorCode = DNSServiceBrowse(&service_ref, 0, 0, kADBServiceType, nullptr, + register_mdns_transport, nullptr); if (errorCode != kDNSServiceErr_NoError) { D("Got %d initiating mDNS browse.", errorCode); return; } - fdevent_install(&service_ref_fde, - adb_DNSServiceRefSockFD(service_ref), - pump_service_ref, - &service_ref); - fdevent_set(&service_ref_fde, FDE_READ); + fdevent_run_on_main_thread([]() { + fdevent_install(&service_ref_fde, adb_DNSServiceRefSockFD(service_ref), pump_service_ref, + &service_ref); + fdevent_set(&service_ref_fde, FDE_READ); + }); +} + +void init_mdns_transport_discovery(void) { + std::thread(init_mdns_transport_discovery_thread).detach(); } From fd713e53e890f5b3bf26a1c5a980137cc4378c3a Mon Sep 17 00:00:00 2001 From: Josh Gao Date: Wed, 3 May 2017 22:37:10 -0700 Subject: [PATCH 3/3] adb: wait for devices to come up instead of sleeping for 3s. Replace a hard-coded 3 second sleep with logic to wait until we've scanned USB devices once and they've all come online. Before: adb shell true 0.00s user 0.00s system 0% cpu 3.047 total After: adb shell true 0.00s user 0.00s system 9% cpu 0.041 total Bug: http://b/37869663 Test: `time adb shell true` after adb kill-server Change-Id: I251d42afb885908ed9d03167287594ea16650d3f --- adb/adb.cpp | 55 ++++++++++++++++++++++++++++++++++++--- adb/adb.h | 14 ++++++++++ adb/adb_client.cpp | 8 +++--- adb/client/main.cpp | 51 ++++++++++++++++++++---------------- adb/client/usb_libusb.cpp | 2 ++ adb/transport.cpp | 19 ++++++++++++++ adb/transport.h | 4 +++ 7 files changed, 123 insertions(+), 30 deletions(-) diff --git a/adb/adb.cpp b/adb/adb.cpp index 39e71e5f3..0181daa60 100644 --- a/adb/adb.cpp +++ b/adb/adb.cpp @@ -31,6 +31,8 @@ #include #include +#include +#include #include #include #include @@ -48,6 +50,7 @@ #include "adb_io.h" #include "adb_listeners.h" #include "adb_utils.h" +#include "sysdeps/chrono.h" #include "transport.h" #if !ADB_HOST @@ -313,19 +316,15 @@ void parse_banner(const std::string& banner, atransport* t) { if (type == "bootloader") { D("setting connection_state to kCsBootloader"); t->SetConnectionState(kCsBootloader); - update_transports(); } else if (type == "device") { D("setting connection_state to kCsDevice"); t->SetConnectionState(kCsDevice); - update_transports(); } else if (type == "recovery") { D("setting connection_state to kCsRecovery"); t->SetConnectionState(kCsRecovery); - update_transports(); } else if (type == "sideload") { D("setting connection_state to kCsSideload"); t->SetConnectionState(kCsSideload); - update_transports(); } else { D("setting connection_state to kCsHost"); t->SetConnectionState(kCsHost); @@ -353,6 +352,8 @@ static void handle_new_connection(atransport* t, apacket* p) { send_auth_request(t); } #endif + + update_transports(); } void handle_packet(apacket *p, atransport *t) @@ -1229,4 +1230,50 @@ int handle_host_request(const char* service, TransportType type, return ret - 1; return -1; } + +static auto& init_mutex = *new std::mutex(); +static auto& init_cv = *new std::condition_variable(); +static bool device_scan_complete = false; +static bool transports_ready = false; + +void update_transport_status() { + bool result = iterate_transports([](const atransport* t) { + if (t->type == kTransportUsb && t->online != 1) { + return false; + } + return true; + }); + + D("update_transport_status: transports_ready = %s", result ? "true" : "false"); + + bool ready; + + { + std::lock_guard lock(init_mutex); + transports_ready = result; + ready = transports_ready && device_scan_complete; + } + + if (ready) { + D("update_transport_status: notifying"); + init_cv.notify_all(); + } +} + +void adb_notify_device_scan_complete() { + D("device scan complete"); + + { + std::lock_guard lock(init_mutex); + device_scan_complete = true; + } + + update_transport_status(); +} + +void adb_wait_for_device_initialization() { + std::unique_lock lock(init_mutex); + init_cv.wait_for(lock, 3s, []() { return device_scan_complete && transports_ready; }); +} + #endif // ADB_HOST diff --git a/adb/adb.h b/adb/adb.h index e3675d844..d6b2b818f 100644 --- a/adb/adb.h +++ b/adb/adb.h @@ -228,4 +228,18 @@ void SendConnectOnHost(atransport* t); void parse_banner(const std::string&, atransport* t); +// On startup, the adb server needs to wait until all of the connected devices are ready. +// To do this, we need to know when the scan has identified all of the potential new transports, and +// when each transport becomes ready. +// TODO: Do this for mDNS as well, instead of just USB? + +// We've found all of the transports we potentially care about. +void adb_notify_device_scan_complete(); + +// One or more transports have changed status, check to see if we're ready. +void update_transport_status(); + +// Wait until device scan has completed and every transport is ready, or a timeout elapses. +void adb_wait_for_device_initialization(); + #endif diff --git a/adb/adb_client.cpp b/adb/adb_client.cpp index b6568875c..4f3ff25df 100644 --- a/adb/adb_client.cpp +++ b/adb/adb_client.cpp @@ -28,12 +28,15 @@ #include #include +#include +#include #include #include #include #include #include +#include #include #include "adb_io.h" @@ -177,9 +180,8 @@ int adb_connect(const std::string& service, std::string* error) { } else { fprintf(stderr, "* daemon started successfully\n"); } - // Give the server some time to start properly and detect devices. - std::this_thread::sleep_for(3s); - // fall through to _adb_connect + // The server will wait until it detects all of its connected devices before acking. + // Fall through to _adb_connect. } else { // If a server is already running, check its version matches. int version = ADB_SERVER_VERSION - 1; diff --git a/adb/client/main.cpp b/adb/client/main.cpp index 606203cb4..fe5099c35 100644 --- a/adb/client/main.cpp +++ b/adb/client/main.cpp @@ -156,33 +156,38 @@ int adb_server_main(int is_daemon, const std::string& socket_spec, int ack_reply } #endif - // Inform our parent that we are up and running. + // Wait for the USB scan to complete before notifying the parent that we're up. + // We need to perform this in a thread, because we would otherwise block the event loop. + std::thread notify_thread([ack_reply_fd]() { + adb_wait_for_device_initialization(); - // Any error output written to stderr now goes to adb.log. We could - // keep around a copy of the stderr fd and use that to write any errors - // encountered by the following code, but that is probably overkill. + // Any error output written to stderr now goes to adb.log. We could + // keep around a copy of the stderr fd and use that to write any errors + // encountered by the following code, but that is probably overkill. #if defined(_WIN32) - const HANDLE ack_reply_handle = cast_int_to_handle(ack_reply_fd); - const CHAR ack[] = "OK\n"; - const DWORD bytes_to_write = arraysize(ack) - 1; - DWORD written = 0; - if (!WriteFile(ack_reply_handle, ack, bytes_to_write, &written, NULL)) { - fatal("adb: cannot write ACK to handle 0x%p: %s", ack_reply_handle, - android::base::SystemErrorCodeToString(GetLastError()).c_str()); - } - if (written != bytes_to_write) { - fatal("adb: cannot write %lu bytes of ACK: only wrote %lu bytes", - bytes_to_write, written); - } - CloseHandle(ack_reply_handle); + const HANDLE ack_reply_handle = cast_int_to_handle(ack_reply_fd); + const CHAR ack[] = "OK\n"; + const DWORD bytes_to_write = arraysize(ack) - 1; + DWORD written = 0; + if (!WriteFile(ack_reply_handle, ack, bytes_to_write, &written, NULL)) { + fatal("adb: cannot write ACK to handle 0x%p: %s", ack_reply_handle, + android::base::SystemErrorCodeToString(GetLastError()).c_str()); + } + if (written != bytes_to_write) { + fatal("adb: cannot write %lu bytes of ACK: only wrote %lu bytes", bytes_to_write, + written); + } + CloseHandle(ack_reply_handle); #else - // TODO(danalbert): Can't use SendOkay because we're sending "OK\n", not - // "OKAY". - if (!android::base::WriteStringToFd("OK\n", ack_reply_fd)) { - fatal_errno("error writing ACK to fd %d", ack_reply_fd); - } - unix_close(ack_reply_fd); + // TODO(danalbert): Can't use SendOkay because we're sending "OK\n", not + // "OKAY". + if (!android::base::WriteStringToFd("OK\n", ack_reply_fd)) { + fatal_errno("error writing ACK to fd %d", ack_reply_fd); + } + unix_close(ack_reply_fd); #endif + }); + notify_thread.detach(); } D("Event loop starting"); diff --git a/adb/client/usb_libusb.cpp b/adb/client/usb_libusb.cpp index d39884ac7..20610ee46 100644 --- a/adb/client/usb_libusb.cpp +++ b/adb/client/usb_libusb.cpp @@ -352,6 +352,8 @@ static void poll_for_devices() { } libusb_free_device_list(list, 1); + adb_notify_device_scan_complete(); + std::this_thread::sleep_for(500ms); } } diff --git a/adb/transport.cpp b/adb/transport.cpp index cc8c16252..20de6421e 100644 --- a/adb/transport.cpp +++ b/adb/transport.cpp @@ -400,8 +400,27 @@ asocket* create_device_tracker(void) { return &tracker->socket; } +// Check if all of the USB transports are connected. +bool iterate_transports(std::function fn) { + std::lock_guard lock(transport_lock); + for (const auto& t : transport_list) { + if (!fn(t)) { + return false; + } + } + for (const auto& t : pending_list) { + if (!fn(t)) { + return false; + } + } + return true; +} + // Call this function each time the transport list has changed. void update_transports() { + update_transport_status(); + + // Notify `adb track-devices` clients. std::string transports = list_transports(false); device_tracker* tracker = device_tracker_list; diff --git a/adb/transport.h b/adb/transport.h index 8c15d663d..e129355ba 100644 --- a/adb/transport.h +++ b/adb/transport.h @@ -198,6 +198,10 @@ atransport* acquire_one_transport(TransportType type, const char* serial, bool* void kick_transport(atransport* t); void update_transports(void); +// Iterates across all of the current and pending transports. +// Stops iteration and returns false if fn returns false, otherwise returns true. +bool iterate_transports(std::function fn); + void init_transport_registration(void); void init_mdns_transport_discovery(void); std::string list_transports(bool long_listing);