Group common PKI validation code - Refactor

The PKI token validation for PKI and PKIZ tokens both individually do
the cms validation and the revocation checking. There's no reason that
this needs to be in both the functions. We can split the decode process
and standardize the validation process to make the code flow easier to
understand.

As per the previous patch the tests all operate on _validate_offline
already so this is a straight refactoring.

Change-Id: I148c96688cace1f73a8fad80ed5104b1b6b871b7
This commit is contained in:
Jamie Lennox
2016-01-16 15:24:08 +11:00
parent 808c922243
commit e8ca9276c6

View File

@@ -429,6 +429,16 @@ def _get_project_version(project):
return pkg_resources.get_distribution(project).version
def _uncompress_pkiz(token):
# TypeError If the signed_text is not zlib compressed binascii.Error if
# signed_text has incorrect base64 padding (py34)
try:
return cms.pkiz_uncompress(token)
except (TypeError, binascii.Error):
raise ksm_exceptions.InvalidToken(token)
class BaseAuthProtocol(object):
"""A base class for AuthProtocol token checking implementations.
@@ -849,14 +859,19 @@ class AuthProtocol(BaseAuthProtocol):
raise webob.exc.HTTPInternalServerError()
def _validate_offline(self, token, token_hashes):
if cms.is_pkiz(token):
token_data = _uncompress_pkiz(token)
inform = cms.PKIZ_CMS_FORM
elif cms.is_asn1_token(token):
token_data = cms.token_to_cms(token)
inform = cms.PKI_ASN1_FORM
else:
# Can't do offline validation for this type of token.
return
try:
if cms.is_pkiz(token):
verified = self._verify_pkiz_token(token, token_hashes)
elif cms.is_asn1_token(token):
verified = self._verify_signed_token(token, token_hashes)
else:
# Can't do offline validation for this type of token.
return
self._revocations.check(token_hashes)
verified = self._cms_verify(token_data, inform)
except ksc_exceptions.CertificateConfigError:
self.log.warning(_LW('Fetch certificate config failed, '
'fallback to online validation.'))
@@ -923,24 +938,6 @@ class AuthProtocol(BaseAuthProtocol):
self.log.error(_LE('CMS Verify output: %s'), err.output)
raise
def _verify_signed_token(self, signed_text, token_ids):
"""Check that the token is unrevoked and has a valid signature."""
self._revocations.check(token_ids)
formatted = cms.token_to_cms(signed_text)
verified = self._cms_verify(formatted)
return verified
def _verify_pkiz_token(self, signed_text, token_ids):
self._revocations.check(token_ids)
try:
uncompressed = cms.pkiz_uncompress(signed_text)
verified = self._cms_verify(uncompressed, inform=cms.PKIZ_CMS_FORM)
return verified
# TypeError If the signed_text is not zlib compressed
# binascii.Error if signed_text has incorrect base64 padding (py34)
except (TypeError, binascii.Error):
raise ksm_exceptions.InvalidToken(signed_text)
def _fetch_signing_cert(self):
self._signing_directory.write_file(
self._SIGNING_CERT_FILE_NAME,