rosbag: changed format from MSG_DEF to CONNECTION messages; now storing connection header

Tim Field 2010-04-28 02:38:29 +00:00
parent fc1a99c047
commit 5041f27204
16 changed files with 736 additions and 736 deletions
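Before the per-file diffs, a rough sketch of the new on-disk layout may help. The fragment below is an illustration only (field values such as the topic and md5sum are made up, and it is not the rosbag implementation itself): it mirrors the length-prefixed header encoding used by _write_header() in bag.py below, and shows how a CONNECTION record is written as a record header (op=0x07, conn id, topic) followed by the stored connection header, as _write_connection_record() in this commit does.

```python
import struct

def pack_header(fields):
    # Each field is encoded as <uint32 len><name>=<value>, and the whole
    # block is prefixed with its own little-endian uint32 length,
    # mirroring _write_header() in bag.py below.
    body = b''
    for name, value in fields.items():
        entry = name.encode() + b'=' + value
        body += struct.pack('<L', len(entry)) + entry
    return struct.pack('<L', len(body)) + body

OP_CONNECTION = 0x07

# Record header: identifies the connection (op=7, conn id, topic).
record_header = pack_header({
    'op':    struct.pack('<B', OP_CONNECTION),
    'conn':  struct.pack('<L', 0),
    'topic': b'/chatter',
})

# Record data: the stored ROS connection header itself (values illustrative).
connection_header = pack_header({
    'topic':              b'/chatter',
    'type':               b'std_msgs/String',
    'md5sum':             b'992ce8a1687cec8c8bd883ec73ca41d1',
    'message_definition': b'string data\n',
    'callerid':           b'/talker',   # optional
    'latching':           b'0',         # optional
})

connection_record = record_header + connection_header
```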

View File

@ -20,6 +20,9 @@ target_link_libraries(record rosbag)
rosbuild_add_executable(play src/play.cpp)
target_link_libraries(play rosbag)
rosbuild_add_executable(mytest test/mytest.cpp)
target_link_libraries(mytest rosbag)
INSTALL(TARGETS record play
RUNTIME DESTINATION $ENV{ROS_ROOT}/bin)

View File

@ -96,6 +96,17 @@ chunk_1
compression=bz2 # type of compression (optional)
size=<uncompressed bytes> # size of data when uncompressed (optional)
---
conn_1i
op=7
topic=topic
id=0
---
topic=topic
message_definition=
type=
md5sum=
latching=1
callerid=caller1234
msg_def_1i
op=1
topic=
@ -106,14 +117,16 @@ chunk_1
<empty>
msg_1
op=2
topic=
time=
topic=...
time=...
conn=...
---
<serialized msg>
msg_2
op=2
topic=
time=
topic=...
time=...
conn=...
---
<serialized msg>
@ -163,6 +176,13 @@ chunk_index_2.0
...
conn_1
op=7
id=0
---
latching=1
callerid=caller1234
msg_def_1
op=1
topic=
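A sketch of the reading side may make the record listings above more concrete. This is a standalone illustration, not the bag.py parser, but it follows the same length-prefixed field encoding (see _read_header() below) and the field names used by the Python writer in this commit. It shows that 2.0 message data records carry a conn field referencing a connection record instead of repeating topic, latching and callerid per message.

```python
import struct

OP_MSG_DATA   = 0x02
OP_CONNECTION = 0x07

def unpack_header(buf, pos=0):
    # Inverse of the encoding: read the block length, then split it into
    # <uint32 len><name>=<value> fields (mirrors _read_header() in bag.py).
    (block_len,) = struct.unpack_from('<L', buf, pos)
    pos += 4
    end = pos + block_len
    fields = {}
    while pos < end:
        (field_len,) = struct.unpack_from('<L', buf, pos)
        pos += 4
        name, _, value = buf[pos:pos + field_len].partition(b'=')
        fields[name.decode()] = value
        pos += field_len
    return fields, end

def describe(fields):
    (op,) = struct.unpack('<B', fields['op'])
    if op == OP_CONNECTION:
        # Followed by the stored connection header (topic/type/md5sum/...).
        (conn_id,) = struct.unpack('<L', fields['conn'])
        return 'connection %d on %s' % (conn_id, fields['topic'].decode())
    if op == OP_MSG_DATA:
        # 2.0 message records reference a connection id plus a timestamp
        # rather than carrying topic/latching/callerid themselves.
        (conn_id,) = struct.unpack('<L', fields['conn'])
        return 'message data on connection %d' % conn_id
    return 'op=0x%02x' % op
```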

View File

@ -53,6 +53,7 @@
#include <set>
#include <stdexcept>
#include <boost/format.hpp>
#include <boost/iterator/iterator_facade.hpp>
#include <boost/thread/mutex.hpp>
@ -60,8 +61,8 @@ namespace rosbag {
namespace bagmode
{
//! The possible modes to open a bag in
enum BagMode
//! The possible modes to open a bag in
enum BagMode
{
Write = 0,
Read = 1,
@ -84,17 +85,16 @@ public:
Bag();
~Bag();
bool open(std::string const& filename, BagMode mode = bagmode::Default); //!< Open a bag file
void open(std::string const& filename, BagMode mode = bagmode::Default); //!< Open a bag file
void close(); //!< Close the bag file (write to disk, append index, etc.)
std::string getFileName() const; //!< Get the filename of the bag
BagMode getMode() const; //!< Get the mode the bag is in
int getVersion() const; //!< Get the version of the open bagfile
int getMajorVersion() const; //!< Get the major-version of the open bagfile
int getMinorVersion() const; //!< Get the minor-version of the open bagfile
uint64_t getOffset() const; //!< Get the offset into the actual file
std::string getFileName() const; //!< Get the filename of the bag
BagMode getMode() const; //!< Get the mode the bag is in
int getVersion() const; //!< Get the version of the open bagfile
int getMajorVersion() const; //!< Get the major-version of the open bagfile
int getMinorVersion() const; //!< Get the minor-version of the open bagfile
uint64_t getOffset() const; //!< Get the offset into the actual file
// Version 1.3 options
void setCompression(CompressionType compression); //!< Set the compression method to use for writing chunks
CompressionType getCompression() const; //!< Get the compression method to use for writing chunks
void setChunkThreshold(uint32_t chunk_threshold); //!< Set the threshold for creating new chunks
@ -135,14 +135,13 @@ public:
void dump();
private:
//This helper function actually does the write with an arbitary serializable message thing
// This helper function actually does the write with an arbitrary serializable message thing
template<class T>
void write_(std::string const& topic, ros::Time const& time, T const& msg, boost::shared_ptr<ros::M_string> connection_header);
bool openRead (std::string const& filename);
bool openWrite (std::string const& filename);
bool openAppend(std::string const& filename);
void openRead (std::string const& filename);
void openWrite (std::string const& filename);
void openAppend(std::string const& filename);
void closeWrite();
@ -152,19 +151,19 @@ private:
void startWritingVersion200();
void stopWritingVersion200();
bool startReadingVersion102();
bool startReadingVersion200();
void startReadingVersion102();
void startReadingVersion200();
// Writing
void writeVersion();
void writeFileHeaderRecord();
void writeMessageDefinitionRecord(TopicInfo const* topic_info);
void appendMessageDefinitionRecordToBuffer(Buffer& buf, TopicInfo const* topic_info);
void writeConnectionRecord(ConnectionInfo const* connection_info);
void appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info);
template<class T>
void writeMessageDataRecord(std::string const& topic, ros::Time const& time, bool latching, std::string const& callerid, T const& msg);
void writeMessageDataRecord(uint32_t conn_id, ros::Time const& time, T const& msg);
void writeTopicIndexRecords();
void writeMessageDefinitionRecords();
void writeConnectionRecords();
void writeChunkInfoRecords();
void startWritingChunk(ros::Time time);
void writeChunkHeader(CompressionType compression, uint32_t compressed_size, uint32_t uncompressed_size);
@ -172,37 +171,41 @@ private:
// Reading
bool readVersion();
bool readFileHeaderRecord();
bool readMessageDefinitionRecord();
void readVersion();
void readFileHeaderRecord();
void readConnectionRecord();
void readMessageDefinitionRecord();
ros::Header readMessageDataHeader(IndexEntry const& index_entry);
uint32_t readMessageDataSize(IndexEntry const& index_entry);
// Would be nice not to have to template this on Stream. Also,
// we don't need to read the header here either. It just so
// happens to be the most efficient way to skip it at the moment.
template<typename Stream>
void readMessageDataIntoStream(IndexEntry const& index_entry, Stream& stream);
bool readChunkHeader(ChunkHeader& chunk_header);
bool readTopicIndexRecord();
bool readTopicIndexDataVersion0(uint32_t count, std::string const& topic);
bool readTopicIndexDataVersion1(uint32_t count, std::string const& topic, uint64_t chunk_pos);
bool readChunkInfoRecord();
void readChunkHeader(ChunkHeader& chunk_header);
void readTopicIndexRecord();
void readTopicIndexDataVersion0(uint32_t count, std::string const& topic);
void readTopicIndexDataVersion1(uint32_t count, std::string const& topic, uint64_t chunk_pos);
void readChunkInfoRecord();
bool decompressChunk(uint64_t chunk_pos);
bool decompressRawChunk(ChunkHeader const& chunk_header);
bool decompressBz2Chunk(ChunkHeader const& chunk_header);
void decompressChunk(uint64_t chunk_pos);
void decompressRawChunk(ChunkHeader const& chunk_header);
void decompressBz2Chunk(ChunkHeader const& chunk_header);
uint32_t getChunkOffset() const;
// Record header I/O
void writeHeader(ros::M_string const& fields, uint32_t data_len);
void appendHeaderToBuffer(Buffer& buf, ros::M_string const& fields, uint32_t data_len);
bool readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read);
bool readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read);
bool readHeader(ros::Header& header, uint32_t& data_size);
void writeHeader(ros::M_string const& fields);
void writeDataLength(uint32_t data_len);
void appendHeaderToBuffer(Buffer& buf, ros::M_string const& fields);
void appendDataLengthToBuffer(Buffer& buf, uint32_t data_len);
void readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read);
void readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read);
bool readHeader(ros::Header& header);
bool readDataLength(uint32_t& data_size);
bool isOp(ros::M_string& fields, uint8_t reqOp);
// Header fields
@ -213,12 +216,12 @@ private:
std::string toHeaderString(ros::Time const* field);
template<typename T>
bool readField(ros::M_string const& fields, std::string const& field_name, bool required, T* data);
void readField(ros::M_string const& fields, std::string const& field_name, bool required, T* data);
bool readField(ros::M_string const& fields, std::string const& field_name, unsigned int min_len, unsigned int max_len, bool required, std::string& data);
bool readField(ros::M_string const& fields, std::string const& field_name, bool required, std::string& data);
void readField(ros::M_string const& fields, std::string const& field_name, unsigned int min_len, unsigned int max_len, bool required, std::string& data);
void readField(ros::M_string const& fields, std::string const& field_name, bool required, std::string& data);
bool readField(ros::M_string const& fields, std::string const& field_name, bool required, ros::Time& data);
void readField(ros::M_string const& fields, std::string const& field_name, bool required, ros::Time& data);
ros::M_string::const_iterator checkField(ros::M_string const& fields, std::string const& field,
unsigned int min_len, unsigned int max_len, bool required) const;
@ -240,7 +243,7 @@ private:
uint64_t file_header_pos_;
uint64_t index_data_pos_;
uint32_t topic_count_;
uint32_t connection_count_;
uint32_t chunk_count_;
boost::mutex record_mutex_;
@ -252,21 +255,26 @@ private:
uint64_t curr_chunk_data_pos_;
std::map<std::string, std::multiset<IndexEntry> > curr_chunk_topic_indexes_;
std::map<std::string, TopicInfo*> topic_infos_;
std::vector<ChunkInfo> chunk_infos_;
uint32_t top_connection_id_;
std::map<std::string, uint32_t> topic_connection_ids_;
std::map<ros::M_string*, uint32_t> header_connection_ids_;
std::map<uint32_t, ConnectionInfo*> connection_infos_;
std::map<std::string, TopicInfo*> topic_infos_;
std::vector<ChunkInfo> chunk_infos_;
std::map<std::string, std::multiset<IndexEntry> > topic_indexes_;
Buffer header_buffer_; //!< reusable buffer in which to assemble the record header before writing to file
Buffer record_buffer_; //!< reusable buffer in which to assemble the record data before writing to file
Buffer header_buffer_; //!< reusable buffer in which to assemble the record header before writing to file
Buffer record_buffer_; //!< reusable buffer in which to assemble the record data before writing to file
Buffer chunk_buffer_; //!< reusable buffer to read chunk into
Buffer decompress_buffer_; //!< reusable buffer to decompress chunks into
Buffer chunk_buffer_; //!< reusable buffer to read chunk into
Buffer decompress_buffer_; //!< reusable buffer to decompress chunks into
Buffer outgoing_chunk_buffer_; //!< reusable buffer to read chunk into
Buffer outgoing_chunk_buffer_; //!< reusable buffer to read chunk into
Buffer* current_buffer_;
uint64_t decompressed_chunk_; //!< position of decompressed chunk
uint64_t decompressed_chunk_; //!< position of decompressed chunk
};
}
@ -283,17 +291,13 @@ std::string Bag::toHeaderString(T const* field) {
}
template<typename T>
bool Bag::readField(ros::M_string const& fields, std::string const& field_name, bool required, T* data) {
ros::M_string::const_iterator i;
if ((i = checkField(fields, field_name, sizeof(T), sizeof(T), required)) == fields.end())
return false;
void Bag::readField(ros::M_string const& fields, std::string const& field_name, bool required, T* data) {
ros::M_string::const_iterator i = checkField(fields, field_name, sizeof(T), sizeof(T), required);
memcpy(data, i->second.data(), sizeof(T));
return true;
}
template<typename Stream>
void Bag::readMessageDataIntoStream(IndexEntry const& index_entry, Stream& stream)
{
void Bag::readMessageDataIntoStream(IndexEntry const& index_entry, Stream& stream) {
ros::Header header;
uint32_t data_size;
uint32_t bytes_read;
@ -306,11 +310,9 @@ void Bag::readMessageDataIntoStream(IndexEntry const& index_entry, Stream& strea
if (data_size > 0)
memcpy(stream.advance(data_size), current_buffer_->getData() + index_entry.offset + bytes_read, data_size);
return;
default:
ROS_FATAL("Unhandled version: %d", version_);
break;
default:
throw BagFormatException((boost::format("Unhandled version: %1%") % version_).str());
}
}
@ -331,59 +333,78 @@ boost::shared_ptr<T const> Bag::instantiateBuffer(IndexEntry const& index_entry)
ros::serialization::IStream s(current_buffer_->getData() + index_entry.offset + bytes_read, data_size);
ros::serialization::deserialize(s, *p);
}
break;
return p;
default:
ROS_FATAL("Unhandled version: %d", version_);
break;
throw BagFormatException((boost::format("Unhandled version: %1%") % version_).str());
}
return p;
}
template<class T>
void Bag::write_(std::string const& topic, ros::Time const& time, T const& msg, boost::shared_ptr<ros::M_string> connection_header) {
// Whenever we write we increment our revision
bag_revision_++;
bool needs_def_written = false;
// @todo refactor into separate function
// Get ID for connection header
bool needs_conn_written = false;
ConnectionInfo* connection_info = NULL;
uint32_t conn_id = 0;
ros::M_string* header_address = connection_header.get();
if (header_address == NULL) {
// No connection header: we'll manufacture one, and store by topic.
std::map<std::string, uint32_t>::iterator topic_connection_ids_iter = topic_connection_ids_.find(topic);
if (topic_connection_ids_iter == topic_connection_ids_.end()) {
conn_id = top_connection_id_++;
topic_connection_ids_[topic] = conn_id;
// Flag that we need to write a connection record
needs_conn_written = true;
}
else {
conn_id = topic_connection_ids_iter->second;
connection_info = connection_infos_[conn_id];
}
}
else {
// Store the connection info by the address of the connection header
std::map<ros::M_string*, uint32_t>::iterator header_connection_ids_iter = header_connection_ids_.find(header_address);
if (header_connection_ids_iter == header_connection_ids_.end()) {
conn_id = top_connection_id_++;
header_connection_ids_[header_address] = conn_id;
// Flag that we need to write a connection record
needs_conn_written = true;
}
else {
conn_id = header_connection_ids_iter->second;
connection_info = connection_infos_[conn_id];
}
}
// Store the topic info
TopicInfo* topic_info;
{
boost::mutex::scoped_lock lock(topic_infos_mutex_);
std::map<std::string, TopicInfo*>::iterator key = topic_infos_.find(topic);
if (key == topic_infos_.end()) {
// Extract the topic info from the message
topic_info = new TopicInfo();
topic_info->topic = topic;
topic_info->msg_def = ros::message_traits::definition(msg);
topic_info->datatype = ros::message_traits::datatype(msg);
topic_info->md5sum = ros::message_traits::md5sum(msg);
topic_info->msg_def = ros::message_traits::definition(msg);
topic_infos_[topic] = topic_info;
// Initialize the topic index
topic_indexes_[topic] = std::multiset<IndexEntry>();
// Flag that we need to write a message definition
needs_def_written = true;
}
else
topic_info = key->second;
}
// Get information about possible latching and callerid from the connection header
bool latching = false;
std::string callerid("");
if (connection_header != NULL) {
ros::M_string::iterator latch_iter = connection_header->find(std::string("latching"));
if (latch_iter != connection_header->end() && latch_iter->second != std::string("0"))
latching = true;
ros::M_string::iterator callerid_iter = connection_header->find(std::string("callerid"));
if (callerid_iter != connection_header->end())
callerid = callerid_iter->second;
}
{
//! \todo enabling this lock seems to cause a deadlock - do we even need it?
//boost::mutex::scoped_lock lock(record_mutex_);
@ -396,11 +417,26 @@ void Bag::write_(std::string const& topic, ros::Time const& time, T const& msg,
if (!chunk_open_)
startWritingChunk(time);
// Write a message definition record, if necessary
if (needs_def_written)
{
writeMessageDefinitionRecord(topic_info);
appendMessageDefinitionRecordToBuffer(outgoing_chunk_buffer_, topic_info);
// Write connection info record, if necessary
if (needs_conn_written) {
// Create connection info
connection_info = new ConnectionInfo();
connection_info->topic = topic;
connection_info->id = conn_id;
if (connection_header != NULL) {
for (ros::M_string::const_iterator i = connection_header->begin(); i != connection_header->end(); i++)
connection_info->header[i->first] = i->second;
}
else {
connection_info->header["topic"] = topic_info->topic;
connection_info->header["type"] = topic_info->datatype;
connection_info->header["md5sum"] = topic_info->md5sum;
connection_info->header["message_definition"] = topic_info->msg_def;
}
connection_infos_[conn_id] = connection_info;
writeConnectionRecord(connection_info);
appendConnectionRecordToBuffer(outgoing_chunk_buffer_, connection_info);
}
// Add to topic index
@ -411,7 +447,6 @@ void Bag::write_(std::string const& topic, ros::Time const& time, T const& msg,
std::multiset<IndexEntry>& chunk_index = curr_chunk_topic_indexes_[topic];
chunk_index.insert(chunk_index.end(), index_entry);
std::multiset<IndexEntry>& index = topic_indexes_[topic];
index.insert(index.end(), index_entry);
@ -419,32 +454,28 @@ void Bag::write_(std::string const& topic, ros::Time const& time, T const& msg,
curr_chunk_info_.topic_counts[topic]++;
// Write the message data
writeMessageDataRecord(topic, time, latching, callerid, msg);
writeMessageDataRecord(conn_id, time, msg);
// Check if we want to stop this chunk
uint32_t chunk_size = getChunkOffset();
ROS_DEBUG(" curr_chunk_size=%d (threshold=%d)", chunk_size, chunk_threshold_);
if (chunk_size > chunk_threshold_)
{
if (chunk_size > chunk_threshold_) {
// Empty the outgoing chunk
stopWritingChunk();
outgoing_chunk_buffer_.setSize(0);
// We no longer have a valid curr_chunk_info...
// We no longer have a valid curr_chunk_info
curr_chunk_info_.pos = -1;
}
}
}
template<class T>
void Bag::writeMessageDataRecord(std::string const& topic, ros::Time const& time, bool latching, std::string const& callerid, T const& msg) {
void Bag::writeMessageDataRecord(uint32_t conn_id, ros::Time const& time, T const& msg) {
ros::M_string header;
header[OP_FIELD_NAME] = toHeaderString(&OP_MSG_DATA);
header[TOPIC_FIELD_NAME] = topic;
header[TIME_FIELD_NAME] = toHeaderString(&time);
if (latching) {
header[LATCHING_FIELD_NAME] = std::string("1");
header[CALLERID_FIELD_NAME] = callerid;
}
header[OP_FIELD_NAME] = toHeaderString(&OP_MSG_DATA);
header[CONNECTION_FIELD_NAME] = toHeaderString(&conn_id);
header[TIME_FIELD_NAME] = toHeaderString(&time);
// Assemble message in memory first, because we need to write its length
uint32_t msg_ser_len = ros::serialization::serializationLength(msg);
@ -456,19 +487,20 @@ void Bag::writeMessageDataRecord(std::string const& topic, ros::Time const& time
// TODO: with a little work here we can serialize directly into the file -- sweet
ros::serialization::serialize(s, msg);
ROS_DEBUG("Writing MSG_DATA [%llu:%d]: topic=%s sec=%d nsec=%d data_len=%d",
(unsigned long long) file_.getOffset(), getChunkOffset(), topic.c_str(), time.sec, time.nsec, msg_ser_len);
ROS_DEBUG("Writing MSG_DATA [%llu:%d]: conn=%d sec=%d nsec=%d data_len=%d",
(unsigned long long) file_.getOffset(), getChunkOffset(), conn_id, time.sec, time.nsec, msg_ser_len);
// TODO: If we are clever here, we can serialize into the
// outgoing_chunk_buffer and get rid of the record_buffer_ all
// together.
writeHeader(header, msg_ser_len);
writeHeader(header);
writeDataLength(msg_ser_len);
write((char*) record_buffer_.getData(), msg_ser_len);
// TODO: Using appendHeaderToBuffer is ugly. We need a better abstraction
appendHeaderToBuffer(outgoing_chunk_buffer_, header, msg_ser_len);
appendHeaderToBuffer(outgoing_chunk_buffer_, header);
appendDataLengthToBuffer(outgoing_chunk_buffer_, msg_ser_len);
uint32_t offset = outgoing_chunk_buffer_.getSize();
outgoing_chunk_buffer_.setSize(outgoing_chunk_buffer_.getSize() + msg_ser_len);
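The write_() template above assigns connection ids in two ways: when the caller supplies a live connection header, ids are deduplicated by the identity of that header object; when no header is available, a synthetic connection is created once per topic. A rough Python analogue of that bookkeeping (the class and names are illustrative, not part of rosbag):

```python
class ConnectionRegistry(object):
    """Illustrative only: mirrors the id-assignment logic in Bag::write_()."""
    def __init__(self):
        self.next_id = 0
        self.by_topic = {}    # topic -> id, used when no connection header is given
        self.by_header = {}   # id(connection_header) -> id, used when one is given

    def lookup(self, topic, connection_header=None):
        table = self.by_topic if connection_header is None else self.by_header
        key = topic if connection_header is None else id(connection_header)
        if key not in table:
            table[key] = self.next_id
            self.next_id += 1
            # New connection: the caller must also write a CONNECTION record.
            return table[key], True
        return table[key], False
```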

View File

@ -56,25 +56,25 @@ static const unsigned char OP_CHUNK_INFO = 0x06;
static const unsigned char OP_CONNECTION = 0x07;
// Header field keys
static const std::string OP_FIELD_NAME = "op";
static const std::string TOPIC_FIELD_NAME = "topic";
static const std::string MD5_FIELD_NAME = "md5";
static const std::string TYPE_FIELD_NAME = "type";
static const std::string DEF_FIELD_NAME = "def";
static const std::string VER_FIELD_NAME = "ver";
static const std::string COUNT_FIELD_NAME = "count";
static const std::string INDEX_POS_FIELD_NAME = "index_pos"; // 1.2+
static const std::string TOPIC_COUNT_FIELD_NAME = "topic_count"; // 2.0+
static const std::string CHUNK_COUNT_FIELD_NAME = "chunk_count"; // 2.0+
static const std::string CHUNK_POS_FIELD_NAME = "chunk_pos"; // 2.0+
static const std::string COMPRESSION_FIELD_NAME = "compression"; // 2.0+
static const std::string SIZE_FIELD_NAME = "size"; // 2.0+
static const std::string TIME_FIELD_NAME = "time"; // 2.0+
static const std::string START_TIME_FIELD_NAME = "start_time"; // 2.0+
static const std::string END_TIME_FIELD_NAME = "end_time"; // 2.0+
static const std::string CONNECTION_FIELD_NAME = "conn"; // 2.0+
static const std::string OP_FIELD_NAME = "op";
static const std::string TOPIC_FIELD_NAME = "topic";
static const std::string VER_FIELD_NAME = "ver";
static const std::string COUNT_FIELD_NAME = "count";
static const std::string INDEX_POS_FIELD_NAME = "index_pos"; // 1.2+
static const std::string CONNECTION_COUNT_FIELD_NAME = "conn_count"; // 2.0+
static const std::string CHUNK_COUNT_FIELD_NAME = "chunk_count"; // 2.0+
static const std::string CHUNK_POS_FIELD_NAME = "chunk_pos"; // 2.0+
static const std::string COMPRESSION_FIELD_NAME = "compression"; // 2.0+
static const std::string SIZE_FIELD_NAME = "size"; // 2.0+
static const std::string TIME_FIELD_NAME = "time"; // 2.0+
static const std::string START_TIME_FIELD_NAME = "start_time"; // 2.0+
static const std::string END_TIME_FIELD_NAME = "end_time"; // 2.0+
static const std::string CONNECTION_FIELD_NAME = "conn"; // 2.0+
// Legacy fields
static const std::string MD5_FIELD_NAME = "md5"; // <2.0
static const std::string TYPE_FIELD_NAME = "type"; // <2.0
static const std::string DEF_FIELD_NAME = "def"; // <2.0
static const std::string SEC_FIELD_NAME = "sec"; // <2.0
static const std::string NSEC_FIELD_NAME = "nsec"; // <2.0
static const std::string LATCHING_FIELD_NAME = "latching"; // <2.0

View File

@ -37,21 +37,31 @@
#include <ros/exception.h>
using ros::Exception;
namespace rosbag {
//! Exception thrown if trying to add or read from an unopened recorder/player
class BagNotOpenException : public Exception { };
//! Base class for rosbag exceptions
class BagException : public ros::Exception
{
public:
BagException(std::string const& msg) : ros::Exception(msg) { }
};
//! Exception thrown when assorted IO problems
class BagIOException : public Exception { };
//! Exception thrown if trying to read from or write to an unopened bag
class BagNotOpenException : public BagException { };
//! Exception thrown if an invalid MsgPos (such as from another bag) is passed to seek
class InvalidMsgPosException : public Exception { };
//! Exception thrown on IO problems
class BagIOException : public BagException
{
public:
BagIOException(std::string const& msg) : BagException(msg) { }
};
//! Exception thrown if trying to instantiate a MsgInstance as the wrong type
class InstantiateException : public Exception { };
//! Exception thrown on problems reading the bag format
class BagFormatException : public BagException
{
public:
BagFormatException(std::string const& msg) : BagException(msg) { }
};
}

View File

@ -53,15 +53,15 @@ class Query
public:
//! The base query takes an optional time-range
/*!
* param start_time the beginning of the time_range for the query
* param end_time the end of the time_range for the query
* param start_time the beginning of the time_range for the query
* param end_time the end of the time_range for the query
*/
Query(ros::Time const& start_time = ros::TIME_MIN,
ros::Time const& end_time = ros::TIME_MAX);
virtual ~Query();
ros::Time getStartTime() const; //!< Accessor for start-timep
ros::Time getEndTime() const; //!< Accessor for end-timep
ros::Time getStartTime() const; //!< Get the start-time
ros::Time getEndTime() const; //!< Get the end-time
//! Virtual function which determines topic-inclusion
/*!
@ -71,58 +71,60 @@ public:
virtual bool evaluate(TopicInfo const* topic_info) const;
//! Virtual function to clone so we can store a copy of the query
virtual Query* clone() const { return new Query(*this); }
virtual Query* clone() const;
private:
ros::Time start_time_;
ros::Time end_time_;
};
//! A Query the returns messages on a specified topic
//! A Query that returns messages on the specified topics
class TopicQuery : public Query
{
public:
TopicQuery(std::string const& topic,
ros::Time const& start_time = ros::TIME_MIN,
ros::Time const& end_time = ros::TIME_MAX);
TopicQuery(std::vector<std::string> const& topics,
ros::Time const& start_time = ros::TIME_MIN,
ros::Time const& end_time = ros::TIME_MAX);
virtual bool evaluate(TopicInfo const*) const;
virtual Query* clone() const { return new TopicQuery(*this); }
virtual Query* clone() const;
private:
std::vector<std::string> topics_;
};
//! A Query the returns messages of a specified type
//! A Query that returns messages of the specified types
class TypeQuery : public Query
{
public:
TypeQuery(std::vector<std::string> const& topics,
ros::Time const& start_time = ros::TIME_MIN,
ros::Time const& end_time = ros::TIME_MAX);
TypeQuery(std::string const& type,
ros::Time const& start_time = ros::TIME_MIN,
ros::Time const& end_time = ros::TIME_MAX);
TypeQuery(std::vector<std::string> const& types,
ros::Time const& start_time = ros::TIME_MIN,
ros::Time const& end_time = ros::TIME_MAX);
virtual bool evaluate(TopicInfo const*) const;
virtual Query* clone() const { return new TypeQuery(*this); }
virtual Query* clone() const;
private:
std::vector<std::string> topics_;
std::vector<std::string> types_;
};
//! Pairs of queries and the bags they come from (used internally by View)
struct BagQuery
{
Bag* bag;
Query* query;
BagQuery(Bag* _bag, Query const& _query, uint32_t _bag_revision);
~BagQuery();
Bag* bag;
Query* query;
uint32_t bag_revision;
BagQuery(Bag* _bag, const Query& _query, uint32_t _bag_revision) : bag(_bag), bag_revision(_bag_revision)
{
query = _query.clone();
}
~BagQuery() {
delete query;
}
};
struct MessageRange

View File

@ -38,11 +38,19 @@
#include <map>
#include <vector>
#include "ros/message.h"
#include "ros/time.h"
namespace rosbag
{
struct ConnectionInfo
{
uint32_t id;
std::string topic;
ros::M_string header;
};
struct TopicInfo
{
std::string topic;

File diff suppressed because it is too large

View File

@ -53,6 +53,7 @@ bool MessageInstance::getLatching() const {
return false;
}
// @todo: this should cache the header
std::string MessageInstance::getCallerId() const {
ros::Header header = bag_->readMessageDataHeader(index_entry_);
ros::M_string& fields = *header.getValues();
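Since latching and callerid now live in the stored connection header (see the format section above) rather than in every message data record, a reader recovers them per connection. A minimal illustrative helper, assuming the connection header has been parsed into a dict of strings (this is not the MessageInstance API):

```python
def is_latching(connection_header):
    # The connection header stores latching as the string "1" when set;
    # mirror the writer, which treats anything other than "0" as latched.
    return connection_header.get('latching', '0') != '0'

def caller_id(connection_header):
    return connection_header.get('callerid', '')
```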

View File

@ -117,8 +117,7 @@ void Player::publish() {
ROS_INFO("Opening %s", filename.c_str());
shared_ptr<Bag> bag(new Bag);
if (!bag->open(filename, bagmode::Read))
throw Exception((boost::format("Error opening file: %1%") % filename.c_str()).str());
bag->open(filename, bagmode::Read);
bags_.push_back(bag);
}

View File

@ -53,30 +53,70 @@ ros::Time Query::getEndTime() const { return end_time_; }
bool Query::evaluate(TopicInfo const*) const { return true; }
Query* Query::clone() const { return new Query(*this); }
// TopicQuery
TopicQuery::TopicQuery(std::string const& topic, ros::Time const& start_time, ros::Time const& end_time)
: Query(start_time, end_time)
{
topics_.push_back(topic);
}
TopicQuery::TopicQuery(std::vector<std::string> const& topics, ros::Time const& start_time, ros::Time const& end_time)
: Query(start_time, end_time), topics_(topics)
{
}
bool TopicQuery::evaluate(TopicInfo const* info) const {
foreach(string const& topic, topics_)
if (topic == info->topic)
{
return true;
}
return false;
}
Query* TopicQuery::clone() const { return new TopicQuery(*this); }
// TypeQuery
TypeQuery::TypeQuery(std::string const& type, ros::Time const& start_time, ros::Time const& end_time)
: Query(start_time, end_time)
{
types_.push_back(type);
}
TypeQuery::TypeQuery(std::vector<std::string> const& types, ros::Time const& start_time, ros::Time const& end_time)
: Query(start_time, end_time), types_(types)
{
}
bool TypeQuery::evaluate(TopicInfo const* info) const {
foreach(string const& type, types_)
if (type == info->datatype)
return true;
return false;
}
Query* TypeQuery::clone() const { return new TypeQuery(*this); }
// BagQuery
BagQuery::BagQuery(Bag* _bag, Query const& _query, uint32_t _bag_revision) : bag(_bag), bag_revision(_bag_revision) {
query = _query.clone();
}
BagQuery::~BagQuery() {
delete query;
}
// MessageRange
MessageRange::MessageRange(std::multiset<IndexEntry>::const_iterator const& _begin,
std::multiset<IndexEntry>::const_iterator const& _end,
TopicInfo const* _topic_info,
BagQuery const* _bag_query)
std::multiset<IndexEntry>::const_iterator const& _end,
TopicInfo const* _topic_info,
BagQuery const* _bag_query)
: begin(_begin), end(_end), topic_info(_topic_info), bag_query(_bag_query)
{
}

View File

@ -279,8 +279,11 @@ void Recorder::startWriting() {
bag_.setCompression(options_.compression);
updateFilenames();
if (!bag_.open(write_filename_, bagmode::Write)) {
ROS_FATAL("Could not open output file: %s", write_filename_.c_str());
try {
bag_.open(write_filename_, bagmode::Write);
}
catch (rosbag::BagException e) {
ROS_ERROR(e.what());
exit_code_ = 1;
ros::shutdown();
}
@ -360,19 +363,22 @@ void Recorder::doRecordSnapshotter() {
string target_filename = out_queue.filename;
string write_filename = target_filename + string(".active");
if (!bag_.open(write_filename, bagmode::Write)) {
ROS_ERROR("Could not open file: %s", out_queue.filename.c_str());
try {
bag_.open(write_filename, bagmode::Write);
}
else {
while (!out_queue.queue->empty()) {
OutgoingMessage out = out_queue.queue->front();
out_queue.queue->pop();
bag_.write(out.topic, out.time, out.msg);
}
stopWriting();
catch (rosbag::BagException ex) {
ROS_ERROR(ex.what());
return;
}
while (!out_queue.queue->empty()) {
OutgoingMessage out = out_queue.queue->front();
out_queue.queue->pop();
bag_.write(out.topic, out.time, out.msg);
}
stopWriting();
}
}

View File

@ -118,16 +118,17 @@ class Bag(object):
self._reader = None
self._topic_infos = {} # topic -> TopicInfo
self._topic_infos = {} # topic -> TopicInfo
self._file_header_pos = None
self._index_data_pos = 0 # (1.2+)
self._topic_indexes = {} # topic -> IndexEntry[] (1.2+)
self._index_data_pos = 0 # (1.2+)
self._topic_indexes = {} # topic -> IndexEntry[] (1.2+)
self._topic_count = 0 # (2.0)
self._chunk_count = 0 # (2.0)
self._chunk_infos = [] # ChunkInfo[] (2.0)
self._chunk_headers = {} # chunk_pos -> ChunkHeader (2.0)
self._connection_infos = {} # topic -> ConnectionInfo
self._connection_count = 0 # (2.0)
self._chunk_count = 0 # (2.0)
self._chunk_infos = [] # ChunkInfo[] (2.0)
self._chunk_headers = {} # chunk_pos -> ChunkHeader (2.0)
self._buffer = StringIO()
@ -281,8 +282,10 @@ class Bag(object):
if not self._chunk_open:
self._start_writing_chunk(t)
# Write message definition record, if necessary
if topic not in self._topic_infos:
# Write connection record, if necessary
if topic not in self._connection_infos:
conn_id = len(self._connection_infos)
if raw:
msg_type, serialized_bytes, md5sum, pytype = msg
@ -298,12 +301,16 @@ class Bag(object):
print >> sys.stderr, 'WARNING: md5sum of loaded type [%s] does not match that specified' % msg_type
#raise ROSBagException('md5sum of loaded type does not match that of data being recorded')
topic_info = _TopicInfo(topic, msg_type, md5sum, pytype._full_text)
header = { 'topic' : topic, 'type' : msg_type, 'md5sum' : md5sum, 'message_definition' : pytype._full_text }
else:
topic_info = _TopicInfo(topic, msg.__class__._type, msg.__class__._md5sum, msg._full_text)
self._write_message_definition_record(topic_info)
self._topic_infos[topic] = topic_info
header = { 'topic' : topic, 'type' : msg.__class__._type, 'md5sum' : msg.__class__._md5sum, 'message_definition' : msg._full_text }
connection_info = _ConnectionInfo(conn_id, topic, header)
self._write_connection_record(connection_info)
self._connection_infos[topic] = connection_info
self._topic_infos[topic] = connection_info.toTopicInfo()
# Update the current chunk's topic index
index_entry = _IndexEntry200(t, self._curr_chunk_info.pos, self._get_chunk_offset())
@ -558,7 +565,7 @@ class Bag(object):
elif self._version == 102:
# Get the op code of the first record. If it's FILE_HEADER, then we have an indexed 1.2 bag.
first_record_pos = self._file.tell()
header = _read_record_header(self._file)
header = _read_header(self._file)
op = _read_uint8_field(header, 'op')
self._file.seek(first_record_pos)
@ -601,9 +608,9 @@ class Bag(object):
# Remember this location as the start of the index
self._index_data_pos = self._file.tell()
# Write topic infos
for topic_info in self._topic_infos.values():
self._write_message_definition_record(topic_info)
# Write connection infos
for connection_info in self._connection_infos.values():
self._write_connection_record(connection_info)
# Write chunk infos
for chunk_info in self._chunk_infos:
@ -687,25 +694,25 @@ class Bag(object):
self._curr_compression = compression
def _write_file_header_record(self, index_pos, topic_count, chunk_count):
def _write_file_header_record(self, index_pos, connection_count, chunk_count):
header = {
'op': _pack_uint8(_OP_FILE_HEADER),
'index_pos': _pack_uint64(index_pos),
'topic_count': _pack_uint32(topic_count),
'conn_count': _pack_uint32(connection_count),
'chunk_count': _pack_uint32(chunk_count)
}
_write_record(self._file, header, padded_size=_FILE_HEADER_LENGTH)
def _write_message_definition_record(self, topic_info):
def _write_connection_record(self, connection_info):
header = {
'op': _pack_uint8(_OP_MSG_DEF),
'topic': topic_info.topic,
'md5': topic_info.md5sum,
'type': topic_info.datatype,
'def': topic_info.msg_def
'op': _pack_uint8(_OP_CONNECTION),
'topic': connection_info.topic,
'conn': _pack_uint32(connection_info.id)
}
_write_record(self._output_file, header)
_write_header(self._output_file, header)
_write_header(self._output_file, connection_info.header)
def _write_message_data_record(self, topic, t, serialized_bytes):
header = {
'op': _pack_uint8(_OP_MSG_DATA),
@ -720,7 +727,7 @@ class Bag(object):
'compression': chunk_header.compression,
'size': _pack_uint32(chunk_header.uncompressed_size)
}
_write_record_header(self._file, header)
_write_header(self._file, header)
self._file.write(_pack_uint32(chunk_header.compressed_size))
@ -771,12 +778,26 @@ _OP_FILE_HEADER = 0x03
_OP_INDEX_DATA = 0x04
_OP_CHUNK = 0x05
_OP_CHUNK_INFO = 0x06
_OP_CONNECTION = 0x07
_VERSION = '#ROSBAG V2.0'
_FILE_HEADER_LENGTH = 4096
_INDEX_VERSION = 1
_CHUNK_INDEX_VERSION = 1
class _ConnectionInfo(object):
def __init__(self, id, topic, header):
self.id = id
self.topic = topic
self.header = header
def toTopicInfo(self):
h = self.header
return _TopicInfo(self.topic, h['type'], h['md5sum'], h['message_definition'])
def __str__(self):
return '%d on %s: %s' % (self.id, self.topic, str(self.header))
class _TopicInfo(object):
def __init__(self, topic, datatype, md5sum, msg_def):
self.topic = topic
@ -879,7 +900,7 @@ def _read_uint64_field(header, field): return _read_field(header, field, _unpack
def _read_time_field (header, field): return _read_field(header, field, _unpack_time)
def _write_record(f, header, data='', padded_size=None):
header_str = _write_record_header(f, header)
header_str = _write_header(f, header)
if padded_size is not None:
header_len = len(header_str)
@ -890,39 +911,39 @@ def _write_record(f, header, data='', padded_size=None):
_write_sized(f, data)
def _write_record_header(f, header):
def _write_header(f, header):
header_str = ''.join([_pack_uint32(len(k) + 1 + len(v)) + k + '=' + v for k, v in header.items()])
_write_sized(f, header_str)
return header_str
def _read_record_header(f, req_op=None):
def _read_header(f, req_op=None):
bag_pos = f.tell()
# Read record header
# Read header
try:
record_header = _read_sized(f)
header = _read_sized(f)
except ROSBagException, ex:
raise ROSBagFormatException('Error reading record header: %s' % str(ex))
raise ROSBagFormatException('Error reading header: %s' % str(ex))
# Parse header into a dict
header_dict = {}
while record_header != '':
while header != '':
# Read size
if len(record_header) < 4:
raise ROSBagFormatException('Error reading record header field')
(size,) = struct.unpack("<L", record_header[:4])
record_header = record_header[4:]
if len(header) < 4:
raise ROSBagFormatException('Error reading header field')
(size,) = struct.unpack("<L", header[:4])
header = header[4:]
# Read bytes
if len(record_header) < size:
raise ROSBagFormatException('Error reading record header field')
(name, sep, value) = record_header[:size].partition('=')
if len(header) < size:
raise ROSBagFormatException('Error reading header field')
(name, sep, value) = header[:size].partition('=')
if sep == '':
raise ROSBagFormatException('Error reading record header field')
raise ROSBagFormatException('Error reading header field')
header_dict[name] = value
record_header = record_header[size:]
header = header[size:]
# Check the op code of the header, if supplied
if req_op is not None:
@ -976,7 +997,7 @@ class _BagReader(object):
def read_message_definition_record(self, header=None):
if not header:
header = _read_record_header(self.bag._file, _OP_MSG_DEF)
header = _read_header(self.bag._file, _OP_MSG_DEF)
topic = _read_str_field(header, 'topic')
datatype = _read_str_field(header, 'type')
@ -1090,7 +1111,7 @@ class _BagReader102_Unindexed(_BagReader):
position = f.tell()
try:
header = _read_record_header(f)
header = _read_header(f)
except:
return
@ -1169,7 +1190,7 @@ class _BagReader102_Indexed(_BagReader):
self.bag._topic_infos[topic_info.topic] = topic_info
def read_file_header_record(self):
header = _read_record_header(self.bag._file, _OP_FILE_HEADER)
header = _read_header(self.bag._file, _OP_FILE_HEADER)
self.bag._index_data_pos = _read_uint64_field(header, 'index_pos')
@ -1179,7 +1200,7 @@ class _BagReader102_Indexed(_BagReader):
f = self.bag._file
try:
header = _read_record_header(f, _OP_INDEX_DATA)
header = _read_header(f, _OP_INDEX_DATA)
except struct.error:
return None
@ -1210,7 +1231,7 @@ class _BagReader102_Indexed(_BagReader):
# Skip any MSG_DEF records
while True:
header = _read_record_header(f)
header = _read_header(f)
op = _read_uint8_field(header, 'op')
if op != _OP_MSG_DEF:
break
@ -1282,10 +1303,14 @@ class _BagReader200(_BagReader):
# Seek to the end of the chunks
self.bag._file.seek(self.bag._index_data_pos)
# Read the message definition records (one for each topic)
for i in range(self.bag._topic_count):
topic_info = self.read_message_definition_record()
self.bag._topic_infos[topic_info.topic] = topic_info
# Read the connection records
for i in range(self.bag._connection_count):
connection_info = self.read_connection_record()
self.bag._connection_infos[connection_info.id] = connection_info
topic = connection_info.header['topic']
if topic not in self.bag._topic_infos:
self.bag._topic_infos[topic] = connection_info.toTopicInfo()
# Read the chunk info records
self.bag._chunk_infos = []
@ -1314,18 +1339,28 @@ class _BagReader200(_BagReader):
self.bag._topic_indexes[topic] = index
def read_file_header_record(self):
header = _read_record_header(self.bag._file, _OP_FILE_HEADER)
header = _read_header(self.bag._file, _OP_FILE_HEADER)
self.bag._index_data_pos = _read_uint64_field(header, 'index_pos')
self.bag._chunk_count = _read_uint32_field(header, 'chunk_count')
self.bag._topic_count = _read_uint32_field(header, 'topic_count')
self.bag._index_data_pos = _read_uint64_field(header, 'index_pos')
self.bag._chunk_count = _read_uint32_field(header, 'chunk_count')
self.bag._connection_count = _read_uint32_field(header, 'conn_count')
_read_record_data(self.bag._file)
def read_connection_record(self):
header = _read_header(self.bag._file, _OP_CONNECTION)
conn_id = _read_uint32_field(header, 'conn')
topic = _read_str_field (header, 'topic')
connection_header = _read_header(self.bag._file)
return _ConnectionInfo(conn_id, topic, connection_header)
def read_chunk_info_record(self):
f = self.bag._file
header = _read_record_header(f, _OP_CHUNK_INFO)
header = _read_header(f, _OP_CHUNK_INFO)
chunk_info_version = _read_uint32_field(header, 'ver')
@ -1350,7 +1385,7 @@ class _BagReader200(_BagReader):
raise ROSBagFormatException('Unknown chunk info record version: %d' % chunk_info_version)
def read_chunk_header(self):
header = _read_record_header(self.bag._file, _OP_CHUNK)
header = _read_header(self.bag._file, _OP_CHUNK)
compression = _read_str_field (header, 'compression')
uncompressed_size = _read_uint32_field(header, 'size')
@ -1364,7 +1399,7 @@ class _BagReader200(_BagReader):
def read_topic_index_record(self):
f = self.bag._file
header = _read_record_header(f, _OP_INDEX_DATA)
header = _read_header(f, _OP_INDEX_DATA)
index_version = _read_uint32_field(header, 'ver')
topic = _read_str_field (header, 'topic')
@ -1415,11 +1450,11 @@ class _BagReader200(_BagReader):
f = self.decompressed_chunk_io
f.seek(offset)
# Skip any MSG_DEF records
# Skip any CONNECTION records
while True:
header = _read_record_header(f)
header = _read_header(f)
op = _read_uint8_field(header, 'op')
if op != _OP_MSG_DEF:
if op != _OP_CONNECTION:
break
_read_record_data(f)
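For completeness, a sketch of what the rewritten 2.0 file header record carries, per _write_file_header_record() and read_file_header_record() above: index_pos as a uint64 plus conn_count and chunk_count as uint32s, in a record padded out to _FILE_HEADER_LENGTH (4096) bytes. Illustrative parsing of those fields only, assuming the header has already been split into a dict as in _read_header():

```python
import struct

def parse_file_header_fields(fields):
    # 'fields' maps header field names to their raw byte values.
    (index_pos,)   = struct.unpack('<Q', fields['index_pos'])
    (conn_count,)  = struct.unpack('<L', fields['conn_count'])
    (chunk_count,) = struct.unpack('<L', fields['chunk_count'])
    return index_pos, conn_count, chunk_count
```

A 2.0 reader then seeks to index_pos and reads conn_count CONNECTION records followed by chunk_count CHUNK_INFO records, as the _BagReader200 code above does.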

View File

@ -120,17 +120,16 @@ View::~View() {
}
//! Simply copy the merge_queue state into the iterator
View::iterator View::begin()
{
View::iterator View::begin() {
update();
return iterator(this);
}
//! Default constructed iterator signifies end
View::iterator View::end() { return iterator(this, true); }
View::iterator View::end() { return iterator(this, true); }
//! \todo: this doesn't work for now
uint32_t View::size() const { return 0; }
uint32_t View::size() const { return 0; }
void View::addQuery(Bag& bag, Query const& query) {
vector<ViewIterHelper> iters;
@ -140,78 +139,49 @@ void View::addQuery(Bag& bag, Query const& query) {
updateQueries(queries_.back());
}
void View::updateQueries(BagQuery* q)
{
void View::updateQueries(BagQuery* q) {
for (map<string, TopicInfo*>::iterator i = q->bag->topic_infos_.begin(); i != q->bag->topic_infos_.end(); i++) {
// Skip if the query doesn't evaluate to true
if (!q->query->evaluate(i->second))
continue;
map<string, multiset<IndexEntry> >::iterator j = q->bag->topic_indexes_.find(i->second->topic);
// Skip if the bag doeesn't have the corresponding index
// Skip if the bag doesn't have the corresponding index
if (j == q->bag->topic_indexes_.end())
continue;
// lower_bound/upper_bound do a binary search to find the appropriate range of Index Entries given our time range
std::multiset<IndexEntry>::const_iterator begin = std::lower_bound(j->second.begin(), j->second.end(), q->query->getStartTime(), IndexEntryCompare());
std::multiset<IndexEntry>::const_iterator end = std::upper_bound(j->second.begin(), j->second.end(), q->query->getEndTime(), IndexEntryCompare());
std::multiset<IndexEntry>::const_iterator end = std::upper_bound(j->second.begin(), j->second.end(), q->query->getEndTime(), IndexEntryCompare());
TopicInfo* topic_info = i->second;
// todo: this could be made faster with a map of maps
bool found = false;
// This could be made faster with a map of maps
for (vector<MessageRange*>::iterator k = ranges_.begin();
k != ranges_.end();
k++)
{
for (vector<MessageRange*>::iterator k = ranges_.begin(); k != ranges_.end(); k++) {
MessageRange* r = *k;
// If the topic and query are already in our ranges, we update
if (r->bag_query == q && r->topic_info->topic == topic_info->topic)
{
if (r->bag_query == q && r->topic_info->topic == topic_info->topic) {
r->begin = begin;
r->end = end;
found = true;
r->end = end;
found = true;
break;
}
}
if (!found)
ranges_.push_back(new MessageRange(begin, end, topic_info, q));
}
view_revision_ += 1;
view_revision_++;
}
void View::update() {
// todo this can be completely skipped if the bag is read-only
void View::update()
{
// TODO: This can be completely skipped if the bag is read-only
foreach(BagQuery* query, queries_)
{
if (query->bag->bag_revision_ != query->bag_revision)
{
// ROS_DEBUG("Query has been outdated by bag -- re-evauating");
// Deleting affected range
/*
for (std::vector<MessageRange*>::iterator iter = ranges_.begin();
iter != ranges_.end();)
{
if ((*iter)->bag_query == query)
{
// ROS_DEBUG("Erasing corresponding range");
delete *iter;
iter = ranges_.erase(iter);
} else {
iter++;
}
}
*/
foreach(BagQuery* query, queries_) {
if (query->bag->bag_revision_ != query->bag_revision) {
updateQueries(query);
query->bag_revision = query->bag->bag_revision_;
}

View File

@ -1,3 +1,5 @@
#!/usr/bin/env python
#
# test_bag.py
PKG = 'rosbag'

View File

@ -43,40 +43,6 @@
#define foreach BOOST_FOREACH
struct IndexEntryMultiSetCompare
{
bool operator()(rosbag::IndexEntry const& a, rosbag::IndexEntry const& b) const { return a.time < b.time; }
};
TEST(rosbag, multiset_vs_vector)
{
int count = 1000 * 1000;
std::multiset<rosbag::IndexEntry, IndexEntryMultiSetCompare> s;
ros::Time start = ros::Time::now();
for (int i = 0; i < count; i++) {
rosbag::IndexEntry e;
e.time = ros::Time::now();
e.chunk_pos = 0;
e.offset = 0;
s.insert(s.end(), e);
}
ros::Time end = ros::Time::now();
std::cout << "multiset: " << end - start << std::endl;
std::vector<rosbag::IndexEntry> v;
start = ros::Time::now();
for (int i = 0; i < count; i++) {
rosbag::IndexEntry e;
e.time = ros::Time::now();
e.chunk_pos = 0;
e.offset = 0;
v.push_back(e);
}
end = ros::Time::now();
std::cout << "vector: " << end - start << std::endl;
}
class BagTest : public testing::Test
{
protected:
@ -161,7 +127,6 @@ TEST(rosbag, simpleread)
view.addQuery(bag, rosbag::TopicQuery(topics));
foreach(rosbag::MessageInstance const m, view) {
std_msgs::String::ConstPtr s = m.instantiate<std_msgs::String>();
if (s != NULL)
ASSERT_EQ(s->data, std::string("foo"));
@ -184,21 +149,11 @@ TEST(rosbag, timequery)
for (int i = 0; i < 1000; i++) {
imsg.data = i;
switch (rand() % 5) {
case 0:
outbag.write("t0", ros::Time(i, 0), imsg);
break;
case 1:
outbag.write("t1", ros::Time(i, 0), imsg);
break;
case 2:
outbag.write("t2", ros::Time(i, 0), imsg);
break;
case 3:
outbag.write("t2", ros::Time(i, 0), imsg);
break;
case 4:
outbag.write("t4", ros::Time(i, 0), imsg);
break;
case 0: outbag.write("t0", ros::Time(i, 0), imsg); break;
case 1: outbag.write("t1", ros::Time(i, 0), imsg); break;
case 2: outbag.write("t2", ros::Time(i, 0), imsg); break;
case 3: outbag.write("t2", ros::Time(i, 0), imsg); break;
case 4: outbag.write("t4", ros::Time(i, 0), imsg); break;
}
}
outbag.close();
@ -213,8 +168,7 @@ TEST(rosbag, timequery)
foreach(rosbag::MessageInstance const m, view) {
std_msgs::Int32::ConstPtr imsg = m.instantiate<std_msgs::Int32>();
if (imsg != NULL)
{
if (imsg != NULL) {
ASSERT_EQ(imsg->data, i++);
ASSERT_TRUE(m.getTime() < ros::Time(783,0));
}
@ -235,26 +189,11 @@ TEST(rosbag, topicquery)
for (int i = 0; i < 1000; i++) {
switch (rand() % 5) {
case 0:
imsg.data = j0++;
outbag.write("t0", ros::Time(i, 0), imsg);
break;
case 1:
imsg.data = j0++;
outbag.write("t1", ros::Time(i, 0), imsg);
break;
case 2:
imsg.data = j1++;
outbag.write("t2", ros::Time(i, 0), imsg);
break;
case 3:
imsg.data = j1++;
outbag.write("t3", ros::Time(i, 0), imsg);
break;
case 4:
imsg.data = j1++;
outbag.write("t4", ros::Time(i, 0), imsg);
break;
case 0: imsg.data = j0++; outbag.write("t0", ros::Time(i, 0), imsg); break;
case 1: imsg.data = j0++; outbag.write("t1", ros::Time(i, 0), imsg); break;
case 2: imsg.data = j1++; outbag.write("t2", ros::Time(i, 0), imsg); break;
case 3: imsg.data = j1++; outbag.write("t3", ros::Time(i, 0), imsg); break;
case 4: imsg.data = j1++; outbag.write("t4", ros::Time(i, 0), imsg); break;
}
}
outbag.close();
@ -272,9 +211,7 @@ TEST(rosbag, topicquery)
foreach(rosbag::MessageInstance const m, view) {
std_msgs::Int32::ConstPtr imsg = m.instantiate<std_msgs::Int32>();
if (imsg != NULL)
{
ASSERT_EQ(imsg->data, i++);
}
}
bag.close();
@ -292,21 +229,11 @@ TEST(rosbag, verifymultibag)
for (int i = 0; i < 1000; i++) {
imsg.data = i;
switch (rand() % 5) {
case 0:
outbag1.write("t0", ros::Time::now(), imsg);
break;
case 1:
outbag1.write("t1", ros::Time::now(), imsg);
break;
case 2:
outbag1.write("t2", ros::Time::now(), imsg);
break;
case 3:
outbag2.write("t0", ros::Time::now(), imsg);
break;
case 4:
outbag2.write("t1", ros::Time::now(), imsg);
break;
case 0: outbag1.write("t0", ros::Time::now(), imsg); break;
case 1: outbag1.write("t1", ros::Time::now(), imsg); break;
case 2: outbag1.write("t2", ros::Time::now(), imsg); break;
case 3: outbag2.write("t0", ros::Time::now(), imsg); break;
case 4: outbag2.write("t1", ros::Time::now(), imsg); break;
}
}
@ -373,11 +300,10 @@ TEST(rosbag, modifyview)
rosbag::View::iterator iter = view.begin();
for (int i = 0; i < 50; i++) {
std_msgs::Int32::ConstPtr imsg = iter->instantiate<std_msgs::Int32> ();
std_msgs::Int32::ConstPtr imsg = iter->instantiate<std_msgs::Int32>();
if (imsg != NULL) {
ASSERT_EQ(imsg->data, j0);
j0+=2;
j0 += 2;
}
iter++;
}
@ -385,23 +311,18 @@ TEST(rosbag, modifyview)
// We now add our query, and expect it to show up
view.addQuery(bag, rosbag::TopicQuery(t1));
for (int i = 0; i < 50; i++)
{
for (int i = 0; i < 50; i++) {
std_msgs::Int32::ConstPtr imsg = iter->instantiate<std_msgs::Int32>();
if (imsg != NULL)
{
if (imsg != NULL) {
ASSERT_EQ(imsg->data, j0);
j0+=2;
j0 += 2;
}
iter++;
imsg = iter->instantiate<std_msgs::Int32>();
if (imsg != NULL)
{
imsg = iter->instantiate<std_msgs::Int32>();
if (imsg != NULL) {
ASSERT_EQ(imsg->data, j1);
j1+=2;
j1 += 2;
}
iter++;
}
@ -409,12 +330,9 @@ TEST(rosbag, modifyview)
bag.close();
}
TEST(rosbag, modifybag)
{
rosbag::Bag rwbag;
// Looks like mode is largely being ignored at the moment.
rwbag.open("rwbag.bag", rosbag::bagmode::Write);// | rosbag::bagmode::Read);
std::vector<std::string> t0 = boost::assign::list_of("t0");
@ -430,40 +348,38 @@ TEST(rosbag, modifybag)
// Verify begin gets us to 5
rosbag::View::iterator iter1 = view.begin();
std_msgs::Int32::ConstPtr imsg = iter1->instantiate<std_msgs::Int32> ();
std_msgs::Int32::ConstPtr imsg = iter1->instantiate<std_msgs::Int32>();
ASSERT_EQ(imsg->data, 5);
for (int i = 0; i < 5; i++)
{
for (int i = 0; i < 5; i++) {
omsg.data = i;
rwbag.write("t0", ros::Time(i + 1, 0), omsg);
}
// New iterator should be at 0
rosbag::View::iterator iter2 = view.begin();
imsg = iter2->instantiate<std_msgs::Int32> ();
imsg = iter2->instantiate<std_msgs::Int32>();
ASSERT_EQ(imsg->data, 0);
// Increment it once
iter2++;
// Output additional messages after time 5
for (int i = 6; i < 10; i++)
{
for (int i = 6; i < 10; i++) {
omsg.data = i;
rwbag.write("t0", ros::Time(i + 1, 0), omsg);
}
// Iter2 should contain 1->10
for (int i = 1; i < 10; i++) {
imsg = iter2->instantiate<std_msgs::Int32> ();
imsg = iter2->instantiate<std_msgs::Int32>();
ASSERT_EQ(imsg->data, i);
iter2++;
}
// Iter1 should contain 5->10
for (int i = 5; i < 10; i++) {
imsg = iter1->instantiate<std_msgs::Int32> ();
imsg = iter1->instantiate<std_msgs::Int32>();
ASSERT_EQ(imsg->data, i);
iter1++;
}
@ -471,8 +387,6 @@ TEST(rosbag, modifybag)
rwbag.close();
}
TEST_F(BagTest, WriteThenReadWorks) {
std::string filename("test/WriteThenRead.bag");
@ -492,11 +406,13 @@ TEST_F(BagTest, AppendWorks) {
b1.open(filename, rosbag::bagmode::Write);
b1.write("chatter", ros::Time::now(), foo_);
b1.close();
dumpContents(b1);
rosbag::Bag b2;
b2.open(filename, rosbag::bagmode::Append);
b2.write("numbers", ros::Time::now(), i_);
b2.close();
dumpContents(b2);
checkContents(filename);
}
@ -518,7 +434,6 @@ TEST_F(BagTest, ReadAppendWorks) {
checkContents(filename);
}
TEST_F(BagTest, ChunkedFileWorks) {
std::string filename("test/ChunkedFile.bag");