Issue #12469: replace assertions by explicit if+raise

This commit is contained in:
Victor Stinner 2011-07-04 18:06:35 +02:00
parent d554cdf8b9
commit 0a01f13af8
1 changed files with 32 additions and 16 deletions

View File

@ -244,7 +244,8 @@ def check_signum(signals):
# reliable) # reliable)
raised = set(raised) raised = set(raised)
signals = set(signals) signals = set(signals)
assert raised == signals, "%r != %r" % (raised, signals) if raised != signals:
raise Exception("%r != %r" % (raised, signals))
{} {}
@ -280,11 +281,13 @@ def test_wakeup_fd_early(self):
time.sleep(TIMEOUT_FULL) time.sleep(TIMEOUT_FULL)
mid_time = time.time() mid_time = time.time()
dt = mid_time - before_time dt = mid_time - before_time
assert dt < TIMEOUT_HALF, dt if dt >= TIMEOUT_HALF:
raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
select.select([read], [], [], TIMEOUT_FULL) select.select([read], [], [], TIMEOUT_FULL)
after_time = time.time() after_time = time.time()
dt = after_time - mid_time dt = after_time - mid_time
assert dt < TIMEOUT_HALF, dt if dt >= TIMEOUT_HALF:
raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
""", signal.SIGALRM) """, signal.SIGALRM)
def test_wakeup_fd_during(self): def test_wakeup_fd_during(self):
@ -306,7 +309,8 @@ def test_wakeup_fd_during(self):
raise Exception("select.error not raised") raise Exception("select.error not raised")
after_time = time.time() after_time = time.time()
dt = after_time - before_time dt = after_time - before_time
assert dt < TIMEOUT_HALF, dt if dt >= TIMEOUT_HALF:
raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
""", signal.SIGALRM) """, signal.SIGALRM)
def test_signum(self): def test_signum(self):
@ -540,7 +544,8 @@ def handler(signum, frame):
signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
os.kill(os.getpid(), signum) os.kill(os.getpid(), signum)
pending = signal.sigpending() pending = signal.sigpending()
assert pending == {signum}, '%s != {%s}' % (pending, signum) if pending != {signum}:
raise Exception('%s != {%s}' % (pending, signum))
try: try:
signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
except ZeroDivisionError: except ZeroDivisionError:
@ -637,7 +642,8 @@ def test_sigwait(self):
def test(signum): def test(signum):
signal.alarm(1) signal.alarm(1)
received = signal.sigwait([signum]) received = signal.sigwait([signum])
assert received == signum , 'received %s, not %s' % (received, signum) if received != signum:
raise Exception('received %s, not %s' % (received, signum))
''') ''')
@unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
@ -647,7 +653,8 @@ def test_sigwaitinfo(self):
def test(signum): def test(signum):
signal.alarm(1) signal.alarm(1)
info = signal.sigwaitinfo([signum]) info = signal.sigwaitinfo([signum])
assert info.si_signo == signum, "info.si_signo != %s" % signum if info.si_signo != signum:
raise Exception("info.si_signo != %s" % signum)
''') ''')
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'), @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
@ -657,7 +664,8 @@ def test_sigtimedwait(self):
def test(signum): def test(signum):
signal.alarm(1) signal.alarm(1)
info = signal.sigtimedwait([signum], (10, 1000)) info = signal.sigtimedwait([signum], (10, 1000))
assert info.si_signo == signum, 'info.si_signo != %s' % signum if info.si_signo != signum:
raise Exception('info.si_signo != %s' % signum)
''') ''')
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'), @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
@ -672,7 +680,8 @@ def test(signum):
import os import os
os.kill(os.getpid(), signum) os.kill(os.getpid(), signum)
info = signal.sigtimedwait([signum], (0, 0)) info = signal.sigtimedwait([signum], (0, 0))
assert info.si_signo == signum, 'info.si_signo != %s' % signum if info.si_signo != signum:
raise Exception('info.si_signo != %s' % signum)
''') ''')
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'), @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
@ -681,7 +690,8 @@ def test_sigtimedwait_timeout(self):
self.wait_helper(signal.SIGALRM, ''' self.wait_helper(signal.SIGALRM, '''
def test(signum): def test(signum):
received = signal.sigtimedwait([signum], (1, 0)) received = signal.sigtimedwait([signum], (1, 0))
assert received is None, "received=%r" % (received,) if received is not None:
raise Exception("received=%r" % (received,))
''') ''')
@unittest.skipUnless(hasattr(signal, 'sigtimedwait'), @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
@ -709,7 +719,8 @@ def alarm_handler(signum, frame):
signal.sigwaitinfo([signal.SIGUSR1]) signal.sigwaitinfo([signal.SIGUSR1])
except OSError as e: except OSError as e:
if e.errno == errno.EINTR: if e.errno == errno.EINTR:
assert hndl_called, "SIGALRM handler not called" if not hndl_called:
raise Exception("SIGALRM handler not called")
else: else:
raise Exception("Expected EINTR to be raised by sigwaitinfo") raise Exception("Expected EINTR to be raised by sigwaitinfo")
else: else:
@ -796,8 +807,10 @@ def read_sigmask():
# Check the new mask # Check the new mask
blocked = read_sigmask() blocked = read_sigmask()
assert signum in blocked, "%s not in %s" % (signum, blocked) if signum not in blocked:
assert old_mask ^ blocked == {signum}, "%s ^ %s != {%s}" % (old_mask, blocked, signum) raise Exception("%s not in %s" % (signum, blocked))
if old_mask ^ blocked != {signum}:
raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))
# Unblock SIGUSR1 # Unblock SIGUSR1
try: try:
@ -816,9 +829,12 @@ def read_sigmask():
# Check the new mask # Check the new mask
unblocked = read_sigmask() unblocked = read_sigmask()
assert signum not in unblocked, "%s in %s" % (signum, unblocked) if signum in unblocked:
assert blocked ^ unblocked == {signum}, "%s ^ %s != {%s}" % (blocked, unblocked, signum) raise Exception("%s in %s" % (signum, unblocked))
assert old_mask == unblocked, "%s != %s" % (old_mask, unblocked) if blocked ^ unblocked != {signum}:
raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
if old_mask != unblocked:
raise Exception("%s != %s" % (old_mask, unblocked))
""" """
assert_python_ok('-c', code) assert_python_ok('-c', code)