forked from openkylin/platform_build
Merge "releasetools: Add Payload class."
am: 80ac71bada
Change-Id: Iaa68c95cf0ccbfe7895881a16a9e0c2bc759929c
This commit is contained in:
commit
e5cacd7653
|
@ -360,6 +360,122 @@ class PayloadSigner(object):
|
||||||
return out_file
|
return out_file
|
||||||
|
|
||||||
|
|
||||||
|
class Payload(object):
|
||||||
|
"""Manages the creation and the signing of an A/B OTA Payload."""
|
||||||
|
|
||||||
|
PAYLOAD_BIN = 'payload.bin'
|
||||||
|
PAYLOAD_PROPERTIES_TXT = 'payload_properties.txt'
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
# The place where the output from the subprocess should go.
|
||||||
|
self._log_file = sys.stdout if OPTIONS.verbose else subprocess.PIPE
|
||||||
|
self.payload_file = None
|
||||||
|
self.payload_properties = None
|
||||||
|
|
||||||
|
def Generate(self, target_file, source_file=None, additional_args=None):
|
||||||
|
"""Generates a payload from the given target-files zip(s).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
target_file: The filename of the target build target-files zip.
|
||||||
|
source_file: The filename of the source build target-files zip; or None if
|
||||||
|
generating a full OTA.
|
||||||
|
additional_args: A list of additional args that should be passed to
|
||||||
|
brillo_update_payload script; or None.
|
||||||
|
"""
|
||||||
|
if additional_args is None:
|
||||||
|
additional_args = []
|
||||||
|
|
||||||
|
payload_file = common.MakeTempFile(prefix="payload-", suffix=".bin")
|
||||||
|
cmd = ["brillo_update_payload", "generate",
|
||||||
|
"--payload", payload_file,
|
||||||
|
"--target_image", target_file]
|
||||||
|
if source_file is not None:
|
||||||
|
cmd.extend(["--source_image", source_file])
|
||||||
|
cmd.extend(additional_args)
|
||||||
|
p = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT)
|
||||||
|
stdoutdata, _ = p.communicate()
|
||||||
|
assert p.returncode == 0, \
|
||||||
|
"brillo_update_payload generate failed: {}".format(stdoutdata)
|
||||||
|
|
||||||
|
self.payload_file = payload_file
|
||||||
|
self.payload_properties = None
|
||||||
|
|
||||||
|
def Sign(self, payload_signer):
|
||||||
|
"""Generates and signs the hashes of the payload and metadata.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
payload_signer: A PayloadSigner() instance that serves the signing work.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
AssertionError: On any failure when calling brillo_update_payload script.
|
||||||
|
"""
|
||||||
|
assert isinstance(payload_signer, PayloadSigner)
|
||||||
|
|
||||||
|
# 1. Generate hashes of the payload and metadata files.
|
||||||
|
payload_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
|
||||||
|
metadata_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
|
||||||
|
cmd = ["brillo_update_payload", "hash",
|
||||||
|
"--unsigned_payload", self.payload_file,
|
||||||
|
"--signature_size", "256",
|
||||||
|
"--metadata_hash_file", metadata_sig_file,
|
||||||
|
"--payload_hash_file", payload_sig_file]
|
||||||
|
p1 = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT)
|
||||||
|
p1.communicate()
|
||||||
|
assert p1.returncode == 0, "brillo_update_payload hash failed"
|
||||||
|
|
||||||
|
# 2. Sign the hashes.
|
||||||
|
signed_payload_sig_file = payload_signer.Sign(payload_sig_file)
|
||||||
|
signed_metadata_sig_file = payload_signer.Sign(metadata_sig_file)
|
||||||
|
|
||||||
|
# 3. Insert the signatures back into the payload file.
|
||||||
|
signed_payload_file = common.MakeTempFile(prefix="signed-payload-",
|
||||||
|
suffix=".bin")
|
||||||
|
cmd = ["brillo_update_payload", "sign",
|
||||||
|
"--unsigned_payload", self.payload_file,
|
||||||
|
"--payload", signed_payload_file,
|
||||||
|
"--signature_size", "256",
|
||||||
|
"--metadata_signature_file", signed_metadata_sig_file,
|
||||||
|
"--payload_signature_file", signed_payload_sig_file]
|
||||||
|
p1 = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT)
|
||||||
|
p1.communicate()
|
||||||
|
assert p1.returncode == 0, "brillo_update_payload sign failed"
|
||||||
|
|
||||||
|
# 4. Dump the signed payload properties.
|
||||||
|
properties_file = common.MakeTempFile(prefix="payload-properties-",
|
||||||
|
suffix=".txt")
|
||||||
|
cmd = ["brillo_update_payload", "properties",
|
||||||
|
"--payload", signed_payload_file,
|
||||||
|
"--properties_file", properties_file]
|
||||||
|
p1 = common.Run(cmd, stdout=self._log_file, stderr=subprocess.STDOUT)
|
||||||
|
p1.communicate()
|
||||||
|
assert p1.returncode == 0, "brillo_update_payload properties failed"
|
||||||
|
|
||||||
|
if OPTIONS.wipe_user_data:
|
||||||
|
with open(properties_file, "a") as f:
|
||||||
|
f.write("POWERWASH=1\n")
|
||||||
|
|
||||||
|
self.payload_file = signed_payload_file
|
||||||
|
self.payload_properties = properties_file
|
||||||
|
|
||||||
|
def WriteToZip(self, output_zip):
|
||||||
|
"""Writes the payload to the given zip.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
output_zip: The output ZipFile instance.
|
||||||
|
"""
|
||||||
|
assert self.payload_file is not None
|
||||||
|
assert self.payload_properties is not None
|
||||||
|
|
||||||
|
# Add the signed payload file and properties into the zip. In order to
|
||||||
|
# support streaming, we pack them as ZIP_STORED. So these entries can be
|
||||||
|
# read directly with the offset and length pairs.
|
||||||
|
common.ZipWrite(output_zip, self.payload_file, arcname=Payload.PAYLOAD_BIN,
|
||||||
|
compress_type=zipfile.ZIP_STORED)
|
||||||
|
common.ZipWrite(output_zip, self.payload_properties,
|
||||||
|
arcname=Payload.PAYLOAD_PROPERTIES_TXT,
|
||||||
|
compress_type=zipfile.ZIP_STORED)
|
||||||
|
|
||||||
|
|
||||||
def SignOutput(temp_zip_name, output_zip_name):
|
def SignOutput(temp_zip_name, output_zip_name):
|
||||||
pw = OPTIONS.key_passwords[OPTIONS.package_key]
|
pw = OPTIONS.key_passwords[OPTIONS.package_key]
|
||||||
|
|
||||||
|
@ -1122,12 +1238,6 @@ def WriteABOTAPackageWithBrilloScript(target_file, output_file,
|
||||||
value += ' ' * (expected_length - len(value))
|
value += ' ' * (expected_length - len(value))
|
||||||
return value
|
return value
|
||||||
|
|
||||||
# The place where the output from the subprocess should go.
|
|
||||||
log_file = sys.stdout if OPTIONS.verbose else subprocess.PIPE
|
|
||||||
|
|
||||||
# Get the PayloadSigner to be used in step 3.
|
|
||||||
payload_signer = PayloadSigner()
|
|
||||||
|
|
||||||
# Stage the output zip package for package signing.
|
# Stage the output zip package for package signing.
|
||||||
temp_zip_file = tempfile.NamedTemporaryFile()
|
temp_zip_file = tempfile.NamedTemporaryFile()
|
||||||
output_zip = zipfile.ZipFile(temp_zip_file, "w",
|
output_zip = zipfile.ZipFile(temp_zip_file, "w",
|
||||||
|
@ -1143,72 +1253,15 @@ def WriteABOTAPackageWithBrilloScript(target_file, output_file,
|
||||||
# Metadata to comply with Android OTA package format.
|
# Metadata to comply with Android OTA package format.
|
||||||
metadata = GetPackageMetadata(target_info, source_info)
|
metadata = GetPackageMetadata(target_info, source_info)
|
||||||
|
|
||||||
# 1. Generate payload.
|
# Generate payload.
|
||||||
payload_file = common.MakeTempFile(prefix="payload-", suffix=".bin")
|
payload = Payload()
|
||||||
cmd = ["brillo_update_payload", "generate",
|
payload.Generate(target_file, source_file)
|
||||||
"--payload", payload_file,
|
|
||||||
"--target_image", target_file]
|
|
||||||
if source_file is not None:
|
|
||||||
cmd.extend(["--source_image", source_file])
|
|
||||||
p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT)
|
|
||||||
p1.communicate()
|
|
||||||
assert p1.returncode == 0, "brillo_update_payload generate failed"
|
|
||||||
|
|
||||||
# 2. Generate hashes of the payload and metadata files.
|
# Sign the payload.
|
||||||
payload_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
|
payload.Sign(PayloadSigner())
|
||||||
metadata_sig_file = common.MakeTempFile(prefix="sig-", suffix=".bin")
|
|
||||||
cmd = ["brillo_update_payload", "hash",
|
|
||||||
"--unsigned_payload", payload_file,
|
|
||||||
"--signature_size", "256",
|
|
||||||
"--metadata_hash_file", metadata_sig_file,
|
|
||||||
"--payload_hash_file", payload_sig_file]
|
|
||||||
p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT)
|
|
||||||
p1.communicate()
|
|
||||||
assert p1.returncode == 0, "brillo_update_payload hash failed"
|
|
||||||
|
|
||||||
# 3. Sign the hashes and insert them back into the payload file.
|
# Write the payload into output zip.
|
||||||
# 3a. Sign the payload hash.
|
payload.WriteToZip(output_zip)
|
||||||
signed_payload_sig_file = payload_signer.Sign(payload_sig_file)
|
|
||||||
|
|
||||||
# 3b. Sign the metadata hash.
|
|
||||||
signed_metadata_sig_file = payload_signer.Sign(metadata_sig_file)
|
|
||||||
|
|
||||||
# 3c. Insert the signatures back into the payload file.
|
|
||||||
signed_payload_file = common.MakeTempFile(prefix="signed-payload-",
|
|
||||||
suffix=".bin")
|
|
||||||
cmd = ["brillo_update_payload", "sign",
|
|
||||||
"--unsigned_payload", payload_file,
|
|
||||||
"--payload", signed_payload_file,
|
|
||||||
"--signature_size", "256",
|
|
||||||
"--metadata_signature_file", signed_metadata_sig_file,
|
|
||||||
"--payload_signature_file", signed_payload_sig_file]
|
|
||||||
p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT)
|
|
||||||
p1.communicate()
|
|
||||||
assert p1.returncode == 0, "brillo_update_payload sign failed"
|
|
||||||
|
|
||||||
# 4. Dump the signed payload properties.
|
|
||||||
properties_file = common.MakeTempFile(prefix="payload-properties-",
|
|
||||||
suffix=".txt")
|
|
||||||
cmd = ["brillo_update_payload", "properties",
|
|
||||||
"--payload", signed_payload_file,
|
|
||||||
"--properties_file", properties_file]
|
|
||||||
p1 = common.Run(cmd, stdout=log_file, stderr=subprocess.STDOUT)
|
|
||||||
p1.communicate()
|
|
||||||
assert p1.returncode == 0, "brillo_update_payload properties failed"
|
|
||||||
|
|
||||||
if OPTIONS.wipe_user_data:
|
|
||||||
with open(properties_file, "a") as f:
|
|
||||||
f.write("POWERWASH=1\n")
|
|
||||||
|
|
||||||
# Add the signed payload file and properties into the zip. In order to
|
|
||||||
# support streaming, we pack payload.bin, payload_properties.txt and
|
|
||||||
# care_map.txt as ZIP_STORED. So these entries can be read directly with
|
|
||||||
# the offset and length pairs.
|
|
||||||
common.ZipWrite(output_zip, signed_payload_file, arcname="payload.bin",
|
|
||||||
compress_type=zipfile.ZIP_STORED)
|
|
||||||
common.ZipWrite(output_zip, properties_file,
|
|
||||||
arcname="payload_properties.txt",
|
|
||||||
compress_type=zipfile.ZIP_STORED)
|
|
||||||
|
|
||||||
# If dm-verity is supported for the device, copy contents of care_map
|
# If dm-verity is supported for the device, copy contents of care_map
|
||||||
# into A/B OTA package.
|
# into A/B OTA package.
|
||||||
|
@ -1219,6 +1272,8 @@ def WriteABOTAPackageWithBrilloScript(target_file, output_file,
|
||||||
namelist = target_zip.namelist()
|
namelist = target_zip.namelist()
|
||||||
if care_map_path in namelist:
|
if care_map_path in namelist:
|
||||||
care_map_data = target_zip.read(care_map_path)
|
care_map_data = target_zip.read(care_map_path)
|
||||||
|
# In order to support streaming, care_map.txt needs to be packed as
|
||||||
|
# ZIP_STORED.
|
||||||
common.ZipWriteStr(output_zip, "care_map.txt", care_map_data,
|
common.ZipWriteStr(output_zip, "care_map.txt", care_map_data,
|
||||||
compress_type=zipfile.ZIP_STORED)
|
compress_type=zipfile.ZIP_STORED)
|
||||||
else:
|
else:
|
||||||
|
|
|
@ -15,12 +15,14 @@
|
||||||
#
|
#
|
||||||
|
|
||||||
import copy
|
import copy
|
||||||
|
import os
|
||||||
import os.path
|
import os.path
|
||||||
import unittest
|
import unittest
|
||||||
|
import zipfile
|
||||||
|
|
||||||
import common
|
import common
|
||||||
from ota_from_target_files import (
|
from ota_from_target_files import (
|
||||||
_LoadOemDicts, BuildInfo, GetPackageMetadata, PayloadSigner,
|
_LoadOemDicts, BuildInfo, GetPackageMetadata, Payload, PayloadSigner,
|
||||||
WriteFingerprintAssertion)
|
WriteFingerprintAssertion)
|
||||||
|
|
||||||
|
|
||||||
|
@ -564,3 +566,157 @@ class PayloadSignerTest(unittest.TestCase):
|
||||||
|
|
||||||
verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE)
|
verify_file = os.path.join(self.testdata_dir, self.SIGNED_SIGFILE)
|
||||||
self._assertFilesEqual(verify_file, signed_file)
|
self._assertFilesEqual(verify_file, signed_file)
|
||||||
|
|
||||||
|
|
||||||
|
class PayloadTest(unittest.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.testdata_dir = get_testdata_dir()
|
||||||
|
self.assertTrue(os.path.exists(self.testdata_dir))
|
||||||
|
|
||||||
|
common.OPTIONS.wipe_user_data = False
|
||||||
|
common.OPTIONS.payload_signer = None
|
||||||
|
common.OPTIONS.payload_signer_args = None
|
||||||
|
common.OPTIONS.package_key = os.path.join(self.testdata_dir, 'testkey')
|
||||||
|
common.OPTIONS.key_passwords = {
|
||||||
|
common.OPTIONS.package_key : None,
|
||||||
|
}
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
common.Cleanup()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _construct_target_files():
|
||||||
|
target_files = common.MakeTempFile(prefix='target_files-', suffix='.zip')
|
||||||
|
with zipfile.ZipFile(target_files, 'w') as target_files_zip:
|
||||||
|
# META/update_engine_config.txt
|
||||||
|
target_files_zip.writestr(
|
||||||
|
'META/update_engine_config.txt',
|
||||||
|
"PAYLOAD_MAJOR_VERSION=2\nPAYLOAD_MINOR_VERSION=4\n")
|
||||||
|
|
||||||
|
# META/ab_partitions.txt
|
||||||
|
ab_partitions = ['boot', 'system', 'vendor']
|
||||||
|
target_files_zip.writestr(
|
||||||
|
'META/ab_partitions.txt',
|
||||||
|
'\n'.join(ab_partitions))
|
||||||
|
|
||||||
|
# Create dummy images for each of them.
|
||||||
|
for partition in ab_partitions:
|
||||||
|
target_files_zip.writestr('IMAGES/' + partition + '.img',
|
||||||
|
os.urandom(len(partition)))
|
||||||
|
|
||||||
|
return target_files
|
||||||
|
|
||||||
|
def _create_payload_full(self):
|
||||||
|
target_file = self._construct_target_files()
|
||||||
|
payload = Payload()
|
||||||
|
payload.Generate(target_file)
|
||||||
|
return payload
|
||||||
|
|
||||||
|
def _create_payload_incremental(self):
|
||||||
|
target_file = self._construct_target_files()
|
||||||
|
source_file = self._construct_target_files()
|
||||||
|
payload = Payload()
|
||||||
|
payload.Generate(target_file, source_file)
|
||||||
|
return payload
|
||||||
|
|
||||||
|
def test_Generate_full(self):
|
||||||
|
payload = self._create_payload_full()
|
||||||
|
self.assertTrue(os.path.exists(payload.payload_file))
|
||||||
|
|
||||||
|
def test_Generate_incremental(self):
|
||||||
|
payload = self._create_payload_incremental()
|
||||||
|
self.assertTrue(os.path.exists(payload.payload_file))
|
||||||
|
|
||||||
|
def test_Generate_additionalArgs(self):
|
||||||
|
target_file = self._construct_target_files()
|
||||||
|
source_file = self._construct_target_files()
|
||||||
|
payload = Payload()
|
||||||
|
# This should work the same as calling payload.Generate(target_file,
|
||||||
|
# source_file).
|
||||||
|
payload.Generate(
|
||||||
|
target_file, additional_args=["--source_image", source_file])
|
||||||
|
self.assertTrue(os.path.exists(payload.payload_file))
|
||||||
|
|
||||||
|
def test_Generate_invalidInput(self):
|
||||||
|
target_file = self._construct_target_files()
|
||||||
|
common.ZipDelete(target_file, 'IMAGES/vendor.img')
|
||||||
|
payload = Payload()
|
||||||
|
self.assertRaises(AssertionError, payload.Generate, target_file)
|
||||||
|
|
||||||
|
def test_Sign_full(self):
|
||||||
|
payload = self._create_payload_full()
|
||||||
|
payload.Sign(PayloadSigner())
|
||||||
|
|
||||||
|
output_file = common.MakeTempFile(suffix='.zip')
|
||||||
|
with zipfile.ZipFile(output_file, 'w') as output_zip:
|
||||||
|
payload.WriteToZip(output_zip)
|
||||||
|
|
||||||
|
import check_ota_package_signature
|
||||||
|
check_ota_package_signature.VerifyAbOtaPayload(
|
||||||
|
os.path.join(self.testdata_dir, 'testkey.x509.pem'),
|
||||||
|
output_file)
|
||||||
|
|
||||||
|
def test_Sign_incremental(self):
|
||||||
|
payload = self._create_payload_incremental()
|
||||||
|
payload.Sign(PayloadSigner())
|
||||||
|
|
||||||
|
output_file = common.MakeTempFile(suffix='.zip')
|
||||||
|
with zipfile.ZipFile(output_file, 'w') as output_zip:
|
||||||
|
payload.WriteToZip(output_zip)
|
||||||
|
|
||||||
|
import check_ota_package_signature
|
||||||
|
check_ota_package_signature.VerifyAbOtaPayload(
|
||||||
|
os.path.join(self.testdata_dir, 'testkey.x509.pem'),
|
||||||
|
output_file)
|
||||||
|
|
||||||
|
def test_Sign_withDataWipe(self):
|
||||||
|
common.OPTIONS.wipe_user_data = True
|
||||||
|
payload = self._create_payload_full()
|
||||||
|
payload.Sign(PayloadSigner())
|
||||||
|
|
||||||
|
with open(payload.payload_properties) as properties_fp:
|
||||||
|
self.assertIn("POWERWASH=1", properties_fp.read())
|
||||||
|
|
||||||
|
def test_Sign_badSigner(self):
|
||||||
|
"""Tests that signing failure can be captured."""
|
||||||
|
payload = self._create_payload_full()
|
||||||
|
payload_signer = PayloadSigner()
|
||||||
|
payload_signer.signer_args.append('bad-option')
|
||||||
|
self.assertRaises(AssertionError, payload.Sign, payload_signer)
|
||||||
|
|
||||||
|
def test_WriteToZip(self):
|
||||||
|
payload = self._create_payload_full()
|
||||||
|
payload.Sign(PayloadSigner())
|
||||||
|
|
||||||
|
output_file = common.MakeTempFile(suffix='.zip')
|
||||||
|
with zipfile.ZipFile(output_file, 'w') as output_zip:
|
||||||
|
payload.WriteToZip(output_zip)
|
||||||
|
|
||||||
|
with zipfile.ZipFile(output_file) as verify_zip:
|
||||||
|
# First make sure we have the essential entries.
|
||||||
|
namelist = verify_zip.namelist()
|
||||||
|
self.assertIn(Payload.PAYLOAD_BIN, namelist)
|
||||||
|
self.assertIn(Payload.PAYLOAD_PROPERTIES_TXT, namelist)
|
||||||
|
|
||||||
|
# Then assert these entries are stored.
|
||||||
|
for entry_info in verify_zip.infolist():
|
||||||
|
if entry_info.filename not in (Payload.PAYLOAD_BIN,
|
||||||
|
Payload.PAYLOAD_PROPERTIES_TXT):
|
||||||
|
continue
|
||||||
|
self.assertEqual(zipfile.ZIP_STORED, entry_info.compress_type)
|
||||||
|
|
||||||
|
def test_WriteToZip_unsignedPayload(self):
|
||||||
|
"""Unsigned payloads should not be allowed to be written to zip."""
|
||||||
|
payload = self._create_payload_full()
|
||||||
|
|
||||||
|
output_file = common.MakeTempFile(suffix='.zip')
|
||||||
|
with zipfile.ZipFile(output_file, 'w') as output_zip:
|
||||||
|
self.assertRaises(AssertionError, payload.WriteToZip, output_zip)
|
||||||
|
|
||||||
|
# Also test with incremental payload.
|
||||||
|
payload = self._create_payload_incremental()
|
||||||
|
|
||||||
|
output_file = common.MakeTempFile(suffix='.zip')
|
||||||
|
with zipfile.ZipFile(output_file, 'w') as output_zip:
|
||||||
|
self.assertRaises(AssertionError, payload.WriteToZip, output_zip)
|
||||||
|
|
Loading…
Reference in New Issue