diff --git a/barbican/api/controllers/cas.py b/barbican/api/controllers/cas.py index 6cb10f91e..0160c2350 100644 --- a/barbican/api/controllers/cas.py +++ b/barbican/api/controllers/cas.py @@ -78,6 +78,7 @@ class CertificateAuthorityController(controllers.ACLMixin): return self.ca.to_dict_fields() @pecan.expose() + @utils.allow_all_content_types @controllers.handle_exceptions(u._('CA Signing Cert retrieval')) @controllers.enforce_rbac('certificate_authority:get_cacert') def cacert(self, external_project_id): @@ -224,6 +225,14 @@ class CertificateAuthorityController(controllers.ACLMixin): global_preferred_ca[0].id, external_project_id) + @index.when(method='DELETE') + @utils.allow_all_content_types + @controllers.handle_exceptions(u._('CA deletion')) + @controllers.enforce_rbac('certificate_authority:delete') + def on_delete(self, external_project_id, **kwargs): + cert_resources.delete_subordinate_ca(external_project_id, self.ca) + LOG.info(u._LI('Deleted CA for project: %s'), external_project_id) + class CertificateAuthoritiesController(controllers.ACLMixin): """Handles certificate authority list requests.""" diff --git a/barbican/common/exception.py b/barbican/common/exception.py index d95629195..89d56db60 100644 --- a/barbican/common/exception.py +++ b/barbican/common/exception.py @@ -462,3 +462,13 @@ class SubCAsNotSupported(BarbicanHTTPException): class SubCANotCreated(BarbicanHTTPException): message = u._("Errors in creating subordinate CA: %(name)") client_message = message + + +class CannotDeleteBaseCA(BarbicanHTTPException): + message = u._("Only subordinate CAs can be deleted.") + status_code = 403 + + +class UnauthorizedSubCADelete(BarbicanHTTPException): + message = u._("Subordinate CA is not owned by this project") + status_code = 403 diff --git a/barbican/plugin/interface/certificate_manager.py b/barbican/plugin/interface/certificate_manager.py index 29adae777..0d4b729bc 100644 --- a/barbican/plugin/interface/certificate_manager.py +++ b/barbican/plugin/interface/certificate_manager.py @@ -100,7 +100,7 @@ PLUGIN_CA_ID = "plugin_ca_id" # fields for ca_info dict keys INFO_NAME = "name" INFO_DESCRIPTION = "description" -INFO_CA_SIGNING_CERT = "ca_signing_cert" +INFO_CA_SIGNING_CERT = "ca_signing_certificate" INFO_INTERMEDIATES = "intermediates" INFO_EXPIRATION = "expiration" @@ -455,6 +455,17 @@ class CertificatePluginBase(object): """ raise NotImplementedError # pragma: no cover + def delete_ca(self, ca_id): + """Deletes a subordinate CA + + Like the create_ca call, this should only be made if the plugin + returns Ture for supports_create_ca() + + :param ca_id: id for the CA as specified by the plugin + :return: None + """ + raise NotImplementedError # pragma: no cover + class CACreateDTO(object): """Class that includes data needed to create a subordinate CA """ diff --git a/barbican/plugin/snakeoil_ca.py b/barbican/plugin/snakeoil_ca.py index 170bc27cb..22d5b8f3e 100644 --- a/barbican/plugin/snakeoil_ca.py +++ b/barbican/plugin/snakeoil_ca.py @@ -364,3 +364,15 @@ class SnakeoilCACertificatePlugin(cert_manager.CertificatePluginBase): ret[ca_id] = ca_info return ret + + def delete_ca(self, ca_id): + self.cas.pop(ca_id) + + cert_path = os.path.join(self.subca_directory, ca_id + ".cert") + key_path = os.path.join(self.subca_directory, ca_id + ".key") + + if os.path.exists(key_path): + os.remove(key_path) + + if os.path.exists(cert_path): + os.remove(cert_path) diff --git a/barbican/tasks/certificate_resources.py b/barbican/tasks/certificate_resources.py index cc0e708eb..ebbbbb674 100644 --- a/barbican/tasks/certificate_resources.py +++ b/barbican/tasks/certificate_resources.py @@ -225,6 +225,34 @@ def create_subordinate_ca(project_model, name, description, subject_dn, return new_ca +def delete_subordinate_ca(external_project_id, ca): + """Deletes a subordinate CA + + :param external_project_id: external project ID + :param ca: class:`models.CertificateAuthority` to be deleted + :return: None + """ + # TODO(alee) See if the checks below can be moved to the RBAC code + if ca.project_id is None: + raise excep.CannotDeleteBaseCA() + + project = repos.get_project_repository().find_by_external_project_id( + external_project_id) + if ca.project_id != project.id: + raise excep.UnauthorizedSubCADelete() + + cert_plugin = cert.CertificatePluginManager().get_plugin_by_name( + ca.plugin_name) + + cert_plugin.delete_ca(ca.plugin_ca_id) + + # Delete the CA from the data model. + ca_repo = repos.get_ca_repository() + ca_repo.delete_entity_by_id( + entity_id=ca.id, + external_project_id=external_project_id) + + def _handle_task_result(result, result_follow_on, order_model, project_model, request_type, unavailable_status): if cert.CertificateStatus.WAITING_FOR_CA == result.status: diff --git a/barbican/tests/api/controllers/test_cas.py b/barbican/tests/api/controllers/test_cas.py index dcc767beb..2bc75f802 100644 --- a/barbican/tests/api/controllers/test_cas.py +++ b/barbican/tests/api/controllers/test_cas.py @@ -15,6 +15,7 @@ import mock from six import moves +from barbican.common import exception from barbican.common import hrefs from barbican.common import resources as res from barbican.model import models @@ -288,8 +289,42 @@ class WhenTestingCAsResource(utils.BarbicanAPIBaseTestCase): '/cas', self.subca_request, expect_errors=False) + self.assertEqual(201, resp.status_int) + def test_should_raise_delete_subca_not_found(self): + self.create_cas() + resp = self.app.delete('/cas/foobar', expect_errors=True) + self.assertEqual(404, resp.status_int) + + @mock.patch('barbican.tasks.certificate_resources.delete_subordinate_ca') + def test_should_delete_subca(self, mocked_task): + self.create_cas() + resp = self.app.delete('/cas/' + self.subca.id) + mocked_task.assert_called_once_with(self.project_id, + self.subca) + self.assertEqual(204, resp.status_int) + + @mock.patch('barbican.tasks.certificate_resources.delete_subordinate_ca') + def test_should_raise_delete_not_a_subca(self, mocked_task): + self.create_cas() + mocked_task.side_effect = exception.CannotDeleteBaseCA() + resp = self.app.delete('/cas/' + self.subca.id, + expect_errors=True) + mocked_task.assert_called_once_with(self.project_id, + self.subca) + self.assertEqual(403, resp.status_int) + + @mock.patch('barbican.tasks.certificate_resources.delete_subordinate_ca') + def test_should_raise_delete_not_authorized(self, mocked_task): + self.create_cas() + mocked_task.side_effect = exception.UnauthorizedSubCADelete() + resp = self.app.delete('/cas/' + self.subca.id, + expect_errors=True) + mocked_task.assert_called_once_with(self.project_id, + self.subca) + self.assertEqual(403, resp.status_int) + def create_subca_request(self, parent_ca_id): self.subca_request = { 'name': "Subordinate CA", @@ -372,6 +407,24 @@ class WhenTestingCAsResource(utils.BarbicanAPIBaseTestCase): self.selected_signing_cert = 'ZZZZZ' + str(ca_id) self.selected_intermediates = 'YYYYY' + str(ca_id) + # create subca for DELETE testing + parsed_ca = { + 'plugin_name': self.plugin_name, + 'plugin_ca_id': self.plugin_ca_id + "subca 1", + 'name': self.plugin_name, + 'description': 'Sub CA for default plugin', + 'ca_signing_certificate': 'ZZZZZ' + "sub ca1", + 'intermediates': 'YYYYY' + "sub ca1", + 'project_id': self.project_id, + 'creator_id': 'user12345' + } + ca = models.CertificateAuthority(parsed_ca) + ca_repo.create_from(ca) + ca_repo.save(ca) + self.subca = ca + + self.num_cas += 1 + def _create_url(self, external_project_id, offset_arg=None, limit_arg=None): if limit_arg: diff --git a/barbican/tests/plugin/test_snakeoil_ca.py b/barbican/tests/plugin/test_snakeoil_ca.py index 87f2f72bf..365fd5ade 100644 --- a/barbican/tests/plugin/test_snakeoil_ca.py +++ b/barbican/tests/plugin/test_snakeoil_ca.py @@ -292,6 +292,25 @@ class SnakeoilCAPluginTestCase(BaseTestCase): # TODO(alee) Verify that the ca cert has correct subject name # TODO(alee) Verify that ca cert is signed by parent CA + def test_delete_ca(self): + subca_dict = self._create_subca() + ca_id = subca_dict.get(cm.PLUGIN_CA_ID) + self.assertIsNotNone(ca_id) + + cert_path = os.path.join(self.subca_cert_key_directory, + ca_id + ".cert") + key_path = os.path.join(self.subca_cert_key_directory, + ca_id + ".key") + self.assertTrue(os.path.exists(cert_path)) + self.assertTrue(os.path.exists(key_path)) + + self.plugin.delete_ca(ca_id) + self.assertFalse(os.path.exists(cert_path)) + self.assertFalse(os.path.exists(key_path)) + + cas = self.plugin.get_ca_info() + self.assertNotIn(ca_id, cas.keys()) + def test_raises_no_parent_id_passed_in(self): create_ca_dto = cm.CACreateDTO( name="sub ca1", diff --git a/barbican/tests/tasks/test_certificate_resources.py b/barbican/tests/tasks/test_certificate_resources.py index c77339b72..3c9b34252 100644 --- a/barbican/tests/tasks/test_certificate_resources.py +++ b/barbican/tests/tasks/test_certificate_resources.py @@ -837,8 +837,9 @@ class WhenCreatingSubordinateCAs(utils.BaseTestCase): def setUp(self): super(WhenCreatingSubordinateCAs, self).setUp() - self.project = models.Project() - self.project.id = '12345' + self.project = res.get_or_create_project('12345') + self.project2 = res.get_or_create_project('56789') + self.subject_name = "cn=subca1 signing certificate, o=example.com" self.creator_id = "user12345" self.name = "Subordinate CA #1" @@ -932,6 +933,43 @@ class WhenCreatingSubordinateCAs(utils.BaseTestCase): creator_id=self.creator_id ) + def test_should_delete_subca(self): + subca = cert_res.create_subordinate_ca( + project_model=self.project, + name=self.name, + description=self.description, + subject_dn=self.subject_name, + parent_ca_ref=self.parent_ca_ref, + creator_id=self.creator_id + ) + self.assertIsInstance(subca, models.CertificateAuthority) + cert_res.delete_subordinate_ca(self.project.external_id, subca) + self.cert_plugin.delete_ca.assert_called_once_with(subca.plugin_ca_id) + + def test_should_raise_cannot_delete_base_ca(self): + self.assertRaises( + excep.CannotDeleteBaseCA, + cert_res.delete_subordinate_ca, + self.project.external_id, + self.parent_ca + ) + + def test_should_raise_unauthorized_subca_delete(self): + subca = cert_res.create_subordinate_ca( + project_model=self.project, + name=self.name, + description=self.description, + subject_dn=self.subject_name, + parent_ca_ref=self.parent_ca_ref, + creator_id=self.creator_id + ) + self.assertRaises( + excep.UnauthorizedSubCADelete, + cert_res.delete_subordinate_ca, + self.project2.external_id, + subca + ) + def _config_cert_plugin(self): """Mock the certificate plugin manager.""" cert_plugin_config = { diff --git a/etc/barbican/policy.json b/etc/barbican/policy.json index 6c27c0c3b..3f0f6fe03 100644 --- a/etc/barbican/policy.json +++ b/etc/barbican/policy.json @@ -50,17 +50,18 @@ "transport_key:delete": "rule:admin", "transport_keys:get": "rule:all_users", "transport_keys:post": "rule:admin", - "certificate_authorities:get": "rule:all_but_audit", - "certificate_authorities:post": "rule:all_but_audit", + "certificate_authorities:get": "rule:all_users", + "certificate_authorities:post": "rule:admin", + "certificate_authority:delete": "rule:admin", "certificate_authority:get": "rule:all_users", "certificate_authority:get_cacert": "rule:all_users", "certificate_authority:get_ca_cert_chain": "rule:all_users", - "certificate_authority:get_projects": "rule:admin", + "certificate_authority:get_projects": "rule:service_admin", "certificate_authority:add_to_project": "rule:admin", "certificate_authority:remove_from_project": "rule:admin", "certificate_authority:set_preferred": "rule:admin", - "certificate_authority:set_global_preferred": "rule:admin", - "certificate_authority:unset_global_preferred": "rule:admin", + "certificate_authority:set_global_preferred": "rule:service_admin", + "certificate_authority:unset_global_preferred": "rule:service_admin", "certificate_authority:get_global_preferred": "rule:all_users", "certificate_authority:get_preferred_ca": "rule:all_users", "secret_acls:put_patch": "rule:secret_project_admin or rule:secret_project_creator", diff --git a/functionaltests/api/v1/behaviors/ca_behaviors.py b/functionaltests/api/v1/behaviors/ca_behaviors.py index 3ad1bc348..b178220be 100644 --- a/functionaltests/api/v1/behaviors/ca_behaviors.py +++ b/functionaltests/api/v1/behaviors/ca_behaviors.py @@ -30,10 +30,11 @@ class CABehaviors(base_behaviors.BaseBehaviors): response_model_type=ca_models.CAModel, extra_headers=extra_headers) - def get_cacert(self, ca_ref, payload_content_type, - payload_content_encoding=None, extra_headers=None, + def get_cacert(self, ca_ref, payload_content_encoding=None, + extra_headers=None, use_auth=True, user_name=None): - headers = {'Accept': payload_content_type, + """Retrieve the CA signing certificate. """ + headers = {'Accept': 'application/octet-stream', 'Accept-Encoding': payload_content_encoding} if extra_headers: headers.update(extra_headers) diff --git a/functionaltests/api/v1/functional/test_cas.py b/functionaltests/api/v1/functional/test_cas.py index 1c2cb3e92..98eaa8988 100644 --- a/functionaltests/api/v1/functional/test_cas.py +++ b/functionaltests/api/v1/functional/test_cas.py @@ -48,7 +48,7 @@ def is_snakeoil_enabled(): def convert_to_X509Name(dn): - target = crypto.X509Name() + target = crypto.X509().get_subject() fields = dn.split(',') for field in fields: m = re.search(r"(\w+)\s*=\s*(.+)", field.strip()) @@ -96,7 +96,7 @@ class CertificateAuthoritiesTestCase(base.TestCase): def get_signing_cert(self, ca_ref): resp = self.ca_behaviors.get_cacert(ca_ref) - return crypto.load_certificate(crypto.FILETYPE_PEM, resp) + return crypto.load_certificate(crypto.FILETYPE_PEM, resp.text) def verify_signing_cert(self, ca_ref, subject_dn, issuer_dn): cacert = self.get_signing_cert(ca_ref) @@ -163,14 +163,13 @@ class CertificateAuthoritiesTestCase(base.TestCase): resp, ca_ref = self.ca_behaviors.create_ca(ca_model) self.assertEqual(201, resp.status_code) - # TODO(alee) Get this additional test code working - # root_subject = self.get_signing_cert( - # self.get_root_ca_ref()).get_subject() + root_subject = self.get_signing_cert( + self.get_root_ca_ref()).get_subject() - # self.verify_signing_cert( - # ca_ref=ca_ref, - # subject_dn=convert_to_X509Name(self.subca_subject), - # issuer_dn=root_subject) + self.verify_signing_cert( + ca_ref=ca_ref, + subject_dn=convert_to_X509Name(self.subca_subject), + issuer_dn=root_subject) @testtools.skipIf(not is_snakeoil_enabled(), "This test is only usable with snakeoil") @@ -183,12 +182,11 @@ class CertificateAuthoritiesTestCase(base.TestCase): resp, child_ref = self.ca_behaviors.create_ca(child_model) self.assertEqual(201, resp.status_code) - # TODO(alee) Get this additional test code working - # parent_subject = self.get_signing_cert(parent_ref).get_subject() - # self.verify_signing_cert( - # ca_ref=child_ref, - # subject_dn=convert_to_X509Name(self.subca_subca_subject), - # issuer_dn=parent_subject) + parent_subject = self.get_signing_cert(parent_ref).get_subject() + self.verify_signing_cert( + ca_ref=child_ref, + subject_dn=convert_to_X509Name(self.subca_subca_subject), + issuer_dn=parent_subject) def test_create_subca_with_invalid_parent_ca_id(self): ca_model = self.get_snakeoil_subca_model() @@ -215,3 +213,30 @@ class CertificateAuthoritiesTestCase(base.TestCase): resp, ca_ref = self.ca_behaviors.create_ca(ca_model) self.assertEqual(201, resp.status_code) self.send_test_order(ca_ref) + + @testtools.skipIf(not is_snakeoil_enabled(), + "This test is only usable with snakeoil") + def test_create_and_delete_snakeoil_subca(self): + ca_model = self.get_snakeoil_subca_model() + resp, ca_ref = self.ca_behaviors.create_ca(ca_model) + self.assertEqual(201, resp.status_code) + + self.ca_behaviors.delete_ca(ca_ref) + resp = self.ca_behaviors.get_ca(ca_ref) + self.assertEqual(404, resp.status_code) + + @testtools.skipIf(not is_snakeoil_enabled(), + "This test is only usable with snakeoil") + def test_fail_to_delete_top_level_snakeoil_ca(self): + resp = self.ca_behaviors.delete_ca(self.get_root_ca_ref()) + self.assertEqual(403, resp.status_code) + + @testtools.skipIf(not is_snakeoil_enabled(), + "This test is only usable with snakeoil") + def test_create_snakeoil_subca_and_get_cacert(self): + ca_model = self.get_snakeoil_subca_model() + resp, ca_ref = self.ca_behaviors.create_ca(ca_model) + self.assertEqual(201, resp.status_code) + resp = self.ca_behaviors.get_cacert(ca_ref) + self.assertEqual(200, resp.status_code) + crypto.load_certificate(crypto.FILETYPE_PEM, resp.text)