cpython/Lib/test/test_opcache.py

1855 lines
54 KiB
Python

import copy
import pickle
import dis
import threading
import types
import unittest
from test.support import (threading_helper, check_impl_detail,
requires_specialization, requires_specialization_ft,
cpython_only, requires_jit_disabled, reset_code)
from test.support.import_helper import import_module
# Skip this module on other interpreters, it is cpython specific:
if check_impl_detail(cpython=False):
raise unittest.SkipTest('implementation detail specific to cpython')
_testinternalcapi = import_module("_testinternalcapi")
def have_dict_key_versions():
# max version value that can be stored in the load global cache. This is
# determined by the type of module_keys_version and builtin_keys_version
# in _PyLoadGlobalCache, uint16_t.
max_version = 1<<16
# use a wide safety margin (use only half of what's available)
limit = max_version // 2
return _testinternalcapi.get_next_dict_keys_version() < limit
class TestBase(unittest.TestCase):
def assert_specialized(self, f, opname):
instructions = dis.get_instructions(f, adaptive=True)
opnames = {instruction.opname for instruction in instructions}
self.assertIn(opname, opnames)
def assert_no_opcode(self, f, opname):
instructions = dis.get_instructions(f, adaptive=True)
opnames = {instruction.opname for instruction in instructions}
self.assertNotIn(opname, opnames)
class TestLoadSuperAttrCache(unittest.TestCase):
def test_descriptor_not_double_executed_on_spec_fail(self):
calls = []
class Descriptor:
def __get__(self, instance, owner):
calls.append((instance, owner))
return lambda: 1
class C:
d = Descriptor()
class D(C):
def f(self):
return super().d()
d = D()
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD - 1):
self.assertEqual(d.f(), 1) # warmup
calls.clear()
self.assertEqual(d.f(), 1) # try to specialize
self.assertEqual(calls, [(d, D)])
class TestLoadAttrCache(unittest.TestCase):
def test_descriptor_added_after_optimization(self):
class Descriptor:
pass
class C:
def __init__(self):
self.x = 1
x = Descriptor()
def f(o):
return o.x
o = C()
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
assert f(o) == 1
Descriptor.__get__ = lambda self, instance, value: 2
Descriptor.__set__ = lambda *args: None
self.assertEqual(f(o), 2)
def test_metaclass_descriptor_added_after_optimization(self):
class Descriptor:
pass
class Metaclass(type):
attribute = Descriptor()
class Class(metaclass=Metaclass):
attribute = True
def __get__(self, instance, owner):
return False
def __set__(self, instance, value):
return None
def f():
return Class.attribute
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
self.assertTrue(f())
Descriptor.__get__ = __get__
Descriptor.__set__ = __set__
for _ in range(_testinternalcapi.SPECIALIZATION_COOLDOWN):
self.assertFalse(f())
def test_metaclass_descriptor_shadows_class_attribute(self):
class Metaclass(type):
@property
def attribute(self):
return True
class Class(metaclass=Metaclass):
attribute = False
def f():
return Class.attribute
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
self.assertTrue(f())
def test_metaclass_set_descriptor_after_optimization(self):
class Metaclass(type):
pass
class Class(metaclass=Metaclass):
attribute = True
@property
def attribute(self):
return False
def f():
return Class.attribute
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
self.assertTrue(f())
Metaclass.attribute = attribute
for _ in range(_testinternalcapi.SPECIALIZATION_COOLDOWN):
self.assertFalse(f())
def test_metaclass_del_descriptor_after_optimization(self):
class Metaclass(type):
@property
def attribute(self):
return True
class Class(metaclass=Metaclass):
attribute = False
def f():
return Class.attribute
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
self.assertTrue(f())
del Metaclass.attribute
for _ in range(_testinternalcapi.SPECIALIZATION_COOLDOWN):
self.assertFalse(f())
def test_type_descriptor_shadows_attribute_method(self):
class Class:
mro = None
def f():
return Class.mro
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
self.assertIsNone(f())
def test_type_descriptor_shadows_attribute_member(self):
class Class:
__base__ = None
def f():
return Class.__base__
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
self.assertIs(f(), object)
def test_type_descriptor_shadows_attribute_getset(self):
class Class:
__name__ = "Spam"
def f():
return Class.__name__
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
self.assertEqual(f(), "Class")
def test_metaclass_getattribute(self):
class Metaclass(type):
def __getattribute__(self, name):
return True
class Class(metaclass=Metaclass):
attribute = False
def f():
return Class.attribute
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
self.assertTrue(f())
def test_metaclass_swap(self):
class OldMetaclass(type):
@property
def attribute(self):
return True
class NewMetaclass(type):
@property
def attribute(self):
return False
class Class(metaclass=OldMetaclass):
pass
def f():
return Class.attribute
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
self.assertTrue(f())
Class.__class__ = NewMetaclass
for _ in range(_testinternalcapi.SPECIALIZATION_COOLDOWN):
self.assertFalse(f())
def test_load_shadowing_slot_should_raise_type_error(self):
class Class:
__slots__ = ("slot",)
class Sneaky:
__slots__ = ("shadowed",)
shadowing = Class.slot
def f(o):
o.shadowing
o = Sneaky()
o.shadowed = 42
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
with self.assertRaises(TypeError):
f(o)
def test_store_shadowing_slot_should_raise_type_error(self):
class Class:
__slots__ = ("slot",)
class Sneaky:
__slots__ = ("shadowed",)
shadowing = Class.slot
def f(o):
o.shadowing = 42
o = Sneaky()
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
with self.assertRaises(TypeError):
f(o)
def test_load_borrowed_slot_should_not_crash(self):
class Class:
__slots__ = ("slot",)
class Sneaky:
borrowed = Class.slot
def f(o):
o.borrowed
o = Sneaky()
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
with self.assertRaises(TypeError):
f(o)
def test_store_borrowed_slot_should_not_crash(self):
class Class:
__slots__ = ("slot",)
class Sneaky:
borrowed = Class.slot
def f(o):
o.borrowed = 42
o = Sneaky()
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
with self.assertRaises(TypeError):
f(o)
class TestLoadMethodCache(unittest.TestCase):
def test_descriptor_added_after_optimization(self):
class Descriptor:
pass
class Class:
attribute = Descriptor()
def __get__(self, instance, owner):
return lambda: False
def __set__(self, instance, value):
return None
def attribute():
return True
instance = Class()
instance.attribute = attribute
def f():
return instance.attribute()
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
self.assertTrue(f())
Descriptor.__get__ = __get__
Descriptor.__set__ = __set__
for _ in range(_testinternalcapi.SPECIALIZATION_COOLDOWN):
self.assertFalse(f())
def test_metaclass_descriptor_added_after_optimization(self):
class Descriptor:
pass
class Metaclass(type):
attribute = Descriptor()
class Class(metaclass=Metaclass):
def attribute():
return True
def __get__(self, instance, owner):
return lambda: False
def __set__(self, instance, value):
return None
def f():
return Class.attribute()
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
self.assertTrue(f())
Descriptor.__get__ = __get__
Descriptor.__set__ = __set__
for _ in range(_testinternalcapi.SPECIALIZATION_COOLDOWN):
self.assertFalse(f())
def test_metaclass_descriptor_shadows_class_attribute(self):
class Metaclass(type):
@property
def attribute(self):
return lambda: True
class Class(metaclass=Metaclass):
def attribute():
return False
def f():
return Class.attribute()
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
self.assertTrue(f())
def test_metaclass_set_descriptor_after_optimization(self):
class Metaclass(type):
pass
class Class(metaclass=Metaclass):
def attribute():
return True
@property
def attribute(self):
return lambda: False
def f():
return Class.attribute()
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
self.assertTrue(f())
Metaclass.attribute = attribute
for _ in range(_testinternalcapi.SPECIALIZATION_COOLDOWN):
self.assertFalse(f())
def test_metaclass_del_descriptor_after_optimization(self):
class Metaclass(type):
@property
def attribute(self):
return lambda: True
class Class(metaclass=Metaclass):
def attribute():
return False
def f():
return Class.attribute()
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
self.assertTrue(f())
del Metaclass.attribute
for _ in range(_testinternalcapi.SPECIALIZATION_COOLDOWN):
self.assertFalse(f())
def test_type_descriptor_shadows_attribute_method(self):
class Class:
def mro():
return ["Spam", "eggs"]
def f():
return Class.mro()
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
self.assertEqual(f(), ["Spam", "eggs"])
def test_type_descriptor_shadows_attribute_member(self):
class Class:
def __base__():
return "Spam"
def f():
return Class.__base__()
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
self.assertNotEqual(f(), "Spam")
def test_metaclass_getattribute(self):
class Metaclass(type):
def __getattribute__(self, name):
return lambda: True
class Class(metaclass=Metaclass):
def attribute():
return False
def f():
return Class.attribute()
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
self.assertTrue(f())
def test_metaclass_swap(self):
class OldMetaclass(type):
@property
def attribute(self):
return lambda: True
class NewMetaclass(type):
@property
def attribute(self):
return lambda: False
class Class(metaclass=OldMetaclass):
pass
def f():
return Class.attribute()
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
self.assertTrue(f())
Class.__class__ = NewMetaclass
for _ in range(_testinternalcapi.SPECIALIZATION_COOLDOWN):
self.assertFalse(f())
class InitTakesArg:
def __init__(self, arg):
self.arg = arg
class TestCallCache(TestBase):
def test_too_many_defaults_0(self):
def f():
pass
f.__defaults__ = (None,)
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
f()
def test_too_many_defaults_1(self):
def f(x):
pass
f.__defaults__ = (None, None)
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
f(None)
f()
def test_too_many_defaults_2(self):
def f(x, y):
pass
f.__defaults__ = (None, None, None)
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
f(None, None)
f(None)
f()
@requires_jit_disabled
@requires_specialization_ft
def test_assign_init_code(self):
class MyClass:
def __init__(self):
pass
def instantiate():
return MyClass()
# Trigger specialization
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
instantiate()
self.assert_specialized(instantiate, "CALL_ALLOC_AND_ENTER_INIT")
def count_args(self, *args):
self.num_args = len(args)
# Set MyClass.__init__.__code__ to a code object that uses different
# args
MyClass.__init__.__code__ = count_args.__code__
instantiate()
@requires_jit_disabled
@requires_specialization_ft
def test_push_init_frame_fails(self):
def instantiate():
return InitTakesArg()
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
with self.assertRaises(TypeError):
instantiate()
self.assert_specialized(instantiate, "CALL_ALLOC_AND_ENTER_INIT")
with self.assertRaises(TypeError):
instantiate()
def test_recursion_check_for_general_calls(self):
def test(default=None):
return test()
with self.assertRaises(RecursionError):
test()
def make_deferred_ref_count_obj():
"""Create an object that uses deferred reference counting.
Only objects that use deferred refence counting may be stored in inline
caches in free-threaded builds. This constructs a new class named Foo,
which uses deferred reference counting.
"""
return type("Foo", (object,), {})
@threading_helper.requires_working_threading()
class TestRacesDoNotCrash(TestBase):
# Careful with these. Bigger numbers have a higher chance of catching bugs,
# but you can also burn through a *ton* of type/dict/function versions:
ITEMS = 1000
LOOPS = 4
WRITERS = 2
@requires_jit_disabled
def assert_races_do_not_crash(
self, opname, get_items, read, write, *, check_items=False
):
# This might need a few dozen loops in some cases:
for _ in range(self.LOOPS):
items = get_items()
# Reset:
if check_items:
for item in items:
reset_code(item)
else:
reset_code(read)
# Specialize:
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
read(items)
if check_items:
for item in items:
self.assert_specialized(item, opname)
else:
self.assert_specialized(read, opname)
# Create writers:
writers = []
for _ in range(self.WRITERS):
writer = threading.Thread(target=write, args=[items])
writers.append(writer)
# Run:
for writer in writers:
writer.start()
read(items) # BOOM!
for writer in writers:
writer.join()
@requires_specialization_ft
def test_binary_subscr_getitem(self):
def get_items():
class C:
__getitem__ = lambda self, item: None
items = []
for _ in range(self.ITEMS):
item = C()
items.append(item)
return items
def read(items):
for item in items:
try:
item[None]
except TypeError:
pass
def write(items):
for item in items:
try:
del item.__getitem__
except AttributeError:
pass
type(item).__getitem__ = lambda self, item: None
opname = "BINARY_OP_SUBSCR_GETITEM"
self.assert_races_do_not_crash(opname, get_items, read, write)
@requires_specialization_ft
def test_binary_subscr_list_int(self):
def get_items():
items = []
for _ in range(self.ITEMS):
item = [None]
items.append(item)
return items
def read(items):
for item in items:
try:
item[0]
except IndexError:
pass
def write(items):
for item in items:
item.clear()
item.append(None)
opname = "BINARY_OP_SUBSCR_LIST_INT"
self.assert_races_do_not_crash(opname, get_items, read, write)
@requires_specialization
def test_for_iter_gen(self):
def get_items():
def g():
yield
yield
items = []
for _ in range(self.ITEMS):
item = g()
items.append(item)
return items
def read(items):
for item in items:
try:
for _ in item:
break
except ValueError:
pass
def write(items):
for item in items:
try:
for _ in item:
break
except ValueError:
pass
opname = "FOR_ITER_GEN"
self.assert_races_do_not_crash(opname, get_items, read, write)
@requires_specialization
def test_for_iter_list(self):
def get_items():
items = []
for _ in range(self.ITEMS):
item = [None]
items.append(item)
return items
def read(items):
for item in items:
for item in item:
break
def write(items):
for item in items:
item.clear()
item.append(None)
opname = "FOR_ITER_LIST"
self.assert_races_do_not_crash(opname, get_items, read, write)
@requires_specialization_ft
def test_load_attr_class(self):
def get_items():
class C:
a = make_deferred_ref_count_obj()
items = []
for _ in range(self.ITEMS):
item = C
items.append(item)
return items
def read(items):
for item in items:
try:
item.a
except AttributeError:
pass
def write(items):
for item in items:
try:
del item.a
except AttributeError:
pass
item.a = make_deferred_ref_count_obj()
opname = "LOAD_ATTR_CLASS"
self.assert_races_do_not_crash(opname, get_items, read, write)
@requires_specialization_ft
def test_load_attr_class_with_metaclass_check(self):
def get_items():
class Meta(type):
pass
class C(metaclass=Meta):
a = make_deferred_ref_count_obj()
items = []
for _ in range(self.ITEMS):
item = C
items.append(item)
return items
def read(items):
for item in items:
try:
item.a
except AttributeError:
pass
def write(items):
for item in items:
try:
del item.a
except AttributeError:
pass
item.a = make_deferred_ref_count_obj()
opname = "LOAD_ATTR_CLASS_WITH_METACLASS_CHECK"
self.assert_races_do_not_crash(opname, get_items, read, write)
@requires_specialization_ft
def test_load_attr_getattribute_overridden(self):
def get_items():
class C:
__getattribute__ = lambda self, name: None
items = []
for _ in range(self.ITEMS):
item = C()
items.append(item)
return items
def read(items):
for item in items:
try:
item.a
except AttributeError:
pass
def write(items):
for item in items:
try:
del item.__getattribute__
except AttributeError:
pass
type(item).__getattribute__ = lambda self, name: None
opname = "LOAD_ATTR_GETATTRIBUTE_OVERRIDDEN"
self.assert_races_do_not_crash(opname, get_items, read, write)
@requires_specialization_ft
def test_load_attr_instance_value(self):
def get_items():
class C:
pass
items = []
for _ in range(self.ITEMS):
item = C()
item.a = None
items.append(item)
return items
def read(items):
for item in items:
item.a
def write(items):
for item in items:
item.__dict__[None] = None
opname = "LOAD_ATTR_INSTANCE_VALUE"
self.assert_races_do_not_crash(opname, get_items, read, write)
@requires_specialization_ft
def test_load_attr_method_lazy_dict(self):
def get_items():
class C(Exception):
m = lambda self: None
items = []
for _ in range(self.ITEMS):
item = C()
items.append(item)
return items
def read(items):
for item in items:
try:
item.m()
except AttributeError:
pass
def write(items):
for item in items:
try:
del item.m
except AttributeError:
pass
type(item).m = lambda self: None
opname = "LOAD_ATTR_METHOD_LAZY_DICT"
self.assert_races_do_not_crash(opname, get_items, read, write)
@requires_specialization_ft
def test_load_attr_method_no_dict(self):
def get_items():
class C:
__slots__ = ()
m = lambda self: None
items = []
for _ in range(self.ITEMS):
item = C()
items.append(item)
return items
def read(items):
for item in items:
try:
item.m()
except AttributeError:
pass
def write(items):
for item in items:
try:
del item.m
except AttributeError:
pass
type(item).m = lambda self: None
opname = "LOAD_ATTR_METHOD_NO_DICT"
self.assert_races_do_not_crash(opname, get_items, read, write)
@requires_specialization_ft
def test_load_attr_method_with_values(self):
def get_items():
class C:
m = lambda self: None
items = []
for _ in range(self.ITEMS):
item = C()
items.append(item)
return items
def read(items):
for item in items:
try:
item.m()
except AttributeError:
pass
def write(items):
for item in items:
try:
del item.m
except AttributeError:
pass
type(item).m = lambda self: None
opname = "LOAD_ATTR_METHOD_WITH_VALUES"
self.assert_races_do_not_crash(opname, get_items, read, write)
@requires_specialization_ft
def test_load_attr_module(self):
def get_items():
items = []
for _ in range(self.ITEMS):
item = types.ModuleType("<item>")
items.append(item)
return items
def read(items):
for item in items:
try:
item.__name__
except AttributeError:
pass
def write(items):
for item in items:
d = item.__dict__.copy()
item.__dict__.clear()
item.__dict__.update(d)
opname = "LOAD_ATTR_MODULE"
self.assert_races_do_not_crash(opname, get_items, read, write)
@requires_specialization_ft
def test_load_attr_property(self):
def get_items():
class C:
a = property(lambda self: None)
items = []
for _ in range(self.ITEMS):
item = C()
items.append(item)
return items
def read(items):
for item in items:
try:
item.a
except AttributeError:
pass
def write(items):
for item in items:
try:
del type(item).a
except AttributeError:
pass
type(item).a = property(lambda self: None)
opname = "LOAD_ATTR_PROPERTY"
self.assert_races_do_not_crash(opname, get_items, read, write)
@requires_specialization_ft
def test_load_attr_slot(self):
def get_items():
class C:
__slots__ = ["a", "b"]
items = []
for i in range(self.ITEMS):
item = C()
item.a = i
item.b = i + self.ITEMS
items.append(item)
return items
def read(items):
for item in items:
item.a
item.b
def write(items):
for item in items:
item.a = 100
item.b = 200
opname = "LOAD_ATTR_SLOT"
self.assert_races_do_not_crash(opname, get_items, read, write)
@requires_specialization_ft
def test_load_attr_with_hint(self):
def get_items():
class C:
pass
items = []
for _ in range(self.ITEMS):
item = C()
item.a = None
# Resize into a combined unicode dict:
for i in range(_testinternalcapi.SHARED_KEYS_MAX_SIZE - 1):
setattr(item, f"_{i}", None)
items.append(item)
return items
def read(items):
for item in items:
item.a
def write(items):
for item in items:
item.__dict__[None] = None
opname = "LOAD_ATTR_WITH_HINT"
self.assert_races_do_not_crash(opname, get_items, read, write)
@requires_specialization_ft
def test_load_global_module(self):
if not have_dict_key_versions():
raise unittest.SkipTest("Low on dict key versions")
def get_items():
items = []
for _ in range(self.ITEMS):
item = eval("lambda: x", {"x": None})
items.append(item)
return items
def read(items):
for item in items:
item()
def write(items):
for item in items:
item.__globals__[None] = None
opname = "LOAD_GLOBAL_MODULE"
self.assert_races_do_not_crash(
opname, get_items, read, write, check_items=True
)
@requires_specialization
def test_store_attr_instance_value(self):
def get_items():
class C:
pass
items = []
for _ in range(self.ITEMS):
item = C()
items.append(item)
return items
def read(items):
for item in items:
item.a = None
def write(items):
for item in items:
item.__dict__[None] = None
opname = "STORE_ATTR_INSTANCE_VALUE"
self.assert_races_do_not_crash(opname, get_items, read, write)
@requires_specialization
def test_store_attr_with_hint(self):
def get_items():
class C:
pass
items = []
for _ in range(self.ITEMS):
item = C()
# Resize into a combined unicode dict:
for i in range(_testinternalcapi.SHARED_KEYS_MAX_SIZE - 1):
setattr(item, f"_{i}", None)
items.append(item)
return items
def read(items):
for item in items:
item.a = None
def write(items):
for item in items:
item.__dict__[None] = None
opname = "STORE_ATTR_WITH_HINT"
self.assert_races_do_not_crash(opname, get_items, read, write)
@requires_specialization_ft
def test_store_subscr_list_int(self):
def get_items():
items = []
for _ in range(self.ITEMS):
item = [None]
items.append(item)
return items
def read(items):
for item in items:
try:
item[0] = None
except IndexError:
pass
def write(items):
for item in items:
item.clear()
item.append(None)
opname = "STORE_SUBSCR_LIST_INT"
self.assert_races_do_not_crash(opname, get_items, read, write)
@requires_specialization_ft
def test_unpack_sequence_list(self):
def get_items():
items = []
for _ in range(self.ITEMS):
item = [None]
items.append(item)
return items
def read(items):
for item in items:
try:
[_] = item
except ValueError:
pass
def write(items):
for item in items:
item.clear()
item.append(None)
opname = "UNPACK_SEQUENCE_LIST"
self.assert_races_do_not_crash(opname, get_items, read, write)
class C:
pass
@requires_specialization
class TestInstanceDict(unittest.TestCase):
def setUp(self):
c = C()
c.a, c.b, c.c = 0,0,0
def test_values_on_instance(self):
c = C()
c.a = 1
C().b = 2
c.c = 3
self.assertEqual(
_testinternalcapi.get_object_dict_values(c),
(1, '<NULL>', 3)
)
def test_dict_materialization(self):
c = C()
c.a = 1
c.b = 2
c.__dict__
self.assertEqual(c.__dict__, {"a":1, "b": 2})
def test_dict_dematerialization(self):
c = C()
c.a = 1
c.b = 2
c.__dict__
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
c.a
self.assertEqual(
_testinternalcapi.get_object_dict_values(c),
(1, 2, '<NULL>')
)
def test_dict_dematerialization_multiple_refs(self):
c = C()
c.a = 1
c.b = 2
d = c.__dict__
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
c.a
self.assertIs(c.__dict__, d)
def test_dict_dematerialization_copy(self):
c = C()
c.a = 1
c.b = 2
c2 = copy.copy(c)
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
c.a
c2.a
self.assertEqual(
_testinternalcapi.get_object_dict_values(c),
(1, 2, '<NULL>')
)
self.assertEqual(
_testinternalcapi.get_object_dict_values(c2),
(1, 2, '<NULL>')
)
c3 = copy.deepcopy(c)
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
c.a
c3.a
self.assertEqual(
_testinternalcapi.get_object_dict_values(c),
(1, 2, '<NULL>')
)
#NOTE -- c3.__dict__ does not de-materialize
def test_dict_dematerialization_pickle(self):
c = C()
c.a = 1
c.b = 2
c2 = pickle.loads(pickle.dumps(c))
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
c.a
c2.a
self.assertEqual(
_testinternalcapi.get_object_dict_values(c),
(1, 2, '<NULL>')
)
self.assertEqual(
_testinternalcapi.get_object_dict_values(c2),
(1, 2, '<NULL>')
)
def test_dict_dematerialization_subclass(self):
class D(dict): pass
c = C()
c.a = 1
c.b = 2
c.__dict__ = D(c.__dict__)
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
c.a
self.assertIs(
_testinternalcapi.get_object_dict_values(c),
None
)
self.assertEqual(
c.__dict__,
{'a':1, 'b':2}
)
def test_125868(self):
def make_special_dict():
"""Create a dictionary an object with a this table:
index | key | value
----- | --- | -----
0 | 'b' | 'value'
1 | 'b' | NULL
"""
class A:
pass
a = A()
a.a = 1
a.b = 2
d = a.__dict__.copy()
del d['a']
del d['b']
d['b'] = "value"
return d
class NoInlineAorB:
pass
for i in range(ord('c'), ord('z')):
setattr(NoInlineAorB(), chr(i), i)
c = NoInlineAorB()
c.a = 0
c.b = 1
self.assertFalse(_testinternalcapi.has_inline_values(c))
def f(o, n):
for i in range(n):
o.b = i
# Prime f to store to dict slot 1
f(c, _testinternalcapi.SPECIALIZATION_THRESHOLD)
test_obj = NoInlineAorB()
test_obj.__dict__ = make_special_dict()
self.assertEqual(test_obj.b, "value")
#This should set x.b = 0
f(test_obj, 1)
self.assertEqual(test_obj.b, 0)
class TestSpecializer(TestBase):
@cpython_only
@requires_specialization_ft
def test_binary_op(self):
def binary_op_add_int():
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
a, b = 1, 2
c = a + b
self.assertEqual(c, 3)
binary_op_add_int()
self.assert_specialized(binary_op_add_int, "BINARY_OP_ADD_INT")
self.assert_no_opcode(binary_op_add_int, "BINARY_OP")
def binary_op_add_unicode():
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
a, b = "foo", "bar"
c = a + b
self.assertEqual(c, "foobar")
binary_op_add_unicode()
self.assert_specialized(binary_op_add_unicode, "BINARY_OP_ADD_UNICODE")
self.assert_no_opcode(binary_op_add_unicode, "BINARY_OP")
def binary_op_add_extend():
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
a, b = 6, 3.0
c = a + b
self.assertEqual(c, 9.0)
c = b + a
self.assertEqual(c, 9.0)
c = a - b
self.assertEqual(c, 3.0)
c = b - a
self.assertEqual(c, -3.0)
c = a * b
self.assertEqual(c, 18.0)
c = b * a
self.assertEqual(c, 18.0)
c = a / b
self.assertEqual(c, 2.0)
c = b / a
self.assertEqual(c, 0.5)
binary_op_add_extend()
self.assert_specialized(binary_op_add_extend, "BINARY_OP_EXTEND")
self.assert_no_opcode(binary_op_add_extend, "BINARY_OP")
def binary_op_zero_division():
def compactlong_lhs(arg):
42 / arg
def float_lhs(arg):
42.0 / arg
with self.assertRaises(ZeroDivisionError):
compactlong_lhs(0)
with self.assertRaises(ZeroDivisionError):
compactlong_lhs(0.0)
with self.assertRaises(ZeroDivisionError):
float_lhs(0.0)
with self.assertRaises(ZeroDivisionError):
float_lhs(0)
self.assert_no_opcode(compactlong_lhs, "BINARY_OP_EXTEND")
self.assert_no_opcode(float_lhs, "BINARY_OP_EXTEND")
binary_op_zero_division()
def binary_op_nan():
def compactlong_lhs(arg):
return (
42 + arg,
42 - arg,
42 * arg,
42 / arg,
)
def compactlong_rhs(arg):
return (
arg + 42,
arg - 42,
arg * 2,
arg / 42,
)
nan = float('nan')
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
self.assertEqual(compactlong_lhs(1.0), (43.0, 41.0, 42.0, 42.0))
for _ in range(_testinternalcapi.SPECIALIZATION_COOLDOWN):
self.assertTrue(all(filter(lambda x: x is nan, compactlong_lhs(nan))))
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
self.assertEqual(compactlong_rhs(42.0), (84.0, 0.0, 84.0, 1.0))
for _ in range(_testinternalcapi.SPECIALIZATION_COOLDOWN):
self.assertTrue(all(filter(lambda x: x is nan, compactlong_rhs(nan))))
self.assert_no_opcode(compactlong_lhs, "BINARY_OP_EXTEND")
self.assert_no_opcode(compactlong_rhs, "BINARY_OP_EXTEND")
binary_op_nan()
def binary_op_bitwise_extend():
for _ in range(100):
a, b = 2, 7
x = a | b
self.assertEqual(x, 7)
y = a & b
self.assertEqual(y, 2)
z = a ^ b
self.assertEqual(z, 5)
a, b = 3, 9
a |= b
self.assertEqual(a, 11)
a, b = 11, 9
a &= b
self.assertEqual(a, 9)
a, b = 3, 9
a ^= b
self.assertEqual(a, 10)
binary_op_bitwise_extend()
self.assert_specialized(binary_op_bitwise_extend, "BINARY_OP_EXTEND")
self.assert_no_opcode(binary_op_bitwise_extend, "BINARY_OP")
@cpython_only
@requires_specialization_ft
def test_load_super_attr(self):
"""Ensure that LOAD_SUPER_ATTR is specialized as expected."""
class A:
def __init__(self):
meth = super().__init__
super().__init__()
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
A()
self.assert_specialized(A.__init__, "LOAD_SUPER_ATTR_ATTR")
self.assert_specialized(A.__init__, "LOAD_SUPER_ATTR_METHOD")
self.assert_no_opcode(A.__init__, "LOAD_SUPER_ATTR")
# Temporarily replace super() with something else.
real_super = super
def fake_super():
def init(self):
pass
return init
# Force unspecialize
globals()['super'] = fake_super
try:
# Should be unspecialized after enough calls.
for _ in range(_testinternalcapi.SPECIALIZATION_COOLDOWN):
A()
finally:
globals()['super'] = real_super
# Ensure the specialized instructions are not present
self.assert_no_opcode(A.__init__, "LOAD_SUPER_ATTR_ATTR")
self.assert_no_opcode(A.__init__, "LOAD_SUPER_ATTR_METHOD")
@cpython_only
@requires_specialization_ft
def test_contain_op(self):
def contains_op_dict():
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
a, b = 1, {1: 2, 2: 5}
self.assertTrue(a in b)
self.assertFalse(3 in b)
contains_op_dict()
self.assert_specialized(contains_op_dict, "CONTAINS_OP_DICT")
self.assert_no_opcode(contains_op_dict, "CONTAINS_OP")
def contains_op_set():
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
a, b = 1, {1, 2}
self.assertTrue(a in b)
self.assertFalse(3 in b)
contains_op_set()
self.assert_specialized(contains_op_set, "CONTAINS_OP_SET")
self.assert_no_opcode(contains_op_set, "CONTAINS_OP")
@cpython_only
@requires_specialization_ft
def test_send_with(self):
def run_async(coro):
while True:
try:
coro.send(None)
except StopIteration:
break
class CM:
async def __aenter__(self):
return self
async def __aexit__(self, *exc):
pass
async def send_with():
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
async with CM():
x = 1
run_async(send_with())
# Note there are still unspecialized "SEND" opcodes in the
# cleanup paths of the 'with' statement.
self.assert_specialized(send_with, "SEND_GEN")
@cpython_only
@requires_specialization_ft
def test_send_yield_from(self):
def g():
yield None
def send_yield_from():
yield from g()
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
list(send_yield_from())
self.assert_specialized(send_yield_from, "SEND_GEN")
self.assert_no_opcode(send_yield_from, "SEND")
@cpython_only
@requires_specialization_ft
def test_store_attr_slot(self):
class C:
__slots__ = ['x']
def set_slot(n):
c = C()
for i in range(n):
c.x = i
set_slot(_testinternalcapi.SPECIALIZATION_THRESHOLD)
self.assert_specialized(set_slot, "STORE_ATTR_SLOT")
self.assert_no_opcode(set_slot, "STORE_ATTR")
# Adding a property for 'x' should unspecialize it.
C.x = property(lambda self: None, lambda self, x: None)
set_slot(_testinternalcapi.SPECIALIZATION_COOLDOWN)
self.assert_no_opcode(set_slot, "STORE_ATTR_SLOT")
@cpython_only
@requires_specialization_ft
def test_store_attr_instance_value(self):
class C:
pass
@reset_code
def set_value(n):
c = C()
for i in range(n):
c.x = i
set_value(_testinternalcapi.SPECIALIZATION_THRESHOLD)
self.assert_specialized(set_value, "STORE_ATTR_INSTANCE_VALUE")
self.assert_no_opcode(set_value, "STORE_ATTR")
# Adding a property for 'x' should unspecialize it.
C.x = property(lambda self: None, lambda self, x: None)
set_value(_testinternalcapi.SPECIALIZATION_COOLDOWN)
self.assert_no_opcode(set_value, "STORE_ATTR_INSTANCE_VALUE")
@cpython_only
@requires_specialization_ft
def test_store_attr_with_hint(self):
class C:
pass
c = C()
for i in range(_testinternalcapi.SHARED_KEYS_MAX_SIZE - 1):
setattr(c, f"_{i}", None)
@reset_code
def set_value(n):
for i in range(n):
c.x = i
set_value(_testinternalcapi.SPECIALIZATION_THRESHOLD)
self.assert_specialized(set_value, "STORE_ATTR_WITH_HINT")
self.assert_no_opcode(set_value, "STORE_ATTR")
# Adding a property for 'x' should unspecialize it.
C.x = property(lambda self: None, lambda self, x: None)
set_value(_testinternalcapi.SPECIALIZATION_COOLDOWN)
self.assert_no_opcode(set_value, "STORE_ATTR_WITH_HINT")
@cpython_only
@requires_specialization_ft
def test_to_bool(self):
def to_bool_bool():
true_cnt, false_cnt = 0, 0
elems = [e % 2 == 0 for e in range(_testinternalcapi.SPECIALIZATION_THRESHOLD)]
for e in elems:
if e:
true_cnt += 1
else:
false_cnt += 1
d, m = divmod(_testinternalcapi.SPECIALIZATION_THRESHOLD, 2)
self.assertEqual(true_cnt, d + m)
self.assertEqual(false_cnt, d)
to_bool_bool()
self.assert_specialized(to_bool_bool, "TO_BOOL_BOOL")
self.assert_no_opcode(to_bool_bool, "TO_BOOL")
def to_bool_int():
count = 0
for i in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
if i:
count += 1
else:
count -= 1
self.assertEqual(count, _testinternalcapi.SPECIALIZATION_THRESHOLD - 2)
to_bool_int()
self.assert_specialized(to_bool_int, "TO_BOOL_INT")
self.assert_no_opcode(to_bool_int, "TO_BOOL")
def to_bool_list():
count = 0
elems = list(range(_testinternalcapi.SPECIALIZATION_THRESHOLD))
while elems:
count += elems.pop()
self.assertEqual(elems, [])
self.assertEqual(count, sum(range(_testinternalcapi.SPECIALIZATION_THRESHOLD)))
to_bool_list()
self.assert_specialized(to_bool_list, "TO_BOOL_LIST")
self.assert_no_opcode(to_bool_list, "TO_BOOL")
def to_bool_none():
count = 0
elems = [None] * _testinternalcapi.SPECIALIZATION_THRESHOLD
for e in elems:
if not e:
count += 1
self.assertEqual(count, _testinternalcapi.SPECIALIZATION_THRESHOLD)
to_bool_none()
self.assert_specialized(to_bool_none, "TO_BOOL_NONE")
self.assert_no_opcode(to_bool_none, "TO_BOOL")
def to_bool_str():
count = 0
elems = [""] + ["foo"] * (_testinternalcapi.SPECIALIZATION_THRESHOLD - 1)
for e in elems:
if e:
count += 1
self.assertEqual(count, _testinternalcapi.SPECIALIZATION_THRESHOLD - 1)
to_bool_str()
self.assert_specialized(to_bool_str, "TO_BOOL_STR")
self.assert_no_opcode(to_bool_str, "TO_BOOL")
@cpython_only
@requires_specialization_ft
def test_unpack_sequence(self):
def unpack_sequence_two_tuple():
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
t = 1, 2
a, b = t
self.assertEqual(a, 1)
self.assertEqual(b, 2)
unpack_sequence_two_tuple()
self.assert_specialized(unpack_sequence_two_tuple,
"UNPACK_SEQUENCE_TWO_TUPLE")
self.assert_no_opcode(unpack_sequence_two_tuple, "UNPACK_SEQUENCE")
def unpack_sequence_tuple():
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
a, b, c, d = 1, 2, 3, 4
self.assertEqual(a, 1)
self.assertEqual(b, 2)
self.assertEqual(c, 3)
self.assertEqual(d, 4)
unpack_sequence_tuple()
self.assert_specialized(unpack_sequence_tuple, "UNPACK_SEQUENCE_TUPLE")
self.assert_no_opcode(unpack_sequence_tuple, "UNPACK_SEQUENCE")
def unpack_sequence_list():
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
a, b = [1, 2]
self.assertEqual(a, 1)
self.assertEqual(b, 2)
unpack_sequence_list()
self.assert_specialized(unpack_sequence_list, "UNPACK_SEQUENCE_LIST")
self.assert_no_opcode(unpack_sequence_list, "UNPACK_SEQUENCE")
@cpython_only
@requires_specialization_ft
def test_binary_subscr(self):
def binary_subscr_list_int():
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
a = [1, 2, 3]
for idx, expected in enumerate(a):
self.assertEqual(a[idx], expected)
binary_subscr_list_int()
self.assert_specialized(binary_subscr_list_int,
"BINARY_OP_SUBSCR_LIST_INT")
self.assert_no_opcode(binary_subscr_list_int, "BINARY_OP")
def binary_subscr_tuple_int():
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
a = (1, 2, 3)
for idx, expected in enumerate(a):
self.assertEqual(a[idx], expected)
binary_subscr_tuple_int()
self.assert_specialized(binary_subscr_tuple_int,
"BINARY_OP_SUBSCR_TUPLE_INT")
self.assert_no_opcode(binary_subscr_tuple_int, "BINARY_OP")
def binary_subscr_dict():
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
a = {1: 2, 2: 3}
self.assertEqual(a[1], 2)
self.assertEqual(a[2], 3)
binary_subscr_dict()
self.assert_specialized(binary_subscr_dict, "BINARY_OP_SUBSCR_DICT")
self.assert_no_opcode(binary_subscr_dict, "BINARY_OP")
def binary_subscr_str_int():
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
a = "foobar"
for idx, expected in enumerate(a):
self.assertEqual(a[idx], expected)
binary_subscr_str_int()
self.assert_specialized(binary_subscr_str_int, "BINARY_OP_SUBSCR_STR_INT")
self.assert_no_opcode(binary_subscr_str_int, "BINARY_OP")
def binary_subscr_getitems():
class C:
def __init__(self, val):
self.val = val
def __getitem__(self, item):
return self.val
items = [C(i) for i in range(_testinternalcapi.SPECIALIZATION_THRESHOLD)]
for i in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
self.assertEqual(items[i][i], i)
binary_subscr_getitems()
self.assert_specialized(binary_subscr_getitems, "BINARY_OP_SUBSCR_GETITEM")
self.assert_no_opcode(binary_subscr_getitems, "BINARY_OP")
@cpython_only
@requires_specialization_ft
def test_compare_op(self):
def compare_op_int():
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
a, b = 1, 2
c = a == b
self.assertFalse(c)
compare_op_int()
self.assert_specialized(compare_op_int, "COMPARE_OP_INT")
self.assert_no_opcode(compare_op_int, "COMPARE_OP")
def compare_op_float():
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
a, b = 1.0, 2.0
c = a == b
self.assertFalse(c)
compare_op_float()
self.assert_specialized(compare_op_float, "COMPARE_OP_FLOAT")
self.assert_no_opcode(compare_op_float, "COMPARE_OP")
def compare_op_str():
for _ in range(_testinternalcapi.SPECIALIZATION_THRESHOLD):
a, b = "spam", "ham"
c = a == b
self.assertFalse(c)
compare_op_str()
self.assert_specialized(compare_op_str, "COMPARE_OP_STR")
self.assert_no_opcode(compare_op_str, "COMPARE_OP")
@cpython_only
@requires_specialization_ft
def test_for_iter(self):
L = list(range(10))
def for_iter_list():
for i in L:
self.assertIn(i, L)
for_iter_list()
self.assert_specialized(for_iter_list, "FOR_ITER_LIST")
self.assert_no_opcode(for_iter_list, "FOR_ITER")
t = tuple(range(10))
def for_iter_tuple():
for i in t:
self.assertIn(i, t)
for_iter_tuple()
self.assert_specialized(for_iter_tuple, "FOR_ITER_TUPLE")
self.assert_no_opcode(for_iter_tuple, "FOR_ITER")
r = range(10)
def for_iter_range():
for i in r:
self.assertIn(i, r)
for_iter_range()
self.assert_specialized(for_iter_range, "FOR_ITER_RANGE")
self.assert_no_opcode(for_iter_range, "FOR_ITER")
def for_iter_generator():
for i in (i for i in range(10)):
i + 1
for_iter_generator()
self.assert_specialized(for_iter_generator, "FOR_ITER_GEN")
self.assert_no_opcode(for_iter_generator, "FOR_ITER")
if __name__ == "__main__":
unittest.main()