test_urls: Use multiple unittests, not a single big one
This commit is contained in:
parent
601eb5f15e
commit
ff5b8d29e2
10
setup.py
10
setup.py
|
@ -484,11 +484,10 @@ class TestCommand(TestBaseCommand):
|
|||
class TestURLFetch(TestBaseCommand):
|
||||
description = "Test fetching kernels and isos from various distro trees"
|
||||
|
||||
user_options = TestBaseCommand.user_options + \
|
||||
[("match=", None, "Regular expression of dist names to "
|
||||
"match [default: '.*']"),
|
||||
("path=", None, "Paths to local iso or directory or check"
|
||||
" for installable distro. Comma separated")]
|
||||
user_options = TestBaseCommand.user_options + [
|
||||
("path=", None, "Paths to local iso or directory or check"
|
||||
" for installable distro. Comma separated"),
|
||||
]
|
||||
|
||||
def initialize_options(self):
|
||||
TestBaseCommand.initialize_options(self)
|
||||
|
@ -509,7 +508,6 @@ class TestURLFetch(TestBaseCommand):
|
|||
def run(self):
|
||||
from tests import test_urls
|
||||
self._testfiles = ["tests.test_urls"]
|
||||
test_urls.MATCH_FILTER = self.match
|
||||
if self.path:
|
||||
for p in self.path:
|
||||
test_urls.LOCAL_MEDIA.append(p)
|
||||
|
|
|
@ -39,9 +39,6 @@ from virtinst.urlfetcher import MandrivaDistro
|
|||
# pylint: disable=W0212
|
||||
# Access to protected member, needed to unittest stuff
|
||||
|
||||
# Filters for including/excluding certain distros.
|
||||
MATCH_FILTER = ".*"
|
||||
|
||||
# Variable used to store a local iso or dir path to check for a distro
|
||||
# Specified via 'python setup.py test_urls --path"
|
||||
LOCAL_MEDIA = []
|
||||
|
@ -63,11 +60,6 @@ SCIENTIFIC_BASEURL = "http://ftp.scientificlinux.org/linux/scientific/%s/%s/"
|
|||
# Doesn't appear to be a simple boot iso in newer suse trees
|
||||
NOBOOTISO_FILTER = ".*opensuse12.*|.*opensuse11.*|.*opensuse10.3.*|.*opensuse10.0.*"
|
||||
|
||||
# Opensuse < 10.3 (and some sles) require crazy rpm hacking to get a bootable
|
||||
# kernel. We expect failure in this case since our test harness doesn't
|
||||
# actually fetch anything
|
||||
EXPECT_XEN_FAIL = ".*opensuse10.2.*|.*opensuse10.0.*"
|
||||
|
||||
|
||||
# Return the expected Distro class for the passed distro label
|
||||
def distroClass(distname):
|
||||
|
@ -226,215 +218,147 @@ urls = {
|
|||
|
||||
testconn = utils.open_testdefault()
|
||||
testguest = Guest(testconn)
|
||||
meter = urlgrabber.progress.BaseMeter()
|
||||
if utils.get_debug():
|
||||
meter = urlgrabber.progress.TextMeter(fo=sys.stdout)
|
||||
|
||||
|
||||
class TestURLFetch(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.meter = urlgrabber.progress.BaseMeter()
|
||||
if utils.get_debug():
|
||||
self.meter = urlgrabber.progress.TextMeter(fo=sys.stdout)
|
||||
|
||||
def _fetchLocalMedia(self, mediapath):
|
||||
arch = platform.machine()
|
||||
|
||||
fetcher = urlfetcher._fetcherForURI(mediapath, "/tmp")
|
||||
|
||||
def _storeForDistro(fetcher, url, _type, arch):
|
||||
"""
|
||||
Helper to lookup the Distro store object, basically detecting the
|
||||
URL. Handle occasional proxy errors
|
||||
"""
|
||||
for ignore in range(0, 10):
|
||||
try:
|
||||
fetcher.prepareLocation()
|
||||
|
||||
# Make sure we detect _a_ distro
|
||||
hvmstore = self._getStore(fetcher, mediapath, "hvm", arch)
|
||||
logging.debug("Local distro detected as: %s", hvmstore)
|
||||
finally:
|
||||
fetcher.cleanupLocation()
|
||||
|
||||
|
||||
def _fetchFromURLDict(self, distname, url, arch, distro_info, check_xen):
|
||||
logging.debug("\nDistro='%s' arch='%s' url=%s",
|
||||
distname, arch, url)
|
||||
|
||||
fetcher = urlfetcher._fetcherForURI(url, "/tmp")
|
||||
try:
|
||||
fetcher.prepareLocation()
|
||||
return urlfetcher._storeForDistro(fetcher=fetcher, baseuri=url,
|
||||
progresscb=meter,
|
||||
arch=arch, typ=_type)
|
||||
except Exception, e:
|
||||
# Don't raise an error here: the site might be down atm
|
||||
logging.error("%s-%s: Couldn't access url %s: %s. Skipping.",
|
||||
distname, arch, fetcher.location, str(e))
|
||||
fetcher.cleanupLocation()
|
||||
self.skipTest("")
|
||||
return
|
||||
|
||||
try:
|
||||
self._grabURLMedia(fetcher, distname, url, arch, distro_info,
|
||||
check_xen)
|
||||
finally:
|
||||
fetcher.cleanupLocation()
|
||||
|
||||
def _checkDistroReporting(self, stores, distro_info):
|
||||
if distro_info is None:
|
||||
return
|
||||
|
||||
dvariant = distro_info
|
||||
|
||||
for store in stores:
|
||||
if not store:
|
||||
if str(e).count("502"):
|
||||
logging.debug("Caught proxy error: %s", str(e))
|
||||
time.sleep(.5)
|
||||
continue
|
||||
raise
|
||||
raise
|
||||
|
||||
v = store.os_variant
|
||||
|
||||
if dvariant != v:
|
||||
raise RuntimeError("Store distro/variant did not match "
|
||||
"expected values: %s %s != %s"
|
||||
% (store, v, dvariant))
|
||||
def _testLocalMedia(fetcher, path):
|
||||
"""
|
||||
Test a local path explicitly requested by the user
|
||||
"""
|
||||
print "\nChecking local path: %s" % path
|
||||
arch = platform.machine()
|
||||
|
||||
# Verify the values are valid
|
||||
if v:
|
||||
testguest.os_variant = v
|
||||
# Make sure we detect _a_ distro
|
||||
hvmstore = _storeForDistro(fetcher, path, "hvm", arch)
|
||||
logging.debug("Local distro detected as: %s", hvmstore)
|
||||
|
||||
def _grabURLMedia(self, fetcher, distname, url, arch, distro_info,
|
||||
check_xen):
|
||||
|
||||
hvmstore = self._getStore(fetcher, url, "hvm", arch)
|
||||
def _testURL(fetcher, distname, url, arch, detect_distro, check_xen):
|
||||
"""
|
||||
Test that our URL detection logic works for grabbing kernel, xen
|
||||
kernel, and boot.iso
|
||||
"""
|
||||
print "\nTesting %s-%s" % (distname, arch)
|
||||
|
||||
if check_xen:
|
||||
xenstore = self._getStore(fetcher, url, "xen", arch)
|
||||
else:
|
||||
xenstore = None
|
||||
hvmstore = _storeForDistro(fetcher, url, "hvm", arch)
|
||||
xenstore = None
|
||||
if check_xen:
|
||||
xenstore = _storeForDistro(fetcher, url, "xen", arch)
|
||||
|
||||
exp_store = distroClass(distname)
|
||||
for s in [hvmstore, xenstore]:
|
||||
if s and not isinstance(s, exp_store):
|
||||
logging.error("(%s): expected store %s, was %s",
|
||||
distname, exp_store, s)
|
||||
self.fail()
|
||||
exp_store = distroClass(distname)
|
||||
for s in [hvmstore, xenstore]:
|
||||
if s and not isinstance(s, exp_store):
|
||||
raise AssertionError("(%s): expected store %s, was %s" %
|
||||
(distname, exp_store, s))
|
||||
|
||||
# Make sure the stores are reporting correct distro name/variant
|
||||
try:
|
||||
self._checkDistroReporting([hvmstore, xenstore], distro_info)
|
||||
except:
|
||||
logging.exception("\n\nDistro detection failed.")
|
||||
self.fail()
|
||||
if s and detect_distro and detect_distro != s.os_variant:
|
||||
raise AssertionError("Store distro/variant did not match "
|
||||
"expected values: store=%s, found=%s expect=%s" %
|
||||
(s, s.os_variant, detect_distro))
|
||||
|
||||
def fakeAcquireFile(filename, meter):
|
||||
if not isinstance(meter, urlgrabber.progress.BaseMeter):
|
||||
raise ValueError("passed meter is '%s' not an"
|
||||
" actual meter." % meter)
|
||||
logging.debug("Fake acquiring %s", filename)
|
||||
return fetcher.hasFile(filename)
|
||||
# Do this only after the distro detection, since we actually need
|
||||
# to fetch files for that part
|
||||
def fakeAcquireFile(filename, _meter):
|
||||
ignore = _meter
|
||||
logging.debug("Fake acquiring %s", filename)
|
||||
return fetcher.hasFile(filename)
|
||||
fetcher.acquireFile = fakeAcquireFile
|
||||
|
||||
# Replace acquireFile with hasFile, so we don't actually have to fetch
|
||||
# 1000 kernels
|
||||
fetcher.acquireFile = fakeAcquireFile
|
||||
# Fetch boot iso
|
||||
if re.match(r"%s" % NOBOOTISO_FILTER, distname):
|
||||
logging.debug("Known lack of boot.iso in %s tree. Skipping.",
|
||||
distname)
|
||||
else:
|
||||
boot = hvmstore.acquireBootDisk(testguest, fetcher, meter)
|
||||
logging.debug("acquireBootDisk: %s", str(boot))
|
||||
|
||||
# Fetch boot iso
|
||||
try:
|
||||
if re.match(r"%s" % NOBOOTISO_FILTER, distname):
|
||||
logging.debug("Known lack of boot.iso in %s tree. Skipping.",
|
||||
distname)
|
||||
else:
|
||||
boot = hvmstore.acquireBootDisk(testguest, fetcher, self.meter)
|
||||
logging.debug("acquireBootDisk: %s", str(boot))
|
||||
if boot is not True:
|
||||
raise AssertionError("%s-%s: bootiso fetching failed" %
|
||||
(distname, arch))
|
||||
|
||||
if boot is not True:
|
||||
raise RuntimeError("Didn't fetch any boot iso.")
|
||||
except Exception, e:
|
||||
logging.exception("\n\n%s-%s: bootdisk fetching: %s",
|
||||
distname, arch, str(e))
|
||||
self.fail()
|
||||
# Fetch regular kernel
|
||||
kern = hvmstore.acquireKernel(testguest, fetcher, meter)
|
||||
logging.debug("acquireKernel (hvm): %s", str(kern))
|
||||
|
||||
# Fetch regular kernel
|
||||
try:
|
||||
kern = hvmstore.acquireKernel(testguest, fetcher, self.meter)
|
||||
logging.debug("acquireKernel (hvm): %s", str(kern))
|
||||
if kern[0] is not True or kern[1] is not True:
|
||||
AssertionError("%s-%s: hvm kernel fetching failed" %
|
||||
(distname, arch))
|
||||
|
||||
if kern[0] is not True or kern[1] is not True:
|
||||
raise RuntimeError("Didn't fetch any hvm kernel.")
|
||||
except Exception, e:
|
||||
logging.exception("\n\n%s-%s: hvm kernel fetching: %s",
|
||||
distname, arch, str(e))
|
||||
self.fail()
|
||||
# Fetch xen kernel
|
||||
if xenstore and check_xen:
|
||||
kern = xenstore.acquireKernel(testguest, fetcher, meter)
|
||||
logging.debug("acquireKernel (xen): %s", str(kern))
|
||||
|
||||
# Fetch xen kernel
|
||||
try:
|
||||
if xenstore and check_xen:
|
||||
kern = xenstore.acquireKernel(testguest, fetcher, self.meter)
|
||||
logging.debug("acquireKernel (xen): %s", str(kern))
|
||||
if kern[0] is not True or kern[1] is not True:
|
||||
raise AssertionError("%s-%s: xen kernel fetching" %
|
||||
(distname, arch))
|
||||
else:
|
||||
logging.debug("acquireKernel (xen): Hardcoded skipping.")
|
||||
|
||||
if kern[0] is not True or kern[1] is not True:
|
||||
raise RuntimeError("Didn't fetch any xen kernel.")
|
||||
else:
|
||||
logging.debug("acquireKernel (xen): Hardcoded skipping.")
|
||||
except Exception, e:
|
||||
if re.match(r"%s" % EXPECT_XEN_FAIL, distname):
|
||||
logging.debug("%s: anticipated xen failure.", distname)
|
||||
else:
|
||||
logging.exception("\n\n%s-%s: xen kernel fetching: %s",
|
||||
distname, arch, str(e))
|
||||
self.fail()
|
||||
|
||||
def _getStore(self, fetcher, url, _type, arch):
|
||||
for ignore in range(0, 10):
|
||||
try:
|
||||
return urlfetcher._storeForDistro(fetcher=fetcher, baseuri=url,
|
||||
progresscb=self.meter,
|
||||
arch=arch, typ=_type)
|
||||
except Exception, e:
|
||||
if str(e).count("502"):
|
||||
logging.debug("Caught proxy error: %s", str(e))
|
||||
time.sleep(.5)
|
||||
continue
|
||||
raise
|
||||
raise
|
||||
|
||||
def testURLFetch(self):
|
||||
def _fetch_wrapper(url, cb, *args):
|
||||
fetcher = urlfetcher._fetcherForURI(url, "/tmp")
|
||||
try:
|
||||
fetcher.prepareLocation()
|
||||
return cb(fetcher, *args)
|
||||
finally:
|
||||
fetcher.cleanupLocation()
|
||||
|
||||
if LOCAL_MEDIA:
|
||||
logging.debug("Skipping URL tests since local path is specified.")
|
||||
return
|
||||
|
||||
def _make_test_wrapper(url, cb, args):
|
||||
def cmdtemplate():
|
||||
return _fetch_wrapper(url, cb, *args)
|
||||
return lambda _self: cmdtemplate()
|
||||
|
||||
|
||||
# Register tests to be picked up by unittest
|
||||
# If local ISO tests requested, skip all other URL tests
|
||||
class URLTests(unittest.TestCase):
|
||||
pass
|
||||
|
||||
|
||||
def _make_tests():
|
||||
if LOCAL_MEDIA:
|
||||
newidx = 0
|
||||
for p in LOCAL_MEDIA:
|
||||
newidx += 1
|
||||
args = (p,)
|
||||
testfunc = _make_test_wrapper(p, _testLocalMedia, args)
|
||||
setattr(URLTests, "testLocalMedia%s" % newidx, testfunc)
|
||||
else:
|
||||
keys = urls.keys()
|
||||
keys.sort()
|
||||
assertions = 0
|
||||
for label in keys:
|
||||
distro_info = None
|
||||
if MATCH_FILTER and not re.match(r"%s" % MATCH_FILTER, label):
|
||||
logging.debug("Excluding '%s' from exclude filter.", label)
|
||||
continue
|
||||
for key in keys:
|
||||
distro_dict = urls[key]
|
||||
detect_distro = distro_dict.pop("distro", "linux")
|
||||
check_xen = not distro_dict.pop("noxen", False)
|
||||
|
||||
check_xen = not bool(urls[label].get("noxen"))
|
||||
if "distro" in urls[label]:
|
||||
distro_info = urls[label]["distro"]
|
||||
for arch, url in distro_dict.items():
|
||||
args = (key, url, arch, detect_distro, check_xen)
|
||||
testfunc = _make_test_wrapper(url, _testURL, args)
|
||||
setattr(URLTests, "testURL%s%s" % (key, arch), testfunc)
|
||||
|
||||
for arch, url in urls[label].items():
|
||||
if arch == "distro" or arch == "noxen":
|
||||
continue
|
||||
|
||||
try:
|
||||
print "Testing %s-%s : %s" % (label, arch, url)
|
||||
self._fetchFromURLDict(label, url, arch, distro_info,
|
||||
check_xen)
|
||||
except AssertionError:
|
||||
print "%s-%s FAILED." % (label, arch)
|
||||
assertions += 1
|
||||
except Exception:
|
||||
print "%s-%s ERROR." % (label, arch)
|
||||
assertions += 1
|
||||
|
||||
if assertions != 0:
|
||||
raise AssertionError("Found %d errors in URL suite." % assertions)
|
||||
|
||||
def testLocalMedia(self):
|
||||
assertions = 0
|
||||
if LOCAL_MEDIA:
|
||||
for p in LOCAL_MEDIA:
|
||||
print "Checking local path: %s" % p
|
||||
try:
|
||||
self._fetchLocalMedia(p)
|
||||
except Exception, e:
|
||||
logging.exception("\n\nLocal path '%s' failed: %s", p, e)
|
||||
print "Local path FAILED."
|
||||
assertions += 1
|
||||
|
||||
if assertions != 0:
|
||||
raise AssertionError("Found %d errors in local fetch utils." %
|
||||
assertions)
|
||||
_make_tests()
|
||||
|
|
Loading…
Reference in New Issue