roslib: #1607 unit tests for fill_message_args, accompanied by big rewrite of fill_message_args

This commit is contained in:
Ken Conley 2009-10-02 21:00:26 +00:00
parent b80ae28587
commit 393db6dfff
4 changed files with 208 additions and 49 deletions

View File

@ -34,6 +34,7 @@
import cStringIO
import math
import itertools
import struct
import types
@ -338,6 +339,36 @@ def get_printable_message_args(msg, buff=None, prefix=''):
buff.write(prefix+f+' ')
return buff.getvalue().rstrip()
def _fill_val(msg, f, v, prefix):
if not f in msg.__slots__:
raise ROSMessageException("No field name [%s%s]"%(prefix, f))
def_val = getattr(msg, f)
if isinstance(def_val, Message) or isinstance(def_val, roslib.rostime._TVal):
_fill_message_args(def_val, v, prefix=(prefix+f+'.'))
elif type(def_val) == list:
if not type(v) == list:
raise ROSMessageException("Field [%s%s] must be a list instead of: %s"%(prefix, f, v))
# determine base_type of field by looking at _slot_types
idx = msg.__slots__.index(f)
t = msg._slot_types[idx]
base_type = roslib.msgs.base_msg_type(t)
# - for primitives, we just directly set (we don't
# type-check. we rely on serialization type checker)
if base_type in roslib.msgs.PRIMITIVE_TYPES:
setattr(msg, f, v)
# - for complex types, we have to iteratively append to def_val
else:
list_msg_class = get_message_class(base_type)
for el in v:
inner_msg = list_msg_class()
_fill_message_args(inner_msg, el, prefix)
def_val.append(inner_msg)
else:
#print "SET2", f, v
setattr(msg, f, v)
def _fill_message_args(msg, msg_args, prefix=''):
"""
Populate message with specified args.
@ -355,69 +386,44 @@ def _fill_message_args(msg, msg_args, prefix=''):
raise ROSMessageException("msg must be a Message instance: %s"%msg)
if type(msg_args) == dict:
#print "DICT ARGS", msg_args
#print "ACTIVE SLOTS",msg.__slots__
for f, v in msg_args.iteritems():
if not f in msg.__slots__:
raise ROSMessageException("No field name [%s%s]"%(prefix, f))
def_val = getattr(msg, f)
if isinstance(def_val, Message) or isinstance(def_val, roslib.rostime._TVal):
leftovers = _fill_message_args(def_val, v, prefix=(prefix+f+'.'))
if leftovers:
raise ROSMessageException("Too many arguments for field [%s%s]: %s"%(prefix, f, v))
elif type(def_val) == list:
if not type(v) == list:
raise ROSMessageException("Field [%s%s] must be a list instead of: %s"%(prefix, f, v))
idx = msg.__slots__.index(f)
t = msg._slot_types[idx]
base_type = roslib.msgs.base_msg_type(t)
if base_type in roslib.msgs.PRIMITIVE_TYPES:
setattr(msg, f, v)
else:
list_msg_class = get_message_class(base_type)
for el in v:
print "EL", el, list_msg_class
inner_msg = list_msg_class()
_fill_message_args(inner_msg, el, prefix)
def_val.append(inner_msg)
else:
setattr(msg, f, v)
_fill_val(msg, f, v, prefix)
elif type(msg_args) == list:
#print "LIST ARGS", msg_args
#print "ACTIVE SLOTS",msg.__slots__
for f in msg.__slots__:
if not msg_args:
raise ROSMessageException("Not enough arguments to fill message, trying to fill %s. Args are %s"%(f, str(msg_args)))
def_val = getattr(msg, f)
if isinstance(def_val, Message) or isinstance(def_val, roslib.rostime._TVal):
# if the next msg_arg is a dictionary, then we can assume the dictionary itself represents the message
if type(msg_args[0]) == dict:
next_ = msg_args[0]
msg_args = msg_args[1:]
_fill_message_args(def_val, next_, prefix=(prefix+f+'.'))
else:
msg_args = _fill_message_args(def_val, msg_args, prefix=(prefix+f+'.'))
else:
next_ = msg_args[0]
msg_args = msg_args[1:]
if type(next_) == dict:
raise ROSMessageException("received dictionary for non-message field[%s%s]: %s"%(prefix, f, next_))
else:
setattr(msg, f, next_)
return msg_args
if len(msg_args) > len(msg.__slots__):
raise ROSMessageException("Too many arguments for field [%s %s]: %s"%(prefix, msg, msg_args))
elif len(msg_args) < len(msg.__slots__):
raise ROSMessageException("Not enough arguments for field [%s %s]: %s"%(prefix, msg, msg_args))
for f, v in itertools.izip(msg.__slots__, msg_args):
_fill_val(msg, f, v, prefix)
else:
raise ROSMessageException("invalid message_args type: %s"%msg_args)
raise ROSMessageException("invalid message_args type: %s"%str(msg_args))
def fill_message_args(msg, msg_args):
"""
Populate message with specified args.
Populate message with specified args. Args are assumed to be a
list of arguments from a command-line YAML parser. See
http://www.ros.org/wiki/ROS/YAMLCommandLine for specification on
how messages are filled.
@param msg: message to fill
@type msg: Message
@param msg_args: list of arguments to set fields to
@type msg_args: [args]
@raise ROSMessageException: if not enough/too many message arguments to fill message
"""
leftovers = _fill_message_args(msg, msg_args, '')
if leftovers:
raise ROSMessageException("received too many arguments for message")
if len(msg_args) == 1 and type(msg_args[0]) == dict:
# according to spec, if we only get one msg_arg and it's a dictionary, we
# use it directly
_fill_message_args(msg, msg_args[0], '')
else:
_fill_message_args(msg, msg_args, '')

View File

@ -0,0 +1,5 @@
time t
duration d
std_msgs/String str_msg
std_msgs/String[] str_msg_array
int32 i32

View File

@ -0,0 +1,4 @@
int32 i32
string str
int32[] i32_array
bool b

View File

@ -261,6 +261,150 @@ d: 123000000456""", strify_message(M5(Time(987, 654), Duration(123, 456))))
self.assertEquals(roslib.msg.Header, get_message_class('roslib/Header'))
self.assertEquals(roslib.msg.Log, get_message_class('roslib/Log'))
def test_fill_message_args_embed_time(self):
from roslib.rostime import Time, Duration
from roslib.message import fill_message_args
from test_roslib.msg import FillEmbedTime
# test fill_message_args with embeds and time vals
# time t
# duration d
# std_msgs/String str_msg
# std_msgs/String[] str_msg_array
# int32 i32
tests = [
]
m = FillEmbedTime()
fill_message_args(m, [{}])
self.assertEquals(m.t, Time())
self.assertEquals(m.d, Duration())
self.assertEquals(m.str_msg.data, '')
self.assertEquals(m.str_msg_array, [])
self.assertEquals(m.i32, 0)
# list tests
# - these should be equivalent
equiv = [
[[10, 20], [30, 40], ['foo'], [['bar'], ['baz']], 32],
[{'secs': 10, 'nsecs': 20}, {'secs': 30, 'nsecs': 40}, ['foo'], [['bar'], ['baz']], 32],
[[10, 20], [30, 40], {'data': 'foo'}, [['bar'], ['baz']], 32],
[[10, 20], [30, 40], ['foo'], [{'data': 'bar'}, {'data': 'baz'}], 32],
[{'t': [10, 20], 'd': [30, 40], 'str_msg': {'data': 'foo'}, 'str_msg_array': [{'data': 'bar'}, {'data': 'baz'}], 'i32': 32}],
]
for test in equiv:
m = FillEmbedTime()
try:
fill_message_args(m, test)
except Exception, e:
import traceback
self.fail("failed to fill with : %s\n%s"%(str(test), traceback.format_exc()))
self.assertEquals(m.t, Time(10, 20))
self.assertEquals(m.d, Duration(30, 40))
self.assertEquals(m.str_msg.data, 'foo')
self.assertEquals(len(m.str_msg_array), 2, m.str_msg_array)
self.assertEquals(m.str_msg_array[0].data, 'bar')
self.assertEquals(m.str_msg_array[1].data, 'baz')
self.assertEquals(m.i32, 32)
bad = [
# underfill in sub-args
[[10, 20], [30, 40], ['foo'], [['bar'], ['baz']]],
[[10], [30, 40], ['foo'], [['bar'], ['baz']], 32],
[[10, 20], [30], ['foo'], [['bar'], ['baz']], 32],
[[10, 20], [30, 40], [], [['bar'], ['baz']], 32],
[[10, 20], [30, 40], ['foo'], [['bar'], []], 32],
# overfill
[[10, 20], [30, 40], ['foo'], [['bar'], ['baz']], 32, 64],
[[10, 20, 30], [30, 40], ['foo'], [['bar'], ['baz']], 32],
[[10, 20], [30, 40, 50], ['foo'], [['bar'], ['baz']], 32],
[[10, 20], [30, 40], ['foo', 'bar'], [['bar'], ['baz']], 32],
[[10, 20], [30, 40], ['foo'], [['bar', 'baz'], ['baz']], 32],
[[10, 20], [30, 40], ['foo'], [['bar'], ['baz', 'car']], 32],
# invalid fields
[{'secs': 10, 'nsecs': 20, 'foo': 1}, {'secs': 30, 'nsecs': 40}, ['foo'], [['bar'], ['baz']], 32],
[{'secs': 10, 'nsecs': 20}, {'secs': 30, 'nsecs': 40, 'foo': 1}, ['foo'], [['bar'], ['baz']], 32],
[[10, 20], [30, 40], {'data': 'foo', 'fata': 1}, [['bar'], ['baz']], 32],
[[10, 20], [30, 40], ['foo'], [{'data': 'bar'}, {'beta': 'baz'}], 32],
[{'t': [10, 20], 'd': [30, 40], 'str_msg': {'data': 'foo'}, 'str_msg_array': [{'data': 'bar'}, {'data': 'baz'}], 'i32': 32, 'i64': 64}],
]
for b in bad:
failed = True
try:
m = FillEmbedTime()
fill_message_args(m, b)
except roslib.message.ROSMessageException:
failed = False
self.failIf(failed, "fill_message_args should have failed: %s"%str(b))
def test_fill_message_args_simple(self):
from roslib.message import fill_message_args
from test_roslib.msg import FillSimple
#int32 i32
#string str
#int32[] i32_array
#bool b
simple_tests = [
[1, 'foo', [], True],
[1, 'foo', [1, 2, 3, 4], False],
]
for test in simple_tests:
m = FillSimple()
fill_message_args(m, test)
self.assertEquals(m.i32, test[0])
self.assertEquals(m.str, test[1])
self.assertEquals(m.i32_array, test[2])
self.assertEquals(m.b, test[3])
# test with dictionaries
m = FillSimple()
fill_message_args(m, [{}])
self.assertEquals(m.i32, 0)
self.assertEquals(m.str, '')
self.assertEquals(m.i32_array, [])
self.assertEquals(m.b, False)
m = FillSimple()
fill_message_args(m, [{'i32': 10}])
self.assertEquals(m.i32, 10)
self.assertEquals(m.str, '')
self.assertEquals(m.i32_array, [])
self.assertEquals(m.b, False)
m = FillSimple()
fill_message_args(m, [{'str': 'hello', 'i32_array': [1, 2, 3]}])
self.assertEquals(m.i32, 0)
self.assertEquals(m.str, 'hello')
self.assertEquals(m.i32_array, [1, 2, 3])
self.assertEquals(m.b, False)
# fill_message_args currently does not type check
bad = [
# extra key
[{'bad': 1, 'str': 'hello', 'i32_array': [1, 2, 3]}],
# underfill
[1, 'foo', [1, 2, 3]],
# overfill
[1, 'foo', [1, 2, 3], True, 1],
]
for b in bad:
failed = True
try:
m = FillSimple()
fill_message_args(m, b)
except roslib.message.ROSMessageException:
failed = False
self.failIf(failed, "fill_message_args should have failed: %s"%str(b))
def test_get_service_class(self):
from roslib.message import get_service_class