Add functional test for project CA

Added a test that whole scenario for the add project CA/remove project
CA.

On the other hand, this fixed an issue in the add/remove project
CA controllers.

Change-Id: Ic1963ab32fe28925361ebe9760d4a8a5d32b5f0b
This commit is contained in:
Juan Antonio Osorio 2015-09-11 13:59:12 +02:00 committed by Ade Lee
parent 48550f1d58
commit 641aa44fe8
4 changed files with 165 additions and 71 deletions

View File

@ -112,6 +112,7 @@ class CertificateAuthorityController(controllers.ACLMixin):
return ca_projects_resp
@pecan.expose()
@utils.allow_all_content_types
@controllers.handle_exceptions(u._('Add CA to project'))
@controllers.enforce_rbac('certificate_authority:add_to_project')
def add_to_project(self, external_project_id):
@ -140,6 +141,7 @@ class CertificateAuthorityController(controllers.ACLMixin):
self.preferred_ca_repo.create_from(preferred_ca)
@pecan.expose()
@utils.allow_all_content_types
@controllers.handle_exceptions(u._('Remove CA from project'))
@controllers.enforce_rbac('certificate_authority:remove_from_project')
def remove_from_project(self, external_project_id):

View File

@ -43,7 +43,7 @@ class CABehaviors(base_behaviors.BaseBehaviors):
extra_headers=headers, use_auth=use_auth,
user_name=user_name)
def get_cas(self, limit=10, offset=0):
def get_cas(self, limit=10, offset=0, user_name=None):
"""Handles getting a list of CAs.
:param limit: limits number of returned CAs
@ -52,8 +52,8 @@ class CABehaviors(base_behaviors.BaseBehaviors):
:return: the response, a list of cas, total number of cas, next and
prev references
"""
resp = self.client.get('cas', params={'limit': limit,
'offset': offset})
resp = self.client.get('cas', user_name=user_name,
params={'limit': limit, 'offset': offset})
# TODO(alee) refactor to use he client's get_list_of_models()
@ -126,3 +126,17 @@ class CABehaviors(base_behaviors.BaseBehaviors):
entities = list(self.created_entities)
for (ca_ref, admin) in entities:
self.delete_ca(ca_ref, user_name=admin)
def add_ca_to_project(self, ca_ref, headers=None, use_auth=True,
user_name=None):
resp = self.client.post(ca_ref + '/add-to-project',
extra_headers=headers, use_auth=use_auth,
user_name=user_name)
return resp
def remove_ca_from_project(self, ca_ref, headers=None, use_auth=True,
user_name=None):
resp = self.client.post(ca_ref + '/remove-from-project',
extra_headers=headers, use_auth=use_auth,
user_name=user_name)
return resp

View File

@ -28,6 +28,12 @@ from functionaltests.api.v1.behaviors import ca_behaviors
from functionaltests.api.v1.behaviors import order_behaviors
from functionaltests.api.v1.models import ca_models
from functionaltests.api.v1.models import order_models
from functionaltests.common import config
CONF = config.get_config()
admin_a = CONF.rbac_users.admin_a
creator_a = CONF.rbac_users.creator_a
order_simple_cmc_request_data = {
'type': 'certificate',
@ -40,11 +46,23 @@ order_simple_cmc_request_data = {
}
CONF = cert_interface.CONF
BARBICAN_SRV_CONF = cert_interface.CONF
def is_snakeoil_enabled():
return 'snakeoil_ca' in CONF.certificate.enabled_certificate_plugins
def is_plugin_enabled(plugin):
return plugin in BARBICAN_SRV_CONF.certificate.enabled_certificate_plugins
def depends_on_ca_plugins(*plugins):
def depends_on_ca_plugins_decorator(function):
def wrapper(instance, *args, **kwargs):
plugins_enabled = (is_plugin_enabled(p) for p in plugins)
if not all(plugins_enabled):
instance.skipTest("The following plugin(s) need to be "
"enabled: ".format(plugins))
function(instance, *args, **kwargs)
return wrapper
return depends_on_ca_plugins_decorator
def convert_to_X509Name(dn):
@ -67,21 +85,12 @@ def convert_to_X509Name(dn):
return target
class CertificateAuthoritiesTestCase(base.TestCase):
class CATestCommon(base.TestCase):
def setUp(self):
super(CertificateAuthoritiesTestCase, self).setUp()
super(CATestCommon, self).setUp()
self.order_behaviors = order_behaviors.OrderBehaviors(self.client)
self.ca_behaviors = ca_behaviors.CABehaviors(self.client)
self.root_ca_ref = None
self.subca_subject = "CN=Subordinate CA, O=example.com"
self.subca_name = "Subordinate CA"
self.subca_description = "Test Snake Oil Subordinate CA"
self.subca_subca_subject = "CN=sub-sub CA, O=example.com"
self.subca_subca_name = "Sub-Sub CA"
self.subca_subca_description = "Test Snake Oil Sub-Sub CA"
self.simple_cmc_data = copy.deepcopy(order_simple_cmc_request_data)
@ -92,7 +101,43 @@ class CertificateAuthoritiesTestCase(base.TestCase):
def tearDown(self):
self.order_behaviors.delete_all_created_orders()
self.ca_behaviors.delete_all_created_cas()
super(CertificateAuthoritiesTestCase, self).tearDown()
super(CATestCommon, self).tearDown()
def send_test_order(self, ca_ref=None):
test_model = order_models.OrderModel(**self.simple_cmc_data)
test_model.meta['request_data'] = base64.b64encode(
certutil.create_good_csr())
if ca_ref is not None:
ca_id = hrefs.get_ca_id_from_ref(ca_ref)
test_model.meta['ca_id'] = ca_id
create_resp, order_ref = self.order_behaviors.create_order(test_model)
self.assertEqual(202, create_resp.status_code)
self.assertIsNotNone(order_ref)
def get_root_ca_ref(self, ca_plugin_name, ca_plugin_id):
(resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas()
for item in cas:
ca = self.ca_behaviors.get_ca(item)
if ca.model.plugin_name == ca_plugin_name:
if ca.model.plugin_ca_id == ca_plugin_id:
return item
return None
class CertificateAuthoritiesTestCase(CATestCommon):
def setUp(self):
super(CertificateAuthoritiesTestCase, self).setUp()
self.subca_subject = "CN=Subordinate CA, O=example.com"
self.subca_name = "Subordinate CA"
self.subca_description = "Test Snake Oil Subordinate CA"
self.subca_subca_subject = "CN=sub-sub CA, O=example.com"
self.subca_subca_name = "Sub-Sub CA"
self.subca_subca_description = "Test Snake Oil Sub-Sub CA"
def get_signing_cert(self, ca_ref):
resp = self.ca_behaviors.get_cacert(ca_ref)
@ -103,23 +148,11 @@ class CertificateAuthoritiesTestCase(base.TestCase):
return ((cacert.get_subject() == subject_dn) and
(cacert.get_issuer() == issuer_dn))
def get_root_ca_ref(self):
if self.root_ca_ref is not None:
return self.root_ca_ref
(resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas()
snake_name = 'barbican.plugin.snakeoil_ca.SnakeoilCACertificatePlugin'
snake_plugin_ca_id = "Snakeoil CA"
for item in cas:
ca = self.ca_behaviors.get_ca(item)
if ca.model.plugin_name == snake_name:
if ca.model.plugin_ca_id == snake_plugin_ca_id:
return item
return None
def get_snakeoil_subca_model(self):
parent_ca_ref = self.get_root_ca_ref()
parent_ca_ref = self.get_root_ca_ref(
ca_plugin_name=('barbican.plugin.snakeoil_ca.'
'SnakeoilCACertificatePlugin'),
ca_plugin_id="Snakeoil CA")
return ca_models.CAModel(
parent_ca_ref=parent_ca_ref,
description=self.subca_description,
@ -135,44 +168,24 @@ class CertificateAuthoritiesTestCase(base.TestCase):
subject_dn=self.subca_subca_subject
)
def send_test_order(self, ca_ref=None):
test_model = order_models.OrderModel(**self.simple_cmc_data)
test_model.meta['request_data'] = base64.b64encode(
certutil.create_good_csr())
if ca_ref is not None:
ca_id = hrefs.get_ca_id_from_ref(ca_ref)
test_model.meta['ca_id'] = ca_id
create_resp, order_ref = self.order_behaviors.create_order(test_model)
self.assertEqual(202, create_resp.status_code)
self.assertIsNotNone(order_ref)
def test_list_and_get_cas(self):
(resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas()
self.assertGreater(total, 0)
for item in cas:
ca = self.ca_behaviors.get_ca(item)
self.assertIsNotNone(ca.model.plugin_name)
self.assertIsNotNone(ca.model.ca_id)
self.assertIsNotNone(ca.model.plugin_ca_id)
@testtools.skipIf(not is_snakeoil_enabled(),
"This test is only usable with snakeoil")
@depends_on_ca_plugins('snakeoil_ca')
def test_create_snakeoil_subca(self):
ca_model = self.get_snakeoil_subca_model()
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
self.assertEqual(201, resp.status_code)
root_subject = self.get_signing_cert(
self.get_root_ca_ref()).get_subject()
root_ca_ref = self.get_root_ca_ref(
ca_plugin_name=('barbican.plugin.snakeoil_ca.'
'SnakeoilCACertificatePlugin'),
ca_plugin_id="Snakeoil CA")
root_subject = self.get_signing_cert(root_ca_ref).get_subject()
self.verify_signing_cert(
ca_ref=ca_ref,
subject_dn=convert_to_X509Name(self.subca_subject),
issuer_dn=root_subject)
@testtools.skipIf(not is_snakeoil_enabled(),
"This test is only usable with snakeoil")
@depends_on_ca_plugins('snakeoil_ca')
def test_create_subca_of_snakeoil_subca(self):
parent_model = self.get_snakeoil_subca_model()
resp, parent_ref = self.ca_behaviors.create_ca(parent_model)
@ -206,16 +219,14 @@ class CertificateAuthoritiesTestCase(base.TestCase):
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
self.assertEqual(400, resp.status_code)
@testtools.skipIf(not is_snakeoil_enabled(),
"This test is only usable with snakeoil")
@depends_on_ca_plugins('snakeoil_ca')
def test_create_snakeoil_subca_and_send_cert_order(self):
ca_model = self.get_snakeoil_subca_model()
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
self.assertEqual(201, resp.status_code)
self.send_test_order(ca_ref)
@testtools.skipIf(not is_snakeoil_enabled(),
"This test is only usable with snakeoil")
@depends_on_ca_plugins('snakeoil_ca')
def test_create_and_delete_snakeoil_subca(self):
ca_model = self.get_snakeoil_subca_model()
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
@ -225,14 +236,16 @@ class CertificateAuthoritiesTestCase(base.TestCase):
resp = self.ca_behaviors.get_ca(ca_ref)
self.assertEqual(404, resp.status_code)
@testtools.skipIf(not is_snakeoil_enabled(),
"This test is only usable with snakeoil")
@depends_on_ca_plugins('snakeoil_ca')
def test_fail_to_delete_top_level_snakeoil_ca(self):
resp = self.ca_behaviors.delete_ca(self.get_root_ca_ref())
root_ca_ref = self.get_root_ca_ref(
ca_plugin_name=('barbican.plugin.snakeoil_ca.'
'SnakeoilCACertificatePlugin'),
ca_plugin_id="Snakeoil CA")
resp = self.ca_behaviors.delete_ca(root_ca_ref, expected_fail=True)
self.assertEqual(403, resp.status_code)
@testtools.skipIf(not is_snakeoil_enabled(),
"This test is only usable with snakeoil")
@depends_on_ca_plugins('snakeoil_ca')
def test_create_snakeoil_subca_and_get_cacert(self):
ca_model = self.get_snakeoil_subca_model()
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
@ -240,3 +253,68 @@ class CertificateAuthoritiesTestCase(base.TestCase):
resp = self.ca_behaviors.get_cacert(ca_ref)
self.assertEqual(200, resp.status_code)
crypto.load_certificate(crypto.FILETYPE_PEM, resp.text)
class ListingCAsTestCase(CATestCommon):
"""Tests for listing CAs.
Must be in a separate class so that we can deselect them
in the parallel CA tests, until we can deselect specific tests
using a decorator.
"""
def test_list_and_get_cas(self):
(resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas()
self.assertGreater(total, 0)
for item in cas:
ca = self.ca_behaviors.get_ca(item)
self.assertIsNotNone(ca.model.plugin_name)
self.assertIsNotNone(ca.model.ca_id)
self.assertIsNotNone(ca.model.plugin_ca_id)
@depends_on_ca_plugins('snakeoil_ca', 'simple_certificate')
def test_list_snakeoil_and_simple_cert_cas(self):
"""Test if backend loads these specific CAs
Since the standard gate works with the snakeoil CA and the
simple_certificate CA. This test is just to make sure that these two
are specifically loaded.
"""
(resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas()
self.assertEqual(total, 2)
class ProjectCATestCase(CATestCommon):
def setUp(self):
super(ProjectCATestCase, self).setUp()
# @depends_on_ca_plugins('snakeoil_ca', 'simple_certificate')
@testtools.skip("re-enable once CA list code is fixed")
def test_addition_of_project_ca_affects_getting_ca_list(self):
# Getting list of CAs should get the total configured CAs
(resp, cas, initial_total, _, __) = self.ca_behaviors.get_cas()
self.assertGreater(initial_total, 0)
# Set project CA
ca_ref = self.get_root_ca_ref(
ca_plugin_name=('barbican.plugin.snakeoil_ca.'
'SnakeoilCACertificatePlugin'),
ca_plugin_id="Snakeoil CA")
resp = self.ca_behaviors.add_ca_to_project(ca_ref, user_name=admin_a)
self.assertEqual(204, resp.status_code)
# Getting list of CAs should get only the project CA
(resp, cas, project_ca_total, _, __) = self.ca_behaviors.get_cas(
user_name=creator_a)
self.assertEqual(1, project_ca_total)
# Remove project CA
resp = self.ca_behaviors.remove_ca_from_project(ca_ref,
user_name=admin_a)
self.assertEqual(204, resp.status_code)
# Getting list of CAs should get the total configured CAs (as seen
# before)
(resp, cas, final_total, _, __) = self.ca_behaviors.get_cas()
self.assertGreater(initial_total, final_total)

View File

@ -35,7 +35,7 @@ retval=$?
testr slowest
# run the tests in parallel
SKIP=^\(\?\!\.\*\(ProjectQuotasPagingTestCase\|QuotaEnforcementTestCase\)\)
SKIP=^\(\?\!\.\*\(ProjectQuotasPagingTestCase\|QuotaEnforcementTestCase\|ListingCAsTestCase\)\)
testr init
testr run $SKIP --parallel --subunit | subunit-trace --no-failure-debug -f
retval=$?