diff --git a/tools/rxbag/src/rxbag/__init__.py b/tools/rxbag/src/rxbag/__init__.py index c1dfe674..7227f31a 100755 --- a/tools/rxbag/src/rxbag/__init__.py +++ b/tools/rxbag/src/rxbag/__init__.py @@ -36,5 +36,5 @@ # Import rxbag main to be used by the $ROS_ROOT/bin/rxbag from rxbag_main import rxbag_main -from bag_helper import BagHelper +import bag_helper from message_view import MessageView, TimelineRenderer, TopicMessageView diff --git a/tools/rxbag/src/rxbag/bag_helper.py b/tools/rxbag/src/rxbag/bag_helper.py index 424ae4f6..d1274c57 100755 --- a/tools/rxbag/src/rxbag/bag_helper.py +++ b/tools/rxbag/src/rxbag/bag_helper.py @@ -34,22 +34,21 @@ PKG = 'rxbag' import roslib; roslib.load_manifest(PKG) -import rospy -import rosbag -import bisect -import cPickle -import math -import os -import sys -import threading import time -import numpy +def stamp_to_str(t): return time.strftime('%b %d %Y %H:%M:%S', time.localtime(t.to_sec())) + '.%03d' % (t.nsecs / 1000000) +def get_topics(bag): return sorted(set([c.topic for c in bag._get_connections()])) +def get_start_stamp(bag): return min([index[ 0].time.to_sec() for index in bag._connection_indexes.values()]) +def get_end_stamp(bag): return max([index[-1].time.to_sec() for index in bag._connection_indexes.values()]) -import util.progress_meter +def get_topics_by_datatype(bag): + topics_by_datatype = {} + for c in bag._get_connections(): + topics_by_datatype.setdefault(c.datatype, []).append(c.topic) + return topics_by_datatype -class BagHelper: - @staticmethod - def stamp_to_str(t): - return time.strftime('%b %d %Y %H:%M:%S', time.localtime(t.to_sec())) + '.%03d' % (t.nsecs / 1000000) +def get_datatype(bag, topic): + for c in bag._get_connections(topic): + return c.datatype + return None diff --git a/tools/rxbag/src/rxbag/rxbag_main.py b/tools/rxbag/src/rxbag/rxbag_main.py index 1a4a71fc..42327729 100755 --- a/tools/rxbag/src/rxbag/rxbag_main.py +++ b/tools/rxbag/src/rxbag/rxbag_main.py @@ -51,23 +51,20 @@ else: sys.exit(1) import wx import wx.lib.wxcairo - -# This is a crazy hack to get this to work on 64-bit systems -if 'wxMac' in wx.PlatformInfo: - pass # Implement if necessary -elif 'wxMSW' in wx.PlatformInfo: - pass # Implement if necessary -elif 'wxGTK' in wx.PlatformInfo: +if 'wxGTK' in wx.PlatformInfo: + # Workaround for 64-bit systems import ctypes gdkLib = wx.lib.wxcairo._findGDKLib() gdkLib.gdk_cairo_create.restype = ctypes.c_void_p +import rosbag + import util.base_frame import timeline class RxBagApp(wx.App): def __init__(self, input_files, options): - self.input_files = [input_files[0]] + self.input_files = input_files self.options = options wx.App.__init__(self) @@ -80,7 +77,19 @@ class RxBagApp(wx.App): frame_title = 'rxbag - [%d bags]' % len(self.input_files) frame = util.base_frame.BaseFrame(None, 'rxbag', 'Timeline', title=frame_title) - timeline_panel = timeline.TimelinePanel(self.input_files, self.options, frame, -1) + timeline_panel = timeline.TimelinePanel(frame, -1) + + if self.options.record: + connect_to_ros('rxbag', 3) + timeline_panel.timeline.record_bag() + else: + for input_file in self.input_files: + try: + bag = rosbag.Bag(input_file) + timeline_panel.timeline.add_bag(bag) + except Exception, ex: + print >> sys.stderr, 'Error loading [%s]: %s' % (input_file, str(ex)) + frame.Show() self.SetTopWindow(frame) @@ -91,48 +100,49 @@ class RxBagApp(wx.App): return True def connect_to_ros(node_name, init_timeout): - # Attempt to connect to master node + # Attempt to connect to master node class InitNodeThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.setDaemon(True) self.inited = False - + self.init_cv = threading.Condition() + def run(self): - rospy.loginfo('Attempting to connect to master node...') + rospy.loginfo('Master found. Connecting...') rospy.init_node(node_name, anonymous=True, disable_signals=True) + self.init_cv.acquire() self.inited = True + self.init_cv.notify_all() + self.init_cv.release() try: - # Check whether ROS master is running + # Check whether ROS master is running master = rospy.get_master() master.getPid() - - # If so (i.e. no exception), attempt to initialize node + + # If so (i.e. no exception), attempt to initialize node init_thread = InitNodeThread() + init_thread.init_cv.acquire() init_thread.start() - time.sleep(init_timeout) + init_thread.init_cv.wait(init_timeout) if init_thread.inited: rospy.core.register_signals() - rospy.loginfo('Connected to master node.') + rospy.loginfo('Connected to ROS.') else: - rospy.logerr('Giving up. Couldn\'t connect to master node.') + rospy.logerr('Couldn\'t connect to ROS. Running in unconnected mode.') except: - rospy.loginfo('Master not found.') + rospy.loginfo('Master not found. Running in unconnected mode.') def rxbag_main(): # Parse command line for input files and options - usage = "usage: %prog [options] BAG_FILE.bag" + usage = "usage: %prog [options] BAG_FILE1 [BAG_FILE2 ...]" parser = optparse.OptionParser(usage=usage) - #parser.add_option('-t', '--init-timeout', action='store', default=0.5, help='timeout in secs for connecting to master node') + parser.add_option('-r', '--record', dest='record', default=False, action='store_true', help='record to a bag file') options, args = parser.parse_args(sys.argv[1:]) - if len(args) == 0: - parser.print_help() - return - input_files = args[:] - #connect_to_ros('rxbag', options.init_timeout) + input_files = args[:] # Start application app = RxBagApp(input_files, options) diff --git a/tools/rxbag/src/rxbag/status.py b/tools/rxbag/src/rxbag/status.py index 0c31919e..3a8a6812 100755 --- a/tools/rxbag/src/rxbag/status.py +++ b/tools/rxbag/src/rxbag/status.py @@ -37,8 +37,8 @@ import roslib; roslib.load_manifest(PKG) import wx +import bag_helper from util.layer import Layer -from bag_helper import BagHelper class StatusLayer(Layer): def __init__(self, parent, title, timeline, x, y, width, height): @@ -52,7 +52,7 @@ class StatusLayer(Layer): t = roslib.rostime.Time.from_sec(self.timeline.playhead) - s = BagHelper.stamp_to_str(t) + s = bag_helper.stamp_to_str(t) spd = self.timeline.play_speed spd_str = None diff --git a/tools/rxbag/src/rxbag/timeline.py b/tools/rxbag/src/rxbag/timeline.py index cf4c5c04..1097539f 100755 --- a/tools/rxbag/src/rxbag/timeline.py +++ b/tools/rxbag/src/rxbag/timeline.py @@ -39,61 +39,39 @@ import roslib; roslib.load_manifest(PKG) import rospy import rosbag +import rosgraph.masterapi import bisect import collections import math import os +import sys import threading import time -import wx import cairo +import wx -from util.base_frame import BaseFrame -from util.layer import Layer, LayerPanel +import bag_helper import playhead import plugins -import status - from raw_view import RawView +import status +from util.base_frame import BaseFrame +from util.layer import Layer, LayerPanel class TimelinePanel(LayerPanel): - def __init__(self, input_files, options, *args, **kwargs): + def __init__(self, *args, **kwargs): LayerPanel.__init__(self, *args, **kwargs) - - self.bag_files = {} - if not self._init_bag_files(input_files): - raise Exception('No valid bag files') - - self._create_controls(options) + self._create_controls() self._create_toolbar() - def _init_bag_files(self, input_files): - unindexed = [] - - for bag_path in input_files: - try: - bag_file = rosbag.Bag(bag_path) - except Exception, e: - print 'Error loading %s: %s' % (bag_path, e) - continue - - self.bag_files[bag_file] = bag_file - - if len(unindexed) + len(self.bag_files) == 0: - return False - - return True - - def _create_controls(self, options): + def _create_controls(self): (width, height) = self.GetParent().GetClientSize() x, y = 5, 19 - self.timeline = Timeline(self, 'Timeline', x, y, width - x, height - y) - self.timeline.set_bag_files(self.bag_files) self.status = status.StatusLayer(self, 'Status', self.timeline, self.timeline.x, 4, 300, 16) @@ -139,14 +117,68 @@ class TimelinePanel(LayerPanel): tb.Realize() +class SubscriberHelper(object): + def __init__(self, timeline, topic, pytype): + self.timeline = timeline + self.subscriber = rospy.Subscriber(topic, pytype, self.callback) + self.topic = topic + self.pytype = pytype + + self.last_update = time.time() + + def callback(self, msg): + with self.timeline._bag_lock: + self.timeline.bag_file.write(self.topic, msg) + + self.timeline.invalidated_caches.add(self.topic) + + now = time.time() + if now - self.last_update > 0.1: + wx.CallAfter(self.timeline.invalidate) + self.last_update = now + +class MasterCheckThread(threading.Thread): + def __init__(self, timeline, period=5): + threading.Thread.__init__(self) + + self.timeline = timeline + self.period = period + + self.subscriber_helpers = {} + + self.failed_topics = set() + + self.start() + + def run(self): + master = rosgraph.masterapi.Master('rxbag') + + while True: + # Subscribe to any new topics + for topic, datatype in master.getPublishedTopics(''): + if topic not in self.subscriber_helpers and topic not in self.failed_topics: + print 'Found new topic: %s [%s]' % (topic, datatype) + + try: + pytype = roslib.message.get_message_class(datatype) + self.subscriber_helpers[topic] = SubscriberHelper(self.timeline, topic, pytype) + except Exception, ex: + print >> sys.stderr, 'Error subscribing to %s (ignoring): %s' % (topic, str(ex)) + self.failed_topics.add(topic) + + # Wait a while + time.sleep(self.period) + class Timeline(Layer): name = 'Timeline' def __init__(self, parent, title, x, y, width, height): Layer.__init__(self, parent, title, x, y, width, height) - self.bag_files = {} - self.bag_file = None + self._bag_lock = threading.Lock() + + self.bags = [] + self.bag_file = None # @todo: remove ## Rendering parameters @@ -221,6 +253,8 @@ class Timeline(Layer): self.timeline = timeline self.stop_flag = False + + self.start() def run(self): self.last_frame, self.last_playhead = None, None @@ -258,67 +292,71 @@ class Timeline(Layer): self.play_speed = 0.0 - self.play_thread = PlayThread(self) - self.play_thread.start() - self.message_history_cache = {} + self.invalidated_caches = set() + + self.play_thread = PlayThread(self) + + self.master_check_thread = None + + ### + + def record_bag(self): + self.master_check_thread = MasterCheckThread(self) + + self.bag_file = rosbag.Bag('rxbag.bag', 'w') + + def add_bag(self, bag): + self.bags.append(bag) + + self.bag_file = bag @property def start_stamp(self): - return self.get_start_stamp(self.bag_file) + with self._bag_lock: + return bag_helper.get_start_stamp(self.bag_file) @property def end_stamp(self): - return self.get_end_stamp(self.bag_file) + with self._bag_lock: + return bag_helper.get_end_stamp(self.bag_file) @property def topics(self): - return sorted(set([c.topic for c in self.bag_file._get_connections()])) + with self._bag_lock: + return bag_helper.get_topics(self.bag_file) - def get_start_stamp(self, bag): - return min([index[0].time.to_sec() for index in bag._connection_indexes.values()]) - - def get_end_stamp(self, bag): - return max([index[-1].time.to_sec() for index in bag._connection_indexes.values()]) - - def get_topics_by_datatype(self): - topics_by_datatype = {} - for c in self.bag_file._get_connections(): - topics_by_datatype.setdefault(c.datatype, []).append(c.topic) - return topics_by_datatype + @property + def topics_by_datatype(self): + with self._bag_lock: + return bag_helper.get_topics_by_datatype(self.bag_file) def get_datatype(self, topic): - for c in self.bag_file._get_connections(topic): - return c.datatype + with self._bag_lock: + return bag_helper.get_datatype(self.bag_file, topic) + + def get_connections(self, topic): + with self._bag_lock: + return list(self.bag_file._get_connections(topic)) + + def get_entry(self, t, topic): + with self._bag_lock: + return self.bag_file._get_entry(t, self.bag_file._get_connections(topic)) + + def get_entries(self, connections, start_time, end_time): + with self._bag_lock: + return self.bag_file._get_entries(connections, start_time, end_time) + + def read_message(self, position): + with self._bag_lock: + return self.bag_file._read_message(position) ## def on_close(self, event): self.play_thread.stop() - def save_layout(self): - user_config_dir = wx.StandardPaths.Get().GetUserConfigDir() - config_dir = os.path.join(user_config_dir, '.rxbag') - if not os.path.exists(config_dir): - os.mkdir(config_dir) - layout_file = os.path.join(config_dir, 'layout') - config = wx.Config(localFilename=layout_file) - - # TODO - - #for i, view in enumerate(self.views): - # config.Write('/Views/View%d' % (i + 1), view.__class__.__name__) - #config.Flush() - - def load_layout(self): - user_config_dir = wx.StandardPaths.Get().GetUserConfigDir() - config_dir = os.path.join(user_config_dir, '.rxbag') - if not os.path.exists(config_dir): - return - layout_file = os.path.join(config_dir, 'layout') - config = wx.Config(localFilename=layout_file) - - # TODO + ### Views / listeners def add_view(self, topic, view): self.views.append(view) @@ -328,9 +366,34 @@ class Timeline(Layer): self.remove_listener(topic, view) self.views.remove(view) + def add_listener(self, topic, listener): + self.listeners.setdefault(topic, []).append(listener) + + self._update_message_view() + self.invalidate() + + def remove_listener(self, topic, listener): + topic_listeners = self.listeners.get(topic) + if topic_listeners is not None and listener in topic_listeners: + topic_listeners.remove(listener) + if len(topic_listeners) == 0: + del self.listeners[topic] + self.invalidate() + + ### Plugins + def get_viewer_types(self, datatype): return [RawView] + self.viewer_types.get('*', []) + self.viewer_types.get(datatype, []) + def load_plugins(self): + for view, timeline_renderer, msg_types in plugins.load_plugins(): + for msg_type in msg_types: + self.viewer_types.setdefault(msg_type, []).append(view) + if timeline_renderer is not None: + self.timeline_renderers[msg_type] = timeline_renderer(self) + + ### Timeline renderers + def get_renderers(self): renderers = [] @@ -342,19 +405,6 @@ class Timeline(Layer): return renderers - def load_plugins(self): - for view, timeline_renderer, msg_types in plugins.load_plugins(): - for msg_type in msg_types: - self.viewer_types.setdefault(msg_type, []).append(view) - if timeline_renderer is not None: - self.timeline_renderers[msg_type] = timeline_renderer(self) - - def set_bag_files(self, bag_files): - self.bag_files = bag_files - - # todo: handle multiple bags - self.bag_file = list(self.bag_files)[0] - def is_renderer_active(self, topic): return topic in self.rendered_topics @@ -386,19 +436,7 @@ class Timeline(Layer): self.invalidate() self.parent.playhead.invalidate() - def add_listener(self, topic, listener): - self.listeners.setdefault(topic, []).append(listener) - - self.update_message_view() - self.invalidate() - - def remove_listener(self, topic, listener): - topic_listeners = self.listeners.get(topic) - if topic_listeners is not None and listener in topic_listeners: - topic_listeners.remove(listener) - if len(topic_listeners) == 0: - del self.listeners[topic] - self.invalidate() + ### @property def history_bottom(self): @@ -411,7 +449,7 @@ class Timeline(Layer): def _set_status(self, status): self.parent.GetParent().SetStatusText(str(status)) - ## Navigation + ### Navigation def navigate_play(self): self.play_speed = 1.0 def navigate_stop(self): self.play_speed = 0.0 @@ -439,7 +477,7 @@ class Timeline(Layer): def navigate_start(self): self.set_playhead(self.start_stamp) def navigate_end(self): self.set_playhead(self.end_stamp) - ## View port + ### View port def reset_timeline(self): self.reset_zoom() @@ -460,7 +498,11 @@ class Timeline(Layer): self.invalidate() def reset_zoom(self): - self.set_timeline_view(self.start_stamp, self.end_stamp) + start_stamp, end_stamp = self.start_stamp, self.end_stamp + if end_stamp - start_stamp < 5.0: + end_stamp = start_stamp + 5.0 + + self.set_timeline_view(start_stamp, end_stamp) def zoom_in(self): self.zoom_timeline(0.5) def zoom_out(self): self.zoom_timeline(2.0) @@ -507,11 +549,11 @@ class Timeline(Layer): self.playhead_positions = {} for topic in self.topics: - entry = self.bag_file._get_entry(playhead_time, self.bag_file._get_connections(topic)) + entry = self.get_entry(playhead_time, topic) if entry: self.playhead_positions[topic] = entry.position - self.update_message_view() + self._update_message_view() self.parent.status.invalidate() self.parent.playhead.update_position() @@ -524,7 +566,7 @@ class Timeline(Layer): self.resize(w - self.x, h - self.y) # resize layer to fill client area def paint(self, dc): - if len(self.topics) == 0: + if not self.bag_file or len(self.topics) == 0: return if self.stamp_left is None: @@ -713,6 +755,27 @@ class Timeline(Layer): else: self._draw_topic_history(dc, topic) + def _update_message_cache(self, topic): + if topic not in self.message_history_cache: + # Don't have any cache of messages in this topic + start_time = roslib.rostime.Time.from_sec(max(0.0, self.start_stamp)) + topic_cache = [] + self.message_history_cache[topic] = topic_cache + else: + # Check if the cache has been invalidated + if topic not in self.invalidated_caches: + return + + topic_cache = self.message_history_cache[topic] + start_time = roslib.rostime.Time.from_sec(max(0.0, topic_cache[-1])) + + end_time = roslib.rostime.Time.from_sec(max(0.0, self.end_stamp)) + + for entry in self.get_entries(self.get_connections(topic), start_time, end_time): + topic_cache.append(entry.time.to_sec()) + + self.invalidated_caches.remove(topic) + def _draw_topic_history(self, dc, topic): """ Draw boxes to show message regions on timelines. @@ -722,10 +785,7 @@ class Timeline(Layer): msg_y = y + 1 msg_height = h - 1 - # Get all the connections on this topic - connections = list(self.bag_file._get_connections(topic)) - - datatype = connections[0].datatype + datatype = self.get_datatype(topic) # Get the renderer and the message combine interval renderer = None @@ -736,15 +796,10 @@ class Timeline(Layer): msg_combine_interval = self.map_dx_to_dstamp(renderer.msg_combine_px) if msg_combine_interval is None: msg_combine_interval = self.map_dx_to_dstamp(self.default_msg_combine_px) - - if topic not in self.message_history_cache: - start_time = roslib.rostime.Time.from_sec(max(0.0, self.start_stamp)) - end_time = roslib.rostime.Time.from_sec(max(0.0, self.end_stamp)) - all_stamps = list(entry.time.to_sec() for entry in self.bag_file._get_entries(connections, start_time, end_time)) - - self.message_history_cache[topic] = all_stamps - else: - all_stamps = self.message_history_cache[topic] + + self._update_message_cache(topic) + + all_stamps = self.message_history_cache[topic] start_index = bisect.bisect_left(all_stamps, self.stamp_left) end_index = bisect.bisect_left(all_stamps, self.stamp_right) @@ -842,8 +897,7 @@ class Timeline(Layer): dc.line_to(x + w + 1, y + 2) dc.stroke() - def map_dx_to_dstamp(self, dx): - return float(dx) * (self.stamp_right - self.stamp_left) / self.history_width + ### Pixel location <-> timestamp def map_x_to_stamp(self, x, clamp_to_visible=True): fraction = float(x - self.history_left) / self.history_width @@ -856,8 +910,8 @@ class Timeline(Layer): return max(0.0, self.stamp_left + fraction * (self.stamp_right - self.stamp_left)) - def map_dstamp_to_dx(self, dstamp): - return (float(dstamp) * self.history_width) / (self.stamp_right - self.stamp_left) + def map_dx_to_dstamp(self, dx): + return float(dx) * (self.stamp_right - self.stamp_left) / self.history_width def map_stamp_to_x(self, stamp, clamp_to_visible=True): fraction = (stamp - self.stamp_left) / (self.stamp_right - self.stamp_left) @@ -867,9 +921,10 @@ class Timeline(Layer): return self.history_left + fraction * self.history_width - ### Control + def map_dstamp_to_dx(self, dstamp): + return (float(dstamp) * self.history_width) / (self.stamp_right - self.stamp_left) - # Mouse events + ### Mouse events def on_left_down(self, event): self.clicked_pos = event.GetPosition() @@ -893,6 +948,9 @@ class Timeline(Layer): if not self.contains(*self.clicked_pos): return + if not self.bag_file: + return + self.parent.PopupMenu(TimelinePopupMenu(self.parent, self), self.clicked_pos) def on_mousewheel(self, event): @@ -934,17 +992,10 @@ class Timeline(Layer): self.set_playhead(self.map_x_to_stamp(x)) self.clicked_pos = mouse_pos - - ## - - def load_msg_by_stamp(self, topic, stamp): - index = topic.find_stamp_index(stamp) - if not index: - return None - pos = topic.msg_positions[index][1] + ### - def update_message_view(self): + def _update_message_view(self): if not self.playhead_positions: return @@ -954,7 +1005,7 @@ class Timeline(Layer): playhead_position = self.playhead_positions[topic] if playhead_position is not None: # Load the message - msgs[topic] = self.bag_file._read_message(playhead_position) + msgs[topic] = self.read_message(playhead_position) continue msgs[topic] = None @@ -979,6 +1030,31 @@ class Timeline(Layer): except wx.PyDeadObjectError: self.remove_listener(topic, listener) + ### Layout management + + def save_layout(self): + user_config_dir = wx.StandardPaths.Get().GetUserConfigDir() + config_dir = os.path.join(user_config_dir, '.rxbag') + if not os.path.exists(config_dir): + os.mkdir(config_dir) + layout_file = os.path.join(config_dir, 'layout') + config = wx.Config(localFilename=layout_file) + + # TODO + #for i, view in enumerate(self.views): + # config.Write('/Views/View%d' % (i + 1), view.__class__.__name__) + #config.Flush() + + def load_layout(self): + user_config_dir = wx.StandardPaths.Get().GetUserConfigDir() + config_dir = os.path.join(user_config_dir, '.rxbag') + if not os.path.exists(config_dir): + return + layout_file = os.path.join(config_dir, 'layout') + config = wx.Config(localFilename=layout_file) + + # TODO + class TimelinePopupMenu(wx.Menu): """ Timeline popup menu. Allows user to manipulate the timeline view, and open new message views. @@ -1043,7 +1119,7 @@ class TimelinePopupMenu(wx.Menu): self.view_datatype_menu = wx.Menu() self.AppendSubMenu(self.view_datatype_menu, 'View (by datatype)...', 'View message detail') - topics_by_datatype = self.timeline.get_topics_by_datatype() + topics_by_datatype = self.timeline.topics_by_datatype for datatype in sorted(topics_by_datatype): # View... / datatype