diff --git a/Lib/test/test_free_threading/test_iteration.py b/Lib/test/test_free_threading/test_iteration.py
new file mode 100644
index 000000000000..a51ad0cf83a0
--- /dev/null
+++ b/Lib/test/test_free_threading/test_iteration.py
@@ -0,0 +1,127 @@
+import threading
+import unittest
+from test import support
+
+# The race conditions these tests were written for only happen every now and
+# then, even with the current numbers. To find rare race conditions, bumping
+# these up will help, but it makes the test runtime highly variable under
+# free-threading. Overhead is much higher under ThreadSanitizer, but it's
+# also much better at detecting certain races, so we don't need as many
+# items/threads.
+if support.check_sanitizer(thread=True):
+    NUMITEMS = 1000
+    NUMTHREADS = 2
+else:
+    NUMITEMS = 100000
+    NUMTHREADS = 5
+NUMMUTATORS = 2
+
+class ContendedTupleIterationTest(unittest.TestCase):
+    def make_testdata(self, n):
+        return tuple(range(n))
+
+    def assert_iterator_results(self, results, expected):
+        # Most iterators are not atomic (yet?) so they can skip or duplicate
+        # items, but they should not invent new items (like the range
+        # iterator currently does).
+        extra_items = set(results) - set(expected)
+        self.assertEqual(set(), extra_items)
+
+    def run_threads(self, func, *args, numthreads=NUMTHREADS):
+        threads = []
+        for _ in range(numthreads):
+            t = threading.Thread(target=func, args=args)
+            t.start()
+            threads.append(t)
+        return threads
+
+    def test_iteration(self):
+        """Test iteration over a shared container"""
+        seq = self.make_testdata(NUMITEMS)
+        results = []
+        start = threading.Barrier(NUMTHREADS)
+        def worker():
+            idx = 0
+            start.wait()
+            for item in seq:
+                idx += 1
+            results.append(idx)
+        threads = self.run_threads(worker)
+        for t in threads:
+            t.join()
+        # Each thread has its own iterator, so results should be entirely
+        # predictable.
+        self.assertEqual(results, [NUMITEMS] * NUMTHREADS)
+
+    def test_shared_iterator(self):
+        """Test iteration over a shared iterator"""
+        seq = self.make_testdata(NUMITEMS)
+        it = iter(seq)
+        results = []
+        start = threading.Barrier(NUMTHREADS)
+        def worker():
+            items = []
+            start.wait()
+            # We want a tight loop, so put items in the shared list at the end.
+            for item in it:
+                items.append(item)
+            results.extend(items)
+        threads = self.run_threads(worker)
+        for t in threads:
+            t.join()
+        self.assert_iterator_results(results, seq)
+
+class ContendedListIterationTest(ContendedTupleIterationTest):
+    def make_testdata(self, n):
+        return list(range(n))
+
+    def test_iteration_while_mutating(self):
+        """Test iteration over a shared mutating container."""
+        seq = self.make_testdata(NUMITEMS)
+        results = []
+        start = threading.Barrier(NUMTHREADS + NUMMUTATORS)
+        endmutate = threading.Event()
+        def mutator():
+            orig = seq[:]
+            # Make changes big enough to cause resizing of the list, with
+            # items shifted around for good measure.
+            replacement = (orig * 3)[NUMITEMS//2:]
+            start.wait()
+            while not endmutate.is_set():
+                seq.extend(replacement)
+                seq[:0] = orig
+                seq.__imul__(2)
+                seq.extend(seq)
+                seq[:] = orig
+        def worker():
+            items = []
+            start.wait()
+            # We want a tight loop, so put items in the shared list at the end.
+            for item in seq:
+                items.append(item)
+            results.extend(items)
+        mutators = ()
+        try:
+            threads = self.run_threads(worker)
+            mutators = self.run_threads(mutator, numthreads=NUMMUTATORS)
+            for t in threads:
+                t.join()
+        finally:
+            endmutate.set()
+            for m in mutators:
+                m.join()
+        self.assert_iterator_results(results, list(seq))
+
+
+class ContendedRangeIterationTest(ContendedTupleIterationTest):
+    def make_testdata(self, n):
+        return range(n)
+
+    def assert_iterator_results(self, results, expected):
+        # Range iterators that are shared between threads will (right now)
+        # sometimes produce items after the end of the range, sometimes
+        # _far_ after the end of the range. That should be fixed, but for
+        # now, let's just check they're integers that could have resulted
+        # from stepping beyond the range bounds.
+        extra_items = set(results) - set(expected)
+        for item in extra_items:
+            self.assertEqual((item - expected.start) % expected.step, 0)
diff --git a/Objects/listobject.c b/Objects/listobject.c
index 120e353b709e..84faa5a32a1f 100644
--- a/Objects/listobject.c
+++ b/Objects/listobject.c
@@ -357,7 +357,7 @@ list_get_item_ref(PyListObject *op, Py_ssize_t i)
         return NULL;
     }
     Py_ssize_t cap = list_capacity(ob_item);
-    assert(cap != -1 && cap >= size);
+    assert(cap != -1);
     if (!valid_index(i, cap)) {
         return NULL;
     }
@@ -784,7 +784,8 @@ list_repeat_lock_held(PyListObject *a, Py_ssize_t n)
            _Py_RefcntAdd(*src, n);
            *dest++ = *src++;
         }
-
+        // TODO: _Py_memory_repeat calls are not safe for shared lists in
+        // GIL_DISABLED builds. (See issue #129069)
        _Py_memory_repeat((char *)np->ob_item, sizeof(PyObject *)*output_size,
                          sizeof(PyObject *)*input_size);
     }
@@ -919,6 +920,8 @@ list_ass_slice_lock_held(PyListObject *a, Py_ssize_t ilow, Py_ssize_t ihigh, PyO
     if (d < 0) { /* Delete -d items */
         Py_ssize_t tail;
         tail = (Py_SIZE(a) - ihigh) * sizeof(PyObject *);
+        // TODO: these memmove/memcpy calls are not safe for shared lists in
+        // GIL_DISABLED builds. (See issue #129069)
         memmove(&item[ihigh+d], &item[ihigh], tail);
         if (list_resize(a, Py_SIZE(a) + d) < 0) {
             memmove(&item[ihigh], &item[ihigh+d], tail);
@@ -932,12 +935,14 @@ list_ass_slice_lock_held(PyListObject *a, Py_ssize_t ilow, Py_ssize_t ihigh, PyO
         if (list_resize(a, k+d) < 0)
             goto Error;
         item = a->ob_item;
+        // TODO: these memmove/memcpy calls are not safe for shared lists in
+        // GIL_DISABLED builds. (See issue #129069)
         memmove(&item[ihigh+d], &item[ihigh],
             (k - ihigh)*sizeof(PyObject *));
     }
     for (k = 0; k < n; k++, ilow++) {
         PyObject *w = vitem[k];
-        item[ilow] = Py_XNewRef(w);
+        FT_ATOMIC_STORE_PTR_RELEASE(item[ilow], Py_XNewRef(w));
     }
     for (k = norig - 1; k >= 0; --k)
         Py_XDECREF(recycle[k]);
@@ -1017,6 +1022,8 @@ list_inplace_repeat_lock_held(PyListObject *self, Py_ssize_t n)
     for (Py_ssize_t j = 0; j < input_size; j++) {
         _Py_RefcntAdd(items[j], n-1);
     }
+    // TODO: _Py_memory_repeat calls are not safe for shared lists in
+    // GIL_DISABLED builds. (See issue #129069)
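A side note on the FT_ATOMIC_STORE_PTR_RELEASE change above: lock-free readers
such as list_get_item_ref can load an item slot without holding the list's
lock, so a writer must publish the new pointer with release semantics for the
reader's acquire load to see a fully initialized object. The sketch below
illustrates that release/acquire publication pattern using standard C11
<stdatomic.h> rather than CPython's internal pyatomic macros; all names here
are hypothetical, not CPython code.

    #include <stdatomic.h>
    #include <stddef.h>

    typedef struct { int value; } object_t;

    /* Stands in for one item slot of a list. */
    static _Atomic(object_t *) slot;

    void writer(object_t *obj)
    {
        obj->value = 42;  /* initialize fully *before* publishing */
        atomic_store_explicit(&slot, obj, memory_order_release);
    }

    int reader(void)
    {
        object_t *obj = atomic_load_explicit(&slot, memory_order_acquire);
        if (obj == NULL) {
            return -1;  /* nothing published yet */
        }
        /* The acquire load pairs with the release store, so the
           initialization of obj->value is visible here. */
        return obj->value;
    }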
    _Py_memory_repeat((char *)items, sizeof(PyObject *)*output_size,
                      sizeof(PyObject *)*input_size);
    return 0;
@@ -3993,7 +4000,7 @@ listiter_setstate(PyObject *self, PyObject *state)
             index = -1;
         else if (index > PyList_GET_SIZE(it->it_seq))
             index = PyList_GET_SIZE(it->it_seq); /* iterator exhausted */
-        it->it_index = index;
+        FT_ATOMIC_STORE_SSIZE_RELAXED(it->it_index, index);
     }
     Py_RETURN_NONE;
 }
@@ -4145,7 +4152,7 @@ listreviter_setstate(PyObject *self, PyObject *state)
             index = -1;
         else if (index > PyList_GET_SIZE(it->it_seq) - 1)
             index = PyList_GET_SIZE(it->it_seq) - 1;
-        it->it_index = index;
+        FT_ATOMIC_STORE_SSIZE_RELAXED(it->it_index, index);
     }
     Py_RETURN_NONE;
 }
@@ -4162,18 +4169,19 @@ listiter_reduce_general(void *_it, int forward)
      * call must be before access of iterator pointers.
      * see issue #101765 */
-    /* the objects are not the same, index is of different types! */
     if (forward) {
         iter = _PyEval_GetBuiltin(&_Py_ID(iter));
         _PyListIterObject *it = (_PyListIterObject *)_it;
-        if (it->it_index >= 0) {
-            return Py_BuildValue("N(O)n", iter, it->it_seq, it->it_index);
+        Py_ssize_t idx = FT_ATOMIC_LOAD_SSIZE_RELAXED(it->it_index);
+        if (idx >= 0) {
+            return Py_BuildValue("N(O)n", iter, it->it_seq, idx);
         }
     }
     else {
         iter = _PyEval_GetBuiltin(&_Py_ID(reversed));
         listreviterobject *it = (listreviterobject *)_it;
-        if (it->it_index >= 0) {
-            return Py_BuildValue("N(O)n", iter, it->it_seq, it->it_index);
+        Py_ssize_t idx = FT_ATOMIC_LOAD_SSIZE_RELAXED(it->it_index);
+        if (idx >= 0) {
+            return Py_BuildValue("N(O)n", iter, it->it_seq, idx);
         }
     }
     /* empty iterator, create an empty list */
diff --git a/Objects/tupleobject.c b/Objects/tupleobject.c
index 60af9e40e3fe..b7416a5a1c53 100644
--- a/Objects/tupleobject.c
+++ b/Objects/tupleobject.c
@@ -1014,18 +1014,23 @@ tupleiter_next(PyObject *self)
     assert(it != NULL);
     seq = it->it_seq;
+#ifndef Py_GIL_DISABLED
     if (seq == NULL)
         return NULL;
+#endif
     assert(PyTuple_Check(seq));
-    if (it->it_index < PyTuple_GET_SIZE(seq)) {
-        item = PyTuple_GET_ITEM(seq, it->it_index);
-        ++it->it_index;
+    Py_ssize_t index = FT_ATOMIC_LOAD_SSIZE_RELAXED(it->it_index);
+    if (index < PyTuple_GET_SIZE(seq)) {
+        FT_ATOMIC_STORE_SSIZE_RELAXED(it->it_index, index + 1);
+        item = PyTuple_GET_ITEM(seq, index);
         return Py_NewRef(item);
     }
+#ifndef Py_GIL_DISABLED
     it->it_seq = NULL;
     Py_DECREF(seq);
+#endif
     return NULL;
 }
@@ -1034,8 +1039,15 @@ tupleiter_len(PyObject *self, PyObject *Py_UNUSED(ignored))
 {
     _PyTupleIterObject *it = _PyTupleIterObject_CAST(self);
     Py_ssize_t len = 0;
+#ifdef Py_GIL_DISABLED
+    Py_ssize_t idx = FT_ATOMIC_LOAD_SSIZE_RELAXED(it->it_index);
+    Py_ssize_t seq_len = PyTuple_GET_SIZE(it->it_seq);
+    if (idx < seq_len)
+        len = seq_len - idx;
+#else
     if (it->it_seq)
         len = PyTuple_GET_SIZE(it->it_seq) - it->it_index;
+#endif
     return PyLong_FromSsize_t(len);
 }
@@ -1051,10 +1063,15 @@ tupleiter_reduce(PyObject *self, PyObject *Py_UNUSED(ignored))
      * see issue #101765 */
     _PyTupleIterObject *it = _PyTupleIterObject_CAST(self);
+#ifdef Py_GIL_DISABLED
+    Py_ssize_t idx = FT_ATOMIC_LOAD_SSIZE_RELAXED(it->it_index);
+    if (idx < PyTuple_GET_SIZE(it->it_seq))
+        return Py_BuildValue("N(O)n", iter, it->it_seq, idx);
+#else
     if (it->it_seq)
         return Py_BuildValue("N(O)n", iter, it->it_seq, it->it_index);
-    else
-        return Py_BuildValue("N(())", iter);
+#endif
+    return Py_BuildValue("N(())", iter);
 }
 
 static PyObject *
@@ -1069,7 +1086,7 @@ tupleiter_setstate(PyObject *self, PyObject *state)
             index = 0;
         else if (index > PyTuple_GET_SIZE(it->it_seq))
             index = PyTuple_GET_SIZE(it->it_seq); /* exhausted iterator */
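The tupleiter_next rewrite above tolerates a benign race: two threads can both
load the same it_index and both store index + 1, so a shared iterator may
yield duplicate items (which the new tests explicitly accept), but the relaxed
atomics rule out torn reads and writes of the index, and the stored value can
never step past the bound checked before the store. A minimal sketch of that
pattern, assuming standard C11 atomics instead of the FT_ATOMIC_* macros;
names are illustrative only.

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stddef.h>

    typedef struct {
        _Atomic ptrdiff_t it_index;  /* stands in for it->it_index */
        size_t size;                 /* length of the underlying tuple */
    } iter_t;

    /* Claim the next index, or return false once the iterator is done. */
    bool iter_next_index(iter_t *it, ptrdiff_t *out)
    {
        ptrdiff_t index = atomic_load_explicit(&it->it_index,
                                               memory_order_relaxed);
        if (index < (ptrdiff_t)it->size) {
            /* A plain store, not a compare-exchange: a racing thread can
               claim the same index (a duplicate item), but never an
               invented one, and never a torn index value. */
            atomic_store_explicit(&it->it_index, index + 1,
                                  memory_order_relaxed);
            *out = index;
            return true;
        }
        return false;
    }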
-        it->it_index = index;
+        FT_ATOMIC_STORE_SSIZE_RELAXED(it->it_index, index);
     }
     Py_RETURN_NONE;
 }
diff --git a/Tools/tsan/suppressions_free_threading.txt b/Tools/tsan/suppressions_free_threading.txt
index e4cf2a76db35..1802473fea3f 100644
--- a/Tools/tsan/suppressions_free_threading.txt
+++ b/Tools/tsan/suppressions_free_threading.txt
@@ -18,12 +18,10 @@ race:set_allocator_unlocked
 
 # https://gist.github.com/swtaarrs/08dfe7883b4c975c31ecb39388987a67
 race:free_threadstate
 
-
 # These warnings trigger directly in a CPython function.
 race_top:assign_version_tag
 race_top:_multiprocessing_SemLock_acquire_impl
-race_top:list_get_item_ref
 race_top:_Py_slot_tp_getattr_hook
 race_top:add_threadstate
 race_top:dump_traceback
@@ -54,3 +52,12 @@ race_top:update_one_slot
 
 # https://gist.github.com/mpage/6962e8870606cfc960e159b407a0cb40
 thread:pthread_create
+
+# Range iteration is not thread-safe yet (issue #129068)
+race_top:rangeiter_next
+
+# List resizing happens through different paths ending in memcpy or memmove
+# (for efficiency), which will probably need to be rewritten as explicit
+# loops of ptr-sized copies to be thread-safe. (Issue #129069)
+race:list_ass_slice_lock_held
+race:list_inplace_repeat_lock_held
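For illustration, the "explicit loops of ptr-sized copies" rewrite suggested
in the last suppression comment could look roughly like the sketch below:
memcpy/memmove may copy in arbitrary chunk sizes, so a concurrent lock-free
reader can observe a torn pointer, whereas copying one pointer-sized slot at
a time keeps every slot individually consistent. This assumes C11 atomics;
CPython would presumably use its internal pyatomic primitives instead, and
these helper names are hypothetical.

    #include <stdatomic.h>
    #include <stddef.h>

    /* Copy n pointer slots one at a time so a lock-free reader can never
       see a torn pointer; relaxed ordering suffices for the copy itself. */
    static void
    copy_slots(_Atomic(void *) *dest, _Atomic(void *) *src, size_t n)
    {
        for (size_t i = 0; i < n; i++) {
            void *v = atomic_load_explicit(&src[i], memory_order_relaxed);
            atomic_store_explicit(&dest[i], v, memory_order_relaxed);
        }
    }

    /* memmove-style variant for overlapping regions where dest > src:
       copy backwards so slots are read before they are overwritten. */
    static void
    copy_slots_backward(_Atomic(void *) *dest, _Atomic(void *) *src, size_t n)
    {
        for (size_t i = n; i > 0; i--) {
            void *v = atomic_load_explicit(&src[i - 1], memory_order_relaxed);
            atomic_store_explicit(&dest[i - 1], v, memory_order_relaxed);
        }
    }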