rxbag: recorder now understands topics, and --regex, --all and --limit options; uses separate thread for writing to disk

This commit is contained in:
Tim Field 2010-05-24 07:55:01 +00:00
parent 42910e51e0
commit c7ff51f457
1 changed files with 130 additions and 37 deletions

View File

@ -29,14 +29,18 @@
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
"""
Recorder subscribes to ROS messages and writes them to a bag file.
"""
from __future__ import with_statement
PKG = 'rxbag'
import roslib; roslib.load_manifest(PKG)
import Queue
import re
import threading
import time
@ -46,30 +50,52 @@ import rospy
import sys
class Recorder(threading.Thread):
def __init__(self, filename, bag_lock=None, master_check_interval=5):
class Recorder(object):
def __init__(self, filename, bag_lock=None, all=True, topics=[], regex=False, limit=0, master_check_interval=1.0):
"""
Subscribe to ROS messages and record them to a bag file.
@param filename: filename of bag to write to
@type filename: str
@param master_check_interval: period to check master for new topic publications [default: 5]
@param all: all topics are to be recorded [default: True]
@type all: bool
@param topics: topics (or regexes if regex is True) to record [default: empty list]
@type topics: list of str
@param regex: topics should be considered as regular expressions [default: False]
@type regex: bool
@param limit: record only this number of messages on each topic (if non-positive, then unlimited) [default: 0]
@type limit: int
@param master_check_interval: period (in seconds) to check master for new topic publications [default: 1]
@type master_check_interval: float
"""
threading.Thread.__init__(self, target=self._run)
self._all = all
self._topics = topics
self._regex = regex
self._limit = limit
self._master_check_interval = master_check_interval
self._bag = rosbag.Bag(filename, 'w')
self._bag_lock = bag_lock if bag_lock else threading.Lock()
self._listeners = []
self._subscriber_helpers = {}
self._limited_topics = set()
self._failed_topics = set()
self._last_update = time.time()
self._write_queue = Queue.Queue()
self._paused = False
self._stop_condition = threading.Condition()
self._stop_flag = False
self._stop_condition = threading.Condition()
self._stop_flag = False
# Compile regular expressions
if self._regex:
self._regexes = [re.compile(t) for t in self._topics]
else:
self._regexes = None
self.setDaemon(False)
self._message_count = {} # topic -> int (track number of messages recorded on each topic)
self._master_check_thread = threading.Thread(target=self._run_master_check)
self._write_thread = threading.Thread(target=self._run_write)
@property
def bag(self): return self._bag
@ -82,44 +108,66 @@ class Recorder(threading.Thread):
"""
self._listeners.append(listener)
def start(self):
"""
Start subscribing and recording messages to bag.
"""
self._master_check_thread.start()
self._write_thread.start()
@property
def paused(self): return self._paused
def pause(self): self._paused = True
def unpause(self): self._paused = False
def toggle_paused(self): self._paused = not self._paused
def stop(self):
"""
Stop recording.
"""
self._stop_condition.acquire()
self._stop_flag = True
self._stop_condition.notify()
self._stop_condition.release()
with self._stop_condition:
self._stop_flag = True
self._stop_condition.notify_all()
self._write_queue.put(self)
## Implementation
def _run(self):
def _run_master_check(self):
master = rosgraph.masterapi.Master('rxbag.recorder')
try:
while not self._stop_flag:
# Subscribe to any new topics
# Check for new topics
for topic, datatype in master.getPublishedTopics(''):
if topic not in self._subscriber_helpers and topic not in self._failed_topics:
print 'Found new topic: %s [%s]' % (topic, datatype)
# Check if:
# the topic is already subscribed to, or
# we've failed to subscribe to it already, or
# we've already reached the message limit, or
# we don't want to subscribe
if topic in self._subscriber_helpers or topic in self._failed_topics or topic in self._limited_topics or not self._should_subscribe_to(topic):
continue
try:
pytype = roslib.message.get_message_class(datatype)
try:
pytype = roslib.message.get_message_class(datatype)
self._subscriber_helpers[topic] = _SubscriberHelper(self, topic, pytype)
except Exception, ex:
print >> sys.stderr, 'Error subscribing to %s (ignoring): %s' % (topic, str(ex))
self._failed_topics.add(topic)
self._message_count[topic] = 0
self._subscriber_helpers[topic] = _SubscriberHelper(self, topic, pytype)
except Exception, ex:
print >> sys.stderr, 'Error subscribing to %s (ignoring): %s' % (topic, str(ex))
self._failed_topics.add(topic)
# Wait a while
self._stop_condition.acquire()
self._stop_condition.wait(self._master_check_interval)
except Exception, ex:
print >> sys.stderr, 'Error recording to bag: %s' % str(ex)
# Unsubscribe from all topics
for subscriber_helper in self._subscriber_helpers.values():
subscriber_helper.subscriber.unregister()
for topic in list(self._subscriber_helpers.keys()):
self._unsubscribe(topic)
# Close the bag file so that the index gets written
try:
@ -127,15 +175,60 @@ class Recorder(threading.Thread):
except Exception, ex:
print >> sys.stderr, 'Error closing bag [%s]: %s' % (self._bag.filename, str(ex))
def _record(self, topic, m):
# Write the message to the bag
t = roslib.rostime.Time.from_sec(time.time())
with self._bag_lock:
self._bag.write(topic, m, t)
def _should_subscribe_to(self, topic):
if self._all:
return True
if not self._regex:
return topic in self._topics
# Notify listeners that a message has been recorded
for listener in self._listeners:
listener(topic, m, t)
for regex in self._regexes:
if regex.match(topic):
return True
return False
def _unsubscribe(self, topic):
try:
self._subscriber_helpers[topic].subscriber.unregister()
except Exception:
return
del self._subscriber_helpers[topic]
def _record(self, topic, m):
if self._paused:
return
if self._limit and self._message_count[topic] >= self._limit:
self._limited_topics.add(topic)
self._unsubscribe(topic)
return
self._write_queue.put((topic, m, rospy.get_rostime()))
self._message_count[topic] += 1
def _run_write(self):
try:
while not self._stop_flag:
# Wait for a message
item = self._write_queue.get()
if item == self:
continue
topic, m, t = item
# Write to the bag
with self._bag_lock:
self._bag.write(topic, m, t)
# Notify listeners that a message has been recorded
for listener in self._listeners:
listener(topic, m, t)
except Exception, ex:
print >> sys.stderr, 'Error write to bag: %s' % str(ex)
class _SubscriberHelper(object):
def __init__(self, recorder, topic, pytype):