rxplay: read internal bag file index

This commit is contained in:
Tim Field 2009-12-18 04:58:39 +00:00
parent 9d5aa24332
commit 25d2e3c2f3
4 changed files with 82 additions and 193 deletions

View File

@ -1,158 +0,0 @@
# Software License Agreement (BSD License)
#
# Copyright (c) 2009, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
PKG = 'rxplay'
import roslib; roslib.load_manifest(PKG)
import rospy
import rosrecord
import os
## A utility class for reading ROS messages (optionally using an index)
class BagFile:
def __init__(self, path):
self.path = path
self._file = None
self._next_msg = None
## Iterate through the raw messages
def raw_messages(self):
if not self._file and not self._open():
return
f, next_msg = self._file, self._next_msg
while True:
try:
pos = f.tell()
topic, msg, t = next_msg(f, True)
if msg == None or rospy.is_shutdown():
break
yield pos, topic, msg, t
except rosrecord.ROSRecordException, e:
rospy.logerr('ROSRecordException: couldn\'t read msg at pos %d - %s' % (pos, e))
break
except IOError:
rospy.logerr('IOError: couldn\'t read msg at pos %d' % pos)
break
except KeyError:
rospy.logerr('KeyError: couldn\'t read msg at pos %d' % pos)
break
self._close()
## Returns (datatype, message, timestamp)
def load_message(self, pos, index=None):
msgs = list(self.load_messages([pos], index))
if len(msgs) == 0:
return (None, None, None)
return msgs[0]
## Returns iterator of (datatype, message, timestamp)
def load_messages(self, positions, index=None):
if not self._file and not self._open():
return
f, next_msg = self._file, self._next_msg
try:
for pos in positions:
f.seek(pos)
topic, raw_msg, t = next_msg(f, True)
if raw_msg:
(datatype, message_data, md5, bag_pos, pytype) = raw_msg
msg = pytype()
msg.deserialize(message_data)
if index:
index.datatype_defs_read.add(datatype)
yield (datatype, msg, t)
except rosrecord.ROSRecordException, e:
rospy.logerr('ROSRecordException: couldn\'t read %d - %s' % (pos, e))
except IOError:
rospy.logerr('IOError: couldn\'t read %d' % pos)
except KeyError:
rospy.logerr('KeyError: couldn\'t read %d' % pos)
def read_datatype_defs(self, index):
if not self._file and not self._open():
return
f, next_msg = self._file, self._next_msg
index.datatype_defs_read = set()
try:
for datatype, pos in index._data.datatype_first_pos.items():
f.seek(pos)
topic, raw_msg, t = next_msg(f, True)
if raw_msg:
(datatype, message_data, md5, bag_pos, pytype) = raw_msg
msg = pytype()
msg.deserialize(message_data)
index.datatype_defs_read.add(datatype)
except rosrecord.ROSRecordException, e:
rospy.logerr('ROSRecordException: couldn\'t read %d - %s' % (pos, e))
except IOError:
rospy.logerr('IOError: couldn\'t read %d' % pos)
except KeyError:
rospy.logerr('KeyError: couldn\'t read %d' % pos)
def _open(self):
# TODO: error handling
self._file, v = rosrecord.open_log_file(self.path)
self._next_msg = {
rosrecord.HEADER_V1_1 : rosrecord.next_msg_v1_1,
rosrecord.HEADER_V1_2 : rosrecord.next_msg_v1_2
}[v]
return True
def _close(self):
if not self._file:
return
self._file.close()
self._file = None
self._next_msg = None

View File

@ -47,7 +47,6 @@ import time
import numpy import numpy
from bag_file import BagFile
import util.progress_meter import util.progress_meter
class BagIndex: class BagIndex:
@ -86,7 +85,7 @@ class BagIndex:
return return
self._start_stamp = self._data.get_start_stamp() self._start_stamp = self._data.get_start_stamp()
self._end_stamp = self._data.get_end_stamp() self._end_stamp = self._data.get_end_stamp()
self._dirty_stats = False self._dirty_stats = False
## Add a record to the index ## Add a record to the index
@ -111,7 +110,7 @@ class BagIndex:
@staticmethod @staticmethod
def stamp_to_str(secs): def stamp_to_str(secs):
secs_frac = secs - int(secs) secs_frac = secs - int(secs)
secs_frac_str = ('%.2f' % secs_frac)[1:] secs_frac_str = ('%.2f' % secs_frac)[1:]
return time.strftime('%b %d %Y %H:%M:%S', time.localtime(secs)) + secs_frac_str return time.strftime('%b %d %Y %H:%M:%S', time.localtime(secs)) + secs_frac_str
@ -124,25 +123,25 @@ class BagIndex:
dur_secs = duration % 60 dur_secs = duration % 60
dur_mins = duration / 60 dur_mins = duration / 60
dur_hrs = dur_mins / 60 dur_hrs = dur_mins / 60
if dur_hrs > 0: if dur_hrs > 0:
dur_mins = dur_mins % 60 dur_mins = dur_mins % 60
s = 'Duration: %dhr %dmin %ds (%ds)\n' % (dur_hrs, dur_mins, dur_secs, duration) s = 'Duration: %dhr %dmin %ds (%ds)\n' % (dur_hrs, dur_mins, dur_secs, duration)
else: else:
s = 'Duration: %dmin %ds (%ds)\n' % (dur_mins, dur_secs, duration) s = 'Duration: %dmin %ds (%ds)\n' % (dur_mins, dur_secs, duration)
s += 'Start: %s (%.2f)\n' % (self.stamp_to_str(self.start_stamp), self.start_stamp) s += 'Start: %s (%.2f)\n' % (self.stamp_to_str(self.start_stamp), self.start_stamp)
s += 'End: %s (%.2f)\n' % (self.stamp_to_str(self.end_stamp), self.end_stamp) s += 'End: %s (%.2f)\n' % (self.stamp_to_str(self.end_stamp), self.end_stamp)
s += 'Messages: %d\n' % (sum([len(p) for p in self._data.msg_positions.values()])) s += 'Messages: %d\n' % (sum([len(p) for p in self._data.msg_positions.values()]))
s += 'Topics:' s += 'Topics:'
max_topic_len = max([len(topic) for topic in self.topics]) max_topic_len = max([len(topic) for topic in self.topics])
max_datatype_len = max([len(self.get_datatype(topic)) for topic in self.topics]) max_datatype_len = max([len(self.get_datatype(topic)) for topic in self.topics])
for i, topic in enumerate(sorted(self.topics)): for i, topic in enumerate(sorted(self.topics)):
indent = (3 if i == 0 else 10) indent = (3 if i == 0 else 10)
positions = numpy.array([stamp for (stamp, pos) in self.msg_positions[topic]]) positions = numpy.array([stamp for (stamp, pos) in self.msg_positions[topic]])
datatype = self.get_datatype(topic) datatype = self.get_datatype(topic)
msg_count = len(positions) msg_count = len(positions)
s += '%s%-*s : %-*s %7d msgs' % (' ' * indent, max_datatype_len, datatype, max_topic_len, topic, msg_count) s += '%s%-*s : %-*s %7d msgs' % (' ' * indent, max_datatype_len, datatype, max_topic_len, topic, msg_count)
@ -163,8 +162,8 @@ class BagIndex:
class BagIndexData: class BagIndexData:
def __init__(self): def __init__(self):
self.datatype_first_pos = {} # datatype -> file offset self.datatype_first_pos = {} # datatype -> file offset
self.topic_datatypes = {} # topic -> datatype self.topic_datatypes = {} # topic -> datatype
self.msg_positions = {} # topic -> [(timestamp, file offset), ...] self.msg_positions = {} # topic -> [(timestamp, file offset), ...]
## Return first timestamp for a given topic ## Return first timestamp for a given topic
def get_topic_start_stamp(self, topic): def get_topic_start_stamp(self, topic):
@ -212,6 +211,39 @@ class BagIndexData:
return max(0, min(index, len(self.msg_positions[topic]) - 1)) return max(0, min(index, len(self.msg_positions[topic]) - 1))
## Reads an index from a bag file
class BagIndexReader:
def __init__(self, bag_path):
self.bag_path = bag_path
self.index = BagIndex()
def load(self):
try:
bag_file = rosrecord.BagReader(self.bag_path)
if bag_file.read_index() is None:
self.index = None
return None
# Read the index in ascending order of topic's first message position (ensures that each datatype message definition gets read)
topic_first_positions = sorted([(ti[0][1], t) for t, ti in bag_file.index.items()])
for _, topic in topic_first_positions:
topic_index = bag_file.index[topic]
datatype = bag_file.datatypes[topic]
for stamp, pos in topic_index:
self.index.add_record(pos, topic, datatype, stamp)
self.index.loaded = True
except Exception, e:
rospy.logerr('Unsuccessful loading index from %s: %s' % (self.bag_path, e))
self.index = None
return self.index
## Serialize bag file index to/from a binary file ## Serialize bag file index to/from a binary file
class BagIndexPickler: class BagIndexPickler:
def __init__(self, index_path): def __init__(self, index_path):
@ -259,20 +291,27 @@ class BagIndexFactory:
def load(self): def load(self):
try: try:
bag_file = BagFile(self.bag_path) bag_file = rosrecord.BagReader(self.bag_path)
file_size = os.path.getsize(self.bag_path) if bag_file.read_index() is not None:
for topic, topic_index in bag_file.index.items():
progress = util.progress_meter.ProgressMeter(self.bag_path, file_size, 1.0) datatype = bag_file.datatypes[topic]
for i, (pos, topic, raw_msg, stamp) in enumerate(bag_file.raw_messages()): for stamp, pos in topic_index:
(datatype, message_data, md5, bag_pos, pytype) = raw_msg self.index.add_record(pos, topic, datatype, stamp)
else:
self.index.add_record(pos, topic, datatype, stamp) file_size = os.path.getsize(self.bag_path)
progress.step(pos) progress = util.progress_meter.ProgressMeter(self.bag_path, file_size, 1.0)
progress.finish() for i, (pos, topic, raw_msg, stamp) in enumerate(bag_file.raw_messages()):
(datatype, message_data, md5, bag_pos, pytype) = raw_msg
self.index.add_record(pos, topic, datatype, stamp)
progress.step(pos)
progress.finish()
self.index.loaded = True self.index.loaded = True

View File

@ -67,12 +67,15 @@ class RawView(msg_view.TopicMsgView):
def message_viewed(self, bag_file, bag_index, topic, stamp, datatype, msg_index, msg): def message_viewed(self, bag_file, bag_index, topic, stamp, datatype, msg_index, msg):
msg_view.TopicMsgView.message_viewed(self, bag_file, bag_index, topic, stamp, datatype, msg_index, msg) msg_view.TopicMsgView.message_viewed(self, bag_file, bag_index, topic, stamp, datatype, msg_index, msg)
self.msg_title.SetValue('%s #%d' % (time.asctime(time.localtime(stamp.to_sec())), msg_index))
self.msg_title.Refresh()
self.msg_incoming = msg if stamp is None or msg_index is None:
self.invalidate() self.message_cleared()
else:
self.msg_title.SetValue('%s #%d' % (time.asctime(time.localtime(stamp.to_sec())), msg_index))
self.msg_title.Refresh()
self.msg_incoming = msg
self.invalidate()
def message_cleared(self): def message_cleared(self):
msg_view.TopicMsgView.message_cleared(self) msg_view.TopicMsgView.message_cleared(self)

View File

@ -38,6 +38,8 @@ PKG = 'rxplay'
import roslib; roslib.load_manifest(PKG) import roslib; roslib.load_manifest(PKG)
import rospy import rospy
import rosrecord
import bisect import bisect
import collections import collections
import math import math
@ -49,13 +51,12 @@ import wx
from util.base_frame import BaseFrame from util.base_frame import BaseFrame
from util.layer import Layer, LayerPanel from util.layer import Layer, LayerPanel
import playhead import playhead
import status
import plugins import plugins
import status
from raw_view import RawView from raw_view import RawView
from bag_file import BagFile from bag_index import BagIndex, BagIndexFactory, BagIndexPickler, BagIndexReader
from bag_index import BagIndex, BagIndexFactory, BagIndexPickler
class TimelinePanel(LayerPanel): class TimelinePanel(LayerPanel):
def __init__(self, input_files, options, *args, **kwargs): def __init__(self, input_files, options, *args, **kwargs):
@ -74,9 +75,13 @@ class TimelinePanel(LayerPanel):
for i, bag_path in enumerate(input_files): for i, bag_path in enumerate(input_files):
rospy.loginfo('%4d / %4d' % (i + 1, len(input_files))) rospy.loginfo('%4d / %4d' % (i + 1, len(input_files)))
bag_file = BagFile(bag_path) bag_file = rosrecord.BagReader(bag_path)
# Try to read the index directly from the bag file, or hit the index if not found
this_bag_index = BagIndexReader(bag_path).load()
if this_bag_index is None:
this_bag_index = BagIndexPickler(bag_path + '.index').load()
this_bag_index = BagIndexPickler(bag_path + '.index').load()
if this_bag_index is not None: if this_bag_index is not None:
bag_file.read_datatype_defs(this_bag_index) bag_file.read_datatype_defs(this_bag_index)
@ -94,7 +99,7 @@ class TimelinePanel(LayerPanel):
bag_index = bag_index_factory.index bag_index = bag_index_factory.index
bag_file = BagFile(bag_path) bag_file = rosrecord.BagReader(bag_path)
self.bag_files[bag_file] = bag_index self.bag_files[bag_file] = bag_index
# Background thread to generate, then save the index # Background thread to generate, then save the index