Add a tool to manage x509 objects
Magnum should manage x509 objects, * To generate CA key and cert for each bay * To generate client key for magnum-conductor * To sign a public key for kube-apiserver * To sign a public key for end user This patch adds these abilities to Magnum. Change-Id: Ib5d7180a230dad635b3b570122c5af88cd1ac7a7 Partial-Implements: blueprint magnum-as-a-ca
This commit is contained in:
parent
1727c1728f
commit
0fc4ff0002
@ -1063,3 +1063,33 @@
|
||||
# This option is deprecated for removal.
|
||||
# Its value may be silently ignored in the future.
|
||||
#policy_dirs = policy.d
|
||||
|
||||
|
||||
[x509]
|
||||
|
||||
#
|
||||
# From magnum
|
||||
#
|
||||
|
||||
# Certificate can get the CA flag in x509 extensions. (boolean value)
|
||||
#allow_ca = false
|
||||
|
||||
# List of allowed x509 extensions. Available values:
|
||||
# "authorityKeyIdentifier", "subjectKeyIdentifier",
|
||||
# "authorityInfoAccess", "basicConstraints", "cRLDistributionPoints",
|
||||
# "certificatePolicies", "extendedKeyUsage", "OCSPNoCheck",
|
||||
# "inhibitAnyPolicy", "keyUsage", "nameConstraints", "subjectAltName",
|
||||
# "issuerAltName" (list value)
|
||||
#allowed_extensions = keyUsage,extendedKeyUsage,subjectAltName,basicConstraints,subjectKeyIdentifier
|
||||
|
||||
# List of allowed x509 key usage. Available values: "Digital
|
||||
# Signature", "Non Repudiation", "Key Encipherment", "Data
|
||||
# Encipherment", "Key Agreement", "Certificate Sign", "CRL Sign",
|
||||
# "Encipher Only", "Decipher Only" (list value)
|
||||
#allowed_key_usage = Digital Signature,Key Encipherment,Non Repudiation
|
||||
|
||||
# Number of days for which a certificate is valid. (integer value)
|
||||
#term_of_validity = 1825
|
||||
|
||||
# Size of generated private key. (integer value)
|
||||
#rsa_key_size = 2048
|
||||
|
@ -479,3 +479,7 @@ class X509KeyPairAlreadyExists(Conflict):
|
||||
|
||||
class CertificateStorageException(MagnumException):
|
||||
message = _("Could not store certificate: %(msg)s")
|
||||
|
||||
|
||||
class CertificateValidationError(Invalid):
|
||||
message = _("Extension '%(extension)s' not allowed")
|
||||
|
0
magnum/common/x509/__init__.py
Normal file
0
magnum/common/x509/__init__.py
Normal file
53
magnum/common/x509/config.py
Normal file
53
magnum/common/x509/config.py
Normal file
@ -0,0 +1,53 @@
|
||||
# Copyright 2015 NEC Corporation. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from magnum.common.x509 import extensions
|
||||
from magnum.i18n import _
|
||||
|
||||
ALLOWD_EXTENSIONS = ['"%s"' % e.value for e in extensions.Extensions]
|
||||
DEFAULR_ALLOWED_EXTENSIONS = [
|
||||
extensions.Extensions.KEY_USAGE.value,
|
||||
extensions.Extensions.EXTENDED_KEY_USAGE.value,
|
||||
extensions.Extensions.SUBJECT_ALTERNATIVE_NAME.value,
|
||||
extensions.Extensions.BASIC_CONSTRAINTS.value,
|
||||
extensions.Extensions.SUBJECT_KEY_IDENTIFIER.value]
|
||||
|
||||
ALLOWED_KEY_USAGE = ['"%s"' % e.value[0] for e in extensions.KeyUsages]
|
||||
DEFAULR_ALLOWED_KEY_USAGE = [
|
||||
extensions.KeyUsages.DIGITAL_SIGNATURE.value[0],
|
||||
extensions.KeyUsages.KEY_ENCIPHERMENT.value[0],
|
||||
extensions.KeyUsages.CONTENT_COMMITMENT.value[0]]
|
||||
|
||||
|
||||
x509_opts = [
|
||||
cfg.BoolOpt('allow_ca',
|
||||
default=False,
|
||||
help=_('Certificate can get the CA flag in x509 extensions.')),
|
||||
cfg.ListOpt('allowed_extensions',
|
||||
default=DEFAULR_ALLOWED_EXTENSIONS,
|
||||
help=_('List of allowed x509 extensions. Available values: '
|
||||
'%s') % ', '.join(ALLOWD_EXTENSIONS)),
|
||||
cfg.ListOpt('allowed_key_usage',
|
||||
default=DEFAULR_ALLOWED_KEY_USAGE,
|
||||
help=_('List of allowed x509 key usage. Available values: '
|
||||
'%s') % ', '.join(ALLOWED_KEY_USAGE)),
|
||||
cfg.IntOpt('term_of_validity',
|
||||
default=365 * 5,
|
||||
help=_('Number of days for which a certificate is valid.')),
|
||||
cfg.IntOpt('rsa_key_size',
|
||||
default=2048, help=_('Size of generated private key. '))]
|
||||
|
||||
cfg.CONF.register_opts(x509_opts, group='x509')
|
54
magnum/common/x509/extensions.py
Normal file
54
magnum/common/x509/extensions.py
Normal file
@ -0,0 +1,54 @@
|
||||
# Copyright 2015 NEC Corporation. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class Extensions(Enum):
|
||||
__order__ = ('AUTHORITY_KEY_IDENTIFIER SUBJECT_KEY_IDENTIFIER '
|
||||
'AUTHORITY_INFORMATION_ACCESS BASIC_CONSTRAINTS '
|
||||
'CRL_DISTRIBUTION_POINTS CERTIFICATE_POLICIES '
|
||||
'EXTENDED_KEY_USAGE OCSP_NO_CHECK INHIBIT_ANY_POLICY '
|
||||
'KEY_USAGE NAME_CONSTRAINTS SUBJECT_ALTERNATIVE_NAME '
|
||||
'ISSUER_ALTERNATIVE_NAME')
|
||||
|
||||
AUTHORITY_KEY_IDENTIFIER = "authorityKeyIdentifier"
|
||||
SUBJECT_KEY_IDENTIFIER = "subjectKeyIdentifier"
|
||||
AUTHORITY_INFORMATION_ACCESS = "authorityInfoAccess"
|
||||
BASIC_CONSTRAINTS = "basicConstraints"
|
||||
CRL_DISTRIBUTION_POINTS = "cRLDistributionPoints"
|
||||
CERTIFICATE_POLICIES = "certificatePolicies"
|
||||
EXTENDED_KEY_USAGE = "extendedKeyUsage"
|
||||
OCSP_NO_CHECK = "OCSPNoCheck"
|
||||
INHIBIT_ANY_POLICY = "inhibitAnyPolicy"
|
||||
KEY_USAGE = "keyUsage"
|
||||
NAME_CONSTRAINTS = "nameConstraints"
|
||||
SUBJECT_ALTERNATIVE_NAME = "subjectAltName"
|
||||
ISSUER_ALTERNATIVE_NAME = "issuerAltName"
|
||||
|
||||
|
||||
class KeyUsages(Enum):
|
||||
__order__ = ('DIGITAL_SIGNATURE CONTENT_COMMITMENT KEY_ENCIPHERMENT '
|
||||
'DATA_ENCIPHERMENT KEY_AGREEMENT KEY_CERT_SIGN '
|
||||
'CRL_SIGN ENCIPHER_ONLY DECIPHER_ONLY')
|
||||
|
||||
DIGITAL_SIGNATURE = ("Digital Signature", "digital_signature")
|
||||
CONTENT_COMMITMENT = ("Non Repudiation", "content_commitment")
|
||||
KEY_ENCIPHERMENT = ("Key Encipherment", "key_encipherment")
|
||||
DATA_ENCIPHERMENT = ("Data Encipherment", "data_encipherment")
|
||||
KEY_AGREEMENT = ("Key Agreement", "key_agreement")
|
||||
KEY_CERT_SIGN = ("Certificate Sign", "key_cert_sign")
|
||||
CRL_SIGN = ("CRL Sign", "crl_sign")
|
||||
ENCIPHER_ONLY = ("Encipher Only", "encipher_only")
|
||||
DECIPHER_ONLY = ("Decipher Only", "decipher_only")
|
192
magnum/common/x509/operations.py
Normal file
192
magnum/common/x509/operations.py
Normal file
@ -0,0 +1,192 @@
|
||||
# Copyright 2015 NEC Corporation. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
import uuid
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography import x509
|
||||
from cryptography.x509 import Extension
|
||||
from oslo_config import cfg
|
||||
|
||||
from magnum.common.x509 import validator
|
||||
|
||||
|
||||
cfg.CONF.import_group('x509', 'magnum.common.x509.config')
|
||||
|
||||
|
||||
def generate_ca_certificate(subject_name, encryption_password=None):
|
||||
"""Generate CA Certificate
|
||||
|
||||
:param subject_name: subject name of CA
|
||||
:param encryption_password: encryption passsword for private key
|
||||
:returns: generated private key and certificate pair
|
||||
"""
|
||||
return _generate_self_signed_certificate(
|
||||
subject_name,
|
||||
_build_ca_extentions(),
|
||||
encryption_password=encryption_password
|
||||
)
|
||||
|
||||
|
||||
def generate_client_certificate(issuer_name, subject_name, ca_key,
|
||||
encryption_password=None,
|
||||
ca_key_password=None):
|
||||
"""Generate Client Certificate
|
||||
|
||||
:param issuer_name: issuer name
|
||||
:param subject_name: subject name of client
|
||||
:param ca_key: private key of CA
|
||||
:param encryption_password: encryption passsword for private key
|
||||
:param ca_key_password: private key password for given ca key
|
||||
:returns: generated private key and certificate pair
|
||||
"""
|
||||
return _generate_certificate(issuer_name, subject_name,
|
||||
_build_client_extentions(), ca_key=ca_key,
|
||||
encryption_password=encryption_password,
|
||||
ca_key_password=ca_key_password)
|
||||
|
||||
|
||||
def _build_client_extentions():
|
||||
# Digital Signature and Key Encipherment are enabled
|
||||
key_usage = x509.KeyUsage(True, False, True, False, False, False, False,
|
||||
False, False)
|
||||
key_usage = Extension(key_usage.oid, True, key_usage)
|
||||
extended_key_usage = x509.ExtendedKeyUsage([x509.OID_CLIENT_AUTH])
|
||||
extended_key_usage = Extension(extended_key_usage.oid, False,
|
||||
extended_key_usage)
|
||||
basic_constraints = x509.BasicConstraints(ca=False, path_length=None)
|
||||
basic_constraints = Extension(basic_constraints.oid, True,
|
||||
basic_constraints)
|
||||
|
||||
return [key_usage, extended_key_usage, basic_constraints]
|
||||
|
||||
|
||||
def _build_ca_extentions():
|
||||
# Certificate Sign is enabled
|
||||
key_usage = x509.KeyUsage(False, False, False, False, False, True, False,
|
||||
False, False)
|
||||
key_usage = Extension(key_usage.oid, True, key_usage)
|
||||
basic_constraints = x509.BasicConstraints(ca=True, path_length=0)
|
||||
basic_constraints = Extension(basic_constraints.oid, True,
|
||||
basic_constraints)
|
||||
|
||||
return [basic_constraints, key_usage]
|
||||
|
||||
|
||||
def _generate_self_signed_certificate(subject_name, extensions,
|
||||
encryption_password=None):
|
||||
return _generate_certificate(subject_name, subject_name, extensions,
|
||||
encryption_password=encryption_password)
|
||||
|
||||
|
||||
def _generate_certificate(issuer_name, subject_name, extensions, ca_key=None,
|
||||
encryption_password=None, ca_key_password=None):
|
||||
private_key = rsa.generate_private_key(
|
||||
public_exponent=65537,
|
||||
key_size=cfg.CONF.x509.rsa_key_size,
|
||||
backend=default_backend()
|
||||
)
|
||||
|
||||
# subject name is set as common name
|
||||
csr = x509.CertificateSigningRequestBuilder()
|
||||
csr = csr.subject_name(x509.Name([
|
||||
x509.NameAttribute(x509.OID_COMMON_NAME, subject_name),
|
||||
]))
|
||||
|
||||
for extention in extensions:
|
||||
csr = csr.add_extension(extention.value, critical=extention.critical)
|
||||
|
||||
# if ca_key is not provided, it means self signed
|
||||
if not ca_key:
|
||||
ca_key = private_key
|
||||
ca_key_password = encryption_password
|
||||
|
||||
csr = csr.sign(private_key, hashes.SHA256(), default_backend())
|
||||
|
||||
if encryption_password:
|
||||
encryption_algorithm = serialization.BestAvailableEncryption(
|
||||
encryption_password)
|
||||
else:
|
||||
encryption_algorithm = serialization.NoEncryption()
|
||||
|
||||
private_key = private_key.private_bytes(
|
||||
encoding = serialization.Encoding.PEM,
|
||||
format = serialization.PrivateFormat.PKCS8,
|
||||
encryption_algorithm = encryption_algorithm
|
||||
)
|
||||
|
||||
keypairs = {
|
||||
'private_key': private_key,
|
||||
'certificate': sign(
|
||||
csr,
|
||||
issuer_name,
|
||||
ca_key,
|
||||
ca_key_password=ca_key_password,
|
||||
skip_validation=True),
|
||||
}
|
||||
return keypairs
|
||||
|
||||
|
||||
def sign(csr, issuer_name, ca_key, ca_key_password=None,
|
||||
skip_validation=False):
|
||||
"""Sign a given csr
|
||||
|
||||
:param csr: certificate signing request object or pem encoded csr
|
||||
:param issuer_name: issuer name
|
||||
:param ca_key: private key of CA
|
||||
:param ca_key_password: private key password for given ca key
|
||||
:param skip_validation: skip csr validation if true
|
||||
:returns: generated certificate
|
||||
"""
|
||||
if not isinstance(ca_key, rsa.RSAPrivateKey):
|
||||
ca_key = serialization.load_pem_private_key(ca_key,
|
||||
password=ca_key_password,
|
||||
backend=default_backend())
|
||||
if not isinstance(csr, x509.CertificateSigningRequest):
|
||||
csr = x509.load_pem_x509_csr(csr, backend=default_backend())
|
||||
|
||||
term_of_validity = cfg.CONF.x509.term_of_validity
|
||||
one_day = datetime.timedelta(1, 0, 0)
|
||||
expire_after = datetime.timedelta(term_of_validity, 0, 0)
|
||||
|
||||
builder = x509.CertificateBuilder()
|
||||
builder = builder.subject_name(csr.subject)
|
||||
# issuer_name is set as common name
|
||||
builder = builder.issuer_name(x509.Name([
|
||||
x509.NameAttribute(x509.OID_COMMON_NAME, issuer_name),
|
||||
]))
|
||||
builder = builder.not_valid_before(datetime.datetime.today() - one_day)
|
||||
builder = builder.not_valid_after(datetime.datetime.today() + expire_after)
|
||||
builder = builder.serial_number(int(uuid.uuid4()))
|
||||
builder = builder.public_key(csr.public_key())
|
||||
|
||||
if skip_validation:
|
||||
extensions = csr.extensions
|
||||
else:
|
||||
extensions = validator.filter_extensions(csr.extensions)
|
||||
|
||||
for extention in extensions:
|
||||
builder = builder.add_extension(extention.value,
|
||||
critical=extention.critical)
|
||||
|
||||
certificate = builder.sign(
|
||||
private_key=ca_key, algorithm=hashes.SHA256(),
|
||||
backend=default_backend()
|
||||
).public_bytes(serialization.Encoding.PEM)
|
||||
|
||||
return certificate
|
106
magnum/common/x509/validator.py
Normal file
106
magnum/common/x509/validator.py
Normal file
@ -0,0 +1,106 @@
|
||||
# Copyright 2015 NEC Corporation. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from cryptography import x509
|
||||
import logging
|
||||
from oslo_config import cfg
|
||||
|
||||
from magnum.common.exception import CertificateValidationError
|
||||
from magnum.common.x509 import extensions
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_CA_KEY_USAGES = [
|
||||
extensions.KeyUsages.KEY_CERT_SIGN.value[0],
|
||||
extensions.KeyUsages.CRL_SIGN.value[0]
|
||||
]
|
||||
|
||||
cfg.CONF.import_group('x509', 'magnum.common.x509.config')
|
||||
|
||||
|
||||
def filter_extensions(extensions):
|
||||
filtered_extensions = []
|
||||
allowed_key_usage = set(cfg.CONF.x509.allowed_key_usage)
|
||||
if not cfg.CONF.x509.allow_ca:
|
||||
allowed_key_usage = _remove_ca_key_usage(allowed_key_usage)
|
||||
|
||||
for ext in filter_allowed_extensions(extensions,
|
||||
cfg.CONF.x509.allowed_extensions):
|
||||
if ext.oid == x509.OID_KEY_USAGE:
|
||||
ext = _merge_key_usage(ext, allowed_key_usage)
|
||||
elif ext.oid == x509.OID_BASIC_CONSTRAINTS:
|
||||
if not cfg.CONF.x509.allow_ca:
|
||||
ext = _disallow_ca_in_basic_constraints(ext)
|
||||
|
||||
filtered_extensions.append(ext)
|
||||
|
||||
return filtered_extensions
|
||||
|
||||
|
||||
def filter_allowed_extensions(extensions, allowed_extensions=None):
|
||||
"""Ensure only accepted extensions are used."""
|
||||
allowed_extensions = allowed_extensions or []
|
||||
|
||||
for ext in extensions:
|
||||
ext_name = x509.oid._OID_NAMES.get(ext.oid, None)
|
||||
if ext_name in allowed_extensions:
|
||||
yield ext
|
||||
else:
|
||||
if ext.critical:
|
||||
raise CertificateValidationError(extension=ext)
|
||||
|
||||
|
||||
def _merge_key_usage(key_usage, allowed_key_usage):
|
||||
critical = key_usage.critical
|
||||
key_usage_value = key_usage.value
|
||||
|
||||
usages = []
|
||||
for usage in extensions.KeyUsages:
|
||||
k, v = usage.value
|
||||
try:
|
||||
value = getattr(key_usage_value, v)
|
||||
except ValueError:
|
||||
# ValueError is raised when encipher_only/decipher_only is
|
||||
# retrieved but key_agreement is False
|
||||
value = False
|
||||
if value:
|
||||
if k not in allowed_key_usage:
|
||||
if critical:
|
||||
raise CertificateValidationError(extension=key_usage)
|
||||
else:
|
||||
value = False
|
||||
usages.append(value)
|
||||
|
||||
rtn = x509.KeyUsage(*usages)
|
||||
return x509.Extension(rtn.oid, critical, rtn)
|
||||
|
||||
|
||||
def _remove_ca_key_usage(allowed_key_usage):
|
||||
for usage in _CA_KEY_USAGES:
|
||||
try:
|
||||
allowed_key_usage.remove(usage)
|
||||
except KeyError:
|
||||
pass
|
||||
return allowed_key_usage
|
||||
|
||||
|
||||
def _disallow_ca_in_basic_constraints(basic_constraints):
|
||||
if basic_constraints.value.ca:
|
||||
if basic_constraints.critical:
|
||||
raise CertificateValidationError(extension=basic_constraints)
|
||||
|
||||
bc = x509.BasicConstraints(False, None)
|
||||
return x509.Extension(bc.oid, False, bc)
|
||||
|
||||
return basic_constraints
|
@ -23,6 +23,7 @@ import magnum.common.clients
|
||||
import magnum.common.exception
|
||||
import magnum.common.magnum_keystoneclient
|
||||
import magnum.common.service
|
||||
import magnum.common.x509.config
|
||||
import magnum.conductor.config
|
||||
import magnum.conductor.handlers.bay_conductor
|
||||
import magnum.conductor.handlers.docker_conductor
|
||||
@ -50,6 +51,7 @@ def list_opts():
|
||||
('heat_client', magnum.common.clients.heat_client_opts),
|
||||
('glance_client', magnum.common.clients.glance_client_opts),
|
||||
('barbican_client', magnum.common.clients.barbican_client_opts),
|
||||
('x509', magnum.common.x509.config.x509_opts),
|
||||
('bay_heat', magnum.conductor.handlers.bay_conductor.bay_heat_opts),
|
||||
('certificates',
|
||||
itertools.chain(magnum.common.cert_manager.cert_manager_opts,
|
||||
|
0
magnum/tests/unit/common/x509/__init__.py
Normal file
0
magnum/tests/unit/common/x509/__init__.py
Normal file
175
magnum/tests/unit/common/x509/test_sign.py
Normal file
175
magnum/tests/unit/common/x509/test_sign.py
Normal file
@ -0,0 +1,175 @@
|
||||
# Copyright 2015 NEC Corporation. All rights reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import six
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography import x509 as c_x509
|
||||
from cryptography.x509.oid import NameOID
|
||||
|
||||
from magnum.common.x509 import operations
|
||||
from magnum.tests import base
|
||||
|
||||
|
||||
class TestX509(base.BaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestX509, self).setUp()
|
||||
self.issuer_name = six.u("fake-issuer")
|
||||
self.subject_name = six.u("fake-subject")
|
||||
self.ca_encryption_password = six.b("fake-ca-password")
|
||||
self.encryption_password = six.b("fake-password")
|
||||
|
||||
def _load_pems(self, keypairs, encryption_password):
|
||||
private_key = serialization.load_pem_private_key(
|
||||
keypairs['private_key'],
|
||||
password=encryption_password,
|
||||
backend=default_backend(),
|
||||
)
|
||||
certificate = c_x509.load_pem_x509_certificate(
|
||||
keypairs['certificate'], default_backend())
|
||||
|
||||
return certificate, private_key
|
||||
|
||||
def _generate_ca_certificate(self, issuer_name=None):
|
||||
issuer_name = issuer_name or self.issuer_name
|
||||
keypairs = operations.generate_ca_certificate(
|
||||
issuer_name, encryption_password=self.ca_encryption_password)
|
||||
|
||||
return self._load_pems(keypairs, self.ca_encryption_password)
|
||||
|
||||
def _generate_client_certificate(self, issuer_name, subject_name):
|
||||
ca = operations.generate_ca_certificate(
|
||||
self.issuer_name, encryption_password=self.ca_encryption_password)
|
||||
keypairs = operations.generate_client_certificate(
|
||||
self.issuer_name,
|
||||
self.subject_name,
|
||||
ca['private_key'],
|
||||
encryption_password=self.encryption_password,
|
||||
ca_key_password=self.ca_encryption_password,
|
||||
)
|
||||
|
||||
return self._load_pems(keypairs, self.encryption_password)
|
||||
|
||||
def _public_bytes(self, public_key):
|
||||
return public_key.public_bytes(
|
||||
serialization.Encoding.PEM,
|
||||
serialization.PublicFormat.SubjectPublicKeyInfo
|
||||
)
|
||||
|
||||
def _generate_private_key(self):
|
||||
return rsa.generate_private_key(
|
||||
public_exponent=65537,
|
||||
key_size=2048,
|
||||
backend=default_backend()
|
||||
)
|
||||
|
||||
def _build_csr(self, private_key):
|
||||
csr = c_x509.CertificateSigningRequestBuilder()
|
||||
csr = csr.subject_name(c_x509.Name([
|
||||
c_x509.NameAttribute(NameOID.COMMON_NAME, self.subject_name)
|
||||
]))
|
||||
|
||||
return csr.sign(private_key, hashes.SHA256(), default_backend())
|
||||
|
||||
def assertHasPublicKey(self, keypairs):
|
||||
key = keypairs[1]
|
||||
cert = keypairs[0]
|
||||
|
||||
self.assertEqual(self._public_bytes(key.public_key()),
|
||||
self._public_bytes(cert.public_key()))
|
||||
|
||||
def assertHasSubjectName(self, cert, subject_name):
|
||||
actual_subject_name = cert.subject.get_attributes_for_oid(
|
||||
c_x509.NameOID.COMMON_NAME)
|
||||
actual_subject_name = actual_subject_name[0].value
|
||||
|
||||
self.assertEqual(subject_name, actual_subject_name)
|
||||
|
||||
def assertHasIssuerName(self, cert, issuer_name):
|
||||
actual_issuer_name = cert.issuer.get_attributes_for_oid(
|
||||
c_x509.NameOID.COMMON_NAME)
|
||||
actual_issuer_name = actual_issuer_name[0].value
|
||||
|
||||
self.assertEqual(issuer_name, actual_issuer_name)
|
||||
|
||||
def assertInClientExtensions(self, cert):
|
||||
key_usage = c_x509.KeyUsage(True, False, True, False, False, False,
|
||||
False, False, False)
|
||||
key_usage = c_x509.Extension(key_usage.oid, True, key_usage)
|
||||
extended_key_usage = c_x509.ExtendedKeyUsage([c_x509.OID_CLIENT_AUTH])
|
||||
extended_key_usage = c_x509.Extension(extended_key_usage.oid, False,
|
||||
extended_key_usage)
|
||||
basic_constraints = c_x509.BasicConstraints(ca=False, path_length=None)
|
||||
basic_constraints = c_x509.Extension(basic_constraints.oid, True,
|
||||
basic_constraints)
|
||||
|
||||
self.assertIn(key_usage, cert.extensions)
|
||||
self.assertIn(extended_key_usage, cert.extensions)
|
||||
self.assertIn(basic_constraints, cert.extensions)
|
||||
|
||||
def test_generate_ca_certificate_has_publickey(self):
|
||||
keypairs = self._generate_ca_certificate(self.issuer_name)
|
||||
|
||||
self.assertHasPublicKey(keypairs)
|
||||
|
||||
def test_generate_ca_certificate_set_subject_name(self):
|
||||
cert, _ = self._generate_ca_certificate(self.issuer_name)
|
||||
|
||||
self.assertHasSubjectName(cert, self.issuer_name)
|
||||
|
||||
def test_generate_ca_certificate_set_issuer_name(self):
|
||||
cert, _ = self._generate_ca_certificate(self.issuer_name)
|
||||
|
||||
self.assertHasIssuerName(cert, self.issuer_name)
|
||||
|
||||
def test_generate_ca_certificate_set_extentions_as_ca(self):
|
||||
cert, _ = self._generate_ca_certificate(self.issuer_name)
|
||||
|
||||
key_usage = c_x509.KeyUsage(False, False, False, False, False, True,
|
||||
False, False, False)
|
||||
key_usage = c_x509.Extension(key_usage.oid, True, key_usage)
|
||||
basic_constraints = c_x509.BasicConstraints(ca=True, path_length=0)
|
||||
basic_constraints = c_x509.Extension(basic_constraints.oid, True,
|
||||
basic_constraints)
|
||||
|
||||
self.assertIn(key_usage, cert.extensions)
|
||||
self.assertIn(basic_constraints, cert.extensions)
|
||||
|
||||
def test_generate_client_certificate_has_publickey(self):
|
||||
keypairs = self._generate_client_certificate(
|
||||
self.issuer_name, self.subject_name)
|
||||
|
||||
self.assertHasPublicKey(keypairs)
|
||||
|
||||
def test_generate_client_certificate_set_subject_name(self):
|
||||
cert, _ = self._generate_client_certificate(
|
||||
self.issuer_name, self.subject_name)
|
||||
|
||||
self.assertHasSubjectName(cert, self.subject_name)
|
||||
|
||||
def test_generate_client_certificate_set_issuer_name(self):
|
||||
cert, key = self._generate_client_certificate(
|
||||
self.issuer_name, self.subject_name)
|
||||
|
||||
self.assertHasIssuerName(cert, self.issuer_name)
|
||||
|
||||
def test_generate_client_certificate_set_extentions_as_client(self):
|
||||
cert, key = self._generate_client_certificate(
|
||||
self.issuer_name, self.subject_name)
|
||||
|
||||
self.assertInClientExtensions(cert)
|
120
magnum/tests/unit/common/x509/test_validator.py
Normal file
120
magnum/tests/unit/common/x509/test_validator.py
Normal file
@ -0,0 +1,120 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import unittest
|
||||
|
||||
from cryptography import x509 as c_x509
|
||||
|
||||
from magnum.common.exception import CertificateValidationError
|
||||
from magnum.common.x509 import validator as v
|
||||
|
||||
|
||||
class TestValidators(unittest.TestCase):
|
||||
def setUp(self):
|
||||
super(TestValidators, self).setUp()
|
||||
|
||||
def tearDown(self):
|
||||
super(TestValidators, self).tearDown()
|
||||
|
||||
def _build_key_usage(self, critical=False):
|
||||
# Digital Signature and Key Encipherment are enabled
|
||||
key_usage = c_x509.KeyUsage(
|
||||
True, False, True, False, False, False, False, False, False)
|
||||
return c_x509.Extension(key_usage.oid, critical, key_usage)
|
||||
|
||||
def _build_basic_constraints(self, ca=False, critical=False):
|
||||
bc = c_x509.BasicConstraints(ca, None)
|
||||
return c_x509.Extension(bc.oid, critical, bc)
|
||||
|
||||
def test_filter_allowed_extensions(self):
|
||||
key_usage = self._build_key_usage(critical=True)
|
||||
|
||||
actual = [e for e in v.filter_allowed_extensions([key_usage],
|
||||
['keyUsage'])]
|
||||
self.assertEqual(actual, [key_usage])
|
||||
|
||||
def test_filter_allowed_extensions_disallowed_but_not_critical(self):
|
||||
key_usage = self._build_key_usage()
|
||||
|
||||
actual = [e for e in v.filter_allowed_extensions([key_usage],
|
||||
['subjectAltName'])]
|
||||
|
||||
self.assertEqual(actual, [])
|
||||
|
||||
def test_filter_allowed_extensions_disallowed(self):
|
||||
key_usage = self._build_key_usage(critical=True)
|
||||
|
||||
with self.assertRaises(CertificateValidationError):
|
||||
v.filter_allowed_extensions([key_usage], ['subjectAltName']).next()
|
||||
|
||||
def test_merge_key_usage(self):
|
||||
key_usage = self._build_key_usage(critical=True)
|
||||
|
||||
self.assertEqual(v._merge_key_usage(
|
||||
key_usage, ['Digital Signature', 'Key Encipherment']), key_usage)
|
||||
|
||||
def test_merge_key_usage_disallowed_but_not_critical(self):
|
||||
key_usage = self._build_key_usage()
|
||||
expected = c_x509.KeyUsage(
|
||||
True, False, False, False, False, False, False, False, False)
|
||||
expected = c_x509.Extension(expected.oid, False, expected)
|
||||
|
||||
self.assertEqual(v._merge_key_usage(
|
||||
key_usage, ['Digital Signature']), expected)
|
||||
|
||||
def test_merge_key_usage_disallowed(self):
|
||||
key_usage = self._build_key_usage(critical=True)
|
||||
|
||||
with self.assertRaises(CertificateValidationError):
|
||||
v._merge_key_usage(key_usage, ['Digital Signature'])
|
||||
|
||||
def test_disallow_ca_in_basic_constraints_not_critical(self):
|
||||
bc = self._build_basic_constraints(ca=True)
|
||||
expected = self._build_basic_constraints(ca=False)
|
||||
|
||||
self.assertEqual(v._disallow_ca_in_basic_constraints(bc), expected)
|
||||
|
||||
def test_disallow_ca_in_basic_constraints(self):
|
||||
bc = self._build_basic_constraints(ca=True, critical=True)
|
||||
|
||||
with self.assertRaises(CertificateValidationError):
|
||||
v._disallow_ca_in_basic_constraints(bc)
|
||||
|
||||
def test_disallow_ca_in_basic_constraints_with_non_ca(self):
|
||||
bc = self._build_basic_constraints(ca=False)
|
||||
|
||||
self.assertEqual(v._disallow_ca_in_basic_constraints(bc), bc)
|
||||
|
||||
def test_remove_ca_key_usage(self):
|
||||
contains_ca_key_usage = set([
|
||||
"Digital Signature", "Certificate Sign", "CRL Sign"])
|
||||
|
||||
self.assertEqual(v._remove_ca_key_usage(contains_ca_key_usage),
|
||||
set(["Digital Signature"]))
|
||||
|
||||
def test_remove_ca_key_usage_cert_sign(self):
|
||||
contains_ca_key_usage = set(["Digital Signature", "Certificate Sign"])
|
||||
|
||||
self.assertEqual(v._remove_ca_key_usage(contains_ca_key_usage),
|
||||
set(["Digital Signature"]))
|
||||
|
||||
def test_remove_ca_key_usage_crl_sign(self):
|
||||
contains_ca_key_usage = set(["Digital Signature", "CRL Sign"])
|
||||
|
||||
self.assertEqual(v._remove_ca_key_usage(contains_ca_key_usage),
|
||||
set(["Digital Signature"]))
|
||||
|
||||
def test_remove_ca_key_usage_without_ca_usage(self):
|
||||
contains_ca_key_usage = set(["Digital Signature"])
|
||||
|
||||
self.assertEqual(v._remove_ca_key_usage(contains_ca_key_usage),
|
||||
set(["Digital Signature"]))
|
@ -44,3 +44,4 @@ requests>=2.5.2
|
||||
six>=1.9.0
|
||||
stevedore>=1.5.0 # Apache-2.0
|
||||
taskflow>=1.16.0
|
||||
cryptography>=1.0 # Apache-2.0
|
||||
|
Loading…
x
Reference in New Issue
Block a user