Merge changes I6541bb13,I486055bb,Id6ac1c54,I16cf7d44

* changes:
  adb: make fdevent_test, socket_test compile on Windows.
  adb: add fd exhaustion test, fix errno reporting in sysdeps_win32.
  adb: move win32 fd base to 2048, fix fd allocation.
  adb: don't emulate fdevent or socketpair on Windows.
This commit is contained in:
Josh Gao 2016-02-20 01:41:09 +00:00 committed by Gerrit Code Review
commit 4a9084228a
10 changed files with 585 additions and 1668 deletions

View File

@ -50,6 +50,7 @@ LIBADB_SRC_FILES := \
adb_listeners.cpp \
adb_trace.cpp \
adb_utils.cpp \
fdevent.cpp \
sockets.cpp \
transport.cpp \
transport_local.cpp \
@ -58,6 +59,8 @@ LIBADB_SRC_FILES := \
LIBADB_TEST_SRCS := \
adb_io_test.cpp \
adb_utils_test.cpp \
fdevent_test.cpp \
socket_test.cpp \
sysdeps_test.cpp \
transport_test.cpp \
@ -75,12 +78,10 @@ LIBADB_windows_CFLAGS := \
$(ADB_COMMON_windows_CFLAGS) \
LIBADB_darwin_SRC_FILES := \
fdevent.cpp \
get_my_path_darwin.cpp \
usb_osx.cpp \
LIBADB_linux_SRC_FILES := \
fdevent.cpp \
get_my_path_linux.cpp \
usb_linux.cpp \
@ -88,14 +89,6 @@ LIBADB_windows_SRC_FILES := \
sysdeps_win32.cpp \
usb_windows.cpp \
LIBADB_TEST_linux_SRCS := \
fdevent_test.cpp \
socket_test.cpp \
LIBADB_TEST_darwin_SRCS := \
fdevent_test.cpp \
socket_test.cpp \
LIBADB_TEST_windows_SRCS := \
sysdeps_win32_test.cpp \

View File

@ -213,6 +213,7 @@ std::string perror_str(const char* msg) {
}
#if !defined(_WIN32)
// Windows version provided in sysdeps_win32.cpp
bool set_file_block_mode(int fd, bool block) {
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) {

View File

@ -21,12 +21,11 @@
#include "fdevent.h"
#include <fcntl.h>
#include <poll.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <atomic>
#include <list>
#include <unordered_map>
#include <vector>
@ -54,7 +53,7 @@ int SHELL_EXIT_NOTIFY_FD = -1;
struct PollNode {
fdevent* fde;
::pollfd pollfd;
adb_pollfd pollfd;
PollNode(fdevent* fde) : fde(fde) {
memset(&pollfd, 0, sizeof(pollfd));
@ -72,18 +71,19 @@ struct PollNode {
// That's why we don't need a lock for fdevent.
static auto& g_poll_node_map = *new std::unordered_map<int, PollNode>();
static auto& g_pending_list = *new std::list<fdevent*>();
static std::atomic<bool> terminate_loop(false);
static bool main_thread_valid;
static pthread_t main_thread;
static unsigned long main_thread_id;
static void check_main_thread() {
if (main_thread_valid) {
CHECK_NE(0, pthread_equal(main_thread, pthread_self()));
CHECK_EQ(main_thread_id, adb_thread_id());
}
}
static void set_main_thread() {
main_thread_valid = true;
main_thread = pthread_self();
main_thread_id = adb_thread_id();
}
static std::string dump_fde(const fdevent* fde) {
@ -217,7 +217,7 @@ void fdevent_del(fdevent* fde, unsigned events) {
fdevent_set(fde, (fde->state & FDE_EVENTMASK) & ~events);
}
static std::string dump_pollfds(const std::vector<pollfd>& pollfds) {
static std::string dump_pollfds(const std::vector<adb_pollfd>& pollfds) {
std::string result;
for (const auto& pollfd : pollfds) {
std::string op;
@ -233,13 +233,13 @@ static std::string dump_pollfds(const std::vector<pollfd>& pollfds) {
}
static void fdevent_process() {
std::vector<pollfd> pollfds;
std::vector<adb_pollfd> pollfds;
for (const auto& pair : g_poll_node_map) {
pollfds.push_back(pair.second.pollfd);
}
CHECK_GT(pollfds.size(), 0u);
D("poll(), pollfds = %s", dump_pollfds(pollfds).c_str());
int ret = TEMP_FAILURE_RETRY(poll(&pollfds[0], pollfds.size(), -1));
int ret = adb_poll(&pollfds[0], pollfds.size(), -1);
if (ret == -1) {
PLOG(ERROR) << "poll(), ret = " << ret;
return;
@ -289,6 +289,9 @@ static void fdevent_call_fdfunc(fdevent* fde)
}
#if !ADB_HOST
#include <sys/ioctl.h>
static void fdevent_subproc_event_func(int fd, unsigned ev,
void* /* userdata */)
{
@ -363,6 +366,10 @@ void fdevent_loop()
#endif // !ADB_HOST
while (true) {
if (terminate_loop) {
return;
}
D("--- --- waiting for events");
fdevent_process();
@ -375,6 +382,10 @@ void fdevent_loop()
}
}
void fdevent_terminate_loop() {
terminate_loop = true;
}
size_t fdevent_installed_count() {
return g_poll_node_map.size();
}
@ -383,4 +394,5 @@ void fdevent_reset() {
g_poll_node_map.clear();
g_pending_list.clear();
main_thread_valid = false;
terminate_loop = false;
}

View File

@ -76,9 +76,9 @@ void fdevent_set_timeout(fdevent *fde, int64_t timeout_ms);
*/
void fdevent_loop();
// For debugging only.
// The following functions are used only for tests.
void fdevent_terminate_loop();
size_t fdevent_installed_count();
// For debugging only.
void fdevent_reset();
#endif

View File

@ -18,15 +18,13 @@
#include <gtest/gtest.h>
#include <pthread.h>
#include <signal.h>
#include <limits>
#include <queue>
#include <string>
#include <vector>
#include "adb_io.h"
#include "fdevent_test.h"
class FdHandler {
public:
@ -48,7 +46,7 @@ class FdHandler {
if (events & FDE_READ) {
ASSERT_EQ(fd, handler->read_fd_);
char c;
ASSERT_EQ(1, read(fd, &c, 1));
ASSERT_EQ(1, adb_read(fd, &c, 1));
handler->queue_.push(c);
fdevent_add(&handler->write_fde_, FDE_WRITE);
}
@ -57,7 +55,7 @@ class FdHandler {
ASSERT_FALSE(handler->queue_.empty());
char c = handler->queue_.front();
handler->queue_.pop();
ASSERT_EQ(1, write(fd, &c, 1));
ASSERT_EQ(1, adb_write(fd, &c, 1));
if (handler->queue_.empty()) {
fdevent_del(&handler->write_fde_, FDE_WRITE);
}
@ -72,29 +70,19 @@ class FdHandler {
std::queue<char> queue_;
};
static void signal_handler(int) {
pthread_exit(nullptr);
}
class FdeventTest : public ::testing::Test {
protected:
static void SetUpTestCase() {
ASSERT_NE(SIG_ERR, signal(SIGUSR1, signal_handler));
ASSERT_NE(SIG_ERR, signal(SIGPIPE, SIG_IGN));
}
virtual void SetUp() {
fdevent_reset();
ASSERT_EQ(0u, fdevent_installed_count());
}
};
struct ThreadArg {
int first_read_fd;
int last_write_fd;
size_t middle_pipe_count;
};
TEST_F(FdeventTest, fdevent_terminate) {
adb_thread_t thread;
PrepareThread();
ASSERT_TRUE(adb_thread_create([](void*) { fdevent_loop(); }, nullptr, &thread));
TerminateThread(thread);
}
static void FdEventThreadFunc(ThreadArg* arg) {
std::vector<int> read_fds;
std::vector<int> write_fds;
@ -102,7 +90,7 @@ static void FdEventThreadFunc(ThreadArg* arg) {
read_fds.push_back(arg->first_read_fd);
for (size_t i = 0; i < arg->middle_pipe_count; ++i) {
int fds[2];
ASSERT_EQ(0, pipe(fds));
ASSERT_EQ(0, adb_socketpair(fds));
read_fds.push_back(fds[0]);
write_fds.push_back(fds[1]);
}
@ -122,9 +110,9 @@ TEST_F(FdeventTest, smoke) {
const std::string MESSAGE = "fdevent_test";
int fd_pair1[2];
int fd_pair2[2];
ASSERT_EQ(0, pipe(fd_pair1));
ASSERT_EQ(0, pipe(fd_pair2));
pthread_t thread;
ASSERT_EQ(0, adb_socketpair(fd_pair1));
ASSERT_EQ(0, adb_socketpair(fd_pair2));
adb_thread_t thread;
ThreadArg thread_arg;
thread_arg.first_read_fd = fd_pair1[0];
thread_arg.last_write_fd = fd_pair2[1];
@ -132,9 +120,9 @@ TEST_F(FdeventTest, smoke) {
int writer = fd_pair1[1];
int reader = fd_pair2[0];
ASSERT_EQ(0, pthread_create(&thread, nullptr,
reinterpret_cast<void* (*)(void*)>(FdEventThreadFunc),
&thread_arg));
PrepareThread();
ASSERT_TRUE(adb_thread_create(reinterpret_cast<void (*)(void*)>(FdEventThreadFunc), &thread_arg,
&thread));
for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) {
std::string read_buffer = MESSAGE;
@ -144,10 +132,9 @@ TEST_F(FdeventTest, smoke) {
ASSERT_EQ(read_buffer, write_buffer);
}
ASSERT_EQ(0, pthread_kill(thread, SIGUSR1));
ASSERT_EQ(0, pthread_join(thread, nullptr));
ASSERT_EQ(0, close(writer));
ASSERT_EQ(0, close(reader));
TerminateThread(thread);
ASSERT_EQ(0, adb_close(writer));
ASSERT_EQ(0, adb_close(reader));
}
struct InvalidFdArg {
@ -161,7 +148,7 @@ static void InvalidFdEventCallback(int fd, unsigned events, void* userdata) {
ASSERT_EQ(arg->expected_events, events);
fdevent_remove(&arg->fde);
if (++*(arg->happened_event_count) == 2) {
pthread_exit(nullptr);
fdevent_terminate_loop();
}
}
@ -184,9 +171,7 @@ static void InvalidFdThreadFunc(void*) {
}
TEST_F(FdeventTest, invalid_fd) {
pthread_t thread;
ASSERT_EQ(0, pthread_create(&thread, nullptr,
reinterpret_cast<void* (*)(void*)>(InvalidFdThreadFunc),
nullptr));
ASSERT_EQ(0, pthread_join(thread, nullptr));
adb_thread_t thread;
ASSERT_TRUE(adb_thread_create(InvalidFdThreadFunc, nullptr, &thread));
ASSERT_TRUE(adb_thread_join(thread));
}

58
adb/fdevent_test.h Normal file
View File

@ -0,0 +1,58 @@
/*
* Copyright (C) 2016 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gtest/gtest.h>
#include "socket.h"
#include "sysdeps.h"
class FdeventTest : public ::testing::Test {
protected:
int dummy = -1;
static void SetUpTestCase() {
#if !defined(_WIN32)
ASSERT_NE(SIG_ERR, signal(SIGPIPE, SIG_IGN));
#endif
}
void SetUp() override {
fdevent_reset();
ASSERT_EQ(0u, fdevent_installed_count());
}
// Register a dummy socket used to wake up the fdevent loop to tell it to die.
void PrepareThread() {
int dummy_fds[2];
if (adb_socketpair(dummy_fds) != 0) {
FAIL() << "failed to create socketpair: " << strerror(errno);
}
asocket* dummy_socket = create_local_socket(dummy_fds[1]);
if (!dummy_socket) {
FAIL() << "failed to create local socket: " << strerror(errno);
}
dummy_socket->ready(dummy_socket);
dummy = dummy_fds[0];
}
void TerminateThread(adb_thread_t thread) {
fdevent_terminate_loop();
ASSERT_TRUE(WriteFdExactly(dummy, "", 1));
ASSERT_TRUE(adb_thread_join(thread));
ASSERT_EQ(0, adb_close(dummy));
}
};

View File

@ -18,119 +18,89 @@
#include <gtest/gtest.h>
#include <array>
#include <limits>
#include <queue>
#include <string>
#include <vector>
#include <pthread.h>
#include <signal.h>
#include <unistd.h>
#include "adb.h"
#include "adb_io.h"
#include "fdevent_test.h"
#include "socket.h"
#include "sysdeps.h"
static void signal_handler(int) {
ASSERT_EQ(1u, fdevent_installed_count());
pthread_exit(nullptr);
}
// On host, register a dummy socket, so fdevet_loop() will not abort when previously
// registered local sockets are all closed. On device, fdevent_subproc_setup() installs
// one fdevent which can be considered as dummy socket.
static void InstallDummySocket() {
#if ADB_HOST
int dummy_fds[2];
ASSERT_EQ(0, pipe(dummy_fds));
asocket* dummy_socket = create_local_socket(dummy_fds[0]);
ASSERT_TRUE(dummy_socket != nullptr);
dummy_socket->ready(dummy_socket);
#endif
}
struct ThreadArg {
int first_read_fd;
int last_write_fd;
size_t middle_pipe_count;
};
static void FdEventThreadFunc(ThreadArg* arg) {
std::vector<int> read_fds;
std::vector<int> write_fds;
class LocalSocketTest : public FdeventTest {};
read_fds.push_back(arg->first_read_fd);
for (size_t i = 0; i < arg->middle_pipe_count; ++i) {
int fds[2];
ASSERT_EQ(0, adb_socketpair(fds));
read_fds.push_back(fds[0]);
write_fds.push_back(fds[1]);
}
write_fds.push_back(arg->last_write_fd);
for (size_t i = 0; i < read_fds.size(); ++i) {
asocket* reader = create_local_socket(read_fds[i]);
ASSERT_TRUE(reader != nullptr);
asocket* writer = create_local_socket(write_fds[i]);
ASSERT_TRUE(writer != nullptr);
reader->peer = writer;
writer->peer = reader;
reader->ready(reader);
}
InstallDummySocket();
static void FdEventThreadFunc(void*) {
fdevent_loop();
}
class LocalSocketTest : public ::testing::Test {
protected:
static void SetUpTestCase() {
ASSERT_NE(SIG_ERR, signal(SIGUSR1, signal_handler));
ASSERT_NE(SIG_ERR, signal(SIGPIPE, SIG_IGN));
}
virtual void SetUp() {
fdevent_reset();
ASSERT_EQ(0u, fdevent_installed_count());
}
};
TEST_F(LocalSocketTest, smoke) {
const size_t PIPE_COUNT = 100;
const size_t MESSAGE_LOOP_COUNT = 100;
// Join two socketpairs with a chain of intermediate socketpairs.
int first[2];
std::vector<std::array<int, 2>> intermediates;
int last[2];
constexpr size_t INTERMEDIATE_COUNT = 50;
constexpr size_t MESSAGE_LOOP_COUNT = 100;
const std::string MESSAGE = "socket_test";
int fd_pair1[2];
int fd_pair2[2];
ASSERT_EQ(0, adb_socketpair(fd_pair1));
ASSERT_EQ(0, adb_socketpair(fd_pair2));
pthread_t thread;
ThreadArg thread_arg;
thread_arg.first_read_fd = fd_pair1[0];
thread_arg.last_write_fd = fd_pair2[1];
thread_arg.middle_pipe_count = PIPE_COUNT;
int writer = fd_pair1[1];
int reader = fd_pair2[0];
ASSERT_EQ(0, pthread_create(&thread, nullptr,
reinterpret_cast<void* (*)(void*)>(FdEventThreadFunc),
&thread_arg));
intermediates.resize(INTERMEDIATE_COUNT);
ASSERT_EQ(0, adb_socketpair(first)) << strerror(errno);
ASSERT_EQ(0, adb_socketpair(last)) << strerror(errno);
asocket* prev_tail = create_local_socket(first[1]);
ASSERT_NE(nullptr, prev_tail);
auto connect = [](asocket* tail, asocket* head) {
tail->peer = head;
head->peer = tail;
tail->ready(tail);
};
for (auto& intermediate : intermediates) {
ASSERT_EQ(0, adb_socketpair(intermediate.data())) << strerror(errno);
asocket* head = create_local_socket(intermediate[0]);
ASSERT_NE(nullptr, head);
asocket* tail = create_local_socket(intermediate[1]);
ASSERT_NE(nullptr, tail);
connect(prev_tail, head);
prev_tail = tail;
}
asocket* end = create_local_socket(last[0]);
ASSERT_NE(nullptr, end);
connect(prev_tail, end);
PrepareThread();
adb_thread_t thread;
ASSERT_TRUE(adb_thread_create(FdEventThreadFunc, nullptr, &thread));
usleep(1000);
for (size_t i = 0; i < MESSAGE_LOOP_COUNT; ++i) {
std::string read_buffer = MESSAGE;
std::string write_buffer(MESSAGE.size(), 'a');
ASSERT_TRUE(WriteFdExactly(writer, read_buffer.c_str(), read_buffer.size()));
ASSERT_TRUE(ReadFdExactly(reader, &write_buffer[0], write_buffer.size()));
ASSERT_TRUE(WriteFdExactly(first[0], &read_buffer[0], read_buffer.size()));
ASSERT_TRUE(ReadFdExactly(last[1], &write_buffer[0], write_buffer.size()));
ASSERT_EQ(read_buffer, write_buffer);
}
ASSERT_EQ(0, adb_close(writer));
ASSERT_EQ(0, adb_close(reader));
// Wait until the local sockets are closed.
sleep(1);
ASSERT_EQ(0, pthread_kill(thread, SIGUSR1));
ASSERT_EQ(0, pthread_join(thread, nullptr));
ASSERT_EQ(0, adb_close(first[0]));
ASSERT_EQ(0, adb_close(last[1]));
// Wait until the local sockets are closed.
adb_sleep_ms(100);
TerminateThread(thread);
}
struct CloseWithPacketArg {
@ -160,7 +130,6 @@ static void CloseWithPacketThreadFunc(CloseWithPacketArg* arg) {
s->peer = cause_close_s;
cause_close_s->ready(cause_close_s);
InstallDummySocket();
fdevent_loop();
}
@ -176,21 +145,19 @@ TEST_F(LocalSocketTest, close_socket_with_packet) {
CloseWithPacketArg arg;
arg.socket_fd = socket_fd[1];
arg.cause_close_fd = cause_close_fd[1];
pthread_t thread;
ASSERT_EQ(0, pthread_create(&thread, nullptr,
reinterpret_cast<void* (*)(void*)>(CloseWithPacketThreadFunc),
&arg));
// Wait until the fdevent_loop() starts.
sleep(1);
ASSERT_EQ(0, adb_close(cause_close_fd[0]));
sleep(1);
ASSERT_EQ(2u, fdevent_installed_count());
ASSERT_EQ(0, adb_close(socket_fd[0]));
// Wait until the socket is closed.
sleep(1);
ASSERT_EQ(0, pthread_kill(thread, SIGUSR1));
ASSERT_EQ(0, pthread_join(thread, nullptr));
PrepareThread();
adb_thread_t thread;
ASSERT_TRUE(adb_thread_create(reinterpret_cast<void (*)(void*)>(CloseWithPacketThreadFunc),
&arg, &thread));
// Wait until the fdevent_loop() starts.
adb_sleep_ms(100);
ASSERT_EQ(0, adb_close(cause_close_fd[0]));
adb_sleep_ms(100);
EXPECT_EQ(2u, fdevent_installed_count());
ASSERT_EQ(0, adb_close(socket_fd[0]));
TerminateThread(thread);
}
// This test checks if we can read packets from a closing local socket.
@ -203,26 +170,23 @@ TEST_F(LocalSocketTest, read_from_closing_socket) {
arg.socket_fd = socket_fd[1];
arg.cause_close_fd = cause_close_fd[1];
pthread_t thread;
ASSERT_EQ(0, pthread_create(&thread, nullptr,
reinterpret_cast<void* (*)(void*)>(CloseWithPacketThreadFunc),
&arg));
PrepareThread();
adb_thread_t thread;
ASSERT_TRUE(adb_thread_create(reinterpret_cast<void (*)(void*)>(CloseWithPacketThreadFunc),
&arg, &thread));
// Wait until the fdevent_loop() starts.
sleep(1);
adb_sleep_ms(100);
ASSERT_EQ(0, adb_close(cause_close_fd[0]));
sleep(1);
ASSERT_EQ(2u, fdevent_installed_count());
adb_sleep_ms(100);
EXPECT_EQ(2u, fdevent_installed_count());
// Verify if we can read successfully.
std::vector<char> buf(arg.bytes_written);
ASSERT_NE(0u, arg.bytes_written);
ASSERT_EQ(true, ReadFdExactly(socket_fd[0], buf.data(), buf.size()));
ASSERT_EQ(0, adb_close(socket_fd[0]));
// Wait until the socket is closed.
sleep(1);
ASSERT_EQ(0, pthread_kill(thread, SIGUSR1));
ASSERT_EQ(0, pthread_join(thread, nullptr));
TerminateThread(thread);
}
// This test checks if we can close local socket in the following situation:
@ -238,20 +202,17 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) {
arg.socket_fd = socket_fd[1];
arg.cause_close_fd = cause_close_fd[1];
pthread_t thread;
ASSERT_EQ(0, pthread_create(&thread, nullptr,
reinterpret_cast<void* (*)(void*)>(CloseWithPacketThreadFunc),
&arg));
PrepareThread();
adb_thread_t thread;
ASSERT_TRUE(adb_thread_create(reinterpret_cast<void (*)(void*)>(CloseWithPacketThreadFunc),
&arg, &thread));
// Wait until the fdevent_loop() starts.
sleep(1);
ASSERT_EQ(3u, fdevent_installed_count());
adb_sleep_ms(100);
EXPECT_EQ(3u, fdevent_installed_count());
ASSERT_EQ(0, adb_close(socket_fd[0]));
// Wait until the socket is closed.
sleep(1);
ASSERT_EQ(0, pthread_kill(thread, SIGUSR1));
ASSERT_EQ(0, pthread_join(thread, nullptr));
TerminateThread(thread);
}
#if defined(__linux__)
@ -260,50 +221,52 @@ static void ClientThreadFunc() {
std::string error;
int fd = network_loopback_client(5038, SOCK_STREAM, &error);
ASSERT_GE(fd, 0) << error;
sleep(2);
adb_sleep_ms(200);
ASSERT_EQ(0, adb_close(fd));
}
struct CloseRdHupSocketArg {
int socket_fd;
int socket_fd;
};
static void CloseRdHupSocketThreadFunc(CloseRdHupSocketArg* arg) {
asocket* s = create_local_socket(arg->socket_fd);
ASSERT_TRUE(s != nullptr);
asocket* s = create_local_socket(arg->socket_fd);
ASSERT_TRUE(s != nullptr);
InstallDummySocket();
fdevent_loop();
fdevent_loop();
}
// This test checks if we can close sockets in CLOSE_WAIT state.
TEST_F(LocalSocketTest, close_socket_in_CLOSE_WAIT_state) {
std::string error;
int listen_fd = network_inaddr_any_server(5038, SOCK_STREAM, &error);
ASSERT_GE(listen_fd, 0);
pthread_t client_thread;
ASSERT_EQ(0, pthread_create(&client_thread, nullptr,
reinterpret_cast<void* (*)(void*)>(ClientThreadFunc), nullptr));
std::string error;
int listen_fd = network_inaddr_any_server(5038, SOCK_STREAM, &error);
ASSERT_GE(listen_fd, 0);
struct sockaddr addr;
socklen_t alen;
alen = sizeof(addr);
int accept_fd = adb_socket_accept(listen_fd, &addr, &alen);
ASSERT_GE(accept_fd, 0);
CloseRdHupSocketArg arg;
arg.socket_fd = accept_fd;
pthread_t thread;
ASSERT_EQ(0, pthread_create(&thread, nullptr,
reinterpret_cast<void* (*)(void*)>(CloseRdHupSocketThreadFunc),
&arg));
// Wait until the fdevent_loop() starts.
sleep(1);
ASSERT_EQ(2u, fdevent_installed_count());
// Wait until the client closes its socket.
ASSERT_EQ(0, pthread_join(client_thread, nullptr));
sleep(2);
ASSERT_EQ(0, pthread_kill(thread, SIGUSR1));
ASSERT_EQ(0, pthread_join(thread, nullptr));
adb_thread_t client_thread;
ASSERT_TRUE(adb_thread_create(reinterpret_cast<void (*)(void*)>(ClientThreadFunc), nullptr,
&client_thread));
struct sockaddr addr;
socklen_t alen;
alen = sizeof(addr);
int accept_fd = adb_socket_accept(listen_fd, &addr, &alen);
ASSERT_GE(accept_fd, 0);
CloseRdHupSocketArg arg;
arg.socket_fd = accept_fd;
PrepareThread();
adb_thread_t thread;
ASSERT_TRUE(adb_thread_create(reinterpret_cast<void (*)(void*)>(CloseRdHupSocketThreadFunc),
&arg, &thread));
// Wait until the fdevent_loop() starts.
adb_sleep_ms(100);
EXPECT_EQ(2u, fdevent_installed_count());
// Wait until the client closes its socket.
ASSERT_TRUE(adb_thread_join(client_thread));
TerminateThread(thread);
}
#endif // defined(__linux__)

View File

@ -180,6 +180,14 @@ static __inline__ int adb_thread_setname(const std::string& name) {
return 0;
}
static __inline__ adb_thread_t adb_thread_self() {
return GetCurrentThread();
}
static __inline__ bool adb_thread_equal(adb_thread_t lhs, adb_thread_t rhs) {
return GetThreadId(lhs) == GetThreadId(rhs);
}
static __inline__ unsigned long adb_thread_id()
{
return GetCurrentThreadId();
@ -263,24 +271,6 @@ int unix_isatty(int fd);
/* normally provided by <cutils/misc.h> */
extern void* load_file(const char* pathname, unsigned* psize);
/* normally provided by "fdevent.h" */
#define FDE_READ 0x0001
#define FDE_WRITE 0x0002
#define FDE_ERROR 0x0004
#define FDE_DONT_CLOSE 0x0080
typedef void (*fd_func)(int fd, unsigned events, void *userdata);
fdevent *fdevent_create(int fd, fd_func func, void *arg);
void fdevent_destroy(fdevent *fde);
void fdevent_install(fdevent *fde, int fd, fd_func func, void *arg);
void fdevent_remove(fdevent *item);
void fdevent_set(fdevent *fde, unsigned events);
void fdevent_add(fdevent *fde, unsigned events);
void fdevent_del(fdevent *fde, unsigned events);
void fdevent_loop();
static __inline__ void adb_sleep_ms( int mseconds )
{
Sleep( mseconds );
@ -304,6 +294,14 @@ extern int adb_setsockopt(int fd, int level, int optname, const void* optva
extern int adb_socketpair( int sv[2] );
struct adb_pollfd {
int fd;
short events;
short revents;
};
extern int adb_poll(adb_pollfd* fds, size_t nfds, int timeout);
#define poll ___xxx_poll
static __inline__ int adb_is_absolute_host_path(const char* path) {
return isalpha(path[0]) && path[1] == ':' && path[2] == '\\';
}
@ -456,14 +454,14 @@ size_t ParseCompleteUTF8(const char* first, const char* last, std::vector<char>*
#else /* !_WIN32 a.k.a. Unix */
#include "fdevent.h"
#include <cutils/misc.h>
#include <cutils/sockets.h>
#include <cutils/threads.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <pthread.h>
#include <unistd.h>
@ -803,6 +801,13 @@ static __inline__ int adb_socketpair( int sv[2] )
#undef socketpair
#define socketpair ___xxx_socketpair
typedef struct pollfd adb_pollfd;
static __inline__ int adb_poll(adb_pollfd* fds, size_t nfds, int timeout) {
return TEMP_FAILURE_RETRY(poll(fds, nfds, timeout));
}
#define poll ___xxx_poll
static __inline__ void adb_sleep_ms( int mseconds )
{
usleep( mseconds*1000 );

View File

@ -18,6 +18,7 @@
#include <unistd.h>
#include <atomic>
#include "adb_io.h"
#include "sysdeps.h"
static void increment_atomic_int(void* c) {
@ -67,3 +68,125 @@ TEST(sysdeps_thread, exit) {
nullptr, &thread));
ASSERT_TRUE(adb_thread_join(thread));
}
TEST(sysdeps_socketpair, smoke) {
int fds[2];
ASSERT_EQ(0, adb_socketpair(fds)) << strerror(errno);
ASSERT_TRUE(WriteFdExactly(fds[0], "foo", 4));
ASSERT_TRUE(WriteFdExactly(fds[1], "bar", 4));
char buf[4];
ASSERT_TRUE(ReadFdExactly(fds[1], buf, 4));
ASSERT_STREQ(buf, "foo");
ASSERT_TRUE(ReadFdExactly(fds[0], buf, 4));
ASSERT_STREQ(buf, "bar");
ASSERT_EQ(0, adb_close(fds[0]));
ASSERT_EQ(0, adb_close(fds[1]));
}
TEST(sysdeps_fd, exhaustion) {
std::vector<int> fds;
int socketpair[2];
while (adb_socketpair(socketpair) == 0) {
fds.push_back(socketpair[0]);
fds.push_back(socketpair[1]);
}
ASSERT_EQ(EMFILE, errno) << strerror(errno);
for (int fd : fds) {
ASSERT_EQ(0, adb_close(fd));
}
ASSERT_EQ(0, adb_socketpair(socketpair));
ASSERT_EQ(socketpair[0], fds[0]);
ASSERT_EQ(socketpair[1], fds[1]);
ASSERT_EQ(0, adb_close(socketpair[0]));
ASSERT_EQ(0, adb_close(socketpair[1]));
}
class sysdeps_poll : public ::testing::Test {
protected:
int fds[2];
void SetUp() override {
ASSERT_EQ(0, adb_socketpair(fds)) << strerror(errno);
}
void TearDown() override {
ASSERT_EQ(0, adb_close(fds[0]));
ASSERT_EQ(0, adb_close(fds[1]));
}
};
TEST_F(sysdeps_poll, smoke) {
adb_pollfd pfd[2];
pfd[0].fd = fds[0];
pfd[0].events = POLLRDNORM;
pfd[1].fd = fds[1];
pfd[1].events = POLLWRNORM;
EXPECT_EQ(1, adb_poll(pfd, 2, 0));
EXPECT_EQ(0, pfd[0].revents);
EXPECT_EQ(POLLWRNORM, pfd[1].revents);
ASSERT_TRUE(WriteFdExactly(fds[1], "foo", 4));
// Wait for the socketpair to be flushed.
EXPECT_EQ(1, adb_poll(pfd, 1, 100));
EXPECT_EQ(POLLRDNORM, pfd[0].revents);
EXPECT_EQ(2, adb_poll(pfd, 2, 0));
EXPECT_EQ(POLLRDNORM, pfd[0].revents);
EXPECT_EQ(POLLWRNORM, pfd[1].revents);
}
TEST_F(sysdeps_poll, timeout) {
adb_pollfd pfd;
pfd.fd = fds[0];
pfd.events = POLLRDNORM;
EXPECT_EQ(0, adb_poll(&pfd, 1, 100));
EXPECT_EQ(0, pfd.revents);
ASSERT_TRUE(WriteFdExactly(fds[1], "foo", 4));
EXPECT_EQ(1, adb_poll(&pfd, 1, 100));
EXPECT_EQ(POLLRDNORM, pfd.revents);
}
TEST_F(sysdeps_poll, invalid_fd) {
adb_pollfd pfd[3];
pfd[0].fd = fds[0];
pfd[0].events = POLLRDNORM;
pfd[1].fd = INT_MAX;
pfd[1].events = POLLRDNORM;
pfd[2].fd = fds[1];
pfd[2].events = POLLWRNORM;
ASSERT_TRUE(WriteFdExactly(fds[1], "foo", 4));
// Wait for the socketpair to be flushed.
EXPECT_EQ(1, adb_poll(pfd, 1, 100));
EXPECT_EQ(POLLRDNORM, pfd[0].revents);
EXPECT_EQ(3, adb_poll(pfd, 3, 0));
EXPECT_EQ(POLLRDNORM, pfd[0].revents);
EXPECT_EQ(POLLNVAL, pfd[1].revents);
EXPECT_EQ(POLLWRNORM, pfd[2].revents);
}
TEST_F(sysdeps_poll, duplicate_fd) {
adb_pollfd pfd[2];
pfd[0].fd = fds[0];
pfd[0].events = POLLRDNORM;
pfd[1] = pfd[0];
EXPECT_EQ(0, adb_poll(pfd, 2, 0));
EXPECT_EQ(0, pfd[0].revents);
EXPECT_EQ(0, pfd[1].revents);
ASSERT_TRUE(WriteFdExactly(fds[1], "foo", 4));
EXPECT_EQ(2, adb_poll(pfd, 2, 100));
EXPECT_EQ(POLLRDNORM, pfd[0].revents);
EXPECT_EQ(POLLRDNORM, pfd[1].revents);
}

File diff suppressed because it is too large Load Diff