Add DELETE functionality for subCAs

Also adds unit and functional tests for DELETE and adds
a required decorator for retrieving the cacert.  Also, fixes
some TODOs in the functional tests.

Change-Id: I4ed2955ee0bbcee00b482c21b5200e82f2b79133
Partially-implements: blueprint add-cas
This commit is contained in:
Ade Lee 2015-09-11 01:56:56 -04:00
parent 231e0a80dd
commit 48550f1d58
11 changed files with 233 additions and 26 deletions

View File

@ -78,6 +78,7 @@ class CertificateAuthorityController(controllers.ACLMixin):
return self.ca.to_dict_fields()
@pecan.expose()
@utils.allow_all_content_types
@controllers.handle_exceptions(u._('CA Signing Cert retrieval'))
@controllers.enforce_rbac('certificate_authority:get_cacert')
def cacert(self, external_project_id):
@ -224,6 +225,14 @@ class CertificateAuthorityController(controllers.ACLMixin):
global_preferred_ca[0].id,
external_project_id)
@index.when(method='DELETE')
@utils.allow_all_content_types
@controllers.handle_exceptions(u._('CA deletion'))
@controllers.enforce_rbac('certificate_authority:delete')
def on_delete(self, external_project_id, **kwargs):
cert_resources.delete_subordinate_ca(external_project_id, self.ca)
LOG.info(u._LI('Deleted CA for project: %s'), external_project_id)
class CertificateAuthoritiesController(controllers.ACLMixin):
"""Handles certificate authority list requests."""

View File

@ -462,3 +462,13 @@ class SubCAsNotSupported(BarbicanHTTPException):
class SubCANotCreated(BarbicanHTTPException):
message = u._("Errors in creating subordinate CA: %(name)")
client_message = message
class CannotDeleteBaseCA(BarbicanHTTPException):
message = u._("Only subordinate CAs can be deleted.")
status_code = 403
class UnauthorizedSubCADelete(BarbicanHTTPException):
message = u._("Subordinate CA is not owned by this project")
status_code = 403

View File

@ -100,7 +100,7 @@ PLUGIN_CA_ID = "plugin_ca_id"
# fields for ca_info dict keys
INFO_NAME = "name"
INFO_DESCRIPTION = "description"
INFO_CA_SIGNING_CERT = "ca_signing_cert"
INFO_CA_SIGNING_CERT = "ca_signing_certificate"
INFO_INTERMEDIATES = "intermediates"
INFO_EXPIRATION = "expiration"
@ -455,6 +455,17 @@ class CertificatePluginBase(object):
"""
raise NotImplementedError # pragma: no cover
def delete_ca(self, ca_id):
"""Deletes a subordinate CA
Like the create_ca call, this should only be made if the plugin
returns Ture for supports_create_ca()
:param ca_id: id for the CA as specified by the plugin
:return: None
"""
raise NotImplementedError # pragma: no cover
class CACreateDTO(object):
"""Class that includes data needed to create a subordinate CA """

View File

@ -364,3 +364,15 @@ class SnakeoilCACertificatePlugin(cert_manager.CertificatePluginBase):
ret[ca_id] = ca_info
return ret
def delete_ca(self, ca_id):
self.cas.pop(ca_id)
cert_path = os.path.join(self.subca_directory, ca_id + ".cert")
key_path = os.path.join(self.subca_directory, ca_id + ".key")
if os.path.exists(key_path):
os.remove(key_path)
if os.path.exists(cert_path):
os.remove(cert_path)

View File

@ -225,6 +225,34 @@ def create_subordinate_ca(project_model, name, description, subject_dn,
return new_ca
def delete_subordinate_ca(external_project_id, ca):
"""Deletes a subordinate CA
:param external_project_id: external project ID
:param ca: class:`models.CertificateAuthority` to be deleted
:return: None
"""
# TODO(alee) See if the checks below can be moved to the RBAC code
if ca.project_id is None:
raise excep.CannotDeleteBaseCA()
project = repos.get_project_repository().find_by_external_project_id(
external_project_id)
if ca.project_id != project.id:
raise excep.UnauthorizedSubCADelete()
cert_plugin = cert.CertificatePluginManager().get_plugin_by_name(
ca.plugin_name)
cert_plugin.delete_ca(ca.plugin_ca_id)
# Delete the CA from the data model.
ca_repo = repos.get_ca_repository()
ca_repo.delete_entity_by_id(
entity_id=ca.id,
external_project_id=external_project_id)
def _handle_task_result(result, result_follow_on, order_model,
project_model, request_type, unavailable_status):
if cert.CertificateStatus.WAITING_FOR_CA == result.status:

View File

@ -15,6 +15,7 @@
import mock
from six import moves
from barbican.common import exception
from barbican.common import hrefs
from barbican.common import resources as res
from barbican.model import models
@ -288,8 +289,42 @@ class WhenTestingCAsResource(utils.BarbicanAPIBaseTestCase):
'/cas',
self.subca_request,
expect_errors=False)
self.assertEqual(201, resp.status_int)
def test_should_raise_delete_subca_not_found(self):
self.create_cas()
resp = self.app.delete('/cas/foobar', expect_errors=True)
self.assertEqual(404, resp.status_int)
@mock.patch('barbican.tasks.certificate_resources.delete_subordinate_ca')
def test_should_delete_subca(self, mocked_task):
self.create_cas()
resp = self.app.delete('/cas/' + self.subca.id)
mocked_task.assert_called_once_with(self.project_id,
self.subca)
self.assertEqual(204, resp.status_int)
@mock.patch('barbican.tasks.certificate_resources.delete_subordinate_ca')
def test_should_raise_delete_not_a_subca(self, mocked_task):
self.create_cas()
mocked_task.side_effect = exception.CannotDeleteBaseCA()
resp = self.app.delete('/cas/' + self.subca.id,
expect_errors=True)
mocked_task.assert_called_once_with(self.project_id,
self.subca)
self.assertEqual(403, resp.status_int)
@mock.patch('barbican.tasks.certificate_resources.delete_subordinate_ca')
def test_should_raise_delete_not_authorized(self, mocked_task):
self.create_cas()
mocked_task.side_effect = exception.UnauthorizedSubCADelete()
resp = self.app.delete('/cas/' + self.subca.id,
expect_errors=True)
mocked_task.assert_called_once_with(self.project_id,
self.subca)
self.assertEqual(403, resp.status_int)
def create_subca_request(self, parent_ca_id):
self.subca_request = {
'name': "Subordinate CA",
@ -372,6 +407,24 @@ class WhenTestingCAsResource(utils.BarbicanAPIBaseTestCase):
self.selected_signing_cert = 'ZZZZZ' + str(ca_id)
self.selected_intermediates = 'YYYYY' + str(ca_id)
# create subca for DELETE testing
parsed_ca = {
'plugin_name': self.plugin_name,
'plugin_ca_id': self.plugin_ca_id + "subca 1",
'name': self.plugin_name,
'description': 'Sub CA for default plugin',
'ca_signing_certificate': 'ZZZZZ' + "sub ca1",
'intermediates': 'YYYYY' + "sub ca1",
'project_id': self.project_id,
'creator_id': 'user12345'
}
ca = models.CertificateAuthority(parsed_ca)
ca_repo.create_from(ca)
ca_repo.save(ca)
self.subca = ca
self.num_cas += 1
def _create_url(self, external_project_id, offset_arg=None,
limit_arg=None):
if limit_arg:

View File

@ -292,6 +292,25 @@ class SnakeoilCAPluginTestCase(BaseTestCase):
# TODO(alee) Verify that the ca cert has correct subject name
# TODO(alee) Verify that ca cert is signed by parent CA
def test_delete_ca(self):
subca_dict = self._create_subca()
ca_id = subca_dict.get(cm.PLUGIN_CA_ID)
self.assertIsNotNone(ca_id)
cert_path = os.path.join(self.subca_cert_key_directory,
ca_id + ".cert")
key_path = os.path.join(self.subca_cert_key_directory,
ca_id + ".key")
self.assertTrue(os.path.exists(cert_path))
self.assertTrue(os.path.exists(key_path))
self.plugin.delete_ca(ca_id)
self.assertFalse(os.path.exists(cert_path))
self.assertFalse(os.path.exists(key_path))
cas = self.plugin.get_ca_info()
self.assertNotIn(ca_id, cas.keys())
def test_raises_no_parent_id_passed_in(self):
create_ca_dto = cm.CACreateDTO(
name="sub ca1",

View File

@ -837,8 +837,9 @@ class WhenCreatingSubordinateCAs(utils.BaseTestCase):
def setUp(self):
super(WhenCreatingSubordinateCAs, self).setUp()
self.project = models.Project()
self.project.id = '12345'
self.project = res.get_or_create_project('12345')
self.project2 = res.get_or_create_project('56789')
self.subject_name = "cn=subca1 signing certificate, o=example.com"
self.creator_id = "user12345"
self.name = "Subordinate CA #1"
@ -932,6 +933,43 @@ class WhenCreatingSubordinateCAs(utils.BaseTestCase):
creator_id=self.creator_id
)
def test_should_delete_subca(self):
subca = cert_res.create_subordinate_ca(
project_model=self.project,
name=self.name,
description=self.description,
subject_dn=self.subject_name,
parent_ca_ref=self.parent_ca_ref,
creator_id=self.creator_id
)
self.assertIsInstance(subca, models.CertificateAuthority)
cert_res.delete_subordinate_ca(self.project.external_id, subca)
self.cert_plugin.delete_ca.assert_called_once_with(subca.plugin_ca_id)
def test_should_raise_cannot_delete_base_ca(self):
self.assertRaises(
excep.CannotDeleteBaseCA,
cert_res.delete_subordinate_ca,
self.project.external_id,
self.parent_ca
)
def test_should_raise_unauthorized_subca_delete(self):
subca = cert_res.create_subordinate_ca(
project_model=self.project,
name=self.name,
description=self.description,
subject_dn=self.subject_name,
parent_ca_ref=self.parent_ca_ref,
creator_id=self.creator_id
)
self.assertRaises(
excep.UnauthorizedSubCADelete,
cert_res.delete_subordinate_ca,
self.project2.external_id,
subca
)
def _config_cert_plugin(self):
"""Mock the certificate plugin manager."""
cert_plugin_config = {

View File

@ -50,17 +50,18 @@
"transport_key:delete": "rule:admin",
"transport_keys:get": "rule:all_users",
"transport_keys:post": "rule:admin",
"certificate_authorities:get": "rule:all_but_audit",
"certificate_authorities:post": "rule:all_but_audit",
"certificate_authorities:get": "rule:all_users",
"certificate_authorities:post": "rule:admin",
"certificate_authority:delete": "rule:admin",
"certificate_authority:get": "rule:all_users",
"certificate_authority:get_cacert": "rule:all_users",
"certificate_authority:get_ca_cert_chain": "rule:all_users",
"certificate_authority:get_projects": "rule:admin",
"certificate_authority:get_projects": "rule:service_admin",
"certificate_authority:add_to_project": "rule:admin",
"certificate_authority:remove_from_project": "rule:admin",
"certificate_authority:set_preferred": "rule:admin",
"certificate_authority:set_global_preferred": "rule:admin",
"certificate_authority:unset_global_preferred": "rule:admin",
"certificate_authority:set_global_preferred": "rule:service_admin",
"certificate_authority:unset_global_preferred": "rule:service_admin",
"certificate_authority:get_global_preferred": "rule:all_users",
"certificate_authority:get_preferred_ca": "rule:all_users",
"secret_acls:put_patch": "rule:secret_project_admin or rule:secret_project_creator",

View File

@ -30,10 +30,11 @@ class CABehaviors(base_behaviors.BaseBehaviors):
response_model_type=ca_models.CAModel,
extra_headers=extra_headers)
def get_cacert(self, ca_ref, payload_content_type,
payload_content_encoding=None, extra_headers=None,
def get_cacert(self, ca_ref, payload_content_encoding=None,
extra_headers=None,
use_auth=True, user_name=None):
headers = {'Accept': payload_content_type,
"""Retrieve the CA signing certificate. """
headers = {'Accept': 'application/octet-stream',
'Accept-Encoding': payload_content_encoding}
if extra_headers:
headers.update(extra_headers)

View File

@ -48,7 +48,7 @@ def is_snakeoil_enabled():
def convert_to_X509Name(dn):
target = crypto.X509Name()
target = crypto.X509().get_subject()
fields = dn.split(',')
for field in fields:
m = re.search(r"(\w+)\s*=\s*(.+)", field.strip())
@ -96,7 +96,7 @@ class CertificateAuthoritiesTestCase(base.TestCase):
def get_signing_cert(self, ca_ref):
resp = self.ca_behaviors.get_cacert(ca_ref)
return crypto.load_certificate(crypto.FILETYPE_PEM, resp)
return crypto.load_certificate(crypto.FILETYPE_PEM, resp.text)
def verify_signing_cert(self, ca_ref, subject_dn, issuer_dn):
cacert = self.get_signing_cert(ca_ref)
@ -163,14 +163,13 @@ class CertificateAuthoritiesTestCase(base.TestCase):
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
self.assertEqual(201, resp.status_code)
# TODO(alee) Get this additional test code working
# root_subject = self.get_signing_cert(
# self.get_root_ca_ref()).get_subject()
root_subject = self.get_signing_cert(
self.get_root_ca_ref()).get_subject()
# self.verify_signing_cert(
# ca_ref=ca_ref,
# subject_dn=convert_to_X509Name(self.subca_subject),
# issuer_dn=root_subject)
self.verify_signing_cert(
ca_ref=ca_ref,
subject_dn=convert_to_X509Name(self.subca_subject),
issuer_dn=root_subject)
@testtools.skipIf(not is_snakeoil_enabled(),
"This test is only usable with snakeoil")
@ -183,12 +182,11 @@ class CertificateAuthoritiesTestCase(base.TestCase):
resp, child_ref = self.ca_behaviors.create_ca(child_model)
self.assertEqual(201, resp.status_code)
# TODO(alee) Get this additional test code working
# parent_subject = self.get_signing_cert(parent_ref).get_subject()
# self.verify_signing_cert(
# ca_ref=child_ref,
# subject_dn=convert_to_X509Name(self.subca_subca_subject),
# issuer_dn=parent_subject)
parent_subject = self.get_signing_cert(parent_ref).get_subject()
self.verify_signing_cert(
ca_ref=child_ref,
subject_dn=convert_to_X509Name(self.subca_subca_subject),
issuer_dn=parent_subject)
def test_create_subca_with_invalid_parent_ca_id(self):
ca_model = self.get_snakeoil_subca_model()
@ -215,3 +213,30 @@ class CertificateAuthoritiesTestCase(base.TestCase):
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
self.assertEqual(201, resp.status_code)
self.send_test_order(ca_ref)
@testtools.skipIf(not is_snakeoil_enabled(),
"This test is only usable with snakeoil")
def test_create_and_delete_snakeoil_subca(self):
ca_model = self.get_snakeoil_subca_model()
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
self.assertEqual(201, resp.status_code)
self.ca_behaviors.delete_ca(ca_ref)
resp = self.ca_behaviors.get_ca(ca_ref)
self.assertEqual(404, resp.status_code)
@testtools.skipIf(not is_snakeoil_enabled(),
"This test is only usable with snakeoil")
def test_fail_to_delete_top_level_snakeoil_ca(self):
resp = self.ca_behaviors.delete_ca(self.get_root_ca_ref())
self.assertEqual(403, resp.status_code)
@testtools.skipIf(not is_snakeoil_enabled(),
"This test is only usable with snakeoil")
def test_create_snakeoil_subca_and_get_cacert(self):
ca_model = self.get_snakeoil_subca_model()
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
self.assertEqual(201, resp.status_code)
resp = self.ca_behaviors.get_cacert(ca_ref)
self.assertEqual(200, resp.status_code)
crypto.load_certificate(crypto.FILETYPE_PEM, resp.text)