238 lines
7.9 KiB
Python
238 lines
7.9 KiB
Python
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software
|
|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
|
|
# MA 02110-1301 USA.
|
|
|
|
import os
|
|
import unittest
|
|
|
|
import virtinst.Storage
|
|
from virtinst.Storage import StoragePool, StorageVolume
|
|
|
|
from tests import utils
|
|
|
|
# pylint: disable=W0212
|
|
# Access to protected member, needed to unittest stuff
|
|
|
|
basepath = os.path.join(os.getcwd(), "tests", "storage-xml")
|
|
|
|
|
|
def generate_uuid_from_string(msg):
    """Build a deterministic string of up to 32 decimal digits from ``msg``.

    If ``msg`` contains a "-", everything up to and including the first
    "-" is dropped. The decimal code point of each remaining character is
    concatenated, and that digit sequence is repeated and truncated to
    32 characters.

    Returns an empty string when nothing is left after dropping the
    prefix (for example ``""`` or ``"pool-"``).
    """
    res = msg.split("-", 1)
    if len(res) > 1:
        # Split off common prefix
        msg = res[1]

    # join() builds the digit string in one pass instead of repeated "+=".
    numstr = "".join([str(ord(c)) for c in msg])

    # Each character contributes at least one digit, so 32 repetitions are
    # always enough to fill 32 characters for non-empty input.
    return (numstr * 32)[:32]
|
|
|
|
|
|
def _findFreePoolName(conn, namebase):
    """Return the first name of the form "<namebase>-<N>" that is unused.

    N starts at 0 and counts upward. A candidate is considered free as
    soon as StorageVolume.lookup_pool_by_name() raises for it.
    """
    i = 0
    while True:
        poolname = namebase + "-%d" % i
        try:
            StorageVolume.lookup_pool_by_name(conn=conn, pool_name=poolname)
        except Exception:
            # A failed lookup is taken to mean the name is available.
            # Catching Exception instead of using a bare "except:" lets
            # KeyboardInterrupt and SystemExit propagate, so this loop
            # can still be interrupted.
            return poolname
        # Lookup succeeded, so the name is taken; try the next suffix.
        i += 1
|
|
|
|
|
|
def createPool(conn, ptype, poolname=None, fmt=None, target_path=None,
               source_path=None, source_name=None, uuid=None, iqn=None):
    """Construct a pool object of type ``ptype`` and pass it to poolCompare().

    When ``poolname`` is None, a name is picked with _findFreePoolName();
    when ``uuid`` is None, it is derived from the pool name with
    generate_uuid_from_string(). Each remaining setting is applied only
    if the pool class exposes an attribute of that name.

    Returns whatever poolCompare() returns for the configured pool.
    """
    poolclass = StoragePool.get_pool_class(ptype)

    if poolname is None:
        poolname = _findFreePoolName(conn, str(ptype) + "-pool")
    if uuid is None:
        uuid = generate_uuid_from_string(poolname)

    pool = poolclass(conn=conn, name=poolname, uuid=uuid)

    # Settings that are always assigned (falling back to a placeholder
    # value) whenever the pool class has the attribute.
    always_set = (
        ("host", "some.random.hostname"),
        ("source_path", source_path or "/some/source/path"),
        ("target_path", target_path or "/some/target/path"),
    )
    for attr, value in always_set:
        if hasattr(pool, attr):
            setattr(pool, attr, value)

    # Settings that are assigned only when the caller passed a truthy value.
    only_if_given = (
        ("format", fmt),
        ("source_name", source_name),
        ("iqn", iqn),
    )
    for attr, value in only_if_given:
        if value and hasattr(pool, attr):
            setattr(pool, attr, value)

    return poolCompare(pool)
|
|
|
|
|
|
def poolCompare(pool_inst):
    """Compare the pool's XML with its reference file, then install it.

    The reference file is "<basepath>/<pool name>.xml". If it does not
    exist yet, it is created from the pool's current XML before the
    comparison runs.

    Returns the result of
    ``pool_inst.install(build=True, meter=None, create=True)``.
    """
    filename = os.path.join(basepath, pool_inst.name + ".xml")
    out_expect = pool_inst.get_xml_config()

    if not os.path.exists(filename):
        # The with-block closes (and therefore flushes) the file before
        # diff_compare() reads it back; the previous one-liner left the
        # handle to be closed whenever it was garbage collected.
        with open(filename, "w") as xmlfile:
            xmlfile.write(out_expect)
    utils.diff_compare(out_expect, filename)

    return pool_inst.install(build=True, meter=None, create=True)
|
|
|
|
|
|
def createVol(poolobj, volname=None, input_vol=None, clone_vol=None):
    """Build a volume in ``poolobj``, diff its XML, and install it.

    ``volname`` defaults to "<pool name>-vol". When ``input_vol`` is
    given, it is set as the new volume's input_vol. Otherwise, when
    ``clone_vol`` is given, the volume is replaced by a
    virtinst.Storage.CloneVolume of it.

    Returns the result of ``vol_inst.install(meter=False)``, or None when
    an input/clone volume was requested but
    virtinst.Storage.is_create_vol_from_supported() reports no support
    for the pool's connection.
    """
    volclass = StorageVolume.get_volume_for_pool(pool_object=poolobj)

    if volname is None:
        volname = poolobj.name() + "-vol"

    alloc = 5 * 1024 * 1024 * 1024
    cap = 10 * 1024 * 1024 * 1024
    vol_inst = volclass(name=volname, capacity=cap, allocation=alloc,
                        pool=poolobj)

    # 0o700 is the octal spelling accepted by both Python 2.6+ and
    # Python 3; the legacy form 0700 is a SyntaxError on Python 3.
    vol_inst.perms = {
        "mode": 0o700,
        "owner": 10736,
        "group": 10736,
    }

    if input_vol or clone_vol:
        if not virtinst.Storage.is_create_vol_from_supported(poolobj._conn):
            # Nothing to test on this connection; callers get None.
            return

    if input_vol:
        vol_inst.input_vol = input_vol
    elif clone_vol:
        # The clone object replaces the volume built above.
        vol_inst = virtinst.Storage.CloneVolume(volname, clone_vol)

    filename = os.path.join(basepath, vol_inst.name + ".xml")

    # Make sure permissions are properly set
    utils.diff_compare(vol_inst.get_xml_config(), filename)

    return vol_inst.install(meter=False)
|
|
|
|
|
|
class TestStorage(unittest.TestCase):
    """Pool and volume creation tests, one test method per pool type.

    Each test builds its objects through the module-level createPool()
    and createVol() helpers, which diff the generated XML against the
    reference files and then install the object on the connection opened
    in setUp().
    """

    def setUp(self):
        self.conn = utils.open_testdefault()

    def _createVolumes(self, poolobj):
        # Shared volume scenario: create a plain volume, then one volume
        # that uses it as input_vol and one that clones it.
        invol = createVol(poolobj)
        createVol(poolobj, volname=invol.name() + "input", input_vol=invol)
        createVol(poolobj, volname=invol.name() + "clone", clone_vol=invol)

    def testDirPool(self):
        poolobj = createPool(self.conn, StoragePool.TYPE_DIR, "pool-dir")
        self._createVolumes(poolobj)

    def testFSPool(self):
        poolobj = createPool(self.conn, StoragePool.TYPE_FS, "pool-fs")
        self._createVolumes(poolobj)

    def testNetFSPool(self):
        poolobj = createPool(self.conn, StoragePool.TYPE_NETFS, "pool-netfs")
        self._createVolumes(poolobj)

    def testLVPool(self):
        poolobj = createPool(self.conn, StoragePool.TYPE_LOGICAL,
                             "pool-logical")
        self._createVolumes(poolobj)

        # Test parsing source name for target path
        createPool(self.conn, StoragePool.TYPE_LOGICAL,
                   "pool-logical-target-srcname",
                   target_path="/dev/vgfoobar")

        # Test with source name
        createPool(self.conn, StoragePool.TYPE_LOGICAL, "pool-logical-srcname",
                   source_name="vgname")

        # Test creating with many devices
        createPool(self.conn, StoragePool.TYPE_LOGICAL, "pool-logical-manydev",
                   source_path=["/tmp/path1", "/tmp/path2", "/tmp/path3"],
                   target_path=None)

    def testDiskPool(self):
        poolobj = createPool(self.conn, StoragePool.TYPE_DISK,
                             "pool-disk", fmt="dos")
        self._createVolumes(poolobj)

    def testISCSIPool(self):
        poolobj = createPool(self.conn, StoragePool.TYPE_ISCSI, "pool-iscsi")
        # Volume creation is not supported here: it must raise.
        self.assertRaises(RuntimeError, createVol, poolobj)

        createPool(self.conn, StoragePool.TYPE_ISCSI, "pool-iscsi-iqn",
                   iqn="foo.bar.baz.iqn")

    def testSCSIPool(self):
        poolobj = createPool(self.conn, StoragePool.TYPE_SCSI, "pool-scsi")
        # Volume creation is not supported here: it must raise.
        self.assertRaises(RuntimeError, createVol, poolobj)

    def testMpathPool(self):
        poolobj = createPool(self.conn, StoragePool.TYPE_MPATH, "pool-mpath")
        # Volume creation is not supported here: it must raise.
        self.assertRaises(RuntimeError, createVol, poolobj)

    def _enumerateCompare(self, pool_list):
        # Give every enumerated pool a distinct name (and matching uuid)
        # by appending its list position, then run the usual comparison.
        # enumerate() supplies the position directly; list.index() would
        # rescan the list on every iteration and return the first *equal*
        # entry rather than the current one.
        for idx, pool in enumerate(pool_list):
            pool.name = pool.name + str(idx)
            pool.uuid = generate_uuid_from_string(pool.name)
            poolCompare(pool)

    def testEnumerateLogical(self):
        name = "pool-logical-list"

        lst = StoragePool.pool_list_from_sources(self.conn, name,
                                                 StoragePool.TYPE_LOGICAL)
        self._enumerateCompare(lst)

    def testEnumerateNetFS(self):
        name = "pool-netfs-list"
        host = "example.com"

        lst = StoragePool.pool_list_from_sources(self.conn, name,
                                                 StoragePool.TYPE_NETFS,
                                                 host=host)
        self._enumerateCompare(lst)

    def testEnumerateiSCSI(self):
        name = "pool-iscsi-list"
        host = "example.com"

        lst = StoragePool.pool_list_from_sources(self.conn, name,
                                                 StoragePool.TYPE_ISCSI,
                                                 host=host)
        # assertEqual reports the actual length on failure.
        self.assertEqual(len(lst), 0)
|
|
|
|
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|