Fix ca related controllers

GET /cas/ would return all the CAs even if they're not in the project.
This CR enables the behavior in which if project CAs are defined, then
it will show them when running GET /cas/; If they're not, then it will
show all CAs.

On the other hand, this fixes an issue with remove-from-project. Where
if you would remove the last CA from a project, it would still have a
preferred one. So the behavior now is:
If the user tries to delete the preferred CA; an error will be returned
unless it is the last CA left for the project.

Change-Id: I195ecc4ea6a8748e003fd17b219a727701e5e15d
This commit is contained in:
Juan Antonio Osorio 2015-09-16 17:26:13 +03:00 committed by Dave McCowan
parent 92a4e3a0cc
commit b06abac781
7 changed files with 354 additions and 94 deletions

View File

@ -40,6 +40,11 @@ def _certificate_authority_attribute_not_found():
pecan.abort(404, u._('Not Found. CA attribute not found.'))
def _ca_not_in_project():
"""Throw exception certificate authority is not in project."""
pecan.abort(404, u._('Not Found. CA not in project.'))
def _requested_preferred_ca_not_a_project_ca():
"""Throw exception indicating that preferred CA is not a project CA."""
pecan.abort(
@ -48,6 +53,14 @@ def _requested_preferred_ca_not_a_project_ca():
)
def _cant_remove_preferred_ca_from_project():
pecan.abort(
405,
u._('Please change the preferred CA to a different project CA '
'before removing it.')
)
class CertificateAuthorityController(controllers.ACLMixin):
"""Handles certificate authority retrieval requests."""
@ -168,9 +181,32 @@ class CertificateAuthorityController(controllers.ACLMixin):
suppress_exception=True))
if project_ca:
self.project_ca_repo.delete_entity_by_id(
project_ca[0].id,
None)
self._do_remove_from_project(project_ca[0])
else:
_ca_not_in_project()
def _do_remove_from_project(self, project_ca):
project_id = project_ca.project_id
ca_id = project_ca.ca_id
preferred_ca = self.preferred_ca_repo.get_project_entities(
project_id)[0]
if self._is_last_project_ca(project_id):
self.preferred_ca_repo.delete_entity_by_id(preferred_ca.id, None)
else:
self._assert_is_not_preferred_ca(preferred_ca.ca_id, ca_id)
self.project_ca_repo.delete_entity_by_id(project_ca.id, None)
def _is_last_project_ca(self, project_id):
_cas, _offset, _limit, total = self.project_ca_repo.get_by_create_date(
project_id=project_id,
suppress_exception=True
)
return total == 1
def _assert_is_not_preferred_ca(self, preferred_ca_id, ca_id):
if preferred_ca_id == ca_id:
_cant_remove_preferred_ca_from_project()
@pecan.expose()
@controllers.handle_exceptions(u._('Set preferred project CA'))
@ -261,7 +297,9 @@ class CertificateAuthoritiesController(controllers.ACLMixin):
def __getattr__(self, name):
route_table = {
'global-preferred': self.get_global_preferred
'all': self.get_all,
'global-preferred': self.get_global_preferred,
'preferred': self.preferred
}
if name in route_table:
return route_table[name]
@ -279,9 +317,46 @@ class CertificateAuthoritiesController(controllers.ACLMixin):
pecan.abort(405) # HTTP 405 Method Not Allowed as default
@index.when(method='GET', template='json')
@controllers.handle_exceptions(
u._('Certificate Authorities retrieval (limited)'))
@controllers.enforce_rbac('certificate_authorities:get_limited')
def on_get(self, external_project_id, **kw):
LOG.debug('Start certificate_authorities on_get (limited)')
plugin_name = kw.get('plugin_name')
if plugin_name is not None:
plugin_name = parse.unquote_plus(plugin_name)
plugin_ca_id = kw.get('plugin_ca_id', None)
if plugin_ca_id is not None:
plugin_ca_id = parse.unquote_plus(plugin_ca_id)
# refresh CA table, in case plugin entries have expired
cert_resources.refresh_certificate_resources()
project_model = res.get_or_create_project(external_project_id)
if self._project_cas_defined(project_model.id):
cas, offset, limit, total = self._get_subcas_and_project_cas(
offset=kw.get('offset', 0),
limit=kw.get('limit', None),
plugin_name=plugin_name,
plugin_ca_id=plugin_ca_id,
project_id=project_model.id)
else:
cas, offset, limit, total = self._get_subcas_and_root_cas(
offset=kw.get('offset', 0),
limit=kw.get('limit', None),
plugin_name=plugin_name,
plugin_ca_id=plugin_ca_id,
project_id=project_model.id)
return self._display_cas(cas, offset, limit, total)
@pecan.expose(generic=True, template='json')
@controllers.handle_exceptions(u._('Certificate Authorities retrieval'))
@controllers.enforce_rbac('certificate_authorities:get')
def on_get(self, external_project_id, **kw):
def get_all(self, external_project_id, **kw):
LOG.debug('Start certificate_authorities on_get')
plugin_name = kw.get('plugin_name')
@ -295,31 +370,62 @@ class CertificateAuthoritiesController(controllers.ACLMixin):
# refresh CA table, in case plugin entries have expired
cert_resources.refresh_certificate_resources()
result = self.ca_repo.get_by_create_date(
offset_arg=kw.get('offset', 0),
limit_arg=kw.get('limit', None),
project_model = res.get_or_create_project(external_project_id)
cas, offset, limit, total = self._get_subcas_and_root_cas(
offset=kw.get('offset', 0),
limit=kw.get('limit', None),
plugin_name=plugin_name,
plugin_ca_id=plugin_ca_id,
project_id=project_model.id)
return self._display_cas(cas, offset, limit, total)
def _get_project_cas(self, project_id, query_filters):
cas, offset, limit, total = self.project_ca_repo.get_by_create_date(
offset_arg=query_filters.get('offset', 0),
limit_arg=query_filters.get('limit', None),
project_id=project_id,
suppress_exception=True
)
return cas, offset, limit, total
cas, offset, limit, total = result
def _project_cas_defined(self, project_id):
_cas, _offset, _limit, total = self._get_project_cas(project_id, {})
return total > 0
def _get_subcas_and_project_cas(self, offset, limit, plugin_name,
plugin_ca_id, project_id):
return self.ca_repo.get_by_create_date(
offset_arg=offset,
limit_arg=limit,
plugin_name=plugin_name,
plugin_ca_id=plugin_ca_id,
project_id=project_id,
restrict_to_project_cas=True,
suppress_exception=True)
def _get_subcas_and_root_cas(self, offset, limit, plugin_name,
plugin_ca_id, project_id):
return self.ca_repo.get_by_create_date(
offset_arg=offset,
limit_arg=limit,
plugin_name=plugin_name,
plugin_ca_id=plugin_ca_id,
project_id=project_id,
restrict_to_project_cas=False,
suppress_exception=True)
def _display_cas(self, cas, offset, limit, total):
if not cas:
cas_resp_overall = {'cas': [],
'total': total}
else:
cas_resp = [
hrefs.convert_certificate_authority_to_href(s.id)
for s in cas
]
cas_resp_overall = hrefs.add_nav_hrefs(
'cas',
offset,
limit,
total,
{'cas': cas_resp}
)
hrefs.convert_certificate_authority_to_href(ca.id)
for ca in cas]
cas_resp_overall = hrefs.add_nav_hrefs('cas', offset, limit, total,
{'cas': cas_resp})
cas_resp_overall.update({'total': total})
return cas_resp_overall
@ -335,11 +441,11 @@ class CertificateAuthoritiesController(controllers.ACLMixin):
if not pref_ca:
pecan.abort(404, u._("No global preferred CA defined"))
return {
'cas': [hrefs.convert_certificate_authority_to_href(pref_ca.ca_id)]
}
ca = self.ca_repo.get(entity_id=pref_ca.ca_id)
return ca.to_dict_fields()
@pecan.expose(generic=True, template='json')
@utils.allow_all_content_types
@controllers.handle_exceptions(u._('Retrieve project preferred CA'))
@controllers.enforce_rbac('certificate_authorities:get_preferred_ca')
def preferred(self, external_project_id, **kw):
@ -351,10 +457,8 @@ class CertificateAuthoritiesController(controllers.ACLMixin):
if not pref_ca:
pecan.abort(404, u._("No preferred CA defined for this project"))
return {
'cas':
[hrefs.convert_certificate_authority_to_href(pref_ca[0].ca_id)]
}
ca = self.ca_repo.get(entity_id=pref_ca[0].ca_id)
return ca.to_dict_fields()
@index.when(method='POST', template='json')
@controllers.handle_exceptions(u._('CA creation'))

View File

@ -1372,7 +1372,8 @@ class CertificateAuthorityRepo(BaseRepo):
def get_by_create_date(self, offset_arg=None, limit_arg=None,
plugin_name=None, plugin_ca_id=None,
suppress_exception=False, session=None,
show_expired=False):
show_expired=False, project_id=None,
restrict_to_project_cas=False):
"""Returns a list of certificate authorities
The returned certificate authorities are ordered by the date they
@ -1380,10 +1381,35 @@ class CertificateAuthorityRepo(BaseRepo):
"""
offset, limit = clean_paging_values(offset_arg, limit_arg)
session = self.get_session(session)
query = session.query(models.CertificateAuthority)
if restrict_to_project_cas:
# get both subCAs which have been defined for your project
# (cas for which the ca.project_id == project_id) AND
# project_cas which are defined for your project
# (pca.project_id = project_id)
query1 = session.query(models.CertificateAuthority)
query1 = query1.filter(
models.CertificateAuthority.project_id == project_id)
query2 = session.query(models.CertificateAuthority)
query2 = query2.join(models.ProjectCertificateAuthority)
query2 = query2.filter(
models.ProjectCertificateAuthority.project_id == project_id)
query = query1.union(query2)
else:
# get both subcas that have been defined for your project
# (cas for which ca.project_id == project_id) AND
# all top-level CAs (ca.project_id == None)
# Note(alee) for sqlalchemy, use '== None', not 'is None'
query = session.query(models.CertificateAuthority)
query = query.filter(or_(
models.CertificateAuthority.project_id == project_id,
models.CertificateAuthority.project_id == None
))
query = query.order_by(models.CertificateAuthority.created_at)
query = query.filter_by(deleted=False)

View File

@ -38,7 +38,11 @@ def create_ca(parsed_ca, id_ref="id"):
class WhenTestingCAsResource(utils.BarbicanAPIBaseTestCase):
def test_should_get_list_certificate_authorities(self):
self.create_cas()
self.app.extra_environ = {
'barbican.context': self._build_context(self.project_id,
user="user1")
}
self.create_cas(set_project_cas=False)
resp = self.app.get('/cas/', self.params)
self.assertEqual(len(resp.namespace['cas']), self.limit)
@ -57,16 +61,86 @@ class WhenTestingCAsResource(utils.BarbicanAPIBaseTestCase):
self.assertEqual(resp.body.decode('utf-8').count(url_hrefs),
(self.limit + 2))
def test_response_should_include_total(self):
def test_response_should_list_subca_and_project_cas(self):
self.create_cas()
self.app.extra_environ = {
'barbican.context': self._build_context(self.project_id,
user="user1")
}
self.params['limit'] = 100
self.params['offset'] = 0
resp = self.app.get('/cas/', self.params)
self.assertIn('total', resp.namespace)
self.assertEqual(3, resp.namespace['total'])
ca_refs = list(resp.namespace['cas'])
for ca_ref in ca_refs:
ca_id = hrefs.get_ca_id_from_ref(ca_ref)
if not ((ca_id in self.project_ca_ids)
or (ca_id == self.subca.id)):
self.fail("Invalid CA reference returned")
def test_response_should_all_except_subca(self):
self.create_cas()
self.app.extra_environ = {
'barbican.context': self._build_context("other_project",
user="user1")
}
self.params['limit'] = 100
self.params['offset'] = 0
self.params['plugin_name'] = self.plugin_name
resp = self.app.get('/cas/', self.params)
self.assertIn('total', resp.namespace)
self.assertEqual(self.num_cas,
resp.namespace['total'])
self.assertEqual(self.num_cas - 1, resp.namespace['total'])
ca_refs = list(resp.namespace['cas'])
for ca_ref in ca_refs:
ca_id = hrefs.get_ca_id_from_ref(ca_ref)
self.assertNotEqual(ca_id, self.subca.id)
def test_response_should_all_except_subca_from_all_subresource(self):
self.create_cas()
self.app.extra_environ = {
'barbican.context': self._build_context("other_project",
user="user1")
}
self.params['limit'] = 100
self.params['offset'] = 0
self.params['plugin_name'] = self.plugin_name
resp = self.app.get('/cas/all', self.params)
self.assertIn('total', resp.namespace)
self.assertEqual(self.num_cas - 1, resp.namespace['total'])
ca_refs = list(resp.namespace['cas'])
for ca_ref in ca_refs:
ca_id = hrefs.get_ca_id_from_ref(ca_ref)
self.assertNotEqual(ca_id, self.subca.id)
def test_response_should_all_from_all_subresource(self):
self.create_cas()
self.app.extra_environ = {
'barbican.context': self._build_context(self.project_id,
user="user1")
}
self.params['limit'] = 100
self.params['offset'] = 0
self.params['plugin_name'] = self.plugin_name
resp = self.app.get('/cas/all', self.params)
self.assertIn('total', resp.namespace)
self.assertEqual(self.num_cas, resp.namespace['total'])
def test_response_should_all_cas(self):
self.create_cas(set_project_cas=False)
self.app.extra_environ = {
'barbican.context': self._build_context(self.project_id,
user="user1")
}
self.params['limit'] = 100
self.params['offset'] = 0
self.params['plugin_name'] = self.plugin_name
resp = self.app.get('/cas/', self.params)
self.assertIn('total', resp.namespace)
self.assertEqual(self.num_cas, resp.namespace['total'])
def test_should_get_list_certificate_authorities_with_params(self):
self.create_cas()
self.create_cas(set_project_cas=False)
self.params['plugin_name'] = self.plugin_name
self.params['plugin_ca_id'] = self.plugin_ca_id + str(1)
self.params['offset'] = 0
@ -77,6 +151,18 @@ class WhenTestingCAsResource(utils.BarbicanAPIBaseTestCase):
self.assertNotIn('next', resp.namespace)
self.assertEqual(resp.namespace['total'], 1)
def test_should_get_with_params_on_all_resource(self):
self.create_cas(set_project_cas=False)
self.params['plugin_name'] = self.plugin_name
self.params['plugin_ca_id'] = self.plugin_ca_id + str(1)
self.params['offset'] = 0
resp = self.app.get('/cas/all', self.params)
self.assertNotIn('previous', resp.namespace)
self.assertNotIn('next', resp.namespace)
self.assertEqual(resp.namespace['total'], 1)
def test_should_handle_no_cas(self):
self.params = {'offset': 0, 'limit': 2, 'plugin_name': 'dummy'}
resp = self.app.get('/cas/', self.params)
@ -89,9 +175,10 @@ class WhenTestingCAsResource(utils.BarbicanAPIBaseTestCase):
self.create_cas()
resp = self.app.get('/cas/global-preferred')
self.assertEqual(
hrefs.convert_certificate_authority_to_href(self.global_ca_id),
resp.namespace['cas'][0])
self.assertEqual(self.global_preferred_ca.id,
resp.namespace['ca_id'])
self.assertEqual(self.global_preferred_ca.plugin_ca_id,
resp.namespace['plugin_ca_id'])
def test_should_get_no_global_preferred_ca(self):
resp = self.app.get('/cas/global-preferred', expect_errors=True)
@ -106,10 +193,10 @@ class WhenTestingCAsResource(utils.BarbicanAPIBaseTestCase):
def test_should_get_preferred_ca(self):
self.create_cas()
resp = self.app.get('/cas/preferred')
self.assertEqual(
hrefs.convert_certificate_authority_to_href(
self.preferred_project_ca_id),
resp.namespace['cas'][0])
self.assertEqual(self.preferred_ca.id,
resp.namespace['ca_id'])
self.assertEqual(self.preferred_ca.plugin_ca_id,
resp.namespace['plugin_ca_id'])
def test_should_get_ca(self):
self.create_cas()
@ -184,12 +271,29 @@ class WhenTestingCAsResource(utils.BarbicanAPIBaseTestCase):
self.assertEqual(204, resp.status_int)
# TODO(alee) need more detailed tests here
def test_should_remove_from_project_not_currently_set(self):
def test_should_raise_remove_from_project_preferred_ca(self):
self.create_cas()
resp = self.app.post('/cas/{0}/remove-from-project'.format(
self.selected_ca_id))
self.project_ca_ids[1]),
expect_errors=True)
self.assertEqual(405, resp.status_int)
def test_should_remove_preferred_ca_if_last_project_ca(self):
self.create_cas()
resp = self.app.post('/cas/{0}/remove-from-project'.format(
self.project_ca_ids[0]))
self.assertEqual(204, resp.status_int)
# TODO(alee) need more detailed tests here
resp = self.app.post('/cas/{0}/remove-from-project'.format(
self.project_ca_ids[1]))
self.assertEqual(204, resp.status_int)
def test_should_raise_remove_from_project_not_currently_set(self):
self.create_cas()
resp = self.app.post(
'/cas/{0}/remove-from-project'.format(self.selected_ca_id),
expect_errors=True)
self.assertEqual(404, resp.status_int)
def test_should_raise_remove_form_project_on_ca_not_found(self):
self.create_cas()
@ -248,8 +352,8 @@ class WhenTestingCAsResource(utils.BarbicanAPIBaseTestCase):
def test_should_unset_global_preferred(self):
self.create_cas()
resp = self.app.post(
'/cas/{0}/unset-global-preferred'.format(self.global_ca_id))
resp = self.app.post('/cas/{0}/unset-global-preferred'.format(
self.global_preferred_ca.id))
self.assertEqual(204, resp.status_int)
def test_should_unset_global_preferred_not_post(self):
@ -350,7 +454,7 @@ class WhenTestingCAsResource(utils.BarbicanAPIBaseTestCase):
'parent_ca_id': parent_ca_id
}
def create_cas(self):
def create_cas(self, set_project_cas=True):
self.project = res.get_or_create_project(self.project_id)
project_repo.save(self.project)
self.project_ca_ids = []
@ -359,12 +463,32 @@ class WhenTestingCAsResource(utils.BarbicanAPIBaseTestCase):
self.plugin_ca_id = 'default_plugin_ca_id_'
self.ca_id = "id1"
self.num_root_cas = 2
self.num_cas = 10
self.offset = 2
self.limit = 4
self.params = {'offset': self.offset, 'limit': self.limit}
self._do_create_cas(set_project_cas)
# create subca for DELETE testing
parsed_ca = {
'plugin_name': self.plugin_name,
'plugin_ca_id': self.plugin_ca_id + "subca 1",
'name': self.plugin_name,
'description': 'Sub CA for default plugin',
'ca_signing_certificate': 'ZZZZZ' + "sub ca1",
'intermediates': 'YYYYY' + "sub ca1",
'project_id': self.project.id,
'creator_id': 'user12345'
}
ca = models.CertificateAuthority(parsed_ca)
ca_repo.create_from(ca)
ca_repo.save(ca)
self.subca = ca
self.num_cas += 1
def _do_create_cas(self, set_project_cas):
for ca_id in moves.range(self.num_cas):
parsed_ca = {
'plugin_name': self.plugin_name,
@ -385,9 +509,9 @@ class WhenTestingCAsResource(utils.BarbicanAPIBaseTestCase):
ca.id)
preferred_ca_repo.create_from(pref_ca)
preferred_ca_repo.save(pref_ca)
self.global_ca_id = ca.id
self.global_preferred_ca = ca
if ca_id == 2:
if ca_id == 2 and set_project_cas:
# set project CA
project_ca = models.ProjectCertificateAuthority(
self.project.id, ca.id)
@ -395,7 +519,7 @@ class WhenTestingCAsResource(utils.BarbicanAPIBaseTestCase):
project_ca_repo.save(project_ca)
self.project_ca_ids.append(ca.id)
if ca_id == 3:
if ca_id == 3 and set_project_cas:
# set project preferred CA
project_ca = models.ProjectCertificateAuthority(
self.project.id, ca.id)
@ -407,7 +531,7 @@ class WhenTestingCAsResource(utils.BarbicanAPIBaseTestCase):
self.project.id, ca.id)
preferred_ca_repo.create_from(pref_ca)
preferred_ca_repo.save(pref_ca)
self.preferred_project_ca_id = ca.id
self.preferred_ca = ca
if ca_id == 4:
# set ca for testing GETs for a single CA
@ -416,22 +540,6 @@ class WhenTestingCAsResource(utils.BarbicanAPIBaseTestCase):
self.selected_signing_cert = 'ZZZZZ' + str(ca_id)
self.selected_intermediates = 'YYYYY' + str(ca_id)
# create subca for DELETE testing
parsed_ca = {
'plugin_name': self.plugin_name + '_delete_me',
'plugin_ca_id': self.plugin_ca_id + "subca 1",
'name': self.plugin_name,
'description': 'Sub CA for default plugin',
'ca_signing_certificate': 'ZZZZZ' + "sub ca1",
'intermediates': 'YYYYY' + "sub ca1",
'project_id': self.project_id,
'creator_id': 'user12345'
}
ca = models.CertificateAuthority(parsed_ca)
ca_repo.create_from(ca)
ca_repo.save(ca)
self.subca = ca
def _create_url(self, external_project_id, offset_arg=None,
limit_arg=None):
if limit_arg:

View File

@ -50,7 +50,8 @@
"transport_key:delete": "rule:admin",
"transport_keys:get": "rule:all_users",
"transport_keys:post": "rule:admin",
"certificate_authorities:get": "rule:all_users",
"certificate_authorities:get_limited": "rule:all_users",
"certificate_authorities:get_all": "rule:admin",
"certificate_authorities:post": "rule:admin",
"certificate_authorities:get_preferred_ca": "rule:all_users",
"certificate_authorities:get_global_preferred_ca": "rule:service_admin",

View File

@ -140,3 +140,17 @@ class CABehaviors(base_behaviors.BaseBehaviors):
extra_headers=headers, use_auth=use_auth,
user_name=user_name)
return resp
def set_preferred(self, ca_ref, headers=None, use_auth=True,
user_name=None):
resp = self.client.post(ca_ref + '/set-preferred',
extra_headers=headers, use_auth=use_auth,
user_name=user_name)
return resp
def get_preferred(self, extra_headers=None, use_auth=True,
user_name=None):
return self.client.get('cas/preferred',
response_model_type=ca_models.CAModel,
extra_headers=extra_headers, use_auth=use_auth,
user_name=user_name)

View File

@ -16,9 +16,9 @@
import base64
import copy
import re
import testtools
from OpenSSL import crypto
import testtools
from barbican.common import hrefs
from barbican.plugin.interface import certificate_manager as cert_interface
@ -121,6 +121,12 @@ class CATestCommon(base.TestCase):
return item
return None
def get_snakeoil_root_ca_ref(self):
return self.get_root_ca_ref(
ca_plugin_name=('barbican.plugin.snakeoil_ca.'
'SnakeoilCACertificatePlugin'),
ca_plugin_id="Snakeoil CA")
class CertificateAuthoritiesTestCase(CATestCommon):
@ -145,12 +151,8 @@ class CertificateAuthoritiesTestCase(CATestCommon):
(cacert.get_issuer() == issuer_dn))
def get_snakeoil_subca_model(self):
parent_ca_ref = self.get_root_ca_ref(
ca_plugin_name=('barbican.plugin.snakeoil_ca.'
'SnakeoilCACertificatePlugin'),
ca_plugin_id="Snakeoil CA")
return ca_models.CAModel(
parent_ca_ref=parent_ca_ref,
parent_ca_ref=self.get_snakeoil_root_ca_ref(),
description=self.subca_description,
name=self.subca_name,
subject_dn=self.subca_subject
@ -170,10 +172,7 @@ class CertificateAuthoritiesTestCase(CATestCommon):
resp, ca_ref = self.ca_behaviors.create_ca(ca_model)
self.assertEqual(201, resp.status_code)
root_ca_ref = self.get_root_ca_ref(
ca_plugin_name=('barbican.plugin.snakeoil_ca.'
'SnakeoilCACertificatePlugin'),
ca_plugin_id="Snakeoil CA")
root_ca_ref = self.get_snakeoil_root_ca_ref()
root_subject = self.get_signing_cert(root_ca_ref).get_subject()
self.verify_signing_cert(
@ -222,6 +221,16 @@ class CertificateAuthoritiesTestCase(CATestCommon):
self.assertEqual(201, resp.status_code)
self.send_test_order(ca_ref)
# @depends_on_ca_plugins('snakeoil_ca')
@testtools.skip("Skip test until ca behaviors tracks project cas")
def test_add_snakeoil_ca__to_project_and_get_preferred(self):
ca_ref = self.get_snakeoil_root_ca_ref()
resp = self.ca_behaviors.add_ca_to_project(ca_ref, user_name=admin_a)
self.assertEqual(204, resp.status_code)
ca = self.ca_behaviors.get_preferred(user_name=admin_a)
self.assertEqual(hrefs.get_ca_id_from_ref(ca_ref), ca.model.ca_id)
@depends_on_ca_plugins('snakeoil_ca')
def test_create_and_delete_snakeoil_subca(self):
ca_model = self.get_snakeoil_subca_model()
@ -234,11 +243,9 @@ class CertificateAuthoritiesTestCase(CATestCommon):
@depends_on_ca_plugins('snakeoil_ca')
def test_fail_to_delete_top_level_snakeoil_ca(self):
root_ca_ref = self.get_root_ca_ref(
ca_plugin_name=('barbican.plugin.snakeoil_ca.'
'SnakeoilCACertificatePlugin'),
ca_plugin_id="Snakeoil CA")
resp = self.ca_behaviors.delete_ca(root_ca_ref, expected_fail=True)
resp = self.ca_behaviors.delete_ca(
self.get_snakeoil_root_ca_ref(),
expected_fail=True)
self.assertEqual(403, resp.status_code)
@depends_on_ca_plugins('snakeoil_ca')
@ -285,22 +292,22 @@ class ProjectCATestCase(CATestCommon):
def setUp(self):
super(ProjectCATestCase, self).setUp()
# @depends_on_ca_plugins('snakeoil_ca', 'simple_certificate')
@testtools.skip("re-enable once CA list code is fixed")
@depends_on_ca_plugins('snakeoil_ca', 'simple_certificate')
def test_addition_of_project_ca_affects_getting_ca_list(self):
# Getting list of CAs should get the total configured CAs
(resp, cas, initial_total, _, __) = self.ca_behaviors.get_cas()
self.assertGreater(initial_total, 0)
self.assertEqual(initial_total, 2)
# Set project CA
ca_ref = self.get_root_ca_ref(
ca_plugin_name=('barbican.plugin.snakeoil_ca.'
'SnakeoilCACertificatePlugin'),
ca_plugin_id="Snakeoil CA")
ca_ref = self.get_snakeoil_root_ca_ref()
resp = self.ca_behaviors.add_ca_to_project(ca_ref, user_name=admin_a)
self.assertEqual(204, resp.status_code)
# Getting list of CAs should get only the project CA
# Getting list of CAs should get only the project CA for admin
(resp, cas, project_ca_total, _, __) = self.ca_behaviors.get_cas(
user_name=admin_a)
self.assertEqual(1, project_ca_total)
# Getting list of CAs should get only the project CA for non-admin
(resp, cas, project_ca_total, _, __) = self.ca_behaviors.get_cas(
user_name=creator_a)
self.assertEqual(1, project_ca_total)
@ -313,4 +320,4 @@ class ProjectCATestCase(CATestCommon):
# Getting list of CAs should get the total configured CAs (as seen
# before)
(resp, cas, final_total, _, __) = self.ca_behaviors.get_cas()
self.assertGreater(initial_total, final_total)
self.assertEqual(initial_total, final_total)

View File

@ -29,7 +29,7 @@ retval=$?
testr slowest
# run the tests in parallel
SKIP=^\(\?\!\.\*\(ProjectQuotasPagingTestCase\|QuotaEnforcementTestCase\|ListingCAsTestCase\)\)
SKIP=^\(\?\!\.\*\(ProjectQuotasPagingTestCase\|QuotaEnforcementTestCase\|ListingCAsTestCase\|ProjectCATestCase\)\)
testr init
testr run $SKIP --parallel --subunit | subunit-trace --no-failure-debug -f
retval=$?