From c251ec55d3e4ed3647b8631bb09d00a2666d17a9 Mon Sep 17 00:00:00 2001 From: Josh Gao Date: Tue, 3 Apr 2018 12:55:18 -0700 Subject: [PATCH] adb: don't abort when connecting to the same address twice. When connecting to an address, we construct a transport first, and then check whether we've already connected to that address. The consequent destruction of the BlockingConnectionAdapter attempts to join threads that haven't been started, which aborts. Make it safe to destruct a BlockingConnectionAdapter without calling Start on it first, to solve this. Bug: http://b/69137547 Test: nc -l 12345 & (adb connect localhost:12345; adb connect localhost:12345) Test: python test_adb.py Change-Id: I6cb968a62dbac6332907e06575893d764905ee62 --- adb/test_adb.py | 23 +++++++++++++++++--- adb/transport.cpp | 55 ++++++++++++++++++++++++++++++++++++----------- adb/transport.h | 10 +++++---- 3 files changed, 69 insertions(+), 19 deletions(-) diff --git a/adb/test_adb.py b/adb/test_adb.py index e7711062c..3bb433ddd 100644 --- a/adb/test_adb.py +++ b/adb/test_adb.py @@ -162,15 +162,14 @@ class NonApiTest(unittest.TestCase): Bug: https://code.google.com/p/android/issues/detail?id=21021 """ - port = 12345 - with contextlib.closing( socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as listener: # Use SO_REUSEADDR so subsequent runs of the test can grab the port # even if it is in TIME_WAIT. listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - listener.bind(('127.0.0.1', port)) + listener.bind(('127.0.0.1', 0)) listener.listen(4) + port = listener.getsockname()[1] # Now that listening has started, start adb emu kill, telling it to # connect to our mock emulator. @@ -233,6 +232,24 @@ class NonApiTest(unittest.TestCase): output.strip(), 'connected to localhost:{}'.format(port)) s.close() + def test_already_connected(self): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.bind(('127.0.0.1', 0)) + s.listen(2) + + port = s.getsockname()[1] + output = subprocess.check_output( + ['adb', 'connect', 'localhost:{}'.format(port)]) + + self.assertEqual( + output.strip(), 'connected to localhost:{}'.format(port)) + + # b/31250450: this always returns 0 but probably shouldn't. + output = subprocess.check_output( + ['adb', 'connect', 'localhost:{}'.format(port)]) + + self.assertEqual( + output.strip(), 'already connected to localhost:{}'.format(port)) def main(): random.seed(0) diff --git a/adb/transport.cpp b/adb/transport.cpp index 37b56e28f..2867d3837 100644 --- a/adb/transport.cpp +++ b/adb/transport.cpp @@ -77,7 +77,15 @@ BlockingConnectionAdapter::~BlockingConnectionAdapter() { Stop(); } +static void AssumeLocked(std::mutex& mutex) ASSERT_CAPABILITY(mutex) {} + void BlockingConnectionAdapter::Start() { + std::lock_guard lock(mutex_); + if (started_) { + LOG(FATAL) << "BlockingConnectionAdapter(" << this->transport_name_ + << "): started multiple times"; + } + read_thread_ = std::thread([this]() { LOG(INFO) << this->transport_name_ << ": read thread spawning"; while (true) { @@ -95,7 +103,11 @@ void BlockingConnectionAdapter::Start() { LOG(INFO) << this->transport_name_ << ": write thread spawning"; while (true) { std::unique_lock lock(mutex_); - cv_.wait(lock, [this]() { return this->stopped_ || !this->write_queue_.empty(); }); + cv_.wait(lock, [this]() REQUIRES(mutex_) { + return this->stopped_ || !this->write_queue_.empty(); + }); + + AssumeLocked(mutex_); if (this->stopped_) { return; @@ -111,25 +123,44 @@ void BlockingConnectionAdapter::Start() { } std::call_once(this->error_flag_, [this]() { this->error_callback_(this, "write failed"); }); }); + + started_ = true; } void BlockingConnectionAdapter::Stop() { - std::unique_lock lock(mutex_); - if (stopped_) { - LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): already stopped"; - return; - } + { + std::lock_guard lock(mutex_); + if (!started_) { + LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): not started"; + return; + } - stopped_ = true; - lock.unlock(); + if (stopped_) { + LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ + << "): already stopped"; + return; + } + + stopped_ = true; + } LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): stopping"; this->underlying_->Close(); - this->cv_.notify_one(); - read_thread_.join(); - write_thread_.join(); + + // Move the threads out into locals with the lock taken, and then unlock to let them exit. + std::thread read_thread; + std::thread write_thread; + + { + std::lock_guard lock(mutex_); + read_thread = std::move(read_thread_); + write_thread = std::move(write_thread_); + } + + read_thread.join(); + write_thread.join(); LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): stopped"; std::call_once(this->error_flag_, [this]() { this->error_callback_(this, "requested stop"); }); @@ -137,7 +168,7 @@ void BlockingConnectionAdapter::Stop() { bool BlockingConnectionAdapter::Write(std::unique_ptr packet) { { - std::unique_lock lock(this->mutex_); + std::lock_guard lock(this->mutex_); write_queue_.emplace_back(std::move(packet)); } diff --git a/adb/transport.h b/adb/transport.h index 8b71e3405..d18c36232 100644 --- a/adb/transport.h +++ b/adb/transport.h @@ -30,6 +30,7 @@ #include #include +#include #include #include "adb.h" @@ -121,13 +122,14 @@ struct BlockingConnectionAdapter : public Connection { virtual void Start() override final; virtual void Stop() override final; - bool stopped_ = false; + bool started_ GUARDED_BY(mutex_) = false; + bool stopped_ GUARDED_BY(mutex_) = false; std::unique_ptr underlying_; - std::thread read_thread_; - std::thread write_thread_; + std::thread read_thread_ GUARDED_BY(mutex_); + std::thread write_thread_ GUARDED_BY(mutex_); - std::deque> write_queue_; + std::deque> write_queue_ GUARDED_BY(mutex_); std::mutex mutex_; std::condition_variable cv_;