rxbag: added --record flag for recording/viewing a live system

This commit is contained in:
Tim Field 2010-05-18 07:58:19 +00:00
parent 674ed96811
commit 1a06ca65e5
5 changed files with 266 additions and 181 deletions

View File

@ -36,5 +36,5 @@
# Import rxbag main to be used by the $ROS_ROOT/bin/rxbag
from rxbag_main import rxbag_main
from bag_helper import BagHelper
import bag_helper
from message_view import MessageView, TimelineRenderer, TopicMessageView

View File

@ -34,22 +34,21 @@
PKG = 'rxbag'
import roslib; roslib.load_manifest(PKG)
import rospy
import rosbag
import bisect
import cPickle
import math
import os
import sys
import threading
import time
import numpy
def stamp_to_str(t): return time.strftime('%b %d %Y %H:%M:%S', time.localtime(t.to_sec())) + '.%03d' % (t.nsecs / 1000000)
def get_topics(bag): return sorted(set([c.topic for c in bag._get_connections()]))
def get_start_stamp(bag): return min([index[ 0].time.to_sec() for index in bag._connection_indexes.values()])
def get_end_stamp(bag): return max([index[-1].time.to_sec() for index in bag._connection_indexes.values()])
import util.progress_meter
def get_topics_by_datatype(bag):
topics_by_datatype = {}
for c in bag._get_connections():
topics_by_datatype.setdefault(c.datatype, []).append(c.topic)
return topics_by_datatype
class BagHelper:
@staticmethod
def stamp_to_str(t):
return time.strftime('%b %d %Y %H:%M:%S', time.localtime(t.to_sec())) + '.%03d' % (t.nsecs / 1000000)
def get_datatype(bag, topic):
for c in bag._get_connections(topic):
return c.datatype
return None

View File

@ -51,23 +51,20 @@ else:
sys.exit(1)
import wx
import wx.lib.wxcairo
# This is a crazy hack to get this to work on 64-bit systems
if 'wxMac' in wx.PlatformInfo:
pass # Implement if necessary
elif 'wxMSW' in wx.PlatformInfo:
pass # Implement if necessary
elif 'wxGTK' in wx.PlatformInfo:
if 'wxGTK' in wx.PlatformInfo:
# Workaround for 64-bit systems
import ctypes
gdkLib = wx.lib.wxcairo._findGDKLib()
gdkLib.gdk_cairo_create.restype = ctypes.c_void_p
import rosbag
import util.base_frame
import timeline
class RxBagApp(wx.App):
def __init__(self, input_files, options):
self.input_files = [input_files[0]]
self.input_files = input_files
self.options = options
wx.App.__init__(self)
@ -80,7 +77,19 @@ class RxBagApp(wx.App):
frame_title = 'rxbag - [%d bags]' % len(self.input_files)
frame = util.base_frame.BaseFrame(None, 'rxbag', 'Timeline', title=frame_title)
timeline_panel = timeline.TimelinePanel(self.input_files, self.options, frame, -1)
timeline_panel = timeline.TimelinePanel(frame, -1)
if self.options.record:
connect_to_ros('rxbag', 3)
timeline_panel.timeline.record_bag()
else:
for input_file in self.input_files:
try:
bag = rosbag.Bag(input_file)
timeline_panel.timeline.add_bag(bag)
except Exception, ex:
print >> sys.stderr, 'Error loading [%s]: %s' % (input_file, str(ex))
frame.Show()
self.SetTopWindow(frame)
@ -97,11 +106,15 @@ def connect_to_ros(node_name, init_timeout):
threading.Thread.__init__(self)
self.setDaemon(True)
self.inited = False
self.init_cv = threading.Condition()
def run(self):
rospy.loginfo('Attempting to connect to master node...')
rospy.loginfo('Master found. Connecting...')
rospy.init_node(node_name, anonymous=True, disable_signals=True)
self.init_cv.acquire()
self.inited = True
self.init_cv.notify_all()
self.init_cv.release()
try:
# Check whether ROS master is running
@ -110,29 +123,26 @@ def connect_to_ros(node_name, init_timeout):
# If so (i.e. no exception), attempt to initialize node
init_thread = InitNodeThread()
init_thread.init_cv.acquire()
init_thread.start()
time.sleep(init_timeout)
init_thread.init_cv.wait(init_timeout)
if init_thread.inited:
rospy.core.register_signals()
rospy.loginfo('Connected to master node.')
rospy.loginfo('Connected to ROS.')
else:
rospy.logerr('Giving up. Couldn\'t connect to master node.')
rospy.logerr('Couldn\'t connect to ROS. Running in unconnected mode.')
except:
rospy.loginfo('Master not found.')
rospy.loginfo('Master not found. Running in unconnected mode.')
def rxbag_main():
# Parse command line for input files and options
usage = "usage: %prog [options] BAG_FILE.bag"
usage = "usage: %prog [options] BAG_FILE1 [BAG_FILE2 ...]"
parser = optparse.OptionParser(usage=usage)
#parser.add_option('-t', '--init-timeout', action='store', default=0.5, help='timeout in secs for connecting to master node')
parser.add_option('-r', '--record', dest='record', default=False, action='store_true', help='record to a bag file')
options, args = parser.parse_args(sys.argv[1:])
if len(args) == 0:
parser.print_help()
return
input_files = args[:]
#connect_to_ros('rxbag', options.init_timeout)
input_files = args[:]
# Start application
app = RxBagApp(input_files, options)

View File

@ -37,8 +37,8 @@ import roslib; roslib.load_manifest(PKG)
import wx
import bag_helper
from util.layer import Layer
from bag_helper import BagHelper
class StatusLayer(Layer):
def __init__(self, parent, title, timeline, x, y, width, height):
@ -52,7 +52,7 @@ class StatusLayer(Layer):
t = roslib.rostime.Time.from_sec(self.timeline.playhead)
s = BagHelper.stamp_to_str(t)
s = bag_helper.stamp_to_str(t)
spd = self.timeline.play_speed
spd_str = None

View File

@ -39,61 +39,39 @@ import roslib; roslib.load_manifest(PKG)
import rospy
import rosbag
import rosgraph.masterapi
import bisect
import collections
import math
import os
import sys
import threading
import time
import wx
import cairo
import wx
from util.base_frame import BaseFrame
from util.layer import Layer, LayerPanel
import bag_helper
import playhead
import plugins
import status
from raw_view import RawView
import status
from util.base_frame import BaseFrame
from util.layer import Layer, LayerPanel
class TimelinePanel(LayerPanel):
def __init__(self, input_files, options, *args, **kwargs):
def __init__(self, *args, **kwargs):
LayerPanel.__init__(self, *args, **kwargs)
self.bag_files = {}
if not self._init_bag_files(input_files):
raise Exception('No valid bag files')
self._create_controls(options)
self._create_controls()
self._create_toolbar()
def _init_bag_files(self, input_files):
unindexed = []
for bag_path in input_files:
try:
bag_file = rosbag.Bag(bag_path)
except Exception, e:
print 'Error loading %s: %s' % (bag_path, e)
continue
self.bag_files[bag_file] = bag_file
if len(unindexed) + len(self.bag_files) == 0:
return False
return True
def _create_controls(self, options):
def _create_controls(self):
(width, height) = self.GetParent().GetClientSize()
x, y = 5, 19
self.timeline = Timeline(self, 'Timeline', x, y, width - x, height - y)
self.timeline.set_bag_files(self.bag_files)
self.status = status.StatusLayer(self, 'Status', self.timeline, self.timeline.x, 4, 300, 16)
@ -139,14 +117,68 @@ class TimelinePanel(LayerPanel):
tb.Realize()
class SubscriberHelper(object):
def __init__(self, timeline, topic, pytype):
self.timeline = timeline
self.subscriber = rospy.Subscriber(topic, pytype, self.callback)
self.topic = topic
self.pytype = pytype
self.last_update = time.time()
def callback(self, msg):
with self.timeline._bag_lock:
self.timeline.bag_file.write(self.topic, msg)
self.timeline.invalidated_caches.add(self.topic)
now = time.time()
if now - self.last_update > 0.1:
wx.CallAfter(self.timeline.invalidate)
self.last_update = now
class MasterCheckThread(threading.Thread):
def __init__(self, timeline, period=5):
threading.Thread.__init__(self)
self.timeline = timeline
self.period = period
self.subscriber_helpers = {}
self.failed_topics = set()
self.start()
def run(self):
master = rosgraph.masterapi.Master('rxbag')
while True:
# Subscribe to any new topics
for topic, datatype in master.getPublishedTopics(''):
if topic not in self.subscriber_helpers and topic not in self.failed_topics:
print 'Found new topic: %s [%s]' % (topic, datatype)
try:
pytype = roslib.message.get_message_class(datatype)
self.subscriber_helpers[topic] = SubscriberHelper(self.timeline, topic, pytype)
except Exception, ex:
print >> sys.stderr, 'Error subscribing to %s (ignoring): %s' % (topic, str(ex))
self.failed_topics.add(topic)
# Wait a while
time.sleep(self.period)
class Timeline(Layer):
name = 'Timeline'
def __init__(self, parent, title, x, y, width, height):
Layer.__init__(self, parent, title, x, y, width, height)
self.bag_files = {}
self.bag_file = None
self._bag_lock = threading.Lock()
self.bags = []
self.bag_file = None # @todo: remove
## Rendering parameters
@ -222,6 +254,8 @@ class Timeline(Layer):
self.timeline = timeline
self.stop_flag = False
self.start()
def run(self):
self.last_frame, self.last_playhead = None, None
@ -258,67 +292,71 @@ class Timeline(Layer):
self.play_speed = 0.0
self.play_thread = PlayThread(self)
self.play_thread.start()
self.message_history_cache = {}
self.invalidated_caches = set()
self.play_thread = PlayThread(self)
self.master_check_thread = None
###
def record_bag(self):
self.master_check_thread = MasterCheckThread(self)
self.bag_file = rosbag.Bag('rxbag.bag', 'w')
def add_bag(self, bag):
self.bags.append(bag)
self.bag_file = bag
@property
def start_stamp(self):
return self.get_start_stamp(self.bag_file)
with self._bag_lock:
return bag_helper.get_start_stamp(self.bag_file)
@property
def end_stamp(self):
return self.get_end_stamp(self.bag_file)
with self._bag_lock:
return bag_helper.get_end_stamp(self.bag_file)
@property
def topics(self):
return sorted(set([c.topic for c in self.bag_file._get_connections()]))
with self._bag_lock:
return bag_helper.get_topics(self.bag_file)
def get_start_stamp(self, bag):
return min([index[0].time.to_sec() for index in bag._connection_indexes.values()])
def get_end_stamp(self, bag):
return max([index[-1].time.to_sec() for index in bag._connection_indexes.values()])
def get_topics_by_datatype(self):
topics_by_datatype = {}
for c in self.bag_file._get_connections():
topics_by_datatype.setdefault(c.datatype, []).append(c.topic)
return topics_by_datatype
@property
def topics_by_datatype(self):
with self._bag_lock:
return bag_helper.get_topics_by_datatype(self.bag_file)
def get_datatype(self, topic):
for c in self.bag_file._get_connections(topic):
return c.datatype
with self._bag_lock:
return bag_helper.get_datatype(self.bag_file, topic)
def get_connections(self, topic):
with self._bag_lock:
return list(self.bag_file._get_connections(topic))
def get_entry(self, t, topic):
with self._bag_lock:
return self.bag_file._get_entry(t, self.bag_file._get_connections(topic))
def get_entries(self, connections, start_time, end_time):
with self._bag_lock:
return self.bag_file._get_entries(connections, start_time, end_time)
def read_message(self, position):
with self._bag_lock:
return self.bag_file._read_message(position)
##
def on_close(self, event):
self.play_thread.stop()
def save_layout(self):
user_config_dir = wx.StandardPaths.Get().GetUserConfigDir()
config_dir = os.path.join(user_config_dir, '.rxbag')
if not os.path.exists(config_dir):
os.mkdir(config_dir)
layout_file = os.path.join(config_dir, 'layout')
config = wx.Config(localFilename=layout_file)
# TODO
#for i, view in enumerate(self.views):
# config.Write('/Views/View%d' % (i + 1), view.__class__.__name__)
#config.Flush()
def load_layout(self):
user_config_dir = wx.StandardPaths.Get().GetUserConfigDir()
config_dir = os.path.join(user_config_dir, '.rxbag')
if not os.path.exists(config_dir):
return
layout_file = os.path.join(config_dir, 'layout')
config = wx.Config(localFilename=layout_file)
# TODO
### Views / listeners
def add_view(self, topic, view):
self.views.append(view)
@ -328,9 +366,34 @@ class Timeline(Layer):
self.remove_listener(topic, view)
self.views.remove(view)
def add_listener(self, topic, listener):
self.listeners.setdefault(topic, []).append(listener)
self._update_message_view()
self.invalidate()
def remove_listener(self, topic, listener):
topic_listeners = self.listeners.get(topic)
if topic_listeners is not None and listener in topic_listeners:
topic_listeners.remove(listener)
if len(topic_listeners) == 0:
del self.listeners[topic]
self.invalidate()
### Plugins
def get_viewer_types(self, datatype):
return [RawView] + self.viewer_types.get('*', []) + self.viewer_types.get(datatype, [])
def load_plugins(self):
for view, timeline_renderer, msg_types in plugins.load_plugins():
for msg_type in msg_types:
self.viewer_types.setdefault(msg_type, []).append(view)
if timeline_renderer is not None:
self.timeline_renderers[msg_type] = timeline_renderer(self)
### Timeline renderers
def get_renderers(self):
renderers = []
@ -342,19 +405,6 @@ class Timeline(Layer):
return renderers
def load_plugins(self):
for view, timeline_renderer, msg_types in plugins.load_plugins():
for msg_type in msg_types:
self.viewer_types.setdefault(msg_type, []).append(view)
if timeline_renderer is not None:
self.timeline_renderers[msg_type] = timeline_renderer(self)
def set_bag_files(self, bag_files):
self.bag_files = bag_files
# todo: handle multiple bags
self.bag_file = list(self.bag_files)[0]
def is_renderer_active(self, topic):
return topic in self.rendered_topics
@ -386,19 +436,7 @@ class Timeline(Layer):
self.invalidate()
self.parent.playhead.invalidate()
def add_listener(self, topic, listener):
self.listeners.setdefault(topic, []).append(listener)
self.update_message_view()
self.invalidate()
def remove_listener(self, topic, listener):
topic_listeners = self.listeners.get(topic)
if topic_listeners is not None and listener in topic_listeners:
topic_listeners.remove(listener)
if len(topic_listeners) == 0:
del self.listeners[topic]
self.invalidate()
###
@property
def history_bottom(self):
@ -411,7 +449,7 @@ class Timeline(Layer):
def _set_status(self, status):
self.parent.GetParent().SetStatusText(str(status))
## Navigation
### Navigation
def navigate_play(self): self.play_speed = 1.0
def navigate_stop(self): self.play_speed = 0.0
@ -439,7 +477,7 @@ class Timeline(Layer):
def navigate_start(self): self.set_playhead(self.start_stamp)
def navigate_end(self): self.set_playhead(self.end_stamp)
## View port
### View port
def reset_timeline(self):
self.reset_zoom()
@ -460,7 +498,11 @@ class Timeline(Layer):
self.invalidate()
def reset_zoom(self):
self.set_timeline_view(self.start_stamp, self.end_stamp)
start_stamp, end_stamp = self.start_stamp, self.end_stamp
if end_stamp - start_stamp < 5.0:
end_stamp = start_stamp + 5.0
self.set_timeline_view(start_stamp, end_stamp)
def zoom_in(self): self.zoom_timeline(0.5)
def zoom_out(self): self.zoom_timeline(2.0)
@ -507,11 +549,11 @@ class Timeline(Layer):
self.playhead_positions = {}
for topic in self.topics:
entry = self.bag_file._get_entry(playhead_time, self.bag_file._get_connections(topic))
entry = self.get_entry(playhead_time, topic)
if entry:
self.playhead_positions[topic] = entry.position
self.update_message_view()
self._update_message_view()
self.parent.status.invalidate()
self.parent.playhead.update_position()
@ -524,7 +566,7 @@ class Timeline(Layer):
self.resize(w - self.x, h - self.y) # resize layer to fill client area
def paint(self, dc):
if len(self.topics) == 0:
if not self.bag_file or len(self.topics) == 0:
return
if self.stamp_left is None:
@ -713,6 +755,27 @@ class Timeline(Layer):
else:
self._draw_topic_history(dc, topic)
def _update_message_cache(self, topic):
if topic not in self.message_history_cache:
# Don't have any cache of messages in this topic
start_time = roslib.rostime.Time.from_sec(max(0.0, self.start_stamp))
topic_cache = []
self.message_history_cache[topic] = topic_cache
else:
# Check if the cache has been invalidated
if topic not in self.invalidated_caches:
return
topic_cache = self.message_history_cache[topic]
start_time = roslib.rostime.Time.from_sec(max(0.0, topic_cache[-1]))
end_time = roslib.rostime.Time.from_sec(max(0.0, self.end_stamp))
for entry in self.get_entries(self.get_connections(topic), start_time, end_time):
topic_cache.append(entry.time.to_sec())
self.invalidated_caches.remove(topic)
def _draw_topic_history(self, dc, topic):
"""
Draw boxes to show message regions on timelines.
@ -722,10 +785,7 @@ class Timeline(Layer):
msg_y = y + 1
msg_height = h - 1
# Get all the connections on this topic
connections = list(self.bag_file._get_connections(topic))
datatype = connections[0].datatype
datatype = self.get_datatype(topic)
# Get the renderer and the message combine interval
renderer = None
@ -737,14 +797,9 @@ class Timeline(Layer):
if msg_combine_interval is None:
msg_combine_interval = self.map_dx_to_dstamp(self.default_msg_combine_px)
if topic not in self.message_history_cache:
start_time = roslib.rostime.Time.from_sec(max(0.0, self.start_stamp))
end_time = roslib.rostime.Time.from_sec(max(0.0, self.end_stamp))
all_stamps = list(entry.time.to_sec() for entry in self.bag_file._get_entries(connections, start_time, end_time))
self._update_message_cache(topic)
self.message_history_cache[topic] = all_stamps
else:
all_stamps = self.message_history_cache[topic]
all_stamps = self.message_history_cache[topic]
start_index = bisect.bisect_left(all_stamps, self.stamp_left)
end_index = bisect.bisect_left(all_stamps, self.stamp_right)
@ -842,8 +897,7 @@ class Timeline(Layer):
dc.line_to(x + w + 1, y + 2)
dc.stroke()
def map_dx_to_dstamp(self, dx):
return float(dx) * (self.stamp_right - self.stamp_left) / self.history_width
### Pixel location <-> timestamp
def map_x_to_stamp(self, x, clamp_to_visible=True):
fraction = float(x - self.history_left) / self.history_width
@ -856,8 +910,8 @@ class Timeline(Layer):
return max(0.0, self.stamp_left + fraction * (self.stamp_right - self.stamp_left))
def map_dstamp_to_dx(self, dstamp):
return (float(dstamp) * self.history_width) / (self.stamp_right - self.stamp_left)
def map_dx_to_dstamp(self, dx):
return float(dx) * (self.stamp_right - self.stamp_left) / self.history_width
def map_stamp_to_x(self, stamp, clamp_to_visible=True):
fraction = (stamp - self.stamp_left) / (self.stamp_right - self.stamp_left)
@ -867,9 +921,10 @@ class Timeline(Layer):
return self.history_left + fraction * self.history_width
### Control
def map_dstamp_to_dx(self, dstamp):
return (float(dstamp) * self.history_width) / (self.stamp_right - self.stamp_left)
# Mouse events
### Mouse events
def on_left_down(self, event):
self.clicked_pos = event.GetPosition()
@ -893,6 +948,9 @@ class Timeline(Layer):
if not self.contains(*self.clicked_pos):
return
if not self.bag_file:
return
self.parent.PopupMenu(TimelinePopupMenu(self.parent, self), self.clicked_pos)
def on_mousewheel(self, event):
@ -935,16 +993,9 @@ class Timeline(Layer):
self.clicked_pos = mouse_pos
##
###
def load_msg_by_stamp(self, topic, stamp):
index = topic.find_stamp_index(stamp)
if not index:
return None
pos = topic.msg_positions[index][1]
def update_message_view(self):
def _update_message_view(self):
if not self.playhead_positions:
return
@ -954,7 +1005,7 @@ class Timeline(Layer):
playhead_position = self.playhead_positions[topic]
if playhead_position is not None:
# Load the message
msgs[topic] = self.bag_file._read_message(playhead_position)
msgs[topic] = self.read_message(playhead_position)
continue
msgs[topic] = None
@ -979,6 +1030,31 @@ class Timeline(Layer):
except wx.PyDeadObjectError:
self.remove_listener(topic, listener)
### Layout management
def save_layout(self):
user_config_dir = wx.StandardPaths.Get().GetUserConfigDir()
config_dir = os.path.join(user_config_dir, '.rxbag')
if not os.path.exists(config_dir):
os.mkdir(config_dir)
layout_file = os.path.join(config_dir, 'layout')
config = wx.Config(localFilename=layout_file)
# TODO
#for i, view in enumerate(self.views):
# config.Write('/Views/View%d' % (i + 1), view.__class__.__name__)
#config.Flush()
def load_layout(self):
user_config_dir = wx.StandardPaths.Get().GetUserConfigDir()
config_dir = os.path.join(user_config_dir, '.rxbag')
if not os.path.exists(config_dir):
return
layout_file = os.path.join(config_dir, 'layout')
config = wx.Config(localFilename=layout_file)
# TODO
class TimelinePopupMenu(wx.Menu):
"""
Timeline popup menu. Allows user to manipulate the timeline view, and open new message views.
@ -1043,7 +1119,7 @@ class TimelinePopupMenu(wx.Menu):
self.view_datatype_menu = wx.Menu()
self.AppendSubMenu(self.view_datatype_menu, 'View (by datatype)...', 'View message detail')
topics_by_datatype = self.timeline.get_topics_by_datatype()
topics_by_datatype = self.timeline.topics_by_datatype
for datatype in sorted(topics_by_datatype):
# View... / datatype