rosbag: OP_CHUNK_INFO records now store counts by connection, not by topic

This commit is contained in:
Tim Field 2010-04-29 02:39:08 +00:00
parent 9a64f4b13f
commit 1edcd89758
4 changed files with 30 additions and 51 deletions

View File

@ -211,10 +211,10 @@ chunk_info_1
end=end_stamp_1
count=K
---
len(topic_1), topic_1, count_1
len(topic_2), topic_2, count_2
conn_1, count_1
conn_2, count_2
...
len(topic_K), topic_K, count_K
conn_K, count_K
chunk_info_2
...

View File

@ -402,11 +402,6 @@ void Bag::doWrite(std::string const& topic, ros::Time const& time, T const& msg,
connection_info->md5sum = std::string(ros::message_traits::md5sum(msg));
connection_info->msg_def = std::string(ros::message_traits::definition(msg));
ROS_DEBUG("NEW CONNECTION");
ROS_DEBUG(" datatype: %s", ros::message_traits::datatype(msg));
ROS_DEBUG(" md5sum: %s", ros::message_traits::md5sum(msg));
ROS_DEBUG(" msg_def: %s", ros::message_traits::definition(msg));
if (connection_header != NULL) {
for (ros::M_string::const_iterator i = connection_header->begin(); i != connection_header->end(); i++)
connection_info->header[i->first] = i->second;
@ -438,8 +433,8 @@ void Bag::doWrite(std::string const& topic, ros::Time const& time, T const& msg,
std::multiset<IndexEntry>& index = topic_indexes_[topic];
index.insert(index.end(), index_entry);
// Increment the topic count
curr_chunk_info_.topic_counts[topic]++;
// Increment the connection count
curr_chunk_info_.connection_counts[connection_info->id]++;
// Write the message data
writeMessageDataRecord(conn_id, time, msg);

View File

@ -60,7 +60,7 @@ struct ChunkInfo
ros::Time end_time; //! latest timestamp of a message in the chunk
uint64_t pos; //! absolute byte offset of chunk record in bag file
std::map<std::string, uint32_t> topic_counts; //! number of messages in each topic stored in the chunk
std::map<uint32_t, uint32_t> connection_counts; //! number of messages in each connection stored in the chunk
};
struct ChunkHeader

View File

@ -267,7 +267,7 @@ void Bag::startReadingVersion200() {
seek(chunk_header.compressed_size, std::ios::cur);
// Read the index records after the chunk
for (unsigned int i = 0; i < chunk_info.topic_counts.size(); i++)
for (unsigned int i = 0; i < chunk_info.connection_counts.size(); i++)
readConnectionIndexRecord200();
}
@ -802,43 +802,32 @@ void Bag::writeChunkInfoRecords() {
foreach(ChunkInfo const& chunk_info, chunks_) {
// Write the chunk info header
M_string header;
uint32_t chunk_topic_count = chunk_info.topic_counts.size();
uint32_t chunk_connection_count = chunk_info.connection_counts.size();
header[OP_FIELD_NAME] = toHeaderString(&OP_CHUNK_INFO);
header[VER_FIELD_NAME] = toHeaderString(&CHUNK_INFO_VERSION);
header[CHUNK_POS_FIELD_NAME] = toHeaderString(&chunk_info.pos);
header[START_TIME_FIELD_NAME] = toHeaderString(&chunk_info.start_time);
header[END_TIME_FIELD_NAME] = toHeaderString(&chunk_info.end_time);
header[COUNT_FIELD_NAME] = toHeaderString(&chunk_topic_count);
header[COUNT_FIELD_NAME] = toHeaderString(&chunk_connection_count);
// Measure length of data
uint32_t data_len = 0;
for (map<string, uint32_t>::const_iterator i = chunk_info.topic_counts.begin(); i != chunk_info.topic_counts.end(); i++) {
string const& topic = i->first;
data_len += 4 + topic.size() + 4; // 4 bytes for length of topic_name + topic_name + 4 bytes for topic count
}
ROS_DEBUG("Writing CHUNK_INFO [%llu]: ver=%d pos=%llu start=%d.%d end=%d.%d data_len=%d",
ROS_DEBUG("Writing CHUNK_INFO [%llu]: ver=%d pos=%llu start=%d.%d end=%d.%d",
(unsigned long long) file_.getOffset(), CHUNK_INFO_VERSION, (unsigned long long) chunk_info.pos,
chunk_info.start_time.sec, chunk_info.start_time.nsec,
chunk_info.end_time.sec, chunk_info.end_time.nsec,
data_len);
chunk_info.end_time.sec, chunk_info.end_time.nsec);
writeHeader(header);
writeDataLength(data_len);
writeDataLength(8 * chunk_connection_count);
// Write the topic names and counts
for (map<string, uint32_t>::const_iterator i = chunk_info.topic_counts.begin(); i != chunk_info.topic_counts.end(); i++) {
string const& topic = i->first;
uint32_t count = i->second;
for (map<uint32_t, uint32_t>::const_iterator i = chunk_info.connection_counts.begin(); i != chunk_info.connection_counts.end(); i++) {
uint32_t connection_id = i->first;
uint32_t count = i->second;
uint32_t topic_name_size = topic.size();
write((char*) &topic_name_size, 4);
write(topic);
write((char*) &connection_id, 4);
write((char*) &count, 4);
ROS_DEBUG(" - %s: %d", topic.c_str(), count);
ROS_DEBUG(" - %d: %d", connection_id, count);
}
}
}
@ -864,28 +853,23 @@ void Bag::readChunkInfoRecord() {
readField(fields, CHUNK_POS_FIELD_NAME, true, &chunk_info.pos);
readField(fields, START_TIME_FIELD_NAME, true, chunk_info.start_time);
readField(fields, END_TIME_FIELD_NAME, true, chunk_info.end_time);
uint32_t chunk_topic_count;
readField(fields, COUNT_FIELD_NAME, true, &chunk_topic_count);
uint32_t chunk_connection_count;
readField(fields, COUNT_FIELD_NAME, true, &chunk_connection_count);
ROS_DEBUG("Read CHUNK_INFO: chunk_pos=%llu topic_count=%d start=%d.%d end=%d.%d",
(unsigned long long) chunk_info.pos, chunk_topic_count,
ROS_DEBUG("Read CHUNK_INFO: chunk_pos=%llu connection_count=%d start=%d.%d end=%d.%d",
(unsigned long long) chunk_info.pos, chunk_connection_count,
chunk_info.start_time.sec, chunk_info.start_time.nsec,
chunk_info.end_time.sec, chunk_info.end_time.nsec);
// Read the topic count entries
char topic_name_buf[4096];
for (uint32_t i = 0; i < chunk_topic_count; i ++) {
uint32_t topic_name_len;
uint32_t topic_count;
read((char*) &topic_name_len, 4);
read((char*) &topic_name_buf, topic_name_len);
read((char*) &topic_count, 4);
for (uint32_t i = 0; i < chunk_connection_count; i ++) {
uint32_t connection_id, connection_count;
read((char*) &connection_id, 4);
read((char*) &connection_count, 4);
string topic = string(topic_name_buf, topic_name_len);
ROS_DEBUG(" %d: %d messages", connection_id, connection_count);
ROS_DEBUG(" %s: %d messages", topic.c_str(), topic_count);
chunk_info.topic_counts[topic] = topic_count;
chunk_info.connection_counts[connection_id] = connection_count;
}
chunks_.push_back(chunk_info);
@ -1047,7 +1031,7 @@ void Bag::seek(uint64_t pos, int origin) { file_.seek(pos, origin);
void Bag::dump() {
std::cout << "chunk_open: " << chunk_open_ << std::endl;
std::cout << "curr_chunk_info: " << curr_chunk_info_.topic_counts.size() << " topics" << std::endl;
std::cout << "curr_chunk_info: " << curr_chunk_info_.connection_counts.size() << " connections" << std::endl;
std::cout << "connections:" << std::endl;
for (map<uint32_t, ConnectionInfo*>::const_iterator i = connections_.begin(); i != connections_.end(); i++)
@ -1055,8 +1039,8 @@ void Bag::dump() {
std::cout << "chunks:" << std::endl;
for (vector<ChunkInfo>::const_iterator i = chunks_.begin(); i != chunks_.end(); i++) {
std::cout << " chunk: " << (*i).topic_counts.size() << " topics" << std::endl;
for (map<string, uint32_t>::const_iterator j = (*i).topic_counts.begin(); j != (*i).topic_counts.end(); j++) {
std::cout << " chunk: " << (*i).connection_counts.size() << " connections" << std::endl;
for (map<uint32_t, uint32_t>::const_iterator j = (*i).connection_counts.begin(); j != (*i).connection_counts.end(); j++) {
std::cout << " - " << j->first << ": " << j->second << std::endl;
}
}