device: disk: Move all path search handling to diskbackend

Simplify it slightly by combining return output and removing
an entry point. Add some additional test cases for it
This commit is contained in:
Cole Robinson 2018-10-11 18:52:45 -04:00
parent fb1e63c100
commit 425f599c84
6 changed files with 205 additions and 167 deletions

View File

@ -217,12 +217,22 @@ class TestXMLMisc(unittest.TestCase):
def test_dir_searchable(self):
# Normally the dir searchable test is skipped in the unittest,
# but let's contrive an example that should trigger all the code
from virtinst.devices.disk import _is_dir_searchable
# to ensure it isn't horribly broken
from virtinst import diskbackend
oldtest = os.environ.pop("VIRTINST_TEST_SUITE")
try:
uid = -1
username = "fakeuser-zzzz"
with tempfile.TemporaryDirectory() as tmpdir:
self.assertFalse(_is_dir_searchable(uid, username, tmpdir))
fixlist = diskbackend.is_path_searchable(tmpdir, uid, username)
self.assertTrue(bool(fixlist))
errdict = diskbackend.set_dirs_searchable(fixlist, username)
self.assertTrue(not bool(errdict))
import getpass
fixlist = diskbackend.is_path_searchable(
os.getcwd(), os.getuid(), getpass.getuser())
self.assertTrue(not bool(fixlist))
finally:
os.environ["VIRTINST_TEST_SUITE"] = oldtest

View File

@ -115,9 +115,10 @@ class vmmAddStorage(vmmGObjectUI):
@staticmethod
def check_path_search(src, conn, path):
skip_paths = src.config.get_perms_fix_ignore()
user, broken_paths = virtinst.DeviceDisk.check_path_search(
searchdata = virtinst.DeviceDisk.check_path_search(
conn.get_backend(), path)
broken_paths = searchdata.fixlist[:]
for p in broken_paths[:]:
if p in skip_paths:
broken_paths.remove(p)
@ -139,8 +140,8 @@ class vmmAddStorage(vmmGObjectUI):
return
logging.debug("Attempting to correct permission issues.")
errors = virtinst.DeviceDisk.fix_path_search_for_user(
conn.get_backend(), path, user)
errors = virtinst.DeviceDisk.fix_path_search(
conn.get_backend(), searchdata)
if not errors:
return

View File

@ -8,6 +8,7 @@
# See the COPYING file in the top-level directory.
import logging
import pwd
from .domain import DomainCpu
from .xmlbuilder import XMLBuilder, XMLChildProperty, XMLProperty
@ -62,6 +63,36 @@ class _CapsHost(XMLBuilder):
cpu = XMLChildProperty(_CapsCPU, is_single=True)
topology = XMLChildProperty(_CapsTopology, is_single=True)
def get_qemu_baselabel(self):
for secmodel in self.secmodels:
if secmodel.model != "dac":
continue
label = None
for baselabel in secmodel.baselabels:
if baselabel.type in ["qemu", "kvm"]:
label = baselabel.content
break
if not label:
continue
# XML we are looking at is like:
#
# <secmodel>
# <model>dac</model>
# <doi>0</doi>
# <baselabel type='kvm'>+107:+107</baselabel>
# <baselabel type='qemu'>+107:+107</baselabel>
# </secmodel>
try:
uid = int(label.split(":")[0].replace("+", ""))
user = pwd.getpwuid(uid)[0]
return user, uid
except Exception:
logging.debug("Exception parsing qemu dac baselabel=%s",
label, exc_info=True)
return None, None
################################
# <guest> and <domain> parsers #

View File

@ -344,12 +344,13 @@ def validate_disk(dev, warn_overwrite=False):
_optional_fail(errmsg, "disk_size", warn_on_skip=False)
def check_path_search(dev):
user, broken_paths = dev.check_path_search(dev.conn, dev.path)
if not broken_paths:
searchdata = dev.check_path_search(dev.conn, dev.path)
if not searchdata.fixlist:
return
logging.warning(_("%s may not be accessible by the hypervisor. "
"You will need to grant the '%s' user search permissions for "
"the following directories: %s"), dev.path, user, broken_paths)
"the following directories: %s"),
dev.path, searchdata.username, searchdata.fixlist)
check_path_exists(dev)
check_inuse_conflict(dev)

View File

@ -7,12 +7,8 @@
# This work is licensed under the GNU GPLv2 or later.
# See the COPYING file in the top-level directory.
import os
import stat
import pwd
import subprocess
import logging
import re
import os
from .. import diskbackend
from .. import util
@ -33,47 +29,6 @@ def _qemu_sanitize_drvtype(phystype, fmt):
return fmt
def _is_dir_searchable(uid, username, path):
"""
Check if passed directory is searchable by uid
"""
if "VIRTINST_TEST_SUITE" in os.environ:
return True
try:
statinfo = os.stat(path)
except OSError:
return False
if uid == statinfo.st_uid:
flag = stat.S_IXUSR
elif uid == statinfo.st_gid:
flag = stat.S_IXGRP
else:
flag = stat.S_IXOTH
if bool(statinfo.st_mode & flag):
return True
# Check POSIX ACL (since that is what we use to 'fix' access)
cmd = ["getfacl", path]
try:
proc = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = proc.communicate()
except OSError:
logging.debug("Didn't find the getfacl command.")
return False
if proc.returncode != 0:
logging.debug("Cmd '%s' failed: %s", cmd, err)
return False
pattern = "user:%s:..x" % username
return bool(re.search(pattern.encode("utf-8", "replace"), out))
class _Host(XMLBuilder):
_XML_PROP_ORDER = ["name", "port"]
XML_NAME = "host"
@ -211,131 +166,55 @@ class DeviceDisk(Device):
return False
@staticmethod
def check_path_search_for_user(conn, path, username):
"""
Check if the passed user has search permissions for all the
directories in the disk path.
:returns: List of the directories the user cannot search, or empty list
"""
if path is None:
return []
if conn.is_remote():
return []
if username == "root":
return []
if diskbackend.path_is_url(path):
return []
try:
# Get UID for string name
uid = pwd.getpwnam(username)[2]
except Exception as e:
logging.debug("Error looking up username: %s", str(e))
return []
fixlist = []
if os.path.isdir(path):
dirname = path
base = "-"
else:
dirname, base = os.path.split(path)
while base:
if not _is_dir_searchable(uid, username, dirname):
fixlist.append(dirname)
dirname, base = os.path.split(dirname)
return fixlist
@staticmethod
def check_path_search(conn, path):
# Only works for qemu and DAC
if conn.is_remote() or not conn.is_qemu_system():
return None, []
"""
Check if the connection DAC user has search permissions for all the
directories in the passed path.
user = None
try:
for secmodel in conn.caps.host.secmodels:
if secmodel.model != "dac":
continue
:returns: Class with:
- List of the directories the user cannot search, or empty list
- username we checked for or None if not applicable
- uid we checked for or None if not application
"""
class SearchData(object):
def __init__(self):
self.user = None
self.uid = None
self.fixlist = []
label = None
for baselabel in secmodel.baselabels:
if baselabel.type in ["qemu", "kvm"]:
label = baselabel.content
break
if not label:
continue
searchdata = SearchData()
if path is None:
return searchdata
if conn.is_remote():
return searchdata
if not conn.is_qemu_system():
return searchdata
if diskbackend.path_is_url(path):
return searchdata
pwuid = pwd.getpwuid(
int(label.split(":")[0].replace("+", "")))
if pwuid:
user = pwuid[0]
except Exception:
logging.debug("Exception grabbing qemu DAC user", exc_info=True)
return None, []
user, uid = conn.caps.host.get_qemu_baselabel()
if not user:
return searchdata
if uid == 0:
return searchdata
return user, DeviceDisk.check_path_search_for_user(conn, path, user)
searchdata.user = user
searchdata.uid = uid
searchdata.fixlist = diskbackend.is_path_searchable(path, uid, user)
searchdata.fixlist.reverse()
return searchdata
@staticmethod
def fix_path_search_for_user(conn, path, username):
def fix_path_search(conn, searchdata):
"""
Try to fix any permission problems found by check_path_search_for_user
Try to fix any permission problems found by check_path_search
:returns: Return a dictionary of entries {broken path : error msg}
"""
def fix_perms(dirname, useacl=True):
if useacl:
cmd = ["setfacl", "--modify", "user:%s:x" % username, dirname]
proc = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = proc.communicate()
logging.debug("Ran command '%s'", cmd)
if out or err:
logging.debug("out=%s\nerr=%s", out, err)
if proc.returncode != 0:
raise ValueError(err)
else:
logging.debug("Setting +x on %s", dirname)
mode = os.stat(dirname).st_mode
newmode = mode | stat.S_IXOTH
os.chmod(dirname, newmode)
if os.stat(dirname).st_mode != newmode:
# Trying to change perms on vfat at least doesn't work
# but also doesn't seem to error. Try and detect that
raise ValueError(_("Permissions on '%s' did not stick") %
dirname)
fixlist = DeviceDisk.check_path_search_for_user(conn, path, username)
if not fixlist:
return []
fixlist.reverse()
errdict = {}
useacl = True
for dirname in fixlist:
try:
try:
fix_perms(dirname, useacl)
except Exception:
# If acl fails, fall back to chmod and retry
if not useacl:
raise
useacl = False
logging.debug("setfacl failed, trying old fashioned way")
fix_perms(dirname, useacl)
except Exception as e:
errdict[dirname] = str(e)
ignore = conn
errdict = diskbackend.set_dirs_searchable(
searchdata.fixlist, searchdata.username)
return errdict
@staticmethod

View File

@ -10,6 +10,7 @@ import logging
import os
import re
import stat
import subprocess
import libvirt
@ -222,6 +223,121 @@ def _get_dev_type(path, vol_xml, vol_object, pool_xml, remote):
return "file"
#########################
# ACL/path perm helpers #
#########################
def _fix_perms_acl(dirname, username):
cmd = ["setfacl", "--modify", "user:%s:x" % username, dirname]
proc = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = proc.communicate()
logging.debug("Ran command '%s'", cmd)
if out or err:
logging.debug("out=%s\nerr=%s", out, err)
if proc.returncode != 0:
raise ValueError(err)
def _fix_perms_chmod(dirname):
logging.debug("Setting +x on %s", dirname)
mode = os.stat(dirname).st_mode
newmode = mode | stat.S_IXOTH
os.chmod(dirname, newmode)
if os.stat(dirname).st_mode != newmode:
# Trying to change perms on vfat at least doesn't work
# but also doesn't seem to error. Try and detect that
raise ValueError(_("Permissions on '%s' did not stick") %
dirname)
def set_dirs_searchable(dirlist, username):
useacl = True
errdict = {}
for dirname in dirlist:
if useacl:
try:
_fix_perms_acl(dirname, username)
continue
except Exception as e:
logging.debug("setfacl failed: %s", e)
logging.debug("trying chmod")
useacl = False
try:
# If we reach here, ACL setting failed, try chmod
_fix_perms_chmod(dirname)
except Exception as e:
errdict[dirname] = str(e)
return errdict
def _is_dir_searchable(dirname, uid, username):
"""
Check if passed directory is searchable by uid
"""
if "VIRTINST_TEST_SUITE" in os.environ:
return True
try:
statinfo = os.stat(dirname)
except OSError:
return False
if uid == statinfo.st_uid:
flag = stat.S_IXUSR
elif uid == statinfo.st_gid:
flag = stat.S_IXGRP
else:
flag = stat.S_IXOTH
if bool(statinfo.st_mode & flag):
return True
# Check POSIX ACL (since that is what we use to 'fix' access)
cmd = ["getfacl", dirname]
try:
proc = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out, err = proc.communicate()
except OSError:
logging.debug("Didn't find the getfacl command.")
return False
if proc.returncode != 0:
logging.debug("Cmd '%s' failed: %s", cmd, err)
return False
pattern = "user:%s:..x" % username
return bool(re.search(pattern.encode("utf-8", "replace"), out))
def is_path_searchable(path, uid, username):
"""
Check each dir component of the passed path, see if they are
searchable by the uid/username, and return a list of paths
which aren't searchable
"""
if os.path.isdir(path):
dirname = path
base = "-"
else:
dirname, base = os.path.split(path)
fixlist = []
while base:
if not _is_dir_searchable(dirname, uid, username):
fixlist.append(dirname)
dirname, base = os.path.split(dirname)
return fixlist
##############################################
# Classes for tracking storage media details #
##############################################