From 8561bc339fb60275faaf876e03aaf4fb9e03e8a3 Mon Sep 17 00:00:00 2001 From: Fernando Diaz Date: Thu, 4 May 2017 04:47:55 +0000 Subject: [PATCH] Remove Certificate Orders and CAs from API Removes Certificate Orders and CAs from the Barbican API Controller. This patch also removes any tests associated with those controllers. Co-Authored-By: Nam Nguyen Hoai Change-Id: Iead0336a19ce58b8b2bb1f9af5e6dd3688fe91fc --- barbican/api/controllers/cas.py | 499 ------------ barbican/api/controllers/orders.py | 52 -- barbican/api/controllers/versions.py | 2 - barbican/queue/client.py | 9 - barbican/queue/server.py | 13 - barbican/tests/api/controllers/test_cas.py | 556 ------------- barbican/tests/api/controllers/test_orders.py | 494 ----------- barbican/tests/queue/test_client.py | 23 - barbican/tests/queue/test_server.py | 43 - .../api/v1/behaviors/ca_behaviors.py | 183 ----- functionaltests/api/v1/functional/test_cas.py | 717 ---------------- .../v1/functional/test_certificate_orders.py | 767 ------------------ .../api/v1/functional/test_quotas_enforce.py | 73 -- ...s-certificate-orders-96fc47a7acaea273.yaml | 38 + 14 files changed, 38 insertions(+), 3431 deletions(-) delete mode 100644 barbican/api/controllers/cas.py delete mode 100644 barbican/tests/api/controllers/test_cas.py delete mode 100644 functionaltests/api/v1/behaviors/ca_behaviors.py delete mode 100644 functionaltests/api/v1/functional/test_cas.py delete mode 100644 functionaltests/api/v1/functional/test_certificate_orders.py create mode 100644 releasenotes/notes/removing-cas-certificate-orders-96fc47a7acaea273.yaml diff --git a/barbican/api/controllers/cas.py b/barbican/api/controllers/cas.py deleted file mode 100644 index 923abf2bd..000000000 --- a/barbican/api/controllers/cas.py +++ /dev/null @@ -1,499 +0,0 @@ -# Copyright (c) 2014 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_log import versionutils -import pecan -from six.moves.urllib import parse - -from barbican import api -from barbican.api import controllers -from barbican.common import exception as excep -from barbican.common import hrefs -from barbican.common import quota -from barbican.common import resources as res -from barbican.common import utils -from barbican.common import validators -from barbican import i18n as u -from barbican.model import models -from barbican.model import repositories as repo -from barbican.tasks import certificate_resources as cert_resources - -LOG = utils.getLogger(__name__) - -_DEPRECATION_MSG = '%s has been deprecated in the Newton release. ' \ - 'It will be removed in the Pike release.' - - -def _certificate_authority_not_found(): - """Throw exception indicating certificate authority not found.""" - pecan.abort(404, u._('Not Found. CA not found.')) - - -def _certificate_authority_attribute_not_found(): - """Throw exception indicating CA attribute was not found.""" - pecan.abort(404, u._('Not Found. CA attribute not found.')) - - -def _ca_not_in_project(): - """Throw exception certificate authority is not in project.""" - pecan.abort(404, u._('Not Found. CA not in project.')) - - -def _requested_preferred_ca_not_a_project_ca(): - """Throw exception indicating that preferred CA is not a project CA.""" - pecan.abort( - 400, - u._('Cannot set CA as a preferred CA as it is not a project CA.') - ) - - -def _cant_remove_preferred_ca_from_project(): - pecan.abort( - 409, - u._('Please change the preferred CA to a different project CA ' - 'before removing it.') - ) - - -class CertificateAuthorityController(controllers.ACLMixin): - """Handles certificate authority retrieval requests.""" - - def __init__(self, ca): - LOG.debug('=== Creating CertificateAuthorityController ===') - msg = _DEPRECATION_MSG % "Certificate Authorities API" - versionutils.report_deprecated_feature(LOG, msg) - self.ca = ca - self.ca_repo = repo.get_ca_repository() - self.project_ca_repo = repo.get_project_ca_repository() - self.preferred_ca_repo = repo.get_preferred_ca_repository() - self.project_repo = repo.get_project_repository() - - def __getattr__(self, name): - route_table = { - 'add-to-project': self.add_to_project, - 'remove-from-project': self.remove_from_project, - 'set-global-preferred': self.set_global_preferred, - 'set-preferred': self.set_preferred, - } - if name in route_table: - return route_table[name] - raise AttributeError - - @pecan.expose() - def _lookup(self, attribute, *remainder): - _certificate_authority_attribute_not_found() - - @pecan.expose(generic=True) - def index(self, **kwargs): - pecan.abort(405) # HTTP 405 Method Not Allowed as default - - @index.when(method='GET', template='json') - @controllers.handle_exceptions(u._('Certificate Authority retrieval')) - @controllers.enforce_rbac('certificate_authority:get') - def on_get(self, external_project_id): - LOG.debug("== Getting certificate authority for %s", self.ca.id) - return self.ca.to_dict_fields() - - @pecan.expose() - @utils.allow_all_content_types - @controllers.handle_exceptions(u._('CA Signing Cert retrieval')) - @controllers.enforce_rbac('certificate_authority:get_cacert') - def cacert(self, external_project_id): - LOG.debug("== Getting signing cert for %s", self.ca.id) - cacert = self.ca.ca_meta['ca_signing_certificate'].value - return cacert - - @pecan.expose() - @controllers.handle_exceptions(u._('CA Cert Chain retrieval')) - @controllers.enforce_rbac('certificate_authority:get_ca_cert_chain') - def intermediates(self, external_project_id): - LOG.debug("== Getting CA Cert Chain for %s", self.ca.id) - cert_chain = self.ca.ca_meta['intermediates'].value - return cert_chain - - @pecan.expose(template='json') - @controllers.handle_exceptions(u._('CA projects retrieval')) - @controllers.enforce_rbac('certificate_authority:get_projects') - def projects(self, external_project_id): - LOG.debug("== Getting Projects for %s", self.ca.id) - project_cas = self.ca.project_cas - if not project_cas: - ca_projects_resp = {'projects': []} - else: - project_list = [] - for p in project_cas: - project_list.append(p.project.external_id) - - ca_projects_resp = {'projects': project_list} - - return ca_projects_resp - - @pecan.expose() - @utils.allow_all_content_types - @controllers.handle_exceptions(u._('Add CA to project')) - @controllers.enforce_rbac('certificate_authority:add_to_project') - def add_to_project(self, external_project_id): - if pecan.request.method != 'POST': - pecan.abort(405) - - LOG.debug("== Saving CA %s to external_project_id %s", - self.ca.id, external_project_id) - project_model = res.get_or_create_project(external_project_id) - - # CA must be a base CA or a subCA owned by this project - if (self.ca.project_id is not None and - self.ca.project_id != project_model.id): - raise excep.UnauthorizedSubCA() - - project_cas = project_model.cas - num_cas = len(project_cas) - for project_ca in project_cas: - if project_ca.ca_id == self.ca.id: - # project already added - return - - project_ca = models.ProjectCertificateAuthority( - project_model.id, self.ca.id) - self.project_ca_repo.create_from(project_ca) - - if num_cas == 0: - # set first project CA to be the preferred one - preferred_ca = models.PreferredCertificateAuthority( - project_model.id, self.ca.id) - self.preferred_ca_repo.create_from(preferred_ca) - - @pecan.expose() - @utils.allow_all_content_types - @controllers.handle_exceptions(u._('Remove CA from project')) - @controllers.enforce_rbac('certificate_authority:remove_from_project') - def remove_from_project(self, external_project_id): - if pecan.request.method != 'POST': - pecan.abort(405) - - LOG.debug("== Removing CA %s from project_external_id %s", - self.ca.id, external_project_id) - - project_model = res.get_or_create_project(external_project_id) - (project_ca, __offset, __limit, __total) = ( - self.project_ca_repo.get_by_create_date( - project_id=project_model.id, - ca_id=self.ca.id, - suppress_exception=True)) - - if project_ca: - self._do_remove_from_project(project_ca[0]) - else: - _ca_not_in_project() - - def _do_remove_from_project(self, project_ca): - project_id = project_ca.project_id - ca_id = project_ca.ca_id - preferred_ca = self.preferred_ca_repo.get_project_entities( - project_id)[0] - if cert_resources.is_last_project_ca(project_id): - self.preferred_ca_repo.delete_entity_by_id(preferred_ca.id, None) - else: - self._assert_is_not_preferred_ca(preferred_ca.ca_id, ca_id) - - self.project_ca_repo.delete_entity_by_id(project_ca.id, None) - - def _assert_is_not_preferred_ca(self, preferred_ca_id, ca_id): - if preferred_ca_id == ca_id: - _cant_remove_preferred_ca_from_project() - - @pecan.expose() - @controllers.handle_exceptions(u._('Set preferred project CA')) - @controllers.enforce_rbac('certificate_authority:set_preferred') - def set_preferred(self, external_project_id): - if pecan.request.method != 'POST': - pecan.abort(405) - - LOG.debug("== Setting preferred CA %s for project %s", - self.ca.id, external_project_id) - - project_model = res.get_or_create_project(external_project_id) - - (project_ca, __offset, __limit, __total) = ( - self.project_ca_repo.get_by_create_date( - project_id=project_model.id, - ca_id=self.ca.id, - suppress_exception=True)) - if not project_ca: - _requested_preferred_ca_not_a_project_ca() - - self.preferred_ca_repo.create_or_update_by_project_id( - project_model.id, self.ca.id) - - @pecan.expose() - @utils.allow_all_content_types - @controllers.handle_exceptions(u._('Set global preferred CA')) - @controllers.enforce_rbac('certificate_authority:set_global_preferred') - def set_global_preferred(self, external_project_id): - if pecan.request.method != 'POST': - pecan.abort(405) - - LOG.debug("== Set global preferred CA %s", self.ca.id) - project = res.get_or_create_global_preferred_project() - self.preferred_ca_repo.create_or_update_by_project_id( - project.id, self.ca.id) - - @index.when(method='DELETE') - @utils.allow_all_content_types - @controllers.handle_exceptions(u._('CA deletion')) - @controllers.enforce_rbac('certificate_authority:delete') - def on_delete(self, external_project_id, **kwargs): - cert_resources.delete_subordinate_ca(external_project_id, self.ca) - LOG.info('Deleted CA for project: %s', external_project_id) - - -class CertificateAuthoritiesController(controllers.ACLMixin): - """Handles certificate authority list requests.""" - - def __init__(self): - LOG.debug('Creating CertificateAuthoritiesController') - msg = _DEPRECATION_MSG % "Certificate Authorities API" - versionutils.report_deprecated_feature(LOG, msg) - self.ca_repo = repo.get_ca_repository() - self.project_ca_repo = repo.get_project_ca_repository() - self.preferred_ca_repo = repo.get_preferred_ca_repository() - self.project_repo = repo.get_project_repository() - self.validator = validators.NewCAValidator() - self.quota_enforcer = quota.QuotaEnforcer('cas', self.ca_repo) - # Populate the CA table at start up - cert_resources.refresh_certificate_resources() - - def __getattr__(self, name): - route_table = { - 'all': self.get_all, - 'global-preferred': self.get_global_preferred, - 'preferred': self.preferred, - 'unset-global-preferred': self.unset_global_preferred, - } - if name in route_table: - return route_table[name] - raise AttributeError - - @pecan.expose() - def _lookup(self, ca_id, *remainder): - ca = self.ca_repo.get(entity_id=ca_id, suppress_exception=True) - if not ca: - _certificate_authority_not_found() - return CertificateAuthorityController(ca), remainder - - @pecan.expose(generic=True) - def index(self, **kwargs): - pecan.abort(405) # HTTP 405 Method Not Allowed as default - - @index.when(method='GET', template='json') - @controllers.handle_exceptions( - u._('Certificate Authorities retrieval (limited)')) - @controllers.enforce_rbac('certificate_authorities:get_limited') - def on_get(self, external_project_id, **kw): - LOG.debug('Start certificate_authorities on_get (limited)') - - plugin_name = kw.get('plugin_name') - if plugin_name is not None: - plugin_name = parse.unquote_plus(plugin_name) - - plugin_ca_id = kw.get('plugin_ca_id', None) - if plugin_ca_id is not None: - plugin_ca_id = parse.unquote_plus(plugin_ca_id) - - # refresh CA table, in case plugin entries have expired - cert_resources.refresh_certificate_resources() - - project_model = res.get_or_create_project(external_project_id) - - if self._project_cas_defined(project_model.id): - cas, offset, limit, total = self._get_subcas_and_project_cas( - offset=kw.get('offset', 0), - limit=kw.get('limit', None), - plugin_name=plugin_name, - plugin_ca_id=plugin_ca_id, - project_id=project_model.id) - else: - cas, offset, limit, total = self._get_subcas_and_root_cas( - offset=kw.get('offset', 0), - limit=kw.get('limit', None), - plugin_name=plugin_name, - plugin_ca_id=plugin_ca_id, - project_id=project_model.id) - - return self._display_cas(cas, offset, limit, total) - - @pecan.expose(generic=True, template='json') - @controllers.handle_exceptions(u._('Certificate Authorities retrieval')) - @controllers.enforce_rbac('certificate_authorities:get') - def get_all(self, external_project_id, **kw): - LOG.debug('Start certificate_authorities on_get') - - plugin_name = kw.get('plugin_name') - if plugin_name is not None: - plugin_name = parse.unquote_plus(plugin_name) - - plugin_ca_id = kw.get('plugin_ca_id', None) - if plugin_ca_id is not None: - plugin_ca_id = parse.unquote_plus(plugin_ca_id) - - # refresh CA table, in case plugin entries have expired - cert_resources.refresh_certificate_resources() - - project_model = res.get_or_create_project(external_project_id) - - cas, offset, limit, total = self._get_subcas_and_root_cas( - offset=kw.get('offset', 0), - limit=kw.get('limit', None), - plugin_name=plugin_name, - plugin_ca_id=plugin_ca_id, - project_id=project_model.id) - - return self._display_cas(cas, offset, limit, total) - - def _get_project_cas(self, project_id, query_filters): - cas, offset, limit, total = self.project_ca_repo.get_by_create_date( - offset_arg=query_filters.get('offset', 0), - limit_arg=query_filters.get('limit', None), - project_id=project_id, - suppress_exception=True - ) - return cas, offset, limit, total - - def _project_cas_defined(self, project_id): - _cas, _offset, _limit, total = self._get_project_cas(project_id, {}) - return total > 0 - - def _get_subcas_and_project_cas(self, offset, limit, plugin_name, - plugin_ca_id, project_id): - return self.ca_repo.get_by_create_date( - offset_arg=offset, - limit_arg=limit, - plugin_name=plugin_name, - plugin_ca_id=plugin_ca_id, - project_id=project_id, - restrict_to_project_cas=True, - suppress_exception=True) - - def _get_subcas_and_root_cas(self, offset, limit, plugin_name, - plugin_ca_id, project_id): - return self.ca_repo.get_by_create_date( - offset_arg=offset, - limit_arg=limit, - plugin_name=plugin_name, - plugin_ca_id=plugin_ca_id, - project_id=project_id, - restrict_to_project_cas=False, - suppress_exception=True) - - def _display_cas(self, cas, offset, limit, total): - if not cas: - cas_resp_overall = {'cas': [], - 'total': total} - else: - cas_resp = [ - hrefs.convert_certificate_authority_to_href(ca.id) - for ca in cas] - cas_resp_overall = hrefs.add_nav_hrefs('cas', offset, limit, total, - {'cas': cas_resp}) - cas_resp_overall.update({'total': total}) - - return cas_resp_overall - - @pecan.expose(generic=True, template='json') - @controllers.handle_exceptions(u._('Retrieve global preferred CA')) - @controllers.enforce_rbac( - 'certificate_authorities:get_global_preferred_ca') - def get_global_preferred(self, external_project_id, **kw): - LOG.debug('Start certificate_authorities get_global_preferred CA') - - pref_ca = cert_resources.get_global_preferred_ca() - if not pref_ca: - pecan.abort(404, u._("No global preferred CA defined")) - - return { - 'ca_ref': - hrefs.convert_certificate_authority_to_href(pref_ca.ca_id) - } - - @pecan.expose() - @utils.allow_all_content_types - @controllers.handle_exceptions(u._('Unset global preferred CA')) - @controllers.enforce_rbac('certificate_authorities:unset_global_preferred') - def unset_global_preferred(self, external_project_id): - if pecan.request.method != 'POST': - pecan.abort(405) - LOG.debug("== Unsetting global preferred CA") - self._remove_global_preferred_ca(external_project_id) - - def _remove_global_preferred_ca(self, external_project_id): - global_preferred_ca = cert_resources.get_global_preferred_ca() - if global_preferred_ca: - self.preferred_ca_repo.delete_entity_by_id( - global_preferred_ca.id, - external_project_id) - - @pecan.expose(generic=True, template='json') - @utils.allow_all_content_types - @controllers.handle_exceptions(u._('Retrieve project preferred CA')) - @controllers.enforce_rbac('certificate_authorities:get_preferred_ca') - def preferred(self, external_project_id, **kw): - LOG.debug('Start certificate_authorities get' - ' project preferred CA') - - project = res.get_or_create_project(external_project_id) - - pref_ca_id = cert_resources.get_project_preferred_ca_id(project.id) - if not pref_ca_id: - pecan.abort(404, u._("No preferred CA defined for this project")) - - return { - 'ca_ref': - hrefs.convert_certificate_authority_to_href(pref_ca_id) - } - - @index.when(method='POST', template='json') - @controllers.handle_exceptions(u._('CA creation')) - @controllers.enforce_rbac('certificate_authorities:post') - @controllers.enforce_content_types(['application/json']) - def on_post(self, external_project_id, **kwargs): - LOG.debug('Start on_post for project-ID %s:...', - external_project_id) - - data = api.load_body(pecan.request, validator=self.validator) - project = res.get_or_create_project(external_project_id) - - ctxt = controllers._get_barbican_context(pecan.request) - if ctxt: # in authenticated pipeline case, always use auth token user - creator_id = ctxt.user - - self.quota_enforcer.enforce(project) - - new_ca = cert_resources.create_subordinate_ca( - project_model=project, - name=data.get('name'), - description=data.get('description'), - subject_dn=data.get('subject_dn'), - parent_ca_ref=data.get('parent_ca_ref'), - creator_id=creator_id - ) - - url = hrefs.convert_certificate_authority_to_href(new_ca.id) - LOG.debug('URI to sub-CA is %s', url) - - pecan.response.status = 201 - pecan.response.headers['Location'] = url - - LOG.info('Created a sub CA for project: %s', - external_project_id) - - return {'ca_ref': url} diff --git a/barbican/api/controllers/orders.py b/barbican/api/controllers/orders.py index 02ef66aec..f548b8af8 100644 --- a/barbican/api/controllers/orders.py +++ b/barbican/api/controllers/orders.py @@ -10,7 +10,6 @@ # License for the specific language governing permissions and limitations # under the License. -from oslo_log import versionutils import pecan from barbican import api @@ -47,12 +46,6 @@ def _order_update_not_supported(): pecan.abort(405, u._("Order update is not supported.")) -def _order_update_not_supported_for_type(order_type): - """Throw exception that update is not supported.""" - pecan.abort(400, u._("Updates are not supported for order type " - "{0}.").format(order_type)) - - def _order_cannot_be_updated_if_not_pending(order_status): """Throw exception that order cannot be updated if not PENDING.""" pecan.abort(400, u._("Only PENDING orders can be updated. Order is in the" @@ -84,41 +77,6 @@ class OrderController(controllers.ACLMixin): def on_get(self, external_project_id): return hrefs.convert_to_hrefs(self.order.to_dict_fields()) - @index.when(method='PUT') - @controllers.handle_exceptions(u._('Order update')) - @controllers.enforce_rbac('order:put') - @controllers.enforce_content_types(['application/json']) - def on_put(self, external_project_id, **kwargs): - body = api.load_body(pecan.request, - validator=self.type_order_validator) - - project = res.get_or_create_project(external_project_id) - order_type = body.get('type') - - request_id = None - ctxt = controllers._get_barbican_context(pecan.request) - if ctxt and ctxt.request_id: - request_id = ctxt.request_id - - if self.order.type != order_type: - order_cannot_modify_order_type() - - if models.OrderType.CERTIFICATE != self.order.type: - _order_update_not_supported_for_type(order_type) - - if models.States.PENDING != self.order.status: - _order_cannot_be_updated_if_not_pending(self.order.status) - - updated_meta = body.get('meta') - validators.validate_ca_id(project.id, updated_meta) - - # TODO(chellygel): Put 'meta' into a separate order association - # entity. - self.queue.update_order(order_id=self.order.id, - project_id=external_project_id, - updated_meta=updated_meta, - request_id=request_id) - @index.when(method='DELETE') @utils.allow_all_content_types @controllers.handle_exceptions(u._('Order deletion')) @@ -214,16 +172,6 @@ class OrdersController(controllers.ACLMixin): {'order_type': order_type, 'request_type': request_type}) - if order_type == models.OrderType.CERTIFICATE: - msg = _DEPRECATION_MSG % "Certificate Order Resource" - versionutils.report_deprecated_feature(LOG, msg) - validators.validate_ca_id(project.id, body.get('meta')) - if request_type == 'stored-key': - container_ref = order_meta.get('container_ref') - validators.validate_stored_key_rsa_container( - external_project_id, - container_ref, pecan.request) - self.quota_enforcer.enforce(project) new_order = models.Order() diff --git a/barbican/api/controllers/versions.py b/barbican/api/controllers/versions.py index e73259fe8..1a9ac1b76 100644 --- a/barbican/api/controllers/versions.py +++ b/barbican/api/controllers/versions.py @@ -14,7 +14,6 @@ import pecan from six.moves.urllib import parse from barbican.api import controllers -from barbican.api.controllers import cas from barbican.api.controllers import containers from barbican.api.controllers import orders from barbican.api.controllers import quotas @@ -93,7 +92,6 @@ class V1Controller(BaseVersionController): self.orders = orders.OrdersController() self.containers = containers.ContainersController() self.transport_keys = transportkeys.TransportKeysController() - self.cas = cas.CertificateAuthoritiesController() self.quotas = quotas.QuotasController() setattr(self, 'project-quotas', quotas.ProjectsQuotasController()) setattr(self, 'secret-stores', secretstores.SecretStoresController()) diff --git a/barbican/queue/client.py b/barbican/queue/client.py index 7b4eeee38..34c5101ae 100644 --- a/barbican/queue/client.py +++ b/barbican/queue/client.py @@ -46,15 +46,6 @@ class TaskClient(object): project_id=project_id, request_id=request_id) - def update_order(self, order_id, project_id, updated_meta, request_id): - """Update Order.""" - - self._cast('update_order', - order_id=order_id, - project_id=project_id, - updated_meta=updated_meta, - request_id=request_id) - def check_certificate_status(self, order_id, project_id, request_id): """Check the status of a certificate order.""" self._cast('check_certificate_status', diff --git a/barbican/queue/server.py b/barbican/queue/server.py index 7c7b5f3ba..1aab5ff0f 100644 --- a/barbican/queue/server.py +++ b/barbican/queue/server.py @@ -211,19 +211,6 @@ class Tasks(object): return resources.BeginTypeOrder().process_and_suppress_exceptions( order_id, project_id) - @monitored - @transactional - @retryable_order - def update_order(self, context, order_id, project_id, - updated_meta, request_id): - """Update Order.""" - message = "Processing update order: order ID is '%(order)s' and " \ - "request ID is '%(request)s'" - - LOG.info(message, {'order': order_id, 'request': request_id}) - return resources.UpdateOrder().process_and_suppress_exceptions( - order_id, project_id, updated_meta) - @monitored @transactional @retryable_order diff --git a/barbican/tests/api/controllers/test_cas.py b/barbican/tests/api/controllers/test_cas.py deleted file mode 100644 index 973cd1809..000000000 --- a/barbican/tests/api/controllers/test_cas.py +++ /dev/null @@ -1,556 +0,0 @@ -# Copyright (c) 2015 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import mock -from six import moves - -from barbican.common import exception -from barbican.common import hrefs -from barbican.common import resources as res -from barbican.model import models -from barbican.model import repositories -from barbican.tests import utils - -project_repo = repositories.get_project_repository() -ca_repo = repositories.get_ca_repository() -project_ca_repo = repositories.get_project_repository() -preferred_ca_repo = repositories.get_preferred_ca_repository() - - -def create_ca(parsed_ca, id_ref="id"): - """Generate a CA entity instance.""" - ca = models.CertificateAuthority(parsed_ca) - ca.id = id_ref - return ca - - -class WhenTestingCAsResource(utils.BarbicanAPIBaseTestCase): - - def test_should_get_list_certificate_authorities(self): - self.app.extra_environ = { - 'barbican.context': self._build_context(self.project_id, - user="user1") - } - self.create_cas(set_project_cas=False) - resp = self.app.get('/cas/', self.params) - - self.assertEqual(self.limit, len(resp.namespace['cas'])) - self.assertIn('previous', resp.namespace) - self.assertIn('next', resp.namespace) - - url_nav_next = self._create_url(self.project_id, - self.offset + self.limit, self.limit) - self.assertEqual(1, resp.body.decode('utf-8').count(url_nav_next)) - - url_nav_prev = self._create_url(self.project_id, - 0, self.limit) - self.assertEqual(1, resp.body.decode('utf-8').count(url_nav_prev)) - - url_hrefs = self._create_url(self.project_id) - self.assertEqual((self.limit + 2), - resp.body.decode('utf-8').count(url_hrefs)) - - def test_response_should_list_subca_and_project_cas(self): - self.create_cas() - self.app.extra_environ = { - 'barbican.context': self._build_context(self.project_id, - user="user1") - } - self.params['limit'] = 100 - self.params['offset'] = 0 - resp = self.app.get('/cas/', self.params) - self.assertIn('total', resp.namespace) - self.assertEqual(3, resp.namespace['total']) - ca_refs = list(resp.namespace['cas']) - for ca_ref in ca_refs: - ca_id = hrefs.get_ca_id_from_ref(ca_ref) - if not ((ca_id in self.project_ca_ids) - or (ca_id == self.subca.id)): - self.fail("Invalid CA reference returned") - - def test_response_should_all_except_subca(self): - self.create_cas() - self.app.extra_environ = { - 'barbican.context': self._build_context("other_project", - user="user1") - } - self.params['limit'] = 100 - self.params['offset'] = 0 - self.params['plugin_name'] = self.plugin_name - resp = self.app.get('/cas/', self.params) - self.assertIn('total', resp.namespace) - self.assertEqual(self.num_cas - 1, resp.namespace['total']) - ca_refs = list(resp.namespace['cas']) - for ca_ref in ca_refs: - ca_id = hrefs.get_ca_id_from_ref(ca_ref) - self.assertNotEqual(ca_id, self.subca.id) - - def test_response_should_all_except_subca_from_all_subresource(self): - self.create_cas() - self.app.extra_environ = { - 'barbican.context': self._build_context("other_project", - user="user1") - } - self.params['limit'] = 100 - self.params['offset'] = 0 - self.params['plugin_name'] = self.plugin_name - resp = self.app.get('/cas/all', self.params) - self.assertIn('total', resp.namespace) - self.assertEqual(self.num_cas - 1, resp.namespace['total']) - ca_refs = list(resp.namespace['cas']) - for ca_ref in ca_refs: - ca_id = hrefs.get_ca_id_from_ref(ca_ref) - self.assertNotEqual(ca_id, self.subca.id) - - def test_response_should_all_from_all_subresource(self): - self.create_cas() - self.app.extra_environ = { - 'barbican.context': self._build_context(self.project_id, - user="user1") - } - self.params['limit'] = 100 - self.params['offset'] = 0 - self.params['plugin_name'] = self.plugin_name - resp = self.app.get('/cas/all', self.params) - self.assertIn('total', resp.namespace) - self.assertEqual(self.num_cas, resp.namespace['total']) - - def test_response_should_all_cas(self): - self.create_cas(set_project_cas=False) - self.app.extra_environ = { - 'barbican.context': self._build_context(self.project_id, - user="user1") - } - self.params['limit'] = 100 - self.params['offset'] = 0 - self.params['plugin_name'] = self.plugin_name - resp = self.app.get('/cas/', self.params) - self.assertIn('total', resp.namespace) - self.assertEqual(self.num_cas, resp.namespace['total']) - - def test_should_get_list_certificate_authorities_with_params(self): - self.create_cas(set_project_cas=False) - self.params['plugin_name'] = self.plugin_name - self.params['plugin_ca_id'] = self.plugin_ca_id + str(1) - self.params['offset'] = 0 - - resp = self.app.get('/cas/', self.params) - - self.assertNotIn('previous', resp.namespace) - self.assertNotIn('next', resp.namespace) - self.assertEqual(1, resp.namespace['total']) - - def test_should_get_with_params_on_all_resource(self): - self.create_cas(set_project_cas=False) - self.params['plugin_name'] = self.plugin_name - self.params['plugin_ca_id'] = self.plugin_ca_id + str(1) - self.params['offset'] = 0 - - resp = self.app.get('/cas/all', self.params) - - self.assertNotIn('previous', resp.namespace) - self.assertNotIn('next', resp.namespace) - self.assertEqual(1, resp.namespace['total']) - - def test_should_handle_no_cas(self): - self.params = {'offset': 0, 'limit': 2, 'plugin_name': 'dummy'} - resp = self.app.get('/cas/', self.params) - self.assertEqual([], resp.namespace.get('cas')) - self.assertEqual(0, resp.namespace.get('total')) - self.assertNotIn('previous', resp.namespace) - self.assertNotIn('next', resp.namespace) - - def test_should_get_global_preferred_ca(self): - self.create_cas() - - resp = self.app.get('/cas/global-preferred') - self.assertEqual( - hrefs.convert_certificate_authority_to_href( - self.global_preferred_ca.id), - resp.namespace['ca_ref']) - - def test_should_get_no_global_preferred_ca(self): - resp = self.app.get('/cas/global-preferred', expect_errors=True) - self.assertEqual(404, resp.status_int) - - def test_should_get_preferred_ca_not_found(self): - self.project = res.get_or_create_project(self.project_id) - project_repo.save(self.project) - resp = self.app.get('/cas/preferred', expect_errors=True) - self.assertEqual(404, resp.status_int) - - def test_should_get_preferred_ca(self): - self.create_cas() - resp = self.app.get('/cas/preferred') - self.assertEqual( - hrefs.convert_certificate_authority_to_href( - self.preferred_ca.id), - resp.namespace['ca_ref']) - - def test_should_get_ca(self): - self.create_cas() - resp = self.app.get('/cas/{0}'.format(self.selected_ca_id)) - self.assertEqual(self.selected_ca_id, - resp.namespace['ca_id']) - self.assertEqual(self.selected_plugin_ca_id, - resp.namespace['plugin_ca_id']) - - def test_should_throw_exception_for_get_when_ca_not_found(self): - self.create_cas() - resp = self.app.get('/cas/bogus_ca_id', expect_errors=True) - self.assertEqual(404, resp.status_int) - - def test_should_get_signing_certificate(self): - self.create_cas() - resp = self.app.get('/cas/{0}/cacert'.format(self.selected_ca_id)) - self.assertEqual(self.selected_signing_cert, resp.body.decode('utf-8')) - - def test_should_raise_for_get_signing_certificate_ca_not_found(self): - resp = self.app.get('/cas/bogus_ca/cacert', expect_errors=True) - self.assertEqual(404, resp.status_int) - - def test_should_get_cert_chain(self): - self.create_cas() - resp = self.app.get('/cas/{0}/intermediates'.format( - self.selected_ca_id)) - - self.assertEqual(self.selected_intermediates, - resp.body.decode('utf-8')) - - def test_should_raise_for_get_cert_chain_ca_not_found(self): - resp = self.app.get('/cas/bogus_ca/intermediates', expect_errors=True) - self.assertEqual(404, resp.status_int) - - def test_should_raise_for_ca_attribute_not_found(self): - self.create_cas() - resp = self.app.get('/cas/{0}/bogus'.format(self.selected_ca_id), - expect_errors=True) - self.assertEqual(404, resp.status_int) - - def test_should_add_to_project(self): - self.create_cas() - resp = self.app.post('/cas/{0}/add-to-project'.format( - self.selected_ca_id)) - self.assertEqual(204, resp.status_int) - # TODO(alee) need more detailed tests here - - def test_should_add_existing_project_ca_to_project(self): - self.create_cas() - resp = self.app.post('/cas/{0}/add-to-project'.format( - self.project_ca_ids[0])) - self.assertEqual(204, resp.status_int) - # TODO(alee) need more detailed tests here - - def test_should_raise_add_to_project_on_ca_not_found(self): - resp = self.app.post( - '/cas/bogus_ca/add-to-project', expect_errors=True) - self.assertEqual(404, resp.status_int) - - def test_should_raise_add_to_project_on_ca_not_owned_by_project(self): - self.create_cas() - self.app.extra_environ = { - 'barbican.context': self._build_context("other_project", - user="user1") - } - resp = self.app.post('/cas/{0}/add-to-project'.format( - self.subca.id), expect_errors=True) - self.assertEqual(403, resp.status_int) - - def test_should_raise_add_to_project_not_post(self): - self.create_cas() - resp = self.app.get( - '/cas/{0}/add_to_project'.format(self.selected_ca_id), - expect_errors=True) - self.assertEqual(405, resp.status_int) - - def test_should_remove_from_project(self): - self.create_cas() - resp = self.app.post('/cas/{0}/remove-from-project'.format( - self.project_ca_ids[0])) - self.assertEqual(204, resp.status_int) - # TODO(alee) need more detailed tests here - - def test_should_raise_remove_from_project_preferred_ca(self): - self.create_cas() - resp = self.app.post('/cas/{0}/remove-from-project'.format( - self.project_ca_ids[1]), - expect_errors=True) - self.assertEqual(409, resp.status_int) - - def test_should_remove_preferred_ca_if_last_project_ca(self): - self.create_cas() - resp = self.app.post('/cas/{0}/remove-from-project'.format( - self.project_ca_ids[0])) - self.assertEqual(204, resp.status_int) - - resp = self.app.post('/cas/{0}/remove-from-project'.format( - self.project_ca_ids[1])) - self.assertEqual(204, resp.status_int) - - def test_should_raise_remove_from_project_not_currently_set(self): - self.create_cas() - resp = self.app.post( - '/cas/{0}/remove-from-project'.format(self.selected_ca_id), - expect_errors=True) - self.assertEqual(404, resp.status_int) - - def test_should_raise_remove_form_project_on_ca_not_found(self): - self.create_cas() - resp = self.app.post('/cas/bogus_ca/remove-from-project', - expect_errors=True) - self.assertEqual(404, resp.status_int) - - def test_should_raise_remove_from_project_not_post(self): - self.create_cas() - resp = self.app.get( - '/cas/{0}/remove-from-project'.format(self.selected_ca_id), - expect_errors=True) - self.assertEqual(405, resp.status_int) - - def test_should_set_preferred_modify_existing(self): - self.create_cas() - self.app.post( - '/cas/{0}/set-preferred'.format(self.project_ca_ids[1])) - - def test_should_raise_set_preferred_ca_not_found(self): - self.create_cas() - resp = self.app.post('/cas/bogus_ca/set-preferred', expect_errors=True) - self.assertEqual(404, resp.status_int) - - def test_should_raise_set_preferred_ca_not_in_project(self): - self.create_cas() - resp = self.app.post( - '/cas/{0}/set-preferred'.format(self.selected_ca_id), - expect_errors=True) - self.assertEqual(400, resp.status_int) - - def test_should_raise_set_preferred_ca_not_post(self): - self.create_cas() - resp = self.app.get( - '/cas/{0}/set-preferred'.format(self.selected_ca_id), - expect_errors=True) - self.assertEqual(405, resp.status_int) - - def test_should_set_global_preferred(self): - self.create_cas() - self.app.post( - '/cas/{0}/set-global-preferred'.format(self.selected_ca_id)) - - def test_should_raise_set_global_preferred_ca_not_found(self): - resp = self.app.post( - '/cas/bogus_ca/set-global-preferred', - expect_errors=True) - self.assertEqual(404, resp.status_int) - - def test_should_raise_set_global_preferred_ca_not_post(self): - self.create_cas() - resp = self.app.get( - '/cas/{0}/set-global-preferred'.format(self.selected_ca_id), - expect_errors=True) - self.assertEqual(405, resp.status_int) - - def test_should_unset_global_preferred(self): - self.create_cas() - resp = self.app.post( - '/cas/unset-global-preferred') - self.assertEqual(204, resp.status_int) - - def test_should_unset_global_preferred_not_post(self): - self.create_cas() - resp = self.app.get( - '/cas/unset-global-preferred', - expect_errors=True) - self.assertEqual(405, resp.status_int) - - def test_should_get_projects(self): - self.create_cas() - resp = self.app.get( - '/cas/{0}/projects'.format(self.project_ca_ids[0])) - self.assertEqual( - self.project.external_id, - resp.namespace['projects'][0]) - - def test_should_get_no_projects(self): - self.create_cas() - resp = self.app.get('/cas/{0}/projects'.format(self.selected_ca_id)) - self.assertEqual([], resp.namespace['projects']) - - def test_should_raise_get_projects_ca_not_found(self): - self.create_cas() - resp = self.app.get( - '/cas/bogus_ca/projects'.format(self.project_ca_ids[0]), - expect_errors=True) - self.assertEqual(404, resp.status_int) - - @mock.patch('barbican.tasks.certificate_resources.create_subordinate_ca') - def test_should_create_subca(self, mocked_task): - self.create_cas() - self.create_subca_request(self.selected_ca_id) - - mocked_task.return_value = models.CertificateAuthority( - self.parsed_subca) - - resp = self.app.post_json( - '/cas', - self.subca_request, - expect_errors=False) - - self.assertEqual(201, resp.status_int) - - def test_should_raise_delete_subca_not_found(self): - self.create_cas() - resp = self.app.delete('/cas/foobar', expect_errors=True) - self.assertEqual(404, resp.status_int) - - @mock.patch('barbican.tasks.certificate_resources.delete_subordinate_ca') - def test_should_delete_subca(self, mocked_task): - self.create_cas() - resp = self.app.delete('/cas/' + self.subca.id) - mocked_task.assert_called_once_with(self.project_id, - self.subca) - self.assertEqual(204, resp.status_int) - - @mock.patch('barbican.tasks.certificate_resources.delete_subordinate_ca') - def test_should_raise_delete_not_a_subca(self, mocked_task): - self.create_cas() - mocked_task.side_effect = exception.CannotDeleteBaseCA() - resp = self.app.delete('/cas/' + self.subca.id, - expect_errors=True) - mocked_task.assert_called_once_with(self.project_id, - self.subca) - self.assertEqual(403, resp.status_int) - - @mock.patch('barbican.tasks.certificate_resources.delete_subordinate_ca') - def test_should_raise_delete_not_authorized(self, mocked_task): - self.create_cas() - mocked_task.side_effect = exception.UnauthorizedSubCA() - resp = self.app.delete('/cas/' + self.subca.id, - expect_errors=True) - mocked_task.assert_called_once_with(self.project_id, - self.subca) - self.assertEqual(403, resp.status_int) - - def create_subca_request(self, parent_ca_id): - self.subca_request = { - 'name': "Subordinate CA", - 'subject_dn': 'cn=subordinate ca signing cert, o=example.com', - 'parent_ca_ref': "https://localhost:9311/cas/" + parent_ca_id - } - - self.parsed_subca = { - 'plugin_name': self.plugin_name, - 'plugin_ca_id': self.plugin_ca_id + '_subca_id', - 'name': self.plugin_name, - 'description': 'Subordinate CA', - 'ca_signing_certificate': 'ZZZZZ...Subordinate...', - 'intermediates': 'YYYYY...subordinate...', - 'parent_ca_id': parent_ca_id - } - - def create_cas(self, set_project_cas=True): - self.project = res.get_or_create_project(self.project_id) - self.global_project = res.get_or_create_global_preferred_project() - project_repo.save(self.project) - self.project_ca_ids = [] - - self.plugin_name = 'default_plugin' - self.plugin_ca_id = 'default_plugin_ca_id_' - self.ca_id = "id1" - - self.num_cas = 10 - self.offset = 2 - self.limit = 4 - self.params = {'offset': self.offset, 'limit': self.limit} - - self._do_create_cas(set_project_cas) - - # create subca for DELETE testing - parsed_ca = { - 'plugin_name': self.plugin_name, - 'plugin_ca_id': self.plugin_ca_id + "subca 1", - 'name': self.plugin_name, - 'description': 'Sub CA for default plugin', - 'ca_signing_certificate': 'ZZZZZ' + "sub ca1", - 'intermediates': 'YYYYY' + "sub ca1", - 'project_id': self.project.id, - 'creator_id': 'user12345' - } - ca = models.CertificateAuthority(parsed_ca) - ca_repo.create_from(ca) - ca_repo.save(ca) - self.subca = ca - - self.num_cas += 1 - - def _do_create_cas(self, set_project_cas): - for ca_id in moves.range(self.num_cas): - parsed_ca = { - 'plugin_name': self.plugin_name, - 'plugin_ca_id': self.plugin_ca_id + str(ca_id), - 'name': self.plugin_name, - 'description': 'Master CA for default plugin', - 'ca_signing_certificate': 'ZZZZZ' + str(ca_id), - 'intermediates': 'YYYYY' + str(ca_id) - } - ca = models.CertificateAuthority(parsed_ca) - ca_repo.create_from(ca) - ca_repo.save(ca) - - if ca_id == 1: - # set global preferred ca - pref_ca = models.PreferredCertificateAuthority( - self.global_project.id, - ca.id) - preferred_ca_repo.create_from(pref_ca) - preferred_ca_repo.save(pref_ca) - self.global_preferred_ca = ca - - if ca_id == 2 and set_project_cas: - # set project CA - project_ca = models.ProjectCertificateAuthority( - self.project.id, ca.id) - project_ca_repo.create_from(project_ca) - project_ca_repo.save(project_ca) - self.project_ca_ids.append(ca.id) - - if ca_id == 3 and set_project_cas: - # set project preferred CA - project_ca = models.ProjectCertificateAuthority( - self.project.id, ca.id) - project_ca_repo.create_from(project_ca) - project_ca_repo.save(project_ca) - self.project_ca_ids.append(ca.id) - - pref_ca = models.PreferredCertificateAuthority( - self.project.id, ca.id) - preferred_ca_repo.create_from(pref_ca) - preferred_ca_repo.save(pref_ca) - self.preferred_ca = ca - - if ca_id == 4: - # set ca for testing GETs for a single CA - self.selected_ca_id = ca.id - self.selected_plugin_ca_id = self.plugin_ca_id + str(ca_id) - self.selected_signing_cert = 'ZZZZZ' + str(ca_id) - self.selected_intermediates = 'YYYYY' + str(ca_id) - - def _create_url(self, external_project_id, offset_arg=None, - limit_arg=None): - if limit_arg: - offset = int(offset_arg) - limit = int(limit_arg) - return '/cas?limit={0}&offset={1}'.format( - limit, offset) - else: - return '/cas' diff --git a/barbican/tests/api/controllers/test_orders.py b/barbican/tests/api/controllers/test_orders.py index 343b9b034..f948a41fc 100644 --- a/barbican/tests/api/controllers/test_orders.py +++ b/barbican/tests/api/controllers/test_orders.py @@ -15,13 +15,8 @@ import os import uuid -import mock - -from barbican.common import resources from barbican.model import models from barbican.model import repositories -from barbican.tests.api.controllers import test_acls -from barbican.tests.api import test_resources_policy as test_policy from barbican.tests import utils from oslo_utils import uuidutils @@ -213,90 +208,6 @@ class WhenGettingOrDeletingOrders(utils.BarbicanAPIBaseTestCase): self.assertEqual(404, resp.status_int) -@utils.parameterized_test_case -class WhenPuttingAnOrderWithMetadata(utils.BarbicanAPIBaseTestCase): - def setUp(self): - # Temporarily mock the queue until we can figure out a better way - # TODO(jvrbanac): Remove dependence on mocks - self.update_order_mock = mock.MagicMock() - repositories.OrderRepo.update_order = self.update_order_mock - - super(WhenPuttingAnOrderWithMetadata, self).setUp() - - def _create_generic_order_for_put(self): - """Create a real order to modify and perform PUT actions on - - This makes sure that a project exists for our order and that there - is an order within the database. This is a little hacky due to issues - testing certificate order types. - """ - # Create generic order - resp, order_uuid = create_order( - self.app, - order_type='key', - meta=generic_key_meta - ) - self.assertEqual(202, resp.status_int) - - # Modify the order in the DB to allow actions to be performed - order_model = order_repo.get(order_uuid, self.project_id) - order_model.type = 'certificate' - order_model.status = models.States.PENDING - order_model.meta = {'nope': 'nothing'} - order_model.save() - - repositories.commit() - - return order_uuid - - def test_putting_on_a_order(self): - order_uuid = self._create_generic_order_for_put() - - body = { - 'type': 'certificate', - 'meta': {'nope': 'thing'} - } - resp = self.app.put_json( - '/orders/{0}'.format(order_uuid), - body, - headers={'Content-Type': 'application/json'} - ) - - self.assertEqual(204, resp.status_int) - self.assertEqual(1, self.update_order_mock.call_count) - - @utils.parameterized_dataset({ - 'bogus_content': ['bogus'], - 'bad_order_type': ['{"type": "secret", "meta": {}}'], - }) - def test_return_400_on_put_with(self, body): - order_uuid = self._create_generic_order_for_put() - resp = self.app.put( - '/orders/{0}'.format(order_uuid), - body, - headers={'Content-Type': 'application/json'}, - expect_errors=True - ) - self.assertEqual(400, resp.status_int) - - def test_return_400_on_put_when_order_is_active(self): - order_uuid = self._create_generic_order_for_put() - - # Put the order in a active state to prevent modification - order_model = order_repo.get(order_uuid, self.project_id) - order_model.status = models.States.ACTIVE - order_model.save() - repositories.commit() - - resp = self.app.put_json( - '/orders/{0}'.format(order_uuid), - {'type': 'certificate', 'meta': {}}, - headers={'Content-Type': 'application/json'}, - expect_errors=True - ) - self.assertEqual(400, resp.status_int) - - class WhenCreatingOrders(utils.BarbicanAPIBaseTestCase): def test_should_add_new_order(self): order_meta = { @@ -328,411 +239,6 @@ class WhenCreatingOrders(utils.BarbicanAPIBaseTestCase): self.assertEqual(415, resp.status_int) -class WhenCreatingCertificateOrders(utils.BarbicanAPIBaseTestCase): - def setUp(self): - super(WhenCreatingCertificateOrders, self).setUp() - self.certificate_meta = { - 'request': 'XXXXXX' - } - # Make sure we have a project - self.project = resources.get_or_create_project(self.project_id) - - # Create CA's in the db - self.available_ca_ids = [] - for i in range(2): - ca_information = { - 'plugin_name': 'plugin_name', - 'plugin_ca_id': 'plugin_name ca_id1', - 'name': 'plugin name', - 'description': 'Master CA for default plugin', - 'ca_signing_certificate': 'XXXXX', - 'intermediates': 'YYYYY' - } - - ca_model = models.CertificateAuthority(ca_information) - ca = ca_repo.create_from(ca_model) - self.available_ca_ids.append(ca.id) - - foreign_project = resources.get_or_create_project('foreign_project') - foreign_ca_information = { - 'project_id': foreign_project.id, - 'plugin_name': 'plugin_name', - 'plugin_ca_id': 'plugin_name ca_id1', - 'name': 'plugin name', - 'description': 'Master CA for default plugin', - 'ca_signing_certificate': 'XXXXX', - 'intermediates': 'YYYYY' - } - foreign_ca_model = models.CertificateAuthority(foreign_ca_information) - foreign_ca = ca_repo.create_from(foreign_ca_model) - self.foreign_ca_id = foreign_ca.id - - repositories.commit() - - def test_can_create_new_cert_order(self): - create_resp, order_uuid = create_order( - self.app, - order_type='certificate', - meta=self.certificate_meta - ) - - self.assertEqual(202, create_resp.status_int) - - order = order_repo.get(order_uuid, self.project_id) - self.assertIsInstance(order, models.Order) - - def test_can_add_new_cert_order_with_ca_id(self): - self.certificate_meta['ca_id'] = self.available_ca_ids[0] - - create_resp, order_uuid = create_order( - self.app, - order_type='certificate', - meta=self.certificate_meta - ) - - self.assertEqual(202, create_resp.status_int) - - order = order_repo.get(order_uuid, self.project_id) - self.assertIsInstance(order, models.Order) - - def test_can_add_new_cert_order_with_ca_id_project_ca_defined(self): - # Create a Project CA and add it - project_ca_model = models.ProjectCertificateAuthority( - self.project.id, - self.available_ca_ids[0] - ) - project_ca_repo.create_from(project_ca_model) - - repositories.commit() - - # Attempt to create an order - self.certificate_meta['ca_id'] = self.available_ca_ids[0] - - create_resp, order_uuid = create_order( - self.app, - order_type='certificate', - meta=self.certificate_meta - ) - - self.assertEqual(202, create_resp.status_int) - - order = order_repo.get(order_uuid, self.project_id) - self.assertIsInstance(order, models.Order) - - def test_create_w_invalid_ca_id_should_fail(self): - self.certificate_meta['ca_id'] = 'bogus_ca_id' - - create_resp, order_uuid = create_order( - self.app, - order_type='certificate', - meta=self.certificate_meta, - expect_errors=True - ) - self.assertEqual(400, create_resp.status_int) - - def test_create_should_fail_when_ca_not_in_defined_project_ca_ids(self): - # Create a Project CA and add it - project_ca_model = models.ProjectCertificateAuthority( - self.project.id, - self.available_ca_ids[0] - ) - project_ca_repo.create_from(project_ca_model) - - repositories.commit() - - # Make sure we set the ca_id to an id not defined in the project - self.certificate_meta['ca_id'] = self.available_ca_ids[1] - - create_resp, order_uuid = create_order( - self.app, - order_type='certificate', - meta=self.certificate_meta, - expect_errors=True - ) - self.assertEqual(403, create_resp.status_int) - - def test_create_with_wrong_projects_subca_should_fail(self): - self.certificate_meta['ca_id'] = self.foreign_ca_id - - create_resp, order_uuid = create_order( - self.app, - order_type='certificate', - meta=self.certificate_meta, - expect_errors=True - ) - self.assertEqual(403, create_resp.status_int) - self.assertIn("not owned", create_resp.json['description']) - - -class WhenCreatingStoredKeyOrders(utils.BarbicanAPIBaseTestCase, - test_policy.BaseTestCase): - def setUp(self): - super(WhenCreatingStoredKeyOrders, self).setUp() - - # Make sure we have a project - self.project = resources.get_or_create_project(self.project_id) - self.creator_user_id = 'creatorUserId' - - def test_can_create_new_stored_key_order(self): - container_name = 'rsa container name' - container_type = 'rsa' - secret_refs = [] - resp, container_id = create_container( - self.app, - name=container_name, - container_type=container_type, - secret_refs=secret_refs - ) - stored_key_meta = { - 'request_type': 'stored-key', - 'subject_dn': 'cn=barbican-server,o=example.com', - 'container_ref': 'https://localhost/v1/containers/' + container_id - } - create_resp, order_uuid = create_order( - self.app, - order_type='certificate', - meta=stored_key_meta - ) - self.assertEqual(202, create_resp.status_int) - - order = order_repo.get(order_uuid, self.project_id) - self.assertIsInstance(order, models.Order) - - def _setup_acl_order_context_and_create_order( - self, add_acls=False, read_project_access=True, order_roles=None, - order_user=None, expect_errors=False): - """Helper method to setup acls, order context and return created order. - - Create order uses actual oslo policy enforcer instead of being None. - Create ACLs for container if 'add_acls' is True. - Make container private when 'read_project_access' is False. - """ - - container_name = 'rsa container name' - container_type = 'rsa' - secret_refs = [] - self.app.extra_environ = { - 'barbican.context': self._build_context(self.project_id, - user=self.creator_user_id) - } - _, container_id = create_container( - self.app, - name=container_name, - container_type=container_type, - secret_refs=secret_refs - ) - - if add_acls: - test_acls.manage_acls( - self.app, 'containers', container_id, - read_user_ids=['u1', 'u3', 'u4'], - read_project_access=read_project_access, - is_update=False) - - self.app.extra_environ = { - 'barbican.context': self._build_context( - self.project_id, roles=order_roles, user=order_user, - is_admin=False, policy_enforcer=self.policy_enforcer) - } - - stored_key_meta = { - 'request_type': 'stored-key', - 'subject_dn': 'cn=barbican-server,o=example.com', - 'container_ref': 'https://localhost/v1/containers/' + container_id - } - return create_order( - self.app, - order_type='certificate', - meta=stored_key_meta, - expect_errors=expect_errors - ) - - def test_can_create_new_stored_key_order_no_acls_and_policy_check(self): - """Create stored key order with actual policy enforcement logic. - - Order can be created as long as order project and user roles are - allowed in policy. In the test, user requesting order has container - project and has 'creator' role. Order should be created regardless - of what user id is. - """ - - create_resp, order_id = self._setup_acl_order_context_and_create_order( - add_acls=False, read_project_access=True, order_roles=['creator'], - order_user='anyUserId', expect_errors=False) - - self.assertEqual(202, create_resp.status_int) - - order = order_repo.get(order_id, self.project_id) - self.assertIsInstance(order, models.Order) - self.assertEqual('anyUserId', order.creator_id) - - def test_should_fail_for_user_observer_role_no_acls_and_policy_check(self): - """Should not allow create order when user doesn't have necessary role. - - Order can be created as long as order project and user roles are - allowed in policy. In the test, user requesting order has container - project but has 'observer' role. Create order should fail as expected - role is 'admin' or 'creator'. - """ - - create_resp, _ = self._setup_acl_order_context_and_create_order( - add_acls=False, read_project_access=True, order_roles=['observer'], - order_user='anyUserId', expect_errors=True) - self.assertEqual(403, create_resp.status_int) - - def test_can_create_order_with_private_container_and_creator_user(self): - """Create order using private container with creator user. - - Container has been marked private via ACLs. Still creator of container - should be able to create stored key order using that container - successfully. - """ - create_resp, order_id = self._setup_acl_order_context_and_create_order( - add_acls=True, read_project_access=False, order_roles=['creator'], - order_user=self.creator_user_id, expect_errors=False) - - self.assertEqual(202, create_resp.status_int) - - order = order_repo.get(order_id, self.project_id) - self.assertIsInstance(order, models.Order) - self.assertEqual(self.creator_user_id, order.creator_id) - - def test_can_create_order_with_private_container_and_acl_user(self): - """Create order using private container with acl user. - - Container has been marked private via ACLs. So *generally* project user - should not be able to create stored key order using that container. - But here it can create order as that user is defined in read ACL user - list. Here project user means user which has 'creator' role in the - container project. Order project is same as container. - """ - - create_resp, order_id = self._setup_acl_order_context_and_create_order( - add_acls=True, read_project_access=False, order_roles=['creator'], - order_user='u3', expect_errors=False) - self.assertEqual(202, create_resp.status_int) - - order = order_repo.get(order_id, self.project_id) - self.assertIsInstance(order, models.Order) - self.assertEqual('u3', order.creator_id) - - def test_should_raise_with_private_container_and_project_user(self): - """Create order should fail using private container for project user. - - Container has been marked private via ACLs. So project user should not - be able to create stored key order using that container. Here project - user means user which has 'creator' role in the container project. - Order project is same as container. If container was not marked - private, this user would have been able to create order. See next test. - """ - - create_resp, _ = self._setup_acl_order_context_and_create_order( - add_acls=True, read_project_access=False, order_roles=['creator'], - order_user='anyProjectUser', expect_errors=True) - - self.assertEqual(403, create_resp.status_int) - - def test_can_create_order_with_non_private_acls_and_project_user(self): - """Create order using non-private container with project user. - - Container has not been marked private via ACLs. So project user should - be able to create stored key order using that container successfully. - Here project user means user which has 'creator' role in the container - project. Order project is same as container. - """ - create_resp, order_id = self._setup_acl_order_context_and_create_order( - add_acls=True, read_project_access=True, order_roles=['creator'], - order_user='anyProjectUser', expect_errors=False) - - self.assertEqual(202, create_resp.status_int) - - order = order_repo.get(order_id, self.project_id) - self.assertIsInstance(order, models.Order) - self.assertEqual('anyProjectUser', order.creator_id) - - def test_can_create_order_with_non_private_acls_and_creator_user(self): - """Create order using non-private container with creator user. - - Container has not been marked private via ACLs. So user who created - container should be able to create stored key order using that - container successfully. Order project is same as container. - """ - create_resp, order_id = self._setup_acl_order_context_and_create_order( - add_acls=True, read_project_access=True, order_roles=['creator'], - order_user=self.creator_user_id, expect_errors=False) - - self.assertEqual(202, create_resp.status_int) - - order = order_repo.get(order_id, self.project_id) - self.assertIsInstance(order, models.Order) - self.assertEqual(self.creator_user_id, order.creator_id) - - def test_should_raise_with_bad_container_ref(self): - stored_key_meta = { - 'request_type': 'stored-key', - 'subject_dn': 'cn=barbican-server,o=example.com', - 'container_ref': 'bad_ref' - } - create_resp, order_uuid = create_order( - self.app, - order_type='certificate', - meta=stored_key_meta, - expect_errors=True - ) - self.assertEqual(400, create_resp.status_int) - - def test_should_raise_with_container_not_found(self): - stored_key_meta = { - 'request_type': 'stored-key', - 'subject_dn': 'cn=barbican-server,o=example.com', - 'container_ref': 'https://localhost/v1/containers/not_found' - } - create_resp, order_uuid = create_order( - self.app, - order_type='certificate', - meta=stored_key_meta, - expect_errors=True - ) - self.assertEqual(400, create_resp.status_int) - - def test_should_raise_with_container_wrong_type(self): - container_name = 'generic container name' - container_type = 'generic' - secret_refs = [] - resp, container_id = create_container( - self.app, - name=container_name, - container_type=container_type, - secret_refs=secret_refs - ) - stored_key_meta = { - 'request_type': 'stored-key', - 'subject_dn': 'cn=barbican-server,o=example.com', - 'container_ref': 'https://localhost/v1/containers/' + container_id - } - create_resp, order_uuid = create_order( - self.app, - order_type='certificate', - meta=stored_key_meta, - expect_errors=True - ) - self.assertEqual(400, create_resp.status_int) - - def test_should_raise_with_container_no_access(self): - stored_key_meta = { - 'request_type': 'stored-key', - 'subject_dn': 'cn=barbican-server,o=example.com', - 'container_ref': 'https://localhost/v1/containers/no_access' - } - create_resp, order_uuid = create_order( - self.app, - order_type='certificate', - meta=stored_key_meta, - expect_errors=True - ) - self.assertEqual(400, create_resp.status_int) - - class WhenPerformingUnallowedOperations(utils.BarbicanAPIBaseTestCase): def test_should_not_allow_put_orders(self): resp = self.app.put_json('/orders/', expect_errors=True) diff --git a/barbican/tests/queue/test_client.py b/barbican/tests/queue/test_client.py index ffab6c44b..2eed809f5 100644 --- a/barbican/tests/queue/test_client.py +++ b/barbican/tests/queue/test_client.py @@ -52,29 +52,6 @@ class WhenUsingAsyncTaskClient(utils.BaseTestCase): project_id=self.external_project_id, request_id=self.request_id) - def test_should_update_order(self): - updated_meta = {} - self.client.update_order(order_id=self.order_id, - project_id=self.external_project_id, - updated_meta=updated_meta, - request_id=self.request_id) - self.mock_client.cast.assert_called_with( - {}, 'update_order', order_id=self.order_id, - project_id=self.external_project_id, - updated_meta=updated_meta, - request_id=self.request_id) - - def test_should_check_certificate_order(self): - self.client.check_certificate_status( - order_id=self.order_id, - project_id=self.external_project_id, - request_id=self.request_id) - self.mock_client.cast.assert_called_with( - {}, 'check_certificate_status', - order_id=self.order_id, - project_id=self.external_project_id, - request_id=self.request_id) - class WhenCreatingDirectTaskClient(utils.BaseTestCase): """Test using the synchronous task client (i.e. standalone mode).""" diff --git a/barbican/tests/queue/test_server.py b/barbican/tests/queue/test_server.py index b2177a4f2..00abe5124 100644 --- a/barbican/tests/queue/test_server.py +++ b/barbican/tests/queue/test_server.py @@ -303,26 +303,6 @@ class WhenCallingTasksMethod(utils.BaseTestCase): mock.ANY, 'result', None, 'order1234', 'keystone1234', 'request1234') - @mock.patch('barbican.queue.server.schedule_order_retry_tasks') - @mock.patch('barbican.tasks.resources.UpdateOrder') - def test_should_process_update_order( - self, mock_update_order, mock_schedule): - method = mock_update_order.return_value.process_and_suppress_exceptions - method.return_value = 'result' - updated_meta = {'foo': 1} - - self.tasks.update_order( - None, self.order_id, self.external_project_id, - updated_meta, self.request_id) - - mock_process = mock_update_order.return_value - mock_process.process_and_suppress_exceptions.assert_called_with( - self.order_id, self.external_project_id, updated_meta - ) - mock_schedule.assert_called_with( - mock.ANY, 'result', None, - 'order1234', 'keystone1234', updated_meta, 'request1234') - @mock.patch('barbican.queue.server.schedule_order_retry_tasks') @mock.patch('barbican.tasks.resources.CheckCertificateStatusOrder') def test_should_check_certificate_order( @@ -438,26 +418,3 @@ class WhenUsingTaskServer(database_utils.RepositoryTestCase): self.assertEqual( six.u('500'), order_result.error_status_code) - - def test_process_bogus_update_type_order_should_not_rollback(self): - order_id = self.order.id - self.order.type = 'bogus-type' # Force error out of business logic. - - # Invoke process, including the transactional decorator that terminates - # the session when it is done. Hence we must re-retrieve the order for - # verification afterwards. - self.server.update_order( - None, self.order.id, self.external_id, None, self.request_id) - - order_repo = repositories.get_order_repository() - order_result = order_repo.get(order_id, self.external_id) - - self.assertEqual(models.States.ERROR, order_result.status) - self.assertEqual( - six.u( - 'Update Order failure seen - ' - 'please contact site administrator.'), - order_result.error_reason) - self.assertEqual( - six.u('500'), - order_result.error_status_code) diff --git a/functionaltests/api/v1/behaviors/ca_behaviors.py b/functionaltests/api/v1/behaviors/ca_behaviors.py deleted file mode 100644 index 192f8ceec..000000000 --- a/functionaltests/api/v1/behaviors/ca_behaviors.py +++ /dev/null @@ -1,183 +0,0 @@ -""" -Copyright 2015 Red Hat, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -""" -from functionaltests.api.v1.behaviors import base_behaviors -from functionaltests.api.v1.models import ca_models - - -class CABehaviors(base_behaviors.BaseBehaviors): - - def get_ca(self, ca_ref, extra_headers=None, - use_auth=True, user_name=None): - """Handles getting a CA - - :param ca_ref: href for a CA - :param extra_headers: extra HTTP headers for the GET request - :param use_auth: Boolean for whether to send authentication headers - :param user_name: The user name used for request - :return: a request Response object - """ - return self.client.get(ca_ref, - response_model_type=ca_models.CAModel, - extra_headers=extra_headers, - use_auth=use_auth, user_name=user_name) - - def get_cacert(self, ca_ref, payload_content_encoding=None, - extra_headers=None, - use_auth=True, user_name=None): - """Retrieve the CA signing certificate. """ - headers = {'Accept': 'application/octet-stream', - 'Accept-Encoding': payload_content_encoding} - if extra_headers: - headers.update(extra_headers) - - return self.client.get(ca_ref + '/cacert', - extra_headers=headers, use_auth=use_auth, - user_name=user_name) - - def get_cas(self, limit=10, offset=0, user_name=None): - """Handles getting a list of CAs. - - :param limit: limits number of returned CAs - :param offset: represents how many records to skip before retrieving - the list - :return: the response, a list of cas, total number of cas, next and - prev references - """ - resp = self.client.get('cas', user_name=user_name, - params={'limit': limit, 'offset': offset}) - - # TODO(alee) refactor to use he client's get_list_of_models() - - resp_json = self.get_json(resp) - cas, total, next_ref, prev_ref = [], 0, None, None - - for item in resp_json: - if 'next' == item: - next_ref = resp_json.get('next') - elif 'previous' == item: - prev_ref = resp_json.get('previous') - elif 'cas' == item: - cas = resp_json.get('cas') - elif 'total' == item: - total = resp_json.get('total') - - return resp, cas, total, next_ref, prev_ref - - def create_ca(self, model, headers=None, use_auth=True, - user_name=None, admin=None): - """Create a subordinate CA from the data in the model. - - :param model: The metadata used to create the subCA - :param use_auth: Boolean for whether to send authentication headers - :param user_name: The user name used to create the subCA - :param admin: The user with permissions to delete the subCA - :return: A tuple containing the response from the create - and the href to the newly created subCA - """ - - resp = self.client.post('cas', request_model=model, - extra_headers=headers, use_auth=use_auth, - user_name=user_name) - - # handle expected JSON parsing errors for unauthenticated requests - if resp.status_code == 401 and not use_auth: - return resp, None - - returned_data = self.get_json(resp) - ca_ref = returned_data.get('ca_ref') - if ca_ref: - if admin is None: - admin = user_name - self.created_entities.append((ca_ref, admin)) - return resp, ca_ref - - def delete_ca(self, ca_ref, extra_headers=None, - expected_fail=False, use_auth=True, user_name=None): - """Delete a secret. - - :param ca_ref: HATEOAS ref of the secret to be deleted - :param extra_headers: Optional HTTP headers to add to the request - :param expected_fail: If test is expected to fail the deletion - :param use_auth: Boolean for whether to send authentication headers - :param user_name: The user name used to delete the entity - :return: A request response object - """ - resp = self.client.delete(ca_ref, extra_headers=extra_headers, - use_auth=use_auth, user_name=user_name) - - if not expected_fail: - for item in self.created_entities: - if item[0] == ca_ref: - self.created_entities.remove(item) - - return resp - - def delete_all_created_cas(self): - """Delete all of the cas that we have created.""" - entities = list(self.created_entities) - for (ca_ref, admin) in entities: - self.delete_ca(ca_ref, user_name=admin) - - def add_ca_to_project(self, ca_ref, headers=None, use_auth=True, - user_name=None): - resp = self.client.post(ca_ref + '/add-to-project', - extra_headers=headers, use_auth=use_auth, - user_name=user_name) - return resp - - def remove_ca_from_project(self, ca_ref, headers=None, use_auth=True, - user_name=None): - resp = self.client.post(ca_ref + '/remove-from-project', - extra_headers=headers, use_auth=use_auth, - user_name=user_name) - return resp - - def set_preferred(self, ca_ref, headers=None, use_auth=True, - user_name=None): - resp = self.client.post(ca_ref + '/set-preferred', - extra_headers=headers, use_auth=use_auth, - user_name=user_name) - return resp - - def get_preferred(self, extra_headers=None, use_auth=True, - user_name=None): - resp = self.client.get('cas/preferred', - response_model_type=ca_models.CAModel, - extra_headers=extra_headers, use_auth=use_auth, - user_name=user_name) - return resp - - def set_global_preferred(self, ca_ref, headers=None, - use_auth=True, user_name=None): - resp = self.client.post(ca_ref + '/set-global-preferred', - extra_headers=headers, use_auth=use_auth, - user_name=user_name) - return resp - - def unset_global_preferred(self, headers=None, - use_auth=True, user_name=None): - resp = self.client.post('cas/unset-global-preferred', - extra_headers=headers, - use_auth=use_auth, user_name=user_name) - return resp - - def get_global_preferred(self, extra_headers=None, - use_auth=True, user_name=None): - resp = self.client.get('cas/global-preferred', - response_model_type=ca_models.CAModel, - extra_headers=extra_headers, - use_auth=use_auth, user_name=user_name) - return resp diff --git a/functionaltests/api/v1/functional/test_cas.py b/functionaltests/api/v1/functional/test_cas.py deleted file mode 100644 index 150dab7aa..000000000 --- a/functionaltests/api/v1/functional/test_cas.py +++ /dev/null @@ -1,717 +0,0 @@ -# Copyright (c) 2015 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import base64 -import copy -import datetime -import re -import testtools -import time - -from OpenSSL import crypto - -from barbican.common import hrefs -from barbican.plugin.interface import certificate_manager as cert_interface -from barbican.tests import certificate_utils as certutil -from functionaltests.api import base -from functionaltests.api.v1.behaviors import ca_behaviors -from functionaltests.api.v1.behaviors import container_behaviors -from functionaltests.api.v1.behaviors import order_behaviors -from functionaltests.api.v1.behaviors import secret_behaviors -from functionaltests.api.v1.models import ca_models -from functionaltests.api.v1.models import order_models -from functionaltests.common import config - -CONF = config.get_config() - -dogtag_subcas_enabled = False - -admin_a = CONF.rbac_users.admin_a -admin_b = CONF.rbac_users.admin_b -creator_a = CONF.rbac_users.creator_a -service_admin = CONF.identity.service_admin - -order_simple_cmc_request_data = { - 'type': 'certificate', - 'meta': { - 'request_type': 'simple-cmc', - 'requestor_name': 'Barbican User', - 'requestor_email': 'user@example.com', - 'requestor_phone': '555-1212' - } -} - - -BARBICAN_SRV_CONF = cert_interface.CONF - - -def is_plugin_enabled(plugin): - return plugin in BARBICAN_SRV_CONF.certificate.enabled_certificate_plugins - - -def depends_on_ca_plugins(*plugins): - def depends_on_ca_plugins_decorator(function): - def wrapper(instance, *args, **kwargs): - plugins_enabled = (is_plugin_enabled(p) for p in plugins) - if not all(plugins_enabled): - instance.skipTest("The following plugin(s) need to be " - "enabled: {}".format(plugins)) - function(instance, *args, **kwargs) - return wrapper - return depends_on_ca_plugins_decorator - - -def convert_to_X509Name(dn): - target = crypto.X509().get_subject() - fields = dn.split(',') - for field in fields: - m = re.search(r"(\w+)\s*=\s*(.+)", field.strip()) - name = m.group(1) - value = m.group(2) - if name.lower() == 'ou': - target.OU = value - elif name.lower() == 'st': - target.ST = value - elif name.lower() == 'cn': - target.CN = value - elif name.lower() == 'l': - target.L = value - elif name.lower() == 'o': - target.O = value - return target - - -class CATestCommon(base.TestCase): - - def setUp(self): - super(CATestCommon, self).setUp() - self.order_behaviors = order_behaviors.OrderBehaviors(self.client) - self.ca_behaviors = ca_behaviors.CABehaviors(self.client) - self.container_behaviors = container_behaviors.ContainerBehaviors( - self.client) - self.secret_behaviors = secret_behaviors.SecretBehaviors(self.client) - self.simple_cmc_data = copy.deepcopy(order_simple_cmc_request_data) - - def tearDown(self): - self.order_behaviors.delete_all_created_orders() - self.ca_behaviors.delete_all_created_cas() - self.container_behaviors.delete_all_created_containers() - self.secret_behaviors.delete_all_created_secrets() - super(CATestCommon, self).tearDown() - - def send_test_order(self, ca_ref=None, user_name=None, - expected_return=202): - test_model = order_models.OrderModel(**self.simple_cmc_data) - test_model.meta['request_data'] = base64.b64encode( - certutil.create_good_csr()) - if ca_ref is not None: - ca_id = hrefs.get_ca_id_from_ref(ca_ref) - test_model.meta['ca_id'] = ca_id - - create_resp, order_ref = self.order_behaviors.create_order( - test_model, user_name=user_name) - self.assertEqual(expected_return, create_resp.status_code) - if expected_return == 202: - self.assertIsNotNone(order_ref) - return order_ref - - def wait_for_order(self, order_resp, order_ref): - # Make sure we have an active order - time_count = 1 - while order_resp.model.status != "ACTIVE" and time_count <= 4: - time.sleep(1) - time_count += 1 - order_resp = self.behaviors.get_order(order_ref) - - def get_root_ca_ref(self, ca_plugin_name, ca_plugin_id): - (resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas( - limit=100) - - for item in cas: - ca = self.ca_behaviors.get_ca(item) - if ca.model.plugin_name == ca_plugin_name: - if ca.model.plugin_ca_id == ca_plugin_id: - return item - return None - - def get_snakeoil_root_ca_ref(self): - return self.get_root_ca_ref( - ca_plugin_name=('barbican.plugin.snakeoil_ca.' - 'SnakeoilCACertificatePlugin'), - ca_plugin_id="Snakeoil CA") - - def get_dogtag_root_ca_ref(self): - return self.get_root_ca_ref( - ca_plugin_name='barbican.plugin.dogtag.DogtagCAPlugin', - ca_plugin_id="Dogtag CA") - - -class CertificateAuthoritiesTestCase(CATestCommon): - - def setUp(self): - super(CertificateAuthoritiesTestCase, self).setUp() - - self.subca_name = "Subordinate CA" - self.subca_description = "Test Snake Oil Subordinate CA" - self.subca_subca_name = "Sub-Sub CA" - self.subca_subca_description = "Test Snake Oil Sub-Sub CA" - - def get_signing_cert(self, ca_ref): - resp = self.ca_behaviors.get_cacert(ca_ref) - return crypto.load_certificate(crypto.FILETYPE_PEM, resp.text) - - def verify_signing_cert(self, ca_ref, subject_dn, issuer_dn): - cacert = self.get_signing_cert(ca_ref) - return ((cacert.get_subject() == subject_dn) and - (cacert.get_issuer() == issuer_dn)) - - def get_subca_model(self, root_ref): - now = datetime.datetime.utcnow().isoformat() - subject = "CN=Subordinate CA " + now + ", O=example.com" - return ca_models.CAModel( - parent_ca_ref=root_ref, - description=self.subca_description, - name=self.subca_name, - subject_dn=subject - ) - - def get_sub_subca_model(self, parent_ca_ref): - now = datetime.datetime.utcnow().isoformat() - subject = "CN=sub sub CA " + now + ", O=example.com" - return ca_models.CAModel( - parent_ca_ref=parent_ca_ref, - description=self.subca_subca_description, - name=self.subca_subca_name, - subject_dn=subject - ) - - @depends_on_ca_plugins('snakeoil_ca') - def test_create_snakeoil_subca(self): - self._create_and_verify_subca(self.get_snakeoil_root_ca_ref()) - - @testtools.skipIf(not dogtag_subcas_enabled, - "dogtag subcas are deprecated") - @depends_on_ca_plugins('dogtag') - def test_create_dogtag_subca(self): - self._create_and_verify_subca(self.get_dogtag_root_ca_ref()) - - def _create_and_verify_subca(self, root_ca_ref): - ca_model = self.get_subca_model(root_ca_ref) - resp, ca_ref = self.ca_behaviors.create_ca(ca_model) - self.assertEqual(201, resp.status_code) - - root_subject = self.get_signing_cert(root_ca_ref).get_subject() - self.verify_signing_cert( - ca_ref=ca_ref, - subject_dn=convert_to_X509Name(ca_model.subject_dn), - issuer_dn=root_subject) - - resp = self.ca_behaviors.delete_ca(ca_ref=ca_ref) - self.assertEqual(204, resp.status_code) - - @depends_on_ca_plugins('snakeoil_ca') - def test_create_subca_of_snakeoil_subca(self): - self._create_subca_of_subca(self.get_snakeoil_root_ca_ref()) - - @testtools.skipIf(not dogtag_subcas_enabled, - "dogtag subcas are deprecated") - @depends_on_ca_plugins('dogtag') - def test_create_subca_of_dogtag_subca(self): - self._create_subca_of_subca(self.get_dogtag_root_ca_ref()) - - def _create_subca_of_subca(self, root_ca_ref): - parent_model = self.get_subca_model(root_ca_ref) - resp, parent_ref = self.ca_behaviors.create_ca(parent_model) - self.assertEqual(201, resp.status_code) - - child_model = self.get_sub_subca_model(parent_ref) - resp, child_ref = self.ca_behaviors.create_ca(child_model) - self.assertEqual(201, resp.status_code) - - parent_subject = self.get_signing_cert(parent_ref).get_subject() - self.verify_signing_cert( - ca_ref=child_ref, - subject_dn=convert_to_X509Name(child_model.subject_dn), - issuer_dn=parent_subject) - - resp = self.ca_behaviors.delete_ca(ca_ref=child_ref) - self.assertEqual(204, resp.status_code) - resp = self.ca_behaviors.delete_ca(ca_ref=parent_ref) - self.assertEqual(204, resp.status_code) - - @depends_on_ca_plugins('snakeoil_ca') - def test_fail_to_create_subca_of_snakeoil_not_owned_subca(self): - self._fail_to_create_subca_of_not_owned_subca( - self.get_snakeoil_root_ca_ref()) - - @testtools.skipIf(not dogtag_subcas_enabled, - "dogtag subcas are deprecated") - @depends_on_ca_plugins('dogtag') - def test_fail_to_create_subca_of_dogtag_not_owned_subca(self): - self._fail_to_create_subca_of_not_owned_subca( - self.get_dogtag_root_ca_ref()) - - def _fail_to_create_subca_of_not_owned_subca(self, root_ca_ref): - parent_model = self.get_subca_model(root_ca_ref) - resp, parent_ref = self.ca_behaviors.create_ca(parent_model) - self.assertEqual(201, resp.status_code) - - child_model = self.get_sub_subca_model(parent_ref) - resp, child_ref = self.ca_behaviors.create_ca(child_model, - user_name=admin_a) - self.assertEqual(403, resp.status_code) - - resp = self.ca_behaviors.delete_ca(ca_ref=parent_ref) - self.assertEqual(204, resp.status_code) - - def test_create_subca_with_invalid_parent_ca_id(self): - ca_model = self.get_subca_model( - 'http://localhost:9311/cas/invalid_ref' - ) - resp, ca_ref = self.ca_behaviors.create_ca(ca_model) - self.assertEqual(400, resp.status_code) - - def test_create_subca_with_missing_parent_ca_id(self): - ca_model = self.get_subca_model( - 'http://localhost:9311/cas/missing_ref' - ) - del ca_model.parent_ca_ref - resp, ca_ref = self.ca_behaviors.create_ca(ca_model) - self.assertEqual(400, resp.status_code) - - @depends_on_ca_plugins('snakeoil_ca') - def test_create_snakeoil_subca_with_missing_subjectdn(self): - self._create_subca_with_missing_subjectdn( - self.get_snakeoil_root_ca_ref()) - - @testtools.skipIf(not dogtag_subcas_enabled, - "dogtag subcas are deprecated") - @depends_on_ca_plugins('dogtag') - def test_create_dogtag_subca_with_missing_subjectdn(self): - self._create_subca_with_missing_subjectdn( - self.get_dogtag_root_ca_ref()) - - def _create_subca_with_missing_subjectdn(self, root_ca_ref): - ca_model = self.get_subca_model(root_ca_ref) - del ca_model.subject_dn - resp, ca_ref = self.ca_behaviors.create_ca(ca_model) - self.assertEqual(400, resp.status_code) - - @depends_on_ca_plugins('snakeoil_ca') - def test_create_snakeoil_subca_and_send_cert_order(self): - self._create_subca_and_send_cert_order( - self.get_snakeoil_root_ca_ref()) - - @testtools.skipIf(not dogtag_subcas_enabled, - "dogtag subcas are deprecated") - @depends_on_ca_plugins('dogtag') - def test_create_dogtag_subca_and_send_cert_order(self): - self._create_subca_and_send_cert_order( - self.get_dogtag_root_ca_ref()) - - def _create_subca_and_send_cert_order(self, root_ca): - ca_model = self.get_subca_model(root_ca) - resp, ca_ref = self.ca_behaviors.create_ca(ca_model) - self.assertEqual(201, resp.status_code) - self.send_test_order(ca_ref) - - resp = self.ca_behaviors.delete_ca(ca_ref=ca_ref) - self.assertEqual(204, resp.status_code) - - @depends_on_ca_plugins('snakeoil_ca') - def test_add_snakeoil_ca__to_project_and_get_preferred(self): - self._add_ca__to_project_and_get_preferred( - self.get_snakeoil_root_ca_ref() - ) - - @depends_on_ca_plugins('dogtag') - def test_add_dogtag_ca__to_project_and_get_preferred(self): - self._add_ca__to_project_and_get_preferred( - self.get_dogtag_root_ca_ref() - ) - - def _add_ca__to_project_and_get_preferred(self, ca_ref): - resp = self.ca_behaviors.add_ca_to_project(ca_ref, user_name=admin_a) - self.assertEqual(204, resp.status_code) - - resp = self.ca_behaviors.get_preferred(user_name=admin_a) - self.assertEqual(200, resp.status_code) - ca_id = hrefs.get_ca_id_from_ref(resp.model.ca_ref) - self.assertEqual(hrefs.get_ca_id_from_ref(ca_ref), ca_id) - - resp = self.ca_behaviors.remove_ca_from_project( - ca_ref, user_name=admin_a) - self.assertEqual(204, resp.status_code) - - resp = self.ca_behaviors.get_preferred(user_name=admin_a) - self.assertEqual(404, resp.status_code) - - @depends_on_ca_plugins('snakeoil_ca') - def test_try_and_fail_to_add_to_proj_snakeoil_subca_that_is_not_mine(self): - self._try_and_fail_to_add_to_proj_subca_that_is_not_mine( - self.get_snakeoil_root_ca_ref() - ) - - @testtools.skipIf(not dogtag_subcas_enabled, - "dogtag subcas are deprecated") - @depends_on_ca_plugins('dogtag') - def test_try_and_fail_to_add_to_proj_dogtag_subca_that_is_not_mine(self): - self._try_and_fail_to_add_to_proj_subca_that_is_not_mine( - self.get_dogtag_root_ca_ref() - ) - - def _try_and_fail_to_add_to_proj_subca_that_is_not_mine(self, root_ca_ref): - ca_model = self.get_subca_model(root_ca_ref) - resp, ca_ref = self.ca_behaviors.create_ca(ca_model, user_name=admin_a) - self.assertEqual(201, resp.status_code) - - resp = self.ca_behaviors.add_ca_to_project(ca_ref, user_name=admin_b) - self.assertEqual(403, resp.status_code) - - resp = self.ca_behaviors.delete_ca(ca_ref=ca_ref, user_name=admin_a) - self.assertEqual(204, resp.status_code) - - @depends_on_ca_plugins('snakeoil_ca') - def test_create_and_delete_snakeoil_subca(self): - self._create_and_delete_subca( - self.get_snakeoil_root_ca_ref() - ) - - @testtools.skipIf(not dogtag_subcas_enabled, - "dogtag subcas are deprecated") - @depends_on_ca_plugins('dogtag') - def test_create_and_delete_dogtag_subca(self): - self._create_and_delete_subca( - self.get_dogtag_root_ca_ref() - ) - - def _create_and_delete_subca(self, root_ca_ref): - ca_model = self.get_subca_model(root_ca_ref) - resp, ca_ref = self.ca_behaviors.create_ca(ca_model) - self.assertEqual(201, resp.status_code) - - self.ca_behaviors.delete_ca(ca_ref) - resp = self.ca_behaviors.get_ca(ca_ref) - self.assertEqual(404, resp.status_code) - - @depends_on_ca_plugins('snakeoil_ca') - def test_create_and_delete_snakeoil_subca_and_artifacts(self): - ca_model = self.get_subca_model(self.get_snakeoil_root_ca_ref()) - resp, ca_ref = self.ca_behaviors.create_ca(ca_model, user_name=admin_a) - self.assertEqual(201, resp.status_code) - resp = self.ca_behaviors.add_ca_to_project(ca_ref, user_name=admin_a) - self.assertEqual(204, resp.status_code) - resp = self.ca_behaviors.get_preferred(user_name=admin_a) - self.assertEqual(200, resp.status_code) - - self.ca_behaviors.delete_ca(ca_ref, user_name=admin_a) - - resp = self.ca_behaviors.get_preferred(user_name=admin_a) - self.assertEqual(404, resp.status_code) - resp = self.ca_behaviors.get_ca(ca_ref, user_name=admin_a) - self.assertEqual(404, resp.status_code) - - @depends_on_ca_plugins('snakeoil_ca') - def test_fail_to_delete_top_level_snakeoil_ca(self): - self._fail_to_delete_top_level_ca( - self.get_snakeoil_root_ca_ref() - ) - - @depends_on_ca_plugins('dogtag') - def test_fail_to_delete_top_level_dogtag_ca(self): - self._fail_to_delete_top_level_ca( - self.get_dogtag_root_ca_ref() - ) - - def _fail_to_delete_top_level_ca(self, root_ca_ref): - resp = self.ca_behaviors.delete_ca( - root_ca_ref, - expected_fail=True) - self.assertEqual(403, resp.status_code) - - @depends_on_ca_plugins('snakeoil_ca') - def test_create_snakeoil_subca_and_get_cacert(self): - self._create_subca_and_get_cacert( - self.get_snakeoil_root_ca_ref() - ) - - @testtools.skipIf(not dogtag_subcas_enabled, - "dogtag subcas are deprecated") - @depends_on_ca_plugins('dogtag') - def test_create_dogtag_subca_and_get_cacert(self): - self._create_subca_and_get_cacert( - self.get_dogtag_root_ca_ref() - ) - - def _create_subca_and_get_cacert(self, root_ca_ref): - ca_model = self.get_subca_model(root_ca_ref) - resp, ca_ref = self.ca_behaviors.create_ca(ca_model, user_name=admin_a) - self.assertEqual(201, resp.status_code) - resp = self.ca_behaviors.get_cacert(ca_ref, user_name=admin_a) - self.assertEqual(200, resp.status_code) - crypto.load_certificate(crypto.FILETYPE_PEM, resp.text) - - resp = self.ca_behaviors.delete_ca(ca_ref=ca_ref, user_name=admin_a) - self.assertEqual(204, resp.status_code) - - @depends_on_ca_plugins('snakeoil_ca') - def test_try_and_fail_to_use_snakeoil_subca_that_is_not_mine(self): - self._try_and_fail_to_use_subca_that_is_not_mine( - self.get_snakeoil_root_ca_ref() - ) - - @testtools.skipIf(not dogtag_subcas_enabled, - "dogtag subcas are deprecated") - @depends_on_ca_plugins('dogtag') - def test_try_and_fail_to_use_dogtag_subca_that_is_not_mine(self): - self._try_and_fail_to_use_subca_that_is_not_mine( - self.get_dogtag_root_ca_ref() - ) - - def _try_and_fail_to_use_subca_that_is_not_mine(self, root_ca_ref): - ca_model = self.get_subca_model(root_ca_ref) - resp, ca_ref = self.ca_behaviors.create_ca(ca_model, user_name=admin_a) - self.assertEqual(201, resp.status_code) - - self.send_test_order(ca_ref=ca_ref, user_name=admin_a) - - self.send_test_order(ca_ref=ca_ref, user_name=admin_b, - expected_return=403) - - resp = self.ca_behaviors.delete_ca(ca_ref=ca_ref, user_name=admin_a) - self.assertEqual(204, resp.status_code) - - @depends_on_ca_plugins('snakeoil_ca') - def test_create_snakeoil_subca_and_send_cert_order_and_verify_cert(self): - ca_model = self.get_subca_model(self.get_snakeoil_root_ca_ref()) - resp, ca_ref = self.ca_behaviors.create_ca(ca_model) - self.assertEqual(201, resp.status_code) - order_ref = self.send_test_order(ca_ref) - - order_resp = self.order_behaviors.get_order(order_ref=order_ref) - self.assertEqual(200, order_resp.status_code) - self.wait_for_order(order_resp=order_resp, order_ref=order_ref) - - container_resp = self.container_behaviors.get_container( - order_resp.model.container_ref) - self.assertEqual(200, container_resp.status_code) - - secret_dict = {} - for secret in container_resp.model.secret_refs: - self.assertIsNotNone(secret.secret_ref) - secret_resp = self.secret_behaviors.get_secret( - secret.secret_ref, "application/octet-stream") - self.assertIsNotNone(secret_resp) - secret_dict[secret.name] = secret_resp.content - - certificate = secret_dict['certificate'] - - new_cert = crypto.load_certificate(crypto.FILETYPE_PEM, certificate) - signing_cert = self.get_signing_cert(ca_ref) - - issuer = new_cert.get_issuer() - expected_issuer = signing_cert.get_subject() - self.assertEqual(expected_issuer, issuer) - - resp = self.ca_behaviors.delete_ca(ca_ref=ca_ref) - self.assertEqual(204, resp.status_code) - - -class ListingCAsTestCase(CATestCommon): - """Tests for listing CAs. - - Must be in a separate class so that we can deselect them - in the parallel CA tests, until we can deselect specific tests - using a decorator. - """ - - def test_list_and_get_cas(self): - (resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas() - self.assertGreater(total, 0) - for item in cas: - ca = self.ca_behaviors.get_ca(item) - self.assertIsNotNone(ca.model.plugin_name) - self.assertIsNotNone(ca.model.ca_id) - self.assertIsNotNone(ca.model.plugin_ca_id) - - @depends_on_ca_plugins('snakeoil_ca', 'simple_certificate') - def test_list_snakeoil_and_simple_cert_cas(self): - """Test if backend loads these specific CAs - - Since the standard gate works with the snakeoil CA and the - simple_certificate CA. This test is just to make sure that these two - are specifically loaded. - """ - (resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas() - self.assertEqual(2, total) - - @depends_on_ca_plugins('dogtag') - def test_list_dogtag_cas(self): - """Test if backend loads this specific CA""" - (resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas() - self.assertGreater(total, 0) - - -class ProjectCATestCase(CATestCommon): - - @depends_on_ca_plugins('snakeoil_ca', 'simple_certificate') - def test_addition_of_project_ca_affects_getting_ca_list(self): - # Getting list of CAs should get the total configured CAs - (resp, cas, initial_total, _, __) = self.ca_behaviors.get_cas() - self.assertEqual(2, initial_total) - - # Set project CA - ca_ref = self.get_snakeoil_root_ca_ref() - resp = self.ca_behaviors.add_ca_to_project(ca_ref, user_name=admin_a) - self.assertEqual(204, resp.status_code) - - # Getting list of CAs should get only the project CA for all users - (resp, cas, project_ca_total, _, __) = self.ca_behaviors.get_cas( - user_name=admin_a) - self.assertEqual(1, project_ca_total) - # Getting list of CAs should get only the project CA for all users - (resp, cas, project_ca_total, _, __) = self.ca_behaviors.get_cas( - user_name=creator_a) - self.assertEqual(1, project_ca_total) - - # Remove project CA - resp = self.ca_behaviors.remove_ca_from_project(ca_ref, - user_name=admin_a) - self.assertEqual(204, resp.status_code) - - # Getting list of CAs should get the total configured CAs (as seen - # before) - (resp, cas, final_total, _, __) = self.ca_behaviors.get_cas() - self.assertEqual(initial_total, final_total) - - -class GlobalPreferredCATestCase(CATestCommon): - - def setUp(self): - super(GlobalPreferredCATestCase, self).setUp() - (_, self.cas, self.num_cas, _, _) = self.ca_behaviors.get_cas() - self.ca_ids = [hrefs.get_ca_id_from_ref(ref) for ref in self.cas] - - def tearDown(self): - super(CATestCommon, self).tearDown() - - def test_global_preferred_no_project_admin_access(self): - resp = self.ca_behaviors.get_global_preferred() - self.assertEqual(403, resp.status_code) - resp = self.ca_behaviors.set_global_preferred(ca_ref=self.cas[0]) - self.assertEqual(403, resp.status_code) - resp = self.ca_behaviors.unset_global_preferred() - self.assertEqual(403, resp.status_code) - - def test_global_preferred_update(self): - if self.num_cas < 2: - self.skipTest("At least two CAs are required for this test") - resp = self.ca_behaviors.set_global_preferred( - ca_ref=self.cas[0], user_name=service_admin) - self.assertEqual(204, resp.status_code) - resp = self.ca_behaviors.get_global_preferred(user_name=service_admin) - self.assertEqual(200, resp.status_code) - ca_id = hrefs.get_ca_id_from_ref(resp.model.ca_ref) - self.assertEqual(self.ca_ids[0], ca_id) - - resp = self.ca_behaviors.set_global_preferred( - ca_ref=self.cas[1], user_name=service_admin) - self.assertEqual(204, resp.status_code) - resp = self.ca_behaviors.get_global_preferred(user_name=service_admin) - self.assertEqual(200, resp.status_code) - ca_id = hrefs.get_ca_id_from_ref(resp.model.ca_ref) - self.assertEqual(self.ca_ids[1], ca_id) - - resp = self.ca_behaviors.unset_global_preferred( - user_name=service_admin) - self.assertEqual(204, resp.status_code) - - def test_global_preferred_set_and_unset(self): - resp = self.ca_behaviors.set_global_preferred( - ca_ref=self.cas[0], user_name=service_admin) - self.assertEqual(204, resp.status_code) - resp = self.ca_behaviors.get_global_preferred(user_name=service_admin) - self.assertEqual(200, resp.status_code) - ca_id = hrefs.get_ca_id_from_ref(resp.model.ca_ref) - self.assertEqual(self.ca_ids[0], ca_id) - - resp = self.ca_behaviors.unset_global_preferred( - user_name=service_admin) - self.assertEqual(204, resp.status_code) - resp = self.ca_behaviors.get_global_preferred(user_name=service_admin) - self.assertEqual(404, resp.status_code) - - def test_global_preferred_affects_project_preferred(self): - if self.num_cas < 2: - self.skipTest("At least two CAs are required for this test") - - resp = self.ca_behaviors.get_preferred(user_name=admin_a) - self.assertEqual(404, resp.status_code) - - resp = self.ca_behaviors.set_global_preferred( - ca_ref=self.cas[1], user_name=service_admin) - self.assertEqual(204, resp.status_code) - - resp = self.ca_behaviors.get_preferred(user_name=admin_a) - self.assertEqual(200, resp.status_code) - ca_id = hrefs.get_ca_id_from_ref(resp.model.ca_ref) - self.assertEqual(self.ca_ids[1], ca_id) - - resp = self.ca_behaviors.unset_global_preferred( - user_name=service_admin) - self.assertEqual(204, resp.status_code) - - resp = self.ca_behaviors.get_preferred(user_name=admin_a) - self.assertEqual(404, resp.status_code) - - def test_project_preferred_overrides_global_preferred(self): - if self.num_cas < 2: - self.skipTest("At least two CAs are required for this test") - - resp = self.ca_behaviors.get_preferred(user_name=admin_a) - self.assertEqual(404, resp.status_code) - - resp = self.ca_behaviors.set_global_preferred( - ca_ref=self.cas[1], user_name=service_admin) - self.assertEqual(204, resp.status_code) - - resp = self.ca_behaviors.get_preferred(user_name=admin_a) - self.assertEqual(200, resp.status_code) - ca_id = hrefs.get_ca_id_from_ref(resp.model.ca_ref) - self.assertEqual(self.ca_ids[1], ca_id) - - resp = self.ca_behaviors.add_ca_to_project( - ca_ref=self.cas[0], user_name=admin_a) - self.assertEqual(204, resp.status_code) - - resp = self.ca_behaviors.get_preferred(user_name=admin_a) - self.assertEqual(200, resp.status_code) - ca_id = hrefs.get_ca_id_from_ref(resp.model.ca_ref) - self.assertEqual(self.ca_ids[0], ca_id) - - resp = self.ca_behaviors.remove_ca_from_project( - ca_ref=self.cas[0], user_name=admin_a) - self.assertEqual(204, resp.status_code) - - resp = self.ca_behaviors.get_preferred(user_name=admin_a) - ca_id = hrefs.get_ca_id_from_ref(resp.model.ca_ref) - self.assertEqual(self.ca_ids[1], ca_id) - - resp = self.ca_behaviors.unset_global_preferred( - user_name=service_admin) - self.assertEqual(204, resp.status_code) - - resp = self.ca_behaviors.get_preferred(user_name=admin_a) - self.assertEqual(404, resp.status_code) diff --git a/functionaltests/api/v1/functional/test_certificate_orders.py b/functionaltests/api/v1/functional/test_certificate_orders.py deleted file mode 100644 index 64fba55f0..000000000 --- a/functionaltests/api/v1/functional/test_certificate_orders.py +++ /dev/null @@ -1,767 +0,0 @@ -# Copyright (c) 2015 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -# implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import base64 -import copy -from oslo_serialization import jsonutils -import time - -from OpenSSL import crypto -import testtools - -from barbican.plugin.interface import secret_store as s -from barbican.tasks import certificate_resources as cert_res -from barbican.tests import certificate_utils as certutil -from barbican.tests import keys -from functionaltests.api import base -from functionaltests.api.v1.behaviors import ca_behaviors -from functionaltests.api.v1.behaviors import container_behaviors -from functionaltests.api.v1.behaviors import order_behaviors -from functionaltests.api.v1.behaviors import secret_behaviors -from functionaltests.api.v1.models import container_models -from functionaltests.api.v1.models import order_models -from functionaltests.api.v1.models import secret_models - -try: - import pki # flake8: noqa - dogtag_imports_ok = True -except ImportError: - # dogtag libraries not available, assume dogtag not installed - dogtag_imports_ok = False - - -NOT_FOUND_CONTAINER_REF = "http://localhost:9311/v1/containers/not_found" -INVALID_CONTAINER_REF = "invalid" - - -order_simple_cmc_request_data = { - 'type': 'certificate', - 'meta': { - 'request_type': 'simple-cmc', - 'requestor_name': 'Barbican User', - 'requestor_email': 'user@example.com', - 'requestor_phone': '555-1212' - } -} - -order_full_cmc_request_data = { - 'type': 'certificate', - 'meta': { - 'request_type': 'full-cmc', - 'requestor_name': 'Barbican User', - 'requestor_email': 'user@example.com', - 'requestor_phone': '555-1212' - } -} - -order_stored_key_request_data = { - 'type': 'certificate', - 'meta': { - 'request_type': 'stored-key', - 'subject_dn': 'cn=server.example.com,o=example.com', - 'requestor_name': 'Barbican User', - 'requestor_email': 'user@example.com', - 'requestor_phone': '555-1212' - } -} - -order_dogtag_custom_request_data = { - 'type': 'certificate', - 'meta': { - 'request_type': 'custom', - 'cert_request_type': 'pkcs10', - 'profile_id': 'caServerCert' - } -} - -create_container_rsa_data = { - "name": "rsacontainer", - "type": "rsa", - "secret_refs": [ - { - "name": "public_key", - }, - { - "name": "private_key", - }, - { - "name": "private_key_passphrase" - } - ] -} - -def get_private_key_req(): - return {'name': 'myprivatekey', - 'payload_content_type': 'application/octet-stream', - 'payload_content_encoding': 'base64', - 'algorithm': 'rsa', - 'bit_length': 2048, - 'secret_type': s.SecretType.PRIVATE, - 'payload': base64.b64encode(keys.get_private_key_pem())} - - -def get_public_key_req(): - return {'name': 'mypublickey', - 'payload_content_type': 'application/octet-stream', - 'payload_content_encoding': 'base64', - 'algorithm': 'rsa', - 'bit_length': 2048, - 'secret_type': s.SecretType.PUBLIC, - 'payload': base64.b64encode(keys.get_public_key_pem())} - - -create_generic_container_data = { - "name": "containername", - "type": "generic", - "secret_refs": [ - { - "name": "secret1", - }, - { - "name": "secret2", - }, - { - "name": "secret3" - } - ] -} - - -class CertificatesTestCase(base.TestCase): - - def setUp(self): - super(CertificatesTestCase, self).setUp() - self.behaviors = order_behaviors.OrderBehaviors(self.client) - self.ca_behaviors = ca_behaviors.CABehaviors(self.client) - self.container_behaviors = container_behaviors.ContainerBehaviors( - self.client) - self.secret_behaviors = secret_behaviors.SecretBehaviors(self.client) - self.simple_cmc_data = copy.deepcopy(order_simple_cmc_request_data) - self.full_cmc_data = copy.deepcopy(order_full_cmc_request_data) - self.stored_key_data = copy.deepcopy(order_stored_key_request_data) - self.dogtag_custom_data = copy.deepcopy( - order_dogtag_custom_request_data) - - def tearDown(self): - self.behaviors.delete_all_created_orders() - self.ca_behaviors.delete_all_created_cas() - self.container_behaviors.delete_all_created_containers() - self.secret_behaviors.delete_all_created_secrets() - super(CertificatesTestCase, self).tearDown() - - def wait_for_order( - self, order_ref, delay_before_check_seconds=1, max_wait_seconds=4): - time.sleep(delay_before_check_seconds) - - # Make sure we have an order in a terminal state - time_count = 1 - order_resp = self.behaviors.get_order(order_ref) - - while ((order_resp.model.status != "ACTIVE") and - (order_resp.model.status != "ERROR") and - time_count <= max_wait_seconds): - time.sleep(1) - time_count += 1 - order_resp = self.behaviors.get_order(order_ref) - return order_resp - - def create_asymmetric_key_container(self): - secret_model = secret_models.SecretModel(**get_private_key_req()) - secret_model.secret_type = s.SecretType.PRIVATE - resp, secret_ref_priv = self.secret_behaviors.create_secret( - secret_model) - self.assertEqual(201, resp.status_code) - - secret_model = secret_models.SecretModel(**get_public_key_req()) - secret_model.secret_type = s.SecretType.PUBLIC - resp, secret_ref_pub = self.secret_behaviors.create_secret( - secret_model) - self.assertEqual(201, resp.status_code) - - pub_key_ref = {'name': 'public_key', 'secret_ref': secret_ref_pub} - priv_key_ref = {'name': 'private_key', 'secret_ref': secret_ref_priv} - test_model = container_models.ContainerModel( - **create_container_rsa_data) - test_model.secret_refs = [pub_key_ref, priv_key_ref] - resp, container_ref = self.container_behaviors.create_container( - test_model) - self.assertEqual(201, resp.status_code) - - return container_ref - - def create_generic_container(self): - secret_model = secret_models.SecretModel(**get_private_key_req()) - secret_model.secret_type = s.SecretType.PRIVATE - resp, secret_ref = self.secret_behaviors.create_secret(secret_model) - self.assertEqual(201, resp.status_code) - - test_model = container_models.ContainerModel(**create_generic_container_data) - test_model.secret_refs = [{ - 'name': 'my_secret', - 'secret_ref': secret_ref - }] - resp, container_ref = self.container_behaviors.create_container(test_model) - self.assertEqual(201, resp.status_code) - return container_ref - - def get_dogtag_ca_id(self): - (resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas() - for item in cas: - ca = self.ca_behaviors.get_ca(item) - if ca.model.plugin_name == ( - 'barbican.plugin.dogtag.DogtagCAPlugin'): - return ca.model.ca_id - return None - - def verify_cert_returned(self, order_resp, is_stored_key_type=False): - container_ref = order_resp.model.container_ref - self.assertIsNotNone(container_ref, "no cert container returned") - - container_resp = self.container_behaviors.get_container(container_ref) - self.assertIsNotNone(container_resp, "Cert container returns None") - self.assertEqual('certificate', container_resp.model.type) - - secret_refs = container_resp.model.secret_refs - self.assertIsNotNone(secret_refs, "container has no secret refs") - - contains_cert = False - contains_private_key_ref = False - - for secret in secret_refs: - if secret.name == 'certificate': - contains_cert = True - self.assertIsNotNone(secret.secret_ref) - self.verify_valid_cert(secret.secret_ref) - if secret.name == 'intermediates': - self.assertIsNotNone(secret.secret_ref) - self.verify_valid_intermediates(secret.secret_ref) - if is_stored_key_type: - if secret.name == 'private_key': - contains_private_key_ref = True - self.assertIsNotNone(secret.secret_ref) - - self.assertTrue(contains_cert) - if is_stored_key_type: - self.assertTrue(contains_private_key_ref) - - def verify_valid_cert(self, secret_ref): - secret_resp = self.secret_behaviors.get_secret( - secret_ref, - "application/octet-stream") - self.assertIsNotNone(secret_resp) - self.assertIsNotNone(secret_resp.content) - cert = secret_resp.content - crypto.load_certificate(crypto.FILETYPE_PEM, cert) - - def verify_valid_intermediates(self, secret_ref): - secret_resp = self.secret_behaviors.get_secret( - secret_ref, - "application/octet-stream") - self.assertIsNotNone(secret_resp) - self.assertIsNotNone(secret_resp.content) - cert_chain = secret_resp.content - crypto.load_pkcs7_data(crypto.FILETYPE_PEM, cert_chain) - - def verify_pending_waiting_for_ca(self, order_resp): - self.assertEqual('PENDING', order_resp.model.status) - self.assertEqual(cert_res.ORDER_STATUS_REQUEST_PENDING.id, - order_resp.model.sub_status) - self.assertEqual(cert_res.ORDER_STATUS_REQUEST_PENDING.message, - order_resp.model.sub_status_message) - - def confirm_error_message(self, resp, message): - resp_dict = jsonutils.loads(resp.content) - self.assertEqual(message, resp_dict['description']) - - @testtools.testcase.attr('positive') - @testtools.skipIf(dogtag_imports_ok, "not applicable with dogtag plugin") - def test_create_simple_cmc_order(self): - test_model = order_models.OrderModel(**self.simple_cmc_data) - test_model.meta['request_data'] = base64.b64encode( - certutil.create_good_csr()) - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(202, create_resp.status_code) - self.assertIsNotNone(order_ref) - - order_resp = self.behaviors.get_order(order_ref) - self.verify_pending_waiting_for_ca(order_resp) - - # Wait for retry processing to handle checking for status with the - # default certificate plugin (which takes about 10 seconds +- 20%). - order_resp = self.wait_for_order( - order_ref, delay_before_check_seconds=20, max_wait_seconds=25) - - self.assertEqual('ACTIVE', order_resp.model.status) - - @testtools.testcase.attr('positive') - def test_create_simple_cmc_order_without_requestor_info(self): - self.simple_cmc_data.pop("requestor_name", None) - self.simple_cmc_data.pop("requestor_email", None) - self.simple_cmc_data.pop("requestor_phone", None) - - test_model = order_models.OrderModel(**self.simple_cmc_data) - test_model.meta['request_data'] = base64.b64encode( - certutil.create_good_csr()) - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(202, create_resp.status_code) - self.assertIsNotNone(order_ref) - - order_resp = self.behaviors.get_order(order_ref) - self.verify_pending_waiting_for_ca(order_resp) - - @testtools.testcase.attr('positive') - @testtools.skipIf(not dogtag_imports_ok, "Dogtag imports not available") - def test_create_simple_cmc_order_with_dogtag_profile(self): - test_model = order_models.OrderModel(**self.simple_cmc_data) - test_model.meta['request_data'] = base64.b64encode( - certutil.create_good_csr()) - test_model.meta['profile'] = 'caServerCert' - test_model.meta['ca_id'] = self.get_dogtag_ca_id() - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(202, create_resp.status_code) - self.assertIsNotNone(order_ref) - - order_resp = self.wait_for_order(order_ref) - - self.assertEqual('ACTIVE', order_resp.model.status) - self.verify_cert_returned(order_resp) - - @testtools.testcase.attr('negative') - def test_create_simple_cmc_with_profile_and_no_ca_id(self): - test_model = order_models.OrderModel(**self.simple_cmc_data) - test_model.meta['request_data'] = base64.b64encode( - certutil.create_good_csr()) - test_model.meta['profile'] = 'caServerCert' - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(400, create_resp.status_code) - self.assertIsNone(order_ref) - self.confirm_error_message( - create_resp, - "Missing required metadata field for ca_id" - ) - - @testtools.testcase.attr('negative') - def test_create_simple_cmc_with_profile_and_incorrect_ca_id(self): - test_model = order_models.OrderModel(**self.simple_cmc_data) - test_model.meta['request_data'] = base64.b64encode( - certutil.create_good_csr()) - test_model.meta['profile'] = 'caServerCert' - test_model.meta['ca_id'] = 'incorrect_ca_id' - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(400, create_resp.status_code) - self.assertIsNone(order_ref) - self.confirm_error_message( - create_resp, - "Order creation issue seen - The ca_id provided " - "in the request is invalid." - ) - - @testtools.testcase.attr('negative') - @testtools.skipIf(not dogtag_imports_ok, "Dogtag imports not available") - def test_create_simple_cmc_with_dogtag_and_invalid_subject_dn(self): - test_model = order_models.OrderModel(**self.simple_cmc_data) - test_model.meta['request_data'] = base64.b64encode( - certutil.create_csr_with_bad_subject_dn()) - test_model.meta['profile'] = 'caServerCert' - test_model.meta['ca_id'] = self.get_dogtag_ca_id() - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(202, create_resp.status_code) - self.assertIsNotNone(order_ref) - - order_resp = self.wait_for_order(order_ref) - self.assertEqual('ERROR', order_resp.model.status) - self.assertEqual('400', order_resp.model.error_status_code) - self.assertIn('Problem with data in certificate request', - order_resp.model.error_reason) - # TODO(alee) Dogtag does not currently return a error message - # when it does, check for that specific error message - - @testtools.testcase.attr('negative') - def test_create_simple_cmc_order_with_no_base64(self): - test_model = order_models.OrderModel(**self.simple_cmc_data) - # do not encode with base64 to force the error - test_model.meta['request_data'] = certutil.create_bad_csr() - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(400, create_resp.status_code) - self.assertIsNone(order_ref) - self.confirm_error_message(create_resp, - "Unable to decode request data.") - - @testtools.testcase.attr('negative') - def test_create_simple_cmc_order_with_invalid_pkcs10(self): - test_model = order_models.OrderModel(**self.simple_cmc_data) - test_model.meta['request_data'] = base64.b64encode( - certutil.create_bad_csr()) - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(400, create_resp.status_code) - self.assertIsNone(order_ref) - self.confirm_error_message(create_resp, - "Invalid PKCS10 Data: Bad format") - - @testtools.testcase.attr('negative') - def test_create_simple_csc_order_with_unsigned_pkcs10(self): - test_model = order_models.OrderModel(**self.simple_cmc_data) - test_model.meta['request_data'] = base64.b64encode( - certutil.create_csr_that_has_not_been_signed()) - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(400, create_resp.status_code) - self.assertIsNone(order_ref) - error_description = jsonutils.loads(create_resp.content)['description'] - self.assertIn("Invalid PKCS10 Data", error_description) - - @testtools.testcase.attr('negative') - def test_create_simple_csc_order_with_pkcs10_signed_by_wrong_key(self): - test_model = order_models.OrderModel(**self.simple_cmc_data) - test_model.meta['request_data'] = base64.b64encode( - certutil.create_csr_signed_with_wrong_key()) - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(400, create_resp.status_code) - self.confirm_error_message( - create_resp, - "Invalid PKCS10 Data: Signing key incorrect" - ) - - @testtools.testcase.attr('negative') - @testtools.skipIf(not dogtag_imports_ok, "Dogtag imports not available") - def test_create_simple_cmc_order_with_invalid_dogtag_profile(self): - test_model = order_models.OrderModel(**self.simple_cmc_data) - test_model.meta['request_data'] = base64.b64encode( - certutil.create_good_csr()) - test_model.meta['profile'] = 'invalidProfileID' - test_model.meta['ca_id'] = self.get_dogtag_ca_id() - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(202, create_resp.status_code) - self.assertIsNotNone(order_ref) - - order_resp = self.wait_for_order(order_ref) - self.assertEqual('ERROR', order_resp.model.status) - self.assertEqual('400', order_resp.model.error_status_code) - self.assertIn('Problem with data in certificate request', - order_resp.model.error_reason) - self.assertIn('Profile not found', - order_resp.model.error_reason) - - @testtools.testcase.attr('positive') - @testtools.skipIf(not dogtag_imports_ok, "Dogtag imports not available") - def test_create_simple_cmc_order_with_non_approved_dogtag_profile(self): - test_model = order_models.OrderModel(**self.simple_cmc_data) - test_model.meta['request_data'] = base64.b64encode( - certutil.create_good_csr()) - test_model.meta['profile'] = 'caTPSCert' - test_model.meta['ca_id'] = self.get_dogtag_ca_id() - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(202, create_resp.status_code) - self.assertIsNotNone(order_ref) - - order_resp = self.wait_for_order(order_ref) - self.verify_pending_waiting_for_ca(order_resp) - - @testtools.testcase.attr('negative') - def test_create_simple_cmc_order_with_missing_request(self): - test_model = order_models.OrderModel(**self.simple_cmc_data) - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(400, create_resp.status_code) - self.assertIsNone(order_ref) - self.confirm_error_message( - create_resp, - "Missing required metadata field for request_data" - ) - - @testtools.testcase.attr('negative') - def test_create_full_cmc_order(self): - test_model = order_models.OrderModel(**self.full_cmc_data) - test_model.meta['request_data'] = base64.b64encode( - certutil.create_good_csr()) - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(400, create_resp.status_code) - self.assertIsNone(order_ref) - self.confirm_error_message( - create_resp, - "Full CMC Requests are not yet supported." - ) - - @testtools.testcase.attr('negative') - def test_create_cert_order_with_invalid_type(self): - test_model = order_models.OrderModel(**self.simple_cmc_data) - test_model.meta['request_data'] = base64.b64encode( - certutil.create_good_csr()) - test_model.meta['request_type'] = "invalid_type" - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(400, create_resp.status_code) - self.confirm_error_message( - create_resp, - "Invalid Certificate Request Type" - ) - - @testtools.testcase.attr('positive') - def test_create_stored_key_order(self): - test_model = order_models.OrderModel(**self.stored_key_data) - test_model.meta['container_ref'] = ( - self.create_asymmetric_key_container()) - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(202, create_resp.status_code) - self.assertIsNotNone(order_ref) - - order_resp = self.behaviors.get_order(order_ref) - self.verify_pending_waiting_for_ca(order_resp) - - @testtools.testcase.attr('positive') - @testtools.skipIf(not dogtag_imports_ok, "Dogtag imports not available") - def test_create_stored_key_order_with_dogtag_profile(self): - test_model = order_models.OrderModel(**self.stored_key_data) - test_model.meta['container_ref'] = ( - self.create_asymmetric_key_container()) - test_model.meta['profile'] = "caServerCert" - test_model.meta['ca_id'] = self.get_dogtag_ca_id() - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(202, create_resp.status_code) - self.assertIsNotNone(order_ref) - - order_resp = self.wait_for_order(order_ref) - self.assertEqual('ACTIVE', order_resp.model.status) - self.verify_cert_returned(order_resp, is_stored_key_type=True) - - @testtools.testcase.attr('negative') - def test_create_stored_key_order_with_invalid_container_ref(self): - test_model = order_models.OrderModel(**self.stored_key_data) - test_model.meta['container_ref'] = INVALID_CONTAINER_REF - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(400, create_resp.status_code) - self.confirm_error_message( - create_resp, - "Order creation issue seen - " - "Invalid container: Bad Container Reference " - + INVALID_CONTAINER_REF + "." - ) - - @testtools.testcase.attr('negative') - def test_create_stored_key_order_with_not_found_container_ref(self): - test_model = order_models.OrderModel(**self.stored_key_data) - test_model.meta['container_ref'] = NOT_FOUND_CONTAINER_REF - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(400, create_resp.status_code) - self.confirm_error_message( - create_resp, - "Order creation issue seen - " - "Invalid container: Container Not Found." - ) - - @testtools.testcase.attr('negative') - def test_create_stored_key_order_with_missing_container_ref(self): - test_model = order_models.OrderModel(**self.stored_key_data) - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(400, create_resp.status_code) - self.confirm_error_message( - create_resp, - "Missing required metadata field for container_ref" - ) - - @testtools.testcase.attr('negative') - def test_create_stored_key_order_with_unauthorized_container_ref(self): - # TODO(alee) - Not sure how to do this - pass - - @testtools.testcase.attr('negative') - def test_create_stored_key_order_with_invalid_container_type(self): - test_model = order_models.OrderModel(**self.stored_key_data) - test_model.meta['container_ref'] = (self.create_generic_container()) - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(400, create_resp.status_code) - self.confirm_error_message( - create_resp, - "Order creation issue seen - " - "Invalid container: Container Wrong Type." - ) - - @testtools.testcase.attr('negative') - def test_create_stored_key_order_with_container_secrets_inaccessible(self): - # TODO(alee) Not sure how to do this - pass - - @testtools.testcase.attr('negative') - def test_create_stored_key_order_with_subject_dn_missing(self): - test_model = order_models.OrderModel(**self.stored_key_data) - test_model.meta['container_ref'] = ( - self.create_asymmetric_key_container()) - del test_model.meta['subject_dn'] - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(400, create_resp.status_code) - self.confirm_error_message( - create_resp, - "Missing required metadata field for subject_dn" - ) - - @testtools.testcase.attr('negative') - def test_create_stored_key_order_with_subject_dn_invalid(self): - test_model = order_models.OrderModel(**self.stored_key_data) - test_model.meta['container_ref'] = ( - self.create_asymmetric_key_container()) - test_model.meta['subject_dn'] = "invalid_subject_dn" - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(400, create_resp.status_code) - self.confirm_error_message( - create_resp, - "Invalid subject DN: invalid_subject_dn" - ) - - @testtools.testcase.attr('negative') - def test_create_stored_key_order_with_extensions(self): - test_model = order_models.OrderModel(**self.stored_key_data) - test_model.meta['container_ref'] = ( - self.create_asymmetric_key_container()) - test_model.meta['extensions'] = "any-extensions" - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(400, create_resp.status_code) - self.confirm_error_message( - create_resp, - "Extensions are not yet supported. " - "Specify a valid profile instead." - ) - - @testtools.testcase.attr('positive') - @testtools.skipIf(not dogtag_imports_ok, "Dogtag imports not available") - def test_create_stored_key_order_with_non_approved_dogtag_profile(self): - test_model = order_models.OrderModel(**self.stored_key_data) - test_model.meta['container_ref'] = ( - self.create_asymmetric_key_container()) - test_model.meta['profile'] = "caTPSCert" - test_model.meta['ca_id'] = self.get_dogtag_ca_id() - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(202, create_resp.status_code) - self.assertIsNotNone(order_ref) - - order_resp = self.wait_for_order(order_ref) - self.verify_pending_waiting_for_ca(order_resp) - - @testtools.testcase.attr('negative') - @testtools.skipIf(not dogtag_imports_ok, "Dogtag imports not available") - def test_create_stored_key_order_with_invalid_dogtag_profile(self): - test_model = order_models.OrderModel(**self.stored_key_data) - test_model.meta['container_ref'] = ( - self.create_asymmetric_key_container()) - test_model.meta['profile'] = "invalidProfileID" - test_model.meta['ca_id'] = self.get_dogtag_ca_id() - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(202, create_resp.status_code) - self.assertIsNotNone(order_ref) - - order_resp = self.wait_for_order(order_ref) - self.assertEqual('ERROR', order_resp.model.status) - self.assertIn('Problem with data in certificate request', - order_resp.model.error_reason) - self.assertIn('Profile not found', - order_resp.model.error_reason) - - @testtools.testcase.attr('positive') - def test_create_cert_order_with_missing_request_type(self): - # defaults to 'custom' type - test_model = order_models.OrderModel(**self.dogtag_custom_data) - test_model.meta['cert_request'] = base64.b64encode( - certutil.create_good_csr()) - test_model.meta['profile_id'] = 'caTPSCert' - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(202, create_resp.status_code) - self.assertIsNotNone(order_ref) - - order_resp = self.behaviors.get_order(order_ref) - self.verify_pending_waiting_for_ca(order_resp) - - @testtools.testcase.attr('positive') - @testtools.skipIf(not dogtag_imports_ok, "Dogtag imports not available") - def test_create_cert_order_with_missing_request_type_auto_enroll(self): - # defaults to 'custom' type - test_model = order_models.OrderModel(**self.dogtag_custom_data) - test_model.meta['cert_request'] = base64.b64encode( - certutil.create_good_csr()) - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(202, create_resp.status_code) - self.assertIsNotNone(order_ref) - - order_resp = self.wait_for_order(order_ref) - self.assertEqual('ACTIVE', order_resp.model.status) - self.verify_cert_returned(order_resp) - - @testtools.testcase.attr('positive') - @testtools.skipIf(not dogtag_imports_ok, "Dogtag imports not available") - def test_create_custom_order_with_valid_dogtag_data(self): - # defaults to 'custom' type - test_model = order_models.OrderModel(**self.dogtag_custom_data) - test_model.meta['cert_request'] = base64.b64encode( - certutil.create_good_csr()) - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(202, create_resp.status_code) - self.assertIsNotNone(order_ref) - - order_resp = self.wait_for_order(order_ref) - self.assertEqual('ACTIVE', order_resp.model.status) - self.verify_cert_returned(order_resp) - - @testtools.testcase.attr('negative') - @testtools.skipIf(not dogtag_imports_ok, "Dogtag imports not available") - def test_create_custom_order_with_invalid_dogtag_data(self): - # TODO(alee) this test is broken because Dogtag does not return the - # correct type of exception, Fix this when Dogtag is fixed. - test_model = order_models.OrderModel(**self.dogtag_custom_data) - test_model.meta['cert_request'] = "invalid_data" - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(202, create_resp.status_code) - self.assertIsNotNone(order_ref) - - order_resp = self.wait_for_order(order_ref) - self.assertEqual('ERROR', order_resp.model.status) - # TODO(alee) confirm substatus - data error seen - - @testtools.testcase.attr('positive') - @testtools.skipIf(dogtag_imports_ok, "Non-Dogtag test only") - def test_create_custom_order_for_generic_plugin(self): - test_model = order_models.OrderModel(**self.dogtag_custom_data) - test_model.meta['container_ref'] = ( - self.create_asymmetric_key_container()) - - create_resp, order_ref = self.behaviors.create_order(test_model) - self.assertEqual(202, create_resp.status_code) - self.assertIsNotNone(order_ref) - - order_resp = self.behaviors.get_order(order_ref) - self.assertEqual('PENDING', order_resp.model.status) diff --git a/functionaltests/api/v1/functional/test_quotas_enforce.py b/functionaltests/api/v1/functional/test_quotas_enforce.py index eb6b30248..e2b06f81c 100644 --- a/functionaltests/api/v1/functional/test_quotas_enforce.py +++ b/functionaltests/api/v1/functional/test_quotas_enforce.py @@ -18,13 +18,11 @@ import testtools from barbican.plugin.interface import certificate_manager as cert_interface from functionaltests.api import base -from functionaltests.api.v1.behaviors import ca_behaviors from functionaltests.api.v1.behaviors import consumer_behaviors from functionaltests.api.v1.behaviors import container_behaviors from functionaltests.api.v1.behaviors import order_behaviors from functionaltests.api.v1.behaviors import quota_behaviors from functionaltests.api.v1.behaviors import secret_behaviors -from functionaltests.api.v1.models import ca_models from functionaltests.api.v1.models import consumer_model from functionaltests.api.v1.models import container_models from functionaltests.api.v1.models import order_models @@ -55,7 +53,6 @@ class QuotaEnforcementTestCase(base.TestCase): self.order_behaviors = order_behaviors.OrderBehaviors(self.client) self.consumer_behaviors = consumer_behaviors.ConsumerBehaviors( self.client) - self.ca_behaviors = ca_behaviors.CABehaviors(self.client) self.secret_data = self.get_default_secret_data() self.quota_data = self.get_default_quota_data() @@ -70,7 +67,6 @@ class QuotaEnforcementTestCase(base.TestCase): self.consumer_behaviors.delete_all_created_consumers() self.container_behaviors.delete_all_created_containers() self.secret_behaviors.delete_all_created_secrets() - self.ca_behaviors.delete_all_created_cas() for secret_ref in self.order_secrets: resp = self.secret_behaviors.delete_secret( secret_ref, user_name=admin_b) @@ -150,32 +146,6 @@ class QuotaEnforcementTestCase(base.TestCase): self.create_consumers(count=5) self.create_consumers(expected_return=403) - @testtools.skipIf(not is_ca_backend_snakeoil(), - "This test is only usable with snakeoil") - def test_snakeoil_cas_unlimited(self): - self.set_quotas('cas', -1) - self.create_snakeoil_cas(count=5) - - @testtools.skipIf(not is_ca_backend_snakeoil(), - "This test is only usable with snakeoil") - def test_snakeoil_cas_disabled(self): - self.set_quotas('cas', 0) - self.create_snakeoil_cas(expected_return=403) - - @testtools.skipIf(not is_ca_backend_snakeoil(), - "This test is only usable with snakeoil") - def test_snakeoil_cas_limited_one(self): - self.set_quotas('cas', 1) - self.create_snakeoil_cas(count=1) - self.create_snakeoil_cas(expected_return=403) - - @testtools.skipIf(not is_ca_backend_snakeoil(), - "This test is only usable with snakeoil") - def test_snakeoil_cas_limited_five(self): - self.set_quotas('cas', 5) - self.create_snakeoil_cas(count=5) - self.create_snakeoil_cas(expected_return=403) - # ----------------------- Helper Functions --------------------------- def get_default_quota_data(self): @@ -269,46 +239,3 @@ class QuotaEnforcementTestCase(base.TestCase): resp, consumer_dat = self.consumer_behaviors.create_consumer( model, container_ref, user_name=admin_b) self.assertEqual(expected_return, resp.status_code) - - def get_order_simple_cmc_request_data(self): - return { - 'type': 'certificate', - 'meta': { - 'request_type': 'simple-cmc', - 'requestor_name': 'Barbican User', - 'requestor_email': 'user@example.com', - 'requestor_phone': '555-1212' - } - } - - def get_root_ca_ref(self): - if self.root_ca_ref is not None: - return self.root_ca_ref - - (resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas() - snake_name = 'barbican.plugin.snakeoil_ca.SnakeoilCACertificatePlugin' - snake_plugin_ca_id = "Snakeoil CA" - - for item in cas: - ca = self.ca_behaviors.get_ca(item) - if ca.model.plugin_name == snake_name: - if ca.model.plugin_ca_id == snake_plugin_ca_id: - return item - return None - - def get_snakeoil_subca_model(self): - parent_ca_ref = self.get_root_ca_ref() - return ca_models.CAModel( - parent_ca_ref=parent_ca_ref, - description="Test Snake Oil Subordinate CA", - name="Subordinate CA", - subject_dn="CN=Subordinate CA, O=example.com" - ) - - def create_snakeoil_cas(self, count=1, expected_return=201): - """Utility function to create snakeoil cas""" - for _ in range(count): - ca_model = self.get_snakeoil_subca_model() - resp, ca_ref = self.ca_behaviors.create_ca(ca_model, - user_name=admin_b) - self.assertEqual(expected_return, resp.status_code) diff --git a/releasenotes/notes/removing-cas-certificate-orders-96fc47a7acaea273.yaml b/releasenotes/notes/removing-cas-certificate-orders-96fc47a7acaea273.yaml new file mode 100644 index 000000000..fc774d5f6 --- /dev/null +++ b/releasenotes/notes/removing-cas-certificate-orders-96fc47a7acaea273.yaml @@ -0,0 +1,38 @@ +--- +prelude: > + This release notify that we will remove Certificate Orders and CAs from API. +deprecations: + - Certificate Orders + - CAs +other: + - Why are we deprecating Certificate Issuance? + There are a few reasons that were considered for this decision. First, + there does not seem to be a lot of interest in the community to fully + develop the Certificate Authority integration with Barbican. We have a + few outstanding blueprints that are needed to make Certificate Issuance + fully functional, but so far no one has committed to getting the work + done. Additionally, we've had very little buy-in from public + Certificate Authorities. Both Symantec and Digicert were interested in + integration in the past, but that interest didn't materialize into + robust CA plugins like we hoped it would. + + Secondly, there have been new developments in the space of Certificate + Authorities since we started Barbican. The most significant of these + was the launch of the Let's Encrypt public CA along with the definition + of the ACME protocol for certificate issuance. We believe that future + certificate authority services would do good to implement the ACME + standard, which is quite different than the API the Barbican team had + developed. + + Lastly, deprecating Certificate Issuance within Barbican will simplify + both the architecture and deployment of Barbican. This will allow us to + focus on the features that Barbican does well -- the secure storage of + secret material. + + - Will Barbican still be able to store Certificates? + + Yes, absolutely! The only thing we're deprecating is the plugin + interface that talks to Certificate Authorites and associated APIs. + While you will not be able to use Barbican to issue a new certificate, + you will always be able to securely store any certificates in Barbican, + including those issued by public CAs or internal CAs.