From bbfe47d532ddf652492cb10a230ac166d7c6e421 Mon Sep 17 00:00:00 2001 From: Arun Kant Date: Wed, 6 May 2015 23:48:06 -0700 Subject: [PATCH] Removed per ACL operations and added support for PUT method. Changes has been done as per comments in https://review.openstack.org/#/c/178479/ All of APIs for /secrets/{uuid}/acls/{uuid} resource are removed. ACL resource is changed to /secrets/{uuid}/acl. PUT (complete update) support is added for above ACL resource. PUT is used instead of POST for creating ACL resource as per API-WG recommendation. GET on ACL resource will return response similar to request format. PUT/PATCH will return above ACL resource reference in response. Now as every secret or container has a default acl which is returned on GET call when no explicit ACL is set. That's why PATCH will not return 404 when used on secret/container with no explicit ACL. Similarly DELETE will not return 404 on its invocation even if explicit ACL is not set or called multiple times. All of above changes are made to container ACL resource as well. Change-Id: I5bfccf4cb827845084fd2d5d734f476f2ed16146 --- barbican/api/controllers/acls.py | 548 +++----- barbican/api/controllers/containers.py | 2 +- barbican/api/controllers/secrets.py | 2 +- barbican/common/hrefs.py | 37 - barbican/model/repositories.py | 3 +- barbican/tests/api/controllers/test_acls.py | 1235 ++++++----------- barbican/tests/api/controllers/test_orders.py | 2 +- etc/barbican/policy.json | 22 +- 8 files changed, 614 insertions(+), 1237 deletions(-) diff --git a/barbican/api/controllers/acls.py b/barbican/api/controllers/acls.py index 7568e0d79..50ed5b223 100644 --- a/barbican/api/controllers/acls.py +++ b/barbican/api/controllers/acls.py @@ -13,6 +13,7 @@ import itertools import pecan +import six from barbican import api from barbican.api import controllers @@ -26,113 +27,21 @@ from barbican.model import repositories as repo LOG = utils.getLogger(__name__) -def _acls_not_found(acl_for=None): - """Throw exception indicating no secret or container acls found.""" - pecan.abort(404, u._('Not Found. Sorry no ACL found for given {0}.'). - format(acl_for)) +def _convert_acl_to_response_format(acl, acls_dict): + fields = acl.to_dict_fields() + operation = fields['operation'] + + acl_data = {} # dict for each acl operation data + + acl_data['creator-only'] = fields['creator_only'] + acl_data['users'] = fields.get('users', []) + acl_data['created'] = fields['created'] + acl_data['updated'] = fields['updated'] + + acls_dict[operation] = acl_data -def _acl_not_found(): - """Throw exception indicating no secret or container acl found.""" - pecan.abort(404, u._('Not Found. Sorry no ACL found for given id.')) - - -def _acls_already_exist(): - """Throw exception indicating secret or container acls already exist.""" - pecan.abort(400, u._('Existing ACL cannot be updated with POST method.')) - - -def _acl_operation_update_not_allowed(): - """Throw exception indicating existing secret or container acl operation. - - Operation cannot be changed for an existing ACL. Allowed change is list of - users and/or creator_only flag change. - """ - pecan.abort(400, u._("Existing ACL's operation cannot be updated.")) - - -class SecretACLController(controllers.ACLMixin): - """Handles a SecretACL entity retrieval and update requests.""" - - def __init__(self, acl_id, secret_project_id, secret): - self.acl_id = acl_id - self.secret_project_id = secret_project_id - self.secret = secret - self.acl_repo = repo.get_secret_acl_repository() - self.validator = validators.ACLValidator() - - def get_acl_tuple(self, req, **kwargs): - d = {'project_id': self.secret_project_id} - return 'secret', d - - @pecan.expose(generic=True) - def index(self): - pecan.abort(405) # HTTP 405 Method Not Allowed as default - - @index.when(method='GET', template='json') - @controllers.handle_exceptions(u._('SecretACL retrieval')) - @controllers.enforce_rbac('secret_acl:get') - def on_get(self, external_project_id): - secret_acl = self.acl_repo.get( - entity_id=self.acl_id, - suppress_exception=True) - - if not secret_acl: - _acl_not_found() - - dict_fields = secret_acl.to_dict_fields() - - return hrefs.convert_acl_to_hrefs(dict_fields) - - @index.when(method='PATCH', template='json') - @controllers.handle_exceptions(u._('A SecretACL Update')) - @controllers.enforce_rbac('secret_acl:patch') - @controllers.enforce_content_types(['application/json']) - def on_patch(self, external_project_id, **kwargs): - """Handles existing secret ACL update for given acl id.""" - - data = api.load_body(pecan.request, validator=self.validator) - LOG.debug('Start SecretACLController on_patch...%s', data) - - secret_acl = self.acl_repo.get( - entity_id=self.acl_id, - suppress_exception=True) - if not secret_acl: - _acl_not_found() - - # Make sure request acl operation matches with acl stored in db - operation = secret_acl.operation - input_acl = data.get(operation) - if not input_acl: - _acl_operation_update_not_allowed() - - creator_only = input_acl.get('creator-only') - user_ids = input_acl.get('users') - if creator_only is not None: - secret_acl.creator_only = creator_only - - self.acl_repo.create_or_replace_from(self.secret, - secret_acl=secret_acl, - user_ids=user_ids) - acl_ref = hrefs.convert_acl_to_hrefs(secret_acl.to_dict_fields()) - - return {'acl_ref': acl_ref['acl_ref']} - - @index.when(method='DELETE', template='json') - @controllers.handle_exceptions(u._('SecretACL deletion')) - @controllers.enforce_rbac('secret_acl:delete') - def on_delete(self, external_project_id, **kwargs): - """Deletes existing ACL by acl_id provided in URI.""" - - secret_acl = self.acl_repo.get( - entity_id=self.acl_id, - suppress_exception=True) - - if not secret_acl: - _acl_not_found() - - self.acl_repo.delete_entity_by_id(entity_id=self.acl_id, - external_project_id=None) +DEFAULT_ACL = {'read': {'creator-only': False}} class SecretACLsController(controllers.ACLMixin): @@ -146,14 +55,10 @@ class SecretACLsController(controllers.ACLMixin): self.validator = validators.ACLValidator() def get_acl_tuple(self, req, **kwargs): - d = {'project_id': self.secret_project_id} + d = {'project_id': self.secret_project_id, + 'creator_id': self.secret.creator_id} return 'secret', d - @pecan.expose() - def _lookup(self, acl_id, *remainder): - return SecretACLController(acl_id, self.secret_project_id, - self.secret), remainder - @pecan.expose(generic=True) def index(self, **kwargs): pecan.abort(405) # HTTP 405 Method Not Allowed as default @@ -165,60 +70,11 @@ class SecretACLsController(controllers.ACLMixin): LOG.debug('Start secret ACL on_get ' 'for secret-ID %s:', self.secret.id) - return self._return_acl_hrefs(self.secret.id) - - @index.when(method='POST', template='json') - @controllers.handle_exceptions(u._('SecretACL(s) creation')) - @controllers.enforce_rbac('secret_acls:post') - @controllers.enforce_content_types(['application/json']) - def on_post(self, external_project_id, **kwargs): - """Handles secret acls creation request. - - Once a set of ACLs exists for a given secret, it can only be updated - via PATCH method. In create, multiple operation ACL payload can be - specified as mentioned in sample below. - - { - "read":{ - "users":[ - "5ecb18f341894e94baca9e8c7b6a824a" - ] - }, - "write":{ - "users":[ - "20b63d71f90848cf827ee48074f213b7", - "5ecb18f341894e94baca9e8c7b6a824a" - ], - "creator-only":false - } - } - """ - - count = self.acl_repo.get_count(self.secret.id) - LOG.debug('Count of existing ACL on_post is [%s] ' - ' for secret-ID %s:', count, self.secret.id) - if count > 0: - _acls_already_exist() - - data = api.load_body(pecan.request, validator=self.validator) - LOG.debug('Start on_post...%s', data) - - for operation in itertools.ifilter(lambda x: data.get(x), - validators.ACL_OPERATIONS): - input_cr_only = data[operation].get('creator-only') - creator_only = True if input_cr_only else False - new_acl = models.SecretACL(self.secret.id, operation=operation, - creator_only=creator_only) - self.acl_repo.create_or_replace_from( - self.secret, secret_acl=new_acl, - user_ids=data[operation].get('users')) - - pecan.response.status = 201 - return self._return_acl_hrefs(self.secret.id) + return self._return_acl_list_response(self.secret.id) @index.when(method='PATCH', template='json') @controllers.handle_exceptions(u._('SecretACL(s) Update')) - @controllers.enforce_rbac('secret_acls:patch') + @controllers.enforce_rbac('secret_acls:put_patch') @controllers.enforce_content_types(['application/json']) def on_patch(self, external_project_id, **kwargs): """Handles update of existing secret acl requests. @@ -244,13 +100,6 @@ class SecretACLsController(controllers.ACLMixin): } } """ - - count = self.acl_repo.get_count(self.secret.id) - LOG.debug('Count of existing ACL on_secret is [%s] ' - ' for secret-ID %s:', count, self.secret.id) - if count == 0: - _acls_not_found("secret") - data = api.load_body(pecan.request, validator=self.validator) LOG.debug('Start on_patch...%s', data) @@ -271,7 +120,72 @@ class SecretACLsController(controllers.ACLMixin): self.acl_repo.create_or_replace_from(self.secret, secret_acl=s_acl, user_ids=user_ids) - return self._return_acl_hrefs(self.secret.id) + acl_ref = '{0}/acl'.format( + hrefs.convert_secret_to_href(self.secret.id)) + return {'acl_ref': acl_ref} + + @index.when(method='PUT', template='json') + @controllers.handle_exceptions(u._('SecretACL(s) Update')) + @controllers.enforce_rbac('secret_acls:put_patch') + @controllers.enforce_content_types(['application/json']) + def on_put(self, external_project_id, **kwargs): + """Handles update of existing secret acl requests. + + Replaces existing secret ACL(s) with input ACL(s) data. Existing + ACL operation not specified in input are removed as part of update. + For missing creator-only in ACL, false is used as default. + In update, multiple operation ACL payload can be specified as + mentioned in sample below. A specific ACL can be updated by its + own id via SecretACLController patch request. + + { + "read":{ + "users":[ + "5ecb18f341894e94baca9e8c7b6a824a", + "20b63d71f90848cf827ee48074f213b7", + "c7753f8da8dc4fbea75730ab0b6e0ef4" + ] + }, + "write":{ + "users":[ + "5ecb18f341894e94baca9e8c7b6a824a" + ], + "creator-only":true + } + } + + Every secret, by default, has an implicit ACL in case client has not + defined an explicit ACL. That default ACL definition, DEFAULT_ACL, + signifies that a secret by default has project based access i.e. client + with necessary roles on secret project can access the secret. That's + why when ACL is added to a secret, it always returns 200 (and not 201) + indicating existence of implicit ACL on a secret. + """ + data = api.load_body(pecan.request, validator=self.validator) + LOG.debug('Start on_put...%s', data) + + existing_acls_map = {acl.operation: acl for acl in + self.secret.secret_acls} + for operation in itertools.ifilter(lambda x: data.get(x), + validators.ACL_OPERATIONS): + creator_only = data[operation].get('creator-only', False) + user_ids = data[operation].get('users', []) + s_acl = None + if operation in existing_acls_map: # update if matching acl exists + s_acl = existing_acls_map.pop(operation) + s_acl.creator_only = creator_only + else: + s_acl = models.SecretACL(self.secret.id, operation=operation, + creator_only=creator_only) + self.acl_repo.create_or_replace_from(self.secret, secret_acl=s_acl, + user_ids=user_ids) + # delete remaining existing acls as they are not present in input. + for acl in six.itervalues(existing_acls_map): + self.acl_repo.delete_entity_by_id(entity_id=acl.id, + external_project_id=None) + acl_ref = '{0}/acl'.format( + hrefs.convert_secret_to_href(self.secret.id)) + return {'acl_ref': acl_ref} @index.when(method='DELETE', template='json') @controllers.handle_exceptions(u._('SecretACL(s) deletion')) @@ -279,125 +193,37 @@ class SecretACLsController(controllers.ACLMixin): def on_delete(self, external_project_id, **kwargs): count = self.acl_repo.get_count(self.secret.id) - if count == 0: - _acls_not_found("secret") - self.acl_repo.delete_acls_for_secret(self.secret) + if count > 0: + self.acl_repo.delete_acls_for_secret(self.secret) - def _return_acl_hrefs(self, secret_id): + def _return_acl_list_response(self, secret_id): result = self.acl_repo.get_by_secret_id(secret_id) - if not result: - _acls_not_found("secret") - else: - acl_recs = [hrefs.convert_acl_to_hrefs(acl.to_dict_fields()) - for acl in result] - return [{'acl_ref': acl['acl_ref']} for acl in acl_recs] - - -class ContainerACLController(controllers.ACLMixin): - """Handles a ContainerACL entity retrieval and update requests.""" - - def __init__(self, acl_id, container_project_id, container): - self.acl_id = acl_id - self.container_project_id = container_project_id - self.container = container - self.acl_repo = repo.get_container_acl_repository() - self.validator = validators.ACLValidator() - - def get_acl_tuple(self, req, **kwargs): - d = {'project_id': self.container_project_id} - return 'container', d - - @pecan.expose(generic=True) - def index(self): - pecan.abort(405) # HTTP 405 Method Not Allowed as default - - @index.when(method='GET', template='json') - @controllers.handle_exceptions(u._('ContainerACL retrieval')) - @controllers.enforce_rbac('container_acl:get') - def on_get(self, external_project_id): - container_acl = self.acl_repo.get( - entity_id=self.acl_id, - suppress_exception=True) - - if not container_acl: - _acl_not_found() - - dict_fields = container_acl.to_dict_fields() - - return hrefs.convert_acl_to_hrefs(dict_fields) - - @index.when(method='PATCH', template='json') - @controllers.handle_exceptions(u._('A ContainerACL Update')) - @controllers.enforce_rbac('container_acl:patch') - @controllers.enforce_content_types(['application/json']) - def on_patch(self, external_project_id, **kwargs): - """Handles existing container ACL update for given acl id.""" - data = api.load_body(pecan.request, validator=self.validator) - LOG.debug('Start ContainerACLController on_patch...%s', data) - - container_acl = self.acl_repo.get( - entity_id=self.acl_id, - suppress_exception=True) - if not container_acl: - _acl_not_found() - - # Make sure request acl operation matches with acl stored in db - operation = container_acl.operation - input_acl = data.get(operation) - if not input_acl: - _acl_operation_update_not_allowed() - - creator_only = input_acl.get('creator-only') - user_ids = input_acl.get('users') - if creator_only is not None: - container_acl.creator_only = creator_only - - self.acl_repo.create_or_replace_from(self.container, - container_acl=container_acl, - user_ids=user_ids) - acl_ref = hrefs.convert_acl_to_hrefs(container_acl.to_dict_fields()) - - return {'acl_ref': acl_ref['acl_ref']} - - @index.when(method='DELETE', template='json') - @controllers.handle_exceptions(u._('ContainerACL deletion')) - @controllers.enforce_rbac('container_acl:delete') - def on_delete(self, external_project_id, **kwargs): - """Deletes existing ACL by acl_id provided in URI.""" - - container_acl = self.acl_repo.get( - entity_id=self.acl_id, - suppress_exception=True) - if not container_acl: - _acl_not_found() - - self.acl_repo.delete_entity_by_id(entity_id=self.acl_id, - external_project_id=None) + acls_data = {} + if result: + for acl in result: + _convert_acl_to_response_format(acl, acls_data) + if not acls_data: + acls_data = DEFAULT_ACL.copy() + return acls_data class ContainerACLsController(controllers.ACLMixin): """Handles ContainerACL requests by a given container id.""" - def __init__(self, container_id): - self.container_id = container_id - self.container = None + def __init__(self, container): + self.container = container + self.container_id = container.id self.acl_repo = repo.get_container_acl_repository() self.container_repo = repo.get_container_repository() self.validator = validators.ACLValidator() - self.container_project_id = None + self.container_project_id = container.project.external_id def get_acl_tuple(self, req, **kwargs): - self._assert_id_and_set_container(suppress_exception=True) - d = {'project_id': self.container_project_id} + d = {'project_id': self.container_project_id, + 'creator_id': self.container.creator_id} return 'container', d - @pecan.expose() - def _lookup(self, acl_id, *remainder): - self._assert_id_and_set_container() - return (ContainerACLController(acl_id, self.container_project_id, - self.container), remainder) - @pecan.expose(generic=True) def index(self, **kwargs): pecan.abort(405) # HTTP 405 Method Not Allowed as default @@ -406,68 +232,14 @@ class ContainerACLsController(controllers.ACLMixin): @controllers.handle_exceptions(u._('ContainerACL(s) retrieval')) @controllers.enforce_rbac('container_acls:get') def on_get(self, external_project_id, **kw): - self._assert_id_and_set_container(suppress_exception=True) LOG.debug('Start container ACL on_get ' 'for container-ID %s:', self.container_id) - if not self.container: - controllers.containers.container_not_found() - return self._return_acl_hrefs(self.container.id) - - @index.when(method='POST', template='json') - @controllers.handle_exceptions(u._('ContainerACL(s) creation')) - @controllers.enforce_rbac('container_acls:post') - @controllers.enforce_content_types(['application/json']) - def on_post(self, external_project_id, **kwargs): - """Handles container acls creation request. - - Once a set of ACLs exists for a given container, it can only be updated - via PATCH method. In create, multiple operation ACL payload can be - specified as mentioned in sample below. - - { - "read":{ - "users":[ - "5ecb18f341894e94baca9e8c7b6a824a" - ] - }, - "write":{ - "users":[ - "20b63d71f90848cf827ee48074f213b7", - "5ecb18f341894e94baca9e8c7b6a824a" - ], - "creator-only":false - } - } - """ - self._assert_id_and_set_container() - - count = self.acl_repo.get_count(self.container.id) - LOG.debug('Count of existing ACL on_post is [%s] ' - ' for container-ID %s:', count, self.container.id) - if count > 0: - _acls_already_exist() - - data = api.load_body(pecan.request, validator=self.validator) - LOG.debug('Start ContainerACLsController on_post...%s', data) - - for operation in itertools.ifilter(lambda x: data.get(x), - validators.ACL_OPERATIONS): - in_cr_only = data[operation].get('creator-only') - creator_only = True if in_cr_only else False - new_acl = models.ContainerACL(self.container.id, - operation=operation, - creator_only=creator_only) - self.acl_repo.create_or_replace_from( - self.container, container_acl=new_acl, - user_ids=data[operation].get('users')) - - pecan.response.status = 201 - return self._return_acl_hrefs(self.container.id) + return self._return_acl_list_response(self.container.id) @index.when(method='PATCH', template='json') @controllers.handle_exceptions(u._('ContainerACL(s) Update')) - @controllers.enforce_rbac('container_acls:patch') + @controllers.enforce_rbac('container_acls:put_patch') @controllers.enforce_content_types(['application/json']) def on_patch(self, external_project_id, **kwargs): """Handles update of existing container acl requests. @@ -493,13 +265,6 @@ class ContainerACLsController(controllers.ACLMixin): } } """ - self._assert_id_and_set_container() - count = self.acl_repo.get_count(self.container.id) - LOG.debug('Count of existing ACL on_patch is [%s] ' - ' for container-ID %s:', count, self.container.id) - if count == 0: - _acls_not_found("container") - data = api.load_body(pecan.request, validator=self.validator) LOG.debug('Start ContainerACLsController on_patch...%s', data) @@ -521,42 +286,91 @@ class ContainerACLsController(controllers.ACLMixin): container_acl=c_acl, user_ids=user_ids) - return self._return_acl_hrefs(self.container.id) + acl_ref = '{0}/acl'.format( + hrefs.convert_container_to_href(self.container.id)) + return {'acl_ref': acl_ref} + + @index.when(method='PUT', template='json') + @controllers.handle_exceptions(u._('ContainerACL(s) Update')) + @controllers.enforce_rbac('container_acls:put_patch') + @controllers.enforce_content_types(['application/json']) + def on_put(self, external_project_id, **kwargs): + """Handles update of existing container acl requests. + + Replaces existing container ACL(s) with input ACL(s) data. Existing + ACL operation not specified in input are removed as part of update. + For missing creator-only in ACL, false is used as default. + In update, multiple operation ACL payload can be specified as + mentioned in sample below. A specific ACL can be updated by its + own id via ContainerACLController patch request. + + { + "read":{ + "users":[ + "5ecb18f341894e94baca9e8c7b6a824a", + "20b63d71f90848cf827ee48074f213b7", + "c7753f8da8dc4fbea75730ab0b6e0ef4" + ] + }, + "write":{ + "users":[ + "5ecb18f341894e94baca9e8c7b6a824a" + ], + "creator-only":true + } + } + + Every container, by default, has an implicit ACL in case client has not + defined an explicit ACL. That default ACL definition, DEFAULT_ACL, + signifies that a container by default has project based access i.e. + client with necessary roles on container project can access the + container. That's why when ACL is added to a container, it always + returns 200 (and not 201) indicating existence of implicit ACL on a + container. + """ + + data = api.load_body(pecan.request, validator=self.validator) + LOG.debug('Start ContainerACLsController on_put...%s', data) + + existing_acls_map = {acl.operation: acl for acl in + self.container.container_acls} + for operation in itertools.ifilter(lambda x: data.get(x), + validators.ACL_OPERATIONS): + creator_only = data[operation].get('creator-only', False) + user_ids = data[operation].get('users', []) + if operation in existing_acls_map: # update if matching acl exists + c_acl = existing_acls_map.pop(operation) + c_acl.creator_only = creator_only + else: + c_acl = models.ContainerACL(self.container.id, + operation=operation, + creator_only=creator_only) + self.acl_repo.create_or_replace_from(self.container, + container_acl=c_acl, + user_ids=user_ids) + # delete remaining existing acls as they are not present in input. + for acl in six.itervalues(existing_acls_map): + self.acl_repo.delete_entity_by_id(entity_id=acl.id, + external_project_id=None) + acl_ref = '{0}/acl'.format( + hrefs.convert_container_to_href(self.container.id)) + return {'acl_ref': acl_ref} @index.when(method='DELETE', template='json') @controllers.handle_exceptions(u._('ContainerACL(s) deletion')) @controllers.enforce_rbac('container_acls:delete') def on_delete(self, external_project_id, **kwargs): - - self._assert_id_and_set_container() count = self.acl_repo.get_count(self.container_id) - if count == 0: - _acls_not_found("container") + if count > 0: + self.acl_repo.delete_acls_for_container(self.container) - self.acl_repo.delete_acls_for_container(self.container) - - def _assert_id_and_set_container(self, suppress_exception=False): - """Checks whether container_id is valid or not. - - Whether container is associated with token's project is not needed as - that check is now made via policy rule. - """ - if self.container: - return - controllers.assert_is_valid_uuid_from_uri(self.container_id) - self.container = self.container_repo.get_container_by_id( - entity_id=self.container_id, suppress_exception=True) - if not self.container and not suppress_exception: - controllers.containers.container_not_found() - if self.container: - self.container_project_id = self.container.project.external_id - - def _return_acl_hrefs(self, container_id): + def _return_acl_list_response(self, container_id): result = self.acl_repo.get_by_container_id(container_id) - if not result: - _acls_not_found("container") - else: - acl_recs = [hrefs.convert_acl_to_hrefs(acl.to_dict_fields()) - for acl in result] - return [{'acl_ref': acl['acl_ref']} for acl in acl_recs] + acls_data = {} + if result: + for acl in result: + _convert_acl_to_response_format(acl, acls_data) + if not acls_data: + acls_data = DEFAULT_ACL.copy() + return acls_data diff --git a/barbican/api/controllers/containers.py b/barbican/api/controllers/containers.py index e95728be8..cfb3f8cad 100644 --- a/barbican/api/controllers/containers.py +++ b/barbican/api/controllers/containers.py @@ -47,7 +47,7 @@ class ContainerController(controllers.ACLMixin): self.validator = validators.ContainerValidator() self.consumers = consumers.ContainerConsumersController( self.container_id) - self.acls = acls.ContainerACLsController(self.container_id) + self.acl = acls.ContainerACLsController(self.container) def get_acl_tuple(self, req, **kwargs): d = self.get_acl_dict_for_user(req, self.container.container_acls) diff --git a/barbican/api/controllers/secrets.py b/barbican/api/controllers/secrets.py index 62f8da8cd..654917a73 100644 --- a/barbican/api/controllers/secrets.py +++ b/barbican/api/controllers/secrets.py @@ -70,7 +70,7 @@ class SecretController(controllers.ACLMixin): @pecan.expose() def _lookup(self, sub_resource, *remainder): - if sub_resource == 'acls': + if sub_resource == 'acl': return acls.SecretACLsController(self.secret), remainder else: pecan.abort(405) # only 'acl' as sub-resource is supported diff --git a/barbican/common/hrefs.py b/barbican/common/hrefs.py index de887b9c4..9f9bc6645 100644 --- a/barbican/common/hrefs.py +++ b/barbican/common/hrefs.py @@ -9,7 +9,6 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - from barbican.common import utils @@ -52,42 +51,6 @@ def convert_certificate_authority_to_href(ca_id): return convert_resource_id_to_href('cas', ca_id) -def convert_secret_acl_to_href(secret_id, acl_id): - """Convert the secret acl ID to a HATEOS-style href.""" - secret_href = convert_secret_to_href(secret_id) - return secret_href + '/acls/' + acl_id - - -def convert_container_acl_to_href(container_id, acl_id): - """Convert the container acl ID to a HATEOS-style href.""" - container_href = convert_container_to_href(container_id) - return container_href + '/acls/' + acl_id - - -def convert_acl_to_hrefs(fields): - acl_id = fields['acl_id'] - if 'secret_id' in fields: - fields['acl_ref'] = convert_secret_acl_to_href(fields['secret_id'], - acl_id) - del fields['acl_id'] - fields['secret_ref'] = convert_secret_to_href(fields['secret_id']) - del fields['secret_id'] - - if 'container_id' in fields: - fields['acl_ref'] = convert_container_acl_to_href( - fields['container_id'], acl_id) - del fields['acl_id'] - fields['container_ref'] = convert_container_to_href( - fields['container_id']) - del fields['container_id'] - - if 'creator_only' in fields: - fields['creator-only'] = fields['creator_only'] - del fields['creator_only'] - - return fields - - # TODO(hgedikli) handle list of fields in here def convert_to_hrefs(fields): """Convert id's within a fields dict to HATEOS-style hrefs.""" diff --git a/barbican/model/repositories.py b/barbican/model/repositories.py index 5b70a2b0b..de28fc2ca 100755 --- a/barbican/model/repositories.py +++ b/barbican/model/repositories.py @@ -1726,6 +1726,7 @@ class SecretACLRepo(BaseRepo): session=None): session = self.get_session(session) secret.updated_at = timeutils.utcnow() + secret_acl.updated_at = timeutils.utcnow() secret.secret_acls.append(secret_acl) secret.save(session=session) @@ -1820,7 +1821,7 @@ class ContainerACLRepo(BaseRepo): user_ids=None, session=None): session = self.get_session(session) container.updated_at = timeutils.utcnow() - + container_acl.updated_at = timeutils.utcnow() container.container_acls.append(container_acl) container.save(session=session) diff --git a/barbican/tests/api/controllers/test_acls.py b/barbican/tests/api/controllers/test_acls.py index d93264507..6ceb97355 100644 --- a/barbican/tests/api/controllers/test_acls.py +++ b/barbican/tests/api/controllers/test_acls.py @@ -15,13 +15,10 @@ import os import uuid +from barbican.api.controllers import acls from barbican.model import repositories from barbican.tests import utils -project_repo = repositories.get_project_repository() -secrets_repo = repositories.get_secret_repository() -tkey_repo = repositories.get_transport_key_repository() - class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase): @@ -29,15 +26,13 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase): """Create secret acls and compare stored values with request data.""" secret_uuid, _ = create_secret(self.app) - resp, acls = create_acls( + resp = create_acls( self.app, 'secrets', secret_uuid, read_user_ids=['u1', 'u2']) - self.assertEqual(resp.status_int, 201) - self.assertIsNotNone(acls) - self.assertEqual(1, len(acls)) - for acl_ref in resp.json: - self.assertIn('/secrets/{0}/acls'.format(secret_uuid), - acl_ref['acl_ref']) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/secrets/{0}/acl'.format(secret_uuid), + resp.json['acl_ref']) acl_map = _get_acl_map(secret_uuid, is_secret=True) # Check creator_only is False when not provided self.assertFalse(acl_map['read']['creator_only']) @@ -46,481 +41,286 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase): """Should allow creating acls for a new secret with creator-only.""" secret_uuid, _ = create_secret(self.app) - resp, acls = create_acls( + resp = create_acls( self.app, 'secrets', secret_uuid, read_creator_only=True) - self.assertEqual(resp.status_int, 201) - self.assertIsNotNone(acls) - self.assertEqual(1, len(acls)) - for acl_ref in resp.json: - self.assertIn('/secrets/{0}/acls'.format(secret_uuid), - acl_ref['acl_ref']) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/secrets/{0}/acl'.format(secret_uuid), + resp.json['acl_ref']) acl_map = _get_acl_map(secret_uuid, is_secret=True) self.assertTrue(acl_map['read']['creator_only']) - def test_create_new_secret_acls_with_creator_only_false(self): - """Should allow creating acls for a new secret with creator-only.""" - secret_uuid, _ = create_secret(self.app) - - resp, acls = create_acls( - self.app, 'secrets', secret_uuid, - read_creator_only=False) - self.assertEqual(resp.status_int, 201) - self.assertIsNotNone(acls) - self.assertEqual(1, len(acls)) - for acl_ref in resp.json: - self.assertIn('/secrets/{0}/acls'.format(secret_uuid), - acl_ref['acl_ref']) - acl_map = _get_acl_map(secret_uuid, is_secret=True) - self.assertFalse(acl_map['read']['creator_only']) - - def test_new_secret_acls_with_invalid_creator_should_fail(self): + def test_new_secret_acls_with_invalid_creator_only_value_should_fail(self): """Should fail if creator-only flag is provided as string value.""" secret_uuid, _ = create_secret(self.app) - resp, acls = create_acls( + resp = create_acls( self.app, 'secrets', secret_uuid, read_creator_only="False", read_user_ids=['u1', 'u3', 'u4'], expect_errors=True) - self.assertEqual(resp.status_int, 400) - self.assertIsNone(acls) + self.assertEqual(400, resp.status_int) - resp, acls = create_acls( + resp = create_acls( self.app, 'secrets', secret_uuid, read_creator_only="None", expect_errors=True) - self.assertEqual(resp.status_int, 400) - self.assertIsNone(acls) + self.assertEqual(400, resp.status_int) - def test_new_secret_acls_with_missing_secret_id_should_fail(self): - """Should fail if invalid secret id is provided in create request.""" - resp, acls = create_acls( - self.app, 'secrets', uuid.uuid4().hex, - read_creator_only="False", - read_user_ids=['u1', 'u3', 'u4'], - expect_errors=True) - self.assertEqual(resp.status_int, 404) - self.assertIsNone(acls) - - def test_existing_acl_post_request_should_fail(self): - """Should fail if trying to add acls for secret with existing acls.""" - secret_uuid, _ = create_secret(self.app) - resp, acls = create_acls( - self.app, 'secrets', secret_uuid, - read_creator_only=False, - read_user_ids=['u1', 'u3', 'u4']) - self.assertEqual(resp.status_int, 201) - self.assertIsNotNone(acls) - - resp, acls = create_acls( - self.app, 'secrets', secret_uuid, - read_creator_only=False, - read_user_ids=['u1', 'u3', 'u4'], - expect_errors=True) - self.assertEqual(resp.status_int, 400) - self.assertIsNone(acls) - self.assertIn("Existing ACL cannot be updated", - resp.json['description']) - - def test_get_secret_acls_with_valid_secret_id(self): - """Read existing acls for a given valid secret id.""" + def test_get_secret_acls_with_complete_acl_data(self): + """Read existing acls for a with complete acl data.""" secret_id, _ = create_secret(self.app) - _, _ = create_acls( + create_acls( + self.app, 'secrets', secret_id, + read_user_ids=['u1', 'u3'], read_creator_only=True) + + resp = self.app.get( + '/secrets/{0}/acl'.format(secret_id), + expect_errors=False) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + + self.assertIn('read', resp.json) + self.assertTrue(resp.json['read']['creator-only']) + self.assertIsNotNone(resp.json['read']['created']) + self.assertIsNotNone(resp.json['read']['updated']) + self.assertEqual(set(['u1', 'u3']), set(resp.json['read']['users'])) + + def test_get_secret_acls_with_creator_only_data(self): + """Read existing acls for acl when only creator-only flag is set.""" + secret_id, _ = create_secret(self.app) + create_acls( self.app, 'secrets', secret_id, read_creator_only=True) resp = self.app.get( - '/secrets/{0}/acls'.format(secret_id), + '/secrets/{0}/acl'.format(secret_id), expect_errors=False) - acls = resp.json - self.assertEqual(resp.status_int, 200) - self.assertEqual(1, len(acls)) - for acl_ref in acls: - self.assertIn('/secrets/{0}/acls'.format(secret_id), - acl_ref['acl_ref']) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + + self.assertEqual([], resp.json['read']['users']) + self.assertTrue(resp.json['read']['creator-only']) + self.assertIsNotNone(resp.json['read']['created']) + self.assertIsNotNone(resp.json['read']['updated']) def test_get_secret_acls_invalid_secret_should_fail(self): - """Get secret acls should fail for invalid secret id.""" + """Get secret acls should fail for invalid secret id. + + This test applies to all secret ACLs methods as secret entity is + populated in same manner for get, put, patch, delete methods. + """ secret_id, _ = create_secret(self.app) - _, _ = create_acls( + create_acls( self.app, 'secrets', secret_id, read_creator_only=False, read_user_ids=['u1', 'u3', 'u4']) resp = self.app.get( - '/secrets/{0}/acls'.format(uuid.uuid4().hex), + '/secrets/{0}/acl'.format(uuid.uuid4().hex), expect_errors=True) - self.assertEqual(resp.status_int, 404) + self.assertEqual(404, resp.status_int) - def test_get_secret_acls_no_acls_defined_should_fail(self): + def test_get_secret_acls_no_acls_defined_return_default_acl(self): + """Get secret acls should pass when no acls defined for a secret.""" + secret_id, _ = create_secret(self.app) + + resp = self.app.get( + '/secrets/{0}/acl'.format(secret_id), + expect_errors=True) + self.assertEqual(200, resp.status_int) + self.assertEqual(acls.DEFAULT_ACL, resp.json) + + def test_get_secret_acls_with_incorrect_uri_should_fail(self): """Get secret acls should fail when no acls defined for a secret.""" secret_id, _ = create_secret(self.app) resp = self.app.get( - '/secrets/{0}/acls'.format(secret_id), + '/secrets/{0}/incorrect_acls'.format(secret_id), expect_errors=True) - self.assertEqual(resp.status_int, 404) + self.assertEqual(405, resp.status_int) - def test_update_secret_acls_modify_all_acls(self): - """Acls update where only user ids list is modified.""" + def test_full_update_secret_acls_modify_creator_only_value(self): + """ACLs full update with user ids where creator-only flag modified.""" secret_uuid, _ = create_secret(self.app) - _, _ = create_acls( - self.app, 'secrets', secret_uuid, - read_user_ids=['u1', 'u2']) - - resp, acls = update_acls( - self.app, 'secrets', secret_uuid, - read_user_ids=['u1', 'u2', 'u5']) - - self.assertEqual(resp.status_int, 200) - self.assertIsNotNone(acls) - self.assertEqual(1, len(acls)) - for acl_ref in resp.json: - self.assertIn('/secrets/{0}/acls'.format(secret_uuid), - acl_ref['acl_ref']) - acl_map = _get_acl_map(secret_uuid, is_secret=True) - # Check creator_only is False when not provided - self.assertFalse(acl_map['read']['creator_only']) - self.assertIn('u5', acl_map['read'].to_dict_fields()['users']) - - def test_update_secret_acls_modify_creator_only_values(self): - """Acls update where user ids and creator-only flag is modified.""" - secret_uuid, _ = create_secret(self.app) - - _, _ = create_acls( + create_acls( self.app, 'secrets', secret_uuid, read_user_ids=['u1', 'u2'], read_creator_only=True) - resp, acls = update_acls( - self.app, 'secrets', secret_uuid, + # update acls with no user input so it should delete existing users + resp = update_acls( + self.app, 'secrets', secret_uuid, partial_update=False, read_creator_only=False) - self.assertEqual(resp.status_int, 200) - self.assertIsNotNone(acls) - self.assertEqual(1, len(acls)) - for acl_ref in resp.json: - self.assertIn('/secrets/{0}/acls'.format(secret_uuid), - acl_ref['acl_ref']) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/secrets/{0}/acl'.format(secret_uuid), + resp.json['acl_ref']) acl_map = _get_acl_map(secret_uuid, is_secret=True) self.assertFalse(acl_map['read']['creator_only']) - self.assertIn('u1', acl_map['read'].to_dict_fields()['users']) - self.assertIn('u2', acl_map['read'].to_dict_fields()['users']) + self.assertIsNone(acl_map['read'].to_dict_fields().get('users')) - def test_update_secret_acls_partial_modify_read_users_only(self): + def test_full_update_secret_acls_modify_users_only(self): + """ACLs full update where specific operation acl is modified.""" + secret_uuid, _ = create_secret(self.app) + + create_acls( + self.app, 'secrets', secret_uuid, + read_user_ids=['u1', 'u2'], read_creator_only=True) + + resp = update_acls( + self.app, 'secrets', secret_uuid, partial_update=False, + read_user_ids=['u1', 'u3', 'u5']) + + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/secrets/{0}/acl'.format(secret_uuid), + resp.json['acl_ref']) + acl_map = _get_acl_map(secret_uuid, is_secret=True) + self.assertFalse(acl_map['read']['creator_only']) + self.assertNotIn('u2', acl_map['read'].to_dict_fields()['users']) + self.assertEqual(set(['u1', 'u3', 'u5']), + set(acl_map['read'].to_dict_fields()['users'])) + + def test_full_update_secret_acls_with_read_users_only(self): + """Acls full update where specific operation acl is modified.""" + secret_uuid, _ = create_secret(self.app) + + create_acls( + self.app, 'secrets', secret_uuid, + read_user_ids=['u1', 'u2']) + + acl_map = _get_acl_map(secret_uuid, is_secret=True) + # ACL api does not support 'list' operation so making direct db update + # in acl operation data to make sure full update removes this existing + # ACL. + secret_acl = acl_map['read'] + secret_acl.operation = 'list' + secret_acl.save() + acl_map = _get_acl_map(secret_uuid, is_secret=True) + # check 'list' operation is there in db + self.assertIn('list', acl_map) + resp = update_acls( + self.app, 'secrets', secret_uuid, partial_update=False, + read_user_ids=['u1', 'u3', 'u5']) + + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/secrets/{0}/acl'.format(secret_uuid), + resp.json['acl_ref']) + acl_map = _get_acl_map(secret_uuid, is_secret=True) + # make sure 'list' operation is no longer after full update + self.assertNotIn('list', acl_map) + self.assertFalse(acl_map['read']['creator_only']) + self.assertEqual(set(['u1', 'u3', 'u5']), + set(acl_map['read'].to_dict_fields()['users'])) + self.assertNotIn('u2', acl_map['read'].to_dict_fields()['users']) + + def test_partial_update_secret_acls_with_read_users_only(self): """Acls update where specific operation acl is modified.""" secret_uuid, _ = create_secret(self.app) - _, _ = create_acls( + create_acls( self.app, 'secrets', secret_uuid, read_user_ids=['u1', 'u2']) - resp, acls = update_acls( - self.app, 'secrets', secret_uuid, + acl_map = _get_acl_map(secret_uuid, is_secret=True) + + secret_acl = acl_map['read'] + secret_acl.operation = 'list' + secret_acl.save() + acl_map = _get_acl_map(secret_uuid, is_secret=True) + # check 'list' operation is there in db + self.assertIn('list', acl_map) + resp = update_acls( + self.app, 'secrets', secret_uuid, partial_update=True, read_user_ids=['u1', 'u3', 'u5']) - self.assertEqual(resp.status_int, 200) - self.assertIsNotNone(acls) - self.assertEqual(1, len(acls)) - for acl_ref in resp.json: - self.assertIn('/secrets/{0}/acls'.format(secret_uuid), - acl_ref['acl_ref']) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/secrets/{0}/acl'.format(secret_uuid), + resp.json['acl_ref']) + acl_map = _get_acl_map(secret_uuid, is_secret=True) + # For partial update, existing other operation ACL is not tocuhed. + self.assertIn('list', acl_map) + self.assertEqual(set(['u1', 'u2']), + set(acl_map['list'].to_dict_fields()['users'])) + self.assertFalse(acl_map['read']['creator_only']) + self.assertEqual(set(['u1', 'u3', 'u5']), + set(acl_map['read'].to_dict_fields()['users'])) + + def test_partial_update_secret_acls_when_no_acls_defined_should_pass(self): + """Acls partial update pass when no acls are defined for a secret. + + Partial update (PATCH) is applicable even when no explicit ACL has been + set as by default every secret has implicit acl definition. If PUT + is used, then new ACL is created instead. + """ + secret_id, _ = create_secret(self.app) + + resp = update_acls( + self.app, 'secrets', secret_id, partial_update=True, + read_user_ids=['u1', 'u3', 'u5'], expect_errors=False) + + self.assertEqual(200, resp.status_int) + acl_map = _get_acl_map(secret_id, is_secret=True) + self.assertFalse(acl_map['read']['creator_only']) + + def test_partial_update_secret_acls_modify_creator_only_values(self): + """Acls partial update where creator-only flag is modified.""" + secret_uuid, _ = create_secret(self.app) + + create_acls( + self.app, 'secrets', secret_uuid, + read_user_ids=['u1', 'u2'], + read_creator_only=True) + + resp = update_acls( + self.app, 'secrets', secret_uuid, partial_update=True, + read_creator_only=False) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/secrets/{0}/acl'.format(secret_uuid), + resp.json['acl_ref']) acl_map = _get_acl_map(secret_uuid, is_secret=True) self.assertFalse(acl_map['read']['creator_only']) - self.assertIn('u1', acl_map['read'].to_dict_fields()['users']) - self.assertIn('u3', acl_map['read'].to_dict_fields()['users']) - self.assertIn('u5', acl_map['read'].to_dict_fields()['users']) - self.assertNotIn('u2', acl_map['read'].to_dict_fields()['users']) - - def test_update_secret_acls_invalid_secret_should_fail(self): - """Acls update should fail when invalid secret is provided.""" - secret_id, _ = create_secret(self.app) - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_user_ids=['u1', 'u2']) - - resp, acls = update_acls( - self.app, 'secrets', uuid.uuid4().hex, - read_user_ids=['u1', 'u3', 'u5'], expect_errors=True) - - self.assertEqual(resp.status_int, 404) - self.assertIsNone(acls) - - def test_update_secret_acls_when_no_acls_defined_should_fail(self): - """Acls update should fail when acls are defined for a secret.""" - secret_id, _ = create_secret(self.app) - - resp, acls = update_acls( - self.app, 'secrets', secret_id, - read_user_ids=['u1', 'u3', 'u5'], expect_errors=True) - - self.assertEqual(resp.status_int, 404) - self.assertIsNone(acls) + self.assertEqual(set(['u1', 'u2']), + set(acl_map['read'].to_dict_fields()['users'])) def test_delete_secret_acls_with_valid_secret_id(self): """Delete existing acls for a given secret.""" secret_id, _ = create_secret(self.app) - _, _ = create_acls( + create_acls( self.app, 'secrets', secret_id, read_creator_only=True) resp = self.app.delete( - '/secrets/{0}/acls'.format(secret_id), + '/secrets/{0}/acl'.format(secret_id), expect_errors=False) content = resp.json self.assertIsNone(content) # make sure there is no response - self.assertEqual(resp.status_int, 200) + self.assertEqual(200, resp.status_int) acl_map = _get_acl_map(secret_id, is_secret=True) self.assertFalse(acl_map) - def test_delete_secret_acls_invalid_secret_should_fail(self): - """Delete acls should fail when invalid secret id is provided.""" - secret_id, _ = create_secret(self.app) - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_creator_only=False) - - resp = self.app.delete( - '/secrets/{0}/acls'.format(uuid.uuid4().hex), - expect_errors=True) - self.assertEqual(resp.status_int, 404) - - def test_delete_secret_acls_no_acl_defined_should_fail(self): - """Delete acls should fail when no acls are defined for a secret.""" + def test_delete_secret_acls_no_acl_defined_should_pass(self): + """Delete acls should pass when no acls are defined for a secret.""" secret_id, _ = create_secret(self.app) resp = self.app.delete( - '/secrets/{0}/acls'.format(secret_id), - expect_errors=True) - self.assertEqual(resp.status_int, 404) + '/secrets/{0}/acl'.format(secret_id), + expect_errors=False) + self.assertEqual(200, resp.status_int) - def test_invoke_secret_acls_put_should_fail(self): + def test_invoke_secret_acls_head_should_fail(self): """Should fail as put request to secret acls URI is not supported.""" secret_id, _ = create_secret(self.app) - resp = self.app.put( - '/secrets/{0}/acls'.format(secret_id), + resp = self.app.head( + '/secrets/{0}/acl'.format(secret_id), expect_errors=True) - self.assertEqual(resp.status_int, 405) - - -class WhenTestingSecretACLResource(utils.BarbicanAPIBaseTestCase): - - def test_get_secret_acl_with_valid_acl_id(self): - """Read a specific acl by id and compare with request values.""" - secret_id, _ = create_secret(self.app) - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_creator_only=False, - read_user_ids=['u1', 'u3', 'u4']) - - acl_map = _get_acl_map(secret_id, is_secret=True) - resp = self.app.get( - '/secrets/{0}/acls/{1}'.format(secret_id, - acl_map['read']['id']), - expect_errors=False) - acl = resp.json - self.assertEqual(resp.status_int, 200) - self.assertEqual('read', acl['operation']) - self.assertFalse(acl['creator-only']) - self.assertEqual(set(['u1', 'u3', 'u4']), set(acl['users'])) - - resp = self.app.get( - '/secrets/{0}/acls/{1}'.format(secret_id, - acl_map['read']['id']), - expect_errors=False) - acl = resp.json - self.assertEqual(resp.status_int, 200) - self.assertEqual('read', acl['operation']) - self.assertFalse(acl['creator-only']) - self.assertEqual(set(['u1', 'u3', 'u4']), set(acl['users'])) - - def test_get_secret_acl_invalid_acl_should_fail(self): - """Get acl request should fail with invalid acl id.""" - secret_id, _ = create_secret(self.app) - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_creator_only=False, - read_user_ids=['u1', 'u3', 'u4']) - resp = self.app.get( - '/secrets/{0}/acls/{1}'.format(secret_id, - uuid.uuid4().hex), - expect_errors=True) - self.assertEqual(resp.status_int, 404) - - def test_get_secret_acl_no_acl_defined_should_fail(self): - """Get acl request should fail with no acls defined for secret.""" - secret_id, _ = create_secret(self.app) - resp = self.app.get( - '/secrets/{0}/acls/{1}'.format(secret_id, - uuid.uuid4().hex), - expect_errors=True) - self.assertEqual(resp.status_int, 404) - - def test_update_secret_acl_modify_all(self): - """Modify existing ACL users by using specific acl id.""" - secret_id, _ = create_secret(self.app) - - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_user_ids=['u1', 'u2']) - - acl_map = _get_acl_map(secret_id, is_secret=True) - acl_id = acl_map['read']['id'] - - resp = update_acl( - self.app, 'secrets', secret_id, acl_id, - read_user_ids=['u1', 'u2', 'u5'], read_creator_only=True) - - self.assertEqual(resp.status_int, 200) - acl_ref = resp.json - self.assertIn('/secrets/{0}/acls/{1}'.format(secret_id, acl_id), - acl_ref['acl_ref']) - - resp = self.app.get( - '/secrets/{0}/acls/{1}'.format(secret_id, acl_id), - expect_errors=False) - acl = resp.json - self.assertIsNotNone(acl) - self.assertIn('/secrets/{0}/acls/{1}'.format(secret_id, acl_id), - acl['acl_ref']) - # Check creator_only is False when not provided - self.assertTrue(acl['creator-only']) - self.assertEqual('read', acl['operation']) - self.assertEqual(set(['u1', 'u2', 'u5']), set(acl['users'])) - - def test_update_secret_acl_with_duplicate_user_ids(self): - """Modify existing ACL users by using specific acl id.""" - secret_id, _ = create_secret(self.app) - - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_user_ids=['u1', 'u2']) - - acl_map = _get_acl_map(secret_id, is_secret=True) - acl_id = acl_map['read']['id'] - - resp = update_acl( - self.app, 'secrets', secret_id, acl_id, - read_user_ids=['u1', 'u2', 'u1', 'u5'], read_creator_only=True) - - self.assertEqual(resp.status_int, 200) - - resp = self.app.get( - '/secrets/{0}/acls/{1}'.format(secret_id, acl_id), - expect_errors=False) - acl = resp.json - self.assertIsNotNone(acl) - self.assertIn('/secrets/{0}/acls/{1}'.format(secret_id, acl_id), - acl['acl_ref']) - # Check creator_only is False when not provided - self.assertTrue(acl['creator-only']) - self.assertEqual('read', acl['operation']) - self.assertEqual(set(['u1', 'u2', 'u5']), set(acl['users'])) - - def test_update_secret_acl_modify_only_users(self): - """Modifying existing acl's user list and creator-only flag.""" - secret_id, _ = create_secret(self.app) - - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_user_ids=['u1', 'u2'], - read_creator_only=True) - - acl_map = _get_acl_map(secret_id, is_secret=True) - acl_id = acl_map['read']['id'] - - # updating read, list operation and adding write operation acl - # Update should be for 'read' operation ACL only. Others are ignored. - resp = update_acl( - self.app, 'secrets', secret_id, acl_id, - read_user_ids=['u1', 'u2', 'u5']) - - self.assertEqual(resp.status_int, 200) - - resp = self.app.get( - '/secrets/{0}/acls/{1}'.format(secret_id, acl_id), - expect_errors=False) - acl = resp.json - self.assertIsNotNone(acl) - self.assertEqual(set(['u1', 'u2', 'u5']), set(acl['users'])) - self.assertTrue(acl['creator-only']) - - # Now remove existing all users from ACL list - resp = update_acl( - self.app, 'secrets', secret_id, acl_id, - read_user_ids=[]) - self.assertEqual(resp.status_int, 200) - - resp = self.app.get( - '/secrets/{0}/acls/{1}'.format(secret_id, acl_id), - expect_errors=False) - - acl = resp.json - self.assertIsNone(acl.get('users')) - self.assertTrue(acl['creator-only']) - - def test_update_secret_acl_invalid_acl_should_fail(self): - """Update should fail when invalid acl id is provided.""" - secret_id, _ = create_secret(self.app) - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_creator_only=False) - - resp = update_acl( - self.app, 'secrets', secret_id, uuid.uuid4().hex, - read_creator_only=False, expect_errors=True) - - self.assertEqual(resp.status_int, 404) - - def test_update_secret_acl_when_no_acls_defined_should_fail(self): - """Update should fail when no secret acls are defined.""" - secret_id, _ = create_secret(self.app) - resp = update_acl( - self.app, 'secrets', secret_id, uuid.uuid4().hex, - read_creator_only=False, expect_errors=True) - - self.assertEqual(resp.status_int, 404) - - def test_delete_secret_acl_with_valid_acl_id(self): - """Delete existing acls for a given secret.""" - secret_id, _ = create_secret(self.app) - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_creator_only=False) - - acl_map = _get_acl_map(secret_id, is_secret=True) - - read_acl_id = acl_map['read'].id - - resp = self.app.delete( - '/secrets/{0}/acls/{1}'.format(secret_id, read_acl_id), - expect_errors=False) - content = resp.json - self.assertIsNone(content) # make sure there is no response - self.assertEqual(resp.status_int, 200) - acl_map = _get_acl_map(secret_id, is_secret=True) - self.assertIsNone(acl_map.get('read')) # read acl should be deleted - - def test_delete_secret_acls_invalid_secret_should_fail(self): - """Delete acls should fail when invalid secret id is provided.""" - secret_id, _ = create_secret(self.app) - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_creator_only=False) - - resp = self.app.delete( - '/secrets/{0}/acls/{1}'.format(secret_id, uuid.uuid4().hex), - expect_errors=True) - self.assertEqual(resp.status_int, 404) - - def test_invoke_secret_acl_put_should_fail(self): - """PUT for specific acl id is not supported.""" - secret_id, _ = create_secret(self.app) - resp = self.app.put( - '/secrets/{0}/acls/{1}'.format(secret_id, uuid.uuid4().hex), - expect_errors=True) - self.assertEqual(resp.status_int, 405) + self.assertEqual(405, resp.status_int) class WhenTestingContainerAclsResource(utils.BarbicanAPIBaseTestCase): @@ -529,33 +329,32 @@ class WhenTestingContainerAclsResource(utils.BarbicanAPIBaseTestCase): """Create container acls and compare db values with request data.""" container_id, _ = create_container(self.app) - resp, acls = create_acls( + resp = create_acls( self.app, 'containers', container_id, read_user_ids=['u1', 'u2']) - self.assertEqual(resp.status_int, 201) - self.assertIsNotNone(acls) - self.assertEqual(1, len(acls)) - for acl_ref in resp.json: - self.assertIn('/containers/{0}/acls'.format(container_id), - acl_ref['acl_ref']) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/containers/{0}/acl'.format(container_id), + resp.json['acl_ref']) + acl_map = _get_acl_map(container_id, is_secret=False) # Check creator_only is False when not provided self.assertFalse(acl_map['read']['creator_only']) + self.assertEqual(set(['u1', 'u2']), + set(acl_map['read'].to_dict_fields()['users'])) def test_create_new_container_acls_with_creator_only_false(self): """Should allow creating acls for a new container with creator-only.""" container_id, _ = create_container(self.app) - resp, acls = create_acls( + resp = create_acls( self.app, 'containers', container_id, read_creator_only=False, read_user_ids=['u1', 'u3', 'u4']) - self.assertEqual(resp.status_int, 201) - self.assertIsNotNone(acls) - self.assertEqual(1, len(acls)) - for acl_ref in resp.json: - self.assertIn('/containers/{0}/acls'.format(container_id), - acl_ref['acl_ref']) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/containers/{0}/acl'.format(container_id), + resp.json['acl_ref']) acl_map = _get_acl_map(container_id, is_secret=False) self.assertFalse(acl_map['read']['creator_only']) @@ -563,453 +362,285 @@ class WhenTestingContainerAclsResource(utils.BarbicanAPIBaseTestCase): """Should allow creating acls for a new container with creator-only.""" container_id, _ = create_container(self.app) - resp, acls = create_acls( + resp = create_acls( self.app, 'containers', container_id, read_creator_only=True, read_user_ids=['u1', 'u3', 'u4']) - self.assertEqual(resp.status_int, 201) - self.assertIsNotNone(acls) - self.assertEqual(1, len(acls)) - for acl_ref in resp.json: - self.assertIn('/containers/{0}/acls'.format(container_id), - acl_ref['acl_ref']) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/containers/{0}/acl'.format(container_id), + resp.json['acl_ref']) acl_map = _get_acl_map(container_id, is_secret=False) self.assertTrue(acl_map['read']['creator_only']) - def test_new_container_acls_with_invalid_creator_should_fail(self): + def test_container_acls_with_invalid_creator_only_value_should_fail(self): """Should fail if creator-only flag is provided as string value.""" container_id, _ = create_container(self.app) - resp, acls = create_acls( + resp = create_acls( self.app, 'containers', container_id, read_creator_only="False", read_user_ids=['u1', 'u3', 'u4'], expect_errors=True) - self.assertEqual(resp.status_int, 400) - self.assertIsNone(acls) + self.assertEqual(400, resp.status_int) - resp, acls = create_acls( + resp = create_acls( self.app, 'containers', container_id, read_creator_only="None", expect_errors=True) - self.assertEqual(resp.status_int, 400) - self.assertIsNone(acls) + self.assertEqual(400, resp.status_int) - def test_new_container_acls_with_missing_container_id_should_fail(self): - """Create acls request should fail for invalid container id.""" - resp, acls = create_acls( - self.app, 'containers', uuid.uuid4().hex, - read_creator_only="False", - read_user_ids=['u1', 'u3', 'u4'], - expect_errors=True) - self.assertEqual(resp.status_int, 404) - self.assertIsNone(acls) - - def test_existing_acl_post_request_should_fail(self): - """Should fail when adding acls for container with existing acls.""" + def test_get_container_acls_with_complete_acl_data(self): + """Read existing acls for a with complete acl data.""" container_id, _ = create_container(self.app) - resp, acls = create_acls( + create_acls( self.app, 'containers', container_id, - read_creator_only=False, - read_user_ids=['u1', 'u3', 'u4']) - self.assertEqual(resp.status_int, 201) - self.assertIsNotNone(acls) - - resp, acls = create_acls( - self.app, 'containers', container_id, - read_creator_only=False, - read_user_ids=['u1', 'u3', 'u4'], - expect_errors=True) - self.assertEqual(resp.status_int, 400) - self.assertIsNone(acls) - self.assertIn("Existing ACL cannot be updated", - resp.json['description']) - - def test_get_container_acls_with_valid_container_id(self): - """Read existing acls for a given valid container id.""" - container_id, _ = create_container(self.app) - _, _ = create_acls( - self.app, 'containers', container_id, - read_creator_only=False) + read_user_ids=['u1', 'u3'], read_creator_only=True) resp = self.app.get( - '/containers/{0}/acls'.format(container_id), + '/containers/{0}/acl'.format(container_id), expect_errors=False) - acls = resp.json - self.assertEqual(resp.status_int, 200) - self.assertEqual(1, len(acls)) - for acl_ref in acls: - self.assertIn('/containers/{0}/acls'.format(container_id), - acl_ref['acl_ref']) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) - def test_get_container_acls_invalid_container_should_fail(self): - """Get container acls should fail for invalid secret id.""" + self.assertIn('read', resp.json) + self.assertTrue(resp.json['read']['creator-only']) + self.assertIsNotNone(resp.json['read']['created']) + self.assertIsNotNone(resp.json['read']['updated']) + self.assertEqual(set(['u1', 'u3']), set(resp.json['read']['users'])) + + def test_get_container_acls_with_creator_only_data(self): + """Read existing acls for acl when only creator-only flag is set.""" container_id, _ = create_container(self.app) - _, _ = create_acls( + create_acls( + self.app, 'containers', container_id, + read_creator_only=True) + + resp = self.app.get( + '/containers/{0}/acl'.format(container_id), + expect_errors=False) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + + self.assertEqual([], resp.json['read']['users']) + self.assertTrue(resp.json['read']['creator-only']) + self.assertIsNotNone(resp.json['read']['created']) + self.assertIsNotNone(resp.json['read']['updated']) + + def test_get_container_acls_invalid_container_id_should_fail(self): + """Get container acls should fail for invalid secret id. + + This test applies to all container ACLs methods as secret entity is + populated in same manner for get, put, patch, delete methods. + """ + container_id, _ = create_container(self.app) + create_acls( self.app, 'containers', container_id, read_creator_only=False) resp = self.app.get( - '/containers/{0}/acls'.format(uuid.uuid4().hex), + '/containers/{0}/acl'.format(uuid.uuid4().hex), expect_errors=True) - self.assertEqual(resp.status_int, 404) + self.assertEqual(404, resp.status_int) - def test_get_container_acls_no_acls_defined_should_fail(self): - """Get container acls should fail when no acls defined for a secret.""" + def test_get_container_acls_invalid_non_uuid_secret_should_fail(self): + """Get container acls should fail for invalid (non-uuid) id.""" + container_id, _ = create_container(self.app) + create_acls( + self.app, 'containers', container_id, + read_creator_only=False) + + resp = self.app.get( + '/containers/{0}/acl'.format('my_container_id'), + expect_errors=True) + self.assertEqual(404, resp.status_int) + + def test_get_container_acls_no_acls_defined_return_default_acl(self): + """Get container acls should pass when no acls defined for a secret.""" container_id, _ = create_container(self.app) resp = self.app.get( - '/containers/{0}/acls'.format(container_id), + '/containers/{0}/acl'.format(container_id), expect_errors=True) - self.assertEqual(resp.status_int, 404) + self.assertEqual(200, resp.status_int) + self.assertEqual(acls.DEFAULT_ACL, resp.json) - def test_update_container_acls_modify_all_acls(self): + def test_full_update_container_acls_modify_all_acls(self): """Acls update where only user ids list is modified.""" container_id, _ = create_container(self.app) - _, _ = create_acls( - self.app, 'containers', container_id, + create_acls( + self.app, 'containers', container_id, read_creator_only=True, read_user_ids=['u1', 'u2']) - resp, acls = update_acls( - self.app, 'containers', container_id, + resp = update_acls( + self.app, 'containers', container_id, partial_update=False, read_user_ids=['u1', 'u2', 'u5']) - self.assertEqual(resp.status_int, 200) - self.assertIsNotNone(acls) - self.assertEqual(1, len(acls)) - for acl_ref in resp.json: - self.assertIn('/containers/{0}/acls'.format(container_id), - acl_ref['acl_ref']) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/containers/{0}/acl'.format(container_id), + resp.json['acl_ref']) acl_map = _get_acl_map(container_id, is_secret=False) # Check creator_only is False when not provided self.assertFalse(acl_map['read']['creator_only']) self.assertIn('u5', acl_map['read'].to_dict_fields()['users']) - def test_update_container_acls_modify_creator_only_values(self): + def test_full_update_container_acls_modify_creator_only_values(self): """Acls update where user ids and creator-only flag is modified.""" container_id, _ = create_container(self.app) - _, _ = create_acls( + create_acls( self.app, 'containers', container_id, read_user_ids=['u1', 'u2']) - resp, acls = update_acls( - self.app, 'containers', container_id, + resp = update_acls( + self.app, 'containers', container_id, partial_update=False, read_creator_only=True) - self.assertEqual(resp.status_int, 200) - self.assertIsNotNone(acls) - self.assertEqual(1, len(acls)) - for acl_ref in resp.json: - self.assertIn('/containers/{0}/acls'.format(container_id), - acl_ref['acl_ref']) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/containers/{0}/acl'.format(container_id), + resp.json['acl_ref']) acl_map = _get_acl_map(container_id, is_secret=False) self.assertTrue(acl_map['read']['creator_only']) + self.assertIsNone(acl_map['read'].to_dict_fields().get('users')) - def test_update_container_acls_invalid_secret_should_fail(self): - """Acls update should fail when invalid container is provided.""" + def test_full_update_container_acls_with_read_users_only(self): + """Acls full update where specific operation acl is modified.""" container_id, _ = create_container(self.app) - _, _ = create_acls( + + create_acls( self.app, 'containers', container_id, read_user_ids=['u1', 'u2']) - resp, acls = update_acls( - self.app, 'containers', uuid.uuid4().hex, - read_user_ids=['u1', 'u3', 'u5'], expect_errors=True) + acl_map = _get_acl_map(container_id, is_secret=False) + # ACL api does not support 'list' operation so making direct db update + # in acl operation data to make sure full update removes this existing + # ACL. + container_acl = acl_map['read'] + container_acl.operation = 'list' + container_acl.save() + acl_map = _get_acl_map(container_id, is_secret=False) + # check 'list' operation is there in db + self.assertIn('list', acl_map) + resp = update_acls( + self.app, 'containers', container_id, partial_update=False, + read_user_ids=['u1', 'u3', 'u5']) - self.assertEqual(resp.status_int, 404) - self.assertIsNone(acls) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/containers/{0}/acl'.format(container_id), + resp.json['acl_ref']) + acl_map = _get_acl_map(container_id, is_secret=False) + # make sure 'list' operation is no longer after full update + self.assertNotIn('list', acl_map) + self.assertFalse(acl_map['read']['creator_only']) + self.assertEqual(set(['u1', 'u3', 'u5']), + set(acl_map['read'].to_dict_fields()['users'])) + self.assertNotIn('u2', acl_map['read'].to_dict_fields()['users']) - def test_update_container_acls_when_no_acls_defined_should_fail(self): - """Acls update should fail when acls are defined for a container.""" + def test_partial_update_container_acls_with_read_users_only(self): + """Acls update where specific operation acl is modified.""" container_id, _ = create_container(self.app) - resp, acls = update_acls( + create_acls( self.app, 'containers', container_id, - read_user_ids=['u1', 'u3', 'u5'], expect_errors=True) + read_user_ids=['u1', 'u2']) - self.assertEqual(resp.status_int, 404) - self.assertIsNone(acls) + acl_map = _get_acl_map(container_id, is_secret=False) + + secret_acl = acl_map['read'] + secret_acl.operation = 'list' + secret_acl.save() + acl_map = _get_acl_map(container_id, is_secret=False) + # check 'list' operation is there in db + self.assertIn('list', acl_map) + resp = update_acls( + self.app, 'containers', container_id, partial_update=True, + read_user_ids=['u1', 'u3', 'u5']) + + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/containers/{0}/acl'.format(container_id), + resp.json['acl_ref']) + acl_map = _get_acl_map(container_id, is_secret=False) + # For partial update, existing other operation ACL is not tocuhed. + self.assertIn('list', acl_map) + self.assertEqual(set(['u1', 'u2']), + set(acl_map['list'].to_dict_fields()['users'])) + self.assertFalse(acl_map['read']['creator_only']) + self.assertEqual(set(['u1', 'u3', 'u5']), + set(acl_map['read'].to_dict_fields()['users'])) + + def test_partial_update_container_acls_when_no_acls_defined(self): + """Acls partial update pass when no acls are defined for container. + + Partial update (PATCH) is applicable even when no explicit ACL has been + set as by default every container has implicit acl definition. If PUT + is used, then new ACL is created instead. + """ + container_id, _ = create_container(self.app) + + resp = update_acls( + self.app, 'containers', container_id, partial_update=True, + read_user_ids=['u1', 'u3', 'u5'], expect_errors=False) + + self.assertEqual(200, resp.status_int) + acl_map = _get_acl_map(container_id, is_secret=False) + self.assertFalse(acl_map['read']['creator_only']) + + def test_partial_update_container_acls_modify_creator_only_values(self): + """Acls partial update where creator-only flag is modified.""" + container_id, _ = create_container(self.app) + + create_acls( + self.app, 'containers', container_id, + read_user_ids=['u1', 'u2'], + read_creator_only=True) + + resp = update_acls( + self.app, 'containers', container_id, partial_update=True, + read_creator_only=False) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/containers/{0}/acl'.format(container_id), + resp.json['acl_ref']) + acl_map = _get_acl_map(container_id, is_secret=False) + self.assertFalse(acl_map['read']['creator_only']) + self.assertEqual(set(['u1', 'u2']), + set(acl_map['read'].to_dict_fields()['users'])) def test_delete_container_acls_with_valid_container_id(self): """Delete existing acls for a given container.""" container_id, _ = create_container(self.app) - _, _ = create_acls( + create_acls( self.app, 'containers', container_id, read_creator_only=False) resp = self.app.delete( - '/containers/{0}/acls'.format(container_id), + '/containers/{0}/acl'.format(container_id), expect_errors=False) content = resp.json self.assertIsNone(content) # make sure there is no response - self.assertEqual(resp.status_int, 200) + self.assertEqual(200, resp.status_int) acl_map = _get_acl_map(container_id, is_secret=False) self.assertFalse(acl_map) - def test_delete_container_acls_invalid_container_should_fail(self): - """Delete acls should fail when invalid container id is provided.""" - container_id, _ = create_container(self.app) - _, _ = create_acls( - self.app, 'containers', container_id, - read_creator_only=False) - - resp = self.app.delete( - '/containers/{0}/acls'.format(uuid.uuid4().hex), - expect_errors=True) - self.assertEqual(resp.status_int, 404) - - def test_delete_container_acls_no_acl_defined_should_fail(self): - """Delete acls should fail when no acls are defined for a container.""" + def test_delete_container_acls_no_acl_defined_should_pass(self): + """Delete acls should pass when no acls are defined for a container.""" container_id, _ = create_container(self.app) resp = self.app.delete( - '/containers/{0}/acls'.format(container_id), - expect_errors=True) - self.assertEqual(resp.status_int, 404) + '/containers/{0}/acl'.format(container_id), + expect_errors=False) + self.assertEqual(200, resp.status_int) - def test_invoke_container_acls_put_should_fail(self): + def test_invoke_container_acls_head_should_fail(self): """PUT request to container acls URI is not supported.""" container_id, _ = create_container(self.app) - resp = self.app.put( - '/containers/{0}/acls'.format(container_id), + resp = self.app.head( + '/containers/{0}/acl/'.format(container_id), expect_errors=True) - self.assertEqual(resp.status_int, 405) - - -class WhenTestingContainerAclResource(utils.BarbicanAPIBaseTestCase): - - def test_get_container_acl_with_valid_acl_id(self): - """Read a specific acl by id and compare with request values.""" - container_id, _ = create_container(self.app) - _, _ = create_acls( - self.app, 'containers', container_id, - read_creator_only=False) - - acl_map = _get_acl_map(container_id, is_secret=False) - resp = self.app.get( - '/containers/{0}/acls/{1}'.format(container_id, - acl_map['read']['id']), - expect_errors=False) - acl = resp.json - self.assertEqual(resp.status_int, 200) - self.assertEqual('read', acl['operation']) - self.assertFalse(acl['creator-only']) - self.assertIsNone(acl.get('users')) - - def test_get_container_acl_invalid_acl_should_fail(self): - """Get acl request should fail with invalid acl id.""" - container_id, _ = create_container(self.app) - _, _ = create_acls( - self.app, 'containers', container_id, - read_creator_only=False) - resp = self.app.get( - '/containers/{0}/acls/{1}'.format(container_id, - uuid.uuid4().hex), - expect_errors=True) - self.assertEqual(resp.status_int, 404) - - def test_get_container_acl_no_acl_defined_should_fail(self): - """Get acl request should fail with no acls defined for container.""" - container_id, _ = create_container(self.app) - resp = self.app.get( - '/containers/{0}/acls/{1}'.format(container_id, - uuid.uuid4().hex), - expect_errors=True) - self.assertEqual(resp.status_int, 404) - - def test_update_container_acl_modify_all(self): - """Modify existing ACL users by using specific acl id.""" - container_id, _ = create_container(self.app) - - _, _ = create_acls( - self.app, 'containers', container_id, - read_user_ids=['u1', 'u2']) - - acl_map = _get_acl_map(container_id, is_secret=False) - acl_id = acl_map['read']['id'] - - resp = update_acl( - self.app, 'containers', container_id, acl_id, - read_user_ids=['u1', 'u2', 'u5'], read_creator_only=True) - - self.assertEqual(resp.status_int, 200) - - resp = self.app.get( - '/containers/{0}/acls/{1}'.format(container_id, - acl_id), - expect_errors=False) - acl = resp.json - self.assertIsNotNone(acl) - self.assertIn('/containers/{0}/acls/{1}'.format(container_id, acl_id), - acl['acl_ref']) - # Check creator_only is False when not provided - self.assertTrue(acl['creator-only']) - self.assertEqual('read', acl['operation']) - self.assertEqual(set(['u1', 'u2', 'u5']), set(acl['users'])) - - def test_update_container_acl_with_duplicate_user_ids(self): - """Modify existing ACL users by using specific acl id.""" - container_id, _ = create_container(self.app) - - _, _ = create_acls( - self.app, 'containers', container_id, - read_user_ids=['u1', 'u2']) - - acl_map = _get_acl_map(container_id, is_secret=False) - acl_id = acl_map['read']['id'] - - resp = update_acl( - self.app, 'containers', container_id, acl_id, - read_user_ids=['u1', 'u2', 'u1', 'u5'], read_creator_only=True) - - self.assertEqual(resp.status_int, 200) - - resp = self.app.get( - '/containers/{0}/acls/{1}'.format(container_id, - acl_id), - expect_errors=False) - acl = resp.json - self.assertIsNotNone(acl) - self.assertIn('/containers/{0}/acls/{1}'.format(container_id, acl_id), - acl['acl_ref']) - # Check creator_only is False when not provided - self.assertTrue(acl['creator-only']) - self.assertEqual('read', acl['operation']) - self.assertEqual(set(['u1', 'u2', 'u5']), set(acl['users'])) - - def test_update_container_acl_modify_only_users(self): - """Modifying existing acl's user list and creator-only flag.""" - container_id, _ = create_container(self.app) - - _, _ = create_acls( - self.app, 'containers', container_id, - read_user_ids=['u1', 'u2'], - read_creator_only=True) - - acl_map = _get_acl_map(container_id, is_secret=False) - acl_id = acl_map['read']['id'] - - # updating read, list operation and adding write operation acl - # Update should be for 'read' operation ACL only. Others are ignored. - resp = update_acl( - self.app, 'containers', container_id, acl_id, - read_user_ids=['u1', 'u2', 'u5']) - - self.assertEqual(resp.status_int, 200) - - resp = self.app.get( - '/containers/{0}/acls/{1}'.format(container_id, - acl_id), - expect_errors=False) - acl = resp.json - self.assertIsNotNone(acl) - self.assertEqual(set(['u1', 'u2', 'u5']), set(acl['users'])) - self.assertTrue(acl['creator-only']) - - # Now remove existing all users from ACL list - resp = update_acl( - self.app, 'containers', container_id, acl_id, - read_user_ids=[]) - self.assertEqual(resp.status_int, 200) - - resp = self.app.get( - '/containers/{0}/acls/{1}'.format(container_id, - acl_id), - expect_errors=False) - acl = resp.json - self.assertIsNone(acl.get('users')) - self.assertTrue(acl['creator-only']) - - def test_update_container_acl_modify_creator_only(self): - """Modifying only creator_only flag for existing acl by its id.""" - container_id, _ = create_container(self.app) - - _, _ = create_acls( - self.app, 'containers', container_id, - read_user_ids=['u1', 'u2'], - read_creator_only=True) - - acl_map = _get_acl_map(container_id, is_secret=False) - acl_id = acl_map['read']['id'] - - # updating read, list operation and adding write operation acl - # Update should be for 'read' operation ACL only. Others are ignored. - resp = update_acl( - self.app, 'containers', container_id, acl_id, - read_creator_only=False) - - self.assertEqual(resp.status_int, 200) - - resp = self.app.get( - '/containers/{0}/acls/{1}'.format(container_id, - acl_id), - expect_errors=False) - acl = resp.json - self.assertIsNotNone(acl) - self.assertEqual(set(['u1', 'u2']), set(acl['users'])) - self.assertFalse(acl['creator-only']) - - def test_update_container_acl_invalid_acl_should_fail(self): - """Update should fail when invalid acl id is provided.""" - container_id, _ = create_container(self.app) - _, _ = create_acls( - self.app, 'containers', container_id, - read_creator_only=False) - - resp = update_acl( - self.app, 'containers', container_id, uuid.uuid4().hex, - read_creator_only=False, expect_errors=True) - - self.assertEqual(resp.status_int, 404) - - def test_update_container_acl_when_no_acls_defined_should_fail(self): - """Update should fail when no container acls are defined.""" - container_id, _ = create_container(self.app) - resp = update_acl( - self.app, 'containers', container_id, uuid.uuid4().hex, - read_creator_only=False, expect_errors=True) - - self.assertEqual(resp.status_int, 404) - - def test_delete_secret_acl_with_valid_acl_id(self): - """Delete existing acls for a given container.""" - container_id, _ = create_container(self.app) - _, _ = create_acls( - self.app, 'containers', container_id, - read_creator_only=False) - - acl_map = _get_acl_map(container_id, is_secret=False) - - read_acl_id = acl_map['read'].id - - resp = self.app.delete( - '/containers/{0}/acls/{1}'.format(container_id, read_acl_id), - expect_errors=False) - content = resp.json - self.assertIsNone(content) # make sure there is no response - self.assertEqual(resp.status_int, 200) - acl_map = _get_acl_map(container_id, is_secret=False) - self.assertIsNone(acl_map.get('read')) # read acl should be deleted - - def test_delete_secret_acls_invalid_secret_should_fail(self): - """Delete acls should fail when invalid secret id is provided.""" - container_id, _ = create_container(self.app) - _, _ = create_acls( - self.app, 'containers', container_id, - read_creator_only=False) - - resp = self.app.delete( - '/containers/{0}/acls/{1}'.format(container_id, uuid.uuid4().hex), - expect_errors=True) - self.assertEqual(resp.status_int, 404) - - def test_invoke_container_acl_put_should_fail(self): - """PUT for specific acl id is not supported.""" - container_id, _ = create_container(self.app) - resp = self.app.put( - '/containers/{0}/acls/{1}'.format(container_id, uuid.uuid4().hex), - expect_errors=True) - self.assertEqual(resp.status_int, 405) + self.assertEqual(405, resp.status_int) # ----------------------- Helper Functions --------------------------- @@ -1078,23 +709,23 @@ def create_acls(app, entity_type, entity_id, read_user_ids=None, return manage_acls(app, entity_type, entity_id, read_user_ids=read_user_ids, read_creator_only=read_creator_only, - is_update=False, + is_update=False, partial_update=False, expect_errors=expect_errors) def update_acls(app, entity_type, entity_id, read_user_ids=None, - read_creator_only=None, + read_creator_only=None, partial_update=False, expect_errors=False): return manage_acls(app, entity_type, entity_id, read_user_ids=read_user_ids, read_creator_only=read_creator_only, - is_update=True, + is_update=True, partial_update=partial_update, expect_errors=expect_errors) def manage_acls(app, entity_type, entity_id, read_user_ids=None, read_creator_only=None, is_update=False, - expect_errors=False): + partial_update=None, expect_errors=False): request = {} _append_acl_to_request(request, 'read', read_user_ids, @@ -1103,41 +734,17 @@ def manage_acls(app, entity_type, entity_id, read_user_ids=None, cleaned_request = {key: val for key, val in request.items() if val is not None} - if is_update: + if is_update and partial_update: # patch for partial update resp = app.patch_json( - '/{0}/{1}/acls'.format(entity_type, entity_id), + '/{0}/{1}/acl'.format(entity_type, entity_id), cleaned_request, expect_errors=expect_errors) - else: - resp = app.post_json( - '/{0}/{1}/acls'.format(entity_type, entity_id), + else: # put (for create or complete update) + resp = app.put_json( + '/{0}/{1}/acl'.format(entity_type, entity_id), cleaned_request, expect_errors=expect_errors) - acl_ids = None - if resp.status_int in (201, 200): - acl_ids = [] - for acl in resp.json: - acl_ids.append(_get_entity_id(acl)) - - return (resp, acl_ids) - - -def update_acl(app, entity_type, entity_id, acl_id, read_user_ids=None, - read_creator_only=None, expect_errors=False): - request = {} - - _append_acl_to_request(request, 'read', read_user_ids, - read_creator_only) - - cleaned_request = {key: val for key, val in request.items() - if val is not None} - - resp = app.patch_json( - '/{0}/{1}/acls/{2}'.format(entity_type, entity_id, acl_id), - cleaned_request, - expect_errors=expect_errors) - return resp @@ -1151,10 +758,6 @@ def _append_acl_to_request(req, operation, user_ids=None, creator_only=None): req[operation] = op_dict -def _get_entity_id(acl): - return os.path.split(acl.get('acl_ref', ''))[-1] - - def _get_acl_map(entity_id, is_secret=True): """Provides map of operation: acl_entity for given entity id.""" if is_secret: diff --git a/barbican/tests/api/controllers/test_orders.py b/barbican/tests/api/controllers/test_orders.py index 568150df5..3f2ae6870 100644 --- a/barbican/tests/api/controllers/test_orders.py +++ b/barbican/tests/api/controllers/test_orders.py @@ -489,7 +489,7 @@ class WhenCreatingStoredKeyOrders(utils.BarbicanAPIBaseTestCase, ) if add_acls: - _, _ = test_acls.manage_acls( + test_acls.manage_acls( self.app, 'containers', container_id, read_user_ids=['u1', 'u3', 'u4'], read_creator_only=read_creator_only, diff --git a/etc/barbican/policy.json b/etc/barbican/policy.json index 1ea4ec388..26ba77231 100644 --- a/etc/barbican/policy.json +++ b/etc/barbican/policy.json @@ -20,6 +20,10 @@ "secret_non_private_read": "rule:all_users and rule:secret_project_match and not rule:secret_private_read", "secret_decrypt_non_private_read": "rule:all_but_audit and rule:secret_project_match and not rule:secret_private_read", "container_non_private_read": "rule:all_users and rule:container_project_match and not rule:container_private_read", + "secret_project_admin": "rule:admin and rule:secret_project_match", + "secret_project_creator": "rule:creator_role and rule:secret_project_match and rule:secret_creator_user", + "container_project_admin": "rule:admin and rule:container_project_match", + "container_project_creator": "rule:creator_role and rule:container_project_match and rule:container_creator_user", "version:get": "@", "secret:decrypt": "rule:secret_decrypt_non_private_read or rule:secret_creator_user or rule:secret_acl_read", @@ -57,18 +61,10 @@ "certificate_authority:unset_global_preferred": "rule:admin", "certificate_authority:get_global_preferred": "rule:all_users", "certificate_authority:get_preferred_ca": "rule:all_users", - "secret_acls:post": "rule:admin_or_creator_role and rule:secret_project_match", - "secret_acls:patch": "rule:admin_or_creator_role and rule:secret_project_match", - "secret_acls:delete": "rule:admin_or_creator_role and rule:secret_project_match", + "secret_acls:put_patch": "rule:secret_project_admin or rule:secret_project_creator", + "secret_acls:delete": "rule:secret_project_admin or rule:secret_project_creator", "secret_acls:get": "rule:all_but_audit and rule:secret_project_match", - "secret_acl:get": "rule:all_but_audit and rule:secret_project_match", - "secret_acl:patch": "rule:admin_or_creator_role and rule:secret_project_match", - "secret_acl:delete": "rule:admin_or_creator_role and rule:secret_project_match", - "container_acls:post": "rule:admin_or_creator_role and rule:container_project_match", - "container_acls:patch": "rule:admin_or_creator_role and rule:container_project_match", - "container_acls:delete": "rule:admin_or_creator_role and rule:container_project_match", - "container_acls:get": "rule:all_but_audit and rule:container_project_match", - "container_acl:get": "rule:all_but_audit and rule:container_project_match", - "container_acl:patch": "rule:admin_or_creator_role and rule:container_project_match", - "container_acl:delete": "rule:admin_or_creator_role and rule:container_project_match" + "container_acls:put_patch": "rule:container_project_admin or rule:container_project_creator", + "container_acls:delete": "rule:container_project_admin or rule:container_project_creator", + "container_acls:get": "rule:all_but_audit and rule:container_project_match" }