From 641aa44fe8ed8b73cafbb51588095d5fd05c0f09 Mon Sep 17 00:00:00 2001 From: Juan Antonio Osorio Date: Fri, 11 Sep 2015 13:59:12 +0200 Subject: [PATCH] Add functional test for project CA Added a test that whole scenario for the add project CA/remove project CA. On the other hand, this fixed an issue in the add/remove project CA controllers. Change-Id: Ic1963ab32fe28925361ebe9760d4a8a5d32b5f0b --- barbican/api/controllers/cas.py | 2 + .../api/v1/behaviors/ca_behaviors.py | 20 +- functionaltests/api/v1/functional/test_cas.py | 212 ++++++++++++------ functionaltests/run_tests.sh | 2 +- 4 files changed, 165 insertions(+), 71 deletions(-) diff --git a/barbican/api/controllers/cas.py b/barbican/api/controllers/cas.py index 0160c2350..66705361e 100644 --- a/barbican/api/controllers/cas.py +++ b/barbican/api/controllers/cas.py @@ -112,6 +112,7 @@ class CertificateAuthorityController(controllers.ACLMixin): return ca_projects_resp @pecan.expose() + @utils.allow_all_content_types @controllers.handle_exceptions(u._('Add CA to project')) @controllers.enforce_rbac('certificate_authority:add_to_project') def add_to_project(self, external_project_id): @@ -140,6 +141,7 @@ class CertificateAuthorityController(controllers.ACLMixin): self.preferred_ca_repo.create_from(preferred_ca) @pecan.expose() + @utils.allow_all_content_types @controllers.handle_exceptions(u._('Remove CA from project')) @controllers.enforce_rbac('certificate_authority:remove_from_project') def remove_from_project(self, external_project_id): diff --git a/functionaltests/api/v1/behaviors/ca_behaviors.py b/functionaltests/api/v1/behaviors/ca_behaviors.py index b178220be..f850050d1 100644 --- a/functionaltests/api/v1/behaviors/ca_behaviors.py +++ b/functionaltests/api/v1/behaviors/ca_behaviors.py @@ -43,7 +43,7 @@ class CABehaviors(base_behaviors.BaseBehaviors): extra_headers=headers, use_auth=use_auth, user_name=user_name) - def get_cas(self, limit=10, offset=0): + def get_cas(self, limit=10, offset=0, user_name=None): """Handles getting a list of CAs. :param limit: limits number of returned CAs @@ -52,8 +52,8 @@ class CABehaviors(base_behaviors.BaseBehaviors): :return: the response, a list of cas, total number of cas, next and prev references """ - resp = self.client.get('cas', params={'limit': limit, - 'offset': offset}) + resp = self.client.get('cas', user_name=user_name, + params={'limit': limit, 'offset': offset}) # TODO(alee) refactor to use he client's get_list_of_models() @@ -126,3 +126,17 @@ class CABehaviors(base_behaviors.BaseBehaviors): entities = list(self.created_entities) for (ca_ref, admin) in entities: self.delete_ca(ca_ref, user_name=admin) + + def add_ca_to_project(self, ca_ref, headers=None, use_auth=True, + user_name=None): + resp = self.client.post(ca_ref + '/add-to-project', + extra_headers=headers, use_auth=use_auth, + user_name=user_name) + return resp + + def remove_ca_from_project(self, ca_ref, headers=None, use_auth=True, + user_name=None): + resp = self.client.post(ca_ref + '/remove-from-project', + extra_headers=headers, use_auth=use_auth, + user_name=user_name) + return resp diff --git a/functionaltests/api/v1/functional/test_cas.py b/functionaltests/api/v1/functional/test_cas.py index 98eaa8988..860cb9ce4 100644 --- a/functionaltests/api/v1/functional/test_cas.py +++ b/functionaltests/api/v1/functional/test_cas.py @@ -28,6 +28,12 @@ from functionaltests.api.v1.behaviors import ca_behaviors from functionaltests.api.v1.behaviors import order_behaviors from functionaltests.api.v1.models import ca_models from functionaltests.api.v1.models import order_models +from functionaltests.common import config + +CONF = config.get_config() + +admin_a = CONF.rbac_users.admin_a +creator_a = CONF.rbac_users.creator_a order_simple_cmc_request_data = { 'type': 'certificate', @@ -40,11 +46,23 @@ order_simple_cmc_request_data = { } -CONF = cert_interface.CONF +BARBICAN_SRV_CONF = cert_interface.CONF -def is_snakeoil_enabled(): - return 'snakeoil_ca' in CONF.certificate.enabled_certificate_plugins +def is_plugin_enabled(plugin): + return plugin in BARBICAN_SRV_CONF.certificate.enabled_certificate_plugins + + +def depends_on_ca_plugins(*plugins): + def depends_on_ca_plugins_decorator(function): + def wrapper(instance, *args, **kwargs): + plugins_enabled = (is_plugin_enabled(p) for p in plugins) + if not all(plugins_enabled): + instance.skipTest("The following plugin(s) need to be " + "enabled: ".format(plugins)) + function(instance, *args, **kwargs) + return wrapper + return depends_on_ca_plugins_decorator def convert_to_X509Name(dn): @@ -67,21 +85,12 @@ def convert_to_X509Name(dn): return target -class CertificateAuthoritiesTestCase(base.TestCase): +class CATestCommon(base.TestCase): def setUp(self): - super(CertificateAuthoritiesTestCase, self).setUp() + super(CATestCommon, self).setUp() self.order_behaviors = order_behaviors.OrderBehaviors(self.client) self.ca_behaviors = ca_behaviors.CABehaviors(self.client) - self.root_ca_ref = None - - self.subca_subject = "CN=Subordinate CA, O=example.com" - self.subca_name = "Subordinate CA" - self.subca_description = "Test Snake Oil Subordinate CA" - - self.subca_subca_subject = "CN=sub-sub CA, O=example.com" - self.subca_subca_name = "Sub-Sub CA" - self.subca_subca_description = "Test Snake Oil Sub-Sub CA" self.simple_cmc_data = copy.deepcopy(order_simple_cmc_request_data) @@ -92,7 +101,43 @@ class CertificateAuthoritiesTestCase(base.TestCase): def tearDown(self): self.order_behaviors.delete_all_created_orders() self.ca_behaviors.delete_all_created_cas() - super(CertificateAuthoritiesTestCase, self).tearDown() + super(CATestCommon, self).tearDown() + + def send_test_order(self, ca_ref=None): + test_model = order_models.OrderModel(**self.simple_cmc_data) + test_model.meta['request_data'] = base64.b64encode( + certutil.create_good_csr()) + if ca_ref is not None: + ca_id = hrefs.get_ca_id_from_ref(ca_ref) + test_model.meta['ca_id'] = ca_id + + create_resp, order_ref = self.order_behaviors.create_order(test_model) + self.assertEqual(202, create_resp.status_code) + self.assertIsNotNone(order_ref) + + def get_root_ca_ref(self, ca_plugin_name, ca_plugin_id): + (resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas() + + for item in cas: + ca = self.ca_behaviors.get_ca(item) + if ca.model.plugin_name == ca_plugin_name: + if ca.model.plugin_ca_id == ca_plugin_id: + return item + return None + + +class CertificateAuthoritiesTestCase(CATestCommon): + + def setUp(self): + super(CertificateAuthoritiesTestCase, self).setUp() + + self.subca_subject = "CN=Subordinate CA, O=example.com" + self.subca_name = "Subordinate CA" + self.subca_description = "Test Snake Oil Subordinate CA" + + self.subca_subca_subject = "CN=sub-sub CA, O=example.com" + self.subca_subca_name = "Sub-Sub CA" + self.subca_subca_description = "Test Snake Oil Sub-Sub CA" def get_signing_cert(self, ca_ref): resp = self.ca_behaviors.get_cacert(ca_ref) @@ -103,23 +148,11 @@ class CertificateAuthoritiesTestCase(base.TestCase): return ((cacert.get_subject() == subject_dn) and (cacert.get_issuer() == issuer_dn)) - def get_root_ca_ref(self): - if self.root_ca_ref is not None: - return self.root_ca_ref - - (resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas() - snake_name = 'barbican.plugin.snakeoil_ca.SnakeoilCACertificatePlugin' - snake_plugin_ca_id = "Snakeoil CA" - - for item in cas: - ca = self.ca_behaviors.get_ca(item) - if ca.model.plugin_name == snake_name: - if ca.model.plugin_ca_id == snake_plugin_ca_id: - return item - return None - def get_snakeoil_subca_model(self): - parent_ca_ref = self.get_root_ca_ref() + parent_ca_ref = self.get_root_ca_ref( + ca_plugin_name=('barbican.plugin.snakeoil_ca.' + 'SnakeoilCACertificatePlugin'), + ca_plugin_id="Snakeoil CA") return ca_models.CAModel( parent_ca_ref=parent_ca_ref, description=self.subca_description, @@ -135,44 +168,24 @@ class CertificateAuthoritiesTestCase(base.TestCase): subject_dn=self.subca_subca_subject ) - def send_test_order(self, ca_ref=None): - test_model = order_models.OrderModel(**self.simple_cmc_data) - test_model.meta['request_data'] = base64.b64encode( - certutil.create_good_csr()) - if ca_ref is not None: - ca_id = hrefs.get_ca_id_from_ref(ca_ref) - test_model.meta['ca_id'] = ca_id - - create_resp, order_ref = self.order_behaviors.create_order(test_model) - self.assertEqual(202, create_resp.status_code) - self.assertIsNotNone(order_ref) - - def test_list_and_get_cas(self): - (resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas() - self.assertGreater(total, 0) - for item in cas: - ca = self.ca_behaviors.get_ca(item) - self.assertIsNotNone(ca.model.plugin_name) - self.assertIsNotNone(ca.model.ca_id) - self.assertIsNotNone(ca.model.plugin_ca_id) - - @testtools.skipIf(not is_snakeoil_enabled(), - "This test is only usable with snakeoil") + @depends_on_ca_plugins('snakeoil_ca') def test_create_snakeoil_subca(self): ca_model = self.get_snakeoil_subca_model() resp, ca_ref = self.ca_behaviors.create_ca(ca_model) self.assertEqual(201, resp.status_code) - root_subject = self.get_signing_cert( - self.get_root_ca_ref()).get_subject() + root_ca_ref = self.get_root_ca_ref( + ca_plugin_name=('barbican.plugin.snakeoil_ca.' + 'SnakeoilCACertificatePlugin'), + ca_plugin_id="Snakeoil CA") + root_subject = self.get_signing_cert(root_ca_ref).get_subject() self.verify_signing_cert( ca_ref=ca_ref, subject_dn=convert_to_X509Name(self.subca_subject), issuer_dn=root_subject) - @testtools.skipIf(not is_snakeoil_enabled(), - "This test is only usable with snakeoil") + @depends_on_ca_plugins('snakeoil_ca') def test_create_subca_of_snakeoil_subca(self): parent_model = self.get_snakeoil_subca_model() resp, parent_ref = self.ca_behaviors.create_ca(parent_model) @@ -206,16 +219,14 @@ class CertificateAuthoritiesTestCase(base.TestCase): resp, ca_ref = self.ca_behaviors.create_ca(ca_model) self.assertEqual(400, resp.status_code) - @testtools.skipIf(not is_snakeoil_enabled(), - "This test is only usable with snakeoil") + @depends_on_ca_plugins('snakeoil_ca') def test_create_snakeoil_subca_and_send_cert_order(self): ca_model = self.get_snakeoil_subca_model() resp, ca_ref = self.ca_behaviors.create_ca(ca_model) self.assertEqual(201, resp.status_code) self.send_test_order(ca_ref) - @testtools.skipIf(not is_snakeoil_enabled(), - "This test is only usable with snakeoil") + @depends_on_ca_plugins('snakeoil_ca') def test_create_and_delete_snakeoil_subca(self): ca_model = self.get_snakeoil_subca_model() resp, ca_ref = self.ca_behaviors.create_ca(ca_model) @@ -225,14 +236,16 @@ class CertificateAuthoritiesTestCase(base.TestCase): resp = self.ca_behaviors.get_ca(ca_ref) self.assertEqual(404, resp.status_code) - @testtools.skipIf(not is_snakeoil_enabled(), - "This test is only usable with snakeoil") + @depends_on_ca_plugins('snakeoil_ca') def test_fail_to_delete_top_level_snakeoil_ca(self): - resp = self.ca_behaviors.delete_ca(self.get_root_ca_ref()) + root_ca_ref = self.get_root_ca_ref( + ca_plugin_name=('barbican.plugin.snakeoil_ca.' + 'SnakeoilCACertificatePlugin'), + ca_plugin_id="Snakeoil CA") + resp = self.ca_behaviors.delete_ca(root_ca_ref, expected_fail=True) self.assertEqual(403, resp.status_code) - @testtools.skipIf(not is_snakeoil_enabled(), - "This test is only usable with snakeoil") + @depends_on_ca_plugins('snakeoil_ca') def test_create_snakeoil_subca_and_get_cacert(self): ca_model = self.get_snakeoil_subca_model() resp, ca_ref = self.ca_behaviors.create_ca(ca_model) @@ -240,3 +253,68 @@ class CertificateAuthoritiesTestCase(base.TestCase): resp = self.ca_behaviors.get_cacert(ca_ref) self.assertEqual(200, resp.status_code) crypto.load_certificate(crypto.FILETYPE_PEM, resp.text) + + +class ListingCAsTestCase(CATestCommon): + """Tests for listing CAs. + + Must be in a separate class so that we can deselect them + in the parallel CA tests, until we can deselect specific tests + using a decorator. + """ + + def test_list_and_get_cas(self): + (resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas() + self.assertGreater(total, 0) + for item in cas: + ca = self.ca_behaviors.get_ca(item) + self.assertIsNotNone(ca.model.plugin_name) + self.assertIsNotNone(ca.model.ca_id) + self.assertIsNotNone(ca.model.plugin_ca_id) + + @depends_on_ca_plugins('snakeoil_ca', 'simple_certificate') + def test_list_snakeoil_and_simple_cert_cas(self): + """Test if backend loads these specific CAs + + Since the standard gate works with the snakeoil CA and the + simple_certificate CA. This test is just to make sure that these two + are specifically loaded. + """ + (resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas() + self.assertEqual(total, 2) + + +class ProjectCATestCase(CATestCommon): + + def setUp(self): + super(ProjectCATestCase, self).setUp() + + # @depends_on_ca_plugins('snakeoil_ca', 'simple_certificate') + @testtools.skip("re-enable once CA list code is fixed") + def test_addition_of_project_ca_affects_getting_ca_list(self): + # Getting list of CAs should get the total configured CAs + (resp, cas, initial_total, _, __) = self.ca_behaviors.get_cas() + self.assertGreater(initial_total, 0) + + # Set project CA + ca_ref = self.get_root_ca_ref( + ca_plugin_name=('barbican.plugin.snakeoil_ca.' + 'SnakeoilCACertificatePlugin'), + ca_plugin_id="Snakeoil CA") + resp = self.ca_behaviors.add_ca_to_project(ca_ref, user_name=admin_a) + self.assertEqual(204, resp.status_code) + + # Getting list of CAs should get only the project CA + (resp, cas, project_ca_total, _, __) = self.ca_behaviors.get_cas( + user_name=creator_a) + self.assertEqual(1, project_ca_total) + + # Remove project CA + resp = self.ca_behaviors.remove_ca_from_project(ca_ref, + user_name=admin_a) + self.assertEqual(204, resp.status_code) + + # Getting list of CAs should get the total configured CAs (as seen + # before) + (resp, cas, final_total, _, __) = self.ca_behaviors.get_cas() + self.assertGreater(initial_total, final_total) diff --git a/functionaltests/run_tests.sh b/functionaltests/run_tests.sh index 89a477664..d1dc0f51d 100755 --- a/functionaltests/run_tests.sh +++ b/functionaltests/run_tests.sh @@ -35,7 +35,7 @@ retval=$? testr slowest # run the tests in parallel -SKIP=^\(\?\!\.\*\(ProjectQuotasPagingTestCase\|QuotaEnforcementTestCase\)\) +SKIP=^\(\?\!\.\*\(ProjectQuotasPagingTestCase\|QuotaEnforcementTestCase\|ListingCAsTestCase\)\) testr init testr run $SKIP --parallel --subunit | subunit-trace --no-failure-debug -f retval=$?