From 5041f272048fa0a0078bc71dfbdac81531f5c576 Mon Sep 17 00:00:00 2001 From: Tim Field Date: Wed, 28 Apr 2010 02:38:29 +0000 Subject: [PATCH] rosbag: changed format from MSG_DEF to CONNECTION messages; now storing connection header --- tools/rosbag/CMakeLists.txt | 3 + tools/rosbag/FORMATS | 28 +- tools/rosbag/include/rosbag/bag.h | 252 +++++----- tools/rosbag/include/rosbag/constants.h | 34 +- tools/rosbag/include/rosbag/exceptions.h | 30 +- tools/rosbag/include/rosbag/query.h | 50 +- tools/rosbag/include/rosbag/structures.h | 8 + tools/rosbag/src/bag.cpp | 603 +++++++++++------------ tools/rosbag/src/message_instance.cpp | 1 + tools/rosbag/src/player.cpp | 3 +- tools/rosbag/src/query.cpp | 52 +- tools/rosbag/src/recorder.cpp | 32 +- tools/rosbag/src/rosbag/bag.py | 161 +++--- tools/rosbag/src/view.cpp | 64 +-- tools/rosbag/test/test_bag.py | 2 + tools/rosbag/test/utest.cpp | 149 ++---- 16 files changed, 736 insertions(+), 736 deletions(-) diff --git a/tools/rosbag/CMakeLists.txt b/tools/rosbag/CMakeLists.txt index a006ec1d..492272a5 100644 --- a/tools/rosbag/CMakeLists.txt +++ b/tools/rosbag/CMakeLists.txt @@ -20,6 +20,9 @@ target_link_libraries(record rosbag) rosbuild_add_executable(play src/play.cpp) target_link_libraries(play rosbag) +rosbuild_add_executable(mytest test/mytest.cpp) +target_link_libraries(mytest rosbag) + INSTALL(TARGETS record play RUNTIME DESTINATION $ENV{ROS_ROOT}/bin) diff --git a/tools/rosbag/FORMATS b/tools/rosbag/FORMATS index 819b9215..424db06f 100644 --- a/tools/rosbag/FORMATS +++ b/tools/rosbag/FORMATS @@ -96,6 +96,17 @@ chunk_1 compression=bz2 # type of compression (optional) size= # size of data when uncompressed (optional) --- + conn_1i + op=7 + topic=topic + id=0 + --- + topic=topic + message_definition= + type= + md5sum= + latching=1 + callerid=caller1234 msg_def_1i op=1 topic= @@ -106,14 +117,16 @@ chunk_1 msg_1 op=2 - topic= - time= + topic=... + time=... + conn=... --- msg_2 op=2 - topic= - time= + topic=... + time=... + conn=... --- @@ -163,6 +176,13 @@ chunk_index_2.0 ... +conn_1 + op=7 + id=0 + --- + latching=1 + callerid=caller1234 + msg_def_1 op=1 topic= diff --git a/tools/rosbag/include/rosbag/bag.h b/tools/rosbag/include/rosbag/bag.h index d73bfd07..e2f62219 100644 --- a/tools/rosbag/include/rosbag/bag.h +++ b/tools/rosbag/include/rosbag/bag.h @@ -53,6 +53,7 @@ #include #include +#include #include #include @@ -60,8 +61,8 @@ namespace rosbag { namespace bagmode { -//! The possible modes to open a bag in -enum BagMode + //! The possible modes to open a bag in + enum BagMode { Write = 0, Read = 1, @@ -84,17 +85,16 @@ public: Bag(); ~Bag(); - bool open(std::string const& filename, BagMode mode = bagmode::Default); //!< Open a bag file + void open(std::string const& filename, BagMode mode = bagmode::Default); //!< Open a bag file void close(); //!< Close the bag file (write to disk, append index, etc.) - std::string getFileName() const; //!< Get the filename of the bag - BagMode getMode() const; //!< Get the mode the bag is in - int getVersion() const; //!< Get the version of the open bagfile - int getMajorVersion() const; //!< Get the major-version of the open bagfile - int getMinorVersion() const; //!< Get the minor-version of the open bagfile - uint64_t getOffset() const; //!< Get the offset into the actual file + std::string getFileName() const; //!< Get the filename of the bag + BagMode getMode() const; //!< Get the mode the bag is in + int getVersion() const; //!< Get the version of the open bagfile + int getMajorVersion() const; //!< Get the major-version of the open bagfile + int getMinorVersion() const; //!< Get the minor-version of the open bagfile + uint64_t getOffset() const; //!< Get the offset into the actual file - // Version 1.3 options void setCompression(CompressionType compression); //!< Set the compression method to use for writing chunks CompressionType getCompression() const; //!< Get the compression method to use for writing chunks void setChunkThreshold(uint32_t chunk_threshold); //!< Set the threshold for creating new chunks @@ -135,14 +135,13 @@ public: void dump(); private: - - //This helper function actually does the write with an arbitary serializable message thing + // This helper function actually does the write with an arbitary serializable message thing template void write_(std::string const& topic, ros::Time const& time, T const& msg, boost::shared_ptr connection_header); - bool openRead (std::string const& filename); - bool openWrite (std::string const& filename); - bool openAppend(std::string const& filename); + void openRead (std::string const& filename); + void openWrite (std::string const& filename); + void openAppend(std::string const& filename); void closeWrite(); @@ -152,19 +151,19 @@ private: void startWritingVersion200(); void stopWritingVersion200(); - bool startReadingVersion102(); - bool startReadingVersion200(); + void startReadingVersion102(); + void startReadingVersion200(); // Writing void writeVersion(); void writeFileHeaderRecord(); - void writeMessageDefinitionRecord(TopicInfo const* topic_info); - void appendMessageDefinitionRecordToBuffer(Buffer& buf, TopicInfo const* topic_info); + void writeConnectionRecord(ConnectionInfo const* connection_info); + void appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info); template - void writeMessageDataRecord(std::string const& topic, ros::Time const& time, bool latching, std::string const& callerid, T const& msg); + void writeMessageDataRecord(uint32_t conn_id, ros::Time const& time, T const& msg); void writeTopicIndexRecords(); - void writeMessageDefinitionRecords(); + void writeConnectionRecords(); void writeChunkInfoRecords(); void startWritingChunk(ros::Time time); void writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size); @@ -172,37 +171,41 @@ private: // Reading - bool readVersion(); - bool readFileHeaderRecord(); - bool readMessageDefinitionRecord(); + void readVersion(); + void readFileHeaderRecord(); + void readConnectionRecord(); + void readMessageDefinitionRecord(); ros::Header readMessageDataHeader(IndexEntry const& index_entry); uint32_t readMessageDataSize(IndexEntry const& index_entry); - + // Would be nice not to have to template this on Stream. Also, // we don't need to read the header here either. It just so // happens to be the most efficient way to skip it at the moment. template void readMessageDataIntoStream(IndexEntry const& index_entry, Stream& stream); - bool readChunkHeader(ChunkHeader& chunk_header); - bool readTopicIndexRecord(); - bool readTopicIndexDataVersion0(uint32_t count, std::string const& topic); - bool readTopicIndexDataVersion1(uint32_t count, std::string const& topic, uint64_t chunk_pos); - bool readChunkInfoRecord(); + void readChunkHeader(ChunkHeader& chunk_header); + void readTopicIndexRecord(); + void readTopicIndexDataVersion0(uint32_t count, std::string const& topic); + void readTopicIndexDataVersion1(uint32_t count, std::string const& topic, uint64_t chunk_pos); + void readChunkInfoRecord(); - bool decompressChunk(uint64_t chunk_pos); - bool decompressRawChunk(ChunkHeader const& chunk_header); - bool decompressBz2Chunk(ChunkHeader const& chunk_header); + void decompressChunk(uint64_t chunk_pos); + void decompressRawChunk(ChunkHeader const& chunk_header); + void decompressBz2Chunk(ChunkHeader const& chunk_header); uint32_t getChunkOffset() const; // Record header I/O - void writeHeader(ros::M_string const& fields, uint32_t data_len); - void appendHeaderToBuffer(Buffer& buf, ros::M_string const& fields, uint32_t data_len); - bool readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read); - bool readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read); - bool readHeader(ros::Header& header, uint32_t& data_size); + void writeHeader(ros::M_string const& fields); + void writeDataLength(uint32_t data_len); + void appendHeaderToBuffer(Buffer& buf, ros::M_string const& fields); + void appendDataLengthToBuffer(Buffer& buf, uint32_t data_len); + void readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read); + void readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read); + bool readHeader(ros::Header& header); + bool readDataLength(uint32_t& data_size); bool isOp(ros::M_string& fields, uint8_t reqOp); // Header fields @@ -213,12 +216,12 @@ private: std::string toHeaderString(ros::Time const* field); template - bool readField(ros::M_string const& fields, std::string const& field_name, bool required, T* data); + void readField(ros::M_string const& fields, std::string const& field_name, bool required, T* data); - bool readField(ros::M_string const& fields, std::string const& field_name, unsigned int min_len, unsigned int max_len, bool required, std::string& data); - bool readField(ros::M_string const& fields, std::string const& field_name, bool required, std::string& data); + void readField(ros::M_string const& fields, std::string const& field_name, unsigned int min_len, unsigned int max_len, bool required, std::string& data); + void readField(ros::M_string const& fields, std::string const& field_name, bool required, std::string& data); - bool readField(ros::M_string const& fields, std::string const& field_name, bool required, ros::Time& data); + void readField(ros::M_string const& fields, std::string const& field_name, bool required, ros::Time& data); ros::M_string::const_iterator checkField(ros::M_string const& fields, std::string const& field, unsigned int min_len, unsigned int max_len, bool required) const; @@ -240,7 +243,7 @@ private: uint64_t file_header_pos_; uint64_t index_data_pos_; - uint32_t topic_count_; + uint32_t connection_count_; uint32_t chunk_count_; boost::mutex record_mutex_; @@ -252,21 +255,26 @@ private: uint64_t curr_chunk_data_pos_; std::map > curr_chunk_topic_indexes_; - std::map topic_infos_; - std::vector chunk_infos_; + uint32_t top_connection_id_; + std::map topic_connection_ids_; + std::map header_connection_ids_; + std::map connection_infos_; + + std::map topic_infos_; + std::vector chunk_infos_; std::map > topic_indexes_; - Buffer header_buffer_; //!< reusable buffer in which to assemble the record header before writing to file - Buffer record_buffer_; //!< reusable buffer in which to assemble the record data before writing to file + Buffer header_buffer_; //!< reusable buffer in which to assemble the record header before writing to file + Buffer record_buffer_; //!< reusable buffer in which to assemble the record data before writing to file - Buffer chunk_buffer_; //!< reusable buffer to read chunk into - Buffer decompress_buffer_; //!< reusable buffer to decompress chunks into + Buffer chunk_buffer_; //!< reusable buffer to read chunk into + Buffer decompress_buffer_; //!< reusable buffer to decompress chunks into - Buffer outgoing_chunk_buffer_; //!< reusable buffer to read chunk into + Buffer outgoing_chunk_buffer_; //!< reusable buffer to read chunk into Buffer* current_buffer_; - uint64_t decompressed_chunk_; //!< position of decompressed chunk + uint64_t decompressed_chunk_; //!< position of decompressed chunk }; } @@ -283,17 +291,13 @@ std::string Bag::toHeaderString(T const* field) { } template -bool Bag::readField(ros::M_string const& fields, std::string const& field_name, bool required, T* data) { - ros::M_string::const_iterator i; - if ((i = checkField(fields, field_name, sizeof(T), sizeof(T), required)) == fields.end()) - return false; +void Bag::readField(ros::M_string const& fields, std::string const& field_name, bool required, T* data) { + ros::M_string::const_iterator i = checkField(fields, field_name, sizeof(T), sizeof(T), required); memcpy(data, i->second.data(), sizeof(T)); - return true; } template -void Bag::readMessageDataIntoStream(IndexEntry const& index_entry, Stream& stream) -{ +void Bag::readMessageDataIntoStream(IndexEntry const& index_entry, Stream& stream) { ros::Header header; uint32_t data_size; uint32_t bytes_read; @@ -306,11 +310,9 @@ void Bag::readMessageDataIntoStream(IndexEntry const& index_entry, Stream& strea if (data_size > 0) memcpy(stream.advance(data_size), current_buffer_->getData() + index_entry.offset + bytes_read, data_size); - - return; - default: - ROS_FATAL("Unhandled version: %d", version_); break; + default: + throw BagFormatException((boost::format("Unhandled version: %1%") % version_).str()); } } @@ -331,59 +333,78 @@ boost::shared_ptr Bag::instantiateBuffer(IndexEntry const& index_entry) ros::serialization::IStream s(current_buffer_->getData() + index_entry.offset + bytes_read, data_size); ros::serialization::deserialize(s, *p); } - break; + return p; default: - ROS_FATAL("Unhandled version: %d", version_); - break; + throw BagFormatException((boost::format("Unhandled version: %1%") % version_).str()); } - - return p; } template void Bag::write_(std::string const& topic, ros::Time const& time, T const& msg, boost::shared_ptr connection_header) { - // Whenever we write we increment our revision bag_revision_++; - bool needs_def_written = false; + // @todo refactor into separate function + // Get ID for connection header + bool needs_conn_written = false; + ConnectionInfo* connection_info = NULL; + uint32_t conn_id = 0; + ros::M_string* header_address = connection_header.get(); + if (header_address == NULL) { + // No connection header: we'll manufacture one, and store by topic. + + std::map::iterator topic_connection_ids_iter = topic_connection_ids_.find(topic); + if (topic_connection_ids_iter == topic_connection_ids_.end()) { + conn_id = top_connection_id_++; + topic_connection_ids_[topic] = conn_id; + + // Flag that we need to write a connection record + needs_conn_written = true; + } + else { + conn_id = topic_connection_ids_iter->second; + connection_info = connection_infos_[conn_id]; + } + } + else { + // Store the connection info by the address of the connection header + + std::map::iterator header_connection_ids_iter = header_connection_ids_.find(header_address); + if (header_connection_ids_iter == header_connection_ids_.end()) { + conn_id = top_connection_id_++; + header_connection_ids_[header_address] = conn_id; + + // Flag that we need to write a connection record + needs_conn_written = true; + } + else { + conn_id = header_connection_ids_iter->second; + connection_info = connection_infos_[conn_id]; + } + } + + // Store the topic info TopicInfo* topic_info; { boost::mutex::scoped_lock lock(topic_infos_mutex_); - + std::map::iterator key = topic_infos_.find(topic); if (key == topic_infos_.end()) { // Extract the topic info from the message topic_info = new TopicInfo(); topic_info->topic = topic; - topic_info->msg_def = ros::message_traits::definition(msg); topic_info->datatype = ros::message_traits::datatype(msg); topic_info->md5sum = ros::message_traits::md5sum(msg); + topic_info->msg_def = ros::message_traits::definition(msg); topic_infos_[topic] = topic_info; // Initialize the topic index topic_indexes_[topic] = std::multiset(); - - // Flag that we need to write a message definition - needs_def_written = true; } else topic_info = key->second; } - // Get information about possible latching and callerid from the connection header - bool latching = false; - std::string callerid(""); - if (connection_header != NULL) { - ros::M_string::iterator latch_iter = connection_header->find(std::string("latching")); - if (latch_iter != connection_header->end() && latch_iter->second != std::string("0")) - latching = true; - - ros::M_string::iterator callerid_iter = connection_header->find(std::string("callerid")); - if (callerid_iter != connection_header->end()) - callerid = callerid_iter->second; - } - { //! \todo enabling this lock seems to cause a deadlock - do we even need it? //boost::mutex::scoped_lock lock(record_mutex_); @@ -396,11 +417,26 @@ void Bag::write_(std::string const& topic, ros::Time const& time, T const& msg, if (!chunk_open_) startWritingChunk(time); - // Write a message definition record, if necessary - if (needs_def_written) - { - writeMessageDefinitionRecord(topic_info); - appendMessageDefinitionRecordToBuffer(outgoing_chunk_buffer_, topic_info); + // Write connection info record, if necessary + if (needs_conn_written) { + // Create connection info + connection_info = new ConnectionInfo(); + connection_info->topic = topic; + connection_info->id = conn_id; + if (connection_header != NULL) { + for (ros::M_string::const_iterator i = connection_header->begin(); i != connection_header->end(); i++) + connection_info->header[i->first] = i->second; + } + else { + connection_info->header["topic"] = topic_info->topic; + connection_info->header["type"] = topic_info->datatype; + connection_info->header["md5sum"] = topic_info->md5sum; + connection_info->header["message_definition"] = topic_info->msg_def; + } + connection_infos_[conn_id] = connection_info; + + writeConnectionRecord(connection_info); + appendConnectionRecordToBuffer(outgoing_chunk_buffer_, connection_info); } // Add to topic index @@ -411,7 +447,6 @@ void Bag::write_(std::string const& topic, ros::Time const& time, T const& msg, std::multiset& chunk_index = curr_chunk_topic_indexes_[topic]; chunk_index.insert(chunk_index.end(), index_entry); - std::multiset& index = topic_indexes_[topic]; index.insert(index.end(), index_entry); @@ -419,32 +454,28 @@ void Bag::write_(std::string const& topic, ros::Time const& time, T const& msg, curr_chunk_info_.topic_counts[topic]++; // Write the message data - writeMessageDataRecord(topic, time, latching, callerid, msg); + writeMessageDataRecord(conn_id, time, msg); // Check if we want to stop this chunk uint32_t chunk_size = getChunkOffset(); ROS_DEBUG(" curr_chunk_size=%d (threshold=%d)", chunk_size, chunk_threshold_); - if (chunk_size > chunk_threshold_) - { + if (chunk_size > chunk_threshold_) { // Empty the outgoing chunk stopWritingChunk(); outgoing_chunk_buffer_.setSize(0); - // We no longer have a valid curr_chunk_info... + + // We no longer have a valid curr_chunk_info curr_chunk_info_.pos = -1; } } } template -void Bag::writeMessageDataRecord(std::string const& topic, ros::Time const& time, bool latching, std::string const& callerid, T const& msg) { +void Bag::writeMessageDataRecord(uint32_t conn_id, ros::Time const& time, T const& msg) { ros::M_string header; - header[OP_FIELD_NAME] = toHeaderString(&OP_MSG_DATA); - header[TOPIC_FIELD_NAME] = topic; - header[TIME_FIELD_NAME] = toHeaderString(&time); - if (latching) { - header[LATCHING_FIELD_NAME] = std::string("1"); - header[CALLERID_FIELD_NAME] = callerid; - } + header[OP_FIELD_NAME] = toHeaderString(&OP_MSG_DATA); + header[CONNECTION_FIELD_NAME] = toHeaderString(&conn_id); + header[TIME_FIELD_NAME] = toHeaderString(&time); // Assemble message in memory first, because we need to write its length uint32_t msg_ser_len = ros::serialization::serializationLength(msg); @@ -456,19 +487,20 @@ void Bag::writeMessageDataRecord(std::string const& topic, ros::Time const& time // TODO: with a little work here we can serialize directly into the file -- sweet ros::serialization::serialize(s, msg); - ROS_DEBUG("Writing MSG_DATA [%llu:%d]: topic=%s sec=%d nsec=%d data_len=%d", - (unsigned long long) file_.getOffset(), getChunkOffset(), topic.c_str(), time.sec, time.nsec, msg_ser_len); - + ROS_DEBUG("Writing MSG_DATA [%llu:%d]: conn=%d sec=%d nsec=%d data_len=%d", + (unsigned long long) file_.getOffset(), getChunkOffset(), conn_id, time.sec, time.nsec, msg_ser_len); // TODO: If we are clever here, we can serialize into the // outgoing_chunk_buffer and get rid of the record_buffer_ all // together. - writeHeader(header, msg_ser_len); + writeHeader(header); + writeDataLength(msg_ser_len); write((char*) record_buffer_.getData(), msg_ser_len); // TODO: Using appendHeaderToBuffer is ugly. We need a better abstraction - appendHeaderToBuffer(outgoing_chunk_buffer_, header, msg_ser_len); + appendHeaderToBuffer(outgoing_chunk_buffer_, header); + appendDataLengthToBuffer(outgoing_chunk_buffer_, msg_ser_len); uint32_t offset = outgoing_chunk_buffer_.getSize(); outgoing_chunk_buffer_.setSize(outgoing_chunk_buffer_.getSize() + msg_ser_len); diff --git a/tools/rosbag/include/rosbag/constants.h b/tools/rosbag/include/rosbag/constants.h index dc8b74a3..8cfd40ba 100644 --- a/tools/rosbag/include/rosbag/constants.h +++ b/tools/rosbag/include/rosbag/constants.h @@ -56,25 +56,25 @@ static const unsigned char OP_CHUNK_INFO = 0x06; static const unsigned char OP_CONNECTION = 0x07; // Header field keys -static const std::string OP_FIELD_NAME = "op"; -static const std::string TOPIC_FIELD_NAME = "topic"; -static const std::string MD5_FIELD_NAME = "md5"; -static const std::string TYPE_FIELD_NAME = "type"; -static const std::string DEF_FIELD_NAME = "def"; -static const std::string VER_FIELD_NAME = "ver"; -static const std::string COUNT_FIELD_NAME = "count"; -static const std::string INDEX_POS_FIELD_NAME = "index_pos"; // 1.2+ -static const std::string TOPIC_COUNT_FIELD_NAME = "topic_count"; // 2.0+ -static const std::string CHUNK_COUNT_FIELD_NAME = "chunk_count"; // 2.0+ -static const std::string CHUNK_POS_FIELD_NAME = "chunk_pos"; // 2.0+ -static const std::string COMPRESSION_FIELD_NAME = "compression"; // 2.0+ -static const std::string SIZE_FIELD_NAME = "size"; // 2.0+ -static const std::string TIME_FIELD_NAME = "time"; // 2.0+ -static const std::string START_TIME_FIELD_NAME = "start_time"; // 2.0+ -static const std::string END_TIME_FIELD_NAME = "end_time"; // 2.0+ -static const std::string CONNECTION_FIELD_NAME = "conn"; // 2.0+ +static const std::string OP_FIELD_NAME = "op"; +static const std::string TOPIC_FIELD_NAME = "topic"; +static const std::string VER_FIELD_NAME = "ver"; +static const std::string COUNT_FIELD_NAME = "count"; +static const std::string INDEX_POS_FIELD_NAME = "index_pos"; // 1.2+ +static const std::string CONNECTION_COUNT_FIELD_NAME = "conn_count"; // 2.0+ +static const std::string CHUNK_COUNT_FIELD_NAME = "chunk_count"; // 2.0+ +static const std::string CHUNK_POS_FIELD_NAME = "chunk_pos"; // 2.0+ +static const std::string COMPRESSION_FIELD_NAME = "compression"; // 2.0+ +static const std::string SIZE_FIELD_NAME = "size"; // 2.0+ +static const std::string TIME_FIELD_NAME = "time"; // 2.0+ +static const std::string START_TIME_FIELD_NAME = "start_time"; // 2.0+ +static const std::string END_TIME_FIELD_NAME = "end_time"; // 2.0+ +static const std::string CONNECTION_FIELD_NAME = "conn"; // 2.0+ // Legacy fields +static const std::string MD5_FIELD_NAME = "md5"; // <2.0 +static const std::string TYPE_FIELD_NAME = "type"; // <2.0 +static const std::string DEF_FIELD_NAME = "def"; // <2.0 static const std::string SEC_FIELD_NAME = "sec"; // <2.0 static const std::string NSEC_FIELD_NAME = "nsec"; // <2.0 static const std::string LATCHING_FIELD_NAME = "latching"; // <2.0 diff --git a/tools/rosbag/include/rosbag/exceptions.h b/tools/rosbag/include/rosbag/exceptions.h index 45af2d8a..e7b66624 100644 --- a/tools/rosbag/include/rosbag/exceptions.h +++ b/tools/rosbag/include/rosbag/exceptions.h @@ -37,21 +37,31 @@ #include -using ros::Exception; - namespace rosbag { -//! Exception thrown if trying to add or read from an unopened recorder/player -class BagNotOpenException : public Exception { }; +//! Base class for rosbag exceptions +class BagException : public ros::Exception +{ +public: + BagException(std::string const& msg) : ros::Exception(msg) { } +}; -//! Exception thrown when assorted IO problems -class BagIOException : public Exception { }; +//! Exception thrown if trying to read from or write to an unopened bag +class BagNotOpenException : public BagException { }; -//! Exception thrown if an invalid MsgPos (such as from another bag) is passed to seek -class InvalidMsgPosException : public Exception { }; +//! Exception thrown when on IO problems +class BagIOException : public BagException +{ +public: + BagIOException(std::string const& msg) : BagException(msg) { } +}; -//! Exception thrown if trying to instantiate a MsgInstance as the wrong type -class InstantiateException : public Exception { }; +//! Exception thrown on problems reading the bag format +class BagFormatException : public BagException +{ +public: + BagFormatException(std::string const& msg) : BagException(msg) { } +}; } diff --git a/tools/rosbag/include/rosbag/query.h b/tools/rosbag/include/rosbag/query.h index b1a25c97..1601897b 100644 --- a/tools/rosbag/include/rosbag/query.h +++ b/tools/rosbag/include/rosbag/query.h @@ -53,15 +53,15 @@ class Query public: //! The base query takes an optional time-range /*! - * param start_time the beginning of the time_range for the query - * param end_time the end of the time_range for the query + * param start_time the beginning of the time_range for the query + * param end_time the end of the time_range for the query */ Query(ros::Time const& start_time = ros::TIME_MIN, ros::Time const& end_time = ros::TIME_MAX); virtual ~Query(); - ros::Time getStartTime() const; //!< Accessor for start-timep - ros::Time getEndTime() const; //!< Accessor for end-timep + ros::Time getStartTime() const; //!< Get the start-time + ros::Time getEndTime() const; //!< Get the end-time //! Virtual function which determines topic-inclusion /*! @@ -71,58 +71,60 @@ public: virtual bool evaluate(TopicInfo const* topic_info) const; //! Virtual function to clone so we can store a copy of the query - virtual Query* clone() const { return new Query(*this); } - + virtual Query* clone() const; private: ros::Time start_time_; ros::Time end_time_; }; -//! A Query the returns messages on a specified topic +//! A Query the returns messages on the specified topics class TopicQuery : public Query { public: + TopicQuery(std::string const& topic, + ros::Time const& start_time = ros::TIME_MIN, + ros::Time const& end_time = ros::TIME_MAX); + TopicQuery(std::vector const& topics, ros::Time const& start_time = ros::TIME_MIN, ros::Time const& end_time = ros::TIME_MAX); virtual bool evaluate(TopicInfo const*) const; - virtual Query* clone() const { return new TopicQuery(*this); } + virtual Query* clone() const; private: std::vector topics_; }; -//! A Query the returns messages of a specified type +//! A Query the returns messages of the specified types class TypeQuery : public Query { public: - TypeQuery(std::vector const& topics, - ros::Time const& start_time = ros::TIME_MIN, - ros::Time const& end_time = ros::TIME_MAX); + TypeQuery(std::string const& type, + ros::Time const& start_time = ros::TIME_MIN, + ros::Time const& end_time = ros::TIME_MAX); + + TypeQuery(std::vector const& types, + ros::Time const& start_time = ros::TIME_MIN, + ros::Time const& end_time = ros::TIME_MAX); virtual bool evaluate(TopicInfo const*) const; - virtual Query* clone() const { return new TypeQuery(*this); } + virtual Query* clone() const; private: - std::vector topics_; + std::vector types_; }; //! Pairs of queries and the bags they come from (used internally by View) struct BagQuery { - Bag* bag; - Query* query; + BagQuery(Bag* _bag, Query const& _query, uint32_t _bag_revision); + ~BagQuery(); + + Bag* bag; + Query* query; uint32_t bag_revision; - BagQuery(Bag* _bag, const Query& _query, uint32_t _bag_revision) : bag(_bag), bag_revision(_bag_revision) - { - query = _query.clone(); - } - - ~BagQuery() { - delete query; - } }; struct MessageRange diff --git a/tools/rosbag/include/rosbag/structures.h b/tools/rosbag/include/rosbag/structures.h index 37b9c11a..afb69fad 100644 --- a/tools/rosbag/include/rosbag/structures.h +++ b/tools/rosbag/include/rosbag/structures.h @@ -38,11 +38,19 @@ #include #include +#include "ros/message.h" #include "ros/time.h" namespace rosbag { +struct ConnectionInfo +{ + uint32_t id; + std::string topic; + ros::M_string header; +}; + struct TopicInfo { std::string topic; diff --git a/tools/rosbag/src/bag.cpp b/tools/rosbag/src/bag.cpp index d112820a..ed51e700 100644 --- a/tools/rosbag/src/bag.cpp +++ b/tools/rosbag/src/bag.cpp @@ -44,6 +44,7 @@ using std::priority_queue; using std::string; using std::vector; using std::multiset; +using boost::format; using boost::shared_ptr; using ros::M_string; using ros::Time; @@ -51,15 +52,16 @@ using ros::Time; namespace rosbag { Bag::Bag() : - compression_(compression::BZ2), + compression_(compression::None), chunk_threshold_(768 * 1024), // 768KB chunks bag_revision_(0), file_header_pos_(0), index_data_pos_(0), - topic_count_(0), + connection_count_(0), chunk_count_(0), chunk_open_(false), curr_chunk_data_pos_(0), + top_connection_id_(0), decompressed_chunk_(0) { } @@ -67,63 +69,50 @@ Bag::Bag() : Bag::~Bag() { close(); + for (map::iterator i = connection_infos_.begin(); i != connection_infos_.end(); i++) + delete i->second; for (map::iterator i = topic_infos_.begin(); i != topic_infos_.end(); i++) - delete i->second; + delete i->second; } -bool Bag::open(string const& filename, BagMode mode) { +void Bag::open(string const& filename, BagMode mode) { mode_ = mode; switch (mode_) { - case bagmode::Read: return openRead(filename); - case bagmode::Write: return openWrite(filename); - case bagmode::Append: return openAppend(filename); + case bagmode::Read: openRead(filename); break; + case bagmode::Write: openWrite(filename); break; + case bagmode::Append: openAppend(filename); break; default: - ROS_ERROR("Unknown mode: %d", (int) mode); + throw BagException((format("Unknown mode: %1%") % (int) mode).str()); } - - return false; } -bool Bag::openRead(string const& filename) { - if (!file_.openRead(filename)) { - ROS_ERROR("Failed to open file: %s", filename.c_str()); - return false; - } +void Bag::openRead(string const& filename) { + if (!file_.openRead(filename)) + throw BagIOException((format("Failed to open file: %1%") % filename.c_str()).str()); - if (!readVersion()) - return false; + readVersion(); switch (version_) { - case 102: return startReadingVersion102(); - case 200: return startReadingVersion200(); + case 102: startReadingVersion102(); break; + case 200: startReadingVersion200(); break; default: - ROS_ERROR("Unsupported bag file version: %d.%d", getMajorVersion(), getMinorVersion()); - return false; + throw BagException((format("Unsupported bag file version: %1%.%2%") % getMajorVersion() % getMinorVersion()).str()); } - - return true; } -bool Bag::openWrite(string const& filename) { - if (!file_.openWrite(filename)) { - ROS_ERROR("Failed to open file: %s", filename.c_str()); - return false; - } +void Bag::openWrite(string const& filename) { + if (!file_.openWrite(filename)) + throw BagIOException("Failed to open file: " + filename); startWritingVersion200(); - return true; } -bool Bag::openAppend(string const& filename) { - if (!file_.openReadWrite(filename)) { - ROS_ERROR("Failed to open file: %s", filename.c_str()); - return false; - } +void Bag::openAppend(string const& filename) { + if (!file_.openReadWrite(filename)) + throw BagIOException("Failed to open file: " + filename); - // Read in the version and file header - if (!readVersion()) - return false; + readVersion(); startReadingVersion200(); // Truncate the file to chop off the index @@ -136,19 +125,17 @@ bool Bag::openAppend(string const& filename) { // Seek to the end of the file seek(0, std::ios::end); - - return true; } -void Bag::write(std::string const& topic, ros::Time const& time, ros::Message::ConstPtr msg) { +void Bag::write(string const& topic, Time const& time, ros::Message::ConstPtr msg) { write_(topic, time, *msg, msg->__connection_header); } -void Bag::write(std::string const& topic, ros::Time const& time, ros::Message const& msg) { +void Bag::write(string const& topic, Time const& time, ros::Message const& msg) { write_(topic, time, msg, msg.__connection_header); } -void Bag::write(std::string const& topic, ros::Time const& time, MessageInstance const& msg, boost::shared_ptr connection_header) { +void Bag::write(string const& topic, Time const& time, MessageInstance const& msg, shared_ptr connection_header) { write_(topic, time, msg, connection_header); } @@ -181,11 +168,23 @@ string Bag::getFileName() const { return file_.getFileName(); } BagMode Bag::getMode() const { return mode_; } uint64_t Bag::getOffset() const { return file_.getOffset(); } -void Bag::setChunkThreshold(uint32_t chunk_threshold) { chunk_threshold_ = chunk_threshold; } -uint32_t Bag::getChunkThreshold() const { return chunk_threshold_; } +uint32_t Bag::getChunkThreshold() const { return chunk_threshold_; } -void Bag::setCompression(CompressionType compression) { compression_ = compression; } -CompressionType Bag::getCompression() const { return compression_; } +void Bag::setChunkThreshold(uint32_t chunk_threshold) { + if (file_.isOpen() && chunk_open_) + stopWritingChunk(); + + chunk_threshold_ = chunk_threshold; +} + +CompressionType Bag::getCompression() const { return compression_; } + +void Bag::setCompression(CompressionType compression) { + if (file_.isOpen() && chunk_open_) + stopWritingChunk(); + + compression_ = compression; +} // Version @@ -199,18 +198,15 @@ void Bag::writeVersion() { write(version); } -bool Bag::readVersion() { - // Read the version line +void Bag::readVersion() { string version_line = file_.getline(); file_header_pos_ = file_.getOffset(); char logtypename[100]; int version_major, version_minor; - if (sscanf(version_line.c_str(), "#ROS%s V%d.%d", logtypename, &version_major, &version_minor) != 3) { - ROS_ERROR("Error reading version line"); - return false; - } + if (sscanf(version_line.c_str(), "#ROS%s V%d.%d", logtypename, &version_major, &version_minor) != 3) + throw BagIOException("Error reading version line"); // Special case if (version_major == 0 && version_line[0] == '#') @@ -219,8 +215,6 @@ bool Bag::readVersion() { version_ = version_major * 100 + version_minor; ROS_DEBUG("Read VERSION: version=%d", version_); - - return true; } int Bag::getVersion() const { return version_; } @@ -240,40 +234,28 @@ void Bag::stopWritingVersion200() { stopWritingChunk(); index_data_pos_ = file_.getOffset(); - writeMessageDefinitionRecords(); + writeConnectionRecords(); writeChunkInfoRecords(); seek(file_header_pos_); writeFileHeaderRecord(); - - topic_infos_.clear(); } -bool Bag::startReadingVersion200() { - ROS_DEBUG("Reading in version 2.0 bag"); - +void Bag::startReadingVersion200() { // Read the file header record, which points to the end of the chunks - if (!readFileHeaderRecord()) - return false; + readFileHeaderRecord(); // Seek to the end of the chunks seek(index_data_pos_); - // Read the message definition records (one for each topic) - for (uint32_t i = 0; i < topic_count_; i++) { - if (!readMessageDefinitionRecord()) { - ROS_ERROR("Failed to read message definition record"); - return false; - } - } + // Read the connection records (one for each connection) + for (uint32_t i = 0; i < connection_count_; i++) + readConnectionRecord(); + top_connection_id_ = connection_count_; // Read the chunk info records - for (uint32_t i = 0; i < chunk_count_; i++) { - if (!readChunkInfoRecord()) { - ROS_ERROR("Failed to read chunk info record"); - return false; - } - } + for (uint32_t i = 0; i < chunk_count_; i++) + readChunkInfoRecord(); // Read the topic indexes foreach(ChunkInfo const& chunk_info, chunk_infos_) { @@ -283,30 +265,25 @@ bool Bag::startReadingVersion200() { // Skip over the chunk data ChunkHeader chunk_header; - if (!readChunkHeader(chunk_header)) - return false; + readChunkHeader(chunk_header); seek(chunk_header.compressed_size, std::ios::cur); // Read the topic index records after the chunk for (unsigned int i = 0; i < chunk_info.topic_counts.size(); i++) - if (!readTopicIndexRecord()) - return false; + readTopicIndexRecord(); } - // At this point we don't have a curr_chunkinfo anymore so we reset it + // At this point we don't have a curr_chunk_info anymore so we reset it curr_chunk_info_ = ChunkInfo(); - - return true; } -bool Bag::startReadingVersion102() { +void Bag::startReadingVersion102() { // @todo: handle 1.2 bags with no index ROS_DEBUG("Reading in version 1.2 bag"); // Read the file header record, which points to the start of the topic indexes - if (!readFileHeaderRecord()) - return false; + readFileHeaderRecord(); // Seek to the beginning of the topic index records seek(index_data_pos_); @@ -324,11 +301,8 @@ bool Bag::startReadingVersion102() { seek(first_entry.chunk_pos); - if (!readMessageDefinitionRecord()) - return false; + readMessageDefinitionRecord(); } - - return true; } // File header record @@ -336,18 +310,18 @@ bool Bag::startReadingVersion102() { void Bag::writeFileHeaderRecord() { boost::mutex::scoped_lock lock(record_mutex_); - topic_count_ = topic_infos_.size(); - chunk_count_ = chunk_infos_.size(); + connection_count_ = connection_infos_.size(); + chunk_count_ = chunk_infos_.size(); - ROS_DEBUG("Writing FILE_HEADER [%llu]: index_pos=%llu topic_count=%d chunk_count=%d", - (unsigned long long) file_.getOffset(), (unsigned long long) index_data_pos_, topic_count_, chunk_count_); + ROS_DEBUG("Writing FILE_HEADER [%llu]: index_pos=%llu connection_count=%d chunk_count=%d", + (unsigned long long) file_.getOffset(), (unsigned long long) index_data_pos_, connection_count_, chunk_count_); // Write file header record M_string header; - header[OP_FIELD_NAME] = toHeaderString(&OP_FILE_HEADER); - header[INDEX_POS_FIELD_NAME] = toHeaderString(&index_data_pos_); - header[TOPIC_COUNT_FIELD_NAME] = toHeaderString(&topic_count_); - header[CHUNK_COUNT_FIELD_NAME] = toHeaderString(&chunk_count_); + header[OP_FIELD_NAME] = toHeaderString(&OP_FILE_HEADER); + header[INDEX_POS_FIELD_NAME] = toHeaderString(&index_data_pos_); + header[CONNECTION_COUNT_FIELD_NAME] = toHeaderString(&connection_count_); + header[CHUNK_COUNT_FIELD_NAME] = toHeaderString(&chunk_count_); boost::shared_array header_buffer; uint32_t header_len; @@ -367,38 +341,31 @@ void Bag::writeFileHeaderRecord() { } } -bool Bag::readFileHeaderRecord() { +void Bag::readFileHeaderRecord() { ros::Header header; - uint32_t data_size; - if (!readHeader(header, data_size)) { - ROS_ERROR("Error reading FILE_HEADER record"); - return false; - } + uint32_t data_size; + if (!readHeader(header) || !readDataLength(data_size)) + throw BagFormatException("Error reading FILE_HEADER record"); M_string& fields = *header.getValues(); - if (!isOp(fields, OP_FILE_HEADER)) { - ROS_ERROR("Expected FILE_HEADER op not found"); - return false; - } + if (!isOp(fields, OP_FILE_HEADER)) + throw BagFormatException("Expected FILE_HEADER op not found"); // Read index position - if (!readField(fields, INDEX_POS_FIELD_NAME, true, (uint64_t*) &index_data_pos_)) - return false; + readField(fields, INDEX_POS_FIELD_NAME, true, (uint64_t*) &index_data_pos_); // Read topic and chunks count if (version_ >= 200) { - readField(fields, TOPIC_COUNT_FIELD_NAME, true, &topic_count_); - readField(fields, CHUNK_COUNT_FIELD_NAME, true, &chunk_count_); + readField(fields, CONNECTION_COUNT_FIELD_NAME, true, &connection_count_); + readField(fields, CHUNK_COUNT_FIELD_NAME, true, &chunk_count_); } - ROS_DEBUG("Read FILE_HEADER: index_pos=%llu topic_count=%d chunk_count=%d", - (unsigned long long) index_data_pos_, topic_count_, chunk_count_); + ROS_DEBUG("Read FILE_HEADER: index_pos=%llu connection_count=%d chunk_count=%d", + (unsigned long long) index_data_pos_, connection_count_, chunk_count_); // Skip the data section (just padding) seek(data_size, std::ios::cur); - - return true; } uint32_t Bag::getChunkOffset() const { @@ -430,24 +397,11 @@ void Bag::stopWritingChunk() { // Add this chunk to the index chunk_infos_.push_back(curr_chunk_info_); - // We push these into the master index as we go. - /* - for (map >::const_iterator i = curr_chunk_topic_indexes_.begin(); i != curr_chunk_topic_indexes_.end(); i++) { - foreach(IndexEntry const& e, i->second) { - ROS_DEBUG("adding to topic index: %s -> %llu:%d", i->first.c_str(), (unsigned long long) e.chunk_pos, e.offset); - std::multiset& index = topic_indexes_[i->first]; - index.insert(index.end(), e); - } - } - */ - // Get the uncompressed and compressed sizes uint32_t uncompressed_size = getChunkOffset(); file_.setWriteMode(compression::None); uint32_t compressed_size = file_.getOffset() - curr_chunk_data_pos_; - ROS_DEBUG("<<< END CHUNK: uncompressed = %d compressed = %d", uncompressed_size, compressed_size); - // Rewrite the chunk header with the size of the chunk (remembering current offset) uint64_t end_of_chunk_pos = file_.getOffset(); seek(curr_chunk_info_.pos); @@ -479,25 +433,25 @@ void Bag::writeChunkHeader(CompressionType compression, uint32_t compressed_size header[OP_FIELD_NAME] = toHeaderString(&OP_CHUNK); header[COMPRESSION_FIELD_NAME] = chunk_header.compression; header[SIZE_FIELD_NAME] = toHeaderString(&chunk_header.uncompressed_size); + writeHeader(header); - writeHeader(header, chunk_header.compressed_size); + writeDataLength(chunk_header.compressed_size); } -bool Bag::readChunkHeader(ChunkHeader& chunk_header) { +void Bag::readChunkHeader(ChunkHeader& chunk_header) { ros::Header header; - if (!readHeader(header, chunk_header.compressed_size)) - return false; + if (!readHeader(header) || !readDataLength(chunk_header.compressed_size)) + throw BagFormatException("Error reading CHUNK record"); M_string& fields = *header.getValues(); - if (!isOp(fields, OP_CHUNK) || - !readField(fields, COMPRESSION_FIELD_NAME, true, chunk_header.compression) || - !readField(fields, SIZE_FIELD_NAME, true, &chunk_header.uncompressed_size)) - return false; + if (!isOp(fields, OP_CHUNK)) + throw BagFormatException("Expected CHUNK op not found"); + + readField(fields, COMPRESSION_FIELD_NAME, true, chunk_header.compression); + readField(fields, SIZE_FIELD_NAME, true, &chunk_header.uncompressed_size); ROS_DEBUG("Read CHUNK: compression=%s size=%d uncompressed=%d (%f)", chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size, 100 * ((double) chunk_header.compressed_size) / chunk_header.uncompressed_size); - - return true; } // Topic index records @@ -516,9 +470,9 @@ void Bag::writeTopicIndexRecords() { header[VER_FIELD_NAME] = toHeaderString(&INDEX_VERSION); uint32_t topic_index_size = topic_index.size(); header[COUNT_FIELD_NAME] = toHeaderString(&topic_index_size); - - uint32_t data_len = topic_index_size * sizeof(IndexEntry); - writeHeader(header, data_len); + writeHeader(header); + + writeDataLength(topic_index_size * sizeof(IndexEntry)); ROS_DEBUG("Writing INDEX_DATA: topic=%s ver=%d count=%d", topic.c_str(), INDEX_VERSION, topic_index_size); @@ -533,42 +487,35 @@ void Bag::writeTopicIndexRecords() { } } -bool Bag::readTopicIndexRecord() { +void Bag::readTopicIndexRecord() { ros::Header header; uint32_t data_size; - if (!readHeader(header, data_size)) { - ROS_ERROR("Error reading INDEX_DATA header"); - return false; - } + if (!readHeader(header) || !readDataLength(data_size)) + throw BagFormatException("Error reading INDEX_DATA header"); M_string& fields = *header.getValues(); - if (!isOp(fields, OP_INDEX_DATA)) { - ROS_ERROR("Expected INDEX_DATA record"); - return false; - } + if (!isOp(fields, OP_INDEX_DATA)) + throw BagFormatException("Expected INDEX_DATA record"); uint32_t index_version; string topic; uint32_t count; - if (!readField(fields, VER_FIELD_NAME, true, &index_version) || - !readField(fields, TOPIC_FIELD_NAME, true, topic) || - !readField(fields, COUNT_FIELD_NAME, true, &count)) - return false; + readField(fields, VER_FIELD_NAME, true, &index_version); + readField(fields, TOPIC_FIELD_NAME, true, topic); + readField(fields, COUNT_FIELD_NAME, true, &count); ROS_DEBUG("Read INDEX_DATA: ver=%d topic=%s count=%d", index_version, topic.c_str(), count); switch (index_version) { case 0: return readTopicIndexDataVersion0(count, topic); case 1: return readTopicIndexDataVersion1(count, topic, curr_chunk_info_.pos); - default: { - ROS_ERROR("Unsupported INDEX_DATA version: %d", index_version); - return false; - } + default: + throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str()); } } //! Store the position of the message in the chunk_pos field -bool Bag::readTopicIndexDataVersion0(uint32_t count, string const& topic) { +void Bag::readTopicIndexDataVersion0(uint32_t count, string const& topic) { // Read the index entry records multiset& topic_index = topic_indexes_[topic]; for (uint32_t i = 0; i < count; i++) { @@ -585,11 +532,9 @@ bool Bag::readTopicIndexDataVersion0(uint32_t count, string const& topic) { topic_index.insert(topic_index.end(), index_entry); } - - return true; } -bool Bag::readTopicIndexDataVersion1(uint32_t count, string const& topic, uint64_t chunk_pos) { +void Bag::readTopicIndexDataVersion1(uint32_t count, string const& topic, uint64_t chunk_pos) { // Read the index entry records multiset& topic_index = topic_indexes_[topic]; for (uint32_t i = 0; i < count; i++) { @@ -606,67 +551,108 @@ bool Bag::readTopicIndexDataVersion1(uint32_t count, string const& topic, uint64 topic_index.insert(topic_index.end(), index_entry); } +} - return true; +// Connection records + +void Bag::writeConnectionRecords() { + boost::mutex::scoped_lock lock(record_mutex_); + + for (map::const_iterator i = connection_infos_.begin(); i != connection_infos_.end(); i++) { + ConnectionInfo const* connection_info = i->second; + writeConnectionRecord(connection_info); + } +} + +void Bag::writeConnectionRecord(ConnectionInfo const* connection_info) { + ROS_DEBUG("Writing CONNECTION [%llu:%d]: topic=%s id=%d", + (unsigned long long) file_.getOffset(), getChunkOffset(), connection_info->topic.c_str(), connection_info->id); + + M_string header; + header[OP_FIELD_NAME] = toHeaderString(&OP_CONNECTION); + header[TOPIC_FIELD_NAME] = connection_info->topic; + header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id); + writeHeader(header); + + writeHeader(connection_info->header); +} + +void Bag::appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info) { + M_string header; + header[OP_FIELD_NAME] = toHeaderString(&OP_CONNECTION); + header[TOPIC_FIELD_NAME] = connection_info->topic; + header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id); + appendHeaderToBuffer(buf, header); + + appendHeaderToBuffer(buf, connection_info->header); +} + +void Bag::readConnectionRecord() { + ros::Header header; + if (!readHeader(header)) + throw BagFormatException("Error reading CONNECTION header"); + M_string& fields = *header.getValues(); + + if (!isOp(fields, OP_CONNECTION)) + throw BagFormatException("Expected CONNECTION op not found"); + + string topic; + uint32_t id; + readField(fields, CONNECTION_FIELD_NAME, true, &id); + readField(fields, TOPIC_FIELD_NAME, true, topic); + + ros::Header connection_header; + if (!readHeader(connection_header)) + throw BagFormatException("Error reading connection header"); + + // If this is a new connection, update connection_infos_ + map::iterator key = connection_infos_.find(id); + if (key == connection_infos_.end()) { + ConnectionInfo* connection_info = new ConnectionInfo(); + connection_info->id = id; + connection_info->topic = topic; + for (M_string::const_iterator i = connection_header.getValues()->begin(); i != connection_header.getValues()->end(); i++) + connection_info->header[i->first] = i->second; + connection_infos_[id] = connection_info; + + ROS_DEBUG("Read CONNECTION: topic=%s id=%d", topic.c_str(), id); + + // If this is a new topic, update topic_infos_ + map::iterator key = topic_infos_.find(topic); + if (key == topic_infos_.end()) { + string msg_def = connection_info->header["message_definition"]; + string datatype = connection_info->header["datatype"]; + string md5sum = connection_info->header["md5sum"]; + + TopicInfo* topic_info = new TopicInfo(); + topic_info->topic = topic; + topic_info->msg_def = msg_def; + topic_info->datatype = datatype; + topic_info->md5sum = md5sum; + topic_infos_[topic] = topic_info; + + ROS_DEBUG("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str()); + } + } } // Message definition records -void Bag::writeMessageDefinitionRecords() { - boost::mutex::scoped_lock lock(record_mutex_); - - for (map::const_iterator i = topic_infos_.begin(); i != topic_infos_.end(); i++) { - TopicInfo const* topic_info = i->second; - writeMessageDefinitionRecord(topic_info); - } -} - -void Bag::writeMessageDefinitionRecord(TopicInfo const* topic_info) { - ROS_DEBUG("Writing MSG_DEF [%llu:%d]: topic=%s md5sum=%s type=%s def=...", - (unsigned long long) file_.getOffset(), getChunkOffset(), topic_info->topic.c_str(), topic_info->md5sum.c_str(), topic_info->datatype.c_str()); - - M_string header; - header[OP_FIELD_NAME] = toHeaderString(&OP_MSG_DEF); - header[TOPIC_FIELD_NAME] = topic_info->topic; - header[MD5_FIELD_NAME] = topic_info->md5sum; - header[TYPE_FIELD_NAME] = topic_info->datatype; - header[DEF_FIELD_NAME] = topic_info->msg_def; - - writeHeader(header, 0); -} - -void Bag::appendMessageDefinitionRecordToBuffer(Buffer& buf, TopicInfo const* topic_info) -{ - M_string header; - header[OP_FIELD_NAME] = toHeaderString(&OP_MSG_DEF); - header[TOPIC_FIELD_NAME] = topic_info->topic; - header[MD5_FIELD_NAME] = topic_info->md5sum; - header[TYPE_FIELD_NAME] = topic_info->datatype; - header[DEF_FIELD_NAME] = topic_info->msg_def; - - appendHeaderToBuffer(buf, header, 0); -} - -bool Bag::readMessageDefinitionRecord() { +void Bag::readMessageDefinitionRecord() { ros::Header header; - uint32_t data_size; - if (!readHeader(header, data_size)) { - ROS_ERROR("Error reading message definition header"); - return false; - } + uint32_t data_size; + if (!readHeader(header) || !readDataLength(data_size)) + throw BagFormatException("Error reading message definition header"); M_string& fields = *header.getValues(); - if (!isOp(fields, OP_MSG_DEF)) { - ROS_ERROR("Expected MSG_DEF op not found"); - return false; - } + if (!isOp(fields, OP_MSG_DEF)) + throw BagFormatException("Expected MSG_DEF op not found"); string topic, md5sum, datatype, message_definition; - if (!readField(fields, TOPIC_FIELD_NAME, true, topic) || - !readField(fields, MD5_FIELD_NAME, 32, 32, true, md5sum) || - !readField(fields, TYPE_FIELD_NAME, true, datatype) || - !readField(fields, DEF_FIELD_NAME, 0, UINT_MAX, true, message_definition)) - return false; + readField(fields, TOPIC_FIELD_NAME, true, topic); + readField(fields, MD5_FIELD_NAME, 32, 32, true, md5sum); + readField(fields, TYPE_FIELD_NAME, true, datatype); + readField(fields, DEF_FIELD_NAME, 0, UINT_MAX, true, message_definition); map::iterator key = topic_infos_.find(topic); if (key == topic_infos_.end()) { @@ -677,59 +663,42 @@ bool Bag::readMessageDefinitionRecord() { topic_info->md5sum = md5sum; topic_infos_[topic] = topic_info; - ROS_DEBUG("Read MSG_DEF: topic=%s md5sum=%s datatype=%s def=...", topic.c_str(), md5sum.c_str(), datatype.c_str()); + ROS_DEBUG("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str()); } - - return true; } -bool Bag::decompressChunk(uint64_t chunk_pos) -{ - - if (curr_chunk_info_.pos == chunk_pos) - { +void Bag::decompressChunk(uint64_t chunk_pos) { + if (curr_chunk_info_.pos == chunk_pos) { current_buffer_ = &outgoing_chunk_buffer_; - return true; + return; } - else - { + else { current_buffer_ = &decompress_buffer_; } if (decompressed_chunk_ == chunk_pos) - return true; + return; // Seek to the start of the chunk seek(chunk_pos); // Read the chunk header ChunkHeader chunk_header; - if (!readChunkHeader(chunk_header)) - return false; - - bool ret = false; + readChunkHeader(chunk_header); // Read and decompress the chunk. These assume we are at the right place in the stream already if (chunk_header.compression == COMPRESSION_NONE) - { - ret = decompressRawChunk(chunk_header); - } else if (chunk_header.compression == COMPRESSION_BZ2) - { - ret = decompressBz2Chunk(chunk_header); - } else { - ROS_ERROR("Unknown compression: %s", chunk_header.compression.c_str()); - } + decompressRawChunk(chunk_header); + else if (chunk_header.compression == COMPRESSION_BZ2) + decompressBz2Chunk(chunk_header); + else + throw BagFormatException("Unknown compression: " + chunk_header.compression); - if (ret) - decompressed_chunk_ = chunk_pos; - - return ret; + decompressed_chunk_ = chunk_pos; } - // Reading this into a buffer isn't completely necessary, but we do it anyways for now -bool Bag::decompressRawChunk(ChunkHeader const& chunk_header) -{ +void Bag::decompressRawChunk(ChunkHeader const& chunk_header) { assert(chunk_header.compression == COMPRESSION_NONE); assert(chunk_header.compressed_size == chunk_header.uncompressed_size); @@ -738,11 +707,10 @@ bool Bag::decompressRawChunk(ChunkHeader const& chunk_header) decompress_buffer_.setSize(chunk_header.compressed_size); file_.read((char*) decompress_buffer_.getData(), chunk_header.compressed_size); - return true; + // todo check read was successful } -bool Bag::decompressBz2Chunk(ChunkHeader const& chunk_header) -{ +void Bag::decompressBz2Chunk(ChunkHeader const& chunk_header) { assert(chunk_header.compression == COMPRESSION_BZ2); CompressionType compression = compression::BZ2; @@ -755,11 +723,10 @@ bool Bag::decompressBz2Chunk(ChunkHeader const& chunk_header) decompress_buffer_.setSize(chunk_header.uncompressed_size); file_.decompress(compression, decompress_buffer_.getData(), decompress_buffer_.getSize(), chunk_buffer_.getData(), chunk_buffer_.getSize()); - return true; + // todo check read was successful } -ros::Header Bag::readMessageDataHeader(IndexEntry const& index_entry) -{ +ros::Header Bag::readMessageDataHeader(IndexEntry const& index_entry) { ros::Header header; uint32_t data_size; uint32_t bytes_read; @@ -770,18 +737,12 @@ ros::Header Bag::readMessageDataHeader(IndexEntry const& index_entry) readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read); return header; default: - ROS_FATAL("Unhandled version: %d", version_); - break; + throw BagFormatException((format("Unhandled version: %1%") % version_).str()); } - - return ros::Header(); } - -// This actually loads the header, which shouldn't be necessary. Not -// the end of the world for now. -uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) -{ +// NOTE: this loads the header, which is unnecessary +uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) { ros::Header header; uint32_t data_size; uint32_t bytes_read; @@ -793,17 +754,14 @@ uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read); return data_size; - default: - ROS_FATAL("Unhandled version: %d", version_); - break; - } - return 0; + default: + throw BagFormatException((format("Unhandled version: %1%") % version_).str()); + } } -// Will fill this in later /* - bool Bag::loadMessageDataRecord102(string const& topic, uint64_t offset) { +bool Bag::loadMessageDataRecord102(string const& topic, uint64_t offset) { ROS_DEBUG("loadMessageDataRecord: offset=%llu", (unsigned long long) offset); seek(offset); @@ -860,7 +818,9 @@ void Bag::writeChunkInfoRecords() { chunk_info.end_time.sec, chunk_info.end_time.nsec, data_len); - writeHeader(header, data_len); + writeHeader(header); + + writeDataLength(data_len); // Write the topic names and counts for (map::const_iterator i = chunk_info.topic_counts.begin(); i != chunk_info.topic_counts.end(); i++) { @@ -878,30 +838,30 @@ void Bag::writeChunkInfoRecords() { } } -bool Bag::readChunkInfoRecord() { +void Bag::readChunkInfoRecord() { // Read a CHUNK_INFO header ros::Header header; uint32_t data_size; - if (!readHeader(header, data_size)) - return false; + if (!readHeader(header) || !readDataLength(data_size)) + throw BagFormatException("Error reading CHUNK_INFO record header"); M_string& fields = *header.getValues(); if (!isOp(fields, OP_CHUNK_INFO)) - return false; + throw BagFormatException("Expected CHUNK_INFO op not found"); // Check that the chunk info version is current uint32_t chunk_info_version; - if (!readField(fields, VER_FIELD_NAME, true, &chunk_info_version)) - return false; - assert(chunk_info_version == CHUNK_INFO_VERSION); + readField(fields, VER_FIELD_NAME, true, &chunk_info_version); + if (chunk_info_version != CHUNK_INFO_VERSION) + throw BagFormatException((format("Expected CHUNK_INFO version %1%, read %2%") % CHUNK_INFO_VERSION % chunk_info_version).str()); // Read the chunk position, timestamp, and topic count fields ChunkInfo chunk_info; uint32_t chunk_topic_count; - if (!readField(fields, CHUNK_POS_FIELD_NAME, true, &chunk_info.pos) || - !readField(fields, START_TIME_FIELD_NAME, true, chunk_info.start_time) || - !readField(fields, END_TIME_FIELD_NAME, true, chunk_info.end_time) || - !readField(fields, COUNT_FIELD_NAME, true, &chunk_topic_count)) - return false; + readField(fields, CHUNK_POS_FIELD_NAME, true, &chunk_info.pos); + readField(fields, START_TIME_FIELD_NAME, true, chunk_info.start_time); + readField(fields, END_TIME_FIELD_NAME, true, chunk_info.end_time); + readField(fields, COUNT_FIELD_NAME, true, &chunk_topic_count); + ROS_DEBUG("Read CHUNK_INFO: chunk_pos=%llu topic_count=%d start=%d.%d end=%d.%d", (unsigned long long) chunk_info.pos, chunk_topic_count, chunk_info.start_time.sec, chunk_info.start_time.nsec, @@ -924,46 +884,52 @@ bool Bag::readChunkInfoRecord() { } chunk_infos_.push_back(chunk_info); - - return true; } // Record I/O bool Bag::isOp(M_string& fields, uint8_t reqOp) { uint8_t op; - return readField(fields, OP_FIELD_NAME, true, &op) && (op == reqOp); + readField(fields, OP_FIELD_NAME, true, &op); + return op == reqOp; } -void Bag::writeHeader(M_string const& fields, uint32_t data_len) { +void Bag::writeHeader(M_string const& fields) { boost::shared_array header_buffer; uint32_t header_len; ros::Header::write(fields, header_buffer, header_len); - write((char*) &header_len, 4); write((char*) header_buffer.get(), header_len); +} +void Bag::writeDataLength(uint32_t data_len) { write((char*) &data_len, 4); } -void Bag::appendHeaderToBuffer(Buffer& buf, M_string const& fields, uint32_t data_len) { +void Bag::appendHeaderToBuffer(Buffer& buf, M_string const& fields) { boost::shared_array header_buffer; uint32_t header_len; ros::Header::write(fields, header_buffer, header_len); uint32_t offset = buf.getSize(); - buf.setSize(buf.getSize() + header_len + 8); + buf.setSize(buf.getSize() + 4 + header_len); memcpy(buf.getData() + offset, &header_len, 4); offset += 4; memcpy(buf.getData() + offset, header_buffer.get(), header_len); - offset += header_len; +} + +void Bag::appendDataLengthToBuffer(Buffer& buf, uint32_t data_len) { + uint32_t offset = buf.getSize(); + + buf.setSize(buf.getSize() + 4); + memcpy(buf.getData() + offset, &data_len, 4); } //! \todo clean this up -bool Bag::readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) { +void Bag::readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) { ROS_ASSERT(buffer.getSize() > 8); uint8_t* start = (uint8_t*) buffer.getData() + offset; @@ -979,7 +945,7 @@ bool Bag::readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& hea string error_msg; bool parsed = header.parse(ptr, header_len, error_msg); if (!parsed) - return false; + throw BagFormatException("Error parsing header"); ptr += header_len; // Read the data size @@ -987,35 +953,28 @@ bool Bag::readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& hea ptr += 4; bytes_read = ptr - start; - - return true; } - -bool Bag::readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& total_bytes_read) -{ +void Bag::readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& total_bytes_read) { total_bytes_read = 0; uint8_t op; do { ROS_DEBUG("reading header from buffer: offset=%d", offset); uint32_t bytes_read; - if (!readHeaderFromBuffer(*current_buffer_, offset, header, data_size, bytes_read)) - return false; + readHeaderFromBuffer(*current_buffer_, offset, header, data_size, bytes_read); + offset += bytes_read; total_bytes_read += bytes_read; - if (!readField(*header.getValues(), OP_FIELD_NAME, true, &op)) - return false; + readField(*header.getValues(), OP_FIELD_NAME, true, &op); } while (op == OP_MSG_DEF); - - assert(op == OP_MSG_DATA); - return true; + if (op != OP_MSG_DATA) + throw BagFormatException("Expected MSG_DATA op not found"); } - -bool Bag::readHeader(ros::Header& header, uint32_t& data_size) { +bool Bag::readHeader(ros::Header& header) { // Read the header length uint32_t header_len; read((char*) &header_len, 4); @@ -1030,9 +989,11 @@ bool Bag::readHeader(ros::Header& header, uint32_t& data_size) { if (!parsed) return false; - // Read the data size - read((char*) &data_size, 4); + return true; +} +bool Bag::readDataLength(uint32_t& data_size) { + read((char*) &data_size, 4); return true; } @@ -1040,37 +1001,29 @@ M_string::const_iterator Bag::checkField(M_string const& fields, string const& f M_string::const_iterator fitr = fields.find(field); if (fitr == fields.end()) { if (required) - ROS_ERROR("Required '%s' field missing", field.c_str()); - } - else if ((fitr->second.size() < min_len) || (fitr->second.size() > max_len)) { - ROS_ERROR("Field '%s' is wrong size (%u bytes)", field.c_str(), (uint32_t) fitr->second.size()); - return fields.end(); + throw BagFormatException("Required '" + field + "' field missing"); } + else if ((fitr->second.size() < min_len) || (fitr->second.size() > max_len)) + throw BagFormatException((format("Field '%1%' is wrong size (%2% bytes)") % field % (uint32_t) fitr->second.size()).str()); return fitr; } -bool Bag::readField(M_string const& fields, string const& field_name, bool required, string& data) { - return readField(fields, field_name, 1, UINT_MAX, true, data); +void Bag::readField(M_string const& fields, string const& field_name, bool required, string& data) { + readField(fields, field_name, 1, UINT_MAX, true, data); } -bool Bag::readField(M_string const& fields, string const& field_name, unsigned int min_len, unsigned int max_len, bool required, string& data) { - M_string::const_iterator i; - if ((i = checkField(fields, field_name, min_len, max_len, required)) == fields.end()) - return false; - data = i->second; - return true; +void Bag::readField(M_string const& fields, string const& field_name, unsigned int min_len, unsigned int max_len, bool required, string& data) { + data = checkField(fields, field_name, min_len, max_len, required)->second; } -bool Bag::readField(M_string const& fields, string const& field_name, bool required, Time& data) { +void Bag::readField(M_string const& fields, string const& field_name, bool required, Time& data) { uint64_t packed_time; - if (!readField(fields, field_name, required, &packed_time)) - return false; + readField(fields, field_name, required, &packed_time); uint64_t bitmask = (1LL << 33) - 1; data.sec = (uint32_t) (packed_time & bitmask); data.nsec = (uint32_t) (packed_time >> 32); - return true; } std::string Bag::toHeaderString(Time const* field) { @@ -1096,8 +1049,12 @@ void Bag::dump() { std::cout << " topic: " << i->first << std::endl; std::cout << "chunk_infos:" << std::endl; - for (vector::const_iterator i = chunk_infos_.begin(); i != chunk_infos_.end(); i++) + for (vector::const_iterator i = chunk_infos_.begin(); i != chunk_infos_.end(); i++) { std::cout << " chunk: " << (*i).topic_counts.size() << " topics" << std::endl; + for (map::const_iterator j = (*i).topic_counts.begin(); j != (*i).topic_counts.end(); j++) { + std::cout << " - " << j->first << ": " << j->second << std::endl; + } + } std::cout << "topic_indexes:" << std::endl; for (map >::const_iterator i = topic_indexes_.begin(); i != topic_indexes_.end(); i++) { diff --git a/tools/rosbag/src/message_instance.cpp b/tools/rosbag/src/message_instance.cpp index da01d211..5402b3ec 100644 --- a/tools/rosbag/src/message_instance.cpp +++ b/tools/rosbag/src/message_instance.cpp @@ -53,6 +53,7 @@ bool MessageInstance::getLatching() const { return false; } +// @todo: this should cache the header std::string MessageInstance::getCallerId() const { ros::Header header = bag_->readMessageDataHeader(index_entry_); ros::M_string& fields = *header.getValues(); diff --git a/tools/rosbag/src/player.cpp b/tools/rosbag/src/player.cpp index d482f94a..69baba26 100644 --- a/tools/rosbag/src/player.cpp +++ b/tools/rosbag/src/player.cpp @@ -117,8 +117,7 @@ void Player::publish() { ROS_INFO("Opening %s", filename.c_str()); shared_ptr bag(new Bag); - if (!bag->open(filename, bagmode::Read)) - throw Exception((boost::format("Error opening file: %1%") % filename.c_str()).str()); + bag->open(filename, bagmode::Read); bags_.push_back(bag); } diff --git a/tools/rosbag/src/query.cpp b/tools/rosbag/src/query.cpp index 5cf58e5c..8e443160 100644 --- a/tools/rosbag/src/query.cpp +++ b/tools/rosbag/src/query.cpp @@ -53,30 +53,70 @@ ros::Time Query::getEndTime() const { return end_time_; } bool Query::evaluate(TopicInfo const*) const { return true; } +Query* Query::clone() const { return new Query(*this); } + // TopicQuery +TopicQuery::TopicQuery(std::string const& topic, ros::Time const& start_time, ros::Time const& end_time) + : Query(start_time, end_time) +{ + topics_.push_back(topic); +} + TopicQuery::TopicQuery(std::vector const& topics, ros::Time const& start_time, ros::Time const& end_time) : Query(start_time, end_time), topics_(topics) { } bool TopicQuery::evaluate(TopicInfo const* info) const { - foreach(string const& topic, topics_) if (topic == info->topic) - { return true; - } return false; } +Query* TopicQuery::clone() const { return new TopicQuery(*this); } + +// TypeQuery + +TypeQuery::TypeQuery(std::string const& type, ros::Time const& start_time, ros::Time const& end_time) + : Query(start_time, end_time) +{ + types_.push_back(type); +} + +TypeQuery::TypeQuery(std::vector const& types, ros::Time const& start_time, ros::Time const& end_time) + : Query(start_time, end_time), types_(types) +{ +} + +bool TypeQuery::evaluate(TopicInfo const* info) const { + foreach(string const& type, types_) + if (type == info->datatype) + return true; + + return false; +} + +Query* TypeQuery::clone() const { return new TypeQuery(*this); } + +// BagQuery + +BagQuery::BagQuery(Bag* _bag, Query const& _query, uint32_t _bag_revision) : bag(_bag), bag_revision(_bag_revision) { + query = _query.clone(); +} + +BagQuery::~BagQuery() { + delete query; +} + // MessageRange MessageRange::MessageRange(std::multiset::const_iterator const& _begin, - std::multiset::const_iterator const& _end, - TopicInfo const* _topic_info, - BagQuery const* _bag_query) + std::multiset::const_iterator const& _end, + TopicInfo const* _topic_info, + BagQuery const* _bag_query) : begin(_begin), end(_end), topic_info(_topic_info), bag_query(_bag_query) { } diff --git a/tools/rosbag/src/recorder.cpp b/tools/rosbag/src/recorder.cpp index d567d6f6..2b005996 100644 --- a/tools/rosbag/src/recorder.cpp +++ b/tools/rosbag/src/recorder.cpp @@ -279,8 +279,11 @@ void Recorder::startWriting() { bag_.setCompression(options_.compression); updateFilenames(); - if (!bag_.open(write_filename_, bagmode::Write)) { - ROS_FATAL("Could not open output file: %s", write_filename_.c_str()); + try { + bag_.open(write_filename_, bagmode::Write); + } + catch (rosbag::BagException e) { + ROS_ERROR(e.what()); exit_code_ = 1; ros::shutdown(); } @@ -360,19 +363,22 @@ void Recorder::doRecordSnapshotter() { string target_filename = out_queue.filename; string write_filename = target_filename + string(".active"); - if (!bag_.open(write_filename, bagmode::Write)) { - ROS_ERROR("Could not open file: %s", out_queue.filename.c_str()); + try { + bag_.open(write_filename, bagmode::Write); } - else { - while (!out_queue.queue->empty()) { - OutgoingMessage out = out_queue.queue->front(); - out_queue.queue->pop(); - - bag_.write(out.topic, out.time, out.msg); - } - - stopWriting(); + catch (rosbag::BagException ex) { + ROS_ERROR(ex.what()); + return; } + + while (!out_queue.queue->empty()) { + OutgoingMessage out = out_queue.queue->front(); + out_queue.queue->pop(); + + bag_.write(out.topic, out.time, out.msg); + } + + stopWriting(); } } diff --git a/tools/rosbag/src/rosbag/bag.py b/tools/rosbag/src/rosbag/bag.py index 8584f4a2..6bb2743e 100644 --- a/tools/rosbag/src/rosbag/bag.py +++ b/tools/rosbag/src/rosbag/bag.py @@ -118,16 +118,17 @@ class Bag(object): self._reader = None - self._topic_infos = {} # topic -> TopicInfo + self._topic_infos = {} # topic -> TopicInfo self._file_header_pos = None - self._index_data_pos = 0 # (1.2+) - self._topic_indexes = {} # topic -> IndexEntry[] (1.2+) + self._index_data_pos = 0 # (1.2+) + self._topic_indexes = {} # topic -> IndexEntry[] (1.2+) - self._topic_count = 0 # (2.0) - self._chunk_count = 0 # (2.0) - self._chunk_infos = [] # ChunkInfo[] (2.0) - self._chunk_headers = {} # chunk_pos -> ChunkHeader (2.0) + self._connection_infos = {} # topic -> ConnectionInfo + self._connection_count = 0 # (2.0) + self._chunk_count = 0 # (2.0) + self._chunk_infos = [] # ChunkInfo[] (2.0) + self._chunk_headers = {} # chunk_pos -> ChunkHeader (2.0) self._buffer = StringIO() @@ -281,8 +282,10 @@ class Bag(object): if not self._chunk_open: self._start_writing_chunk(t) - # Write message definition record, if necessary - if topic not in self._topic_infos: + # Write connection record, if necessary + if topic not in self._connection_infos: + conn_id = len(self._connection_infos) + if raw: msg_type, serialized_bytes, md5sum, pytype = msg @@ -298,12 +301,16 @@ class Bag(object): print >> sys.stderr, 'WARNING: md5sum of loaded type [%s] does not match that specified' % msg_type #raise ROSBagException('md5sum of loaded type does not match that of data being recorded') - topic_info = _TopicInfo(topic, msg_type, md5sum, pytype._full_text) + header = { 'topic' : topic, 'type' : msg_type, 'md5sum' : md5sum, 'message_definition' : pytype._full_text } else: - topic_info = _TopicInfo(topic, msg.__class__._type, msg.__class__._md5sum, msg._full_text) - - self._write_message_definition_record(topic_info) - self._topic_infos[topic] = topic_info + header = { 'topic' : topic, 'type' : msg.__class__._type, 'md5sum' : msg.__class__._md5sum, 'message_definition' : msg._full_text } + + connection_info = _ConnectionInfo(conn_id, topic, header) + + self._write_connection_record(connection_info) + + self._connection_infos[topic] = connection_info + self._topic_infos[topic] = connection_info.toTopicInfo() # Update the current chunk's topic index index_entry = _IndexEntry200(t, self._curr_chunk_info.pos, self._get_chunk_offset()) @@ -558,7 +565,7 @@ class Bag(object): elif self._version == 102: # Get the op code of the first record. If it's FILE_HEADER, then we have an indexed 1.2 bag. first_record_pos = self._file.tell() - header = _read_record_header(self._file) + header = _read_header(self._file) op = _read_uint8_field(header, 'op') self._file.seek(first_record_pos) @@ -601,9 +608,9 @@ class Bag(object): # Remember this location as the start of the index self._index_data_pos = self._file.tell() - # Write topic infos - for topic_info in self._topic_infos.values(): - self._write_message_definition_record(topic_info) + # Write connection infos + for connection_info in self._connection_infos.values(): + self._write_connection_record(connection_info) # Write chunk infos for chunk_info in self._chunk_infos: @@ -687,25 +694,25 @@ class Bag(object): self._curr_compression = compression - def _write_file_header_record(self, index_pos, topic_count, chunk_count): + def _write_file_header_record(self, index_pos, connection_count, chunk_count): header = { 'op': _pack_uint8(_OP_FILE_HEADER), 'index_pos': _pack_uint64(index_pos), - 'topic_count': _pack_uint32(topic_count), + 'conn_count': _pack_uint32(connection_count), 'chunk_count': _pack_uint32(chunk_count) } _write_record(self._file, header, padded_size=_FILE_HEADER_LENGTH) - def _write_message_definition_record(self, topic_info): + def _write_connection_record(self, connection_info): header = { - 'op': _pack_uint8(_OP_MSG_DEF), - 'topic': topic_info.topic, - 'md5': topic_info.md5sum, - 'type': topic_info.datatype, - 'def': topic_info.msg_def + 'op': _pack_uint8(_OP_CONNECTION), + 'topic': connection_info.topic, + 'conn': _pack_uint32(connection_info.id) } - _write_record(self._output_file, header) + _write_header(self._output_file, header) + _write_header(self._output_file, connection_info.header) + def _write_message_data_record(self, topic, t, serialized_bytes): header = { 'op': _pack_uint8(_OP_MSG_DATA), @@ -720,7 +727,7 @@ class Bag(object): 'compression': chunk_header.compression, 'size': _pack_uint32(chunk_header.uncompressed_size) } - _write_record_header(self._file, header) + _write_header(self._file, header) self._file.write(_pack_uint32(chunk_header.compressed_size)) @@ -771,12 +778,26 @@ _OP_FILE_HEADER = 0x03 _OP_INDEX_DATA = 0x04 _OP_CHUNK = 0x05 _OP_CHUNK_INFO = 0x06 +_OP_CONNECTION = 0x07 _VERSION = '#ROSBAG V2.0' _FILE_HEADER_LENGTH = 4096 _INDEX_VERSION = 1 _CHUNK_INDEX_VERSION = 1 +class _ConnectionInfo(object): + def __init__(self, id, topic, header): + self.id = id + self.topic = topic + self.header = header + + def toTopicInfo(self): + h = self.header + return _TopicInfo(self.topic, h['type'], h['md5sum'], h['message_definition']) + + def __str__(self): + return '%d on %s: %s' % (self.id, self.topic, str(self.header)) + class _TopicInfo(object): def __init__(self, topic, datatype, md5sum, msg_def): self.topic = topic @@ -879,7 +900,7 @@ def _read_uint64_field(header, field): return _read_field(header, field, _unpack def _read_time_field (header, field): return _read_field(header, field, _unpack_time) def _write_record(f, header, data='', padded_size=None): - header_str = _write_record_header(f, header) + header_str = _write_header(f, header) if padded_size is not None: header_len = len(header_str) @@ -890,39 +911,39 @@ def _write_record(f, header, data='', padded_size=None): _write_sized(f, data) -def _write_record_header(f, header): +def _write_header(f, header): header_str = ''.join([_pack_uint32(len(k) + 1 + len(v)) + k + '=' + v for k, v in header.items()]) _write_sized(f, header_str) return header_str -def _read_record_header(f, req_op=None): +def _read_header(f, req_op=None): bag_pos = f.tell() - # Read record header + # Read header try: - record_header = _read_sized(f) + header = _read_sized(f) except ROSBagException, ex: - raise ROSBagFormatException('Error reading record header: %s' % str(ex)) + raise ROSBagFormatException('Error reading header: %s' % str(ex)) # Parse header into a dict header_dict = {} - while record_header != '': + while header != '': # Read size - if len(record_header) < 4: - raise ROSBagFormatException('Error reading record header field') - (size,) = struct.unpack(" iters; @@ -140,78 +139,49 @@ void View::addQuery(Bag& bag, Query const& query) { updateQueries(queries_.back()); } -void View::updateQueries(BagQuery* q) -{ +void View::updateQueries(BagQuery* q) { for (map::iterator i = q->bag->topic_infos_.begin(); i != q->bag->topic_infos_.end(); i++) { - // Skip if the query doesn't evaluate to true if (!q->query->evaluate(i->second)) continue; map >::iterator j = q->bag->topic_indexes_.find(i->second->topic); - // Skip if the bag doeesn't have the corresponding index + // Skip if the bag doesn't have the corresponding index if (j == q->bag->topic_indexes_.end()) continue; // lower_bound/upper_bound do a binary search to find the appropriate range of Index Entries given our time range std::multiset::const_iterator begin = std::lower_bound(j->second.begin(), j->second.end(), q->query->getStartTime(), IndexEntryCompare()); - std::multiset::const_iterator end = std::upper_bound(j->second.begin(), j->second.end(), q->query->getEndTime(), IndexEntryCompare()); + std::multiset::const_iterator end = std::upper_bound(j->second.begin(), j->second.end(), q->query->getEndTime(), IndexEntryCompare()); TopicInfo* topic_info = i->second; + // todo: this could be made faster with a map of maps bool found = false; - - // This could be made faster with a map of maps - for (vector::iterator k = ranges_.begin(); - k != ranges_.end(); - k++) - { + for (vector::iterator k = ranges_.begin(); k != ranges_.end(); k++) { MessageRange* r = *k; + // If the topic and query are already in our ranges, we update - if (r->bag_query == q && r->topic_info->topic == topic_info->topic) - { + if (r->bag_query == q && r->topic_info->topic == topic_info->topic) { r->begin = begin; - r->end = end; - found = true; + r->end = end; + found = true; break; } } - if (!found) ranges_.push_back(new MessageRange(begin, end, topic_info, q)); } - view_revision_ += 1; + view_revision_++; } +void View::update() { + // todo this can be completely skipped if the bag is read-only -void View::update() -{ - // TODO: This can be completely skipped if the bag is read-only - - foreach(BagQuery* query, queries_) - { - if (query->bag->bag_revision_ != query->bag_revision) - { - // ROS_DEBUG("Query has been outdated by bag -- re-evauating"); - // Deleting affected range - - /* - for (std::vector::iterator iter = ranges_.begin(); - iter != ranges_.end();) - { - if ((*iter)->bag_query == query) - { - // ROS_DEBUG("Erasing corresponding range"); - delete *iter; - iter = ranges_.erase(iter); - } else { - iter++; - } - } - */ - + foreach(BagQuery* query, queries_) { + if (query->bag->bag_revision_ != query->bag_revision) { updateQueries(query); query->bag_revision = query->bag->bag_revision_; } diff --git a/tools/rosbag/test/test_bag.py b/tools/rosbag/test/test_bag.py index 3844b538..c7ae9efd 100644 --- a/tools/rosbag/test/test_bag.py +++ b/tools/rosbag/test/test_bag.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python +# # test_bag.py PKG = 'rosbag' diff --git a/tools/rosbag/test/utest.cpp b/tools/rosbag/test/utest.cpp index 5166e767..f8f141a5 100644 --- a/tools/rosbag/test/utest.cpp +++ b/tools/rosbag/test/utest.cpp @@ -43,40 +43,6 @@ #define foreach BOOST_FOREACH -struct IndexEntryMultiSetCompare -{ - bool operator()(rosbag::IndexEntry const& a, rosbag::IndexEntry const& b) const { return a.time < b.time; } -}; - -TEST(rosbag, multiset_vs_vector) -{ - int count = 1000 * 1000; - - std::multiset s; - ros::Time start = ros::Time::now(); - for (int i = 0; i < count; i++) { - rosbag::IndexEntry e; - e.time = ros::Time::now(); - e.chunk_pos = 0; - e.offset = 0; - s.insert(s.end(), e); - } - ros::Time end = ros::Time::now(); - std::cout << "multiset: " << end - start << std::endl; - - std::vector v; - start = ros::Time::now(); - for (int i = 0; i < count; i++) { - rosbag::IndexEntry e; - e.time = ros::Time::now(); - e.chunk_pos = 0; - e.offset = 0; - v.push_back(e); - } - end = ros::Time::now(); - std::cout << "vector: " << end - start << std::endl; -} - class BagTest : public testing::Test { protected: @@ -161,7 +127,6 @@ TEST(rosbag, simpleread) view.addQuery(bag, rosbag::TopicQuery(topics)); foreach(rosbag::MessageInstance const m, view) { - std_msgs::String::ConstPtr s = m.instantiate(); if (s != NULL) ASSERT_EQ(s->data, std::string("foo")); @@ -184,21 +149,11 @@ TEST(rosbag, timequery) for (int i = 0; i < 1000; i++) { imsg.data = i; switch (rand() % 5) { - case 0: - outbag.write("t0", ros::Time(i, 0), imsg); - break; - case 1: - outbag.write("t1", ros::Time(i, 0), imsg); - break; - case 2: - outbag.write("t2", ros::Time(i, 0), imsg); - break; - case 3: - outbag.write("t2", ros::Time(i, 0), imsg); - break; - case 4: - outbag.write("t4", ros::Time(i, 0), imsg); - break; + case 0: outbag.write("t0", ros::Time(i, 0), imsg); break; + case 1: outbag.write("t1", ros::Time(i, 0), imsg); break; + case 2: outbag.write("t2", ros::Time(i, 0), imsg); break; + case 3: outbag.write("t2", ros::Time(i, 0), imsg); break; + case 4: outbag.write("t4", ros::Time(i, 0), imsg); break; } } outbag.close(); @@ -213,8 +168,7 @@ TEST(rosbag, timequery) foreach(rosbag::MessageInstance const m, view) { std_msgs::Int32::ConstPtr imsg = m.instantiate(); - if (imsg != NULL) - { + if (imsg != NULL) { ASSERT_EQ(imsg->data, i++); ASSERT_TRUE(m.getTime() < ros::Time(783,0)); } @@ -235,26 +189,11 @@ TEST(rosbag, topicquery) for (int i = 0; i < 1000; i++) { switch (rand() % 5) { - case 0: - imsg.data = j0++; - outbag.write("t0", ros::Time(i, 0), imsg); - break; - case 1: - imsg.data = j0++; - outbag.write("t1", ros::Time(i, 0), imsg); - break; - case 2: - imsg.data = j1++; - outbag.write("t2", ros::Time(i, 0), imsg); - break; - case 3: - imsg.data = j1++; - outbag.write("t3", ros::Time(i, 0), imsg); - break; - case 4: - imsg.data = j1++; - outbag.write("t4", ros::Time(i, 0), imsg); - break; + case 0: imsg.data = j0++; outbag.write("t0", ros::Time(i, 0), imsg); break; + case 1: imsg.data = j0++; outbag.write("t1", ros::Time(i, 0), imsg); break; + case 2: imsg.data = j1++; outbag.write("t2", ros::Time(i, 0), imsg); break; + case 3: imsg.data = j1++; outbag.write("t3", ros::Time(i, 0), imsg); break; + case 4: imsg.data = j1++; outbag.write("t4", ros::Time(i, 0), imsg); break; } } outbag.close(); @@ -272,9 +211,7 @@ TEST(rosbag, topicquery) foreach(rosbag::MessageInstance const m, view) { std_msgs::Int32::ConstPtr imsg = m.instantiate(); if (imsg != NULL) - { ASSERT_EQ(imsg->data, i++); - } } bag.close(); @@ -292,21 +229,11 @@ TEST(rosbag, verifymultibag) for (int i = 0; i < 1000; i++) { imsg.data = i; switch (rand() % 5) { - case 0: - outbag1.write("t0", ros::Time::now(), imsg); - break; - case 1: - outbag1.write("t1", ros::Time::now(), imsg); - break; - case 2: - outbag1.write("t2", ros::Time::now(), imsg); - break; - case 3: - outbag2.write("t0", ros::Time::now(), imsg); - break; - case 4: - outbag2.write("t1", ros::Time::now(), imsg); - break; + case 0: outbag1.write("t0", ros::Time::now(), imsg); break; + case 1: outbag1.write("t1", ros::Time::now(), imsg); break; + case 2: outbag1.write("t2", ros::Time::now(), imsg); break; + case 3: outbag2.write("t0", ros::Time::now(), imsg); break; + case 4: outbag2.write("t1", ros::Time::now(), imsg); break; } } @@ -373,11 +300,10 @@ TEST(rosbag, modifyview) rosbag::View::iterator iter = view.begin(); for (int i = 0; i < 50; i++) { - std_msgs::Int32::ConstPtr imsg = iter->instantiate (); - + std_msgs::Int32::ConstPtr imsg = iter->instantiate(); if (imsg != NULL) { ASSERT_EQ(imsg->data, j0); - j0+=2; + j0 += 2; } iter++; } @@ -385,23 +311,18 @@ TEST(rosbag, modifyview) // We now add our query, and expect it to show up view.addQuery(bag, rosbag::TopicQuery(t1)); - for (int i = 0; i < 50; i++) - { + for (int i = 0; i < 50; i++) { std_msgs::Int32::ConstPtr imsg = iter->instantiate(); - - if (imsg != NULL) - { + if (imsg != NULL) { ASSERT_EQ(imsg->data, j0); - j0+=2; + j0 += 2; } - iter++; - imsg = iter->instantiate(); - if (imsg != NULL) - { + imsg = iter->instantiate(); + if (imsg != NULL) { ASSERT_EQ(imsg->data, j1); - j1+=2; + j1 += 2; } iter++; } @@ -409,12 +330,9 @@ TEST(rosbag, modifyview) bag.close(); } - - TEST(rosbag, modifybag) { rosbag::Bag rwbag; - // Looks like mode is largely being ignored at the moment. rwbag.open("rwbag.bag", rosbag::bagmode::Write);// | rosbag::bagmode::Read); std::vector t0 = boost::assign::list_of("t0"); @@ -430,40 +348,38 @@ TEST(rosbag, modifybag) // Verify begin gets us to 5 rosbag::View::iterator iter1 = view.begin(); - std_msgs::Int32::ConstPtr imsg = iter1->instantiate (); + std_msgs::Int32::ConstPtr imsg = iter1->instantiate(); ASSERT_EQ(imsg->data, 5); - for (int i = 0; i < 5; i++) - { + for (int i = 0; i < 5; i++) { omsg.data = i; rwbag.write("t0", ros::Time(i + 1, 0), omsg); } // New iterator should be at 0 rosbag::View::iterator iter2 = view.begin(); - imsg = iter2->instantiate (); + imsg = iter2->instantiate(); ASSERT_EQ(imsg->data, 0); // Increment it once iter2++; // Output additional messages after time 5 - for (int i = 6; i < 10; i++) - { + for (int i = 6; i < 10; i++) { omsg.data = i; rwbag.write("t0", ros::Time(i + 1, 0), omsg); } // Iter2 should contain 1->10 for (int i = 1; i < 10; i++) { - imsg = iter2->instantiate (); + imsg = iter2->instantiate(); ASSERT_EQ(imsg->data, i); iter2++; } // Iter1 should contain 5->10 for (int i = 5; i < 10; i++) { - imsg = iter1->instantiate (); + imsg = iter1->instantiate(); ASSERT_EQ(imsg->data, i); iter1++; } @@ -471,8 +387,6 @@ TEST(rosbag, modifybag) rwbag.close(); } - - TEST_F(BagTest, WriteThenReadWorks) { std::string filename("test/WriteThenRead.bag"); @@ -492,11 +406,13 @@ TEST_F(BagTest, AppendWorks) { b1.open(filename, rosbag::bagmode::Write); b1.write("chatter", ros::Time::now(), foo_); b1.close(); + dumpContents(b1); rosbag::Bag b2; b2.open(filename, rosbag::bagmode::Append); b2.write("numbers", ros::Time::now(), i_); b2.close(); + dumpContents(b2); checkContents(filename); } @@ -518,7 +434,6 @@ TEST_F(BagTest, ReadAppendWorks) { checkContents(filename); } - TEST_F(BagTest, ChunkedFileWorks) { std::string filename("test/ChunkedFile.bag");