Wrap zipfile.write(), writestr() and close()

In order to work around the zip 2GiB limit, we need to wrap the related
functions in zipfile. Calls to those functions should always be replaced
with calls to the wrappers instead.

Bug: 18015246
Change-Id: I499574cee51ec4804bc10cbefe0b17940afed918
(cherry picked from commit 2ed665a033)
This commit is contained in:
Tao Bao 2015-04-01 11:21:55 -07:00
parent 821a554c1b
commit f3282b4a7f
7 changed files with 251 additions and 76 deletions

View File

@ -33,10 +33,6 @@ import os
import tempfile import tempfile
import zipfile import zipfile
# missing in Python 2.4 and before
if not hasattr(os, "SEEK_SET"):
os.SEEK_SET = 0
import build_image import build_image
import common import common
@ -189,7 +185,7 @@ def AddUserdata(output_zip, prefix="IMAGES/"):
assert succ, "build userdata.img image failed" assert succ, "build userdata.img image failed"
common.CheckSize(img.name, "userdata.img", OPTIONS.info_dict) common.CheckSize(img.name, "userdata.img", OPTIONS.info_dict)
output_zip.write(img.name, prefix + "userdata.img") common.ZipWrite(output_zip, img.name, prefix + "userdata.img")
img.close() img.close()
os.rmdir(user_dir) os.rmdir(user_dir)
os.rmdir(temp_dir) os.rmdir(temp_dir)
@ -226,7 +222,7 @@ def AddCache(output_zip, prefix="IMAGES/"):
assert succ, "build cache.img image failed" assert succ, "build cache.img image failed"
common.CheckSize(img.name, "cache.img", OPTIONS.info_dict) common.CheckSize(img.name, "cache.img", OPTIONS.info_dict)
output_zip.write(img.name, prefix + "cache.img") common.ZipWrite(output_zip, img.name, prefix + "cache.img")
img.close() img.close()
os.rmdir(user_dir) os.rmdir(user_dir)
os.rmdir(temp_dir) os.rmdir(temp_dir)
@ -252,7 +248,7 @@ def AddImagesToTargetFiles(filename):
OPTIONS.info_dict["selinux_fc"] = os.path.join( OPTIONS.info_dict["selinux_fc"] = os.path.join(
OPTIONS.input_tmp, "BOOT", "RAMDISK", "file_contexts") OPTIONS.input_tmp, "BOOT", "RAMDISK", "file_contexts")
input_zip.close() common.ZipClose(input_zip)
output_zip = zipfile.ZipFile(filename, "a", output_zip = zipfile.ZipFile(filename, "a",
compression=zipfile.ZIP_DEFLATED) compression=zipfile.ZIP_DEFLATED)
@ -297,7 +293,7 @@ def AddImagesToTargetFiles(filename):
banner("cache") banner("cache")
AddCache(output_zip) AddCache(output_zip)
output_zip.close() common.ZipClose(output_zip)
def main(argv): def main(argv):
def option_handler(o, _): def option_handler(o, _):

View File

@ -205,8 +205,8 @@ def BuildImage(in_dir, prop_dict, out_file):
Returns: Returns:
True iff the image is built successfully. True iff the image is built successfully.
""" """
# system_root_image=true: build a system.img that combines the contents of /system # system_root_image=true: build a system.img that combines the contents of
# and the ramdisk, and can be mounted at the root of the file system. # /system and the ramdisk, and can be mounted at the root of the file system.
origin_in = in_dir origin_in = in_dir
fs_config = prop_dict.get("fs_config") fs_config = prop_dict.get("fs_config")
if (prop_dict.get("system_root_image") == "true" if (prop_dict.get("system_root_image") == "true"
@ -375,8 +375,8 @@ def ImagePropFromGlobalDict(glob_dict, mount_point):
copy_prop("system_size", "partition_size") copy_prop("system_size", "partition_size")
copy_prop("system_journal_size", "journal_size") copy_prop("system_journal_size", "journal_size")
copy_prop("system_verity_block_device", "verity_block_device") copy_prop("system_verity_block_device", "verity_block_device")
copy_prop("system_root_image","system_root_image") copy_prop("system_root_image", "system_root_image")
copy_prop("ramdisk_dir","ramdisk_dir") copy_prop("ramdisk_dir", "ramdisk_dir")
elif mount_point == "data": elif mount_point == "data":
# Copy the generic fs type first, override with specific one if available. # Copy the generic fs type first, override with specific one if available.
copy_prop("fs_type", "fs_type") copy_prop("fs_type", "fs_type")

View File

@ -32,10 +32,7 @@ import zipfile
import blockimgdiff import blockimgdiff
import rangelib import rangelib
try: from hashlib import sha1 as sha1
from hashlib import sha1 as sha1
except ImportError:
from sha import sha as sha1
class Options(object): class Options(object):
@ -384,6 +381,10 @@ def BuildBootableImage(sourcedir, fs_config_file, info_dict=None):
p.communicate() p.communicate()
assert p.returncode == 0, "vboot_signer of %s image failed" % path assert p.returncode == 0, "vboot_signer of %s image failed" % path
# Clean up the temp files.
img_unsigned.close()
img_keyblock.close()
img.seek(os.SEEK_SET, 0) img.seek(os.SEEK_SET, 0)
data = img.read() data = img.read()
@ -861,16 +862,50 @@ def ZipWrite(zip_file, filename, arcname=None, perms=0o644,
zipfile.ZIP64_LIMIT = saved_zip64_limit zipfile.ZIP64_LIMIT = saved_zip64_limit
def ZipWriteStr(zip_file, filename, data, perms=0o644, compression=None): def ZipWriteStr(zip_file, zinfo_or_arcname, data, perms=0o644,
# use a fixed timestamp so the output is repeatable. compress_type=None):
zinfo = zipfile.ZipInfo(filename=filename, """Wrap zipfile.writestr() function to work around the zip64 limit.
date_time=(2009, 1, 1, 0, 0, 0))
if compression is None: Even with the ZIP64_LIMIT workaround, it won't allow writing a string
longer than 2GiB. It gives 'OverflowError: size does not fit in an int'
when calling crc32(bytes).
But it still works fine to write a shorter string into a large zip file.
We should use ZipWrite() whenever possible, and only use ZipWriteStr()
when we know the string won't be too long.
"""
saved_zip64_limit = zipfile.ZIP64_LIMIT
zipfile.ZIP64_LIMIT = (1 << 32) - 1
if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname)
zinfo.compress_type = zip_file.compression zinfo.compress_type = zip_file.compression
else: else:
zinfo.compress_type = compression zinfo = zinfo_or_arcname
# If compress_type is given, it overrides the value in zinfo.
if compress_type is not None:
zinfo.compress_type = compress_type
# Use a fixed timestamp so the output is repeatable.
zinfo.external_attr = perms << 16 zinfo.external_attr = perms << 16
zinfo.date_time = (2009, 1, 1, 0, 0, 0)
zip_file.writestr(zinfo, data) zip_file.writestr(zinfo, data)
zipfile.ZIP64_LIMIT = saved_zip64_limit
def ZipClose(zip_file):
# http://b/18015246
# zipfile also refers to ZIP64_LIMIT during close() when it writes out the
# central directory.
saved_zip64_limit = zipfile.ZIP64_LIMIT
zipfile.ZIP64_LIMIT = (1 << 32) - 1
zip_file.close()
zipfile.ZIP64_LIMIT = saved_zip64_limit
class DeviceSpecificParams(object): class DeviceSpecificParams(object):
@ -976,7 +1011,7 @@ class File(object):
return t return t
def AddToZip(self, z, compression=None): def AddToZip(self, z, compression=None):
ZipWriteStr(z, self.name, self.data, compression=compression) ZipWriteStr(z, self.name, self.data, compress_type=compression)
DIFF_PROGRAM_BY_EXT = { DIFF_PROGRAM_BY_EXT = {
".gz" : "imgdiff", ".gz" : "imgdiff",

View File

@ -43,7 +43,8 @@ OPTIONS = common.OPTIONS
def CopyInfo(output_zip): def CopyInfo(output_zip):
"""Copy the android-info.txt file from the input to the output.""" """Copy the android-info.txt file from the input to the output."""
output_zip.write(os.path.join(OPTIONS.input_tmp, "OTA", "android-info.txt"), common.ZipWrite(
output_zip, os.path.join(OPTIONS.input_tmp, "OTA", "android-info.txt"),
"android-info.txt") "android-info.txt")
@ -133,13 +134,7 @@ def main(argv):
finally: finally:
print "cleaning up..." print "cleaning up..."
# http://b/18015246 common.ZipClose(output_zip)
# See common.py for context. zipfile also refers to ZIP64_LIMIT during
# close() when it writes out the central directory.
saved_zip64_limit = zipfile.ZIP64_LIMIT
zipfile.ZIP64_LIMIT = (1 << 32) - 1
output_zip.close()
zipfile.ZIP64_LIMIT = saved_zip64_limit
shutil.rmtree(OPTIONS.input_tmp) shutil.rmtree(OPTIONS.input_tmp)
print "done." print "done."

View File

@ -92,7 +92,6 @@ if sys.hexversion < 0x02070000:
print >> sys.stderr, "Python 2.7 or newer is required." print >> sys.stderr, "Python 2.7 or newer is required."
sys.exit(1) sys.exit(1)
import copy
import multiprocessing import multiprocessing
import os import os
import tempfile import tempfile
@ -371,6 +370,7 @@ def CopyPartitionFiles(itemset, input_zip, output_zip=None, substitute=None):
symlinks.append((input_zip.read(info.filename), symlinks.append((input_zip.read(info.filename),
"/" + partition + "/" + basefilename)) "/" + partition + "/" + basefilename))
else: else:
import copy
info2 = copy.copy(info) info2 = copy.copy(info)
fn = info2.filename = partition + "/" + basefilename fn = info2.filename = partition + "/" + basefilename
if substitute and fn in substitute and substitute[fn] is None: if substitute and fn in substitute and substitute[fn] is None:
@ -380,7 +380,7 @@ def CopyPartitionFiles(itemset, input_zip, output_zip=None, substitute=None):
data = substitute[fn] data = substitute[fn]
else: else:
data = input_zip.read(info.filename) data = input_zip.read(info.filename)
output_zip.writestr(info2, data) common.ZipWriteStr(output_zip, info2, data)
if fn.endswith("/"): if fn.endswith("/"):
itemset.Get(fn[:-1], is_dir=True) itemset.Get(fn[:-1], is_dir=True)
else: else:
@ -1581,6 +1581,7 @@ def main(argv):
OPTIONS.package_key = OPTIONS.info_dict.get( OPTIONS.package_key = OPTIONS.info_dict.get(
"default_system_dev_certificate", "default_system_dev_certificate",
"build/target/product/security/testkey") "build/target/product/security/testkey")
common.ZipClose(output_zip)
break break
else: else:
@ -1601,15 +1602,14 @@ def main(argv):
common.DumpInfoDict(OPTIONS.source_info_dict) common.DumpInfoDict(OPTIONS.source_info_dict)
try: try:
WriteIncrementalOTAPackage(input_zip, source_zip, output_zip) WriteIncrementalOTAPackage(input_zip, source_zip, output_zip)
common.ZipClose(output_zip)
break break
except ValueError: except ValueError:
if not OPTIONS.fallback_to_full: if not OPTIONS.fallback_to_full:
raise raise
print "--- failed to build incremental; falling back to full ---" print "--- failed to build incremental; falling back to full ---"
OPTIONS.incremental_source = None OPTIONS.incremental_source = None
output_zip.close() common.ZipClose(output_zip)
output_zip.close()
if not OPTIONS.no_signing: if not OPTIONS.no_signing:
SignOutput(temp_zip_file.name, args[1]) SignOutput(temp_zip_file.name, args[1])

View File

@ -196,23 +196,23 @@ def ProcessTargetFiles(input_tf_zip, output_tf_zip, misc_info,
if key not in common.SPECIAL_CERT_STRINGS: if key not in common.SPECIAL_CERT_STRINGS:
print " signing: %-*s (%s)" % (maxsize, name, key) print " signing: %-*s (%s)" % (maxsize, name, key)
signed_data = SignApk(data, key, key_passwords[key]) signed_data = SignApk(data, key, key_passwords[key])
output_tf_zip.writestr(out_info, signed_data) common.ZipWriteStr(output_tf_zip, out_info, signed_data)
else: else:
# an APK we're not supposed to sign. # an APK we're not supposed to sign.
print "NOT signing: %s" % (name,) print "NOT signing: %s" % (name,)
output_tf_zip.writestr(out_info, data) common.ZipWriteStr(output_tf_zip, out_info, data)
elif info.filename in ("SYSTEM/build.prop", elif info.filename in ("SYSTEM/build.prop",
"VENDOR/build.prop", "VENDOR/build.prop",
"RECOVERY/RAMDISK/default.prop"): "RECOVERY/RAMDISK/default.prop"):
print "rewriting %s:" % (info.filename,) print "rewriting %s:" % (info.filename,)
new_data = RewriteProps(data, misc_info) new_data = RewriteProps(data, misc_info)
output_tf_zip.writestr(out_info, new_data) common.ZipWriteStr(output_tf_zip, out_info, new_data)
if info.filename == "RECOVERY/RAMDISK/default.prop": if info.filename == "RECOVERY/RAMDISK/default.prop":
write_to_temp(info.filename, info.external_attr, new_data) write_to_temp(info.filename, info.external_attr, new_data)
elif info.filename.endswith("mac_permissions.xml"): elif info.filename.endswith("mac_permissions.xml"):
print "rewriting %s with new keys." % (info.filename,) print "rewriting %s with new keys." % (info.filename,)
new_data = ReplaceCerts(data) new_data = ReplaceCerts(data)
output_tf_zip.writestr(out_info, new_data) common.ZipWriteStr(output_tf_zip, out_info, new_data)
elif info.filename in ("SYSTEM/recovery-from-boot.p", elif info.filename in ("SYSTEM/recovery-from-boot.p",
"SYSTEM/bin/install-recovery.sh"): "SYSTEM/bin/install-recovery.sh"):
rebuild_recovery = True rebuild_recovery = True
@ -229,7 +229,7 @@ def ProcessTargetFiles(input_tf_zip, output_tf_zip, misc_info,
pass pass
else: else:
# a non-APK file; copy it verbatim # a non-APK file; copy it verbatim
output_tf_zip.writestr(out_info, data) common.ZipWriteStr(output_tf_zip, out_info, data)
if OPTIONS.replace_ota_keys: if OPTIONS.replace_ota_keys:
new_recovery_keys = ReplaceOtaKeys(input_tf_zip, output_tf_zip, misc_info) new_recovery_keys = ReplaceOtaKeys(input_tf_zip, output_tf_zip, misc_info)
@ -243,7 +243,7 @@ def ProcessTargetFiles(input_tf_zip, output_tf_zip, misc_info,
"boot.img", "boot.img", tmpdir, "BOOT", info_dict=misc_info) "boot.img", "boot.img", tmpdir, "BOOT", info_dict=misc_info)
def output_sink(fn, data): def output_sink(fn, data):
output_tf_zip.writestr("SYSTEM/"+fn, data) common.ZipWriteStr(output_tf_zip, "SYSTEM/" + fn, data)
common.MakeRecoveryPatch(tmpdir, output_sink, recovery_img, boot_img, common.MakeRecoveryPatch(tmpdir, output_sink, recovery_img, boot_img,
info_dict=misc_info) info_dict=misc_info)
@ -488,8 +488,8 @@ def main(argv):
ProcessTargetFiles(input_zip, output_zip, misc_info, ProcessTargetFiles(input_zip, output_zip, misc_info,
apk_key_map, key_passwords) apk_key_map, key_passwords)
input_zip.close() common.ZipClose(input_zip)
output_zip.close() common.ZipClose(output_zip)
add_img_to_target_files.AddImagesToTargetFiles(args[1]) add_img_to_target_files.AddImagesToTargetFiles(args[1])

View File

@ -29,15 +29,54 @@ def random_string_with_holes(size, block_size, step_size):
data[begin:end] = os.urandom(block_size) data[begin:end] = os.urandom(block_size)
return "".join(data) return "".join(data)
def get_2gb_string():
kilobytes = 1024
megabytes = 1024 * kilobytes
gigabytes = 1024 * megabytes
size = int(2 * gigabytes + 1)
block_size = 4 * kilobytes
step_size = 4 * megabytes
two_gb_string = random_string_with_holes(
size, block_size, step_size)
return two_gb_string
class CommonZipTest(unittest.TestCase): class CommonZipTest(unittest.TestCase):
def _verify(self, zip_file, zip_file_name, arcname, contents,
test_file_name=None, expected_stat=None, expected_mode=0o644,
expected_compress_type=zipfile.ZIP_STORED):
# Verify the stat if present.
if test_file_name is not None:
new_stat = os.stat(test_file_name)
self.assertEqual(int(expected_stat.st_mode), int(new_stat.st_mode))
self.assertEqual(int(expected_stat.st_mtime), int(new_stat.st_mtime))
# Reopen the zip file to verify.
zip_file = zipfile.ZipFile(zip_file_name, "r")
# Verify the timestamp.
info = zip_file.getinfo(arcname)
self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
# Verify the file mode.
mode = (info.external_attr >> 16) & 0o777
self.assertEqual(mode, expected_mode)
# Verify the compress type.
self.assertEqual(info.compress_type, expected_compress_type)
# Verify the zip contents.
self.assertEqual(zip_file.read(arcname), contents)
self.assertIsNone(zip_file.testzip())
def _test_ZipWrite(self, contents, extra_zipwrite_args=None): def _test_ZipWrite(self, contents, extra_zipwrite_args=None):
extra_zipwrite_args = dict(extra_zipwrite_args or {}) extra_zipwrite_args = dict(extra_zipwrite_args or {})
test_file = tempfile.NamedTemporaryFile(delete=False) test_file = tempfile.NamedTemporaryFile(delete=False)
zip_file = tempfile.NamedTemporaryFile(delete=False)
test_file_name = test_file.name test_file_name = test_file.name
zip_file = tempfile.NamedTemporaryFile(delete=False)
zip_file_name = zip_file.name zip_file_name = zip_file.name
# File names within an archive strip the leading slash. # File names within an archive strip the leading slash.
@ -52,31 +91,100 @@ class CommonZipTest(unittest.TestCase):
test_file.write(contents) test_file.write(contents)
test_file.close() test_file.close()
old_stat = os.stat(test_file_name) expected_stat = os.stat(test_file_name)
expected_mode = extra_zipwrite_args.get("perms", 0o644) expected_mode = extra_zipwrite_args.get("perms", 0o644)
expected_compress_type = extra_zipwrite_args.get("compress_type",
zipfile.ZIP_STORED)
time.sleep(5) # Make sure the atime/mtime will change measurably. time.sleep(5) # Make sure the atime/mtime will change measurably.
common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args) common.ZipWrite(zip_file, test_file_name, **extra_zipwrite_args)
common.ZipClose(zip_file)
new_stat = os.stat(test_file_name) self._verify(zip_file, zip_file_name, arcname, contents, test_file_name,
self.assertEqual(int(old_stat.st_mode), int(new_stat.st_mode)) expected_stat, expected_mode, expected_compress_type)
self.assertEqual(int(old_stat.st_mtime), int(new_stat.st_mtime))
self.assertIsNone(zip_file.testzip())
zip_file.close()
zip_file = zipfile.ZipFile(zip_file_name, "r")
info = zip_file.getinfo(arcname)
self.assertEqual(info.date_time, (2009, 1, 1, 0, 0, 0))
mode = (info.external_attr >> 16) & 0o777
self.assertEqual(mode, expected_mode)
self.assertEqual(zip_file.read(arcname), contents)
self.assertIsNone(zip_file.testzip())
finally: finally:
os.remove(test_file_name) os.remove(test_file_name)
os.remove(zip_file_name) os.remove(zip_file_name)
def _test_ZipWriteStr(self, zinfo_or_arcname, contents, extra_args=None):
extra_args = dict(extra_args or {})
zip_file = tempfile.NamedTemporaryFile(delete=False)
zip_file_name = zip_file.name
zip_file.close()
zip_file = zipfile.ZipFile(zip_file_name, "w")
try:
expected_compress_type = extra_args.get("compress_type",
zipfile.ZIP_STORED)
time.sleep(5) # Make sure the atime/mtime will change measurably.
if not isinstance(zinfo_or_arcname, zipfile.ZipInfo):
zinfo = zipfile.ZipInfo(filename=zinfo_or_arcname)
else:
zinfo = zinfo_or_arcname
arcname = zinfo.filename
common.ZipWriteStr(zip_file, zinfo, contents, **extra_args)
common.ZipClose(zip_file)
self._verify(zip_file, zip_file_name, arcname, contents,
expected_compress_type=expected_compress_type)
finally:
os.remove(zip_file_name)
def _test_ZipWriteStr_large_file(self, large, small, extra_args=None):
extra_args = dict(extra_args or {})
zip_file = tempfile.NamedTemporaryFile(delete=False)
zip_file_name = zip_file.name
test_file = tempfile.NamedTemporaryFile(delete=False)
test_file_name = test_file.name
arcname_large = test_file_name
arcname_small = "bar"
# File names within an archive strip the leading slash.
if arcname_large[0] == "/":
arcname_large = arcname_large[1:]
zip_file.close()
zip_file = zipfile.ZipFile(zip_file_name, "w")
try:
test_file.write(large)
test_file.close()
expected_stat = os.stat(test_file_name)
expected_mode = 0o644
expected_compress_type = extra_args.get("compress_type",
zipfile.ZIP_STORED)
time.sleep(5) # Make sure the atime/mtime will change measurably.
common.ZipWrite(zip_file, test_file_name, **extra_args)
common.ZipWriteStr(zip_file, arcname_small, small, **extra_args)
common.ZipClose(zip_file)
# Verify the contents written by ZipWrite().
self._verify(zip_file, zip_file_name, arcname_large, large,
test_file_name, expected_stat, expected_mode,
expected_compress_type)
# Verify the contents written by ZipWriteStr().
self._verify(zip_file, zip_file_name, arcname_small, small,
expected_compress_type=expected_compress_type)
finally:
os.remove(zip_file_name)
os.remove(test_file_name)
def _test_reset_ZIP64_LIMIT(self, func, *args):
default_limit = (1 << 31) - 1
self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
func(*args)
self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
def test_ZipWrite(self): def test_ZipWrite(self):
file_contents = os.urandom(1024) file_contents = os.urandom(1024)
self._test_ZipWrite(file_contents) self._test_ZipWrite(file_contents)
@ -88,23 +196,64 @@ class CommonZipTest(unittest.TestCase):
"perms": 0o777, "perms": 0o777,
"compress_type": zipfile.ZIP_DEFLATED, "compress_type": zipfile.ZIP_DEFLATED,
}) })
self._test_ZipWrite(file_contents, {
"arcname": "foobar",
"perms": 0o700,
"compress_type": zipfile.ZIP_STORED,
})
def test_ZipWrite_large_file(self): def test_ZipWrite_large_file(self):
kilobytes = 1024 file_contents = get_2gb_string()
megabytes = 1024 * kilobytes
gigabytes = 1024 * megabytes
size = int(2 * gigabytes + 1)
block_size = 4 * kilobytes
step_size = 4 * megabytes
file_contents = random_string_with_holes(
size, block_size, step_size)
self._test_ZipWrite(file_contents, { self._test_ZipWrite(file_contents, {
"compress_type": zipfile.ZIP_DEFLATED, "compress_type": zipfile.ZIP_DEFLATED,
}) })
def test_ZipWrite_resets_ZIP64_LIMIT(self): def test_ZipWrite_resets_ZIP64_LIMIT(self):
default_limit = (1 << 31) - 1 self._test_reset_ZIP64_LIMIT(self._test_ZipWrite, "")
self.assertEqual(default_limit, zipfile.ZIP64_LIMIT)
self._test_ZipWrite('') def test_ZipWriteStr(self):
self.assertEqual(default_limit, zipfile.ZIP64_LIMIT) random_string = os.urandom(1024)
# Passing arcname
self._test_ZipWriteStr("foo", random_string)
# Passing zinfo
zinfo = zipfile.ZipInfo(filename="foo")
self._test_ZipWriteStr(zinfo, random_string)
# Timestamp in the zinfo should be overwritten.
zinfo.date_time = (2015, 3, 1, 15, 30, 0)
self._test_ZipWriteStr(zinfo, random_string)
def test_ZipWriteStr_with_opts(self):
random_string = os.urandom(1024)
# Passing arcname
self._test_ZipWriteStr("foo", random_string, {
"compress_type": zipfile.ZIP_DEFLATED,
})
self._test_ZipWriteStr("foo", random_string, {
"compress_type": zipfile.ZIP_STORED,
})
# Passing zinfo
zinfo = zipfile.ZipInfo(filename="foo")
self._test_ZipWriteStr(zinfo, random_string, {
"compress_type": zipfile.ZIP_DEFLATED,
})
self._test_ZipWriteStr(zinfo, random_string, {
"compress_type": zipfile.ZIP_STORED,
})
def test_ZipWriteStr_large_file(self):
# zipfile.writestr() doesn't work when the str size is over 2GiB even with
# the workaround. We will only test the case of writing a string into a
# large archive.
long_string = get_2gb_string()
short_string = os.urandom(1024)
self._test_ZipWriteStr_large_file(long_string, short_string, {
"compress_type": zipfile.ZIP_DEFLATED,
})
def test_ZipWriteStr_resets_ZIP64_LIMIT(self):
self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, "foo", "")
zinfo = zipfile.ZipInfo(filename="foo")
self._test_reset_ZIP64_LIMIT(self._test_ZipWriteStr, zinfo, "")