Merge "releasetools: Reduce the memory use in test_common.py."

This commit is contained in:
Tao Bao 2017-11-11 01:32:31 +00:00 committed by Gerrit Code Review
commit 44cb0db6a7
1 changed files with 48 additions and 38 deletions

View File

@ -20,32 +20,27 @@ import time
import unittest import unittest
import zipfile import zipfile
from hashlib import sha1
import common import common
import validate_target_files import validate_target_files
KiB = 1024
def random_string_with_holes(size, block_size, step_size): MiB = 1024 * KiB
data = ["\0"] * size GiB = 1024 * MiB
for begin in range(0, size, step_size):
end = begin + block_size
data[begin:end] = os.urandom(block_size)
return "".join(data)
def get_2gb_string(): def get_2gb_string():
kilobytes = 1024 size = int(2 * GiB + 1)
megabytes = 1024 * kilobytes block_size = 4 * KiB
gigabytes = 1024 * megabytes step_size = 4 * MiB
# Generate a long string with holes, e.g. 'xyz\x00abc\x00...'.
size = int(2 * gigabytes + 1) for _ in range(0, size, step_size):
block_size = 4 * kilobytes yield os.urandom(block_size)
step_size = 4 * megabytes yield '\0' * (step_size - block_size)
two_gb_string = random_string_with_holes(
size, block_size, step_size)
return two_gb_string
class CommonZipTest(unittest.TestCase): class CommonZipTest(unittest.TestCase):
def _verify(self, zip_file, zip_file_name, arcname, contents, def _verify(self, zip_file, zip_file_name, arcname, expected_hash,
test_file_name=None, expected_stat=None, expected_mode=0o644, test_file_name=None, expected_stat=None, expected_mode=0o644,
expected_compress_type=zipfile.ZIP_STORED): expected_compress_type=zipfile.ZIP_STORED):
# Verify the stat if present. # Verify the stat if present.
@ -69,7 +64,11 @@ class CommonZipTest(unittest.TestCase):
self.assertEqual(info.compress_type, expected_compress_type) self.assertEqual(info.compress_type, expected_compress_type)
# Verify the zip contents. # Verify the zip contents.
self.assertEqual(zip_file.read(arcname), contents) entry = zip_file.open(arcname)
sha1_hash = sha1()
for chunk in iter(lambda: entry.read(4 * MiB), ''):
sha1_hash.update(chunk)
self.assertEqual(expected_hash, sha1_hash.hexdigest())
self.assertIsNone(zip_file.testzip()) self.assertIsNone(zip_file.testzip())
def _test_ZipWrite(self, contents, extra_zipwrite_args=None): def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
@ -90,7 +89,10 @@ class CommonZipTest(unittest.TestCase):
zip_file = zipfile.ZipFile(zip_file_name, "w") zip_file = zipfile.ZipFile(zip_file_name, "w")
try: try:
test_file.write(contents) sha1_hash = sha1()
for data in contents:
sha1_hash.update(data)
test_file.write(data)
test_file.close() test_file.close()
expected_stat = os.stat(test_file_name) expected_stat = os.stat(test_file_name)
@ -102,8 +104,9 @@ class CommonZipTest(unittest.TestCase):
common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args) common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
common.ZipClose(zip_file) common.ZipClose(zip_file)
self._verify(zip_file, zip_file_name, arcname, contents, test_file_name, self._verify(zip_file, zip_file_name, arcname, sha1_hash.hexdigest(),
expected_stat, expected_mode, expected_compress_type) test_file_name, expected_stat, expected_mode,
expected_compress_type)
finally: finally:
os.remove(test_file_name) os.remove(test_file_name)
os.remove(zip_file_name) os.remove(zip_file_name)
@ -133,7 +136,7 @@ class CommonZipTest(unittest.TestCase):
common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args) common.ZipWriteStr(zip_file, zinfo_or_arcname, contents, **extra_args)
common.ZipClose(zip_file) common.ZipClose(zip_file)
self._verify(zip_file, zip_file_name, arcname, contents, self._verify(zip_file, zip_file_name, arcname, sha1(contents).hexdigest(),
expected_mode=expected_mode, expected_mode=expected_mode,
expected_compress_type=expected_compress_type) expected_compress_type=expected_compress_type)
finally: finally:
@ -159,7 +162,10 @@ class CommonZipTest(unittest.TestCase):
zip_file = zipfile.ZipFile(zip_file_name, "w") zip_file = zipfile.ZipFile(zip_file_name, "w")
try: try:
test_file.write(large) sha1_hash = sha1()
for data in large:
sha1_hash.update(data)
test_file.write(data)
test_file.close() test_file.close()
expected_stat = os.stat(test_file_name) expected_stat = os.stat(test_file_name)
@ -173,12 +179,13 @@ class CommonZipTest(unittest.TestCase):
common.ZipClose(zip_file) common.ZipClose(zip_file)
# Verify the contents written by ZipWrite(). # Verify the contents written by ZipWrite().
self._verify(zip_file, zip_file_name, arcname_large, large, self._verify(zip_file, zip_file_name, arcname_large,
test_file_name, expected_stat, expected_mode, sha1_hash.hexdigest(), test_file_name, expected_stat,
expected_compress_type) expected_mode, expected_compress_type)
# Verify the contents written by ZipWriteStr(). # Verify the contents written by ZipWriteStr().
self._verify(zip_file, zip_file_name, arcname_small, small, self._verify(zip_file, zip_file_name, arcname_small,
sha1(small).hexdigest(),
expected_compress_type=expected_compress_type) expected_compress_type=expected_compress_type)
finally: finally:
os.remove(zip_file_name) os.remove(zip_file_name)
@ -287,13 +294,17 @@ class CommonZipTest(unittest.TestCase):
common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400) common.ZipWriteStr(zip_file, zinfo, random_string, perms=0o400)
common.ZipClose(zip_file) common.ZipClose(zip_file)
self._verify(zip_file, zip_file_name, "foo", random_string, self._verify(zip_file, zip_file_name, "foo",
sha1(random_string).hexdigest(),
expected_mode=0o644) expected_mode=0o644)
self._verify(zip_file, zip_file_name, "bar", random_string, self._verify(zip_file, zip_file_name, "bar",
sha1(random_string).hexdigest(),
expected_mode=0o755) expected_mode=0o755)
self._verify(zip_file, zip_file_name, "baz", random_string, self._verify(zip_file, zip_file_name, "baz",
sha1(random_string).hexdigest(),
expected_mode=0o740) expected_mode=0o740)
self._verify(zip_file, zip_file_name, "qux", random_string, self._verify(zip_file, zip_file_name, "qux",
sha1(random_string).hexdigest(),
expected_mode=0o400) expected_mode=0o400)
finally: finally:
os.remove(zip_file_name) os.remove(zip_file_name)
@ -310,8 +321,7 @@ class InstallRecoveryScriptFormatTest(unittest.TestCase):
dummy_fstab = \ dummy_fstab = \
["/dev/soc.0/by-name/boot /boot emmc defaults defaults", ["/dev/soc.0/by-name/boot /boot emmc defaults defaults",
"/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"] "/dev/soc.0/by-name/recovery /recovery emmc defaults defaults"]
self._info["fstab"] = common.LoadRecoveryFSTab(lambda x : "\n".join(x), self._info["fstab"] = common.LoadRecoveryFSTab("\n".join, 2, dummy_fstab)
2, dummy_fstab)
# Construct the gzipped recovery.img and boot.img # Construct the gzipped recovery.img and boot.img
self.recovery_data = bytearray([ self.recovery_data = bytearray([
0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a, 0x1f, 0x8b, 0x08, 0x00, 0x81, 0x11, 0x02, 0x5a, 0x00, 0x03, 0x2b, 0x4a,
@ -332,8 +342,8 @@ class InstallRecoveryScriptFormatTest(unittest.TestCase):
f.write(data) f.write(data)
def test_full_recovery(self): def test_full_recovery(self):
recovery_image = common.File("recovery.img", self.recovery_data); recovery_image = common.File("recovery.img", self.recovery_data)
boot_image = common.File("boot.img", self.boot_data); boot_image = common.File("boot.img", self.boot_data)
self._info["full_recovery_image"] = "true" self._info["full_recovery_image"] = "true"
common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink, common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,
@ -342,9 +352,9 @@ class InstallRecoveryScriptFormatTest(unittest.TestCase):
self._info) self._info)
def test_recovery_from_boot(self): def test_recovery_from_boot(self):
recovery_image = common.File("recovery.img", self.recovery_data); recovery_image = common.File("recovery.img", self.recovery_data)
self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES") self._out_tmp_sink("recovery.img", recovery_image.data, "IMAGES")
boot_image = common.File("boot.img", self.boot_data); boot_image = common.File("boot.img", self.boot_data)
self._out_tmp_sink("boot.img", boot_image.data, "IMAGES") self._out_tmp_sink("boot.img", boot_image.data, "IMAGES")
common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink, common.MakeRecoveryPatch(self._tempdir, self._out_tmp_sink,