Merge "Add initial files for certificate event handling"

This commit is contained in:
Jenkins 2014-09-04 01:54:18 +00:00 committed by Gerrit Code Review
commit 6ebb6b72df
15 changed files with 635 additions and 64 deletions

View File

@ -15,6 +15,7 @@ import pecan
from barbican import api
from barbican.api import controllers
from barbican.common import exception
from barbican.common import hrefs
from barbican.common import resources as res
from barbican.common import utils
from barbican.common import validators
@ -52,8 +53,8 @@ class ContainerConsumerController(object):
dict_fields = consumer.to_dict_fields()
return controllers.hrefs.convert_to_hrefs(
controllers.hrefs.convert_to_hrefs(dict_fields)
return hrefs.convert_to_hrefs(
hrefs.convert_to_hrefs(dict_fields)
)
@ -98,10 +99,10 @@ class ContainerConsumersController(object):
resp_ctrs_overall = {'consumers': [], 'total': total}
else:
resp_ctrs = [
controllers.hrefs.convert_to_hrefs(c.to_dict_fields())
hrefs.convert_to_hrefs(c.to_dict_fields())
for c in consumers
]
resp_ctrs_overall = controllers.hrefs.add_nav_hrefs(
resp_ctrs_overall = hrefs.add_nav_hrefs(
'consumers',
offset,
limit,
@ -169,8 +170,9 @@ class ContainerConsumersController(object):
controllers.containers.container_not_found()
for secret_ref in dict_fields['secret_refs']:
controllers.hrefs.convert_to_hrefs(secret_ref)
hrefs.convert_to_hrefs(secret_ref)
return controllers.hrefs.convert_to_hrefs(
controllers.hrefs.convert_to_hrefs(dict_fields)
# TODO(john-wood-w) Why two calls to convert_to_hrefs()?
return hrefs.convert_to_hrefs(
hrefs.convert_to_hrefs(dict_fields)
)

View File

@ -16,6 +16,7 @@ from barbican import api
from barbican.api import controllers
from barbican.api.controllers import consumers
from barbican.common import exception
from barbican.common import hrefs
from barbican.common import resources as res
from barbican.common import utils
from barbican.common import validators
@ -60,10 +61,10 @@ class ContainerController(object):
dict_fields = container.to_dict_fields()
for secret_ref in dict_fields['secret_refs']:
controllers.hrefs.convert_to_hrefs(secret_ref)
hrefs.convert_to_hrefs(secret_ref)
return controllers.hrefs.convert_to_hrefs(
controllers.hrefs.convert_to_hrefs(dict_fields)
return hrefs.convert_to_hrefs(
hrefs.convert_to_hrefs(dict_fields)
)
@index.when(method='DELETE', template='')
@ -118,15 +119,15 @@ class ContainersController(object):
resp_ctrs_overall = {'containers': [], 'total': total}
else:
resp_ctrs = [
controllers.hrefs.convert_to_hrefs(c.to_dict_fields())
hrefs.convert_to_hrefs(c.to_dict_fields())
for c in containers
]
for ctr in resp_ctrs:
for secret_ref in ctr.get('secret_refs', []):
controllers.hrefs.convert_to_hrefs(secret_ref)
hrefs.convert_to_hrefs(secret_ref)
resp_ctrs_overall = controllers.hrefs.add_nav_hrefs(
resp_ctrs_overall = hrefs.add_nav_hrefs(
'containers',
offset,
limit,
@ -168,5 +169,5 @@ class ContainersController(object):
pecan.response.headers['Location'] = '/{0}/containers/{1}'.format(
keystone_id, new_container.id
)
url = controllers.hrefs.convert_container_to_href(new_container.id)
url = hrefs.convert_container_to_href(new_container.id)
return {'container_ref': url}

View File

@ -14,8 +14,8 @@ import pecan
from barbican import api
from barbican.api import controllers
from barbican.api.controllers import hrefs
from barbican.common import exception
from barbican.common import hrefs
from barbican.common import resources as res
from barbican.common import utils
from barbican.common import validators

View File

@ -17,8 +17,8 @@ import pecan
from barbican import api
from barbican.api import controllers
from barbican.api.controllers import hrefs
from barbican.common import exception
from barbican.common import hrefs
from barbican.common import resources as res
from barbican.common import utils
from barbican.common import validators

View File

@ -18,8 +18,8 @@ import pecan
from barbican import api
from barbican.api import controllers
from barbican.api.controllers import hrefs
from barbican.common import exception
from barbican.common import hrefs
from barbican.common import utils
from barbican.common import validators
from barbican.model import models

View File

@ -32,9 +32,10 @@ from barbican.openstack.common import gettextutils as u
CONF = cfg.CONF
# Configuration for certificate processing plugins:
DEFAULT_PLUGIN_NAMESPACE = 'barbican.certificate.plugin'
#TODO(chellygel): Create a default 'dummy' plugin for certificates.
DEFAULT_PLUGINS = []
DEFAULT_PLUGINS = ['simple_certificate']
cert_opt_group = cfg.OptGroup(name='certificate',
title='Certificate Plugin Options')
@ -51,6 +52,27 @@ cert_opts = [
CONF.register_group(cert_opt_group)
CONF.register_opts(cert_opts, group=cert_opt_group)
# Configuration for certificate eventing plugins:
DEFAULT_EVENT_PLUGIN_NAMESPACE = 'barbican.certificate.event.plugin'
DEFAULT_EVENT_PLUGINS = ['simple_certificate_event']
cert_event_opt_group = cfg.OptGroup(name='certificate_event',
title='Certificate Event Plugin Options')
cert_event_opts = [
cfg.StrOpt('namespace',
default=DEFAULT_EVENT_PLUGIN_NAMESPACE,
help=u._('Extension namespace to search for eventing plugins.')
),
cfg.MultiStrOpt('enabled_certificate_event_plugins',
default=DEFAULT_EVENT_PLUGINS,
help=u._('List of certificate plugins to load.')
)
]
CONF.register_group(cert_event_opt_group)
CONF.register_opts(cert_event_opts, group=cert_event_opt_group)
ERROR_RETRY_MSEC = 300000
RETRY_MSEC = 3600000
@ -63,8 +85,27 @@ CA_SUBJECT_KEY_IDENTIFIER = "ca_subject_key_identifier"
class CertificatePluginNotFound(exception.BarbicanException):
"""Raised when no plugins are installed."""
message = u._("Certificate plugin not found.")
"""Raised when no certificate plugin supporting a request is available."""
def __init__(self, plugin_name=None):
if plugin_name:
message = u._(
"Certificate plugin \"{0}\""
" not found or configured.").format(plugin_name)
else:
message = u._("Certificate plugin not found or configured.")
super(CertificatePluginNotFound, self).__init__(message)
class CertificateEventPluginNotFound(exception.BarbicanException):
"""Raised with no certificate event plugin supporting request."""
def __init__(self, plugin_name=None):
if plugin_name:
message = u._(
"Certificate event plugin "
"\"{0}\" not found or configured.").format(plugin_name)
else:
message = u._("Certificate event plugin not found or configured.")
super(CertificateEventPluginNotFound, self).__init__(message)
class CertificateStatusNotSupported(exception.BarbicanException):
@ -105,6 +146,42 @@ class CertificateStatusInvalidOperation(exception.BarbicanException):
self.reason = reason
@six.add_metaclass(abc.ABCMeta)
class CertificateEventPluginBase(object):
"""Base class for certificate eventing plugins.
This class is the base plugin contract for issuing certificate related
events from Barbican.
"""
@abc.abstractmethod
def notify_certificate_is_ready(
self, project_id, order_ref, container_ref):
"""Notify that a certificate has been generated and is ready to use.
:param project_id: Project/tenant ID associated with this certificate
:param order_ref: HATEOS reference URI to the submitted Barbican Order
:param container_ref: HATEOS reference URI to the Container storing
the certificate
:returns: None
"""
raise NotImplementedError # pragma: no cover
@abc.abstractmethod
def notify_ca_is_unavailable(
self, project_id, order_ref, error_msg, retry_in_msec):
"""Notify that the certificate authority (CA) isn't available.
:param project_id: Project/tenant ID associated with this order
:param order_ref: HATEOS reference URI to the submitted Barbican Order
:param error_msg: Error message if it is available
:param retry_in_msec: Delay before attempting to talk to the CA again.
If this is 0, then no attempt will be made.
:returns: None
"""
raise NotImplementedError # pragma: no cover
@six.add_metaclass(abc.ABCMeta)
class CertificatePluginBase(object):
"""Base class for certificate plugins.
@ -172,13 +249,13 @@ class CertificatePluginBase(object):
on their behalf
:returns: A :class:`ResultDTO` instance containing the result
populated by the plugin implementation
:rtype: :class:`ResultDTO`
"""
raise NotImplementedError # pragma: no cover
@abc.abstractmethod
def supports(self, certificate_spec):
"""Returns a boolean indicating if the plugin supports the
certificate type.
"""Returns if the plugin supports the certificate type.
:param certificate_spec: Contains details on the certificate to
generate the certificate order
@ -254,4 +331,62 @@ class CertificatePluginManager(named.NamedExtensionManager):
for ext in self.extensions:
if utils.generate_fullname_for(ext.obj) == plugin_name:
return ext.obj
raise CertificatePluginNotFound()
raise CertificatePluginNotFound(plugin_name)
class _CertificateEventPluginManager(named.NamedExtensionManager,
CertificateEventPluginBase):
"""Provides services for certificate event plugins.
This plugin manager differs from others in that it implements the same
contract as the plugins that it manages. This allows eventing operations
to occur on all installed plugins (with this class acting as a composite
plugin), rather than just eventing via an individual plugin.
Each time this class is initialized it will load a new instance
of each enabled plugin. This is undesirable, so rather than initializing a
new instance of this class use the EVENT_PLUGIN_MANAGER at the module
level.
"""
def __init__(self, conf=CONF, invoke_on_load=True,
invoke_args=(), invoke_kwargs={}):
super(_CertificateEventPluginManager, self).__init__(
conf.certificate_event.namespace,
conf.certificate_event.enabled_certificate_event_plugins,
invoke_on_load=invoke_on_load,
invoke_args=invoke_args,
invoke_kwds=invoke_kwargs
)
def get_plugin_by_name(self, plugin_name):
"""Gets a supporting certificate event plugin.
:returns: CertficiateEventPluginBase plugin implementation
"""
for ext in self.extensions:
if utils.generate_fullname_for(ext.obj) == plugin_name:
return ext.obj
raise CertificateEventPluginNotFound(plugin_name)
def notify_certificate_is_ready(
self, project_id, order_ref, container_ref):
self._invoke_certificate_plugins(
'notify_certificate_is_ready',
project_id, order_ref, container_ref)
def notify_ca_is_unavailable(
self, project_id, order_ref, error_msg, retry_in_msec):
self._invoke_certificate_plugins(
'notify_ca_is_unavailable',
project_id, order_ref, error_msg, retry_in_msec)
def _invoke_certificate_plugins(self, method, *args, **kwargs):
"""Invoke same function on plugins as calling function."""
if len(self.extensions) < 1:
raise CertificateEventPluginNotFound()
for ext in self.extensions:
getattr(ext.obj, method)(*args, **kwargs)
EVENT_PLUGIN_MANAGER = _CertificateEventPluginManager()

View File

@ -0,0 +1,129 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Default implementation of Barbican certificate processing plugins and support.
"""
from barbican.common import utils
from barbican.openstack.common import gettextutils as u
from barbican.plugin.interface import certificate_manager as cert
LOG = utils.getLogger(__name__)
class SimpleCertificatePlugin(cert.CertificatePluginBase):
"""Simple/default certificate plugin."""
def issue_certificate_request(self, order_id, order_meta, plugin_meta):
"""Create the initial order with CA
:param order_id: ID associated with the order
:param order_meta: Dict of meta-data associated with the order.
:param plugin_meta: Plugin meta-data previously set by calls to
this plugin. Plugins may also update/add
information here which Barbican will persist
on their behalf.
:returns: A :class:`ResultDTO` instance containing the result
populated by the plugin implementation
:rtype: :class:`ResultDTO`
"""
LOG.info(u._('Invoking issue_certificate_request()'))
return cert.ResultDTO(cert.CertificateStatus.WAITING_FOR_CA)
def modify_certificate_request(self, order_id, order_meta, plugin_meta):
"""Update the order meta-data
:param order_id: ID associated with the order
:param order_meta: Dict of meta-data associated with the order.
:param plugin_meta: Plugin meta-data previously set by calls to
this plugin. Plugins may also update/add
information here which Barbican will persist
on their behalf.
:returns: A :class:`ResultDTO` instance containing the result
populated by the plugin implementation
:rtype: :class:`ResultDTO`
"""
LOG.info(u._('Invoking modify_certificate_request()'))
return cert.ResultDTO(cert.CertificateStatus.WAITING_FOR_CA)
def cancel_certificate_request(self, order_id, order_meta, plugin_meta):
"""Cancel the order
:param order_id: ID associated with the order
:param order_meta: Dict of meta-data associated with the order.
:param plugin_meta: Plugin meta-data previously set by calls to
this plugin. Plugins may also update/add
information here which Barbican will persist
on their behalf.
:returns: A :class:`ResultDTO` instance containing the result
populated by the plugin implementation
:rtype: :class:`ResultDTO`
"""
LOG.info(u._('Invoking cancel_certificate_request()'))
return cert.ResultDTO(cert.CertificateStatus.REQUEST_CANCELED)
def check_certificate_status(self, order_id, order_meta, plugin_meta):
"""Check status of the order
:param order_id: ID associated with the order
:param order_meta: Dict of meta-data associated with the order.
:param plugin_meta: Plugin meta-data previously set by calls to
this plugin. Plugins may also update/add
information here which Barbican will persist
on their behalf.
:returns: A :class:`ResultDTO` instance containing the result
populated by the plugin implementation
:rtype: :class:`ResultDTO`
"""
LOG.info(u._('Invoking check_certificate_status()'))
return cert.ResultDTO(cert.CertificateStatus.WAITING_FOR_CA)
def supports(self, certificate_spec):
"""Returns a boolean indicating if the plugin supports the
certificate type.
:param certificate_spec: Contains details on the certificate to
generate the certificate order
:returns: boolean indicating if the plugin supports the certificate
type
"""
return True
class SimpleCertificateEventPlugin(cert.CertificateEventPluginBase):
"""Simple/default certificate event plugin."""
def notify_certificate_is_ready(
self, project_id, order_ref, container_ref):
"""Notify that a certificate has been generated and is ready to use.
:param project_id: Project/tenant ID associated with this certificate
:param order_ref: HATEOS reference URI to the submitted Barbican Order
:param container_ref: HATEOS reference URI to the Container storing
the certificate
:returns: None
"""
LOG.info(u._('Invoking notify_certificate_is_ready()'))
def notify_ca_is_unavailable(
self, project_id, order_ref, error_msg, retry_in_msec):
"""Notify that the certificate authority (CA) isn't available.
:param project_id: Project/tenant ID associated with this order
:param order_ref: HATEOS reference URI to the submitted Barbican Order
:param error_msg: Error message if it is available
:param retry_in_msec: Delay before attempting to talk to the CA again.
If this is 0, then no attempt will be made.
:returns: None
"""
LOG.info(u._('Invoking notify_ca_is_unavailable()'))

View File

@ -13,11 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from barbican.common import hrefs
import barbican.common.utils as utils
from barbican.model import models
from barbican.plugin.interface import certificate_manager as cert
from barbican.plugin import resources as plugin
# Order sub-status definitions
ORDER_STATUS_REQUEST_PENDING = models.OrderStatus(
"cert_request_pending",
@ -61,7 +63,7 @@ def issue_certificate_request(order_model, tenant_model, repos):
:param: order_model - order associated with this cert request
:param: tenant_model - tenant associated with this request
:param; repos - repos (to be removed)
:param: repos - repos (to be removed)
:returns: container_model - container with the relevant cert if
the request has been completed. None otherwise
"""
@ -98,6 +100,7 @@ def issue_certificate_request(order_model, tenant_model, repos):
_schedule_issue_cert_request(cert_plugin, order_model, plugin_meta,
repos, result, tenant_model,
cert.ERROR_RETRY_MSEC)
_notify_ca_unavailable(order_model, result)
elif cert.CertificateStatus.INVALID_OPERATION == result.status:
_update_order_status(ORDER_STATUS_INVALID_OPERATION)
@ -224,6 +227,15 @@ def _get_plugin_meta(order_model):
return dict()
def _notify_ca_unavailable(order_model, result):
"""Notify observer(s) that the CA was unavailable at this time."""
cert.EVENT_PLUGIN_MANAGER.notify_ca_is_unavailable(
order_model.tenant_id,
hrefs.convert_order_to_href(order_model.id),
result.status_message,
result.retry_msec)
def _save_plugin_metadata(order_model, plugin_meta, repos):
"""Add plugin metadata to an order."""

View File

@ -32,8 +32,8 @@ import webtest
from barbican import api
from barbican.api import app
from barbican.api import controllers
from barbican.api.controllers import hrefs
from barbican.common import exception as excep
from barbican.common import hrefs
from barbican.common import validators
import barbican.context
from barbican.model import models
@ -1759,10 +1759,8 @@ class WhenAddingNavigationHrefs(testtools.TestCase):
offset = 0
limit = 10
data_with_hrefs = controllers.hrefs.add_nav_hrefs(self.resource_name,
offset, limit,
self.num_elements,
self.data)
data_with_hrefs = hrefs.add_nav_hrefs(
self.resource_name, offset, limit, self.num_elements, self.data)
self.assertNotIn('previous', data_with_hrefs)
self.assertIn('next', data_with_hrefs)
@ -1771,10 +1769,8 @@ class WhenAddingNavigationHrefs(testtools.TestCase):
offset = 10
limit = 10
data_with_hrefs = controllers.hrefs.add_nav_hrefs(self.resource_name,
offset, limit,
self.num_elements,
self.data)
data_with_hrefs = hrefs.add_nav_hrefs(
self.resource_name, offset, limit, self.num_elements, self.data)
self.assertIn('previous', data_with_hrefs)
self.assertIn('next', data_with_hrefs)
@ -1783,10 +1779,8 @@ class WhenAddingNavigationHrefs(testtools.TestCase):
offset = 90
limit = 10
data_with_hrefs = controllers.hrefs.add_nav_hrefs(self.resource_name,
offset, limit,
self.num_elements,
self.data)
data_with_hrefs = hrefs.add_nav_hrefs(
self.resource_name, offset, limit, self.num_elements, self.data)
self.assertIn('previous', data_with_hrefs)
self.assertNotIn('next', data_with_hrefs)

View File

@ -0,0 +1,99 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import testtools
from barbican.plugin.interface import certificate_manager as cm
class WhenTestingCertificateEventPluginManager(testtools.TestCase):
def setUp(self):
super(WhenTestingCertificateEventPluginManager, self).setUp()
self.project_id = '1234'
self.order_ref = 'http://www.mycerts.com/v1/orders/123456'
self.container_ref = 'http://www.mycerts.com/v1/containers/654321'
self.error_msg = 'Something is broken'
self.retry_in_msec = 5432
self.plugin_returned = mock.MagicMock()
self.plugin_name = 'mock.MagicMock'
self.plugin_loaded = mock.MagicMock(obj=self.plugin_returned)
self.manager = cm.EVENT_PLUGIN_MANAGER
self.manager.extensions = [self.plugin_loaded]
def test_get_plugin_by_name(self):
self.assertEqual(self.plugin_returned,
self.manager.get_plugin_by_name(self.plugin_name))
def test_notify_ca_is_unavailable(self):
self.manager.notify_ca_is_unavailable(
self.project_id,
self.order_ref,
self.error_msg,
self.retry_in_msec)
self.plugin_returned.notify_ca_is_unavailable.assert_called_once_with(
self.project_id,
self.order_ref,
self.error_msg,
self.retry_in_msec)
def test_notify_certificate_is_ready(self):
self.manager.notify_certificate_is_ready(
self.project_id,
self.order_ref,
self.container_ref)
pr = self.plugin_returned
pr.notify_certificate_is_ready.assert_called_once_with(
self.project_id,
self.order_ref,
self.container_ref)
def test_invoke_certificate_plugins(self):
self.manager._invoke_certificate_plugins(
'test_invoke_certificate_plugins',
self.project_id,
self.order_ref,
self.container_ref)
# The _invoke_certificate_plugins method should invoke on
# self.plugin_returned the same method by name as the function
# that invoked it...in this case it is this test method.
pr = self.plugin_returned
pr.test_invoke_certificate_plugins.assert_called_once_with(
self.project_id,
self.order_ref,
self.container_ref)
def test_raises_error_with_no_plugin_by_name_found(self):
self.manager.extensions = []
self.assertRaises(
cm.CertificateEventPluginNotFound,
self.manager.get_plugin_by_name,
'any-name-here'
)
def test_raises_error_with_no_plugin_for_invoke_certificate_plugins(self):
self.manager.extensions = []
self.assertRaises(
cm.CertificateEventPluginNotFound,
self.manager._invoke_certificate_plugins,
self.project_id,
self.order_ref,
self.error_msg,
self.retry_in_msec,
)

View File

@ -0,0 +1,66 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import testtools
import barbican.plugin.interface.certificate_manager as cm
import barbican.plugin.simple_certificate_manager as simple
class WhenTestingSimpleCertificateManagerPlugin(testtools.TestCase):
def setUp(self):
super(WhenTestingSimpleCertificateManagerPlugin, self).setUp()
self.plugin = simple.SimpleCertificatePlugin()
def test_issue_certificate_request(self):
result = self.plugin.issue_certificate_request(None, None, None)
self.assertEqual(cm.CertificateStatus.WAITING_FOR_CA, result.status)
def test_check_certificate_status(self):
result = self.plugin.check_certificate_status(None, None, None)
self.assertEqual(cm.CertificateStatus.WAITING_FOR_CA, result.status)
def test_modify_certificate_request(self):
result = self.plugin.modify_certificate_request(None, None, None)
self.assertEqual(cm.CertificateStatus.WAITING_FOR_CA, result.status)
def test_cancel_certificate_request(self):
result = self.plugin.cancel_certificate_request(None, None, None)
self.assertEqual(cm.CertificateStatus.REQUEST_CANCELED, result.status)
def test_supports(self):
result = self.plugin.supports(None)
self.assertTrue(result)
class WhenTestingSimpleCertificateEventManagerPlugin(testtools.TestCase):
def setUp(self):
super(WhenTestingSimpleCertificateEventManagerPlugin, self).setUp()
self.plugin = simple.SimpleCertificateEventPlugin()
def test_notify_ca_is_unavailable(self):
# Test that eventing plugin method does not have side effects such as
# raising exceptions.
self.plugin.notify_ca_is_unavailable(None, None, None, None)
def test_notify_certificate_is_ready(self):
# Test that eventing plugin method does not have side effects such as
# raising exceptions.
self.plugin.notify_certificate_is_ready(None, None, None)

View File

@ -16,14 +16,90 @@
import mock
import testtools
from barbican.common import hrefs
from barbican.plugin.interface import certificate_manager as cert_man
from barbican.tasks import certificate_resources as cert_res
class WhenPerformingPrivateOperations(testtools.TestCase):
"""Tests private methods within certificate_resources.py."""
def test_get_plugin_meta(self):
class Value(object):
def __init__(self, value):
self.value = value
class OrderModel(object):
order_plugin_metadata = {
"foo": Value(1),
"bar": Value(2),
}
order_model = OrderModel()
expected_dict = dict(
(k, v.value) for (k, v) in
order_model.order_plugin_metadata.items())
result = cert_res._get_plugin_meta(order_model)
self._assert_dict_equal(expected_dict, result)
def test_get_plugin_meta_with_empty_dict(self):
result = cert_res._get_plugin_meta(None)
self._assert_dict_equal({}, result)
def test_save_plugin_meta(self):
class Repo(object):
plugin_meta = None
order_model = None
def save(self, plugin_meta, order_model):
self.plugin_meta = plugin_meta
self.order_model = order_model
class Repos(object):
def __init__(self, repo):
self.order_plugin_meta_repo = repo
test_repo = Repo()
repos = Repos(test_repo)
# Test dict for plugin meta data.
test_order_model = 'My order model'
test_plugin_meta = {"foo": 1}
cert_res._save_plugin_metadata(
test_order_model, test_plugin_meta, repos)
self._assert_dict_equal(test_plugin_meta, test_repo.plugin_meta)
self.assertEqual(test_order_model, test_repo.order_model)
# Test None for plugin meta data.
cert_res._save_plugin_metadata(
test_order_model, None, repos)
self._assert_dict_equal({}, test_repo.plugin_meta)
def _assert_dict_equal(self, expected, test):
self.assertIsInstance(expected, dict)
self.assertIsInstance(test, dict)
if expected != test:
if len(expected) != len(test):
self.fail('Expected dict not same size as test dict')
unmatched_items = set(expected.items()) ^ set(test.items())
if len(unmatched_items):
self.fail('One or more items different '
'between the expected and test dicts')
class WhenIssuingCertificateRequests(testtools.TestCase):
"""Tests the 'issue_certificate_request()' function."""
def setUp(self):
super(WhenIssuingCertificateRequests, self).setUp()
self.project_id = "56789"
self.order_id = "12345"
self.order_meta = dict()
self.plugin_meta = dict()
@ -32,44 +108,26 @@ class WhenIssuingCertificateRequests(testtools.TestCase):
)
self.cert_plugin = mock.MagicMock()
self.cert_plugin.issue_certificate_request.return_value = self.result
self.order_model = mock.MagicMock()
self.order_model.id = self.order_id
self.order_model.meta = self.order_meta
self.order_model.tenant_id = self.project_id
self.repos = mock.MagicMock()
self.tenant_model = mock.MagicMock()
# Setting up mock data for the plugin manager.
cert_plugin_config = {
'return_value.get_plugin.return_value': self.cert_plugin
}
self.cert_plugin_patcher = mock.patch(
'barbican.plugin.interface.certificate_manager'
'.CertificatePluginManager',
**cert_plugin_config
)
self.cert_plugin_patcher.start()
self.cert_plugin.issue_certificate_request.return_value = self.result
# Setting up mock data for save plugin meta.
self.save_plugin_meta_patcher = mock.patch(
'barbican.tasks.certificate_resources._save_plugin_metadata'
)
self.mock_save_plugin = self.save_plugin_meta_patcher.start()
# Setting up mock data for get plugin meta.
get_plugin_config = {'return_value': self.plugin_meta}
self.get_plugin_meta_patcher = mock.patch(
'barbican.tasks.certificate_resources._get_plugin_meta',
**get_plugin_config
)
self.get_plugin_meta_patcher.start()
self._config_cert_plugin()
self._config_cert_event_plugin()
self._config_save_meta_plugin()
self._config_get_meta_plugin()
def tearDown(self):
super(WhenIssuingCertificateRequests, self).tearDown()
self.cert_plugin_patcher.stop()
self.save_plugin_meta_patcher.stop()
self.get_plugin_meta_patcher.stop()
self.cert_event_plugin_patcher.stop()
def test_should_return_waiting_for_ca(self):
self.result.status = cert_man.CertificateStatus.WAITING_FOR_CA
@ -100,9 +158,25 @@ class WhenIssuingCertificateRequests(testtools.TestCase):
self.repos
)
def test_should_raise_invalid_operation_seen(self):
self.result.status = cert_man.CertificateStatus.INVALID_OPERATION
self.assertRaises(
cert_man.CertificateStatusInvalidOperation,
cert_res.issue_certificate_request,
self.order_model,
self.tenant_model,
self.repos
)
def test_should_return_ca_unavailable_for_request(self):
retry_msec = 123
status_msg = 'Test status'
self.result.status = (
cert_man.CertificateStatus.CA_UNAVAILABLE_FOR_REQUEST)
self.result.retry_msec = retry_msec
self.result.status_message = status_msg
order_ref = hrefs.convert_order_to_href(self.order_id)
cert_res.issue_certificate_request(self.order_model,
self.tenant_model,
@ -110,6 +184,14 @@ class WhenIssuingCertificateRequests(testtools.TestCase):
self._verify_issue_certificate_plugins_called()
epm = self.cert_event_plugin_patcher.target.EVENT_PLUGIN_MANAGER
epm.notify_ca_is_unavailable.assert_called_once_with(
self.project_id,
order_ref,
status_msg,
retry_msec
)
def test_should_raise_status_not_supported(self):
self.result.status = "Legend of Link"
@ -133,3 +215,39 @@ class WhenIssuingCertificateRequests(testtools.TestCase):
self.plugin_meta,
self.repos
)
def _config_cert_plugin(self):
"""Mock the certificate plugin manager."""
cert_plugin_config = {
'return_value.get_plugin.return_value': self.cert_plugin
}
self.cert_plugin_patcher = mock.patch(
'barbican.plugin.interface.certificate_manager'
'.CertificatePluginManager',
**cert_plugin_config
)
self.cert_plugin_patcher.start()
def _config_cert_event_plugin(self):
"""Mock the certificate event plugin manager."""
self.cert_event_plugin_patcher = mock.patch(
'barbican.plugin.interface.certificate_manager'
'.EVENT_PLUGIN_MANAGER'
)
self.cert_event_plugin_patcher.start()
def _config_save_meta_plugin(self):
"""Mock the save plugin meta function."""
self.save_plugin_meta_patcher = mock.patch(
'barbican.tasks.certificate_resources._save_plugin_metadata'
)
self.mock_save_plugin = self.save_plugin_meta_patcher.start()
def _config_get_meta_plugin(self):
"""Mock the get plugin meta function."""
get_plugin_config = {'return_value': self.plugin_meta}
self.get_plugin_meta_patcher = mock.patch(
'barbican.tasks.certificate_resources._get_plugin_meta',
**get_plugin_config
)
self.get_plugin_meta_patcher.start()

View File

@ -174,3 +174,12 @@ username = 'admin'
password = 'password'
host = localhost
port = 9090
# ================= Certificate plugin ===================
[certificate]
namespace = barbican.certificate.plugin
enabled_certificate_plugins = simple_certificate
[certificate_event]
namespace = barbican.certificate.event.plugin
enabled_certificate_event_plugins = simple_certificate

View File

@ -33,6 +33,12 @@ barbican.secretstore.plugin =
barbican.crypto.plugin =
p11_crypto = barbican.plugin.crypto.p11_crypto:P11CryptoPlugin
simple_crypto = barbican.plugin.crypto.simple_crypto:SimpleCryptoPlugin
barbican.certificate.plugin =
simple_certificate = barbican.plugin.simple_certificate_manager:SimpleCertificatePlugin
symantec = barbican.plugin.symantec:SymantecCertificatePlugin
dogtag = barbican.plugin.dogtag:DogtagCAPlugin
barbican.certificate.event.plugin =
simple_certificate_event = barbican.plugin.simple_certificate_manager:SimpleCertificateEventPlugin
barbican.test.crypto.plugin =
test_crypto = barbican.tests.crypto.test_plugin:TestCryptoPlugin