229 lines
7.9 KiB
Python
229 lines
7.9 KiB
Python
# Copyright (C) 2013, 2014 Red Hat, Inc.
|
|
#
|
|
# This work is licensed under the GNU GPLv2 or later.
|
|
# See the COPYING file in the top-level directory.
|
|
|
|
import os
|
|
import tempfile
|
|
|
|
import pytest
|
|
|
|
import virtinst
|
|
from virtinst import DeviceDisk
|
|
|
|
from tests import utils
|
|
|
|
|
|
def test_disk_numtotarget():
|
|
# Various testing our target generation
|
|
#
|
|
# Note: using single quotes in strings to avoid
|
|
# codespell flagging the 'ba' assert
|
|
assert DeviceDisk.num_to_target(1) == 'a'
|
|
assert DeviceDisk.num_to_target(2) == 'b'
|
|
assert DeviceDisk.num_to_target(26) == 'z'
|
|
assert DeviceDisk.num_to_target(27) == 'aa'
|
|
assert DeviceDisk.num_to_target(28) == 'ab'
|
|
assert DeviceDisk.num_to_target(52) == 'az'
|
|
assert DeviceDisk.num_to_target(53) == 'ba'
|
|
assert DeviceDisk.num_to_target(27 * 26) == 'zz'
|
|
assert DeviceDisk.num_to_target(27 * 26 + 1) == 'aaa'
|
|
|
|
assert DeviceDisk.target_to_num('hda') == 0
|
|
assert DeviceDisk.target_to_num('hdb') == 1
|
|
assert DeviceDisk.target_to_num('sdz') == 25
|
|
assert DeviceDisk.target_to_num('sdaa') == 26
|
|
assert DeviceDisk.target_to_num('vdab') == 27
|
|
assert DeviceDisk.target_to_num('vdaz') == 51
|
|
assert DeviceDisk.target_to_num('xvdba') == 52
|
|
assert DeviceDisk.target_to_num('xvdzz') == 26 * (25 + 1) + 25
|
|
assert DeviceDisk.target_to_num('xvdaaa') == 26 * 26 * 1 + 26 * 1 + 0
|
|
|
|
conn = utils.URIs.open_testdefault_cached()
|
|
disk = virtinst.DeviceDisk(conn)
|
|
disk.bus = 'ide'
|
|
|
|
assert disk.generate_target([]) == 'hda'
|
|
assert disk.generate_target(['hda']) == 'hdb'
|
|
assert disk.generate_target(['hdb', 'sda']) == 'hdc'
|
|
assert disk.generate_target(['hda', 'hdd']) == 'hdb'
|
|
|
|
|
|
def test_disk_dir_searchable(monkeypatch):
|
|
# Normally the dir searchable test is skipped in the test suite,
|
|
# but let's contrive an example that should trigger all the code
|
|
# to ensure it isn't horribly broken
|
|
conn = utils.URIs.open_kvm()
|
|
|
|
def _set_caps_baselabel_uid(uid):
|
|
secmodel = [s for s in conn.caps.host.secmodels
|
|
if s.model == "dac"][0]
|
|
for baselabel in [b for b in secmodel.baselabels
|
|
if b.type in ["qemu", "kvm"]]:
|
|
baselabel.content = "+%s:+%s" % (uid, uid)
|
|
|
|
tmpobj = tempfile.TemporaryDirectory(prefix="virtinst-test-search")
|
|
tmpdir = tmpobj.name
|
|
try:
|
|
# path="" should trigger early return
|
|
searchdata = virtinst.DeviceDisk.check_path_search(conn, "")
|
|
assert searchdata.uid is None
|
|
# path=None should trigger early return
|
|
searchdata = virtinst.DeviceDisk.check_path_search(conn, None)
|
|
assert searchdata.uid is None
|
|
|
|
# Invalid uid
|
|
_set_caps_baselabel_uid(-1)
|
|
searchdata = virtinst.DeviceDisk.check_path_search(conn, tmpdir)
|
|
assert searchdata.uid is None
|
|
|
|
# Use our uid, verify it shows we have expected access
|
|
_set_caps_baselabel_uid(os.getuid())
|
|
searchdata = virtinst.DeviceDisk.check_path_search(conn,
|
|
tmpdir + "/footest")
|
|
assert searchdata.uid == os.getuid()
|
|
assert searchdata.fixlist == []
|
|
|
|
# Remove perms on the tmpdir, now it should report failures
|
|
os.chmod(tmpdir, 0o000)
|
|
searchdata = virtinst.DeviceDisk.check_path_search(conn, tmpdir)
|
|
assert searchdata.fixlist == [tmpdir]
|
|
|
|
errdict = virtinst.DeviceDisk.fix_path_search(searchdata)
|
|
assert not bool(errdict)
|
|
|
|
# Mock setfacl to definitely fail
|
|
with monkeypatch.context() as m:
|
|
m.setattr("virtinst.diskbackend.SETFACL", "getfacl")
|
|
errdict = virtinst.DeviceDisk.fix_path_search(searchdata)
|
|
|
|
finally:
|
|
# Reset changes we made
|
|
conn.invalidate_caps()
|
|
os.chmod(tmpdir, 0o777)
|
|
|
|
|
|
def test_disk_path_in_use_kernel():
|
|
# Extra tests for DeviceDisk.path_in_use
|
|
conn = utils.URIs.open_kvm()
|
|
|
|
# Comparing against kernel
|
|
vms = virtinst.DeviceDisk.path_in_use_by(
|
|
conn, "/pool-dir/test-arm-kernel")
|
|
assert vms == ["test-arm-kernel"]
|
|
|
|
|
|
def test_disk_diskbackend_misc():
|
|
# Test get_size() with vol_install
|
|
conn = utils.URIs.open_testdefault_cached()
|
|
disk = virtinst.DeviceDisk(conn)
|
|
pool = conn.storagePoolLookupByName("pool-dir")
|
|
vol_install = disk.build_vol_install(conn, "newvol1.img", pool, 1, False)
|
|
disk.set_vol_install(vol_install)
|
|
assert disk.get_size() == 1.0
|
|
|
|
# Test some blockdev inspecting
|
|
conn = utils.URIs.openconn("test:///default")
|
|
if os.path.exists("/dev/loop0"):
|
|
disk = virtinst.DeviceDisk(conn)
|
|
disk.set_source_path("/dev/loop0")
|
|
assert disk.type == "block"
|
|
disk.get_size()
|
|
|
|
# Test sparse cloning
|
|
tmpinput = tempfile.NamedTemporaryFile()
|
|
open(tmpinput.name, "wb").write(b'\0' * 10000)
|
|
|
|
srcdisk = virtinst.DeviceDisk(conn)
|
|
srcdisk.set_source_path(tmpinput.name)
|
|
|
|
newdisk = virtinst.DeviceDisk(conn)
|
|
tmpoutput = tempfile.NamedTemporaryFile()
|
|
os.unlink(tmpoutput.name)
|
|
newdisk.set_source_path(tmpoutput.name)
|
|
newdisk.set_local_disk_to_clone(srcdisk, True)
|
|
newdisk.build_storage(None)
|
|
|
|
# Test cloning onto existing disk
|
|
newdisk = virtinst.DeviceDisk(conn, parsexml=newdisk.get_xml())
|
|
newdisk.set_source_path(newdisk.get_source_path())
|
|
newdisk.set_local_disk_to_clone(srcdisk, True)
|
|
newdisk.build_storage(None)
|
|
|
|
newdisk = virtinst.DeviceDisk(conn)
|
|
newdisk.type = "block"
|
|
newdisk.set_source_path("/dev/foo/idontexist")
|
|
assert newdisk.get_size() == 0
|
|
|
|
conn = utils.URIs.open_testdriver_cached()
|
|
volpath = "/pool-dir/test-clone-simple.img"
|
|
assert virtinst.DeviceDisk.path_definitely_exists(conn, volpath)
|
|
disk = virtinst.DeviceDisk(conn)
|
|
disk.set_source_path(volpath)
|
|
assert disk.get_size()
|
|
|
|
|
|
def test_disk_diskbackend_parse():
|
|
# Test that calling validate() on parsed disk XML doesn't attempt
|
|
# to verify the path exists. Assume it's a working config
|
|
conn = utils.URIs.open_testdriver_cached()
|
|
xml = ("<disk type='file' device='disk'>"
|
|
"<source file='/A/B/C/D/NOPE'/>"
|
|
"</disk>")
|
|
disk = virtinst.DeviceDisk(conn, parsexml=xml)
|
|
disk.validate()
|
|
disk.is_size_conflict()
|
|
disk.build_storage(None)
|
|
assert getattr(disk, "_storage_backend").is_stub() is True
|
|
|
|
# Stub backend coverage testing
|
|
backend = getattr(disk, "_storage_backend")
|
|
assert disk.get_parent_pool() is None
|
|
assert disk.get_vol_object() is None
|
|
assert disk.get_vol_install() is None
|
|
assert disk.get_size() == 0
|
|
assert backend.get_vol_xml() is None
|
|
assert backend.get_dev_type() == "file"
|
|
assert backend.get_driver_type() is None
|
|
assert backend.get_parent_pool() is None
|
|
|
|
disk.set_backend_for_existing_path()
|
|
assert getattr(disk, "_storage_backend").is_stub() is False
|
|
|
|
with pytest.raises(ValueError):
|
|
disk.validate()
|
|
|
|
# Ensure set_backend_for_existing_path resolves a path
|
|
# to its existing storage volume
|
|
xml = ("<disk type='file' device='disk'>"
|
|
"<source file='/pool-dir/default-vol'/>"
|
|
"</disk>")
|
|
disk = virtinst.DeviceDisk(conn, parsexml=xml)
|
|
disk.set_backend_for_existing_path()
|
|
assert disk.get_vol_object()
|
|
|
|
# Verify set_backend_for_existing_path doesn't error
|
|
# for a variety of disks
|
|
dom = conn.lookupByName("test-many-devices")
|
|
guest = virtinst.Guest(conn, parsexml=dom.XMLDesc(0))
|
|
for disk in guest.devices.disk:
|
|
disk.set_backend_for_existing_path()
|
|
|
|
|
|
def test_disk_rbd_path():
|
|
conn = utils.URIs.open_testdriver_cached()
|
|
diskxml1 = """
|
|
<disk type="network" device="disk">
|
|
<source protocol="rbd" name="rbd-sourcename/some-rbd-vol">
|
|
<host name="ceph-mon-1.example.com" port="6789"/>
|
|
<host name="ceph-mon-2.example.com" port="6789"/>
|
|
<host name="ceph-mon-3.example.com" port="6789"/>
|
|
</source>
|
|
<target dev="vdag" bus="virtio"/>
|
|
</disk>
|
|
"""
|
|
|
|
disk1 = virtinst.DeviceDisk(conn, parsexml=diskxml1)
|
|
disk1.set_backend_for_existing_path()
|
|
assert disk1.get_vol_object().name() == "some-rbd-vol"
|