Merge "releasetools: Validate A/B OTA payload signatures." am: 631b3a031c am: d5ec31f8a9

am: 9e8416917a

Change-Id: I091a3a43e47b2cca5d4605ac41e6981951d29cd5
This commit is contained in:
Tao Bao 2017-09-06 21:15:46 +00:00 committed by android-build-merger
commit 28bec43b96
1 changed files with 96 additions and 6 deletions

View File

@ -25,12 +25,19 @@ import common
import re import re
import subprocess import subprocess
import sys import sys
import tempfile
import zipfile
from hashlib import sha1 from hashlib import sha1
from hashlib import sha256 from hashlib import sha256
# 'update_payload' package is under 'system/update_engine/scripts/', which
# should to be included in PYTHONPATH.
from update_payload.payload import Payload
from update_payload.update_metadata_pb2 import Signatures
def cert_uses_sha256(cert):
def CertUsesSha256(cert):
"""Check if the cert uses SHA-256 hashing algorithm.""" """Check if the cert uses SHA-256 hashing algorithm."""
cmd = ['openssl', 'x509', '-text', '-noout', '-in', cert] cmd = ['openssl', 'x509', '-text', '-noout', '-in', cert]
@ -46,7 +53,7 @@ def cert_uses_sha256(cert):
return algorithm.group(1).startswith('sha256') return algorithm.group(1).startswith('sha256')
def verify_package(cert, package): def VerifyPackage(cert, package):
"""Verify the given package with the certificate. """Verify the given package with the certificate.
(Comments from bootable/recovery/verifier.cpp:) (Comments from bootable/recovery/verifier.cpp:)
@ -90,7 +97,7 @@ def verify_package(cert, package):
print('Signed data length: %d' % (signed_len,)) print('Signed data length: %d' % (signed_len,))
print('Signature start: %d' % (signature_start,)) print('Signature start: %d' % (signature_start,))
use_sha256 = cert_uses_sha256(cert) use_sha256 = CertUsesSha256(cert)
print('Use SHA-256: %s' % (use_sha256,)) print('Use SHA-256: %s' % (use_sha256,))
if use_sha256: if use_sha256:
@ -100,7 +107,7 @@ def verify_package(cert, package):
h.update(package_bytes[:signed_len]) h.update(package_bytes[:signed_len])
package_digest = h.hexdigest().lower() package_digest = h.hexdigest().lower()
print('Digest: %s\n' % (package_digest,)) print('Digest: %s' % (package_digest,))
# Get the signature from the input package. # Get the signature from the input package.
signature = package_bytes[signature_start:-6] signature = package_bytes[signature_start:-6]
@ -141,7 +148,87 @@ def verify_package(cert, package):
assert package_digest == digest_string, "Verification failed." assert package_digest == digest_string, "Verification failed."
# Verified successfully upon reaching here. # Verified successfully upon reaching here.
print('VERIFIED\n') print('\nWhole package signature VERIFIED\n')
def VerifyAbOtaPayload(cert, package):
"""Verifies the payload and metadata signatures in an A/B OTA payload."""
def VerifySignatureBlob(hash_file, blob):
"""Verifies the input hash_file against the signature blob."""
signatures = Signatures()
signatures.ParseFromString(blob)
extracted_sig_file = common.MakeTempFile(
prefix='extracted-sig-', suffix='.bin')
# In Android, we only expect one signature.
assert len(signatures.signatures) == 1, \
'Invalid number of signatures: %d' % len(signatures.signatures)
signature = signatures.signatures[0]
length = len(signature.data)
assert length == 256, 'Invalid signature length %d' % (length,)
with open(extracted_sig_file, 'w') as f:
f.write(signature.data)
# Verify the signature file extracted from the payload, by reversing the
# signing operation. Alternatively, this can be done by calling 'openssl
# rsautl -verify -certin -inkey <cert.pem> -in <extracted_sig_file> -out
# <output>', then to assert that
# <output> == SHA-256 DigestInfo prefix || <hash_file>.
cmd = ['openssl', 'pkeyutl', '-verify', '-certin', '-inkey', cert,
'-pkeyopt', 'digest:sha256', '-in', hash_file,
'-sigfile', extracted_sig_file]
p = common.Run(cmd, stdout=subprocess.PIPE)
result, _ = p.communicate()
# https://github.com/openssl/openssl/pull/3213
# 'openssl pkeyutl -verify' (prior to 1.1.0) returns non-zero return code,
# even on successful verification. To avoid the false alarm with older
# openssl, check the output directly.
assert result.strip() == 'Signature Verified Successfully', result.strip()
package_zip = zipfile.ZipFile(package, 'r')
if 'payload.bin' not in package_zip.namelist():
common.ZipClose(package_zip)
return
print('Verifying A/B OTA payload signatures...')
package_dir = tempfile.mkdtemp(prefix='package-')
common.OPTIONS.tempfiles.append(package_dir)
payload_file = package_zip.extract('payload.bin', package_dir)
payload = Payload(open(payload_file, 'rb'))
payload.Init()
# Extract the payload hash and metadata hash from the payload.bin.
payload_hash_file = common.MakeTempFile(prefix='hash-', suffix='.bin')
metadata_hash_file = common.MakeTempFile(prefix='hash-', suffix='.bin')
cmd = ['brillo_update_payload', 'hash',
'--unsigned_payload', payload_file,
'--signature_size', '256',
'--metadata_hash_file', metadata_hash_file,
'--payload_hash_file', payload_hash_file]
p = common.Run(cmd, stdout=subprocess.PIPE)
p.communicate()
assert p.returncode == 0, 'brillo_update_payload hash failed'
# Payload signature verification.
assert payload.manifest.HasField('signatures_offset')
payload_signature = payload.ReadDataBlob(
payload.manifest.signatures_offset, payload.manifest.signatures_size)
VerifySignatureBlob(payload_hash_file, payload_signature)
# Metadata signature verification.
metadata_signature = payload.ReadDataBlob(
-payload.header.metadata_signature_len,
payload.header.metadata_signature_len)
VerifySignatureBlob(metadata_hash_file, metadata_signature)
common.ZipClose(package_zip)
# Verified successfully upon reaching here.
print('\nPayload signatures VERIFIED\n\n')
def main(): def main():
@ -150,7 +237,8 @@ def main():
parser.add_argument('package', help='The OTA package to be verified.') parser.add_argument('package', help='The OTA package to be verified.')
args = parser.parse_args() args = parser.parse_args()
verify_package(args.certificate, args.package) VerifyPackage(args.certificate, args.package)
VerifyAbOtaPayload(args.certificate, args.package)
if __name__ == '__main__': if __name__ == '__main__':
@ -159,3 +247,5 @@ if __name__ == '__main__':
except AssertionError as err: except AssertionError as err:
print('\n ERROR: %s\n' % (err,)) print('\n ERROR: %s\n' % (err,))
sys.exit(1) sys.exit(1)
finally:
common.Cleanup()