rosbag: new BagUnindexedException

This commit is contained in:
Tim Field 2010-05-07 09:25:33 +00:00
parent 975f5dc7a6
commit 99cdb63157
5 changed files with 111 additions and 148 deletions

View File

@ -56,7 +56,6 @@
#include <boost/format.hpp> #include <boost/format.hpp>
#include <boost/iterator/iterator_facade.hpp> #include <boost/iterator/iterator_facade.hpp>
#include <boost/thread/mutex.hpp>
namespace rosbag { namespace rosbag {
@ -111,7 +110,7 @@ public:
* \param topic The topic name * \param topic The topic name
* \param event The message event to be added * \param event The message event to be added
* *
* Can throw BagNotOpenException or BagIOException * Can throw BagIOException
*/ */
template<class T> template<class T>
void write(std::string const& topic, ros::MessageEvent<T> const& event); void write(std::string const& topic, ros::MessageEvent<T> const& event);
@ -123,7 +122,7 @@ public:
* \param msg The message to be added * \param msg The message to be added
* \param connection_header A connection header. * \param connection_header A connection header.
* *
* Can throw BagNotOpenException or BagIOException * Can throw BagIOException
*/ */
template<class T> template<class T>
void write(std::string const& topic, ros::Time const& time, T const& msg, void write(std::string const& topic, ros::Time const& time, T const& msg,
@ -136,7 +135,7 @@ public:
* \param msg The message to be added * \param msg The message to be added
* \param connection_header A connection header. * \param connection_header A connection header.
* *
* Can throw BagNotOpenException or BagIOException * Can throw BagIOException
*/ */
template<class T> template<class T>
void write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T const> const& msg, void write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T const> const& msg,
@ -149,15 +148,12 @@ public:
* \param msg The message to be added * \param msg The message to be added
* \param connection_header A connection header. * \param connection_header A connection header.
* *
* Can throw BagNotOpenException or BagIOException * Can throw BagIOException
*/ */
template<class T> template<class T>
void write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T> const& msg, void write(std::string const& topic, ros::Time const& time, boost::shared_ptr<T> const& msg,
boost::shared_ptr<ros::M_string> connection_header = boost::shared_ptr<ros::M_string>()); boost::shared_ptr<ros::M_string> connection_header = boost::shared_ptr<ros::M_string>());
void dump();
private: private:
// This helper function actually does the write with an arbitrary serializable message // This helper function actually does the write with an arbitrary serializable message
template<class T> template<class T>
@ -204,13 +200,11 @@ private:
void readTopicIndexRecord102(); void readTopicIndexRecord102();
void readMessageDefinitionRecord102(); void readMessageDefinitionRecord102();
void readMessageDataRecord102(uint64_t offset, ros::Header& header) const;
ros::Header readMessageDataHeader(IndexEntry const& index_entry); ros::Header readMessageDataHeader(IndexEntry const& index_entry);
uint32_t readMessageDataSize(IndexEntry const& index_entry) const; uint32_t readMessageDataSize(IndexEntry const& index_entry) const;
// Would be nice not to have to template this on Stream. Also,
// we don't need to read the header here either. It just so
// happens to be the most efficient way to skip it at the moment.
template<typename Stream> template<typename Stream>
void readMessageDataIntoStream(IndexEntry const& index_entry, Stream& stream) const; void readMessageDataIntoStream(IndexEntry const& index_entry, Stream& stream) const;
@ -225,14 +219,13 @@ private:
void writeDataLength(uint32_t data_len); void writeDataLength(uint32_t data_len);
void appendHeaderToBuffer(Buffer& buf, ros::M_string const& fields); void appendHeaderToBuffer(Buffer& buf, ros::M_string const& fields);
void appendDataLengthToBuffer(Buffer& buf, uint32_t data_len); void appendDataLengthToBuffer(Buffer& buf, uint32_t data_len);
void readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const; void readHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const;
void readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const; void readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::Header& header, uint32_t& data_size, uint32_t& bytes_read) const;
bool readHeader(ros::Header& header) const; bool readHeader(ros::Header& header) const;
bool readDataLength(uint32_t& data_size) const; bool readDataLength(uint32_t& data_size) const;
bool isOp(ros::M_string& fields, uint8_t reqOp) const; bool isOp(ros::M_string& fields, uint8_t reqOp) const;
void loadMessageDataRecord102(uint64_t offset, ros::Header& header) const;
// Header fields // Header fields
template<typename T> template<typename T>
@ -271,8 +264,6 @@ private:
uint64_t index_data_pos_; uint64_t index_data_pos_;
uint32_t connection_count_; uint32_t connection_count_;
uint32_t chunk_count_; uint32_t chunk_count_;
boost::mutex record_mutex_;
// Current chunk // Current chunk
bool chunk_open_; bool chunk_open_;
@ -367,86 +358,86 @@ boost::shared_ptr<T> Bag::instantiateBuffer(IndexEntry const& index_entry) const
{ {
case 200: case 200:
{ {
decompressChunk(index_entry.chunk_pos); decompressChunk(index_entry.chunk_pos);
// Read the message header // Read the message header
ros::Header header; ros::Header header;
uint32_t data_size; uint32_t data_size;
uint32_t bytes_read; uint32_t bytes_read;
readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read); readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
// Read the connection id from the header // Read the connection id from the header
uint32_t connection_id; uint32_t connection_id;
readField(*header.getValues(), CONNECTION_FIELD_NAME, true, &connection_id); readField(*header.getValues(), CONNECTION_FIELD_NAME, true, &connection_id);
std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter = connections_.find(connection_id); std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter = connections_.find(connection_id);
if (connection_iter == connections_.end()) if (connection_iter == connections_.end())
throw BagFormatException((boost::format("Unknown connection ID: %1%") % connection_id).str()); throw BagFormatException((boost::format("Unknown connection ID: %1%") % connection_id).str());
ConnectionInfo* connection_info = connection_iter->second; ConnectionInfo* connection_info = connection_iter->second;
boost::shared_ptr<T> p = boost::shared_ptr<T>(new T()); boost::shared_ptr<T> p = boost::shared_ptr<T>(new T());
// Set the connection header (if this is a ros::Message) // Set the connection header (if this is a ros::Message)
ros::assignSubscriptionConnectionHeader<T>(p.get(), connection_info->header); ros::assignSubscriptionConnectionHeader<T>(p.get(), connection_info->header);
ros::serialization::PreDeserializeParams<T> predes_params; ros::serialization::PreDeserializeParams<T> predes_params;
predes_params.message = p; predes_params.message = p;
predes_params.connection_header = connection_info->header; predes_params.connection_header = connection_info->header;
ros::serialization::PreDeserialize<T>::notify(predes_params); ros::serialization::PreDeserialize<T>::notify(predes_params);
// Deserialize the message // Deserialize the message
ros::serialization::IStream s(current_buffer_->getData() + index_entry.offset + bytes_read, data_size); ros::serialization::IStream s(current_buffer_->getData() + index_entry.offset + bytes_read, data_size);
ros::serialization::deserialize(s, *p); ros::serialization::deserialize(s, *p);
return p; return p;
} }
case 102: case 102:
{ {
// Read the message record // Read the message record
ros::Header header; ros::Header header;
loadMessageDataRecord102(index_entry.chunk_pos, header); readMessageDataRecord102(index_entry.chunk_pos, header);
ros::M_string& fields = *header.getValues(); ros::M_string& fields = *header.getValues();
// Read the connection id from the header // Read the connection id from the header
std::string topic, latching("0"), callerid; std::string topic, latching("0"), callerid;
readField(fields, TOPIC_FIELD_NAME, true, topic); readField(fields, TOPIC_FIELD_NAME, true, topic);
readField(fields, LATCHING_FIELD_NAME, false, latching); readField(fields, LATCHING_FIELD_NAME, false, latching);
readField(fields, CALLERID_FIELD_NAME, false, callerid); readField(fields, CALLERID_FIELD_NAME, false, callerid);
std::map<std::string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic); std::map<std::string, uint32_t>::const_iterator topic_conn_id_iter = topic_connection_ids_.find(topic);
if (topic_conn_id_iter == topic_connection_ids_.end()) if (topic_conn_id_iter == topic_connection_ids_.end())
throw BagFormatException((boost::format("Unknown topic: %1%") % topic).str()); throw BagFormatException((boost::format("Unknown topic: %1%") % topic).str());
uint32_t connection_id = topic_conn_id_iter->second; uint32_t connection_id = topic_conn_id_iter->second;
std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter = connections_.find(connection_id); std::map<uint32_t, ConnectionInfo*>::const_iterator connection_iter = connections_.find(connection_id);
if (connection_iter == connections_.end()) if (connection_iter == connections_.end())
throw BagFormatException((boost::format("Unknown connection ID: %1%") % connection_id).str()); throw BagFormatException((boost::format("Unknown connection ID: %1%") % connection_id).str());
ConnectionInfo* connection_info = connection_iter->second; ConnectionInfo* connection_info = connection_iter->second;
boost::shared_ptr<T> p = boost::shared_ptr<T>(new T()); boost::shared_ptr<T> p = boost::shared_ptr<T>(new T());
// Create a new connection header, updated with the latching and callerid values // Create a new connection header, updated with the latching and callerid values
boost::shared_ptr<ros::M_string> message_header(new ros::M_string); boost::shared_ptr<ros::M_string> message_header(new ros::M_string);
for (ros::M_string::const_iterator i = connection_info->header->begin(); i != connection_info->header->end(); i++) for (ros::M_string::const_iterator i = connection_info->header->begin(); i != connection_info->header->end(); i++)
(*message_header)[i->first] = i->second; (*message_header)[i->first] = i->second;
(*message_header)["latching"] = latching; (*message_header)["latching"] = latching;
(*message_header)["callerid"] = callerid; (*message_header)["callerid"] = callerid;
// Set the connection header (if this is a ros::Message) // Set the connection header (if this is a ros::Message)
ros::assignSubscriptionConnectionHeader<T>(p.get(), message_header); ros::assignSubscriptionConnectionHeader<T>(p.get(), message_header);
ros::serialization::PreDeserializeParams<T> predes_params; ros::serialization::PreDeserializeParams<T> predes_params;
predes_params.message = p; predes_params.message = p;
predes_params.connection_header = message_header; predes_params.connection_header = message_header;
ros::serialization::PreDeserialize<T>::notify(predes_params); ros::serialization::PreDeserialize<T>::notify(predes_params);
// Deserialize the message // Deserialize the message
ros::serialization::IStream s(record_buffer_.getData(), record_buffer_.getSize()); ros::serialization::IStream s(record_buffer_.getData(), record_buffer_.getSize());
ros::serialization::deserialize(s, *p); ros::serialization::deserialize(s, *p);
return p; return p;
} }
default: default:
throw BagFormatException((boost::format("Unhandled version: %1%") % version_).str()); throw BagFormatException((boost::format("Unhandled version: %1%") % version_).str());
@ -490,12 +481,7 @@ void Bag::doWrite(std::string const& topic, ros::Time const& time, T const& msg,
} }
{ {
//! \todo enabling this lock seems to cause a deadlock - do we even need it?
//boost::mutex::scoped_lock lock(record_mutex_);
// Seek to the end of the file (needed in case previous operation was a read) // Seek to the end of the file (needed in case previous operation was a read)
//! \todo only do when necessary
ROS_DEBUG("Seeking for write...");
seek(0, std::ios::end); seek(0, std::ios::end);
file_size_ = file_.getOffset(); file_size_ = file_.getOffset();
@ -571,19 +557,15 @@ void Bag::writeMessageDataRecord(uint32_t conn_id, ros::Time const& time, T cons
ros::serialization::OStream s(record_buffer_.getData(), msg_ser_len); ros::serialization::OStream s(record_buffer_.getData(), msg_ser_len);
// TODO: If we are clever here, we can serialize into the outgoing_chunk_buffer and get // todo: serialize into the outgoing_chunk_buffer & remove record_buffer_
// rid of the record_buffer_ altogether
ROS_DEBUG("SERIALIZE");
ros::serialization::serialize(s, msg); ros::serialization::serialize(s, msg);
// We do an extra seek here since writing our data record may // We do an extra seek here since writing our data record may
// have indirectly moved our file-pointer if it was a // have indirectly moved our file-pointer if it was a
// MessageInstance for our own bag // MessageInstance for our own bag
ROS_DEBUG("Seeking for write2...");
seek(0, std::ios::end); seek(0, std::ios::end);
file_size_ = file_.getOffset(); file_size_ = file_.getOffset();
ROS_DEBUG("Writing MSG_DATA [%llu:%d]: conn=%d sec=%d nsec=%d data_len=%d", ROS_DEBUG("Writing MSG_DATA [%llu:%d]: conn=%d sec=%d nsec=%d data_len=%d",
(unsigned long long) file_.getOffset(), getChunkOffset(), conn_id, time.sec, time.nsec, msg_ser_len); (unsigned long long) file_.getOffset(), getChunkOffset(), conn_id, time.sec, time.nsec, msg_ser_len);
@ -591,7 +573,7 @@ void Bag::writeMessageDataRecord(uint32_t conn_id, ros::Time const& time, T cons
writeDataLength(msg_ser_len); writeDataLength(msg_ser_len);
write((char*) record_buffer_.getData(), msg_ser_len); write((char*) record_buffer_.getData(), msg_ser_len);
// TODO: Using appendHeaderToBuffer is ugly. We need a better abstraction // todo: use better abstraction than appendHeaderToBuffer
appendHeaderToBuffer(outgoing_chunk_buffer_, header); appendHeaderToBuffer(outgoing_chunk_buffer_, header);
appendDataLengthToBuffer(outgoing_chunk_buffer_, msg_ser_len); appendDataLengthToBuffer(outgoing_chunk_buffer_, msg_ser_len);
@ -599,9 +581,11 @@ void Bag::writeMessageDataRecord(uint32_t conn_id, ros::Time const& time, T cons
outgoing_chunk_buffer_.setSize(outgoing_chunk_buffer_.getSize() + msg_ser_len); outgoing_chunk_buffer_.setSize(outgoing_chunk_buffer_.getSize() + msg_ser_len);
memcpy(outgoing_chunk_buffer_.getData() + offset, record_buffer_.getData(), msg_ser_len); memcpy(outgoing_chunk_buffer_.getData() + offset, record_buffer_.getData(), msg_ser_len);
// Update the current chunk time // Update the current chunk time range
if (time > curr_chunk_info_.end_time) if (time > curr_chunk_info_.end_time)
curr_chunk_info_.end_time = time; curr_chunk_info_.end_time = time;
else if (time < curr_chunk_info_.start_time)
curr_chunk_info_.start_time = time;
} }
} // namespace rosbag } // namespace rosbag

View File

@ -57,11 +57,11 @@ public:
ChunkedFile(); ChunkedFile();
~ChunkedFile(); ~ChunkedFile();
bool openWrite (std::string const& filename); //!< open file for writing void openWrite (std::string const& filename); //!< open file for writing
bool openRead (std::string const& filename); //!< open file for reading void openRead (std::string const& filename); //!< open file for reading
bool openReadWrite(std::string const& filename); //!< open file for reading & writing void openReadWrite(std::string const& filename); //!< open file for reading & writing
bool close(); //!< close the file void close(); //!< close the file
std::string getFileName() const; //!< return path of currently open file std::string getFileName() const; //!< return path of currently open file
uint64_t getOffset() const; //!< return current offset from the beginning of the file uint64_t getOffset() const; //!< return current offset from the beginning of the file
@ -69,20 +69,20 @@ public:
bool isOpen() const; //!< return true if file is open for reading or writing bool isOpen() const; //!< return true if file is open for reading or writing
bool good() const; //!< return true if hasn't reached end-of-file and no error bool good() const; //!< return true if hasn't reached end-of-file and no error
bool setReadMode(CompressionType type); void setReadMode(CompressionType type);
bool setWriteMode(CompressionType type); void setWriteMode(CompressionType type);
// File I/O // File I/O
size_t write(std::string const& s); void write(std::string const& s);
size_t write(void* ptr, size_t size); //!< write size bytes from ptr to the file void write(void* ptr, size_t size); //!< write size bytes from ptr to the file
size_t read(void* ptr, size_t size); //!< read size bytes from the file into ptr void read(void* ptr, size_t size); //!< read size bytes from the file into ptr
std::string getline(); std::string getline();
bool truncate(uint64_t length); bool truncate(uint64_t length);
bool seek(uint64_t offset, int origin = std::ios_base::beg); //!< seek to given offset from origin void seek(uint64_t offset, int origin = std::ios_base::beg); //!< seek to given offset from origin
void decompress(CompressionType compression, uint8_t* dest, unsigned int dest_len, uint8_t* source, unsigned int source_len); void decompress(CompressionType compression, uint8_t* dest, unsigned int dest_len, uint8_t* source, unsigned int source_len);
private: private:
bool open(std::string const& filename, std::string const& mode); void open(std::string const& filename, std::string const& mode);
void clearUnused(); void clearUnused();
private: private:

View File

@ -46,9 +46,6 @@ public:
BagException(std::string const& msg) : ros::Exception(msg) { } BagException(std::string const& msg) : ros::Exception(msg) { }
}; };
//! Exception thrown if trying to read from or write to an unopened bag
class BagNotOpenException : public BagException { };
//! Exception thrown when on IO problems //! Exception thrown when on IO problems
class BagIOException : public BagException class BagIOException : public BagException
{ {
@ -63,6 +60,13 @@ public:
BagFormatException(std::string const& msg) : BagException(msg) { } BagFormatException(std::string const& msg) : BagException(msg) { }
}; };
//! Exception thrown on problems reading the bag index
class BagUnindexedException : public BagException
{
public:
BagUnindexedException() : BagException("Bag unindexed") { }
};
} // namespace rosbag } // namespace rosbag
#endif #endif

View File

@ -95,6 +95,9 @@ public:
/*! Set the current time */ /*! Set the current time */
void setTime(const ros::Time& time); void setTime(const ros::Time& time);
/*! Get the current time */
ros::Time const& getTime() const;
/*! Run the clock for AT MOST duration /*! Run the clock for AT MOST duration
* *
* If horizon has been reached this function returns immediately * If horizon has been reached this function returns immediately
@ -147,15 +150,14 @@ private:
void setupTerminal(); void setupTerminal();
void restoreTerminal(); void restoreTerminal();
ros::Time getSysTime(); void doPublish(rosbag::MessageInstance const& m);
void doPublish(rosbag::MessageInstance const& m);
private: private:
PlayerOptions options_; PlayerOptions options_;
ros::NodeHandle node_handle_; ros::NodeHandle node_handle_;
bool paused_; bool paused_;
ros::WallTime paused_time_; ros::WallTime paused_time_;

View File

@ -42,7 +42,8 @@
#include <boost/shared_ptr.hpp> #include <boost/shared_ptr.hpp>
#include <bzlib.h> #include <bzlib.h>
//#include <zlib.h>
#include "rosbag/exceptions.h"
namespace rosbag { namespace rosbag {
@ -52,7 +53,6 @@ namespace compression
{ {
None = 0, None = 0,
BZ2 = 1, BZ2 = 1,
//ZLIB = 2
}; };
} }
typedef compression::CompressionType CompressionType; typedef compression::CompressionType CompressionType;
@ -67,16 +67,16 @@ public:
virtual CompressionType getCompressionType() const = 0; virtual CompressionType getCompressionType() const = 0;
virtual size_t write(void* ptr, size_t size) = 0; virtual void write(void* ptr, size_t size) = 0;
virtual size_t read (void* ptr, size_t size) = 0; virtual void read (void* ptr, size_t size) = 0;
virtual void decompress(uint8_t* dest, unsigned int dest_len, uint8_t* source, unsigned int source_len) = 0; virtual void decompress(uint8_t* dest, unsigned int dest_len, uint8_t* source, unsigned int source_len) = 0;
virtual void startWrite(); virtual void startWrite();
virtual void stopWrite(); virtual void stopWrite();
virtual void startRead(); virtual void startRead();
virtual void stopRead(); virtual void stopRead();
protected: protected:
FILE* getFilePointer(); FILE* getFilePointer();
@ -103,7 +103,6 @@ public:
private: private:
boost::shared_ptr<Stream> uncompressed_stream_; boost::shared_ptr<Stream> uncompressed_stream_;
boost::shared_ptr<Stream> bz2_stream_; boost::shared_ptr<Stream> bz2_stream_;
//boost::shared_ptr<Stream> zlib_stream_;
}; };
class UncompressedStream : public Stream class UncompressedStream : public Stream
@ -113,10 +112,10 @@ public:
CompressionType getCompressionType() const; CompressionType getCompressionType() const;
size_t write(void* ptr, size_t size); void write(void* ptr, size_t size);
size_t read(void* ptr, size_t size); void read(void* ptr, size_t size);
void decompress(uint8_t* dest, unsigned int dest_len, uint8_t* source, unsigned int source_len); void decompress(uint8_t* dest, unsigned int dest_len, uint8_t* source, unsigned int source_len);
}; };
/*! /*!
@ -129,18 +128,15 @@ public:
CompressionType getCompressionType() const; CompressionType getCompressionType() const;
void startWrite(); void startWrite();
size_t write(void* ptr, size_t size); void write(void* ptr, size_t size);
void stopWrite(); void stopWrite();
void startRead(); void startRead();
size_t read(void* ptr, size_t size); void read(void* ptr, size_t size);
void stopRead(); void stopRead();
void decompress(uint8_t* dest, unsigned int dest_len, uint8_t* source, unsigned int source_len); void decompress(uint8_t* dest, unsigned int dest_len, uint8_t* source, unsigned int source_len);
private:
void checkError() const;
private: private:
int verbosity_; //!< level of debugging output (0-4; 0 default). 0 is silent, 4 is max verbose debugging output int verbosity_; //!< level of debugging output (0-4; 0 default). 0 is silent, 4 is max verbose debugging output
@ -151,29 +147,6 @@ private:
int bzerror_; //!< last error from bzlib int bzerror_; //!< last error from bzlib
}; };
#if 0
/*!
* ZLIBStream use zlibc (http://www.zlib.net) for reading/writing compressed data in the ZLIB format.
*/
class ZLIBStream : public Stream
{
public:
ZLIBStream(ChunkedFile* file);
CompressionType getCompressionType() const;
void startWrite();
size_t write(void* ptr, size_t size);
void stopWrite();
void startRead();
size_t read(void* ptr, size_t size);
void stopRead();
void decompress(uint8_t* dest, unsigned int dest_len, uint8_t* source, unsigned int source_len);
};
#endif
} // namespace rosbag } // namespace rosbag
#endif #endif