Check a CA's status as project and preferred CA before deleting

If a CA is preferred and not the last CA of a project, it should
not be deleted.  A user is informed of this with a 409 status code.

Otherwise, the CA can be deleted as well as any record of it in the
CA project list and the CA preferred list.

Change-Id: I9a1ee91252ee17746cfcffd11cba520270d09f21
Closes-bug: #1499876
This commit is contained in:
Dave McCowan 2015-09-25 17:30:46 -04:00
parent be40fa7d03
commit 4afaee095a
7 changed files with 140 additions and 24 deletions

View File

@ -189,20 +189,13 @@ class CertificateAuthorityController(controllers.ACLMixin):
ca_id = project_ca.ca_id
preferred_ca = self.preferred_ca_repo.get_project_entities(
project_id)[0]
if self._is_last_project_ca(project_id):
if cert_resources.is_last_project_ca(project_id):
self.preferred_ca_repo.delete_entity_by_id(preferred_ca.id, None)
else:
self._assert_is_not_preferred_ca(preferred_ca.ca_id, ca_id)
self.project_ca_repo.delete_entity_by_id(project_ca.id, None)
def _is_last_project_ca(self, project_id):
_cas, _offset, _limit, total = self.project_ca_repo.get_by_create_date(
project_id=project_id,
suppress_exception=True
)
return total == 1
def _assert_is_not_preferred_ca(self, preferred_ca_id, ca_id):
if preferred_ca_id == ca_id:
_cant_remove_preferred_ca_from_project()
@ -248,9 +241,6 @@ class CertificateAuthorityController(controllers.ACLMixin):
@controllers.handle_exceptions(u._('CA deletion'))
@controllers.enforce_rbac('certificate_authority:delete')
def on_delete(self, external_project_id, **kwargs):
# ensure user's project exists in DB before calling DB operation
res.get_or_create_project(external_project_id)
cert_resources.delete_subordinate_ca(external_project_id, self.ca)
LOG.info(u._LI('Deleted CA for project: %s'), external_project_id)

View File

@ -475,6 +475,12 @@ class UnauthorizedSubCA(BarbicanHTTPException):
status_code = 403
class CannotDeletePreferredCA(BarbicanHTTPException):
message = u._("A new project preferred CA must be set "
"before this one can be deleted.")
status_code = 409
class BadSubCACreationRequest(BarbicanHTTPException):
message = u._("Errors returned by CA when attempting to "
"create subordinate CA: %(reason)")

View File

@ -232,33 +232,72 @@ def create_subordinate_ca(project_model, name, description, subject_dn,
def delete_subordinate_ca(external_project_id, ca):
"""Deletes a subordinate CA
"""Deletes a subordinate CA and any related artifacts
:param external_project_id: external project ID
:param ca: class:`models.CertificateAuthority` to be deleted
:return: None
"""
# TODO(alee) See if the checks below can be moved to the RBAC code
# Check that this CA is a subCA
if ca.project_id is None:
raise excep.CannotDeleteBaseCA()
project = repos.get_project_repository().find_by_external_project_id(
external_project_id)
# Check that the user's project owns this subCA
project = res.get_or_create_project(external_project_id)
if ca.project_id != project.id:
raise excep.UnauthorizedSubCA()
project_ca_repo = repos.get_project_ca_repository()
(project_cas, _, _, _) = project_ca_repo.get_by_create_date(
project_id=project.id, ca_id=ca.id,
suppress_exception=True)
preferred_ca_repo = repos.get_preferred_ca_repository()
(preferred_cas, _, _, _) = preferred_ca_repo.get_by_create_date(
project_id=project.id, ca_id=ca.id, suppress_exception=True)
# Can not delete a project preferred CA, if other project CAs exist. One
# of those needs to be designated as the preferred CA first.
if project_cas and preferred_cas and not is_last_project_ca(project.id):
raise excep.CannotDeletePreferredCA()
# Remove the CA as preferred
if preferred_cas:
preferred_ca_repo.delete_entity_by_id(preferred_cas[0].id,
external_project_id)
# Remove the CA from project list
if project_cas:
project_ca_repo.delete_entity_by_id(project_cas[0].id,
external_project_id)
# Delete the CA entry from plugin
cert_plugin = cert.CertificatePluginManager().get_plugin_by_name(
ca.plugin_name)
cert_plugin.delete_ca(ca.plugin_ca_id)
# Delete the CA from the data model.
# Finally, delete the CA entity from the CA repository
ca_repo = repos.get_ca_repository()
ca_repo.delete_entity_by_id(
entity_id=ca.id,
external_project_id=external_project_id)
def is_last_project_ca(project_id):
"""Returns True iff project has exactly one project CA
:param project_id: internal project ID
:return: Boolean
"""
project_ca_repo = repos.get_project_ca_repository()
_, _, _, total = project_ca_repo.get_by_create_date(
project_id=project_id,
suppress_exception=True
)
return total == 1
def _handle_task_result(result, result_follow_on, order_model,
project_model, request_type, unavailable_status):
if cert.CertificateStatus.WAITING_FOR_CA == result.status:

View File

@ -35,6 +35,7 @@ from barbican.tests import utils
container_repo = repositories.get_container_repository()
secret_repo = repositories.get_secret_repository()
ca_repo = repositories.get_ca_repository()
project_ca_repo = repositories.get_project_ca_repository()
preferred_ca_repo = repositories.get_preferred_ca_repository()
project_repo = repositories.get_project_repository()
order_repo = repositories.get_order_repository()
@ -946,6 +947,65 @@ class WhenCreatingSubordinateCAs(utils.BaseTestCase):
cert_res.delete_subordinate_ca(self.project.external_id, subca)
self.cert_plugin.delete_ca.assert_called_once_with(subca.plugin_ca_id)
def test_should_delete_subca_and_all_related_db_entities(self):
subca = cert_res.create_subordinate_ca(
project_model=self.project,
name=self.name,
description=self.description,
subject_dn=self.subject_name,
parent_ca_ref=self.parent_ca_ref,
creator_id=self.creator_id
)
project_ca = models.ProjectCertificateAuthority(
self.project.id,
subca.id
)
project_ca_repo.create_from(project_ca)
preferred_ca = models.PreferredCertificateAuthority(
self.project.id,
subca.id)
preferred_ca_repo.create_from(preferred_ca)
cert_res.delete_subordinate_ca(self.project.external_id, subca)
self.cert_plugin.delete_ca.assert_called_once_with(subca.plugin_ca_id)
def test_should_raise_when_delete_pref_subca_with_other_project_ca(self):
subca = cert_res.create_subordinate_ca(
project_model=self.project,
name=self.name,
description=self.description,
subject_dn=self.subject_name,
parent_ca_ref=self.parent_ca_ref,
creator_id=self.creator_id
)
project_ca = models.ProjectCertificateAuthority(
self.project.id,
subca.id
)
project_ca_repo.create_from(project_ca)
preferred_ca = models.PreferredCertificateAuthority(
self.project.id,
subca.id)
preferred_ca_repo.create_from(preferred_ca)
subca2 = cert_res.create_subordinate_ca(
project_model=self.project,
name=self.name,
description=self.description,
subject_dn=self.subject_name,
parent_ca_ref=self.parent_ca_ref,
creator_id=self.creator_id
)
project_ca2 = models.ProjectCertificateAuthority(
self.project.id,
subca2.id
)
project_ca_repo.create_from(project_ca2)
self.assertRaises(
excep.CannotDeletePreferredCA,
cert_res.delete_subordinate_ca,
self.project.external_id,
subca
)
def test_should_raise_cannot_delete_base_ca(self):
self.assertRaises(
excep.CannotDeleteBaseCA,

View File

@ -275,7 +275,7 @@ nss_db_path_ca = '/etc/barbican/alias-ca'
nss_password = 'password123'
simple_cmc_profile = 'caOtherCert'
ca_expiration_time = 1
plugin_working_dir = '/etc/barbican/workdir'
plugin_working_dir = '/etc/barbican/dogtag'
[p11_crypto_plugin]

View File

@ -19,16 +19,20 @@ from functionaltests.api.v1.models import ca_models
class CABehaviors(base_behaviors.BaseBehaviors):
def get_ca(self, ca_ref, extra_headers=None):
def get_ca(self, ca_ref, extra_headers=None,
use_auth=True, user_name=None):
"""Handles getting a CA
:param ca_ref: href for a CA
:param extra_headers: extra HTTP headers for the GET request
:param use_auth: Boolean for whether to send authentication headers
:param user_name: The user name used for request
:return: a request Response object
"""
return self.client.get(ca_ref,
response_model_type=ca_models.CAModel,
extra_headers=extra_headers)
extra_headers=extra_headers,
use_auth=use_auth, user_name=user_name)
def get_cacert(self, ca_ref, payload_content_encoding=None,
extra_headers=None,

View File

@ -354,6 +354,23 @@ class CertificateAuthoritiesTestCase(CATestCommon):
resp = self.ca_behaviors.get_ca(ca_ref)
self.assertEqual(404, resp.status_code)
@depends_on_ca_plugins('snakeoil_ca')
def test_create_and_delete_snakeoil_subca_and_artifacts(self):
ca_model = self.get_subca_model(self.get_snakeoil_root_ca_ref())
resp, ca_ref = self.ca_behaviors.create_ca(ca_model, user_name=admin_a)
self.assertEqual(201, resp.status_code)
resp = self.ca_behaviors.add_ca_to_project(ca_ref, user_name=admin_a)
self.assertEqual(204, resp.status_code)
resp = self.ca_behaviors.get_preferred(user_name=admin_a)
self.assertEqual(200, resp.status_code)
self.ca_behaviors.delete_ca(ca_ref, user_name=admin_a)
resp = self.ca_behaviors.get_preferred(user_name=admin_a)
self.assertEqual(404, resp.status_code)
resp = self.ca_behaviors.get_ca(ca_ref, user_name=admin_a)
self.assertEqual(404, resp.status_code)
@depends_on_ca_plugins('snakeoil_ca')
def test_fail_to_delete_top_level_snakeoil_ca(self):
self._fail_to_delete_top_level_ca(
@ -387,13 +404,13 @@ class CertificateAuthoritiesTestCase(CATestCommon):
def _create_subca_and_get_cacert(self, root_ca_ref):
ca_model = self.get_subca_model(root_ca_ref)
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
resp, ca_ref = self.ca_behaviors.create_ca(ca_model, user_name=admin_a)
self.assertEqual(201, resp.status_code)
resp = self.ca_behaviors.get_cacert(ca_ref)
resp = self.ca_behaviors.get_cacert(ca_ref, user_name=admin_a)
self.assertEqual(200, resp.status_code)
crypto.load_certificate(crypto.FILETYPE_PEM, resp.text)
resp = self.ca_behaviors.delete_ca(ca_ref=ca_ref)
resp = self.ca_behaviors.delete_ca(ca_ref=ca_ref, user_name=admin_a)
self.assertEqual(204, resp.status_code)
@depends_on_ca_plugins('snakeoil_ca')
@ -509,11 +526,11 @@ class ProjectCATestCase(CATestCommon):
resp = self.ca_behaviors.add_ca_to_project(ca_ref, user_name=admin_a)
self.assertEqual(204, resp.status_code)
# Getting list of CAs should get only the project CA for admin
# Getting list of CAs should get only the project CA for all users
(resp, cas, project_ca_total, _, __) = self.ca_behaviors.get_cas(
user_name=admin_a)
self.assertEqual(1, project_ca_total)
# Getting list of CAs should get only the project CA for non-admin
# Getting list of CAs should get only the project CA for all users
(resp, cas, project_ca_total, _, __) = self.ca_behaviors.get_cas(
user_name=creator_a)
self.assertEqual(1, project_ca_total)