Merge "Avoid to use common.cert_manager directly"

This commit is contained in:
Jenkins 2015-12-10 01:22:47 +00:00 committed by Gerrit Code Review
commit 6237764bd1
2 changed files with 37 additions and 81 deletions

View File

@ -14,13 +14,11 @@
from tempfile import NamedTemporaryFile
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from oslo_log import log as logging
from magnum.common import cert_manager
from magnum.common.pythonk8sclient.swagger_client import api_client
from magnum.common.pythonk8sclient.swagger_client.apis import apiv_api
from magnum.conductor.handlers.common import cert_manager
from magnum.conductor import utils
from magnum.objects.bay import Bay
@ -72,23 +70,13 @@ class K8sAPI(apiv_api.ApivApi):
:param bay: Bay object
"""
magnum_cert_obj = cert_manager.get_backend().CertManager.get_cert(
bay.magnum_cert_ref, resource_ref=bay.uuid)
magnum_cert_obj = cert_manager.get_bay_magnum_cert(bay)
self.cert_file = self._create_temp_file_with_content(
magnum_cert_obj.get_certificate())
private_key = serialization.load_pem_private_key(
magnum_cert_obj.get_private_key(),
password=magnum_cert_obj.get_private_key_passphrase(),
backend=default_backend(),
)
private_key = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption())
private_key = magnum_cert_obj.get_decrypted_private_key()
self.key_file = self._create_temp_file_with_content(
private_key)
ca_cert_obj = cert_manager.get_backend().CertManager.get_cert(
bay.ca_cert_ref, resource_ref=bay.uuid)
ca_cert_obj = cert_manager.get_bay_ca_certificate(bay)
self.ca_file = self._create_temp_file_with_content(
ca_cert_obj.get_certificate())
@ -156,25 +144,15 @@ class K8sAPI_Service(apiv_api.ApivApi):
:param bay: Bay object
"""
magnum_cert_obj = cert_manager.get_backend().CertManager.get_cert(
bay.magnum_cert_ref)
magnum_cert_obj = cert_manager.get_bay_magnum_cert(bay)
self.cert_file = self._create_temp_file_with_content(
magnum_cert_obj.certificate)
private_key = serialization.load_pem_private_key(
magnum_cert_obj.private_key,
password=magnum_cert_obj.private_key_passphrase,
backend=default_backend(),
)
private_key = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption())
magnum_cert_obj.get_certificate())
private_key = magnum_cert_obj.get_decrypted_private_key()
self.key_file = self._create_temp_file_with_content(
private_key)
ca_cert_obj = cert_manager.get_backend().CertManager.get_cert(
bay.ca_cert_ref)
ca_cert_obj = cert_manager.get_bay_ca_certificate(bay)
self.ca_file = self._create_temp_file_with_content(
ca_cert_obj.certificate)
ca_cert_obj.get_certificate())
def __del__(self):
if self.ca_file:
@ -240,25 +218,15 @@ class K8sAPI_Pod(apiv_api.ApivApi):
:param bay: Bay object
"""
magnum_cert_obj = cert_manager.get_backend().CertManager.get_cert(
bay.magnum_cert_ref)
magnum_cert_obj = cert_manager.get_bay_magnum_cert(bay)
self.cert_file = self._create_temp_file_with_content(
magnum_cert_obj.certificate)
private_key = serialization.load_pem_private_key(
magnum_cert_obj.private_key,
password=magnum_cert_obj.private_key_passphrase,
backend=default_backend(),
)
private_key = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption())
magnum_cert_obj.get_certificate())
private_key = magnum_cert_obj.get_decrypted_private_key()
self.key_file = self._create_temp_file_with_content(
private_key)
ca_cert_obj = cert_manager.get_backend().CertManager.get_cert(
bay.ca_cert_ref)
ca_cert_obj = cert_manager.get_bay_ca_certificate(bay)
self.ca_file = self._create_temp_file_with_content(
ca_cert_obj.certificate)
ca_cert_obj.get_certificate())
def __del__(self):
if self.ca_file:
@ -324,25 +292,15 @@ class K8sAPI_RC(apiv_api.ApivApi):
:param bay: Bay object
"""
magnum_cert_obj = cert_manager.get_backend().CertManager.get_cert(
bay.magnum_cert_ref)
magnum_cert_obj = cert_manager.get_bay_magnum_cert(bay)
self.cert_file = self._create_temp_file_with_content(
magnum_cert_obj.certificate)
private_key = serialization.load_pem_private_key(
magnum_cert_obj.private_key,
password=magnum_cert_obj.private_key_passphrase,
backend=default_backend(),
)
private_key = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption())
magnum_cert_obj.get_certificate())
private_key = magnum_cert_obj.get_decrypted_private_key()
self.key_file = self._create_temp_file_with_content(
private_key)
ca_cert_obj = cert_manager.get_backend().CertManager.get_cert(
bay.ca_cert_ref)
ca_cert_obj = cert_manager.get_bay_ca_certificate(bay)
self.ca_file = self._create_temp_file_with_content(
ca_cert_obj.certificate)
ca_cert_obj.get_certificate())
def __del__(self):
if self.ca_file:

View File

@ -24,11 +24,13 @@ class TestK8sAPI(base.TestCase):
content_dict = {
'fake-magnum-cert-ref': {
'certificate': 'certificate-content',
'private_key': 'private-key-content'
'private_key': 'private-key-content',
'decrypted_private_key': 'private-key-content',
},
'fake-ca-cert-ref': {
'certificate': 'ca-cert-content',
'private_key': None
'private_key': None,
'decrypted_private_key': None,
}
}
file_dict = {
@ -51,28 +53,32 @@ class TestK8sAPI(base.TestCase):
TestK8sAPI.content_dict[cert_ref]['certificate'])
cert_obj.get_private_key.return_value = (
TestK8sAPI.content_dict[cert_ref]['private_key'])
cert_obj.get_decrypted_private_key.return_value = (
TestK8sAPI.content_dict[cert_ref]['decrypted_private_key'])
return cert_obj
@patch('magnum.conductor.k8s_api.serialization.load_pem_private_key')
@patch(
'magnum.conductor.handlers.common.cert_manager.get_bay_ca_certificate')
@patch('magnum.conductor.handlers.common.cert_manager.get_bay_magnum_cert')
@patch('magnum.conductor.utils.retrieve_bay')
@patch('magnum.common.pythonk8sclient.swagger_client.api_client.ApiClient')
@patch(
'magnum.common.pythonk8sclient.swagger_client.apis.apiv_api.ApivApi')
def _test_create_k8s_api(self, cls,
mock_api_vapi,
mock_api_client,
mock_bay_retrieval,
mock_load_pem_private_key):
mock_get_bay_magnum_cert,
mock_get_bay_ca_cert):
bay_obj = mock.MagicMock()
bay_obj.uuid = 'bay-uuid'
bay_obj.api_address = 'fake-k8s-api-endpoint'
bay_obj.magnum_cert_ref = 'fake-magnum-cert-ref'
bay_obj.ca_cert_ref = 'fake-ca-cert-ref'
mock_bay_retrieval.return_value = bay_obj
mock_private_bytes = mock.MagicMock()
mock_load_pem_private_key.return_value = mock_private_bytes
mock_private_bytes.private_bytes = mock.MagicMock(
return_value='private-key-content')
mock_get_bay_magnum_cert.return_value = self._mock_cert_mgr_get_cert(
'fake-magnum-cert-ref')
mock_get_bay_ca_cert.return_value = self._mock_cert_mgr_get_cert(
'fake-ca-cert-ref')
file_dict = TestK8sAPI.file_dict
for content in file_dict.keys():
@ -91,15 +97,7 @@ class TestK8sAPI(base.TestCase):
with patch(
'magnum.conductor.k8s_api.K8sAPI._create_temp_file_with_content',
side_effect=self._mock_named_file_creation):
with patch(
'magnum.common.cert_manager.local_cert_manager'
'.CertManager.get_cert',
side_effect=self._mock_cert_mgr_get_cert):
with patch(
'magnum.common.cert_manager.barbican_cert_manager'
'.CertManager.get_cert',
side_effect=self._mock_cert_mgr_get_cert):
k8s_api.create_k8s_api(context, obj)
k8s_api.create_k8s_api(context, obj)
if cls is not 'Bay':
mock_bay_retrieval.assert_called_once_with(context, obj.bay_uuid)