diskbackend: Add lots of test coverage

Signed-off-by: Cole Robinson <crobinso@redhat.com>
This commit is contained in:
Cole Robinson 2020-01-29 07:30:51 -05:00
parent 04c0d48ef7
commit e7bb021c4c
6 changed files with 149 additions and 120 deletions

View File

@ -217,6 +217,11 @@
<source file="/var/lib/libvirt/images/disk.img"/>
<target dev="vdp" bus="virtio"/>
</disk>
<disk type="block" device="disk">
<driver name="qemu" type="raw" cache="none" io="native"/>
<source dev="/dev/disk-pool/disk"/>
<target dev="vdq" bus="virtio"/>
</disk>
<controller type="usb" index="0" model="ich9-ehci1">
<address type="pci" domain="0" bus="0" slot="4" function="7"/>
</controller>

View File

@ -615,6 +615,7 @@ vcpus.vcpu1.id=2,vcpus.vcpu1.enabled=yes
source.reservations.managed=no,source.reservations.source.type=unix,source.reservations.source.path=/var/run/test/pr-helper0.sock,source.reservations.source.mode=client
--disk vol=iscsi-direct/unit:0:0:1
--disk size=.0001,format=raw
--disk size=.0001,pool=disk-pool
--network user,mac=12:34:56:78:11:22,portgroup=foo,link_state=down,rom_bar=on,rom_file=/tmp/foo
--network bridge=foobar,model=virtio,driver_name=qemu,driver_queues=3,filterref=foobar,rom.bar=off,rom.file=/some/rom,source.portgroup=foo

View File

@ -206,6 +206,17 @@ class TestXMLMisc(unittest.TestCase):
errdict = virtinst.DeviceDisk.fix_path_search(searchdata)
self.assertTrue(not bool(errdict))
# Mock setfacl to definitely, as getfacl won't accept args
with unittest.mock.patch("virtinst.diskbackend.SETFACL",
"getfacl"):
errdict = virtinst.DeviceDisk.fix_path_search(searchdata)
# Test uid check short circuiting
searchdata.uid = os.getuid()
os.chown(tmpdir, os.getuid(), os.getgid())
assert virtinst.diskbackend.is_path_searchable(
tmpdir, os.getuid(), "foo") == []
def test_path_in_use(self):
# Extra tests for DeviceDisk.path_in_use
conn = utils.URIs.open_kvm()
@ -238,3 +249,20 @@ class TestXMLMisc(unittest.TestCase):
except Exception as e:
if not self.conn.support.is_libvirt_error_no_domain(e):
raise
def test_disk_backend(self):
# Test get_size() with vol_install
disk = virtinst.DeviceDisk(self.conn)
pool = self.conn.storagePoolLookupByName("default-pool")
vol_install = disk.build_vol_install(self.conn, "newvol1.img",
pool, 1, False)
disk.set_vol_install(vol_install)
assert disk.get_size() == 1.0
# Test some blockdev inspecting
if os.path.exists("/dev/loop0"):
conn = utils.URIs.openconn("test:///default")
disk = virtinst.DeviceDisk(conn)
disk.path = "/dev/loop0"
assert disk.type == "block"
disk.get_size()

View File

@ -1558,11 +1558,9 @@ class XMLParseTest(unittest.TestCase):
guest.devices.replace_child(guest.devices.disk[4], newdisk)
utils.diff_compare(guest.get_xml(), parsefile)
def testDiskRevalidate(self):
"""
Test that calling validate() on parsed disk XML doesn't attempt
to verify the path exists. Assume it's a working config
"""
def testDiskBackend(self):
# Test that calling validate() on parsed disk XML doesn't attempt
# to verify the path exists. Assume it's a working config
xml = ("<disk type='file' device='disk'>"
"<source file='/A/B/C/D/NOPE'/>"
"</disk>")
@ -1572,6 +1570,17 @@ class XMLParseTest(unittest.TestCase):
disk.build_storage(None)
self.assertTrue(getattr(disk, "_storage_backend").is_stub())
# Stub backend coverage testing
backend = getattr(disk, "_storage_backend")
assert disk.get_parent_pool() is None
assert disk.get_vol_object() is None
assert disk.get_vol_install() is None
assert disk.get_size() == 0
assert backend.get_vol_xml() is None
assert backend.get_dev_type() == "file"
assert backend.get_driver_type() is None
assert backend.get_parent_pool() is None
disk.set_backend_for_existing_path()
self.assertFalse(getattr(disk, "_storage_backend").is_stub())

View File

@ -734,13 +734,13 @@ class DeviceDisk(Device):
return
if (not self._storage_backend.will_create_storage() and
not self._storage_backend.exists()):
if (not self._storage_backend.exists() and
not self._storage_backend.will_create_storage()):
raise ValueError(
_("Must specify storage creation parameters for "
"non-existent path '%s'.") % self.path)
self._storage_backend.validate(self)
self._storage_backend.validate()
def build_storage(self, meter):
"""

View File

@ -50,29 +50,29 @@ def _lookup_vol_by_basename(pool, path):
return pool.storageVolLookupByName(name)
def _stat_disk(path):
"""
Returns the tuple (isreg, size)
"""
def _get_block_size(path):
try:
fd = os.open(path, os.O_RDONLY)
# os.SEEK_END is not present on all systems
size = os.lseek(fd, 0, 2) # pragma: no cover
os.close(fd) # pragma: no cover
except Exception:
size = 0
return size
def _get_size(path):
if not os.path.exists(path):
return True, 0
return 0
if _stat_is_block(path):
return _get_block_size(path)
return os.path.getsize(path)
mode = os.stat(path)[stat.ST_MODE]
# os.path.getsize('/dev/..') can be zero on some platforms
if stat.S_ISBLK(mode):
try:
fd = os.open(path, os.O_RDONLY)
# os.SEEK_END is not present on all systems
size = os.lseek(fd, 0, 2)
os.close(fd)
except Exception:
size = 0
return False, size
elif stat.S_ISREG(mode):
return True, os.path.getsize(path)
return True, 0
def _stat_is_block(path):
if not os.path.exists(path):
return False
return stat.S_ISBLK(os.stat(path)[stat.ST_MODE])
def _check_if_path_managed(conn, path):
@ -98,14 +98,14 @@ def _check_if_path_managed(conn, path):
if verr:
try:
vol = _lookup_vol_by_basename(pool, path)
except Exception:
except Exception: # pragma: no cover
pass
except Exception as e:
except Exception as e: # pragma: no cover
vol = None
pool = None
verr = str(e)
if not vol and not pool and verr:
if not vol and not pool and verr: # pragma: no cover
raise ValueError(_("Cannot use storage %(path)s: %(err)s") %
{'path': path, 'err': verr})
@ -178,25 +178,8 @@ def _get_dev_type(path, vol_xml, vol_object, pool_xml, remote):
"""
Try to get device type for volume.
"""
if vol_xml:
if vol_xml.type:
return vol_xml.type
# If vol_xml.type is None the vol_xml.file_type can return only
# these types: block, network or file
if vol_xml.file_type == libvirt.VIR_STORAGE_VOL_BLOCK:
return "block"
elif vol_xml.file_type == libvirt.VIR_STORAGE_VOL_NETWORK:
return "network"
if vol_object:
t = vol_object.info()[0]
if t == StorageVolume.TYPE_FILE:
return "file"
elif t == StorageVolume.TYPE_BLOCK:
return "block"
elif t == StorageVolume.TYPE_NETWORK:
return "network"
if vol_xml and vol_xml.type:
return vol_xml.type
if pool_xml:
t = pool_xml.get_disk_type()
@ -205,6 +188,17 @@ def _get_dev_type(path, vol_xml, vol_object, pool_xml, remote):
elif t == StorageVolume.TYPE_NETWORK:
return "network"
if vol_object: # pragma: no cover
# This path is hard to test, because test suite XML always has
# the vol_xml.type set
t = vol_object.info()[0]
if t == StorageVolume.TYPE_FILE:
return "file"
elif t == StorageVolume.TYPE_BLOCK:
return "block"
elif t == StorageVolume.TYPE_NETWORK:
return "network"
if path:
if path_is_url(path):
return "network"
@ -212,9 +206,7 @@ def _get_dev_type(path, vol_xml, vol_object, pool_xml, remote):
if not remote:
if os.path.isdir(path):
return "dir"
elif _stat_disk(path)[0]:
return "file"
else:
elif _stat_is_block(path):
return "block"
return "file"
@ -246,8 +238,11 @@ def path_definitely_exists(conn, path):
# ACL/path perm helpers #
#########################
SETFACL = "setfacl"
def _fix_perms_acl(dirname, username):
cmd = ["setfacl", "--modify", "user:%s:x" % username, dirname]
cmd = [SETFACL, "--modify", "user:%s:x" % username, dirname]
proc = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
@ -269,8 +264,8 @@ def _fix_perms_chmod(dirname):
if os.stat(dirname).st_mode != newmode:
# Trying to change perms on vfat at least doesn't work
# but also doesn't seem to error. Try and detect that
raise ValueError(_("Permissions on '%s' did not stick") %
dirname)
raise ValueError( # pragma: no cover
_("Permissions on '%s' did not stick") % dirname)
def set_dirs_searchable(dirlist, username):
@ -289,7 +284,7 @@ def set_dirs_searchable(dirlist, username):
try:
# If we reach here, ACL setting failed, try chmod
_fix_perms_chmod(dirname)
except Exception as e:
except Exception as e: # pragma: no cover
errdict[dirname] = str(e)
return errdict
@ -304,13 +299,13 @@ def _is_dir_searchable(dirname, uid, username):
try:
statinfo = os.stat(dirname)
except OSError:
except OSError: # pragma: no cover
return False
if uid == statinfo.st_uid:
flag = stat.S_IXUSR
elif uid == statinfo.st_gid:
flag = stat.S_IXGRP
flag = stat.S_IXGRP # pragma: no cover
else:
flag = stat.S_IXOTH
@ -324,11 +319,11 @@ def _is_dir_searchable(dirname, uid, username):
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = proc.communicate()
except OSError:
except OSError: # pragma: no cover
log.debug("Didn't find the getfacl command.")
return False
if proc.returncode != 0:
if proc.returncode != 0: # pragma: no cover
log.debug("Cmd '%s' failed: %s", cmd, err)
return False
@ -386,7 +381,7 @@ class _StorageBase(object):
self._parent_pool_xml = StoragePool(self._conn,
parsexml=self.get_parent_pool().XMLDesc(0))
return self._parent_pool_xml
def validate(self, disk):
def validate(self):
raise NotImplementedError()
def get_path(self):
raise NotImplementedError()
@ -424,7 +419,11 @@ class _StorageCreator(_StorageBase):
##############
def create(self, progresscb):
raise NotImplementedError()
raise NotImplementedError
def validate(self):
raise NotImplementedError
def get_size(self):
raise NotImplementedError
def get_path(self):
if self._vol_install and not self._path:
@ -445,12 +444,6 @@ class _StorageCreator(_StorageBase):
def get_vol_xml(self):
return self._vol_install
def get_size(self):
if self._size is None:
self._size = (float(self._vol_install.capacity) /
1024.0 / 1024.0 / 1024.0)
return self._size
def get_dev_type(self):
if not self._dev_type:
self._dev_type = _get_dev_type(self._path, self._vol_install, None,
@ -464,25 +457,6 @@ class _StorageCreator(_StorageBase):
return self._vol_install.format
return "raw"
def validate(self, disk):
if disk.device in ["floppy", "cdrom"]:
raise ValueError(_("Cannot create storage for %s device.") %
disk.device)
if self._vol_install:
self._vol_install.validate()
return
if self._size is None:
raise ValueError(_("size is required for non-existent disk "
"'%s'" % self.get_path()))
err, msg = self.is_size_conflict()
if err:
raise ValueError(msg)
if msg:
log.warning(msg)
def will_create_storage(self):
return True
def get_vol_object(self):
@ -495,6 +469,28 @@ class _StorageCreator(_StorageBase):
return False
class ManagedStorageCreator(_StorageCreator):
"""
Handles storage creation via libvirt APIs. All the actual creation
logic lives in StorageVolume, this is mostly about pulling out bits
from that class and mapping them to DeviceDisk elements
"""
def __init__(self, conn, vol_install):
_StorageCreator.__init__(self, conn)
self._pool = vol_install.pool
self._vol_install = vol_install
def create(self, progresscb):
return self._vol_install.install(meter=progresscb)
def is_size_conflict(self):
return self._vol_install.is_size_conflict()
def validate(self):
return self._vol_install.validate()
def get_size(self):
return float(self._vol_install.capacity) / 1024.0 / 1024.0 / 1024.0
class CloneStorageCreator(_StorageCreator):
"""
Handles manually copying local files for Cloner
@ -511,11 +507,14 @@ class CloneStorageCreator(_StorageCreator):
self._size = size
self._sparse = sparse
def get_size(self):
return self._size
def is_size_conflict(self):
ret = False
msg = None
if self.get_dev_type() == "block":
avail = _stat_disk(self._path)[1]
avail = _get_size(self._path)
else:
vfs = os.statvfs(os.path.dirname(self._path))
avail = vfs.f_frsize * vfs.f_bavail
@ -535,6 +534,17 @@ class CloneStorageCreator(_StorageCreator):
((need // (1024 * 1024)), (avail // (1024 * 1024))))
return (ret, msg)
def validate(self):
if self._size is None:
raise ValueError(_("size is required for non-existent disk "
"'%s'" % self.get_path()))
err, msg = self.is_size_conflict()
if err:
raise ValueError(msg)
if msg:
log.warning(msg)
def create(self, progresscb):
text = (_("Cloning %(srcfile)s") %
{'srcfile': os.path.basename(self._input_path)})
@ -547,7 +557,7 @@ class CloneStorageCreator(_StorageCreator):
self._clone_local(progresscb, size_bytes)
def _clone_local(self, meter, size_bytes):
if self._input_path == "/dev/null":
if self._input_path == "/dev/null": # pragma: no cover
# Not really sure why this check is here,
# but keeping for compat
log.debug("Source dev was /dev/null. Skipping")
@ -605,7 +615,7 @@ class CloneStorageCreator(_StorageCreator):
i += s
if i < size_bytes:
meter.update(i)
except OSError as e:
except OSError as e: # pragma: no cover
raise RuntimeError(_("Error cloning diskimage %s to %s: %s") %
(self._input_path, self._output_path, str(e)))
finally:
@ -615,24 +625,6 @@ class CloneStorageCreator(_StorageCreator):
os.close(dst_fd)
class ManagedStorageCreator(_StorageCreator):
"""
Handles storage creation via libvirt APIs. All the actual creation
logic lives in StorageVolume, this is mostly about pulling out bits
from that class and mapping them to DeviceDisk elements
"""
def __init__(self, conn, vol_install):
_StorageCreator.__init__(self, conn)
self._pool = vol_install.pool
self._vol_install = vol_install
def create(self, progresscb):
return self._vol_install.install(meter=progresscb)
def is_size_conflict(self):
return self._vol_install.is_size_conflict()
class StorageBackendStub(_StorageBase):
"""
Class representing a storage path for a parsed XML disk, that we
@ -662,8 +654,7 @@ class StorageBackendStub(_StorageBase):
def get_driver_type(self):
return self._driver_type
def validate(self, disk):
ignore = disk
def validate(self):
return
def get_vol_install(self):
return None
@ -732,15 +723,15 @@ class StorageBackend(_StorageBase):
if self._vol_object:
ret = self.get_vol_xml().capacity
elif self._path:
ret = _stat_disk(self._path)[1]
ret = _get_size(self._path)
self._size = (float(ret) / 1024.0 / 1024.0 / 1024.0)
return self._size
def exists(self):
if self._exists is None:
if self._path is None:
if self._vol_object:
self._exists = True
elif self._vol_object:
elif self._path is None:
self._exists = True
elif (not self.get_dev_type() == "network" and
not self._conn.is_remote() and
@ -781,8 +772,7 @@ class StorageBackend(_StorageBase):
return ret
return None
def validate(self, disk):
ignore = disk
def validate(self):
return
def get_vol_install(self):
return None
@ -790,7 +780,3 @@ class StorageBackend(_StorageBase):
return (False, None)
def will_create_storage(self):
return False
def create(self, progresscb):
ignore = progresscb
raise RuntimeError("programming error: %s can't create storage" %
self.__class__.__name__)