connection: Unify initial object refresh handling

Have libvirtobjects advertise a routine specifically for initial setup,
and emit a signal when it's complete. Then dispatch the associated conn
signal on demand as the objects are initialized. This should avoid a
whole class of ordering issues, and is easier to follow IMO.
This commit is contained in:
Cole Robinson 2015-04-10 14:08:25 -04:00
parent 629627b663
commit d0ffd954cd
7 changed files with 147 additions and 105 deletions

View File

@ -171,6 +171,9 @@ class vmmConnection(vmmGObject):
self._backend = virtinst.VirtualConnection(self._uri)
self._closing = False
self._init_object_count = None
self._init_object_event = None
self._network_capable = None
self._storage_capable = None
self._interface_capable = None
@ -877,6 +880,9 @@ class vmmConnection(vmmGObject):
self._backend.close()
self.record = []
if self._init_object_event:
self._init_object_event.clear()
self._objects.cleanup()
self._objects = _ObjectList()
@ -962,20 +968,31 @@ class vmmConnection(vmmGObject):
logging.debug("Connection doesn't support KeepAlive, "
"skipping")
# The initial tick will set up a threading event that will only
# trigger after all the polled libvirt objects are fully initialized.
# That way we only report the connection is open when everything is
# nicely setup for the rest of the app.
self._init_object_event = threading.Event()
self._init_object_count = 0
self.schedule_priority_tick(stats_update=True,
pollvm=True, pollnet=True,
pollpool=True, polliface=True,
pollnodedev=True, force=True, initial_poll=True)
self._init_object_event.wait()
self._init_object_event = None
self._init_object_count = None
def _open_thread(self):
try:
is_active, connectError = self._do_open()
if is_active:
self._populate_initial_state()
else:
self.idle_add(self._change_state, self._STATE_DISCONNECTED)
if is_active:
self.schedule_priority_tick(stats_update=True,
pollvm=True, pollnet=True,
pollpool=True, polliface=True,
pollnodedev=True,
force=True)
self.idle_add(self._change_state, is_active and
self._STATE_ACTIVE or self._STATE_DISCONNECTED)
except Exception, e:
is_active = False
self._schedule_close()
@ -990,22 +1007,23 @@ class vmmConnection(vmmGObject):
# Tick/Update methods #
#######################
def _send_object_signals(self, new_objects, gone_objects,
finish_connecting):
def _gone_object_signals(self, gone_objects):
"""
Responsible for signaling the UI for any updates. All possible UI
updates need to go here to enable threading that doesn't block the
app with long tick operations.
"""
# Connection closed out from under us
if not self._backend.is_open():
return
for obj in gone_objects:
class_name = obj.class_name()
if not self._objects.remove(obj):
logging.debug("Requested removal of %s=%s, but it's "
"not in our object list.", class_name, obj.get_name())
continue
class_name = obj.class_name()
logging.debug("%s=%s removed", class_name, obj.get_name())
if class_name == "domain":
self.emit("vm-removed", obj.get_connkey())
@ -1019,11 +1037,18 @@ class vmmConnection(vmmGObject):
self.emit("nodedev-removed", obj.get_connkey())
obj.cleanup()
for obj in new_objects:
if not self._objects.add(obj):
continue
def _new_object_cb(self, obj):
if not self._backend.is_open():
return
try:
class_name = obj.class_name()
if not self._objects.add(obj):
logging.debug("New %s=%s requested, but it's already tracked.",
class_name, obj.get_name())
return
if class_name != "nodedev":
# Skip nodedev logging since it's noisy and not interesting
logging.debug("%s=%s status=%s added", class_name,
@ -1038,34 +1063,11 @@ class vmmConnection(vmmGObject):
self.emit("interface-added", obj.get_connkey())
elif class_name == "nodedev":
self.emit("nodedev-added", obj.get_connkey())
if finish_connecting:
self._change_state(self._STATE_ACTIVE)
def _refresh_new_objects(self, newlist):
if not newlist:
return
def _refresh_generic():
for obj in newlist:
obj.refresh_xml()
def _refresh_pool():
for pool in newlist:
pool.refresh()
def _refresh_volumes(p):
for vol in p.get_volumes():
vol.refresh_xml()
self._start_thread(_refresh_volumes,
"pool=%s refreshing xml for volumes" % pool.get_name(),
(pool,))
cb = _refresh_generic
if hasattr(newlist[0], "get_volumes"):
cb = _refresh_pool
self._start_thread(cb,
"refreshing xml for new %s" % newlist[0].__class__)
finally:
if self._init_object_event:
self._init_object_count -= 1
if self._init_object_count <= 0:
self._init_object_event.set()
def _update_nets(self, dopoll):
keymap = dict((o.get_connkey(), o) for o in self.list_nets())
@ -1102,60 +1104,71 @@ class vmmConnection(vmmGObject):
return pollhelpers.fetch_vms(self._backend, keymap,
(lambda obj, key: vmmDomain(self, obj, key)))
def _poll(self, pollvm, pollnet, pollpool, polliface, pollnodedev):
def _poll(self, initial_poll,
pollvm, pollnet, pollpool, polliface, pollnodedev):
"""
Helper called from tick() to do necessary polling and return
the relevant object lists
"""
gone_objects = []
new_objects = []
preexisting_objects = []
def _process_objects(polloutput):
gone, new, master = polloutput
if initial_poll:
self._init_object_count += len(new)
gone_objects.extend(gone)
new_objects.extend(new)
preexisting_objects.extend([o for o in master if o not in new])
return new
self._refresh_new_objects(_process_objects(
self._update_nets(pollnet)))
self._refresh_new_objects(_process_objects(
self._update_pools(pollpool)))
self._refresh_new_objects(_process_objects(
self._update_interfaces(polliface)))
self._refresh_new_objects(_process_objects(
self._update_nodedevs(pollnodedev)))
new_vms = _process_objects(self._update_vms(pollvm))
new_nets = _process_objects(self._update_nets(pollnet))
new_pools = _process_objects(self._update_pools(pollpool))
new_ifaces = _process_objects(self._update_interfaces(polliface))
new_nodedevs = _process_objects(self._update_nodedevs(pollnodedev))
# These are refreshing in their __init__ method, because the
# data is wanted immediately
_process_objects(self._update_vms(pollvm))
# Kick off one thread per object type to handle the initial
# XML fetching. Going any more fine grained then this probably
# won't be that useful due to libvirt's locking structure.
#
# Would prefer to start refreshing some objects before all polling
# is complete, but we need init_object_count to be fully accurate
# before we start initializing objects
for newlist in [new_vms, new_nets, new_pools,
new_ifaces, new_nodedevs]:
if not newlist:
continue
return gone_objects, new_objects, preexisting_objects
def cb(lst):
for obj in lst:
obj.connect_once("initialized", self._new_object_cb)
obj.init_libvirt_state()
self._start_thread(cb,
"refreshing xml for new %s" % newlist[0].class_name(),
args=(newlist,))
return gone_objects, preexisting_objects
def _tick(self, stats_update,
pollvm=False, pollnet=False,
pollpool=False, polliface=False,
pollnodedev=False,
force=False):
force=False, initial_poll=False):
"""
main update function: polls for new objects, updates stats, ...
:param force: Perform the requested polling even if async events
are in use
are in use.
"""
finish_connecting = False
if self._closing:
return
if self.is_disconnected():
return
if self.is_connecting():
# If in 'connecting' state, and force requested, this means
# we are performing the initial poll.
if not force:
return
finish_connecting = True
if self.is_connecting() and not force:
return
# We need to set this before the event check, since stats polling
# is independent of events
@ -1169,10 +1182,9 @@ class vmmConnection(vmmGObject):
self.hostinfo = self._backend.getInfo()
gone_objects, new_objects, preexisting_objects = self._poll(
pollvm, pollnet, pollpool, polliface, pollnodedev)
self.idle_add(self._send_object_signals,
new_objects, gone_objects, finish_connecting)
gone_objects, preexisting_objects = self._poll(
initial_poll, pollvm, pollnet, pollpool, polliface, pollnodedev)
self.idle_add(self._gone_object_signals, gone_objects)
# Only tick() pre-existing objects, since new objects will be
# initialized asynchronously and tick() would be redundant

View File

@ -170,8 +170,6 @@ class vmmDomainSnapshot(vmmLibvirtObject):
vmmLibvirtObject.__init__(self, conn, backend, backend.getName(),
DomainSnapshot)
self.refresh_xml()
##########################
# Required class methods #
@ -192,6 +190,8 @@ class vmmDomainSnapshot(vmmLibvirtObject):
def tick(self, stats_update=True):
ignore = stats_update
def _init_libvirt_state(self):
self.refresh_xml()
###########
@ -344,20 +344,12 @@ class vmmDomain(vmmLibvirtObject):
self.inspection = vmmInspectionData()
if isinstance(self._backend, Guest):
return
self._libvirt_init()
def _cleanup(self):
for snap in self._snapshot_list or []:
snap.cleanup()
self._snapshot_list = None
def _libvirt_init(self):
"""
Initialization to do if backed by a libvirt virDomain
"""
def _init_libvirt_state(self):
self.managedsave_supported = self.conn.check_support(
self.conn.SUPPORT_DOMAIN_MANAGED_SAVE, self._backend)
self.remote_console_supported = self.conn.check_support(
@ -374,16 +366,16 @@ class vmmDomain(vmmLibvirtObject):
(self._inactive_xml_flags,
self._active_xml_flags) = self.conn.get_dom_flags(self._backend)
# Prime caches
self.tick()
self.has_managed_save()
self.snapshots_supported()
self.toggle_sample_network_traffic()
self.toggle_sample_disk_io()
self.toggle_sample_mem_stats()
self.toggle_sample_cpu_stats()
# Prime caches
self.refresh_xml()
self.has_managed_save()
self.snapshots_supported()
# Hook up listeners that need to be cleaned up
self.add_gsettings_handle(
self.config.on_stats_enable_cpu_poll_changed(
@ -541,7 +533,6 @@ class vmmDomain(vmmLibvirtObject):
def _invalidate_xml(self):
vmmLibvirtObject._invalidate_xml(self)
self._id = None
self._has_managed_save = None
self._status_reason = None
self._has_managed_save = None
@ -1092,7 +1083,9 @@ class vmmDomain(vmmLibvirtObject):
if self._snapshot_list is None:
newlist = []
for rawsnap in self._backend.listAllSnapshots():
newlist.append(vmmDomainSnapshot(self.conn, rawsnap))
obj = vmmDomainSnapshot(self.conn, rawsnap)
obj.init_libvirt_state()
newlist.append(obj)
self._snapshot_list = newlist
return self._snapshot_list[:]

View File

@ -27,9 +27,6 @@ class vmmInterface(vmmLibvirtObject):
def __init__(self, conn, backend, key):
vmmLibvirtObject.__init__(self, conn, backend, key, Interface)
(self._inactive_xml_flags,
self._active_xml_flags) = self.conn.get_interface_flags(self._backend)
##########################
# Required class methods #
@ -55,6 +52,12 @@ class vmmInterface(vmmLibvirtObject):
ignore = stats_update
self._refresh_status()
def _init_libvirt_state(self):
(self._inactive_xml_flags,
self._active_xml_flags) = self.conn.get_interface_flags(self._backend)
self.tick()
#####################
# Object operations #

View File

@ -28,8 +28,7 @@ from .baseclass import vmmGObject
class vmmLibvirtObject(vmmGObject):
__gsignals__ = {
"state-changed": (GObject.SignalFlags.RUN_FIRST, None, []),
"started": (GObject.SignalFlags.RUN_FIRST, None, []),
"stopped": (GObject.SignalFlags.RUN_FIRST, None, []),
"initialized": (GObject.SignalFlags.RUN_FIRST, None, []),
}
_STATUS_ACTIVE = 1
@ -42,7 +41,7 @@ class vmmLibvirtObject(vmmGObject):
self._key = key
self._parseclass = parseclass
self._initial_status_update = False
self._initialized = False
self.__status = self._STATUS_ACTIVE
self._support_isactive = None
@ -54,12 +53,11 @@ class vmmLibvirtObject(vmmGObject):
self._inactive_xml_flags = 0
self._active_xml_flags = 0
# Cache object name
# Cache object name. We may need to do this even
# before init_libvirt_state since it might be needed ahead of time.
self._name = None
self.get_name()
self._refresh_status()
@staticmethod
def log_redefine_xml_diff(obj, origxml, newxml):
objname = "<%s name=%s>" % (obj.__class__.__name__, obj.get_name())
@ -177,6 +175,23 @@ class vmmLibvirtObject(vmmGObject):
def tick(self, stats_update=True):
raise NotImplementedError()
def _init_libvirt_state(self):
raise NotImplementedError()
def init_libvirt_state(self):
"""
Function called by vmmConnection to populate initial state when
a new object appears.
"""
if self._initialized:
return
try:
self._init_libvirt_state()
finally:
self._initialized = True
self.idle_emit("initialized")
###################
# Status handling #
@ -215,7 +230,7 @@ class vmmLibvirtObject(vmmGObject):
"""
if (self._using_events() and
skip_if_have_events and
self._initial_status_update):
self._initialized):
return
try:
@ -237,8 +252,6 @@ class vmmLibvirtObject(vmmGObject):
kwargs = {"force": True, poll_param: True}
logging.debug("Scheduling priority tick with: %s", kwargs)
self.conn.schedule_priority_tick(**kwargs)
finally:
self._initial_status_update = True
def _backend_get_active(self):
if self._support_isactive is None:

View File

@ -68,6 +68,9 @@ class vmmNetwork(vmmLibvirtObject):
ignore = stats_update
self._refresh_status()
def _init_libvirt_state(self):
self.tick()
###########
# Actions #

View File

@ -30,7 +30,6 @@ def _parse_convert(conn, parsexml=None):
class vmmNodeDevice(vmmLibvirtObject):
def __init__(self, conn, backend, key):
vmmLibvirtObject.__init__(self, conn, backend, key, _parse_convert)
self._name = key
def _conn_tick_poll_param(self):
return "pollnodedev"
@ -41,8 +40,13 @@ class vmmNodeDevice(vmmLibvirtObject):
return self._backend.XMLDesc(flags)
def _get_backend_status(self):
return self._STATUS_ACTIVE
def _backend_get_name(self):
return self.get_connkey()
def is_active(self):
return True
def tick(self, stats_update=True):
# Deliberately empty
ignore = stats_update
def _init_libvirt_state(self):
self.tick()

View File

@ -56,7 +56,10 @@ class vmmStorageVolume(vmmLibvirtObject):
return self._STATUS_ACTIVE
def tick(self, stats_update=True):
# Deliberately empty
ignore = stats_update
def _init_libvirt_state(self):
self.refresh_xml()
###########
@ -141,6 +144,12 @@ class vmmStoragePool(vmmLibvirtObject):
ignore = stats_update
self._refresh_status()
def _init_libvirt_state(self):
self.tick()
self.refresh(skip_xml_refresh=True)
for vol in self.get_volumes():
vol.init_libvirt_state()
###########
# Actions #
@ -160,12 +169,17 @@ class vmmStoragePool(vmmLibvirtObject):
self._backend.undefine()
self._backend = None
def refresh(self):
def refresh(self, skip_xml_refresh=False):
"""
:param skip_xml_refresh: Only used by init_libvirt_state to avoid
double XML updating
"""
if not self.is_active():
return
self._backend.refresh(0)
self.refresh_xml()
if skip_xml_refresh:
self.refresh_xml()
self._update_volumes()
self.idle_emit("refreshed")
self._last_refresh_time = time.time()