rosbag: removed TopicInfo; everything uses the more informative ConnectionInfo

This commit is contained in:
Tim Field 2010-04-28 23:44:21 +00:00
parent c111c0c682
commit 3f88b6e0e2
6 changed files with 170 additions and 167 deletions

View File

@ -69,9 +69,7 @@ Bag::Bag() :
Bag::~Bag() {
close();
for (map<uint32_t, ConnectionInfo*>::iterator i = connection_infos_.begin(); i != connection_infos_.end(); i++)
delete i->second;
for (map<string, TopicInfo*>::iterator i = topic_infos_.begin(); i != topic_infos_.end(); i++)
for (map<uint32_t, ConnectionInfo*>::iterator i = connections_.begin(); i != connections_.end(); i++)
delete i->second;
}
@ -128,15 +126,15 @@ void Bag::openAppend(string const& filename) {
}
void Bag::write(string const& topic, Time const& time, ros::Message::ConstPtr msg) {
write_(topic, time, *msg, msg->__connection_header);
doWrite(topic, time, *msg, msg->__connection_header);
}
void Bag::write(string const& topic, Time const& time, ros::Message const& msg) {
write_(topic, time, msg, msg.__connection_header);
doWrite(topic, time, msg, msg.__connection_header);
}
void Bag::write(string const& topic, Time const& time, MessageInstance const& msg, shared_ptr<M_string> connection_header) {
write_(topic, time, msg, connection_header);
doWrite(topic, time, msg, connection_header);
}
void Bag::close() {
@ -191,7 +189,7 @@ void Bag::setCompression(CompressionType compression) {
void Bag::writeVersion() {
string version = string("#ROSBAG V") + VERSION + string("\n");
ROS_DEBUG("Writing VERSION [%llu]: %s", (unsigned long long) file_.getOffset(), version.c_str());
ROS_INFO("Writing VERSION [%llu]: %s", (unsigned long long) file_.getOffset(), version.c_str());
version_ = 200;
@ -214,7 +212,7 @@ void Bag::readVersion() {
version_ = version_major * 100 + version_minor;
ROS_DEBUG("Read VERSION: version=%d", version_);
ROS_INFO("Read VERSION: version=%d", version_);
}
int Bag::getVersion() const { return version_; }
@ -257,8 +255,8 @@ void Bag::startReadingVersion200() {
for (uint32_t i = 0; i < chunk_count_; i++)
readChunkInfoRecord();
// Read the topic indexes
foreach(ChunkInfo const& chunk_info, chunk_infos_) {
// Read the connection indexes for each chunk
foreach(ChunkInfo const& chunk_info, chunks_) {
curr_chunk_info_ = chunk_info;
seek(curr_chunk_info_.pos);
@ -268,20 +266,17 @@ void Bag::startReadingVersion200() {
readChunkHeader(chunk_header);
seek(chunk_header.compressed_size, std::ios::cur);
// Read the topic index records after the chunk
// Read the index records after the chunk
for (unsigned int i = 0; i < chunk_info.topic_counts.size(); i++)
readTopicIndexRecord();
readConnectionIndexRecord200();
}
// At this point we don't have a curr_chunk_info anymore so we reset it
curr_chunk_info_ = ChunkInfo();
}
// @todo: handle 1.2 bags with no index
void Bag::startReadingVersion102() {
// @todo: handle 1.2 bags with no index
ROS_DEBUG("Reading in version 1.2 bag");
// Read the file header record, which points to the start of the topic indexes
readFileHeaderRecord();
@ -294,18 +289,18 @@ void Bag::startReadingVersion102() {
// Read the topic index records, which point to the offsets of each message in the file
while (file_.getOffset() < filelength)
readTopicIndexRecord();
readTopicIndexRecord102();
// Read the message definition records (which are the first entry in the topic indexes)
for (map<string, multiset<IndexEntry> >::const_iterator i = topic_indexes_.begin(); i != topic_indexes_.end(); i++) {
multiset<IndexEntry> const& topic_index = i->second;
IndexEntry const& first_entry = *topic_index.begin();
IndexEntry const& first_entry = *topic_index.begin();
ROS_DEBUG("Reading message definition for %s at %llu", i->first.c_str(), (unsigned long long) first_entry.chunk_pos);
ROS_INFO("Reading message definition for %s at %llu", i->first.c_str(), (unsigned long long) first_entry.chunk_pos);
seek(first_entry.chunk_pos);
readMessageDefinitionRecord();
readMessageDefinitionRecord102();
}
}
@ -314,10 +309,10 @@ void Bag::startReadingVersion102() {
void Bag::writeFileHeaderRecord() {
boost::mutex::scoped_lock lock(record_mutex_);
connection_count_ = connection_infos_.size();
chunk_count_ = chunk_infos_.size();
connection_count_ = connections_.size();
chunk_count_ = chunks_.size();
ROS_DEBUG("Writing FILE_HEADER [%llu]: index_pos=%llu connection_count=%d chunk_count=%d",
ROS_INFO("Writing FILE_HEADER [%llu]: index_pos=%llu connection_count=%d chunk_count=%d",
(unsigned long long) file_.getOffset(), (unsigned long long) index_data_pos_, connection_count_, chunk_count_);
// Write file header record
@ -365,7 +360,7 @@ void Bag::readFileHeaderRecord() {
readField(fields, CHUNK_COUNT_FIELD_NAME, true, &chunk_count_);
}
ROS_DEBUG("Read FILE_HEADER: index_pos=%llu connection_count=%d chunk_count=%d",
ROS_INFO("Read FILE_HEADER: index_pos=%llu connection_count=%d chunk_count=%d",
(unsigned long long) index_data_pos_, connection_count_, chunk_count_);
// Skip the data section (just padding)
@ -399,7 +394,7 @@ void Bag::startWritingChunk(Time time) {
void Bag::stopWritingChunk() {
// Add this chunk to the index
chunk_infos_.push_back(curr_chunk_info_);
chunks_.push_back(curr_chunk_info_);
// Get the uncompressed and compressed sizes
uint32_t uncompressed_size = getChunkOffset();
@ -411,9 +406,10 @@ void Bag::stopWritingChunk() {
seek(curr_chunk_info_.pos);
writeChunkHeader(compression_, compressed_size, uncompressed_size);
// Write out the topic indexes and clear them
// Write out the indexes and clear them
seek(end_of_chunk_pos);
writeTopicIndexRecords();
writeIndexRecords();
curr_chunk_connection_indexes_.clear();
curr_chunk_topic_indexes_.clear();
// Flag that we're starting a new chunk
@ -430,7 +426,7 @@ void Bag::writeChunkHeader(CompressionType compression, uint32_t compressed_size
chunk_header.compressed_size = compressed_size;
chunk_header.uncompressed_size = uncompressed_size;
ROS_DEBUG("Writing CHUNK [%llu]: compression=%s compressed=%d uncompressed=%d",
ROS_INFO("Writing CHUNK [%llu]: compression=%s compressed=%d uncompressed=%d",
(unsigned long long) file_.getOffset(), chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size);
M_string header;
@ -455,52 +451,52 @@ void Bag::readChunkHeader(ChunkHeader& chunk_header) {
readField(fields, COMPRESSION_FIELD_NAME, true, chunk_header.compression);
readField(fields, SIZE_FIELD_NAME, true, &chunk_header.uncompressed_size);
ROS_DEBUG("Read CHUNK: compression=%s size=%d uncompressed=%d (%f)", chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size, 100 * ((double) chunk_header.compressed_size) / chunk_header.uncompressed_size);
ROS_INFO("Read CHUNK: compression=%s size=%d uncompressed=%d (%f)", chunk_header.compression.c_str(), chunk_header.compressed_size, chunk_header.uncompressed_size, 100 * ((double) chunk_header.compressed_size) / chunk_header.uncompressed_size);
}
// Topic index records
// Index records
void Bag::writeTopicIndexRecords() {
void Bag::writeIndexRecords() {
boost::mutex::scoped_lock lock(record_mutex_);
for (map<string, multiset<IndexEntry> >::const_iterator i = curr_chunk_topic_indexes_.begin(); i != curr_chunk_topic_indexes_.end(); i++) {
string const& topic = i->first;
multiset<IndexEntry> const& topic_index = i->second;
for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = curr_chunk_connection_indexes_.begin(); i != curr_chunk_connection_indexes_.end(); i++) {
uint32_t connection_id = i->first;
multiset<IndexEntry> const& index = i->second;
// Write the index record header
uint32_t index_size = index.size();
M_string header;
header[OP_FIELD_NAME] = toHeaderString(&OP_INDEX_DATA);
header[TOPIC_FIELD_NAME] = topic;
header[VER_FIELD_NAME] = toHeaderString(&INDEX_VERSION);
uint32_t topic_index_size = topic_index.size();
header[COUNT_FIELD_NAME] = toHeaderString(&topic_index_size);
header[OP_FIELD_NAME] = toHeaderString(&OP_INDEX_DATA);
header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_id);
header[VER_FIELD_NAME] = toHeaderString(&INDEX_VERSION);
header[COUNT_FIELD_NAME] = toHeaderString(&index_size);
writeHeader(header);
writeDataLength(topic_index_size * sizeof(IndexEntry));
writeDataLength(index_size * sizeof(IndexEntry));
ROS_INFO("Writing INDEX_DATA: connection=%d ver=%d count=%d", connection_id, INDEX_VERSION, index_size);
ROS_DEBUG("Writing INDEX_DATA: topic=%s ver=%d count=%d", topic.c_str(), INDEX_VERSION, topic_index_size);
// Write the index record data (pairs of timestamp and position in file)
foreach(IndexEntry const& e, topic_index) {
foreach(IndexEntry const& e, index) {
write((char*) &e.time.sec, 4);
write((char*) &e.time.nsec, 4);
write((char*) &e.offset, 4);
ROS_DEBUG(" - %d.%d: %d", e.time.sec, e.time.nsec, e.offset);
ROS_INFO(" - %d.%d: %d", e.time.sec, e.time.nsec, e.offset);
}
}
}
void Bag::readTopicIndexRecord() {
void Bag::readTopicIndexRecord102() {
ros::Header header;
uint32_t data_size;
if (!readHeader(header) || !readDataLength(data_size))
throw BagFormatException("Error reading INDEX_DATA header");
M_string& fields = *header.getValues();
if (!isOp(fields, OP_INDEX_DATA))
throw BagFormatException("Expected INDEX_DATA record");
uint32_t index_version;
string topic;
uint32_t count;
@ -508,19 +504,13 @@ void Bag::readTopicIndexRecord() {
readField(fields, TOPIC_FIELD_NAME, true, topic);
readField(fields, COUNT_FIELD_NAME, true, &count);
ROS_DEBUG("Read INDEX_DATA: ver=%d topic=%s count=%d", index_version, topic.c_str(), count);
ROS_INFO("Read INDEX_DATA: ver=%d topic=%s count=%d", index_version, topic.c_str(), count);
switch (index_version) {
case 0: return readTopicIndexDataVersion0(count, topic);
case 1: return readTopicIndexDataVersion1(count, topic, curr_chunk_info_.pos);
default:
if (index_version != 0)
throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
}
}
//! Store the position of the message in the chunk_pos field
void Bag::readTopicIndexDataVersion0(uint32_t count, string const& topic) {
// Read the index entry records
//! NOTE: stores the position of the message in the chunk_pos field
// todo: create fake connection infos
multiset<IndexEntry>& topic_index = topic_indexes_[topic];
for (uint32_t i = 0; i < count; i++) {
IndexEntry index_entry;
@ -532,15 +522,41 @@ void Bag::readTopicIndexDataVersion0(uint32_t count, string const& topic) {
index_entry.time = Time(sec, nsec);
index_entry.offset = 0;
ROS_DEBUG(" - %d.%d: %llu", sec, nsec, (unsigned long long) index_entry.chunk_pos);
ROS_INFO(" - %d.%d: %llu", sec, nsec, (unsigned long long) index_entry.chunk_pos);
topic_index.insert(topic_index.end(), index_entry);
}
}
void Bag::readTopicIndexDataVersion1(uint32_t count, string const& topic, uint64_t chunk_pos) {
// Read the index entry records
multiset<IndexEntry>& topic_index = topic_indexes_[topic];
void Bag::readConnectionIndexRecord200() {
ros::Header header;
uint32_t data_size;
if (!readHeader(header) || !readDataLength(data_size))
throw BagFormatException("Error reading INDEX_DATA header");
M_string& fields = *header.getValues();
if (!isOp(fields, OP_INDEX_DATA))
throw BagFormatException("Expected INDEX_DATA record");
uint32_t index_version;
uint32_t connection_id;
uint32_t count;
readField(fields, VER_FIELD_NAME, true, &index_version);
readField(fields, CONNECTION_FIELD_NAME, true, &connection_id);
readField(fields, COUNT_FIELD_NAME, true, &count);
ROS_INFO("Read INDEX_DATA: ver=%d connection=%d count=%d", index_version, connection_id, count);
if (index_version != 1)
throw BagFormatException((format("Unsupported INDEX_DATA version: %1%") % index_version).str());
uint64_t chunk_pos = curr_chunk_info_.pos;
ConnectionInfo* connection_info = connections_[connection_id];
multiset<IndexEntry>& connection_index = connection_indexes_[connection_id];
string const& topic = connection_info->topic;
multiset<IndexEntry>& topic_index = topic_indexes_[topic];
for (uint32_t i = 0; i < count; i++) {
IndexEntry index_entry;
index_entry.chunk_pos = chunk_pos;
@ -551,8 +567,9 @@ void Bag::readTopicIndexDataVersion1(uint32_t count, string const& topic, uint64
read((char*) &index_entry.offset, 4);
index_entry.time = Time(sec, nsec);
ROS_DEBUG(" - %d.%d: %llu+%d", sec, nsec, (unsigned long long) index_entry.chunk_pos, index_entry.offset);
ROS_INFO(" - %d.%d: %llu+%d", sec, nsec, (unsigned long long) index_entry.chunk_pos, index_entry.offset);
connection_index.insert(connection_index.end(), index_entry);
topic_index.insert(topic_index.end(), index_entry);
}
}
@ -562,14 +579,14 @@ void Bag::readTopicIndexDataVersion1(uint32_t count, string const& topic, uint64
void Bag::writeConnectionRecords() {
boost::mutex::scoped_lock lock(record_mutex_);
for (map<uint32_t, ConnectionInfo*>::const_iterator i = connection_infos_.begin(); i != connection_infos_.end(); i++) {
for (map<uint32_t, ConnectionInfo*>::const_iterator i = connections_.begin(); i != connections_.end(); i++) {
ConnectionInfo const* connection_info = i->second;
writeConnectionRecord(connection_info);
}
}
void Bag::writeConnectionRecord(ConnectionInfo const* connection_info) {
ROS_DEBUG("Writing CONNECTION [%llu:%d]: topic=%s id=%d",
ROS_INFO("Writing CONNECTION [%llu:%d]: topic=%s id=%d",
(unsigned long long) file_.getOffset(), getChunkOffset(), connection_info->topic.c_str(), connection_info->id);
M_string header;
@ -609,38 +626,24 @@ void Bag::readConnectionRecord() {
if (!readHeader(connection_header))
throw BagFormatException("Error reading connection header");
// If this is a new connection, update connection_infos_
map<uint32_t, ConnectionInfo*>::iterator key = connection_infos_.find(id);
if (key == connection_infos_.end()) {
// If this is a new connection, update connections
map<uint32_t, ConnectionInfo*>::iterator key = connections_.find(id);
if (key == connections_.end()) {
ConnectionInfo* connection_info = new ConnectionInfo();
connection_info->id = id;
connection_info->topic = topic;
connection_info->id = id;
connection_info->topic = topic;
for (M_string::const_iterator i = connection_header.getValues()->begin(); i != connection_header.getValues()->end(); i++)
connection_info->header[i->first] = i->second;
connection_infos_[id] = connection_info;
connection_info->msg_def = connection_info->header["message_definition"];
connection_info->datatype = connection_info->header["datatype"];
connection_info->md5sum = connection_info->header["md5sum"];
connections_[id] = connection_info;
ROS_DEBUG("Read CONNECTION: topic=%s id=%d", topic.c_str(), id);
// If this is a new topic, update topic_infos_
map<string, TopicInfo*>::iterator key = topic_infos_.find(topic);
if (key == topic_infos_.end()) {
string msg_def = connection_info->header["message_definition"];
string datatype = connection_info->header["datatype"];
string md5sum = connection_info->header["md5sum"];
TopicInfo* topic_info = new TopicInfo();
topic_info->topic = topic;
topic_info->msg_def = msg_def;
topic_info->datatype = datatype;
topic_info->md5sum = md5sum;
topic_infos_[topic] = topic_info;
ROS_DEBUG("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
}
ROS_INFO("Read CONNECTION: topic=%s id=%d", topic.c_str(), id);
}
}
void Bag::readMessageDefinitionRecord() {
void Bag::readMessageDefinitionRecord102() {
ros::Header header;
uint32_t data_size;
if (!readHeader(header) || !readDataLength(data_size))
@ -656,17 +659,20 @@ void Bag::readMessageDefinitionRecord() {
readField(fields, TYPE_FIELD_NAME, true, datatype);
readField(fields, DEF_FIELD_NAME, 0, UINT_MAX, true, message_definition);
map<string, TopicInfo*>::iterator key = topic_infos_.find(topic);
if (key == topic_infos_.end()) {
TopicInfo* topic_info = new TopicInfo();
topic_info->topic = topic;
topic_info->msg_def = message_definition;
topic_info->datatype = datatype;
topic_info->md5sum = md5sum;
topic_infos_[topic] = topic_info;
/*
map<uint32_t, ConnectionInfo*>::iterator key = connections_.find(id);
if (key == connections_.end()) {
ConnectionInfo* connection_info = new ConnectionInfo();
connection_info->id = id;
connection_info->topic = topic;
connection_info->msg_def = message_definition;
connection_info->datatype = datatype;
connection_info->md5sum = md5sum;
connections_[id] = connection_info;
ROS_DEBUG("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
ROS_INFO("Read MSG_DEF: topic=%s md5sum=%s datatype=%s", topic.c_str(), md5sum.c_str(), datatype.c_str());
}
*/
}
void Bag::decompressChunk(uint64_t chunk_pos) {
@ -700,7 +706,7 @@ void Bag::decompressChunk(uint64_t chunk_pos) {
}
void Bag::loadMessageDataRecord102(uint64_t offset) {
ROS_DEBUG("loadMessageDataRecord: offset=%llu", (unsigned long long) offset);
ROS_INFO("loadMessageDataRecord: offset=%llu", (unsigned long long) offset);
seek(offset);
@ -727,7 +733,7 @@ void Bag::decompressRawChunk(ChunkHeader const& chunk_header) {
assert(chunk_header.compression == COMPRESSION_NONE);
assert(chunk_header.compressed_size == chunk_header.uncompressed_size);
ROS_DEBUG("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
ROS_INFO("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
decompress_buffer_.setSize(chunk_header.compressed_size);
file_.read((char*) decompress_buffer_.getData(), chunk_header.compressed_size);
@ -740,7 +746,7 @@ void Bag::decompressBz2Chunk(ChunkHeader const& chunk_header) {
CompressionType compression = compression::BZ2;
ROS_DEBUG("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
ROS_INFO("compressed_size: %d uncompressed_size: %d", chunk_header.compressed_size, chunk_header.uncompressed_size);
chunk_buffer_.setSize(chunk_header.compressed_size);
file_.read((char*) chunk_buffer_.getData(), chunk_header.compressed_size);
@ -793,7 +799,7 @@ uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) {
void Bag::writeChunkInfoRecords() {
boost::mutex::scoped_lock lock(record_mutex_);
foreach(ChunkInfo const& chunk_info, chunk_infos_) {
foreach(ChunkInfo const& chunk_info, chunks_) {
// Write the chunk info header
M_string header;
uint32_t chunk_topic_count = chunk_info.topic_counts.size();
@ -811,7 +817,7 @@ void Bag::writeChunkInfoRecords() {
data_len += 4 + topic.size() + 4; // 4 bytes for length of topic_name + topic_name + 4 bytes for topic count
}
ROS_DEBUG("Writing CHUNK_INFO [%llu]: ver=%d pos=%llu start=%d.%d end=%d.%d data_len=%d",
ROS_INFO("Writing CHUNK_INFO [%llu]: ver=%d pos=%llu start=%d.%d end=%d.%d data_len=%d",
(unsigned long long) file_.getOffset(), CHUNK_INFO_VERSION, (unsigned long long) chunk_info.pos,
chunk_info.start_time.sec, chunk_info.start_time.nsec,
chunk_info.end_time.sec, chunk_info.end_time.nsec,
@ -832,7 +838,7 @@ void Bag::writeChunkInfoRecords() {
write(topic);
write((char*) &count, 4);
ROS_DEBUG(" - %s: %d", topic.c_str(), count);
ROS_INFO(" - %s: %d", topic.c_str(), count);
}
}
}
@ -861,7 +867,7 @@ void Bag::readChunkInfoRecord() {
uint32_t chunk_topic_count;
readField(fields, COUNT_FIELD_NAME, true, &chunk_topic_count);
ROS_DEBUG("Read CHUNK_INFO: chunk_pos=%llu topic_count=%d start=%d.%d end=%d.%d",
ROS_INFO("Read CHUNK_INFO: chunk_pos=%llu topic_count=%d start=%d.%d end=%d.%d",
(unsigned long long) chunk_info.pos, chunk_topic_count,
chunk_info.start_time.sec, chunk_info.start_time.nsec,
chunk_info.end_time.sec, chunk_info.end_time.nsec);
@ -877,12 +883,12 @@ void Bag::readChunkInfoRecord() {
string topic = string(topic_name_buf, topic_name_len);
ROS_DEBUG(" %s: %d messages", topic.c_str(), topic_count);
ROS_INFO(" %s: %d messages", topic.c_str(), topic_count);
chunk_info.topic_counts[topic] = topic_count;
}
chunk_infos_.push_back(chunk_info);
chunks_.push_back(chunk_info);
}
// Record I/O
@ -958,7 +964,7 @@ void Bag::readMessageDataHeaderFromBuffer(Buffer& buffer, uint32_t offset, ros::
total_bytes_read = 0;
uint8_t op;
do {
ROS_DEBUG("reading header from buffer: offset=%d", offset);
ROS_INFO("reading header from buffer: offset=%d", offset);
uint32_t bytes_read;
readHeaderFromBuffer(*current_buffer_, offset, header, data_size, bytes_read);
@ -1043,12 +1049,12 @@ void Bag::dump() {
std::cout << "chunk_open: " << chunk_open_ << std::endl;
std::cout << "curr_chunk_info: " << curr_chunk_info_.topic_counts.size() << " topics" << std::endl;
std::cout << "topic_infos:" << std::endl;
for (map<string, TopicInfo*>::const_iterator i = topic_infos_.begin(); i != topic_infos_.end(); i++)
std::cout << " topic: " << i->first << std::endl;
std::cout << "connections:" << std::endl;
for (map<uint32_t, ConnectionInfo*>::const_iterator i = connections_.begin(); i != connections_.end(); i++)
std::cout << " connection: " << i->first << " " << i->second->topic << std::endl;
std::cout << "chunk_infos:" << std::endl;
for (vector<ChunkInfo>::const_iterator i = chunk_infos_.begin(); i != chunk_infos_.end(); i++) {
std::cout << "chunks:" << std::endl;
for (vector<ChunkInfo>::const_iterator i = chunks_.begin(); i != chunks_.end(); i++) {
std::cout << " chunk: " << (*i).topic_counts.size() << " topics" << std::endl;
for (map<string, uint32_t>::const_iterator j = (*i).topic_counts.begin(); j != (*i).topic_counts.end(); j++) {
std::cout << " - " << j->first << ": " << j->second << std::endl;

View File

@ -32,38 +32,17 @@ using ros::Time;
namespace rosbag {
MessageInstance::MessageInstance(TopicInfo const* topic_info, IndexEntry const& index_entry, Bag& bag) :
topic_info_(topic_info), index_entry_(index_entry), bag_(&bag) { }
string const& MessageInstance::getTopic() const { return topic_info_->topic; }
string const& MessageInstance::getDataType() const { return topic_info_->datatype; }
string const& MessageInstance::getMD5Sum() const { return topic_info_->md5sum; }
string const& MessageInstance::getMessageDefinition() const { return topic_info_->msg_def; }
Time const& MessageInstance::getTime() const { return index_entry_.time; }
// @todo: this should cache the header
bool MessageInstance::getLatching() const {
ros::Header header = bag_->readMessageDataHeader(index_entry_);
ros::M_string& fields = *header.getValues();
ros::M_string::iterator latch_iter = fields.find(string("latching"));
if (latch_iter != fields.end() && latch_iter->second != string("0"))
return true;
else
return false;
MessageInstance::MessageInstance(ConnectionInfo const* connection_info, IndexEntry const& index_entry, Bag& bag) :
connection_info_(connection_info), index_entry_(index_entry), bag_(&bag)
{
}
// @todo: this should cache the header
std::string MessageInstance::getCallerId() const {
ros::Header header = bag_->readMessageDataHeader(index_entry_);
ros::M_string& fields = *header.getValues();
ros::M_string::iterator callerid_iter = fields.find(string("callerid"));
if (callerid_iter != fields.end())
return callerid_iter->second;
else
return std::string("");
}
string const& MessageInstance::getTopic() const { return connection_info_->topic; }
string const& MessageInstance::getDataType() const { return connection_info_->datatype; }
string const& MessageInstance::getMD5Sum() const { return connection_info_->md5sum; }
string const& MessageInstance::getMessageDefinition() const { return connection_info_->msg_def; }
ros::M_string const& MessageInstance::getConnectionHeader() const { return connection_info_->header; }
Time const& MessageInstance::getTime() const { return index_entry_.time; }
uint32_t MessageInstance::size() const {
return bag_->readMessageDataSize(index_entry_);
@ -73,4 +52,4 @@ ros::AdvertiseOptions createAdvertiseOptions(MessageInstance const& m, uint32_t
return ros::AdvertiseOptions(m.getTopic(), queue_size, m.getMD5Sum(), m.getDataType(), m.getMessageDefinition());
}
}
} // namespace rosbag

View File

@ -163,13 +163,22 @@ void Player::doPublish(MessageInstance const& m) {
ros::Time const& time = m.getTime();
// Make a unique id composed of the callerid and the topic allowing separate advertisers for separate latching topics
string callerid = m.getCallerId();
ros::M_string const& header = m.getConnectionHeader();
ros::M_string::const_iterator header_iter = header.find("callerid");
string callerid;
if (header_iter != header.end())
callerid = header_iter->second;
string callerid_topic = callerid + topic;
map<string, ros::Publisher>::iterator pub_iter = publishers_.find(callerid_topic);
if (pub_iter == publishers_.end()) {
ros::AdvertiseOptions opts = createAdvertiseOptions(m, options_.queue_size);
opts.latch = m.getLatching();
ros::M_string::const_iterator header_iter = header.find("latching");
opts.latch = (header_iter != header.end() && header_iter->second == "1");
ros::Publisher pub = node_handle_->advertise(opts);
publishers_.insert(publishers_.begin(), pair<string, ros::Publisher>(callerid_topic, pub));

View File

@ -51,7 +51,7 @@ Query::~Query() { }
ros::Time Query::getStartTime() const { return start_time_; }
ros::Time Query::getEndTime() const { return end_time_; }
bool Query::evaluate(TopicInfo const*) const { return true; }
bool Query::evaluate(ConnectionInfo const*) const { return true; }
Query* Query::clone() const { return new Query(*this); }
@ -68,7 +68,7 @@ TopicQuery::TopicQuery(std::vector<std::string> const& topics, ros::Time const&
{
}
bool TopicQuery::evaluate(TopicInfo const* info) const {
bool TopicQuery::evaluate(ConnectionInfo const* info) const {
foreach(string const& topic, topics_)
if (topic == info->topic)
return true;
@ -91,7 +91,7 @@ TypeQuery::TypeQuery(std::vector<std::string> const& types, ros::Time const& sta
{
}
bool TypeQuery::evaluate(TopicInfo const* info) const {
bool TypeQuery::evaluate(ConnectionInfo const* info) const {
foreach(string const& type, types_)
if (type == info->datatype)
return true;
@ -115,9 +115,9 @@ BagQuery::~BagQuery() {
MessageRange::MessageRange(std::multiset<IndexEntry>::const_iterator const& _begin,
std::multiset<IndexEntry>::const_iterator const& _end,
TopicInfo const* _topic_info,
ConnectionInfo const* _connection_info,
BagQuery const* _bag_query)
: begin(_begin), end(_end), topic_info(_topic_info), bag_query(_bag_query)
: begin(_begin), end(_end), connection_info(_connection_info), bag_query(_bag_query)
{
}
@ -132,4 +132,4 @@ bool ViewIterHelperCompare::operator()(ViewIterHelper const& a, ViewIterHelper c
return (a.iter)->time > (b.iter)->time;
}
}
} // namespace rosbag

View File

@ -283,7 +283,7 @@ void Recorder::startWriting() {
bag_.open(write_filename_, bagmode::Write);
}
catch (rosbag::BagException e) {
ROS_ERROR(e.what());
ROS_ERROR("Error writing: %s", e.what());
exit_code_ = 1;
ros::shutdown();
}
@ -367,7 +367,7 @@ void Recorder::doRecordSnapshotter() {
bag_.open(write_filename, bagmode::Write);
}
catch (rosbag::BagException ex) {
ROS_ERROR(ex.what());
ROS_ERROR("Error writing: %s", ex.what());
return;
}

View File

@ -61,8 +61,7 @@ void View::iterator::populate() {
void View::iterator::populateSeek(multiset<IndexEntry>::const_iterator iter) {
iters_.clear();
foreach(MessageRange const* range, view_->ranges_) {
multiset<IndexEntry>::const_iterator start = std::lower_bound(range->begin,
range->end, iter->time, IndexEntryCompare());
multiset<IndexEntry>::const_iterator start = std::lower_bound(range->begin, range->end, iter->time, IndexEntryCompare());
if (start != range->end)
iters_.push_back(ViewIterHelper(start, range));
}
@ -105,8 +104,16 @@ void View::iterator::increment() {
//! \todo some check in case we are at end
MessageInstance View::iterator::dereference() const {
ROS_ASSERT(iters_.size() > 0);
ViewIterHelper const& i = iters_.back();
return MessageInstance(i.range->topic_info, *(i.iter), *(i.range->bag_query->bag));
ROS_ASSERT(i.range != NULL);
ROS_ASSERT(i.range->connection_info != NULL);
ROS_ASSERT(i.range->bag_query != NULL);
ROS_ASSERT(i.range->bag_query->bag != NULL);
return MessageInstance(i.range->connection_info, *(i.iter), *(i.range->bag_query->bag));
}
// View
@ -140,22 +147,24 @@ void View::addQuery(Bag& bag, Query const& query) {
}
void View::updateQueries(BagQuery* q) {
for (map<string, TopicInfo*>::iterator i = q->bag->topic_infos_.begin(); i != q->bag->topic_infos_.end(); i++) {
for (map<uint32_t, ConnectionInfo*>::iterator i = q->bag->connections_.begin(); i != q->bag->connections_.end(); i++) {
ConnectionInfo const* connection = i->second;
// Skip if the query doesn't evaluate to true
if (!q->query->evaluate(i->second))
if (!q->query->evaluate(connection))
continue;
map<string, multiset<IndexEntry> >::iterator j = q->bag->topic_indexes_.find(i->second->topic);
map<uint32_t, multiset<IndexEntry> >::iterator j = q->bag->connection_indexes_.find(connection->id);
// Skip if the bag doesn't have the corresponding index
if (j == q->bag->topic_indexes_.end())
if (j == q->bag->connection_indexes_.end())
continue;
multiset<IndexEntry>& index = j->second;
// lower_bound/upper_bound do a binary search to find the appropriate range of Index Entries given our time range
std::multiset<IndexEntry>::const_iterator begin = std::lower_bound(j->second.begin(), j->second.end(), q->query->getStartTime(), IndexEntryCompare());
std::multiset<IndexEntry>::const_iterator end = std::upper_bound(j->second.begin(), j->second.end(), q->query->getEndTime(), IndexEntryCompare());
TopicInfo* topic_info = i->second;
std::multiset<IndexEntry>::const_iterator begin = std::lower_bound(index.begin(), index.end(), q->query->getStartTime(), IndexEntryCompare());
std::multiset<IndexEntry>::const_iterator end = std::upper_bound(index.begin(), index.end(), q->query->getEndTime(), IndexEntryCompare());
// todo: this could be made faster with a map of maps
bool found = false;
@ -163,7 +172,7 @@ void View::updateQueries(BagQuery* q) {
MessageRange* r = *k;
// If the topic and query are already in our ranges, we update
if (r->bag_query == q && r->topic_info->topic == topic_info->topic) {
if (r->bag_query == q && r->connection_info->topic == connection->topic) {
r->begin = begin;
r->end = end;
found = true;
@ -171,7 +180,7 @@ void View::updateQueries(BagQuery* q) {
}
}
if (!found)
ranges_.push_back(new MessageRange(begin, end, topic_info, q));
ranges_.push_back(new MessageRange(begin, end, connection, q));
}
view_revision_++;