Merge streaming tests
This commit is contained in:
parent
e3fbf63c42
commit
3d80d5e641
|
@ -7,10 +7,11 @@
|
|||
#include "test.h"
|
||||
|
||||
#include <carla/ThreadGroup.h>
|
||||
#include <carla/streaming/low_level/Client.h>
|
||||
#include <carla/streaming/low_level/Server.h>
|
||||
#include <carla/streaming/detail/Dispatcher.h>
|
||||
#include <carla/streaming/detail/tcp/Client.h>
|
||||
#include <carla/streaming/detail/tcp/Server.h>
|
||||
#include <carla/streaming/low_level/Client.h>
|
||||
#include <carla/streaming/low_level/Server.h>
|
||||
|
||||
#include <atomic>
|
||||
|
||||
|
@ -37,7 +38,7 @@ private:
|
|||
carla::ThreadGroup _threads;
|
||||
};
|
||||
|
||||
TEST(streaming_low_level, sending_strings) {
|
||||
TEST(streaming, low_level_sending_strings) {
|
||||
using namespace util::buffer;
|
||||
using namespace carla::streaming;
|
||||
using namespace carla::streaming::detail;
|
||||
|
@ -72,7 +73,7 @@ TEST(streaming_low_level, sending_strings) {
|
|||
ASSERT_EQ(message_count, number_of_messages);
|
||||
}
|
||||
|
||||
TEST(streaming_low_level, unsubscribing) {
|
||||
TEST(streaming, low_level_unsubscribing) {
|
||||
using namespace util::buffer;
|
||||
using namespace carla::streaming;
|
||||
using namespace carla::streaming::detail;
|
||||
|
@ -114,3 +115,51 @@ TEST(streaming_low_level, unsubscribing) {
|
|||
ASSERT_EQ(message_count, number_of_messages);
|
||||
}
|
||||
}
|
||||
|
||||
TEST(streaming, low_level_tcp_small_message) {
|
||||
using namespace carla::streaming;
|
||||
using namespace carla::streaming::detail;
|
||||
|
||||
boost::asio::io_service io_service;
|
||||
tcp::Server::endpoint ep(boost::asio::ip::tcp::v4(), TESTING_PORT);
|
||||
|
||||
tcp::Server srv(io_service, ep);
|
||||
srv.SetTimeout(1s);
|
||||
std::atomic_bool done{false};
|
||||
std::atomic_size_t message_count{0u};
|
||||
|
||||
const std::string msg = "Hola!";
|
||||
|
||||
srv.Listen([&](std::shared_ptr<tcp::ServerSession> session) {
|
||||
ASSERT_EQ(session->get_stream_id(), 1u);
|
||||
while (!done) {
|
||||
session->Write(carla::Buffer(msg));
|
||||
std::this_thread::sleep_for(1ns);
|
||||
}
|
||||
std::cout << "done!\n";
|
||||
});
|
||||
|
||||
Dispatcher dispatcher{make_endpoint<tcp::Client::protocol_type>(ep)};
|
||||
auto stream = dispatcher.MakeStream();
|
||||
auto c = std::make_shared<tcp::Client>(io_service, stream.token(), [&](carla::Buffer message) {
|
||||
++message_count;
|
||||
ASSERT_FALSE(message.empty());
|
||||
ASSERT_EQ(message.size(), 5u);
|
||||
const std::string received = util::buffer::as_string(message);
|
||||
ASSERT_EQ(received, msg);
|
||||
});
|
||||
c->Connect();
|
||||
|
||||
// We need at least two threads because this server loop consumes one.
|
||||
carla::ThreadGroup threads;
|
||||
threads.CreateThreads(
|
||||
std::max(2u, std::thread::hardware_concurrency()),
|
||||
[&]() { io_service.run(); });
|
||||
|
||||
std::this_thread::sleep_for(2s);
|
||||
io_service.stop();
|
||||
done = true;
|
||||
std::cout << "client received " << message_count << " messages\n";
|
||||
ASSERT_GT(message_count, 10u);
|
||||
c->Stop();
|
||||
}
|
|
@ -1,62 +0,0 @@
|
|||
// Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma
|
||||
// de Barcelona (UAB).
|
||||
//
|
||||
// This work is licensed under the terms of the MIT license.
|
||||
// For a copy, see <https://opensource.org/licenses/MIT>.
|
||||
|
||||
#include "test.h"
|
||||
|
||||
#include <carla/ThreadGroup.h>
|
||||
#include <carla/streaming/detail/Dispatcher.h>
|
||||
#include <carla/streaming/detail/tcp/Client.h>
|
||||
#include <carla/streaming/detail/tcp/Server.h>
|
||||
|
||||
#include <atomic>
|
||||
|
||||
TEST(streaming_low_level_tcp, small_message) {
|
||||
using namespace carla::streaming;
|
||||
using namespace carla::streaming::detail;
|
||||
|
||||
boost::asio::io_service io_service;
|
||||
tcp::Server::endpoint ep(boost::asio::ip::tcp::v4(), TESTING_PORT);
|
||||
|
||||
tcp::Server srv(io_service, ep);
|
||||
srv.SetTimeout(1s);
|
||||
std::atomic_bool done{false};
|
||||
std::atomic_size_t message_count{0u};
|
||||
|
||||
const std::string msg = "Hola!";
|
||||
|
||||
srv.Listen([&](std::shared_ptr<tcp::ServerSession> session) {
|
||||
ASSERT_EQ(session->get_stream_id(), 1u);
|
||||
while (!done) {
|
||||
session->Write(carla::Buffer(msg));
|
||||
std::this_thread::sleep_for(1ns);
|
||||
}
|
||||
std::cout << "done!\n";
|
||||
});
|
||||
|
||||
Dispatcher dispatcher{make_endpoint<tcp::Client::protocol_type>(ep)};
|
||||
auto stream = dispatcher.MakeStream();
|
||||
auto c = std::make_shared<tcp::Client>(io_service, stream.token(), [&](carla::Buffer message) {
|
||||
++message_count;
|
||||
ASSERT_FALSE(message.empty());
|
||||
ASSERT_EQ(message.size(), 5u);
|
||||
const std::string received = util::buffer::as_string(message);
|
||||
ASSERT_EQ(received, msg);
|
||||
});
|
||||
c->Connect();
|
||||
|
||||
// We need at least two threads because this server loop consumes one.
|
||||
carla::ThreadGroup threads;
|
||||
threads.CreateThreads(
|
||||
std::max(2u, std::thread::hardware_concurrency()),
|
||||
[&]() { io_service.run(); });
|
||||
|
||||
std::this_thread::sleep_for(2s);
|
||||
io_service.stop();
|
||||
done = true;
|
||||
std::cout << "client received " << message_count << " messages\n";
|
||||
ASSERT_GT(message_count, 10u);
|
||||
c->Stop();
|
||||
}
|
Loading…
Reference in New Issue