rosbag: removing mutexes; removed debugging dump()

This commit is contained in:
Tim Field 2010-05-07 09:22:48 +00:00
parent 0d12b171f8
commit bd0a2a0f3f
1 changed files with 29 additions and 81 deletions

View File

@ -93,20 +93,13 @@ void Bag::open(string const& filename, uint32_t mode) {
mode_ = (BagMode) mode;
if (mode_ & bagmode::Append)
{
openAppend(filename);
}
openAppend(filename);
else if (mode_ & bagmode::Write)
{
openWrite(filename);
}
else if (mode_ & bagmode::Read) {
openRead(filename);
}
openWrite(filename);
else if (mode_ & bagmode::Read)
openRead(filename);
else
{
throw BagException((format("Unknown mode: %1%") % (int) mode).str());
}
// Determine file size
uint64_t offset = file_.getOffset();
@ -117,8 +110,7 @@ void Bag::open(string const& filename, uint32_t mode) {
void Bag::openRead(string const& filename) {
if (!file_.openRead(filename))
throw BagIOException((format("Failed to open file: %1%") % filename.c_str()).str());
file_.openRead(filename);
readVersion();
@ -131,15 +123,13 @@ void Bag::openRead(string const& filename) {
}
void Bag::openWrite(string const& filename) {
if (!file_.openWrite(filename))
throw BagIOException("Failed to open file: " + filename);
file_.openWrite(filename);
startWriting();
}
void Bag::openAppend(string const& filename) {
if (!file_.openReadWrite(filename))
throw BagIOException("Failed to open file: " + filename);
file_.openReadWrite(filename);
readVersion();
startReadingVersion200();
@ -163,18 +153,7 @@ void Bag::close() {
if (mode_ & bagmode::Write || mode_ & bagmode::Append)
closeWrite();
// Unfortunately closing this possibly enormous file takes a while
// (especially over NFS) and handling of a SIGINT while a file is
// closing leads to a double free. So, we disable signals while
// we close the file.
// Darwin doesn't have sighandler_t; I hope that sig_t on Linux does
// the right thing.
//sighandler_t old = signal(SIGINT, SIG_IGN);
sig_t old = signal(SIGINT, SIG_IGN);
file_.close();
signal(SIGINT, old);
}
void Bag::closeWrite() {
@ -225,10 +204,6 @@ void Bag::readVersion() {
if (sscanf(version_line.c_str(), "#ROS%s V%d.%d", logtypename, &version_major, &version_minor) != 3)
throw BagIOException("Error reading version line");
// Special case
if (version_major == 0 && version_line[0] == '#')
version_major = 1;
version_ = version_major * 100 + version_minor;
ROS_DEBUG("Read VERSION: version=%d", version_);
@ -292,10 +267,15 @@ void Bag::startReadingVersion200() {
curr_chunk_info_ = ChunkInfo();
}
// @todo: handle 1.2 bags with no index
void Bag::startReadingVersion102() {
// Read the file header record, which points to the start of the topic indexes
readFileHeaderRecord();
try
{
// Read the file header record, which points to the start of the topic indexes
readFileHeaderRecord();
}
catch (BagFormatException ex) {
throw BagUnindexedException();
}
// Get the length of the file
seek(0, std::ios::end);
@ -324,8 +304,6 @@ void Bag::startReadingVersion102() {
// File header record
void Bag::writeFileHeaderRecord() {
boost::mutex::scoped_lock lock(record_mutex_);
connection_count_ = connections_.size();
chunk_count_ = chunks_.size();
@ -371,6 +349,9 @@ void Bag::readFileHeaderRecord() {
// Read index position
readField(fields, INDEX_POS_FIELD_NAME, true, (uint64_t*) &index_data_pos_);
if (index_data_pos_ == 0)
throw BagUnindexedException();
// Read topic and chunks count
if (version_ >= 200) {
readField(fields, CONNECTION_COUNT_FIELD_NAME, true, &connection_count_);
@ -477,8 +458,6 @@ void Bag::readChunkHeader(ChunkHeader& chunk_header) const {
// Index records
void Bag::writeIndexRecords() {
boost::mutex::scoped_lock lock(record_mutex_);
for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = curr_chunk_connection_indexes_.begin(); i != curr_chunk_connection_indexes_.end(); i++) {
uint32_t connection_id = i->first;
multiset<IndexEntry> const& index = i->second;
@ -608,8 +587,6 @@ void Bag::readConnectionIndexRecord200() {
// Connection records
void Bag::writeConnectionRecords() {
boost::mutex::scoped_lock lock(record_mutex_);
for (map<uint32_t, ConnectionInfo*>::const_iterator i = connections_.begin(); i != connections_.end(); i++) {
ConnectionInfo const* connection_info = i->second;
writeConnectionRecord(connection_info);
@ -748,8 +725,8 @@ void Bag::decompressChunk(uint64_t chunk_pos) const {
decompressed_chunk_ = chunk_pos;
}
void Bag::loadMessageDataRecord102(uint64_t offset, ros::Header& header) const {
ROS_DEBUG("loadMessageDataRecord: offset=%llu", (unsigned long long) offset);
void Bag::readMessageDataRecord102(uint64_t offset, ros::Header& header) const {
ROS_DEBUG("readMessageDataRecord: offset=%llu", (unsigned long long) offset);
seek(offset);
@ -810,7 +787,7 @@ ros::Header Bag::readMessageDataHeader(IndexEntry const& index_entry) {
readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
return header;
case 102:
loadMessageDataRecord102(index_entry.chunk_pos, header);
readMessageDataRecord102(index_entry.chunk_pos, header);
return header;
default:
throw BagFormatException((format("Unhandled version: %1%") % version_).str());
@ -829,7 +806,7 @@ uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) const {
readMessageDataHeaderFromBuffer(*current_buffer_, index_entry.offset, header, data_size, bytes_read);
return data_size;
case 102:
loadMessageDataRecord102(index_entry.chunk_pos, header);
readMessageDataRecord102(index_entry.chunk_pos, header);
return data_size;
default:
throw BagFormatException((format("Unhandled version: %1%") % version_).str());
@ -837,8 +814,6 @@ uint32_t Bag::readMessageDataSize(IndexEntry const& index_entry) const {
}
void Bag::writeChunkInfoRecords() {
boost::mutex::scoped_lock lock(record_mutex_);
foreach(ChunkInfo const& chunk_info, chunks_) {
// Write the chunk info header
M_string header;
@ -1068,6 +1043,13 @@ std::string Bag::toHeaderString(Time const* field) const {
return toHeaderString(&packed_time);
}
ros::AdvertiseOptions createAdvertiseOptions(const ConnectionInfo* c, uint32_t queue_size) {
ros::AdvertiseOptions opts(c->topic, queue_size, c->md5sum, c->datatype, c->msg_def);
ros::M_string::const_iterator header_iter = c->header->find("latching");
opts.latch = (header_iter != c->header->end() && header_iter->second == "1");
return opts;
}
// Low-level I/O
void Bag::write(string const& s) { write(s.c_str(), s.length()); }
@ -1076,38 +1058,4 @@ void Bag::write(char const* s, std::streamsize n) { file_.write((char*) s, n);
void Bag::read(char* b, std::streamsize n) const { file_.read(b, n); }
void Bag::seek(uint64_t pos, int origin) const { file_.seek(pos, origin); }
// Debugging
void Bag::dump() {
std::cout << "chunk_open: " << chunk_open_ << std::endl;
std::cout << "curr_chunk_info: " << curr_chunk_info_.connection_counts.size() << " connections" << std::endl;
std::cout << "connections:" << std::endl;
for (map<uint32_t, ConnectionInfo*>::const_iterator i = connections_.begin(); i != connections_.end(); i++)
std::cout << " connection: " << i->first << " " << i->second->topic << std::endl;
std::cout << "chunks:" << std::endl;
for (vector<ChunkInfo>::const_iterator i = chunks_.begin(); i != chunks_.end(); i++) {
std::cout << " chunk: " << (*i).connection_counts.size() << " connections" << std::endl;
for (map<uint32_t, uint32_t>::const_iterator j = (*i).connection_counts.begin(); j != (*i).connection_counts.end(); j++) {
std::cout << " - " << j->first << ": " << j->second << std::endl;
}
}
std::cout << "connection_indexes:" << std::endl;
for (map<uint32_t, multiset<IndexEntry> >::const_iterator i = connection_indexes_.begin(); i != connection_indexes_.end(); i++) {
std::cout << " connection: " << i->first << std::endl;
for (multiset<IndexEntry>::const_iterator j = i->second.begin(); j != i->second.end(); j++) {
std::cout << " - " << j->chunk_pos << ":" << j->offset << std::endl;
}
}
}
ros::AdvertiseOptions createAdvertiseOptions(const ConnectionInfo* c, uint32_t queue_size) {
ros::AdvertiseOptions opts(c->topic, queue_size, c->md5sum, c->datatype, c->msg_def);
ros::M_string::const_iterator header_iter = c->header->find("latching");
opts.latch = (header_iter != c->header->end() && header_iter->second == "1");
return opts;
}
} // namespace rosbag