Merge "adb: Add a way to distinguish between connection failures and successes"

am: 65027d9d80

Change-Id: I1c1a7e837c1d67284bec76a9169cd71707845edc
This commit is contained in:
android-build-prod (mdb) 2018-04-27 13:08:00 -07:00 committed by android-build-merger
commit 70d6050321
4 changed files with 111 additions and 8 deletions

View File

@ -132,6 +132,7 @@ void handle_online(atransport *t)
{
D("adb: online");
t->online = 1;
t->SetConnectionEstablished(true);
}
void handle_offline(atransport *t)

View File

@ -49,8 +49,16 @@ def fake_adb_server(protocol=socket.AF_INET, port=0):
# A pipe that is used to signal the thread that it should terminate.
readpipe, writepipe = os.pipe()
def _adb_packet(command, arg0, arg1, data):
bin_command = struct.unpack('I', command)[0]
buf = struct.pack('IIIIII', bin_command, arg0, arg1, len(data), 0,
bin_command ^ 0xffffffff)
buf += data
return buf
def _handle():
rlist = [readpipe, serversock]
cnxn_sent = {}
while True:
ready, _, _ = select.select(rlist, [], [])
for r in ready:
@ -68,7 +76,15 @@ def fake_adb_server(protocol=socket.AF_INET, port=0):
# Client socket
data = r.recv(1024)
if not data:
if r in cnxn_sent:
del cnxn_sent[r]
rlist.remove(r)
continue
if r in cnxn_sent:
continue
cnxn_sent[r] = True
r.sendall(_adb_packet('CNXN', 0x01000001, 1024 * 1024,
'device::ro.product.name=fakeadb'))
port = serversock.getsockname()[1]
server_thread = threading.Thread(target=_handle)

View File

@ -64,6 +64,21 @@ const char* const kFeatureStat2 = "stat_v2";
const char* const kFeatureLibusb = "libusb";
const char* const kFeaturePushSync = "push_sync";
namespace {
// A class that helps the Clang Thread Safety Analysis deal with
// std::unique_lock. Given that std::unique_lock is movable, and the analysis
// can not currently perform alias analysis, it is not annotated. In order to
// assert that the mutex is held, a ScopedAssumeLocked can be created just after
// the std::unique_lock.
class SCOPED_CAPABILITY ScopedAssumeLocked {
public:
ScopedAssumeLocked(std::mutex& mutex) ACQUIRE(mutex) {}
~ScopedAssumeLocked() RELEASE() {}
};
} // namespace
TransportId NextTransportId() {
static std::atomic<TransportId> next(1);
return next++;
@ -77,8 +92,6 @@ BlockingConnectionAdapter::~BlockingConnectionAdapter() {
Stop();
}
static void AssumeLocked(std::mutex& mutex) ASSERT_CAPABILITY(mutex) {}
void BlockingConnectionAdapter::Start() {
std::lock_guard<std::mutex> lock(mutex_);
if (started_) {
@ -103,12 +116,11 @@ void BlockingConnectionAdapter::Start() {
LOG(INFO) << this->transport_name_ << ": write thread spawning";
while (true) {
std::unique_lock<std::mutex> lock(mutex_);
ScopedAssumeLocked assume_locked(mutex_);
cv_.wait(lock, [this]() REQUIRES(mutex_) {
return this->stopped_ || !this->write_queue_.empty();
});
AssumeLocked(mutex_);
if (this->stopped_) {
return;
}
@ -721,6 +733,30 @@ atransport* acquire_one_transport(TransportType type, const char* serial, Transp
return result;
}
bool ConnectionWaitable::WaitForConnection(std::chrono::milliseconds timeout) {
std::unique_lock<std::mutex> lock(mutex_);
ScopedAssumeLocked assume_locked(mutex_);
return cv_.wait_for(lock, timeout, [&]() REQUIRES(mutex_) {
return connection_established_ready_;
}) && connection_established_;
}
void ConnectionWaitable::SetConnectionEstablished(bool success) {
{
std::lock_guard<std::mutex> lock(mutex_);
if (connection_established_ready_) return;
connection_established_ready_ = true;
connection_established_ = success;
D("connection established with %d", success);
}
cv_.notify_one();
}
atransport::~atransport() {
// If the connection callback had not been run before, run it now.
SetConnectionEstablished(false);
}
int atransport::Write(apacket* p) {
return this->connection->Write(std::unique_ptr<apacket>(p)) ? 0 : -1;
}
@ -873,6 +909,10 @@ bool atransport::MatchesTarget(const std::string& target) const {
qual_match(target.c_str(), "device:", device, false);
}
void atransport::SetConnectionEstablished(bool success) {
connection_waitable_->SetConnectionEstablished(success);
}
#if ADB_HOST
// We use newline as our delimiter, make sure to never output it.
@ -992,8 +1032,10 @@ int register_socket_transport(int s, const char* serial, int port, int local) {
lock.unlock();
auto waitable = t->connection_waitable();
register_transport(t);
return 0;
return waitable->WaitForConnection(std::chrono::seconds(10)) ? 0 : -1;
}
#if ADB_HOST

View File

@ -20,6 +20,7 @@
#include <sys/types.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
@ -30,6 +31,7 @@
#include <thread>
#include <unordered_set>
#include <android-base/macros.h>
#include <android-base/thread_annotations.h>
#include <openssl/rsa.h>
@ -160,6 +162,35 @@ struct UsbConnection : public BlockingConnection {
usb_handle* handle_;
};
// Waits for a transport's connection to be not pending. This is a separate
// object so that the transport can be destroyed and another thread can be
// notified of it in a race-free way.
class ConnectionWaitable {
public:
ConnectionWaitable() = default;
~ConnectionWaitable() = default;
// Waits until the first CNXN packet has been received by the owning
// atransport, or the specified timeout has elapsed. Can be called from any
// thread.
//
// Returns true if the CNXN packet was received in a timely fashion, false
// otherwise.
bool WaitForConnection(std::chrono::milliseconds timeout);
// Can be called from any thread when the connection stops being pending.
// Only the first invocation will be acknowledged, the rest will be no-ops.
void SetConnectionEstablished(bool success);
private:
bool connection_established_ GUARDED_BY(mutex_) = false;
bool connection_established_ready_ GUARDED_BY(mutex_) = false;
std::mutex mutex_;
std::condition_variable cv_;
DISALLOW_COPY_AND_ASSIGN(ConnectionWaitable);
};
class atransport {
public:
// TODO(danalbert): We expose waaaaaaay too much stuff because this was
@ -168,13 +199,15 @@ class atransport {
// it's better to do this piece by piece.
atransport(ConnectionState state = kCsOffline)
: id(NextTransportId()), connection_state_(state) {
: id(NextTransportId()),
connection_state_(state),
connection_waitable_(std::make_shared<ConnectionWaitable>()) {
// Initialize protocol to min version for compatibility with older versions.
// Version will be updated post-connect.
protocol_version = A_VERSION_MIN;
max_payload = MAX_PAYLOAD;
}
virtual ~atransport() {}
virtual ~atransport();
int Write(apacket* p);
void Kick();
@ -241,7 +274,14 @@ class atransport {
// This is to make it easier to use the same network target for both fastboot and adb.
bool MatchesTarget(const std::string& target) const;
private:
// Notifies that the atransport is no longer waiting for the connection
// being established.
void SetConnectionEstablished(bool success);
// Gets a shared reference to the ConnectionWaitable.
std::shared_ptr<ConnectionWaitable> connection_waitable() { return connection_waitable_; }
private:
bool kicked_ = false;
// A set of features transmitted in the banner with the initial connection.
@ -258,6 +298,10 @@ private:
std::deque<std::shared_ptr<RSA>> keys_;
#endif
// A sharable object that can be used to wait for the atransport's
// connection to be established.
std::shared_ptr<ConnectionWaitable> connection_waitable_;
DISALLOW_COPY_AND_ASSIGN(atransport);
};