diff --git a/barbican/api/controllers/__init__.py b/barbican/api/controllers/__init__.py index db1d43b3b..a1eb82839 100644 --- a/barbican/api/controllers/__init__.py +++ b/barbican/api/controllers/__init__.py @@ -9,7 +9,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - +import collections import uuid import pecan @@ -41,7 +41,7 @@ def _get_barbican_context(req): return None -def _do_enforce_rbac(req, action_name, ctx): +def _do_enforce_rbac(inst, req, action_name, ctx, **kwargs): """Enforce RBAC based on 'request' information.""" if action_name and ctx: @@ -56,10 +56,16 @@ def _do_enforce_rbac(req, action_name, ctx): if 'secret:get' == action_name and not is_json_request_accept(req): action_name = 'secret:decrypt' # Override to perform special rules + target_name, target_data = inst.get_acl_tuple(req, **kwargs) + policy_dict = {} + if target_name and target_data: + policy_dict['target'] = {target_name: target_data} + + policy_dict.update(kwargs) # Enforce access controls. if ctx.policy_enforcer: - ctx.policy_enforcer.enforce(action_name, {}, credentials, - do_raise=True) + ctx.policy_enforcer.enforce(action_name, flatten(policy_dict), + credentials, do_raise=True) def enforce_rbac(action_name='default'): @@ -76,7 +82,7 @@ def enforce_rbac(action_name='default'): if ctx: external_project_id = ctx.project - _do_enforce_rbac(pecan.request, action_name, ctx) + _do_enforce_rbac(inst, pecan.request, action_name, ctx, **kwargs) # insert external_project_id as the first arg to the guarded method args = list(args) args.insert(0, external_project_id) @@ -154,3 +160,71 @@ def assert_is_valid_uuid_from_uri(doubtful_uuid): uuid.UUID(doubtful_uuid) except ValueError: raise exception.InvalidUUIDInURI(uuid_string=doubtful_uuid) + + +def flatten(d, parent_key=''): + """Flatten a nested dictionary + + Converts a dictionary with nested values to a single level flat + dictionary, with dotted notation for each key. + + """ + items = [] + for k, v in d.items(): + new_key = parent_key + '.' + k if parent_key else k + if isinstance(v, collections.MutableMapping): + items.extend(flatten(v, new_key).items()) + else: + items.append((new_key, v)) + return dict(items) + + +class ACLMixin(object): + + def get_acl_tuple(self, req, **kwargs): + return None, None + + def get_acl_dict_for_user(self, req, acl_list): + """Get acl operation found for token user in acl list. + + Token user is looked into users list present for each acl operation. + If there is a match, it means that ACL data is applicable for policy + logic. Policy logic requires data as dictionary so this method capture + acl's operation, creator_only data in that format. + + For operation value, matching ACL record's operation is stored in dict + as key and value both. + creator_only flag is intended to make secret/container private for a + given operation. It doesn't require user match. So its captured in dict + format where key is prefixed with related operation and flag is used as + its value. + + Then for acl related policy logic, this acl dict data is combined with + target entity (secret or container) creator_id and project id. The + whole dict serves as target in policy enforcement logic i.e. right + hand side of policy rule. + + Following is sample outcome where secret or container has ACL defined + and token user is among the ACL users defined for 'read' and 'list' + operation. + + {'read': 'read', 'list': 'list', 'read_creator_only': False, + 'list_creator_only': False } + + Its possible that ACLs are defined without any user, they just + have creator_only flag set. This means only creator can read or list + ACL entities. In that case, dictionary output can be as follows. + + {'read_creator_only': True, 'list_creator_only': True } + + """ + ctxt = _get_barbican_context(req) + if not ctxt: + return None + acl_dict = {acl.operation: acl.operation for acl in acl_list + if ctxt.user in acl.to_dict_fields().get('users')} + co_dict = {'%s_creator_only' % acl.operation: acl.creator_only for acl + in acl_list if acl.creator_only is not None} + acl_dict.update(co_dict) + + return acl_dict diff --git a/barbican/api/controllers/acls.py b/barbican/api/controllers/acls.py index 2dafc394f..7568e0d79 100644 --- a/barbican/api/controllers/acls.py +++ b/barbican/api/controllers/acls.py @@ -51,7 +51,7 @@ def _acl_operation_update_not_allowed(): pecan.abort(400, u._("Existing ACL's operation cannot be updated.")) -class SecretACLController(object): +class SecretACLController(controllers.ACLMixin): """Handles a SecretACL entity retrieval and update requests.""" def __init__(self, acl_id, secret_project_id, secret): @@ -61,6 +61,10 @@ class SecretACLController(object): self.acl_repo = repo.get_secret_acl_repository() self.validator = validators.ACLValidator() + def get_acl_tuple(self, req, **kwargs): + d = {'project_id': self.secret_project_id} + return 'secret', d + @pecan.expose(generic=True) def index(self): pecan.abort(405) # HTTP 405 Method Not Allowed as default @@ -131,7 +135,7 @@ class SecretACLController(object): external_project_id=None) -class SecretACLsController(object): +class SecretACLsController(controllers.ACLMixin): """Handles SecretACL requests by a given secret id.""" def __init__(self, secret): @@ -141,6 +145,10 @@ class SecretACLsController(object): self.acl_repo = repo.get_secret_acl_repository() self.validator = validators.ACLValidator() + def get_acl_tuple(self, req, **kwargs): + d = {'project_id': self.secret_project_id} + return 'secret', d + @pecan.expose() def _lookup(self, acl_id, *remainder): return SecretACLController(acl_id, self.secret_project_id, @@ -286,7 +294,7 @@ class SecretACLsController(object): return [{'acl_ref': acl['acl_ref']} for acl in acl_recs] -class ContainerACLController(object): +class ContainerACLController(controllers.ACLMixin): """Handles a ContainerACL entity retrieval and update requests.""" def __init__(self, acl_id, container_project_id, container): @@ -296,6 +304,10 @@ class ContainerACLController(object): self.acl_repo = repo.get_container_acl_repository() self.validator = validators.ACLValidator() + def get_acl_tuple(self, req, **kwargs): + d = {'project_id': self.container_project_id} + return 'container', d + @pecan.expose(generic=True) def index(self): pecan.abort(405) # HTTP 405 Method Not Allowed as default @@ -364,7 +376,7 @@ class ContainerACLController(object): external_project_id=None) -class ContainerACLsController(object): +class ContainerACLsController(controllers.ACLMixin): """Handles ContainerACL requests by a given container id.""" def __init__(self, container_id): diff --git a/barbican/api/controllers/cas.py b/barbican/api/controllers/cas.py index a58f3499e..d81751f5c 100644 --- a/barbican/api/controllers/cas.py +++ b/barbican/api/controllers/cas.py @@ -40,7 +40,7 @@ def _requested_preferred_ca_not_a_project_ca(): ) -class CertificateAuthorityController(object): +class CertificateAuthorityController(controllers.ACLMixin): """Handles certificate authority retrieval requests.""" def __init__(self, ca): @@ -222,7 +222,7 @@ class CertificateAuthorityController(object): external_project_id) -class CertificateAuthoritiesController(object): +class CertificateAuthoritiesController(controllers.ACLMixin): """Handles certificate authority list requests.""" def __init__(self): diff --git a/barbican/api/controllers/consumers.py b/barbican/api/controllers/consumers.py index 4d2d2a826..8c802384b 100644 --- a/barbican/api/controllers/consumers.py +++ b/barbican/api/controllers/consumers.py @@ -32,7 +32,7 @@ def _consumer_not_found(): 'another castle.')) -class ContainerConsumerController(object): +class ContainerConsumerController(controllers.ACLMixin): """Handles Consumer entity retrieval and deletion requests.""" def __init__(self, consumer_id): @@ -62,7 +62,7 @@ class ContainerConsumerController(object): ) -class ContainerConsumersController(object): +class ContainerConsumersController(controllers.ACLMixin): """Handles Consumer creation requests.""" def __init__(self, container_id): diff --git a/barbican/api/controllers/containers.py b/barbican/api/controllers/containers.py index 6d058db22..a7d994a57 100644 --- a/barbican/api/controllers/containers.py +++ b/barbican/api/controllers/containers.py @@ -34,7 +34,7 @@ def container_not_found(): 'another castle.')) -class ContainerController(object): +class ContainerController(controllers.ACLMixin): """Handles Container entity retrieval and deletion requests.""" def __init__(self, container_id): @@ -46,6 +46,17 @@ class ContainerController(object): self.acls = acls.ContainerACLsController(self.container_id) self.container = None + def get_acl_tuple(self, req, **kwargs): + self.container = self.container_repo.get_container_by_id( + entity_id=self.container_id, suppress_exception=True) + if self.container: + d = self.get_acl_dict_for_user(req, self.container.container_acls) + d['project_id'] = self.container.project.external_id + d['creator_id'] = self.container.creator_id + return 'container', d + else: + return None, None + @pecan.expose(generic=True) def index(self, **kwargs): pecan.abort(405) # HTTP 405 Method Not Allowed as default @@ -96,7 +107,7 @@ class ContainerController(object): pass -class ContainersController(object): +class ContainersController(controllers.ACLMixin): """Handles Container creation requests.""" def __init__(self): diff --git a/barbican/api/controllers/orders.py b/barbican/api/controllers/orders.py index 7cc1124e8..16a543967 100644 --- a/barbican/api/controllers/orders.py +++ b/barbican/api/controllers/orders.py @@ -59,7 +59,7 @@ def order_cannot_modify_order_type(): pecan.abort(400, u._("Cannot modify order type.")) -class OrderController(object): +class OrderController(controllers.ACLMixin): """Handles Order retrieval and deletion requests.""" @@ -118,7 +118,7 @@ class OrderController(object): external_project_id=external_project_id) -class OrdersController(object): +class OrdersController(controllers.ACLMixin): """Handles Order requests for Secret creation.""" def __init__(self, queue_resource=None): diff --git a/barbican/api/controllers/secrets.py b/barbican/api/controllers/secrets.py index 65a793f16..3646d7328 100644 --- a/barbican/api/controllers/secrets.py +++ b/barbican/api/controllers/secrets.py @@ -53,7 +53,7 @@ def _request_has_twsk_but_no_transport_key_id(): 'transport key id has not been provided.')) -class SecretController(object): +class SecretController(controllers.ACLMixin): """Handles Secret retrieval and deletion requests.""" def __init__(self, secret): @@ -61,6 +61,12 @@ class SecretController(object): self.secret = secret self.transport_key_repo = repo.get_transport_key_repository() + def get_acl_tuple(self, req, **kwargs): + d = self.get_acl_dict_for_user(req, self.secret.secret_acls) + d['project_id'] = self.secret.project_assocs[0].projects.external_id + d['creator_id'] = self.secret.creator_id + return 'secret', d + @pecan.expose() def _lookup(self, sub_resource, *remainder): if sub_resource == 'acls': @@ -108,6 +114,12 @@ class SecretController(object): def _on_get_secret_payload(self, secret, external_project_id, **kwargs): """GET actual payload containing the secret.""" + + # With ACL support, the user token project does not have to be same as + # project associated with secret. The lookup project_id needs to be + # derived from the secret's data considering authorization is already + # done. + external_project_id = secret.project_assocs[0].projects.external_id project = res.get_or_create_project(external_project_id) pecan.override_template('', pecan.request.accept.header_value) @@ -190,7 +202,7 @@ class SecretController(object): plugin.delete_secret(self.secret, external_project_id) -class SecretsController(object): +class SecretsController(controllers.ACLMixin): """Handles Secret creation requests.""" def __init__(self): diff --git a/barbican/api/controllers/transportkeys.py b/barbican/api/controllers/transportkeys.py index aaeee010b..b33bd1d3c 100644 --- a/barbican/api/controllers/transportkeys.py +++ b/barbican/api/controllers/transportkeys.py @@ -34,7 +34,7 @@ def _transport_key_not_found(): pecan.abort(404, u._('Not Found. Transport Key not found.')) -class TransportKeyController(object): +class TransportKeyController(controllers.ACLMixin): """Handles transport key retrieval requests.""" def __init__(self, transport_key_id, transport_key_repo=None): @@ -74,7 +74,7 @@ class TransportKeyController(object): _transport_key_not_found() -class TransportKeysController(object): +class TransportKeysController(controllers.ACLMixin): """Handles transport key list requests.""" def __init__(self, transport_key_repo=None): diff --git a/barbican/tests/api/test_resources_policy.py b/barbican/tests/api/test_resources_policy.py index 87e33dd72..29fb0541d 100644 --- a/barbican/tests/api/test_resources_policy.py +++ b/barbican/tests/api/test_resources_policy.py @@ -12,13 +12,11 @@ # implied. # See the License for the specific language governing permissions and # limitations under the License. - """ This test module focuses on RBAC interactions with the API resource classes. For typical-flow business logic tests of these classes, see the 'resources_test.py' module. """ - import os import mock @@ -26,10 +24,12 @@ from oslo_config import cfg from webob import exc from barbican.api.controllers import consumers +from barbican.api.controllers import containers from barbican.api.controllers import orders from barbican.api.controllers import secrets from barbican.api.controllers import versions from barbican import context +from barbican.model import models from barbican.openstack.common import policy from barbican.tests import utils @@ -89,6 +89,10 @@ class OrderResource(TestableResource): controller_cls = orders.OrderController +class ContainerResource(TestableResource): + controller_cls = containers.ContainerController + + class ConsumersResource(TestableResource): controller_cls = consumers.ContainerConsumersController @@ -106,14 +110,15 @@ class BaseTestCase(utils.BaseTestCase, utils.MockModelRepositoryMixin): self.policy_enforcer.load_rules(True) self.resp = mock.MagicMock() - def _generate_req(self, roles=None, accept=None, content_type=None): + def _generate_req(self, roles=None, accept=None, content_type=None, + user_id=None, project_id=None): """Generate a fake HTTP request with security context added to it.""" req = mock.MagicMock() req.get_param.return_value = None kwargs = { - 'user': None, - 'project': None, + 'user': user_id, + 'project': project_id, 'roles': roles or [], 'policy_enforcer': self.policy_enforcer, } @@ -157,7 +162,7 @@ class BaseTestCase(utils.BaseTestCase, utils.MockModelRepositoryMixin): return exc.HTTPServerError(message='Read Error') def _assert_pass_rbac(self, roles, method_under_test, accept=None, - content_type=None): + content_type=None, user_id=None, project_id=None): """Assert that RBAC authorization rules passed for the specified roles. :param roles: List of roles to check, one at a time @@ -168,7 +173,9 @@ class BaseTestCase(utils.BaseTestCase, utils.MockModelRepositoryMixin): for role in roles: self.req = self._generate_req(roles=[role] if role else [], accept=accept, - content_type=content_type) + content_type=content_type, + user_id=user_id, + project_id=project_id) # Force an exception early past the RBAC passing. self.req.body_file = self._generate_stream_for_exit() @@ -177,7 +184,7 @@ class BaseTestCase(utils.BaseTestCase, utils.MockModelRepositoryMixin): self._assert_post_rbac_exception(exception, role) def _assert_fail_rbac(self, roles, method_under_test, accept=None, - content_type=None): + content_type=None, user_id=None, project_id=None): """Assert that RBAC rules failed for one of the specified roles. :param roles: List of roles to check, one at a time @@ -188,7 +195,9 @@ class BaseTestCase(utils.BaseTestCase, utils.MockModelRepositoryMixin): for role in roles: self.req = self._generate_req(roles=[role] if role else [], accept=accept, - content_type=content_type) + content_type=content_type, + user_id=user_id, + project_id=project_id) exception = self.assertRaises(exc.HTTPForbidden, method_under_test) self.assertEqual(403, exception.status_int) @@ -283,12 +292,15 @@ class WhenTestingSecretsResource(BaseTestCase): class WhenTestingSecretResource(BaseTestCase): - """RBAC tests for the barbican.api.resources.SecretResource class.""" + """RBAC tests for SecretController class.""" + def setUp(self): super(WhenTestingSecretResource, self).setUp() self.external_project_id = '12345project' self.secret_id = '12345secret' + self.user_id = '123456user' + self.creator_user_id = '123456CreatorUser' # Force an error on GET and DELETE calls that pass RBAC, # as we are not testing such flows in this test module. @@ -305,7 +317,20 @@ class WhenTestingSecretResource(BaseTestCase): self.setup_secret_meta_repository_mock() self.setup_transport_key_repository_mock() - self.resource = SecretResource(self.secret_id) + acl_read = models.SecretACL(secret_id=self.secret_id, operation='read', + creator_only=False, + user_ids=[self.user_id, 'anyRandomId']) + self.acl_list = [acl_read] + secret = mock.MagicMock() + secret.secret_acls.__iter__.return_value = self.acl_list + secret.project_assocs[0].projects.external_id = (self. + external_project_id) + secret.creator_id = self.creator_user_id + + self.resource = SecretResource(secret) + + # self.resource.controller.get_acl_tuple = mock.MagicMock( + # return_value=(None, None)) def test_rules_should_be_loaded(self): self.assertIsNotNone(self.policy_enforcer.rules) @@ -314,16 +339,236 @@ class WhenTestingSecretResource(BaseTestCase): self._assert_pass_rbac(['admin', 'observer', 'creator'], self._invoke_on_get, accept='notjsonaccepttype', - content_type='application/json') + content_type='application/json', + user_id=self.user_id, + project_id=self.external_project_id) def test_should_raise_decrypt_secret(self): + self._assert_fail_rbac([None, 'audit', 'bogus'], self._invoke_on_get, accept='notjsonaccepttype') + def test_should_pass_decrypt_secret_for_same_project_with_no_acl(self): + """Token and secret project needs to be same in no ACL defined case.""" + self.acl_list.pop() # remove read acl from default setup + self._assert_pass_rbac(['admin', 'observer', 'creator'], + self._invoke_on_get, + accept='notjsonaccepttype', + content_type='application/json', + user_id=self.user_id, + project_id=self.external_project_id) + + def test_should_raise_decrypt_secret_for_with_creator_only_enabled(self): + """Should raise authz error as secret is marked private. + + As secret is private so project users should not be able to access + the secret. + """ + self.acl_list.pop() # remove read acl from default setup + acl_read = models.SecretACL(secret_id=self.secret_id, operation='read', + creator_only=True, + user_ids=['anyRandomUserX', 'aclUser1']) + self.acl_list.append(acl_read) + self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'], + self._invoke_on_get, + accept='notjsonaccepttype', + content_type='application/json', + user_id=self.user_id, + project_id=self.external_project_id) + + def test_should_pass_decrypt_secret_private_enabled_with_read_acl(self): + """Should pass authz as user has read acl for private secret. + + Even though secret is private, user with read acl should be able to + access the secret. + """ + self.acl_list.pop() # remove read acl from default setup + acl_read = models.SecretACL(secret_id=self.secret_id, operation='read', + creator_only=True, + user_ids=['anyRandomUserX', 'aclUser1']) + self.acl_list.append(acl_read) + self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit', + 'bogusRole'], + self._invoke_on_get, + accept='notjsonaccepttype', + content_type='application/json', + user_id='aclUser1', + project_id=self.external_project_id) + + def test_should_pass_decrypt_secret_different_user_valid_read_acl(self): + self.acl_list.pop() # remove read acl from default setup + acl_read = models.SecretACL(secret_id=self.secret_id, operation='read', + creator_only=False, + user_ids=['anyRandomUserX', 'aclUser1']) + self.acl_list.append(acl_read) + # token project_id is different from secret's project id but another + # user (from different project) has read acl for secret so should pass + self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit', + 'bogusRole'], + self._invoke_on_get, + accept='notjsonaccepttype', + content_type='application/json', + user_id='aclUser1', + project_id='different_project_id') + + def test_should_raise_decrypt_secret_for_different_user_no_read_acl(self): + self.acl_list.pop() # remove read acl from default setup + acl_read = models.SecretACL(secret_id=self.secret_id, + operation='write', + creator_only=False, + user_ids=['anyRandomUserX', 'aclUser1']) + self.acl_list.append(acl_read) + # token project_id is different from secret's project id but another + # user (from different project) has read acl for secret so should pass + self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'], + self._invoke_on_get, + accept='notjsonaccepttype', + content_type='application/json', + user_id='aclUser1', + project_id='different_project_id') + + def test_should_pass_decrypt_secret_for_creator_user_with_no_acl(self): + """Check for creator user rule for secret decrypt/get call. + + If token's user is same as creator of secret, then they are allowed + to read decrypted secret regardless of token's project or token's + role. + """ + self.acl_list.pop() # remove read acl from default setup + self.resource.controller.secret.creator_id = 'creatorUserX' + # token user is creator so allowed get call regardless of role + # user (from different project) has read acl for secret so should pass + self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit', + 'bogusRole'], + self._invoke_on_get, + accept='notjsonaccepttype', + content_type='application/json', + user_id='creatorUserX', + project_id='different_project_id') + def test_should_pass_get_secret(self): self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'], - self._invoke_on_get) + self._invoke_on_get, + user_id=self.user_id, + project_id=self.external_project_id) + + def test_should_pass_get_secret_with_no_context(self): + """In unauthenticated flow, get secret should work.""" + self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'], + self._invoke_on_get_without_context, + user_id=self.user_id, + project_id=self.external_project_id) + + def test_should_raise_get_secret_for_different_project_no_acl(self): + """Should raise error when secret and token's project is different.""" + self.acl_list.pop() # remove read acl from default setup + # token project_id is different from secret's project id so should fail + self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'], + self._invoke_on_get, + user_id=self.user_id, + project_id='different_id') + + def test_should_pass_get_secret_for_same_project_but_different_user(self): + # user id should not matter as long token and secret's project match + self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'], + self._invoke_on_get, + user_id='different_user_id', + project_id=self.external_project_id) + + def test_should_pass_get_secret_for_same_project_with_no_acl(self): + self.acl_list.pop() # remove read acl from default setup + self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'], + self._invoke_on_get, + user_id=self.user_id, + project_id=self.external_project_id) + + def test_should_raise_get_secret_for_with_creator_only_enabled(self): + """Should raise authz error as secret is marked private. + + As secret is private so project users should not be able to access + the secret. + """ + self.acl_list.pop() # remove read acl from default setup + acl_read = models.SecretACL(secret_id=self.secret_id, operation='read', + creator_only=True, + user_ids=['anyRandomUserX', 'aclUser1']) + self.acl_list.append(acl_read) + self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'], + self._invoke_on_get, + user_id=self.user_id, + project_id=self.external_project_id) + + def test_should_pass_get_secret_for_private_enabled_with_read_acl(self): + """Should pass authz as user has read acl for private secret. + + Even though secret is private, user with read acl should be able to + access the secret. + """ + self.acl_list.pop() # remove read acl from default setup + acl_read = models.SecretACL(secret_id=self.secret_id, operation='read', + creator_only=True, + user_ids=['anyRandomUserX', 'aclUser1']) + self.acl_list.append(acl_read) + self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit', + 'bogusRole'], + self._invoke_on_get, + user_id='aclUser1', + project_id=self.external_project_id) + + def test_should_pass_get_secret_different_user_with_valid_read_acl(self): + """Should allow when read ACL is defined for a user. + + Secret's own project and token's project is different but read is + allowed because of valid read ACL. + """ + self.acl_list.pop() # remove read acl from default setup + acl_read = models.SecretACL(secret_id=self.secret_id, operation='read', + creator_only=False, + user_ids=['anyRandomUserX', 'aclUser1']) + self.acl_list.append(acl_read) + # token project_id is different from secret's project id but another + # user (from different project) has read acl for secret so should pass + self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit', + 'bogusRole'], + self._invoke_on_get, + user_id='aclUser1', + project_id='different_project_id') + + def test_should_raise_get_secret_for_different_user_with_no_read_acl(self): + """Get secret fails when no read acl is defined. + + With different secret and token's project, read is not allowed without + a read ACL. + """ + self.acl_list.pop() # remove read acl from default setup + acl_read = models.SecretACL(secret_id=self.secret_id, + operation='write', + creator_only=False, + user_ids=['anyRandomUserX', 'aclUser1']) + self.acl_list.append(acl_read) + # token project_id is different from secret's project id but another + # user (from different project) has read acl for secret so should pass + self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'], + self._invoke_on_get, + user_id='aclUser1', + project_id='different_project_id') + + def test_should_pass_get_secret_for_creator_user_with_no_acl(self): + """Check for creator user rule for secret get call. + + If token's user is same as creator of secret, then he/she is allowed + to read secret regardless of token's project or token's role. + """ + self.acl_list.pop() # remove read acl from default setup + self.resource.controller.secret.creator_id = 'creatorUserX' + # token user is creator so allowed get call regardless of role + # user (from different project) has read acl for secret so should pass + self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit', + 'bogusRole'], + self._invoke_on_get, + user_id='creatorUserX', + project_id='different_project_id') def test_should_raise_get_secret(self): self._assert_fail_rbac([None, 'bogus'], @@ -331,7 +576,9 @@ class WhenTestingSecretResource(BaseTestCase): def test_should_pass_put_secret(self): self._assert_pass_rbac(['admin', 'creator'], self._invoke_on_put, - content_type="application/octet-stream") + content_type="application/octet-stream", + user_id=self.user_id, + project_id=self.external_project_id) def test_should_raise_put_secret(self): self._assert_fail_rbac([None, 'audit', 'observer', 'bogus'], @@ -339,16 +586,224 @@ class WhenTestingSecretResource(BaseTestCase): content_type="application/octet-stream") def test_should_pass_delete_secret(self): - self._assert_pass_rbac(['admin'], self._invoke_on_delete) + self._assert_pass_rbac(['admin'], self._invoke_on_delete, + user_id=self.user_id, + project_id=self.external_project_id) def test_should_raise_delete_secret(self): self._assert_fail_rbac([None, 'audit', 'observer', 'creator', 'bogus'], self._invoke_on_delete) + # @mock.patch.object(secrets.SecretController, 'get_acl_tuple', + # return_value=(None, None)) def _invoke_on_get(self): self.resource.on_get(self.req, self.resp, self.external_project_id) + def _invoke_on_get_without_context(self): + # Adding this to get code coverage around context check lines + self.req.environ.pop('barbican.context') + self.resource.on_get(self.req, self.resp, + self.external_project_id) + + def _invoke_on_put(self): + self.resource.on_put(self.req, self.resp, + self.external_project_id) + + def _invoke_on_delete(self): + self.resource.on_delete(self.req, self.resp, + self.external_project_id) + + +class WhenTestingContainerResource(BaseTestCase): + """RBAC tests for ContainerController class. + + Container controller tests are quite similar to SecretController as + policy logic is same. Just adding them here to make sure logic related to + acl gathering data works as expected. + """ + + def setUp(self): + super(WhenTestingContainerResource, self).setUp() + + self.external_project_id = '12345project' + self.container_id = '12345secret' + self.user_id = '123456user' + self.creator_user_id = '123456CreatorUser' + + # Force an error on GET and DELETE calls that pass RBAC, + # as we are not testing such flows in this test module. + self.container_repo = mock.MagicMock() + fail_method = mock.MagicMock(return_value=None, + side_effect=self._generate_get_error()) + self.container_repo.get = fail_method + self.container_repo.delete_entity_by_id = fail_method + + acl_read = models.ContainerACL( + container_id=self.container_id, operation='read', + creator_only=False, user_ids=[self.user_id, 'anyRandomId']) + self.acl_list = [acl_read] + container = mock.MagicMock() + container.container_acls.__iter__.return_value = self.acl_list + container.project.external_id = self.external_project_id + container.creator_id = self.creator_user_id + + self.container_repo.get_container_by_id.return_value = container + + self.setup_container_repository_mock(self.container_repo) + + self.resource = ContainerResource(self.container_id) + + def test_rules_should_be_loaded(self): + self.assertIsNotNone(self.policy_enforcer.rules) + + def test_should_pass_get_container(self): + self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'], + self._invoke_on_get, + user_id=self.user_id, + project_id=self.external_project_id) + + def test_should_pass_get_container_with_no_context(self): + """In unauthenticated flow, get container should work.""" + self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'], + self._invoke_on_get_without_context, + user_id=self.user_id, + project_id=self.external_project_id) + + def test_should_raise_get_container_for_different_project_no_acl(self): + """Raise error when container and token's project is different.""" + self.acl_list.pop() # remove read acl from default setup + # token project_id is different from secret's project id so should fail + self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'], + self._invoke_on_get, + user_id=self.user_id, + project_id='different_id') + + def test_should_pass_get_container_for_same_project_but_different_user( + self): + """Should pass if token and secret's project match. + + User id should not matter as long token and container's project match. + """ + self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'], + self._invoke_on_get, + user_id='different_user_id', + project_id=self.external_project_id) + + def test_should_pass_get_container_for_same_project_with_no_acl(self): + self.acl_list.pop() # remove read acl from default setup + self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit'], + self._invoke_on_get, + user_id=self.user_id, + project_id=self.external_project_id) + + def test_should_raise_get_container_for_with_creator_only_enabled(self): + """Should raise authz error as container is marked private. + + As container is private so project users should not be able to access + the secret. + """ + self.acl_list.pop() # remove read acl from default setup + acl_read = models.ContainerACL( + container_id=self.container_id, operation='read', + creator_only=True, user_ids=['anyRandomUserX', 'aclUser1']) + self.acl_list.append(acl_read) + self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'], + self._invoke_on_get, + user_id=self.user_id, + project_id=self.external_project_id) + + def test_should_pass_get_container_for_private_enabled_with_read_acl(self): + """Should pass authz as user has read acl for private container. + + Even though container is private, user with read acl should be able to + access the container. + """ + self.acl_list.pop() # remove read acl from default setup + acl_read = models.ContainerACL( + container_id=self.container_id, operation='read', + creator_only=True, user_ids=['anyRandomUserX', 'aclUser1']) + self.acl_list.append(acl_read) + self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit', + 'bogusRole'], + self._invoke_on_get, + user_id='aclUser1', + project_id=self.external_project_id) + + def test_should_pass_get_container_different_user_with_valid_read_acl( + self): + """Should allow when read ACL is defined for a user. + + Container's own project and token's project is different but read is + allowed because of valid read ACL. User can read regardless of what is + token's project as it has necessary ACL. + """ + self.acl_list.pop() # remove read acl from default setup + acl_read = models.ContainerACL( + container_id=self.container_id, operation='read', + creator_only=False, user_ids=['anyRandomUserX', 'aclUser1']) + self.acl_list.append(acl_read) + self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit', + 'bogusRole'], + self._invoke_on_get, + user_id='aclUser1', + project_id='different_project_id') + + def test_should_raise_get_container_for_different_user_with_no_read_acl( + self): + """Get secret fails when no read acl is defined. + + With different container and token's project, read is not allowed + without a read ACL. + """ + self.acl_list.pop() # remove read acl from default setup + acl_read = models.ContainerACL( + container_id=self.container_id, operation='write', + creator_only=False, user_ids=['anyRandomUserX', 'aclUser1']) + self.acl_list.append(acl_read) + # token project_id is different from secret's project id but another + # user (from different project) has read acl for secret so should pass + self._assert_fail_rbac(['admin', 'observer', 'creator', 'audit'], + self._invoke_on_get, + user_id='aclUser1', + project_id='different_project_id') + + def test_should_pass_get_container_for_creator_user_with_no_acl(self): + """Check for creator user rule for container get call. + + If token's user is same as creator of container, then he/she is allowed + to read container regardless of token's project or token's role. + """ + self.acl_list.pop() # remove read acl from default setup + self._assert_pass_rbac(['admin', 'observer', 'creator', 'audit', + 'bogusRole'], + self._invoke_on_get, + user_id=self.creator_user_id, + project_id='different_project_id') + + def test_should_raise_get_container(self): + self._assert_fail_rbac([None, 'bogus'], + self._invoke_on_get) + + def test_should_pass_delete_container(self): + self._assert_pass_rbac(['admin'], self._invoke_on_delete, + user_id=self.user_id, + project_id=self.external_project_id) + + def test_should_raise_delete_container(self): + self._assert_fail_rbac([None, 'audit', 'observer', 'creator', 'bogus'], + self._invoke_on_delete) + + def _invoke_on_get(self): + self.resource.on_get(self.req, self.resp, + self.external_project_id) + + def _invoke_on_get_without_context(self): + # Adding this to get code coverage around context check lines + self.req.environ.pop('barbican.context') + self.resource.on_get(self.req, self.resp, + self.external_project_id) + def _invoke_on_put(self): self.resource.on_put(self.req, self.resp, self.external_project_id) diff --git a/etc/barbican/policy.json b/etc/barbican/policy.json index 987e895ff..1ea4ec388 100644 --- a/etc/barbican/policy.json +++ b/etc/barbican/policy.json @@ -1,33 +1,46 @@ { + "admin": "role:admin", + "observer": "role:observer", + "creator_role": "role:creator", + "audit": "role:audit", + "admin_or_user_does_not_work": "project_id:%(project_id)s", + "admin_or_user": "role:admin or project_id:%(project_id)s", + "admin_or_creator_role": "role:admin or role:creator", + "all_but_audit": "role:admin or role:observer or role:creator", + "all_users": "role:admin or role:observer or role:creator or role:audit", + "secret_project_match": "project:%(target.secret.project_id)s", + "secret_acl_read": "'read':%(target.secret.read)s", + "secret_private_read": "'True':%(target.secret.read_creator_only)s", + "secret_creator_user": "user:%(target.secret.creator_id)s", + "container_project_match": "project:%(target.container.project_id)s", + "container_acl_read": "'read':%(target.container.read)s", + "container_private_read": "'True':%(target.container.read_creator_only)s", + "container_creator_user": "user:%(target.container.creator_id)s", + + "secret_non_private_read": "rule:all_users and rule:secret_project_match and not rule:secret_private_read", + "secret_decrypt_non_private_read": "rule:all_but_audit and rule:secret_project_match and not rule:secret_private_read", + "container_non_private_read": "rule:all_users and rule:container_project_match and not rule:container_private_read", + "version:get": "@", - "secret:decrypt": "rule:all_but_audit", - "secret:get": "rule:all_users", - "secret:put": "rule:admin_or_creator", - "secret:delete": "rule:admin", - "secrets:post": "rule:admin_or_creator", + "secret:decrypt": "rule:secret_decrypt_non_private_read or rule:secret_creator_user or rule:secret_acl_read", + "secret:get": "rule:secret_non_private_read or rule:secret_creator_user or rule:secret_acl_read", + "secret:put": "rule:admin_or_creator_role and rule:secret_project_match", + "secret:delete": "rule:admin and rule:secret_project_match", + "secrets:post": "rule:admin_or_creator_role", "secrets:get": "rule:all_but_audit", - "orders:post": "rule:admin_or_creator", + "orders:post": "rule:admin_or_creator_role", "orders:get": "rule:all_but_audit", "order:get": "rule:all_users", - "order:put":"rule:admin_or_creator", + "order:put": "rule:admin_or_creator_role", "order:delete": "rule:admin", "consumer:get": "rule:all_users", "consumers:get": "rule:all_users", "consumers:post": "rule:admin", "consumers:delete": "rule:admin", - "containers:post": "rule:admin_or_creator", + "containers:post": "rule:admin_or_creator_role", "containers:get": "rule:all_but_audit", - "container:get": "rule:all_users", + "container:get": "rule:container_non_private_read or rule:container_creator_user or rule:container_acl_read", "container:delete": "rule:admin", - "admin": "role:admin", - "observer": "role:observer", - "creator": "role:creator", - "audit": "rule:audit", - "admin_or_user_does_not_work": "project_id:%(project_id)s", - "admin_or_user": "role:admin or project_id:%(project_id)s", - "admin_or_creator": "role:admin or role:creator", - "all_but_audit": "role:admin or role:observer or role:creator", - "all_users": "role:admin or role:observer or role:creator or role:audit", "transport_key:get": "rule:all_users", "transport_key:delete": "rule:admin", "transport_keys:get": "rule:all_users", @@ -43,5 +56,19 @@ "certificate_authority:set_global_preferred": "rule:admin", "certificate_authority:unset_global_preferred": "rule:admin", "certificate_authority:get_global_preferred": "rule:all_users", - "certificate_authority:get_preferred_ca": "rule:all_users" + "certificate_authority:get_preferred_ca": "rule:all_users", + "secret_acls:post": "rule:admin_or_creator_role and rule:secret_project_match", + "secret_acls:patch": "rule:admin_or_creator_role and rule:secret_project_match", + "secret_acls:delete": "rule:admin_or_creator_role and rule:secret_project_match", + "secret_acls:get": "rule:all_but_audit and rule:secret_project_match", + "secret_acl:get": "rule:all_but_audit and rule:secret_project_match", + "secret_acl:patch": "rule:admin_or_creator_role and rule:secret_project_match", + "secret_acl:delete": "rule:admin_or_creator_role and rule:secret_project_match", + "container_acls:post": "rule:admin_or_creator_role and rule:container_project_match", + "container_acls:patch": "rule:admin_or_creator_role and rule:container_project_match", + "container_acls:delete": "rule:admin_or_creator_role and rule:container_project_match", + "container_acls:get": "rule:all_but_audit and rule:container_project_match", + "container_acl:get": "rule:all_but_audit and rule:container_project_match", + "container_acl:patch": "rule:admin_or_creator_role and rule:container_project_match", + "container_acl:delete": "rule:admin_or_creator_role and rule:container_project_match" }