diff --git a/adb/test_adb.py b/adb/test_adb.py index e7711062c..3bb433ddd 100644 --- a/adb/test_adb.py +++ b/adb/test_adb.py @@ -162,15 +162,14 @@ class NonApiTest(unittest.TestCase): Bug: https://code.google.com/p/android/issues/detail?id=21021 """ - port = 12345 - with contextlib.closing( socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as listener: # Use SO_REUSEADDR so subsequent runs of the test can grab the port # even if it is in TIME_WAIT. listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - listener.bind(('127.0.0.1', port)) + listener.bind(('127.0.0.1', 0)) listener.listen(4) + port = listener.getsockname()[1] # Now that listening has started, start adb emu kill, telling it to # connect to our mock emulator. @@ -233,6 +232,24 @@ class NonApiTest(unittest.TestCase): output.strip(), 'connected to localhost:{}'.format(port)) s.close() + def test_already_connected(self): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.bind(('127.0.0.1', 0)) + s.listen(2) + + port = s.getsockname()[1] + output = subprocess.check_output( + ['adb', 'connect', 'localhost:{}'.format(port)]) + + self.assertEqual( + output.strip(), 'connected to localhost:{}'.format(port)) + + # b/31250450: this always returns 0 but probably shouldn't. + output = subprocess.check_output( + ['adb', 'connect', 'localhost:{}'.format(port)]) + + self.assertEqual( + output.strip(), 'already connected to localhost:{}'.format(port)) def main(): random.seed(0) diff --git a/adb/transport.cpp b/adb/transport.cpp index 37b56e28f..2867d3837 100644 --- a/adb/transport.cpp +++ b/adb/transport.cpp @@ -77,7 +77,15 @@ BlockingConnectionAdapter::~BlockingConnectionAdapter() { Stop(); } +static void AssumeLocked(std::mutex& mutex) ASSERT_CAPABILITY(mutex) {} + void BlockingConnectionAdapter::Start() { + std::lock_guard lock(mutex_); + if (started_) { + LOG(FATAL) << "BlockingConnectionAdapter(" << this->transport_name_ + << "): started multiple times"; + } + read_thread_ = std::thread([this]() { LOG(INFO) << this->transport_name_ << ": read thread spawning"; while (true) { @@ -95,7 +103,11 @@ void BlockingConnectionAdapter::Start() { LOG(INFO) << this->transport_name_ << ": write thread spawning"; while (true) { std::unique_lock lock(mutex_); - cv_.wait(lock, [this]() { return this->stopped_ || !this->write_queue_.empty(); }); + cv_.wait(lock, [this]() REQUIRES(mutex_) { + return this->stopped_ || !this->write_queue_.empty(); + }); + + AssumeLocked(mutex_); if (this->stopped_) { return; @@ -111,25 +123,44 @@ void BlockingConnectionAdapter::Start() { } std::call_once(this->error_flag_, [this]() { this->error_callback_(this, "write failed"); }); }); + + started_ = true; } void BlockingConnectionAdapter::Stop() { - std::unique_lock lock(mutex_); - if (stopped_) { - LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): already stopped"; - return; - } + { + std::lock_guard lock(mutex_); + if (!started_) { + LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): not started"; + return; + } - stopped_ = true; - lock.unlock(); + if (stopped_) { + LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ + << "): already stopped"; + return; + } + + stopped_ = true; + } LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): stopping"; this->underlying_->Close(); - this->cv_.notify_one(); - read_thread_.join(); - write_thread_.join(); + + // Move the threads out into locals with the lock taken, and then unlock to let them exit. + std::thread read_thread; + std::thread write_thread; + + { + std::lock_guard lock(mutex_); + read_thread = std::move(read_thread_); + write_thread = std::move(write_thread_); + } + + read_thread.join(); + write_thread.join(); LOG(INFO) << "BlockingConnectionAdapter(" << this->transport_name_ << "): stopped"; std::call_once(this->error_flag_, [this]() { this->error_callback_(this, "requested stop"); }); @@ -137,7 +168,7 @@ void BlockingConnectionAdapter::Stop() { bool BlockingConnectionAdapter::Write(std::unique_ptr packet) { { - std::unique_lock lock(this->mutex_); + std::lock_guard lock(this->mutex_); write_queue_.emplace_back(std::move(packet)); } diff --git a/adb/transport.h b/adb/transport.h index 8b71e3405..d18c36232 100644 --- a/adb/transport.h +++ b/adb/transport.h @@ -30,6 +30,7 @@ #include #include +#include #include #include "adb.h" @@ -121,13 +122,14 @@ struct BlockingConnectionAdapter : public Connection { virtual void Start() override final; virtual void Stop() override final; - bool stopped_ = false; + bool started_ GUARDED_BY(mutex_) = false; + bool stopped_ GUARDED_BY(mutex_) = false; std::unique_ptr underlying_; - std::thread read_thread_; - std::thread write_thread_; + std::thread read_thread_ GUARDED_BY(mutex_); + std::thread write_thread_ GUARDED_BY(mutex_); - std::deque> write_queue_; + std::deque> write_queue_ GUARDED_BY(mutex_); std::mutex mutex_; std::condition_variable cv_;