Merge "releasetools: Validate A/B OTA payload signatures."

This commit is contained in:
Tao Bao 2017-09-06 20:35:07 +00:00 committed by Gerrit Code Review
commit 631b3a031c
1 changed files with 96 additions and 6 deletions

View File

@ -25,12 +25,19 @@ import common
import re
import subprocess
import sys
import tempfile
import zipfile
from hashlib import sha1
from hashlib import sha256
# 'update_payload' package is under 'system/update_engine/scripts/', which
# should to be included in PYTHONPATH.
from update_payload.payload import Payload
from update_payload.update_metadata_pb2 import Signatures
def cert_uses_sha256(cert):
def CertUsesSha256(cert):
"""Check if the cert uses SHA-256 hashing algorithm."""
cmd = ['openssl', 'x509', '-text', '-noout', '-in', cert]
@ -46,7 +53,7 @@ def cert_uses_sha256(cert):
return algorithm.group(1).startswith('sha256')
def verify_package(cert, package):
def VerifyPackage(cert, package):
"""Verify the given package with the certificate.
(Comments from bootable/recovery/verifier.cpp:)
@ -90,7 +97,7 @@ def verify_package(cert, package):
print('Signed data length: %d' % (signed_len,))
print('Signature start: %d' % (signature_start,))
use_sha256 = cert_uses_sha256(cert)
use_sha256 = CertUsesSha256(cert)
print('Use SHA-256: %s' % (use_sha256,))
if use_sha256:
@ -100,7 +107,7 @@ def verify_package(cert, package):
h.update(package_bytes[:signed_len])
package_digest = h.hexdigest().lower()
print('Digest: %s\n' % (package_digest,))
print('Digest: %s' % (package_digest,))
# Get the signature from the input package.
signature = package_bytes[signature_start:-6]
@ -141,7 +148,87 @@ def verify_package(cert, package):
assert package_digest == digest_string, "Verification failed."
# Verified successfully upon reaching here.
print('VERIFIED\n')
print('\nWhole package signature VERIFIED\n')
def VerifyAbOtaPayload(cert, package):
"""Verifies the payload and metadata signatures in an A/B OTA payload."""
def VerifySignatureBlob(hash_file, blob):
"""Verifies the input hash_file against the signature blob."""
signatures = Signatures()
signatures.ParseFromString(blob)
extracted_sig_file = common.MakeTempFile(
prefix='extracted-sig-', suffix='.bin')
# In Android, we only expect one signature.
assert len(signatures.signatures) == 1, \
'Invalid number of signatures: %d' % len(signatures.signatures)
signature = signatures.signatures[0]
length = len(signature.data)
assert length == 256, 'Invalid signature length %d' % (length,)
with open(extracted_sig_file, 'w') as f:
f.write(signature.data)
# Verify the signature file extracted from the payload, by reversing the
# signing operation. Alternatively, this can be done by calling 'openssl
# rsautl -verify -certin -inkey <cert.pem> -in <extracted_sig_file> -out
# <output>', then to assert that
# <output> == SHA-256 DigestInfo prefix || <hash_file>.
cmd = ['openssl', 'pkeyutl', '-verify', '-certin', '-inkey', cert,
'-pkeyopt', 'digest:sha256', '-in', hash_file,
'-sigfile', extracted_sig_file]
p = common.Run(cmd, stdout=subprocess.PIPE)
result, _ = p.communicate()
# https://github.com/openssl/openssl/pull/3213
# 'openssl pkeyutl -verify' (prior to 1.1.0) returns non-zero return code,
# even on successful verification. To avoid the false alarm with older
# openssl, check the output directly.
assert result.strip() == 'Signature Verified Successfully', result.strip()
package_zip = zipfile.ZipFile(package, 'r')
if 'payload.bin' not in package_zip.namelist():
common.ZipClose(package_zip)
return
print('Verifying A/B OTA payload signatures...')
package_dir = tempfile.mkdtemp(prefix='package-')
common.OPTIONS.tempfiles.append(package_dir)
payload_file = package_zip.extract('payload.bin', package_dir)
payload = Payload(open(payload_file, 'rb'))
payload.Init()
# Extract the payload hash and metadata hash from the payload.bin.
payload_hash_file = common.MakeTempFile(prefix='hash-', suffix='.bin')
metadata_hash_file = common.MakeTempFile(prefix='hash-', suffix='.bin')
cmd = ['brillo_update_payload', 'hash',
'--unsigned_payload', payload_file,
'--signature_size', '256',
'--metadata_hash_file', metadata_hash_file,
'--payload_hash_file', payload_hash_file]
p = common.Run(cmd, stdout=subprocess.PIPE)
p.communicate()
assert p.returncode == 0, 'brillo_update_payload hash failed'
# Payload signature verification.
assert payload.manifest.HasField('signatures_offset')
payload_signature = payload.ReadDataBlob(
payload.manifest.signatures_offset, payload.manifest.signatures_size)
VerifySignatureBlob(payload_hash_file, payload_signature)
# Metadata signature verification.
metadata_signature = payload.ReadDataBlob(
-payload.header.metadata_signature_len,
payload.header.metadata_signature_len)
VerifySignatureBlob(metadata_hash_file, metadata_signature)
common.ZipClose(package_zip)
# Verified successfully upon reaching here.
print('\nPayload signatures VERIFIED\n\n')
def main():
@ -150,7 +237,8 @@ def main():
parser.add_argument('package', help='The OTA package to be verified.')
args = parser.parse_args()
verify_package(args.certificate, args.package)
VerifyPackage(args.certificate, args.package)
VerifyAbOtaPayload(args.certificate, args.package)
if __name__ == '__main__':
@ -159,3 +247,5 @@ if __name__ == '__main__':
except AssertionError as err:
print('\n ERROR: %s\n' % (err,))
sys.exit(1)
finally:
common.Cleanup()