From 672cdfeeff8ad64622a529ed0c4d1b6923df7539 Mon Sep 17 00:00:00 2001
From: Josh Gao
Date: Sat, 21 Mar 2020 16:44:09 -0700
Subject: [PATCH] adb: fix sync.

adbd's file sync service doesn't handle a full socket gracefully,
immediately terminating the service as soon as it fails to write a
response. This would generally be fine if the socket's buffer were as
large as it claims (212992 by default with a 64-bit kernel), but this
buffer size is a giant lie, as each write has 576 bytes of overhead
that's used up in the send buffer. When setting the send buffer size,
the kernel helpfully doubles the value to attempt to account for the
overhead, but when writing 8 byte responses, only 2% of the buffer
actually gets used for responses, so we run out of buffer after 364
files instead of the 26624 that would be expected.

Fix this by processing the responses as they become available, and
calculate a maximum limit to how many sends we dispatch before we stop
and wait for responses to come in.

Test: manually modified adbd to respond with giant error messages, and
      modified adb to not read responses until we choose to block
Change-Id: Ieb8c935662864211e2fd16c337ffed0992990086
---
 adb/client/file_sync_client.cpp | 132 ++++++++++++++++++++++++--------
 1 file changed, 99 insertions(+), 33 deletions(-)

diff --git a/adb/client/file_sync_client.cpp b/adb/client/file_sync_client.cpp
index 04ad5363d..d201239e2 100644
--- a/adb/client/file_sync_client.cpp
+++ b/adb/client/file_sync_client.cpp
@@ -204,7 +204,8 @@ struct TransferLedger {
 
 class SyncConnection {
   public:
-    SyncConnection() {
+    SyncConnection() : acknowledgement_buffer_(sizeof(sync_status) + SYNC_DATA_MAX) {
+        acknowledgement_buffer_.resize(0);
         max = SYNC_DATA_MAX; // TODO: decide at runtime.
std::string error; @@ -502,34 +503,6 @@ class SyncConnection { return WriteOrDie(lpath, rpath, &msg.data, sizeof(msg.data)); } - bool ReadAcknowledgments() { - bool result = true; - while (!deferred_acknowledgements_.empty()) { - auto [from, to] = std::move(deferred_acknowledgements_.front()); - deferred_acknowledgements_.pop_front(); - result &= CopyDone(from, to); - } - return result; - } - - bool CopyDone(const std::string& from, const std::string& to) { - syncmsg msg; - if (!ReadFdExactly(fd, &msg.status, sizeof(msg.status))) { - Error("failed to copy '%s' to '%s': couldn't read from device", from.c_str(), - to.c_str()); - return false; - } - if (msg.status.id == ID_OKAY) { - return true; - } - if (msg.status.id != ID_FAIL) { - Error("failed to copy '%s' to '%s': unknown reason %d", from.c_str(), to.c_str(), - msg.status.id); - return false; - } - return ReportCopyFailure(from, to, msg); - } - bool ReportCopyFailure(const std::string& from, const std::string& to, const syncmsg& msg) { std::vector buf(msg.status.msglen + 1); if (!ReadFdExactly(fd, &buf[0], msg.status.msglen)) { @@ -542,6 +515,97 @@ class SyncConnection { return false; } + void CopyDone() { deferred_acknowledgements_.pop_front(); } + + void ReportDeferredCopyFailure(const std::string& msg) { + auto& [from, to] = deferred_acknowledgements_.front(); + Error("failed to copy '%s' to '%s': remote %s", from.c_str(), to.c_str(), msg.c_str()); + deferred_acknowledgements_.pop_front(); + } + + bool ReadAcknowledgements(bool read_all = false) { + // We need to read enough such that adbd's intermediate socket's write buffer can't be + // full. The default buffer on Linux is 212992 bytes, but there's 576 bytes of bookkeeping + // overhead per write. The worst case scenario is a continuous string of failures, since + // each logical packet is divided into two writes. If our packet size if conservatively 512 + // bytes long, this leaves us with space for 128 responses. 
+ constexpr size_t max_deferred_acks = 128; + auto& buf = acknowledgement_buffer_; + adb_pollfd pfd = {.fd = fd.get(), .events = POLLIN}; + while (!deferred_acknowledgements_.empty()) { + bool should_block = read_all || deferred_acknowledgements_.size() >= max_deferred_acks; + + ssize_t rc = adb_poll(&pfd, 1, should_block ? -1 : 0); + if (rc == 0) { + CHECK(!should_block); + return true; + } + + if (acknowledgement_buffer_.size() < sizeof(sync_status)) { + const ssize_t header_bytes_left = sizeof(sync_status) - buf.size(); + ssize_t rc = adb_read(fd, buf.end(), header_bytes_left); + if (rc <= 0) { + Error("failed to read copy response"); + return false; + } + + buf.resize(buf.size() + rc); + if (rc != header_bytes_left) { + // Early exit if we run out of data in the socket. + return true; + } + + if (!should_block) { + // We don't want to read again yet, because the socket might be empty. + continue; + } + } + + auto* hdr = reinterpret_cast(buf.data()); + if (hdr->id == ID_OKAY) { + buf.resize(0); + if (hdr->msglen != 0) { + Error("received ID_OKAY with msg_len (%" PRIu32 " != 0", hdr->msglen); + return false; + } + CopyDone(); + continue; + } else if (hdr->id != ID_FAIL) { + Error("unexpected response from daemon: id = %#" PRIx32, hdr->id); + return false; + } else if (hdr->msglen > SYNC_DATA_MAX) { + Error("too-long message length from daemon: msglen = %" PRIu32, hdr->msglen); + return false; + } + + const ssize_t msg_bytes_left = hdr->msglen + sizeof(sync_status) - buf.size(); + CHECK_GE(msg_bytes_left, 0); + if (msg_bytes_left > 0) { + ssize_t rc = adb_read(fd, buf.end(), msg_bytes_left); + if (rc <= 0) { + Error("failed to read copy failure message"); + return false; + } + + buf.resize(buf.size() + rc); + if (rc != msg_bytes_left) { + if (should_block) { + continue; + } else { + return true; + } + } + + std::string msg(buf.begin() + sizeof(sync_status), buf.end()); + ReportDeferredCopyFailure(msg); + buf.resize(0); + return false; + } + } + + return true; + } 
+ void Printf(const char* fmt, ...) __attribute__((__format__(__printf__, 2, 3))) { std::string s; @@ -608,6 +672,7 @@ class SyncConnection { private: std::deque> deferred_acknowledgements_; + Block acknowledgement_buffer_; FeatureSet features_; bool have_stat_v2_; bool have_ls_v2_; @@ -716,7 +781,7 @@ static bool sync_send(SyncConnection& sc, const std::string& lpath, const std::s if (!sc.SendSmallFile(rpath, mode, lpath, rpath, mtime, buf, data_length)) { return false; } - return true; + return sc.ReadAcknowledgements(); #endif } @@ -739,7 +804,7 @@ static bool sync_send(SyncConnection& sc, const std::string& lpath, const std::s return false; } } - return true; + return sc.ReadAcknowledgements(); } static bool sync_recv(SyncConnection& sc, const char* rpath, const char* lpath, @@ -966,8 +1031,9 @@ static bool copy_local_dir_remote(SyncConnection& sc, std::string lpath, } sc.RecordFilesSkipped(skipped); + bool success = sc.ReadAcknowledgements(true); sc.ReportTransferRate(lpath, TransferDirection::push); - return true; + return success; } bool do_sync_push(const std::vector& srcs, const char* dst, bool sync) { @@ -1060,7 +1126,7 @@ bool do_sync_push(const std::vector& srcs, const char* dst, bool sy sc.ReportTransferRate(src_path, TransferDirection::push); } - success &= sc.ReadAcknowledgments(); + success &= sc.ReadAcknowledgements(true); sc.ReportOverallTransferRate(TransferDirection::push); return success; }