# cpython/Lib/test/test_crossinterp.py
#
# 1143 lines
# 34 KiB
# Python

import builtins
import contextlib
import importlib
import importlib.util
import itertools
import sys
import types
import unittest
from test.support import import_helper
_testinternalcapi = import_helper.import_module('_testinternalcapi')
_interpreters = import_helper.import_module('_interpreters')
from _interpreters import NotShareableError
from test import _code_definitions as code_defs
from test import _crossinterp_definitions as defs
# Every class found in the builtins namespace.  vars(builtins) is used
# instead of __builtins__ because __builtins__ is the builtins *module*
# (which has no .items()) when this file is run as __main__, and only a
# plain dict when the file is imported.
BUILTIN_TYPES = [o for o in vars(builtins).values()
                 if isinstance(o, type)]

# The builtin classes that derive from BaseException.
EXCEPTION_TYPES = [cls for cls in BUILTIN_TYPES
                   if issubclass(cls, BaseException)]

# Every class found in the types module, except the two names listed below.
OTHER_TYPES = [o for n, o in vars(types).items()
               if (isinstance(o, type) and
                   n not in ('DynamicClassAttribute', '_GeneratorWrapper'))]
DEFS = defs

# Read both definition modules as text so that the combined source can be
# exec'ed under a different module name (see load_defs()).  The files are
# Python source, so decode them as UTF-8 explicitly instead of relying on
# the locale's default encoding.
with open(code_defs.__file__, encoding='utf-8') as infile:
    _code_defs_text = infile.read()
with open(DEFS.__file__, encoding='utf-8') as infile:
    # Every 'from ' occurrence is prefixed with '# '.
    # NOTE(review): presumably this disables the module's "from ... import"
    # lines because the imported code is inlined just above -- confirm.
    _defs_text = infile.read().replace('from ', '# from ')

DEFS_TEXT = f"""
#######################################
# from {code_defs.__file__}
{_code_defs_text}
#######################################
# from {defs.__file__}
{_defs_text}
"""
# Keep the module namespace free of the temporaries used above.
del infile, _code_defs_text, _defs_text
def load_defs(module=None):
    """Execute the definitions source in a brand-new module and return it.

    The "module" arg picks the new module's __name__:

    * None: the name of the original definitions module
    * a str: that string
    * a module: that module's __name__

    When a module object is passed, each name from the new module that
    does not start with an underscore is also set on that module; the
    name must not already exist there.

    Note that the new module is not added to sys.modules.
    """
    if isinstance(module, str):
        modname, target = module, None
    elif module is None:
        modname, target = DEFS.__name__, None
    else:
        modname, target = module.__name__, module

    # Build the new module and fill it by running the combined source.
    newmod = import_helper.create_module(modname)
    newmod.__file__ = DEFS.__file__
    exec(DEFS_TEXT, newmod.__dict__)

    # Mirror the non-underscore names onto the given module, if any.
    if target is not None:
        for name, value in vars(newmod).items():
            if not name.startswith('_'):
                assert not hasattr(target, name), (name, getattr(target, name))
                setattr(target, name, value)

    return newmod
@contextlib.contextmanager
def using___main__():
    """Yield a __main__ module for the duration of the with-block.

    If '__main__' is already in sys.modules, it is yielded under
    import_helper.module_restored().  Otherwise one is added with
    import_helper.add_module() inside import_helper.isolated_modules().
    """
    name = '__main__'
    if name in sys.modules:
        with import_helper.module_restored(name) as existing:
            yield existing
    else:
        with import_helper.isolated_modules():
            added = import_helper.add_module(name)
            yield added
@contextlib.contextmanager
def temp_module(modname):
    """Yield a module added under "modname" for the with-block only.

    The name must not already be in sys.modules.  The module is added
    with import_helper.add_module() while
    import_helper.isolated_modules() is active.
    """
    assert modname not in sys.modules, (modname,)
    with import_helper.isolated_modules():
        mod = import_helper.add_module(modname)
        yield mod
@contextlib.contextmanager
def missing_defs_module(modname, *, prep=False):
    """Yield "modname", a module name that is not in sys.modules.

    With prep=True the with-block runs under
    import_helper.ready_to_import(modname, DEFS_TEXT); otherwise it runs
    under import_helper.isolated_modules().
    """
    assert modname not in sys.modules, (modname,)
    context = (import_helper.ready_to_import(modname, DEFS_TEXT)
               if prep
               else import_helper.isolated_modules())
    with context:
        yield modname
class _GetXIDataTests(unittest.TestCase):
    """Shared helpers for sending objects through cross-interpreter data.

    Subclasses set MODE to the mode string that is passed to
    _testinternalcapi.get_crossinterp_data().  Every helper also accepts
    a "mode" keyword that overrides MODE for that call.
    """

    MODE = None

    def get_xidata(self, obj, *, mode=None):
        """Return the cross-interpreter data captured for "obj"."""
        return _testinternalcapi.get_crossinterp_data(
            obj, self._resolve_mode(mode))

    def get_roundtrip(self, obj, *, mode=None):
        """Capture "obj" as cross-interpreter data and restore it."""
        return self._get_roundtrip(obj, self._resolve_mode(mode))

    def _get_roundtrip(self, obj, mode):
        # Same as get_roundtrip(), but "mode" is already resolved.
        return _testinternalcapi.restore_crossinterp_data(
            _testinternalcapi.get_crossinterp_data(obj, mode))

    def assert_roundtrip_identical(self, values, *, mode=None):
        """Check that each value comes back as the very same object."""
        mode = self._resolve_mode(mode)
        for original in values:
            with self.subTest(original):
                restored = self._get_roundtrip(original, mode)
                self.assertIs(restored, original)

    def assert_roundtrip_equal(self, values, *, mode=None, expecttype=None):
        """Check that each value comes back equal and of the right type.

        The expected type is "expecttype" if given, else the type of
        the original value.
        """
        mode = self._resolve_mode(mode)
        for original in values:
            with self.subTest(original):
                restored = self._get_roundtrip(original, mode)
                self.assertEqual(restored, original)
                wanted = type(original) if expecttype is None else expecttype
                self.assertIs(type(restored), wanted)

    def assert_roundtrip_equal_not_identical(self, values, *,
                                             mode=None, expecttype=None):
        """Check that each value comes back as an equal but distinct object."""
        mode = self._resolve_mode(mode)
        for original in values:
            with self.subTest(original):
                restored = self._get_roundtrip(original, mode)
                self.assertIsNot(restored, original)
                wanted = type(original) if expecttype is None else expecttype
                self.assertIs(type(restored), wanted)
                self.assertEqual(restored, original)

    def assert_roundtrip_not_equal(self, values, *,
                                   mode=None, expecttype=None):
        """Check that each value comes back distinct and not equal."""
        mode = self._resolve_mode(mode)
        for original in values:
            with self.subTest(original):
                restored = self._get_roundtrip(original, mode)
                self.assertIsNot(restored, original)
                wanted = type(original) if expecttype is None else expecttype
                self.assertIs(type(restored), wanted)
                self.assertNotEqual(restored, original)

    def assert_not_shareable(self, values, exctype=None, *, mode=None):
        """Check that capturing each value raises NotShareableError.

        If "exctype" is given, the error's __cause__ must be an
        instance of it.
        """
        mode = self._resolve_mode(mode)
        for original in values:
            with self.subTest(original):
                with self.assertRaises(NotShareableError) as caught:
                    _testinternalcapi.get_crossinterp_data(original, mode)
                if exctype is not None:
                    self.assertIsInstance(caught.exception.__cause__, exctype)

    def _resolve_mode(self, mode):
        # Fall back to the class-level MODE; one of the two must be set.
        resolved = self.MODE if mode is None else mode
        assert resolved
        return resolved
class PickleTests(_GetXIDataTests):
    """Round-trip tests for the 'pickle' cross-interpreter data mode."""

    MODE = 'pickle'

    def test_shareable(self):
        self.assert_roundtrip_equal([
            # singletons
            None,
            True,
            False,
            # bytes
            *(i.to_bytes(2, 'little', signed=True)
              for i in range(-1, 258)),
            # str
            'hello world',
            '你好世界',
            '',
            # int
            sys.maxsize,
            -sys.maxsize - 1,
            *range(-1, 258),
            # float
            0.0,
            1.1,
            -1.0,
            0.12345678,
            -0.12345678,
            # tuple
            (),
            (1,),
            ("hello", "world", ),
            (1, True, "hello"),
            ((1,),),
            ((1, 2), (3, 4)),
            ((1, 2), (3, 4), (5, 6)),
        ])
        # not shareable using xidata
        self.assert_roundtrip_equal([
            # int
            sys.maxsize + 1,
            -sys.maxsize - 2,
            2**1000,
            # tuple
            (0, 1.0, []),
            (0, 1.0, {}),
            (0, 1.0, ([],)),
            (0, 1.0, ({},)),
        ])

    def test_list(self):
        self.assert_roundtrip_equal_not_identical([
            [],
            [1, 2, 3],
            [[1], (2,), {3: 4}],
        ])

    def test_dict(self):
        self.assert_roundtrip_equal_not_identical([
            {},
            {1: 7, 2: 8, 3: 9},
            {1: [1], 2: (2,), 3: {3: 4}},
        ])

    def test_set(self):
        self.assert_roundtrip_equal_not_identical([
            set(),
            {1, 2, 3},
            {frozenset({1}), (2,)},
        ])

    # classes

    def assert_class_defs_same(self, defs):
        """Check classes and instances unpickled against the same module.

        The classes in defs.TOP_CLASSES must come back identical.  Their
        instances must come back equal-but-distinct, except for classes in
        defs.CLASSES_WITHOUT_EQUALITY, whose instances must not be equal.
        """
        # Unpickle relative to the unchanged original module.
        self.assert_roundtrip_identical(defs.TOP_CLASSES)

        instances = [cls(*args)
                     for cls, args in defs.TOP_CLASSES.items()
                     if cls not in defs.CLASSES_WITHOUT_EQUALITY]
        self.assert_roundtrip_equal_not_identical(instances)

        # these don't compare equal
        instances = [cls(*args)
                     for cls, args in defs.TOP_CLASSES.items()
                     if cls in defs.CLASSES_WITHOUT_EQUALITY]
        self.assert_roundtrip_not_equal(instances)

    def assert_class_defs_other_pickle(self, defs, mod):
        """Check that classes missing from "mod" (and their instances) fail."""
        # Pickle relative to a different module than the original.
        for cls in defs.TOP_CLASSES:
            assert not hasattr(mod, cls.__name__), (cls, getattr(mod, cls.__name__))
        self.assert_not_shareable(defs.TOP_CLASSES)

        instances = [cls(*args) for cls, args in defs.TOP_CLASSES.items()]
        self.assert_not_shareable(instances)

    def assert_class_defs_other_unpickle(self, defs, mod, *, fail=False):
        """Capture classes while they are set on "mod", restore after removal.

        Each class in defs.TOP_CLASSES is temporarily set on "mod" while
        the data for it and for one instance is captured.  The attribute
        is then deleted before restoring.  With fail=True restoring the
        class data must raise NotShareableError; otherwise the restored
        class and instance must be different objects from the originals.
        """
        # Unpickle relative to a different module than the original.
        for cls in defs.TOP_CLASSES:
            assert not hasattr(mod, cls.__name__), (cls, getattr(mod, cls.__name__))

        captured = []
        for cls, args in defs.TOP_CLASSES.items():
            with self.subTest(cls):
                setattr(mod, cls.__name__, cls)
                xid = self.get_xidata(cls)
                inst = cls(*args)
                instxid = self.get_xidata(inst)
                captured.append(
                        (cls, xid, inst, instxid))

        for cls, xid, inst, instxid in captured:
            with self.subTest(cls):
                delattr(mod, cls.__name__)
                if fail:
                    with self.assertRaises(NotShareableError):
                        _testinternalcapi.restore_crossinterp_data(xid)
                    continue
                got = _testinternalcapi.restore_crossinterp_data(xid)
                self.assertIsNot(got, cls)
                self.assertNotEqual(got, cls)

                gotcls = got
                got = _testinternalcapi.restore_crossinterp_data(instxid)
                self.assertIsNot(got, inst)
                self.assertIs(type(got), gotcls)
                if cls in defs.CLASSES_WITHOUT_EQUALITY:
                    self.assertNotEqual(got, inst)
                elif cls in defs.BUILTIN_SUBCLASSES:
                    self.assertEqual(got, inst)
                else:
                    self.assertNotEqual(got, inst)

    def assert_class_defs_not_shareable(self, defs):
        """Check that neither the classes nor their instances can be captured."""
        self.assert_not_shareable(defs.TOP_CLASSES)

        instances = [cls(*args) for cls, args in defs.TOP_CLASSES.items()]
        self.assert_not_shareable(instances)

    def test_user_class_normal(self):
        self.assert_class_defs_same(defs)

    def test_user_class_in___main__(self):
        with using___main__() as mod:
            defs = load_defs(mod)
            self.assert_class_defs_same(defs)

    def test_user_class_not_in___main___with_filename(self):
        with using___main__() as mod:
            defs = load_defs('__main__')
            assert defs.__file__
            mod.__file__ = defs.__file__
            self.assert_class_defs_not_shareable(defs)

    def test_user_class_not_in___main___without_filename(self):
        with using___main__() as mod:
            defs = load_defs('__main__')
            defs.__file__ = None
            mod.__file__ = None
            self.assert_class_defs_not_shareable(defs)

    def test_user_class_not_in___main___unpickle_with_filename(self):
        with using___main__() as mod:
            defs = load_defs('__main__')
            assert defs.__file__
            mod.__file__ = defs.__file__
            self.assert_class_defs_other_unpickle(defs, mod)

    def test_user_class_not_in___main___unpickle_without_filename(self):
        with using___main__() as mod:
            defs = load_defs('__main__')
            defs.__file__ = None
            mod.__file__ = None
            self.assert_class_defs_other_unpickle(defs, mod, fail=True)

    def test_user_class_in_module(self):
        with temp_module('__spam__') as mod:
            defs = load_defs(mod)
            self.assert_class_defs_same(defs)

    def test_user_class_not_in_module_with_filename(self):
        with temp_module('__spam__') as mod:
            defs = load_defs(mod.__name__)
            assert defs.__file__
            # For now, we only address this case for __main__.
            self.assert_class_defs_not_shareable(defs)

    def test_user_class_not_in_module_without_filename(self):
        with temp_module('__spam__') as mod:
            defs = load_defs(mod.__name__)
            defs.__file__ = None
            self.assert_class_defs_not_shareable(defs)

    def test_user_class_module_missing_then_imported(self):
        with missing_defs_module('__spam__', prep=True) as modname:
            defs = load_defs(modname)
            # For now, we only address this case for __main__.
            self.assert_class_defs_not_shareable(defs)

    def test_user_class_module_missing_not_available(self):
        with missing_defs_module('__spam__') as modname:
            defs = load_defs(modname)
            self.assert_class_defs_not_shareable(defs)

    def test_nested_class(self):
        eggs = defs.EggsNested()
        with self.assertRaises(NotShareableError):
            self.get_roundtrip(eggs)

    # functions

    def assert_func_defs_same(self, defs):
        """Check that the functions in defs.TOP_FUNCTIONS come back identical."""
        # Unpickle relative to the unchanged original module.
        self.assert_roundtrip_identical(defs.TOP_FUNCTIONS)

    def assert_func_defs_other_pickle(self, defs, mod):
        """Check that functions missing from "mod" cannot be captured."""
        # Pickle relative to a different module than the original.
        for func in defs.TOP_FUNCTIONS:
            # The message must use "func"; no "cls" exists in this scope.
            assert not hasattr(mod, func.__name__), (func, getattr(mod, func.__name__))
        self.assert_not_shareable(defs.TOP_FUNCTIONS)

    def assert_func_defs_other_unpickle(self, defs, mod, *, fail=False):
        """Capture functions while they are set on "mod", restore after removal.

        With fail=True restoring must raise NotShareableError; otherwise
        the restored function must be a different, unequal object.
        """
        # Unpickle relative to a different module than the original.
        for func in defs.TOP_FUNCTIONS:
            # The message must use "func"; no "cls" exists in this scope.
            assert not hasattr(mod, func.__name__), (func, getattr(mod, func.__name__))

        captured = []
        for func in defs.TOP_FUNCTIONS:
            with self.subTest(func):
                setattr(mod, func.__name__, func)
                xid = self.get_xidata(func)
                captured.append(
                        (func, xid))

        for func, xid in captured:
            with self.subTest(func):
                delattr(mod, func.__name__)
                if fail:
                    with self.assertRaises(NotShareableError):
                        _testinternalcapi.restore_crossinterp_data(xid)
                    continue
                got = _testinternalcapi.restore_crossinterp_data(xid)
                self.assertIsNot(got, func)
                self.assertNotEqual(got, func)

    def assert_func_defs_not_shareable(self, defs):
        """Check that the functions in defs.TOP_FUNCTIONS cannot be captured."""
        self.assert_not_shareable(defs.TOP_FUNCTIONS)

    def test_user_function_normal(self):
        self.assert_func_defs_same(defs)

    def test_user_func_in___main__(self):
        with using___main__() as mod:
            defs = load_defs(mod)
            self.assert_func_defs_same(defs)

    def test_user_func_not_in___main___with_filename(self):
        with using___main__() as mod:
            defs = load_defs('__main__')
            assert defs.__file__
            mod.__file__ = defs.__file__
            self.assert_func_defs_not_shareable(defs)

    def test_user_func_not_in___main___without_filename(self):
        with using___main__() as mod:
            defs = load_defs('__main__')
            defs.__file__ = None
            mod.__file__ = None
            self.assert_func_defs_not_shareable(defs)

    def test_user_func_not_in___main___unpickle_with_filename(self):
        with using___main__() as mod:
            defs = load_defs('__main__')
            assert defs.__file__
            mod.__file__ = defs.__file__
            self.assert_func_defs_other_unpickle(defs, mod)

    def test_user_func_not_in___main___unpickle_without_filename(self):
        with using___main__() as mod:
            defs = load_defs('__main__')
            defs.__file__ = None
            mod.__file__ = None
            self.assert_func_defs_other_unpickle(defs, mod, fail=True)

    def test_user_func_in_module(self):
        with temp_module('__spam__') as mod:
            defs = load_defs(mod)
            self.assert_func_defs_same(defs)

    def test_user_func_not_in_module_with_filename(self):
        with temp_module('__spam__') as mod:
            defs = load_defs(mod.__name__)
            assert defs.__file__
            # For now, we only address this case for __main__.
            self.assert_func_defs_not_shareable(defs)

    def test_user_func_not_in_module_without_filename(self):
        with temp_module('__spam__') as mod:
            defs = load_defs(mod.__name__)
            defs.__file__ = None
            self.assert_func_defs_not_shareable(defs)

    def test_user_func_module_missing_then_imported(self):
        with missing_defs_module('__spam__', prep=True) as modname:
            defs = load_defs(modname)
            # For now, we only address this case for __main__.
            self.assert_func_defs_not_shareable(defs)

    def test_user_func_module_missing_not_available(self):
        with missing_defs_module('__spam__') as modname:
            defs = load_defs(modname)
            self.assert_func_defs_not_shareable(defs)

    def test_nested_function(self):
        self.assert_not_shareable(defs.NESTED_FUNCTIONS)

    # exceptions

    def test_user_exception_normal(self):
        self.assert_roundtrip_not_equal([
            defs.MimimalError('error!'),
        ])
        self.assert_roundtrip_equal_not_identical([
            defs.RichError('error!', 42),
        ])

    def test_builtin_exception(self):
        msg = 'error!'
        try:
            raise Exception
        except Exception as exc:
            caught = exc
        # Constructor arguments for the exception types that cannot be
        # created from a single message argument.
        special = {
            BaseExceptionGroup: (msg, [caught]),
            ExceptionGroup: (msg, [caught]),
            UnicodeEncodeError: ('utf-8', '', 1, 3, msg),
            UnicodeDecodeError: ('utf-8', b'', 1, 3, msg),
            UnicodeTranslateError: ('', 1, 3, msg),
        }
        exceptions = [cls(*(special.get(cls) or (msg,)))
                      for cls in EXCEPTION_TYPES]
        self.assert_roundtrip_not_equal(exceptions)
class MarshalTests(_GetXIDataTests):
    """Round-trip tests for the 'marshal' cross-interpreter data mode."""

    MODE = 'marshal'

    def test_simple_builtin_singletons(self):
        self.assert_roundtrip_identical([
            True,
            False,
            None,
            Ellipsis,
        ])
        self.assert_not_shareable([
            NotImplemented,
        ])

    def test_simple_builtin_objects(self):
        self.assert_roundtrip_equal([
            # int
            *range(-1, 258),
            sys.maxsize + 1,
            sys.maxsize,
            -sys.maxsize - 1,
            -sys.maxsize - 2,
            2**1000,
            # complex
            1+2j,
            # float
            0.0,
            1.1,
            -1.0,
            0.12345678,
            -0.12345678,
            # bytes
            *(i.to_bytes(2, 'little', signed=True)
              for i in range(-1, 258)),
            b'hello world',
            # str
            'hello world',
            '你好世界',
            '',
        ])
        self.assert_not_shareable([
            object(),
            types.SimpleNamespace(),
        ])

    def test_bytearray(self):
        # bytearray is special because it unmarshals to bytes, not bytearray.
        self.assert_roundtrip_equal([
            bytearray(),
            bytearray(b'hello world'),
        ], expecttype=bytes)

    def test_compound_immutable_builtin_objects(self):
        self.assert_roundtrip_equal([
            # tuple
            (),
            (1,),
            ("hello", "world"),
            (1, True, "hello"),
            # frozenset
            frozenset([1, 2, 3]),
        ])
        # nested
        self.assert_roundtrip_equal([
            # tuple
            ((1,),),
            ((1, 2), (3, 4)),
            ((1, 2), (3, 4), (5, 6)),
            # frozenset
            frozenset([frozenset([1]), frozenset([2]), frozenset([3])]),
        ])

    def test_compound_mutable_builtin_objects(self):
        self.assert_roundtrip_equal([
            # list
            [],
            [1, 2, 3],
            # dict
            {},
            {1: 7, 2: 8, 3: 9},
            # set
            set(),
            {1, 2, 3},
        ])
        # nested
        self.assert_roundtrip_equal([
            [[1], [2], [3]],
            {1: {'a': True}, 2: {'b': False}},
            {(1, 2, 3,)},
        ])

    def test_compound_builtin_objects_with_bad_items(self):
        bogus = object()
        self.assert_not_shareable([
            (bogus,),
            frozenset([bogus]),
            [bogus],
            {bogus: True},
            {True: bogus},
            {bogus},
        ])

    def test_builtin_code(self):
        self.assert_roundtrip_equal([
            *(f.__code__ for f in defs.FUNCTIONS),
            *(f.__code__ for f in defs.FUNCTION_LIKE),
        ])

    def test_builtin_type(self):
        shareable = [
            StopIteration,
        ]
        # Named "candidates" so the module-level "types" import is not
        # shadowed inside this method.
        candidates = [
            *BUILTIN_TYPES,
            *OTHER_TYPES,
        ]
        self.assert_not_shareable(cls for cls in candidates
                                  if cls not in shareable)
        self.assert_roundtrip_identical(cls for cls in candidates
                                        if cls in shareable)

    def test_builtin_function(self):
        functions = [
            len,
            sys.is_finalizing,
            sys.exit,
            _testinternalcapi.get_crossinterp_data,
        ]
        for func in functions:
            assert type(func) is types.BuiltinFunctionType, func

        self.assert_not_shareable(functions)

    def test_builtin_exception(self):
        msg = 'error!'
        try:
            raise Exception
        except Exception as exc:
            caught = exc
        # Constructor arguments for the exception types that cannot be
        # created from a single message argument.
        special = {
            BaseExceptionGroup: (msg, [caught]),
            ExceptionGroup: (msg, [caught]),
            UnicodeEncodeError: ('utf-8', '', 1, 3, msg),
            UnicodeDecodeError: ('utf-8', b'', 1, 3, msg),
            UnicodeTranslateError: ('', 1, 3, msg),
        }
        exceptions = [cls(*(special.get(cls) or (msg,)))
                      for cls in EXCEPTION_TYPES]
        self.assert_not_shareable(exceptions)
        # Note that StopIteration (the type) can be marshalled,
        # but its instances cannot.

    def test_module(self):
        assert type(sys) is types.ModuleType, type(sys)
        assert type(defs) is types.ModuleType, type(defs)
        # Report the type of the module actually being checked.
        assert type(unittest) is types.ModuleType, type(unittest)

        assert 'emptymod' not in sys.modules
        with import_helper.ready_to_import('emptymod', ''):
            import emptymod

        self.assert_not_shareable([
            sys,
            defs,
            unittest,
            emptymod,
        ])

    def test_user_class(self):
        self.assert_not_shareable(defs.TOP_CLASSES)

        instances = [cls(*args) for cls, args in defs.TOP_CLASSES.items()]
        self.assert_not_shareable(instances)

    def test_user_function(self):
        self.assert_not_shareable(defs.TOP_FUNCTIONS)

    def test_user_exception(self):
        self.assert_not_shareable([
            defs.MimimalError('error!'),
            defs.RichError('error!', 42),
        ])
class CodeTests(_GetXIDataTests):
    """Tests for the 'code' cross-interpreter data mode."""

    MODE = 'code'

    def test_function_code(self):
        # Code objects come back equal but as new objects.
        funcs = [*defs.FUNCTIONS, *defs.FUNCTION_LIKE]
        self.assert_roundtrip_equal_not_identical(
            [func.__code__ for func in funcs])

    def test_functions(self):
        # The functions themselves are rejected in this mode.
        funcs = [*defs.FUNCTIONS, *defs.FUNCTION_LIKE]
        self.assert_not_shareable(funcs)

    def test_other_objects(self):
        rejected = [
            None,
            True,
            False,
            Ellipsis,
            NotImplemented,
            9999,
            'spam',
            b'spam',
            (),
            [],
            {},
            object(),
        ]
        self.assert_not_shareable(rejected)
class ShareableFuncTests(_GetXIDataTests):
    """Tests for the 'func' cross-interpreter data mode."""

    MODE = 'func'

    def test_stateless(self):
        candidates = list(defs.STATELESS_FUNCTIONS)
        # Generators can be stateless too.
        candidates.extend(defs.FUNCTION_LIKE)
        self.assert_roundtrip_not_equal(candidates)

    def test_not_stateless(self):
        stateful = [func for func in defs.FUNCTIONS
                    if func not in defs.STATELESS_FUNCTIONS]
        self.assert_not_shareable(stateful)

    def test_other_objects(self):
        rejected = [
            None,
            True,
            False,
            Ellipsis,
            NotImplemented,
            9999,
            'spam',
            b'spam',
            (),
            [],
            {},
            object(),
        ]
        self.assert_not_shareable(rejected)
class PureShareableScriptTests(_GetXIDataTests):
    """Tests for the 'script-pure' cross-interpreter data mode.

    VALID_SCRIPTS are sources (tested as both str and bytes) that must
    come back as code objects; INVALID_SCRIPTS must be rejected.
    """

    MODE = 'script-pure'

    # The multi-line entries start with "if True:" so that their bodies
    # can be indented along with the surrounding code.  The bodies MUST
    # stay indented, otherwise the scripts are not valid Python.
    VALID_SCRIPTS = [
        '',
        'spam',
        '# a comment',
        'print("spam")',
        'raise Exception("spam")',
        """if True:
            do_something()
            """,
        """if True:
            def spam(x):
                return x
            class Spam:
                def eggs(self):
                    return 42
            x = Spam().eggs()
            raise ValueError(spam(x))
            """,
    ]
    INVALID_SCRIPTS = [
        ' pass',  # IndentationError
        '----',  # SyntaxError
        """if True:
            def spam():
                # no body
            spam()
            """,  # IndentationError
    ]

    def test_valid_str(self):
        self.assert_roundtrip_not_equal([
            *self.VALID_SCRIPTS,
        ], expecttype=types.CodeType)

    def test_invalid_str(self):
        self.assert_not_shareable([
            *self.INVALID_SCRIPTS,
        ])

    def test_valid_bytes(self):
        self.assert_roundtrip_not_equal([
            *(s.encode('utf8') for s in self.VALID_SCRIPTS),
        ], expecttype=types.CodeType)

    def test_invalid_bytes(self):
        self.assert_not_shareable([
            *(s.encode('utf8') for s in self.INVALID_SCRIPTS),
        ])

    def test_pure_script_code(self):
        self.assert_roundtrip_equal_not_identical([
            *(f.__code__ for f in defs.PURE_SCRIPT_FUNCTIONS),
        ])

    def test_impure_script_code(self):
        self.assert_not_shareable([
            *(f.__code__ for f in defs.SCRIPT_FUNCTIONS
              if f not in defs.PURE_SCRIPT_FUNCTIONS),
        ])

    def test_other_code(self):
        self.assert_not_shareable([
            *(f.__code__ for f in defs.FUNCTIONS
              if f not in defs.SCRIPT_FUNCTIONS),
            *(f.__code__ for f in defs.FUNCTION_LIKE),
        ])

    def test_pure_script_function(self):
        # A function comes back as a code object, not as a function.
        self.assert_roundtrip_not_equal([
            *defs.PURE_SCRIPT_FUNCTIONS,
        ], expecttype=types.CodeType)

    def test_impure_script_function(self):
        self.assert_not_shareable([
            *(f for f in defs.SCRIPT_FUNCTIONS
              if f not in defs.PURE_SCRIPT_FUNCTIONS),
        ])

    def test_other_function(self):
        self.assert_not_shareable([
            *(f for f in defs.FUNCTIONS
              if f not in defs.SCRIPT_FUNCTIONS),
            *defs.FUNCTION_LIKE,
        ])

    def test_other_objects(self):
        self.assert_not_shareable([
            None,
            True,
            False,
            Ellipsis,
            NotImplemented,
            (),
            [],
            {},
            object(),
        ])
class ShareableScriptTests(PureShareableScriptTests):
    """Tests for the 'script' mode.

    Everything is inherited from the 'script-pure' tests except the two
    overrides below: script functions that are not in
    defs.PURE_SCRIPT_FUNCTIONS are expected to round-trip in this mode.
    """

    MODE = 'script'

    def test_impure_script_code(self):
        impure = [func for func in defs.SCRIPT_FUNCTIONS
                  if func not in defs.PURE_SCRIPT_FUNCTIONS]
        self.assert_roundtrip_equal_not_identical(
            [func.__code__ for func in impure])

    def test_impure_script_function(self):
        impure = [func for func in defs.SCRIPT_FUNCTIONS
                  if func not in defs.PURE_SCRIPT_FUNCTIONS]
        self.assert_roundtrip_not_equal(impure, expecttype=types.CodeType)
class ShareableTypeTests(_GetXIDataTests):
    """Tests for the 'xidata' cross-interpreter data mode."""

    MODE = 'xidata'

    def test_singletons(self):
        self.assert_roundtrip_identical([
            None,
            True,
            False,
        ])
        self.assert_not_shareable([
            Ellipsis,
            NotImplemented,
        ])

    def test_types(self):
        self.assert_roundtrip_equal([
            b'spam',
            9999,
        ])

    def test_bytes(self):
        values = (i.to_bytes(2, 'little', signed=True)
                  for i in range(-1, 258))
        self.assert_roundtrip_equal(values)

    def test_strs(self):
        self.assert_roundtrip_equal([
            'hello world',
            '你好世界',
            '',
        ])

    def test_int(self):
        bounds = [sys.maxsize, -sys.maxsize - 1]
        values = itertools.chain(range(-1, 258), bounds)
        self.assert_roundtrip_equal(values)

    def test_non_shareable_int(self):
        ints = [
            sys.maxsize + 1,
            -sys.maxsize - 2,
            2**1000,
        ]
        # The NotShareableError must be caused by an OverflowError.
        self.assert_not_shareable(ints, OverflowError)

    def test_float(self):
        self.assert_roundtrip_equal([
            0.0,
            1.1,
            -1.0,
            0.12345678,
            -0.12345678,
        ])

    def test_tuple(self):
        self.assert_roundtrip_equal([
            (),
            (1,),
            ("hello", "world", ),
            (1, True, "hello"),
        ])
        # Test nesting
        self.assert_roundtrip_equal([
            ((1,),),
            ((1, 2), (3, 4)),
            ((1, 2), (3, 4), (5, 6)),
        ])

    def test_tuples_containing_non_shareable_types(self):
        non_shareables = [
            Exception(),
            object(),
        ]
        for s in non_shareables:
            value = (0, 1.0, s)
            with self.subTest(repr(value)):
                with self.assertRaises(NotShareableError):
                    self.get_xidata(value)
            # Check nested as well
            value = (0, 1., (s,))
            with self.subTest("nested " + repr(value)):
                with self.assertRaises(NotShareableError):
                    self.get_xidata(value)

    # The rest are not shareable.

    def test_object(self):
        self.assert_not_shareable([
            object(),
        ])

    def test_code(self):
        # types.CodeType
        self.assert_not_shareable([
            *(f.__code__ for f in defs.FUNCTIONS),
            *(f.__code__ for f in defs.FUNCTION_LIKE),
        ])

    def test_function_object(self):
        for func in defs.FUNCTIONS:
            assert type(func) is types.FunctionType, func
        # Report the object actually being checked, not the loop variable
        # left over from above.
        assert type(defs.SpamOkay.okay) is types.FunctionType, defs.SpamOkay.okay
        assert type(lambda: None) is types.LambdaType

        self.assert_not_shareable([
            *defs.FUNCTIONS,
            defs.SpamOkay.okay,
            (lambda: None),
        ])

    def test_builtin_function(self):
        functions = [
            len,
            sys.is_finalizing,
            sys.exit,
            _testinternalcapi.get_crossinterp_data,
        ]
        for func in functions:
            assert type(func) is types.BuiltinFunctionType, func

        self.assert_not_shareable(functions)

    def test_function_like(self):
        self.assert_not_shareable(defs.FUNCTION_LIKE)
        self.assert_not_shareable(defs.FUNCTION_LIKE_APPLIED)

    def test_builtin_wrapper(self):
        # Maps each wrapper object to the type it is expected to have.
        _wrappers = {
            defs.SpamOkay().okay: types.MethodType,
            [].append: types.BuiltinMethodType,
            dict.__dict__['fromkeys']: types.ClassMethodDescriptorType,
            types.FunctionType.__code__: types.GetSetDescriptorType,
            types.FunctionType.__globals__: types.MemberDescriptorType,
            str.join: types.MethodDescriptorType,
            object().__str__: types.MethodWrapperType,
            object.__init__: types.WrapperDescriptorType,
        }
        for obj, expected in _wrappers.items():
            assert type(obj) is expected, (obj, expected)

        self.assert_not_shareable([
            *_wrappers,
            staticmethod(defs.SpamOkay.okay),
            classmethod(defs.SpamOkay.okay),
            property(defs.SpamOkay.okay),
        ])

    def test_module(self):
        assert type(sys) is types.ModuleType, type(sys)
        assert type(defs) is types.ModuleType, type(defs)
        # Report the type of the module actually being checked.
        assert type(unittest) is types.ModuleType, type(unittest)

        assert 'emptymod' not in sys.modules
        with import_helper.ready_to_import('emptymod', ''):
            import emptymod

        self.assert_not_shareable([
            sys,
            defs,
            unittest,
            emptymod,
        ])

    def test_class(self):
        self.assert_not_shareable(defs.CLASSES)

        instances = [cls(*args) for cls, args in defs.CLASSES.items()]
        self.assert_not_shareable(instances)

    def test_builtin_type(self):
        self.assert_not_shareable([
            *BUILTIN_TYPES,
            *OTHER_TYPES,
        ])

    def test_exception(self):
        self.assert_not_shareable([
            defs.MimimalError('error!'),
        ])

    def test_builtin_exception(self):
        msg = 'error!'
        try:
            raise Exception
        except Exception as exc:
            caught = exc
        # Constructor arguments for the exception types that cannot be
        # created from a single message argument.
        special = {
            BaseExceptionGroup: (msg, [caught]),
            ExceptionGroup: (msg, [caught]),
            UnicodeEncodeError: ('utf-8', '', 1, 3, msg),
            UnicodeDecodeError: ('utf-8', b'', 1, 3, msg),
            UnicodeTranslateError: ('', 1, 3, msg),
        }
        exceptions = [cls(*(special.get(cls) or (msg,)))
                      for cls in EXCEPTION_TYPES]
        self.assert_not_shareable(exceptions)

    def test_builtin_objects(self):
        ns = {}
        # The script body must stay indented under "if True:" to be valid.
        exec("""if True:
            try:
                raise Exception
            except Exception as exc:
                TRACEBACK = exc.__traceback__
                FRAME = TRACEBACK.tb_frame
            """, ns, ns)

        self.assert_not_shareable([
            types.MappingProxyType({}),
            types.SimpleNamespace(),
            # types.CellType
            types.CellType(),
            # types.FrameType
            ns['FRAME'],
            # types.TracebackType
            ns['TRACEBACK'],
        ])
# Run the tests when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()