Adding rest API for secret-stores resource (Part 4)
Added tests to provide 100% coverage on API and policy logic. Change-Id: Icb43049250be1d78bdd3db8fbad0dc0381cccaf7 Partially-Implements: blueprint multiple-secret-backend
This commit is contained in:
parent
b05c4b6f86
commit
6535e559cd
214
barbican/api/controllers/secretstores.py
Normal file
214
barbican/api/controllers/secretstores.py
Normal file
@ -0,0 +1,214 @@
|
||||
# (c) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import pecan
|
||||
|
||||
from barbican.api import controllers
|
||||
from barbican.common import hrefs
|
||||
from barbican.common import resources as res
|
||||
from barbican.common import utils
|
||||
from barbican import i18n as u
|
||||
from barbican.model import repositories as repo
|
||||
from barbican.plugin.util import multiple_backends
|
||||
|
||||
LOG = utils.getLogger(__name__)
|
||||
|
||||
|
||||
def _secret_store_not_found():
|
||||
"""Throw exception indicating secret store not found."""
|
||||
pecan.abort(404, u._('Not Found. Secret store not found.'))
|
||||
|
||||
|
||||
def _preferred_secret_store_not_found():
|
||||
"""Throw exception indicating secret store not found."""
|
||||
pecan.abort(404, u._('Not Found. No preferred secret store defined for '
|
||||
'this project.'))
|
||||
|
||||
|
||||
def _multiple_backends_not_enabled():
|
||||
"""Throw exception indicating multiple backends support is not enabled."""
|
||||
pecan.abort(404, u._('Not Found. Multiple backends support is not enabled '
|
||||
'in service configuration.'))
|
||||
|
||||
|
||||
def convert_secret_store_to_response_format(secret_store):
|
||||
data = secret_store.to_dict_fields()
|
||||
data['secret_store_plugin'] = data.pop('store_plugin')
|
||||
data['secret_store_ref'] = hrefs.convert_secret_stores_to_href(
|
||||
data['secret_store_id'])
|
||||
# no need to pass store id as secret_store_ref is returned
|
||||
data.pop('secret_store_id', None)
|
||||
return data
|
||||
|
||||
|
||||
class PreferredSecretStoreController(controllers.ACLMixin):
|
||||
"""Handles preferred secret store set/removal requests."""
|
||||
|
||||
def __init__(self, secret_store):
|
||||
LOG.debug(u._('=== Creating PreferredSecretStoreController ==='))
|
||||
self.secret_store = secret_store
|
||||
self.proj_store_repo = repo.get_project_secret_store_repository()
|
||||
|
||||
@pecan.expose(generic=True)
|
||||
def index(self, **kwargs):
|
||||
pecan.abort(405) # HTTP 405 Method Not Allowed as default
|
||||
|
||||
@index.when(method='DELETE', template='json')
|
||||
@controllers.handle_exceptions(u._('Removing preferred secret store'))
|
||||
@controllers.enforce_rbac('secretstore_preferred:delete')
|
||||
def on_delete(self, external_project_id, **kw):
|
||||
LOG.debug(u._('Start: Remove project preferred secret-store for store'
|
||||
' id %s'), self.secret_store.id)
|
||||
|
||||
project = res.get_or_create_project(external_project_id)
|
||||
|
||||
project_store = self.proj_store_repo.get_secret_store_for_project(
|
||||
project.id, None, suppress_exception=True)
|
||||
if project_store is None:
|
||||
_preferred_secret_store_not_found()
|
||||
|
||||
self.proj_store_repo.delete_entity_by_id(
|
||||
entity_id=project_store.id,
|
||||
external_project_id=external_project_id)
|
||||
pecan.response.status = 204
|
||||
|
||||
@index.when(method='POST', template='json')
|
||||
@controllers.handle_exceptions(u._('Setting preferred secret store'))
|
||||
@controllers.enforce_rbac('secretstore_preferred:post')
|
||||
def on_post(self, external_project_id, **kwargs):
|
||||
LOG.debug(u._('Start: Set project preferred secret-store for store '),
|
||||
'id %s', self.secret_store.id)
|
||||
|
||||
project = res.get_or_create_project(external_project_id)
|
||||
|
||||
self.proj_store_repo.create_or_update_for_project(project.id,
|
||||
self.secret_store.id)
|
||||
|
||||
pecan.response.status = 204
|
||||
|
||||
|
||||
class SecretStoreController(controllers.ACLMixin):
|
||||
"""Handles secret store retrieval requests."""
|
||||
|
||||
def __init__(self, secret_store):
|
||||
LOG.debug(u._('=== Creating SecretStoreController ==='))
|
||||
self.secret_store = secret_store
|
||||
|
||||
@pecan.expose()
|
||||
def _lookup(self, action, *remainder):
|
||||
if (action == 'preferred'):
|
||||
return PreferredSecretStoreController(self.secret_store), remainder
|
||||
else:
|
||||
pecan.abort(405)
|
||||
|
||||
@pecan.expose(generic=True)
|
||||
def index(self, **kwargs):
|
||||
pecan.abort(405) # HTTP 405 Method Not Allowed as default
|
||||
|
||||
@index.when(method='GET', template='json')
|
||||
@controllers.handle_exceptions(u._('Secret store retrieval'))
|
||||
@controllers.enforce_rbac('secretstore:get')
|
||||
def on_get(self, external_project_id):
|
||||
LOG.debug("== Getting secret store for %s", self.secret_store.id)
|
||||
return convert_secret_store_to_response_format(self.secret_store)
|
||||
|
||||
|
||||
class SecretStoresController(controllers.ACLMixin):
|
||||
"""Handles secret-stores list requests."""
|
||||
|
||||
def __init__(self):
|
||||
LOG.debug('Creating SecretStoresController')
|
||||
self.secret_stores_repo = repo.get_secret_stores_repository()
|
||||
self.proj_store_repo = repo.get_project_secret_store_repository()
|
||||
|
||||
def __getattr__(self, name):
|
||||
route_table = {
|
||||
'global-default': self.get_global_default,
|
||||
'preferred': self.get_preferred,
|
||||
}
|
||||
if name in route_table:
|
||||
return route_table[name]
|
||||
raise AttributeError
|
||||
|
||||
@pecan.expose()
|
||||
def _lookup(self, secret_store_id, *remainder):
|
||||
if not utils.is_multiple_backends_enabled():
|
||||
_multiple_backends_not_enabled()
|
||||
|
||||
secret_store = self.secret_stores_repo.get(entity_id=secret_store_id,
|
||||
suppress_exception=True)
|
||||
if not secret_store:
|
||||
_secret_store_not_found()
|
||||
return SecretStoreController(secret_store), remainder
|
||||
|
||||
@pecan.expose(generic=True)
|
||||
def index(self, **kwargs):
|
||||
pecan.abort(405) # HTTP 405 Method Not Allowed as default
|
||||
|
||||
@index.when(method='GET', template='json')
|
||||
@controllers.handle_exceptions(u._('List available secret stores'))
|
||||
@controllers.enforce_rbac('secretstores:get')
|
||||
def on_get(self, external_project_id, **kw):
|
||||
LOG.debug(u._('Start SecretStoresController on_get: listing secret '
|
||||
'stores'))
|
||||
if not utils.is_multiple_backends_enabled():
|
||||
_multiple_backends_not_enabled()
|
||||
|
||||
res.get_or_create_project(external_project_id)
|
||||
|
||||
secret_stores = self.secret_stores_repo.get_all()
|
||||
|
||||
resp_list = []
|
||||
for store in secret_stores:
|
||||
item = convert_secret_store_to_response_format(store)
|
||||
resp_list.append(item)
|
||||
|
||||
resp = {'secret_stores': resp_list}
|
||||
|
||||
return resp
|
||||
|
||||
@pecan.expose(generic=True, template='json')
|
||||
@controllers.handle_exceptions(u._('Retrieve global default secret store'))
|
||||
@controllers.enforce_rbac('secretstores:get_global_default')
|
||||
def get_global_default(self, external_project_id, **kw):
|
||||
LOG.debug(u._('Start secret-stores get global default secret store'))
|
||||
|
||||
if not utils.is_multiple_backends_enabled():
|
||||
_multiple_backends_not_enabled()
|
||||
|
||||
res.get_or_create_project(external_project_id)
|
||||
|
||||
store = multiple_backends.get_global_default_secret_store()
|
||||
|
||||
return convert_secret_store_to_response_format(store)
|
||||
|
||||
@pecan.expose(generic=True, template='json')
|
||||
@controllers.handle_exceptions(u._('Retrieve project preferred store'))
|
||||
@controllers.enforce_rbac('secretstores:get_preferred')
|
||||
def get_preferred(self, external_project_id, **kw):
|
||||
LOG.debug(u._('Start secret-stores get preferred secret store'))
|
||||
|
||||
if not utils.is_multiple_backends_enabled():
|
||||
_multiple_backends_not_enabled()
|
||||
|
||||
project = res.get_or_create_project(external_project_id)
|
||||
|
||||
project_store = self.proj_store_repo.get_secret_store_for_project(
|
||||
project.id, None, suppress_exception=True)
|
||||
|
||||
if project_store is None:
|
||||
_preferred_secret_store_not_found()
|
||||
|
||||
return convert_secret_store_to_response_format(
|
||||
project_store.secret_store)
|
@ -19,6 +19,7 @@ from barbican.api.controllers import containers
|
||||
from barbican.api.controllers import orders
|
||||
from barbican.api.controllers import quotas
|
||||
from barbican.api.controllers import secrets
|
||||
from barbican.api.controllers import secretstores
|
||||
from barbican.api.controllers import transportkeys
|
||||
from barbican.common import config
|
||||
from barbican.common import utils
|
||||
@ -97,6 +98,7 @@ class V1Controller(BaseVersionController):
|
||||
self.cas = cas.CertificateAuthoritiesController()
|
||||
self.quotas = quotas.QuotasController()
|
||||
setattr(self, 'project-quotas', quotas.ProjectsQuotasController())
|
||||
setattr(self, 'secret-stores', secretstores.SecretStoresController())
|
||||
|
||||
@pecan.expose(generic=True)
|
||||
def index(self):
|
||||
|
@ -56,6 +56,11 @@ def convert_certificate_authority_to_href(ca_id):
|
||||
return convert_resource_id_to_href('cas', ca_id)
|
||||
|
||||
|
||||
def convert_secret_stores_to_href(secret_store_id):
|
||||
"""Convert the ca ID to a HATEOAS-style href."""
|
||||
return convert_resource_id_to_href('secret-stores', secret_store_id)
|
||||
|
||||
|
||||
# TODO(hgedikli) handle list of fields in here
|
||||
def convert_to_hrefs(fields):
|
||||
"""Convert id's within a fields dict to HATEOAS-style hrefs."""
|
||||
|
325
barbican/tests/api/controllers/test_secretstores.py
Normal file
325
barbican/tests/api/controllers/test_secretstores.py
Normal file
@ -0,0 +1,325 @@
|
||||
# (c) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
import uuid
|
||||
|
||||
from barbican.model import models
|
||||
from barbican.model import repositories as repos
|
||||
from barbican.plugin.interface import secret_store
|
||||
from barbican.tests import utils
|
||||
|
||||
|
||||
class SecretStoresMixin(utils.MultipleBackendsTestCase):
|
||||
|
||||
def _create_project(self):
|
||||
session = repos.get_project_repository().get_session()
|
||||
|
||||
project = models.Project()
|
||||
project.external_id = "keystone_project_id" + uuid.uuid4().hex
|
||||
project.save(session=session)
|
||||
return project
|
||||
|
||||
def _create_project_store(self, project_id, secret_store_id):
|
||||
proj_store_repo = repos.get_project_secret_store_repository()
|
||||
session = proj_store_repo.get_session()
|
||||
|
||||
proj_model = models.ProjectSecretStore(project_id, secret_store_id)
|
||||
|
||||
proj_s_store = proj_store_repo.create_from(proj_model, session)
|
||||
proj_s_store.save(session=session)
|
||||
return proj_s_store
|
||||
|
||||
def _init_multiple_backends(self, enabled=True, global_default_index=0):
|
||||
|
||||
store_plugin_names = ['store_crypto', 'kmip_plugin', 'store_crypto']
|
||||
crypto_plugin_names = ['p11_crypto', '', 'simple_crypto']
|
||||
|
||||
self.init_via_conf_file(store_plugin_names,
|
||||
crypto_plugin_names, enabled=enabled,
|
||||
global_default_index=global_default_index)
|
||||
|
||||
with mock.patch('barbican.plugin.crypto.p11_crypto.P11CryptoPlugin.'
|
||||
'_create_pkcs11'), \
|
||||
mock.patch('kmip.pie.client.ProxyKmipClient'):
|
||||
|
||||
secret_store.SecretStorePluginManager()
|
||||
|
||||
|
||||
class WhenTestingSecretStores(utils.BarbicanAPIBaseTestCase,
|
||||
SecretStoresMixin):
|
||||
|
||||
def setUp(self):
|
||||
super(WhenTestingSecretStores, self).setUp()
|
||||
self.secret_store_repo = repos.get_secret_stores_repository()
|
||||
|
||||
def test_should_get_all_secret_stores(self):
|
||||
|
||||
g_index = 2 # global default index in plugins list
|
||||
self._init_multiple_backends(global_default_index=g_index)
|
||||
|
||||
resp = self.app.get('/secret-stores', expect_errors=False)
|
||||
self.assertEqual(200, resp.status_int)
|
||||
secret_stores_data = resp.json.get('secret_stores')
|
||||
|
||||
self.assertEqual(3, len(secret_stores_data))
|
||||
|
||||
for i, secret_data in enumerate(secret_stores_data):
|
||||
self.assertEqual(i == g_index, secret_data['global_default'])
|
||||
self.assertIsNotNone(secret_data['secret_store_ref'])
|
||||
self.assertIsNone(secret_data.get('id'))
|
||||
self.assertIsNone(secret_data.get('secret_store_id'))
|
||||
self.assertIsNotNone(secret_data['name'])
|
||||
self.assertIsNotNone(secret_data['secret_store_plugin'])
|
||||
self.assertIsNotNone(secret_data['created'])
|
||||
self.assertIsNotNone(secret_data['updated'])
|
||||
self.assertEqual(models.States.ACTIVE, secret_data['status'])
|
||||
|
||||
def test_get_all_secret_stores_when_multiple_backends_not_enabled(self):
|
||||
|
||||
self._init_multiple_backends(enabled=False)
|
||||
|
||||
resp = self.app.get('/secret-stores', expect_errors=True)
|
||||
self.assertEqual(404, resp.status_int)
|
||||
|
||||
resp = self.app.get('/secret-stores/any_valid_id',
|
||||
expect_errors=True)
|
||||
self.assertEqual(404, resp.status_int)
|
||||
|
||||
def test_get_all_secret_stores_with_unsupported_http_method(self):
|
||||
|
||||
self._init_multiple_backends()
|
||||
|
||||
resp = self.app.put('/secret-stores', expect_errors=True)
|
||||
self.assertEqual(405, resp.status_int)
|
||||
|
||||
resp = self.app.patch('/secret-stores', expect_errors=True)
|
||||
self.assertEqual(405, resp.status_int)
|
||||
|
||||
def test_should_get_global_default(self):
|
||||
|
||||
self._init_multiple_backends(global_default_index=1)
|
||||
|
||||
resp = self.app.get('/secret-stores/global-default',
|
||||
expect_errors=False)
|
||||
self.assertEqual(200, resp.status_int)
|
||||
resp_data = resp.json
|
||||
self.assertEqual(True, resp_data['global_default'])
|
||||
self.assertIn('kmip', resp_data['name'].lower())
|
||||
self.assertIsNotNone(resp_data['secret_store_ref'])
|
||||
self.assertIsNotNone(resp_data['secret_store_plugin'])
|
||||
self.assertIsNone(resp_data['crypto_plugin'])
|
||||
self.assertIsNotNone(resp_data['created'])
|
||||
self.assertIsNotNone(resp_data['updated'])
|
||||
self.assertEqual(models.States.ACTIVE, resp_data['status'])
|
||||
|
||||
def test_get_global_default_when_multiple_backends_not_enabled(self):
|
||||
|
||||
self._init_multiple_backends(enabled=False)
|
||||
|
||||
with mock.patch('barbican.common.resources.'
|
||||
'get_or_create_project') as m1:
|
||||
|
||||
resp = self.app.get('/secret-stores/global-default',
|
||||
expect_errors=True)
|
||||
|
||||
self.assertFalse(m1.called)
|
||||
self.assertEqual(404, resp.status_int)
|
||||
|
||||
def test_get_preferred_when_preferred_is_set(self):
|
||||
self._init_multiple_backends(global_default_index=1)
|
||||
|
||||
secret_stores = self.secret_store_repo.get_all()
|
||||
project1 = self._create_project()
|
||||
|
||||
self._create_project_store(project1.id, secret_stores[0].id)
|
||||
|
||||
self.app.extra_environ = {
|
||||
'barbican.context': self._build_context(project1.external_id)
|
||||
}
|
||||
resp = self.app.get('/secret-stores/preferred',
|
||||
expect_errors=False)
|
||||
self.assertEqual(200, resp.status_int)
|
||||
resp_data = resp.json
|
||||
self.assertEqual(secret_stores[0].name, resp_data['name'])
|
||||
self.assertEqual(secret_stores[0].global_default,
|
||||
resp_data['global_default'])
|
||||
self.assertIn('/secret-stores/{0}'.format(secret_stores[0].id),
|
||||
resp_data['secret_store_ref'])
|
||||
self.assertIsNotNone(resp_data['created'])
|
||||
self.assertIsNotNone(resp_data['updated'])
|
||||
self.assertEqual(models.States.ACTIVE, resp_data['status'])
|
||||
|
||||
def test_get_preferred_when_preferred_is_not_set(self):
|
||||
self._init_multiple_backends(global_default_index=1)
|
||||
project1 = self._create_project()
|
||||
|
||||
self.app.extra_environ = {
|
||||
'barbican.context': self._build_context(project1.external_id)
|
||||
}
|
||||
resp = self.app.get('/secret-stores/preferred',
|
||||
expect_errors=True)
|
||||
self.assertEqual(404, resp.status_int)
|
||||
|
||||
def test_get_preferred_when_multiple_backends_not_enabled(self):
|
||||
|
||||
self._init_multiple_backends(enabled=False)
|
||||
|
||||
with mock.patch('barbican.common.resources.'
|
||||
'get_or_create_project') as m1:
|
||||
|
||||
resp = self.app.get('/secret-stores/preferred',
|
||||
expect_errors=True)
|
||||
|
||||
self.assertFalse(m1.called)
|
||||
self.assertEqual(404, resp.status_int)
|
||||
|
||||
|
||||
class WhenTestingSecretStore(utils.BarbicanAPIBaseTestCase,
|
||||
SecretStoresMixin):
|
||||
|
||||
def setUp(self):
|
||||
super(WhenTestingSecretStore, self).setUp()
|
||||
self.secret_store_repo = repos.get_secret_stores_repository()
|
||||
|
||||
def test_get_a_secret_store_when_no_error(self):
|
||||
|
||||
self._init_multiple_backends()
|
||||
|
||||
secret_stores = self.secret_store_repo.get_all()
|
||||
|
||||
store = secret_stores[0]
|
||||
|
||||
resp = self.app.get('/secret-stores/{0}'.format(store.id),
|
||||
expect_errors=False)
|
||||
self.assertEqual(200, resp.status_int)
|
||||
data = resp.json
|
||||
self.assertEqual(store.global_default, data['global_default'])
|
||||
self.assertEqual(store.name, data['name'])
|
||||
self.assertIn('/secret-stores/{0}'.format(store.id),
|
||||
data['secret_store_ref'])
|
||||
self.assertIsNotNone(data['secret_store_plugin'])
|
||||
self.assertIsNotNone(data['created'])
|
||||
self.assertIsNotNone(data['updated'])
|
||||
self.assertEqual(models.States.ACTIVE, data['status'])
|
||||
|
||||
def test_invalid_uri_for_secret_stores_subresource(self):
|
||||
self._init_multiple_backends()
|
||||
|
||||
resp = self.app.get('/secret-stores/invalid_uri',
|
||||
expect_errors=True)
|
||||
self.assertEqual(404, resp.status_int)
|
||||
|
||||
def test_get_a_secret_store_with_unsupported_http_method(self):
|
||||
self._init_multiple_backends()
|
||||
|
||||
secret_stores = self.secret_store_repo.get_all()
|
||||
store_id = secret_stores[0].id
|
||||
|
||||
resp = self.app.put('/secret-stores/{0}'.format(store_id),
|
||||
expect_errors=True)
|
||||
self.assertEqual(405, resp.status_int)
|
||||
|
||||
def test_invalid_uri_for_a_secret_store_subresource(self):
|
||||
self._init_multiple_backends()
|
||||
|
||||
secret_stores = self.secret_store_repo.get_all()
|
||||
resp = self.app.get('/secret-stores/{0}/invalid_uri'.
|
||||
format(secret_stores[0].id), expect_errors=True)
|
||||
self.assertEqual(405, resp.status_int)
|
||||
|
||||
|
||||
class WhenTestingProjectSecretStore(utils.BarbicanAPIBaseTestCase,
|
||||
SecretStoresMixin):
|
||||
|
||||
def setUp(self):
|
||||
super(WhenTestingProjectSecretStore, self).setUp()
|
||||
self.secret_store_repo = repos.get_secret_stores_repository()
|
||||
self.proj_store_repo = repos.get_project_secret_store_repository()
|
||||
|
||||
def test_set_a_preferred_secret_store_when_no_error(self):
|
||||
|
||||
self._init_multiple_backends()
|
||||
|
||||
stores = self.secret_store_repo.get_all()
|
||||
|
||||
proj_external_id = uuid.uuid4().hex
|
||||
# get ids as secret store are not bound to session after a rest call.
|
||||
store_ids = [store.id for store in stores]
|
||||
for store_id in store_ids:
|
||||
self.app.extra_environ = {
|
||||
'barbican.context': self._build_context(proj_external_id)
|
||||
}
|
||||
resp = self.app.post('/secret-stores/{0}/preferred'.
|
||||
format(store_id), expect_errors=False)
|
||||
self.assertEqual(204, resp.status_int)
|
||||
|
||||
# Now make sure preferred store is set to store id via get call
|
||||
resp = self.app.get('/secret-stores/preferred')
|
||||
self.assertIn(store_id, resp.json['secret_store_ref'])
|
||||
|
||||
def test_unset_a_preferred_secret_store_when_no_error(self):
|
||||
|
||||
self._init_multiple_backends()
|
||||
|
||||
stores = self.secret_store_repo.get_all()
|
||||
|
||||
proj_external_id = uuid.uuid4().hex
|
||||
# get ids as secret store are not bound to session after a rest call.
|
||||
store_ids = [store.id for store in stores]
|
||||
for store_id in store_ids:
|
||||
self.app.extra_environ = {
|
||||
'barbican.context': self._build_context(proj_external_id)
|
||||
}
|
||||
resp = self.app.post('/secret-stores/{0}/preferred'.
|
||||
format(store_id), expect_errors=False)
|
||||
self.assertEqual(204, resp.status_int)
|
||||
|
||||
# unset preferred store here
|
||||
resp = self.app.delete('/secret-stores/{0}/preferred'.
|
||||
format(store_id), expect_errors=False)
|
||||
self.assertEqual(204, resp.status_int)
|
||||
|
||||
# Now make sure that there is no longer a preferred store set
|
||||
resp = self.app.get('/secret-stores/preferred',
|
||||
expect_errors=True)
|
||||
self.assertEqual(404, resp.status_int)
|
||||
|
||||
def test_unset_a_preferred_store_when_not_found_error(self):
|
||||
self._init_multiple_backends()
|
||||
|
||||
stores = self.secret_store_repo.get_all()
|
||||
proj_external_id = uuid.uuid4().hex
|
||||
self.app.extra_environ = {
|
||||
'barbican.context': self._build_context(proj_external_id)
|
||||
}
|
||||
resp = self.app.delete('/secret-stores/{0}/preferred'.
|
||||
format(stores[0].id), expect_errors=True)
|
||||
self.assertEqual(404, resp.status_int)
|
||||
|
||||
def test_preferred_secret_store_call_with_unsupported_http_method(self):
|
||||
self._init_multiple_backends()
|
||||
|
||||
secret_stores = self.secret_store_repo.get_all()
|
||||
store_id = secret_stores[0].id
|
||||
|
||||
proj_external_id = uuid.uuid4().hex
|
||||
|
||||
self.app.extra_environ = {
|
||||
'barbican.context': self._build_context(proj_external_id)
|
||||
}
|
||||
resp = self.app.put('/secret-stores/{0}/preferred'.
|
||||
format(store_id), expect_errors=True)
|
||||
|
||||
self.assertEqual(405, resp.status_int)
|
@ -27,6 +27,7 @@ from barbican.api.controllers import consumers
|
||||
from barbican.api.controllers import containers
|
||||
from barbican.api.controllers import orders
|
||||
from barbican.api.controllers import secrets
|
||||
from barbican.api.controllers import secretstores
|
||||
from barbican.api.controllers import versions
|
||||
from barbican.common import config
|
||||
from barbican import context
|
||||
@ -101,6 +102,18 @@ class ConsumerResource(TestableResource):
|
||||
controller_cls = consumers.ContainerConsumerController
|
||||
|
||||
|
||||
class SecretStoresResource(TestableResource):
|
||||
controller_cls = secretstores.SecretStoresController
|
||||
|
||||
|
||||
class SecretStoreResource(TestableResource):
|
||||
controller_cls = secretstores.SecretStoreController
|
||||
|
||||
|
||||
class PreferredSecretStoreResource(TestableResource):
|
||||
controller_cls = secretstores.PreferredSecretStoreController
|
||||
|
||||
|
||||
class BaseTestCase(utils.BaseTestCase, utils.MockModelRepositoryMixin):
|
||||
|
||||
def setUp(self):
|
||||
@ -1115,3 +1128,151 @@ class WhenTestingConsumerResource(BaseTestCase):
|
||||
|
||||
def _invoke_on_get(self):
|
||||
self.resource.on_get(self.req, self.resp)
|
||||
|
||||
|
||||
class WhenTestingSecretStoresResource(BaseTestCase):
|
||||
"""RBAC tests for the barbican.api.resources.SecretStoresResource class."""
|
||||
def setUp(self):
|
||||
super(WhenTestingSecretStoresResource, self).setUp()
|
||||
|
||||
self.external_project_id = '12345project'
|
||||
|
||||
self.moc_enable_patcher = mock.patch(
|
||||
'barbican.common.utils.is_multiple_backends_enabled')
|
||||
enable_check_method = self.moc_enable_patcher.start()
|
||||
enable_check_method.return_value = True
|
||||
self.addCleanup(self.moc_enable_patcher.stop)
|
||||
|
||||
# Force an error on GET calls that pass RBAC, as we are not testing
|
||||
# such flows in this test module.
|
||||
self.project_repo = mock.MagicMock()
|
||||
fail_method = mock.MagicMock(return_value=None,
|
||||
side_effect=self._generate_get_error())
|
||||
self.project_repo.find_by_external_project_id = fail_method
|
||||
self.setup_project_repository_mock(self.project_repo)
|
||||
|
||||
self.resource = SecretStoresResource()
|
||||
|
||||
def test_rules_should_be_loaded(self):
|
||||
self.assertIsNotNone(self.policy_enforcer.rules)
|
||||
|
||||
def test_should_pass_get_all_secret_stores(self):
|
||||
self._assert_pass_rbac(['admin'],
|
||||
self._invoke_on_get)
|
||||
|
||||
def test_should_raise_get_all_secret_stores(self):
|
||||
self._assert_fail_rbac([None, 'creator', 'observer', 'audit'],
|
||||
self._invoke_on_get)
|
||||
|
||||
def test_should_pass_get_global_default(self):
|
||||
self._assert_pass_rbac(['admin'],
|
||||
self._invoke_get_global_default)
|
||||
|
||||
def test_should_raise_get_global_default(self):
|
||||
self._assert_fail_rbac([None, 'creator', 'observer', 'audit'],
|
||||
self._invoke_get_global_default)
|
||||
|
||||
def test_should_pass_get_preferred(self):
|
||||
self._assert_pass_rbac(['admin'],
|
||||
self._invoke_get_preferred)
|
||||
|
||||
def test_should_raise_get_preferred(self):
|
||||
self._assert_fail_rbac([None, 'creator', 'observer', 'audit'],
|
||||
self._invoke_get_preferred)
|
||||
|
||||
def _invoke_on_get(self):
|
||||
self.resource.on_get(self.req, self.resp)
|
||||
|
||||
def _invoke_get_global_default(self):
|
||||
with mock.patch('pecan.request', self.req):
|
||||
with mock.patch('pecan.response', self.resp):
|
||||
return self.resource.controller.get_global_default()
|
||||
|
||||
def _invoke_get_preferred(self):
|
||||
with mock.patch('pecan.request', self.req):
|
||||
with mock.patch('pecan.response', self.resp):
|
||||
return self.resource.controller.get_preferred()
|
||||
|
||||
|
||||
class WhenTestingSecretStoreResource(BaseTestCase):
|
||||
"""RBAC tests for the barbican.api.resources.SecretStoreResource class."""
|
||||
def setUp(self):
|
||||
super(WhenTestingSecretStoreResource, self).setUp()
|
||||
|
||||
self.external_project_id = '12345project'
|
||||
self.store_id = '123456SecretStoreId'
|
||||
|
||||
self.moc_enable_patcher = mock.patch(
|
||||
'barbican.common.utils.is_multiple_backends_enabled')
|
||||
enable_check_method = self.moc_enable_patcher.start()
|
||||
enable_check_method.return_value = True
|
||||
self.addCleanup(self.moc_enable_patcher.stop)
|
||||
|
||||
# Force an error on GET calls that pass RBAC, as we are not testing
|
||||
# such flows in this test module.
|
||||
|
||||
self.project_repo = mock.MagicMock()
|
||||
fail_method = mock.MagicMock(return_value=None,
|
||||
side_effect=self._generate_get_error())
|
||||
|
||||
self.project_repo.find_by_external_project_id = fail_method
|
||||
self.setup_project_repository_mock(self.project_repo)
|
||||
|
||||
secret_store_res = mock.MagicMock()
|
||||
secret_store_res.to_dict_fields = mock.MagicMock(side_effect=IOError)
|
||||
secret_store_res.id = self.store_id
|
||||
|
||||
self.resource = SecretStoreResource(secret_store_res)
|
||||
|
||||
def test_rules_should_be_loaded(self):
|
||||
self.assertIsNotNone(self.policy_enforcer.rules)
|
||||
|
||||
def test_should_pass_get_a_secret_store(self):
|
||||
self._assert_pass_rbac(['admin'],
|
||||
self._invoke_on_get)
|
||||
|
||||
def test_should_raise_get_a_secret_store(self):
|
||||
self._assert_fail_rbac([None, 'creator', 'observer', 'audit'],
|
||||
self._invoke_on_get)
|
||||
|
||||
def _invoke_on_get(self):
|
||||
self.resource.on_get(self.req, self.resp)
|
||||
|
||||
|
||||
class WhenTestingPreferredSecretStoreResource(BaseTestCase):
|
||||
"""RBAC tests for barbican.api.resources.PreferredSecretStoreResource"""
|
||||
def setUp(self):
|
||||
super(WhenTestingPreferredSecretStoreResource, self).setUp()
|
||||
|
||||
self.external_project_id = '12345project'
|
||||
self.store_id = '123456SecretStoreId'
|
||||
|
||||
self.moc_enable_patcher = mock.patch(
|
||||
'barbican.common.utils.is_multiple_backends_enabled')
|
||||
enable_check_method = self.moc_enable_patcher.start()
|
||||
enable_check_method.return_value = True
|
||||
self.addCleanup(self.moc_enable_patcher.stop)
|
||||
|
||||
# Force an error on POST/DELETE calls that pass RBAC, as we are not
|
||||
# testing such flows in this test module.
|
||||
self.project_repo = mock.MagicMock()
|
||||
fail_method = mock.MagicMock(return_value=None,
|
||||
side_effect=self._generate_get_error())
|
||||
self.project_repo.find_by_external_project_id = fail_method
|
||||
self.setup_project_repository_mock(self.project_repo)
|
||||
|
||||
self.resource = PreferredSecretStoreResource(mock.MagicMock())
|
||||
|
||||
def test_rules_should_be_loaded(self):
|
||||
self.assertIsNotNone(self.policy_enforcer.rules)
|
||||
|
||||
def test_should_pass_set_preferred_secret_store(self):
|
||||
self._assert_pass_rbac(['admin'],
|
||||
self._invoke_on_post)
|
||||
|
||||
def test_should_raise_set_preferred_secret_store(self):
|
||||
self._assert_fail_rbac([None, 'creator', 'observer', 'audit'],
|
||||
self._invoke_on_post)
|
||||
|
||||
def _invoke_on_post(self):
|
||||
self.resource.on_post(self.req, self.resp)
|
||||
|
@ -302,6 +302,32 @@ class MockModelRepositoryMixin(object):
|
||||
mock_repo_obj=mock_preferred_ca_repo,
|
||||
patcher_obj=self.mock_preferred_ca_repo_patcher)
|
||||
|
||||
def setup_secret_stores_repository_mock(
|
||||
self, mock_secret_stores_repo=mock.MagicMock()):
|
||||
"""Mocks the project repository factory function
|
||||
|
||||
:param mock_secret_stores_repo: The pre-configured mock secret stores
|
||||
repo to be returned.
|
||||
"""
|
||||
self.mock_secret_stores_repo_patcher = None
|
||||
self._setup_repository_mock(
|
||||
repo_factory='get_secret_stores_repository',
|
||||
mock_repo_obj=mock_secret_stores_repo,
|
||||
patcher_obj=self.mock_secret_stores_repo_patcher)
|
||||
|
||||
def setup_project_secret_store_repository_mock(
|
||||
self, mock_project_secret_store_repo=mock.MagicMock()):
|
||||
"""Mocks the project repository factory function
|
||||
|
||||
:param mock_project_secret_store_repo: The pre-configured mock project
|
||||
secret store repo to be returned.
|
||||
"""
|
||||
self.mock_proj_secret_store_repo_patcher = None
|
||||
self._setup_repository_mock(
|
||||
repo_factory='get_project_secret_store_repository',
|
||||
mock_repo_obj=mock_project_secret_store_repo,
|
||||
patcher_obj=self.mock_proj_secret_store_repo_patcher)
|
||||
|
||||
def setup_project_ca_repository_mock(
|
||||
self, mock_project_ca_repo=mock.MagicMock()):
|
||||
"""Mocks the project repository factory function
|
||||
|
@ -43,7 +43,7 @@ Request/Response:
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"secret-stores":[
|
||||
"secret_stores":[
|
||||
{
|
||||
"status": "ACTIVE",
|
||||
"updated": "2016-08-22T23:46:45.114283",
|
||||
@ -85,7 +85,7 @@ Response Attributes
|
||||
+---------------+--------+---------------------------------------------+
|
||||
| Name | Type | Description |
|
||||
+===============+========+=============================================+
|
||||
| secret-stores | list | A list of secret store references |
|
||||
| secret_stores | list | A list of secret store references |
|
||||
+---------------+--------+---------------------------------------------+
|
||||
| name | string | store and crypto plugin name delimited by + |
|
||||
| | | (plus) sign. |
|
||||
|
@ -80,5 +80,11 @@
|
||||
"secret_meta:get": "rule:all_but_audit",
|
||||
"secret_meta:post": "rule:admin_or_creator",
|
||||
"secret_meta:put": "rule:admin_or_creator",
|
||||
"secret_meta:delete": "rule:admin_or_creator"
|
||||
"secret_meta:delete": "rule:admin_or_creator",
|
||||
"secretstores:get": "rule:admin",
|
||||
"secretstores:get_global_default": "rule:admin",
|
||||
"secretstores:get_preferred": "rule:admin",
|
||||
"secretstore_preferred:post": "rule:admin",
|
||||
"secretstore_preferred:delete": "rule:admin",
|
||||
"secretstore:get": "rule:admin"
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user