Merge "Add code to populate CA tables and select plugin based on ca_id"

This commit is contained in:
Jenkins 2015-03-18 17:37:00 +00:00 committed by Gerrit Code Review
commit f231de6426
13 changed files with 620 additions and 6 deletions

View File

@ -86,6 +86,8 @@ class OrderController(object):
def on_put(self, external_project_id, **kwargs):
body = api.load_body(pecan.request,
validator=self.type_order_validator)
project = res.get_or_create_project(external_project_id)
order_type = body.get('type')
if self.order.type != order_type:
@ -97,11 +99,14 @@ class OrderController(object):
if models.States.PENDING != self.order.status:
_order_cannot_be_updated_if_not_pending(self.order.status)
updated_meta = body.get('meta')
validators.validate_ca_id(project.id, updated_meta)
# TODO(chellygel): Put 'meta' into a separate order association
# entity.
self.queue.update_order(order_id=self.order.id,
project_id=external_project_id,
updated_meta=body.get('meta'))
updated_meta=updated_meta)
@index.when(method='DELETE')
@utils.allow_all_content_types
@ -190,6 +195,9 @@ class OrdersController(object):
order_type = body.get('type')
LOG.debug('Processing order type %s', order_type)
if order_type == models.OrderType.CERTIFICATE:
validators.validate_ca_id(project.id, body.get('meta'))
new_order = models.Order()
new_order.meta = body.get('meta')
new_order.type = order_type

View File

@ -380,3 +380,17 @@ class InvalidUUIDInURI(BarbicanHTTPException):
"malformed.")
client_message = u._("The provided UUID in the URI is malformed.")
status_code = 404
class InvalidCAID(BarbicanHTTPException):
message = u._("Invalid CA_ID: %(ca_id)")
client_message = u._("The ca_id provided in the request is invalid")
status_code = 400
class CANotDefinedForProject(BarbicanHTTPException):
message = u._("CA specified by ca_id %(ca_id) not defined for project: "
"%(project_id)")
client_message = u._("The ca_id provided in the request is not defined "
"for this project")
status_code = 403

View File

@ -26,6 +26,7 @@ from barbican.common import exception
from barbican.common import utils
from barbican import i18n as u
from barbican.model import models
from barbican.model import repositories as repo
from barbican.openstack.common import timeutils
from barbican.plugin.interface import secret_store
from barbican.plugin.util import mime_types
@ -58,6 +59,33 @@ def get_invalid_property(validation_error):
return validation_error.schema_path[1]
def validate_ca_id(project_id, order_meta):
ca_id = order_meta.get('ca_id')
if not ca_id:
return
ca_repo = repo.get_ca_repository()
ca = ca_repo.get(ca_id, suppress_exception=True)
if not ca:
raise exception.InvalidCAID(ca_id=ca_id)
project_ca_repo = repo.get_project_ca_repository()
project_cas, offset, limit, total = project_ca_repo.get_by_create_date(
project_id=project_id,
suppress_exception=True
)
if total < 1:
return
for project_ca in project_cas:
if ca.id == project_ca.ca_id:
return
raise exception.CANotDefinedForProject(
ca_id=ca_id,
project_id=project_id)
@six.add_metaclass(abc.ABCMeta)
class ValidatorBase(object):
"""Base class for validators."""

View File

@ -604,6 +604,17 @@ class DogtagCAPlugin(cm.CertificatePluginBase):
except pki.CertNotFoundException:
return None
def get_default_ca_name(self):
return "Dogtag CA"
def get_default_signing_cert(self):
# TODO(alee) Add code to get the signing cert
return None
def get_default_intermediates(self):
# TODO(alee) Add code to get the cert chain
return None
def check_certificate_status(self, order_id, order_meta, plugin_meta,
barbican_meta_dto):
"""Check the status of a certificate request.

View File

@ -21,6 +21,7 @@ implementations. Hence do not place vendor-specific content in this module.
"""
import abc
import datetime
from oslo_config import cfg
import six
@ -29,6 +30,8 @@ from stevedore import named
from barbican.common import exception
import barbican.common.utils as utils
from barbican import i18n as u
from barbican.model import models
from barbican.model import repositories as repos
CONF = cfg.CONF
@ -75,6 +78,7 @@ CONF.register_opts(cert_event_opts, group=cert_event_opt_group)
ERROR_RETRY_MSEC = 300000
RETRY_MSEC = 3600000
CA_INFO_DEFAULT_EXPIRATION_DAYS = 1
CA_PLUGIN_TYPE_DOGTAG = "dogtag"
CA_PLUGIN_TYPE_SYMANTEC = "symantec"
@ -86,6 +90,17 @@ CA_SUBJECT_KEY_IDENTIFIER = "ca_subject_key_identifier"
# field to get the certificate request type
REQUEST_TYPE = "request_type"
# fields for the ca_id, plugin_ca_id
CA_ID = "ca_id"
PLUGIN_CA_ID = "plugin_ca_id"
# fields for ca_info dict keys
INFO_NAME = "name"
INFO_DESCRIPTION = "description"
INFO_CA_SIGNING_CERT = "ca_signing_cert"
INFO_INTERMEDIATES = "intermediates"
INFO_EXPIRATION = "expiration"
class CertificateRequestType(object):
"""Constants to define the certificate request type."""
@ -107,6 +122,14 @@ class CertificatePluginNotFound(exception.BarbicanException):
super(CertificatePluginNotFound, self).__init__(message)
class CertificatePluginNotFoundForCAID(exception.BarbicanException):
"""Raised when no certificate plugin is available for a CA_ID."""
def __init__(self, ca_id):
message = u._(
'Certificate plugin not found for "{ca_id}".').format(ca_id=ca_id)
super(CertificatePluginNotFoundForCAID, self).__init__(message)
class CertificateEventPluginNotFound(exception.BarbicanException):
"""Raised with no certificate event plugin supporting request."""
def __init__(self, plugin_name=None):
@ -210,6 +233,52 @@ class CertificatePluginBase(object):
This class is the base plugin contract for certificates.
"""
@abc.abstractmethod
def get_default_ca_name(self):
"""Get the default CA name
Provides a default CA name to be returned in the default
get_ca_info() method. If get_ca_info() is overridden (to
support multiple CAs for instance), then this method may not
be called. In that case, just implement this method to return
a dummy variable.
If this value is used, it should be unique amongst all the CA
plugins.
:return: The default CA name
:rtype: str
"""
raise NotImplementedError # pragma: no cover
@abc.abstractmethod
def get_default_signing_cert(self):
"""Get the default CA signing cert
Provides a default CA signing cert to be returned in the default
get_ca_info() method. If get_ca_info() is overridden (to
support multiple CAs for instance), then this method may not
be called. In that case, just implement this method to return
a dummy variable.
:return: The default CA signing cert
:rtype: str
"""
raise NotImplementedError # pragma: no cover
@abc.abstractmethod
def get_default_intermediates(self):
"""Get the default CA certificate chain
Provides a default CA certificate to be returned in the default
get_ca_info() method. If get_ca_info() is overridden (to
support multiple CAs for instance), then this method may not
be called. In that case, just implement this method to return
a dummy variable.
:return: The default CA certificate chain
:rtype: str
"""
raise NotImplementedError # pragma: no cover
@abc.abstractmethod
def issue_certificate_request(self, order_id, order_meta, plugin_meta,
barbican_meta_dto):
@ -317,6 +386,46 @@ class CertificatePluginBase(object):
"""
return [CertificateRequestType.CUSTOM_REQUEST] # pragma: no cover
def get_ca_info(self):
"""Returns information about the CA(s) supported by this plugin.
:returns: dictionary indexed by plugin_ca_id. Each entry consists
of a dictionary of key-value pairs.
An example dictionary containing the current supported attributes
is shown below::
{ "plugin_ca_id1": {
INFO_NAME : "CA name",
INFO_DESCRIPTION : "CA user friendly description",
INFO_CA_SIGNING_CERT : "base 64 encoded signing cert",
INFO_INTERMEDIATES = "base 64 encoded certificate chain"
INFO_EXPIRATION = "ISO formatted UTC datetime for when this"
"data will become stale"
}
}
"""
name = self.get_default_ca_name()
expiration = (datetime.datetime.utcnow() +
datetime.timedelta(days=CA_INFO_DEFAULT_EXPIRATION_DAYS))
default_info = {
INFO_NAME: name,
INFO_DESCRIPTION: "Certificate Authority - {0}".format(name),
INFO_EXPIRATION: expiration.isoformat()
}
signing_cert = self.get_default_signing_cert()
if signing_cert is not None:
default_info[INFO_CA_SIGNING_CERT] = signing_cert
intermediates = self.get_default_intermediates()
if intermediates is not None:
default_info[INFO_INTERMEDIATES] = intermediates
return {name: default_info}
class CertificateStatus(object):
"""Defines statuses for certificate request process.
@ -394,6 +503,7 @@ class BarbicanMetaDTO(object):
class CertificatePluginManager(named.NamedExtensionManager):
def __init__(self, conf=CONF, invoke_on_load=True,
invoke_args=(), invoke_kwargs={}):
self.ca_repo = repos.get_ca_repository()
super(CertificatePluginManager, self).__init__(
conf.certificate.namespace,
conf.certificate.enabled_certificate_plugins,
@ -434,6 +544,71 @@ class CertificatePluginManager(named.NamedExtensionManager):
return ext.obj
raise CertificatePluginNotFound(plugin_name)
def get_plugin_by_ca_id(self, ca_id):
"""Gets a plugin based on the ca_id.
:param ca_id: id for CA in the CertificateAuthorities table
:returns: CertificatePluginBase plugin implementation
"""
ca = self.ca_repo.get(ca_id, suppress_exception=True)
if not ca:
raise CertificatePluginNotFoundForCAID(ca_id)
return self.get_plugin_by_name(ca.plugin_name)
def refresh_ca_table(self):
"""Refreshes the CertificateAuthority table."""
for ext in self.extensions:
plugin_name = utils.generate_fullname_for(ext.obj)
cas, offset, limit, total = self.ca_repo.get_by_create_date(
plugin_name=plugin_name,
suppress_exception=True)
if total < 1:
# if no entries are found, then the plugin has not yet been
# queried or that plugin's entries have expired.
# Most of the time, this will be a no-op for plugins.
self.update_ca_info(ext.obj)
def update_ca_info(self, cert_plugin):
"""Update the CA info for a particular plugin."""
plugin_name = utils.generate_fullname_for(cert_plugin)
new_ca_infos = cert_plugin.get_ca_info()
old_cas, offset, limit, total = self.ca_repo.get_by_create_date(
plugin_name=plugin_name,
suppress_exception=True,
show_expired=True)
for old_ca in old_cas:
plugin_ca_id = old_ca.plugin_ca_id
if plugin_ca_id not in new_ca_infos.keys():
# remove CAs that no longer exist
self._delete_ca(old_ca)
else:
# update those that still exist
self.ca_repo.update_entity(
old_ca,
new_ca_infos[plugin_ca_id])
old_ids = set([ca.plugin_ca_id for ca in old_cas])
new_ids = set(new_ca_infos.keys())
# add new CAs
add_ids = new_ids - old_ids
for add_id in add_ids:
self._add_ca(plugin_name, add_id, new_ca_infos[add_id])
def _add_ca(self, plugin_name, plugin_ca_id, ca_info):
parsed_ca = dict(ca_info)
parsed_ca['plugin_name'] = plugin_name
parsed_ca['plugin_ca_id'] = plugin_ca_id
new_ca = models.CertificateAuthority(parsed_ca)
self.ca_repo.create_from(new_ca)
def _delete_ca(self, ca):
self.ca_repo.delete_entity_by_id(ca.id, None)
class _CertificateEventPluginManager(named.NamedExtensionManager,
CertificateEventPluginBase):

View File

@ -24,6 +24,15 @@ LOG = utils.getLogger(__name__)
class SimpleCertificatePlugin(cert.CertificatePluginBase):
"""Simple/default certificate plugin."""
def get_default_ca_name(self):
return "Simple CA"
def get_default_signing_cert(self):
return "XXXXXXXXXXXXXXXXX"
def get_default_intermediates(self):
return "YYYYYYYYYYYYYYYY"
def issue_certificate_request(self, order_id, order_meta, plugin_meta,
barbican_meta_dto):
"""Create the initial order with CA

View File

@ -59,6 +59,17 @@ class SymantecCertificatePlugin(cert.CertificatePluginBase):
if self.url == None:
raise ValueError(u._("url is required"))
def get_default_ca_name(self):
return "Symantec CA"
def get_default_signing_cert(self):
# TODO(chellygel) Add code to get the signing cert
return None
def get_default_intermediates(self):
# TODO(chellygel) Add code to get the cert chain
return None
def issue_certificate_request(self, order_id, order_meta, plugin_meta,
barbican_meta_dto):
"""Create the initial order with CA

View File

@ -77,8 +77,18 @@ def issue_certificate_request(order_model, project_model):
plugin_meta = _get_plugin_meta(order_model)
barbican_meta_dto = cert.BarbicanMetaDTO()
# Locate a suitable plugin to issue a certificate.
cert_plugin = cert.CertificatePluginManager().get_plugin(order_model.meta)
# refresh the CA table. This is mostly a no-op unless the entries
# for a plugin are expired.
cert.CertificatePluginManager(repos).refresh_ca_table()
ca_id = _get_ca_id(order_model.meta, project_model.id)
if ca_id:
barbican_meta_dto.plugin_ca_id = ca_id
cert_plugin = cert.CertificatePluginManager().get_plugin_by_ca_id(
ca_id)
else:
cert_plugin = cert.CertificatePluginManager().get_plugin(
order_model.meta)
request_type = order_model.meta.get(cert.REQUEST_TYPE)
if request_type == cert.CertificateRequestType.STORED_KEY_REQUEST:
@ -188,6 +198,24 @@ def modify_certificate_request(order_model, updated_meta):
raise NotImplementedError # pragma: no cover
def _get_ca_id(order_meta, project_id):
ca_id = order_meta.get(cert.CA_ID)
if ca_id:
return ca_id
preferred_ca_repository = repos.get_preferred_ca_repository()
cas, offset, limit, total = preferred_ca_repository.get_by_create_date(
project_id=project_id)
if total > 0:
return cas[0].ca_id
global_ca = preferred_ca_repository.get_global_preferred_ca()
if global_ca:
return global_ca.ca_id
return None
def _schedule_cert_retry_task(cert_result_dto, cert_plugin, order_model,
plugin_meta,
retry_method=None,

View File

@ -999,6 +999,51 @@ class WhenCreatingTypeOrdersUsingOrdersResource(FunctionalTest):
self.queue_resource = mock.MagicMock()
self.queue_resource.process_type_order.return_value = None
self.ca_repo = mock.MagicMock()
self.setup_ca_repository_mock(self.ca_repo)
self.project_ca_repo = mock.MagicMock()
self.setup_project_ca_repository_mock(self.project_ca_repo)
self.cert_type = 'certificate'
self.cert_meta = {'request': 'XXXXXXX'}
self.cert_order_req = {'type': self.cert_type,
'meta': self.cert_meta}
self.ca_id = "ca_id1"
parsed_ca = {
'plugin_name': 'plugin_name',
'plugin_ca_id': 'plugin_name ca_id1',
'name': 'plugin name',
'description': 'Master CA for default plugin',
'ca_signing_certificate': 'XXXXX',
'intermediates': 'YYYYY'
}
self.ca = models.CertificateAuthority(parsed_ca)
self.ca.id = self.ca_id
self.ca_repo.get.return_value = self.ca
self.ca_id2 = "ca_id2"
parsed_ca2 = {
'plugin_name': 'plugin_name',
'plugin_ca_id': 'plugin_name ca_id2',
'name': 'plugin name',
'description': 'Master CA for default plugin',
'ca_signing_certificate': 'XXXXX',
'intermediates': 'YYYYY'
}
self.ca2 = models.CertificateAuthority(parsed_ca2)
self.ca2.id = self.ca_id2
self.project_ca_repo.get_by_create_date.return_value = (
[], 0, 4, 0)
self.project_ca = models.ProjectCertificateAuthority(
self.project.id, self.ca_id)
def test_should_add_new_order(self):
resp = self.app.post_json(
'/orders/', self.key_order_req
@ -1012,6 +1057,78 @@ class WhenCreatingTypeOrdersUsingOrdersResource(FunctionalTest):
order = args[0]
self.assertIsInstance(order, models.Order)
def test_should_add_new_cert_order(self):
resp = self.app.post_json(
'/orders/',
self.cert_order_req
)
self.assertEqual(resp.status_int, 202)
self.queue_resource.process_type_order.assert_called_once_with(
order_id=None, project_id=self.external_project_id)
args, kwargs = self.order_repo.create_from.call_args
order = args[0]
self.assertIsInstance(order, models.Order)
def test_should_add_new_cert_order_with_ca_id(self):
self.cert_meta['ca_id'] = self.ca_id
resp = self.app.post_json(
'/orders/',
self.cert_order_req
)
self.assertEqual(resp.status_int, 202)
self.queue_resource.process_type_order.assert_called_once_with(
order_id=None, project_id=self.external_project_id)
args, kwargs = self.order_repo.create_from.call_args
order = args[0]
self.assertIsInstance(order, models.Order)
def test_should_add_new_cert_order_with_ca_id_project_ca_defined(self):
self.cert_meta['ca_id'] = self.ca_id
self.project_ca_repo.get_by_create_date.return_value = (
[self.project_ca], 0, 4, 1)
resp = self.app.post_json(
'/orders/',
self.cert_order_req
)
self.assertEqual(resp.status_int, 202)
self.queue_resource.process_type_order.assert_called_once_with(
order_id=None, project_id=self.external_project_id)
args, kwargs = self.order_repo.create_from.call_args
order = args[0]
self.assertIsInstance(order, models.Order)
def test_should_fail_invalid_ca_id(self):
self.cert_meta['ca_id'] = 'bogus_ca_id'
self.ca_repo.get.return_value = None
resp = self.app.post_json(
'/orders/',
self.cert_order_req,
expect_errors=True
)
self.assertEqual(resp.status_int, 400)
def test_should_fail_ca_not_in_defined_project_ca_ids(self):
self.cert_meta['ca_id'] = self.ca_id2
self.ca_repo.get.return_value = self.ca2
self.project_ca_repo.get_by_create_date.return_value = (
[self.project_ca], 0, 4, 1)
resp = self.app.post_json(
'/orders/',
self.cert_order_req,
expect_errors=True
)
self.assertEqual(resp.status_int, 403)
def test_should_fail_add_new_order_no_secret_json(self):
resp = self.app.post_json(
'/orders/', {},

View File

@ -11,10 +11,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import mock
import testtools
from barbican.model import models
from barbican.plugin.interface import certificate_manager as cm
from barbican.tests import utils
class WhenTestingCertificateEventPluginManager(testtools.TestCase):
@ -99,7 +103,8 @@ class WhenTestingCertificateEventPluginManager(testtools.TestCase):
)
class WhenTestingCertificatePluginManager(testtools.TestCase):
class WhenTestingCertificatePluginManager(utils.BaseTestCase,
utils.MockModelRepositoryMixin):
def setUp(self):
super(WhenTestingCertificatePluginManager, self).setUp()
@ -111,6 +116,43 @@ class WhenTestingCertificatePluginManager(testtools.TestCase):
cm.CertificateRequestType.CUSTOM_REQUEST]
self.plugin_returned.supported_request_types.return_value = types_list
self.plugin_returned.supports.return_value = True
self.plugin_loaded = mock.MagicMock(obj=self.plugin_returned)
expiration = (datetime.datetime.utcnow() + datetime.timedelta(
days=cm.CA_INFO_DEFAULT_EXPIRATION_DAYS))
ca_info = {
cm.INFO_NAME: "my_ca",
cm.INFO_DESCRIPTION: "Certificate Authority my_ca",
cm.INFO_CA_SIGNING_CERT: "Undefined",
cm.INFO_INTERMEDIATES: "Undefined",
cm.INFO_EXPIRATION: expiration.isoformat()
}
self.plugin_returned.get_ca_info.return_value = {
'plugin_ca_id1': ca_info
}
parsed_ca = {
'plugin_name': self.plugin_name,
'plugin_ca_id': 'plugin_ca_id1',
'name': self.plugin_name,
'description': 'Master CA for default plugin',
'ca_signing_certificate': 'ZZZZZ',
'intermediates': 'YYYYY'
}
self.ca = models.CertificateAuthority(parsed_ca)
self.ca.id = 'ca_id'
self.ca_repo = mock.MagicMock()
self.ca_repo.get_by_create_date.return_value = (
self.ca, 0, 1, 1)
self.ca_repo.create_from.return_value = None
self.ca_repo.get.return_value = self.ca
self.project = models.Project()
self.project.id = '12345'
self.setup_ca_repository_mock(self.ca_repo)
self.plugin_loaded = mock.MagicMock(obj=self.plugin_returned)
self.manager = cm.CertificatePluginManager()
self.manager.extensions = [self.plugin_loaded]
@ -119,6 +161,18 @@ class WhenTestingCertificatePluginManager(testtools.TestCase):
self.assertEqual(self.plugin_returned,
self.manager.get_plugin_by_name(self.plugin_name))
def test_get_plugin_by_ca_id(self):
self.assertEqual(self.plugin_returned,
self.manager.get_plugin_by_ca_id('ca_id'))
def test_raises_error_with_no_plugin_by_ca_id_found(self):
self.ca_repo.get.return_value = None
self.assertRaises(
cm.CertificatePluginNotFoundForCAID,
self.manager.get_plugin_by_ca_id,
'any-name-here'
)
def test_raises_error_with_no_plugin_by_name_found(self):
self.manager.extensions = []
self.assertRaises(
@ -154,3 +208,79 @@ class WhenTestingCertificatePluginManager(testtools.TestCase):
self.manager.get_plugin,
self.cert_spec
)
def test_get_plugin_with_ca_to_be_added(self):
self.ca_repo.get_by_create_date.return_value = (
None, 0, 1, 0)
self.assertEqual(self.plugin_returned,
self.manager.get_plugin(self.cert_spec))
def test_refresh_ca_list(self):
utc_now = datetime.datetime.utcnow()
expired_time = utc_now - datetime.timedelta(days=1)
expiration = utc_now + datetime.timedelta(days=1)
ca1_info = {
cm.INFO_NAME: "expired_ca_to_be_modified",
cm.INFO_DESCRIPTION: "expired_ca to be modified",
cm.INFO_CA_SIGNING_CERT: "XXXXXXX-expired-XXXXXX",
cm.INFO_INTERMEDIATES: "YYYYYYY-expired-YYYYYYY",
cm.INFO_EXPIRATION: expired_time.isoformat()
}
ca1_modified_info = {
cm.INFO_NAME: "expired_ca_to_be_modified",
cm.INFO_DESCRIPTION: "expired_ca to be modified",
cm.INFO_CA_SIGNING_CERT: "XXXXXXX-no-longer-expired-XXXXXX",
cm.INFO_INTERMEDIATES: "YYYYYYY-no-longer-expired-YYYYYYY",
cm.INFO_EXPIRATION: expiration.isoformat()
}
ca2_info = {
cm.INFO_NAME: "expired_ca_to_be_deleted",
cm.INFO_DESCRIPTION: "expired ca to be deleted",
cm.INFO_CA_SIGNING_CERT: "XXXX-expired-to-be-deleted-XXXX",
cm.INFO_INTERMEDIATES: "YYYY-expired-to-be-deleted-YYYY",
cm.INFO_EXPIRATION: expired_time.isoformat()
}
ca3_info = {
cm.INFO_NAME: "new-ca-to-be-added",
cm.INFO_DESCRIPTION: "new-ca-to-be-added",
cm.INFO_CA_SIGNING_CERT: "XXXX-to-be-addeed-XXXX",
cm.INFO_INTERMEDIATES: "YYYY-to-be-added-YYYY",
cm.INFO_EXPIRATION: expiration.isoformat()
}
self.plugin_returned.get_ca_info.return_value = {
'plugin_ca_id_ca1': ca1_modified_info,
'plugin_ca_id_ca3': ca3_info
}
parsed_ca1 = dict(ca1_info)
parsed_ca1[cm.PLUGIN_CA_ID] = 'plugin_ca_id_ca1'
parsed_ca1['plugin_name'] = self.plugin_name
ca1 = models.CertificateAuthority(parsed_ca1)
ca1.id = "ca1_id"
parsed_ca2 = dict(ca2_info)
parsed_ca2[cm.PLUGIN_CA_ID] = 'plugin_ca_id_ca2'
parsed_ca2['plugin_name'] = self.plugin_name
ca2 = models.CertificateAuthority(parsed_ca2)
ca2.id = "ca2_id"
side_effect = [(None, 0, 4, 0),
([ca1, ca2], 0, 4, 2)]
self.ca_repo.get_by_create_date.side_effect = side_effect
self.manager.refresh_ca_table()
self.plugin_returned.get_ca_info.assert_called_once()
self.ca_repo.update_entity.assert_called_once_with(
ca1,
ca1_modified_info)
self.ca_repo.delete_entity_by_id.assert_called_once_with(
ca2.id,
None)
self.ca_repo.create_from.assert_called_once()

View File

@ -48,6 +48,14 @@ class WhenTestingSimpleCertificateManagerPlugin(testtools.TestCase):
self.assertTrue(result)
def test_get_ca_info(self):
result = self.plugin.get_ca_info()
name = self.plugin.get_default_ca_name()
self.assertIn(name, result)
self.assertEqual(name, result[name][cm.INFO_NAME])
self.assertEqual(self.plugin.get_default_signing_cert(),
result[name][cm.INFO_CA_SIGNING_CERT])
def test_supported_request_types(self):
result = self.plugin.supported_request_types()
supported_list = [cm.CertificateRequestType.CUSTOM_REQUEST,

View File

@ -173,6 +173,16 @@ class WhenIssuingCertificateRequests(utils.BaseTestCase,
# Set up mocked repos
self.container_repo = mock.MagicMock()
self.secret_repo = mock.MagicMock()
self.ca_repo = mock.MagicMock()
self.preferred_ca_repo = mock.MagicMock()
self.preferred_ca_repo.get_by_create_date.return_value = (
None, 0, 10, 0)
self.preferred_ca_repo.get_global_preferred_ca.return_value = None
self.pref_ca = models.PreferredCertificateAuthority(
self.project_id,
"ca_id")
# Set up mocked repositories
self.setup_container_repository_mock(self.container_repo)
@ -180,6 +190,8 @@ class WhenIssuingCertificateRequests(utils.BaseTestCase,
self.setup_order_plugin_meta_repository_mock()
self.setup_project_secret_repository_mock()
self.setup_secret_repository_mock(self.secret_repo)
self.setup_ca_repository_mock(self.ca_repo)
self.setup_preferred_ca_repository_mock(self.preferred_ca_repo)
def stored_key_side_effect(self, *args, **kwargs):
if args[0] == self.private_key_secret_id:
@ -225,7 +237,7 @@ class WhenIssuingCertificateRequests(utils.BaseTestCase,
self.project_model
)
def test_should_return_for_pyopenssl_stored_key(self):
def _do_pyopenssl_stored_key_request(self):
self.container = models.Container(
self.parsed_container_without_passphrase)
self.container_repo.get.return_value = self.container
@ -244,6 +256,8 @@ class WhenIssuingCertificateRequests(utils.BaseTestCase,
cert_res.issue_certificate_request(self.order_model,
self.project_model)
def test_should_return_for_pyopenssl_stored_key(self):
self._do_pyopenssl_stored_key_request()
self._verify_issue_certificate_plugins_called()
self.assertIsNotNone(
self.order_model.order_barbican_metadata['generated_csr'])
@ -251,6 +265,30 @@ class WhenIssuingCertificateRequests(utils.BaseTestCase,
# TODO(alee-3) Add tests to validate the request based on the validator
# code that dave-mccowan is adding.
def test_should_return_for_openssl_stored_key_ca_id_passed_in(self):
self.stored_key_meta['ca_id'] = "ca1"
self._do_pyopenssl_stored_key_request()
self._verify_issue_certificate_plugins_called()
self.assertIsNotNone(
self.order_model.order_barbican_metadata['generated_csr'])
def test_should_return_for_openssl_stored_key_pref_ca_defined(self):
self.preferred_ca_repo.get_by_create_date.return_value = (
[self.pref_ca], 0, 10, 1)
self._do_pyopenssl_stored_key_request()
self._verify_issue_certificate_plugins_called()
self.assertIsNotNone(
self.order_model.order_barbican_metadata['generated_csr'])
def test_should_return_for_openssl_stored_key_global_ca_defined(self):
self.preferred_ca_repo.get_global_preferred_ca.return_value = (
self.pref_ca
)
self._do_pyopenssl_stored_key_request()
self._verify_issue_certificate_plugins_called()
self.assertIsNotNone(
self.order_model.order_barbican_metadata['generated_csr'])
def test_should_return_for_pyopenssl_stored_key_with_passphrase(self):
self.container = models.Container(
self.parsed_container_with_passphrase)
@ -455,7 +493,8 @@ class WhenIssuingCertificateRequests(utils.BaseTestCase,
def _config_cert_plugin(self):
"""Mock the certificate plugin manager."""
cert_plugin_config = {
'return_value.get_plugin.return_value': self.cert_plugin
'return_value.get_plugin.return_value': self.cert_plugin,
'return_value.get_plugin_by_ca_id.return_value': self.cert_plugin
}
self.cert_plugin_patcher = mock.patch(
'barbican.plugin.interface.certificate_manager'

View File

@ -260,6 +260,42 @@ class MockModelRepositoryMixin(object):
mock_repo_obj=mock_transport_key_repo,
patcher_obj=self.mock_transport_key_repo_patcher)
def setup_ca_repository_mock(self, mock_ca_repo=mock.MagicMock()):
"""Mocks the project repository factory function
:param mock_ca_repo: The pre-configured mock ca repo to be returned.
"""
self.mock_ca_repo_patcher = None
self._setup_repository_mock(repo_factory='get_ca_repository',
mock_repo_obj=mock_ca_repo,
patcher_obj=self.mock_ca_repo_patcher)
def setup_preferred_ca_repository_mock(
self, mock_preferred_ca_repo=mock.MagicMock()):
"""Mocks the project repository factory function
:param mock_preferred_ca_repo: The pre-configured mock project ca repo
to be returned.
"""
self.mock_preferred_ca_repo_patcher = None
self._setup_repository_mock(
repo_factory='get_preferred_ca_repository',
mock_repo_obj=mock_preferred_ca_repo,
patcher_obj=self.mock_preferred_ca_repo_patcher)
def setup_project_ca_repository_mock(
self, mock_project_ca_repo=mock.MagicMock()):
"""Mocks the project repository factory function
:param mock_project_ca_repo: The pre-configured mock project ca repo
to be returned.
"""
self.mock_project_ca_repo_patcher = None
self._setup_repository_mock(
repo_factory='get_project_ca_repository',
mock_repo_obj=mock_project_ca_repo,
patcher_obj=self.mock_project_ca_repo_patcher)
def _setup_repository_mock(self, repo_factory, mock_repo_obj, patcher_obj):
patcher_obj = mock.patch(
'barbican.model.repositories.' + repo_factory,