mirror of https://github.com/python/cpython.git
subprocess's Popen.wait() is now thread safe so that multiple threads
may be calling wait() or poll() on a Popen instance at the same time without losing the Popen.returncode value. Fixes issue #21291.
This commit is contained in:
parent
9e599673b4
commit
d65ba51e24
|
@ -405,6 +405,10 @@ class STARTUPINFO:
|
||||||
import _posixsubprocess
|
import _posixsubprocess
|
||||||
import select
|
import select
|
||||||
import selectors
|
import selectors
|
||||||
|
try:
|
||||||
|
import threading
|
||||||
|
except ImportError:
|
||||||
|
import dummy_threading as threading
|
||||||
|
|
||||||
# When select or poll has indicated that the file is writable,
|
# When select or poll has indicated that the file is writable,
|
||||||
# we can write up to _PIPE_BUF bytes without risk of blocking.
|
# we can write up to _PIPE_BUF bytes without risk of blocking.
|
||||||
|
@ -748,6 +752,12 @@ def __init__(self, args, bufsize=-1, executable=None,
|
||||||
pass_fds=()):
|
pass_fds=()):
|
||||||
"""Create new Popen instance."""
|
"""Create new Popen instance."""
|
||||||
_cleanup()
|
_cleanup()
|
||||||
|
# Held while anything is calling waitpid before returncode has been
|
||||||
|
# updated to prevent clobbering returncode if wait() or poll() are
|
||||||
|
# called from multiple threads at once. After acquiring the lock,
|
||||||
|
# code must re-check self.returncode to see if another thread just
|
||||||
|
# finished a waitpid() call.
|
||||||
|
self._waitpid_lock = threading.Lock()
|
||||||
|
|
||||||
self._input = None
|
self._input = None
|
||||||
self._communication_started = False
|
self._communication_started = False
|
||||||
|
@ -1450,6 +1460,7 @@ def _execute_child(self, args, executable, preexec_fn, close_fds,
|
||||||
def _handle_exitstatus(self, sts, _WIFSIGNALED=os.WIFSIGNALED,
|
def _handle_exitstatus(self, sts, _WIFSIGNALED=os.WIFSIGNALED,
|
||||||
_WTERMSIG=os.WTERMSIG, _WIFEXITED=os.WIFEXITED,
|
_WTERMSIG=os.WTERMSIG, _WIFEXITED=os.WIFEXITED,
|
||||||
_WEXITSTATUS=os.WEXITSTATUS):
|
_WEXITSTATUS=os.WEXITSTATUS):
|
||||||
|
"""All callers to this function MUST hold self._waitpid_lock."""
|
||||||
# This method is called (indirectly) by __del__, so it cannot
|
# This method is called (indirectly) by __del__, so it cannot
|
||||||
# refer to anything outside of its local scope.
|
# refer to anything outside of its local scope.
|
||||||
if _WIFSIGNALED(sts):
|
if _WIFSIGNALED(sts):
|
||||||
|
@ -1471,7 +1482,13 @@ def _internal_poll(self, _deadstate=None, _waitpid=os.waitpid,
|
||||||
|
|
||||||
"""
|
"""
|
||||||
if self.returncode is None:
|
if self.returncode is None:
|
||||||
|
if not self._waitpid_lock.acquire(False):
|
||||||
|
# Something else is busy calling waitpid. Don't allow two
|
||||||
|
# at once. We know nothing yet.
|
||||||
|
return None
|
||||||
try:
|
try:
|
||||||
|
if self.returncode is not None:
|
||||||
|
return self.returncode # Another thread waited.
|
||||||
pid, sts = _waitpid(self.pid, _WNOHANG)
|
pid, sts = _waitpid(self.pid, _WNOHANG)
|
||||||
if pid == self.pid:
|
if pid == self.pid:
|
||||||
self._handle_exitstatus(sts)
|
self._handle_exitstatus(sts)
|
||||||
|
@ -1485,10 +1502,13 @@ def _internal_poll(self, _deadstate=None, _waitpid=os.waitpid,
|
||||||
# can't get the status.
|
# can't get the status.
|
||||||
# http://bugs.python.org/issue15756
|
# http://bugs.python.org/issue15756
|
||||||
self.returncode = 0
|
self.returncode = 0
|
||||||
|
finally:
|
||||||
|
self._waitpid_lock.release()
|
||||||
return self.returncode
|
return self.returncode
|
||||||
|
|
||||||
|
|
||||||
def _try_wait(self, wait_flags):
|
def _try_wait(self, wait_flags):
|
||||||
|
"""All callers to this function MUST hold self._waitpid_lock."""
|
||||||
try:
|
try:
|
||||||
(pid, sts) = _eintr_retry_call(os.waitpid, self.pid, wait_flags)
|
(pid, sts) = _eintr_retry_call(os.waitpid, self.pid, wait_flags)
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
|
@ -1521,11 +1541,17 @@ def wait(self, timeout=None, endtime=None):
|
||||||
# cribbed from Lib/threading.py in Thread.wait() at r71065.
|
# cribbed from Lib/threading.py in Thread.wait() at r71065.
|
||||||
delay = 0.0005 # 500 us -> initial delay of 1 ms
|
delay = 0.0005 # 500 us -> initial delay of 1 ms
|
||||||
while True:
|
while True:
|
||||||
(pid, sts) = self._try_wait(os.WNOHANG)
|
if self._waitpid_lock.acquire(False):
|
||||||
assert pid == self.pid or pid == 0
|
try:
|
||||||
if pid == self.pid:
|
if self.returncode is not None:
|
||||||
self._handle_exitstatus(sts)
|
break # Another thread waited.
|
||||||
break
|
(pid, sts) = self._try_wait(os.WNOHANG)
|
||||||
|
assert pid == self.pid or pid == 0
|
||||||
|
if pid == self.pid:
|
||||||
|
self._handle_exitstatus(sts)
|
||||||
|
break
|
||||||
|
finally:
|
||||||
|
self._waitpid_lock.release()
|
||||||
remaining = self._remaining_time(endtime)
|
remaining = self._remaining_time(endtime)
|
||||||
if remaining <= 0:
|
if remaining <= 0:
|
||||||
raise TimeoutExpired(self.args, timeout)
|
raise TimeoutExpired(self.args, timeout)
|
||||||
|
@ -1533,11 +1559,15 @@ def wait(self, timeout=None, endtime=None):
|
||||||
time.sleep(delay)
|
time.sleep(delay)
|
||||||
else:
|
else:
|
||||||
while self.returncode is None:
|
while self.returncode is None:
|
||||||
(pid, sts) = self._try_wait(0)
|
with self._waitpid_lock:
|
||||||
# Check the pid and loop as waitpid has been known to return
|
if self.returncode is not None:
|
||||||
# 0 even without WNOHANG in odd situations. issue14396.
|
break # Another thread waited.
|
||||||
if pid == self.pid:
|
(pid, sts) = self._try_wait(0)
|
||||||
self._handle_exitstatus(sts)
|
# Check the pid and loop as waitpid has been known to
|
||||||
|
# return 0 even without WNOHANG in odd situations.
|
||||||
|
# http://bugs.python.org/issue14396.
|
||||||
|
if pid == self.pid:
|
||||||
|
self._handle_exitstatus(sts)
|
||||||
return self.returncode
|
return self.returncode
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1052,6 +1052,54 @@ def open_fds():
|
||||||
if exc is not None:
|
if exc is not None:
|
||||||
raise exc
|
raise exc
|
||||||
|
|
||||||
|
@unittest.skipIf(threading is None, "threading required")
|
||||||
|
def test_threadsafe_wait(self):
|
||||||
|
"""Issue21291: Popen.wait() needs to be threadsafe for returncode."""
|
||||||
|
proc = subprocess.Popen([sys.executable, '-c',
|
||||||
|
'import time; time.sleep(12)'])
|
||||||
|
self.assertEqual(proc.returncode, None)
|
||||||
|
results = []
|
||||||
|
|
||||||
|
def kill_proc_timer_thread():
|
||||||
|
results.append(('thread-start-poll-result', proc.poll()))
|
||||||
|
# terminate it from the thread and wait for the result.
|
||||||
|
proc.kill()
|
||||||
|
proc.wait()
|
||||||
|
results.append(('thread-after-kill-and-wait', proc.returncode))
|
||||||
|
# this wait should be a no-op given the above.
|
||||||
|
proc.wait()
|
||||||
|
results.append(('thread-after-second-wait', proc.returncode))
|
||||||
|
|
||||||
|
# This is a timing sensitive test, the failure mode is
|
||||||
|
# triggered when both the main thread and this thread are in
|
||||||
|
# the wait() call at once. The delay here is to allow the
|
||||||
|
# main thread to most likely be blocked in its wait() call.
|
||||||
|
t = threading.Timer(0.2, kill_proc_timer_thread)
|
||||||
|
t.start()
|
||||||
|
|
||||||
|
# Wait for the process to finish; the thread should kill it
|
||||||
|
# long before it finishes on its own. Supplying a timeout
|
||||||
|
# triggers a different code path for better coverage.
|
||||||
|
proc.wait(timeout=20)
|
||||||
|
# Should be -9 because of the proc.kill() from the thread.
|
||||||
|
self.assertEqual(proc.returncode, -9,
|
||||||
|
msg="unexpected result in wait from main thread")
|
||||||
|
|
||||||
|
# This should be a no-op with no change in returncode.
|
||||||
|
proc.wait()
|
||||||
|
self.assertEqual(proc.returncode, -9,
|
||||||
|
msg="unexpected result in second main wait.")
|
||||||
|
|
||||||
|
t.join()
|
||||||
|
# Ensure that all of the thread results are as expected.
|
||||||
|
# When a race condition occurs in wait(), the returncode could
|
||||||
|
# be set by the wrong thread that doesn't actually have it
|
||||||
|
# leading to an incorrect value.
|
||||||
|
self.assertEqual([('thread-start-poll-result', None),
|
||||||
|
('thread-after-kill-and-wait', -9),
|
||||||
|
('thread-after-second-wait', -9)],
|
||||||
|
results)
|
||||||
|
|
||||||
def test_issue8780(self):
|
def test_issue8780(self):
|
||||||
# Ensure that stdout is inherited from the parent
|
# Ensure that stdout is inherited from the parent
|
||||||
# if stdout=PIPE is not used
|
# if stdout=PIPE is not used
|
||||||
|
|
|
@ -39,6 +39,10 @@ Core and Builtins
|
||||||
Library
|
Library
|
||||||
-------
|
-------
|
||||||
|
|
||||||
|
- Issue #21291: subprocess's Popen.wait() is now thread safe so that
|
||||||
|
multiple threads may be calling wait() or poll() on a Popen instance
|
||||||
|
at the same time without losing the Popen.returncode value.
|
||||||
|
|
||||||
- Issue #21127: Path objects can now be instantiated from str subclass
|
- Issue #21127: Path objects can now be instantiated from str subclass
|
||||||
instances (such as numpy.str_).
|
instances (such as numpy.str_).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue