adb: add nonblocking fd Connection.
Implement a nonblocking version of FdConnection. The initial implementation will be somewhat slower than the blocking one for large packet sizes, due to an extra copy when coalescing an IOVector into an apacket, but is still substantially faster for small packets. Test: adb_benchmark Change-Id: I4900c9ddf685d3bd557b8cb43958452ecb23db53
This commit is contained in:
parent
e5aa7ee753
commit
6082e7dafb
|
@ -104,6 +104,7 @@ libadb_srcs = [
|
|||
"socket_spec.cpp",
|
||||
"sysdeps/errno.cpp",
|
||||
"transport.cpp",
|
||||
"transport_fd.cpp",
|
||||
"transport_local.cpp",
|
||||
"transport_usb.cpp",
|
||||
]
|
||||
|
|
|
@ -92,6 +92,8 @@ struct Connection {
|
|||
std::string transport_name_;
|
||||
ReadCallback read_callback_;
|
||||
ErrorCallback error_callback_;
|
||||
|
||||
static std::unique_ptr<Connection> FromFd(unique_fd fd);
|
||||
};
|
||||
|
||||
// Abstraction for a blocking packet transport.
|
||||
|
|
|
@ -24,13 +24,19 @@
|
|||
#include "sysdeps.h"
|
||||
#include "transport.h"
|
||||
|
||||
#define ADB_CONNECTION_BENCHMARK(benchmark_name, ...) \
|
||||
BENCHMARK_TEMPLATE(benchmark_name, FdConnection, ##__VA_ARGS__) \
|
||||
->Arg(1) \
|
||||
->Arg(16384) \
|
||||
->Arg(MAX_PAYLOAD) \
|
||||
#define ADB_CONNECTION_BENCHMARK(benchmark_name, ...) \
|
||||
BENCHMARK_TEMPLATE(benchmark_name, FdConnection, ##__VA_ARGS__) \
|
||||
->Arg(1) \
|
||||
->Arg(16384) \
|
||||
->Arg(MAX_PAYLOAD) \
|
||||
->UseRealTime(); \
|
||||
BENCHMARK_TEMPLATE(benchmark_name, NonblockingFdConnection, ##__VA_ARGS__) \
|
||||
->Arg(1) \
|
||||
->Arg(16384) \
|
||||
->Arg(MAX_PAYLOAD) \
|
||||
->UseRealTime()
|
||||
|
||||
struct NonblockingFdConnection;
|
||||
template <typename ConnectionType>
|
||||
std::unique_ptr<Connection> MakeConnection(unique_fd fd);
|
||||
|
||||
|
@ -40,6 +46,11 @@ std::unique_ptr<Connection> MakeConnection<FdConnection>(unique_fd fd) {
|
|||
return std::make_unique<BlockingConnectionAdapter>(std::move(fd_connection));
|
||||
}
|
||||
|
||||
template <>
|
||||
std::unique_ptr<Connection> MakeConnection<NonblockingFdConnection>(unique_fd fd) {
|
||||
return Connection::FromFd(std::move(fd));
|
||||
}
|
||||
|
||||
template <typename ConnectionType>
|
||||
void BM_Connection_Unidirectional(benchmark::State& state) {
|
||||
int fds[2];
|
||||
|
|
|
@ -0,0 +1,239 @@
|
|||
/*
|
||||
* Copyright (C) 2018 The Android Open Source Project
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
#include <deque>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
#include <android-base/logging.h>
|
||||
#include <android-base/stringprintf.h>
|
||||
#include <android-base/thread_annotations.h>
|
||||
|
||||
#include "adb_unique_fd.h"
|
||||
#include "adb_utils.h"
|
||||
#include "sysdeps.h"
|
||||
#include "sysdeps/memory.h"
|
||||
#include "transport.h"
|
||||
#include "types.h"
|
||||
|
||||
static void CreateWakeFds(unique_fd* read, unique_fd* write) {
|
||||
// TODO: eventfd on linux?
|
||||
int wake_fds[2];
|
||||
int rc = adb_socketpair(wake_fds);
|
||||
set_file_block_mode(wake_fds[0], false);
|
||||
set_file_block_mode(wake_fds[1], false);
|
||||
CHECK_EQ(0, rc);
|
||||
*read = unique_fd(wake_fds[0]);
|
||||
*write = unique_fd(wake_fds[1]);
|
||||
}
|
||||
|
||||
struct NonblockingFdConnection : public Connection {
|
||||
NonblockingFdConnection(unique_fd fd) : started_(false), fd_(std::move(fd)) {
|
||||
set_file_block_mode(fd_.get(), false);
|
||||
CreateWakeFds(&wake_fd_read_, &wake_fd_write_);
|
||||
}
|
||||
|
||||
void SetRunning(bool value) {
|
||||
std::lock_guard<std::mutex> lock(run_mutex_);
|
||||
running_ = value;
|
||||
}
|
||||
|
||||
bool IsRunning() {
|
||||
std::lock_guard<std::mutex> lock(run_mutex_);
|
||||
return running_;
|
||||
}
|
||||
|
||||
void Run(std::string* error) {
|
||||
SetRunning(true);
|
||||
while (IsRunning()) {
|
||||
adb_pollfd pfds[2] = {
|
||||
{.fd = fd_.get(), .events = POLLIN},
|
||||
{.fd = wake_fd_read_.get(), .events = POLLIN},
|
||||
};
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(this->write_mutex_);
|
||||
if (!writable_) {
|
||||
pfds[0].events |= POLLOUT;
|
||||
}
|
||||
}
|
||||
|
||||
int rc = adb_poll(pfds, 2, -1);
|
||||
if (rc == -1) {
|
||||
*error = android::base::StringPrintf("poll failed: %s", strerror(errno));
|
||||
return;
|
||||
} else if (rc == 0) {
|
||||
LOG(FATAL) << "poll timed out with an infinite timeout?";
|
||||
}
|
||||
|
||||
if (pfds[0].revents) {
|
||||
if ((pfds[0].revents & POLLOUT)) {
|
||||
std::lock_guard<std::mutex> lock(this->write_mutex_);
|
||||
WriteResult result = DispatchWrites();
|
||||
switch (result) {
|
||||
case WriteResult::Error:
|
||||
*error = "write failed";
|
||||
return;
|
||||
|
||||
case WriteResult::Completed:
|
||||
writable_ = true;
|
||||
break;
|
||||
|
||||
case WriteResult::TryAgain:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pfds[0].revents & POLLIN) {
|
||||
// TODO: Should we be getting blocks from a free list?
|
||||
auto block = std::make_unique<IOVector::block_type>(MAX_PAYLOAD);
|
||||
rc = adb_read(fd_.get(), &(*block)[0], block->size());
|
||||
if (rc == -1) {
|
||||
*error = std::string("read failed: ") + strerror(errno);
|
||||
return;
|
||||
} else if (rc == 0) {
|
||||
*error = "read failed: EOF";
|
||||
return;
|
||||
}
|
||||
block->resize(rc);
|
||||
read_buffer_.append(std::move(block));
|
||||
|
||||
if (!read_header_ && read_buffer_.size() >= sizeof(amessage)) {
|
||||
auto header_buf = read_buffer_.take_front(sizeof(amessage)).coalesce();
|
||||
CHECK_EQ(sizeof(amessage), header_buf.size());
|
||||
read_header_ = std::make_unique<amessage>();
|
||||
memcpy(read_header_.get(), header_buf.data(), sizeof(amessage));
|
||||
}
|
||||
|
||||
if (read_header_ && read_buffer_.size() >= read_header_->data_length) {
|
||||
auto data_chain = read_buffer_.take_front(read_header_->data_length);
|
||||
|
||||
// TODO: Make apacket carry around a IOVector instead of coalescing.
|
||||
auto payload = data_chain.coalesce<apacket::payload_type>();
|
||||
auto packet = std::make_unique<apacket>();
|
||||
packet->msg = *read_header_;
|
||||
packet->payload = std::move(payload);
|
||||
read_header_ = nullptr;
|
||||
read_callback_(this, std::move(packet));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pfds[1].revents) {
|
||||
uint64_t buf;
|
||||
rc = adb_read(wake_fd_read_.get(), &buf, sizeof(buf));
|
||||
CHECK_EQ(static_cast<int>(sizeof(buf)), rc);
|
||||
|
||||
// We were woken up either to add POLLOUT to our events, or to exit.
|
||||
// Do nothing.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void Start() override final {
|
||||
if (started_.exchange(true)) {
|
||||
LOG(FATAL) << "Connection started multiple times?";
|
||||
}
|
||||
|
||||
thread_ = std::thread([this]() {
|
||||
std::string error = "connection closed";
|
||||
Run(&error);
|
||||
this->error_callback_(this, error);
|
||||
});
|
||||
}
|
||||
|
||||
void Stop() override final {
|
||||
SetRunning(false);
|
||||
WakeThread();
|
||||
thread_.join();
|
||||
}
|
||||
|
||||
void WakeThread() {
|
||||
uint64_t buf = 0;
|
||||
if (TEMP_FAILURE_RETRY(adb_write(wake_fd_write_.get(), &buf, sizeof(buf))) != sizeof(buf)) {
|
||||
LOG(FATAL) << "failed to wake up thread";
|
||||
}
|
||||
}
|
||||
|
||||
enum class WriteResult {
|
||||
Error,
|
||||
Completed,
|
||||
TryAgain,
|
||||
};
|
||||
|
||||
WriteResult DispatchWrites() REQUIRES(write_mutex_) {
|
||||
CHECK(!write_buffer_.empty());
|
||||
if (!writable_) {
|
||||
return WriteResult::TryAgain;
|
||||
}
|
||||
|
||||
auto iovs = write_buffer_.iovecs();
|
||||
ssize_t rc = adb_writev(fd_.get(), iovs.data(), iovs.size());
|
||||
if (rc == -1) {
|
||||
return WriteResult::Error;
|
||||
} else if (rc == 0) {
|
||||
errno = 0;
|
||||
return WriteResult::Error;
|
||||
}
|
||||
|
||||
// TODO: Implement a more efficient drop_front?
|
||||
write_buffer_.take_front(rc);
|
||||
if (write_buffer_.empty()) {
|
||||
return WriteResult::Completed;
|
||||
}
|
||||
|
||||
// There's data left in the range, which means our write returned early.
|
||||
return WriteResult::TryAgain;
|
||||
}
|
||||
|
||||
bool Write(std::unique_ptr<apacket> packet) final {
|
||||
std::lock_guard<std::mutex> lock(write_mutex_);
|
||||
const char* header_begin = reinterpret_cast<const char*>(&packet->msg);
|
||||
const char* header_end = header_begin + sizeof(packet->msg);
|
||||
auto header_block = std::make_unique<IOVector::block_type>(header_begin, header_end);
|
||||
write_buffer_.append(std::move(header_block));
|
||||
if (!packet->payload.empty()) {
|
||||
write_buffer_.append(std::make_unique<IOVector::block_type>(std::move(packet->payload)));
|
||||
}
|
||||
return DispatchWrites() != WriteResult::Error;
|
||||
}
|
||||
|
||||
std::thread thread_;
|
||||
|
||||
std::atomic<bool> started_;
|
||||
std::mutex run_mutex_;
|
||||
bool running_ GUARDED_BY(run_mutex_);
|
||||
|
||||
std::unique_ptr<amessage> read_header_;
|
||||
IOVector read_buffer_;
|
||||
|
||||
unique_fd fd_;
|
||||
unique_fd wake_fd_read_;
|
||||
unique_fd wake_fd_write_;
|
||||
|
||||
std::mutex write_mutex_;
|
||||
bool writable_ GUARDED_BY(write_mutex_) = true;
|
||||
IOVector write_buffer_ GUARDED_BY(write_mutex_);
|
||||
|
||||
IOVector incoming_queue_;
|
||||
};
|
||||
|
||||
std::unique_ptr<Connection> Connection::FromFd(unique_fd fd) {
|
||||
return std::make_unique<NonblockingFdConnection>(std::move(fd));
|
||||
}
|
Loading…
Reference in New Issue