From 40b1882f402cad8f992b526d42a9058ccbb71f78 Mon Sep 17 00:00:00 2001 From: Tao Bao Date: Tue, 30 Jan 2018 18:19:04 -0800 Subject: [PATCH] releasetools: Add Payload class. This breaks down the current WriteABOTAPackageWithBrilloScript() into smaller and testable units, which also prepares for the work in b/35724498. Bug: 35724498 Test: python -m unittest test_ota_from_target_files Test: Get identical A/B OTA packages w/ and w/o the CL. Change-Id: I2ea45ce98e2d2baa58e94fb829b7242f6fe685a7 (cherry picked from commit 036d72181280cd6589b056c0c4adb9d2eb80228b) --- tools/releasetools/ota_from_target_files.py | 198 +++++++++++------- .../test_ota_from_target_files.py | 158 +++++++++++++- 2 files changed, 285 insertions(+), 71 deletions(-) diff --git a/tools/releasetools/ota_from_target_files.py b/tools/releasetools/ota_from_target_files.py index 2b64f7f9b..edc20f6b3 100755 --- a/tools/releasetools/ota_from_target_files.py +++ b/tools/releasetools/ota_from_target_files.py @@ -360,6 +360,122 @@ class PayloadSigner(object): return out_file +class Payload(object): + """Manages the creation and the signing of an A/B OTA Payload.""" + + PAYLOAD_BIN = 'payload.bin' + PAYLOAD_PROPERTIES_TXT = 'payload_properties.txt' + + def __init__(self): + # The place where the output from the subprocess should go. + self._log_file = sys.stdout if OPTIONS.verbose else subprocess.PIPE + self.payload_file = None + self.payload_properties = None + + def Generate(self, target_file, source_file=None, additional_args=None): + """Generates a payload from the given target-files zip(s). + + Args: + target_file: The filename of the target build target-files zip. + source_file: The filename of the source build target-files zip; or None if + generating a full OTA. + additional_args: A list of additional args that should be passed to + brillo_update_payload script; or None. + """ + if additional_args is None: + additional_args = [] + + payload_file = common.MakeTempFile(prefix="payload-", suffix=".bin") + cmd = ["brillo_update_payload", "generate", + "--payload", payload_file, + "--target_image", target_file] + if source_file is not None: + cmd.extend(["--source_image", source_file]) + cmd.extend(additional_args) + p = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT) + stdoutdata, _ = p.communicate() + assert p.returncode == 0, \ + "brillo_update_payload generate failed: {}".format(stdoutdata) + + self.payload_file = payload_file + self.payload_properties = None + + def Sign(self, payload_signer): + """Generates and signs the hashes of the payload and metadata. + + Args: + payload_signer: A PayloadSigner() instance that serves the signing work. + + Raises: + AssertionError: On any failure when calling brillo_update_payload script. + """ + assert isinstance(payload_signer, PayloadSigner) + + # 1. Generate hashes of the payload and metadata files. + payload_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin") + metadata_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin") + cmd = ["brillo_update_payload", "hash", + "--unsigned_payload", self.payload_file, + "--signature_size", "256", + "--metadata_hash_file", metadata_sig_file, + "--payload_hash_file", payload_sig_file] + p1 = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT) + p1.communicate() + assert p1.returncode == 0, "brillo_update_payload hash failed" + + # 2. Sign the hashes. + signed_payload_sig_file = payload_signer.Sign(payload_sig_file) + signed_metadata_sig_file = payload_signer.Sign(metadata_sig_file) + + # 3. Insert the signatures back into the payload file. + signed_payload_file = common.MakeTempFile(prefix="signed-payload-", + suffix=".bin") + cmd = ["brillo_update_payload", "sign", + "--unsigned_payload", self.payload_file, + "--payload", signed_payload_file, + "--signature_size", "256", + "--metadata_signature_file", signed_metadata_sig_file, + "--payload_signature_file", signed_payload_sig_file] + p1 = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT) + p1.communicate() + assert p1.returncode == 0, "brillo_update_payload sign failed" + + # 4. Dump the signed payload properties. + properties_file = common.MakeTempFile(prefix="payload-properties-", + suffix=".txt") + cmd = ["brillo_update_payload", "properties", + "--payload", signed_payload_file, + "--properties_file", properties_file] + p1 = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT) + p1.communicate() + assert p1.returncode == 0, "brillo_update_payload properties failed" + + if OPTIONS.wipe_user_data: + with open(properties_file, "a") as f: + f.write("POWERWASH=1\n") + + self.payload_file = signed_payload_file + self.payload_properties = properties_file + + def WriteToZip(self, output_zip): + """Writes the payload to the given zip. + + Args: + output_zip: The output ZipFile instance. + """ + assert self.payload_file is not None + assert self.payload_properties is not None + + # Add the signed payload file and properties into the zip. In order to + # support streaming, we pack them as ZIP_STORED. So these entries can be + # read directly with the offset and length pairs. + common.ZipWrite(output_zip, self.payload_file, arcname=Payload.PAYLOAD_BIN, + compress_type=zipfile.ZIP_STORED) + common.ZipWrite(output_zip, self.payload_properties, + arcname=Payload.PAYLOAD_PROPERTIES_TXT, + compress_type=zipfile.ZIP_STORED) + + def SignOutput(temp_zip_name, output_zip_name): pw = OPTIONS.key_passwords[OPTIONS.package_key] @@ -1122,12 +1238,6 @@ def WriteABOTAPackageWithBrilloScript(target_file, output_file, value += ' ' * (expected_length - len(value)) return value - # The place where the output from the subprocess should go. - log_file = sys.stdout if OPTIONS.verbose else subprocess.PIPE - - # Get the PayloadSigner to be used in step 3. - payload_signer = PayloadSigner() - # Stage the output zip package for package signing. temp_zip_file = tempfile.NamedTemporaryFile() output_zip = zipfile.ZipFile(temp_zip_file, "w", @@ -1143,77 +1253,23 @@ def WriteABOTAPackageWithBrilloScript(target_file, output_file, # Metadata to comply with Android OTA package format. metadata = GetPackageMetadata(target_info, source_info) - # 1. Generate payload. - payload_file = common.MakeTempFile(prefix="payload-", suffix=".bin") - cmd = ["brillo_update_payload", "generate", - "--payload", payload_file, - "--target_image", target_file] - if source_file is not None: - cmd.extend(["--source_image", source_file]) + # Generate payload. + payload = Payload() + + # Enforce a max timestamp this payload can be applied on top of. if OPTIONS.downgrade: max_timestamp = source_info.GetBuildProp("ro.build.date.utc") else: max_timestamp = metadata["post-timestamp"] - cmd.extend(["--max_timestamp", max_timestamp]) - p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT) - p1.communicate() - assert p1.returncode == 0, "brillo_update_payload generate failed" + additional_args = ["--max_timestamp", max_timestamp] - # 2. Generate hashes of the payload and metadata files. - payload_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin") - metadata_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin") - cmd = ["brillo_update_payload", "hash", - "--unsigned_payload", payload_file, - "--signature_size", "256", - "--metadata_hash_file", metadata_sig_file, - "--payload_hash_file", payload_sig_file] - p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT) - p1.communicate() - assert p1.returncode == 0, "brillo_update_payload hash failed" + payload.Generate(target_file, source_file, additional_args) - # 3. Sign the hashes and insert them back into the payload file. - # 3a. Sign the payload hash. - signed_payload_sig_file = payload_signer.Sign(payload_sig_file) + # Sign the payload. + payload.Sign(PayloadSigner()) - # 3b. Sign the metadata hash. - signed_metadata_sig_file = payload_signer.Sign(metadata_sig_file) - - # 3c. Insert the signatures back into the payload file. - signed_payload_file = common.MakeTempFile(prefix="signed-payload-", - suffix=".bin") - cmd = ["brillo_update_payload", "sign", - "--unsigned_payload", payload_file, - "--payload", signed_payload_file, - "--signature_size", "256", - "--metadata_signature_file", signed_metadata_sig_file, - "--payload_signature_file", signed_payload_sig_file] - p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT) - p1.communicate() - assert p1.returncode == 0, "brillo_update_payload sign failed" - - # 4. Dump the signed payload properties. - properties_file = common.MakeTempFile(prefix="payload-properties-", - suffix=".txt") - cmd = ["brillo_update_payload", "properties", - "--payload", signed_payload_file, - "--properties_file", properties_file] - p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT) - p1.communicate() - assert p1.returncode == 0, "brillo_update_payload properties failed" - - if OPTIONS.wipe_user_data: - with open(properties_file, "a") as f: - f.write("POWERWASH=1\n") - - # Add the signed payload file and properties into the zip. In order to - # support streaming, we pack payload.bin, payload_properties.txt and - # care_map.txt as ZIP_STORED. So these entries can be read directly with - # the offset and length pairs. - common.ZipWrite(output_zip, signed_payload_file, arcname="payload.bin", - compress_type=zipfile.ZIP_STORED) - common.ZipWrite(output_zip, properties_file, - arcname="payload_properties.txt", - compress_type=zipfile.ZIP_STORED) + # Write the payload into output zip. + payload.WriteToZip(output_zip) # If dm-verity is supported for the device, copy contents of care_map # into A/B OTA package. @@ -1224,6 +1280,8 @@ def WriteABOTAPackageWithBrilloScript(target_file, output_file, namelist = target_zip.namelist() if care_map_path in namelist: care_map_data = target_zip.read(care_map_path) + # In order to support streaming, care_map.txt needs to be packed as + # ZIP_STORED. common.ZipWriteStr(output_zip, "care_map.txt", care_map_data, compress_type=zipfile.ZIP_STORED) else: diff --git a/tools/releasetools/test_ota_from_target_files.py b/tools/releasetools/test_ota_from_target_files.py index fa6655b4f..103d4b644 100644 --- a/tools/releasetools/test_ota_from_target_files.py +++ b/tools/releasetools/test_ota_from_target_files.py @@ -15,12 +15,14 @@ # import copy +import os import os.path import unittest +import zipfile import common from ota_from_target_files import ( - _LoadOemDicts, BuildInfo, GetPackageMetadata, PayloadSigner, + _LoadOemDicts, BuildInfo, GetPackageMetadata, Payload, PayloadSigner, WriteFingerprintAssertion) @@ -564,3 +566,157 @@ class PayloadSignerTest(unittest.TestCase): verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE) self._assertFilesEqual(verify_file, signed_file) + + +class PayloadTest(unittest.TestCase): + + def setUp(self): + self.testdata_dir = get_testdata_dir() + self.assertTrue(os.path.exists(self.testdata_dir)) + + common.OPTIONS.wipe_user_data = False + common.OPTIONS.payload_signer = None + common.OPTIONS.payload_signer_args = None + common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey') + common.OPTIONS.key_passwords = { + common.OPTIONS.package_key : None, + } + + def tearDown(self): + common.Cleanup() + + @staticmethod + def _construct_target_files(): + target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip') + with zipfile.ZipFile(target_files, 'w') as target_files_zip: + # META/update_engine_config.txt + target_files_zip.writestr( + 'META/update_engine_config.txt', + "PAYLOAD_MAJOR_VERSION=2\nPAYLOAD_MINOR_VERSION=4\n") + + # META/ab_partitions.txt + ab_partitions = ['boot', 'system', 'vendor'] + target_files_zip.writestr( + 'META/ab_partitions.txt', + '\n'.join(ab_partitions)) + + # Create dummy images for each of them. + for partition in ab_partitions: + target_files_zip.writestr('IMAGES/' + partition + '.img', + os.urandom(len(partition))) + + return target_files + + def _create_payload_full(self): + target_file = self._construct_target_files() + payload = Payload() + payload.Generate(target_file) + return payload + + def _create_payload_incremental(self): + target_file = self._construct_target_files() + source_file = self._construct_target_files() + payload = Payload() + payload.Generate(target_file, source_file) + return payload + + def test_Generate_full(self): + payload = self._create_payload_full() + self.assertTrue(os.path.exists(payload.payload_file)) + + def test_Generate_incremental(self): + payload = self._create_payload_incremental() + self.assertTrue(os.path.exists(payload.payload_file)) + + def test_Generate_additionalArgs(self): + target_file = self._construct_target_files() + source_file = self._construct_target_files() + payload = Payload() + # This should work the same as calling payload.Generate(target_file, + # source_file). + payload.Generate( + target_file, additional_args=["--source_image", source_file]) + self.assertTrue(os.path.exists(payload.payload_file)) + + def test_Generate_invalidInput(self): + target_file = self._construct_target_files() + common.ZipDelete(target_file, 'IMAGES/vendor.img') + payload = Payload() + self.assertRaises(AssertionError, payload.Generate, target_file) + + def test_Sign_full(self): + payload = self._create_payload_full() + payload.Sign(PayloadSigner()) + + output_file = common.MakeTempFile(suffix='.zip') + with zipfile.ZipFile(output_file, 'w') as output_zip: + payload.WriteToZip(output_zip) + + import check_ota_package_signature + check_ota_package_signature.VerifyAbOtaPayload( + os.path.join(self.testdata_dir, 'testkey.x509.pem'), + output_file) + + def test_Sign_incremental(self): + payload = self._create_payload_incremental() + payload.Sign(PayloadSigner()) + + output_file = common.MakeTempFile(suffix='.zip') + with zipfile.ZipFile(output_file, 'w') as output_zip: + payload.WriteToZip(output_zip) + + import check_ota_package_signature + check_ota_package_signature.VerifyAbOtaPayload( + os.path.join(self.testdata_dir, 'testkey.x509.pem'), + output_file) + + def test_Sign_withDataWipe(self): + common.OPTIONS.wipe_user_data = True + payload = self._create_payload_full() + payload.Sign(PayloadSigner()) + + with open(payload.payload_properties) as properties_fp: + self.assertIn("POWERWASH=1", properties_fp.read()) + + def test_Sign_badSigner(self): + """Tests that signing failure can be captured.""" + payload = self._create_payload_full() + payload_signer = PayloadSigner() + payload_signer.signer_args.append('bad-option') + self.assertRaises(AssertionError, payload.Sign, payload_signer) + + def test_WriteToZip(self): + payload = self._create_payload_full() + payload.Sign(PayloadSigner()) + + output_file = common.MakeTempFile(suffix='.zip') + with zipfile.ZipFile(output_file, 'w') as output_zip: + payload.WriteToZip(output_zip) + + with zipfile.ZipFile(output_file) as verify_zip: + # First make sure we have the essential entries. + namelist = verify_zip.namelist() + self.assertIn(Payload.PAYLOAD_BIN, namelist) + self.assertIn(Payload.PAYLOAD_PROPERTIES_TXT, namelist) + + # Then assert these entries are stored. + for entry_info in verify_zip.infolist(): + if entry_info.filename not in (Payload.PAYLOAD_BIN, + Payload.PAYLOAD_PROPERTIES_TXT): + continue + self.assertEqual(zipfile.ZIP_STORED, entry_info.compress_type) + + def test_WriteToZip_unsignedPayload(self): + """Unsigned payloads should not be allowed to be written to zip.""" + payload = self._create_payload_full() + + output_file = common.MakeTempFile(suffix='.zip') + with zipfile.ZipFile(output_file, 'w') as output_zip: + self.assertRaises(AssertionError, payload.WriteToZip, output_zip) + + # Also test with incremental payload. + payload = self._create_payload_incremental() + + output_file = common.MakeTempFile(suffix='.zip') + with zipfile.ZipFile(output_file, 'w') as output_zip: + self.assertRaises(AssertionError, payload.WriteToZip, output_zip)