From 425f599c843ab30dec63ba5aa3876ec84c80a21e Mon Sep 17 00:00:00 2001 From: Cole Robinson Date: Thu, 11 Oct 2018 18:52:45 -0400 Subject: [PATCH] device: disk: Move all path search handling to diskbackend Simplify it slightly by combining return output and removing an entry point. Add some additional test cases for it --- tests/xmlconfig.py | 14 ++- virtManager/addstorage.py | 7 +- virtinst/capabilities.py | 31 ++++++ virtinst/cli.py | 7 +- virtinst/devices/disk.py | 197 ++++++++------------------------------ virtinst/diskbackend.py | 116 ++++++++++++++++++++++ 6 files changed, 205 insertions(+), 167 deletions(-) diff --git a/tests/xmlconfig.py b/tests/xmlconfig.py index ba5b6547..26d1780b 100644 --- a/tests/xmlconfig.py +++ b/tests/xmlconfig.py @@ -217,12 +217,22 @@ class TestXMLMisc(unittest.TestCase): def test_dir_searchable(self): # Normally the dir searchable test is skipped in the unittest, # but let's contrive an example that should trigger all the code - from virtinst.devices.disk import _is_dir_searchable + # to ensure it isn't horribly broken + from virtinst import diskbackend oldtest = os.environ.pop("VIRTINST_TEST_SUITE") try: uid = -1 username = "fakeuser-zzzz" with tempfile.TemporaryDirectory() as tmpdir: - self.assertFalse(_is_dir_searchable(uid, username, tmpdir)) + fixlist = diskbackend.is_path_searchable(tmpdir, uid, username) + self.assertTrue(bool(fixlist)) + errdict = diskbackend.set_dirs_searchable(fixlist, username) + self.assertTrue(not bool(errdict)) + + + import getpass + fixlist = diskbackend.is_path_searchable( + os.getcwd(), os.getuid(), getpass.getuser()) + self.assertTrue(not bool(fixlist)) finally: os.environ["VIRTINST_TEST_SUITE"] = oldtest diff --git a/virtManager/addstorage.py b/virtManager/addstorage.py index 17958d0e..18992f78 100644 --- a/virtManager/addstorage.py +++ b/virtManager/addstorage.py @@ -115,9 +115,10 @@ class vmmAddStorage(vmmGObjectUI): @staticmethod def check_path_search(src, conn, path): skip_paths = src.config.get_perms_fix_ignore() - user, broken_paths = virtinst.DeviceDisk.check_path_search( + searchdata = virtinst.DeviceDisk.check_path_search( conn.get_backend(), path) + broken_paths = searchdata.fixlist[:] for p in broken_paths[:]: if p in skip_paths: broken_paths.remove(p) @@ -139,8 +140,8 @@ class vmmAddStorage(vmmGObjectUI): return logging.debug("Attempting to correct permission issues.") - errors = virtinst.DeviceDisk.fix_path_search_for_user( - conn.get_backend(), path, user) + errors = virtinst.DeviceDisk.fix_path_search( + conn.get_backend(), searchdata) if not errors: return diff --git a/virtinst/capabilities.py b/virtinst/capabilities.py index b7d96969..173a6ab4 100644 --- a/virtinst/capabilities.py +++ b/virtinst/capabilities.py @@ -8,6 +8,7 @@ # See the COPYING file in the top-level directory. import logging +import pwd from .domain import DomainCpu from .xmlbuilder import XMLBuilder, XMLChildProperty, XMLProperty @@ -62,6 +63,36 @@ class _CapsHost(XMLBuilder): cpu = XMLChildProperty(_CapsCPU, is_single=True) topology = XMLChildProperty(_CapsTopology, is_single=True) + def get_qemu_baselabel(self): + for secmodel in self.secmodels: + if secmodel.model != "dac": + continue + + label = None + for baselabel in secmodel.baselabels: + if baselabel.type in ["qemu", "kvm"]: + label = baselabel.content + break + if not label: + continue + + # XML we are looking at is like: + # + # + # dac + # 0 + # +107:+107 + # +107:+107 + # + try: + uid = int(label.split(":")[0].replace("+", "")) + user = pwd.getpwuid(uid)[0] + return user, uid + except Exception: + logging.debug("Exception parsing qemu dac baselabel=%s", + label, exc_info=True) + return None, None + ################################ # and parsers # diff --git a/virtinst/cli.py b/virtinst/cli.py index f071462e..ac76edf0 100644 --- a/virtinst/cli.py +++ b/virtinst/cli.py @@ -344,12 +344,13 @@ def validate_disk(dev, warn_overwrite=False): _optional_fail(errmsg, "disk_size", warn_on_skip=False) def check_path_search(dev): - user, broken_paths = dev.check_path_search(dev.conn, dev.path) - if not broken_paths: + searchdata = dev.check_path_search(dev.conn, dev.path) + if not searchdata.fixlist: return logging.warning(_("%s may not be accessible by the hypervisor. " "You will need to grant the '%s' user search permissions for " - "the following directories: %s"), dev.path, user, broken_paths) + "the following directories: %s"), + dev.path, searchdata.username, searchdata.fixlist) check_path_exists(dev) check_inuse_conflict(dev) diff --git a/virtinst/devices/disk.py b/virtinst/devices/disk.py index 6b65cbdc..26b4a489 100644 --- a/virtinst/devices/disk.py +++ b/virtinst/devices/disk.py @@ -7,12 +7,8 @@ # This work is licensed under the GNU GPLv2 or later. # See the COPYING file in the top-level directory. -import os -import stat -import pwd -import subprocess import logging -import re +import os from .. import diskbackend from .. import util @@ -33,47 +29,6 @@ def _qemu_sanitize_drvtype(phystype, fmt): return fmt -def _is_dir_searchable(uid, username, path): - """ - Check if passed directory is searchable by uid - """ - if "VIRTINST_TEST_SUITE" in os.environ: - return True - - try: - statinfo = os.stat(path) - except OSError: - return False - - if uid == statinfo.st_uid: - flag = stat.S_IXUSR - elif uid == statinfo.st_gid: - flag = stat.S_IXGRP - else: - flag = stat.S_IXOTH - - if bool(statinfo.st_mode & flag): - return True - - # Check POSIX ACL (since that is what we use to 'fix' access) - cmd = ["getfacl", path] - try: - proc = subprocess.Popen(cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - out, err = proc.communicate() - except OSError: - logging.debug("Didn't find the getfacl command.") - return False - - if proc.returncode != 0: - logging.debug("Cmd '%s' failed: %s", cmd, err) - return False - - pattern = "user:%s:..x" % username - return bool(re.search(pattern.encode("utf-8", "replace"), out)) - - class _Host(XMLBuilder): _XML_PROP_ORDER = ["name", "port"] XML_NAME = "host" @@ -211,131 +166,55 @@ class DeviceDisk(Device): return False - @staticmethod - def check_path_search_for_user(conn, path, username): - """ - Check if the passed user has search permissions for all the - directories in the disk path. - - :returns: List of the directories the user cannot search, or empty list - """ - if path is None: - return [] - if conn.is_remote(): - return [] - if username == "root": - return [] - if diskbackend.path_is_url(path): - return [] - - try: - # Get UID for string name - uid = pwd.getpwnam(username)[2] - except Exception as e: - logging.debug("Error looking up username: %s", str(e)) - return [] - - fixlist = [] - - if os.path.isdir(path): - dirname = path - base = "-" - else: - dirname, base = os.path.split(path) - - while base: - if not _is_dir_searchable(uid, username, dirname): - fixlist.append(dirname) - - dirname, base = os.path.split(dirname) - - return fixlist - @staticmethod def check_path_search(conn, path): - # Only works for qemu and DAC - if conn.is_remote() or not conn.is_qemu_system(): - return None, [] + """ + Check if the connection DAC user has search permissions for all the + directories in the passed path. - user = None - try: - for secmodel in conn.caps.host.secmodels: - if secmodel.model != "dac": - continue + :returns: Class with: + - List of the directories the user cannot search, or empty list + - username we checked for or None if not applicable + - uid we checked for or None if not application + """ + class SearchData(object): + def __init__(self): + self.user = None + self.uid = None + self.fixlist = [] - label = None - for baselabel in secmodel.baselabels: - if baselabel.type in ["qemu", "kvm"]: - label = baselabel.content - break - if not label: - continue + searchdata = SearchData() + if path is None: + return searchdata + if conn.is_remote(): + return searchdata + if not conn.is_qemu_system(): + return searchdata + if diskbackend.path_is_url(path): + return searchdata - pwuid = pwd.getpwuid( - int(label.split(":")[0].replace("+", ""))) - if pwuid: - user = pwuid[0] - except Exception: - logging.debug("Exception grabbing qemu DAC user", exc_info=True) - return None, [] + user, uid = conn.caps.host.get_qemu_baselabel() + if not user: + return searchdata + if uid == 0: + return searchdata - return user, DeviceDisk.check_path_search_for_user(conn, path, user) + searchdata.user = user + searchdata.uid = uid + searchdata.fixlist = diskbackend.is_path_searchable(path, uid, user) + searchdata.fixlist.reverse() + return searchdata @staticmethod - def fix_path_search_for_user(conn, path, username): + def fix_path_search(conn, searchdata): """ - Try to fix any permission problems found by check_path_search_for_user + Try to fix any permission problems found by check_path_search :returns: Return a dictionary of entries {broken path : error msg} """ - def fix_perms(dirname, useacl=True): - if useacl: - cmd = ["setfacl", "--modify", "user:%s:x" % username, dirname] - proc = subprocess.Popen(cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - out, err = proc.communicate() - - logging.debug("Ran command '%s'", cmd) - if out or err: - logging.debug("out=%s\nerr=%s", out, err) - - if proc.returncode != 0: - raise ValueError(err) - else: - logging.debug("Setting +x on %s", dirname) - mode = os.stat(dirname).st_mode - newmode = mode | stat.S_IXOTH - os.chmod(dirname, newmode) - if os.stat(dirname).st_mode != newmode: - # Trying to change perms on vfat at least doesn't work - # but also doesn't seem to error. Try and detect that - raise ValueError(_("Permissions on '%s' did not stick") % - dirname) - - fixlist = DeviceDisk.check_path_search_for_user(conn, path, username) - if not fixlist: - return [] - - fixlist.reverse() - errdict = {} - - useacl = True - for dirname in fixlist: - try: - try: - fix_perms(dirname, useacl) - except Exception: - # If acl fails, fall back to chmod and retry - if not useacl: - raise - useacl = False - - logging.debug("setfacl failed, trying old fashioned way") - fix_perms(dirname, useacl) - except Exception as e: - errdict[dirname] = str(e) - + ignore = conn + errdict = diskbackend.set_dirs_searchable( + searchdata.fixlist, searchdata.username) return errdict @staticmethod diff --git a/virtinst/diskbackend.py b/virtinst/diskbackend.py index 877eaec6..d81c7a80 100644 --- a/virtinst/diskbackend.py +++ b/virtinst/diskbackend.py @@ -10,6 +10,7 @@ import logging import os import re import stat +import subprocess import libvirt @@ -222,6 +223,121 @@ def _get_dev_type(path, vol_xml, vol_object, pool_xml, remote): return "file" +######################### +# ACL/path perm helpers # +######################### + +def _fix_perms_acl(dirname, username): + cmd = ["setfacl", "--modify", "user:%s:x" % username, dirname] + proc = subprocess.Popen(cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + out, err = proc.communicate() + + logging.debug("Ran command '%s'", cmd) + if out or err: + logging.debug("out=%s\nerr=%s", out, err) + + if proc.returncode != 0: + raise ValueError(err) + + +def _fix_perms_chmod(dirname): + logging.debug("Setting +x on %s", dirname) + mode = os.stat(dirname).st_mode + newmode = mode | stat.S_IXOTH + os.chmod(dirname, newmode) + if os.stat(dirname).st_mode != newmode: + # Trying to change perms on vfat at least doesn't work + # but also doesn't seem to error. Try and detect that + raise ValueError(_("Permissions on '%s' did not stick") % + dirname) + + +def set_dirs_searchable(dirlist, username): + useacl = True + errdict = {} + for dirname in dirlist: + if useacl: + try: + _fix_perms_acl(dirname, username) + continue + except Exception as e: + logging.debug("setfacl failed: %s", e) + logging.debug("trying chmod") + useacl = False + + try: + # If we reach here, ACL setting failed, try chmod + _fix_perms_chmod(dirname) + except Exception as e: + errdict[dirname] = str(e) + + return errdict + + +def _is_dir_searchable(dirname, uid, username): + """ + Check if passed directory is searchable by uid + """ + if "VIRTINST_TEST_SUITE" in os.environ: + return True + + try: + statinfo = os.stat(dirname) + except OSError: + return False + + if uid == statinfo.st_uid: + flag = stat.S_IXUSR + elif uid == statinfo.st_gid: + flag = stat.S_IXGRP + else: + flag = stat.S_IXOTH + + if bool(statinfo.st_mode & flag): + return True + + # Check POSIX ACL (since that is what we use to 'fix' access) + cmd = ["getfacl", dirname] + try: + proc = subprocess.Popen(cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + out, err = proc.communicate() + except OSError: + logging.debug("Didn't find the getfacl command.") + return False + + if proc.returncode != 0: + logging.debug("Cmd '%s' failed: %s", cmd, err) + return False + + pattern = "user:%s:..x" % username + return bool(re.search(pattern.encode("utf-8", "replace"), out)) + + +def is_path_searchable(path, uid, username): + """ + Check each dir component of the passed path, see if they are + searchable by the uid/username, and return a list of paths + which aren't searchable + """ + if os.path.isdir(path): + dirname = path + base = "-" + else: + dirname, base = os.path.split(path) + + fixlist = [] + while base: + if not _is_dir_searchable(dirname, uid, username): + fixlist.append(dirname) + dirname, base = os.path.split(dirname) + + return fixlist + + ############################################## # Classes for tracking storage media details # ##############################################