virt-manager/virtinst/nodedev.py

235 lines
7.2 KiB
Python

#
# Copyright 2009, 2013 Red Hat, Inc.
#
# This work is licensed under the GNU GPLv2 or later.
# See the COPYING file in the top-level directory.
import os
from .logger import log
from .xmlbuilder import XMLBuilder, XMLProperty, XMLChildProperty
def _compare_int(nodedev_val, hostdev_val):
def _intify(val):
try:
if "0x" in str(val):
return int(val or '0x00', 16)
else:
return int(val)
except Exception:
return -1
nodedev_val = _intify(nodedev_val)
hostdev_val = _intify(hostdev_val)
return (nodedev_val == hostdev_val or hostdev_val == -1)
class DevNode(XMLBuilder):
XML_NAME = "devnode"
node_type = XMLProperty("./@type")
path = XMLProperty(".")
class NodeDevice(XMLBuilder):
CAPABILITY_TYPE_NET = "net"
CAPABILITY_TYPE_PCI = "pci"
CAPABILITY_TYPE_USBDEV = "usb_device"
CAPABILITY_TYPE_STORAGE = "storage"
CAPABILITY_TYPE_SCSIBUS = "scsi_host"
CAPABILITY_TYPE_SCSIDEV = "scsi"
CAPABILITY_TYPE_DRM = "drm"
@staticmethod
def lookupNodedevFromString(conn, idstring):
"""
Convert the passed libvirt node device name to a NodeDevice
instance, with proper error reporting. If the name is name is not
found, we will attempt to parse the name as would be passed to
devAddressToNodeDev
:param conn: libvirt.virConnect instance to perform the lookup on
:param idstring: libvirt node device name to lookup, or address
of the form:
- bus.addr (ex. 001.003 for a usb device)
- vendor:product (ex. 0x1234:0x5678 for a usb device
- (domain:)bus:slot.func (ex. 00:10.0 for a pci device)
:returns: NodeDevice instance
"""
# First try and see if this is a libvirt nodedev name
for nodedev in conn.fetch_all_nodedevs():
if nodedev.name == idstring:
return nodedev
try:
return _AddressStringToNodedev(conn, idstring)
except Exception:
log.debug("Error looking up nodedev from idstring=%s",
idstring, exc_info=True)
raise
XML_NAME = "device"
# Libvirt can generate bogus 'system' XML:
# https://bugzilla.redhat.com/show_bug.cgi?id=1184131
_XML_SANITIZE = True
name = XMLProperty("./name")
parent = XMLProperty("./parent")
device_type = XMLProperty("./capability/@type")
def compare_to_hostdev(self, hostdev):
if self.device_type == "pci":
if hostdev.type != "pci":
return False
return (_compare_int(self.domain, hostdev.domain) and
_compare_int(self.bus, hostdev.bus) and
_compare_int(self.slot, hostdev.slot) and
_compare_int(self.function, hostdev.function))
if self.device_type == "usb_device":
if hostdev.type != "usb":
return False
return (_compare_int(self.product_id, hostdev.product) and
_compare_int(self.vendor_id, hostdev.vendor) and
_compare_int(self.bus, hostdev.bus) and
_compare_int(self.device, hostdev.device))
return False
########################
# XML helper functions #
########################
def is_pci_sriov(self):
return self._capability_type == "virt_functions"
def is_pci_bridge(self):
return self._capability_type == "pci-bridge"
def is_usb_linux_root_hub(self):
return (self.vendor_id == "0x1d6b" and
self.product_id in ["0x0001", "0x0002", "0x0003"])
def is_drm_render(self):
return self.device_type == "drm" and self.drm_type == "render"
##################
# XML properties #
##################
# type='net' options
interface = XMLProperty("./capability/interface")
# type='pci' options
domain = XMLProperty("./capability/domain")
bus = XMLProperty("./capability/bus")
slot = XMLProperty("./capability/slot")
function = XMLProperty("./capability/function")
product_name = XMLProperty("./capability/product")
vendor_name = XMLProperty("./capability/vendor")
_capability_type = XMLProperty("./capability/capability/@type")
# type='usb' options
device = XMLProperty("./capability/device")
product_id = XMLProperty("./capability/product/@id")
vendor_id = XMLProperty("./capability/vendor/@id")
# type='scsi' options
host = XMLProperty("./capability/host")
target = XMLProperty("./capability/target")
lun = XMLProperty("./capability/lun")
# type='storage' options
block = XMLProperty("./capability/block")
drive_type = XMLProperty("./capability/drive_type")
media_label = XMLProperty(
"./capability/capability[@type='removable']/media_label")
media_available = XMLProperty(
"./capability/capability[@type='removable']/media_available",
is_int=True)
# type='drm' options
drm_type = XMLProperty("./capability/type")
devnodes = XMLChildProperty(DevNode)
def get_devnode(self, parent="by-path"):
for d in self.devnodes:
paths = d.path.split(os.sep)
if len(paths) > 2 and paths[-2] == parent:
return d
if len(self.devnodes) > 0:
return self.devnodes[0]
def _AddressStringToHostdev(conn, addrstr):
from .devices import DeviceHostdev
hostdev = DeviceHostdev(conn)
try:
# Determine addrstr type
if addrstr.count(":") in [1, 2] and "." in addrstr:
addrstr, func = addrstr.split(".", 1)
addrstr, slot = addrstr.rsplit(":", 1)
domain = "0"
if ":" in addrstr:
domain, bus = addrstr.split(":", 1)
else:
bus = addrstr
hostdev.type = "pci"
hostdev.domain = "0x%.4X" % int(domain, 16)
hostdev.function = "0x%.2X" % int(func, 16)
hostdev.slot = "0x%.2X" % int(slot, 16)
hostdev.bus = "0x%.2X" % int(bus, 16)
elif ":" in addrstr:
vendor, product = addrstr.split(":")
hostdev.type = "usb"
hostdev.vendor = "0x%.4X" % int(vendor, 16)
hostdev.product = "0x%.4X" % int(product, 16)
elif "." in addrstr:
bus, device = addrstr.split(".", 1)
hostdev.type = "usb"
hostdev.bus = bus
hostdev.device = device
else:
raise RuntimeError("Unknown address type")
except Exception:
log.debug("Error parsing node device string.", exc_info=True)
raise
return hostdev
def _AddressStringToNodedev(conn, addrstr):
hostdev = _AddressStringToHostdev(conn, addrstr)
# Iterate over node devices and compare
count = 0
nodedev = None
for xmlobj in conn.fetch_all_nodedevs():
if xmlobj.compare_to_hostdev(hostdev):
nodedev = xmlobj
count += 1
if count == 1:
return nodedev
elif count > 1:
raise ValueError(_("%s corresponds to multiple node devices") %
addrstr)
elif count < 1:
raise ValueError(_("Did not find a matching node device for '%s'") %
addrstr)