Python implementation, fist cut

This commit is contained in:
James Bowman 2009-10-09 16:34:09 +00:00
parent df13cd9f75
commit 91226c01c9
6 changed files with 495 additions and 0 deletions

View File

@ -0,0 +1,204 @@
# -*- coding: utf-8 -*-
#
# message_filters documentation build configuration file, created by
# sphinx-quickstart on Mon Jun 1 14:21:53 2009.
#
# This file is execfile()d with the current directory set to its containing dir.
#
# Note that not all possible configuration values are present in this
# autogenerated file.
#
# All configuration values have a default; values that are commented out
# serve to show the default.
import roslib
roslib.load_manifest('message_filters')
import sys, os
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#sys.path.append(os.path.abspath('.'))
# -- General configuration -----------------------------------------------------
# Add any Sphinx extension module names here, as strings. They can be extensions
# coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.doctest', 'sphinx.ext.intersphinx', 'sphinx.ext.pngmath']
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# The suffix of source filenames.
source_suffix = '.rst'
# The encoding of source files.
#source_encoding = 'utf-8'
# The master toctree document.
master_doc = 'index'
# General information about the project.
project = u'message_filters'
copyright = u'2009, Willow Garage, Inc.'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
version = '0.1'
# The full version, including alpha/beta/rc tags.
release = '0.1.0'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#language = None
# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
#today = ''
# Else, today_fmt is used as the format for a strftime call.
#today_fmt = '%B %d, %Y'
# List of documents that shouldn't be included in the build.
#unused_docs = []
# List of directories, relative to source directory, that shouldn't be searched
# for source files.
exclude_trees = ['_build']
# The reST default role (used for this markup: `text`) to use for all documents.
#default_role = None
# If true, '()' will be appended to :func: etc. cross-reference text.
#add_function_parentheses = True
# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
#add_module_names = True
# If true, sectionauthor and moduleauthor directives will be shown in the
# output. They are ignored by default.
#show_authors = False
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# A list of ignored prefixes for module index sorting.
#modindex_common_prefix = []
# -- Options for HTML output ---------------------------------------------------
# The theme to use for HTML and HTML Help pages. Major themes that come with
# Sphinx are currently 'default' and 'sphinxdoc'.
html_theme = 'default'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#html_theme_options = {}
# Add any paths that contain custom themes here, relative to this directory.
#html_theme_path = []
# The name for this set of Sphinx documents. If None, it defaults to
# "<project> v<release> documentation".
#html_title = None
# A shorter title for the navigation bar. Default is the same as html_title.
#html_short_title = None
# The name of an image file (relative to this directory) to place at the top
# of the sidebar.
#html_logo = None
# The name of an image file (within the static path) to use as favicon of the
# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
# pixels large.
#html_favicon = None
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
# If not '', a 'Last updated on:' timestamp is inserted at every page bottom,
# using the given strftime format.
#html_last_updated_fmt = '%b %d, %Y'
# If true, SmartyPants will be used to convert quotes and dashes to
# typographically correct entities.
#html_use_smartypants = True
# Custom sidebar templates, maps document names to template names.
#html_sidebars = {}
# Additional templates that should be rendered to pages, maps page names to
# template names.
#html_additional_pages = {}
# If false, no module index is generated.
#html_use_modindex = True
# If false, no index is generated.
#html_use_index = True
# If true, the index is split into individual pages for each letter.
#html_split_index = False
# If true, links to the reST sources are added to the pages.
#html_show_sourcelink = True
# If true, an OpenSearch description file will be output, and all pages will
# contain a <link> tag referring to it. The value of this option must be the
# base URL from which the finished HTML is served.
#html_use_opensearch = ''
# If nonempty, this is the file name suffix for HTML files (e.g. ".xhtml").
#html_file_suffix = ''
# Output file base name for HTML help builder.
htmlhelp_basename = 'tfdoc'
# -- Options for LaTeX output --------------------------------------------------
# The paper size ('letter' or 'a4').
#latex_paper_size = 'letter'
# The font size ('10pt', '11pt' or '12pt').
#latex_font_size = '10pt'
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title, author, documentclass [howto/manual]).
latex_documents = [
('index', 'message_filters.tex', u'stereo\\_utils Documentation',
u'James Bowman', 'manual'),
]
# The name of an image file (relative to this directory) to place at the top of
# the title page.
#latex_logo = None
# For "manual" documents, if this is true, then toplevel headings are parts,
# not chapters.
#latex_use_parts = False
# Additional stuff for the LaTeX preamble.
#latex_preamble = ''
# Documents to append as an appendix to all manuals.
#latex_appendices = []
# If false, no module index is generated.
#latex_use_modindex = True
# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {
'http://docs.python.org/': None,
'http://opencv.willowgarage.com/documentation/': None,
'http://opencv.willowgarage.com/documentation/python/': None,
'http://docs.scipy.org/doc/numpy' : None
}

View File

@ -0,0 +1,59 @@
Message Filters --- chained message processing
==============================================
:mod:`message_filters` is a collection of message "filters" which take messages in,
either from a ROS subscription or another filter,
and may or may not output the message
at some time in the future, depending on a policy defined for that filter.
message_filters also defines a common interface for these filters, allowing you to chain them together.
The filters currently implemented in this package are:
* :class:`message_filters.Subscriber` - A source filter, which wraps a ROS subscription. Most filter chains will begin with a Subscriber.
* :class:`message_filters.Cache` - Caches messages which pass through it, allowing later lookup by time stamp.
* :class:`message_filters.TimeSynchronizer` - Synchronizes multiple messages by their timestamps, only passing them through when all have arrived.
* :class:`message_filters.TimeSequencer` - Tries to pass messages through ordered by their timestamps, even if some arrive out of order.
Here's a simple example of using a Subscriber with a Cache::
def myCallback(posemsg):
print posemsg
sub = message_filters.Subscriber("pose_topic", robot_msgs.msg.Pose)
cache = message_filters.Cache(sub, 10)
cache.registerCallback(myCallback)
The Subscriber here acts as the source of messages. Each message is passed to the cache, which then passes it through to the
user's callback ``myCallback``.
The message filter interface
----------------------------
For an object to be usable as a message filter, it needs to have one method,
``registerCallback``. To collect messages from a message filter, register a callback with::
anyfilter.registerCallback(my_callback)
The signature of ``my_callback`` varies according to the message filter. For many filters it is simply::
def my_callback(msg):
where ``msg`` is the message.
Message filters that accept input from an upstream
message filter (e.g. :class:`message_filters.Cache`) register their own
message handler as a callback.
Output connections are registered through the ``registerCallback()`` function.
.. automodule:: message_filters
:members: Subscriber, Cache, TimeSynchronizer, TimeSequencer
:inherited-members:
Indices and tables
==================
* :ref:`genindex`
* :ref:`search`

View File

@ -4,8 +4,11 @@
<license>BSD</license>
<review status="API cleared" notes=""/>
<depend package="roscpp" />
<depend package="rospy" />
<depend package="rostest" />
<export>
<cpp cflags="-I${prefix}/include" lflags="-Wl,-rpath,${prefix}/lib -L${prefix}/lib -lmessage_filters `rosboost-cfg --lflags thread,signals`" />
<rosdoc config="rosdoc.yaml" />
</export>
<url>http://pr.willowgarage.com/wiki/message_filters</url>
</package>

View File

@ -0,0 +1,7 @@
- builder: sphinx
name: Python API
output_dir: python
- builder: doxygen
name: C++ API
output_dir: c++
file_patterns: '*.c *.cpp *.h *.cc *.hh *.dox'

View File

@ -0,0 +1,138 @@
# Copyright (c) 2009, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
Message Filter Objects
======================
"""
class SimpleFilter:
def __init__(self):
self.callbacks = {}
def registerCallback(self, cb, *args):
"""
Register a callback function `cb` to be called when this filter
has output.
The filter calls the function ``cb`` with a filter-dependent list of arguments,
followed by the call-supplied arguments ``args``.
"""
conn = len(self.callbacks)
self.callbacks[conn] = (cb, args)
return conn
def signalMessage(self, *msg):
for (cb, args) in self.callbacks.values():
cb(*(msg + args))
class Subscriber(SimpleFilter):
"""
ROS subscription filter. Identical arguments as :class:`rospy.Subscriber`.
This class acts as a highest-level filter, simply passing messages
from a ROS subscription through to the filters which have connected
to it.
"""
def __init__(self, *args):
SimpleFilter.__init__(self)
self.topic = args[0]
self.sub = rospy.Subscriber(*args, **{"callback" : self.callback})
def callback(self, msg):
self.signalMessage(msg)
def getTopic(self):
return self.topic
class Cache(SimpleFilter):
"""
Stores a time history of messages.
Given a stream of messages, the most recent ``cache_size`` messages
are cached in a ring buffer, from which time intervals of the cache
can then be retrieved by the client.
"""
def __init__(self, f, cache_size = 1):
SimpleFilter.__init__(self)
self.connectInput(f)
self.cache_size = cache_size
def connectInput(self, f):
self.incoming_connection = f.registerCallback(self.add)
def add(self, msg):
# Add msg to cache... XXX TODO
self.signalMessage(msg)
class TimeSynchronizer(SimpleFilter):
"""
Synchronizes messages by their timestamps.
:class:`TimeSynchronizer` synchronizes incoming message filters by the
timestamps contained in their messages' headers. TimeSynchronizer
listens on multiple input message filters ``fs``, and invokes the callback
when it has a collection of messages with matching timestamps.
The signature of the callback function is::
def callback(msg1, ... msgN):
where N is the number of input message filters, and each message is
the output of the corresponding filter in ``fs``.
The required ``queue size`` parameter specifies how many sets of
messages it should store from each input filter (by timestamp)
while waiting for messages to arrive and complete their "set".
"""
def __init__(self, fs, queue_size):
SimpleFilter.__init__(self)
self.connectInput(fs)
self.queue_size = queue_size
def connectInput(self, fs):
self.queues = [{} for f in fs]
self.input_connections = [f.registerCallback(self.add, q) for (f, q) in zip(fs, self.queues)]
def add(self, msg, my_queue):
my_queue[msg.header.stamp] = msg
while len(my_queue) > self.queue_size:
del my_queue[min(my_queue)]
# common is the set of timestamps that occur in all queues
common = reduce(set.intersection, [set(q) for q in self.queues])
for t in sorted(common):
# msgs is list of msgs (one from each queue) with stamp t
msgs = [q[t] for q in self.queues]
self.signalMessage(*msgs)
for q in self.queues:
del q[t]

View File

@ -0,0 +1,84 @@
# image_transport::SubscriberFilter wide_left; // "/wide_stereo/left/image_raw"
# image_transport::SubscriberFilter wide_right; // "/wide_stereo/right/image_raw"
# message_filters::Subscriber<CameraInfo> wide_left_info; // "/wide_stereo/left/camera_info"
# message_filters::Subscriber<CameraInfo> wide_right_info; // "/wide_stereo/right/camera_info"
# message_filters::TimeSynchronizer<Image, CameraInfo, Image, CameraInfo> wide;
#
# PersonDataRecorder() :
# wide_left(nh_, "/wide_stereo/left/image_raw", 10),
# wide_right(nh_, "/wide_stereo/right/image_raw", 10),
# wide_left_info(nh_, "/wide_stereo/left/camera_info", 10),
# wide_right_info(nh_, "/wide_stereo/right/camera_info", 10),
# wide(wide_left, wide_left_info, wide_right, wide_right_info, 4),
#
# wide.registerCallback(boost::bind(&PersonDataRecorder::wideCB, this, _1, _2, _3, _4));
import roslib
roslib.load_manifest('message_filters')
import rostest
import rospy
import random
import unittest
from message_filters import SimpleFilter, Subscriber, Cache, TimeSynchronizer
class MockHeader:
pass
class MockMessage:
def __init__(self, stamp, data):
self.header = MockHeader()
self.header.stamp = stamp
self.data = data
class MockFilter(SimpleFilter):
pass
class TestDirected(unittest.TestCase):
def cb_collector_2msg(self, msg1, msg2):
self.collector.append((msg1, msg2))
def test_synchronizer(self):
m0 = MockFilter()
m1 = MockFilter()
ts = TimeSynchronizer([m0, m1], 1)
ts.registerCallback(self.cb_collector_2msg)
if 0:
# Simple case, pairs of messages, make sure that they get combined
for t in range(10):
self.collector = []
msg0 = MockMessage(t, 33)
msg1 = MockMessage(t, 34)
m0.signalMessage(msg0)
self.assertEqual(self.collector, [])
m1.signalMessage(msg1)
self.assertEqual(self.collector, [(msg0, msg1)])
# Scramble sequences of length N. Make sure that TimeSequencer recombines them.
random.seed(0)
for N in range(1, 10):
m0 = MockFilter()
m1 = MockFilter()
seq0 = [MockMessage(t, random.random()) for t in range(N)]
seq1 = [MockMessage(t, random.random()) for t in range(N)]
# random.shuffle(seq0)
ts = TimeSynchronizer([m0, m1], N)
ts.registerCallback(self.cb_collector_2msg)
self.collector = []
for msg in random.sample(seq0, N):
m0.signalMessage(msg)
self.assertEqual(self.collector, [])
for msg in random.sample(seq1, N):
m1.signalMessage(msg)
self.assertEqual(set(self.collector), set(zip(seq0, seq1)))
if __name__ == '__main__':
if 0:
rostest.unitrun('message_filters', 'directed', TestDirected)
else:
suite = unittest.TestSuite()
suite.addTest(TestDirected('test_synchronizer'))
unittest.TextTestRunner(verbosity=2).run(suite)