Run the main tick function in a thread.
Since libvirt has supported multithreaded client connections for a while now, we can run all our libvirt polling in a thread. This will prevent the UI from blocking and becoming sluggish if there are lots of VMs or connections open. We just need to be vigilant in ensuring that all UI updating done via any tick function is scheduled with gobject.idle_add, to preserve the benefits.
This commit is contained in:
parent
2da5651b8c
commit
1224f90699
|
@ -30,6 +30,7 @@ import threading
|
|||
import gtk
|
||||
import virtinst
|
||||
|
||||
from virtManager import util
|
||||
from virtManager.domain import vmmDomain
|
||||
from virtManager.network import vmmNetwork
|
||||
from virtManager.storagepool import vmmStoragePool
|
||||
|
@ -317,12 +318,12 @@ class vmmConnection(gobject.GObject):
|
|||
if self.state != self.STATE_DISCONNECTED:
|
||||
return
|
||||
|
||||
self.state = self.STATE_CONNECTING
|
||||
self.emit("state-changed")
|
||||
self._change_state(self.STATE_CONNECTING)
|
||||
|
||||
logging.debug("Scheduling background open thread for " + self.uri)
|
||||
self.connectThreadEvent.clear()
|
||||
self.connectThread = threading.Thread(target = self._open_thread, name="Connect " + self.uri)
|
||||
self.connectThread = threading.Thread(target = self._open_thread,
|
||||
name = "Connect %s" % self.uri)
|
||||
self.connectThread.setDaemon(True)
|
||||
self.connectThread.start()
|
||||
|
||||
|
@ -475,47 +476,47 @@ class vmmConnection(gobject.GObject):
|
|||
# We want to kill off this thread asap, so schedule a gobject
|
||||
# idle even to inform the UI of result
|
||||
logging.debug("Background open thread complete, scheduling notify")
|
||||
gtk.gdk.threads_enter()
|
||||
try:
|
||||
gobject.idle_add(self._open_notify)
|
||||
finally:
|
||||
gtk.gdk.threads_leave()
|
||||
gobject.idle_add(self._open_notify)
|
||||
self.connectThread = None
|
||||
|
||||
def _open_notify(self):
|
||||
logging.debug("Notifying open result")
|
||||
gtk.gdk.threads_enter()
|
||||
|
||||
try:
|
||||
if self.state == self.STATE_ACTIVE:
|
||||
logging.debug("%s capabilities:\n%s" %
|
||||
(self.get_uri(), self.vmm.getCapabilities()))
|
||||
|
||||
self.tick()
|
||||
# If VMs disappeared since the last time we connected to
|
||||
# this uri, remove their gconf entries so we don't pollute
|
||||
# the database
|
||||
self.config.reconcile_vm_entries(self.get_uri(),
|
||||
self.vms.keys())
|
||||
self.emit("state-changed")
|
||||
|
||||
gobject.idle_add(util.idle_emit, self, "state-changed")
|
||||
|
||||
if self.state == self.STATE_DISCONNECTED:
|
||||
self.emit("connect-error", self.connectError)
|
||||
gobject.idle_add(util.idle_emit, self, "connect-error",
|
||||
self.connectError)
|
||||
self.connectError = None
|
||||
finally:
|
||||
self.connectThreadEvent.set()
|
||||
gtk.gdk.threads_leave()
|
||||
|
||||
def _change_state(self, newstate):
|
||||
if self.state != newstate:
|
||||
self.state = newstate
|
||||
self.emit("state-changed")
|
||||
|
||||
def pause(self):
|
||||
if self.state != self.STATE_ACTIVE:
|
||||
return
|
||||
self.state = self.STATE_INACTIVE
|
||||
self.emit("state-changed")
|
||||
self._change_state(self.STATE_INACTIVE)
|
||||
|
||||
def resume(self):
|
||||
if self.state != self.STATE_INACTIVE:
|
||||
return
|
||||
self.state = self.STATE_ACTIVE
|
||||
self.emit("state-changed")
|
||||
self._change_state(self.STATE_ACTIVE)
|
||||
|
||||
def close(self):
|
||||
if self.vmm == None:
|
||||
|
@ -528,8 +529,7 @@ class vmmConnection(gobject.GObject):
|
|||
self.vms = {}
|
||||
self.activeUUIDs = []
|
||||
self.record = []
|
||||
self.state = self.STATE_DISCONNECTED
|
||||
self.emit("state-changed")
|
||||
self._change_state(self.STATE_DISCONNECTED)
|
||||
|
||||
def list_vm_uuids(self):
|
||||
return self.vms.keys()
|
||||
|
@ -873,33 +873,42 @@ class vmmConnection(gobject.GObject):
|
|||
(startVMs, newVMs, oldVMs,
|
||||
self.vms, self.activeUUIDs) = self._update_vms()
|
||||
|
||||
# Update VM states
|
||||
for uuid in oldVMs:
|
||||
self.emit("vm-removed", self.uri, uuid)
|
||||
oldVMs[uuid].release_handle()
|
||||
for uuid in newVMs:
|
||||
self.emit("vm-added", self.uri, uuid)
|
||||
for uuid in startVMs:
|
||||
self.emit("vm-started", self.uri, uuid)
|
||||
def tick_send_signals():
|
||||
"""
|
||||
Responsible for signaling the UI for any updates. All possible UI
|
||||
updates need to go here to enable threading that doesn't block the
|
||||
app with long tick operations.
|
||||
"""
|
||||
|
||||
# Update virtual network states
|
||||
for uuid in oldNets:
|
||||
self.emit("net-removed", self.uri, uuid)
|
||||
for uuid in newNets:
|
||||
self.emit("net-added", self.uri, uuid)
|
||||
for uuid in startNets:
|
||||
self.emit("net-started", self.uri, uuid)
|
||||
for uuid in stopNets:
|
||||
self.emit("net-stopped", self.uri, uuid)
|
||||
# Update VM states
|
||||
for uuid in oldVMs:
|
||||
self.emit("vm-removed", self.uri, uuid)
|
||||
oldVMs[uuid].release_handle()
|
||||
for uuid in newVMs:
|
||||
self.emit("vm-added", self.uri, uuid)
|
||||
for uuid in startVMs:
|
||||
self.emit("vm-started", self.uri, uuid)
|
||||
|
||||
for uuid in oldPools:
|
||||
self.emit("pool-removed", self.uri, uuid)
|
||||
for uuid in newPools:
|
||||
self.emit("pool-added", self.uri, uuid)
|
||||
for uuid in startPools:
|
||||
self.emit("pool-started", self.uri, uuid)
|
||||
for uuid in stopPools:
|
||||
self.emit("pool-stopped", self.uri, uuid)
|
||||
# Update virtual network states
|
||||
for uuid in oldNets:
|
||||
self.emit("net-removed", self.uri, uuid)
|
||||
for uuid in newNets:
|
||||
self.emit("net-added", self.uri, uuid)
|
||||
for uuid in startNets:
|
||||
self.emit("net-started", self.uri, uuid)
|
||||
for uuid in stopNets:
|
||||
self.emit("net-stopped", self.uri, uuid)
|
||||
|
||||
for uuid in oldPools:
|
||||
self.emit("pool-removed", self.uri, uuid)
|
||||
for uuid in newPools:
|
||||
self.emit("pool-added", self.uri, uuid)
|
||||
for uuid in startPools:
|
||||
self.emit("pool-started", self.uri, uuid)
|
||||
for uuid in stopPools:
|
||||
self.emit("pool-stopped", self.uri, uuid)
|
||||
|
||||
gobject.idle_add(tick_send_signals)
|
||||
|
||||
# Finally, we sample each domain
|
||||
now = time()
|
||||
|
@ -914,6 +923,8 @@ class vmmConnection(gobject.GObject):
|
|||
if not noStatsUpdate:
|
||||
self._recalculate_stats(now)
|
||||
|
||||
gobject.idle_add(util.idle_emit, self, "resources-sampled")
|
||||
|
||||
return 1
|
||||
|
||||
def _recalculate_stats(self, now):
|
||||
|
@ -969,7 +980,6 @@ class vmmConnection(gobject.GObject):
|
|||
}
|
||||
|
||||
self.record.insert(0, newStats)
|
||||
self.emit("resources-sampled")
|
||||
|
||||
def cpu_time_vector(self):
|
||||
vector = []
|
||||
|
|
|
@ -134,7 +134,7 @@ class vmmDomain(gobject.GObject):
|
|||
if origxml != self._xml:
|
||||
# 'tick' to make sure we have the latest time
|
||||
self.tick(time.time())
|
||||
self.emit("config-changed")
|
||||
gobject.idle_add(util.idle_emit, self, "config-changed")
|
||||
|
||||
def _invalidate_xml(self):
|
||||
# Mark cached xml as invalid
|
||||
|
@ -276,7 +276,7 @@ class vmmDomain(gobject.GObject):
|
|||
# Domain just started. Invalidate inactive xml
|
||||
self._orig_inactive_xml = None
|
||||
self.lastStatus = status
|
||||
self.emit("status-changed", status)
|
||||
gobject.idle_add(util.idle_emit, self, "status-changed", status)
|
||||
|
||||
# GConf specific wranglings
|
||||
def set_console_scaling(self, value):
|
||||
|
@ -442,7 +442,7 @@ class vmmDomain(gobject.GObject):
|
|||
self._set_max_rate(r + "Rate")
|
||||
|
||||
self._update_status(info[0])
|
||||
self.emit("resources-sampled")
|
||||
gobject.idle_add(util.idle_emit, self, "resources-sampled")
|
||||
|
||||
|
||||
def current_memory(self):
|
||||
|
|
|
@ -24,6 +24,7 @@ import libvirt
|
|||
import logging
|
||||
import gnome
|
||||
import traceback
|
||||
import threading
|
||||
|
||||
from virtManager.about import vmmAbout
|
||||
from virtManager.netdevhelper import vmmNetDevHelper
|
||||
|
@ -66,6 +67,12 @@ class vmmEngine(gobject.GObject):
|
|||
self.timer = None
|
||||
self.last_timeout = 0
|
||||
|
||||
self._tick_thread = None
|
||||
self._tick_thread_slow = False
|
||||
self._libvirt_support_threading = (libvirt.getVersion() >= 6000)
|
||||
if not self._libvirt_support_threading:
|
||||
logging.debug("Libvirt doesn't support threading, skipping.")
|
||||
|
||||
# Counter keeping track of how many manager and details windows
|
||||
# are open. When it is decremented to 0, close the app
|
||||
self.windows = 0
|
||||
|
@ -145,11 +152,19 @@ class vmmEngine(gobject.GObject):
|
|||
self.timer = gobject.timeout_add(interval, self.tick)
|
||||
|
||||
def tick(self):
|
||||
gtk.gdk.threads_enter()
|
||||
try:
|
||||
if not self._libvirt_support_threading:
|
||||
return self._tick()
|
||||
finally:
|
||||
gtk.gdk.threads_leave()
|
||||
|
||||
if self._tick_thread and self._tick_thread.isAlive():
|
||||
if not self._tick_is_slow:
|
||||
logging.debug("Tick is slow, not running at requested rate.")
|
||||
return 1
|
||||
|
||||
self._tick_thread = threading.Thread(name="Tick thread",
|
||||
target=self._tick, args=())
|
||||
self._tick_thread.daemon = False
|
||||
self._tick_thread.start()
|
||||
return 1
|
||||
|
||||
def _tick(self):
|
||||
for uri in self.connections.keys():
|
||||
|
|
|
@ -227,3 +227,10 @@ def pretty_hv(gtype, domtype):
|
|||
label = "test (hvm)"
|
||||
|
||||
return label
|
||||
|
||||
def idle_emit(self, signal, *args):
|
||||
"""
|
||||
Safe wrapper for using 'self.emit' with gobject.idle_add
|
||||
"""
|
||||
self.emit(signal, *args)
|
||||
return False
|
||||
|
|
Loading…
Reference in New Issue