Merge "Use apksigner in check_target_files_signatures"
This commit is contained in:
commit
8542706908
|
@ -120,19 +120,18 @@ class CertDB(object):
|
|||
def __init__(self):
|
||||
self.certs = {}
|
||||
|
||||
def Add(self, cert, name=None):
|
||||
if cert in self.certs:
|
||||
def Add(self, cert_digest, subject, name=None):
|
||||
if cert_digest in self.certs:
|
||||
if name:
|
||||
self.certs[cert] = self.certs[cert] + "," + name
|
||||
self.certs[cert_digest] = self.certs[cert_digest] + "," + name
|
||||
else:
|
||||
if name is None:
|
||||
name = "unknown cert %s (%s)" % (common.sha1(cert).hexdigest()[:12],
|
||||
GetCertSubject(cert))
|
||||
self.certs[cert] = name
|
||||
name = "unknown cert %s (%s)" % (cert_digest[:12], subject)
|
||||
self.certs[cert_digest] = name
|
||||
|
||||
def Get(self, cert):
|
||||
"""Return the name for a given cert."""
|
||||
return self.certs.get(cert, None)
|
||||
def Get(self, cert_digest):
|
||||
"""Return the name for a given cert digest."""
|
||||
return self.certs.get(cert_digest, None)
|
||||
|
||||
def FindLocalCerts(self):
|
||||
to_load = []
|
||||
|
@ -148,7 +147,10 @@ class CertDB(object):
|
|||
cert = common.ParseCertificate(f.read())
|
||||
name, _ = os.path.splitext(i)
|
||||
name, _ = os.path.splitext(name)
|
||||
self.Add(cert, name)
|
||||
|
||||
cert_sha1 = common.sha1(cert).hexdigest()
|
||||
cert_subject = GetCertSubject(cert)
|
||||
self.Add(cert_sha1, cert_subject, name)
|
||||
|
||||
|
||||
ALL_CERTS = CertDB()
|
||||
|
@ -184,7 +186,7 @@ class APK(object):
|
|||
|
||||
def __init__(self, full_filename, filename):
|
||||
self.filename = filename
|
||||
self.certs = None
|
||||
self.cert_digests = frozenset()
|
||||
self.shared_uid = None
|
||||
self.package = None
|
||||
|
||||
|
@ -195,22 +197,68 @@ class APK(object):
|
|||
finally:
|
||||
Pop()
|
||||
|
||||
def RecordCerts(self, full_filename):
|
||||
out = set()
|
||||
def ReadCertsDeprecated(self, full_filename):
|
||||
print("reading certs in deprecated way for {}".format(full_filename))
|
||||
cert_digests = set()
|
||||
with zipfile.ZipFile(full_filename) as apk:
|
||||
pkcs7 = None
|
||||
for info in apk.infolist():
|
||||
filename = info.filename
|
||||
if (filename.startswith("META-INF/") and
|
||||
info.filename.endswith((".DSA", ".RSA"))):
|
||||
pkcs7 = apk.read(filename)
|
||||
cert = CertFromPKCS7(pkcs7, filename)
|
||||
out.add(cert)
|
||||
ALL_CERTS.Add(cert)
|
||||
if not pkcs7:
|
||||
AddProblem("no signature")
|
||||
if not cert:
|
||||
continue
|
||||
cert_sha1 = common.sha1(cert).hexdigest()
|
||||
cert_subject = GetCertSubject(cert)
|
||||
ALL_CERTS.Add(cert_sha1, cert_subject)
|
||||
cert_digests.add(cert_sha1)
|
||||
if not cert_digests:
|
||||
AddProblem("No signature found")
|
||||
return
|
||||
self.cert_digests = frozenset(cert_digests)
|
||||
|
||||
self.certs = frozenset(out)
|
||||
def RecordCerts(self, full_filename):
|
||||
"""Parse and save the signature of an apk file."""
|
||||
|
||||
# Dump the cert info with apksigner
|
||||
cmd = ["apksigner", "verify", "--print-certs", full_filename]
|
||||
p = common.Run(cmd, stdout=subprocess.PIPE)
|
||||
output, _ = p.communicate()
|
||||
if p.returncode != 0:
|
||||
self.ReadCertsDeprecated(full_filename)
|
||||
return
|
||||
|
||||
# Sample output:
|
||||
# Signer #1 certificate DN: ...
|
||||
# Signer #1 certificate SHA-256 digest: ...
|
||||
# Signer #1 certificate SHA-1 digest: ...
|
||||
# ...
|
||||
certs_info = {}
|
||||
certificate_regex = re.compile(r"(Signer #[0-9]+) (certificate .*):(.*)")
|
||||
for line in output.splitlines():
|
||||
m = certificate_regex.match(line)
|
||||
if not m:
|
||||
continue
|
||||
signer, key, val = m.group(1), m.group(2), m.group(3)
|
||||
if certs_info.get(signer):
|
||||
certs_info[signer].update({key.strip(): val.strip()})
|
||||
else:
|
||||
certs_info.update({signer: {key.strip(): val.strip()}})
|
||||
if not certs_info:
|
||||
AddProblem("Failed to parse cert info")
|
||||
return
|
||||
|
||||
cert_digests = set()
|
||||
for signer, props in certs_info.items():
|
||||
subject = props.get("certificate DN")
|
||||
digest = props.get("certificate SHA-1 digest")
|
||||
if not subject or not digest:
|
||||
AddProblem("Failed to parse cert subject or digest")
|
||||
return
|
||||
ALL_CERTS.Add(digest, subject)
|
||||
cert_digests.add(digest)
|
||||
self.cert_digests = frozenset(cert_digests)
|
||||
|
||||
def ReadManifest(self, full_filename):
|
||||
p = common.Run(["aapt2", "dump", "xmltree", full_filename, "--file",
|
||||
|
@ -316,8 +364,8 @@ class TargetFiles(object):
|
|||
print("uid %s is shared by packages with different cert sets:" % (uid,))
|
||||
for apk in apks:
|
||||
print("%-*s [%s]" % (self.max_pkg_len, apk.package, apk.filename))
|
||||
for cert in apk.certs:
|
||||
print(" ", ALL_CERTS.Get(cert))
|
||||
for digest in apk.cert_digests:
|
||||
print(" ", ALL_CERTS.Get(digest))
|
||||
print()
|
||||
|
||||
def CheckExternalSignatures(self):
|
||||
|
@ -328,25 +376,30 @@ class TargetFiles(object):
|
|||
# predexopting. Consider it an error if this app is now
|
||||
# signed with any key that is present in our tree.
|
||||
apk = self.apks_by_basename[apk_filename]
|
||||
name = ALL_CERTS.Get(apk.cert)
|
||||
if not name.startswith("unknown "):
|
||||
signed_with_external = False
|
||||
for digest in apk.cert_digests:
|
||||
name = ALL_CERTS.Get(digest)
|
||||
if name and name.startswith("unknown "):
|
||||
signed_with_external = True
|
||||
|
||||
if not signed_with_external:
|
||||
Push(apk.filename)
|
||||
AddProblem("hasn't been signed with EXTERNAL cert")
|
||||
Pop()
|
||||
|
||||
def PrintCerts(self):
|
||||
"""Display a table of packages grouped by cert."""
|
||||
by_cert = {}
|
||||
by_digest = {}
|
||||
for apk in self.apks.values():
|
||||
for cert in apk.certs:
|
||||
by_cert.setdefault(cert, []).append((apk.package, apk))
|
||||
for digest in apk.cert_digests:
|
||||
by_digest.setdefault(digest, []).append((apk.package, apk))
|
||||
|
||||
order = [(-len(v), k) for (k, v) in by_cert.items()]
|
||||
order = [(-len(v), k) for (k, v) in by_digest.items()]
|
||||
order.sort()
|
||||
|
||||
for _, cert in order:
|
||||
print("%s:" % (ALL_CERTS.Get(cert),))
|
||||
apks = by_cert[cert]
|
||||
for _, digest in order:
|
||||
print("%s:" % (ALL_CERTS.Get(digest),))
|
||||
apks = by_digest[digest]
|
||||
apks.sort()
|
||||
for _, apk in apks:
|
||||
if apk.shared_uid:
|
||||
|
@ -366,15 +419,15 @@ class TargetFiles(object):
|
|||
|
||||
max_pkg_len = max(self.max_pkg_len, other.max_pkg_len)
|
||||
|
||||
by_certpair = {}
|
||||
by_digestpair = {}
|
||||
|
||||
for i in all_apks:
|
||||
if i in self.apks:
|
||||
if i in other.apks:
|
||||
# in both; should have same set of certs
|
||||
if self.apks[i].certs != other.apks[i].certs:
|
||||
by_certpair.setdefault((other.apks[i].certs,
|
||||
self.apks[i].certs), []).append(i)
|
||||
if self.apks[i].cert_digests != other.apks[i].cert_digests:
|
||||
by_digestpair.setdefault((other.apks[i].cert_digests,
|
||||
self.apks[i].cert_digests), []).append(i)
|
||||
else:
|
||||
print("%s [%s]: new APK (not in comparison target_files)" % (
|
||||
i, self.apks[i].filename))
|
||||
|
@ -383,10 +436,10 @@ class TargetFiles(object):
|
|||
print("%s [%s]: removed APK (only in comparison target_files)" % (
|
||||
i, other.apks[i].filename))
|
||||
|
||||
if by_certpair:
|
||||
if by_digestpair:
|
||||
AddProblem("some APKs changed certs")
|
||||
Banner("APK signing differences")
|
||||
for (old, new), packages in sorted(by_certpair.items()):
|
||||
for (old, new), packages in sorted(by_digestpair.items()):
|
||||
for i, o in enumerate(old):
|
||||
if i == 0:
|
||||
print("was", ALL_CERTS.Get(o))
|
||||
|
|
Loading…
Reference in New Issue