From bd0a2a0f3f3f19763ccab0b19de8291f264405d2 Mon Sep 17 00:00:00 2001 From: Tim Field Date: Fri, 7 May 2010 09:22:48 +0000 Subject: [PATCH] rosbag: removing mutexes; removed debugging dump() --- tools/rosbag/src/bag.cpp | 110 +++++++++++---------------------------- 1 file changed, 29 insertions(+), 81 deletions(-) diff --git a/tools/rosbag/src/bag.cpp b/tools/rosbag/src/bag.cpp index b09b2a3e..d3714084 100644 --- a/tools/rosbag/src/bag.cpp +++ b/tools/rosbag/src/bag.cpp @@ -93,20 +93,13 @@ void Bag::open(string const& filename, uint32_t mode) { mode_ = (BagMode) mode; if (mode_ & bagmode::Append) - { - openAppend(filename); - } + openAppend(filename); else if (mode_ & bagmode::Write) - { - openWrite(filename); - } - else if (mode_ & bagmode::Read) { - openRead(filename); - } + openWrite(filename); + else if (mode_ & bagmode::Read) + openRead(filename); else - { throw BagException((format("Unknown mode: %1%") % (int) mode).str()); - } // Determine file size uint64_t offset = file_.getOffset(); @@ -117,8 +110,7 @@ void Bag::open(string const& filename, uint32_t mode) { void Bag::openRead(string const& filename) { - if (!file_.openRead(filename)) - throw BagIOException((format("Failed to open file: %1%") % filename.c_str()).str()); + file_.openRead(filename); readVersion(); @@ -131,15 +123,13 @@ void Bag::openRead(string const& filename) { } void Bag::openWrite(string const& filename) { - if (!file_.openWrite(filename)) - throw BagIOException("Failed to open file: " + filename); + file_.openWrite(filename); startWriting(); } void Bag::openAppend(string const& filename) { - if (!file_.openReadWrite(filename)) - throw BagIOException("Failed to open file: " + filename); + file_.openReadWrite(filename); readVersion(); startReadingVersion200(); @@ -163,18 +153,7 @@ void Bag::close() { if (mode_ & bagmode::Write || mode_ & bagmode::Append) closeWrite(); - // Unfortunately closing this possibly enormous file takes a while - // (especially over NFS) and handling of a SIGINT while a file is - // closing leads to a double free. So, we disable signals while - // we close the file. - - // Darwin doesn't have sighandler_t; I hope that sig_t on Linux does - // the right thing. - //sighandler_t old = signal(SIGINT, SIG_IGN); - - sig_t old = signal(SIGINT, SIG_IGN); file_.close(); - signal(SIGINT, old); } void Bag::closeWrite() { @@ -225,10 +204,6 @@ void Bag::readVersion() { if (sscanf(version_line.c_str(), "#ROS%s V%d.%d", logtypename, &version_major, &version_minor) != 3) throw BagIOException("Error reading version line"); - // Special case - if (version_major == 0 && version_line[0] == '#') - version_major = 1; - version_ = version_major * 100 + version_minor; ROS_DEBUG("Read VERSION: version=%d", version_); @@ -292,10 +267,15 @@ void Bag::startReadingVersion200() { curr_chunk_info_ = ChunkInfo(); } -// @todo: handle 1.2 bags with no index void Bag::startReadingVersion102() { - // Read the file header record, which points to the start of the topic indexes - readFileHeaderRecord(); + try + { + // Read the file header record, which points to the start of the topic indexes + readFileHeaderRecord(); + } + catch (BagFormatException ex) { + throw BagUnindexedException(); + } // Get the length of the file seek(0, std::ios::end); @@ -324,8 +304,6 @@ void Bag::startReadingVersion102() { // File header record void Bag::writeFileHeaderRecord() { - boost::mutex::scoped_lock lock(record_mutex_); - connection_count_ = connections_.size(); chunk_count_ = chunks_.size(); @@ -371,6 +349,9 @@ void Bag::readFileHeaderRecord() { // Read index position readField(fields, INDEX_POS_FIELD_NAME, true, (uint64_t*) &index_data_pos_); + if (index_data_pos_ == 0) + throw BagUnindexedException(); + // Read topic and chunks count if (version_ >= 200) { readField(fields, CONNECTION_COUNT_FIELD_NAME, true, &connection_count_); @@ -477,8 +458,6 @@ void Bag::readChunkHeader(ChunkHeader& chunk_header) const { // Index records void Bag::writeIndexRecords() { - boost::mutex::scoped_lock lock(record_mutex_); - for (map >::const_iterator i = curr_chunk_connection_indexes_.begin(); i != curr_chunk_connection_indexes_.end(); i++) { uint32_t connection_id = i->first; multiset const& index = i->second; @@ -608,8 +587,6 @@ void Bag::readConnectionIndexRecord200() { // Connection records void Bag::writeConnectionRecords() { - boost::mutex::scoped_lock lock(record_mutex_); - for (map::const_iterator i = connections_.begin(); i != connections_.end(); i++) { ConnectionInfo const* connection_info = i->second; writeConnectionRecord(connection_info); @@ -748,8 +725,8 @@ void Bag::decompressChunk(uint64_t chunk_pos) const { decompressed_chunk_ = chunk_pos; } -void Bag::loadMessageDataRecord102(uint64_t offset, ros::Header& header) const { - ROS_DEBUG("loadMessageDataRecord: offset=%llu", (unsigned long long) offset); +void Bag::readMessageDataRecord102(uint64_t offset, ros::Header& header) const { + ROS_DEBUG("readMessageDataRecord: offset=%llu", (unsigned long long) offset); seek(offset); @@ -810,7 +787,7 @@ ros::Header Bag::readMessageDataHeader(IndexEntry const& index_entry) { readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read); return header; case 102: - loadMessageDataRecord102(index_entry.chunk_pos, header); + readMessageDataRecord102(index_entry.chunk_pos, header); return header; default: throw BagFormatException((format("Unhandled version: %1%") % version_).str()); @@ -829,7 +806,7 @@ uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) const { readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read); return data_size; case 102: - loadMessageDataRecord102(index_entry.chunk_pos, header); + readMessageDataRecord102(index_entry.chunk_pos, header); return data_size; default: throw BagFormatException((format("Unhandled version: %1%") % version_).str()); @@ -837,8 +814,6 @@ uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) const { } void Bag::writeChunkInfoRecords() { - boost::mutex::scoped_lock lock(record_mutex_); - foreach(ChunkInfo const& chunk_info, chunks_) { // Write the chunk info header M_string header; @@ -1068,6 +1043,13 @@ std::string Bag::toHeaderString(Time const* field) const { return toHeaderString(&packed_time); } +ros::AdvertiseOptions createAdvertiseOptions(const ConnectionInfo* c, uint32_t queue_size) { + ros::AdvertiseOptions opts(c->topic, queue_size, c->md5sum, c->datatype, c->msg_def); + ros::M_string::const_iterator header_iter = c->header->find("latching"); + opts.latch = (header_iter != c->header->end() && header_iter->second == "1"); + return opts; +} + // Low-level I/O void Bag::write(string const& s) { write(s.c_str(), s.length()); } @@ -1076,38 +1058,4 @@ void Bag::write(char const* s, std::streamsize n) { file_.write((char*) s, n); void Bag::read(char* b, std::streamsize n) const { file_.read(b, n); } void Bag::seek(uint64_t pos, int origin) const { file_.seek(pos, origin); } -// Debugging - -void Bag::dump() { - std::cout << "chunk_open: " << chunk_open_ << std::endl; - std::cout << "curr_chunk_info: " << curr_chunk_info_.connection_counts.size() << " connections" << std::endl; - - std::cout << "connections:" << std::endl; - for (map::const_iterator i = connections_.begin(); i != connections_.end(); i++) - std::cout << " connection: " << i->first << " " << i->second->topic << std::endl; - - std::cout << "chunks:" << std::endl; - for (vector::const_iterator i = chunks_.begin(); i != chunks_.end(); i++) { - std::cout << " chunk: " << (*i).connection_counts.size() << " connections" << std::endl; - for (map::const_iterator j = (*i).connection_counts.begin(); j != (*i).connection_counts.end(); j++) { - std::cout << " - " << j->first << ": " << j->second << std::endl; - } - } - - std::cout << "connection_indexes:" << std::endl; - for (map >::const_iterator i = connection_indexes_.begin(); i != connection_indexes_.end(); i++) { - std::cout << " connection: " << i->first << std::endl; - for (multiset::const_iterator j = i->second.begin(); j != i->second.end(); j++) { - std::cout << " - " << j->chunk_pos << ":" << j->offset << std::endl; - } - } -} - -ros::AdvertiseOptions createAdvertiseOptions(const ConnectionInfo* c, uint32_t queue_size) { - ros::AdvertiseOptions opts(c->topic, queue_size, c->md5sum, c->datatype, c->msg_def); - ros::M_string::const_iterator header_iter = c->header->find("latching"); - opts.latch = (header_iter != c->header->end() && header_iter->second == "1"); - return opts; -} - } // namespace rosbag