gh-109700: Improve stress tests for interpreter creation (GH-109946)

* Ensure that destructors are called in the test that created interpreters, not after finishing it.
* Try to create/run interpreters in threads simultaneously.
* Mark tests that requires over 6GB of memory with bigmemtest.
This commit is contained in:
Serhiy Storchaka 2025-05-04 21:53:24 +03:00 committed by GitHub
parent 95d2a81ba8
commit 61b50a98b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 21 additions and 9 deletions

View File

@ -21,21 +21,29 @@ def test_create_many_sequential(self):
for _ in range(100):
interp = interpreters.create()
alive.append(interp)
del alive
support.gc_collect()
@support.requires_resource('cpu')
@threading_helper.requires_working_threading()
def test_create_many_threaded(self):
@support.bigmemtest(size=200, memuse=32*2**20, dry_run=False)
def test_create_many_threaded(self, size):
alive = []
start = threading.Event()
def task():
# try to create all interpreters simultaneously
if not start.wait(support.SHORT_TIMEOUT):
raise TimeoutError
interp = interpreters.create()
alive.append(interp)
threads = (threading.Thread(target=task) for _ in range(200))
threads = [threading.Thread(target=task) for _ in range(size)]
with threading_helper.start_threads(threads):
pass
start.set()
del alive
support.gc_collect()
@support.requires_resource('cpu')
@threading_helper.requires_working_threading()
def test_many_threads_running_interp_in_other_interp(self):
@support.bigmemtest(size=200, memuse=34*2**20, dry_run=False)
def test_many_threads_running_interp_in_other_interp(self, size):
start = threading.Event()
interp = interpreters.create()
script = f"""if True:
@ -47,6 +55,9 @@ def run():
interp = interpreters.create()
alreadyrunning = (f'{interpreters.InterpreterError}: '
'interpreter already running')
# try to run all interpreters simultaneously
if not start.wait(support.SHORT_TIMEOUT):
raise TimeoutError
success = False
while not success:
try:
@ -58,9 +69,10 @@ def run():
else:
success = True
threads = (threading.Thread(target=run) for _ in range(200))
threads = [threading.Thread(target=run) for _ in range(size)]
with threading_helper.start_threads(threads):
pass
start.set()
support.gc_collect()
if __name__ == '__main__':