Adding policy layer changes for ACL support (Part 5)

Added ACL based policy support for a secret or container
'read' operation.
Added support for allowing secret and container's creator
to perform 'get' call.
Added unit tests for policy changes.

Changed logic around secret or container project to match
client token's project. Now it uses policy rule instead
of using it in db lookup critieria.

Depends-On: I62668c03d48a3fa7477f7de8f7240b38b7e22eab

Change-Id: Ie28ed00125676fed5359cbb3e28a11648cf43a82
Implements: blueprint add-per-secret-policy
This commit is contained in:
Arun Kant
2015-03-17 13:09:58 -07:00
parent 879be59325
commit 237f7674cc
10 changed files with 646 additions and 55 deletions

View File

@@ -9,7 +9,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import uuid
import pecan
@@ -41,7 +41,7 @@ def _get_barbican_context(req):
return None
def _do_enforce_rbac(req, action_name, ctx):
def _do_enforce_rbac(inst, req, action_name, ctx, **kwargs):
"""Enforce RBAC based on 'request' information."""
if action_name and ctx:
@@ -56,10 +56,16 @@ def _do_enforce_rbac(req, action_name, ctx):
if 'secret:get' == action_name and not is_json_request_accept(req):
action_name = 'secret:decrypt' # Override to perform special rules
target_name, target_data = inst.get_acl_tuple(req, **kwargs)
policy_dict = {}
if target_name and target_data:
policy_dict['target'] = {target_name: target_data}
policy_dict.update(kwargs)
# Enforce access controls.
if ctx.policy_enforcer:
ctx.policy_enforcer.enforce(action_name, {}, credentials,
do_raise=True)
ctx.policy_enforcer.enforce(action_name, flatten(policy_dict),
credentials, do_raise=True)
def enforce_rbac(action_name='default'):
@@ -76,7 +82,7 @@ def enforce_rbac(action_name='default'):
if ctx:
external_project_id = ctx.project
_do_enforce_rbac(pecan.request, action_name, ctx)
_do_enforce_rbac(inst, pecan.request, action_name, ctx, **kwargs)
# insert external_project_id as the first arg to the guarded method
args = list(args)
args.insert(0, external_project_id)
@@ -154,3 +160,71 @@ def assert_is_valid_uuid_from_uri(doubtful_uuid):
uuid.UUID(doubtful_uuid)
except ValueError:
raise exception.InvalidUUIDInURI(uuid_string=doubtful_uuid)
def flatten(d, parent_key=''):
"""Flatten a nested dictionary
Converts a dictionary with nested values to a single level flat
dictionary, with dotted notation for each key.
"""
items = []
for k, v in d.items():
new_key = parent_key + '.' + k if parent_key else k
if isinstance(v, collections.MutableMapping):
items.extend(flatten(v, new_key).items())
else:
items.append((new_key, v))
return dict(items)
class ACLMixin(object):
def get_acl_tuple(self, req, **kwargs):
return None, None
def get_acl_dict_for_user(self, req, acl_list):
"""Get acl operation found for token user in acl list.
Token user is looked into users list present for each acl operation.
If there is a match, it means that ACL data is applicable for policy
logic. Policy logic requires data as dictionary so this method capture
acl's operation, creator_only data in that format.
For operation value, matching ACL record's operation is stored in dict
as key and value both.
creator_only flag is intended to make secret/container private for a
given operation. It doesn't require user match. So its captured in dict
format where key is prefixed with related operation and flag is used as
its value.
Then for acl related policy logic, this acl dict data is combined with
target entity (secret or container) creator_id and project id. The
whole dict serves as target in policy enforcement logic i.e. right
hand side of policy rule.
Following is sample outcome where secret or container has ACL defined
and token user is among the ACL users defined for 'read' and 'list'
operation.
{'read': 'read', 'list': 'list', 'read_creator_only': False,
'list_creator_only': False }
Its possible that ACLs are defined without any user, they just
have creator_only flag set. This means only creator can read or list
ACL entities. In that case, dictionary output can be as follows.
{'read_creator_only': True, 'list_creator_only': True }
"""
ctxt = _get_barbican_context(req)
if not ctxt:
return None
acl_dict = {acl.operation: acl.operation for acl in acl_list
if ctxt.user in acl.to_dict_fields().get('users')}
co_dict = {'%s_creator_only' % acl.operation: acl.creator_only for acl
in acl_list if acl.creator_only is not None}
acl_dict.update(co_dict)
return acl_dict

View File

@@ -51,7 +51,7 @@ def _acl_operation_update_not_allowed():
pecan.abort(400, u._("Existing ACL's operation cannot be updated."))
class SecretACLController(object):
class SecretACLController(controllers.ACLMixin):
"""Handles a SecretACL entity retrieval and update requests."""
def __init__(self, acl_id, secret_project_id, secret):
@@ -61,6 +61,10 @@ class SecretACLController(object):
self.acl_repo = repo.get_secret_acl_repository()
self.validator = validators.ACLValidator()
def get_acl_tuple(self, req, **kwargs):
d = {'project_id': self.secret_project_id}
return 'secret', d
@pecan.expose(generic=True)
def index(self):
pecan.abort(405) # HTTP 405 Method Not Allowed as default
@@ -131,7 +135,7 @@ class SecretACLController(object):
external_project_id=None)
class SecretACLsController(object):
class SecretACLsController(controllers.ACLMixin):
"""Handles SecretACL requests by a given secret id."""
def __init__(self, secret):
@@ -141,6 +145,10 @@ class SecretACLsController(object):
self.acl_repo = repo.get_secret_acl_repository()
self.validator = validators.ACLValidator()
def get_acl_tuple(self, req, **kwargs):
d = {'project_id': self.secret_project_id}
return 'secret', d
@pecan.expose()
def _lookup(self, acl_id, *remainder):
return SecretACLController(acl_id, self.secret_project_id,
@@ -286,7 +294,7 @@ class SecretACLsController(object):
return [{'acl_ref': acl['acl_ref']} for acl in acl_recs]
class ContainerACLController(object):
class ContainerACLController(controllers.ACLMixin):
"""Handles a ContainerACL entity retrieval and update requests."""
def __init__(self, acl_id, container_project_id, container):
@@ -296,6 +304,10 @@ class ContainerACLController(object):
self.acl_repo = repo.get_container_acl_repository()
self.validator = validators.ACLValidator()
def get_acl_tuple(self, req, **kwargs):
d = {'project_id': self.container_project_id}
return 'container', d
@pecan.expose(generic=True)
def index(self):
pecan.abort(405) # HTTP 405 Method Not Allowed as default
@@ -364,7 +376,7 @@ class ContainerACLController(object):
external_project_id=None)
class ContainerACLsController(object):
class ContainerACLsController(controllers.ACLMixin):
"""Handles ContainerACL requests by a given container id."""
def __init__(self, container_id):

View File

@@ -40,7 +40,7 @@ def _requested_preferred_ca_not_a_project_ca():
)
class CertificateAuthorityController(object):
class CertificateAuthorityController(controllers.ACLMixin):
"""Handles certificate authority retrieval requests."""
def __init__(self, ca):
@@ -222,7 +222,7 @@ class CertificateAuthorityController(object):
external_project_id)
class CertificateAuthoritiesController(object):
class CertificateAuthoritiesController(controllers.ACLMixin):
"""Handles certificate authority list requests."""
def __init__(self):

View File

@@ -32,7 +32,7 @@ def _consumer_not_found():
'another castle.'))
class ContainerConsumerController(object):
class ContainerConsumerController(controllers.ACLMixin):
"""Handles Consumer entity retrieval and deletion requests."""
def __init__(self, consumer_id):
@@ -62,7 +62,7 @@ class ContainerConsumerController(object):
)
class ContainerConsumersController(object):
class ContainerConsumersController(controllers.ACLMixin):
"""Handles Consumer creation requests."""
def __init__(self, container_id):

View File

@@ -34,7 +34,7 @@ def container_not_found():
'another castle.'))
class ContainerController(object):
class ContainerController(controllers.ACLMixin):
"""Handles Container entity retrieval and deletion requests."""
def __init__(self, container_id):
@@ -46,6 +46,17 @@ class ContainerController(object):
self.acls = acls.ContainerACLsController(self.container_id)
self.container = None
def get_acl_tuple(self, req, **kwargs):
self.container = self.container_repo.get_container_by_id(
entity_id=self.container_id, suppress_exception=True)
if self.container:
d = self.get_acl_dict_for_user(req, self.container.container_acls)
d['project_id'] = self.container.project.external_id
d['creator_id'] = self.container.creator_id
return 'container', d
else:
return None, None
@pecan.expose(generic=True)
def index(self, **kwargs):
pecan.abort(405) # HTTP 405 Method Not Allowed as default
@@ -96,7 +107,7 @@ class ContainerController(object):
pass
class ContainersController(object):
class ContainersController(controllers.ACLMixin):
"""Handles Container creation requests."""
def __init__(self):

View File

@@ -59,7 +59,7 @@ def order_cannot_modify_order_type():
pecan.abort(400, u._("Cannot modify order type."))
class OrderController(object):
class OrderController(controllers.ACLMixin):
"""Handles Order retrieval and deletion requests."""
@@ -118,7 +118,7 @@ class OrderController(object):
external_project_id=external_project_id)
class OrdersController(object):
class OrdersController(controllers.ACLMixin):
"""Handles Order requests for Secret creation."""
def __init__(self, queue_resource=None):

View File

@@ -53,7 +53,7 @@ def _request_has_twsk_but_no_transport_key_id():
'transport key id has not been provided.'))
class SecretController(object):
class SecretController(controllers.ACLMixin):
"""Handles Secret retrieval and deletion requests."""
def __init__(self, secret):
@@ -61,6 +61,12 @@ class SecretController(object):
self.secret = secret
self.transport_key_repo = repo.get_transport_key_repository()
def get_acl_tuple(self, req, **kwargs):
d = self.get_acl_dict_for_user(req, self.secret.secret_acls)
d['project_id'] = self.secret.project_assocs[0].projects.external_id
d['creator_id'] = self.secret.creator_id
return 'secret', d
@pecan.expose()
def _lookup(self, sub_resource, *remainder):
if sub_resource == 'acls':
@@ -108,6 +114,12 @@ class SecretController(object):
def _on_get_secret_payload(self, secret, external_project_id, **kwargs):
"""GET actual payload containing the secret."""
# With ACL support, the user token project does not have to be same as
# project associated with secret. The lookup project_id needs to be
# derived from the secret's data considering authorization is already
# done.
external_project_id = secret.project_assocs[0].projects.external_id
project = res.get_or_create_project(external_project_id)
pecan.override_template('', pecan.request.accept.header_value)
@@ -190,7 +202,7 @@ class SecretController(object):
plugin.delete_secret(self.secret, external_project_id)
class SecretsController(object):
class SecretsController(controllers.ACLMixin):
"""Handles Secret creation requests."""
def __init__(self):

View File

@@ -34,7 +34,7 @@ def _transport_key_not_found():
pecan.abort(404, u._('Not Found. Transport Key not found.'))
class TransportKeyController(object):
class TransportKeyController(controllers.ACLMixin):
"""Handles transport key retrieval requests."""
def __init__(self, transport_key_id, transport_key_repo=None):
@@ -74,7 +74,7 @@ class TransportKeyController(object):
_transport_key_not_found()
class TransportKeysController(object):
class TransportKeysController(controllers.ACLMixin):
"""Handles transport key list requests."""
def __init__(self, transport_key_repo=None):

View File

@@ -12,13 +12,11 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This test module focuses on RBAC interactions with the API resource classes.
For typical-flow business logic tests of these classes, see the
'resources_test.py' module.
"""
import os
import mock
@@ -26,10 +24,12 @@ from oslo_config import cfg
from webob import exc
from barbican.api.controllers import consumers
from barbican.api.controllers import containers
from barbican.api.controllers import orders
from barbican.api.controllers import secrets
from barbican.api.controllers import versions
from barbican import context
from barbican.model import models
from barbican.openstack.common import policy
from barbican.tests import utils
@@ -89,6 +89,10 @@ class OrderResource(TestableResource):
controller_cls = orders.OrderController
class ContainerResource(TestableResource):
controller_cls = containers.ContainerController
class ConsumersResource(TestableResource):
controller_cls = consumers.ContainerConsumersController
@@ -106,14 +110,15 @@ class BaseTestCase(utils.BaseTestCase, utils.MockModelRepositoryMixin):
self.policy_enforcer.load_rules(True)
self.resp = mock.MagicMock()
def _generate_req(self, roles=None, accept=None, content_type=None):
def _generate_req(self, roles=None, accept=None, content_type=None,
user_id=None, project_id=None):
"""Generate a fake HTTP request with security context added to it."""
req = mock.MagicMock()
req.get_param.return_value = None
kwargs = {
'user': None,
'project': None,
'user': user_id,
'project': project_id,
'roles': roles or [],
'policy_enforcer': self.policy_enforcer,
}
@@ -157,7 +162,7 @@ class BaseTestCase(utils.BaseTestCase, utils.MockModelRepositoryMixin):
return exc.HTTPServerError(message='Read Error')
def _assert_pass_rbac(self, roles, method_under_test, accept=None,
content_type=None):
content_type=None, user_id=None, project_id=None):
"""Assert that RBAC authorization rules passed for the specified roles.
:param roles: List of roles to check, one at a time
@@ -168,7 +173,9 @@ class BaseTestCase(utils.BaseTestCase, utils.MockModelRepositoryMixin):
for role in roles:
self.req = self._generate_req(roles=[role] if role else [],
accept=accept,
content_type=content_type)
content_type=content_type,
user_id=user_id,
project_id=project_id)
# Force an exception early past the RBAC passing.
self.req.body_file = self._generate_stream_for_exit()
@@ -177,7 +184,7 @@ class BaseTestCase(utils.BaseTestCase, utils.MockModelRepositoryMixin):
self._assert_post_rbac_exception(exception, role)
def _assert_fail_rbac(self, roles, method_under_test, accept=None,
content_type=None):
content_type=None, user_id=None, project_id=None):
"""Assert that RBAC rules failed for one of the specified roles.
:param roles: List of roles to check, one at a time
@@ -188,7 +195,9 @@ class BaseTestCase(utils.BaseTestCase, utils.MockModelRepositoryMixin):
for role in roles:
self.req = self._generate_req(roles=[role] if role else [],
accept=accept,
content_type=content_type)
content_type=content_type,
user_id=user_id,
project_id=project_id)
exception = self.assertRaises(exc.HTTPForbidden, method_under_test)
self.assertEqual(403, exception.status_int)
@@ -283,12 +292,15 @@ class WhenTestingSecretsResource(BaseTestCase):
class WhenTestingSecretResource(BaseTestCase):
"""RBAC tests for the barbican.api.resources.SecretResource class."""
"""RBAC tests for SecretController class."""
def setUp(self):
super(WhenTestingSecretResource, self).setUp()
self.external_project_id = '12345project'
self.secret_id = '12345secret'
self.user_id = '123456user'
self.creator_user_id = '123456CreatorUser'
# Force an error on GET and DELETE calls that pass RBAC,
# as we are not testing such flows in this test module.
@@ -305,7 +317,20 @@ class WhenTestingSecretResource(BaseTestCase):
self.setup_secret_meta_repository_mock()
self.setup_transport_key_repository_mock()
self.resource = SecretResource(self.secret_id)
acl_read = models.SecretACL(secret_id=self.secret_id, operation='read',
creator_only=False,
user_ids=[self.user_id, 'anyRandomId'])
self.acl_list = [acl_read]
secret = mock.MagicMock()
secret.secret_acls.__iter__.return_value = self.acl_list
secret.project_assocs[0].projects.external_id = (self.
external_project_id)
secret.creator_id = self.creator_user_id
self.resource = SecretResource(secret)
# self.resource.controller.get_acl_tuple = mock.MagicMock(
# return_value=(None, None))
def test_rules_should_be_loaded(self):
self.assertIsNotNone(self.policy_enforcer.rules)
@@ -314,16 +339,236 @@ class WhenTestingSecretResource(BaseTestCase):
self._assert_pass_rbac(['admin', 'observer', 'creator'],
self._invoke_on_get,
accept='notjsonaccepttype',
content_type='application/json')
content_type='application/json',
user_id=self.user_id,
project_id=self.external_project_id)
def test_should_raise_decrypt_secret(self):
self._assert_fail_rbac([None, 'audit', 'bogus'],
self._invoke_on_get,
accept='notjsonaccepttype')
def test_should_pass_decrypt_secret_for_same_project_with_no_acl(self):
"""Token and secret project needs to be same in no ACL defined case."""
self.acl_list.pop() # remove read acl from default setup
self._assert_pass_rbac(['admin', 'observer', 'creator'],
self._invoke_on_get,
accept='notjsonaccepttype',
content_type='application/json',
user_id=self.user_id,
project_id=self.external_project_id)
def test_should_raise_decrypt_secret_for_with_creator_only_enabled(self):
"""Should raise authz error as secret is marked private.
As secret is private so project users should not be able to access
the secret.
"""
self.acl_list.pop() # remove read acl from default setup
acl_read = models.SecretACL(secret_id=self.secret_id, operation='read',
creator_only=True,
user_ids=['anyRandomUserX', 'aclUser1'])
self.acl_list.append(acl_read)
self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'],
self._invoke_on_get,
accept='notjsonaccepttype',
content_type='application/json',
user_id=self.user_id,
project_id=self.external_project_id)
def test_should_pass_decrypt_secret_private_enabled_with_read_acl(self):
"""Should pass authz as user has read acl for private secret.
Even though secret is private, user with read acl should be able to
access the secret.
"""
self.acl_list.pop() # remove read acl from default setup
acl_read = models.SecretACL(secret_id=self.secret_id, operation='read',
creator_only=True,
user_ids=['anyRandomUserX', 'aclUser1'])
self.acl_list.append(acl_read)
self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit',
'bogusRole'],
self._invoke_on_get,
accept='notjsonaccepttype',
content_type='application/json',
user_id='aclUser1',
project_id=self.external_project_id)
def test_should_pass_decrypt_secret_different_user_valid_read_acl(self):
self.acl_list.pop() # remove read acl from default setup
acl_read = models.SecretACL(secret_id=self.secret_id, operation='read',
creator_only=False,
user_ids=['anyRandomUserX', 'aclUser1'])
self.acl_list.append(acl_read)
# token project_id is different from secret's project id but another
# user (from different project) has read acl for secret so should pass
self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit',
'bogusRole'],
self._invoke_on_get,
accept='notjsonaccepttype',
content_type='application/json',
user_id='aclUser1',
project_id='different_project_id')
def test_should_raise_decrypt_secret_for_different_user_no_read_acl(self):
self.acl_list.pop() # remove read acl from default setup
acl_read = models.SecretACL(secret_id=self.secret_id,
operation='write',
creator_only=False,
user_ids=['anyRandomUserX', 'aclUser1'])
self.acl_list.append(acl_read)
# token project_id is different from secret's project id but another
# user (from different project) has read acl for secret so should pass
self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'],
self._invoke_on_get,
accept='notjsonaccepttype',
content_type='application/json',
user_id='aclUser1',
project_id='different_project_id')
def test_should_pass_decrypt_secret_for_creator_user_with_no_acl(self):
"""Check for creator user rule for secret decrypt/get call.
If token's user is same as creator of secret, then they are allowed
to read decrypted secret regardless of token's project or token's
role.
"""
self.acl_list.pop() # remove read acl from default setup
self.resource.controller.secret.creator_id = 'creatorUserX'
# token user is creator so allowed get call regardless of role
# user (from different project) has read acl for secret so should pass
self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit',
'bogusRole'],
self._invoke_on_get,
accept='notjsonaccepttype',
content_type='application/json',
user_id='creatorUserX',
project_id='different_project_id')
def test_should_pass_get_secret(self):
self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
self._invoke_on_get)
self._invoke_on_get,
user_id=self.user_id,
project_id=self.external_project_id)
def test_should_pass_get_secret_with_no_context(self):
"""In unauthenticated flow, get secret should work."""
self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
self._invoke_on_get_without_context,
user_id=self.user_id,
project_id=self.external_project_id)
def test_should_raise_get_secret_for_different_project_no_acl(self):
"""Should raise error when secret and token's project is different."""
self.acl_list.pop() # remove read acl from default setup
# token project_id is different from secret's project id so should fail
self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'],
self._invoke_on_get,
user_id=self.user_id,
project_id='different_id')
def test_should_pass_get_secret_for_same_project_but_different_user(self):
# user id should not matter as long token and secret's project match
self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
self._invoke_on_get,
user_id='different_user_id',
project_id=self.external_project_id)
def test_should_pass_get_secret_for_same_project_with_no_acl(self):
self.acl_list.pop() # remove read acl from default setup
self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
self._invoke_on_get,
user_id=self.user_id,
project_id=self.external_project_id)
def test_should_raise_get_secret_for_with_creator_only_enabled(self):
"""Should raise authz error as secret is marked private.
As secret is private so project users should not be able to access
the secret.
"""
self.acl_list.pop() # remove read acl from default setup
acl_read = models.SecretACL(secret_id=self.secret_id, operation='read',
creator_only=True,
user_ids=['anyRandomUserX', 'aclUser1'])
self.acl_list.append(acl_read)
self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'],
self._invoke_on_get,
user_id=self.user_id,
project_id=self.external_project_id)
def test_should_pass_get_secret_for_private_enabled_with_read_acl(self):
"""Should pass authz as user has read acl for private secret.
Even though secret is private, user with read acl should be able to
access the secret.
"""
self.acl_list.pop() # remove read acl from default setup
acl_read = models.SecretACL(secret_id=self.secret_id, operation='read',
creator_only=True,
user_ids=['anyRandomUserX', 'aclUser1'])
self.acl_list.append(acl_read)
self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit',
'bogusRole'],
self._invoke_on_get,
user_id='aclUser1',
project_id=self.external_project_id)
def test_should_pass_get_secret_different_user_with_valid_read_acl(self):
"""Should allow when read ACL is defined for a user.
Secret's own project and token's project is different but read is
allowed because of valid read ACL.
"""
self.acl_list.pop() # remove read acl from default setup
acl_read = models.SecretACL(secret_id=self.secret_id, operation='read',
creator_only=False,
user_ids=['anyRandomUserX', 'aclUser1'])
self.acl_list.append(acl_read)
# token project_id is different from secret's project id but another
# user (from different project) has read acl for secret so should pass
self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit',
'bogusRole'],
self._invoke_on_get,
user_id='aclUser1',
project_id='different_project_id')
def test_should_raise_get_secret_for_different_user_with_no_read_acl(self):
"""Get secret fails when no read acl is defined.
With different secret and token's project, read is not allowed without
a read ACL.
"""
self.acl_list.pop() # remove read acl from default setup
acl_read = models.SecretACL(secret_id=self.secret_id,
operation='write',
creator_only=False,
user_ids=['anyRandomUserX', 'aclUser1'])
self.acl_list.append(acl_read)
# token project_id is different from secret's project id but another
# user (from different project) has read acl for secret so should pass
self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'],
self._invoke_on_get,
user_id='aclUser1',
project_id='different_project_id')
def test_should_pass_get_secret_for_creator_user_with_no_acl(self):
"""Check for creator user rule for secret get call.
If token's user is same as creator of secret, then he/she is allowed
to read secret regardless of token's project or token's role.
"""
self.acl_list.pop() # remove read acl from default setup
self.resource.controller.secret.creator_id = 'creatorUserX'
# token user is creator so allowed get call regardless of role
# user (from different project) has read acl for secret so should pass
self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit',
'bogusRole'],
self._invoke_on_get,
user_id='creatorUserX',
project_id='different_project_id')
def test_should_raise_get_secret(self):
self._assert_fail_rbac([None, 'bogus'],
@@ -331,7 +576,9 @@ class WhenTestingSecretResource(BaseTestCase):
def test_should_pass_put_secret(self):
self._assert_pass_rbac(['admin', 'creator'], self._invoke_on_put,
content_type="application/octet-stream")
content_type="application/octet-stream",
user_id=self.user_id,
project_id=self.external_project_id)
def test_should_raise_put_secret(self):
self._assert_fail_rbac([None, 'audit', 'observer', 'bogus'],
@@ -339,16 +586,224 @@ class WhenTestingSecretResource(BaseTestCase):
content_type="application/octet-stream")
def test_should_pass_delete_secret(self):
self._assert_pass_rbac(['admin'], self._invoke_on_delete)
self._assert_pass_rbac(['admin'], self._invoke_on_delete,
user_id=self.user_id,
project_id=self.external_project_id)
def test_should_raise_delete_secret(self):
self._assert_fail_rbac([None, 'audit', 'observer', 'creator', 'bogus'],
self._invoke_on_delete)
# @mock.patch.object(secrets.SecretController, 'get_acl_tuple',
# return_value=(None, None))
def _invoke_on_get(self):
self.resource.on_get(self.req, self.resp,
self.external_project_id)
def _invoke_on_get_without_context(self):
# Adding this to get code coverage around context check lines
self.req.environ.pop('barbican.context')
self.resource.on_get(self.req, self.resp,
self.external_project_id)
def _invoke_on_put(self):
self.resource.on_put(self.req, self.resp,
self.external_project_id)
def _invoke_on_delete(self):
self.resource.on_delete(self.req, self.resp,
self.external_project_id)
class WhenTestingContainerResource(BaseTestCase):
"""RBAC tests for ContainerController class.
Container controller tests are quite similar to SecretController as
policy logic is same. Just adding them here to make sure logic related to
acl gathering data works as expected.
"""
def setUp(self):
super(WhenTestingContainerResource, self).setUp()
self.external_project_id = '12345project'
self.container_id = '12345secret'
self.user_id = '123456user'
self.creator_user_id = '123456CreatorUser'
# Force an error on GET and DELETE calls that pass RBAC,
# as we are not testing such flows in this test module.
self.container_repo = mock.MagicMock()
fail_method = mock.MagicMock(return_value=None,
side_effect=self._generate_get_error())
self.container_repo.get = fail_method
self.container_repo.delete_entity_by_id = fail_method
acl_read = models.ContainerACL(
container_id=self.container_id, operation='read',
creator_only=False, user_ids=[self.user_id, 'anyRandomId'])
self.acl_list = [acl_read]
container = mock.MagicMock()
container.container_acls.__iter__.return_value = self.acl_list
container.project.external_id = self.external_project_id
container.creator_id = self.creator_user_id
self.container_repo.get_container_by_id.return_value = container
self.setup_container_repository_mock(self.container_repo)
self.resource = ContainerResource(self.container_id)
def test_rules_should_be_loaded(self):
self.assertIsNotNone(self.policy_enforcer.rules)
def test_should_pass_get_container(self):
self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
self._invoke_on_get,
user_id=self.user_id,
project_id=self.external_project_id)
def test_should_pass_get_container_with_no_context(self):
"""In unauthenticated flow, get container should work."""
self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
self._invoke_on_get_without_context,
user_id=self.user_id,
project_id=self.external_project_id)
def test_should_raise_get_container_for_different_project_no_acl(self):
"""Raise error when container and token's project is different."""
self.acl_list.pop() # remove read acl from default setup
# token project_id is different from secret's project id so should fail
self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'],
self._invoke_on_get,
user_id=self.user_id,
project_id='different_id')
def test_should_pass_get_container_for_same_project_but_different_user(
self):
"""Should pass if token and secret's project match.
User id should not matter as long token and container's project match.
"""
self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
self._invoke_on_get,
user_id='different_user_id',
project_id=self.external_project_id)
def test_should_pass_get_container_for_same_project_with_no_acl(self):
self.acl_list.pop() # remove read acl from default setup
self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'],
self._invoke_on_get,
user_id=self.user_id,
project_id=self.external_project_id)
def test_should_raise_get_container_for_with_creator_only_enabled(self):
"""Should raise authz error as container is marked private.
As container is private so project users should not be able to access
the secret.
"""
self.acl_list.pop() # remove read acl from default setup
acl_read = models.ContainerACL(
container_id=self.container_id, operation='read',
creator_only=True, user_ids=['anyRandomUserX', 'aclUser1'])
self.acl_list.append(acl_read)
self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'],
self._invoke_on_get,
user_id=self.user_id,
project_id=self.external_project_id)
def test_should_pass_get_container_for_private_enabled_with_read_acl(self):
"""Should pass authz as user has read acl for private container.
Even though container is private, user with read acl should be able to
access the container.
"""
self.acl_list.pop() # remove read acl from default setup
acl_read = models.ContainerACL(
container_id=self.container_id, operation='read',
creator_only=True, user_ids=['anyRandomUserX', 'aclUser1'])
self.acl_list.append(acl_read)
self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit',
'bogusRole'],
self._invoke_on_get,
user_id='aclUser1',
project_id=self.external_project_id)
def test_should_pass_get_container_different_user_with_valid_read_acl(
self):
"""Should allow when read ACL is defined for a user.
Container's own project and token's project is different but read is
allowed because of valid read ACL. User can read regardless of what is
token's project as it has necessary ACL.
"""
self.acl_list.pop() # remove read acl from default setup
acl_read = models.ContainerACL(
container_id=self.container_id, operation='read',
creator_only=False, user_ids=['anyRandomUserX', 'aclUser1'])
self.acl_list.append(acl_read)
self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit',
'bogusRole'],
self._invoke_on_get,
user_id='aclUser1',
project_id='different_project_id')
def test_should_raise_get_container_for_different_user_with_no_read_acl(
self):
"""Get secret fails when no read acl is defined.
With different container and token's project, read is not allowed
without a read ACL.
"""
self.acl_list.pop() # remove read acl from default setup
acl_read = models.ContainerACL(
container_id=self.container_id, operation='write',
creator_only=False, user_ids=['anyRandomUserX', 'aclUser1'])
self.acl_list.append(acl_read)
# token project_id is different from secret's project id but another
# user (from different project) has read acl for secret so should pass
self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'],
self._invoke_on_get,
user_id='aclUser1',
project_id='different_project_id')
def test_should_pass_get_container_for_creator_user_with_no_acl(self):
"""Check for creator user rule for container get call.
If token's user is same as creator of container, then he/she is allowed
to read container regardless of token's project or token's role.
"""
self.acl_list.pop() # remove read acl from default setup
self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit',
'bogusRole'],
self._invoke_on_get,
user_id=self.creator_user_id,
project_id='different_project_id')
def test_should_raise_get_container(self):
self._assert_fail_rbac([None, 'bogus'],
self._invoke_on_get)
def test_should_pass_delete_container(self):
self._assert_pass_rbac(['admin'], self._invoke_on_delete,
user_id=self.user_id,
project_id=self.external_project_id)
def test_should_raise_delete_container(self):
self._assert_fail_rbac([None, 'audit', 'observer', 'creator', 'bogus'],
self._invoke_on_delete)
def _invoke_on_get(self):
self.resource.on_get(self.req, self.resp,
self.external_project_id)
def _invoke_on_get_without_context(self):
# Adding this to get code coverage around context check lines
self.req.environ.pop('barbican.context')
self.resource.on_get(self.req, self.resp,
self.external_project_id)
def _invoke_on_put(self):
self.resource.on_put(self.req, self.resp,
self.external_project_id)

View File

@@ -1,33 +1,46 @@
{
"admin": "role:admin",
"observer": "role:observer",
"creator_role": "role:creator",
"audit": "role:audit",
"admin_or_user_does_not_work": "project_id:%(project_id)s",
"admin_or_user": "role:admin or project_id:%(project_id)s",
"admin_or_creator_role": "role:admin or role:creator",
"all_but_audit": "role:admin or role:observer or role:creator",
"all_users": "role:admin or role:observer or role:creator or role:audit",
"secret_project_match": "project:%(target.secret.project_id)s",
"secret_acl_read": "'read':%(target.secret.read)s",
"secret_private_read": "'True':%(target.secret.read_creator_only)s",
"secret_creator_user": "user:%(target.secret.creator_id)s",
"container_project_match": "project:%(target.container.project_id)s",
"container_acl_read": "'read':%(target.container.read)s",
"container_private_read": "'True':%(target.container.read_creator_only)s",
"container_creator_user": "user:%(target.container.creator_id)s",
"secret_non_private_read": "rule:all_users and rule:secret_project_match and not rule:secret_private_read",
"secret_decrypt_non_private_read": "rule:all_but_audit and rule:secret_project_match and not rule:secret_private_read",
"container_non_private_read": "rule:all_users and rule:container_project_match and not rule:container_private_read",
"version:get": "@",
"secret:decrypt": "rule:all_but_audit",
"secret:get": "rule:all_users",
"secret:put": "rule:admin_or_creator",
"secret:delete": "rule:admin",
"secrets:post": "rule:admin_or_creator",
"secret:decrypt": "rule:secret_decrypt_non_private_read or rule:secret_creator_user or rule:secret_acl_read",
"secret:get": "rule:secret_non_private_read or rule:secret_creator_user or rule:secret_acl_read",
"secret:put": "rule:admin_or_creator_role and rule:secret_project_match",
"secret:delete": "rule:admin and rule:secret_project_match",
"secrets:post": "rule:admin_or_creator_role",
"secrets:get": "rule:all_but_audit",
"orders:post": "rule:admin_or_creator",
"orders:post": "rule:admin_or_creator_role",
"orders:get": "rule:all_but_audit",
"order:get": "rule:all_users",
"order:put":"rule:admin_or_creator",
"order:put": "rule:admin_or_creator_role",
"order:delete": "rule:admin",
"consumer:get": "rule:all_users",
"consumers:get": "rule:all_users",
"consumers:post": "rule:admin",
"consumers:delete": "rule:admin",
"containers:post": "rule:admin_or_creator",
"containers:post": "rule:admin_or_creator_role",
"containers:get": "rule:all_but_audit",
"container:get": "rule:all_users",
"container:get": "rule:container_non_private_read or rule:container_creator_user or rule:container_acl_read",
"container:delete": "rule:admin",
"admin": "role:admin",
"observer": "role:observer",
"creator": "role:creator",
"audit": "rule:audit",
"admin_or_user_does_not_work": "project_id:%(project_id)s",
"admin_or_user": "role:admin or project_id:%(project_id)s",
"admin_or_creator": "role:admin or role:creator",
"all_but_audit": "role:admin or role:observer or role:creator",
"all_users": "role:admin or role:observer or role:creator or role:audit",
"transport_key:get": "rule:all_users",
"transport_key:delete": "rule:admin",
"transport_keys:get": "rule:all_users",
@@ -43,5 +56,19 @@
"certificate_authority:set_global_preferred": "rule:admin",
"certificate_authority:unset_global_preferred": "rule:admin",
"certificate_authority:get_global_preferred": "rule:all_users",
"certificate_authority:get_preferred_ca": "rule:all_users"
"certificate_authority:get_preferred_ca": "rule:all_users",
"secret_acls:post": "rule:admin_or_creator_role and rule:secret_project_match",
"secret_acls:patch": "rule:admin_or_creator_role and rule:secret_project_match",
"secret_acls:delete": "rule:admin_or_creator_role and rule:secret_project_match",
"secret_acls:get": "rule:all_but_audit and rule:secret_project_match",
"secret_acl:get": "rule:all_but_audit and rule:secret_project_match",
"secret_acl:patch": "rule:admin_or_creator_role and rule:secret_project_match",
"secret_acl:delete": "rule:admin_or_creator_role and rule:secret_project_match",
"container_acls:post": "rule:admin_or_creator_role and rule:container_project_match",
"container_acls:patch": "rule:admin_or_creator_role and rule:container_project_match",
"container_acls:delete": "rule:admin_or_creator_role and rule:container_project_match",
"container_acls:get": "rule:all_but_audit and rule:container_project_match",
"container_acl:get": "rule:all_but_audit and rule:container_project_match",
"container_acl:patch": "rule:admin_or_creator_role and rule:container_project_match",
"container_acl:delete": "rule:admin_or_creator_role and rule:container_project_match"
}