Remove dead code around certificate handling

LocalCertManager is unusable because there's no way to get cert data
into the system (the API doesn't accept it) so there's no way we could
store it, which makes it unusable for its original purpose which was to
be a dev tool (it is not suitable for production use in any case).

Barbican does not support certificate generation in a way that makes
sense for us (they do async only) and Anchor will be the way forward.
This driver will never be completed and therefore should be removed.

Change-Id: I78019bc7ad7dffc745055216ed2aace725c58de2
This commit is contained in:
Adam Harwell 2016-02-08 12:43:57 -06:00
parent 25bcbc1cc1
commit 0e78993002
5 changed files with 0 additions and 476 deletions

View File

@ -1,114 +0,0 @@
# Copyright (c) 2014 Rackspace US, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Cert generator implementation for Barbican
"""
import random
import time
from oslo_config import cfg
from stevedore import driver as stevedore_driver
from octavia.certificates.generator import cert_gen
MAX_ATTEMPTS = 10
class BarbicanCertGenerator(cert_gen.CertGenerator):
"""Certificate Generator that wraps the Barbican client API."""
def __init__(self):
super(BarbicanCertGenerator, self).__init__()
self.auth = stevedore_driver.DriverManager(
namespace='octavia.barbican_auth',
name=cfg.CONF.certificates.barbican_auth,
invoke_on_load=True,
).driver
def sign_cert(self, csr, validity):
"""Signs a certificate using our private CA based on the specified CSR.
:param csr: A Certificate Signing Request
:param validity: Valid for <validity> seconds from the current time
:return: Signed certificate
:raises Exception: if certificate signing fails
"""
raise NotImplementedError("Barbican does not yet support signing.")
def _generate_private_key(self, bit_length=2048, passphrase=None,
create_only=False):
"""Generates a private key
:param bit_length: Private key bit length (default 2048)
:param passphrase: Passphrase to use for encrypting the private key
:return: PEM encoded private key
:raises Exception: If private key generation fails
"""
connection = self.auth.get_barbican_client(
cfg.CONF.keystone_authtoken.admin_user)
order = connection.orders.create_asymmetric(
bit_length=bit_length,
algorithm='rsa',
pass_phrase=passphrase,
payload_content_type='application/octet-stream'
)
order.submit()
attempts = 0
while (order.container_ref is None and order.status != 'ERROR'
and attempts < MAX_ATTEMPTS):
backoff = float(1 << attempts) + random.random() * attempts
time.sleep(backoff)
order = connection.orders.get(order.order_ref)
attempts += 1
if order.status != 'ACTIVE':
raise Exception("Barbican failed to generate a private key.")
container = connection.containers.get(order.container_ref)
secret = container.private_key
if not create_only:
try:
pk = secret.payload
except ValueError:
secret = connection.secrets.get(
secret_ref=secret.secret_ref,
payload_content_type='application/octet-stream'
)
pk = secret.payload
return pk
return secret.secret_ref
def _sign_cert_from_stored_key(self, cn, pk_ref, validity):
raise NotImplementedError("Barbican does not yet support signing.")
def generate_cert_key_pair(self, cn, validity, bit_length=2048,
passphrase=None):
# This code will essentially work once Barbican enables CertOrders
# pk = self._generate_private_key(
# bit_length=bit_length,
# passphrase=passphrase,
# create_only=True
# )
# cert_container = self._sign_cert_from_stored_key(cn=cn, pk_ref=pk,
# validity=validity)
# return barbican_common.BarbicanCert(cert_container)
raise NotImplementedError("Barbican does not yet support signing.")

View File

@ -1,161 +0,0 @@
# Copyright (c) 2014 Rackspace US, Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import uuid
from oslo_config import cfg
from oslo_log import log as logging
from octavia.certificates.common import local as local_common
from octavia.certificates.manager import cert_mgr
from octavia.common import exceptions
from octavia.i18n import _LE, _LI
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class LocalCertManager(cert_mgr.CertManager):
"""Cert Manager Interface that stores data locally."""
@staticmethod
def store_cert(project_id, certificate, private_key, intermediates=None,
private_key_passphrase=None, **kwargs):
"""Stores (i.e., registers) a cert with the cert manager.
This method stores the specified cert to the filesystem and returns
a UUID that can be used to retrieve it.
:param project_id: Ignored in this implementation
:param certificate: PEM encoded TLS certificate
:param private_key: private key for the supplied certificate
:param intermediates: ordered and concatenated intermediate certs
:param private_key_passphrase: optional passphrase for the supplied key
:returns: the UUID of the stored cert
:raises CertificateStorageException: if certificate storage fails
"""
cert_ref = str(uuid.uuid4())
filename_base = os.path.join(CONF.certificates.storage_path, cert_ref)
LOG.info(_LI(
"Storing certificate data on the local filesystem."
))
try:
filename_certificate = "{0}.crt".format(filename_base)
with open(filename_certificate, 'w') as cert_file:
cert_file.write(certificate)
filename_private_key = "{0}.key".format(filename_base)
with open(filename_private_key, 'w') as key_file:
key_file.write(private_key)
if intermediates:
filename_intermediates = "{0}.int".format(filename_base)
with open(filename_intermediates, 'w') as int_file:
int_file.write(intermediates)
if private_key_passphrase:
filename_pkp = "{0}.pass".format(filename_base)
with open(filename_pkp, 'w') as pass_file:
pass_file.write(private_key_passphrase)
except IOError as ioe:
LOG.error(_LE("Failed to store certificate."))
raise exceptions.CertificateStorageException(message=ioe.message)
return cert_ref
@staticmethod
def get_cert(project_id, cert_ref, **kwargs):
"""Retrieves the specified cert.
:param project_id: Ignored in this implementation
:param cert_ref: the UUID of the cert to retrieve
:return: octavia.certificates.common.Cert representation of the
certificate data
:raises CertificateStorageException: if certificate retrieval fails
"""
LOG.info(_LI("Loading certificate %s from the local filesystem."),
cert_ref)
filename_base = os.path.join(CONF.certificates.storage_path, cert_ref)
filename_certificate = "{0}.crt".format(filename_base)
filename_private_key = "{0}.key".format(filename_base)
filename_intermediates = "{0}.int".format(filename_base)
filename_pkp = "{0}.pass".format(filename_base)
cert_data = dict()
try:
with open(filename_certificate, 'r') as cert_file:
cert_data['certificate'] = cert_file.read()
except IOError:
LOG.error(_LE("Failed to read certificate for %s."), cert_ref)
raise exceptions.CertificateStorageException(
msg="Certificate could not be read."
)
try:
with open(filename_private_key, 'r') as key_file:
cert_data['private_key'] = key_file.read()
except IOError:
LOG.error(_LE("Failed to read private key for %s"), cert_ref)
raise exceptions.CertificateStorageException(
msg="Private Key could not be read."
)
try:
with open(filename_intermediates, 'r') as int_file:
cert_data['intermediates'] = int_file.read()
except IOError:
pass
try:
with open(filename_pkp, 'r') as pass_file:
cert_data['private_key_passphrase'] = pass_file.read()
except IOError:
pass
return local_common.LocalCert(**cert_data)
@staticmethod
def delete_cert(project_id, cert_ref, **kwargs):
"""Deletes the specified cert.
:param project_id: Ignored in this implementation
:param cert_ref: the UUID of the cert to delete
:raises CertificateStorageException: if certificate deletion fails
"""
LOG.info(_LI("Deleting certificate %s from the local filesystem."),
cert_ref)
filename_base = os.path.join(CONF.certificates.storage_path, cert_ref)
filename_certificate = "{0}.crt".format(filename_base)
filename_private_key = "{0}.key".format(filename_base)
filename_intermediates = "{0}.int".format(filename_base)
filename_pkp = "{0}.pass".format(filename_base)
try:
os.remove(filename_certificate)
os.remove(filename_private_key)
os.remove(filename_intermediates)
os.remove(filename_pkp)
except IOError as ioe:
LOG.error(_LE("Failed to delete certificate %s"), cert_ref)
raise exceptions.CertificateStorageException(message=ioe.message)

View File

@ -1,70 +0,0 @@
# Copyright 2014 Rackspace US, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
import mock
from OpenSSL import crypto
import octavia.certificates.generator.barbican as barbican_cert_gen
import octavia.tests.unit.base as base
class TestBarbicanGenerator(base.TestCase):
def setUp(self):
# Make a fake Order and contents
self.barbican_endpoint = 'http://localhost:9311/v1'
self.container_uuid = uuid.uuid4()
# TODO(rm_work): fill this section, right now it is placeholder data
self.order_uuid = uuid.uuid4()
self.order_ref = '{0}/orders/{1}'.format(
self.barbican_endpoint, self.container_uuid
)
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 1024)
req = crypto.X509Req()
req.set_pubkey(key)
self.certificate_signing_request = crypto.dump_certificate_request(
crypto.FILETYPE_PEM, req
)
order = mock.Mock()
self.order = order
super(TestBarbicanGenerator, self).setUp()
def test_sign_cert(self):
# TODO(rm_work): Update this test when Barbican supports this, right
# now this is all guesswork
self.skipTest("Barbican does not yet support signing.")
# Mock out the client
bc = mock.MagicMock()
bc.orders.create.return_value = self.order
barbican_cert_gen.BarbicanCertGenerator._barbican_client = bc
# Attempt to order a cert signing
barbican_cert_gen.BarbicanCertGenerator.sign_cert(
csr=self.certificate_signing_request
)
# create order should be called once
# should get back a valid order
bc.orders.create.assert_called_once_with()
def test_generate_cert_key_pair(self):
self.skipTest("Barbican does not yet support signing.")

View File

@ -1,129 +0,0 @@
# Copyright 2014 Rackspace US, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import mock
from oslo_config import cfg
from oslo_config import fixture as oslo_fixture
import six
import octavia.certificates.common.cert as cert
import octavia.certificates.manager.local as local_cert_mgr
import octavia.tests.unit.base as base
if six.PY2:
import __builtin__ as builtins
else:
import builtins
class TestLocalManager(base.TestCase):
def setUp(self):
self.certificate = "My Certificate"
self.intermediates = "My Intermediates"
self.private_key = "My Private Key"
self.private_key_passphrase = "My Private Key Passphrase"
conf = oslo_fixture.Config(cfg.CONF)
conf.config(group="certificates", storage_path="/tmp/")
super(TestLocalManager, self).setUp()
def _store_cert(self):
file_mock = mock.mock_open()
# Attempt to store the cert
with mock.patch.object(builtins, 'open', file_mock):
cert_id = local_cert_mgr.LocalCertManager.store_cert(
None,
certificate=self.certificate,
intermediates=self.intermediates,
private_key=self.private_key,
private_key_passphrase=self.private_key_passphrase
)
# Check that something came back
self.assertIsNotNone(cert_id)
# Verify the correct files were opened
file_mock.assert_has_calls([
mock.call(os.path.join('/tmp/{0}.crt'.format(cert_id)), 'w'),
mock.call(os.path.join('/tmp/{0}.key'.format(cert_id)), 'w'),
mock.call(os.path.join('/tmp/{0}.int'.format(cert_id)), 'w'),
mock.call(os.path.join('/tmp/{0}.pass'.format(cert_id)), 'w')
], any_order=True)
# Verify the writes were made
file_mock().write.assert_has_calls([
mock.call(self.certificate),
mock.call(self.intermediates),
mock.call(self.private_key),
mock.call(self.private_key_passphrase)
], any_order=True)
return cert_id
def _get_cert(self, cert_id):
file_mock = mock.mock_open()
# Attempt to retrieve the cert
with mock.patch.object(builtins, 'open', file_mock):
data = local_cert_mgr.LocalCertManager.get_cert(None, cert_id)
# Verify the correct files were opened
file_mock.assert_has_calls([
mock.call(os.path.join('/tmp/{0}.crt'.format(cert_id)), 'r'),
mock.call(os.path.join('/tmp/{0}.key'.format(cert_id)), 'r'),
mock.call(os.path.join('/tmp/{0}.int'.format(cert_id)), 'r'),
mock.call(os.path.join('/tmp/{0}.pass'.format(cert_id)), 'r')
], any_order=True)
# The returned data should be a Cert object
self.assertIsInstance(data, cert.Cert)
return data
def _delete_cert(self, cert_id):
remove_mock = mock.Mock()
# Delete the cert
with mock.patch('os.remove', remove_mock):
local_cert_mgr.LocalCertManager.delete_cert(None, cert_id)
# Verify the correct files were removed
remove_mock.assert_has_calls([
mock.call(os.path.join('/tmp/{0}.crt'.format(cert_id))),
mock.call(os.path.join('/tmp/{0}.key'.format(cert_id))),
mock.call(os.path.join('/tmp/{0}.int'.format(cert_id))),
mock.call(os.path.join('/tmp/{0}.pass'.format(cert_id)))
], any_order=True)
def test_store_cert(self):
self._store_cert()
def test_get_cert(self):
# Store a cert
cert_id = self._store_cert()
# Get the cert
self._get_cert(cert_id)
def test_delete_cert(self):
# Store a cert
cert_id = self._store_cert()
# Verify the cert exists
self._get_cert(cert_id)
# Delete the cert
self._delete_cert(cert_id)

View File

@ -62,10 +62,8 @@ octavia.network.drivers =
containers_driver = octavia.network.drivers.neutron.containers:ContainersDriver
octavia.cert_generator =
local_cert_generator = octavia.certificates.generator.local:LocalCertGenerator
barbican_cert_generator = octavia.certificates.generator.barbican:BarbicanCertGenerator
anchor_cert_generator = octavia.certificates.generator.anchor:AnchorCertGenerator
octavia.cert_manager =
local_cert_manager = octavia.certificates.manager.local:LocalCertManager
barbican_cert_manager = octavia.certificates.manager.barbican:BarbicanCertManager
octavia.barbican_auth =
barbican_acl_auth = octavia.certificates.common.auth.barbican_acl:BarbicanACLAuth