Merge "Update style for signature_utils class"

This commit is contained in:
Jenkins 2015-11-18 08:50:05 +00:00 committed by Gerrit Code Review
commit 1585713a9f
2 changed files with 134 additions and 44 deletions

View File

@ -16,6 +16,7 @@
"""Support signature verification."""
import binascii
import datetime
from castellan import key_manager
from cryptography import exceptions as crypto_exception
@ -46,13 +47,14 @@ HASH_METHODS = {
'SHA-512': hashes.SHA512()
}
# These are the currently supported signature formats
# These are the currently supported signature key types
(RSA_PSS,) = (
'RSA-PSS',
)
# This includes the supported public key type for the signature key type
SIGNATURE_KEY_TYPES = {
RSA_PSS
RSA_PSS: rsa.RSAPublicKey
}
# These are the currently supported certificate formats
@ -84,6 +86,55 @@ MASK_GEN_ALGORITHMS = {
)
# each key type will require its own verifier
def create_verifier_for_pss(signature, hash_method, public_key,
image_properties):
"""Create the verifier to use when the key type is RSA-PSS.
:param signature: the decoded signature to use
:param hash_method: the hash method to use, as a cryptography object
:param public_key: the public key to use, as a cryptography object
:param image_properties: the key-value properties about the image
:return: the verifier to use to verify the signature for RSA-PSS
:raises: SignatureVerificationError if the RSA-PSS specific properties
are invalid
"""
# retrieve other needed properties, or use defaults if not there
if MASK_GEN_ALG in image_properties:
mask_gen_algorithm = image_properties[MASK_GEN_ALG]
if mask_gen_algorithm not in MASK_GEN_ALGORITHMS:
raise exception.SignatureVerificationError(
'Invalid mask_gen_algorithm: %s' % mask_gen_algorithm)
mgf = MASK_GEN_ALGORITHMS[mask_gen_algorithm](hash_method)
else:
# default to MGF1
mgf = padding.MGF1(hash_method)
if PSS_SALT_LENGTH in image_properties:
pss_salt_length = image_properties[PSS_SALT_LENGTH]
try:
salt_length = int(pss_salt_length)
except ValueError:
raise exception.SignatureVerificationError(
'Invalid pss_salt_length: %s' % pss_salt_length)
else:
# default to max salt length
salt_length = padding.PSS.MAX_LENGTH
# return the verifier
return public_key.verifier(
signature,
padding.PSS(mgf=mgf, salt_length=salt_length),
hash_method
)
# map the key type to the verifier function to use
KEY_TYPE_METHODS = {
RSA_PSS: create_verifier_for_pss
}
def should_verify_signature(image_properties):
"""Determine whether a signature should be verified.
@ -125,41 +176,11 @@ def verify_signature(context, checksum_hash, image_properties):
image_properties[CERT_UUID],
signature_key_type)
# Initialize the verifier
verifier = None
# create the verifier based on the signature key type
if signature_key_type == RSA_PSS:
# retrieve other needed properties, or use defaults if not there
if MASK_GEN_ALG in image_properties:
mask_gen_algorithm = image_properties[MASK_GEN_ALG]
if mask_gen_algorithm in MASK_GEN_ALGORITHMS:
mgf = MASK_GEN_ALGORITHMS[mask_gen_algorithm](hash_method)
else:
raise exception.SignatureVerificationError(
'Invalid mask_gen_algorithm: %s' % mask_gen_algorithm)
else:
# default to MGF1
mgf = padding.MGF1(hash_method)
if PSS_SALT_LENGTH in image_properties:
pss_salt_length = image_properties[PSS_SALT_LENGTH]
try:
salt_length = int(pss_salt_length)
except ValueError:
raise exception.SignatureVerificationError(
'Invalid pss_salt_length: %s' % pss_salt_length)
else:
# default to max salt length
salt_length = padding.PSS.MAX_LENGTH
# Create the verifier
verifier = public_key.verifier(
signature,
padding.PSS(
mgf=mgf,
salt_length=salt_length
),
hash_method
)
verifier = KEY_TYPE_METHODS[signature_key_type](signature,
hash_method,
public_key,
image_properties)
if verifier:
# Verify the signature
@ -237,11 +258,10 @@ def get_public_key(context, signature_certificate_uuid, signature_key_type):
public_key = certificate.public_key()
# Confirm the type is of the type expected based on the signature key type
if signature_key_type == RSA_PSS:
if not isinstance(public_key, rsa.RSAPublicKey):
raise exception.SignatureVerificationError(
'Invalid public key type for signature key type: %s'
% signature_key_type)
if not isinstance(public_key, SIGNATURE_KEY_TYPES[signature_key_type]):
raise exception.SignatureVerificationError(
'Invalid public key type for signature key type: %s'
% signature_key_type)
return public_key
@ -282,5 +302,32 @@ def get_certificate(context, signature_certificate_uuid):
cert_data = cert.get_encoded()
certificate = x509.load_der_x509_certificate(cert_data,
default_backend())
else:
raise exception.SignatureVerificationError(
'Certificate format not supported: %s' % cert.format)
# verify the certificate
verify_certificate(certificate)
return certificate
def verify_certificate(certificate):
"""Verify that the certificate has not expired.
:param certificate: the cryptography certificate object
:raises: SignatureVerificationError if the certificate valid time range
does not include now
"""
# Get now in UTC, since certificate returns times in UTC
now = datetime.datetime.utcnow()
# Confirm the certificate valid time range includes now
if now < certificate.not_valid_before:
raise exception.SignatureVerificationError(
'Certificate is not valid before: %s UTC'
% certificate.not_valid_before)
elif now > certificate.not_valid_after:
raise exception.SignatureVerificationError(
'Certificate is not valid after: %s UTC'
% certificate.not_valid_after)

View File

@ -14,6 +14,7 @@
# under the License.
import base64
import datetime
import mock
from cryptography.hazmat.backends import default_backend
@ -76,12 +77,26 @@ class FakeCastellanCertificate(object):
class FakeCryptoCertificate(object):
def __init__(self, pub_key):
def __init__(self, pub_key=TEST_PRIVATE_KEY.public_key(),
not_valid_before=(datetime.datetime.utcnow() -
datetime.timedelta(hours=1)),
not_valid_after=(datetime.datetime.utcnow() +
datetime.timedelta(hours=1))):
self.pub_key = pub_key
self.cert_not_valid_before = not_valid_before
self.cert_not_valid_after = not_valid_after
def public_key(self):
return self.pub_key
@property
def not_valid_before(self):
return self.cert_not_valid_before
@property
def not_valid_after(self):
return self.cert_not_valid_after
class BadPublicKey(object):
@ -304,7 +319,7 @@ class TestSignatureUtils(test_utils.BaseTestCase):
@mock.patch('glance.common.signature_utils.get_certificate')
def test_get_public_key(self, mock_get_cert):
fake_cert = FakeCryptoCertificate(TEST_PRIVATE_KEY.public_key())
fake_cert = FakeCryptoCertificate()
mock_get_cert.return_value = fake_cert
result_pub_key = signature_utils.get_public_key(None, None, 'RSA-PSS')
self.assertEqual(fake_cert.public_key(), result_pub_key)
@ -323,11 +338,39 @@ class TestSignatureUtils(test_utils.BaseTestCase):
@mock.patch('castellan.key_manager.API', return_value=FakeKeyManager())
def test_get_certificate(self, mock_key_manager_API, mock_load_cert):
cert_uuid = 'valid_format_cert'
x509_cert = FakeCryptoCertificate(TEST_PRIVATE_KEY.public_key())
x509_cert = FakeCryptoCertificate()
mock_load_cert.return_value = x509_cert
self.assertEqual(x509_cert,
signature_utils.get_certificate(None, cert_uuid))
@mock.patch('cryptography.x509.load_der_x509_certificate')
@mock.patch('castellan.key_manager.API', return_value=FakeKeyManager())
def test_get_expired_certificate(self, mock_key_manager_API,
mock_load_cert):
cert_uuid = 'valid_format_cert'
x509_cert = FakeCryptoCertificate(
not_valid_after=datetime.datetime.utcnow() -
datetime.timedelta(hours=1))
mock_load_cert.return_value = x509_cert
self.assertRaisesRegexp(exception.SignatureVerificationError,
'Certificate is not valid after: .*',
signature_utils.get_certificate, None,
cert_uuid)
@mock.patch('cryptography.x509.load_der_x509_certificate')
@mock.patch('castellan.key_manager.API', return_value=FakeKeyManager())
def test_get_not_yet_valid_certificate(self, mock_key_manager_API,
mock_load_cert):
cert_uuid = 'valid_format_cert'
x509_cert = FakeCryptoCertificate(
not_valid_before=datetime.datetime.utcnow() +
datetime.timedelta(hours=1))
mock_load_cert.return_value = x509_cert
self.assertRaisesRegexp(exception.SignatureVerificationError,
'Certificate is not valid before: .*',
signature_utils.get_certificate, None,
cert_uuid)
@mock.patch('castellan.key_manager.API', return_value=FakeKeyManager())
def test_get_certificate_key_manager_fail(self, mock_key_manager_API):
bad_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0695'