From 3f88b6e0e2a8a8a865394f2d6f6ebc5933bcc7ea Mon Sep 17 00:00:00 2001 From: Tim Field Date: Wed, 28 Apr 2010 23:44:21 +0000 Subject: [PATCH] rosbag: removed TopicInfo; everything uses the more informative ConnectionInfo --- tools/rosbag/src/bag.cpp | 232 +++++++++++++------------- tools/rosbag/src/message_instance.cpp | 41 ++--- tools/rosbag/src/player.cpp | 13 +- tools/rosbag/src/query.cpp | 12 +- tools/rosbag/src/recorder.cpp | 4 +- tools/rosbag/src/view.cpp | 35 ++-- 6 files changed, 170 insertions(+), 167 deletions(-) diff --git a/tools/rosbag/src/bag.cpp b/tools/rosbag/src/bag.cpp index a9c1935a..a0a5e944 100644 --- a/tools/rosbag/src/bag.cpp +++ b/tools/rosbag/src/bag.cpp @@ -69,9 +69,7 @@ Bag::Bag() : Bag::~Bag() { close(); - for (map::iterator i = connection_infos_.begin(); i != connection_infos_.end(); i++) - delete i->second; - for (map::iterator i = topic_infos_.begin(); i != topic_infos_.end(); i++) + for (map::iterator i = connections_.begin(); i != connections_.end(); i++) delete i->second; } @@ -128,15 +126,15 @@ void Bag::openAppend(string const& filename) { } void Bag::write(string const& topic, Time const& time, ros::Message::ConstPtr msg) { - write_(topic, time, *msg, msg->__connection_header); + doWrite(topic, time, *msg, msg->__connection_header); } void Bag::write(string const& topic, Time const& time, ros::Message const& msg) { - write_(topic, time, msg, msg.__connection_header); + doWrite(topic, time, msg, msg.__connection_header); } void Bag::write(string const& topic, Time const& time, MessageInstance const& msg, shared_ptr connection_header) { - write_(topic, time, msg, connection_header); + doWrite(topic, time, msg, connection_header); } void Bag::close() { @@ -191,7 +189,7 @@ void Bag::setCompression(CompressionType compression) { void Bag::writeVersion() { string version = string("#ROSBAG V") + VERSION + string("\n"); - ROS_DEBUG("Writing VERSION [%llu]: %s", (unsigned long long) file_.getOffset(), version.c_str()); + ROS_INFO("Writing VERSION [%llu]: %s", (unsigned long long) file_.getOffset(), version.c_str()); version_ = 200; @@ -214,7 +212,7 @@ void Bag::readVersion() { version_ = version_major * 100 + version_minor; - ROS_DEBUG("Read VERSION: version=%d", version_); + ROS_INFO("Read VERSION: version=%d", version_); } int Bag::getVersion() const { return version_; } @@ -257,8 +255,8 @@ void Bag::startReadingVersion200() { for (uint32_t i = 0; i < chunk_count_; i++) readChunkInfoRecord(); - // Read the topic indexes - foreach(ChunkInfo const& chunk_info, chunk_infos_) { + // Read the connection indexes for each chunk + foreach(ChunkInfo const& chunk_info, chunks_) { curr_chunk_info_ = chunk_info; seek(curr_chunk_info_.pos); @@ -268,20 +266,17 @@ void Bag::startReadingVersion200() { readChunkHeader(chunk_header); seek(chunk_header.compressed_size, std::ios::cur); - // Read the topic index records after the chunk + // Read the index records after the chunk for (unsigned int i = 0; i < chunk_info.topic_counts.size(); i++) - readTopicIndexRecord(); + readConnectionIndexRecord200(); } // At this point we don't have a curr_chunk_info anymore so we reset it curr_chunk_info_ = ChunkInfo(); } +// @todo: handle 1.2 bags with no index void Bag::startReadingVersion102() { - // @todo: handle 1.2 bags with no index - - ROS_DEBUG("Reading in version 1.2 bag"); - // Read the file header record, which points to the start of the topic indexes readFileHeaderRecord(); @@ -294,18 +289,18 @@ void Bag::startReadingVersion102() { // Read the topic index records, which point to the offsets of each message in the file while (file_.getOffset() < filelength) - readTopicIndexRecord(); + readTopicIndexRecord102(); // Read the message definition records (which are the first entry in the topic indexes) for (map >::const_iterator i = topic_indexes_.begin(); i != topic_indexes_.end(); i++) { multiset const& topic_index = i->second; - IndexEntry const& first_entry = *topic_index.begin(); + IndexEntry const& first_entry = *topic_index.begin(); - ROS_DEBUG("Reading message definition for %s at %llu", i->first.c_str(), (unsigned long long) first_entry.chunk_pos); + ROS_INFO("Reading message definition for %s at %llu", i->first.c_str(), (unsigned long long) first_entry.chunk_pos); seek(first_entry.chunk_pos); - readMessageDefinitionRecord(); + readMessageDefinitionRecord102(); } } @@ -314,10 +309,10 @@ void Bag::startReadingVersion102() { void Bag::writeFileHeaderRecord() { boost::mutex::scoped_lock lock(record_mutex_); - connection_count_ = connection_infos_.size(); - chunk_count_ = chunk_infos_.size(); + connection_count_ = connections_.size(); + chunk_count_ = chunks_.size(); - ROS_DEBUG("Writing FILE_HEADER [%llu]: index_pos=%llu connection_count=%d chunk_count=%d", + ROS_INFO("Writing FILE_HEADER [%llu]: index_pos=%llu connection_count=%d chunk_count=%d", (unsigned long long) file_.getOffset(), (unsigned long long) index_data_pos_, connection_count_, chunk_count_); // Write file header record @@ -365,7 +360,7 @@ void Bag::readFileHeaderRecord() { readField(fields, CHUNK_COUNT_FIELD_NAME, true, &chunk_count_); } - ROS_DEBUG("Read FILE_HEADER: index_pos=%llu connection_count=%d chunk_count=%d", + ROS_INFO("Read FILE_HEADER: index_pos=%llu connection_count=%d chunk_count=%d", (unsigned long long) index_data_pos_, connection_count_, chunk_count_); // Skip the data section (just padding) @@ -399,7 +394,7 @@ void Bag::startWritingChunk(Time time) { void Bag::stopWritingChunk() { // Add this chunk to the index - chunk_infos_.push_back(curr_chunk_info_); + chunks_.push_back(curr_chunk_info_); // Get the uncompressed and compressed sizes uint32_t uncompressed_size = getChunkOffset(); @@ -411,9 +406,10 @@ void Bag::stopWritingChunk() { seek(curr_chunk_info_.pos); writeChunkHeader(compression_, compressed_size, uncompressed_size); - // Write out the topic indexes and clear them + // Write out the indexes and clear them seek(end_of_chunk_pos); - writeTopicIndexRecords(); + writeIndexRecords(); + curr_chunk_connection_indexes_.clear(); curr_chunk_topic_indexes_.clear(); // Flag that we're starting a new chunk @@ -430,7 +426,7 @@ void Bag::writeChunkHeader(CompressionType compression, uint32_t compressed_size chunk_header.compressed_size = compressed_size; chunk_header.uncompressed_size = uncompressed_size; - ROS_DEBUG("Writing CHUNK [%llu]: compression=%s compressed=%d uncompressed=%d", + ROS_INFO("Writing CHUNK [%llu]: compression=%s compressed=%d uncompressed=%d", (unsigned long long) file_.getOffset(), chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size); M_string header; @@ -455,52 +451,52 @@ void Bag::readChunkHeader(ChunkHeader& chunk_header) { readField(fields, COMPRESSION_FIELD_NAME, true, chunk_header.compression); readField(fields, SIZE_FIELD_NAME, true, &chunk_header.uncompressed_size); - ROS_DEBUG("Read CHUNK: compression=%s size=%d uncompressed=%d (%f)", chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size, 100 * ((double) chunk_header.compressed_size) / chunk_header.uncompressed_size); + ROS_INFO("Read CHUNK: compression=%s size=%d uncompressed=%d (%f)", chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size, 100 * ((double) chunk_header.compressed_size) / chunk_header.uncompressed_size); } -// Topic index records +// Index records -void Bag::writeTopicIndexRecords() { +void Bag::writeIndexRecords() { boost::mutex::scoped_lock lock(record_mutex_); - for (map >::const_iterator i = curr_chunk_topic_indexes_.begin(); i != curr_chunk_topic_indexes_.end(); i++) { - string const& topic = i->first; - multiset const& topic_index = i->second; + for (map >::const_iterator i = curr_chunk_connection_indexes_.begin(); i != curr_chunk_connection_indexes_.end(); i++) { + uint32_t connection_id = i->first; + multiset const& index = i->second; // Write the index record header + uint32_t index_size = index.size(); M_string header; - header[OP_FIELD_NAME] = toHeaderString(&OP_INDEX_DATA); - header[TOPIC_FIELD_NAME] = topic; - header[VER_FIELD_NAME] = toHeaderString(&INDEX_VERSION); - uint32_t topic_index_size = topic_index.size(); - header[COUNT_FIELD_NAME] = toHeaderString(&topic_index_size); + header[OP_FIELD_NAME] = toHeaderString(&OP_INDEX_DATA); + header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_id); + header[VER_FIELD_NAME] = toHeaderString(&INDEX_VERSION); + header[COUNT_FIELD_NAME] = toHeaderString(&index_size); writeHeader(header); - writeDataLength(topic_index_size * sizeof(IndexEntry)); + writeDataLength(index_size * sizeof(IndexEntry)); + + ROS_INFO("Writing INDEX_DATA: connection=%d ver=%d count=%d", connection_id, INDEX_VERSION, index_size); - ROS_DEBUG("Writing INDEX_DATA: topic=%s ver=%d count=%d", topic.c_str(), INDEX_VERSION, topic_index_size); - // Write the index record data (pairs of timestamp and position in file) - foreach(IndexEntry const& e, topic_index) { + foreach(IndexEntry const& e, index) { write((char*) &e.time.sec, 4); write((char*) &e.time.nsec, 4); write((char*) &e.offset, 4); - ROS_DEBUG(" - %d.%d: %d", e.time.sec, e.time.nsec, e.offset); + ROS_INFO(" - %d.%d: %d", e.time.sec, e.time.nsec, e.offset); } } } -void Bag::readTopicIndexRecord() { +void Bag::readTopicIndexRecord102() { ros::Header header; uint32_t data_size; if (!readHeader(header) || !readDataLength(data_size)) throw BagFormatException("Error reading INDEX_DATA header"); M_string& fields = *header.getValues(); - + if (!isOp(fields, OP_INDEX_DATA)) throw BagFormatException("Expected INDEX_DATA record"); - + uint32_t index_version; string topic; uint32_t count; @@ -508,19 +504,13 @@ void Bag::readTopicIndexRecord() { readField(fields, TOPIC_FIELD_NAME, true, topic); readField(fields, COUNT_FIELD_NAME, true, &count); - ROS_DEBUG("Read INDEX_DATA: ver=%d topic=%s count=%d", index_version, topic.c_str(), count); + ROS_INFO("Read INDEX_DATA: ver=%d topic=%s count=%d", index_version, topic.c_str(), count); - switch (index_version) { - case 0: return readTopicIndexDataVersion0(count, topic); - case 1: return readTopicIndexDataVersion1(count, topic, curr_chunk_info_.pos); - default: + if (index_version != 0) throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str()); - } -} -//! Store the position of the message in the chunk_pos field -void Bag::readTopicIndexDataVersion0(uint32_t count, string const& topic) { - // Read the index entry records + //! NOTE: stores the position of the message in the chunk_pos field + // todo: create fake connection infos multiset& topic_index = topic_indexes_[topic]; for (uint32_t i = 0; i < count; i++) { IndexEntry index_entry; @@ -532,15 +522,41 @@ void Bag::readTopicIndexDataVersion0(uint32_t count, string const& topic) { index_entry.time = Time(sec, nsec); index_entry.offset = 0; - ROS_DEBUG(" - %d.%d: %llu", sec, nsec, (unsigned long long) index_entry.chunk_pos); + ROS_INFO(" - %d.%d: %llu", sec, nsec, (unsigned long long) index_entry.chunk_pos); topic_index.insert(topic_index.end(), index_entry); } } -void Bag::readTopicIndexDataVersion1(uint32_t count, string const& topic, uint64_t chunk_pos) { - // Read the index entry records - multiset& topic_index = topic_indexes_[topic]; +void Bag::readConnectionIndexRecord200() { + ros::Header header; + uint32_t data_size; + if (!readHeader(header) || !readDataLength(data_size)) + throw BagFormatException("Error reading INDEX_DATA header"); + M_string& fields = *header.getValues(); + + if (!isOp(fields, OP_INDEX_DATA)) + throw BagFormatException("Expected INDEX_DATA record"); + + uint32_t index_version; + uint32_t connection_id; + uint32_t count; + readField(fields, VER_FIELD_NAME, true, &index_version); + readField(fields, CONNECTION_FIELD_NAME, true, &connection_id); + readField(fields, COUNT_FIELD_NAME, true, &count); + + ROS_INFO("Read INDEX_DATA: ver=%d connection=%d count=%d", index_version, connection_id, count); + + if (index_version != 1) + throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str()); + + uint64_t chunk_pos = curr_chunk_info_.pos; + + ConnectionInfo* connection_info = connections_[connection_id]; + multiset& connection_index = connection_indexes_[connection_id]; + string const& topic = connection_info->topic; + multiset& topic_index = topic_indexes_[topic]; + for (uint32_t i = 0; i < count; i++) { IndexEntry index_entry; index_entry.chunk_pos = chunk_pos; @@ -551,8 +567,9 @@ void Bag::readTopicIndexDataVersion1(uint32_t count, string const& topic, uint64 read((char*) &index_entry.offset, 4); index_entry.time = Time(sec, nsec); - ROS_DEBUG(" - %d.%d: %llu+%d", sec, nsec, (unsigned long long) index_entry.chunk_pos, index_entry.offset); + ROS_INFO(" - %d.%d: %llu+%d", sec, nsec, (unsigned long long) index_entry.chunk_pos, index_entry.offset); + connection_index.insert(connection_index.end(), index_entry); topic_index.insert(topic_index.end(), index_entry); } } @@ -562,14 +579,14 @@ void Bag::readTopicIndexDataVersion1(uint32_t count, string const& topic, uint64 void Bag::writeConnectionRecords() { boost::mutex::scoped_lock lock(record_mutex_); - for (map::const_iterator i = connection_infos_.begin(); i != connection_infos_.end(); i++) { + for (map::const_iterator i = connections_.begin(); i != connections_.end(); i++) { ConnectionInfo const* connection_info = i->second; writeConnectionRecord(connection_info); } } void Bag::writeConnectionRecord(ConnectionInfo const* connection_info) { - ROS_DEBUG("Writing CONNECTION [%llu:%d]: topic=%s id=%d", + ROS_INFO("Writing CONNECTION [%llu:%d]: topic=%s id=%d", (unsigned long long) file_.getOffset(), getChunkOffset(), connection_info->topic.c_str(), connection_info->id); M_string header; @@ -609,38 +626,24 @@ void Bag::readConnectionRecord() { if (!readHeader(connection_header)) throw BagFormatException("Error reading connection header"); - // If this is a new connection, update connection_infos_ - map::iterator key = connection_infos_.find(id); - if (key == connection_infos_.end()) { + // If this is a new connection, update connections + map::iterator key = connections_.find(id); + if (key == connections_.end()) { ConnectionInfo* connection_info = new ConnectionInfo(); - connection_info->id = id; - connection_info->topic = topic; + connection_info->id = id; + connection_info->topic = topic; for (M_string::const_iterator i = connection_header.getValues()->begin(); i != connection_header.getValues()->end(); i++) connection_info->header[i->first] = i->second; - connection_infos_[id] = connection_info; + connection_info->msg_def = connection_info->header["message_definition"]; + connection_info->datatype = connection_info->header["datatype"]; + connection_info->md5sum = connection_info->header["md5sum"]; + connections_[id] = connection_info; - ROS_DEBUG("Read CONNECTION: topic=%s id=%d", topic.c_str(), id); - - // If this is a new topic, update topic_infos_ - map::iterator key = topic_infos_.find(topic); - if (key == topic_infos_.end()) { - string msg_def = connection_info->header["message_definition"]; - string datatype = connection_info->header["datatype"]; - string md5sum = connection_info->header["md5sum"]; - - TopicInfo* topic_info = new TopicInfo(); - topic_info->topic = topic; - topic_info->msg_def = msg_def; - topic_info->datatype = datatype; - topic_info->md5sum = md5sum; - topic_infos_[topic] = topic_info; - - ROS_DEBUG("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str()); - } + ROS_INFO("Read CONNECTION: topic=%s id=%d", topic.c_str(), id); } } -void Bag::readMessageDefinitionRecord() { +void Bag::readMessageDefinitionRecord102() { ros::Header header; uint32_t data_size; if (!readHeader(header) || !readDataLength(data_size)) @@ -656,17 +659,20 @@ void Bag::readMessageDefinitionRecord() { readField(fields, TYPE_FIELD_NAME, true, datatype); readField(fields, DEF_FIELD_NAME, 0, UINT_MAX, true, message_definition); - map::iterator key = topic_infos_.find(topic); - if (key == topic_infos_.end()) { - TopicInfo* topic_info = new TopicInfo(); - topic_info->topic = topic; - topic_info->msg_def = message_definition; - topic_info->datatype = datatype; - topic_info->md5sum = md5sum; - topic_infos_[topic] = topic_info; + /* + map::iterator key = connections_.find(id); + if (key == connections_.end()) { + ConnectionInfo* connection_info = new ConnectionInfo(); + connection_info->id = id; + connection_info->topic = topic; + connection_info->msg_def = message_definition; + connection_info->datatype = datatype; + connection_info->md5sum = md5sum; + connections_[id] = connection_info; - ROS_DEBUG("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str()); + ROS_INFO("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str()); } + */ } void Bag::decompressChunk(uint64_t chunk_pos) { @@ -700,7 +706,7 @@ void Bag::decompressChunk(uint64_t chunk_pos) { } void Bag::loadMessageDataRecord102(uint64_t offset) { - ROS_DEBUG("loadMessageDataRecord: offset=%llu", (unsigned long long) offset); + ROS_INFO("loadMessageDataRecord: offset=%llu", (unsigned long long) offset); seek(offset); @@ -727,7 +733,7 @@ void Bag::decompressRawChunk(ChunkHeader const& chunk_header) { assert(chunk_header.compression == COMPRESSION_NONE); assert(chunk_header.compressed_size == chunk_header.uncompressed_size); - ROS_DEBUG("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size); + ROS_INFO("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size); decompress_buffer_.setSize(chunk_header.compressed_size); file_.read((char*) decompress_buffer_.getData(), chunk_header.compressed_size); @@ -740,7 +746,7 @@ void Bag::decompressBz2Chunk(ChunkHeader const& chunk_header) { CompressionType compression = compression::BZ2; - ROS_DEBUG("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size); + ROS_INFO("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size); chunk_buffer_.setSize(chunk_header.compressed_size); file_.read((char*) chunk_buffer_.getData(), chunk_header.compressed_size); @@ -793,7 +799,7 @@ uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) { void Bag::writeChunkInfoRecords() { boost::mutex::scoped_lock lock(record_mutex_); - foreach(ChunkInfo const& chunk_info, chunk_infos_) { + foreach(ChunkInfo const& chunk_info, chunks_) { // Write the chunk info header M_string header; uint32_t chunk_topic_count = chunk_info.topic_counts.size(); @@ -811,7 +817,7 @@ void Bag::writeChunkInfoRecords() { data_len += 4 + topic.size() + 4; // 4 bytes for length of topic_name + topic_name + 4 bytes for topic count } - ROS_DEBUG("Writing CHUNK_INFO [%llu]: ver=%d pos=%llu start=%d.%d end=%d.%d data_len=%d", + ROS_INFO("Writing CHUNK_INFO [%llu]: ver=%d pos=%llu start=%d.%d end=%d.%d data_len=%d", (unsigned long long) file_.getOffset(), CHUNK_INFO_VERSION, (unsigned long long) chunk_info.pos, chunk_info.start_time.sec, chunk_info.start_time.nsec, chunk_info.end_time.sec, chunk_info.end_time.nsec, @@ -832,7 +838,7 @@ void Bag::writeChunkInfoRecords() { write(topic); write((char*) &count, 4); - ROS_DEBUG(" - %s: %d", topic.c_str(), count); + ROS_INFO(" - %s: %d", topic.c_str(), count); } } } @@ -861,7 +867,7 @@ void Bag::readChunkInfoRecord() { uint32_t chunk_topic_count; readField(fields, COUNT_FIELD_NAME, true, &chunk_topic_count); - ROS_DEBUG("Read CHUNK_INFO: chunk_pos=%llu topic_count=%d start=%d.%d end=%d.%d", + ROS_INFO("Read CHUNK_INFO: chunk_pos=%llu topic_count=%d start=%d.%d end=%d.%d", (unsigned long long) chunk_info.pos, chunk_topic_count, chunk_info.start_time.sec, chunk_info.start_time.nsec, chunk_info.end_time.sec, chunk_info.end_time.nsec); @@ -877,12 +883,12 @@ void Bag::readChunkInfoRecord() { string topic = string(topic_name_buf, topic_name_len); - ROS_DEBUG(" %s: %d messages", topic.c_str(), topic_count); + ROS_INFO(" %s: %d messages", topic.c_str(), topic_count); chunk_info.topic_counts[topic] = topic_count; } - chunk_infos_.push_back(chunk_info); + chunks_.push_back(chunk_info); } // Record I/O @@ -958,7 +964,7 @@ void Bag::readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros:: total_bytes_read = 0; uint8_t op; do { - ROS_DEBUG("reading header from buffer: offset=%d", offset); + ROS_INFO("reading header from buffer: offset=%d", offset); uint32_t bytes_read; readHeaderFromBuffer(*current_buffer_, offset, header, data_size, bytes_read); @@ -1043,12 +1049,12 @@ void Bag::dump() { std::cout << "chunk_open: " << chunk_open_ << std::endl; std::cout << "curr_chunk_info: " << curr_chunk_info_.topic_counts.size() << " topics" << std::endl; - std::cout << "topic_infos:" << std::endl; - for (map::const_iterator i = topic_infos_.begin(); i != topic_infos_.end(); i++) - std::cout << " topic: " << i->first << std::endl; + std::cout << "connections:" << std::endl; + for (map::const_iterator i = connections_.begin(); i != connections_.end(); i++) + std::cout << " connection: " << i->first << " " << i->second->topic << std::endl; - std::cout << "chunk_infos:" << std::endl; - for (vector::const_iterator i = chunk_infos_.begin(); i != chunk_infos_.end(); i++) { + std::cout << "chunks:" << std::endl; + for (vector::const_iterator i = chunks_.begin(); i != chunks_.end(); i++) { std::cout << " chunk: " << (*i).topic_counts.size() << " topics" << std::endl; for (map::const_iterator j = (*i).topic_counts.begin(); j != (*i).topic_counts.end(); j++) { std::cout << " - " << j->first << ": " << j->second << std::endl; diff --git a/tools/rosbag/src/message_instance.cpp b/tools/rosbag/src/message_instance.cpp index 5402b3ec..6cdd6128 100644 --- a/tools/rosbag/src/message_instance.cpp +++ b/tools/rosbag/src/message_instance.cpp @@ -32,38 +32,17 @@ using ros::Time; namespace rosbag { -MessageInstance::MessageInstance(TopicInfo const* topic_info, IndexEntry const& index_entry, Bag& bag) : - topic_info_(topic_info), index_entry_(index_entry), bag_(&bag) { } - -string const& MessageInstance::getTopic() const { return topic_info_->topic; } -string const& MessageInstance::getDataType() const { return topic_info_->datatype; } -string const& MessageInstance::getMD5Sum() const { return topic_info_->md5sum; } -string const& MessageInstance::getMessageDefinition() const { return topic_info_->msg_def; } -Time const& MessageInstance::getTime() const { return index_entry_.time; } - -// @todo: this should cache the header -bool MessageInstance::getLatching() const { - ros::Header header = bag_->readMessageDataHeader(index_entry_); - ros::M_string& fields = *header.getValues(); - - ros::M_string::iterator latch_iter = fields.find(string("latching")); - if (latch_iter != fields.end() && latch_iter->second != string("0")) - return true; - else - return false; +MessageInstance::MessageInstance(ConnectionInfo const* connection_info, IndexEntry const& index_entry, Bag& bag) : + connection_info_(connection_info), index_entry_(index_entry), bag_(&bag) +{ } -// @todo: this should cache the header -std::string MessageInstance::getCallerId() const { - ros::Header header = bag_->readMessageDataHeader(index_entry_); - ros::M_string& fields = *header.getValues(); - - ros::M_string::iterator callerid_iter = fields.find(string("callerid")); - if (callerid_iter != fields.end()) - return callerid_iter->second; - else - return std::string(""); -} +string const& MessageInstance::getTopic() const { return connection_info_->topic; } +string const& MessageInstance::getDataType() const { return connection_info_->datatype; } +string const& MessageInstance::getMD5Sum() const { return connection_info_->md5sum; } +string const& MessageInstance::getMessageDefinition() const { return connection_info_->msg_def; } +ros::M_string const& MessageInstance::getConnectionHeader() const { return connection_info_->header; } +Time const& MessageInstance::getTime() const { return index_entry_.time; } uint32_t MessageInstance::size() const { return bag_->readMessageDataSize(index_entry_); @@ -73,4 +52,4 @@ ros::AdvertiseOptions createAdvertiseOptions(MessageInstance const& m, uint32_t return ros::AdvertiseOptions(m.getTopic(), queue_size, m.getMD5Sum(), m.getDataType(), m.getMessageDefinition()); } -} +} // namespace rosbag diff --git a/tools/rosbag/src/player.cpp b/tools/rosbag/src/player.cpp index 5dd42712..f2288033 100644 --- a/tools/rosbag/src/player.cpp +++ b/tools/rosbag/src/player.cpp @@ -163,13 +163,22 @@ void Player::doPublish(MessageInstance const& m) { ros::Time const& time = m.getTime(); // Make a unique id composed of the callerid and the topic allowing separate advertisers for separate latching topics - string callerid = m.getCallerId(); + + ros::M_string const& header = m.getConnectionHeader(); + + ros::M_string::const_iterator header_iter = header.find("callerid"); + string callerid; + if (header_iter != header.end()) + callerid = header_iter->second; + string callerid_topic = callerid + topic; map::iterator pub_iter = publishers_.find(callerid_topic); if (pub_iter == publishers_.end()) { ros::AdvertiseOptions opts = createAdvertiseOptions(m, options_.queue_size); - opts.latch = m.getLatching(); + + ros::M_string::const_iterator header_iter = header.find("latching"); + opts.latch = (header_iter != header.end() && header_iter->second == "1"); ros::Publisher pub = node_handle_->advertise(opts); publishers_.insert(publishers_.begin(), pair(callerid_topic, pub)); diff --git a/tools/rosbag/src/query.cpp b/tools/rosbag/src/query.cpp index 8e443160..a35cfd7d 100644 --- a/tools/rosbag/src/query.cpp +++ b/tools/rosbag/src/query.cpp @@ -51,7 +51,7 @@ Query::~Query() { } ros::Time Query::getStartTime() const { return start_time_; } ros::Time Query::getEndTime() const { return end_time_; } -bool Query::evaluate(TopicInfo const*) const { return true; } +bool Query::evaluate(ConnectionInfo const*) const { return true; } Query* Query::clone() const { return new Query(*this); } @@ -68,7 +68,7 @@ TopicQuery::TopicQuery(std::vector const& topics, ros::Time const& { } -bool TopicQuery::evaluate(TopicInfo const* info) const { +bool TopicQuery::evaluate(ConnectionInfo const* info) const { foreach(string const& topic, topics_) if (topic == info->topic) return true; @@ -91,7 +91,7 @@ TypeQuery::TypeQuery(std::vector const& types, ros::Time const& sta { } -bool TypeQuery::evaluate(TopicInfo const* info) const { +bool TypeQuery::evaluate(ConnectionInfo const* info) const { foreach(string const& type, types_) if (type == info->datatype) return true; @@ -115,9 +115,9 @@ BagQuery::~BagQuery() { MessageRange::MessageRange(std::multiset::const_iterator const& _begin, std::multiset::const_iterator const& _end, - TopicInfo const* _topic_info, + ConnectionInfo const* _connection_info, BagQuery const* _bag_query) - : begin(_begin), end(_end), topic_info(_topic_info), bag_query(_bag_query) + : begin(_begin), end(_end), connection_info(_connection_info), bag_query(_bag_query) { } @@ -132,4 +132,4 @@ bool ViewIterHelperCompare::operator()(ViewIterHelper const& a, ViewIterHelper c return (a.iter)->time > (b.iter)->time; } -} +} // namespace rosbag diff --git a/tools/rosbag/src/recorder.cpp b/tools/rosbag/src/recorder.cpp index 2b005996..9d8f1797 100644 --- a/tools/rosbag/src/recorder.cpp +++ b/tools/rosbag/src/recorder.cpp @@ -283,7 +283,7 @@ void Recorder::startWriting() { bag_.open(write_filename_, bagmode::Write); } catch (rosbag::BagException e) { - ROS_ERROR(e.what()); + ROS_ERROR("Error writing: %s", e.what()); exit_code_ = 1; ros::shutdown(); } @@ -367,7 +367,7 @@ void Recorder::doRecordSnapshotter() { bag_.open(write_filename, bagmode::Write); } catch (rosbag::BagException ex) { - ROS_ERROR(ex.what()); + ROS_ERROR("Error writing: %s", ex.what()); return; } diff --git a/tools/rosbag/src/view.cpp b/tools/rosbag/src/view.cpp index 0045a775..b8623bcf 100644 --- a/tools/rosbag/src/view.cpp +++ b/tools/rosbag/src/view.cpp @@ -61,8 +61,7 @@ void View::iterator::populate() { void View::iterator::populateSeek(multiset::const_iterator iter) { iters_.clear(); foreach(MessageRange const* range, view_->ranges_) { - multiset::const_iterator start = std::lower_bound(range->begin, - range->end, iter->time, IndexEntryCompare()); + multiset::const_iterator start = std::lower_bound(range->begin, range->end, iter->time, IndexEntryCompare()); if (start != range->end) iters_.push_back(ViewIterHelper(start, range)); } @@ -105,8 +104,16 @@ void View::iterator::increment() { //! \todo some check in case we are at end MessageInstance View::iterator::dereference() const { + ROS_ASSERT(iters_.size() > 0); + ViewIterHelper const& i = iters_.back(); - return MessageInstance(i.range->topic_info, *(i.iter), *(i.range->bag_query->bag)); + + ROS_ASSERT(i.range != NULL); + ROS_ASSERT(i.range->connection_info != NULL); + ROS_ASSERT(i.range->bag_query != NULL); + ROS_ASSERT(i.range->bag_query->bag != NULL); + + return MessageInstance(i.range->connection_info, *(i.iter), *(i.range->bag_query->bag)); } // View @@ -140,22 +147,24 @@ void View::addQuery(Bag& bag, Query const& query) { } void View::updateQueries(BagQuery* q) { - for (map::iterator i = q->bag->topic_infos_.begin(); i != q->bag->topic_infos_.end(); i++) { + for (map::iterator i = q->bag->connections_.begin(); i != q->bag->connections_.end(); i++) { + ConnectionInfo const* connection = i->second; + // Skip if the query doesn't evaluate to true - if (!q->query->evaluate(i->second)) + if (!q->query->evaluate(connection)) continue; - map >::iterator j = q->bag->topic_indexes_.find(i->second->topic); + map >::iterator j = q->bag->connection_indexes_.find(connection->id); // Skip if the bag doesn't have the corresponding index - if (j == q->bag->topic_indexes_.end()) + if (j == q->bag->connection_indexes_.end()) continue; + multiset& index = j->second; // lower_bound/upper_bound do a binary search to find the appropriate range of Index Entries given our time range - - std::multiset::const_iterator begin = std::lower_bound(j->second.begin(), j->second.end(), q->query->getStartTime(), IndexEntryCompare()); - std::multiset::const_iterator end = std::upper_bound(j->second.begin(), j->second.end(), q->query->getEndTime(), IndexEntryCompare()); - TopicInfo* topic_info = i->second; + + std::multiset::const_iterator begin = std::lower_bound(index.begin(), index.end(), q->query->getStartTime(), IndexEntryCompare()); + std::multiset::const_iterator end = std::upper_bound(index.begin(), index.end(), q->query->getEndTime(), IndexEntryCompare()); // todo: this could be made faster with a map of maps bool found = false; @@ -163,7 +172,7 @@ void View::updateQueries(BagQuery* q) { MessageRange* r = *k; // If the topic and query are already in our ranges, we update - if (r->bag_query == q && r->topic_info->topic == topic_info->topic) { + if (r->bag_query == q && r->connection_info->topic == connection->topic) { r->begin = begin; r->end = end; found = true; @@ -171,7 +180,7 @@ void View::updateQueries(BagQuery* q) { } } if (!found) - ranges_.push_back(new MessageRange(begin, end, topic_info, q)); + ranges_.push_back(new MessageRange(begin, end, connection, q)); } view_revision_++;