diff --git a/test/performance/perf_roscpp/CMakeLists.txt b/test/performance/perf_roscpp/CMakeLists.txt deleted file mode 100644 index 551fb845..00000000 --- a/test/performance/perf_roscpp/CMakeLists.txt +++ /dev/null @@ -1,31 +0,0 @@ -cmake_minimum_required(VERSION 2.4.6) -include($ENV{ROS_ROOT}/core/rosbuild/rosbuild.cmake) - -# Set the build type. Options are: -# Coverage : w/ debug symbols, w/o optimization, w/ code-coverage -# Debug : w/ debug symbols, w/o optimization -# Release : w/o debug symbols, w/ optimization -# RelWithDebInfo : w/ debug symbols, w/ optimization -# MinSizeRel : w/o debug symbols, w/ optimization, stripped binaries -#set(ROS_BUILD_TYPE RelWithDebInfo) - -rosbuild_init() - -#set the default path for built executables to the "bin" directory -set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin) -#set the default path for built libraries to the "lib" directory -set(LIBRARY_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/lib) - -#uncomment if you have defined messages -rosbuild_genmsg() -#uncomment if you have defined services -rosbuild_gensrv() - -rosbuild_add_boost_directories() - -#common commands for building c++ executables and libraries -rosbuild_add_library(${PROJECT_NAME} src/intra.cpp src/inter.cpp) -rosbuild_link_boost(${PROJECT_NAME} thread) - -rosbuild_add_executable(intra_suite src/intra_suite.cpp) -target_link_libraries(intra_suite ${PROJECT_NAME}) diff --git a/test/performance/perf_roscpp/Makefile b/test/performance/perf_roscpp/Makefile deleted file mode 100644 index b75b928f..00000000 --- a/test/performance/perf_roscpp/Makefile +++ /dev/null @@ -1 +0,0 @@ -include $(shell rospack find mk)/cmake.mk \ No newline at end of file diff --git a/test/performance/perf_roscpp/include/perf_roscpp/inter.h b/test/performance/perf_roscpp/include/perf_roscpp/inter.h deleted file mode 100644 index 8b137891..00000000 --- a/test/performance/perf_roscpp/include/perf_roscpp/inter.h +++ /dev/null @@ -1 +0,0 @@ - diff --git a/test/performance/perf_roscpp/include/perf_roscpp/intra.h b/test/performance/perf_roscpp/include/perf_roscpp/intra.h deleted file mode 100644 index 00685937..00000000 --- a/test/performance/perf_roscpp/include/perf_roscpp/intra.h +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Software License Agreement (BSD License) - * - * Copyright (c) 2009, Willow Garage, Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following - * disclaimer in the documentation and/or other materials provided - * with the distribution. - * * Neither the name of Willow Garage, Inc. nor the names of its - * contributors may be used to endorse or promote products derived - * from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS - * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE - * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, - * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER - * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT - * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN - * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#ifndef PERF_ROSCPP_INTRA_H -#define PERF_ROSCPP_INTRA_H - -#include -#include - -namespace perf_roscpp -{ -namespace intra -{ - -struct ThroughputResult -{ - double test_duration; - uint64_t streams; - uint64_t message_size; - uint32_t sender_threads; - uint32_t receiver_threads; - - uint64_t messages_sent; - uint64_t messages_received; - - uint64_t total_bytes_sent; - uint64_t total_bytes_received; - uint64_t bytes_per_second; - - ros::WallTime test_start; - ros::WallTime test_end; -}; - -ThroughputResult throughput(double duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads); - -struct LatencyResult -{ - uint64_t count_per_stream; - uint64_t streams; - uint64_t message_size; - uint32_t sender_threads; - uint32_t receiver_threads; - - uint64_t total_message_count; - - double latency_avg; - double latency_min; - double latency_max; - - ros::WallTime test_start; - ros::WallTime test_end; -}; -LatencyResult latency(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads); - -struct STLatencyResult -{ - uint64_t total_message_count; - - double latency_avg; - double latency_min; - double latency_max; - - ros::WallTime test_start; - ros::WallTime test_end; -}; -STLatencyResult stlatency(uint32_t message_count); - -} // namespace intra -} // namespace perf_roscpp - -#endif // PERF_ROSCPP_INTRA_H diff --git a/test/performance/perf_roscpp/manifest.xml b/test/performance/perf_roscpp/manifest.xml deleted file mode 100644 index 932cddf3..00000000 --- a/test/performance/perf_roscpp/manifest.xml +++ /dev/null @@ -1,18 +0,0 @@ - - - - This is a set of performance tests for roscpp, mainly to provide numbers to optimize against. - - - Josh Faust (jfaust@willowgarage.com) - BSD - - http://ros.org/wiki/perf_roscpp - - - - - - - - diff --git a/test/performance/perf_roscpp/msg/LatencyMessage.msg b/test/performance/perf_roscpp/msg/LatencyMessage.msg deleted file mode 100644 index eedfac91..00000000 --- a/test/performance/perf_roscpp/msg/LatencyMessage.msg +++ /dev/null @@ -1,6 +0,0 @@ -float64 publish_time -float64 receipt_time -uint64 count -uint32 thread_index -uint8[] array - diff --git a/test/performance/perf_roscpp/msg/ThroughputMessage.msg b/test/performance/perf_roscpp/msg/ThroughputMessage.msg deleted file mode 100644 index ec10b9be..00000000 --- a/test/performance/perf_roscpp/msg/ThroughputMessage.msg +++ /dev/null @@ -1,2 +0,0 @@ -uint8[] array - diff --git a/test/performance/perf_roscpp/src/inter.cpp b/test/performance/perf_roscpp/src/inter.cpp deleted file mode 100644 index 8b137891..00000000 --- a/test/performance/perf_roscpp/src/inter.cpp +++ /dev/null @@ -1 +0,0 @@ - diff --git a/test/performance/perf_roscpp/src/intra.cpp b/test/performance/perf_roscpp/src/intra.cpp deleted file mode 100644 index 7ef22549..00000000 --- a/test/performance/perf_roscpp/src/intra.cpp +++ /dev/null @@ -1,720 +0,0 @@ -/* - * Software License Agreement (BSD License) - * - * Copyright (c) 2009, Willow Garage, Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following - * disclaimer in the documentation and/or other materials provided - * with the distribution. - * * Neither the name of Willow Garage, Inc. nor the names of its - * contributors may be used to endorse or promote products derived - * from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS - * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE - * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, - * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER - * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT - * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN - * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include "perf_roscpp/intra.h" -#include "perf_roscpp/ThroughputMessage.h" -#include "perf_roscpp/LatencyMessage.h" - -#include -#include - -#include - -#include - -namespace perf_roscpp -{ -namespace intra -{ - -class ThroughputTest -{ -public: - ThroughputTest(double test_duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads); - - ThroughputResult run(); - -private: - void sendThread(boost::barrier* all_connected); - void receiveThread(boost::barrier* all_started, boost::barrier* all_start, ros::WallTime* end_time); - - void callback(const ThroughputMessageConstPtr& msg); - - boost::mutex mutex_; - - struct ReceiveThreadResult - { - uint64_t bytes_received; - uint64_t messages_received; - - ros::WallTime last_recv_time; - }; - boost::thread_specific_ptr receive_thread_result_; - std::vector > receive_results_; - - struct SendThreadResult - { - uint64_t bytes_sent; - uint64_t messages_sent; - - ros::WallTime first_send_time; - }; - boost::thread_specific_ptr send_thread_result_; - std::vector > send_results_; - - ros::CallbackQueue queue_; - std::vector pubs_; - - boost::thread_group receive_threads_; - boost::thread_group send_threads_; - - double test_duration_; - uint32_t streams_; - uint32_t message_size_; - uint32_t sender_threads_; - uint32_t receiver_threads_; -}; - -ThroughputTest::ThroughputTest(double test_duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads) -: test_duration_(test_duration) -, streams_(streams) -, message_size_(message_size) -, sender_threads_(sender_threads) -, receiver_threads_(receiver_threads) -{ -} - -void ThroughputTest::callback(const ThroughputMessageConstPtr& msg) -{ - ReceiveThreadResult& r = *receive_thread_result_; - - r.bytes_received += msg->serializationLength() + 4; // 4 byte message length field - ++r.messages_received; - - r.last_recv_time = ros::WallTime::now(); - - //ROS_INFO_STREAM("Received message " << r.messages_received); -} - -void ThroughputTest::receiveThread(boost::barrier* all_ready, boost::barrier* all_start, ros::WallTime* end_time) -{ - receive_thread_result_.reset(new ReceiveThreadResult); - - ReceiveThreadResult& r = *receive_thread_result_; - r.messages_received = 0; - r.bytes_received = 0; - - ROS_INFO_STREAM("Receive thread [" << boost::this_thread::get_id() << "] waiting for all threads to start"); - - all_ready->wait(); - all_start->wait(); - ros::WallTime local_end_time = *end_time; - - ROS_INFO_STREAM("Receive thread [" << boost::this_thread::get_id() << "] running"); - - while (ros::WallTime::now() < local_end_time) - { - queue_.callOne(ros::WallDuration(0.01)); - } - - ROS_INFO_STREAM("Receive thread [" << boost::this_thread::get_id() << "] adding results and exiting"); - - boost::mutex::scoped_lock lock(mutex_); - receive_results_.push_back(boost::shared_ptr(receive_thread_result_.release())); -} - -void ThroughputTest::sendThread(boost::barrier* all_connected) -{ - send_thread_result_.reset(new SendThreadResult); - SendThreadResult& r = *send_thread_result_; - - ros::NodeHandle nh; - nh.setCallbackQueue(&queue_); - std::vector pubs; - for (uint32_t i = 0; i < streams_; ++i) - { - std::stringstream ss; - ss << "throughput_perf_test_" << i; - pubs.push_back(nh.advertise(ss.str(), 1)); - } - - // Need to keep around the publishers so the connections don't go away - { - boost::mutex::scoped_lock lock(mutex_); - pubs_.insert(pubs_.end(), pubs.begin(), pubs.end()); - } - - ROS_INFO_STREAM("Publish thread [" << boost::this_thread::get_id() << "] waiting for connections"); - bool cont = true; - while (cont) - { - cont = false; - for (uint32_t i = 0; i < streams_; ++i) - { - if (pubs[i].getNumSubscribers() == 0) - { - cont = true; - } - } - } - - ThroughputMessagePtr msg(new ThroughputMessage); - msg->array.resize(message_size_); - - all_connected->wait(); - - ROS_INFO_STREAM("Publish thread [" << boost::this_thread::get_id() << "] all connections established, beginning to publish"); - - r.first_send_time = ros::WallTime::now(); - r.bytes_sent = 0; - r.messages_sent = 0; - - try - { - const uint32_t streams = streams_; - while (!boost::this_thread::interruption_requested()) - { - for (uint32_t j = 0; j < streams; ++j) - { - pubs[j].publish(msg); - - ++r.messages_sent; - r.bytes_sent += msg->serializationLength() + 4; - } - - boost::this_thread::yield(); - } - } - catch (boost::thread_interrupted&) - { - } - - ROS_INFO_STREAM("Publish thread [" << boost::this_thread::get_id() << "] exiting"); - - boost::mutex::scoped_lock lock(mutex_); - send_results_.push_back(boost::shared_ptr(send_thread_result_.release())); -} - -ThroughputResult ThroughputTest::run() -{ - ROS_INFO("Starting receive threads"); - ThroughputResult r; - r.test_start = ros::WallTime::now(); - - ros::NodeHandle nh; - nh.setCallbackQueue(&queue_); - - std::vector subs; - for (uint32_t i = 0; i < streams_; ++i) - { - std::stringstream ss; - ss << "throughput_perf_test_" << i; - subs.push_back(nh.subscribe(ss.str(), 0, &ThroughputTest::callback, this, ros::TransportHints().tcpNoDelay())); - } - - boost::barrier sender_all_connected(sender_threads_ + 1); - boost::barrier receiver_all_ready(receiver_threads_ + 1); - boost::barrier receiver_start(receiver_threads_ + 1); - ros::WallTime test_end_time; - - for (uint32_t i = 0; i < receiver_threads_; ++i) - { - receive_threads_.create_thread(boost::bind(&ThroughputTest::receiveThread, this, &receiver_all_ready, &receiver_start, &test_end_time)); - } - - for (uint32_t i = 0; i < sender_threads_; ++i) - { - send_threads_.create_thread(boost::bind(&ThroughputTest::sendThread, this, &sender_all_connected)); - } - - receiver_all_ready.wait(); - test_end_time = ros::WallTime::now() + ros::WallDuration(test_duration_); - receiver_start.wait(); - - ros::WallTime pub_start = ros::WallTime::now(); - sender_all_connected.wait(); - - receive_threads_.join_all(); - ROS_INFO("All receive threads done"); - - send_threads_.interrupt_all(); - send_threads_.join_all(); - ROS_INFO("All publish threads done"); - - ROS_INFO("Collating results"); - - r.test_end = ros::WallTime::now(); - - r.bytes_per_second = 0; - r.message_size = message_size_; - r.messages_received = 0; - r.messages_sent = 0; - r.receiver_threads = receiver_threads_; - r.sender_threads = sender_threads_; - r.total_bytes_received = 0; - r.total_bytes_sent = 0; - r.test_duration = test_duration_; - r.streams = streams_; - - ros::WallTime rec_end; - { - std::vector >::iterator it = receive_results_.begin(); - std::vector >::iterator end = receive_results_.end(); - for (; it != end; ++it) - { - ReceiveThreadResult& rr = **it; - r.total_bytes_received += rr.bytes_received; - r.messages_received += rr.messages_received; - - rec_end = std::max(rec_end, rr.last_recv_time); - } - } - - { - std::vector >::iterator it = send_results_.begin(); - std::vector >::iterator end = send_results_.end(); - for (; it != end; ++it) - { - SendThreadResult& sr = **it; - r.total_bytes_sent += sr.bytes_sent; - r.messages_sent += sr.messages_sent; - - pub_start = std::min(pub_start, sr.first_send_time); - } - } - - r.bytes_per_second = (double)r.total_bytes_received / (rec_end - pub_start).toSec(); - - ROS_INFO("Done collating results"); - - return r; -} - -ThroughputResult throughput(double test_duration, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads) -{ - ROS_INFO_STREAM("*****************************************************"); - ROS_INFO_STREAM("Running throughput test: "<< "receiver_threads [" << receiver_threads << "], sender_threads [" << sender_threads << "], streams [" << streams << "], test_duration [" << test_duration << "], message_size [" << message_size << "]"); - - ThroughputTest t(test_duration, streams, message_size, sender_threads, receiver_threads); - return t.run(); -} - -///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -class LatencyTest -{ -public: - LatencyTest(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads); - - LatencyResult run(); - -private: - void sendThread(boost::barrier* b, uint32_t i); - void receiveThread(); - - void receiveCallback(const LatencyMessageConstPtr& msg, ros::Publisher& pub); - void sendCallback(const LatencyMessageConstPtr& msg, ros::Publisher& pub, uint32_t thread_index); - - boost::mutex mutex_; - - struct ThreadResult - { - uint64_t message_count; - - std::vector latencies; - }; - boost::thread_specific_ptr thread_result_; - std::vector > results_; - - ros::CallbackQueue receive_queue_; - - boost::thread_group send_threads_; - - uint32_t count_per_stream_; - uint32_t streams_; - uint32_t message_size_; - uint32_t sender_threads_; - uint32_t receiver_threads_; -}; - -LatencyTest::LatencyTest(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads) -: count_per_stream_(count_per_stream) -, streams_(streams) -, message_size_(message_size) -, sender_threads_(sender_threads) -, receiver_threads_(receiver_threads) -{ -} - -void LatencyTest::receiveCallback(const LatencyMessageConstPtr& msg, ros::Publisher& pub) -{ - ros::WallTime receipt_time = ros::WallTime::now(); - LatencyMessagePtr reply = boost::const_pointer_cast(msg); - reply->receipt_time = receipt_time.toSec(); - pub.publish(reply); - //ROS_INFO("Receiver received message %d", msg->count); -} - -void LatencyTest::sendCallback(const LatencyMessageConstPtr& msg, ros::Publisher& pub, uint32_t thread_index) -{ - if (msg->thread_index != thread_index) - { - return; - } - - thread_result_->latencies.push_back(msg->receipt_time - msg->publish_time); - ++thread_result_->message_count; - - LatencyMessagePtr reply = boost::const_pointer_cast(msg); - reply->publish_time = ros::WallTime::now().toSec(); - ++reply->count; - - //ROS_INFO("Sender received return message %d", msg->count); - - if (reply->count < count_per_stream_ * streams_) - { - pub.publish(reply); - } -} - -void LatencyTest::sendThread(boost::barrier* all_connected, uint32_t thread_index) -{ - thread_result_.reset(new ThreadResult); - ThreadResult& r = *thread_result_; - - ros::NodeHandle nh; - ros::CallbackQueue queue; - nh.setCallbackQueue(&queue); - std::vector pubs; - std::vector subs; - pubs.reserve(streams_); - for (uint32_t i = 0; i < streams_; ++i) - { - std::stringstream ss; - ss << "latency_perf_test_" << i; - pubs.push_back(nh.advertise(ss.str(), 0)); - - ss << "_return"; - subs.push_back(nh.subscribe(ss.str(), 0, boost::bind(&LatencyTest::sendCallback, this, _1, boost::ref(pubs[i]), thread_index), ros::VoidConstPtr(), ros::TransportHints().tcpNoDelay())); - } - - bool cont = true; - while (cont) - { - cont = false; - for (uint32_t i = 0; i < streams_; ++i) - { - if (pubs[i].getNumSubscribers() == 0) - { - cont = true; - } - } - } - - std::vector messages; - for (uint32_t i = 0; i < streams_; ++i) - { - LatencyMessagePtr msg(new LatencyMessage); - msg->thread_index = thread_index; - msg->array.resize(message_size_); - messages.push_back(msg); - } - - all_connected->wait(); - - r.message_count = 0; - - const uint32_t count = count_per_stream_; - const uint32_t streams = streams_; - const uint32_t total_messages = count * streams; - for (uint32_t j = 0; j < streams; ++j) - { - messages[j]->publish_time = ros::WallTime::now().toSec(); - pubs[j].publish(messages[j]); - } - - while (r.latencies.size() < total_messages) - { - queue.callAvailable(ros::WallDuration(0.01)); - } - - ROS_INFO_STREAM("Publish thread [" << boost::this_thread::get_id() << "] exiting"); - - boost::mutex::scoped_lock lock(mutex_); - results_.push_back(boost::shared_ptr(thread_result_.release())); -} - -LatencyResult LatencyTest::run() -{ - ROS_INFO("Starting receive threads"); - LatencyResult r; - r.test_start = ros::WallTime::now(); - - ros::NodeHandle nh; - nh.setCallbackQueue(&receive_queue_); - - std::vector subs; - std::vector pubs; - pubs.reserve(streams_ * sender_threads_); - for (uint32_t i = 0; i < streams_; ++i) - { - std::stringstream ss; - ss << "latency_perf_test_" << i; - std::string sub_topic = ss.str(); - ss << "_return"; - std::string pub_topic = ss.str(); - pubs.push_back(nh.advertise(pub_topic, 0)); - subs.push_back(nh.subscribe(sub_topic, 0, boost::bind(&LatencyTest::receiveCallback, this, _1, boost::ref(pubs.back())), ros::VoidConstPtr(), ros::TransportHints().tcpNoDelay())); - } - - boost::barrier all_connected(1 + sender_threads_); - - ros::WallTime pub_start = ros::WallTime::now(); - ROS_INFO("Starting publish threads"); - for (uint32_t i = 0; i < sender_threads_; ++i) - { - send_threads_.create_thread(boost::bind(&LatencyTest::sendThread, this, &all_connected, i)); - } - - ROS_INFO("Waiting for all connections to establish"); - - bool cont = true; - while (cont) - { - cont = false; - for (uint32_t i = 0; i < streams_; ++i) - { - if (pubs[i].getNumSubscribers() == 0) - { - cont = true; - } - } - } - - all_connected.wait(); - ROS_INFO("All connections established"); - - ros::AsyncSpinner receive_spinner(receiver_threads_, &receive_queue_); - receive_spinner.start(); - - send_threads_.join_all(); - ROS_INFO("All publish threads done"); - - ROS_INFO("Collating results"); - - r.test_end = ros::WallTime::now(); - - r.latency_avg = 0; - r.latency_max = 0; - r.latency_min = 9999999999999ULL; - r.total_message_count = 0; - r.message_size = message_size_; - r.receiver_threads = receiver_threads_; - r.sender_threads = sender_threads_; - r.count_per_stream = count_per_stream_; - r.streams = streams_; - - double latency_total = 0.0; - uint32_t latency_count = 0; - { - std::vector >::iterator it = results_.begin(); - std::vector >::iterator end = results_.end(); - for (; it != end; ++it) - { - ThreadResult& tr = **it; - r.total_message_count += tr.message_count; - - std::vector::iterator lat_it = tr.latencies.begin(); - std::vector::iterator lat_end = tr.latencies.end(); - for (; lat_it != lat_end; ++lat_it) - { - double latency = *lat_it; - r.latency_min = std::min(r.latency_min, latency); - r.latency_max = std::max(r.latency_max, latency); - ++latency_count; - latency_total += latency; - } - } - } - - r.latency_avg = latency_total / latency_count; - - ROS_INFO("Done collating results"); - - return r; -} - -LatencyResult latency(uint32_t count_per_stream, uint32_t streams, uint32_t message_size, uint32_t sender_threads, uint32_t receiver_threads) -{ - ROS_INFO_STREAM("*****************************************************"); - ROS_INFO_STREAM("Running latency test: "<< "receiver_threads [" << receiver_threads << "], sender_threads [" << sender_threads << "], streams [" << streams << "], count_per_stream [" << count_per_stream << "], message_size [" << message_size << "]"); - - LatencyTest t(count_per_stream, streams, message_size, sender_threads, receiver_threads); - return t.run(); -} - -///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// - -class STLatencyTest -{ -public: - STLatencyTest(uint32_t message_count); - - STLatencyResult run(); - -private: - - void receiveCallback(const LatencyMessageConstPtr& msg, ros::Publisher& pub); - void sendCallback(const LatencyMessageConstPtr& msg, ros::Publisher& pub); - - struct Result - { - std::vector latencies; - }; - Result result_; - - ros::CallbackQueue receive_queue_; - - uint32_t message_count_; -}; - -STLatencyTest::STLatencyTest(uint32_t message_count) -: message_count_(message_count) -{ -} - -void STLatencyTest::receiveCallback(const LatencyMessageConstPtr& msg, ros::Publisher& pub) -{ - ros::WallTime receipt_time = ros::WallTime::now(); - LatencyMessagePtr reply = boost::const_pointer_cast(msg); - reply->receipt_time = receipt_time.toSec(); - pub.publish(reply); - //ROS_INFO("Receiver received message %d", msg->count); -} - -void STLatencyTest::sendCallback(const LatencyMessageConstPtr& msg, ros::Publisher& pub) -{ - result_.latencies.push_back(msg->receipt_time - msg->publish_time); - - LatencyMessagePtr reply = boost::const_pointer_cast(msg); - reply->publish_time = ros::WallTime::now().toSec(); - ++reply->count; - - //ROS_INFO("Sender received return message %d", msg->count); - - if (reply->count < message_count_) - { - pub.publish(reply); - } -} - -STLatencyResult STLatencyTest::run() -{ - ROS_INFO("Starting receive threads"); - STLatencyResult r; - r.test_start = ros::WallTime::now(); - - ros::NodeHandle nh; - nh.setCallbackQueue(&receive_queue_); - - ros::Publisher recv_pub = nh.advertise("stlatency_perf_test_return", 0); - ros::Subscriber recv_sub = nh.subscribe("stlatency_perf_test", 0, boost::bind(&STLatencyTest::receiveCallback, this, _1, boost::ref(recv_pub)), ros::VoidConstPtr(), ros::TransportHints().tcpNoDelay()); - ros::Publisher send_pub = nh.advertise("stlatency_perf_test", 0); - ros::Subscriber send_sub = nh.subscribe("stlatency_perf_test_return", 0, boost::bind(&STLatencyTest::sendCallback, this, _1, boost::ref(send_pub)), ros::VoidConstPtr(), ros::TransportHints().tcpNoDelay()); - - ROS_INFO("Waiting for all connections to establish"); - - bool cont = true; - while (cont) - { - cont = recv_pub.getNumSubscribers() == 0 || send_pub.getNumSubscribers() == 0; - ros::WallDuration(0.001).sleep(); - } - - ROS_INFO("All connections established"); - - LatencyMessagePtr msg(new LatencyMessage); - msg->publish_time = ros::WallTime::now().toSec(); - send_pub.publish(msg); - while (msg->count < message_count_) - { - receive_queue_.callAvailable(ros::WallDuration(0.1)); - } - - r.test_end = ros::WallTime::now(); - - r.latency_avg = 0; - r.latency_max = 0; - r.latency_min = 9999999999999ULL; - r.total_message_count = message_count_; - - double latency_total = 0.0; - uint32_t latency_count = 0; - { - std::vector::iterator lat_it = result_.latencies.begin(); - std::vector::iterator lat_end = result_.latencies.end(); - for (; lat_it != lat_end; ++lat_it) - { - double latency = *lat_it; - r.latency_min = std::min(r.latency_min, latency); - r.latency_max = std::max(r.latency_max, latency); - ++latency_count; - latency_total += latency; - } - } - - r.latency_avg = latency_total / latency_count; - - ROS_INFO("Done collating results"); - - return r; -} - -STLatencyResult stlatency(uint32_t message_count) -{ - ROS_INFO_STREAM("*****************************************************"); - ROS_INFO_STREAM("Running single-threaded latency test: message count[" << message_count << "]"); - - STLatencyTest t(message_count); - return t.run(); -} - -} // namespace intra -} // namespace perf_roscpp diff --git a/test/performance/perf_roscpp/src/intra_suite.cpp b/test/performance/perf_roscpp/src/intra_suite.cpp deleted file mode 100644 index e319e3da..00000000 --- a/test/performance/perf_roscpp/src/intra_suite.cpp +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Software License Agreement (BSD License) - * - * Copyright (c) 2009, Willow Garage, Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions - * are met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following - * disclaimer in the documentation and/or other materials provided - * with the distribution. - * * Neither the name of Willow Garage, Inc. nor the names of its - * contributors may be used to endorse or promote products derived - * from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS - * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE - * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, - * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, - * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; - * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER - * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT - * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN - * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - */ - -#include "perf_roscpp/intra.h" - -#include - -#include -#include -#include -#include - -using namespace perf_roscpp; -using namespace std; - -typedef std::vector V_ThroughputResult; -typedef std::vector V_LatencyResult; -typedef std::vector V_STLatencyResult; - -void printResult(std::ostream& out, uint32_t test_num, intra::ThroughputResult& r) -{ - - out << "----------------------------------------------------------\n"; - out << "Throughput Test " << test_num << ": receiver_threads [" << r.receiver_threads << "], sender_threads [" << r.sender_threads << "], streams [" << r.streams << "], test_duration [" << r.test_duration << "], message_size [" << r.message_size << "]\n"; - out << "\tMessages Sent: " << r.messages_sent << endl; - out << "\tMessages Received: " << r.messages_received << " (" << (double)r.messages_received / (double)r.messages_sent * 100.0 << "%)" << endl; - out << "\tBytes Sent: " << r.total_bytes_sent << endl; - out << "\tBytes Received: " << r.total_bytes_received << endl; - out << "\tBytes Per Second: " << r.bytes_per_second << " (" << r.bytes_per_second / (1024.0 * 1024.0) << " MB/s)" << endl; -} - -void printResult(std::ostream& out, uint32_t test_num, intra::LatencyResult& r) -{ - out << "----------------------------------------------------------\n"; - out << "Multi-Threaded Latency Test " << test_num << ": receiver_threads [" << r.receiver_threads << "], sender_threads [" << r.sender_threads << "], streams [" << r.streams << "], count_per_stream [" << r.count_per_stream << "], message_size [" << r.message_size << "]\n"; - out << "\tMessage Count: " << r.total_message_count << endl; - out << "\tLatency Average: " << r.latency_avg << endl; - out << "\tLatency Min: " << r.latency_min << endl; - out << "\tLatency Max: " << r.latency_max << endl; -} - -void printResult(std::ostream& out, uint32_t test_num, intra::STLatencyResult& r) -{ - out << "----------------------------------------------------------\n"; - out << "Single-Threaded Latency Test " << test_num << endl; - out << "\tMessage Count: " << r.total_message_count << endl; - out << "\tLatency Average: " << r.latency_avg << endl; - out << "\tLatency Min: " << r.latency_min << endl; - out << "\tLatency Max: " << r.latency_max << endl; -} - -void addResult(V_ThroughputResult& results, intra::ThroughputResult r, std::ostream& out, uint32_t i) -{ - results.push_back(r); - printResult(out, i, results.back()); -} - -void addResult(V_LatencyResult& results, intra::LatencyResult r, std::ostream& out, uint32_t i) -{ - results.push_back(r); - printResult(out, i, results.back()); -} - -void addResult(V_STLatencyResult& results, intra::STLatencyResult r, std::ostream& out, uint32_t i) -{ - results.push_back(r); - printResult(out, i, results.back()); -} - -void runThroughputTests(std::ostream& out, V_ThroughputResult& results) -{ - uint32_t i = 0; - // test duration, streams, message size , send threads, receive threads - addResult(results, intra::throughput(1 , 1 , 100 , 1 , 1 ), out, i++); - addResult(results, intra::throughput(1 , 1 , 1024*1024*10 , 1 , 1 ), out, i++); - addResult(results, intra::throughput(1 , 1 , 1024*1024*100, 1 , 1 ), out, i++); - - addResult(results, intra::throughput(10 , 1 , 100 , 1 , 1 ), out, i++); - addResult(results, intra::throughput(10 , 1 , 1024*1024*10 , 1 , 1 ), out, i++); - addResult(results, intra::throughput(10 , 1 , 1024*1024*100, 1 , 1 ), out, i++); - -#if 0 - addResult(results, intra::throughput(10 , 1 , 100 , 1 , 10 ), out, i++); - addResult(results, intra::throughput(10 , 1 , 1024*1024*10 , 1 , 10 ), out, i++); - addResult(results, intra::throughput(10 , 1 , 1024*1024*100, 1 , 10 ), out, i++); - - addResult(results, intra::throughput(1 , 10 , 100 , 1 , 1 ), out, i++); - addResult(results, intra::throughput(1 , 10 , 1024*1024*1 , 1 , 1 ), out, i++); - addResult(results, intra::throughput(1 , 10 , 1024*1024*10 , 1 , 1 ), out, i++); - - addResult(results, intra::throughput(10 , 10 , 100 , 1 , 10 ), out, i++); - addResult(results, intra::throughput(10 , 10 , 1024*1024*1 , 1 , 10 ), out, i++); - addResult(results, intra::throughput(10 , 10 , 1024*1024*10 , 1 , 10 ), out, i++); -#endif -} - -void runLatencyTests(std::ostream& out, V_LatencyResult& results) -{ - uint32_t i = 0; - // count per stream, streams, message size , receive threads - addResult(results, intra::latency(100000 , 1 , 1 , 1 , 1 ), out, i++); - addResult(results, intra::latency(10000 , 1 , 1024 , 1 , 1 ), out, i++); - addResult(results, intra::latency(1000 , 1 , 1024*1024 , 1 , 1 ), out, i++); - addResult(results, intra::latency(100 , 1 , 1024*1024*100, 1 , 1 ), out, i++); - -#if 0 - addResult(results, intra::latency(100000 , 1 , 1 , 1 , 10 ), out, i++); - addResult(results, intra::latency(10000 , 1 , 1024 , 1 , 10 ), out, i++); - addResult(results, intra::latency(1000 , 1 , 1024*1024 , 1 , 10 ), out, i++); - addResult(results, intra::latency(100 , 1 , 1024*1024*100, 1 , 10 ), out, i++); - - addResult(results, intra::latency(100000 , 10 , 1 , 1 , 1 ), out, i++); - addResult(results, intra::latency(10000 , 10 , 1024 , 1 , 1 ), out, i++); - addResult(results, intra::latency(1000 , 10 , 1024*1024 , 1 , 1 ), out, i++); - addResult(results, intra::latency(100 , 10 , 1024*1024*100, 1 , 1 ), out, i++); - - addResult(results, intra::latency(10000 , 10 , 1 , 10 , 1 ), out, i++); - addResult(results, intra::latency(1000 , 10 , 1024 , 10 , 1 ), out, i++); - addResult(results, intra::latency(100 , 10 , 1024*1024 , 10 , 1 ), out, i++); - // 100mb test allocates too much memory -#endif -} - -void runSTLatencyTests(std::ostream& out, V_STLatencyResult& results) -{ - uint32_t i = 0; - addResult(results, intra::stlatency(10000), out, i++); - addResult(results, intra::stlatency(100000), out, i++); - addResult(results, intra::stlatency(1000000), out, i++); -} - -int main(int argc, char** argv) -{ - std::ofstream out("intra_suite_out.txt", std::ios::out); - out << std::fixed; - out.precision(10); - cout << std::fixed; - cout.precision(10); - - ROS_ASSERT(out.is_open()); - - ros::init(argc, argv, "perf_roscpp_intra_suite", ros::init_options::NoSigintHandler|ros::init_options::NoRosout); - ros::NodeHandle nh; - - V_ThroughputResult throughput_results; - runThroughputTests(out, throughput_results); - - V_LatencyResult latency_results; - runLatencyTests(out, latency_results); - - V_STLatencyResult stlatency_results; - runSTLatencyTests(out, stlatency_results); - - printf("\n\n\n***************************** Results *****************************\n\n"); - uint32_t i = 0; - { - V_ThroughputResult::iterator it = throughput_results.begin(); - V_ThroughputResult::iterator end = throughput_results.end(); - for (; it != end; ++it, ++i) - { - intra::ThroughputResult& r = *it; - printResult(cout, i, r); - } - } - - i = 0; - { - V_LatencyResult::iterator it = latency_results.begin(); - V_LatencyResult::iterator end = latency_results.end(); - for (; it != end; ++it, ++i) - { - intra::LatencyResult& r = *it; - printResult(cout, i, r); - } - } - - i = 0; - { - V_STLatencyResult::iterator it = stlatency_results.begin(); - V_STLatencyResult::iterator end = stlatency_results.end(); - for (; it != end; ++it, ++i) - { - intra::STLatencyResult& r = *it; - printResult(cout, i, r); - } - } -}