From 454bc7c0be9757c583b6f9fd8912a1b158773f28 Mon Sep 17 00:00:00 2001 From: Luis Hector Chavez Date: Fri, 20 Apr 2018 10:31:29 -0700 Subject: [PATCH 1/3] adb: Add a way to reconnect TCP transports This change adds a reconnect handler that tracks all TCP transports that were connected at some point, but became disconnected. It does so by attempting to reconnect every 10s for up to a minute. Bug: 74411879 Test: system/core/adb/test_adb.py Test: adb connect chromebook:22 # This runs with sslh Test: CtsBootStatsTestCases Test: emulator -show-kernel ; adb -s emulator-5554 shell Change-Id: I7b9f6d181b71ccf5c26ff96c45d36aaf6409b992 --- adb/client/main.cpp | 1 + adb/test_adb.py | 82 +++++++++++++++---- adb/transport.cpp | 174 ++++++++++++++++++++++++++++++++++++---- adb/transport.h | 24 +++++- adb/transport_local.cpp | 58 ++++++++++---- 5 files changed, 291 insertions(+), 48 deletions(-) diff --git a/adb/client/main.cpp b/adb/client/main.cpp index 31cb8536a..44ed3a253 100644 --- a/adb/client/main.cpp +++ b/adb/client/main.cpp @@ -117,6 +117,7 @@ int adb_server_main(int is_daemon, const std::string& socket_spec, int ack_reply atexit(adb_server_cleanup); init_transport_registration(); + init_reconnect_handler(); init_mdns_transport_discovery(); usb_init(); diff --git a/adb/test_adb.py b/adb/test_adb.py index 32bf0297c..ce4d4ecfe 100644 --- a/adb/test_adb.py +++ b/adb/test_adb.py @@ -75,9 +75,11 @@ def fake_adb_server(protocol=socket.AF_INET, port=0): else: # Client socket data = r.recv(1024) - if not data: + if not data or data.startswith('OPEN'): if r in cnxn_sent: del cnxn_sent[r] + r.shutdown(socket.SHUT_RDWR) + r.close() rlist.remove(r) continue if r in cnxn_sent: @@ -97,6 +99,25 @@ def fake_adb_server(protocol=socket.AF_INET, port=0): server_thread.join() +@contextlib.contextmanager +def adb_connect(unittest, serial): + """Context manager for an ADB connection. + + This automatically disconnects when done with the connection. + """ + + output = subprocess.check_output(['adb', 'connect', serial]) + unittest.assertEqual(output.strip(), 'connected to {}'.format(serial)) + + try: + yield + finally: + # Perform best-effort disconnection. Discard the output. + p = subprocess.Popen(['adb', 'disconnect', serial], + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + p.communicate() + + class NonApiTest(unittest.TestCase): """Tests for ADB that aren't a part of the AndroidDevice API.""" @@ -278,29 +299,60 @@ class NonApiTest(unittest.TestCase): for protocol in (socket.AF_INET, socket.AF_INET6): try: with fake_adb_server(protocol=protocol) as port: - output = subprocess.check_output( - ['adb', 'connect', 'localhost:{}'.format(port)]) - - self.assertEqual( - output.strip(), 'connected to localhost:{}'.format(port)) + serial = 'localhost:{}'.format(port) + with adb_connect(self, serial): + pass except socket.error: print("IPv6 not available, skipping") continue def test_already_connected(self): + """Ensure that an already-connected device stays connected.""" + with fake_adb_server() as port: - output = subprocess.check_output( - ['adb', 'connect', 'localhost:{}'.format(port)]) + serial = 'localhost:{}'.format(port) + with adb_connect(self, serial): + # b/31250450: this always returns 0 but probably shouldn't. + output = subprocess.check_output(['adb', 'connect', serial]) + self.assertEqual( + output.strip(), 'already connected to {}'.format(serial)) - self.assertEqual( - output.strip(), 'connected to localhost:{}'.format(port)) + def test_reconnect(self): + """Ensure that a disconnected device reconnects.""" - # b/31250450: this always returns 0 but probably shouldn't. - output = subprocess.check_output( - ['adb', 'connect', 'localhost:{}'.format(port)]) + with fake_adb_server() as port: + serial = 'localhost:{}'.format(port) + with adb_connect(self, serial): + output = subprocess.check_output(['adb', '-s', serial, + 'get-state']) + self.assertEqual(output.strip(), 'device') - self.assertEqual( - output.strip(), 'already connected to localhost:{}'.format(port)) + # This will fail. + p = subprocess.Popen(['adb', '-s', serial, 'shell', 'true'], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + output, _ = p.communicate() + self.assertEqual(output.strip(), 'error: closed') + + subprocess.check_call(['adb', '-s', serial, 'wait-for-device']) + + output = subprocess.check_output(['adb', '-s', serial, + 'get-state']) + self.assertEqual(output.strip(), 'device') + + # Once we explicitly kick a device, it won't attempt to + # reconnect. + output = subprocess.check_output(['adb', 'disconnect', serial]) + self.assertEqual( + output.strip(), 'disconnected {}'.format(serial)) + try: + subprocess.check_output(['adb', '-s', serial, 'get-state'], + stderr=subprocess.STDOUT) + self.fail('Device should not be available') + except subprocess.CalledProcessError as e: + self.assertEqual( + e.output.strip(), + 'error: device \'{}\' not found'.format(serial)) def main(): random.seed(0) diff --git a/adb/transport.cpp b/adb/transport.cpp index be7f8fe7f..beec13a69 100644 --- a/adb/transport.cpp +++ b/adb/transport.cpp @@ -33,6 +33,7 @@ #include #include #include +#include #include #include @@ -50,7 +51,9 @@ #include "adb_utils.h" #include "fdevent.h" -static void transport_unref(atransport *t); +static void register_transport(atransport* transport); +static void remove_transport(atransport* transport); +static void transport_unref(atransport* transport); // TODO: unordered_map static auto& transport_list = *new std::list(); @@ -77,6 +80,130 @@ class SCOPED_CAPABILITY ScopedAssumeLocked { ~ScopedAssumeLocked() RELEASE() {} }; +// Tracks and handles atransport*s that are attempting reconnection. +class ReconnectHandler { + public: + ReconnectHandler() = default; + ~ReconnectHandler() = default; + + // Starts the ReconnectHandler thread. + void Start(); + + // Requests the ReconnectHandler thread to stop. + void Stop(); + + // Adds the atransport* to the queue of reconnect attempts. + void TrackTransport(atransport* transport); + + private: + // The main thread loop. + void Run(); + + // Tracks a reconnection attempt. + struct ReconnectAttempt { + atransport* transport; + std::chrono::system_clock::time_point deadline; + size_t attempts_left; + }; + + // Only retry for up to one minute. + static constexpr const std::chrono::seconds kDefaultTimeout = std::chrono::seconds(10); + static constexpr const size_t kMaxAttempts = 6; + + // Protects all members. + std::mutex reconnect_mutex_; + bool running_ GUARDED_BY(reconnect_mutex_) = true; + std::thread handler_thread_; + std::condition_variable reconnect_cv_; + std::queue reconnect_queue_ GUARDED_BY(reconnect_mutex_); + + DISALLOW_COPY_AND_ASSIGN(ReconnectHandler); +}; + +void ReconnectHandler::Start() { + check_main_thread(); + handler_thread_ = std::thread(&ReconnectHandler::Run, this); +} + +void ReconnectHandler::Stop() { + check_main_thread(); + { + std::lock_guard lock(reconnect_mutex_); + running_ = false; + } + reconnect_cv_.notify_one(); + handler_thread_.join(); + + // Drain the queue to free all resources. + std::lock_guard lock(reconnect_mutex_); + while (!reconnect_queue_.empty()) { + ReconnectAttempt attempt = reconnect_queue_.front(); + reconnect_queue_.pop(); + remove_transport(attempt.transport); + } +} + +void ReconnectHandler::TrackTransport(atransport* transport) { + check_main_thread(); + { + std::lock_guard lock(reconnect_mutex_); + if (!running_) return; + reconnect_queue_.emplace(ReconnectAttempt{ + transport, std::chrono::system_clock::now() + ReconnectHandler::kDefaultTimeout, + ReconnectHandler::kMaxAttempts}); + } + reconnect_cv_.notify_one(); +} + +void ReconnectHandler::Run() { + while (true) { + ReconnectAttempt attempt; + { + std::unique_lock lock(reconnect_mutex_); + ScopedAssumeLocked assume_lock(reconnect_mutex_); + + auto deadline = std::chrono::time_point::max(); + if (!reconnect_queue_.empty()) deadline = reconnect_queue_.front().deadline; + reconnect_cv_.wait_until(lock, deadline, [&]() REQUIRES(reconnect_mutex_) { + return !running_ || + (!reconnect_queue_.empty() && reconnect_queue_.front().deadline < deadline); + }); + + if (!running_) return; + attempt = reconnect_queue_.front(); + reconnect_queue_.pop(); + if (attempt.transport->kicked()) { + D("transport %s was kicked. giving up on it.", attempt.transport->serial); + remove_transport(attempt.transport); + continue; + } + } + D("attempting to reconnect %s", attempt.transport->serial); + + if (!attempt.transport->Reconnect()) { + D("attempting to reconnect %s failed.", attempt.transport->serial); + if (attempt.attempts_left == 0) { + D("transport %s exceeded the number of retry attempts. giving up on it.", + attempt.transport->serial); + remove_transport(attempt.transport); + continue; + } + + std::lock_guard lock(reconnect_mutex_); + reconnect_queue_.emplace(ReconnectAttempt{ + attempt.transport, + std::chrono::system_clock::now() + ReconnectHandler::kDefaultTimeout, + attempt.attempts_left - 1}); + continue; + } + + D("reconnection to %s succeeded.", attempt.transport->serial); + register_transport(attempt.transport); + } +} + +static auto& reconnect_handler = *new ReconnectHandler(); + } // namespace TransportId NextTransportId() { @@ -477,8 +604,6 @@ static int transport_write_action(int fd, struct tmsg* m) { return 0; } -static void remove_transport(atransport*); - static void transport_registration_func(int _fd, unsigned ev, void*) { tmsg m; atransport* t; @@ -515,8 +640,9 @@ static void transport_registration_func(int _fd, unsigned ev, void*) { /* don't create transport threads for inaccessible devices */ if (t->GetConnectionState() != kCsNoPerm) { - /* initial references are the two threads */ - t->ref_count = 1; + // The connection gets a reference to the atransport. It will release it + // upon a read/write error. + t->ref_count++; t->connection()->SetTransportName(t->serial_name()); t->connection()->SetReadCallback([t](Connection*, std::unique_ptr p) { if (!check_header(p.get(), t)) { @@ -547,13 +673,20 @@ static void transport_registration_func(int _fd, unsigned ev, void*) { { std::lock_guard lock(transport_lock); - pending_list.remove(t); - transport_list.push_front(t); + auto it = std::find(pending_list.begin(), pending_list.end(), t); + if (it != pending_list.end()) { + pending_list.remove(t); + transport_list.push_front(t); + } } update_transports(); } +void init_reconnect_handler(void) { + reconnect_handler.Start(); +} + void init_transport_registration(void) { int s[2]; @@ -572,6 +705,7 @@ void init_transport_registration(void) { } void kick_all_transports() { + reconnect_handler.Stop(); // To avoid only writing part of a packet to a transport after exit, kick all transports. std::lock_guard lock(transport_lock); for (auto t : transport_list) { @@ -601,15 +735,21 @@ static void remove_transport(atransport* transport) { } static void transport_unref(atransport* t) { + check_main_thread(); CHECK(t != nullptr); std::lock_guard lock(transport_lock); CHECK_GT(t->ref_count, 0u); t->ref_count--; if (t->ref_count == 0) { - D("transport: %s unref (kicking and closing)", t->serial); t->connection()->Stop(); - remove_transport(t); + if (t->IsTcpDevice() && !t->kicked()) { + D("transport: %s unref (attempting reconnection) %d", t->serial, t->kicked()); + reconnect_handler.TrackTransport(t); + } else { + D("transport: %s unref (kicking and closing)", t->serial); + remove_transport(t); + } } else { D("transport: %s unref (count=%zu)", t->serial, t->ref_count); } @@ -781,9 +921,8 @@ int atransport::Write(apacket* p) { } void atransport::Kick() { - if (!kicked_) { - D("kicking transport %s", this->serial); - kicked_ = true; + if (!kicked_.exchange(true)) { + D("kicking transport %p %s", this, this->serial); this->connection()->Stop(); } } @@ -941,6 +1080,10 @@ void atransport::SetConnectionEstablished(bool success) { connection_waitable_->SetConnectionEstablished(success); } +bool atransport::Reconnect() { + return reconnect_(this); +} + #if ADB_HOST // We use newline as our delimiter, make sure to never output it. @@ -1021,8 +1164,9 @@ void close_usb_devices() { } #endif // ADB_HOST -int register_socket_transport(int s, const char* serial, int port, int local) { - atransport* t = new atransport(); +int register_socket_transport(int s, const char* serial, int port, int local, + atransport::ReconnectCallback reconnect) { + atransport* t = new atransport(std::move(reconnect), kCsOffline); if (!serial) { char buf[32]; @@ -1103,7 +1247,7 @@ void kick_all_tcp_devices() { void register_usb_transport(usb_handle* usb, const char* serial, const char* devpath, unsigned writeable) { - atransport* t = new atransport((writeable ? kCsConnecting : kCsNoPerm)); + atransport* t = new atransport(writeable ? kCsOffline : kCsNoPerm); D("transport: %p init'ing for usb_handle %p (sn='%s')", t, usb, serial ? serial : ""); init_usb_transport(t, usb); diff --git a/adb/transport.h b/adb/transport.h index e1cbc092d..ae9cc023c 100644 --- a/adb/transport.h +++ b/adb/transport.h @@ -198,20 +198,27 @@ class atransport { // class in one go is a very large change. Given how bad our testing is, // it's better to do this piece by piece. - atransport(ConnectionState state = kCsConnecting) + using ReconnectCallback = std::function; + + atransport(ReconnectCallback reconnect, ConnectionState state) : id(NextTransportId()), + kicked_(false), connection_state_(state), connection_waitable_(std::make_shared()), - connection_(nullptr) { + connection_(nullptr), + reconnect_(std::move(reconnect)) { // Initialize protocol to min version for compatibility with older versions. // Version will be updated post-connect. protocol_version = A_VERSION_MIN; max_payload = MAX_PAYLOAD; } + atransport(ConnectionState state = kCsOffline) + : atransport([](atransport*) { return false; }, state) {} virtual ~atransport(); int Write(apacket* p); void Kick(); + bool kicked() const { return kicked_; } // ConnectionState can be read by all threads, but can only be written in the main thread. ConnectionState GetConnectionState() const; @@ -286,8 +293,12 @@ class atransport { // Gets a shared reference to the ConnectionWaitable. std::shared_ptr connection_waitable() { return connection_waitable_; } + // Attempts to reconnect with the underlying Connection. Returns true if the + // reconnection attempt succeeded. + bool Reconnect(); + private: - bool kicked_ = false; + std::atomic kicked_; // A set of features transmitted in the banner with the initial connection. // This is stored in the banner as 'features=feature0,feature1,etc'. @@ -310,6 +321,9 @@ class atransport { // The underlying connection object. std::shared_ptr connection_ GUARDED_BY(mutex_); + // A callback that will be invoked when the atransport needs to reconnect. + ReconnectCallback reconnect_; + std::mutex mutex_; DISALLOW_COPY_AND_ASSIGN(atransport); @@ -333,6 +347,7 @@ void update_transports(void); // Stops iteration and returns false if fn returns false, otherwise returns true. bool iterate_transports(std::function fn); +void init_reconnect_handler(void); void init_transport_registration(void); void init_mdns_transport_discovery(void); std::string list_transports(bool long_listing); @@ -347,7 +362,8 @@ void register_usb_transport(usb_handle* h, const char* serial, void connect_device(const std::string& address, std::string* response); /* cause new transports to be init'd and added to the list */ -int register_socket_transport(int s, const char* serial, int port, int local); +int register_socket_transport(int s, const char* serial, int port, int local, + atransport::ReconnectCallback reconnect); // This should only be used for transports with connection_state == kCsNoPerm. void unregister_usb_transport(usb_handle* usb); diff --git a/adb/transport_local.cpp b/adb/transport_local.cpp index e81f27c95..181d6665d 100644 --- a/adb/transport_local.cpp +++ b/adb/transport_local.cpp @@ -68,28 +68,24 @@ bool local_connect(int port) { return local_connect_arbitrary_ports(port - 1, port, &dummy) == 0; } -void connect_device(const std::string& address, std::string* response) { - if (address.empty()) { - *response = "empty address"; - return; - } - +std::tuple tcp_connect(const std::string& address, + std::string* response) { std::string serial; std::string host; int port = DEFAULT_ADB_LOCAL_TRANSPORT_PORT; if (!android::base::ParseNetAddress(address, &host, &port, &serial, response)) { - return; + return std::make_tuple(unique_fd(), port, serial); } std::string error; - int fd = network_connect(host.c_str(), port, SOCK_STREAM, 10, &error); + unique_fd fd(network_connect(host.c_str(), port, SOCK_STREAM, 10, &error)); if (fd == -1) { *response = android::base::StringPrintf("unable to connect to %s: %s", serial.c_str(), error.c_str()); - return; + return std::make_tuple(std::move(fd), port, serial); } - D("client: connected %s remote on fd %d", serial.c_str(), fd); + D("client: connected %s remote on fd %d", serial.c_str(), fd.get()); close_on_exec(fd); disable_tcp_nagle(fd); @@ -98,7 +94,38 @@ void connect_device(const std::string& address, std::string* response) { D("warning: failed to configure TCP keepalives (%s)", strerror(errno)); } - int ret = register_socket_transport(fd, serial.c_str(), port, 0); + return std::make_tuple(std::move(fd), port, serial); +} + +void connect_device(const std::string& address, std::string* response) { + if (address.empty()) { + *response = "empty address"; + return; + } + + unique_fd fd; + int port; + std::string serial; + std::tie(fd, port, serial) = tcp_connect(address, response); + auto reconnect = [address](atransport* t) { + std::string response; + unique_fd fd; + int port; + std::string serial; + std::tie(fd, port, serial) = tcp_connect(address, &response); + if (fd == -1) { + D("reconnect failed: %s", response.c_str()); + return false; + } + + // This invokes the part of register_socket_transport() that needs to be + // invoked if the atransport* has already been setup. This eventually + // calls atransport->SetConnection() with a newly created Connection* + // that will in turn send the CNXN packet. + return init_socket_transport(t, fd.release(), port, 0) >= 0; + }; + + int ret = register_socket_transport(fd.release(), serial.c_str(), port, 0, std::move(reconnect)); if (ret < 0) { adb_close(fd); if (ret == -EALREADY) { @@ -135,7 +162,8 @@ int local_connect_arbitrary_ports(int console_port, int adb_port, std::string* e close_on_exec(fd); disable_tcp_nagle(fd); std::string serial = getEmulatorSerialString(console_port); - if (register_socket_transport(fd, serial.c_str(), adb_port, 1) == 0) { + if (register_socket_transport(fd, serial.c_str(), adb_port, 1, + [](atransport*) { return false; }) == 0) { return 0; } adb_close(fd); @@ -239,7 +267,8 @@ static void server_socket_thread(int port) { close_on_exec(fd); disable_tcp_nagle(fd); std::string serial = android::base::StringPrintf("host-%d", fd); - if (register_socket_transport(fd, serial.c_str(), port, 1) != 0) { + if (register_socket_transport(fd, serial.c_str(), port, 1, + [](atransport*) { return false; }) != 0) { adb_close(fd); } } @@ -338,7 +367,8 @@ static void qemu_socket_thread(int port) { /* Host is connected. Register the transport, and start the * exchange. */ std::string serial = android::base::StringPrintf("host-%d", fd); - if (register_socket_transport(fd, serial.c_str(), port, 1) != 0 || + if (register_socket_transport(fd, serial.c_str(), port, 1, + [](atransport*) { return false; }) != 0 || !WriteFdExactly(fd, _start_req, strlen(_start_req))) { adb_close(fd); } From fbee0a9133ef9d94421f06888e6d147d3fbe2c5e Mon Sep 17 00:00:00 2001 From: Luis Hector Chavez Date: Wed, 2 May 2018 09:10:29 -0700 Subject: [PATCH 2/3] adb: Improve test_adb a bit more This change: * uses unittest.main(), which allows for a subset of the tests to be selected. * drops the requirement to have a device already connected since all the tests that need a device now spin their own mock device. * Splits the monolithic test class into more granular classes. * Makes this file be pylint-compliant. Bug: None Test: python system/core/adb/test_adb.py Test: pylint system/core/adb/test_adb.py Change-Id: I91c7ced520c3c69f855d639e0dbf7e57bb690e97 --- adb/test_adb.py | 161 +++++++++++++++++++++++++++--------------------- 1 file changed, 91 insertions(+), 70 deletions(-) diff --git a/adb/test_adb.py b/adb/test_adb.py index ce4d4ecfe..8f31a5366 100644 --- a/adb/test_adb.py +++ b/adb/test_adb.py @@ -36,10 +36,11 @@ import adb @contextlib.contextmanager -def fake_adb_server(protocol=socket.AF_INET, port=0): - """Creates a fake ADB server that just replies with a CNXN packet.""" +def fake_adbd(protocol=socket.AF_INET, port=0): + """Creates a fake ADB daemon that just replies with a CNXN packet.""" serversock = socket.socket(protocol, socket.SOCK_STREAM) + serversock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) if protocol == socket.AF_INET: serversock.bind(('127.0.0.1', port)) else: @@ -60,33 +61,33 @@ def fake_adb_server(protocol=socket.AF_INET, port=0): rlist = [readpipe, serversock] cnxn_sent = {} while True: - ready, _, _ = select.select(rlist, [], []) - for r in ready: - if r == readpipe: + read_ready, _, _ = select.select(rlist, [], []) + for ready in read_ready: + if ready == readpipe: # Closure pipe - os.close(r) + os.close(ready) serversock.shutdown(socket.SHUT_RDWR) serversock.close() return - elif r == serversock: + elif ready == serversock: # Server socket - conn, _ = r.accept() + conn, _ = ready.accept() rlist.append(conn) else: # Client socket - data = r.recv(1024) + data = ready.recv(1024) if not data or data.startswith('OPEN'): - if r in cnxn_sent: - del cnxn_sent[r] - r.shutdown(socket.SHUT_RDWR) - r.close() - rlist.remove(r) + if ready in cnxn_sent: + del cnxn_sent[ready] + ready.shutdown(socket.SHUT_RDWR) + ready.close() + rlist.remove(ready) continue - if r in cnxn_sent: + if ready in cnxn_sent: continue - cnxn_sent[r] = True - r.sendall(_adb_packet('CNXN', 0x01000001, 1024 * 1024, - 'device::ro.product.name=fakeadb')) + cnxn_sent[ready] = True + ready.sendall(_adb_packet('CNXN', 0x01000001, 1024 * 1024, + 'device::ro.product.name=fakeadb')) port = serversock.getsockname()[1] server_thread = threading.Thread(target=_handle) @@ -113,13 +114,13 @@ def adb_connect(unittest, serial): yield finally: # Perform best-effort disconnection. Discard the output. - p = subprocess.Popen(['adb', 'disconnect', serial], - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - p.communicate() + subprocess.Popen(['adb', 'disconnect', serial], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE).communicate() -class NonApiTest(unittest.TestCase): - """Tests for ADB that aren't a part of the AndroidDevice API.""" +class CommandlineTest(unittest.TestCase): + """Tests for the ADB commandline.""" def test_help(self): """Make sure we get _something_ out of help.""" @@ -141,28 +142,37 @@ class NonApiTest(unittest.TestCase): revision_line, r'^Revision [0-9a-f]{12}-android$') def test_tcpip_error_messages(self): - p = subprocess.Popen(['adb', 'tcpip'], stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - out, _ = p.communicate() - self.assertEqual(1, p.returncode) + """Make sure 'adb tcpip' parsing is sane.""" + proc = subprocess.Popen(['adb', 'tcpip'], stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + out, _ = proc.communicate() + self.assertEqual(1, proc.returncode) self.assertIn('requires an argument', out) - p = subprocess.Popen(['adb', 'tcpip', 'foo'], stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - out, _ = p.communicate() - self.assertEqual(1, p.returncode) + proc = subprocess.Popen(['adb', 'tcpip', 'foo'], stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + out, _ = proc.communicate() + self.assertEqual(1, proc.returncode) self.assertIn('invalid port', out) - # Helper method that reads a pipe until it is closed, then sets the event. - def _read_pipe_and_set_event(self, pipe, event): - x = pipe.read() + +class ServerTest(unittest.TestCase): + """Tests for the ADB server.""" + + @staticmethod + def _read_pipe_and_set_event(pipe, event): + """Reads a pipe until it is closed, then sets the event.""" + pipe.read() event.set() - # Test that launch_server() does not let the adb server inherit - # stdin/stdout/stderr handles which can cause callers of adb.exe to hang. - # This test also runs fine on unix even though the impetus is an issue - # unique to Windows. def test_handle_inheritance(self): + """Test that launch_server() does not inherit handles. + + launch_server() should not let the adb server inherit + stdin/stdout/stderr handles, which can cause callers of adb.exe to hang. + This test also runs fine on unix even though the impetus is an issue + unique to Windows. + """ # This test takes 5 seconds to run on Windows: if there is no adb server # running on the the port used below, adb kill-server tries to make a # TCP connection to a closed port and that takes 1 second on Windows; @@ -184,29 +194,30 @@ class NonApiTest(unittest.TestCase): try: # Run the adb client and have it start the adb server. - p = subprocess.Popen(['adb', '-P', str(port), 'start-server'], - stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + proc = subprocess.Popen(['adb', '-P', str(port), 'start-server'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) # Start threads that set events when stdout/stderr are closed. stdout_event = threading.Event() stdout_thread = threading.Thread( - target=self._read_pipe_and_set_event, - args=(p.stdout, stdout_event)) + target=ServerTest._read_pipe_and_set_event, + args=(proc.stdout, stdout_event)) stdout_thread.daemon = True stdout_thread.start() stderr_event = threading.Event() stderr_thread = threading.Thread( - target=self._read_pipe_and_set_event, - args=(p.stderr, stderr_event)) + target=ServerTest._read_pipe_and_set_event, + args=(proc.stderr, stderr_event)) stderr_thread.daemon = True stderr_thread.start() # Wait for the adb client to finish. Once that has occurred, if # stdin/stderr/stdout are still open, it must be open in the adb # server. - p.wait() + proc.wait() # Try to write to stdin which we expect is closed. If it isn't # closed, we should get an IOError. If we don't get an IOError, @@ -214,7 +225,7 @@ class NonApiTest(unittest.TestCase): # probably letting the adb server inherit stdin which would be # wrong. with self.assertRaises(IOError): - p.stdin.write('x') + proc.stdin.write('x') # Wait a few seconds for stdout/stderr to be closed (in the success # case, this won't wait at all). If there is a timeout, that means @@ -228,8 +239,12 @@ class NonApiTest(unittest.TestCase): subprocess.check_output(['adb', '-P', str(port), 'kill-server'], stderr=subprocess.STDOUT) - # Use SO_LINGER to cause TCP RST segment to be sent on socket close. + +class EmulatorTest(unittest.TestCase): + """Tests for the emulator connection.""" + def _reset_socket_on_close(self, sock): + """Use SO_LINGER to cause TCP RST segment to be sent on socket close.""" # The linger structure is two shorts on Windows, but two ints on Unix. linger_format = 'hh' if os.name == 'nt' else 'ii' l_onoff = 1 @@ -248,7 +263,7 @@ class NonApiTest(unittest.TestCase): Bug: https://code.google.com/p/android/issues/detail?id=21021 """ with contextlib.closing( - socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as listener: + socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as listener: # Use SO_REUSEADDR so subsequent runs of the test can grab the port # even if it is in TIME_WAIT. listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) @@ -258,7 +273,7 @@ class NonApiTest(unittest.TestCase): # Now that listening has started, start adb emu kill, telling it to # connect to our mock emulator. - p = subprocess.Popen( + proc = subprocess.Popen( ['adb', '-s', 'emulator-' + str(port), 'emu', 'kill'], stderr=subprocess.STDOUT) @@ -267,12 +282,16 @@ class NonApiTest(unittest.TestCase): # If WSAECONNABORTED (10053) is raised by any socket calls, # then adb probably isn't reading the data that we sent it. conn.sendall('Android Console: type \'help\' for a list ' + - 'of commands\r\n') + 'of commands\r\n') conn.sendall('OK\r\n') - with contextlib.closing(conn.makefile()) as f: - self.assertEqual('kill\n', f.readline()) - self.assertEqual('quit\n', f.readline()) + with contextlib.closing(conn.makefile()) as connf: + line = connf.readline() + if line.startswith('auth'): + # Ignore the first auth line. + line = connf.readline() + self.assertEqual('kill\n', line) + self.assertEqual('quit\n', connf.readline()) conn.sendall('OK: killing emulator, bye bye\r\n') @@ -285,11 +304,15 @@ class NonApiTest(unittest.TestCase): self._reset_socket_on_close(conn) # Wait for adb to finish, so we can check return code. - p.communicate() + proc.communicate() # If this fails, adb probably isn't ignoring WSAECONNRESET when # reading the response from the adb emu kill command (on Windows). - self.assertEqual(0, p.returncode) + self.assertEqual(0, proc.returncode) + + +class ConnectionTest(unittest.TestCase): + """Tests for adb connect.""" def test_connect_ipv4_ipv6(self): """Ensure that `adb connect localhost:1234` will try both IPv4 and IPv6. @@ -298,7 +321,7 @@ class NonApiTest(unittest.TestCase): """ for protocol in (socket.AF_INET, socket.AF_INET6): try: - with fake_adb_server(protocol=protocol) as port: + with fake_adbd(protocol=protocol) as port: serial = 'localhost:{}'.format(port) with adb_connect(self, serial): pass @@ -309,7 +332,7 @@ class NonApiTest(unittest.TestCase): def test_already_connected(self): """Ensure that an already-connected device stays connected.""" - with fake_adb_server() as port: + with fake_adbd() as port: serial = 'localhost:{}'.format(port) with adb_connect(self, serial): # b/31250450: this always returns 0 but probably shouldn't. @@ -320,7 +343,7 @@ class NonApiTest(unittest.TestCase): def test_reconnect(self): """Ensure that a disconnected device reconnects.""" - with fake_adb_server() as port: + with fake_adbd() as port: serial = 'localhost:{}'.format(port) with adb_connect(self, serial): output = subprocess.check_output(['adb', '-s', serial, @@ -328,10 +351,10 @@ class NonApiTest(unittest.TestCase): self.assertEqual(output.strip(), 'device') # This will fail. - p = subprocess.Popen(['adb', '-s', serial, 'shell', 'true'], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - output, _ = p.communicate() + proc = subprocess.Popen(['adb', '-s', serial, 'shell', 'true'], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + output, _ = proc.communicate() self.assertEqual(output.strip(), 'error: closed') subprocess.check_call(['adb', '-s', serial, 'wait-for-device']) @@ -349,18 +372,16 @@ class NonApiTest(unittest.TestCase): subprocess.check_output(['adb', '-s', serial, 'get-state'], stderr=subprocess.STDOUT) self.fail('Device should not be available') - except subprocess.CalledProcessError as e: + except subprocess.CalledProcessError as err: self.assertEqual( - e.output.strip(), + err.output.strip(), 'error: device \'{}\' not found'.format(serial)) + def main(): + """Main entrypoint.""" random.seed(0) - if len(adb.get_devices()) > 0: - suite = unittest.TestLoader().loadTestsFromName(__name__) - unittest.TextTestRunner(verbosity=3).run(suite) - else: - print('Test suite must be run with attached devices') + unittest.main(verbosity=3) if __name__ == '__main__': From 32559741699d46820ff9409445a21fdce012e03a Mon Sep 17 00:00:00 2001 From: Luis Hector Chavez Date: Wed, 2 May 2018 10:47:01 -0700 Subject: [PATCH 3/3] adb: Add a test for emulator connection This should prevent regressions in the future. Bug: 78991667 Test: python system/core/adb/test_adb.py Change-Id: I4d6da40da82c6d79797cec82ffaf071d4b56ddc7 --- adb/test_adb.py | 58 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/adb/test_adb.py b/adb/test_adb.py index 8f31a5366..ddd3ff041 100644 --- a/adb/test_adb.py +++ b/adb/test_adb.py @@ -119,6 +119,31 @@ def adb_connect(unittest, serial): stderr=subprocess.PIPE).communicate() +@contextlib.contextmanager +def adb_server(): + """Context manager for an ADB server. + + This creates an ADB server and returns the port it's listening on. + """ + + port = 5038 + # Kill any existing server on this non-default port. + subprocess.check_output(['adb', '-P', str(port), 'kill-server'], + stderr=subprocess.STDOUT) + read_pipe, write_pipe = os.pipe() + proc = subprocess.Popen(['adb', '-L', 'tcp:localhost:{}'.format(port), + 'fork-server', 'server', + '--reply-fd', str(write_pipe)]) + try: + os.close(write_pipe) + greeting = os.read(read_pipe, 1024) + assert greeting == 'OK\n', repr(greeting) + yield port + finally: + proc.terminate() + proc.wait() + + class CommandlineTest(unittest.TestCase): """Tests for the ADB commandline.""" @@ -310,6 +335,39 @@ class EmulatorTest(unittest.TestCase): # reading the response from the adb emu kill command (on Windows). self.assertEqual(0, proc.returncode) + def test_emulator_connect(self): + """Ensure that the emulator can connect. + + Bug: http://b/78991667 + """ + with adb_server() as server_port: + with fake_adbd() as port: + serial = 'emulator-{}'.format(port - 1) + # Ensure that the emulator is not there. + try: + subprocess.check_output(['adb', '-P', str(server_port), + '-s', serial, 'get-state'], + stderr=subprocess.STDOUT) + self.fail('Device should not be available') + except subprocess.CalledProcessError as err: + self.assertEqual( + err.output.strip(), + 'error: device \'{}\' not found'.format(serial)) + + # Let the ADB server know that the emulator has started. + with contextlib.closing( + socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: + sock.connect(('localhost', server_port)) + command = 'host:emulator:{}'.format(port) + sock.sendall('%04x%s' % (len(command), command)) + + # Ensure the emulator is there. + subprocess.check_call(['adb', '-P', str(server_port), + '-s', serial, 'wait-for-device']) + output = subprocess.check_output(['adb', '-P', str(server_port), + '-s', serial, 'get-state']) + self.assertEqual(output.strip(), 'device') + class ConnectionTest(unittest.TestCase): """Tests for adb connect."""