python-testtools/testtools/tests/test_testsuite.py

342 lines
12 KiB
Python

# Copyright (c) 2009-2015 testtools developers. See LICENSE for details.
"""Test ConcurrentTestSuite and related things."""
import doctest
from pprint import pformat
import unittest
from testtools import (
ConcurrentTestSuite,
ConcurrentStreamTestSuite,
iterate_tests,
PlaceHolder,
TestByTestResult,
TestCase,
)
from testtools.helpers import try_import
from testtools.matchers import DocTestMatches, Equals
from testtools.testresult.doubles import StreamResult as LoggingStream
from testtools.testsuite import FixtureSuite, sorted_tests
from testtools.tests.helpers import LoggingResult
FunctionFixture = try_import('fixtures.FunctionFixture')
class Sample(TestCase):
def __hash__(self):
return id(self)
def test_method1(self):
pass
def test_method2(self):
pass
class TestConcurrentTestSuiteRun(TestCase):
def test_broken_test(self):
log = []
def on_test(test, status, start_time, stop_time, tags, details):
log.append((test.id(), status, set(details.keys())))
class BrokenTest:
# Simple break - no result parameter to run()
def __call__(self):
pass
run = __call__
original_suite = unittest.TestSuite([BrokenTest()])
suite = ConcurrentTestSuite(original_suite, self.split_suite)
suite.run(TestByTestResult(on_test))
self.assertEqual([('broken-runner', 'error', {'traceback'})], log)
def test_trivial(self):
log = []
result = LoggingResult(log)
test1 = Sample('test_method1')
test2 = Sample('test_method2')
original_suite = unittest.TestSuite([test1, test2])
suite = ConcurrentTestSuite(original_suite, self.split_suite)
suite.run(result)
# log[0] is the timestamp for the first test starting.
test1 = log[1][1]
test2 = log[-1][1]
self.assertIsInstance(test1, Sample)
self.assertIsInstance(test2, Sample)
self.assertNotEqual(test1.id(), test2.id())
def test_wrap_result(self):
# ConcurrentTestSuite has a hook for wrapping the per-thread result.
wrap_log = []
def wrap_result(thread_safe_result, thread_number):
wrap_log.append(
(thread_safe_result.result.decorated, thread_number))
return thread_safe_result
result_log = []
result = LoggingResult(result_log)
test1 = Sample('test_method1')
test2 = Sample('test_method2')
original_suite = unittest.TestSuite([test1, test2])
suite = ConcurrentTestSuite(
original_suite, self.split_suite, wrap_result=wrap_result)
suite.run(result)
self.assertEqual(
[(result, 0),
(result, 1),
], wrap_log)
# Smoke test to make sure everything ran OK.
self.assertNotEqual([], result_log)
def split_suite(self, suite):
return list(iterate_tests(suite))
class TestConcurrentStreamTestSuiteRun(TestCase):
def test_trivial(self):
result = LoggingStream()
test1 = Sample('test_method1')
test2 = Sample('test_method2')
def cases():
return [(test1, '0'), (test2, '1')]
suite = ConcurrentStreamTestSuite(cases)
suite.run(result)
def freeze(set_or_none):
if set_or_none is None:
return set_or_none
return frozenset(set_or_none)
# Ignore event order: we're testing the code is all glued together,
# which just means we can pump events through and they get route codes
# added appropriately.
self.assertEqual({
('status',
'testtools.tests.test_testsuite.Sample.test_method1',
'inprogress',
None,
True,
None,
None,
False,
None,
'0',
None,
),
('status',
'testtools.tests.test_testsuite.Sample.test_method1',
'success',
frozenset(),
True,
None,
None,
False,
None,
'0',
None,
),
('status',
'testtools.tests.test_testsuite.Sample.test_method2',
'inprogress',
None,
True,
None,
None,
False,
None,
'1',
None,
),
('status',
'testtools.tests.test_testsuite.Sample.test_method2',
'success',
frozenset(),
True,
None,
None,
False,
None,
'1',
None,
),
}, {event[0:3] + (freeze(event[3]),) + event[4:10] + (None,)
for event in result._events})
def test_broken_runner(self):
# If the object called breaks, the stream is informed about it
# regardless.
class BrokenTest:
# broken - no result parameter!
def __call__(self):
pass
def run(self):
pass
result = LoggingStream()
def cases():
return [(BrokenTest(), '0')]
suite = ConcurrentStreamTestSuite(cases)
suite.run(result)
events = result._events
# Check the traceback loosely.
self.assertEqual(events[1][6].decode('utf8'),
"Traceback (most recent call last):\n")
self.assertThat(events[2][6].decode('utf8'), DocTestMatches("""\
File "...testtools/testsuite.py", line ..., in _run_test
test.run(process_result)...
""", doctest.ELLIPSIS))
self.assertThat(events[3][6].decode('utf8'), DocTestMatches("""\
TypeError: ...run() takes ...1 ...argument...2...given...
""", doctest.ELLIPSIS))
events = [event[0:10] + (None,) for event in events]
events[1] = events[1][:6] + (None,) + events[1][7:]
events[2] = events[2][:6] + (None,) + events[2][7:]
events[3] = events[3][:6] + (None,) + events[3][7:]
self.assertEqual([
('status', "broken-runner-'0'", 'inprogress', None, True, None, None, False, None, '0', None),
('status', "broken-runner-'0'", None, None, True, 'traceback', None,
False,
'text/x-traceback; charset="utf8"; language="python"',
'0',
None),
('status', "broken-runner-'0'", None, None, True, 'traceback', None,
False,
'text/x-traceback; charset="utf8"; language="python"',
'0',
None),
('status', "broken-runner-'0'", None, None, True, 'traceback', None,
True,
'text/x-traceback; charset="utf8"; language="python"',
'0',
None),
('status', "broken-runner-'0'", 'fail', set(), True, None, None, False, None, '0', None)
], events)
def split_suite(self, suite):
tests = list(enumerate(iterate_tests(suite)))
return [(test, str(pos)) for pos, test in tests]
def test_setupclass_skip(self):
# We should support setupclass skipping using cls.skipException.
# Because folk have used that.
class Skips(TestCase):
@classmethod
def setUpClass(cls):
raise cls.skipException('foo')
def test_notrun(self):
pass
# Test discovery uses the default suite from unittest (unless users
# deliberately change things, in which case they keep both pieces).
suite = unittest.TestSuite([Skips("test_notrun")])
log = []
result = LoggingResult(log)
suite.run(result)
self.assertEqual(['addSkip'], [item[0] for item in log])
def test_setupclass_upcall(self):
# Note that this is kindof-a-case-test, kindof-suite, because
# setUpClass is linked between them.
class Simples(TestCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
def test_simple(self):
pass
# Test discovery uses the default suite from unittest (unless users
# deliberately change things, in which case they keep both pieces).
suite = unittest.TestSuite([Simples("test_simple")])
log = []
result = LoggingResult(log)
suite.run(result)
self.assertEqual(
['startTest', 'addSuccess', 'stopTest'],
[item[0] for item in log])
class TestFixtureSuite(TestCase):
def setUp(self):
super().setUp()
if FunctionFixture is None:
self.skipTest("Need fixtures")
def test_fixture_suite(self):
log = []
class Sample(TestCase):
def test_one(self):
log.append(1)
def test_two(self):
log.append(2)
fixture = FunctionFixture(
lambda: log.append('setUp'),
lambda fixture: log.append('tearDown'))
suite = FixtureSuite(fixture, [Sample('test_one'), Sample('test_two')])
suite.run(LoggingResult([]))
self.assertEqual(['setUp', 1, 2, 'tearDown'], log)
def test_fixture_suite_sort(self):
log = []
class Sample(TestCase):
def test_one(self):
log.append(1)
def test_two(self):
log.append(2)
fixture = FunctionFixture(
lambda: log.append('setUp'),
lambda fixture: log.append('tearDown'))
suite = FixtureSuite(fixture, [Sample('test_one'), Sample('test_one')])
self.assertRaises(ValueError, suite.sort_tests)
class TestSortedTests(TestCase):
def test_sorts_custom_suites(self):
a = PlaceHolder('a')
b = PlaceHolder('b')
class Subclass(unittest.TestSuite):
def sort_tests(self):
self._tests = sorted_tests(self, True)
input_suite = Subclass([b, a])
suite = sorted_tests(input_suite)
self.assertEqual([a, b], list(iterate_tests(suite)))
self.assertEqual([input_suite], list(iter(suite)))
def test_custom_suite_without_sort_tests_works(self):
a = PlaceHolder('a')
b = PlaceHolder('b')
class Subclass(unittest.TestSuite):
pass
input_suite = Subclass([b, a])
suite = sorted_tests(input_suite)
self.assertEqual([b, a], list(iterate_tests(suite)))
self.assertEqual([input_suite], list(iter(suite)))
def test_sorts_simple_suites(self):
a = PlaceHolder('a')
b = PlaceHolder('b')
suite = sorted_tests(unittest.TestSuite([b, a]))
self.assertEqual([a, b], list(iterate_tests(suite)))
def test_duplicate_simple_suites(self):
a = PlaceHolder('a')
b = PlaceHolder('b')
c = PlaceHolder('a')
self.assertRaises(
ValueError, sorted_tests, unittest.TestSuite([a, b, c]))
def test_multiple_duplicates(self):
# If there are multiple duplicates on a test suite, we report on them
# all.
a = PlaceHolder('a')
b = PlaceHolder('b')
c = PlaceHolder('a')
d = PlaceHolder('b')
error = self.assertRaises(
ValueError, sorted_tests, unittest.TestSuite([a, b, c, d]))
self.assertThat(
str(error),
Equals("Duplicate test ids detected: {}".format(
pformat({'a': 2, 'b': 2}))))
def test_suite():
from unittest import TestLoader
return TestLoader().loadTestsFromName(__name__)