virtinst: Add a bunch of # pragma: no cover

This commit is contained in:
Cole Robinson 2019-06-10 14:15:50 -04:00
parent ffcf713e06
commit f056798918
11 changed files with 77 additions and 81 deletions

View File

@ -48,6 +48,10 @@ class TestCapabilities(unittest.TestCase):
test_utils(caps_with_kvm, True, True)
test_utils(caps_no_kvm, True, False)
# Small test for extra unittest coverage
with self.assertRaises(ValueError):
caps_empty.guest_lookup(os_type="linux")
def testCapsNuma(self):
cells = self._buildCaps("lxc.xml").host.topology.cells
self.assertEqual(len(cells), 1)

View File

@ -17,7 +17,7 @@ def _setup_i18n():
try:
locale.setlocale(locale.LC_ALL, '')
except Exception:
except Exception: # pragma: no cover
# Can happen if user passed a bogus LANG
pass

View File

@ -73,7 +73,7 @@ class _CapsHost(XMLBuilder):
label = baselabel.content
break
if not label:
continue
continue # pragma: no cover
# XML we are looking at is like:
#
@ -87,10 +87,10 @@ class _CapsHost(XMLBuilder):
uid = int(label.split(":")[0].replace("+", ""))
user = pwd.getpwuid(uid)[0]
return user, uid
except Exception:
except Exception: # pragma: no cover
logging.debug("Exception parsing qemu dac baselabel=%s",
label, exc_info=True)
return None, None
return None, None # pragma: no cover
################################
@ -158,13 +158,9 @@ class _CapsGuest(XMLBuilder):
"""
Return True if kvm guests can be installed
"""
if self.os_type != "hvm":
return False
for d in self.domains:
if d.hypervisor_type == "kvm":
return True
return False
def supports_pae(self):
@ -225,9 +221,6 @@ class Capabilities(XMLBuilder):
############################
def _guestForOSType(self, os_type, arch):
if self.host is None:
return None
archs = [arch]
if arch is None:
archs = [self.host.cpu.arch, None]

View File

@ -88,14 +88,14 @@ class VirtHelpFormatter(argparse.RawDescriptionHelpFormatter):
self, *args, **kwargs)
if len(kwargs) != 0 and len(args) != 2:
return return_default()
return return_default() # pragma: no cover
try:
text = args[0]
if "\n" in text:
return text.splitlines()
return return_default()
except Exception:
except Exception: # pragma: no cover
return return_default()
@ -123,13 +123,13 @@ def setupLogging(appname, debug_stdout, do_quiet, cli_app=True):
_reset_global_state()
get_global_state().quiet = do_quiet
vi_dir = None
logfile = None
if not in_testsuite():
vi_dir = VirtinstConnection.get_app_cache_dir()
logfile = os.path.join(vi_dir, appname + ".log")
vi_dir = VirtinstConnection.get_app_cache_dir()
logfile = os.path.join(vi_dir, appname + ".log")
if in_testsuite():
vi_dir = None
logfile = None
try:
try: # pragma: no cover
if vi_dir and not os.access(vi_dir, os.W_OK):
if os.path.exists(vi_dir):
raise RuntimeError("No write access to directory %s" % vi_dir)
@ -144,11 +144,10 @@ def setupLogging(appname, debug_stdout, do_quiet, cli_app=True):
os.path.exists(logfile) and
not os.access(logfile, os.W_OK)):
raise RuntimeError("No write access to logfile %s" % logfile)
except Exception as e:
except Exception as e: # pragma: no cover
logging.warning("Error setting up logfile: %s", e)
logfile = None
dateFormat = "%a, %d %b %Y %H:%M:%S"
fileFormat = ("[%(asctime)s " + appname + " %(process)d] "
"%(levelname)s (%(module)s:%(lineno)d) %(message)s")
@ -174,20 +173,21 @@ def setupLogging(appname, debug_stdout, do_quiet, cli_app=True):
streamHandler.setFormatter(logging.Formatter(fileFormat,
dateFormat))
elif cli_app or not logfile:
# Have cli tools show WARN/ERROR by default
if get_global_state().quiet:
level = logging.ERROR
else:
level = logging.WARN
streamHandler.setLevel(level)
streamHandler.setFormatter(logging.Formatter(streamErrorFormat))
else:
else: # pragma: no cover
streamHandler = None
if streamHandler:
rootLogger.addHandler(streamHandler)
# Log uncaught exceptions
def exception_log(typ, val, tb):
def exception_log(typ, val, tb): # pragma: no cover
logging.debug("Uncaught exception:\n%s",
"".join(traceback.format_exception(typ, val, tb)))
if not debug_stdout:
@ -272,11 +272,6 @@ def _fail_exit():
sys.exit(1)
def nice_exit():
print_stdout(_("Exiting at user request."))
sys.exit(0)
def virsh_start_cmd(guest):
return ("virsh --connect %s start %s" % (guest.conn.uri, guest.name))
@ -305,7 +300,7 @@ def check_path_search(conn, path):
logging.warning(_("%s may not be accessible by the hypervisor. "
"You will need to grant the '%s' user search permissions for "
"the following directories: %s"),
path, searchdata.user, searchdata.fixlist)
path, searchdata.user, searchdata.fixlist) # pragma: no cover
def validate_disk(dev, warn_overwrite=False):
@ -370,8 +365,9 @@ def _run_console(domain, args):
if child:
return child
os.execvp(args[0], args)
os._exit(1) # pylint: disable=protected-access
os.execvp(args[0], args) # pragma: no cover
# pylint: disable=protected-access
os._exit(1) # pragma no cover
def _gfx_console(guest, domain):
@ -413,7 +409,7 @@ def connect_console(guest, domain, consolecb, wait, destroy_on_exit):
# If we connected the console, wait for it to finish
try:
os.waitpid(child, 0)
except OSError as e:
except OSError as e: # pragma: no cover
logging.debug("waitpid error: %s", e)
if destroy_on_exit and domain.isActive():
@ -971,11 +967,12 @@ class _VirtCLIArgumentStatic(object):
self._aliases = []
if not self.propname and not self.cb:
raise RuntimeError(
raise RuntimeError( # pragma: no cover
"programming error: propname or cb must be specified.")
if not self.propname and self.lookup_cb == -1:
raise RuntimeError("programming error: "
raise RuntimeError( # pragma: no cover
"programming error: "
"cliname=%s propname is None but lookup_cb is not specified. "
"Even if a 'cb' is passed, 'propname' is still used for "
"device lookup for virt-xml --edit.\n\nIf cb is just "
@ -1069,7 +1066,7 @@ class _VirtCLIArgument(object):
try:
if self.propname:
xmlutil.get_prop_path(inst, self.propname)
except AttributeError:
except AttributeError: # pragma: no cover
raise RuntimeError("programming error: obj=%s does not have "
"member=%s" % (inst, self.propname))
@ -1199,17 +1196,19 @@ class _InitClass(type):
"""
def __new__(cls, *args, **kwargs):
if len(args) != 3:
return super().__new__(cls, *args)
return super().__new__(cls, *args) # pragma: no cover
name, bases, ns = args
init = ns.get('_init_class')
if isinstance(init, types.FunctionType):
raise RuntimeError("_init_class must be a @classmethod")
raise RuntimeError( # pragma: no cover
"_init_class must be a @classmethod")
self = super().__new__(cls, name, bases, ns)
self._init_class(**kwargs) # pylint: disable=protected-access
# Check for leftover aliases
if self.aliases:
raise RuntimeError("programming error: class=%s "
raise RuntimeError( # pragma: no cover
"programming error: class=%s "
"leftover aliases=%s" % (self, self.aliases))
return self
@ -2094,8 +2093,6 @@ class ParserVCPU(VirtCLIParser):
return cb(*args, **kwargs)
def set_cpuset_cb(self, inst, val, virtarg):
if not val:
return
if val != "auto":
inst.vcpu_cpuset = val
return
@ -2191,10 +2188,6 @@ class ParserBoot(VirtCLIParser):
def set_initargs_cb(self, inst, val, virtarg):
inst.set_initargs_string(val)
def set_smbios_mode_cb(self, inst, val, virtarg):
inst.smbios_mode = val
self.optdict["smbios_mode"] = val
def set_bootloader_cb(self, inst, val, virtarg):
self.guest.bootloader = val
@ -2728,7 +2721,7 @@ def _add_char_source_args(cls, prefix=""):
def _default_image_file_format(conn):
if conn.support.conn_default_qcow2():
return "qcow2"
return "raw"
return "raw" # pragma: no cover
def _get_default_image_format(conn, poolobj):

View File

@ -240,7 +240,7 @@ class DeviceGraphics(Device):
return None
def _default_spice_gl(self, _guest):
if not self.conn.support.conn_spice_gl():
if not self.conn.support.conn_spice_gl(): # pragma: no cover
raise ValueError(_("Host does not support spice GL"))
# If spice GL but rendernode wasn't specified, hardcode

View File

@ -41,13 +41,13 @@ def _random_mac(conn):
def _default_route():
route_file = "/proc/net/route"
if not os.path.exists(route_file):
if not os.path.exists(route_file): # pragma: no cover
logging.debug("route_file=%s does not exist", route_file)
return None
for line in open(route_file):
info = line.split()
if len(info) != 11:
if len(info) != 11: # pragma: no cover
logging.debug("Unexpected field count=%s when parsing %s",
len(info), route_file)
break
@ -59,29 +59,29 @@ def _default_route():
except ValueError:
continue
return None
return None # pragma: no cover
def _default_bridge():
dev = _default_route()
if not dev:
return None
return None # pragma: no cover
# New style peth0 == phys dev, eth0 == bridge, eth0 == default route
if os.path.exists("/sys/class/net/%s/bridge" % dev):
return dev
return dev # pragma: no cover
# Old style, peth0 == phys dev, eth0 == netloop, xenbr0 == bridge,
# vif0.0 == netloop enslaved, eth0 == default route
try:
defn = int(dev[-1])
except Exception:
except Exception: # pragma: no cover
defn = -1
if (defn >= 0 and
os.path.exists("/sys/class/net/peth%d/brport" % defn) and
os.path.exists("/sys/class/net/xenbr%d/bridge" % defn)):
return "xenbr%d"
return "xenbr%d" # pragma: no cover
return None
@ -132,11 +132,12 @@ class DeviceInterface(Device):
try:
DeviceInterface.is_conflict_net(conn, mac)
return mac
except RuntimeError:
except RuntimeError: # pragma: no cover
continue
logging.debug("Failed to generate non-conflicting MAC")
return None
logging.debug( # pragma: no cover
"Failed to generate non-conflicting MAC")
return None # pragma: no cover
@staticmethod
def is_conflict_net(conn, searchmac):

View File

@ -425,7 +425,7 @@ class Installer(object):
# Handle undefining the VM if the initial startup fails
try:
domain.create()
except Exception:
except Exception: # pragma: no cover
try:
domain.undefine()
except Exception:
@ -464,7 +464,7 @@ class Installer(object):
try:
logging.debug("XML fetched from libvirt object:\n%s",
domain.XMLDesc(0))
except Exception as e:
except Exception as e: # pragma: no cover
logging.debug("Error fetching XML from libvirt object: %s", e)
return domain
@ -474,7 +474,7 @@ class Installer(object):
"""
try:
domain.setAutostart(True)
except Exception as e:
except Exception as e: # pragma: no cover
if not self.conn.support.is_error_nosupport(e):
raise
logging.warning("Could not set autostart flag: libvirt "
@ -542,11 +542,13 @@ class Installer(object):
if disk.get_vol_object():
disk.get_vol_object().delete()
else:
else: # pragma: no cover
# This case technically shouldn't happen here, but
# it's here incase future assumptions change
os.unlink(disk.path)
meter.end(0)
except Exception as e:
except Exception as e: # pragma: no cover
logging.debug("Failed to remove disk '%s'",
name, exc_info=True)
logging.error("Failed to remove disk '%s': %s", name, e)

View File

@ -35,11 +35,11 @@ def _run_initrd_commands(initrd, tempdir):
finderr = find_proc.stderr.read()
cpioerr = cpio_proc.stderr.read()
gziperr = gzip_proc.stderr.read()
if finderr:
if finderr: # pragma: no cover
logging.debug("find stderr=%s", finderr)
if cpioerr:
if cpioerr: # pragma: no cover
logging.debug("cpio stderr=%s", cpioerr)
if gziperr:
if gziperr: # pragma: no cover
logging.debug("gzip stderr=%s", gziperr)
@ -97,7 +97,7 @@ def perform_cdrom_injections(injections, scratchdir):
try:
_perform_generic_injections(injections, scratchdir, iso,
_run_iso_commands)
except Exception:
except Exception: # pragma: no cover
os.unlink(iso)
raise

View File

@ -93,10 +93,10 @@ class InstallerTreeMedia(object):
not os.path.exists(system_scratchdir) or
not os.access(system_scratchdir, os.W_OK)):
if not os.path.exists(user_scratchdir):
os.makedirs(user_scratchdir, 0o751)
os.makedirs(user_scratchdir, 0o751) # pragma: no cover
return user_scratchdir
return system_scratchdir
return system_scratchdir # pragma: no cover
def __init__(self, conn, location, location_kernel, location_initrd):
self.conn = conn
@ -171,7 +171,8 @@ class InstallerTreeMedia(object):
for kpath, ipath in cache.kernel_pairs:
if fetcher.hasFile(kpath) and fetcher.hasFile(ipath):
return kpath, ipath
raise RuntimeError(_("Couldn't find kernel for install tree."))
raise RuntimeError( # pragma: no cover
_("Couldn't find kernel for install tree."))
kernelpath, initrdpath = _check_kernel_pairs()
kernel = fetcher.acquireFile(kernelpath)

View File

@ -109,7 +109,8 @@ def _version_str_to_int(verstr):
return 0
if verstr.count(".") != 2:
raise RuntimeError("programming error: version string '%s' needs "
raise RuntimeError( # pragma: no cover
"programming error: version string '%s' needs "
"two '.' in it.")
return ((int(verstr.split(".")[0]) * 1000000) +
@ -156,7 +157,8 @@ class _SupportCheck(object):
for vstr in versions:
v = _version_str_to_int(vstr)
if vstr is not None and v != 0 and v < 7003:
raise RuntimeError("programming error: Cannot enforce "
raise RuntimeError( # pragma: no cover
"programming error: Cannot enforce "
"support checks for libvirt versions less than 0.7.3, "
"since required APIs were not available. ver=%s" % vstr)

View File

@ -221,9 +221,9 @@ class _URLFetcher(object):
logging.debug("Saved file to %s", fn)
return fn
except: # noqa
if fn and os.path.exists(fn):
os.unlink(fn)
raise
if fn and os.path.exists(fn): # pragma: no cover
os.unlink(fn) # pragma: no cover
raise # pragma: no cover
def acquireFileContent(self, filename):
"""
@ -247,7 +247,7 @@ class _HTTPURLFetcher(_URLFetcher):
if self._session:
try:
self._session.close()
except Exception:
except Exception: # pragma: no cover
logging.debug("Error closing requests.session", exc_info=True)
self._session = None
@ -261,7 +261,7 @@ class _HTTPURLFetcher(_URLFetcher):
try:
response = self._session.head(url, allow_redirects=True)
response.raise_for_status()
except Exception as e:
except Exception as e: # pragma: no cover
logging.debug("HTTP hasFile request failed: %s", str(e))
return False
return True
@ -274,7 +274,7 @@ class _HTTPURLFetcher(_URLFetcher):
response.raise_for_status()
try:
size = int(response.headers.get('content-length'))
except Exception:
except Exception: # pragma: no cover
size = None
return response, size
@ -296,7 +296,7 @@ class _FTPURLFetcher(_URLFetcher):
def _prepare(self):
if self._ftp:
return
return # pragma: no cover
try:
parsed = urllib.parse.urlparse(self.location)
@ -310,7 +310,7 @@ class _FTPURLFetcher(_URLFetcher):
self._ftp.login(username, password)
# Force binary mode
self._ftp.voidcmd("TYPE I")
except Exception as e:
except Exception as e: # pragma: no cover
raise ValueError(_("Opening URL %s failed: %s.") %
(self.location, str(e)))
@ -328,11 +328,11 @@ class _FTPURLFetcher(_URLFetcher):
def _cleanup(self):
if not self._ftp:
return
return # pragma: no cover
try:
self._ftp.quit()
except Exception:
except Exception: # pragma: no cover
logging.debug("Error quitting ftp connection", exc_info=True)
self._ftp = None
@ -347,7 +347,7 @@ class _FTPURLFetcher(_URLFetcher):
except ftplib.all_errors:
# If it's a dir
self._ftp.cwd(path)
except ftplib.all_errors as e:
except ftplib.all_errors as e: # pragma: no cover
logging.debug("FTP hasFile: couldn't access %s: %s",
url, str(e))
return False