Set up temp files containing client TLS certs

This sets up three files for clients that accept TLS certs as files:
 * CA Certificate
 * Magnum's Client Private Key
 * Magnum's Client Certificate

The Client Private Key is decrypted as some clients cannot handle
encrypted private keys.

Partially Implements bp secure-docker

Change-Id: I14ca6b1ad520bd8391e119c7e016d765cae32f6b
This commit is contained in:
Andrew Melton 2015-09-30 16:50:27 -04:00
parent 63cd7f4017
commit fde8d4f673
8 changed files with 129 additions and 5 deletions

View File

@ -18,6 +18,8 @@ Certificate manager API
import abc
import six
from magnum.common.x509 import operations
@six.add_metaclass(abc.ABCMeta)
class Cert(object):
@ -38,6 +40,11 @@ class Cert(object):
"""Returns the private key for the certificate."""
pass
def get_decrypted_private_key(self):
"""Returns the decrypted private key for the certificate."""
return operations.decrypt_key(self.get_private_key(),
self.get_private_key_passphrase())
@abc.abstractmethod
def get_private_key_passphrase(self):
"""Returns the passphrase for the private key."""

View File

@ -194,3 +194,15 @@ def sign(csr, issuer_name, ca_key, ca_key_password=None,
).public_bytes(serialization.Encoding.PEM)
return certificate
def decrypt_key(encrypted_key, password):
private_key = serialization.load_pem_private_key(
encrypted_key, password=password, backend=default_backend()
)
decrypted_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
return decrypted_pem

View File

@ -41,5 +41,5 @@ class Handler(object):
def get_ca_certificate(self, context, bay):
ca_cert = cert_manager.get_bay_ca_certificate(bay)
certificate = objects.Certificate.from_object_bay(bay)
certificate.pem = ca_cert
certificate.pem = ca_cert.get_certificate()
return certificate

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import tempfile
from oslo_log import log as logging
import six
@ -92,7 +94,35 @@ def get_bay_ca_certificate(bay):
resource_ref=bay.uuid
)
return ca_cert.get_certificate()
return ca_cert
def get_bay_magnum_cert(bay):
magnum_cert = cert_manager.get_backend().CertManager.get_cert(
bay.magnum_cert_ref,
resource_ref=bay.uuid
)
return magnum_cert
def create_client_files(bay):
ca_cert = get_bay_ca_certificate(bay)
magnum_cert = get_bay_magnum_cert(bay)
ca_cert_file = tempfile.NamedTemporaryFile()
ca_cert_file.write(ca_cert.get_certificate())
ca_cert_file.flush()
magnum_key_file = tempfile.NamedTemporaryFile()
magnum_key_file.write(magnum_cert.get_decrypted_private_key())
magnum_key_file.flush()
magnum_cert_file = tempfile.NamedTemporaryFile()
magnum_cert_file.write(magnum_cert.get_certificate())
magnum_cert_file.flush()
return ca_cert_file, magnum_key_file, magnum_cert_file
def sign_node_certificate(bay, csr):

View File

@ -11,16 +11,43 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import fixture
from magnum.common import cert_manager
from magnum.common.cert_manager import barbican_cert_manager as bcm
from magnum.common.cert_manager import cert_manager as cert_manager_iface
from magnum.common.cert_manager import get_backend
from magnum.common.cert_manager import local_cert_manager as lcm
from magnum.tests import base
class FakeCert(cert_manager_iface.Cert):
def get_certificate(self):
return 'fake-cert'
def get_intermediates(self):
return 'fake-intermediates'
def get_private_key(self):
return 'fake-private-key'
def get_private_key_passphrase(self):
return 'fake-passphrase'
class TestCert(base.BaseTestCase):
@mock.patch.object(cert_manager_iface, 'operations')
def test_get_decrypted_private_key(self, mock_x509_ops):
mock_x509_ops.decrypt_key.return_value = 'fake-key'
fake_cert = FakeCert()
decrypted_key = fake_cert.get_decrypted_private_key()
self.assertEqual(decrypted_key, 'fake-key')
mock_x509_ops.decrypt_key.assert_called_once_with('fake-private-key',
'fake-passphrase')
class TestCertManager(base.BaseTestCase):
def setUp(self):

View File

@ -0,0 +1,47 @@
# Copyright 2015 Rackspace, inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from cryptography.hazmat.primitives import serialization
from magnum.common.x509 import operations
from magnum.tests import base
class TestX509Operations(base.BaseTestCase):
def setUp(self):
super(TestX509Operations, self).setUp()
@mock.patch.object(serialization, 'NoEncryption')
@mock.patch.object(operations, 'default_backend')
@mock.patch.object(serialization, 'load_pem_private_key')
def test_decrypt_key(self, mock_load_pem_private_key,
mock_default_backend, mock_no_encryption_class):
mock_private_key = mock.MagicMock()
mock_load_pem_private_key.return_value = mock_private_key
mock_private_key.private_bytes.return_value = mock.sentinel.decrypted
actual_decrypted = operations.decrypt_key(mock.sentinel.key,
mock.sentinel.passphrase)
mock_load_pem_private_key.assert_called_once_with(
mock.sentinel.key, password=mock.sentinel.passphrase,
backend=mock_default_backend.return_value)
mock_private_key.private_bytes.assert_called_once_with(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=mock_no_encryption_class.return_value
)
self.assertEqual(actual_decrypted, mock.sentinel.decrypted)

View File

@ -149,14 +149,13 @@ class CertManagerTestCase(base.BaseTestCase):
mock_bay = mock.MagicMock()
mock_bay.uuid = "mock_bay_uuid"
mock_ca_cert = mock.MagicMock()
mock_ca_cert.get_certificate.return_value = mock.sentinel.certificate
self.CertManager.get_cert.return_value = mock_ca_cert
bay_ca_cert = cert_manager.get_bay_ca_certificate(mock_bay)
self.CertManager.get_cert.assert_called_once_with(
mock_bay.ca_cert_ref, resource_ref=mock_bay.uuid)
self.assertEqual(bay_ca_cert, mock.sentinel.certificate)
self.assertEqual(bay_ca_cert, mock_ca_cert)
def test_delete_certtificate(self):
mock_delete_cert = self.CertManager.delete_cert

View File

@ -46,7 +46,9 @@ class TestSignConductor(base.TestCase):
mock_bay.uuid = 'bay-uuid'
mock_bay.user_id = 'user-id'
mock_bay.project_id = 'project-id'
mock_cert_manager.get_bay_ca_certificate.return_value = 'fake-pem'
mock_cert = mock.MagicMock()
mock_cert.get_certificate.return_value = 'fake-pem'
mock_cert_manager.get_bay_ca_certificate.return_value = mock_cert
actual_cert = self.ca_handler.get_ca_certificate(self.context,
mock_bay)