Use cursive for signature verification

This change removes the signature_utils module
from Glance and uses the cursive library, which
contains an identical module.

Change-Id: I80fcafa528b87a83b90ed7c0e4c0db9228852bc2
Depends-On: Ic3ffb6b318dc2ac6c9d3a60bed5198fd4d40e318
Partial-Bug: #1528349
This commit is contained in:
Dane Fichter 2016-08-02 23:13:46 -04:00 committed by dane-fichter
parent d0f4316752
commit 5afb5d33ed
10 changed files with 40 additions and 821 deletions

View File

@ -12,6 +12,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cursive import exception as cursive_exception
import glance_store
from oslo_config import cfg
from oslo_log import log as logging
@ -225,7 +226,7 @@ class ImageDataController(object):
raise webob.exc.HTTPServiceUnavailable(explanation=msg,
request=req)
except exception.SignatureVerificationError as e:
except cursive_exception.SignatureVerificationError as e:
msg = (_LE("Signature verification failed for image %(id)s: %(e)s")
% {'id': image_id,
'e': encodeutils.exception_to_unicode(e)})

View File

@ -447,10 +447,6 @@ class MetadefTagNotFound(NotFound):
" namespace=%(namespace_name)s.")
class SignatureVerificationError(GlanceException):
message = _("Unable to verify signature: %(reason)s")
class InvalidVersion(Invalid):
message = _("Version is invalid: %(reason)s")

View File

@ -1,387 +0,0 @@
# Copyright (c) The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Support signature verification."""
import binascii
import datetime
from castellan import key_manager
from cryptography import exceptions as crypto_exception
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes
from cryptography import x509
from oslo_log import log as logging
from oslo_serialization import base64
from oslo_utils import encodeutils
from glance.common import exception
from glance.i18n import _, _LE
LOG = logging.getLogger(__name__)
# Note: This is the signature hash method, which is independent from the
# image data checksum hash method (which is handled elsewhere).
HASH_METHODS = {
'SHA-224': hashes.SHA224(),
'SHA-256': hashes.SHA256(),
'SHA-384': hashes.SHA384(),
'SHA-512': hashes.SHA512()
}
# Currently supported signature key types
# RSA Options
RSA_PSS = 'RSA-PSS'
# DSA Options
DSA = 'DSA'
# ECC curves -- note that only those with key sizes >=384 are included
# Note also that some of these may not be supported by the cryptography backend
ECC_CURVES = (
ec.SECT571K1(),
ec.SECT409K1(),
ec.SECT571R1(),
ec.SECT409R1(),
ec.SECP521R1(),
ec.SECP384R1(),
)
# These are the currently supported certificate formats
(X_509,) = (
'X.509',
)
CERTIFICATE_FORMATS = {
X_509
}
# These are the currently supported MGF formats, used for RSA-PSS signatures
MASK_GEN_ALGORITHMS = {
'MGF1': padding.MGF1
}
# Required image property names
(SIGNATURE, HASH_METHOD, KEY_TYPE, CERT_UUID) = (
'img_signature',
'img_signature_hash_method',
'img_signature_key_type',
'img_signature_certificate_uuid'
)
class SignatureKeyType(object):
_REGISTERED_TYPES = {}
def __init__(self, name, public_key_type, create_verifier):
self.name = name
self.public_key_type = public_key_type
self.create_verifier = create_verifier
@classmethod
def register(cls, name, public_key_type, create_verifier):
"""Register a signature key type.
:param name: the name of the signature key type
:param public_key_type: e.g. RSAPublicKey, DSAPublicKey, etc.
:param create_verifier: a function to create a verifier for this type
"""
cls._REGISTERED_TYPES[name] = cls(name,
public_key_type,
create_verifier)
@classmethod
def lookup(cls, name):
"""Look up the signature key type.
:param name: the name of the signature key type
:returns: the SignatureKeyType object
:raises: glance.common.exception.SignatureVerificationError if
signature key type is invalid
"""
if name not in cls._REGISTERED_TYPES:
raise exception.SignatureVerificationError(
_('Invalid signature key type: %s') % name
)
return cls._REGISTERED_TYPES[name]
# each key type will require its own verifier
def create_verifier_for_pss(signature, hash_method, public_key):
"""Create the verifier to use when the key type is RSA-PSS.
:param signature: the decoded signature to use
:param hash_method: the hash method to use, as a cryptography object
:param public_key: the public key to use, as a cryptography object
:returns: the verifier to use to verify the signature for RSA-PSS
:raises glance.common.exception.SignatureVerificationError: if the
RSA-PSS specific properties are invalid
"""
# default to MGF1
mgf = padding.MGF1(hash_method)
# default to max salt length
salt_length = padding.PSS.MAX_LENGTH
# return the verifier
return public_key.verifier(
signature,
padding.PSS(mgf=mgf, salt_length=salt_length),
hash_method
)
def create_verifier_for_ecc(signature, hash_method, public_key):
"""Create the verifier to use when the key type is ECC_*.
:param signature: the decoded signature to use
:param hash_method: the hash method to use, as a cryptography object
:param public_key: the public key to use, as a cryptography object
:return: the verifier to use to verify the signature for ECC_*
"""
# return the verifier
return public_key.verifier(
signature,
ec.ECDSA(hash_method)
)
def create_verifier_for_dsa(signature, hash_method, public_key):
"""Create verifier to use when the key type is DSA
:param signature: the decoded signature to use
:param hash_method: the hash method to use, as a cryptography object
:param public_key: the public key to use, as a cryptography object
:returns: the verifier to use to verify the signature for DSA
"""
# return the verifier
return public_key.verifier(
signature,
hash_method
)
# map the key type to the verifier function to use
SignatureKeyType.register(RSA_PSS, rsa.RSAPublicKey, create_verifier_for_pss)
SignatureKeyType.register(DSA, dsa.DSAPublicKey, create_verifier_for_dsa)
# Register the elliptic curves which are supported by the backend
for curve in ECC_CURVES:
if default_backend().elliptic_curve_supported(curve):
SignatureKeyType.register('ECC_' + curve.name.upper(),
ec.EllipticCurvePublicKey,
create_verifier_for_ecc)
def should_create_verifier(image_properties):
"""Determine whether a verifier should be created.
Using the image properties, determine whether existing properties indicate
that signature verification should be done.
:param image_properties: the key-value properties about the image
:return: True, if signature metadata properties exist, False otherwise
"""
return (image_properties is not None and
CERT_UUID in image_properties and
HASH_METHOD in image_properties and
SIGNATURE in image_properties and
KEY_TYPE in image_properties)
def get_verifier(context, image_properties):
"""Retrieve the image properties and use them to create a verifier.
:param context: the user context for authentication
:param image_properties: the key-value properties about the image
:return: instance of cryptography AsymmetricVerificationContext
:raises glance.common.exception.SignatureVerificationError: if building
the verifier fails
"""
if not should_create_verifier(image_properties):
raise exception.SignatureVerificationError(
_('Required image properties for signature verification do not'
' exist. Cannot verify signature.')
)
signature = get_signature(image_properties[SIGNATURE])
hash_method = get_hash_method(image_properties[HASH_METHOD])
signature_key_type = SignatureKeyType.lookup(
image_properties[KEY_TYPE])
public_key = get_public_key(context,
image_properties[CERT_UUID],
signature_key_type)
# create the verifier based on the signature key type
try:
verifier = signature_key_type.create_verifier(signature,
hash_method,
public_key)
except crypto_exception.UnsupportedAlgorithm as e:
msg = (_LE("Unable to create verifier since algorithm is "
"unsupported: %(e)s")
% {'e': encodeutils.exception_to_unicode(e)})
LOG.error(msg)
raise exception.SignatureVerificationError(
_('Unable to verify signature since the algorithm is unsupported '
'on this system')
)
if verifier:
return verifier
else:
# Error creating the verifier
raise exception.SignatureVerificationError(
_('Error occurred while creating the verifier')
)
def get_signature(signature_data):
"""Decode the signature data and returns the signature.
:param signature_data: the base64-encoded signature data
:returns: the decoded signature
:raises glance.common.exception.SignatureVerificationError: if the
signature data is malformatted
"""
try:
signature = base64.decode_as_bytes(signature_data)
except (TypeError, binascii.Error):
raise exception.SignatureVerificationError(
_('The signature data was not properly encoded using base64')
)
return signature
def get_hash_method(hash_method_name):
"""Verify the hash method name and create the hash method.
:param hash_method_name: the name of the hash method to retrieve
:returns: the hash method, a cryptography object
:raises glance.common.exception.SignatureVerificationError: if the
hash method name is invalid
"""
if hash_method_name not in HASH_METHODS:
raise exception.SignatureVerificationError(
_('Invalid signature hash method: %s') % hash_method_name
)
return HASH_METHODS[hash_method_name]
def get_public_key(context, signature_certificate_uuid, signature_key_type):
"""Create the public key object from a retrieved certificate.
:param context: the user context for authentication
:param signature_certificate_uuid: the uuid to use to retrieve the
certificate
:param signature_key_type: a SignatureKeyType object
:returns: the public key cryptography object
:raises glance.common.exception.SignatureVerificationError: if public
key format is invalid
"""
certificate = get_certificate(context, signature_certificate_uuid)
# Note that this public key could either be
# RSAPublicKey, DSAPublicKey, or EllipticCurvePublicKey
public_key = certificate.public_key()
# Confirm the type is of the type expected based on the signature key type
if not isinstance(public_key, signature_key_type.public_key_type):
raise exception.SignatureVerificationError(
_('Invalid public key type for signature key type: %s')
% signature_key_type
)
return public_key
def get_certificate(context, signature_certificate_uuid):
"""Create the certificate object from the retrieved certificate data.
:param context: the user context for authentication
:param signature_certificate_uuid: the uuid to use to retrieve the
certificate
:returns: the certificate cryptography object
:raises glance.common.exception.SignatureVerificationError: if the
retrieval fails or the format is invalid
"""
keymgr_api = key_manager.API()
try:
# The certificate retrieved here is a castellan certificate object
cert = keymgr_api.get(context, signature_certificate_uuid)
except Exception as e:
# The problem encountered may be backend-specific, since castellan
# can use different backends. Rather than importing all possible
# backends here, the generic "Exception" is used.
msg = (_LE("Unable to retrieve certificate with ID %(id)s: %(e)s")
% {'id': signature_certificate_uuid,
'e': encodeutils.exception_to_unicode(e)})
LOG.error(msg)
raise exception.SignatureVerificationError(
_('Unable to retrieve certificate with ID: %s')
% signature_certificate_uuid
)
if cert.format not in CERTIFICATE_FORMATS:
raise exception.SignatureVerificationError(
_('Invalid certificate format: %s') % cert.format
)
if cert.format == X_509:
# castellan always encodes certificates in DER format
cert_data = cert.get_encoded()
certificate = x509.load_der_x509_certificate(cert_data,
default_backend())
else:
raise exception.SignatureVerificationError(
_('Certificate format not supported: %s') % cert.format
)
# verify the certificate
verify_certificate(certificate)
return certificate
def verify_certificate(certificate):
"""Verify that the certificate has not expired.
:param certificate: the cryptography certificate object
:raises glance.common.exception.SignatureVerificationError: if the
certificate valid time range does not include now
"""
# Get now in UTC, since certificate returns times in UTC
now = datetime.datetime.utcnow()
# Confirm the certificate valid time range includes now
if now < certificate.not_valid_before:
raise exception.SignatureVerificationError(
_('Certificate is not valid before: %s UTC')
% certificate.not_valid_before
)
elif now > certificate.not_valid_after:
raise exception.SignatureVerificationError(
_('Certificate is not valid after: %s UTC')
% certificate.not_valid_after
)

View File

@ -17,6 +17,8 @@ import collections
import copy
from cryptography import exceptions as crypto_exception
from cursive import exception as cursive_exception
from cursive import signature_utils
import glance_store as store
from oslo_config import cfg
from oslo_log import log as logging
@ -24,7 +26,6 @@ from oslo_utils import encodeutils
from oslo_utils import excutils
from glance.common import exception
from glance.common import signature_utils
from glance.common import utils
import glance.domain.proxy
from glance.i18n import _, _LE, _LI, _LW
@ -411,12 +412,18 @@ class ImageProxy(glance.domain.proxy.Image):
# Create the verifier for signature verification (if correct properties
# are present)
if (signature_utils.should_create_verifier(
self.image.extra_properties)):
extra_props = self.image.extra_properties
if (signature_utils.should_create_verifier(extra_props)):
# NOTE(bpoulos): if creating verifier fails, exception will be
# raised
img_signature = extra_props[signature_utils.SIGNATURE]
hash_method = extra_props[signature_utils.HASH_METHOD]
key_type = extra_props[signature_utils.KEY_TYPE]
cert_uuid = extra_props[signature_utils.CERT_UUID]
verifier = signature_utils.get_verifier(
self.context, self.image.extra_properties)
self.context, cert_uuid, hash_method,
img_signature, key_type
)
else:
verifier = None
@ -436,7 +443,7 @@ class ImageProxy(glance.domain.proxy.Image):
LOG.info(_LI("Successfully verified signature for image %s"),
self.image.image_id)
except crypto_exception.InvalidSignature:
raise exception.SignatureVerificationError(
raise cursive_exception.SignatureVerificationError(
_('Signature verification failed')
)

View File

@ -1,416 +0,0 @@
# Copyright (c) The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import datetime
import mock
import unittest
from cryptography import exceptions as crypto_exception
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes
from glance.common import exception
from glance.common import signature_utils
from glance.tests import utils as test_utils
TEST_RSA_PRIVATE_KEY = rsa.generate_private_key(public_exponent=3,
key_size=1024,
backend=default_backend())
TEST_DSA_PRIVATE_KEY = dsa.generate_private_key(key_size=3072,
backend=default_backend())
# secp521r1 is assumed to be available on all supported platforms
TEST_ECC_PRIVATE_KEY = ec.generate_private_key(ec.SECP521R1(),
default_backend())
# Required image property names
(SIGNATURE, HASH_METHOD, KEY_TYPE, CERT_UUID) = (
signature_utils.SIGNATURE,
signature_utils.HASH_METHOD,
signature_utils.KEY_TYPE,
signature_utils.CERT_UUID
)
class FakeKeyManager(object):
def __init__(self):
self.certs = {'invalid_format_cert':
FakeCastellanCertificate('A' * 256, 'BLAH'),
'valid_format_cert':
FakeCastellanCertificate('A' * 256, 'X.509')}
def get(self, context, cert_uuid):
cert = self.certs.get(cert_uuid)
if cert is None:
raise Exception("No matching certificate found.")
return cert
class FakeCastellanCertificate(object):
def __init__(self, data, cert_format):
self.data = data
self.cert_format = cert_format
@property
def format(self):
return self.cert_format
def get_encoded(self):
return self.data
class FakeCryptoCertificate(object):
def __init__(self, pub_key=TEST_RSA_PRIVATE_KEY.public_key(),
not_valid_before=(datetime.datetime.utcnow() -
datetime.timedelta(hours=1)),
not_valid_after=(datetime.datetime.utcnow() +
datetime.timedelta(hours=1))):
self.pub_key = pub_key
self.cert_not_valid_before = not_valid_before
self.cert_not_valid_after = not_valid_after
def public_key(self):
return self.pub_key
@property
def not_valid_before(self):
return self.cert_not_valid_before
@property
def not_valid_after(self):
return self.cert_not_valid_after
class BadPublicKey(object):
def verifier(self, signature, padding, hash_method):
return None
class TestSignatureUtils(test_utils.BaseTestCase):
"""Test methods of signature_utils"""
def test_should_create_verifier(self):
image_props = {CERT_UUID: 'CERT_UUID',
HASH_METHOD: 'HASH_METHOD',
SIGNATURE: 'SIGNATURE',
KEY_TYPE: 'SIG_KEY_TYPE'}
self.assertTrue(signature_utils.should_create_verifier(image_props))
def test_should_create_verifier_fail(self):
bad_image_properties = [{CERT_UUID: 'CERT_UUID',
HASH_METHOD: 'HASH_METHOD',
SIGNATURE: 'SIGNATURE'},
{CERT_UUID: 'CERT_UUID',
HASH_METHOD: 'HASH_METHOD',
KEY_TYPE: 'SIG_KEY_TYPE'},
{CERT_UUID: 'CERT_UUID',
SIGNATURE: 'SIGNATURE',
KEY_TYPE: 'SIG_KEY_TYPE'},
{HASH_METHOD: 'HASH_METHOD',
SIGNATURE: 'SIGNATURE',
KEY_TYPE: 'SIG_KEY_TYPE'}]
for bad_props in bad_image_properties:
result = signature_utils.should_create_verifier(bad_props)
self.assertFalse(result)
@unittest.skipIf(not default_backend().hash_supported(hashes.SHA256()),
"SHA-2 hash algorithms not supported by backend")
@mock.patch('glance.common.signature_utils.get_public_key')
def test_verify_signature_PSS(self, mock_get_pub_key):
data = b'224626ae19824466f2a7f39ab7b80f7f'
mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key()
for hash_name, hash_alg in signature_utils.HASH_METHODS.items():
signer = TEST_RSA_PRIVATE_KEY.signer(
padding.PSS(
mgf=padding.MGF1(hash_alg),
salt_length=padding.PSS.MAX_LENGTH
),
hash_alg
)
signer.update(data)
signature = base64.b64encode(signer.finalize())
image_props = {CERT_UUID:
'fea14bc2-d75f-4ba5-bccc-b5c924ad0693',
HASH_METHOD: hash_name,
KEY_TYPE: 'RSA-PSS',
SIGNATURE: signature}
verifier = signature_utils.get_verifier(None, image_props)
verifier.update(data)
verifier.verify()
@mock.patch('glance.common.signature_utils.get_public_key')
def test_verify_signature_ECC(self, mock_get_pub_key):
data = b'224626ae19824466f2a7f39ab7b80f7f'
# test every ECC curve
for curve in signature_utils.ECC_CURVES:
key_type_name = 'ECC_' + curve.name.upper()
try:
signature_utils.SignatureKeyType.lookup(key_type_name)
except exception.SignatureVerificationError:
import warnings
warnings.warn("ECC curve '%s' not supported" % curve.name)
continue
# Create a private key to use
private_key = ec.generate_private_key(curve,
default_backend())
mock_get_pub_key.return_value = private_key.public_key()
for hash_name, hash_alg in signature_utils.HASH_METHODS.items():
signer = private_key.signer(
ec.ECDSA(hash_alg)
)
signer.update(data)
signature = base64.b64encode(signer.finalize())
image_props = {CERT_UUID:
'fea14bc2-d75f-4ba5-bccc-b5c924ad0693',
HASH_METHOD: hash_name,
KEY_TYPE: key_type_name,
SIGNATURE: signature}
verifier = signature_utils.get_verifier(None, image_props)
verifier.update(data)
verifier.verify()
@mock.patch('glance.common.signature_utils.get_public_key')
def test_verify_signature_DSA(self, mock_get_pub_key):
data = b'224626ae19824466f2a7f39ab7b80f7f'
mock_get_pub_key.return_value = TEST_DSA_PRIVATE_KEY.public_key()
for hash_name, hash_alg in signature_utils.HASH_METHODS.items():
signer = TEST_DSA_PRIVATE_KEY.signer(
hash_alg
)
signer.update(data)
signature = base64.b64encode(signer.finalize())
image_props = {CERT_UUID:
'fea14bc2-d75f-4ba5-bccc-b5c924ad0693',
HASH_METHOD: hash_name,
KEY_TYPE: 'DSA',
SIGNATURE: signature}
verifier = signature_utils.get_verifier(None, image_props)
verifier.update(data)
verifier.verify()
@unittest.skipIf(not default_backend().hash_supported(hashes.SHA256()),
"SHA-2 hash algorithms not supported by backend")
@mock.patch('glance.common.signature_utils.get_public_key')
def test_verify_signature_bad_signature(self, mock_get_pub_key):
data = b'224626ae19824466f2a7f39ab7b80f7f'
mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key()
image_properties = {CERT_UUID:
'fea14bc2-d75f-4ba5-bccc-b5c924ad0693',
HASH_METHOD: 'SHA-256',
KEY_TYPE: 'RSA-PSS',
SIGNATURE: 'BLAH'}
verifier = signature_utils.get_verifier(None, image_properties)
verifier.update(data)
self.assertRaises(crypto_exception.InvalidSignature,
verifier.verify)
@mock.patch('glance.common.signature_utils.get_public_key')
def test_verify_signature_unsupported_algorithm(self,
mock_get_pub_key):
public_key = TEST_RSA_PRIVATE_KEY.public_key()
public_key.verifier = mock.MagicMock(
side_effect=crypto_exception.UnsupportedAlgorithm(
"When OpenSSL is older than 1.0.1 then only SHA1 is "
"supported with MGF1.",
crypto_exception._Reasons.UNSUPPORTED_HASH))
mock_get_pub_key.return_value = public_key
image_properties = {CERT_UUID:
'fea14bc2-d75f-4ba5-bccc-b5c924ad0693',
HASH_METHOD: 'SHA-256',
KEY_TYPE: 'RSA-PSS',
SIGNATURE: 'BLAH'}
self.assertRaisesRegexp(exception.SignatureVerificationError,
'Unable to verify signature since the '
'algorithm is unsupported on this system',
signature_utils.get_verifier,
None, image_properties)
@mock.patch('glance.common.signature_utils.should_create_verifier')
def test_verify_signature_invalid_image_props(self, mock_should):
mock_should.return_value = False
self.assertRaisesRegexp(exception.SignatureVerificationError,
'Required image properties for signature'
' verification do not exist. Cannot verify'
' signature.',
signature_utils.get_verifier,
None, None)
@mock.patch('glance.common.signature_utils.get_public_key')
def test_verify_signature_bad_sig_key_type(self, mock_get_pub_key):
mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key()
image_properties = {CERT_UUID:
'fea14bc2-d75f-4ba5-bccc-b5c924ad0693',
HASH_METHOD: 'SHA-256',
KEY_TYPE: 'BLAH',
SIGNATURE: 'BLAH'}
self.assertRaisesRegexp(exception.SignatureVerificationError,
'Invalid signature key type: .*',
signature_utils.get_verifier,
None, image_properties)
@mock.patch('glance.common.signature_utils.get_public_key')
def test_get_verifier_none(self, mock_get_pub_key):
mock_get_pub_key.return_value = BadPublicKey()
image_properties = {CERT_UUID:
'fea14bc2-d75f-4ba5-bccc-b5c924ad0693',
HASH_METHOD: 'SHA-256',
KEY_TYPE: 'RSA-PSS',
SIGNATURE: 'BLAH'}
self.assertRaisesRegexp(exception.SignatureVerificationError,
'Error occurred while creating'
' the verifier',
signature_utils.get_verifier,
None, image_properties)
def test_get_signature(self):
signature = b'A' * 256
data = base64.b64encode(signature)
self.assertEqual(signature,
signature_utils.get_signature(data))
def test_get_signature_fail(self):
self.assertRaisesRegex(exception.SignatureVerificationError,
'The signature data was not properly'
' encoded using base64',
signature_utils.get_signature, '///')
def test_get_hash_method(self):
hash_dict = signature_utils.HASH_METHODS
for hash_name in hash_dict.keys():
hash_class = signature_utils.get_hash_method(hash_name).__class__
self.assertIsInstance(hash_dict[hash_name], hash_class)
def test_get_hash_method_fail(self):
self.assertRaisesRegex(exception.SignatureVerificationError,
'Invalid signature hash method: .*',
signature_utils.get_hash_method, 'SHA-2')
def test_get_signature_key_type_lookup(self):
for sig_format in ['RSA-PSS', 'ECC_SECT571K1']:
sig_key_type = signature_utils.SignatureKeyType.lookup(sig_format)
self.assertIsInstance(sig_key_type,
signature_utils.SignatureKeyType)
self.assertEqual(sig_format, sig_key_type.name)
def test_signature_key_type_lookup_fail(self):
self.assertRaisesRegex(exception.SignatureVerificationError,
'Invalid signature key type: .*',
signature_utils.SignatureKeyType.lookup,
'RSB-PSS')
@mock.patch('glance.common.signature_utils.get_certificate')
def test_get_public_key_rsa(self, mock_get_cert):
fake_cert = FakeCryptoCertificate()
mock_get_cert.return_value = fake_cert
sig_key_type = signature_utils.SignatureKeyType.lookup('RSA-PSS')
result_pub_key = signature_utils.get_public_key(None, None,
sig_key_type)
self.assertEqual(fake_cert.public_key(), result_pub_key)
@mock.patch('glance.common.signature_utils.get_certificate')
def test_get_public_key_ecc(self, mock_get_cert):
fake_cert = FakeCryptoCertificate(TEST_ECC_PRIVATE_KEY.public_key())
mock_get_cert.return_value = fake_cert
sig_key_type = signature_utils.SignatureKeyType.lookup('ECC_SECP521R1')
result_pub_key = signature_utils.get_public_key(None, None,
sig_key_type)
self.assertEqual(fake_cert.public_key(), result_pub_key)
@mock.patch('glance.common.signature_utils.get_certificate')
def test_get_public_key_dsa(self, mock_get_cert):
fake_cert = FakeCryptoCertificate(TEST_DSA_PRIVATE_KEY.public_key())
mock_get_cert.return_value = fake_cert
sig_key_type = signature_utils.SignatureKeyType.lookup('DSA')
result_pub_key = signature_utils.get_public_key(None, None,
sig_key_type)
self.assertEqual(fake_cert.public_key(), result_pub_key)
@mock.patch('glance.common.signature_utils.get_certificate')
def test_get_public_key_invalid_key(self, mock_get_certificate):
bad_pub_key = 'A' * 256
mock_get_certificate.return_value = FakeCryptoCertificate(bad_pub_key)
sig_key_type = signature_utils.SignatureKeyType.lookup('RSA-PSS')
self.assertRaisesRegex(exception.SignatureVerificationError,
'Invalid public key type for '
'signature key type: .*',
signature_utils.get_public_key, None,
None, sig_key_type)
@mock.patch('cryptography.x509.load_der_x509_certificate')
@mock.patch('castellan.key_manager.API', return_value=FakeKeyManager())
def test_get_certificate(self, mock_key_manager_API, mock_load_cert):
cert_uuid = 'valid_format_cert'
x509_cert = FakeCryptoCertificate()
mock_load_cert.return_value = x509_cert
self.assertEqual(x509_cert,
signature_utils.get_certificate(None, cert_uuid))
@mock.patch('cryptography.x509.load_der_x509_certificate')
@mock.patch('castellan.key_manager.API', return_value=FakeKeyManager())
def test_get_expired_certificate(self, mock_key_manager_API,
mock_load_cert):
cert_uuid = 'valid_format_cert'
x509_cert = FakeCryptoCertificate(
not_valid_after=datetime.datetime.utcnow() -
datetime.timedelta(hours=1))
mock_load_cert.return_value = x509_cert
self.assertRaisesRegex(exception.SignatureVerificationError,
'Certificate is not valid after: .*',
signature_utils.get_certificate, None,
cert_uuid)
@mock.patch('cryptography.x509.load_der_x509_certificate')
@mock.patch('castellan.key_manager.API', return_value=FakeKeyManager())
def test_get_not_yet_valid_certificate(self, mock_key_manager_API,
mock_load_cert):
cert_uuid = 'valid_format_cert'
x509_cert = FakeCryptoCertificate(
not_valid_before=datetime.datetime.utcnow() +
datetime.timedelta(hours=1))
mock_load_cert.return_value = x509_cert
self.assertRaisesRegex(exception.SignatureVerificationError,
'Certificate is not valid before: .*',
signature_utils.get_certificate, None,
cert_uuid)
@mock.patch('castellan.key_manager.API', return_value=FakeKeyManager())
def test_get_certificate_key_manager_fail(self, mock_key_manager_API):
bad_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0695'
self.assertRaisesRegex(exception.SignatureVerificationError,
'Unable to retrieve certificate with ID: .*',
signature_utils.get_certificate, None,
bad_cert_uuid)
@mock.patch('castellan.key_manager.API', return_value=FakeKeyManager())
def test_get_certificate_invalid_format(self, mock_API):
cert_uuid = 'invalid_format_cert'
self.assertRaisesRegex(exception.SignatureVerificationError,
'Invalid certificate format: .*',
signature_utils.get_certificate, None,
cert_uuid)

View File

@ -12,11 +12,12 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cursive import exception as cursive_exception
from cursive import signature_utils
import glance_store
import mock
from glance.common import exception
from glance.common import signature_utils
import glance.location
from glance.tests.unit import base as unit_test_base
from glance.tests.unit import utils as unit_test_utils
@ -223,7 +224,7 @@ class TestStoreImage(utils.BaseTestCase):
unit_test_utils.fake_get_verifier)
image = glance.location.ImageProxy(image_stub, context,
self.store_api, self.store_utils)
self.assertRaises(exception.SignatureVerificationError,
self.assertRaises(cursive_exception.SignatureVerificationError,
image.set_data,
'YYYY', 4)

View File

@ -85,10 +85,10 @@ def fake_get_size_from_backend(uri, context=None):
return 1
def fake_get_verifier(context, image_properties):
def fake_get_verifier(context, cert_uuid, hash_method,
img_signature, key_type):
verifier = mock.Mock()
if (image_properties is not None and 'img_signature' in image_properties
and image_properties['img_signature'] == 'VALID'):
if (img_signature is not None and img_signature == 'VALID'):
verifier.verify.return_value = None
else:
ex = crypto_exception.InvalidSignature()

View File

@ -14,6 +14,7 @@
# under the License.
import uuid
from cursive import exception as cursive_exception
import glance_store
import mock
import six
@ -286,7 +287,7 @@ class TestImagesController(base.StoreClearingUnitTest):
def test_upload_signature_verification_fails(self):
request = unit_test_utils.get_fake_request()
image = FakeImage()
image.set_data = Raise(exception.SignatureVerificationError)
image.set_data = Raise(cursive_exception.SignatureVerificationError)
self.image_repo.result = image
self.assertRaises(webob.exc.HTTPBadRequest, self.controller.upload,
request, unit_test_utils.UUID1, 'YYYY', 4)

View File

@ -0,0 +1,16 @@
---
prelude: >
Glance and Nova contain nearly identical digital signature modules. In
order to better maintain and evolve this code and to eliminate
the possibility that the modules diverge, we propose removing this code
and instead using the new cursive library. Please read the other section
for more details.
other:
- The cursive library is an OpenStack project which implements
OpenStack-specific verification of digital signatures. In Newton, the
majority of the signature verification code was removed from Glance.
Cursive has been added to Glance as a dependency and will be installed by
default. Glance uses the Cursive library's functionality to verify digital
signatures. To familiarize yourself with this new dependency and see the
list of transitive dependencies visit
https://github.com/openstack/cursive

View File

@ -54,9 +54,9 @@ glance-store>=0.16.0 # Apache-2.0
# Artifact repository
semantic-version>=2.3.1 # BSD
castellan>=0.4.0 # Apache-2.0
cryptography!=1.3.0,>=1.0 # BSD/Apache-2.0
debtcollector>=1.2.0 # Apache-2.0
cryptography!=1.3.0,>=1.0 # BSD/Apache-2.0
cursive>=0.1.1 # Apache-2.0
# timeutils
iso8601>=0.1.11 # MIT