rosbag: cleaning up Python unit tests
This commit is contained in:
parent
bc78523fae
commit
8f9a94879d
|
@ -19,8 +19,8 @@ class TestRosbag(unittest.TestCase):
|
|||
def setUp(self):
|
||||
pass
|
||||
|
||||
def test_opening_stream(self):
|
||||
f = open('test.bag', 'w')
|
||||
def test_opening_stream_works(self):
|
||||
f = open('/tmp/test_opening_stream_works.bag', 'w')
|
||||
b = rosbag.Bag(f, 'w')
|
||||
for i in range(10):
|
||||
msg = Int32()
|
||||
|
@ -28,46 +28,37 @@ class TestRosbag(unittest.TestCase):
|
|||
b.write('/int', msg)
|
||||
b.close()
|
||||
|
||||
f = open('test.bag', 'r')
|
||||
f = open('/tmp/test_opening_stream_works.bag', 'r')
|
||||
b = rosbag.Bag(f, 'r')
|
||||
self.assert_(len(list(b.readMessages())) == 10)
|
||||
b.close()
|
||||
|
||||
def test_invalid_bag_arguments_fails(self):
|
||||
f = '/tmp/test_invalid_bad_arguments_fails.bag'
|
||||
|
||||
def fn1(): rosbag.Bag('')
|
||||
def fn2(): rosbag.Bag(None)
|
||||
def fn3(): rosbag.Bag('test.bag', 'z')
|
||||
def fn4(): rosbag.Bag('test.bag', 'r', compression='foobar')
|
||||
def fn5(): rosbag.Bag('test.bag', 'r', chunk_threshold=-1000)
|
||||
def fn3(): rosbag.Bag(f, 'z')
|
||||
def fn4(): rosbag.Bag(f, 'r', compression='foobar')
|
||||
def fn5(): rosbag.Bag(f, 'r', chunk_threshold=-1000)
|
||||
for fn in [fn1, fn2, fn3, fn4, fn5]:
|
||||
self.failUnlessRaises(ValueError, fn)
|
||||
|
||||
def test_io_on_close_fails(self):
|
||||
def fn():
|
||||
b = rosbag.Bag('test.bag', 'w')
|
||||
b = rosbag.Bag('/tmp/test_io_close_fails.bag', 'w')
|
||||
b.close()
|
||||
size = b.size()
|
||||
self.failUnlessRaises(ValueError, fn)
|
||||
|
||||
def test_write_invalid_message_fails(self):
|
||||
def fn():
|
||||
b = rosbag.Bag('test.bag', 'w')
|
||||
b = rosbag.Bag('/tmp/test_write_invalid_message_fails.bag', 'w')
|
||||
b.write(None, None, None)
|
||||
self.failUnlessRaises(ValueError, fn)
|
||||
|
||||
def test_write_non_chronological_fails(self):
|
||||
def fn():
|
||||
b = rosbag.Bag('test.bag', 'w')
|
||||
for i in range(5, 0, -1):
|
||||
msg = Int32()
|
||||
msg.data = i
|
||||
b.write('/ints', msg, roslib.rostime.Time.from_sec(i))
|
||||
b.close()
|
||||
|
||||
self.failUnlessRaises(rosbag.ROSBagException, fn)
|
||||
|
||||
def test_simple_write_uncompressed(self):
|
||||
b = rosbag.Bag('test.bag', 'w')
|
||||
def test_simple_write_uncompressed_works(self):
|
||||
b = rosbag.Bag('/tmp/test_simple_write_uncompressed_works.bag', 'w')
|
||||
msg_count = 0
|
||||
for i in range(5, 0, -1):
|
||||
msg = Int32()
|
||||
|
@ -77,16 +68,16 @@ class TestRosbag(unittest.TestCase):
|
|||
msg_count += 1
|
||||
b.close()
|
||||
|
||||
msgs = list(rosbag.Bag('test.bag').readMessages())
|
||||
msgs = list(rosbag.Bag('/tmp/test_simple_write_uncompressed_works.bag').readMessages())
|
||||
|
||||
self.assert_(len(msgs) == msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
|
||||
|
||||
for (_, _, t1), (_, _, t2) in zip(msgs, msgs[1:]):
|
||||
self.assert_(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
|
||||
|
||||
def test_large_write(self):
|
||||
def test_large_write_works(self):
|
||||
for compression in [rosbag.Compression.NONE, rosbag.Compression.BZ2]:
|
||||
b = rosbag.Bag('large_write.bag', 'w', compression=compression)
|
||||
b = rosbag.Bag('/tmp/test_large_write_works.bag', 'w', compression=compression)
|
||||
msg_count = 0
|
||||
for i in range(10000):
|
||||
msg = Int32()
|
||||
|
@ -96,15 +87,15 @@ class TestRosbag(unittest.TestCase):
|
|||
msg_count += 1
|
||||
b.close()
|
||||
|
||||
msgs = list(rosbag.Bag('large_write.bag').readMessages())
|
||||
msgs = list(rosbag.Bag('/tmp/test_large_write_works.bag').readMessages())
|
||||
|
||||
self.assert_(len(msgs) == msg_count, 'not all messages written: expected %d, got %d' % (msg_count, len(msgs)))
|
||||
|
||||
for (_, _, t1), (_, _, t2) in zip(msgs, msgs[1:]):
|
||||
self.assert_(t1 < t2, 'messages returned unordered: got timestamp %s before %s' % (str(t1), str(t2)))
|
||||
|
||||
def test_get_messages_time_range(self):
|
||||
b = rosbag.Bag('timing.bag', 'w')
|
||||
def test_get_messages_time_range_works(self):
|
||||
b = rosbag.Bag('/tmp/test_get_messages_time_range_works.bag', 'w')
|
||||
for i in range(30):
|
||||
msg = Int32()
|
||||
msg.data = i
|
||||
|
@ -115,7 +106,7 @@ class TestRosbag(unittest.TestCase):
|
|||
start_time = roslib.rostime.Time.from_sec(3)
|
||||
end_time = roslib.rostime.Time.from_sec(7)
|
||||
|
||||
self.assert_(len(list(rosbag.Bag('timing.bag').readMessages(start_time=start_time, end_time=end_time))) == 5)
|
||||
self.assert_(len(list(rosbag.Bag('/tmp/test_get_messages_time_range_works.bag').readMessages(start_time=start_time, end_time=end_time))) == 5)
|
||||
|
||||
if __name__ == '__main__':
|
||||
import rostest
|
||||
|
|
Loading…
Reference in New Issue