mirror of https://github.com/python/cpython.git
Test zipimporter a bit more. Also get working with -R :: option for finding ref leaks
This commit is contained in:
parent
0e17f8cd38
commit
b155b62f54
|
@ -4,6 +4,7 @@
|
|||
import imp
|
||||
import struct
|
||||
import time
|
||||
import unittest
|
||||
|
||||
import zlib # implied prerequisite
|
||||
from zipfile import ZipFile, ZipInfo, ZIP_STORED, ZIP_DEFLATED
|
||||
|
@ -13,6 +14,11 @@
|
|||
import zipimport
|
||||
|
||||
|
||||
# so we only run testAFakeZlib once if this test is run repeatedly
|
||||
# which happens when we look for ref leaks
|
||||
test_imported = False
|
||||
|
||||
|
||||
def make_pyc(co, mtime):
|
||||
data = marshal.dumps(co)
|
||||
if type(mtime) is type(0.0):
|
||||
|
@ -176,6 +182,37 @@ def testDeepPackage(self):
|
|||
packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc)}
|
||||
self.doTest(pyc_ext, files, TESTPACK, TESTPACK2, TESTMOD)
|
||||
|
||||
def testZipImporterMethods(self):
|
||||
packdir = TESTPACK + os.sep
|
||||
packdir2 = packdir + TESTPACK2 + os.sep
|
||||
files = {packdir + "__init__" + pyc_ext: (NOW, test_pyc),
|
||||
packdir2 + "__init__" + pyc_ext: (NOW, test_pyc),
|
||||
packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc)}
|
||||
|
||||
z = ZipFile(TEMP_ZIP, "w")
|
||||
try:
|
||||
for name, (mtime, data) in files.items():
|
||||
zinfo = ZipInfo(name, time.localtime(mtime))
|
||||
zinfo.compress_type = self.compression
|
||||
z.writestr(zinfo, data)
|
||||
z.close()
|
||||
|
||||
zi = zipimport.zipimporter(TEMP_ZIP)
|
||||
self.assertEquals(zi.is_package(TESTPACK), True)
|
||||
zi.load_module(TESTPACK)
|
||||
|
||||
self.assertEquals(zi.is_package(packdir + '__init__'), False)
|
||||
self.assertEquals(zi.is_package(packdir + TESTPACK2), True)
|
||||
self.assertEquals(zi.is_package(packdir2 + TESTMOD), False)
|
||||
|
||||
mod_name = packdir2 + TESTMOD
|
||||
mod = __import__(mod_name.replace('/', '.'))
|
||||
self.assertEquals(zi.get_source(TESTPACK), None)
|
||||
self.assertEquals(zi.get_source(mod_name), None)
|
||||
finally:
|
||||
z.close()
|
||||
os.remove(TEMP_ZIP)
|
||||
|
||||
def testGetData(self):
|
||||
z = ZipFile(TEMP_ZIP, "w")
|
||||
z.compression = self.compression
|
||||
|
@ -186,6 +223,7 @@ def testGetData(self):
|
|||
z.close()
|
||||
zi = zipimport.zipimporter(TEMP_ZIP)
|
||||
self.assertEquals(data, zi.get_data(name))
|
||||
self.assert_('zipimporter object' in repr(zi))
|
||||
finally:
|
||||
z.close()
|
||||
os.remove(TEMP_ZIP)
|
||||
|
@ -212,11 +250,91 @@ class CompressedZipImportTestCase(UncompressedZipImportTestCase):
|
|||
compression = ZIP_DEFLATED
|
||||
|
||||
|
||||
class BadFileZipImportTestCase(unittest.TestCase):
|
||||
def assertZipFailure(self, filename):
|
||||
self.assertRaises(zipimport.ZipImportError,
|
||||
zipimport.zipimporter, filename)
|
||||
|
||||
def testNoFile(self):
|
||||
self.assertZipFailure('AdfjdkFJKDFJjdklfjs')
|
||||
|
||||
def testEmptyFilename(self):
|
||||
self.assertZipFailure('')
|
||||
|
||||
def testBadArgs(self):
|
||||
self.assertRaises(TypeError, zipimport.zipimporter, None)
|
||||
self.assertRaises(TypeError, zipimport.zipimporter, TESTMOD, kwd=None)
|
||||
|
||||
def testFilenameTooLong(self):
|
||||
self.assertZipFailure('A' * 33000)
|
||||
|
||||
def testEmptyFile(self):
|
||||
test_support.unlink(TESTMOD)
|
||||
open(TESTMOD, 'w+').close()
|
||||
self.assertZipFailure(TESTMOD)
|
||||
|
||||
def testFileUnreadable(self):
|
||||
test_support.unlink(TESTMOD)
|
||||
fd = os.open(TESTMOD, os.O_CREAT, 000)
|
||||
os.close(fd)
|
||||
self.assertZipFailure(TESTMOD)
|
||||
|
||||
def testNotZipFile(self):
|
||||
test_support.unlink(TESTMOD)
|
||||
fp = open(TESTMOD, 'w+')
|
||||
fp.write('a' * 22)
|
||||
fp.close()
|
||||
self.assertZipFailure(TESTMOD)
|
||||
|
||||
def testBogusZipFile(self):
|
||||
test_support.unlink(TESTMOD)
|
||||
fp = open(TESTMOD, 'w+')
|
||||
fp.write(struct.pack('=I', 0x06054B50))
|
||||
fp.write('a' * 18)
|
||||
fp.close()
|
||||
z = zipimport.zipimporter(TESTMOD)
|
||||
|
||||
try:
|
||||
self.assertRaises(TypeError, z.find_module, None)
|
||||
self.assertRaises(TypeError, z.load_module, None)
|
||||
self.assertRaises(TypeError, z.is_package, None)
|
||||
self.assertRaises(TypeError, z.get_code, None)
|
||||
self.assertRaises(TypeError, z.get_data, None)
|
||||
self.assertRaises(TypeError, z.get_source, None)
|
||||
|
||||
error = zipimport.ZipImportError
|
||||
self.assertEqual(z.find_module('abc'), None)
|
||||
|
||||
self.assertRaises(error, z.load_module, 'abc')
|
||||
self.assertRaises(error, z.get_code, 'abc')
|
||||
self.assertRaises(IOError, z.get_data, 'abc')
|
||||
self.assertRaises(error, z.get_source, 'abc')
|
||||
self.assertRaises(error, z.is_package, 'abc')
|
||||
finally:
|
||||
zipimport._zip_directory_cache.clear()
|
||||
|
||||
|
||||
def cleanup():
|
||||
# this is necessary if test is run repeated (like when finding leaks)
|
||||
global test_imported
|
||||
if test_imported:
|
||||
zipimport._zip_directory_cache.clear()
|
||||
if hasattr(UncompressedZipImportTestCase, 'testAFakeZlib'):
|
||||
delattr(UncompressedZipImportTestCase, 'testAFakeZlib')
|
||||
if hasattr(CompressedZipImportTestCase, 'testAFakeZlib'):
|
||||
delattr(CompressedZipImportTestCase, 'testAFakeZlib')
|
||||
test_imported = True
|
||||
|
||||
def test_main():
|
||||
test_support.run_unittest(
|
||||
UncompressedZipImportTestCase,
|
||||
CompressedZipImportTestCase
|
||||
)
|
||||
cleanup()
|
||||
try:
|
||||
test_support.run_unittest(
|
||||
UncompressedZipImportTestCase,
|
||||
CompressedZipImportTestCase,
|
||||
BadFileZipImportTestCase,
|
||||
)
|
||||
finally:
|
||||
test_support.unlink(TESTMOD)
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_main()
|
||||
|
|
Loading…
Reference in New Issue