Local development implementation for Certificates
A basic local filesystem implementation of CertManager and a local pyOpenSSL implementation of CertGenerator. Change-Id: I0eb0476afaad8a1bbb2eaaf90564eb63f7872546 Partially-implements: blueprint tls-data-security
This commit is contained in:
parent
1c873900b2
commit
1e866f3ba2
@ -25,5 +25,12 @@
|
||||
# admin_project_id = service
|
||||
|
||||
[certificates]
|
||||
# cert_generator_class =
|
||||
# cert_manager_class =
|
||||
# cert_generator_class = octavia.certificates.generator.LocalCertGenerator
|
||||
# cert_manager_class = octavia.certificates.manager.LocalCertManager
|
||||
|
||||
# For local certificate signing (development only):
|
||||
# ca_certificate = /etc/ssl/certs/ssl-cert-snakeoil.pem
|
||||
# ca_private_key = /etc/ssl/private/ssl-cert-snakeoil.key
|
||||
# ca_private_key_passphrase =
|
||||
# signing_digest = sha265
|
||||
# storage_path = /var/lib/octavia/certificates/
|
||||
|
87
octavia/certificates/common/local.py
Normal file
87
octavia/certificates/common/local.py
Normal file
@ -0,0 +1,87 @@
|
||||
# Copyright (c) 2014 Rackspace US, Inc
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""
|
||||
Common classes for local filesystem certificate handling
|
||||
"""
|
||||
import os
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from octavia.certificates.common import cert
|
||||
|
||||
TLS_CERT_DEFAULT = os.environ.get(
|
||||
'OS_OCTAVIA_TLS_CA_CERT', '/etc/ssl/certs/ssl-cert-snakeoil.pem'
|
||||
)
|
||||
TLS_KEY_DEFAULT = os.environ.get(
|
||||
'OS_OCTAVIA_TLS_CA_KEY', '/etc/ssl/private/ssl-cert-snakeoil.key'
|
||||
)
|
||||
TLS_PKP_DEFAULT = os.environ.get('OS_OCTAVIA_CA_KEY_PASS')
|
||||
TLS_DIGEST_DEFAULT = os.environ.get('OS_OCTAVIA_CA_SIGNING_DIGEST', 'sha256')
|
||||
TLS_STORAGE_DEFAULT = os.environ.get(
|
||||
'OS_OCTAVIA_TLS_STORAGE', '/var/lib/octavia/certificates/'
|
||||
)
|
||||
|
||||
certgen_opts = [
|
||||
cfg.StrOpt('ca_certificate',
|
||||
default=TLS_CERT_DEFAULT,
|
||||
help='Absolute path to the CA Certificate for signing. Defaults'
|
||||
' to env[OS_OCTAVIA_TLS_CA_CERT].'),
|
||||
cfg.StrOpt('ca_private_key',
|
||||
default=TLS_KEY_DEFAULT,
|
||||
help='Absolute path to the Private Key for signing. Defaults'
|
||||
' to env[OS_OCTAVIA_TLS_CA_KEY].'),
|
||||
cfg.StrOpt('ca_private_key_passphrase',
|
||||
default=TLS_PKP_DEFAULT,
|
||||
help='Passphrase for the Private Key. Defaults'
|
||||
' to env[OS_OCTAVIA_CA_KEY_PASS] or None.'),
|
||||
cfg.StrOpt('signing_digest',
|
||||
default=TLS_DIGEST_DEFAULT,
|
||||
help='Certificate signing digest. Defaults'
|
||||
' to env[OS_OCTAVIA_CA_SIGNING_DIGEST] or "sha256".')
|
||||
]
|
||||
|
||||
certmgr_opts = [
|
||||
cfg.StrOpt('storage_path',
|
||||
default=TLS_STORAGE_DEFAULT,
|
||||
help='Absolute path to the certificate storage directory. '
|
||||
'Defaults to env[OS_OCTAVIA_TLS_STORAGE].')
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(certgen_opts, group='certificates')
|
||||
CONF.register_opts(certmgr_opts, group='certificates')
|
||||
|
||||
|
||||
class LocalCert(cert.Cert):
|
||||
"""Representation of a Cert for local storage."""
|
||||
def __init__(self, certificate, private_key, intermediates=None,
|
||||
private_key_passphrase=None):
|
||||
self.certificate = certificate
|
||||
self.intermediates = intermediates
|
||||
self.private_key = private_key
|
||||
self.private_key_passphrase = private_key_passphrase
|
||||
|
||||
def get_certificate(self):
|
||||
return self.certificate
|
||||
|
||||
def get_intermediates(self):
|
||||
return self.intermediates
|
||||
|
||||
def get_private_key(self):
|
||||
return self.private_key
|
||||
|
||||
def get_private_key_passphrase(self):
|
||||
return self.private_key_passphrase
|
@ -19,7 +19,7 @@ from octavia.openstack.common import importutils
|
||||
|
||||
certgen_opts = [
|
||||
cfg.StrOpt('cert_generator_class',
|
||||
default='octavia.certificates.barbican.BarbicanCertGenerator',
|
||||
default='octavia.certificates.generator.LocalCertGenerator',
|
||||
help='The full class name of the cert generator API class'),
|
||||
]
|
||||
|
||||
|
108
octavia/certificates/generator/local.py
Normal file
108
octavia/certificates/generator/local.py
Normal file
@ -0,0 +1,108 @@
|
||||
# Copyright (c) 2014 Rackspace US, Inc
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import binascii
|
||||
import os
|
||||
|
||||
from OpenSSL import crypto
|
||||
from oslo.config import cfg
|
||||
|
||||
from octavia.certificates.generator import cert_gen
|
||||
from octavia.common import exceptions
|
||||
from octavia.openstack.common import gettextutils
|
||||
from octavia.openstack.common import log as logging
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class LocalCertGenerator(cert_gen.CertGenerator):
|
||||
"""Cert Generator Interface that signs certs locally."""
|
||||
|
||||
@staticmethod
|
||||
def _new_serial():
|
||||
return int(binascii.hexlify(os.urandom(20)), 16)
|
||||
|
||||
@staticmethod
|
||||
def sign_cert(csr, validity, ca_cert=None, ca_key=None, ca_key_pass=None,
|
||||
ca_digest=None):
|
||||
"""Signs a certificate using our private CA based on the specified CSR
|
||||
|
||||
The signed certificate will be valid from now until <validity> seconds
|
||||
from now.
|
||||
|
||||
:param csr: A Certificate Signing Request
|
||||
:param validity: Valid for <validity> seconds from the current time
|
||||
:param ca_cert: Signing Certificate (default: config)
|
||||
:param ca_key: Signing Certificate Key (default: config)
|
||||
:param ca_key_pass: Signing Certificate Key Pass (default: config)
|
||||
:param ca_digest: Digest method to use for signing (default: config)
|
||||
|
||||
:return: Signed certificate
|
||||
:raises Exception: if certificate signing fails
|
||||
"""
|
||||
LOG.info(gettextutils._LI(
|
||||
"Signing a certificate request using pyOpenSSL locally."
|
||||
))
|
||||
if not ca_cert:
|
||||
LOG.info(gettextutils._LI("Using CA Certificate from config."))
|
||||
try:
|
||||
ca_cert = open(CONF.certificates.ca_certificate).read()
|
||||
except IOError:
|
||||
raise exceptions.CertificateGenerationException(
|
||||
"Failed to load {0}."
|
||||
.format(CONF.certificates.ca_certificate)
|
||||
)
|
||||
if not ca_key:
|
||||
LOG.info(gettextutils._LI("Using CA Private Key from config."))
|
||||
try:
|
||||
ca_key = open(CONF.certificates.ca_private_key).read()
|
||||
except IOError:
|
||||
raise exceptions.CertificateGenerationException(
|
||||
"Failed to load {0}."
|
||||
.format(CONF.certificates.ca_certificate)
|
||||
)
|
||||
if not ca_key_pass:
|
||||
ca_key_pass = CONF.certificates.ca_private_key_passphrase
|
||||
if ca_key_pass:
|
||||
LOG.info(gettextutils._LI(
|
||||
"Using CA Private Key Passphrase from config."
|
||||
))
|
||||
else:
|
||||
LOG.info(gettextutils._LI(
|
||||
"No Passphrase found for CA Private Key, not using one."
|
||||
))
|
||||
if not ca_digest:
|
||||
ca_digest = CONF.certificates.signing_digest
|
||||
|
||||
try:
|
||||
lo_cert = crypto.load_certificate(crypto.FILETYPE_PEM, ca_cert)
|
||||
lo_key = crypto.load_privatekey(crypto.FILETYPE_PEM, ca_key,
|
||||
passphrase=ca_key_pass)
|
||||
lo_req = crypto.load_certificate_request(crypto.FILETYPE_PEM, csr)
|
||||
|
||||
new_cert = crypto.X509()
|
||||
new_cert.set_serial_number(LocalCertGenerator._new_serial())
|
||||
new_cert.gmtime_adj_notBefore(0)
|
||||
new_cert.gmtime_adj_notAfter(validity)
|
||||
new_cert.set_issuer(lo_cert.get_subject())
|
||||
new_cert.set_subject(lo_req.get_subject())
|
||||
new_cert.set_pubkey(lo_req.get_pubkey())
|
||||
new_cert.sign(lo_key, ca_digest)
|
||||
|
||||
return crypto.dump_certificate(crypto.FILETYPE_PEM, new_cert)
|
||||
except Exception as e:
|
||||
LOG.error(gettextutils._LE("Unable to sign certificate."))
|
||||
raise exceptions.CertificateGenerationException(e)
|
@ -19,7 +19,7 @@ from octavia.openstack.common import importutils
|
||||
|
||||
certmgr_opts = [
|
||||
cfg.StrOpt('cert_manager_class',
|
||||
default='octavia.certificates.barbican.BarbicanCertManager',
|
||||
default='octavia.certificates.manager.LocalCertManager',
|
||||
help='The full class name of the cert manager API class'),
|
||||
]
|
||||
|
||||
|
167
octavia/certificates/manager/local.py
Normal file
167
octavia/certificates/manager/local.py
Normal file
@ -0,0 +1,167 @@
|
||||
# Copyright (c) 2014 Rackspace US, Inc
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import os
|
||||
import uuid
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from octavia.certificates.common import local as local_common
|
||||
from octavia.certificates.manager import cert_mgr
|
||||
from octavia.common import exceptions
|
||||
from octavia.openstack.common import gettextutils
|
||||
from octavia.openstack.common import log as logging
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class LocalCertManager(cert_mgr.CertManager):
|
||||
"""Cert Manager Interface that stores data locally."""
|
||||
|
||||
@staticmethod
|
||||
def store_cert(certificate, private_key, intermediates=None,
|
||||
private_key_passphrase=None, **kwargs):
|
||||
"""Stores (i.e., registers) a cert with the cert manager.
|
||||
|
||||
This method stores the specified cert to the filesystem and returns
|
||||
a UUID that can be used to retrieve it.
|
||||
|
||||
:param certificate: PEM encoded TLS certificate
|
||||
:param private_key: private key for the supplied certificate
|
||||
:param intermediates: ordered and concatenated intermediate certs
|
||||
:param private_key_passphrase: optional passphrase for the supplied key
|
||||
|
||||
:returns: the UUID of the stored cert
|
||||
:raises CertificateStorageException: if certificate storage fails
|
||||
"""
|
||||
cert_ref = str(uuid.uuid4())
|
||||
filename_base = os.path.join(CONF.certificates.storage_path, cert_ref)
|
||||
|
||||
LOG.info(gettextutils._LI(
|
||||
"Storing certificate data on the local filesystem."
|
||||
))
|
||||
try:
|
||||
filename_certificate = "{0}.crt".format(filename_base, cert_ref)
|
||||
with open(filename_certificate, 'w') as cert_file:
|
||||
cert_file.write(certificate)
|
||||
|
||||
filename_private_key = "{0}.key".format(filename_base, cert_ref)
|
||||
with open(filename_private_key, 'w') as key_file:
|
||||
key_file.write(private_key)
|
||||
|
||||
if intermediates:
|
||||
filename_intermediates = "{0}.int".format(filename_base,
|
||||
cert_ref)
|
||||
with open(filename_intermediates, 'w') as int_file:
|
||||
int_file.write(intermediates)
|
||||
|
||||
if private_key_passphrase:
|
||||
filename_pkp = "{0}.pass".format(filename_base, cert_ref)
|
||||
with open(filename_pkp, 'w') as pass_file:
|
||||
pass_file.write(private_key_passphrase)
|
||||
except IOError as ioe:
|
||||
LOG.error(gettextutils._LE("Failed to store certificate."))
|
||||
raise exceptions.CertificateStorageException(message=ioe.message)
|
||||
|
||||
return cert_ref
|
||||
|
||||
@staticmethod
|
||||
def get_cert(cert_ref, **kwargs):
|
||||
"""Retrieves the specified cert.
|
||||
|
||||
:param cert_ref: the UUID of the cert to retrieve
|
||||
|
||||
:return: octavia.certificates.common.Cert representation of the
|
||||
certificate data
|
||||
:raises CertificateStorageException: if certificate retrieval fails
|
||||
"""
|
||||
LOG.info(gettextutils._LI(
|
||||
"Loading certificate {0} from the local filesystem."
|
||||
).format(cert_ref))
|
||||
|
||||
filename_base = os.path.join(CONF.certificates.storage_path, cert_ref)
|
||||
|
||||
filename_certificate = "{0}.crt".format(filename_base, cert_ref)
|
||||
filename_private_key = "{0}.key".format(filename_base, cert_ref)
|
||||
filename_intermediates = "{0}.int".format(filename_base, cert_ref)
|
||||
filename_pkp = "{0}.pass".format(filename_base, cert_ref)
|
||||
|
||||
cert_data = dict()
|
||||
|
||||
try:
|
||||
with open(filename_certificate, 'r') as cert_file:
|
||||
cert_data['certificate'] = cert_file.read()
|
||||
except IOError:
|
||||
LOG.error(gettextutils._LE(
|
||||
"Failed to read certificate for {0}."
|
||||
).format(cert_ref))
|
||||
raise exceptions.CertificateStorageException(
|
||||
msg="Certificate could not be read."
|
||||
)
|
||||
try:
|
||||
with open(filename_private_key, 'r') as key_file:
|
||||
cert_data['private_key'] = key_file.read()
|
||||
except IOError:
|
||||
LOG.error(gettextutils._LE(
|
||||
"Failed to read private key for {0}."
|
||||
).format(cert_ref))
|
||||
raise exceptions.CertificateStorageException(
|
||||
msg="Private Key could not be read."
|
||||
)
|
||||
|
||||
try:
|
||||
with open(filename_intermediates, 'r') as int_file:
|
||||
cert_data['intermediates'] = int_file.read()
|
||||
except IOError:
|
||||
pass
|
||||
|
||||
try:
|
||||
with open(filename_pkp, 'r') as pass_file:
|
||||
cert_data['private_key_passphrase'] = pass_file.read()
|
||||
except IOError:
|
||||
pass
|
||||
|
||||
return local_common.LocalCert(**cert_data)
|
||||
|
||||
@staticmethod
|
||||
def delete_cert(cert_ref, **kwargs):
|
||||
"""Deletes the specified cert.
|
||||
|
||||
:param cert_ref: the UUID of the cert to delete
|
||||
|
||||
:raises CertificateStorageException: if certificate deletion fails
|
||||
"""
|
||||
LOG.info(gettextutils._LI(
|
||||
"Deleting certificate {0} from the local filesystem."
|
||||
).format(cert_ref))
|
||||
|
||||
filename_base = os.path.join(CONF.certificates.storage_path, cert_ref)
|
||||
|
||||
filename_certificate = "{0}.crt".format(filename_base, cert_ref)
|
||||
filename_private_key = "{0}.key".format(filename_base, cert_ref)
|
||||
filename_intermediates = "{0}.int".format(filename_base, cert_ref)
|
||||
filename_pkp = "{0}.pass".format(filename_base, cert_ref)
|
||||
|
||||
try:
|
||||
os.remove(filename_certificate)
|
||||
os.remove(filename_private_key)
|
||||
os.remove(filename_intermediates)
|
||||
os.remove(filename_pkp)
|
||||
except IOError as ioe:
|
||||
LOG.error(gettextutils._LE(
|
||||
"Failed to delete certificate {0}."
|
||||
).format(cert_ref))
|
||||
raise exceptions.CertificateStorageException(message=ioe.message)
|
42
octavia/tests/unit/certificates/common/test_local.py
Normal file
42
octavia/tests/unit/certificates/common/test_local.py
Normal file
@ -0,0 +1,42 @@
|
||||
# Copyright 2014 Rackspace US, Inc
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import octavia.certificates.common.local as local_cert
|
||||
import octavia.tests.unit.base as base
|
||||
|
||||
|
||||
class TestLocalCommon(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.certificate = "My Certificate"
|
||||
self.intermediates = "My Intermediates"
|
||||
self.private_key = "My Private Key"
|
||||
self.private_key_passphrase = "My Private Key Passphrase"
|
||||
|
||||
super(TestLocalCommon, self).setUp()
|
||||
|
||||
def test_local_cert(self):
|
||||
# Create a cert
|
||||
cert = local_cert.LocalCert(
|
||||
certificate=self.certificate,
|
||||
intermediates=self.intermediates,
|
||||
private_key=self.private_key,
|
||||
private_key_passphrase=self.private_key_passphrase
|
||||
)
|
||||
|
||||
# Validate the cert functions
|
||||
self.assertEqual(cert.get_certificate(), self.certificate)
|
||||
self.assertEqual(cert.get_intermediates(), self.intermediates)
|
||||
self.assertEqual(cert.get_private_key(), self.private_key)
|
||||
self.assertEqual(cert.get_private_key_passphrase(),
|
||||
self.private_key_passphrase)
|
90
octavia/tests/unit/certificates/generator/test_local.py
Normal file
90
octavia/tests/unit/certificates/generator/test_local.py
Normal file
@ -0,0 +1,90 @@
|
||||
# Copyright 2014 Rackspace US, Inc
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import datetime
|
||||
|
||||
from OpenSSL import crypto
|
||||
|
||||
import octavia.certificates.generator.local as local_cert_gen
|
||||
import octavia.tests.unit.base as base
|
||||
|
||||
|
||||
class TestLocalGenerator(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.signing_digest = "sha256"
|
||||
|
||||
# Set up CSR data
|
||||
csr_key = crypto.PKey()
|
||||
csr_key.generate_key(crypto.TYPE_RSA, 2048)
|
||||
csr = crypto.X509Req()
|
||||
csr.set_pubkey(csr_key)
|
||||
self.certificate_signing_request = crypto.dump_certificate_request(
|
||||
crypto.FILETYPE_PEM, csr
|
||||
)
|
||||
|
||||
# Set up CA data
|
||||
ca_key = crypto.PKey()
|
||||
ca_key.generate_key(crypto.TYPE_RSA, 2048)
|
||||
|
||||
self.ca_private_key_passphrase = "Testing"
|
||||
self.ca_private_key = crypto.dump_privatekey(
|
||||
crypto.FILETYPE_PEM,
|
||||
ca_key,
|
||||
'aes-256-cbc',
|
||||
self.ca_private_key_passphrase
|
||||
)
|
||||
|
||||
ca_cert = crypto.X509()
|
||||
ca_subject = ca_cert.get_subject()
|
||||
ca_subject.C = "US"
|
||||
ca_subject.ST = "Oregon"
|
||||
ca_subject.L = "Springfield"
|
||||
ca_subject.O = "Springfield Nuclear Power Plant"
|
||||
ca_subject.OU = "Section 7-G"
|
||||
ca_subject.CN = "maggie1"
|
||||
ca_cert.set_issuer(ca_cert.get_subject())
|
||||
ca_cert.set_pubkey(ca_key)
|
||||
ca_cert.gmtime_adj_notBefore(0)
|
||||
ca_cert.gmtime_adj_notAfter(2 * 365 * 24 * 60 * 60)
|
||||
ca_cert.sign(ca_key, self.signing_digest)
|
||||
|
||||
self.ca_certificate = crypto.dump_certificate(
|
||||
crypto.FILETYPE_PEM, ca_cert
|
||||
)
|
||||
|
||||
super(TestLocalGenerator, self).setUp()
|
||||
|
||||
def test_sign_cert(self):
|
||||
# Attempt sign a cert
|
||||
signed_cert = local_cert_gen.LocalCertGenerator.sign_cert(
|
||||
csr=self.certificate_signing_request,
|
||||
validity=2 * 365 * 24 * 60 * 60,
|
||||
ca_cert=self.ca_certificate,
|
||||
ca_key=self.ca_private_key,
|
||||
ca_key_pass=self.ca_private_key_passphrase,
|
||||
ca_digest=self.signing_digest
|
||||
)
|
||||
|
||||
self.assertIn("-----BEGIN CERTIFICATE-----", signed_cert)
|
||||
|
||||
# Load the cert for specific tests
|
||||
cert = crypto.load_certificate(crypto.FILETYPE_PEM, signed_cert)
|
||||
|
||||
# Make sure expiry time is accurate
|
||||
expires = datetime.datetime.strptime(cert.get_notAfter(),
|
||||
'%Y%m%d%H%M%SZ')
|
||||
should_expire = (datetime.datetime.utcnow() +
|
||||
datetime.timedelta(seconds=2 * 365 * 24 * 60 * 60))
|
||||
diff = should_expire - expires
|
||||
self.assertTrue(diff < datetime.timedelta(seconds=10))
|
121
octavia/tests/unit/certificates/manager/test_local.py
Normal file
121
octavia/tests/unit/certificates/manager/test_local.py
Normal file
@ -0,0 +1,121 @@
|
||||
# Copyright 2014 Rackspace US, Inc
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import os
|
||||
|
||||
import mock
|
||||
from oslo.config import cfg
|
||||
from oslo.config import fixture as oslo_fixture
|
||||
|
||||
import octavia.certificates.common.cert as cert
|
||||
import octavia.certificates.manager.local as local_cert_mgr
|
||||
import octavia.tests.unit.base as base
|
||||
|
||||
|
||||
class TestLocalGenerator(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.certificate = "My Certificate"
|
||||
self.intermediates = "My Intermediates"
|
||||
self.private_key = "My Private Key"
|
||||
self.private_key_passphrase = "My Private Key Passphrase"
|
||||
|
||||
conf = oslo_fixture.Config(cfg.CONF)
|
||||
conf.config(group="certificates", storage_path="/tmp/")
|
||||
|
||||
super(TestLocalGenerator, self).setUp()
|
||||
|
||||
def _store_cert(self):
|
||||
file_mock = mock.mock_open()
|
||||
# Attempt to store the cert
|
||||
with mock.patch('__builtin__.open', file_mock, create=True):
|
||||
cert_id = local_cert_mgr.LocalCertManager.store_cert(
|
||||
certificate=self.certificate,
|
||||
intermediates=self.intermediates,
|
||||
private_key=self.private_key,
|
||||
private_key_passphrase=self.private_key_passphrase
|
||||
)
|
||||
|
||||
# Check that something came back
|
||||
self.assertIsNotNone(cert_id)
|
||||
|
||||
# Verify the correct files were opened
|
||||
file_mock.assert_has_calls([
|
||||
mock.call(os.path.join('/tmp/{0}.crt'.format(cert_id)), 'w'),
|
||||
mock.call(os.path.join('/tmp/{0}.key'.format(cert_id)), 'w'),
|
||||
mock.call(os.path.join('/tmp/{0}.int'.format(cert_id)), 'w'),
|
||||
mock.call(os.path.join('/tmp/{0}.pass'.format(cert_id)), 'w')
|
||||
], any_order=True)
|
||||
|
||||
# Verify the writes were made
|
||||
file_mock().write.assert_has_calls([
|
||||
mock.call(self.certificate),
|
||||
mock.call(self.intermediates),
|
||||
mock.call(self.private_key),
|
||||
mock.call(self.private_key_passphrase)
|
||||
], any_order=True)
|
||||
|
||||
return cert_id
|
||||
|
||||
def _get_cert(self, cert_id):
|
||||
file_mock = mock.mock_open()
|
||||
# Attempt to retrieve the cert
|
||||
with mock.patch('__builtin__.open', file_mock, create=True):
|
||||
data = local_cert_mgr.LocalCertManager.get_cert(cert_id)
|
||||
|
||||
# Verify the correct files were opened
|
||||
file_mock.assert_has_calls([
|
||||
mock.call(os.path.join('/tmp/{0}.crt'.format(cert_id)), 'r'),
|
||||
mock.call(os.path.join('/tmp/{0}.key'.format(cert_id)), 'r'),
|
||||
mock.call(os.path.join('/tmp/{0}.int'.format(cert_id)), 'r'),
|
||||
mock.call(os.path.join('/tmp/{0}.pass'.format(cert_id)), 'r')
|
||||
], any_order=True)
|
||||
|
||||
# The returned data should be a Cert object
|
||||
self.assertIsInstance(data, cert.Cert)
|
||||
|
||||
return data
|
||||
|
||||
def _delete_cert(self, cert_id):
|
||||
remove_mock = mock.Mock()
|
||||
# Delete the cert
|
||||
with mock.patch('os.remove', remove_mock):
|
||||
local_cert_mgr.LocalCertManager.delete_cert(cert_id)
|
||||
|
||||
# Verify the correct files were removed
|
||||
remove_mock.assert_has_calls([
|
||||
mock.call(os.path.join('/tmp/{0}.crt'.format(cert_id))),
|
||||
mock.call(os.path.join('/tmp/{0}.key'.format(cert_id))),
|
||||
mock.call(os.path.join('/tmp/{0}.int'.format(cert_id))),
|
||||
mock.call(os.path.join('/tmp/{0}.pass'.format(cert_id)))
|
||||
], any_order=True)
|
||||
|
||||
def test_store_cert(self):
|
||||
self._store_cert()
|
||||
|
||||
def test_get_cert(self):
|
||||
# Store a cert
|
||||
cert_id = self._store_cert()
|
||||
|
||||
# Get the cert
|
||||
self._get_cert(cert_id)
|
||||
|
||||
def test_delete_cert(self):
|
||||
# Store a cert
|
||||
cert_id = self._store_cert()
|
||||
|
||||
# Verify the cert exists
|
||||
self._get_cert(cert_id)
|
||||
|
||||
# Delete the cert
|
||||
self._delete_cert(cert_id)
|
@ -30,3 +30,5 @@ oslo.rootwrap>=1.3.0.0a1
|
||||
python-keystoneclient>=0.11.1
|
||||
python-novaclient>=2.17.0
|
||||
posix_ipc
|
||||
|
||||
pyOpenSSL>=0.14
|
||||
|
Loading…
Reference in New Issue
Block a user