mirror of https://github.com/python/cpython.git
bpo-29861: release references to multiprocessing Pool tasks (#743)
* bpo-29861: release references to multiprocessing Pool tasks Release references to tasks, their arguments and their results as soon as they are finished, instead of keeping them alive until another task arrives. * Comments in test
This commit is contained in:
parent
e304e33c16
commit
8988945cdc
|
@ -128,6 +128,8 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
|
||||||
util.debug("Possible encoding error while sending result: %s" % (
|
util.debug("Possible encoding error while sending result: %s" % (
|
||||||
wrapped))
|
wrapped))
|
||||||
put((job, i, (False, wrapped)))
|
put((job, i, (False, wrapped)))
|
||||||
|
|
||||||
|
task = job = result = func = args = kwds = None
|
||||||
completed += 1
|
completed += 1
|
||||||
util.debug('worker exiting after %d tasks' % completed)
|
util.debug('worker exiting after %d tasks' % completed)
|
||||||
|
|
||||||
|
@ -402,10 +404,11 @@ def _handle_tasks(taskqueue, put, outqueue, pool, cache):
|
||||||
if set_length:
|
if set_length:
|
||||||
util.debug('doing set_length()')
|
util.debug('doing set_length()')
|
||||||
set_length(i+1)
|
set_length(i+1)
|
||||||
|
finally:
|
||||||
|
task = taskseq = job = None
|
||||||
else:
|
else:
|
||||||
util.debug('task handler got sentinel')
|
util.debug('task handler got sentinel')
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# tell result handler to finish when cache is empty
|
# tell result handler to finish when cache is empty
|
||||||
util.debug('task handler sending sentinel to result handler')
|
util.debug('task handler sending sentinel to result handler')
|
||||||
|
@ -445,6 +448,7 @@ def _handle_results(outqueue, get, cache):
|
||||||
cache[job]._set(i, obj)
|
cache[job]._set(i, obj)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
|
task = job = obj = None
|
||||||
|
|
||||||
while cache and thread._state != TERMINATE:
|
while cache and thread._state != TERMINATE:
|
||||||
try:
|
try:
|
||||||
|
@ -461,6 +465,7 @@ def _handle_results(outqueue, get, cache):
|
||||||
cache[job]._set(i, obj)
|
cache[job]._set(i, obj)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
pass
|
pass
|
||||||
|
task = job = obj = None
|
||||||
|
|
||||||
if hasattr(outqueue, '_reader'):
|
if hasattr(outqueue, '_reader'):
|
||||||
util.debug('ensuring that outqueue is not full')
|
util.debug('ensuring that outqueue is not full')
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
import logging
|
import logging
|
||||||
import struct
|
import struct
|
||||||
import operator
|
import operator
|
||||||
|
import weakref
|
||||||
import test.support
|
import test.support
|
||||||
import test.support.script_helper
|
import test.support.script_helper
|
||||||
|
|
||||||
|
@ -1738,6 +1739,19 @@ def raise_large_valuerror(wait):
|
||||||
time.sleep(wait)
|
time.sleep(wait)
|
||||||
raise ValueError("x" * 1024**2)
|
raise ValueError("x" * 1024**2)
|
||||||
|
|
||||||
|
def identity(x):
|
||||||
|
return x
|
||||||
|
|
||||||
|
class CountedObject(object):
|
||||||
|
n_instances = 0
|
||||||
|
|
||||||
|
def __new__(cls):
|
||||||
|
cls.n_instances += 1
|
||||||
|
return object.__new__(cls)
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
type(self).n_instances -= 1
|
||||||
|
|
||||||
class SayWhenError(ValueError): pass
|
class SayWhenError(ValueError): pass
|
||||||
|
|
||||||
def exception_throwing_generator(total, when):
|
def exception_throwing_generator(total, when):
|
||||||
|
@ -1746,6 +1760,7 @@ def exception_throwing_generator(total, when):
|
||||||
raise SayWhenError("Somebody said when")
|
raise SayWhenError("Somebody said when")
|
||||||
yield i
|
yield i
|
||||||
|
|
||||||
|
|
||||||
class _TestPool(BaseTestCase):
|
class _TestPool(BaseTestCase):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
@ -2000,6 +2015,19 @@ def test_map_no_failfast(self):
|
||||||
# check that we indeed waited for all jobs
|
# check that we indeed waited for all jobs
|
||||||
self.assertGreater(time.time() - t_start, 0.9)
|
self.assertGreater(time.time() - t_start, 0.9)
|
||||||
|
|
||||||
|
def test_release_task_refs(self):
|
||||||
|
# Issue #29861: task arguments and results should not be kept
|
||||||
|
# alive after we are done with them.
|
||||||
|
objs = [CountedObject() for i in range(10)]
|
||||||
|
refs = [weakref.ref(o) for o in objs]
|
||||||
|
self.pool.map(identity, objs)
|
||||||
|
|
||||||
|
del objs
|
||||||
|
self.assertEqual(set(wr() for wr in refs), {None})
|
||||||
|
# With a process pool, copies of the objects are returned, check
|
||||||
|
# they were released too.
|
||||||
|
self.assertEqual(CountedObject.n_instances, 0)
|
||||||
|
|
||||||
|
|
||||||
def raising():
|
def raising():
|
||||||
raise KeyError("key")
|
raise KeyError("key")
|
||||||
|
|
|
@ -287,6 +287,9 @@ Extension Modules
|
||||||
Library
|
Library
|
||||||
-------
|
-------
|
||||||
|
|
||||||
|
- bpo-29861: Release references to tasks, their arguments and their results
|
||||||
|
as soon as they are finished in multiprocessing.Pool.
|
||||||
|
|
||||||
- bpo-19930: The mode argument of os.makedirs() no longer affects the file
|
- bpo-19930: The mode argument of os.makedirs() no longer affects the file
|
||||||
permission bits of newly-created intermediate-level directories.
|
permission bits of newly-created intermediate-level directories.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue