Wrap zipfile.write(), writestr() and close()
In order to work around the zip 2GiB limit, we need to wrap the related functions in zipfile. Calls to those functions should always be replaced with calls to the wrappers instead. Bug: 18015246 Change-Id: Ice494371ca6654e88ded2ae0eb680f51082effcb
This commit is contained in:
parent
9db69c1b08
commit
2ed665a033
|
@ -33,10 +33,6 @@ import os
|
|||
import tempfile
|
||||
import zipfile
|
||||
|
||||
# missing in Python 2.4 and before
|
||||
if not hasattr(os, "SEEK_SET"):
|
||||
os.SEEK_SET = 0
|
||||
|
||||
import build_image
|
||||
import common
|
||||
|
||||
|
@ -189,7 +185,7 @@ def AddUserdata(output_zip, prefix="IMAGES/"):
|
|||
assert succ, "build userdata.img image failed"
|
||||
|
||||
common.CheckSize(img.name, "userdata.img", OPTIONS.info_dict)
|
||||
output_zip.write(img.name, prefix + "userdata.img")
|
||||
common.ZipWrite(output_zip, img.name, prefix + "userdata.img")
|
||||
img.close()
|
||||
os.rmdir(user_dir)
|
||||
os.rmdir(temp_dir)
|
||||
|
@ -226,7 +222,7 @@ def AddCache(output_zip, prefix="IMAGES/"):
|
|||
assert succ, "build cache.img image failed"
|
||||
|
||||
common.CheckSize(img.name, "cache.img", OPTIONS.info_dict)
|
||||
output_zip.write(img.name, prefix + "cache.img")
|
||||
common.ZipWrite(output_zip, img.name, prefix + "cache.img")
|
||||
img.close()
|
||||
os.rmdir(user_dir)
|
||||
os.rmdir(temp_dir)
|
||||
|
@ -252,7 +248,7 @@ def AddImagesToTargetFiles(filename):
|
|||
OPTIONS.info_dict["selinux_fc"] = os.path.join(
|
||||
OPTIONS.input_tmp, "BOOT", "RAMDISK", "file_contexts")
|
||||
|
||||
input_zip.close()
|
||||
common.ZipClose(input_zip)
|
||||
output_zip = zipfile.ZipFile(filename, "a",
|
||||
compression=zipfile.ZIP_DEFLATED)
|
||||
|
||||
|
@ -297,7 +293,7 @@ def AddImagesToTargetFiles(filename):
|
|||
banner("cache")
|
||||
AddCache(output_zip)
|
||||
|
||||
output_zip.close()
|
||||
common.ZipClose(output_zip)
|
||||
|
||||
def main(argv):
|
||||
def option_handler(o, _):
|
||||
|
|
|
@ -205,8 +205,8 @@ def BuildImage(in_dir, prop_dict, out_file):
|
|||
Returns:
|
||||
True iff the image is built successfully.
|
||||
"""
|
||||
# system_root_image=true: build a system.img that combines the contents of /system
|
||||
# and the ramdisk, and can be mounted at the root of the file system.
|
||||
# system_root_image=true: build a system.img that combines the contents of
|
||||
# /system and the ramdisk, and can be mounted at the root of the file system.
|
||||
origin_in = in_dir
|
||||
fs_config = prop_dict.get("fs_config")
|
||||
if (prop_dict.get("system_root_image") == "true"
|
||||
|
@ -375,8 +375,8 @@ def ImagePropFromGlobalDict(glob_dict, mount_point):
|
|||
copy_prop("system_size", "partition_size")
|
||||
copy_prop("system_journal_size", "journal_size")
|
||||
copy_prop("system_verity_block_device", "verity_block_device")
|
||||
copy_prop("system_root_image","system_root_image")
|
||||
copy_prop("ramdisk_dir","ramdisk_dir")
|
||||
copy_prop("system_root_image", "system_root_image")
|
||||
copy_prop("ramdisk_dir", "ramdisk_dir")
|
||||
elif mount_point == "data":
|
||||
# Copy the generic fs type first, override with specific one if available.
|
||||
copy_prop("fs_type", "fs_type")
|
||||
|
|
|
@ -32,10 +32,7 @@ import zipfile
|
|||
import blockimgdiff
|
||||
import rangelib
|
||||
|
||||
try:
|
||||
from hashlib import sha1 as sha1
|
||||
except ImportError:
|
||||
from sha import sha as sha1
|
||||
from hashlib import sha1 as sha1
|
||||
|
||||
|
||||
class Options(object):
|
||||
|
@ -380,6 +377,10 @@ def BuildBootableImage(sourcedir, fs_config_file, info_dict=None):
|
|||
p.communicate()
|
||||
assert p.returncode == 0, "vboot_signer of %s image failed" % path
|
||||
|
||||
# Clean up the temp files.
|
||||
img_unsigned.close()
|
||||
img_keyblock.close()
|
||||
|
||||
img.seek(os.SEEK_SET, 0)
|
||||
data = img.read()
|
||||
|
||||
|
@ -854,16 +855,50 @@ def ZipWrite(zip_file, filename, arcname=None, perms=0o644,
|
|||
zipfile.ZIP64_LIMIT = saved_zip64_limit
|
||||
|
||||
|
||||
def ZipWriteStr(zip_file, filename, data, perms=0o644, compression=None):
|
||||
# use a fixed timestamp so the output is repeatable.
|
||||
zinfo = zipfile.ZipInfo(filename=filename,
|
||||
date_time=(2009, 1, 1, 0, 0, 0))
|
||||
if compression is None:
|
||||
def ZipWriteStr(zip_file, zinfo_or_arcname, data, perms=0o644,
|
||||
compress_type=None):
|
||||
"""Wrap zipfile.writestr() function to work around the zip64 limit.
|
||||
|
||||
Even with the ZIP64_LIMIT workaround, it won't allow writing a string
|
||||
longer than 2GiB. It gives 'OverflowError: size does not fit in an int'
|
||||
when calling crc32(bytes).
|
||||
|
||||
But it still works fine to write a shorter string into a large zip file.
|
||||
We should use ZipWrite() whenever possible, and only use ZipWriteStr()
|
||||
when we know the string won't be too long.
|
||||
"""
|
||||
|
||||
saved_zip64_limit = zipfile.ZIP64_LIMIT
|
||||
zipfile.ZIP64_LIMIT = (1 << 32) - 1
|
||||
|
||||
if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
|
||||
zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname)
|
||||
zinfo.compress_type = zip_file.compression
|
||||
else:
|
||||
zinfo.compress_type = compression
|
||||
zinfo = zinfo_or_arcname
|
||||
|
||||
# If compress_type is given, it overrides the value in zinfo.
|
||||
if compress_type is not None:
|
||||
zinfo.compress_type = compress_type
|
||||
|
||||
# Use a fixed timestamp so the output is repeatable.
|
||||
zinfo.external_attr = perms << 16
|
||||
zinfo.date_time = (2009, 1, 1, 0, 0, 0)
|
||||
|
||||
zip_file.writestr(zinfo, data)
|
||||
zipfile.ZIP64_LIMIT = saved_zip64_limit
|
||||
|
||||
|
||||
def ZipClose(zip_file):
|
||||
# http://b/18015246
|
||||
# zipfile also refers to ZIP64_LIMIT during close() when it writes out the
|
||||
# central directory.
|
||||
saved_zip64_limit = zipfile.ZIP64_LIMIT
|
||||
zipfile.ZIP64_LIMIT = (1 << 32) - 1
|
||||
|
||||
zip_file.close()
|
||||
|
||||
zipfile.ZIP64_LIMIT = saved_zip64_limit
|
||||
|
||||
|
||||
class DeviceSpecificParams(object):
|
||||
|
@ -969,7 +1004,7 @@ class File(object):
|
|||
return t
|
||||
|
||||
def AddToZip(self, z, compression=None):
|
||||
ZipWriteStr(z, self.name, self.data, compression=compression)
|
||||
ZipWriteStr(z, self.name, self.data, compress_type=compression)
|
||||
|
||||
DIFF_PROGRAM_BY_EXT = {
|
||||
".gz" : "imgdiff",
|
||||
|
|
|
@ -43,8 +43,9 @@ OPTIONS = common.OPTIONS
|
|||
|
||||
def CopyInfo(output_zip):
|
||||
"""Copy the android-info.txt file from the input to the output."""
|
||||
output_zip.write(os.path.join(OPTIONS.input_tmp, "OTA", "android-info.txt"),
|
||||
"android-info.txt")
|
||||
common.ZipWrite(
|
||||
output_zip, os.path.join(OPTIONS.input_tmp, "OTA", "android-info.txt"),
|
||||
"android-info.txt")
|
||||
|
||||
|
||||
def main(argv):
|
||||
|
@ -133,13 +134,7 @@ def main(argv):
|
|||
|
||||
finally:
|
||||
print "cleaning up..."
|
||||
# http://b/18015246
|
||||
# See common.py for context. zipfile also refers to ZIP64_LIMIT during
|
||||
# close() when it writes out the central directory.
|
||||
saved_zip64_limit = zipfile.ZIP64_LIMIT
|
||||
zipfile.ZIP64_LIMIT = (1 << 32) - 1
|
||||
output_zip.close()
|
||||
zipfile.ZIP64_LIMIT = saved_zip64_limit
|
||||
common.ZipClose(output_zip)
|
||||
shutil.rmtree(OPTIONS.input_tmp)
|
||||
|
||||
print "done."
|
||||
|
|
|
@ -92,7 +92,6 @@ if sys.hexversion < 0x02070000:
|
|||
print >> sys.stderr, "Python 2.7 or newer is required."
|
||||
sys.exit(1)
|
||||
|
||||
import copy
|
||||
import multiprocessing
|
||||
import os
|
||||
import tempfile
|
||||
|
@ -371,6 +370,7 @@ def CopyPartitionFiles(itemset, input_zip, output_zip=None, substitute=None):
|
|||
symlinks.append((input_zip.read(info.filename),
|
||||
"/" + partition + "/" + basefilename))
|
||||
else:
|
||||
import copy
|
||||
info2 = copy.copy(info)
|
||||
fn = info2.filename = partition + "/" + basefilename
|
||||
if substitute and fn in substitute and substitute[fn] is None:
|
||||
|
@ -380,7 +380,7 @@ def CopyPartitionFiles(itemset, input_zip, output_zip=None, substitute=None):
|
|||
data = substitute[fn]
|
||||
else:
|
||||
data = input_zip.read(info.filename)
|
||||
output_zip.writestr(info2, data)
|
||||
common.ZipWriteStr(output_zip, info2, data)
|
||||
if fn.endswith("/"):
|
||||
itemset.Get(fn[:-1], is_dir=True)
|
||||
else:
|
||||
|
@ -1581,6 +1581,7 @@ def main(argv):
|
|||
OPTIONS.package_key = OPTIONS.info_dict.get(
|
||||
"default_system_dev_certificate",
|
||||
"build/target/product/security/testkey")
|
||||
common.ZipClose(output_zip)
|
||||
break
|
||||
|
||||
else:
|
||||
|
@ -1601,15 +1602,14 @@ def main(argv):
|
|||
common.DumpInfoDict(OPTIONS.source_info_dict)
|
||||
try:
|
||||
WriteIncrementalOTAPackage(input_zip, source_zip, output_zip)
|
||||
common.ZipClose(output_zip)
|
||||
break
|
||||
except ValueError:
|
||||
if not OPTIONS.fallback_to_full:
|
||||
raise
|
||||
print "--- failed to build incremental; falling back to full ---"
|
||||
OPTIONS.incremental_source = None
|
||||
output_zip.close()
|
||||
|
||||
output_zip.close()
|
||||
common.ZipClose(output_zip)
|
||||
|
||||
if not OPTIONS.no_signing:
|
||||
SignOutput(temp_zip_file.name, args[1])
|
||||
|
|
|
@ -196,23 +196,23 @@ def ProcessTargetFiles(input_tf_zip, output_tf_zip, misc_info,
|
|||
if key not in common.SPECIAL_CERT_STRINGS:
|
||||
print " signing: %-*s (%s)" % (maxsize, name, key)
|
||||
signed_data = SignApk(data, key, key_passwords[key])
|
||||
output_tf_zip.writestr(out_info, signed_data)
|
||||
common.ZipWriteStr(output_tf_zip, out_info, signed_data)
|
||||
else:
|
||||
# an APK we're not supposed to sign.
|
||||
print "NOT signing: %s" % (name,)
|
||||
output_tf_zip.writestr(out_info, data)
|
||||
common.ZipWriteStr(output_tf_zip, out_info, data)
|
||||
elif info.filename in ("SYSTEM/build.prop",
|
||||
"VENDOR/build.prop",
|
||||
"RECOVERY/RAMDISK/default.prop"):
|
||||
print "rewriting %s:" % (info.filename,)
|
||||
new_data = RewriteProps(data, misc_info)
|
||||
output_tf_zip.writestr(out_info, new_data)
|
||||
common.ZipWriteStr(output_tf_zip, out_info, new_data)
|
||||
if info.filename == "RECOVERY/RAMDISK/default.prop":
|
||||
write_to_temp(info.filename, info.external_attr, new_data)
|
||||
elif info.filename.endswith("mac_permissions.xml"):
|
||||
print "rewriting %s with new keys." % (info.filename,)
|
||||
new_data = ReplaceCerts(data)
|
||||
output_tf_zip.writestr(out_info, new_data)
|
||||
common.ZipWriteStr(output_tf_zip, out_info, new_data)
|
||||
elif info.filename in ("SYSTEM/recovery-from-boot.p",
|
||||
"SYSTEM/bin/install-recovery.sh"):
|
||||
rebuild_recovery = True
|
||||
|
@ -229,7 +229,7 @@ def ProcessTargetFiles(input_tf_zip, output_tf_zip, misc_info,
|
|||
pass
|
||||
else:
|
||||
# a non-APK file; copy it verbatim
|
||||
output_tf_zip.writestr(out_info, data)
|
||||
common.ZipWriteStr(output_tf_zip, out_info, data)
|
||||
|
||||
if OPTIONS.replace_ota_keys:
|
||||
new_recovery_keys = ReplaceOtaKeys(input_tf_zip, output_tf_zip, misc_info)
|
||||
|
@ -243,7 +243,7 @@ def ProcessTargetFiles(input_tf_zip, output_tf_zip, misc_info,
|
|||
"boot.img", "boot.img", tmpdir, "BOOT", info_dict=misc_info)
|
||||
|
||||
def output_sink(fn, data):
|
||||
output_tf_zip.writestr("SYSTEM/"+fn, data)
|
||||
common.ZipWriteStr(output_tf_zip, "SYSTEM/" + fn, data)
|
||||
|
||||
common.MakeRecoveryPatch(tmpdir, output_sink, recovery_img, boot_img,
|
||||
info_dict=misc_info)
|
||||
|
@ -488,8 +488,8 @@ def main(argv):
|
|||
ProcessTargetFiles(input_zip, output_zip, misc_info,
|
||||
apk_key_map, key_passwords)
|
||||
|
||||
input_zip.close()
|
||||
output_zip.close()
|
||||
common.ZipClose(input_zip)
|
||||
common.ZipClose(output_zip)
|
||||
|
||||
add_img_to_target_files.AddImagesToTargetFiles(args[1])
|
||||
|
||||
|
|
|
@ -29,15 +29,54 @@ def random_string_with_holes(size, block_size, step_size):
|
|||
data[begin:end] = os.urandom(block_size)
|
||||
return "".join(data)
|
||||
|
||||
def get_2gb_string():
|
||||
kilobytes = 1024
|
||||
megabytes = 1024 * kilobytes
|
||||
gigabytes = 1024 * megabytes
|
||||
|
||||
size = int(2 * gigabytes + 1)
|
||||
block_size = 4 * kilobytes
|
||||
step_size = 4 * megabytes
|
||||
two_gb_string = random_string_with_holes(
|
||||
size, block_size, step_size)
|
||||
return two_gb_string
|
||||
|
||||
|
||||
class CommonZipTest(unittest.TestCase):
|
||||
def _verify(self, zip_file, zip_file_name, arcname, contents,
|
||||
test_file_name=None, expected_stat=None, expected_mode=0o644,
|
||||
expected_compress_type=zipfile.ZIP_STORED):
|
||||
# Verify the stat if present.
|
||||
if test_file_name is not None:
|
||||
new_stat = os.stat(test_file_name)
|
||||
self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
|
||||
self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))
|
||||
|
||||
# Reopen the zip file to verify.
|
||||
zip_file = zipfile.ZipFile(zip_file_name, "r")
|
||||
|
||||
# Verify the timestamp.
|
||||
info = zip_file.getinfo(arcname)
|
||||
self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
|
||||
|
||||
# Verify the file mode.
|
||||
mode = (info.external_attr >> 16) & 0o777
|
||||
self.assertEqual(mode, expected_mode)
|
||||
|
||||
# Verify the compress type.
|
||||
self.assertEqual(info.compress_type, expected_compress_type)
|
||||
|
||||
# Verify the zip contents.
|
||||
self.assertEqual(zip_file.read(arcname), contents)
|
||||
self.assertIsNone(zip_file.testzip())
|
||||
|
||||
def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
|
||||
extra_zipwrite_args = dict(extra_zipwrite_args or {})
|
||||
|
||||
test_file = tempfile.NamedTemporaryFile(delete=False)
|
||||
zip_file = tempfile.NamedTemporaryFile(delete=False)
|
||||
|
||||
test_file_name = test_file.name
|
||||
|
||||
zip_file = tempfile.NamedTemporaryFile(delete=False)
|
||||
zip_file_name = zip_file.name
|
||||
|
||||
# File names within an archive strip the leading slash.
|
||||
|
@ -52,31 +91,100 @@ class CommonZipTest(unittest.TestCase):
|
|||
test_file.write(contents)
|
||||
test_file.close()
|
||||
|
||||
old_stat = os.stat(test_file_name)
|
||||
expected_stat = os.stat(test_file_name)
|
||||
expected_mode = extra_zipwrite_args.get("perms", 0o644)
|
||||
|
||||
expected_compress_type = extra_zipwrite_args.get("compress_type",
|
||||
zipfile.ZIP_STORED)
|
||||
time.sleep(5) # Make sure the atime/mtime will change measurably.
|
||||
|
||||
common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
|
||||
common.ZipClose(zip_file)
|
||||
|
||||
new_stat = os.stat(test_file_name)
|
||||
self.assertEqual(int(old_stat.st_mode), int(new_stat.st_mode))
|
||||
self.assertEqual(int(old_stat.st_mtime), int(new_stat.st_mtime))
|
||||
self.assertIsNone(zip_file.testzip())
|
||||
|
||||
zip_file.close()
|
||||
zip_file = zipfile.ZipFile(zip_file_name, "r")
|
||||
info = zip_file.getinfo(arcname)
|
||||
|
||||
self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
|
||||
mode = (info.external_attr >> 16) & 0o777
|
||||
self.assertEqual(mode, expected_mode)
|
||||
self.assertEqual(zip_file.read(arcname), contents)
|
||||
self.assertIsNone(zip_file.testzip())
|
||||
self._verify(zip_file, zip_file_name, arcname, contents, test_file_name,
|
||||
expected_stat, expected_mode, expected_compress_type)
|
||||
finally:
|
||||
os.remove(test_file_name)
|
||||
os.remove(zip_file_name)
|
||||
|
||||
def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
|
||||
extra_args = dict(extra_args or {})
|
||||
|
||||
zip_file = tempfile.NamedTemporaryFile(delete=False)
|
||||
zip_file_name = zip_file.name
|
||||
zip_file.close()
|
||||
|
||||
zip_file = zipfile.ZipFile(zip_file_name, "w")
|
||||
|
||||
try:
|
||||
expected_compress_type = extra_args.get("compress_type",
|
||||
zipfile.ZIP_STORED)
|
||||
time.sleep(5) # Make sure the atime/mtime will change measurably.
|
||||
|
||||
if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
|
||||
zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname)
|
||||
else:
|
||||
zinfo = zinfo_or_arcname
|
||||
arcname = zinfo.filename
|
||||
|
||||
common.ZipWriteStr(zip_file, zinfo, contents, **extra_args)
|
||||
common.ZipClose(zip_file)
|
||||
|
||||
self._verify(zip_file, zip_file_name, arcname, contents,
|
||||
expected_compress_type=expected_compress_type)
|
||||
finally:
|
||||
os.remove(zip_file_name)
|
||||
|
||||
def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
|
||||
extra_args = dict(extra_args or {})
|
||||
|
||||
zip_file = tempfile.NamedTemporaryFile(delete=False)
|
||||
zip_file_name = zip_file.name
|
||||
|
||||
test_file = tempfile.NamedTemporaryFile(delete=False)
|
||||
test_file_name = test_file.name
|
||||
|
||||
arcname_large = test_file_name
|
||||
arcname_small = "bar"
|
||||
|
||||
# File names within an archive strip the leading slash.
|
||||
if arcname_large[0] == "/":
|
||||
arcname_large = arcname_large[1:]
|
||||
|
||||
zip_file.close()
|
||||
zip_file = zipfile.ZipFile(zip_file_name, "w")
|
||||
|
||||
try:
|
||||
test_file.write(large)
|
||||
test_file.close()
|
||||
|
||||
expected_stat = os.stat(test_file_name)
|
||||
expected_mode = 0o644
|
||||
expected_compress_type = extra_args.get("compress_type",
|
||||
zipfile.ZIP_STORED)
|
||||
time.sleep(5) # Make sure the atime/mtime will change measurably.
|
||||
|
||||
common.ZipWrite(zip_file, test_file_name, **extra_args)
|
||||
common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
|
||||
common.ZipClose(zip_file)
|
||||
|
||||
# Verify the contents written by ZipWrite().
|
||||
self._verify(zip_file, zip_file_name, arcname_large, large,
|
||||
test_file_name, expected_stat, expected_mode,
|
||||
expected_compress_type)
|
||||
|
||||
# Verify the contents written by ZipWriteStr().
|
||||
self._verify(zip_file, zip_file_name, arcname_small, small,
|
||||
expected_compress_type=expected_compress_type)
|
||||
finally:
|
||||
os.remove(zip_file_name)
|
||||
os.remove(test_file_name)
|
||||
|
||||
def _test_reset_ZIP64_LIMIT(self, func, *args):
|
||||
default_limit = (1 << 31) - 1
|
||||
self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
|
||||
func(*args)
|
||||
self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
|
||||
|
||||
def test_ZipWrite(self):
|
||||
file_contents = os.urandom(1024)
|
||||
self._test_ZipWrite(file_contents)
|
||||
|
@ -88,23 +196,64 @@ class CommonZipTest(unittest.TestCase):
|
|||
"perms": 0o777,
|
||||
"compress_type": zipfile.ZIP_DEFLATED,
|
||||
})
|
||||
self._test_ZipWrite(file_contents, {
|
||||
"arcname": "foobar",
|
||||
"perms": 0o700,
|
||||
"compress_type": zipfile.ZIP_STORED,
|
||||
})
|
||||
|
||||
def test_ZipWrite_large_file(self):
|
||||
kilobytes = 1024
|
||||
megabytes = 1024 * kilobytes
|
||||
gigabytes = 1024 * megabytes
|
||||
|
||||
size = int(2 * gigabytes + 1)
|
||||
block_size = 4 * kilobytes
|
||||
step_size = 4 * megabytes
|
||||
file_contents = random_string_with_holes(
|
||||
size, block_size, step_size)
|
||||
file_contents = get_2gb_string()
|
||||
self._test_ZipWrite(file_contents, {
|
||||
"compress_type": zipfile.ZIP_DEFLATED,
|
||||
})
|
||||
|
||||
def test_ZipWrite_resets_ZIP64_LIMIT(self):
|
||||
default_limit = (1 << 31) - 1
|
||||
self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
|
||||
self._test_ZipWrite('')
|
||||
self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
|
||||
self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")
|
||||
|
||||
def test_ZipWriteStr(self):
|
||||
random_string = os.urandom(1024)
|
||||
# Passing arcname
|
||||
self._test_ZipWriteStr("foo", random_string)
|
||||
|
||||
# Passing zinfo
|
||||
zinfo = zipfile.ZipInfo(filename="foo")
|
||||
self._test_ZipWriteStr(zinfo, random_string)
|
||||
|
||||
# Timestamp in the zinfo should be overwritten.
|
||||
zinfo.date_time = (2015, 3, 1, 15, 30, 0)
|
||||
self._test_ZipWriteStr(zinfo, random_string)
|
||||
|
||||
def test_ZipWriteStr_with_opts(self):
|
||||
random_string = os.urandom(1024)
|
||||
# Passing arcname
|
||||
self._test_ZipWriteStr("foo", random_string, {
|
||||
"compress_type": zipfile.ZIP_DEFLATED,
|
||||
})
|
||||
self._test_ZipWriteStr("foo", random_string, {
|
||||
"compress_type": zipfile.ZIP_STORED,
|
||||
})
|
||||
|
||||
# Passing zinfo
|
||||
zinfo = zipfile.ZipInfo(filename="foo")
|
||||
self._test_ZipWriteStr(zinfo, random_string, {
|
||||
"compress_type": zipfile.ZIP_DEFLATED,
|
||||
})
|
||||
self._test_ZipWriteStr(zinfo, random_string, {
|
||||
"compress_type": zipfile.ZIP_STORED,
|
||||
})
|
||||
|
||||
def test_ZipWriteStr_large_file(self):
|
||||
# zipfile.writestr() doesn't work when the str size is over 2GiB even with
|
||||
# the workaround. We will only test the case of writing a string into a
|
||||
# large archive.
|
||||
long_string = get_2gb_string()
|
||||
short_string = os.urandom(1024)
|
||||
self._test_ZipWriteStr_large_file(long_string, short_string, {
|
||||
"compress_type": zipfile.ZIP_DEFLATED,
|
||||
})
|
||||
|
||||
def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
|
||||
self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "")
|
||||
zinfo = zipfile.ZipInfo(filename="foo")
|
||||
self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "")
|
||||
|
|
Loading…
Reference in New Issue