to ros_comm

This commit is contained in:
Tully Foote 2010-10-26 21:12:25 +00:00
parent 7864cf084a
commit 44421dfa8b
30 changed files with 0 additions and 3369 deletions

View File

@ -1,9 +0,0 @@
cmake_minimum_required(VERSION 2.4.6)
include($ENV{ROS_ROOT}/core/rosbuild/rosbuild.cmake)
rosbuild_init()
execute_process(COMMAND rm -f ${PROJECT_SOURCE_DIR}/src/rostest/rostest.pyc)
rosbuild_add_gtest(test/test_permuter test/test_permuter.cpp)
rosbuild_add_rostest(test/hztest0.test)
rosbuild_add_rostest(test/hztest.test)

View File

@ -1,13 +0,0 @@
include $(shell rospack find mk)/cmake.mk
#OUT = ../../bin/rostest
#
#all: $(OUT)
#
#$(OUT): bin/rostest
# cp bin/rostest $(OUT)
#
#clean:
# -rm $(OUT)
# -rm src/rostest/*.pyc
#

View File

@ -1,63 +0,0 @@
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
## Catunit is a simple script that takes all the xml-formatted unit
## test output in ROS_ROOT/test/test_results and aggregates them
## into a single XML file
import roslib; roslib.load_manifest('rostest')
import sys
import os
import rostest.xmlresults as xmlr
def main():
print "[catunit]: STARTING"
if len(sys.argv) != 2:
print >> sys.stderr, "usage: catunit output-file"
sys.exit(1)
output_file = sys.argv[1]
print "[catunit]: writing aggregated test results to %s"%os.path.abspath(output_file)
result = xmlr.read_all()
f = open(output_file, 'w')
try:
f.write(result.xml().encode('utf-8'))
finally:
f.close()
print "[catunit]: FINISHED"
if __name__ == '__main__':
main()

View File

@ -1,98 +0,0 @@
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2009, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
"""
Cleanunit is a simple script that takes all the xml-formatted unit
test output in test_results and aggregates them into
test_results/_hudson. In this process, it strips any characters that
tend to cause Hudson trouble.
"""
from __future__ import with_statement
import roslib; roslib.load_manifest('rostest')
import sys
import os
import roslib.rosenv
import rostest.xmlresults as xmlr
def prepare_dirs(output_dir_name):
test_results_dir = roslib.rosenv.get_test_results_dir()
print "will read test results from", test_results_dir
output_dir = os.path.join(test_results_dir, output_dir_name)
if not os.path.exists(output_dir):
print "creating directory", output_dir
os.makedirs(output_dir)
return test_results_dir, output_dir
## read results from \a test_results_dir and write them into \a output_dir
def clean_results(test_results_dir, output_dir, filter):
for d in os.listdir(test_results_dir):
if filter and d in filter:
continue
print "looking at", d
test_dir = os.path.join(test_results_dir, d)
if not os.path.isdir(test_dir):
continue
base_test_name = os.path.basename(test_dir)
# for each test result that a package generated, read it, then
# rewrite it to our output directory. This will invoke our
# cleaning rules on the XML that protect the result from Hudson
# issues.
for file in os.listdir(test_dir):
if file.endswith('.xml'):
test_name = base_test_name + '.' + file[:-4]
file = os.path.join(test_dir, file)
try:
result = xmlr.read(file, test_name)
output_path = os.path.join(output_dir, "%s.xml"%test_name)
with open(output_path, 'w') as f:
print "re-writing", output_path
f.write(result.xml().encode('utf-8'))
except Exception, e:
print >> sys.stderr, "ignoring [%s]: %s"%(file, e)
def main():
print "[cleanunit]: STARTING"
output_dir_name = '_hudson'
test_results_dir, output_dir = prepare_dirs(output_dir_name)
print "[cleanunit]: writing aggregated test results to %s"%output_dir
clean_results(test_results_dir, output_dir, [output_dir_name, '.svn'])
print "[cleanunit]: FINISHED"
if __name__ == '__main__':
main()

View File

@ -1,86 +0,0 @@
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
"""
Generate HTML reports from coverage.py (aka python-coverage). This is
currently a no-frills backend tool.
"""
from __future__ import with_statement
import sys
import roslib
try:
import coverage
except ImportError, e:
print >> sys.stderr, "ERROR: cannot import python-coverage, coverage report will not run.\nTo install coverage, run 'easy_install coverage'"
sys.exit(1)
def coverage_html():
import os.path
if not os.path.isfile('.coverage-modules'):
print >> sys.stderr, "No .coverage-modules file; nothing to do"
return
with open('.coverage-modules','r') as f:
modules = [x for x in f.read().split('\n') if x.strip()]
cov = coverage.coverage()
cov.load()
# import everything
for m in modules:
try:
base = m.split('.')[0]
roslib.load_manifest(base)
__import__(m)
except:
print >> sys.stderr, "WARN: cannot import %s"%base
print "Generating for\n"+'\n'.join([" * %s"%m for m in modules])
# load the module instances to pass to coverage so it can generate annotation html reports
mods = []
# TODO: rewrite, buggy
for m in modules:
mods.extend([v for v in sys.modules.values() if v and v.__name__.startswith(m) and not v in mods])
# dump the output to covhtml directory
cov.html_report(mods, directory="covhtml")
if __name__ == '__main__':
coverage_html()

View File

@ -1,115 +0,0 @@
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2009, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
from __future__ import with_statement
import roslib; roslib.load_manifest('rostest')
import os
import sys
import roslib.packages
import roslaunch.rlutil
def usage():
print >> sys.stderr, """Usage:
\troslaunch-check <file|directory> [env=value...]
"""
print sys.argv
sys.exit(os.EX_USAGE)
def check_roslaunch_file(roslaunch_file):
print "checking", roslaunch_file
error_msg = roslaunch.rlutil.check_roslaunch(roslaunch_file)
# error message has to be XML attr safe
if error_msg:
return "[%s]:\n\t%s"%(roslaunch_file,error_msg)
def check_roslaunch_dir(roslaunch_dir):
error_msgs = []
for f in os.listdir(roslaunch_dir):
if f.endswith('.launch'):
roslaunch_file = os.path.join(roslaunch_dir, f)
if os.path.isfile(roslaunch_file):
error_msgs.append(check_roslaunch_file(roslaunch_file))
# error message has to be XML attr safe
return '\n'.join([e for e in error_msgs if e])
## run check and output test result file
if __name__ == '__main__':
if len(sys.argv) < 2:
usage()
roslaunch_path = sys.argv[1]
# #2590: implementing this quick and dirty as this script should only be used by higher level tools
env_vars = sys.argv[2:]
for e in env_vars:
var, val = e.split('=')
os.environ[var] = val
pkg_dir, pkg = roslib.packages.get_dir_pkg(roslaunch_path)
if os.path.isfile(roslaunch_path):
error_msg = check_roslaunch_file(roslaunch_path)
outname = os.path.basename(roslaunch_path).replace('.', '_')
else:
print "checking *.launch in directory", roslaunch_path
error_msg = check_roslaunch_dir(roslaunch_path)
abspath = os.path.abspath
relpath = abspath(roslaunch_path)[len(abspath(pkg_dir))+1:]
outname = relpath.replace(os.sep, '_')
if outname == '.':
outname = '_pkg'
import rostest.rostestutil
test_file = rostest.rostestutil.xmlResultsFile(pkg, "roslaunch_check_"+outname, is_rostest=False)
print "...writing test results to", test_file
test_name = roslaunch_path
if error_msg:
print>> sys.stderr, "FAILURE:\n%s"%error_msg
if not os.path.exists(os.path.dirname(test_file)):
os.makedirs(os.path.dirname(test_file))
with open(test_file, 'w') as f:
message = "roslaunch check [%s] failed"%(roslaunch_path)
f.write(rostest.rostestutil.test_failure_junit_xml(test_name, message, stdout=error_msg))
f.close()
print "wrote test file to [%s]"%test_file
sys.exit(1)
else:
print "passed"
with open(test_file, 'w') as f:
f.write(rostest.rostestutil.test_success_junit_xml(test_name))

View File

@ -1,39 +0,0 @@
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
# $Author$
import roslib; roslib.load_manifest('rostest')
from rostest.rostest import rostestmain
rostestmain()

View File

@ -1,54 +0,0 @@
#!/usr/bin/env python
import os
import sys
def usage():
print >> sys.stderr, """Usage:
\trostest-check-results test-file.xml
or
\trostest-check-results --rostest pkg-name test-file.xml
"""
print sys.argv
sys.exit(os.EX_USAGE)
## writes a test failure out to test file if it doesn't exist
if __name__ == '__main__':
if len(sys.argv) < 2:
usage()
if '--rostest' in sys.argv[1:]:
if len(sys.argv) != 4:
usage()
test_pkg, test_file = [a for a in sys.argv[1:] if a != '--rostest']
# this logic derives the output filename that rostest uses
import roslib; roslib.load_manifest('rostest')
import roslib.packages
import rostest.rostestutil
pkg_dir, _ = roslib.packages.get_dir_pkg(test_file)
outname = rostest.rostestutil.rostest_name_from_path(pkg_dir, test_file)
test_file = rostest.rostestutil.xmlResultsFile(test_pkg, outname, is_rostest=True)
else:
if len(sys.argv) != 2:
usage()
test_file = sys.argv[1]
print "Checking for test results in %s"%test_file
if not os.path.exists(test_file):
if not os.path.exists(os.path.dirname(test_file)):
os.makedirs(os.path.dirname(test_file))
print "Cannot find results, writing failure results to", test_file
f = open(test_file, 'w')
try:
test_name = os.path.basename(test_file)
d = {'test': test_name, 'test_file': test_file }
f.write("""<?xml version="1.0" encoding="UTF-8"?>
<testsuite tests="1" failures="1" time="1" errors="0" name="%(test)s">
<testcase name="test_ran" status="run" time="1" classname="Results">
<failure message="Unable to find test results for %(test)s, test did not run.\nExpected results in %(test_file)s" type=""/>
</testcase>
</testsuite>"""%d)
finally:
f.close()

View File

@ -1,106 +0,0 @@
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
## script that prints summary of aggregated test results to console
import roslib; roslib.load_manifest('rostest')
import sys
import cStringIO
import os
import roslib.rospack
import rostest.xmlresults as xmlr
def create_summary(result, packages):
buff = cStringIO.StringIO()
buff.write('-'*80+'\n')
buff.write('\033[1m[AGGREGATED TEST RESULTS SUMMARY]\033[0m\n\n')
errors_failures = [r for r in result.test_case_results if r.errors or r.failures]
if errors_failures:
buff.write('ERRORS/FAILURES:\n')
for tc_result in errors_failures:
buff.write(tc_result.description)
buff.write("PACKAGES: \n%s\n\n"%'\n'.join([" * %s"%p for p in packages]))
buff.write('\nSUMMARY\n')
if (result.num_errors + result.num_failures) == 0:
buff.write("\033[32m * RESULT: SUCCESS\033[0m\n")
else:
buff.write("\033[1;31m * RESULT: FAIL\033[0m\n")
# TODO: still some issues with the numbers adding up if tests fail to launch
# number of errors from the inner tests, plus add in count for tests
# that didn't run properly ('result' object).
buff.write(" * TESTS: %s\n"%result.num_tests)
if result.num_errors:
buff.write("\033[1;31m * ERRORS: %s\033[0m\n"%result.num_errors)
else:
buff.write(" * ERRORS: 0\n")
if result.num_failures:
buff.write("\033[1;31m * FAILURES: %s\033[0m\n"%result.num_failures)
else:
buff.write(" * FAILURES: 0\n")
return buff.getvalue()
def main():
from optparse import OptionParser
parser = OptionParser(usage="usage: rostest-results [options] package")
parser.add_option("--nodeps",
dest="no_deps", default=False,
action="store_true",
help="don't compute test results for the specified package only")
(options, args) = parser.parse_args()
if len(args) != 1:
parser.error("Only one package may be specified")
package = args[0]
if options.no_deps:
packages = [package]
else:
packages = [package] + roslib.rospack.rospackexec(['depends-on', package]).split('\n')
packages = [p for p in packages if p]
result = xmlr.read_all(packages)
print create_summary(result, packages)
if result.num_errors or result.num_failures:
sys.exit(1)
if __name__ == '__main__':
main()

View File

@ -1,39 +0,0 @@
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2009, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
"""test-results-dir simply prints the directory that test_results are stored in"""
import roslib.rosenv
print roslib.rosenv.get_test_results_dir()

View File

@ -1,7 +0,0 @@
[epydoc]
name: rostest
modules: rostest
inheritance: included
url: http://ros.org/wiki/rostest
frames: no
private: no

View File

@ -1,25 +0,0 @@
<launch>
<node pkg="rospy_demo" type="talker" />
<!--
Tests are nodes that implement one of the xUnit-style test cases
(e.g. googletest, unittest). They must output their results in the
xUnit-style XML format.
You define tests just like a node except you must also specify a
test-name and you can't use a respawn attribute
A roslaunch file may contain multiple tests. They will be run in
the order you specify. Between each test, the entire roslaunch
will be restarted, including the master (your master 'auto'
setting will be overriden during rostest).
-->
<test test-name="listener_test" pkg="test_rospy" type="listener_test" />
<!-- tests are run in the order you specify -->
<!-- test test-name="listener_test2" pkg="test_rospy" type="listener_test" / -->
</launch>

View File

@ -1,171 +0,0 @@
/*
* Copyright (c) 2009, Willow Garage, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the Willow Garage, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/** \author Tully Foote */
#ifndef ROSTEST_PERMUTER_H
#define ROSTEST_PERMUTER_H
#include <vector>
#include "boost/thread/mutex.hpp"
namespace rostest
{
/** \brief A base class for storing pointers to generic data types
*/
class PermuteOptionBase
{
public:
virtual void reset() =0;
virtual bool step() =0;
virtual ~PermuteOptionBase() {};
};
/**\brief A class to hold a set of option values and currently used state
* This class holds
*/
template<class T>
class PermuteOption : public PermuteOptionBase
{
public:
PermuteOption(const std::vector<T>& options, T* output)
{
options_ = options;
output_ = output;
reset();
}
virtual ~PermuteOption(){};
void reset(){
boost::mutex::scoped_lock lock(access_mutex_);
current_element_ = options_.begin();
*output_ = *current_element_;
};
bool step()
{
boost::mutex::scoped_lock lock(access_mutex_);
current_element_++;
if (current_element_ == options_.end())
return false;
*output_ = *current_element_;
return true;
};
private:
/// Local storage of the possible values
std::vector<T> options_;
/// The output variable
T* output_;
typedef typename std::vector<T>::iterator V_T_iterator;
/// The last updated element
V_T_iterator current_element_;
boost::mutex access_mutex_;
};
/** \brief A class to provide easy permutation of options
* This class provides a way to collapse independent
* permutations of options into a single loop.
*/
class Permuter
{
public:
/** \brief Destructor to clean up allocated data */
virtual ~Permuter(){ clearAll();};
/** \brief Add a set of values and an output to the iteration
* @param values The set of possible values for this output
* @param output The value to set at each iteration
*/
template<class T>
void addOptionSet(const std::vector<T>& values, T* output)
{
boost::mutex::scoped_lock lock(access_mutex_);
options_.push_back(static_cast<PermuteOptionBase*> (new PermuteOption<T>(values, output)));
lock.unlock();//reset locks on its own
reset();
};
/** \brief Reset the internal counters */
void reset(){
boost::mutex::scoped_lock lock(access_mutex_);
for (unsigned int level= 0; level < options_.size(); level++)
options_[level]->reset();
};
/** \brief Iterate to the next value in the iteration
* Returns true unless done iterating.
*/
bool step()
{
boost::mutex::scoped_lock lock(access_mutex_);
// base case just iterating
for (unsigned int level= 0; level < options_.size(); level++)
{
if(options_[level]->step())
{
//printf("stepping level %d returning true \n", level);
return true;
}
else
{
//printf("reseting level %d\n", level);
options_[level]->reset();
}
}
return false;
};
/** \brief Clear all stored data */
void clearAll()
{
boost::mutex::scoped_lock lock(access_mutex_);
for ( unsigned int i = 0 ; i < options_.size(); i++)
{
delete options_[i];
}
options_.clear();
};
private:
std::vector<PermuteOptionBase*> options_; ///< Store all the option objects
boost::mutex access_mutex_;
};
}
#endif //ROSTEST_PERMUTER_H

View File

@ -1,96 +0,0 @@
/**
\mainpage
\htmlinclude manifest.html
\b rostest consists of a tool for writing integration tests as well as a Python
library that assists in the writing of ROS-based unit tests.
Within ROS, we define three levels of testing:
- Library unit test
- ROS node unit test
- ROS node integration test
rostest is mainly targetted at ROS-node-level tests, though it
provides support for all three. At the library unit test level,
rostest provides a simple hook for generating XML output from Python
unittests. At the ROS-node-level, rostest defines a framework for
launching collections of nodes and making assertions against them.
This rostest framework enables testing of ROS nodes at the message
I/O interface. In unit testing parlance, rostest uses roslaunch files
to define test fixtures. You can think of rostest as a unit test where
the setup and teardown are implemented via roslaunch. During setup,
rostest will set all parameters and launch all nodes in the
roslaunch. During teardown, roslaunch will terminate all nodes and
restart the master/parameter server.
The main features that rostest provides are:
- easy to setup/teardown test fixtures (i.e. nodes, master, parameter server state) using roslaunch
- Python: enable easy unit test writing, including ROS-state assertsions
- aggregate test XML result files for continuous integration servers
rostest is still under development. Some proposed features include:
- manipulate core-level ROS infrastructure like /time
- hooks into /rosout logging
\section codeapi Code API
rostest exports a small client API containing functions for (1)
running unit tests and (2) making assertions against the ROS system
state.
- \ref clientapi
\section rosapi ROS API
rostest exports no ROS API other than those used internally by roslaunch.
\section commandline Command-line tools
rostest has three command-line tools: \b rostest, \b rostest-summary,
and \b catunit. Only \b rostest should be of particular interest to
most users as the rest will be used indirectly..
\subsection rostest rostest
rostest runs a rostest XML file as a unit test. By default, it will
output its XML results in ROS_ROOT/test/test_results/package_name/.
You may also pass it a --text option, in which case it will generate
all of its output to the screen instead.
The ROS CMake macros come with commands that automate the usage of
rostest.
\subsubsection Usage
\verbatim
$ rostest xml-file [--text]
\endverbatim
\par Example
\verbatim
$ rostest test/test-foo.xml
\endverbatim
\subsection catunit catunit
\b catunit concatenates JUnit-style XML result files in ROS_ROOT/test/test_results into a single XML file.
This is designed mainly for usage by continuous integration servers.
\subsubsection Usage
\verbatim
$ ./bin/catunit output-file
\endverbatim
\par Example
\verbatim
$ bin/catunit test-results.xml
\endverbatim
*/

View File

@ -1,29 +0,0 @@
<package>
<description brief="ROS Test Framework">
<p>
Integration test suite based on roslaunch that is compatible with xUnit frameworks.
</p>
<p>
rostest uses xmlrunner.py (Public Domain), which was written by
Sebastian Rittau (srittau@jroger.in-berlin.de) and Paolo Borelli.
</p>
</description>
<author>Ken Conley/kwc@willowgarage.com</author>
<license>BSD</license>
<export>
<cpp cflags="-I${prefix}/include"/>
<rosdoc config="rosdoc.yaml" />
</export>
<review status="Doc reviewed" notes="2010/01/12"/>
<url>http://ros.org/wiki/rostest</url>
<depend package="roslaunch"/>
<depend package="roslib"/>
<platform os="ubuntu" version="9.04"/>
<platform os="ubuntu" version="9.10"/>
<platform os="ubuntu" version="10.04"/>
<platform os="macports" version="macports"/>
</package>

View File

@ -1,238 +0,0 @@
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
## Integration test node that subscribes to any topic and verifies
## the publishing rate to be within a specified bounds. The following
## parameters must be set:
##
## * ~/hz: expected hz
## * ~/hzerror: errors bound for hz
## * ~/test_duration: time (in secs) to run test
##
import roslib; roslib.load_manifest('rostest')
import sys
import threading
import time
import unittest
import rospy
import rostest
NAME = 'hztest'
from threading import Thread
class HzTest(unittest.TestCase):
def __init__(self, *args):
super(HzTest, self).__init__(*args)
rospy.init_node(NAME)
self.lock = threading.Lock()
self.message_received = False
def setUp(self):
self.errors = []
# Count of all messages received
self.msg_count = 0
# Time of first message received
self.msg_t0 = -1.0
# Time of last message received
self.msg_tn = -1.0
## performs two tests of a node, first with /rostime off, then with /rostime on
def test_hz(self):
# Fetch parameters
try:
# expected publishing rate
hz = float(rospy.get_param('~hz'))
# length of test
test_duration = float(rospy.get_param('~test_duration'))
# topic to test
topic = rospy.get_param('~topic')
# time to wait before
wait_time = rospy.get_param('~wait_time', 20.)
except KeyError, e:
self.fail('hztest not initialized properly. Parameter [%s] not set. debug[%s] debug[%s]'%(str(e), rospy.get_caller_id(), rospy.resolve_name(e.args[0])))
# We only require hzerror if hz is non-zero
hzerror = 0.0
if hz != 0.0:
try:
# margin of error allowed
hzerror = float(rospy.get_param('~hzerror'))
except KeyError, e:
self.fail('hztest not initialized properly. Parameter [%s] not set. debug[%s] debug[%s]'%(str(e), rospy.get_caller_id(), rospy.resolve_name(e.args[0])))
# We optionally check each inter-message interval
try:
self.check_intervals = bool(rospy.get_param('~check_intervals'))
except KeyError, e:
self.check_intervals = False
print """Hz: %s
Hz Error: %s
Topic: %s
Test Duration: %s"""%(hz, hzerror, topic, test_duration)
self._test_hz(hz, hzerror, topic, test_duration, wait_time)
def _test_hz(self, hz, hzerror, topic, test_duration, wait_time):
self.assert_(hz >= 0.0, "bad parameter (hz)")
self.assert_(hzerror >= 0.0, "bad parameter (hzerror)")
self.assert_(test_duration > 0.0, "bad parameter (test_duration)")
self.assert_(len(topic), "bad parameter (topic")
if hz == 0:
self.min_rate = 0.0
self.max_rate = 0.0
self.min_interval = 0.0
self.max_interval = 0.0
else:
self.min_rate = hz - hzerror
self.max_rate = hz + hzerror
self.min_interval = 1.0 / self.max_rate
if self.min_rate <= 0.0:
self.max_interval = 0.0
else:
self.max_interval = 1.0 / self.min_rate
# Start actual test
sub = rospy.Subscriber(topic, rospy.AnyMsg, self.callback)
self.assert_(not self.errors, "bad initialization state (errors)")
print "Waiting for messages"
# we have to wait until the first message is received before measuring the rate
# as time can advance too much before publisher is up
# - give the test 20 seconds to start, may parameterize this in the future
wallclock_timeout_t = time.time() + wait_time
while not self.message_received and time.time() < wallclock_timeout_t:
time.sleep(0.1)
if hz > 0.:
self.assert_(self.message_received, "no messages before timeout")
else:
self.failIf(self.message_received, "message received")
print "Starting rate measurement"
timeout_t = rospy.get_time() + test_duration
while rospy.get_time() < timeout_t:
rospy.sleep(0.1)
print "Done waiting, validating results"
sub.unregister()
# Check that we got at least one message
if hz > 0:
self.assert_(self.msg_count > 0, "no messages received")
else:
self.assertEquals(0, self.msg_count)
# Check whether inter-message intervals were violated (if we were
# checking them)
self.assert_(not self.errors, '\n'.join(self.errors))
# If we have a non-zero rate target, make sure that we hit it on
# average
if hz > 0.0:
self.assert_(self.msg_t0 >= 0.0, "no first message received")
self.assert_(self.msg_tn >= 0.0, "no last message received")
dt = self.msg_tn - self.msg_t0
self.assert_(dt > 0.0, "only one message received")
rate = ( self.msg_count - 1) / dt
self.assert_(rate >= self.min_rate,
"average rate (%.3fHz) exceeded minimum (%.3fHz)" %
(rate, self.min_rate))
self.assert_(rate <= self.max_rate,
"average rate (%.3fHz) exceeded maximum (%.3fHz)" %
(rate, self.max_rate))
def callback(self, msg):
# flag that message has been received
self.message_received = True
try:
self.lock.acquire()
curr_rostime = rospy.get_rostime()
#print "CURR ROSTIME", curr_rostime.to_sec()
if curr_rostime.is_zero():
return
curr = curr_rostime.to_sec()
if self.msg_t0 <= 0.0 or self.msg_t0 > curr:
self.msg_t0 = curr
self.msg_count = 1
last = 0
else:
self.msg_count += 1
last = self.msg_tn
self.msg_tn = curr
# If we're instructed to check each inter-message interval, do
# so
if self.check_intervals and last > 0:
interval = curr - last
if interval < self.min_interval:
print >> sys.stderr, "CURR", curr
print >> sys.stderr, "LAST", last
print >> sys.stderr, "msg_count", self.msg_count
print >> sys.stderr, "msg_tn", self.msg_tn
self.errors.append(
'min_interval exceeded: %s [actual] vs. %s [min]'%\
(interval, self.min_interval))
# If max_interval is <= 0.0, then we have no max
elif self.max_interval > 0.0 and interval > self.max_interval:
self.errors.append(
'max_interval exceeded: %s [actual] vs. %s [max]'%\
(interval, self.max_interval))
finally:
self.lock.release()
if __name__ == '__main__':
# A dirty hack to work around an apparent race condition at startup
# that causes some hztests to fail. Most evident in the tests of
# rosstage.
time.sleep(0.75)
try:
rostest.run('rostest', NAME, HzTest, sys.argv)
except KeyboardInterrupt, e:
pass
print "exiting"

View File

@ -1,2 +0,0 @@
- builder: epydoc
config: epydoc.config

View File

@ -1,254 +0,0 @@
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
from __future__ import with_statement
"""
Interface for using rostest from other Python code as well as running
Python unittests with additional reporting mechanisms and rosbuild
(CMake) integration.
"""
import sys
XML_OUTPUT_FLAG='--gtest_output=xml:' #use gtest-compatible flag
def is_subscriber(topic, subscriber_id):
"""
Predicate to check whether or not master think subscriber_id
subscribes to topic.
@return: True if still register as a subscriber
@rtype: bool
"""
import roslib.scriptutil as scriptutil
return scriptutil.is_subscriber(topic, subscriber_id)
def is_publisher(topic, publisher_id):
"""
Predicate to check whether or not master think publisher_id
publishes topic.
@return: True if still register as a publisher
@rtype: bool
"""
import roslib.scriptutil as scriptutil
return scriptutil.is_publisher(topic, publisher_id)
def rosrun(package, test_name, test, sysargs=None):
"""
Run a rostest/unittest-based integration test.
@param package: name of package that test is in
@type package: str
@param test_name: name of test that is being run
@type test_name: str
@param test: test class
@type test: unittest.TestCase
@param sysargs: command-line args. If not specified, this defaults to sys.argv. rostest
will look for the --text and --gtest_output parameters
@type sysargs: list
"""
if sysargs is None:
# lazy-init sys args
import sys
sysargs = sys.argv
#parse sysargs
result_file = None
for arg in sysargs:
if arg.startswith(XML_OUTPUT_FLAG):
result_file = arg[len(XML_OUTPUT_FLAG):]
text_mode = '--text' in sysargs
coverage_mode = '--cov' in sysargs
if coverage_mode:
_start_coverage(package)
# lazy-import so that these don't affect coverage tests
from rostestutil import createXMLRunner, printSummary
import unittest
import rospy
suite = unittest.TestLoader().loadTestsFromTestCase(test)
if text_mode:
result = unittest.TextTestRunner(verbosity=2).run(suite)
else:
result = createXMLRunner(package, test_name, result_file).run(suite)
if coverage_mode:
_stop_coverage(package)
printSummary(result)
# shutdown any node resources in case test forgets to
rospy.signal_shutdown('test complete')
if not result.wasSuccessful():
import sys
sys.exit(1)
# TODO: rename to rosrun -- migrating name to avoid confusion and enable easy xmlrunner use
run = rosrun
def unitrun(package, test_name, test, sysargs=None, coverage_packages=None):
"""
Wrapper routine from running python unitttests with
JUnit-compatible XML output. This is meant for unittests that do
not not need a running ROS graph (i.e. offline tests only).
This enables JUnit-compatible test reporting so that
test results can be reported to higher-level tools.
@param package: name of ROS package that is running the test
@type package: str
@param coverage_packages: list of Python package to compute coverage results for. Defaults to package
@type coverage_packages: [str]
"""
if sysargs is None:
# lazy-init sys args
import sys
sysargs = sys.argv
import unittest
if coverage_packages is None:
coverage_packages = [package]
#parse sysargs
result_file = None
for arg in sysargs:
if arg.startswith(XML_OUTPUT_FLAG):
result_file = arg[len(XML_OUTPUT_FLAG):]
text_mode = '--text' in sysargs
coverage_mode = '--cov' in sysargs or '--covhtml' in sysargs
if coverage_mode:
_start_coverage(coverage_packages)
# lazy-import after coverage tests begin
from rostestutil import createXMLRunner, printSummary
suite = unittest.TestLoader().loadTestsFromTestCase(test)
if text_mode:
result = unittest.TextTestRunner(verbosity=2).run(suite)
else:
result = createXMLRunner(package, test_name, result_file).run(suite)
if coverage_mode:
cov_html_dir = 'covhtml' if '--covhtml' in sysargs else None
_stop_coverage(coverage_packages, html=cov_html_dir)
printSummary(result)
if not result.wasSuccessful():
import sys
sys.exit(1)
# coverage instance
_cov = None
def _start_coverage(packages):
global _cov
try:
import coverage
try:
_cov = coverage.coverage()
# load previous results as we need to accumulate
_cov.load()
_cov.start()
except coverage.CoverageException:
print >> sys.stderr, "WARNING: you have an older version of python-coverage that is not support. Please update to the version provided by 'easy_install coverage'"
except ImportError, e:
print >> sys.stderr, """WARNING: cannot import python-coverage, coverage tests will not run.
To install coverage, run 'easy_install coverage'"""
try:
# reload the module to get coverage
for package in packages:
if package in sys.modules:
reload(sys.modules[package])
except ImportError, e:
print >> sys.stderr, "WARNING: cannot import '%s', will not generate coverage report"%package
return
def _stop_coverage(packages, html=None):
"""
@param packages: list of packages to generate coverage reports for
@type packages: [str]
@param html: (optional) if not None, directory to generate html report to
@type html: str
"""
if _cov is None:
return
import sys, os
try:
_cov.stop()
# accumulate results
_cov.save()
# - update our own .coverage-modules file list for
# coverage-html tool. The reason we read and rewrite instead
# of append is that this does a uniqueness check to keep the
# file from growing unbounded
if os.path.exists('.coverage-modules'):
with open('.coverage-modules','r') as f:
all_packages = set([x for x in f.read().split('\n') if x.strip()] + packages)
else:
all_packages = set(packages)
with open('.coverage-modules','w') as f:
f.write('\n'.join(all_packages)+'\n')
try:
# list of all modules for html report
all_mods = []
# iterate over packages to generate per-package console reports
for package in packages:
pkg = __import__(package)
m = [v for v in sys.modules.values() if v and v.__name__.startswith(package)]
all_mods.extend(m)
# generate overall report and per module analysis
_cov.report(m, show_missing=0)
for mod in m:
res = _cov.analysis(mod)
print "\n%s:\nMissing lines: %s"%(res[0], res[3])
if html:
print "="*80+"\ngenerating html coverage report to %s\n"%html+"="*80
_cov.html_report(all_mods, directory=html)
except ImportError, e:
print >> sys.stderr, "WARNING: cannot import '%s', will not generate coverage report"%package
except ImportError, e:
print >> sys.stderr, """WARNING: cannot import python-coverage, coverage tests will not run.
To install coverage, run 'easy_install coverage'"""
#502: backwards compatibility for unbuilt rostest packages
def rostestmain():
#NOTE: this is importing from rostest.rostest
from rostest.rostest_main import rostestmain as _main
_main()

View File

@ -1,161 +0,0 @@
# Software License Agreement (BSD License)
#
# Copyright (c) 2010, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id: launch.py 8960 2010-04-07 23:54:50Z kwc $
"""
rostest implementation of running bare (gtest-compatible) unit test
executables. These do not run in a ROS environment.
"""
import os
import unittest
import time
import roslib.packages
import roslaunch.pmon
from rostest.rostestutil import createXMLRunner, printSummary, printRostestSummary, \
xmlResultsFile, rostest_name_from_path, printlog, printlogerr
import rostest.xmlresults
BARE_TIME_LIMIT = 60.
class BareTestCase(unittest.TestCase):
def __init__(self, exe, args, results, retry=0, time_limit=None, test_name=None):
"""
@param exe: path to executable to run
@type exe: str
@param args: arguments to exe
@type args: [str]
@param results: test results accumulator
@param retry: (optional) number of retries for test
@type retry: int
@param time_limit: (optional) time limit for test. Defaults to BARE_TIME_LIMIT.
@type time_limit: float
@param test_name: (optional) override automatically geneated test name
@type test_name: str
"""
super(BareTestCase, self).__init__()
self.results = results
_, self.package = roslib.packages.get_dir_pkg(exe)
self.exe = exe
if test_name is None:
self.test_name = os.path.basename(exe)
else:
self.test_name = test_name
self.args = [self.exe] + args
self.retry = retry
self.time_limit = time_limit or BARE_TIME_LIMIT
self.pmon = None
def setUp(self):
self.pmon = roslaunch.pmon.start_process_monitor()
def tearDown(self):
if self.pmon is not None:
roslaunch.pmon.shutdown_process_monitor(self.pmon)
self.pmon = None
def runTest(self):
self.failIf(self.package is None, "unable to determine package of executable")
done = False
while not done:
test_name = self.test_name
printlog("Running test [%s]", test_name)
#setup the test
# - we pass in the output test_file name so we can scrape it
test_file = xmlResultsFile(self.package, test_name, False)
if os.path.exists(test_file):
printlog("removing previous test results file [%s]", test_file)
os.remove(test_file)
self.args.append('--gtest_output=xml:%s'%test_file)
# run the test, blocks until completion
printlog("running test %s"%test_name)
timeout_failure = False
run_id = None
#TODO: really need different, non-node version of LocalProcess instead of these extra args
process = roslaunch.nodeprocess.LocalProcess(run_id, self.package, self.test_name, self.args, os.environ, False, cwd='cwd', is_node=False)
pm = self.pmon
pm.register(process)
success = process.start()
self.assert_(success, "test failed to start")
#poll until test terminates or alloted time exceed
timeout_t = time.time() + self.time_limit
try:
while pm.mainthread_spin_once() and process.is_alive():
#test fails on timeout
if time.time() > timeout_t:
raise roslaunch.launch.RLTestTimeoutException("test max time allotted")
time.sleep(0.1)
except roslaunch.launch.RLTestTimeoutException, e:
if self.retry:
timeout_failure = True
else:
raise
if not timeout_failure:
printlog("test [%s] finished"%test_name)
else:
printlogerr("test [%s] timed out"%test_name)
# load in test_file
if not timeout_failure:
self.assert_(os.path.isfile(test_file), "test [%s] did not generate test results"%test_name)
printlog("test [%s] results are in [%s]", test_name, test_file)
results = rostest.xmlresults.read(test_file, test_name)
test_fail = results.num_errors or results.num_failures
else:
test_fail = True
if self.retry > 0 and test_fail:
self.retry -= 1
printlog("test [%s] failed, retrying. Retries left: %s"%(test_name, self.retry))
else:
done = True
self.results.accumulate(results)
printlog("test [%s] results summary: %s errors, %s failures, %s tests",
test_name, results.num_errors, results.num_failures, results.num_tests)
printlog("[ROSTEST] test [%s] done", test_name)

View File

@ -1,197 +0,0 @@
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
from __future__ import with_statement
# NOTE: this has not survived the many refactorings and roslaunch changes well. There are too many ugly globals and bad
# code organizational choices at this point, but it's not a high priority to cleanup.
import os
import sys
import time
import unittest
import logging
import roslaunch
import roslib.packages
import roslib.roslogging
from rostest.rostestutil import createXMLRunner, printRostestSummary, \
xmlResultsFile, rostest_name_from_path
from rostest.rostest_parent import ROSTestLaunchParent
import rostest.baretest
import rostest.xmlresults
import rostest.runner
_NAME = 'rostest'
def configure_logging():
import socket
logfile_basename = 'rostest-%s-%s.log'%(socket.gethostname(), os.getpid())
logfile_name = roslib.roslogging.configure_logging('rostest', filename=logfile_basename)
if logfile_name:
print "... logging to %s"%logfile_name
return logfile_name
def write_bad_filename_failure(test_file, results_file, outname):
# similar to rostest-check-results
results_file_dir = os.path.dirname(results_file)
if not os.path.isdir(results_file_dir):
os.makedirs(results_file_dir)
with open(results_file, 'w') as f:
d = {'test': outname, 'test_file': test_file }
f.write("""<?xml version="1.0" encoding="UTF-8"?>
<testsuite tests="1" failures="1" time="1" errors="0" name="%(test)s">
<testcase name="test_ran" status="run" time="1" classname="Results">
<failure message="rostest file [%(test_file)s] does not exist" type=""/>
</testcase>
</testsuite>"""%d)
def rostestmain():
import roslaunch.rlutil
# make sure all loggers are configured properly
logfile_name = configure_logging()
logger = logging.getLogger('rostest')
import roslaunch.core
roslaunch.core.add_printlog_handler(logger.info)
roslaunch.core.add_printerrlog_handler(logger.error)
from optparse import OptionParser
parser = OptionParser(usage="usage: %prog [options] file", prog=_NAME)
parser.add_option("-t", "--text",
action="store_true", dest="text_mode", default=False,
help="Run with stdout output instead of XML output")
parser.add_option("--bare",
action="store_true", dest="bare", default=False,
help="Run bare gtest-compatible executable instead of rostest")
parser.add_option("--bare-limit", metavar="TIME_LIMIT",
dest="bare_limit", default=60,
help="Set time limit for --bare executable")
parser.add_option("--bare-name", metavar="TEST_NAME",
dest="bare_name", default=None,
help="Test name for --bare executable")
(options, args) = parser.parse_args()
try:
if options.bare:
# no need to resolve arguments in this case
pass
else:
args = roslaunch.rlutil.resolve_launch_arguments(args)
except roslaunch.core.RLException, e:
print >> sys.stderr, str(e)
sys.exit(1)
logger.info('rostest starting with options %s, args %s'%(options, args))
if len(args) == 0:
parser.error("You must supply a test file argument to rostest.")
if len(args) != 1 and not options.bare:
parser.error("rostest only accepts a single test file")
# compute some common names we'll be using to generate test names and files
test_file = args[0]
pkg_dir, pkg = roslib.packages.get_dir_pkg(test_file)
outname = rostest_name_from_path(pkg_dir, test_file)
# #1140
if not options.bare and not os.path.isfile(test_file):
results_file = xmlResultsFile(pkg, outname, True)
write_bad_filename_failure(test_file, results_file, outname)
parser.error("test file is invalid. Generated failure case result file in %s"%results_file)
try:
if options.bare:
# TODO: does bare-retry make sense?
time_limit = float(options.bare_limit) if options.bare_limit else None
testCase = rostest.baretest.BareTestCase(test_file, args[1:], rostest.runner.getResults(), retry=0, time_limit=time_limit, test_name=options.bare_name)
suite = unittest.TestSuite()
suite.addTest(testCase)
# override outname
if options.bare_name:
outname = options.bare_name
else:
testCase = rostest.runner.createUnitTest(pkg, test_file)
suite = unittest.TestLoader().loadTestsFromTestCase(testCase)
if options.text_mode:
rostest.runner.setTextMode(True)
result = unittest.TextTestRunner(verbosity=2).run(suite)
else:
is_rostest = True
results_file = xmlResultsFile(pkg, outname, is_rostest)
xml_runner = createXMLRunner(pkg, outname, \
results_file=results_file, \
is_rostest=is_rostest)
result = xml_runner.run(suite)
#_accumulateResults(xmlresults.read(results_file))
finally:
# really make sure that all of our processes have been killed
test_parents = rostest.runner.getRostestParents()
for r in test_parents:
logger.info("finally rostest parent tearDown [%s]", r)
r.tearDown()
del test_parents[:]
from roslaunch.pmon import pmon_shutdown
logger.info("calling pmon_shutdown")
pmon_shutdown()
logger.info("... done calling pmon_shutdown")
# print config errors after test has run so that we don't get caught up in .xml results
config = rostest.runner.getConfig()
if config:
if config.config_errors:
print >> sys.stderr, "\n[ROSTEST WARNINGS]"+'-'*62+'\n'
for err in config.config_errors:
print >> sys.stderr, " * %s"%err
print ''
# summary is worthless if textMode is on as we cannot scrape .xml results
subtest_results = rostest.runner.getResults()
if not options.text_mode:
printRostestSummary(result, subtest_results)
else:
print "WARNING: overall test result is not accurate when --text is enabled"
if logfile_name:
print "rostest log file is in %s"%logfile_name
if not result.wasSuccessful():
sys.exit(1)
elif subtest_results.num_errors or subtest_results.num_failures:
sys.exit(2)
if __name__ == '__main__':
rostestmain()

View File

@ -1,92 +0,0 @@
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
import logging
import sys
import roslaunch.config
from roslaunch.core import printlog_bold, RLException
import roslaunch.launch
import roslaunch.pmon
import roslaunch.server
import roslaunch.xmlloader
#TODOXXX: probably move process listener infrastructure into here
import roslaunch.parent
class ROSTestLaunchParent(roslaunch.parent.ROSLaunchParent):
## @param run_id str: UUID of roslaunch session
## @throws RLException
def __init__(self, config, roslaunch_files, port):
if config is None:
raise Exception("config not initialized")
# we generate a run_id for each test
run_id = roslaunch.core.generate_run_id()
super(ROSTestLaunchParent, self).__init__(run_id, roslaunch_files, is_core=True, port=port)
self.config = config
self.config.master.auto = self.config.master.AUTO_RESTART
def _load_config(self):
# disable super, just in case, though this shouldn't get called
pass
## initializes self.config and xmlrpc infrastructure
def setUp(self):
self._start_infrastructure()
self._init_runner()
def tearDown(self):
if self.runner is not None:
runner = self.runner
runner.stop()
self._stop_infrastructure()
## perform launch of nodes, does not launch tests. rostest_parent
## follows a different pattern of init/run than the normal
## roslaunch, which is why it does not reuse start()/spin()
def launch(self):
if self.runner is not None:
return self.runner.launch()
else:
raise Exception("no runner to launch")
## run the test, blocks until completion
def run_test(self, test):
if self.runner is not None:
# run the test, blocks until completion
return self.runner.run_test(test)
else:
raise Exception("no runner")

View File

@ -1,246 +0,0 @@
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
"""
rostest helper routines.
"""
# IMPORTANT: no routine here can in anyway cause rospy to be loaded (that includes roslaunch)
import os
import sys
import cStringIO
import logging
import roslib.rosenv
import rostest.xmlrunner
def printlog(msg, *args):
if args:
msg = msg%args
logging.getLogger('rostest').info(msg)
print "[ROSTEST]"+msg
def printlogerr(msg, *args):
if args:
msg = msg%args
logging.getLogger('rostest').error(msg)
print >> sys.stderr, "[ROSTEST]"+msg
_errors = None
def getErrors():
return _errors
def rostest_name_from_path(pkg_dir, test_file):
"""
Derive name of rostest name based on file name/path.
@return: name of rostest
@rtype: str
"""
test_file_abs = os.path.abspath(test_file)
if test_file_abs.startswith(pkg_dir):
# compute package-relative path
test_file = test_file_abs[len(pkg_dir):]
if test_file[0] == os.sep:
test_file = test_file[1:]
outname = test_file.replace(os.sep, '_')
if '.' in outname:
outname = outname[:outname.rfind('.')]
return outname
def printRostestSummary(result, rostest_results):
"""
Print summary of rostest results to stdout.
"""
# we have two separate result objects, which can be a bit
# confusing. 'result' counts successful _running_ of tests
# (i.e. doesn't check for actual test success). The 'r' result
# object contains results of the actual tests.
global _errors
_errors = result.errors
buff = cStringIO.StringIO()
buff.write("[ROSTEST]"+'-'*71+'\n\n')
for tc_result in rostest_results.test_case_results:
buff.write(tc_result.description)
for tc_result in result.failures:
buff.write("[%s][failed]\n"%tc_result[0]._testMethodName)
buff.write('\nSUMMARY\n')
if result.wasSuccessful() and (rostest_results.num_errors + rostest_results.num_failures) == 0:
buff.write("\033[32m * RESULT: SUCCESS\033[0m\n")
else:
buff.write("\033[1;31m * RESULT: FAIL\033[0m\n")
# TODO: still some issues with the numbers adding up if tests fail to launch
# number of errors from the inner tests, plus add in count for tests
# that didn't run properly ('result' object).
buff.write(" * TESTS: %s\n"%rostest_results.num_tests)
num_errors = rostest_results.num_errors+len(result.errors)
if num_errors:
buff.write("\033[1;31m * ERRORS: %s\033[0m\n"%num_errors)
else:
buff.write(" * ERRORS: 0\n")
num_failures = rostest_results.num_failures+len(result.failures)
if num_failures:
buff.write("\033[1;31m * FAILURES: %s\033[0m\n"%num_failures)
else:
buff.write(" * FAILURES: 0\n")
if result.failures:
buff.write("\nERROR: The following tests failed to run:\n")
for tc_result in result.failures:
buff.write(" * " +tc_result[0]._testMethodName + "\n")
print buff.getvalue()
def printSummary(result):
"""
Print summary of unit test result to stdout
@param result: test results
"""
buff = cStringIO.StringIO()
buff.write("-------------------------------------------------------------\nSUMMARY:\n")
if result.wasSuccessful():
buff.write("\033[32m * RESULT: SUCCESS\033[0m\n")
else:
buff.write(" * RESULT: FAIL\n")
buff.write(" * TESTS: %s\n"%result.testsRun)
buff.write(" * ERRORS: %s [%s]\n"%(len(result.errors), ', '.join([e[0]._testMethodName for e in result.errors])))
buff.write(" * FAILURES: %s [%s]\n"%(len(result.failures), ','.join([e[0]._testMethodName for e in result.failures])))
print buff.getvalue()
def createXMLRunner(test_pkg, test_name, results_file=None, is_rostest=False):
"""
Create the unittest test runner with XML output
@param test_pkg: package name
@type test_pkg: str
@param test_name: test name
@type test_name: str
@param is_rostest: if True, use naming scheme for rostest itself instead of individual unit test naming
@type is_rostest: bool
"""
test_name = os.path.basename(test_name)
# determine output xml file name
if not results_file:
results_file = xmlResultsFile(test_pkg, test_name, is_rostest)
test_dir = os.path.abspath(os.path.dirname(results_file))
if not os.path.exists(test_dir):
try:
roslib.rosenv.makedirs_with_parent_perms(test_dir) #NOTE: this will pass up an error exception if it fails
except OSError:
raise IOError("cannot create test results directory [%s]. Please check permissions."%(test_dir))
elif os.path.isfile(test_dir):
raise Exception("ERROR: cannot run test suite, file is preventing creation of test dir: %s"%test_dir)
print "[ROSTEST] Outputting test results to %s"%results_file
outstream = open(results_file, 'w')
return rostest.xmlrunner.XMLTestRunner(stream=outstream)
def xmlResultsFile(test_pkg, test_name, is_rostest=False):
"""
@param test_pkg: name of test's package
@type test_pkg: str
@param test_name str: name of test
@type test_name: str
@param is_rostest: True if the results file is for a rostest-generated unit instance
@type is_rostest: bool
@return: name of xml results file for specified test
@rtype: str
"""
test_dir = os.path.join(roslib.rosenv.get_test_results_dir(), test_pkg)
if not os.path.exists(test_dir):
try:
roslib.rosenv.makedirs_with_parent_perms(test_dir)
except OSError:
raise IOError("cannot create test results directory [%s]. Please check permissions."%(test_dir))
# #576: strip out chars that would bork the filename
# this is fairly primitive, but for now just trying to catch some common cases
for c in ' "\'&$!`/\\':
if c in test_name:
test_name = test_name.replace(c, '_')
if is_rostest:
return os.path.join(test_dir, 'TEST-rostest__%s.xml'%test_name)
else:
return os.path.join(test_dir, 'TEST-%s.xml'%test_name)
def test_failure_junit_xml(test_name, message, stdout=None):
"""
Generate JUnit XML file for a unary test suite where the test failed
@param test_name: Name of test that failed
@type test_name: str
@param message: failure message
@type message: str
@param stdout: stdout data to include in report
@type stdout: str
"""
if not stdout:
return """<?xml version="1.0" encoding="UTF-8"?>
<testsuite tests="1" failures="1" time="1" errors="0" name="%s">
<testcase name="test_ran" status="run" time="1" classname="Results">
<failure message="%s" type=""/>
</testcase>
</testsuite>"""%(test_name, message)
else:
return """<?xml version="1.0" encoding="UTF-8"?>
<testsuite tests="1" failures="1" time="1" errors="0" name="%s">
<testcase name="test_ran" status="run" time="1" classname="Results">
<failure message="%s" type=""/>
</testcase>
<system-out><![CDATA[[
%s
]]></system-out>
</testsuite>"""%(test_name, message, stdout)
def test_success_junit_xml(test_name):
"""
Generate JUnit XML file for a unary test suite where the test succeeded.
@param test_name: Name of test that passed
@type test_name: str
"""
return """<?xml version="1.0" encoding="UTF-8"?>
<testsuite tests="1" failures="0" time="1" errors="0" name="%s">
<testcase name="test_ran" status="run" time="1" classname="Results">
</testcase>
</testsuite>"""%(test_name)

View File

@ -1,224 +0,0 @@
from __future__ import with_statement
import os
import sys
import logging
import time
import unittest
import roslaunch
import roslib.packages
from rostest.rostestutil import createXMLRunner, printSummary, printRostestSummary, \
xmlResultsFile, printlog, printlogerr
from rostest.rostest_parent import ROSTestLaunchParent
import rostest.xmlresults
_DEFAULT_TEST_PORT = 22422
# NOTE: ignoring Python style guide as unittest is sadly written with Java-like camel casing
_results = rostest.xmlresults.Result('rostest', 0, 0, 0)
def _accumulateResults(results):
_results.accumulate(results)
def getResults():
return _results
_textMode = False
def setTextMode(val):
global _textMode
_textMode = val
# global store of all ROSLaunchRunners so we can do an extra shutdown
# in the rare event a tearDown fails to execute
_test_parents = []
_config = None
def _addRostestParent(runner):
global _test_parents, _config
logging.getLogger('rostest').info("_addRostestParent [%s]", runner)
_test_parents.append(runner)
_config = runner.config
def getConfig():
return _config
def getRostestParents():
return _test_parents
# TODO: convert most of this into a run() routine of a RoslaunchRunner subclass
## generate test failure if tests with same name in launch file
def failDuplicateRunner(testName):
def fn(self):
print "Duplicate tests named [%s] in rostest suite"%testName
self.fail("Duplicate tests named [%s] in rostest suite"%testName)
return fn
def failRunner(testName, message):
def fn(self):
print >> sys.stderr, message
self.fail(message)
return fn
def rostestRunner(test, test_pkg):
"""
Test function generator that takes in a roslaunch Test object and
returns a class instance method that runs the test. TestCase
setUp() is responsible for ensuring that the rest of the roslaunch
state is correct and tearDown() is responsible for tearing
everything down cleanly.
@param test: rost test to run
@type test: roslaunch.Test
@return: function object to run testObj
@rtype: fn
"""
## test case pass/fail is a measure of whether or not the test ran
def fn(self):
done = False
while not done:
self.assert_(self.test_parent is not None, "ROSTestParent initialization failed")
test_name = test.test_name
printlog("Running test [%s]", test_name)
#launch the other nodes
succeeded, failed = self.test_parent.launch()
self.assert_(not failed, "Test Fixture Nodes %s failed to launch"%failed)
#setup the test
# - we pass in the output test_file name so we can scrape it
test_file = xmlResultsFile(test_pkg, test_name, False)
if os.path.exists(test_file):
printlog("removing previous test results file [%s]", test_file)
os.remove(test_file)
# TODO: have to redeclare this due to a bug -- this file
# needs to be renamed as it aliases the module where the
# constant is elsewhere defined. The fix is to rename
# rostest.py
XML_OUTPUT_FLAG='--gtest_output=xml:' #use gtest-compatible flag
test.args = "%s %s%s"%(test.args, XML_OUTPUT_FLAG, test_file)
if _textMode:
test.output = 'screen'
test.args = test.args + " --text"
# run the test, blocks until completion
printlog("running test %s"%test_name)
timeout_failure = False
try:
self.test_parent.run_test(test)
except roslaunch.launch.RLTestTimeoutException, e:
if test.retry:
timeout_failure = True
else:
raise
if not timeout_failure:
printlog("test [%s] finished"%test_name)
else:
printlogerr("test [%s] timed out"%test_name)
# load in test_file
if not _textMode or timeout_failure:
if not timeout_failure:
self.assert_(os.path.isfile(test_file), "test [%s] did not generate test results"%test_name)
printlog("test [%s] results are in [%s]", test_name, test_file)
results = rostest.xmlresults.read(test_file, test_name)
test_fail = results.num_errors or results.num_failures
else:
test_fail = True
if test.retry > 0 and test_fail:
test.retry -= 1
printlog("test [%s] failed, retrying. Retries left: %s"%(test_name, test.retry))
self.tearDown()
self.setUp()
else:
done = True
_accumulateResults(results)
printlog("test [%s] results summary: %s errors, %s failures, %s tests",
test_name, results.num_errors, results.num_failures, results.num_tests)
#self.assertEquals(0, results.num_errors, "unit test reported errors")
#self.assertEquals(0, results.num_failures, "unit test reported failures")
else:
if test.retry:
printlogerr("retry is disabled in --text mode")
done = True
printlog("[ROSTEST] test [%s] done", test_name)
return fn
## Function that becomes TestCase.setup()
def setUp(self):
# new test_parent for each run. we are a bit inefficient as it would be possible to
# reuse the roslaunch base infrastructure for each test, but the roslaunch code
# is not abstracted well enough yet
self.test_parent = ROSTestLaunchParent(self.config, [self.test_file], port=_DEFAULT_TEST_PORT)
printlog("setup[%s] run_id[%s] starting", self.test_file, self.test_parent.run_id)
self.test_parent.setUp()
# the config attribute makes it easy for tests to access the ROSLaunchConfig instance
self.config = self.test_parent.config
_addRostestParent(self.test_parent)
printlog("setup[%s] run_id[%s] done", self.test_file, self.test_parent.run_id)
## Function that becomes TestCase.tearDown()
def tearDown(self):
printlog("tearDown[%s]", self.test_file)
if self.test_parent:
self.test_parent.tearDown()
printlog("rostest teardown %s complete", self.test_file)
def createUnitTest(pkg, test_file):
"""
Unit test factory. Constructs a unittest class based on the roslaunch
@param pkg: package name
@type pkg: str
@param test_file: rostest filename
@type test_file: str
"""
# parse the config to find the test files
config = roslaunch.parent.load_config_default([test_file], _DEFAULT_TEST_PORT)
config.master.log_output = True
# pass in config to class as a property so that test_parent can be initialized
classdict = { 'setUp': setUp, 'tearDown': tearDown, 'config': config,
'test_parent': None, 'test_file': test_file }
# add in the tests
testNames = []
for test in config.tests:
# #1989: find test first to make sure it exists and is executable
err_msg = None
try:
cmd = roslib.packages.find_node(test.package, test.type, \
test.machine.ros_root, test.machine.ros_package_path)
if not cmd:
err_msg = "Test node [%s/%s] does not exist or is not executable"%(test.package, test.type)
except roslib.packages.ROSPkgException, e:
err_msg = "Package [%s] for test node [%s/%s] does not exist"%(test.package, test.package, test.type)
testName = 'test%s'%(test.test_name)
if err_msg:
classdict[testName] = failRunner(test.test_name, err_msg)
elif testName in testNames:
classdict[testName] = failDuplicateRunner(test.test_name)
else:
classdict[testName] = rostestRunner(test, pkg)
testNames.append(testName)
# instantiate the TestCase instance with our magically-created tests
return type('RosTest',(unittest.TestCase,),classdict)

View File

@ -1,301 +0,0 @@
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Revision $Id$
import os
import sys
import string
from xml.dom.minidom import parse, parseString
from xml.dom import Node as DomNode
import roslib.rosenv
## Common container for 'error' and 'failure' results
class _TestInfo(object):
## @param type str: type attribute from xml
## @param text str: text property from xml
def __init__(self, type, text):
self.type = type
self.text = text
## 'error' result container
class TestError(_TestInfo):
def xml(self):
return u'<error type="%s"><![CDATA[%s]]></error>'%(self.type, self.text)
## 'failure' result container
class TestFailure(_TestInfo):
def xml(self):
return u'<failure type="%s"><![CDATA[%s]]></failure>'%(self.type, self.text)
## 'testcase' result container
class TestCaseResult(object):
## @param name str: name of testcase
def __init__(self, name):
self.name = name
self.failures = []
self.errors = []
self.time = 0.0
self.classname = ''
## @return bool: True if test passed
def _passed(self):
return not self.errors and not self.failures
## bool: True if test passed without errors or failures
passed = property(_passed)
## @return str: description of testcase failure
def _failure_description(self):
if self.failures:
tmpl = "[%s][FAILURE]"%self.name
tmpl = tmpl + '-'*(80-len(tmpl))
tmpl = tmpl+"\n%s\n"+'-'*80+"\n\n"
return '\n'.join(tmpl%x.text for x in self.failures)
return ''
## @return str: description of testcase error
def _error_description(self):
if self.errors:
tmpl = "[%s][ERROR]"%self.name
tmpl = tmpl + '-'*(80-len(tmpl))
tmpl = tmpl+"\n%s\n"+'-'*80+"\n\n"
return '\n'.join(tmpl%x.text for x in self.errors)
return ''
## @return str: description of testcase result
def _description(self):
if self.passed:
return "[%s][passed]\n"%self.name
else:
return self._failure_description()+\
self._error_description()
## str: printable description of testcase result
description = property(_description)
## @param failure TestFailure
def add_failure(self, failure):
self.failures.append(failure)
## @param failure TestError
def add_error(self, error):
self.errors.append(error)
def xml(self):
return u' <testcase classname="%s" name="%s" time="%s">\n'%(self.classname, self.name, self.time)+\
'\n '.join([f.xml() for f in self.failures])+\
'\n '.join([e.xml() for e in self.errors])+\
' </testcase>'
class Result(object):
__slots__ = ['name', 'num_errors', 'num_failures', 'num_tests', \
'test_case_results', 'system_out', 'system_err', 'time']
def __init__(self, name, num_errors, num_failures, num_tests):
self.name = name
self.num_errors = num_errors
self.num_failures = num_failures
self.num_tests = num_tests
self.test_case_results = []
self.system_out = ''
self.system_err = ''
self.time = 0.0
## Add results from \a r to this result
## @param r Result: results to aggregate with this result
def accumulate(self, r):
self.num_errors += r.num_errors
self.num_failures += r.num_failures
self.num_tests += r.num_tests
self.test_case_results.extend(r.test_case_results)
if r.system_out:
self.system_out += '\n'+r.system_out
if r.system_err:
self.system_err += '\n'+r.system_err
## Add results from a testcase to this result container
## @param r TestCaseResult
def add_test_case_result(self, r):
self.test_case_results.append(r)
## @return document as unicode (UTF-8 declared) XML
def xml(self):
return u'<?xml version="1.0" encoding="utf-8"?>'+\
'<testsuite name="%s" tests="%s" errors="%s" failures="%s" time="%s">'%\
(self.name, self.num_tests, self.num_errors, self.num_failures, self.time)+\
'\n'.join([tc.xml() for tc in self.test_case_results])+\
' <system-out><![CDATA[%s]]></system-out>'%self.system_out+\
' <system-err><![CDATA[%s]]></system-err>'%self.system_err+\
'</testsuite>'
def _text(tag):
return reduce(lambda x, y: x + y, [c.data for c in tag.childNodes if c.nodeType in [DomNode.TEXT_NODE, DomNode.CDATA_SECTION_NODE]], "").strip()
def _load_suite_results(test_suite_name, test_suite, result):
nodes = [n for n in test_suite.childNodes \
if n.nodeType == DomNode.ELEMENT_NODE]
for node in nodes:
name = node.tagName
if name == 'testsuite':
# for now we flatten this hierarchy
_load_suite_results(test_suite_name, node, result)
elif name == 'system-out':
if _text(node):
system_out = "[%s] stdout"%test_suite_name + "-"*(71-len(test_suite_name))
system_out += '\n'+_text(node)
result.system_out += system_out
elif name == 'system-err':
if _text(node):
system_err = "[%s] stderr"%test_suite_name + "-"*(71-len(test_suite_name))
system_err += '\n'+_text(node)
result.system_err += system_err
elif name == 'testcase':
name = node.getAttribute('name') or 'unknown'
classname = node.getAttribute('classname') or 'unknown'
# mangle the classname for some sense of uniformity
# between rostest/unittest/gtest
if '__main__.' in classname:
classname = classname[classname.find('__main__.')+9:]
if classname == 'rostest.rostest.RosTest':
classname = 'rostest'
elif not classname.startswith(result.name):
classname = "%s.%s"%(result.name,classname)
time = node.getAttribute('time') or 0.0
tc_result = TestCaseResult("%s/%s"%(test_suite_name,name))
tc_result.classname = classname
tc_result.time = time
result.add_test_case_result(tc_result)
for d in [n for n in node.childNodes \
if n.nodeType == DomNode.ELEMENT_NODE]:
# convert 'message' attributes to text elements to keep
# python unittest and gtest consistent
if d.tagName == 'failure':
message = d.getAttribute('message') or ''
text = _text(d) or message
x = TestFailure(d.getAttribute('type') or '', text)
tc_result.add_failure(x)
elif d.tagName == 'error':
message = d.getAttribute('message') or ''
text = _text(d) or message
x = TestError(d.getAttribute('type') or '', text)
tc_result.add_error(x)
## #603: unit test suites are not good about screening out illegal
## unicode characters. This little recipe I from http://boodebr.org/main/python/all-about-python-and-unicode#UNI_XML
## screens these out
import re
RE_XML_ILLEGAL = u'([\u0000-\u0008\u000b-\u000c\u000e-\u001f\ufffe-\uffff])' + \
u'|' + \
u'([%s-%s][^%s-%s])|([^%s-%s][%s-%s])|([%s-%s]$)|(^[%s-%s])' % \
(unichr(0xd800),unichr(0xdbff),unichr(0xdc00),unichr(0xdfff),
unichr(0xd800),unichr(0xdbff),unichr(0xdc00),unichr(0xdfff),
unichr(0xd800),unichr(0xdbff),unichr(0xdc00),unichr(0xdfff))
_safe_xml_regex = re.compile(RE_XML_ILLEGAL)
## read in file, screen out unsafe unicode characters
def _read_file_safe_xml(test_file):
import codecs
try:
# this is ugly, but the files in question that are problematic
# do not declare unicode type.
try:
f = codecs.open(test_file, "r", "utf-8" )
x = f.read()
except:
f.close()
f = codecs.open(test_file, "r", "iso8859-1" )
x = f.read()
for match in _safe_xml_regex.finditer(x):
x = x[:match.start()] + "?" + x[match.end():]
return x.encode("utf-8")
finally:
f.close()
## Read in the test_result file
## @param test_file str: test file path
## @param test_name str: name of test
## @return Result test results
def read(test_file, test_name):
try:
xml_str = _read_file_safe_xml(test_file)
test_suite = parseString(xml_str).getElementsByTagName('testsuite')
except Exception, e:
import traceback
traceback.print_exc()
print "WARN: cannot read test result file [%s]: %s"%(test_file, str(e))
return Result(test_name, 0, 0, 0)
if not test_suite:
print "WARN: test result file [%s] contains no results"%test_file
return Result(test_name, 0, 0, 0)
test_suite = test_suite[0]
vals = [test_suite.getAttribute(attr) for attr in ['errors', 'failures', 'tests']]
vals = [v or 0 for v in vals]
err, fail, tests = [string.atoi(val) for val in vals]
result = Result(test_name, err, fail, tests)
result.time = test_suite.getAttribute('time') or 0.0
# Create a prefix based on the test result filename. The idea is to
# disambiguate the case when tests of the same name are provided in
# different .xml files. We use the name of the parent directory
test_file_base = os.path.basename(os.path.dirname(test_file))
fname = os.path.basename(test_file)
if fname.startswith('TEST-'):
fname = fname[5:]
if fname.endswith('.xml'):
fname = fname[:-4]
test_file_base = "%s.%s"%(test_file_base, fname)
_load_suite_results(test_file_base, test_suite, result)
return result
def read_all(filter=[]):
"""
Read in the test_results and aggregate into a single Result object
@param filter: list of packages that should be processed
@type filter: [str]
@return: aggregated result
@rtype: L{Result}
"""
dir_ = roslib.rosenv.get_test_results_dir()
root_result = Result('ros', 0, 0, 0)
if not os.path.exists(dir_):
return root_result
for d in os.listdir(dir_):
if filter and not d in filter:
continue
subdir = os.path.join(dir_, d)
if os.path.isdir(subdir):
for file in os.listdir(subdir):
if file.endswith('.xml'):
file = os.path.join(subdir, file)
result = read(file, os.path.basename(subdir))
root_result.accumulate(result)
return root_result

View File

@ -1,425 +0,0 @@
"""
XML Test Runner for PyUnit
"""
# Written by Sebastian Rittau <srittau@jroger.in-berlin.de> and placed in
# the Public Domain. With contributions by Paolo Borelli.
__revision__ = "$Id$"
import os.path
import re
import sys
import time
import traceback
import unittest
from StringIO import StringIO
from xml.sax.saxutils import escape
from StringIO import StringIO
class _TestInfo(object):
"""Information about a particular test.
Used by _XMLTestResult.
"""
def __init__(self, test, time):
(self._class, self._method) = test.id().rsplit(".", 1)
self._time = time
self._error = None
self._failure = None
@staticmethod
def create_success(test, time):
"""Create a _TestInfo instance for a successful test."""
return _TestInfo(test, time)
@staticmethod
def create_failure(test, time, failure):
"""Create a _TestInfo instance for a failed test."""
info = _TestInfo(test, time)
info._failure = failure
return info
@staticmethod
def create_error(test, time, error):
"""Create a _TestInfo instance for an erroneous test."""
info = _TestInfo(test, time)
info._error = error
return info
def print_report(self, stream):
"""Print information about this test case in XML format to the
supplied stream.
"""
stream.write(' <testcase classname="%(class)s" name="%(method)s" time="%(time).4f">' % \
{
"class": self._class,
"method": self._method,
"time": self._time,
})
if self._failure != None:
self._print_error(stream, 'failure', self._failure)
if self._error != None:
self._print_error(stream, 'error', self._error)
stream.write('</testcase>\n')
def print_report_text(self, stream):
#stream.write(' <testcase classname="%(class)s" name="%(method)s" time="%(time).4f">' % \
# {
# "class": self._class,
# "method": self._method,
# "time": self._time,
# })
stream.write(self._method)
if self._failure != None:
stream.write(' ... FAILURE!\n')
self._print_error_text(stream, 'failure', self._failure)
if self._error != None:
stream.write(' ... ERROR!\n')
self._print_error_text(stream, 'error', self._error)
if self._failure == None and self._error == None:
stream.write(' ... ok\n')
def _print_error(self, stream, tagname, error):
"""Print information from a failure or error to the supplied stream."""
text = escape(str(error[1]))
stream.write('\n')
stream.write(' <%s type="%s">%s\n' \
% (tagname, str(error[0].__name__), text))
tb_stream = StringIO()
traceback.print_tb(error[2], None, tb_stream)
stream.write(escape(tb_stream.getvalue()))
stream.write(' </%s>\n' % tagname)
stream.write(' ')
def _print_error_text(self, stream, tagname, error):
"""Print information from a failure or error to the supplied stream."""
text = escape(str(error[1]))
stream.write('%s: %s\n' \
% (tagname.upper(), text))
tb_stream = StringIO()
traceback.print_tb(error[2], None, tb_stream)
stream.write(escape(tb_stream.getvalue()))
stream.write('-'*80 + '\n')
class _XMLTestResult(unittest.TestResult):
"""A test result class that stores result as XML.
Used by XMLTestRunner.
"""
def __init__(self, classname):
unittest.TestResult.__init__(self)
self._test_name = classname
self._start_time = None
self._tests = []
self._error = None
self._failure = None
def startTest(self, test):
unittest.TestResult.startTest(self, test)
self._error = None
self._failure = None
self._start_time = time.time()
def stopTest(self, test):
time_taken = time.time() - self._start_time
unittest.TestResult.stopTest(self, test)
if self._error:
info = _TestInfo.create_error(test, time_taken, self._error)
elif self._failure:
info = _TestInfo.create_failure(test, time_taken, self._failure)
else:
info = _TestInfo.create_success(test, time_taken)
self._tests.append(info)
def addError(self, test, err):
unittest.TestResult.addError(self, test, err)
self._error = err
def addFailure(self, test, err):
unittest.TestResult.addFailure(self, test, err)
self._failure = err
def print_report(self, stream, time_taken, out, err):
"""Prints the XML report to the supplied stream.
The time the tests took to perform as well as the captured standard
output and standard error streams must be passed in.a
"""
stream.write('<testsuite errors="%(e)d" failures="%(f)d" ' % \
{ "e": len(self.errors), "f": len(self.failures) })
stream.write('name="%(n)s" tests="%(t)d" time="%(time).3f">\n' % \
{
"n": self._test_name,
"t": self.testsRun,
"time": time_taken,
})
for info in self._tests:
info.print_report(stream)
stream.write(' <system-out><![CDATA[%s]]></system-out>\n' % out)
stream.write(' <system-err><![CDATA[%s]]></system-err>\n' % err)
stream.write('</testsuite>\n')
def print_report_text(self, stream, time_taken, out, err):
"""Prints the text report to the supplied stream.
The time the tests took to perform as well as the captured standard
output and standard error streams must be passed in.a
"""
#stream.write('<testsuite errors="%(e)d" failures="%(f)d" ' % \
# { "e": len(self.errors), "f": len(self.failures) })
#stream.write('name="%(n)s" tests="%(t)d" time="%(time).3f">\n' % \
# {
# "n": self._test_name,
# "t": self.testsRun,
# "time": time_taken,
# })
for info in self._tests:
info.print_report_text(stream)
class XMLTestRunner(object):
"""A test runner that stores results in XML format compatible with JUnit.
XMLTestRunner(stream=None) -> XML test runner
The XML file is written to the supplied stream. If stream is None, the
results are stored in a file called TEST-<module>.<class>.xml in the
current working directory (if not overridden with the path property),
where <module> and <class> are the module and class name of the test class.
"""
def __init__(self, stream=None):
self._stream = stream
self._path = "."
def run(self, test):
"""Run the given test case or test suite."""
class_ = test.__class__
classname = class_.__module__ + "." + class_.__name__
if self._stream == None:
filename = "TEST-%s.xml" % classname
stream = file(os.path.join(self._path, filename), "w")
stream.write('<?xml version="1.0" encoding="utf-8"?>\n')
else:
stream = self._stream
result = _XMLTestResult(classname)
start_time = time.time()
# TODO: Python 2.5: Use the with statement
old_stdout = sys.stdout
old_stderr = sys.stderr
sys.stdout = StringIO()
sys.stderr = StringIO()
try:
test(result)
try:
out_s = sys.stdout.getvalue()
except AttributeError:
out_s = ""
try:
err_s = sys.stderr.getvalue()
except AttributeError:
err_s = ""
finally:
sys.stdout = old_stdout
sys.stderr = old_stderr
time_taken = time.time() - start_time
result.print_report(stream, time_taken, out_s, err_s)
result.print_report_text(sys.stdout, time_taken, out_s, err_s)
if self._stream == None:
stream.close()
return result
def _set_path(self, path):
self._path = path
path = property(lambda self: self._path, _set_path, None,
"""The path where the XML files are stored.
This property is ignored when the XML file is written to a file
stream.""")
class XMLTestRunnerTest(unittest.TestCase):
def setUp(self):
self._stream = StringIO()
def _try_test_run(self, test_class, expected):
"""Run the test suite against the supplied test class and compare the
XML result against the expected XML string. Fail if the expected
string doesn't match the actual string. All time attribute in the
expected string should have the value "0.000". All error and failure
messages are reduced to "Foobar".
"""
runner = XMLTestRunner(self._stream)
runner.run(unittest.makeSuite(test_class))
got = self._stream.getvalue()
# Replace all time="X.YYY" attributes by time="0.000" to enable a
# simple string comparison.
got = re.sub(r'time="\d+\.\d+"', 'time="0.000"', got)
# Likewise, replace all failure and error messages by a simple "Foobar"
# string.
got = re.sub(r'(?s)<failure (.*?)>.*?</failure>', r'<failure \1>Foobar</failure>', got)
got = re.sub(r'(?s)<error (.*?)>.*?</error>', r'<error \1>Foobar</error>', got)
self.assertEqual(expected, got)
def test_no_tests(self):
"""Regression test: Check whether a test run without any tests
matches a previous run.
"""
class TestTest(unittest.TestCase):
pass
self._try_test_run(TestTest, """<testsuite errors="0" failures="0" name="unittest.TestSuite" tests="0" time="0.000">
<system-out><![CDATA[]]></system-out>
<system-err><![CDATA[]]></system-err>
</testsuite>
""")
def test_success(self):
"""Regression test: Check whether a test run with a successful test
matches a previous run.
"""
class TestTest(unittest.TestCase):
def test_foo(self):
pass
self._try_test_run(TestTest, """<testsuite errors="0" failures="0" name="unittest.TestSuite" tests="1" time="0.000">
<testcase classname="__main__.TestTest" name="test_foo" time="0.000"></testcase>
<system-out><![CDATA[]]></system-out>
<system-err><![CDATA[]]></system-err>
</testsuite>
""")
def test_failure(self):
"""Regression test: Check whether a test run with a failing test
matches a previous run.
"""
class TestTest(unittest.TestCase):
def test_foo(self):
self.assert_(False)
self._try_test_run(TestTest, """<testsuite errors="0" failures="1" name="unittest.TestSuite" tests="1" time="0.000">
<testcase classname="__main__.TestTest" name="test_foo" time="0.000">
<failure type="exceptions.AssertionError">Foobar</failure>
</testcase>
<system-out><![CDATA[]]></system-out>
<system-err><![CDATA[]]></system-err>
</testsuite>
""")
def test_error(self):
"""Regression test: Check whether a test run with a erroneous test
matches a previous run.
"""
class TestTest(unittest.TestCase):
def test_foo(self):
raise IndexError()
self._try_test_run(TestTest, """<testsuite errors="1" failures="0" name="unittest.TestSuite" tests="1" time="0.000">
<testcase classname="__main__.TestTest" name="test_foo" time="0.000">
<error type="exceptions.IndexError">Foobar</error>
</testcase>
<system-out><![CDATA[]]></system-out>
<system-err><![CDATA[]]></system-err>
</testsuite>
""")
def test_stdout_capture(self):
"""Regression test: Check whether a test run with output to stdout
matches a previous run.
"""
class TestTest(unittest.TestCase):
def test_foo(self):
print "Test"
self._try_test_run(TestTest, """<testsuite errors="0" failures="0" name="unittest.TestSuite" tests="1" time="0.000">
<testcase classname="__main__.TestTest" name="test_foo" time="0.000"></testcase>
<system-out><![CDATA[Test
]]></system-out>
<system-err><![CDATA[]]></system-err>
</testsuite>
""")
def test_stderr_capture(self):
"""Regression test: Check whether a test run with output to stderr
matches a previous run.
"""
class TestTest(unittest.TestCase):
def test_foo(self):
print >>sys.stderr, "Test"
self._try_test_run(TestTest, """<testsuite errors="0" failures="0" name="unittest.TestSuite" tests="1" time="0.000">
<testcase classname="__main__.TestTest" name="test_foo" time="0.000"></testcase>
<system-out><![CDATA[]]></system-out>
<system-err><![CDATA[Test
]]></system-err>
</testsuite>
""")
class NullStream(object):
"""A file-like object that discards everything written to it."""
def write(self, buffer):
pass
def test_unittests_changing_stdout(self):
"""Check whether the XMLTestRunner recovers gracefully from unit tests
that change stdout, but don't change it back properly.
"""
class TestTest(unittest.TestCase):
def test_foo(self):
sys.stdout = XMLTestRunnerTest.NullStream()
runner = XMLTestRunner(self._stream)
runner.run(unittest.makeSuite(TestTest))
def test_unittests_changing_stderr(self):
"""Check whether the XMLTestRunner recovers gracefully from unit tests
that change stderr, but don't change it back properly.
"""
class TestTest(unittest.TestCase):
def test_foo(self):
sys.stderr = XMLTestRunnerTest.NullStream()
runner = XMLTestRunner(self._stream)
runner.run(unittest.makeSuite(TestTest))
class XMLTestProgram(unittest.TestProgram):
def runTests(self):
if self.testRunner is None:
self.testRunner = XMLTestRunner()
unittest.TestProgram.runTests(self)
main = XMLTestProgram
if __name__ == "__main__":
main(module=None)

View File

@ -1,11 +0,0 @@
<launch>
<node name="talker" pkg="test_ros" type="talker.py" />
<param name="hztest1/topic" value="chatter" />
<param name="hztest1/hz" value="10.0" />
<param name="hztest1/hzerror" value="0.5" />
<param name="hztest1/test_duration" value="5.0" />
<param name="hztest1/wait_time" value="21.0" />
<test test-name="hztest_test" pkg="rostest" type="hztest" name="hztest1" />
</launch>

View File

@ -1,7 +0,0 @@
<launch>
<!-- verify that hztest works with 0 rate. NOTE: there is no test for failure here, which needs to be added somehow -->
<param name="hztest0/topic" value="fake" />
<param name="hztest0/hz" value="0.0" />
<param name="hztest0/test_duration" value="5.0" />
<test test-name="hz0_test" pkg="rostest" type="hztest" name="hztest0" />
</launch>

View File

@ -1,7 +0,0 @@
<launch>
<!-- as these tests are designed to fail, they aren't be added to the normal test regression suite.
they must be manually run with the knowledge that they will fail -->
<test test-name="time_limit_test" pkg="rostest" type="time_limit_test.py" time-limit="1" />
<!-- test normal timeout (60 seconds) -->
<test test-name="time_limit_test__no_limit" pkg="rostest" type="time_limit_test.py" />
</launch>

View File

@ -1,213 +0,0 @@
/*
* Copyright (c) 2008, Willow Garage, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of the Willow Garage, Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include <gtest/gtest.h>
#include <string>
#include "rostest/permuter.h"
using namespace rostest;
double epsilon = 1e-9;
TEST(Permuter, PermuteOption)
{
std::vector<double> vals;
vals.push_back(1.0);
vals.push_back(2.0);
vals.push_back(3.0);
vals.push_back(4.0);
double value = 0;
PermuteOption<double> op(vals, &value);
for ( unsigned int i = 0; i < vals.size(); i++)
{
EXPECT_NEAR(vals[i], value, epsilon);
if (i < vals.size() -1)
EXPECT_TRUE(op.step());
else
EXPECT_FALSE(op.step());
};
}
TEST(Permuter, OneDoublePermuteOption)
{
double epsilon = 1e-9;
rostest::Permuter permuter;
std::vector<double> vals;
vals.push_back(1.0);
vals.push_back(2.0);
vals.push_back(3.0);
vals.push_back(4.0);
double value = 0;
permuter.addOptionSet(vals, &value);
for ( unsigned int i = 0; i < vals.size(); i++)
{
EXPECT_NEAR(vals[i], value, epsilon);
if (i < vals.size() -1)
EXPECT_TRUE(permuter.step());
else
EXPECT_FALSE(permuter.step());
};
}
TEST(Permuter, TwoDoubleOptions)
{
double epsilon = 1e-9;
Permuter permuter;
std::vector<double> vals;
vals.push_back(1.0);
vals.push_back(2.0);
vals.push_back(3.0);
vals.push_back(4.0);
double value = 0;
std::vector<double> vals2;
vals2.push_back(9.0);
vals2.push_back(8.0);
vals2.push_back(7.0);
vals2.push_back(6.0);
double value2;
permuter.addOptionSet(vals, &value);
permuter.addOptionSet(vals2, &value2);
for ( unsigned int j = 0; j < vals2.size(); j++)
for ( unsigned int i = 0; i < vals.size(); i++)
{
//printf("%f?=%f %f?=%f\n", value, vals[i], value2, vals2[j]);
EXPECT_NEAR(vals[i], value, epsilon);
EXPECT_NEAR(vals2[j], value2, epsilon);
if (i == vals.size() -1 && j == vals2.size() -1)
EXPECT_FALSE(permuter.step());
else
EXPECT_TRUE(permuter.step());
};
}
TEST(Permuter, ThreeDoubleOptions)
{
double epsilon = 1e-9;
Permuter permuter;
std::vector<double> vals;
vals.push_back(1.0);
vals.push_back(2.0);
vals.push_back(3.0);
vals.push_back(4.0);
double value = 0;
std::vector<double> vals2;
vals2.push_back(9.0);
vals2.push_back(8.0);
vals2.push_back(7.0);
vals2.push_back(6.0);
double value2;
std::vector<double> vals3;
vals3.push_back(99.0);
vals3.push_back(88.0);
vals3.push_back(78.0);
vals3.push_back(63.0);
double value3;
permuter.addOptionSet(vals, &value);
permuter.addOptionSet(vals2, &value2);
permuter.addOptionSet(vals3, &value3);
for ( unsigned int k = 0; k < vals3.size(); k++)
for ( unsigned int j = 0; j < vals2.size(); j++)
for ( unsigned int i = 0; i < vals.size(); i++)
{
EXPECT_NEAR(vals[i], value, epsilon);
EXPECT_NEAR(vals2[j], value2, epsilon);
EXPECT_NEAR(vals3[k], value3, epsilon);
if (i == vals.size() -1 && j == vals2.size() -1&& k == vals3.size() -1)
EXPECT_FALSE(permuter.step());
else
EXPECT_TRUE(permuter.step());
};
}
TEST(Permuter, DoubleStringPermuteOptions)
{
double epsilon = 1e-9;
Permuter permuter;
std::vector<double> vals;
vals.push_back(1.0);
vals.push_back(2.0);
vals.push_back(3.0);
vals.push_back(4.0);
double value = 0;
std::vector<std::string> vals2;
vals2.push_back("hi");
vals2.push_back("there");
vals2.push_back("this");
vals2.push_back("works");
std::string value2;
permuter.addOptionSet(vals, &value);
permuter.addOptionSet(vals2, &value2);
for ( unsigned int j = 0; j < vals2.size(); j++)
for ( unsigned int i = 0; i < vals.size(); i++)
{
//printf("%f?=%f %s?=%s\n", value, vals[i], value2.c_str(), vals2[j].c_str());
EXPECT_NEAR(vals[i], value, epsilon);
EXPECT_STREQ(vals2[j].c_str(), value2.c_str());
if (i == vals.size() -1 && j == vals2.size() -1)
EXPECT_FALSE(permuter.step());
else
EXPECT_TRUE(permuter.step());
};
}
int main(int argc, char **argv){
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

View File

@ -1,41 +0,0 @@
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
## only point of this test is to be killed within a short period of time
import roslib; roslib.load_manifest('rostest')
import rospy
while not rospy.is_shutdown():
pass