Add signature_utils module
This change ports Nova's signature_utils module into the cursive library. Change-Id: Ic54dc204e41b3758bc2e8e1571d697931b371889 Partial-Bug: #1528349
This commit is contained in:
parent
016cabb018
commit
b2aba64263
|
@ -0,0 +1,52 @@
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""Cursive base exception handling"""
|
||||||
|
|
||||||
|
from cursive.i18n import _
|
||||||
|
|
||||||
|
|
||||||
|
class CursiveException(Exception):
|
||||||
|
"""Base Cursive Exception
|
||||||
|
|
||||||
|
To correctly use this class, inherit from it and define
|
||||||
|
a 'msg_fmt' property. That msg_fmt will get printf'd
|
||||||
|
with the keyword arguments provided to the constructor.
|
||||||
|
|
||||||
|
"""
|
||||||
|
msg_fmt = _("An unknown exception occurred.")
|
||||||
|
headers = {}
|
||||||
|
safe = False
|
||||||
|
|
||||||
|
def __init__(self, message=None, **kwargs):
|
||||||
|
self.kwargs = kwargs
|
||||||
|
|
||||||
|
if not message:
|
||||||
|
try:
|
||||||
|
message = self.msg_fmt % kwargs
|
||||||
|
|
||||||
|
except Exception:
|
||||||
|
# at least get the core message out if something happened
|
||||||
|
message = self.msg_fmt
|
||||||
|
|
||||||
|
self.message = message
|
||||||
|
super(CursiveException, self).__init__(message)
|
||||||
|
|
||||||
|
def format_message(self):
|
||||||
|
# NOTE(dane-fichter): use the first argument to the python Exception
|
||||||
|
# object which should be our full CursiveException message
|
||||||
|
return self.args[0]
|
||||||
|
|
||||||
|
|
||||||
|
class SignatureVerificationError(CursiveException):
|
||||||
|
msg_fmt = _("Signature verification for the image "
|
||||||
|
"failed: %(reason)s.")
|
|
@ -0,0 +1,36 @@
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""oslo.i18n integration module.
|
||||||
|
|
||||||
|
See http://docs.openstack.org/developer/oslo.i18n/usage.html .
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
import oslo_i18n
|
||||||
|
|
||||||
|
DOMAIN = 'cursive'
|
||||||
|
|
||||||
|
_translators = oslo_i18n.TranslatorFactory(domain=DOMAIN)
|
||||||
|
|
||||||
|
# The primary translation function using the well-known name "_"
|
||||||
|
_ = _translators.primary
|
||||||
|
|
||||||
|
# Translators for log levels.
|
||||||
|
#
|
||||||
|
# The abbreviated names are meant to reflect the usual use of a short
|
||||||
|
# name like '_'. The "L" is for "log" and the other letter comes from
|
||||||
|
# the level.
|
||||||
|
_LI = _translators.log_info
|
||||||
|
_LW = _translators.log_warning
|
||||||
|
_LE = _translators.log_error
|
||||||
|
_LC = _translators.log_critical
|
|
@ -0,0 +1,339 @@
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""Support signature verification."""
|
||||||
|
|
||||||
|
import binascii
|
||||||
|
|
||||||
|
from castellan.common.exception import KeyManagerError
|
||||||
|
from castellan import key_manager
|
||||||
|
from cryptography.hazmat.backends import default_backend
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import dsa
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import ec
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import padding
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||||
|
from cryptography.hazmat.primitives import hashes
|
||||||
|
from cryptography import x509
|
||||||
|
from oslo_log import log as logging
|
||||||
|
from oslo_serialization import base64
|
||||||
|
from oslo_utils import encodeutils
|
||||||
|
from oslo_utils import timeutils
|
||||||
|
|
||||||
|
from cursive import exception
|
||||||
|
from cursive.i18n import _, _LE
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
HASH_METHODS = {
|
||||||
|
'SHA-224': hashes.SHA224(),
|
||||||
|
'SHA-256': hashes.SHA256(),
|
||||||
|
'SHA-384': hashes.SHA384(),
|
||||||
|
'SHA-512': hashes.SHA512(),
|
||||||
|
}
|
||||||
|
|
||||||
|
# Currently supported signature key types
|
||||||
|
# RSA Options
|
||||||
|
RSA_PSS = 'RSA-PSS'
|
||||||
|
# DSA Options
|
||||||
|
DSA = 'DSA'
|
||||||
|
|
||||||
|
# ECC curves -- note that only those with key sizes >=384 are included
|
||||||
|
# Note also that some of these may not be supported by the cryptography backend
|
||||||
|
ECC_CURVES = (
|
||||||
|
ec.SECT571K1(),
|
||||||
|
ec.SECT409K1(),
|
||||||
|
ec.SECT571R1(),
|
||||||
|
ec.SECT409R1(),
|
||||||
|
ec.SECP521R1(),
|
||||||
|
ec.SECP384R1(),
|
||||||
|
)
|
||||||
|
|
||||||
|
# These are the currently supported certificate formats
|
||||||
|
X_509 = 'X.509'
|
||||||
|
|
||||||
|
CERTIFICATE_FORMATS = {
|
||||||
|
X_509,
|
||||||
|
}
|
||||||
|
|
||||||
|
# These are the currently supported MGF formats, used for RSA-PSS signatures
|
||||||
|
MASK_GEN_ALGORITHMS = {
|
||||||
|
'MGF1': padding.MGF1,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class SignatureKeyType(object):
|
||||||
|
|
||||||
|
_REGISTERED_TYPES = {}
|
||||||
|
|
||||||
|
def __init__(self, name, public_key_type, create_verifier):
|
||||||
|
self.name = name
|
||||||
|
self.public_key_type = public_key_type
|
||||||
|
self.create_verifier = create_verifier
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def register(cls, name, public_key_type, create_verifier):
|
||||||
|
"""Register a signature key type.
|
||||||
|
|
||||||
|
:param name: the name of the signature key type
|
||||||
|
:param public_key_type: e.g. RSAPublicKey, DSAPublicKey, etc.
|
||||||
|
:param create_verifier: a function to create a verifier for this type
|
||||||
|
"""
|
||||||
|
cls._REGISTERED_TYPES[name] = cls(name,
|
||||||
|
public_key_type,
|
||||||
|
create_verifier)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def lookup(cls, name):
|
||||||
|
"""Look up the signature key type.
|
||||||
|
|
||||||
|
:param name: the name of the signature key type
|
||||||
|
:returns: the SignatureKeyType object
|
||||||
|
:raises: SignatureVerificationError if signature key type is invalid
|
||||||
|
"""
|
||||||
|
if name not in cls._REGISTERED_TYPES:
|
||||||
|
raise exception.SignatureVerificationError(
|
||||||
|
reason=_('Invalid signature key type: %s') % name)
|
||||||
|
return cls._REGISTERED_TYPES[name]
|
||||||
|
|
||||||
|
|
||||||
|
# each key type will require its own verifier
|
||||||
|
def create_verifier_for_pss(signature, hash_method, public_key):
|
||||||
|
"""Create the verifier to use when the key type is RSA-PSS.
|
||||||
|
|
||||||
|
:param signature: the decoded signature to use
|
||||||
|
:param hash_method: the hash method to use, as a cryptography object
|
||||||
|
:param public_key: the public key to use, as a cryptography object
|
||||||
|
:raises: SignatureVerificationError if the RSA-PSS specific properties
|
||||||
|
are invalid
|
||||||
|
:returns: the verifier to use to verify the signature for RSA-PSS
|
||||||
|
"""
|
||||||
|
# default to MGF1
|
||||||
|
mgf = padding.MGF1(hash_method)
|
||||||
|
|
||||||
|
# default to max salt length
|
||||||
|
salt_length = padding.PSS.MAX_LENGTH
|
||||||
|
|
||||||
|
# return the verifier
|
||||||
|
return public_key.verifier(
|
||||||
|
signature,
|
||||||
|
padding.PSS(mgf=mgf, salt_length=salt_length),
|
||||||
|
hash_method
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def create_verifier_for_ecc(signature, hash_method, public_key):
|
||||||
|
"""Create the verifier to use when the key type is ECC_*.
|
||||||
|
|
||||||
|
:param signature: the decoded signature to use
|
||||||
|
:param hash_method: the hash method to use, as a cryptography object
|
||||||
|
:param public_key: the public key to use, as a cryptography object
|
||||||
|
:returns: the verifier to use to verify the signature for ECC_*.
|
||||||
|
"""
|
||||||
|
# return the verifier
|
||||||
|
return public_key.verifier(
|
||||||
|
signature,
|
||||||
|
ec.ECDSA(hash_method)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def create_verifier_for_dsa(signature, hash_method, public_key):
|
||||||
|
"""Create the verifier to use when the key type is DSA
|
||||||
|
|
||||||
|
:param signature: the decoded signature to use
|
||||||
|
:param hash_method: the hash method to use, as a cryptography object
|
||||||
|
:param public_key: the public key to use, as a cryptography object
|
||||||
|
:returns: the verifier to use to verify the signature for DSA
|
||||||
|
"""
|
||||||
|
# return the verifier
|
||||||
|
return public_key.verifier(
|
||||||
|
signature,
|
||||||
|
hash_method
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
SignatureKeyType.register(RSA_PSS, rsa.RSAPublicKey, create_verifier_for_pss)
|
||||||
|
SignatureKeyType.register(DSA, dsa.DSAPublicKey, create_verifier_for_dsa)
|
||||||
|
|
||||||
|
# Register the elliptic curves which are supported by the backend
|
||||||
|
for curve in ECC_CURVES:
|
||||||
|
if default_backend().elliptic_curve_supported(curve):
|
||||||
|
SignatureKeyType.register('ECC_' + curve.name.upper(),
|
||||||
|
ec.EllipticCurvePublicKey,
|
||||||
|
create_verifier_for_ecc)
|
||||||
|
|
||||||
|
|
||||||
|
def get_verifier(context, img_signature_certificate_uuid,
|
||||||
|
img_signature_hash_method, img_signature,
|
||||||
|
img_signature_key_type):
|
||||||
|
"""Instantiate signature properties and use them to create a verifier.
|
||||||
|
|
||||||
|
:param context: the user context for authentication
|
||||||
|
:param img_signature_certificate_uuid:
|
||||||
|
uuid of signing certificate stored in key manager
|
||||||
|
:param img_signature_hash_method:
|
||||||
|
string denoting hash method used to compute signature
|
||||||
|
:param img_signature: string of base64 encoding of signature
|
||||||
|
:param img_signature_key_type:
|
||||||
|
string denoting type of keypair used to compute signature
|
||||||
|
:returns: instance of
|
||||||
|
cryptography.hazmat.primitives.asymmetric.AsymmetricVerificationContext
|
||||||
|
:raises: SignatureVerificationError if we fail to build the verifier
|
||||||
|
"""
|
||||||
|
image_meta_props = {'img_signature_uuid': img_signature_certificate_uuid,
|
||||||
|
'img_signature_hash_method': img_signature_hash_method,
|
||||||
|
'img_signature': img_signature,
|
||||||
|
'img_signature_key_type': img_signature_key_type}
|
||||||
|
for key in image_meta_props.keys():
|
||||||
|
if image_meta_props[key] is None:
|
||||||
|
raise exception.SignatureVerificationError(
|
||||||
|
reason=_('Required image properties for signature verification'
|
||||||
|
' do not exist. Cannot verify signature. Missing'
|
||||||
|
' property: %s') % key)
|
||||||
|
|
||||||
|
signature = get_signature(img_signature)
|
||||||
|
hash_method = get_hash_method(img_signature_hash_method)
|
||||||
|
signature_key_type = SignatureKeyType.lookup(img_signature_key_type)
|
||||||
|
public_key = get_public_key(context,
|
||||||
|
img_signature_certificate_uuid,
|
||||||
|
signature_key_type)
|
||||||
|
|
||||||
|
# create the verifier based on the signature key type
|
||||||
|
verifier = signature_key_type.create_verifier(signature,
|
||||||
|
hash_method,
|
||||||
|
public_key)
|
||||||
|
if verifier:
|
||||||
|
return verifier
|
||||||
|
else:
|
||||||
|
# Error creating the verifier
|
||||||
|
raise exception.SignatureVerificationError(
|
||||||
|
reason=_('Error occurred while creating the verifier'))
|
||||||
|
|
||||||
|
|
||||||
|
def get_signature(signature_data):
|
||||||
|
"""Decode the signature data and returns the signature.
|
||||||
|
|
||||||
|
:param signature_data: the base64-encoded signature data
|
||||||
|
:returns: the decoded signature
|
||||||
|
:raises: SignatureVerificationError if the signature data is malformatted
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
signature = base64.decode_as_bytes(signature_data)
|
||||||
|
except (TypeError, binascii.Error):
|
||||||
|
raise exception.SignatureVerificationError(
|
||||||
|
reason=_('The signature data was not properly '
|
||||||
|
'encoded using base64'))
|
||||||
|
|
||||||
|
return signature
|
||||||
|
|
||||||
|
|
||||||
|
def get_hash_method(hash_method_name):
|
||||||
|
"""Verify the hash method name and create the hash method.
|
||||||
|
|
||||||
|
:param hash_method_name: the name of the hash method to retrieve
|
||||||
|
:returns: the hash method, a cryptography object
|
||||||
|
:raises: SignatureVerificationError if the hash method name is invalid
|
||||||
|
"""
|
||||||
|
if hash_method_name not in HASH_METHODS:
|
||||||
|
raise exception.SignatureVerificationError(
|
||||||
|
reason=_('Invalid signature hash method: %s') % hash_method_name)
|
||||||
|
|
||||||
|
return HASH_METHODS[hash_method_name]
|
||||||
|
|
||||||
|
|
||||||
|
def get_public_key(context, signature_certificate_uuid, signature_key_type):
|
||||||
|
"""Create the public key object from a retrieved certificate.
|
||||||
|
|
||||||
|
:param context: the user context for authentication
|
||||||
|
:param signature_certificate_uuid: the uuid to use to retrieve the
|
||||||
|
certificate
|
||||||
|
:param signature_key_type: a SignatureKeyType object
|
||||||
|
:returns: the public key cryptography object
|
||||||
|
:raises: SignatureVerificationError if public key format is invalid
|
||||||
|
"""
|
||||||
|
certificate = get_certificate(context, signature_certificate_uuid)
|
||||||
|
|
||||||
|
# Note that this public key could either be
|
||||||
|
# RSAPublicKey, DSAPublicKey, or EllipticCurvePublicKey
|
||||||
|
public_key = certificate.public_key()
|
||||||
|
|
||||||
|
# Confirm the type is of the type expected based on the signature key type
|
||||||
|
if not isinstance(public_key, signature_key_type.public_key_type):
|
||||||
|
raise exception.SignatureVerificationError(
|
||||||
|
reason=_('Invalid public key type for signature key type: %s')
|
||||||
|
% signature_key_type.name)
|
||||||
|
|
||||||
|
return public_key
|
||||||
|
|
||||||
|
|
||||||
|
def get_certificate(context, signature_certificate_uuid):
|
||||||
|
"""Create the certificate object from the retrieved certificate data.
|
||||||
|
|
||||||
|
:param context: the user context for authentication
|
||||||
|
:param signature_certificate_uuid: the uuid to use to retrieve the
|
||||||
|
certificate
|
||||||
|
:returns: the certificate cryptography object
|
||||||
|
:raises: SignatureVerificationError if the retrieval fails or the format
|
||||||
|
is invalid
|
||||||
|
"""
|
||||||
|
keymgr_api = key_manager.API()
|
||||||
|
|
||||||
|
try:
|
||||||
|
# The certificate retrieved here is a castellan certificate object
|
||||||
|
cert = keymgr_api.get(context, signature_certificate_uuid)
|
||||||
|
except KeyManagerError as e:
|
||||||
|
# The problem encountered may be backend-specific, since castellan
|
||||||
|
# can use different backends. Rather than importing all possible
|
||||||
|
# backends here, the generic "Exception" is used.
|
||||||
|
msg = (_LE("Unable to retrieve certificate with ID %(id)s: %(e)s")
|
||||||
|
% {'id': signature_certificate_uuid,
|
||||||
|
'e': encodeutils.exception_to_unicode(e)})
|
||||||
|
LOG.error(msg)
|
||||||
|
raise exception.SignatureVerificationError(
|
||||||
|
reason=_('Unable to retrieve certificate with ID: %s')
|
||||||
|
% signature_certificate_uuid)
|
||||||
|
|
||||||
|
if cert.format not in CERTIFICATE_FORMATS:
|
||||||
|
raise exception.SignatureVerificationError(
|
||||||
|
reason=_('Invalid certificate format: %s') % cert.format)
|
||||||
|
|
||||||
|
if cert.format == X_509:
|
||||||
|
# castellan always encodes certificates in DER format
|
||||||
|
cert_data = cert.get_encoded()
|
||||||
|
certificate = x509.load_der_x509_certificate(cert_data,
|
||||||
|
default_backend())
|
||||||
|
|
||||||
|
# verify the certificate
|
||||||
|
verify_certificate(certificate)
|
||||||
|
|
||||||
|
return certificate
|
||||||
|
|
||||||
|
|
||||||
|
def verify_certificate(certificate):
|
||||||
|
"""Verify that the certificate has not expired.
|
||||||
|
|
||||||
|
:param certificate: the cryptography certificate object
|
||||||
|
:raises: SignatureVerificationError if the certificate valid time range
|
||||||
|
does not include now
|
||||||
|
"""
|
||||||
|
# Get now in UTC, since certificate returns times in UTC
|
||||||
|
now = timeutils.utcnow()
|
||||||
|
|
||||||
|
# Confirm the certificate valid time range includes now
|
||||||
|
if now < certificate.not_valid_before:
|
||||||
|
raise exception.SignatureVerificationError(
|
||||||
|
reason=_('Certificate is not valid before: %s UTC')
|
||||||
|
% certificate.not_valid_before)
|
||||||
|
elif now > certificate.not_valid_after:
|
||||||
|
raise exception.SignatureVerificationError(
|
||||||
|
reason=_('Certificate is not valid after: %s UTC')
|
||||||
|
% certificate.not_valid_after)
|
|
@ -1,28 +0,0 @@
|
||||||
# -*- coding: utf-8 -*-
|
|
||||||
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
||||||
# not use this file except in compliance with the License. You may obtain
|
|
||||||
# a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
||||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
||||||
# License for the specific language governing permissions and limitations
|
|
||||||
# under the License.
|
|
||||||
|
|
||||||
"""
|
|
||||||
test_cursive
|
|
||||||
----------------------------------
|
|
||||||
|
|
||||||
Tests for `cursive` module.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from cursive.tests import base
|
|
||||||
|
|
||||||
|
|
||||||
class TestCursive(base.TestCase):
|
|
||||||
|
|
||||||
def test_something(self):
|
|
||||||
pass
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""
|
||||||
|
:mod:`cursive.tests.unit` -- Cursive Unittests
|
||||||
|
=====================================================
|
||||||
|
|
||||||
|
.. automodule:: cursive.tests.unit
|
||||||
|
:platform: Unix
|
||||||
|
"""
|
|
@ -0,0 +1,347 @@
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import base64
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
from castellan.common.exception import KeyManagerError
|
||||||
|
import cryptography.exceptions as crypto_exceptions
|
||||||
|
from cryptography.hazmat.backends import default_backend
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import dsa
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import ec
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import padding
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||||
|
import mock
|
||||||
|
from oslo_utils import timeutils
|
||||||
|
|
||||||
|
from cursive import exception
|
||||||
|
from cursive import signature_utils
|
||||||
|
from cursive.tests import base
|
||||||
|
|
||||||
|
TEST_RSA_PRIVATE_KEY = rsa.generate_private_key(public_exponent=3,
|
||||||
|
key_size=1024,
|
||||||
|
backend=default_backend())
|
||||||
|
|
||||||
|
# secp521r1 is assumed to be available on all supported platforms
|
||||||
|
TEST_ECC_PRIVATE_KEY = ec.generate_private_key(ec.SECP521R1(),
|
||||||
|
default_backend())
|
||||||
|
|
||||||
|
TEST_DSA_PRIVATE_KEY = dsa.generate_private_key(key_size=3072,
|
||||||
|
backend=default_backend())
|
||||||
|
|
||||||
|
|
||||||
|
class FakeKeyManager(object):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.certs = {'invalid_format_cert':
|
||||||
|
FakeCastellanCertificate('A' * 256, 'BLAH'),
|
||||||
|
'valid_format_cert':
|
||||||
|
FakeCastellanCertificate('A' * 256, 'X.509')}
|
||||||
|
|
||||||
|
def get(self, context, cert_uuid):
|
||||||
|
cert = self.certs.get(cert_uuid)
|
||||||
|
|
||||||
|
if cert is None:
|
||||||
|
raise KeyManagerError("No matching certificate found.")
|
||||||
|
|
||||||
|
return cert
|
||||||
|
|
||||||
|
|
||||||
|
class FakeCastellanCertificate(object):
|
||||||
|
|
||||||
|
def __init__(self, data, cert_format):
|
||||||
|
self.data = data
|
||||||
|
self.cert_format = cert_format
|
||||||
|
|
||||||
|
@property
|
||||||
|
def format(self):
|
||||||
|
return self.cert_format
|
||||||
|
|
||||||
|
def get_encoded(self):
|
||||||
|
return self.data
|
||||||
|
|
||||||
|
|
||||||
|
class FakeCryptoCertificate(object):
|
||||||
|
|
||||||
|
def __init__(self, pub_key=TEST_RSA_PRIVATE_KEY.public_key(),
|
||||||
|
not_valid_before=(timeutils.utcnow() -
|
||||||
|
datetime.timedelta(hours=1)),
|
||||||
|
not_valid_after=(timeutils.utcnow() +
|
||||||
|
datetime.timedelta(hours=2))):
|
||||||
|
self.pub_key = pub_key
|
||||||
|
self.cert_not_valid_before = not_valid_before
|
||||||
|
self.cert_not_valid_after = not_valid_after
|
||||||
|
|
||||||
|
def public_key(self):
|
||||||
|
return self.pub_key
|
||||||
|
|
||||||
|
@property
|
||||||
|
def not_valid_before(self):
|
||||||
|
return self.cert_not_valid_before
|
||||||
|
|
||||||
|
@property
|
||||||
|
def not_valid_after(self):
|
||||||
|
return self.cert_not_valid_after
|
||||||
|
|
||||||
|
|
||||||
|
class BadPublicKey(object):
|
||||||
|
|
||||||
|
def verifier(self, signature, padding, hash_method):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class TestSignatureUtils(base.TestCase):
|
||||||
|
"""Test methods of signature_utils"""
|
||||||
|
|
||||||
|
@mock.patch('cursive.signature_utils.get_public_key')
|
||||||
|
def test_verify_signature_PSS(self, mock_get_pub_key):
|
||||||
|
data = b'224626ae19824466f2a7f39ab7b80f7f'
|
||||||
|
mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key()
|
||||||
|
for hash_name, hash_alg in signature_utils.HASH_METHODS.items():
|
||||||
|
signer = TEST_RSA_PRIVATE_KEY.signer(
|
||||||
|
padding.PSS(
|
||||||
|
mgf=padding.MGF1(hash_alg),
|
||||||
|
salt_length=padding.PSS.MAX_LENGTH
|
||||||
|
),
|
||||||
|
hash_alg
|
||||||
|
)
|
||||||
|
signer.update(data)
|
||||||
|
signature = base64.b64encode(signer.finalize())
|
||||||
|
img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693'
|
||||||
|
verifier = signature_utils.get_verifier(None, img_sig_cert_uuid,
|
||||||
|
hash_name, signature,
|
||||||
|
signature_utils.RSA_PSS)
|
||||||
|
verifier.update(data)
|
||||||
|
verifier.verify()
|
||||||
|
|
||||||
|
@mock.patch('cursive.signature_utils.get_public_key')
|
||||||
|
def test_verify_signature_ECC(self, mock_get_pub_key):
|
||||||
|
data = b'224626ae19824466f2a7f39ab7b80f7f'
|
||||||
|
# test every ECC curve
|
||||||
|
for curve in signature_utils.ECC_CURVES:
|
||||||
|
key_type_name = 'ECC_' + curve.name.upper()
|
||||||
|
try:
|
||||||
|
signature_utils.SignatureKeyType.lookup(key_type_name)
|
||||||
|
except exception.SignatureVerificationError:
|
||||||
|
import warnings
|
||||||
|
warnings.warn("ECC curve '%s' not supported" % curve.name)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Create a private key to use
|
||||||
|
private_key = ec.generate_private_key(curve,
|
||||||
|
default_backend())
|
||||||
|
mock_get_pub_key.return_value = private_key.public_key()
|
||||||
|
for hash_name, hash_alg in signature_utils.HASH_METHODS.items():
|
||||||
|
signer = private_key.signer(
|
||||||
|
ec.ECDSA(hash_alg)
|
||||||
|
)
|
||||||
|
signer.update(data)
|
||||||
|
signature = base64.b64encode(signer.finalize())
|
||||||
|
img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693'
|
||||||
|
verifier = signature_utils.get_verifier(None,
|
||||||
|
img_sig_cert_uuid,
|
||||||
|
hash_name, signature,
|
||||||
|
key_type_name)
|
||||||
|
verifier.update(data)
|
||||||
|
verifier.verify()
|
||||||
|
|
||||||
|
@mock.patch('cursive.signature_utils.get_public_key')
|
||||||
|
def test_verify_signature_DSA(self, mock_get_pub_key):
|
||||||
|
data = b'224626ae19824466f2a7f39ab7b80f7f'
|
||||||
|
mock_get_pub_key.return_value = TEST_DSA_PRIVATE_KEY.public_key()
|
||||||
|
for hash_name, hash_alg in signature_utils.HASH_METHODS.items():
|
||||||
|
signer = TEST_DSA_PRIVATE_KEY.signer(
|
||||||
|
hash_alg
|
||||||
|
)
|
||||||
|
signer.update(data)
|
||||||
|
signature = base64.b64encode(signer.finalize())
|
||||||
|
img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693'
|
||||||
|
verifier = signature_utils.get_verifier(None, img_sig_cert_uuid,
|
||||||
|
hash_name, signature,
|
||||||
|
signature_utils.DSA)
|
||||||
|
verifier.update(data)
|
||||||
|
verifier.verify()
|
||||||
|
|
||||||
|
@mock.patch('cursive.signature_utils.get_public_key')
|
||||||
|
def test_verify_signature_bad_signature(self, mock_get_pub_key):
|
||||||
|
data = b'224626ae19824466f2a7f39ab7b80f7f'
|
||||||
|
mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key()
|
||||||
|
img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693'
|
||||||
|
verifier = signature_utils.get_verifier(None, img_sig_cert_uuid,
|
||||||
|
'SHA-256', 'BLAH',
|
||||||
|
signature_utils.RSA_PSS)
|
||||||
|
verifier.update(data)
|
||||||
|
self.assertRaises(crypto_exceptions.InvalidSignature,
|
||||||
|
verifier.verify)
|
||||||
|
|
||||||
|
def test_get_verifier_invalid_image_props(self):
|
||||||
|
self.assertRaisesRegex(exception.SignatureVerificationError,
|
||||||
|
'Required image properties for signature'
|
||||||
|
' verification do not exist. Cannot verify'
|
||||||
|
' signature. Missing property: .*',
|
||||||
|
signature_utils.get_verifier,
|
||||||
|
None, None, 'SHA-256', 'BLAH',
|
||||||
|
signature_utils.RSA_PSS)
|
||||||
|
|
||||||
|
@mock.patch('cursive.signature_utils.get_public_key')
|
||||||
|
def test_verify_signature_bad_sig_key_type(self, mock_get_pub_key):
|
||||||
|
mock_get_pub_key.return_value = TEST_RSA_PRIVATE_KEY.public_key()
|
||||||
|
img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693'
|
||||||
|
self.assertRaisesRegex(exception.SignatureVerificationError,
|
||||||
|
'Invalid signature key type: .*',
|
||||||
|
signature_utils.get_verifier,
|
||||||
|
None, img_sig_cert_uuid, 'SHA-256',
|
||||||
|
'BLAH', 'BLAH')
|
||||||
|
|
||||||
|
@mock.patch('cursive.signature_utils.get_public_key')
|
||||||
|
def test_get_verifier_none(self, mock_get_pub_key):
|
||||||
|
mock_get_pub_key.return_value = BadPublicKey()
|
||||||
|
img_sig_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0693'
|
||||||
|
self.assertRaisesRegex(exception.SignatureVerificationError,
|
||||||
|
'Error occurred while creating'
|
||||||
|
' the verifier',
|
||||||
|
signature_utils.get_verifier,
|
||||||
|
None, img_sig_cert_uuid, 'SHA-256',
|
||||||
|
'BLAH', signature_utils.RSA_PSS)
|
||||||
|
|
||||||
|
def test_get_signature(self):
|
||||||
|
signature = b'A' * 256
|
||||||
|
data = base64.b64encode(signature)
|
||||||
|
self.assertEqual(signature,
|
||||||
|
signature_utils.get_signature(data))
|
||||||
|
|
||||||
|
def test_get_signature_fail(self):
|
||||||
|
self.assertRaisesRegex(exception.SignatureVerificationError,
|
||||||
|
'The signature data was not properly'
|
||||||
|
' encoded using base64',
|
||||||
|
signature_utils.get_signature, '///')
|
||||||
|
|
||||||
|
def test_get_hash_method(self):
|
||||||
|
hash_dict = signature_utils.HASH_METHODS
|
||||||
|
for hash_name in hash_dict.keys():
|
||||||
|
hash_class = signature_utils.get_hash_method(hash_name).__class__
|
||||||
|
self.assertIsInstance(hash_dict[hash_name], hash_class)
|
||||||
|
|
||||||
|
def test_get_hash_method_fail(self):
|
||||||
|
self.assertRaisesRegex(exception.SignatureVerificationError,
|
||||||
|
'Invalid signature hash method: .*',
|
||||||
|
signature_utils.get_hash_method, 'SHA-2')
|
||||||
|
|
||||||
|
def test_signature_key_type_lookup(self):
|
||||||
|
for sig_format in [signature_utils.RSA_PSS, signature_utils.DSA]:
|
||||||
|
sig_key_type = signature_utils.SignatureKeyType.lookup(sig_format)
|
||||||
|
self.assertIsInstance(sig_key_type,
|
||||||
|
signature_utils.SignatureKeyType)
|
||||||
|
self.assertEqual(sig_format, sig_key_type.name)
|
||||||
|
|
||||||
|
def test_signature_key_type_lookup_fail(self):
|
||||||
|
self.assertRaisesRegex(exception.SignatureVerificationError,
|
||||||
|
'Invalid signature key type: .*',
|
||||||
|
signature_utils.SignatureKeyType.lookup,
|
||||||
|
'RSB-PSS')
|
||||||
|
|
||||||
|
@mock.patch('cursive.signature_utils.get_certificate')
|
||||||
|
def test_get_public_key_rsa(self, mock_get_cert):
|
||||||
|
fake_cert = FakeCryptoCertificate()
|
||||||
|
mock_get_cert.return_value = fake_cert
|
||||||
|
sig_key_type = signature_utils.SignatureKeyType.lookup(
|
||||||
|
signature_utils.RSA_PSS
|
||||||
|
)
|
||||||
|
result_pub_key = signature_utils.get_public_key(None, None,
|
||||||
|
sig_key_type)
|
||||||
|
self.assertEqual(fake_cert.public_key(), result_pub_key)
|
||||||
|
|
||||||
|
@mock.patch('cursive.signature_utils.get_certificate')
|
||||||
|
def test_get_public_key_ecc(self, mock_get_cert):
|
||||||
|
fake_cert = FakeCryptoCertificate(TEST_ECC_PRIVATE_KEY.public_key())
|
||||||
|
mock_get_cert.return_value = fake_cert
|
||||||
|
sig_key_type = signature_utils.SignatureKeyType.lookup('ECC_SECP521R1')
|
||||||
|
result_pub_key = signature_utils.get_public_key(None, None,
|
||||||
|
sig_key_type)
|
||||||
|
self.assertEqual(fake_cert.public_key(), result_pub_key)
|
||||||
|
|
||||||
|
@mock.patch('cursive.signature_utils.get_certificate')
|
||||||
|
def test_get_public_key_dsa(self, mock_get_cert):
|
||||||
|
fake_cert = FakeCryptoCertificate(TEST_DSA_PRIVATE_KEY.public_key())
|
||||||
|
mock_get_cert.return_value = fake_cert
|
||||||
|
sig_key_type = signature_utils.SignatureKeyType.lookup(
|
||||||
|
signature_utils.DSA
|
||||||
|
)
|
||||||
|
result_pub_key = signature_utils.get_public_key(None, None,
|
||||||
|
sig_key_type)
|
||||||
|
self.assertEqual(fake_cert.public_key(), result_pub_key)
|
||||||
|
|
||||||
|
@mock.patch('cursive.signature_utils.get_certificate')
|
||||||
|
def test_get_public_key_invalid_key(self, mock_get_certificate):
|
||||||
|
bad_pub_key = 'A' * 256
|
||||||
|
mock_get_certificate.return_value = FakeCryptoCertificate(bad_pub_key)
|
||||||
|
sig_key_type = signature_utils.SignatureKeyType.lookup(
|
||||||
|
signature_utils.RSA_PSS
|
||||||
|
)
|
||||||
|
self.assertRaisesRegex(exception.SignatureVerificationError,
|
||||||
|
'Invalid public key type for '
|
||||||
|
'signature key type: .*',
|
||||||
|
signature_utils.get_public_key, None,
|
||||||
|
None, sig_key_type)
|
||||||
|
|
||||||
|
@mock.patch('cryptography.x509.load_der_x509_certificate')
|
||||||
|
@mock.patch('castellan.key_manager.API', return_value=FakeKeyManager())
|
||||||
|
def test_get_certificate(self, mock_key_manager_API, mock_load_cert):
|
||||||
|
cert_uuid = 'valid_format_cert'
|
||||||
|
x509_cert = FakeCryptoCertificate()
|
||||||
|
mock_load_cert.return_value = x509_cert
|
||||||
|
self.assertEqual(x509_cert,
|
||||||
|
signature_utils.get_certificate(None, cert_uuid))
|
||||||
|
|
||||||
|
@mock.patch('cryptography.x509.load_der_x509_certificate')
|
||||||
|
@mock.patch('castellan.key_manager.API', return_value=FakeKeyManager())
|
||||||
|
def test_get_expired_certificate(self, mock_key_manager_API,
|
||||||
|
mock_load_cert):
|
||||||
|
cert_uuid = 'valid_format_cert'
|
||||||
|
x509_cert = FakeCryptoCertificate(
|
||||||
|
not_valid_after=timeutils.utcnow() -
|
||||||
|
datetime.timedelta(hours=1))
|
||||||
|
mock_load_cert.return_value = x509_cert
|
||||||
|
self.assertRaisesRegex(exception.SignatureVerificationError,
|
||||||
|
'Certificate is not valid after: .*',
|
||||||
|
signature_utils.get_certificate, None,
|
||||||
|
cert_uuid)
|
||||||
|
|
||||||
|
@mock.patch('cryptography.x509.load_der_x509_certificate')
|
||||||
|
@mock.patch('castellan.key_manager.API', return_value=FakeKeyManager())
|
||||||
|
def test_get_not_yet_valid_certificate(self, mock_key_manager_API,
|
||||||
|
mock_load_cert):
|
||||||
|
cert_uuid = 'valid_format_cert'
|
||||||
|
x509_cert = FakeCryptoCertificate(
|
||||||
|
not_valid_before=timeutils.utcnow() +
|
||||||
|
datetime.timedelta(hours=1))
|
||||||
|
mock_load_cert.return_value = x509_cert
|
||||||
|
self.assertRaisesRegex(exception.SignatureVerificationError,
|
||||||
|
'Certificate is not valid before: .*',
|
||||||
|
signature_utils.get_certificate, None,
|
||||||
|
cert_uuid)
|
||||||
|
|
||||||
|
@mock.patch('castellan.key_manager.API', return_value=FakeKeyManager())
|
||||||
|
def test_get_certificate_key_manager_fail(self, mock_key_manager_API):
|
||||||
|
bad_cert_uuid = 'fea14bc2-d75f-4ba5-bccc-b5c924ad0695'
|
||||||
|
self.assertRaisesRegex(exception.SignatureVerificationError,
|
||||||
|
'Unable to retrieve certificate with ID: .*',
|
||||||
|
signature_utils.get_certificate, None,
|
||||||
|
bad_cert_uuid)
|
||||||
|
|
||||||
|
@mock.patch('castellan.key_manager.API', return_value=FakeKeyManager())
|
||||||
|
def test_get_certificate_invalid_format(self, mock_API):
|
||||||
|
cert_uuid = 'invalid_format_cert'
|
||||||
|
self.assertRaisesRegex(exception.SignatureVerificationError,
|
||||||
|
'Invalid certificate format: .*',
|
||||||
|
signature_utils.get_certificate, None,
|
||||||
|
cert_uuid)
|
|
@ -3,3 +3,11 @@
|
||||||
# process, which may cause wedges in the gate later.
|
# process, which may cause wedges in the gate later.
|
||||||
|
|
||||||
pbr>=1.6 # Apache-2.0
|
pbr>=1.6 # Apache-2.0
|
||||||
|
lxml>=2.3 # BSD
|
||||||
|
cryptography!=1.3.0,>=1.0 # BSD/Apache-2.0
|
||||||
|
netifaces>=0.10.4 # MIT
|
||||||
|
six>=1.9.0 # MIT
|
||||||
|
oslo.serialization>=1.10.0 # Apache-2.0
|
||||||
|
oslo.utils>=3.16.0 # Apache-2.0
|
||||||
|
oslo.i18n>=2.1.0 # Apache-2.0
|
||||||
|
castellan>=0.4.0 # Apache-2.0
|
||||||
|
|
2
tox.ini
2
tox.ini
|
@ -59,6 +59,6 @@ commands = oslo_debug_helper {posargs}
|
||||||
# E123, E125 skipped as they are invalid PEP-8.
|
# E123, E125 skipped as they are invalid PEP-8.
|
||||||
|
|
||||||
show-source = True
|
show-source = True
|
||||||
ignore = E123,E125
|
ignore = E123,E125,H301
|
||||||
builtins = _
|
builtins = _
|
||||||
exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build
|
exclude=.venv,.git,.tox,dist,doc,*lib/python*,*egg,build
|
||||||
|
|
Loading…
Reference in New Issue