1393 lines
59 KiB
Python
1393 lines
59 KiB
Python
# Copyright (c) 2013-2014 Rackspace, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""
|
|
This test module focuses on RBAC interactions with the API resource classes.
|
|
For typical-flow business logic tests of these classes, see the
|
|
'resources_test.py' module.
|
|
"""
|
|
import os
|
|
from unittest import mock
|
|
|
|
from webob import exc
|
|
|
|
from barbican.api.controllers import consumers
|
|
from barbican.api.controllers import containers
|
|
from barbican.api.controllers import orders
|
|
from barbican.api.controllers import secrets
|
|
from barbican.api.controllers import secretstores
|
|
from barbican.api.controllers import versions
|
|
from barbican.common import accept as common_accept
|
|
from barbican.common import config
|
|
from barbican.common import policy
|
|
from barbican import context
|
|
from barbican.model import models
|
|
from barbican.tests import utils
|
|
|
|
|
|
# Directory holding the policy/config files kept in source control,
# resolved relative to this test module ('../../../etc/barbican').
TEST_VAR_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
                                            '../../../etc', 'barbican'))

# Configuration object used by these tests; BaseTestCase.setUp() invokes it
# with '--config-dir' pointing at TEST_VAR_DIR.
CONF = config.new_config()

# NOTE(review): policy.init() presumably populates policy.ENFORCER, which
# would be why it runs before the alias below is taken -- confirm against
# barbican.common.policy.
policy.init()
ENFORCER = policy.ENFORCER
|
|
|
|
|
|
class TestableResource(object):
    """Adapter that drives a controller with stand-in request/response.

    Subclasses provide ``controller_cls``; constructor arguments are
    forwarded to it unchanged. Each ``on_*`` method patches
    ``pecan.request`` and ``pecan.response`` with the supplied objects for
    the duration of the matching controller call and returns that call's
    result.
    """

    def __init__(self, *args, **kwargs):
        controller_factory = self.controller_cls
        self.controller = controller_factory(*args, **kwargs)

    def on_get(self, req, resp, *args, **kwargs):
        """Call the controller's on_get with pecan request/response patched."""
        with mock.patch('pecan.request', req), \
                mock.patch('pecan.response', resp):
            outcome = self.controller.on_get(*args, **kwargs)
        return outcome

    def on_post(self, req, resp, *args, **kwargs):
        """Call the controller's on_post with pecan request/response patched."""
        with mock.patch('pecan.request', req), \
                mock.patch('pecan.response', resp):
            outcome = self.controller.on_post(*args, **kwargs)
        return outcome

    def on_put(self, req, resp, *args, **kwargs):
        """Call the controller's on_put with pecan request/response patched."""
        with mock.patch('pecan.request', req), \
                mock.patch('pecan.response', resp):
            outcome = self.controller.on_put(*args, **kwargs)
        return outcome

    def on_delete(self, req, resp, *args, **kwargs):
        """Call the controller's on_delete with pecan request/response patched."""
        with mock.patch('pecan.request', req), \
                mock.patch('pecan.response', resp):
            outcome = self.controller.on_delete(*args, **kwargs)
        return outcome
|
|
|
|
|
|
class VersionsResource(TestableResource):
    """TestableResource bound to versions.VersionsController."""

    controller_cls = versions.VersionsController
|
|
|
|
|
|
class SecretsResource(TestableResource):
    """TestableResource bound to secrets.SecretsController."""

    controller_cls = secrets.SecretsController
|
|
|
|
|
|
class SecretResource(TestableResource):
    """TestableResource bound to secrets.SecretController."""

    controller_cls = secrets.SecretController
|
|
|
|
|
|
class OrdersResource(TestableResource):
    """TestableResource bound to orders.OrdersController."""

    controller_cls = orders.OrdersController
|
|
|
|
|
|
class OrderResource(TestableResource):
    """TestableResource bound to orders.OrderController."""

    controller_cls = orders.OrderController
|
|
|
|
|
|
class ContainerResource(TestableResource):
    """TestableResource bound to containers.ContainerController."""

    controller_cls = containers.ContainerController
|
|
|
|
|
|
class ConsumersResource(TestableResource):
    """TestableResource bound to consumers.ContainerConsumersController."""

    controller_cls = consumers.ContainerConsumersController
|
|
|
|
|
|
class ConsumerResource(TestableResource):
    """TestableResource bound to consumers.ContainerConsumerController."""

    controller_cls = consumers.ContainerConsumerController
|
|
|
|
|
|
class SecretStoresResource(TestableResource):
    """TestableResource bound to secretstores.SecretStoresController."""

    controller_cls = secretstores.SecretStoresController
|
|
|
|
|
|
class SecretStoreResource(TestableResource):
    """TestableResource bound to secretstores.SecretStoreController."""

    controller_cls = secretstores.SecretStoreController
|
|
|
|
|
|
class PreferredSecretStoreResource(TestableResource):
    """TestableResource bound to PreferredSecretStoreController."""

    controller_cls = secretstores.PreferredSecretStoreController
|
|
|
|
|
|
class SecretConsumersResource(TestableResource):
    """TestableResource bound to consumers.SecretConsumersController."""

    controller_cls = consumers.SecretConsumersController
|
|
|
|
|
|
class SecretConsumerResource(TestableResource):
    """TestableResource bound to consumers.SecretConsumerController."""

    controller_cls = consumers.SecretConsumerController
|
|
|
|
|
|
class BaseTestCase(utils.BaseTestCase, utils.MockModelRepositoryMixin):
    """Shared fixtures and RBAC pass/fail assertions for controller tests.

    setUp() points CONF at TEST_VAR_DIR, reloads the rules on the
    module-level ENFORCER and prepares a mocked response. Subclasses call
    ``_assert_pass_rbac`` / ``_assert_fail_rbac`` with a zero-argument
    callable that invokes the controller using ``self.req`` and
    ``self.resp``.
    """

    def setUp(self):
        super().setUp()
        CONF(args=['--config-dir', TEST_VAR_DIR])

        self.policy_enforcer = enforcer = ENFORCER
        enforcer.load_rules(True)

        self.resp = mock.MagicMock()

    def _generate_req(self, roles=None, accept=None, content_type=None,
                      user_id=None, project_id=None):
        """Build a mocked HTTP request that carries a security context.

        :param roles: roles placed on the context; None becomes [].
        :param accept: value handed to create_accept_header().
        :param content_type: value assigned to the request's content_type.
        :param user_id: user id placed on the context.
        :param project_id: project id placed on the context.
        :return: MagicMock request with 'barbican.context' in its environ.
        """
        request = mock.MagicMock()
        request.get_param.return_value = None

        security_context = context.RequestContext(
            user_id=user_id,
            project_id=project_id,
            roles=roles or [],
            policy_enforcer=self.policy_enforcer,
        )
        request.environ = {'barbican.context': security_context}
        request.content_type = content_type
        request.accept = common_accept.create_accept_header(accept)

        return request

    def _generate_stream_for_exit(self):
        """Return a mocked stream whose read() raises IOError.

        Used as the request's body_file so that controller processing is
        cut short once the RBAC checks under test have passed.
        """
        failing_stream = mock.MagicMock()
        failing_stream.read = mock.MagicMock(return_value=None,
                                             side_effect=IOError())
        return failing_stream

    def _assert_post_rbac_exception(self, exception, role):
        """Check that the post-RBAC short-circuit surfaced as a 500.

        :param exception: the exception captured by assertRaises().
        :param role: role under test; accepted but not inspected here.
        """
        self.assertEqual(500, exception.status_int)

    def _generate_get_error(self):
        """Create the exception that early-exit repository mocks raise.

        GET tests that pass RBAC use this as a mock side effect so the
        remaining, untested processing is skipped.

        :return: webob HTTPServerError with detail 'Read Error'.
        """
        # _assert_post_rbac_exception() only inspects the status code of
        # the error that reaches the test, not this detail text.
        return exc.HTTPServerError(detail='Read Error')

    def _assert_pass_rbac(self, roles, method_under_test, accept=None,
                          content_type=None, user_id=None, project_id=None):
        """Assert that RBAC authorization passes for each given role.

        :param roles: list of roles to check, one at a time; a falsy
            entry produces a request with no roles.
        :param method_under_test: zero-argument callable run per role.
        :param accept: optional Accept header to set on the request.
        :param content_type: optional content type to set on the request.
        :param user_id: optional user id for the security context.
        :param project_id: optional project id for the security context.
        :return: None
        """
        for role in roles:
            role_list = [role] if role else []
            self.req = self._generate_req(roles=role_list,
                                          accept=accept,
                                          content_type=content_type,
                                          user_id=user_id,
                                          project_id=project_id)

            # Reading the body (property or stream) raises IOError, which
            # forces an early exit once RBAC has been passed.
            type(self.req).body = mock.PropertyMock(side_effect=IOError)
            self.req.body_file = self._generate_stream_for_exit()

            raised = self.assertRaises(exc.HTTPServerError,
                                       method_under_test)
            self._assert_post_rbac_exception(raised, role)

    def _assert_fail_rbac(self, roles, method_under_test, accept=None,
                          content_type=None, user_id=None, project_id=None):
        """Assert that RBAC authorization is refused for each given role.

        :param roles: list of roles to check, one at a time; a falsy
            entry produces a request with no roles.
        :param method_under_test: zero-argument callable run per role.
        :param accept: optional Accept header to set on the request.
        :param content_type: optional content type to set on the request.
        :param user_id: optional user id for the security context.
        :param project_id: optional project id for the security context.
        :return: None
        """
        for role in roles:
            role_list = [role] if role else []
            self.req = self._generate_req(roles=role_list,
                                          accept=accept,
                                          content_type=content_type,
                                          user_id=user_id,
                                          project_id=project_id)

            raised = self.assertRaises(exc.HTTPForbidden, method_under_test)
            self.assertEqual(403, raised.status_int)
|
|
|
|
|
|
class WhenTestingVersionsResource(BaseTestCase):
    """RBAC tests for the barbican.api.resources.VersionsResource class."""

    def setUp(self):
        super().setUp()
        self.resource = VersionsResource()

    def test_rules_should_be_loaded(self):
        loaded_rules = self.policy_enforcer.rules
        self.assertIsNotNone(loaded_rules)

    def test_should_pass_get_versions(self):
        # The version GET is trivial, so the base helper that short
        # circuits post-RBAC processing cannot be used; the call simply
        # has to complete for every role.
        for role in ('admin', 'observer', 'creator', 'audit'):
            self.req = self._generate_req(roles=[role])
            self._invoke_on_get()

    def test_should_pass_get_versions_with_bad_roles(self):
        unknown_roles = [None, 'bunkrolehere']
        self.req = self._generate_req(roles=unknown_roles)
        self._invoke_on_get()

    def test_should_pass_get_versions_with_no_roles(self):
        self.req = self._generate_req()
        self._invoke_on_get()

    def test_should_pass_get_versions_multiple_roles(self):
        every_role = ['admin', 'observer', 'creator', 'audit']
        self.req = self._generate_req(roles=every_role)
        self._invoke_on_get()

    def _invoke_on_get(self):
        """GET the versions resource with the current request/response."""
        self.resource.on_get(self.req, self.resp)
|
|
|
|
|
|
class WhenTestingSecretsResource(BaseTestCase):
    """RBAC tests for the barbican.api.resources.SecretsResource class."""

    def setUp(self):
        super().setUp()

        self.external_project_id = '12345'

        # GET calls that pass RBAC must fail fast: the listing flow itself
        # is not under test in this module.
        failing_lookup = mock.MagicMock(
            return_value=None, side_effect=self._generate_get_error())
        self.secret_repo = mock.MagicMock()
        self.secret_repo.get_by_create_date = failing_lookup
        self.setup_secret_repository_mock(self.secret_repo)

        # Remaining repositories are mocked with their defaults.
        for install_default_mock in (
                self.setup_encrypted_datum_repository_mock,
                self.setup_kek_datum_repository_mock,
                self.setup_project_repository_mock,
                self.setup_secret_meta_repository_mock,
                self.setup_transport_key_repository_mock):
            install_default_mock()

        self.resource = SecretsResource()

    def test_rules_should_be_loaded(self):
        loaded_rules = self.policy_enforcer.rules
        self.assertIsNotNone(loaded_rules)

    def test_should_pass_create_secret(self):
        allowed_roles = ['admin', 'creator']
        self._assert_pass_rbac(allowed_roles, self._invoke_on_post,
                               content_type='application/json')

    def test_should_raise_create_secret(self):
        denied_roles = [None, 'audit', 'observer', 'bogus']
        self._assert_fail_rbac(denied_roles, self._invoke_on_post,
                               content_type='application/json')

    def test_should_pass_get_secrets(self):
        allowed_roles = ['admin', 'observer', 'creator']
        self._assert_pass_rbac(allowed_roles, self._invoke_on_get,
                               content_type='application/json')

    def test_should_raise_get_secrets(self):
        denied_roles = [None, 'audit', 'bogus']
        self._assert_fail_rbac(denied_roles, self._invoke_on_get,
                               content_type='application/json')

    def _invoke_on_post(self):
        """POST to the secrets resource with the current request."""
        self.resource.on_post(self.req, self.resp)

    def _invoke_on_get(self):
        """GET the secrets resource with the current request."""
        self.resource.on_get(self.req, self.resp)
|
|
|
|
|
|
class WhenTestingSecretResource(BaseTestCase):
    """RBAC tests for SecretController class.

    setUp() builds a mocked secret owned by ``external_project_id`` and
    created by ``creator_user_id``, carrying one read ACL with project
    access enabled. Individual tests drop or replace that ACL and vary the
    token's role, user and project to exercise the policy rules.
    """

    def setUp(self):
        super().setUp()

        self.external_project_id = '12345project'
        self.secret_id = '12345secret'
        self.user_id = '123456user'
        self.creator_user_id = '123456CreatorUser'

        # Force an error on GET and DELETE calls that pass RBAC,
        # as we are not testing such flows in this test module.
        self.secret_repo = mock.MagicMock()
        fail_method = mock.MagicMock(return_value=None,
                                     side_effect=self._generate_get_error())
        self.secret_repo.get = fail_method
        self.secret_repo.delete_entity_by_id = fail_method
        self.setup_secret_repository_mock(self.secret_repo)

        self.setup_encrypted_datum_repository_mock()
        self.setup_kek_datum_repository_mock()
        self.setup_project_repository_mock()
        self.setup_secret_meta_repository_mock()
        self.setup_transport_key_repository_mock()

        acl_read = models.SecretACL(secret_id=self.secret_id, operation='read',
                                    project_access=True,
                                    user_ids=[self.user_id, 'anyRandomId'])
        # Tests mutate this list in place; the mocked secret iterates over
        # this same list object.
        self.acl_list = [acl_read]
        secret = mock.MagicMock()
        secret.secret_acls.__iter__.return_value = self.acl_list
        secret.project.external_id = self.external_project_id
        secret.creator_id = self.creator_user_id

        self.resource = SecretResource(secret)

    def _replace_acl(self, operation, project_access, user_ids=None):
        """Swap the default read ACL from setUp() for a single new ACL.

        The list is mutated in place because the mocked secret's
        ``secret_acls`` iterator was bound to this same list in setUp().

        :param operation: ACL operation name, e.g. 'read' or 'write'.
        :param project_access: value for the ACL's project_access flag.
        :param user_ids: users named on the ACL; when None this defaults
            to ['anyRandomUserX', 'aclUser1']. An empty list is passed
            through unchanged.
        """
        if user_ids is None:
            user_ids = ['anyRandomUserX', 'aclUser1']
        self.acl_list.pop()  # remove read acl from default setup
        self.acl_list.append(
            models.SecretACL(secret_id=self.secret_id, operation=operation,
                             project_access=project_access,
                             user_ids=user_ids))

    def test_rules_should_be_loaded(self):
        self.assertIsNotNone(self.policy_enforcer.rules)

    def test_should_pass_decrypt_secret(self):
        self._assert_pass_rbac(['admin', 'observer', 'creator'],
                               self._invoke_on_get,
                               accept='notjsonaccepttype',
                               content_type='application/json',
                               user_id=self.user_id,
                               project_id=self.external_project_id)

    def test_should_raise_decrypt_secret(self):
        self._assert_fail_rbac([None, 'audit', 'bogus'],
                               self._invoke_on_get,
                               accept='notjsonaccepttype')

    def test_should_pass_decrypt_secret_for_same_project_with_no_acl(self):
        """Token and secret project needs to be same in no ACL defined case."""
        self.acl_list.pop()  # remove read acl from default setup
        self._assert_pass_rbac(['admin', 'observer', 'creator'],
                               self._invoke_on_get,
                               accept='notjsonaccepttype',
                               content_type='application/json',
                               user_id=self.user_id,
                               project_id=self.external_project_id)

    def test_should_raise_decrypt_secret_with_project_access_disabled(self):
        """Should raise authz error as secret is marked private.

        As the secret is private, project users should not be able to
        access it. The admin project user can still access it.
        """
        self._replace_acl(operation='read', project_access=False)
        self._assert_fail_rbac(['observer', 'creator', 'audit'],
                               self._invoke_on_get,
                               accept='notjsonaccepttype',
                               content_type='application/json',
                               user_id=self.user_id,
                               project_id=self.external_project_id)

    def test_pass_decrypt_secret_for_admin_user_project_access_disabled(self):
        """Should pass authz for admin role user as secret is marked private.

        Even when the secret is private, the admin user should still have
        access to it.
        """
        self._replace_acl(operation='read', project_access=False)
        self._assert_pass_rbac(['admin'],
                               self._invoke_on_get,
                               accept='notjsonaccepttype',
                               content_type='application/json',
                               user_id=self.user_id,
                               project_id=self.external_project_id)

    def test_should_raise_decrypt_secret_for_with_project_access_nolist(self):
        """Should raise authz error as secret is marked private.

        As the secret is private, project users should not be able to
        access it. This test passes user_ids as an empty list, which is a
        valid and common case. The admin project user can still access it.
        """
        self._replace_acl(operation='read', project_access=False,
                          user_ids=[])
        self._assert_fail_rbac(['observer', 'creator', 'audit'],
                               self._invoke_on_get,
                               accept='notjsonaccepttype',
                               content_type='application/json',
                               user_id=self.user_id,
                               project_id=self.external_project_id)

    def test_should_pass_decrypt_secret_private_enabled_with_read_acl(self):
        """Should pass authz as user has read acl for private secret.

        Even though the secret is private, a user with a read acl should
        be able to access it.
        """
        self._replace_acl(operation='read', project_access=False)
        self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit',
                                'bogusRole'],
                               self._invoke_on_get,
                               accept='notjsonaccepttype',
                               content_type='application/json',
                               user_id='aclUser1',
                               project_id=self.external_project_id)

    def test_should_pass_decrypt_secret_different_user_valid_read_acl(self):
        self._replace_acl(operation='read', project_access=True)
        # token project_id is different from secret's project id but the
        # token user is named on the secret's read acl, so this should pass
        self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit',
                                'bogusRole'],
                               self._invoke_on_get,
                               accept='notjsonaccepttype',
                               content_type='application/json',
                               user_id='aclUser1',
                               project_id='different_project_id')

    def test_should_raise_decrypt_secret_for_different_user_no_read_acl(self):
        self._replace_acl(operation='write', project_access=True)
        # token project_id is different from secret's project id and the
        # token user only appears on a write acl (no read acl), so this
        # should fail
        self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'],
                               self._invoke_on_get,
                               accept='notjsonaccepttype',
                               content_type='application/json',
                               user_id='aclUser1',
                               project_id='different_project_id')

    def test_fail_decrypt_secret_for_creator_user_with_different_project(self):
        """Check for creator user rule for secret decrypt/get call.

        The token's user is the creator of the secret but the token is
        scoped to a different project, and the only ACL is a write ACL
        that does not name that user, so access must be denied.
        """
        self._replace_acl(operation='write', project_access=True)
        self.resource.controller.secret.creator_id = 'creatorUserX'
        # token user is creator but scoped to project different from secret
        # project so don't allow decrypt secret call to creator of that secret
        self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit',
                                'bogusRole'],
                               self._invoke_on_get,
                               accept='notjsonaccepttype',
                               content_type='application/json',
                               user_id='creatorUserX',
                               project_id='different_project_id')

    def test_should_pass_get_secret(self):
        self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
                               self._invoke_on_get,
                               user_id=self.user_id,
                               project_id=self.external_project_id)

    def test_should_pass_get_secret_with_no_context(self):
        """In unauthenticated flow, get secret should work."""
        self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
                               self._invoke_on_get_without_context,
                               user_id=self.user_id,
                               project_id=self.external_project_id)

    def test_should_raise_get_secret_for_different_project_no_acl(self):
        """Should raise error when secret and token's project is different."""
        self.acl_list.pop()  # remove read acl from default setup
        # token project_id is different from secret's project id so should fail
        self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'],
                               self._invoke_on_get,
                               user_id=self.user_id,
                               project_id='different_id')

    def test_should_pass_get_secret_for_same_project_but_different_user(self):
        # user id should not matter as long as token and secret's project
        # match
        self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
                               self._invoke_on_get,
                               user_id='different_user_id',
                               project_id=self.external_project_id)

    def test_should_pass_get_secret_for_same_project_with_no_acl(self):
        self.acl_list.pop()  # remove read acl from default setup
        self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
                               self._invoke_on_get,
                               user_id=self.user_id,
                               project_id=self.external_project_id)

    def test_should_raise_get_secret_for_with_project_access_disabled(self):
        """Should raise authz error as secret is marked private.

        As the secret is private, project users should not be able to
        access it.
        """
        self._replace_acl(operation='read', project_access=False)
        self._assert_fail_rbac(['observer', 'creator', 'audit'],
                               self._invoke_on_get,
                               user_id=self.user_id,
                               project_id=self.external_project_id)

    def test_pass_get_secret_for_admin_user_with_project_access_disabled(self):
        """Should pass authz for admin user as secret is marked private.

        Even when the secret is private, the admin user should have
        access to it.
        """
        self._replace_acl(operation='read', project_access=False)
        self._assert_pass_rbac(['admin'],
                               self._invoke_on_get,
                               user_id=self.user_id,
                               project_id=self.external_project_id)

    def test_should_pass_get_secret_for_private_enabled_with_read_acl(self):
        """Should pass authz as user has read acl for private secret.

        Even though the secret is private, a user with a read acl should
        be able to access it.
        """
        self._replace_acl(operation='read', project_access=False)
        self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit',
                                'bogusRole'],
                               self._invoke_on_get,
                               user_id='aclUser1',
                               project_id=self.external_project_id)

    def test_should_pass_get_secret_different_user_with_valid_read_acl(self):
        """Should allow when read ACL is defined for a user.

        Secret's own project and token's project are different but read
        is allowed because of a valid read ACL.
        """
        self._replace_acl(operation='read', project_access=True)
        # token project_id is different from secret's project id but the
        # token user is named on the secret's read acl, so this should pass
        self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit',
                                'bogusRole'],
                               self._invoke_on_get,
                               user_id='aclUser1',
                               project_id='different_project_id')

    def test_should_raise_get_secret_for_different_user_with_no_read_acl(self):
        """Get secret fails when no read acl is defined.

        With different secret and token's project, read is not allowed
        without a read ACL.
        """
        self._replace_acl(operation='write', project_access=True)
        # token project_id is different from secret's project id and the
        # token user only appears on a write acl (no read acl), so this
        # should fail
        self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'],
                               self._invoke_on_get,
                               user_id='aclUser1',
                               project_id='different_project_id')

    def test_fail_get_secret_for_creator_user_with_different_project(self):
        """Check for creator user rule for secret get call.

        The token's user is the creator of the secret but the token is
        scoped to a different project and no ACL is defined, so access
        must be denied.
        """
        self.acl_list.pop()  # remove read acl from default setup
        self.resource.controller.secret.creator_id = 'creatorUserX'

        self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit',
                                'bogusRole'],
                               self._invoke_on_get,
                               user_id='creatorUserX',
                               project_id='different_project_id')

    def test_should_raise_get_secret(self):
        self._assert_fail_rbac([None, 'bogus'],
                               self._invoke_on_get)

    def test_should_pass_put_secret(self):
        self._assert_pass_rbac(['admin', 'creator'], self._invoke_on_put,
                               content_type="application/octet-stream",
                               user_id=self.user_id,
                               project_id=self.external_project_id)

    def test_should_raise_put_secret(self):
        self._assert_fail_rbac([None, 'audit', 'observer', 'bogus'],
                               self._invoke_on_put,
                               content_type="application/octet-stream")

    def test_should_pass_delete_secret(self):
        self._assert_pass_rbac(['admin'], self._invoke_on_delete,
                               user_id=self.user_id,
                               project_id=self.external_project_id)

    def test_should_raise_delete_secret(self):
        """A non-admin user cannot delete other user's secret.

        User id is different from initial user who has created the secret.
        """
        self._assert_fail_rbac([None, 'audit', 'observer', 'creator', 'bogus'],
                               self._invoke_on_delete,
                               user_id=self.user_id,
                               project_id=self.external_project_id)

    def test_should_pass_delete_secret_for_owner(self):
        """Non-admin user can delete his/her own secret

        Secret creator_id should match with token user to establish ownership.
        """
        self._assert_pass_rbac(['creator'], self._invoke_on_delete,
                               user_id=self.creator_user_id,
                               project_id=self.external_project_id)

    def _invoke_on_get(self):
        """GET the secret resource with the current request/response."""
        self.resource.on_get(self.req, self.resp)

    def _invoke_on_get_without_context(self):
        """GET the secret after stripping the security context."""
        # Adding this to get code coverage around context check lines
        self.req.environ.pop('barbican.context')
        self.resource.on_get(self.req, self.resp,
                             self.external_project_id)

    def _invoke_on_put(self):
        """PUT to the secret resource with the current request/response."""
        self.resource.on_put(self.req, self.resp)

    def _invoke_on_delete(self):
        """DELETE the secret resource with the current request/response."""
        self.resource.on_delete(self.req, self.resp)
|
|
|
|
|
|
class WhenTestingContainerResource(BaseTestCase):
|
|
"""RBAC tests for ContainerController class.
|
|
|
|
Container controller tests are quite similar to SecretController as
|
|
policy logic is same. Just adding them here to make sure logic related to
|
|
acl gathering data works as expected.
|
|
"""
|
|
|
|
def setUp(self):
    super(WhenTestingContainerResource, self).setUp()

    self.external_project_id = '12345project'
    self.container_id = '12345secret'
    self.user_id = '123456user'
    self.creator_user_id = '123456CreatorUser'

    # Default ACL: read access, project access enabled. Tests mutate this
    # list in place; the mocked container iterates over the same object.
    self.acl_list = [
        models.ContainerACL(
            container_id=self.container_id,
            operation='read',
            project_access=True,
            user_ids=[self.user_id, 'anyRandomId']),
    ]

    container = mock.MagicMock()
    # NOTE(review): the IOError from to_dict_fields presumably serves as
    # the early exit for GETs that pass RBAC -- confirm in the controller.
    container.to_dict_fields = mock.MagicMock(side_effect=IOError)
    container.id = self.container_id
    container.container_acls.__iter__.return_value = self.acl_list
    container.project.external_id = self.external_project_id
    container.creator_id = self.creator_user_id

    # Force an error on repository GET and DELETE calls that pass RBAC,
    # as such flows are not tested in this module.
    short_circuit = mock.MagicMock(return_value=None,
                                   side_effect=self._generate_get_error())
    repo = mock.MagicMock()
    repo.get = short_circuit
    repo.delete_entity_by_id = short_circuit
    repo.get_container_by_id.return_value = container
    self.container_repo = repo
    self.setup_container_repository_mock(repo)

    self.resource = ContainerResource(container)
|
|
|
|
def test_rules_should_be_loaded(self):
    # The enforcer prepared in setUp() must expose a rules object.
    loaded_rules = self.policy_enforcer.rules
    self.assertIsNotNone(loaded_rules)
|
|
|
|
def test_should_pass_get_container(self):
    # Token user and project match the values used to build the
    # container's default read ACL in setUp().
    allowed_roles = ['admin', 'observer', 'creator', 'audit']
    self._assert_pass_rbac(allowed_roles, self._invoke_on_get,
                           user_id=self.user_id,
                           project_id=self.external_project_id)
|
|
|
|
def test_should_pass_get_container_with_no_context(self):
    """In unauthenticated flow, get container should work."""
    allowed_roles = ['admin', 'observer', 'creator', 'audit']
    self._assert_pass_rbac(allowed_roles,
                           self._invoke_on_get_without_context,
                           user_id=self.user_id,
                           project_id=self.external_project_id)
|
|
|
|
def test_should_raise_get_container_for_different_project_no_acl(self):
    """Raise error when container and token's project is different."""
    self.acl_list.pop()  # drop the default read ACL installed by setUp()

    # The token's project differs from the container's project and no ACL
    # remains, so every role must be refused.
    denied_roles = ['admin', 'observer', 'creator', 'audit']
    self._assert_fail_rbac(denied_roles, self._invoke_on_get,
                           user_id=self.user_id,
                           project_id='different_id')
|
|
|
|
def test_should_pass_get_container_for_same_project_but_different_user(
        self):
    """Should pass if token and container's project match.

    The user id should not matter as long as the token's project and the
    container's project match.
    """
    allowed_roles = ['admin', 'observer', 'creator', 'audit']
    self._assert_pass_rbac(allowed_roles, self._invoke_on_get,
                           user_id='different_user_id',
                           project_id=self.external_project_id)
|
|
|
|
def test_should_pass_get_container_for_same_project_with_no_acl(self):
    # With the default read ACL removed, a token scoped to the
    # container's own project is still expected to pass.
    self.acl_list.pop()

    allowed_roles = ['admin', 'observer', 'creator', 'audit']
    self._assert_pass_rbac(allowed_roles, self._invoke_on_get,
                           user_id=self.user_id,
                           project_id=self.external_project_id)
|
|
|
|
def test_should_raise_get_container_for_with_project_access_disabled(self):
    """Should raise authz error as container is marked private.

    As the container is private, project users (other than the admin
    user) should not be able to access it.
    """
    self.acl_list.pop()  # drop the default read ACL installed by setUp()
    private_read_acl = models.ContainerACL(
        container_id=self.container_id,
        operation='read',
        project_access=False,
        user_ids=['anyRandomUserX', 'aclUser1'])
    self.acl_list.append(private_read_acl)

    denied_roles = ['observer', 'creator', 'audit']
    self._assert_fail_rbac(denied_roles, self._invoke_on_get,
                           user_id=self.user_id,
                           project_id=self.external_project_id)
|
|
|
|
def test_pass_get_container_for_admin_user_project_access_disabled(self):
    """Should pass authz for admin user when container is marked private.

    For a private container, the admin user should still be able to
    access it.
    """
    self.acl_list.pop()  # drop the default read ACL installed by setUp()
    private_read_acl = models.ContainerACL(
        container_id=self.container_id,
        operation='read',
        project_access=False,
        user_ids=['anyRandomUserX', 'aclUser1'])
    self.acl_list.append(private_read_acl)

    self._assert_pass_rbac(['admin'], self._invoke_on_get,
                           user_id=self.user_id,
                           project_id=self.external_project_id)
|
|
|
|
def test_should_pass_get_container_for_private_enabled_with_read_acl(self):
    """Should pass authz as user has read acl for private container.

    Even though the container is private, a user named on its read acl
    should be able to access it.
    """
    self.acl_list.pop()  # drop the default read ACL installed by setUp()
    private_read_acl = models.ContainerACL(
        container_id=self.container_id,
        operation='read',
        project_access=False,
        user_ids=['anyRandomUserX', 'aclUser1'])
    self.acl_list.append(private_read_acl)

    roles_to_check = ['admin', 'observer', 'creator', 'audit', 'bogusRole']
    self._assert_pass_rbac(roles_to_check, self._invoke_on_get,
                           user_id='aclUser1',
                           project_id=self.external_project_id)
|
|
|
|
def test_should_pass_get_container_different_user_with_valid_read_acl(
        self):
    """A valid read ACL grants GET even from another project.

    The token is scoped to a project other than the container's own,
    but the requesting user is named in the read ACL, so the read is
    expected to be allowed regardless of the token's project.
    """
    self.acl_list.pop()  # discard the read ACL from the default setup
    self.acl_list.append(models.ContainerACL(
        container_id=self.container_id, operation='read',
        project_access=True, user_ids=['anyRandomUserX', 'aclUser1']))

    any_role = ['admin', 'observer', 'creator', 'audit', 'bogusRole']
    self._assert_pass_rbac(any_role, self._invoke_on_get,
                           user_id='aclUser1',
                           project_id='different_project_id')
|
|
|
|
def test_should_raise_get_container_for_different_user_with_no_read_acl(
        self):
    """Get container fails when no read ACL is defined.

    The token's project differs from the container's project and only
    a 'write' ACL is present, so reading is not allowed for any role.
    """
    # Replace the default read ACL with a write-only ACL.
    self.acl_list.pop()
    write_only_acl = models.ContainerACL(
        container_id=self.container_id, operation='write',
        project_access=True, user_ids=['anyRandomUserX', 'aclUser1'])
    self.acl_list.append(write_only_acl)

    # The token project_id differs from the container's project id and
    # the user only holds a write ACL (no read ACL), so the GET is
    # expected to fail.
    denied_roles = ['admin', 'observer', 'creator', 'audit']
    self._assert_fail_rbac(denied_roles, self._invoke_on_get,
                           user_id='aclUser1',
                           project_id='different_project_id')
|
|
|
|
def test_fail_get_container_for_creator_user_different_project(self):
    """Creator scoped to another project is denied a private container.

    The token's user is the creator of the container, but the token is
    scoped to a different project; with project access disabled on the
    read ACL the GET is expected to fail.
    """
    self.acl_list.pop()  # discard the read ACL from the default setup
    private_read_acl = models.ContainerACL(
        container_id=self.container_id, operation='read',
        project_access=False, user_ids=['anyRandomUserX', 'aclUser1'])
    self.acl_list.append(private_read_acl)

    request_identity = {'user_id': self.creator_user_id,
                        'project_id': 'differet_project_id'}
    self._assert_fail_rbac(['creator'], self._invoke_on_get,
                           **request_identity)
|
|
|
|
def test_pass_get_container_for_creator_user_project_access_disabled(self):
    """Creator of a private container can still GET it.

    Project access is disabled on the read ACL, but the request is made
    as the container's creator with the 'creator' role in the
    container's project, so it is expected to pass.
    """
    # Swap the default read ACL for a private one.
    self.acl_list.pop()
    self.acl_list.append(models.ContainerACL(
        container_id=self.container_id, operation='read',
        project_access=False, user_ids=['anyRandomUserX', 'aclUser1']))

    allowed_roles = ['creator']
    self._assert_pass_rbac(allowed_roles, self._invoke_on_get,
                           user_id=self.creator_user_id,
                           project_id=self.external_project_id)
|
|
|
|
def test_should_raise_get_container(self):
    # A missing role and an unknown role are both expected to fail GET.
    denied_roles = [None, 'bogus']
    self._assert_fail_rbac(denied_roles, self._invoke_on_get)
|
|
|
|
def test_should_pass_delete_container(self):
    # Admin in the container's project is expected to pass DELETE.
    request_identity = {'user_id': self.user_id,
                        'project_id': self.external_project_id}
    self._assert_pass_rbac(['admin'], self._invoke_on_delete,
                           **request_identity)
|
|
|
|
def test_should_raise_delete_container(self):
    """Non-admin roles cannot delete another user's container.

    The request is made as self.user_id rather than the user who
    created the container.
    """
    denied_roles = [None, 'audit', 'observer', 'creator', 'bogus']
    self._assert_fail_rbac(denied_roles, self._invoke_on_delete,
                           user_id=self.user_id,
                           project_id=self.external_project_id)
|
|
|
|
def test_should_pass_delete_container_for_owner(self):
    """A non-admin user may delete a container they own.

    Ownership is established by the token user matching the
    container's creator_id.
    """
    request_identity = {'user_id': self.creator_user_id,
                        'project_id': self.external_project_id}
    self._assert_pass_rbac(['creator'], self._invoke_on_delete,
                           **request_identity)
|
|
|
|
def _invoke_on_get(self):
    # Drive the resource's GET handler with the prepared request/response.
    handler = self.resource.on_get
    handler(self.req, self.resp)
|
|
|
|
def _invoke_on_get_without_context(self):
    # Strip the barbican context from the request first; this exists to
    # get code coverage around the context check lines.
    environ = self.req.environ
    environ.pop('barbican.context')
    handler = self.resource.on_get
    handler(self.req, self.resp)
|
|
|
|
def _invoke_on_put(self):
    # Drive the resource's PUT handler with the prepared request/response.
    handler = self.resource.on_put
    handler(self.req, self.resp)
|
|
|
|
def _invoke_on_delete(self):
    # Drive the resource's DELETE handler with the prepared
    # request/response.
    handler = self.resource.on_delete
    handler(self.req, self.resp)
|
|
|
|
|
|
class WhenTestingOrdersResource(BaseTestCase):
    """RBAC tests for the barbican.api.resources.OrdersResource class."""

    def setUp(self):
        super().setUp()

        self.external_project_id = '12345'

        # GET calls that get past RBAC are forced to error out at the
        # repository, since such flows are not tested in this module.
        self.order_repo = mock.MagicMock()
        self.order_repo.get_by_create_date = mock.MagicMock(
            return_value=None, side_effect=self._generate_get_error())

        self.setup_order_repository_mock(self.order_repo)
        self.setup_project_repository_mock()

        self.resource = OrdersResource(queue_resource=mock.MagicMock())

    def test_rules_should_be_loaded(self):
        loaded_rules = self.policy_enforcer.rules
        self.assertIsNotNone(loaded_rules)

    def test_should_pass_create_order(self):
        # Roles expected to be allowed to POST an order.
        allowed_roles = ['admin', 'creator']
        self._assert_pass_rbac(allowed_roles, self._invoke_on_post,
                               content_type='application/json')

    def test_should_raise_create_order(self):
        # Roles expected to be denied when POSTing an order.
        denied_roles = [None, 'audit', 'observer', 'bogus']
        self._assert_fail_rbac(denied_roles, self._invoke_on_post)

    def test_should_pass_get_orders(self):
        # Roles expected to be allowed to list orders.
        allowed_roles = ['admin', 'observer', 'creator']
        self._assert_pass_rbac(allowed_roles, self._invoke_on_get,
                               project_id=self.external_project_id)

    def test_should_raise_get_orders(self):
        # Roles expected to be denied when listing orders.
        denied_roles = [None, 'audit', 'bogus']
        self._assert_fail_rbac(denied_roles, self._invoke_on_get)

    def _invoke_on_post(self):
        handler = self.resource.on_post
        handler(self.req, self.resp)

    def _invoke_on_get(self):
        handler = self.resource.on_get
        handler(self.req, self.resp)
|
|
|
|
|
|
class WhenTestingOrderResource(BaseTestCase):
    """RBAC tests for the barbican.api.resources.OrderResource class."""

    def setUp(self):
        super().setUp()

        self.external_project_id = '12345project'
        self.order_id = '12345order'

        # Stand-in order object handed to the resource under test.
        order = mock.MagicMock(id=self.order_id, creator_id='CRE-A-TOR-UUID')
        order.project.external_id = self.external_project_id

        # GET and DELETE calls that get past RBAC are forced to error
        # out, since such flows are not tested in this module.
        order.to_dict_fields = mock.MagicMock(
            side_effect=self._generate_get_error())
        self.order_repo = mock.MagicMock()
        failing_call = mock.MagicMock(
            return_value=None, side_effect=self._generate_get_error())
        # The same failing mock backs both repository entry points.
        self.order_repo.get = failing_call
        self.order_repo.delete_entity_by_id = failing_call

        self.setup_order_repository_mock(self.order_repo)

        self.resource = OrderResource(order)

    def test_rules_should_be_loaded(self):
        loaded_rules = self.policy_enforcer.rules
        self.assertIsNotNone(loaded_rules)

    def test_should_pass_get_order(self):
        # Roles expected to be allowed to GET an order.
        allowed_roles = ['admin', 'observer', 'creator', 'audit']
        self._assert_pass_rbac(allowed_roles, self._invoke_on_get,
                               project_id=self.external_project_id)

    def test_should_raise_get_order(self):
        # Roles expected to be denied when GETting an order.
        denied_roles = [None, 'bogus']
        self._assert_fail_rbac(denied_roles, self._invoke_on_get)

    def test_should_pass_delete_order(self):
        # Only admin is expected to be allowed to DELETE an order.
        allowed_roles = ['admin']
        self._assert_pass_rbac(allowed_roles, self._invoke_on_delete,
                               project_id=self.external_project_id)

    def test_should_raise_delete_order(self):
        # Roles expected to be denied when DELETEing an order.
        denied_roles = [None, 'audit', 'observer', 'creator', 'bogus']
        self._assert_fail_rbac(denied_roles, self._invoke_on_delete)

    def _invoke_on_get(self):
        handler = self.resource.on_get
        handler(self.req, self.resp)

    def _invoke_on_delete(self):
        handler = self.resource.on_delete
        handler(self.req, self.resp)
|
|
|
|
|
|
class WhenTestingConsumersResource(BaseTestCase):
    """RBAC tests for the barbican.api.resources.ConsumersResource class."""

    def setUp(self):
        super().setUp()

        self.external_project_id = '12345project'
        self.container_id = '12345container'

        # GET calls that get past RBAC are forced to error out at the
        # repository, since such flows are not tested in this module.
        self.consumer_repo = mock.MagicMock()
        self.consumer_repo.get_by_container_id = mock.MagicMock(
            return_value=None, side_effect=self._generate_get_error())

        self.setup_project_repository_mock()
        self.setup_container_consumer_repository_mock(self.consumer_repo)
        self.setup_container_repository_mock()

        self.resource = ConsumersResource(container_id=self.container_id)

    def test_rules_should_be_loaded(self):
        loaded_rules = self.policy_enforcer.rules
        self.assertIsNotNone(loaded_rules)

    def test_should_pass_create_consumer(self):
        # Only admin is expected to be allowed to POST a consumer.
        allowed_roles = ['admin']
        self._assert_pass_rbac(allowed_roles, self._invoke_on_post,
                               content_type='application/json')

    def test_should_raise_create_consumer(self):
        # Roles expected to be denied when POSTing a consumer.
        denied_roles = [None, 'audit', 'observer', 'creator', 'bogus']
        self._assert_fail_rbac(denied_roles, self._invoke_on_post,
                               content_type='application/json')

    def test_should_pass_delete_consumer(self):
        # Only admin is expected to be allowed to DELETE a consumer.
        allowed_roles = ['admin']
        self._assert_pass_rbac(allowed_roles, self._invoke_on_delete,
                               content_type='application/json')

    def test_should_raise_delete_consumer(self):
        # Roles expected to be denied when DELETEing a consumer.
        denied_roles = [None, 'audit', 'observer', 'creator', 'bogus']
        self._assert_fail_rbac(denied_roles, self._invoke_on_delete)

    def test_should_pass_get_consumers(self):
        # Roles expected to be allowed to list consumers.
        allowed_roles = ['admin', 'observer', 'creator', 'audit']
        self._assert_pass_rbac(allowed_roles, self._invoke_on_get,
                               content_type='application/json')

    def test_should_raise_get_consumers(self):
        # Roles expected to be denied when listing consumers.
        denied_roles = [None, 'bogus']
        self._assert_fail_rbac(denied_roles, self._invoke_on_get,
                               content_type='application/json')

    def _invoke_on_post(self):
        handler = self.resource.on_post
        handler(self.req, self.resp)

    def _invoke_on_delete(self):
        handler = self.resource.on_delete
        handler(self.req, self.resp)

    def _invoke_on_get(self):
        handler = self.resource.on_get
        handler(self.req, self.resp)
|
|
|
|
|
|
class WhenTestingConsumerResource(BaseTestCase):
    """RBAC tests for the barbican.api.resources.ConsumerResource class."""

    def setUp(self):
        super().setUp()

        self.external_project_id = '12345project'
        self.consumer_id = '12345consumer'

        # GET calls that get past RBAC are forced to error out at the
        # repository, since such flows are not tested in this module.
        self.consumer_repo = mock.MagicMock()
        self.consumer_repo.get = mock.MagicMock(
            return_value=None, side_effect=self._generate_get_error())

        self.setup_project_repository_mock()
        self.setup_container_consumer_repository_mock(self.consumer_repo)
        self.resource = ConsumerResource(consumer_id=self.consumer_id)

    def test_rules_should_be_loaded(self):
        loaded_rules = self.policy_enforcer.rules
        self.assertIsNotNone(loaded_rules)

    def test_should_pass_get_consumer(self):
        # Roles expected to be allowed to GET a consumer.
        allowed_roles = ['admin', 'observer', 'creator', 'audit']
        self._assert_pass_rbac(allowed_roles, self._invoke_on_get)

    def test_should_raise_get_consumer(self):
        # Roles expected to be denied when GETting a consumer.
        denied_roles = [None, 'bogus']
        self._assert_fail_rbac(denied_roles, self._invoke_on_get)

    def _invoke_on_get(self):
        handler = self.resource.on_get
        handler(self.req, self.resp)
|
|
|
|
|
|
class WhenTestingSecretStoresResource(BaseTestCase):
    """RBAC tests for the barbican.api.resources.SecretStoresResource class."""

    def setUp(self):
        super().setUp()

        self.external_project_id = '12345project'

        # Patch the multiple-backends check so that it reports True for
        # the duration of each test.
        self.moc_enable_patcher = mock.patch(
            'barbican.common.utils.is_multiple_backends_enabled')
        self.moc_enable_patcher.start().return_value = True
        self.addCleanup(self.moc_enable_patcher.stop)

        # GET calls that get past RBAC are forced to error out at the
        # repository, since such flows are not tested in this module.
        self.project_repo = mock.MagicMock()
        self.project_repo.find_by_external_project_id = mock.MagicMock(
            return_value=None, side_effect=self._generate_get_error())
        self.setup_project_repository_mock(self.project_repo)

        self.resource = SecretStoresResource()

    def test_rules_should_be_loaded(self):
        loaded_rules = self.policy_enforcer.rules
        self.assertIsNotNone(loaded_rules)

    def test_should_pass_get_all_secret_stores(self):
        # Only admin is expected to be allowed to list secret stores.
        allowed_roles = ['admin']
        self._assert_pass_rbac(allowed_roles, self._invoke_on_get)

    def test_should_raise_get_all_secret_stores(self):
        # Roles expected to be denied when listing secret stores.
        denied_roles = [None, 'creator', 'observer', 'audit']
        self._assert_fail_rbac(denied_roles, self._invoke_on_get)

    def test_should_pass_get_global_default(self):
        # Only admin is expected to be allowed to read the global default.
        allowed_roles = ['admin']
        self._assert_pass_rbac(allowed_roles,
                               self._invoke_get_global_default)

    def test_should_raise_get_global_default(self):
        # Roles expected to be denied when reading the global default.
        denied_roles = [None, 'creator', 'observer', 'audit']
        self._assert_fail_rbac(denied_roles,
                               self._invoke_get_global_default)

    def test_should_pass_get_preferred(self):
        # Only admin is expected to be allowed to read the preferred store.
        allowed_roles = ['admin']
        self._assert_pass_rbac(allowed_roles, self._invoke_get_preferred)

    def test_should_raise_get_preferred(self):
        # Roles expected to be denied when reading the preferred store.
        denied_roles = [None, 'creator', 'observer', 'audit']
        self._assert_fail_rbac(denied_roles, self._invoke_get_preferred)

    def _invoke_on_get(self):
        handler = self.resource.on_get
        handler(self.req, self.resp)

    def _invoke_get_global_default(self):
        # Call the controller method directly with pecan's request and
        # response replaced by the test doubles.
        with mock.patch('pecan.request', self.req), \
                mock.patch('pecan.response', self.resp):
            return self.resource.controller.get_global_default()

    def _invoke_get_preferred(self):
        # Call the controller method directly with pecan's request and
        # response replaced by the test doubles.
        with mock.patch('pecan.request', self.req), \
                mock.patch('pecan.response', self.resp):
            return self.resource.controller.get_preferred()
|
|
|
|
|
|
class WhenTestingSecretStoreResource(BaseTestCase):
    """RBAC tests for the barbican.api.resources.SecretStoreResource class."""

    def setUp(self):
        super().setUp()

        self.external_project_id = '12345project'
        self.store_id = '123456SecretStoreId'

        # Patch the multiple-backends check so that it reports True for
        # the duration of each test.
        self.moc_enable_patcher = mock.patch(
            'barbican.common.utils.is_multiple_backends_enabled')
        self.moc_enable_patcher.start().return_value = True
        self.addCleanup(self.moc_enable_patcher.stop)

        # GET calls that get past RBAC are forced to error out, since
        # such flows are not tested in this module.
        self.project_repo = mock.MagicMock()
        self.project_repo.find_by_external_project_id = mock.MagicMock(
            return_value=None, side_effect=self._generate_get_error())
        self.setup_project_repository_mock(self.project_repo)

        # Stand-in secret store whose serialization raises IOError.
        failing_to_dict = mock.MagicMock()
        failing_to_dict.side_effect = IOError
        secret_store_res = mock.MagicMock()
        secret_store_res.to_dict_fields = failing_to_dict
        secret_store_res.id = self.store_id

        self.resource = SecretStoreResource(secret_store_res)

    def test_rules_should_be_loaded(self):
        loaded_rules = self.policy_enforcer.rules
        self.assertIsNotNone(loaded_rules)

    def test_should_pass_get_a_secret_store(self):
        # Only admin is expected to be allowed to GET a secret store.
        allowed_roles = ['admin']
        self._assert_pass_rbac(allowed_roles, self._invoke_on_get)

    def test_should_raise_get_a_secret_store(self):
        # Roles expected to be denied when GETting a secret store.
        denied_roles = [None, 'creator', 'observer', 'audit']
        self._assert_fail_rbac(denied_roles, self._invoke_on_get)

    def _invoke_on_get(self):
        handler = self.resource.on_get
        handler(self.req, self.resp)
|
|
|
|
|
|
class WhenTestingPreferredSecretStoreResource(BaseTestCase):
    """RBAC tests for barbican.api.resources.PreferredSecretStoreResource"""

    def setUp(self):
        super().setUp()

        self.external_project_id = '12345project'
        self.store_id = '123456SecretStoreId'

        # Patch the multiple-backends check so that it reports True for
        # the duration of each test.
        self.moc_enable_patcher = mock.patch(
            'barbican.common.utils.is_multiple_backends_enabled')
        self.moc_enable_patcher.start().return_value = True
        self.addCleanup(self.moc_enable_patcher.stop)

        # POST/DELETE calls that get past RBAC are forced to error out,
        # since such flows are not tested in this module.
        self.project_repo = mock.MagicMock()
        self.project_repo.find_by_external_project_id = mock.MagicMock(
            return_value=None, side_effect=self._generate_get_error())
        self.setup_project_repository_mock(self.project_repo)

        self.resource = PreferredSecretStoreResource(mock.MagicMock())

    def test_rules_should_be_loaded(self):
        loaded_rules = self.policy_enforcer.rules
        self.assertIsNotNone(loaded_rules)

    def test_should_pass_set_preferred_secret_store(self):
        # Only admin is expected to be allowed to set the preferred store.
        allowed_roles = ['admin']
        self._assert_pass_rbac(allowed_roles, self._invoke_on_post)

    def test_should_raise_set_preferred_secret_store(self):
        # Roles expected to be denied when setting the preferred store.
        denied_roles = [None, 'creator', 'observer', 'audit']
        self._assert_fail_rbac(denied_roles, self._invoke_on_post)

    def _invoke_on_post(self):
        handler = self.resource.on_post
        handler(self.req, self.resp)
|
|
|
|
|
|
class WhenTestingSecretConsumersResource(BaseTestCase):
    """RBAC tests for barbican.api.resources.SecretConsumersResource"""

    def setUp(self):
        super().setUp()

        self.external_project_id = '12345project'
        self.secret_id = '12345secret'

        # GET calls that get past RBAC are forced to error out at the
        # repository, since such flows are not tested in this module.
        self.consumer_repo = mock.MagicMock()
        self.consumer_repo.get_by_secret_id = mock.MagicMock(
            return_value=None, side_effect=self._generate_get_error())

        self.setup_project_repository_mock()
        self.setup_secret_consumer_repository_mock(self.consumer_repo)
        self.setup_secret_repository_mock()

        self.resource = SecretConsumersResource(secret_id=self.secret_id)

    def test_rules_should_be_loaded(self):
        loaded_rules = self.policy_enforcer.rules
        self.assertIsNotNone(loaded_rules)

    def test_should_pass_create_consumer(self):
        # Only admin is expected to be allowed to POST a consumer.
        allowed_roles = ['admin']
        self._assert_pass_rbac(allowed_roles, self._invoke_on_post,
                               content_type='application/json')

    def test_should_raise_create_consumer(self):
        # Roles expected to be denied when POSTing a consumer.
        denied_roles = [None, 'audit', 'observer', 'creator', 'bogus']
        self._assert_fail_rbac(denied_roles, self._invoke_on_post,
                               content_type='application/json')

    def test_should_pass_delete_consumer(self):
        # Only admin is expected to be allowed to DELETE a consumer.
        allowed_roles = ['admin']
        self._assert_pass_rbac(allowed_roles, self._invoke_on_delete,
                               content_type='application/json')

    def test_should_raise_delete_consumer(self):
        # Roles expected to be denied when DELETEing a consumer.
        denied_roles = [None, 'audit', 'observer', 'creator', 'bogus']
        self._assert_fail_rbac(denied_roles, self._invoke_on_delete)

    def test_should_pass_get_consumers(self):
        # Roles expected to be allowed to list consumers.
        allowed_roles = ['admin', 'observer', 'creator', 'audit']
        self._assert_pass_rbac(allowed_roles, self._invoke_on_get,
                               content_type='application/json')

    def test_should_raise_get_consumers(self):
        # Roles expected to be denied when listing consumers.
        denied_roles = [None, 'bogus']
        self._assert_fail_rbac(denied_roles, self._invoke_on_get,
                               content_type='application/json')

    def _invoke_on_post(self):
        handler = self.resource.on_post
        handler(self.req, self.resp)

    def _invoke_on_delete(self):
        handler = self.resource.on_delete
        handler(self.req, self.resp)

    def _invoke_on_get(self):
        handler = self.resource.on_get
        handler(self.req, self.resp)
|
|
|
|
|
|
class WhenTestingSecretConsumerResource(BaseTestCase):
    """RBAC tests for barbican.api.resources.SecretConsumerResource"""

    def setUp(self):
        super().setUp()

        self.external_project_id = '12345project'
        self.consumer_id = '12345consumer'

        # GET calls that get past RBAC are forced to error out at the
        # repository, since such flows are not tested in this module.
        self.consumer_repo = mock.MagicMock()
        self.consumer_repo.get = mock.MagicMock(
            return_value=None, side_effect=self._generate_get_error())

        self.setup_project_repository_mock()
        self.setup_secret_consumer_repository_mock(self.consumer_repo)
        self.resource = SecretConsumerResource(consumer_id=self.consumer_id)

    def test_rules_should_be_loaded(self):
        loaded_rules = self.policy_enforcer.rules
        self.assertIsNotNone(loaded_rules)

    def test_should_pass_get_consumer(self):
        # Roles expected to be allowed to GET a secret consumer.
        allowed_roles = ['admin', 'observer', 'creator', 'audit']
        self._assert_pass_rbac(allowed_roles, self._invoke_on_get)

    def test_should_raise_get_consumer(self):
        # Roles expected to be denied when GETting a secret consumer.
        denied_roles = [None, 'bogus']
        self._assert_fail_rbac(denied_roles, self._invoke_on_get)

    def _invoke_on_get(self):
        handler = self.resource.on_get
        handler(self.req, self.resp)
|