|
|
|
|
@@ -28,6 +28,12 @@ from functionaltests.api.v1.behaviors import ca_behaviors
|
|
|
|
|
from functionaltests.api.v1.behaviors import order_behaviors
|
|
|
|
|
from functionaltests.api.v1.models import ca_models
|
|
|
|
|
from functionaltests.api.v1.models import order_models
|
|
|
|
|
from functionaltests.common import config
|
|
|
|
|
|
|
|
|
|
CONF = config.get_config()
|
|
|
|
|
|
|
|
|
|
admin_a = CONF.rbac_users.admin_a
|
|
|
|
|
creator_a = CONF.rbac_users.creator_a
|
|
|
|
|
|
|
|
|
|
order_simple_cmc_request_data = {
|
|
|
|
|
'type': 'certificate',
|
|
|
|
|
@@ -40,11 +46,23 @@ order_simple_cmc_request_data = {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
CONF = cert_interface.CONF
|
|
|
|
|
BARBICAN_SRV_CONF = cert_interface.CONF
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_snakeoil_enabled():
|
|
|
|
|
return 'snakeoil_ca' in CONF.certificate.enabled_certificate_plugins
|
|
|
|
|
def is_plugin_enabled(plugin):
|
|
|
|
|
return plugin in BARBICAN_SRV_CONF.certificate.enabled_certificate_plugins
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def depends_on_ca_plugins(*plugins):
|
|
|
|
|
def depends_on_ca_plugins_decorator(function):
|
|
|
|
|
def wrapper(instance, *args, **kwargs):
|
|
|
|
|
plugins_enabled = (is_plugin_enabled(p) for p in plugins)
|
|
|
|
|
if not all(plugins_enabled):
|
|
|
|
|
instance.skipTest("The following plugin(s) need to be "
|
|
|
|
|
"enabled: ".format(plugins))
|
|
|
|
|
function(instance, *args, **kwargs)
|
|
|
|
|
return wrapper
|
|
|
|
|
return depends_on_ca_plugins_decorator
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def convert_to_X509Name(dn):
|
|
|
|
|
@@ -67,21 +85,12 @@ def convert_to_X509Name(dn):
|
|
|
|
|
return target
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CertificateAuthoritiesTestCase(base.TestCase):
|
|
|
|
|
class CATestCommon(base.TestCase):
|
|
|
|
|
|
|
|
|
|
def setUp(self):
|
|
|
|
|
super(CertificateAuthoritiesTestCase, self).setUp()
|
|
|
|
|
super(CATestCommon, self).setUp()
|
|
|
|
|
self.order_behaviors = order_behaviors.OrderBehaviors(self.client)
|
|
|
|
|
self.ca_behaviors = ca_behaviors.CABehaviors(self.client)
|
|
|
|
|
self.root_ca_ref = None
|
|
|
|
|
|
|
|
|
|
self.subca_subject = "CN=Subordinate CA, O=example.com"
|
|
|
|
|
self.subca_name = "Subordinate CA"
|
|
|
|
|
self.subca_description = "Test Snake Oil Subordinate CA"
|
|
|
|
|
|
|
|
|
|
self.subca_subca_subject = "CN=sub-sub CA, O=example.com"
|
|
|
|
|
self.subca_subca_name = "Sub-Sub CA"
|
|
|
|
|
self.subca_subca_description = "Test Snake Oil Sub-Sub CA"
|
|
|
|
|
|
|
|
|
|
self.simple_cmc_data = copy.deepcopy(order_simple_cmc_request_data)
|
|
|
|
|
|
|
|
|
|
@@ -92,7 +101,43 @@ class CertificateAuthoritiesTestCase(base.TestCase):
|
|
|
|
|
def tearDown(self):
|
|
|
|
|
self.order_behaviors.delete_all_created_orders()
|
|
|
|
|
self.ca_behaviors.delete_all_created_cas()
|
|
|
|
|
super(CertificateAuthoritiesTestCase, self).tearDown()
|
|
|
|
|
super(CATestCommon, self).tearDown()
|
|
|
|
|
|
|
|
|
|
def send_test_order(self, ca_ref=None):
|
|
|
|
|
test_model = order_models.OrderModel(**self.simple_cmc_data)
|
|
|
|
|
test_model.meta['request_data'] = base64.b64encode(
|
|
|
|
|
certutil.create_good_csr())
|
|
|
|
|
if ca_ref is not None:
|
|
|
|
|
ca_id = hrefs.get_ca_id_from_ref(ca_ref)
|
|
|
|
|
test_model.meta['ca_id'] = ca_id
|
|
|
|
|
|
|
|
|
|
create_resp, order_ref = self.order_behaviors.create_order(test_model)
|
|
|
|
|
self.assertEqual(202, create_resp.status_code)
|
|
|
|
|
self.assertIsNotNone(order_ref)
|
|
|
|
|
|
|
|
|
|
def get_root_ca_ref(self, ca_plugin_name, ca_plugin_id):
|
|
|
|
|
(resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas()
|
|
|
|
|
|
|
|
|
|
for item in cas:
|
|
|
|
|
ca = self.ca_behaviors.get_ca(item)
|
|
|
|
|
if ca.model.plugin_name == ca_plugin_name:
|
|
|
|
|
if ca.model.plugin_ca_id == ca_plugin_id:
|
|
|
|
|
return item
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CertificateAuthoritiesTestCase(CATestCommon):
|
|
|
|
|
|
|
|
|
|
def setUp(self):
|
|
|
|
|
super(CertificateAuthoritiesTestCase, self).setUp()
|
|
|
|
|
|
|
|
|
|
self.subca_subject = "CN=Subordinate CA, O=example.com"
|
|
|
|
|
self.subca_name = "Subordinate CA"
|
|
|
|
|
self.subca_description = "Test Snake Oil Subordinate CA"
|
|
|
|
|
|
|
|
|
|
self.subca_subca_subject = "CN=sub-sub CA, O=example.com"
|
|
|
|
|
self.subca_subca_name = "Sub-Sub CA"
|
|
|
|
|
self.subca_subca_description = "Test Snake Oil Sub-Sub CA"
|
|
|
|
|
|
|
|
|
|
def get_signing_cert(self, ca_ref):
|
|
|
|
|
resp = self.ca_behaviors.get_cacert(ca_ref)
|
|
|
|
|
@@ -103,23 +148,11 @@ class CertificateAuthoritiesTestCase(base.TestCase):
|
|
|
|
|
return ((cacert.get_subject() == subject_dn) and
|
|
|
|
|
(cacert.get_issuer() == issuer_dn))
|
|
|
|
|
|
|
|
|
|
def get_root_ca_ref(self):
|
|
|
|
|
if self.root_ca_ref is not None:
|
|
|
|
|
return self.root_ca_ref
|
|
|
|
|
|
|
|
|
|
(resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas()
|
|
|
|
|
snake_name = 'barbican.plugin.snakeoil_ca.SnakeoilCACertificatePlugin'
|
|
|
|
|
snake_plugin_ca_id = "Snakeoil CA"
|
|
|
|
|
|
|
|
|
|
for item in cas:
|
|
|
|
|
ca = self.ca_behaviors.get_ca(item)
|
|
|
|
|
if ca.model.plugin_name == snake_name:
|
|
|
|
|
if ca.model.plugin_ca_id == snake_plugin_ca_id:
|
|
|
|
|
return item
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def get_snakeoil_subca_model(self):
|
|
|
|
|
parent_ca_ref = self.get_root_ca_ref()
|
|
|
|
|
parent_ca_ref = self.get_root_ca_ref(
|
|
|
|
|
ca_plugin_name=('barbican.plugin.snakeoil_ca.'
|
|
|
|
|
'SnakeoilCACertificatePlugin'),
|
|
|
|
|
ca_plugin_id="Snakeoil CA")
|
|
|
|
|
return ca_models.CAModel(
|
|
|
|
|
parent_ca_ref=parent_ca_ref,
|
|
|
|
|
description=self.subca_description,
|
|
|
|
|
@@ -135,44 +168,24 @@ class CertificateAuthoritiesTestCase(base.TestCase):
|
|
|
|
|
subject_dn=self.subca_subca_subject
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def send_test_order(self, ca_ref=None):
|
|
|
|
|
test_model = order_models.OrderModel(**self.simple_cmc_data)
|
|
|
|
|
test_model.meta['request_data'] = base64.b64encode(
|
|
|
|
|
certutil.create_good_csr())
|
|
|
|
|
if ca_ref is not None:
|
|
|
|
|
ca_id = hrefs.get_ca_id_from_ref(ca_ref)
|
|
|
|
|
test_model.meta['ca_id'] = ca_id
|
|
|
|
|
|
|
|
|
|
create_resp, order_ref = self.order_behaviors.create_order(test_model)
|
|
|
|
|
self.assertEqual(202, create_resp.status_code)
|
|
|
|
|
self.assertIsNotNone(order_ref)
|
|
|
|
|
|
|
|
|
|
def test_list_and_get_cas(self):
|
|
|
|
|
(resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas()
|
|
|
|
|
self.assertGreater(total, 0)
|
|
|
|
|
for item in cas:
|
|
|
|
|
ca = self.ca_behaviors.get_ca(item)
|
|
|
|
|
self.assertIsNotNone(ca.model.plugin_name)
|
|
|
|
|
self.assertIsNotNone(ca.model.ca_id)
|
|
|
|
|
self.assertIsNotNone(ca.model.plugin_ca_id)
|
|
|
|
|
|
|
|
|
|
@testtools.skipIf(not is_snakeoil_enabled(),
|
|
|
|
|
"This test is only usable with snakeoil")
|
|
|
|
|
@depends_on_ca_plugins('snakeoil_ca')
|
|
|
|
|
def test_create_snakeoil_subca(self):
|
|
|
|
|
ca_model = self.get_snakeoil_subca_model()
|
|
|
|
|
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
|
|
|
|
|
self.assertEqual(201, resp.status_code)
|
|
|
|
|
|
|
|
|
|
root_subject = self.get_signing_cert(
|
|
|
|
|
self.get_root_ca_ref()).get_subject()
|
|
|
|
|
root_ca_ref = self.get_root_ca_ref(
|
|
|
|
|
ca_plugin_name=('barbican.plugin.snakeoil_ca.'
|
|
|
|
|
'SnakeoilCACertificatePlugin'),
|
|
|
|
|
ca_plugin_id="Snakeoil CA")
|
|
|
|
|
root_subject = self.get_signing_cert(root_ca_ref).get_subject()
|
|
|
|
|
|
|
|
|
|
self.verify_signing_cert(
|
|
|
|
|
ca_ref=ca_ref,
|
|
|
|
|
subject_dn=convert_to_X509Name(self.subca_subject),
|
|
|
|
|
issuer_dn=root_subject)
|
|
|
|
|
|
|
|
|
|
@testtools.skipIf(not is_snakeoil_enabled(),
|
|
|
|
|
"This test is only usable with snakeoil")
|
|
|
|
|
@depends_on_ca_plugins('snakeoil_ca')
|
|
|
|
|
def test_create_subca_of_snakeoil_subca(self):
|
|
|
|
|
parent_model = self.get_snakeoil_subca_model()
|
|
|
|
|
resp, parent_ref = self.ca_behaviors.create_ca(parent_model)
|
|
|
|
|
@@ -206,16 +219,14 @@ class CertificateAuthoritiesTestCase(base.TestCase):
|
|
|
|
|
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
|
|
|
|
|
self.assertEqual(400, resp.status_code)
|
|
|
|
|
|
|
|
|
|
@testtools.skipIf(not is_snakeoil_enabled(),
|
|
|
|
|
"This test is only usable with snakeoil")
|
|
|
|
|
@depends_on_ca_plugins('snakeoil_ca')
|
|
|
|
|
def test_create_snakeoil_subca_and_send_cert_order(self):
|
|
|
|
|
ca_model = self.get_snakeoil_subca_model()
|
|
|
|
|
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
|
|
|
|
|
self.assertEqual(201, resp.status_code)
|
|
|
|
|
self.send_test_order(ca_ref)
|
|
|
|
|
|
|
|
|
|
@testtools.skipIf(not is_snakeoil_enabled(),
|
|
|
|
|
"This test is only usable with snakeoil")
|
|
|
|
|
@depends_on_ca_plugins('snakeoil_ca')
|
|
|
|
|
def test_create_and_delete_snakeoil_subca(self):
|
|
|
|
|
ca_model = self.get_snakeoil_subca_model()
|
|
|
|
|
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
|
|
|
|
|
@@ -225,14 +236,16 @@ class CertificateAuthoritiesTestCase(base.TestCase):
|
|
|
|
|
resp = self.ca_behaviors.get_ca(ca_ref)
|
|
|
|
|
self.assertEqual(404, resp.status_code)
|
|
|
|
|
|
|
|
|
|
@testtools.skipIf(not is_snakeoil_enabled(),
|
|
|
|
|
"This test is only usable with snakeoil")
|
|
|
|
|
@depends_on_ca_plugins('snakeoil_ca')
|
|
|
|
|
def test_fail_to_delete_top_level_snakeoil_ca(self):
|
|
|
|
|
resp = self.ca_behaviors.delete_ca(self.get_root_ca_ref())
|
|
|
|
|
root_ca_ref = self.get_root_ca_ref(
|
|
|
|
|
ca_plugin_name=('barbican.plugin.snakeoil_ca.'
|
|
|
|
|
'SnakeoilCACertificatePlugin'),
|
|
|
|
|
ca_plugin_id="Snakeoil CA")
|
|
|
|
|
resp = self.ca_behaviors.delete_ca(root_ca_ref, expected_fail=True)
|
|
|
|
|
self.assertEqual(403, resp.status_code)
|
|
|
|
|
|
|
|
|
|
@testtools.skipIf(not is_snakeoil_enabled(),
|
|
|
|
|
"This test is only usable with snakeoil")
|
|
|
|
|
@depends_on_ca_plugins('snakeoil_ca')
|
|
|
|
|
def test_create_snakeoil_subca_and_get_cacert(self):
|
|
|
|
|
ca_model = self.get_snakeoil_subca_model()
|
|
|
|
|
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
|
|
|
|
|
@@ -240,3 +253,68 @@ class CertificateAuthoritiesTestCase(base.TestCase):
|
|
|
|
|
resp = self.ca_behaviors.get_cacert(ca_ref)
|
|
|
|
|
self.assertEqual(200, resp.status_code)
|
|
|
|
|
crypto.load_certificate(crypto.FILETYPE_PEM, resp.text)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ListingCAsTestCase(CATestCommon):
|
|
|
|
|
"""Tests for listing CAs.
|
|
|
|
|
|
|
|
|
|
Must be in a separate class so that we can deselect them
|
|
|
|
|
in the parallel CA tests, until we can deselect specific tests
|
|
|
|
|
using a decorator.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def test_list_and_get_cas(self):
|
|
|
|
|
(resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas()
|
|
|
|
|
self.assertGreater(total, 0)
|
|
|
|
|
for item in cas:
|
|
|
|
|
ca = self.ca_behaviors.get_ca(item)
|
|
|
|
|
self.assertIsNotNone(ca.model.plugin_name)
|
|
|
|
|
self.assertIsNotNone(ca.model.ca_id)
|
|
|
|
|
self.assertIsNotNone(ca.model.plugin_ca_id)
|
|
|
|
|
|
|
|
|
|
@depends_on_ca_plugins('snakeoil_ca', 'simple_certificate')
|
|
|
|
|
def test_list_snakeoil_and_simple_cert_cas(self):
|
|
|
|
|
"""Test if backend loads these specific CAs
|
|
|
|
|
|
|
|
|
|
Since the standard gate works with the snakeoil CA and the
|
|
|
|
|
simple_certificate CA. This test is just to make sure that these two
|
|
|
|
|
are specifically loaded.
|
|
|
|
|
"""
|
|
|
|
|
(resp, cas, total, next_ref, prev_ref) = self.ca_behaviors.get_cas()
|
|
|
|
|
self.assertEqual(total, 2)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ProjectCATestCase(CATestCommon):
|
|
|
|
|
|
|
|
|
|
def setUp(self):
|
|
|
|
|
super(ProjectCATestCase, self).setUp()
|
|
|
|
|
|
|
|
|
|
# @depends_on_ca_plugins('snakeoil_ca', 'simple_certificate')
|
|
|
|
|
@testtools.skip("re-enable once CA list code is fixed")
|
|
|
|
|
def test_addition_of_project_ca_affects_getting_ca_list(self):
|
|
|
|
|
# Getting list of CAs should get the total configured CAs
|
|
|
|
|
(resp, cas, initial_total, _, __) = self.ca_behaviors.get_cas()
|
|
|
|
|
self.assertGreater(initial_total, 0)
|
|
|
|
|
|
|
|
|
|
# Set project CA
|
|
|
|
|
ca_ref = self.get_root_ca_ref(
|
|
|
|
|
ca_plugin_name=('barbican.plugin.snakeoil_ca.'
|
|
|
|
|
'SnakeoilCACertificatePlugin'),
|
|
|
|
|
ca_plugin_id="Snakeoil CA")
|
|
|
|
|
resp = self.ca_behaviors.add_ca_to_project(ca_ref, user_name=admin_a)
|
|
|
|
|
self.assertEqual(204, resp.status_code)
|
|
|
|
|
|
|
|
|
|
# Getting list of CAs should get only the project CA
|
|
|
|
|
(resp, cas, project_ca_total, _, __) = self.ca_behaviors.get_cas(
|
|
|
|
|
user_name=creator_a)
|
|
|
|
|
self.assertEqual(1, project_ca_total)
|
|
|
|
|
|
|
|
|
|
# Remove project CA
|
|
|
|
|
resp = self.ca_behaviors.remove_ca_from_project(ca_ref,
|
|
|
|
|
user_name=admin_a)
|
|
|
|
|
self.assertEqual(204, resp.status_code)
|
|
|
|
|
|
|
|
|
|
# Getting list of CAs should get the total configured CAs (as seen
|
|
|
|
|
# before)
|
|
|
|
|
(resp, cas, final_total, _, __) = self.ca_behaviors.get_cas()
|
|
|
|
|
self.assertGreater(initial_total, final_total)
|
|
|
|
|
|