diff --git a/barbican/api/controllers/acls.py b/barbican/api/controllers/acls.py index 7568e0d79..50ed5b223 100644 --- a/barbican/api/controllers/acls.py +++ b/barbican/api/controllers/acls.py @@ -13,6 +13,7 @@ import itertools import pecan +import six from barbican import api from barbican.api import controllers @@ -26,113 +27,21 @@ from barbican.model import repositories as repo LOG = utils.getLogger(__name__) -def _acls_not_found(acl_for=None): - """Throw exception indicating no secret or container acls found.""" - pecan.abort(404, u._('Not Found. Sorry no ACL found for given {0}.'). - format(acl_for)) +def _convert_acl_to_response_format(acl, acls_dict): + fields = acl.to_dict_fields() + operation = fields['operation'] + + acl_data = {} # dict for each acl operation data + + acl_data['creator-only'] = fields['creator_only'] + acl_data['users'] = fields.get('users', []) + acl_data['created'] = fields['created'] + acl_data['updated'] = fields['updated'] + + acls_dict[operation] = acl_data -def _acl_not_found(): - """Throw exception indicating no secret or container acl found.""" - pecan.abort(404, u._('Not Found. Sorry no ACL found for given id.')) - - -def _acls_already_exist(): - """Throw exception indicating secret or container acls already exist.""" - pecan.abort(400, u._('Existing ACL cannot be updated with POST method.')) - - -def _acl_operation_update_not_allowed(): - """Throw exception indicating existing secret or container acl operation. - - Operation cannot be changed for an existing ACL. Allowed change is list of - users and/or creator_only flag change. - """ - pecan.abort(400, u._("Existing ACL's operation cannot be updated.")) - - -class SecretACLController(controllers.ACLMixin): - """Handles a SecretACL entity retrieval and update requests.""" - - def __init__(self, acl_id, secret_project_id, secret): - self.acl_id = acl_id - self.secret_project_id = secret_project_id - self.secret = secret - self.acl_repo = repo.get_secret_acl_repository() - self.validator = validators.ACLValidator() - - def get_acl_tuple(self, req, **kwargs): - d = {'project_id': self.secret_project_id} - return 'secret', d - - @pecan.expose(generic=True) - def index(self): - pecan.abort(405) # HTTP 405 Method Not Allowed as default - - @index.when(method='GET', template='json') - @controllers.handle_exceptions(u._('SecretACL retrieval')) - @controllers.enforce_rbac('secret_acl:get') - def on_get(self, external_project_id): - secret_acl = self.acl_repo.get( - entity_id=self.acl_id, - suppress_exception=True) - - if not secret_acl: - _acl_not_found() - - dict_fields = secret_acl.to_dict_fields() - - return hrefs.convert_acl_to_hrefs(dict_fields) - - @index.when(method='PATCH', template='json') - @controllers.handle_exceptions(u._('A SecretACL Update')) - @controllers.enforce_rbac('secret_acl:patch') - @controllers.enforce_content_types(['application/json']) - def on_patch(self, external_project_id, **kwargs): - """Handles existing secret ACL update for given acl id.""" - - data = api.load_body(pecan.request, validator=self.validator) - LOG.debug('Start SecretACLController on_patch...%s', data) - - secret_acl = self.acl_repo.get( - entity_id=self.acl_id, - suppress_exception=True) - if not secret_acl: - _acl_not_found() - - # Make sure request acl operation matches with acl stored in db - operation = secret_acl.operation - input_acl = data.get(operation) - if not input_acl: - _acl_operation_update_not_allowed() - - creator_only = input_acl.get('creator-only') - user_ids = input_acl.get('users') - if creator_only is not None: - secret_acl.creator_only = creator_only - - self.acl_repo.create_or_replace_from(self.secret, - secret_acl=secret_acl, - user_ids=user_ids) - acl_ref = hrefs.convert_acl_to_hrefs(secret_acl.to_dict_fields()) - - return {'acl_ref': acl_ref['acl_ref']} - - @index.when(method='DELETE', template='json') - @controllers.handle_exceptions(u._('SecretACL deletion')) - @controllers.enforce_rbac('secret_acl:delete') - def on_delete(self, external_project_id, **kwargs): - """Deletes existing ACL by acl_id provided in URI.""" - - secret_acl = self.acl_repo.get( - entity_id=self.acl_id, - suppress_exception=True) - - if not secret_acl: - _acl_not_found() - - self.acl_repo.delete_entity_by_id(entity_id=self.acl_id, - external_project_id=None) +DEFAULT_ACL = {'read': {'creator-only': False}} class SecretACLsController(controllers.ACLMixin): @@ -146,14 +55,10 @@ class SecretACLsController(controllers.ACLMixin): self.validator = validators.ACLValidator() def get_acl_tuple(self, req, **kwargs): - d = {'project_id': self.secret_project_id} + d = {'project_id': self.secret_project_id, + 'creator_id': self.secret.creator_id} return 'secret', d - @pecan.expose() - def _lookup(self, acl_id, *remainder): - return SecretACLController(acl_id, self.secret_project_id, - self.secret), remainder - @pecan.expose(generic=True) def index(self, **kwargs): pecan.abort(405) # HTTP 405 Method Not Allowed as default @@ -165,60 +70,11 @@ class SecretACLsController(controllers.ACLMixin): LOG.debug('Start secret ACL on_get ' 'for secret-ID %s:', self.secret.id) - return self._return_acl_hrefs(self.secret.id) - - @index.when(method='POST', template='json') - @controllers.handle_exceptions(u._('SecretACL(s) creation')) - @controllers.enforce_rbac('secret_acls:post') - @controllers.enforce_content_types(['application/json']) - def on_post(self, external_project_id, **kwargs): - """Handles secret acls creation request. - - Once a set of ACLs exists for a given secret, it can only be updated - via PATCH method. In create, multiple operation ACL payload can be - specified as mentioned in sample below. - - { - "read":{ - "users":[ - "5ecb18f341894e94baca9e8c7b6a824a" - ] - }, - "write":{ - "users":[ - "20b63d71f90848cf827ee48074f213b7", - "5ecb18f341894e94baca9e8c7b6a824a" - ], - "creator-only":false - } - } - """ - - count = self.acl_repo.get_count(self.secret.id) - LOG.debug('Count of existing ACL on_post is [%s] ' - ' for secret-ID %s:', count, self.secret.id) - if count > 0: - _acls_already_exist() - - data = api.load_body(pecan.request, validator=self.validator) - LOG.debug('Start on_post...%s', data) - - for operation in itertools.ifilter(lambda x: data.get(x), - validators.ACL_OPERATIONS): - input_cr_only = data[operation].get('creator-only') - creator_only = True if input_cr_only else False - new_acl = models.SecretACL(self.secret.id, operation=operation, - creator_only=creator_only) - self.acl_repo.create_or_replace_from( - self.secret, secret_acl=new_acl, - user_ids=data[operation].get('users')) - - pecan.response.status = 201 - return self._return_acl_hrefs(self.secret.id) + return self._return_acl_list_response(self.secret.id) @index.when(method='PATCH', template='json') @controllers.handle_exceptions(u._('SecretACL(s) Update')) - @controllers.enforce_rbac('secret_acls:patch') + @controllers.enforce_rbac('secret_acls:put_patch') @controllers.enforce_content_types(['application/json']) def on_patch(self, external_project_id, **kwargs): """Handles update of existing secret acl requests. @@ -244,13 +100,6 @@ class SecretACLsController(controllers.ACLMixin): } } """ - - count = self.acl_repo.get_count(self.secret.id) - LOG.debug('Count of existing ACL on_secret is [%s] ' - ' for secret-ID %s:', count, self.secret.id) - if count == 0: - _acls_not_found("secret") - data = api.load_body(pecan.request, validator=self.validator) LOG.debug('Start on_patch...%s', data) @@ -271,7 +120,72 @@ class SecretACLsController(controllers.ACLMixin): self.acl_repo.create_or_replace_from(self.secret, secret_acl=s_acl, user_ids=user_ids) - return self._return_acl_hrefs(self.secret.id) + acl_ref = '{0}/acl'.format( + hrefs.convert_secret_to_href(self.secret.id)) + return {'acl_ref': acl_ref} + + @index.when(method='PUT', template='json') + @controllers.handle_exceptions(u._('SecretACL(s) Update')) + @controllers.enforce_rbac('secret_acls:put_patch') + @controllers.enforce_content_types(['application/json']) + def on_put(self, external_project_id, **kwargs): + """Handles update of existing secret acl requests. + + Replaces existing secret ACL(s) with input ACL(s) data. Existing + ACL operation not specified in input are removed as part of update. + For missing creator-only in ACL, false is used as default. + In update, multiple operation ACL payload can be specified as + mentioned in sample below. A specific ACL can be updated by its + own id via SecretACLController patch request. + + { + "read":{ + "users":[ + "5ecb18f341894e94baca9e8c7b6a824a", + "20b63d71f90848cf827ee48074f213b7", + "c7753f8da8dc4fbea75730ab0b6e0ef4" + ] + }, + "write":{ + "users":[ + "5ecb18f341894e94baca9e8c7b6a824a" + ], + "creator-only":true + } + } + + Every secret, by default, has an implicit ACL in case client has not + defined an explicit ACL. That default ACL definition, DEFAULT_ACL, + signifies that a secret by default has project based access i.e. client + with necessary roles on secret project can access the secret. That's + why when ACL is added to a secret, it always returns 200 (and not 201) + indicating existence of implicit ACL on a secret. + """ + data = api.load_body(pecan.request, validator=self.validator) + LOG.debug('Start on_put...%s', data) + + existing_acls_map = {acl.operation: acl for acl in + self.secret.secret_acls} + for operation in itertools.ifilter(lambda x: data.get(x), + validators.ACL_OPERATIONS): + creator_only = data[operation].get('creator-only', False) + user_ids = data[operation].get('users', []) + s_acl = None + if operation in existing_acls_map: # update if matching acl exists + s_acl = existing_acls_map.pop(operation) + s_acl.creator_only = creator_only + else: + s_acl = models.SecretACL(self.secret.id, operation=operation, + creator_only=creator_only) + self.acl_repo.create_or_replace_from(self.secret, secret_acl=s_acl, + user_ids=user_ids) + # delete remaining existing acls as they are not present in input. + for acl in six.itervalues(existing_acls_map): + self.acl_repo.delete_entity_by_id(entity_id=acl.id, + external_project_id=None) + acl_ref = '{0}/acl'.format( + hrefs.convert_secret_to_href(self.secret.id)) + return {'acl_ref': acl_ref} @index.when(method='DELETE', template='json') @controllers.handle_exceptions(u._('SecretACL(s) deletion')) @@ -279,125 +193,37 @@ class SecretACLsController(controllers.ACLMixin): def on_delete(self, external_project_id, **kwargs): count = self.acl_repo.get_count(self.secret.id) - if count == 0: - _acls_not_found("secret") - self.acl_repo.delete_acls_for_secret(self.secret) + if count > 0: + self.acl_repo.delete_acls_for_secret(self.secret) - def _return_acl_hrefs(self, secret_id): + def _return_acl_list_response(self, secret_id): result = self.acl_repo.get_by_secret_id(secret_id) - if not result: - _acls_not_found("secret") - else: - acl_recs = [hrefs.convert_acl_to_hrefs(acl.to_dict_fields()) - for acl in result] - return [{'acl_ref': acl['acl_ref']} for acl in acl_recs] - - -class ContainerACLController(controllers.ACLMixin): - """Handles a ContainerACL entity retrieval and update requests.""" - - def __init__(self, acl_id, container_project_id, container): - self.acl_id = acl_id - self.container_project_id = container_project_id - self.container = container - self.acl_repo = repo.get_container_acl_repository() - self.validator = validators.ACLValidator() - - def get_acl_tuple(self, req, **kwargs): - d = {'project_id': self.container_project_id} - return 'container', d - - @pecan.expose(generic=True) - def index(self): - pecan.abort(405) # HTTP 405 Method Not Allowed as default - - @index.when(method='GET', template='json') - @controllers.handle_exceptions(u._('ContainerACL retrieval')) - @controllers.enforce_rbac('container_acl:get') - def on_get(self, external_project_id): - container_acl = self.acl_repo.get( - entity_id=self.acl_id, - suppress_exception=True) - - if not container_acl: - _acl_not_found() - - dict_fields = container_acl.to_dict_fields() - - return hrefs.convert_acl_to_hrefs(dict_fields) - - @index.when(method='PATCH', template='json') - @controllers.handle_exceptions(u._('A ContainerACL Update')) - @controllers.enforce_rbac('container_acl:patch') - @controllers.enforce_content_types(['application/json']) - def on_patch(self, external_project_id, **kwargs): - """Handles existing container ACL update for given acl id.""" - data = api.load_body(pecan.request, validator=self.validator) - LOG.debug('Start ContainerACLController on_patch...%s', data) - - container_acl = self.acl_repo.get( - entity_id=self.acl_id, - suppress_exception=True) - if not container_acl: - _acl_not_found() - - # Make sure request acl operation matches with acl stored in db - operation = container_acl.operation - input_acl = data.get(operation) - if not input_acl: - _acl_operation_update_not_allowed() - - creator_only = input_acl.get('creator-only') - user_ids = input_acl.get('users') - if creator_only is not None: - container_acl.creator_only = creator_only - - self.acl_repo.create_or_replace_from(self.container, - container_acl=container_acl, - user_ids=user_ids) - acl_ref = hrefs.convert_acl_to_hrefs(container_acl.to_dict_fields()) - - return {'acl_ref': acl_ref['acl_ref']} - - @index.when(method='DELETE', template='json') - @controllers.handle_exceptions(u._('ContainerACL deletion')) - @controllers.enforce_rbac('container_acl:delete') - def on_delete(self, external_project_id, **kwargs): - """Deletes existing ACL by acl_id provided in URI.""" - - container_acl = self.acl_repo.get( - entity_id=self.acl_id, - suppress_exception=True) - if not container_acl: - _acl_not_found() - - self.acl_repo.delete_entity_by_id(entity_id=self.acl_id, - external_project_id=None) + acls_data = {} + if result: + for acl in result: + _convert_acl_to_response_format(acl, acls_data) + if not acls_data: + acls_data = DEFAULT_ACL.copy() + return acls_data class ContainerACLsController(controllers.ACLMixin): """Handles ContainerACL requests by a given container id.""" - def __init__(self, container_id): - self.container_id = container_id - self.container = None + def __init__(self, container): + self.container = container + self.container_id = container.id self.acl_repo = repo.get_container_acl_repository() self.container_repo = repo.get_container_repository() self.validator = validators.ACLValidator() - self.container_project_id = None + self.container_project_id = container.project.external_id def get_acl_tuple(self, req, **kwargs): - self._assert_id_and_set_container(suppress_exception=True) - d = {'project_id': self.container_project_id} + d = {'project_id': self.container_project_id, + 'creator_id': self.container.creator_id} return 'container', d - @pecan.expose() - def _lookup(self, acl_id, *remainder): - self._assert_id_and_set_container() - return (ContainerACLController(acl_id, self.container_project_id, - self.container), remainder) - @pecan.expose(generic=True) def index(self, **kwargs): pecan.abort(405) # HTTP 405 Method Not Allowed as default @@ -406,68 +232,14 @@ class ContainerACLsController(controllers.ACLMixin): @controllers.handle_exceptions(u._('ContainerACL(s) retrieval')) @controllers.enforce_rbac('container_acls:get') def on_get(self, external_project_id, **kw): - self._assert_id_and_set_container(suppress_exception=True) LOG.debug('Start container ACL on_get ' 'for container-ID %s:', self.container_id) - if not self.container: - controllers.containers.container_not_found() - return self._return_acl_hrefs(self.container.id) - - @index.when(method='POST', template='json') - @controllers.handle_exceptions(u._('ContainerACL(s) creation')) - @controllers.enforce_rbac('container_acls:post') - @controllers.enforce_content_types(['application/json']) - def on_post(self, external_project_id, **kwargs): - """Handles container acls creation request. - - Once a set of ACLs exists for a given container, it can only be updated - via PATCH method. In create, multiple operation ACL payload can be - specified as mentioned in sample below. - - { - "read":{ - "users":[ - "5ecb18f341894e94baca9e8c7b6a824a" - ] - }, - "write":{ - "users":[ - "20b63d71f90848cf827ee48074f213b7", - "5ecb18f341894e94baca9e8c7b6a824a" - ], - "creator-only":false - } - } - """ - self._assert_id_and_set_container() - - count = self.acl_repo.get_count(self.container.id) - LOG.debug('Count of existing ACL on_post is [%s] ' - ' for container-ID %s:', count, self.container.id) - if count > 0: - _acls_already_exist() - - data = api.load_body(pecan.request, validator=self.validator) - LOG.debug('Start ContainerACLsController on_post...%s', data) - - for operation in itertools.ifilter(lambda x: data.get(x), - validators.ACL_OPERATIONS): - in_cr_only = data[operation].get('creator-only') - creator_only = True if in_cr_only else False - new_acl = models.ContainerACL(self.container.id, - operation=operation, - creator_only=creator_only) - self.acl_repo.create_or_replace_from( - self.container, container_acl=new_acl, - user_ids=data[operation].get('users')) - - pecan.response.status = 201 - return self._return_acl_hrefs(self.container.id) + return self._return_acl_list_response(self.container.id) @index.when(method='PATCH', template='json') @controllers.handle_exceptions(u._('ContainerACL(s) Update')) - @controllers.enforce_rbac('container_acls:patch') + @controllers.enforce_rbac('container_acls:put_patch') @controllers.enforce_content_types(['application/json']) def on_patch(self, external_project_id, **kwargs): """Handles update of existing container acl requests. @@ -493,13 +265,6 @@ class ContainerACLsController(controllers.ACLMixin): } } """ - self._assert_id_and_set_container() - count = self.acl_repo.get_count(self.container.id) - LOG.debug('Count of existing ACL on_patch is [%s] ' - ' for container-ID %s:', count, self.container.id) - if count == 0: - _acls_not_found("container") - data = api.load_body(pecan.request, validator=self.validator) LOG.debug('Start ContainerACLsController on_patch...%s', data) @@ -521,42 +286,91 @@ class ContainerACLsController(controllers.ACLMixin): container_acl=c_acl, user_ids=user_ids) - return self._return_acl_hrefs(self.container.id) + acl_ref = '{0}/acl'.format( + hrefs.convert_container_to_href(self.container.id)) + return {'acl_ref': acl_ref} + + @index.when(method='PUT', template='json') + @controllers.handle_exceptions(u._('ContainerACL(s) Update')) + @controllers.enforce_rbac('container_acls:put_patch') + @controllers.enforce_content_types(['application/json']) + def on_put(self, external_project_id, **kwargs): + """Handles update of existing container acl requests. + + Replaces existing container ACL(s) with input ACL(s) data. Existing + ACL operation not specified in input are removed as part of update. + For missing creator-only in ACL, false is used as default. + In update, multiple operation ACL payload can be specified as + mentioned in sample below. A specific ACL can be updated by its + own id via ContainerACLController patch request. + + { + "read":{ + "users":[ + "5ecb18f341894e94baca9e8c7b6a824a", + "20b63d71f90848cf827ee48074f213b7", + "c7753f8da8dc4fbea75730ab0b6e0ef4" + ] + }, + "write":{ + "users":[ + "5ecb18f341894e94baca9e8c7b6a824a" + ], + "creator-only":true + } + } + + Every container, by default, has an implicit ACL in case client has not + defined an explicit ACL. That default ACL definition, DEFAULT_ACL, + signifies that a container by default has project based access i.e. + client with necessary roles on container project can access the + container. That's why when ACL is added to a container, it always + returns 200 (and not 201) indicating existence of implicit ACL on a + container. + """ + + data = api.load_body(pecan.request, validator=self.validator) + LOG.debug('Start ContainerACLsController on_put...%s', data) + + existing_acls_map = {acl.operation: acl for acl in + self.container.container_acls} + for operation in itertools.ifilter(lambda x: data.get(x), + validators.ACL_OPERATIONS): + creator_only = data[operation].get('creator-only', False) + user_ids = data[operation].get('users', []) + if operation in existing_acls_map: # update if matching acl exists + c_acl = existing_acls_map.pop(operation) + c_acl.creator_only = creator_only + else: + c_acl = models.ContainerACL(self.container.id, + operation=operation, + creator_only=creator_only) + self.acl_repo.create_or_replace_from(self.container, + container_acl=c_acl, + user_ids=user_ids) + # delete remaining existing acls as they are not present in input. + for acl in six.itervalues(existing_acls_map): + self.acl_repo.delete_entity_by_id(entity_id=acl.id, + external_project_id=None) + acl_ref = '{0}/acl'.format( + hrefs.convert_container_to_href(self.container.id)) + return {'acl_ref': acl_ref} @index.when(method='DELETE', template='json') @controllers.handle_exceptions(u._('ContainerACL(s) deletion')) @controllers.enforce_rbac('container_acls:delete') def on_delete(self, external_project_id, **kwargs): - - self._assert_id_and_set_container() count = self.acl_repo.get_count(self.container_id) - if count == 0: - _acls_not_found("container") + if count > 0: + self.acl_repo.delete_acls_for_container(self.container) - self.acl_repo.delete_acls_for_container(self.container) - - def _assert_id_and_set_container(self, suppress_exception=False): - """Checks whether container_id is valid or not. - - Whether container is associated with token's project is not needed as - that check is now made via policy rule. - """ - if self.container: - return - controllers.assert_is_valid_uuid_from_uri(self.container_id) - self.container = self.container_repo.get_container_by_id( - entity_id=self.container_id, suppress_exception=True) - if not self.container and not suppress_exception: - controllers.containers.container_not_found() - if self.container: - self.container_project_id = self.container.project.external_id - - def _return_acl_hrefs(self, container_id): + def _return_acl_list_response(self, container_id): result = self.acl_repo.get_by_container_id(container_id) - if not result: - _acls_not_found("container") - else: - acl_recs = [hrefs.convert_acl_to_hrefs(acl.to_dict_fields()) - for acl in result] - return [{'acl_ref': acl['acl_ref']} for acl in acl_recs] + acls_data = {} + if result: + for acl in result: + _convert_acl_to_response_format(acl, acls_data) + if not acls_data: + acls_data = DEFAULT_ACL.copy() + return acls_data diff --git a/barbican/api/controllers/containers.py b/barbican/api/controllers/containers.py index e95728be8..cfb3f8cad 100644 --- a/barbican/api/controllers/containers.py +++ b/barbican/api/controllers/containers.py @@ -47,7 +47,7 @@ class ContainerController(controllers.ACLMixin): self.validator = validators.ContainerValidator() self.consumers = consumers.ContainerConsumersController( self.container_id) - self.acls = acls.ContainerACLsController(self.container_id) + self.acl = acls.ContainerACLsController(self.container) def get_acl_tuple(self, req, **kwargs): d = self.get_acl_dict_for_user(req, self.container.container_acls) diff --git a/barbican/api/controllers/secrets.py b/barbican/api/controllers/secrets.py index 62f8da8cd..654917a73 100644 --- a/barbican/api/controllers/secrets.py +++ b/barbican/api/controllers/secrets.py @@ -70,7 +70,7 @@ class SecretController(controllers.ACLMixin): @pecan.expose() def _lookup(self, sub_resource, *remainder): - if sub_resource == 'acls': + if sub_resource == 'acl': return acls.SecretACLsController(self.secret), remainder else: pecan.abort(405) # only 'acl' as sub-resource is supported diff --git a/barbican/common/hrefs.py b/barbican/common/hrefs.py index de887b9c4..9f9bc6645 100644 --- a/barbican/common/hrefs.py +++ b/barbican/common/hrefs.py @@ -9,7 +9,6 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - from barbican.common import utils @@ -52,42 +51,6 @@ def convert_certificate_authority_to_href(ca_id): return convert_resource_id_to_href('cas', ca_id) -def convert_secret_acl_to_href(secret_id, acl_id): - """Convert the secret acl ID to a HATEOS-style href.""" - secret_href = convert_secret_to_href(secret_id) - return secret_href + '/acls/' + acl_id - - -def convert_container_acl_to_href(container_id, acl_id): - """Convert the container acl ID to a HATEOS-style href.""" - container_href = convert_container_to_href(container_id) - return container_href + '/acls/' + acl_id - - -def convert_acl_to_hrefs(fields): - acl_id = fields['acl_id'] - if 'secret_id' in fields: - fields['acl_ref'] = convert_secret_acl_to_href(fields['secret_id'], - acl_id) - del fields['acl_id'] - fields['secret_ref'] = convert_secret_to_href(fields['secret_id']) - del fields['secret_id'] - - if 'container_id' in fields: - fields['acl_ref'] = convert_container_acl_to_href( - fields['container_id'], acl_id) - del fields['acl_id'] - fields['container_ref'] = convert_container_to_href( - fields['container_id']) - del fields['container_id'] - - if 'creator_only' in fields: - fields['creator-only'] = fields['creator_only'] - del fields['creator_only'] - - return fields - - # TODO(hgedikli) handle list of fields in here def convert_to_hrefs(fields): """Convert id's within a fields dict to HATEOS-style hrefs.""" diff --git a/barbican/model/repositories.py b/barbican/model/repositories.py index 1e6c35197..4bf7f0cec 100755 --- a/barbican/model/repositories.py +++ b/barbican/model/repositories.py @@ -1710,6 +1710,7 @@ class SecretACLRepo(BaseRepo): session=None): session = self.get_session(session) secret.updated_at = timeutils.utcnow() + secret_acl.updated_at = timeutils.utcnow() secret.secret_acls.append(secret_acl) secret.save(session=session) @@ -1804,7 +1805,7 @@ class ContainerACLRepo(BaseRepo): user_ids=None, session=None): session = self.get_session(session) container.updated_at = timeutils.utcnow() - + container_acl.updated_at = timeutils.utcnow() container.container_acls.append(container_acl) container.save(session=session) diff --git a/barbican/tests/api/controllers/test_acls.py b/barbican/tests/api/controllers/test_acls.py index d93264507..6ceb97355 100644 --- a/barbican/tests/api/controllers/test_acls.py +++ b/barbican/tests/api/controllers/test_acls.py @@ -15,13 +15,10 @@ import os import uuid +from barbican.api.controllers import acls from barbican.model import repositories from barbican.tests import utils -project_repo = repositories.get_project_repository() -secrets_repo = repositories.get_secret_repository() -tkey_repo = repositories.get_transport_key_repository() - class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase): @@ -29,15 +26,13 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase): """Create secret acls and compare stored values with request data.""" secret_uuid, _ = create_secret(self.app) - resp, acls = create_acls( + resp = create_acls( self.app, 'secrets', secret_uuid, read_user_ids=['u1', 'u2']) - self.assertEqual(resp.status_int, 201) - self.assertIsNotNone(acls) - self.assertEqual(1, len(acls)) - for acl_ref in resp.json: - self.assertIn('/secrets/{0}/acls'.format(secret_uuid), - acl_ref['acl_ref']) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/secrets/{0}/acl'.format(secret_uuid), + resp.json['acl_ref']) acl_map = _get_acl_map(secret_uuid, is_secret=True) # Check creator_only is False when not provided self.assertFalse(acl_map['read']['creator_only']) @@ -46,481 +41,286 @@ class WhenTestingSecretACLsResource(utils.BarbicanAPIBaseTestCase): """Should allow creating acls for a new secret with creator-only.""" secret_uuid, _ = create_secret(self.app) - resp, acls = create_acls( + resp = create_acls( self.app, 'secrets', secret_uuid, read_creator_only=True) - self.assertEqual(resp.status_int, 201) - self.assertIsNotNone(acls) - self.assertEqual(1, len(acls)) - for acl_ref in resp.json: - self.assertIn('/secrets/{0}/acls'.format(secret_uuid), - acl_ref['acl_ref']) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/secrets/{0}/acl'.format(secret_uuid), + resp.json['acl_ref']) acl_map = _get_acl_map(secret_uuid, is_secret=True) self.assertTrue(acl_map['read']['creator_only']) - def test_create_new_secret_acls_with_creator_only_false(self): - """Should allow creating acls for a new secret with creator-only.""" - secret_uuid, _ = create_secret(self.app) - - resp, acls = create_acls( - self.app, 'secrets', secret_uuid, - read_creator_only=False) - self.assertEqual(resp.status_int, 201) - self.assertIsNotNone(acls) - self.assertEqual(1, len(acls)) - for acl_ref in resp.json: - self.assertIn('/secrets/{0}/acls'.format(secret_uuid), - acl_ref['acl_ref']) - acl_map = _get_acl_map(secret_uuid, is_secret=True) - self.assertFalse(acl_map['read']['creator_only']) - - def test_new_secret_acls_with_invalid_creator_should_fail(self): + def test_new_secret_acls_with_invalid_creator_only_value_should_fail(self): """Should fail if creator-only flag is provided as string value.""" secret_uuid, _ = create_secret(self.app) - resp, acls = create_acls( + resp = create_acls( self.app, 'secrets', secret_uuid, read_creator_only="False", read_user_ids=['u1', 'u3', 'u4'], expect_errors=True) - self.assertEqual(resp.status_int, 400) - self.assertIsNone(acls) + self.assertEqual(400, resp.status_int) - resp, acls = create_acls( + resp = create_acls( self.app, 'secrets', secret_uuid, read_creator_only="None", expect_errors=True) - self.assertEqual(resp.status_int, 400) - self.assertIsNone(acls) + self.assertEqual(400, resp.status_int) - def test_new_secret_acls_with_missing_secret_id_should_fail(self): - """Should fail if invalid secret id is provided in create request.""" - resp, acls = create_acls( - self.app, 'secrets', uuid.uuid4().hex, - read_creator_only="False", - read_user_ids=['u1', 'u3', 'u4'], - expect_errors=True) - self.assertEqual(resp.status_int, 404) - self.assertIsNone(acls) - - def test_existing_acl_post_request_should_fail(self): - """Should fail if trying to add acls for secret with existing acls.""" - secret_uuid, _ = create_secret(self.app) - resp, acls = create_acls( - self.app, 'secrets', secret_uuid, - read_creator_only=False, - read_user_ids=['u1', 'u3', 'u4']) - self.assertEqual(resp.status_int, 201) - self.assertIsNotNone(acls) - - resp, acls = create_acls( - self.app, 'secrets', secret_uuid, - read_creator_only=False, - read_user_ids=['u1', 'u3', 'u4'], - expect_errors=True) - self.assertEqual(resp.status_int, 400) - self.assertIsNone(acls) - self.assertIn("Existing ACL cannot be updated", - resp.json['description']) - - def test_get_secret_acls_with_valid_secret_id(self): - """Read existing acls for a given valid secret id.""" + def test_get_secret_acls_with_complete_acl_data(self): + """Read existing acls for a with complete acl data.""" secret_id, _ = create_secret(self.app) - _, _ = create_acls( + create_acls( + self.app, 'secrets', secret_id, + read_user_ids=['u1', 'u3'], read_creator_only=True) + + resp = self.app.get( + '/secrets/{0}/acl'.format(secret_id), + expect_errors=False) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + + self.assertIn('read', resp.json) + self.assertTrue(resp.json['read']['creator-only']) + self.assertIsNotNone(resp.json['read']['created']) + self.assertIsNotNone(resp.json['read']['updated']) + self.assertEqual(set(['u1', 'u3']), set(resp.json['read']['users'])) + + def test_get_secret_acls_with_creator_only_data(self): + """Read existing acls for acl when only creator-only flag is set.""" + secret_id, _ = create_secret(self.app) + create_acls( self.app, 'secrets', secret_id, read_creator_only=True) resp = self.app.get( - '/secrets/{0}/acls'.format(secret_id), + '/secrets/{0}/acl'.format(secret_id), expect_errors=False) - acls = resp.json - self.assertEqual(resp.status_int, 200) - self.assertEqual(1, len(acls)) - for acl_ref in acls: - self.assertIn('/secrets/{0}/acls'.format(secret_id), - acl_ref['acl_ref']) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + + self.assertEqual([], resp.json['read']['users']) + self.assertTrue(resp.json['read']['creator-only']) + self.assertIsNotNone(resp.json['read']['created']) + self.assertIsNotNone(resp.json['read']['updated']) def test_get_secret_acls_invalid_secret_should_fail(self): - """Get secret acls should fail for invalid secret id.""" + """Get secret acls should fail for invalid secret id. + + This test applies to all secret ACLs methods as secret entity is + populated in same manner for get, put, patch, delete methods. + """ secret_id, _ = create_secret(self.app) - _, _ = create_acls( + create_acls( self.app, 'secrets', secret_id, read_creator_only=False, read_user_ids=['u1', 'u3', 'u4']) resp = self.app.get( - '/secrets/{0}/acls'.format(uuid.uuid4().hex), + '/secrets/{0}/acl'.format(uuid.uuid4().hex), expect_errors=True) - self.assertEqual(resp.status_int, 404) + self.assertEqual(404, resp.status_int) - def test_get_secret_acls_no_acls_defined_should_fail(self): + def test_get_secret_acls_no_acls_defined_return_default_acl(self): + """Get secret acls should pass when no acls defined for a secret.""" + secret_id, _ = create_secret(self.app) + + resp = self.app.get( + '/secrets/{0}/acl'.format(secret_id), + expect_errors=True) + self.assertEqual(200, resp.status_int) + self.assertEqual(acls.DEFAULT_ACL, resp.json) + + def test_get_secret_acls_with_incorrect_uri_should_fail(self): """Get secret acls should fail when no acls defined for a secret.""" secret_id, _ = create_secret(self.app) resp = self.app.get( - '/secrets/{0}/acls'.format(secret_id), + '/secrets/{0}/incorrect_acls'.format(secret_id), expect_errors=True) - self.assertEqual(resp.status_int, 404) + self.assertEqual(405, resp.status_int) - def test_update_secret_acls_modify_all_acls(self): - """Acls update where only user ids list is modified.""" + def test_full_update_secret_acls_modify_creator_only_value(self): + """ACLs full update with user ids where creator-only flag modified.""" secret_uuid, _ = create_secret(self.app) - _, _ = create_acls( - self.app, 'secrets', secret_uuid, - read_user_ids=['u1', 'u2']) - - resp, acls = update_acls( - self.app, 'secrets', secret_uuid, - read_user_ids=['u1', 'u2', 'u5']) - - self.assertEqual(resp.status_int, 200) - self.assertIsNotNone(acls) - self.assertEqual(1, len(acls)) - for acl_ref in resp.json: - self.assertIn('/secrets/{0}/acls'.format(secret_uuid), - acl_ref['acl_ref']) - acl_map = _get_acl_map(secret_uuid, is_secret=True) - # Check creator_only is False when not provided - self.assertFalse(acl_map['read']['creator_only']) - self.assertIn('u5', acl_map['read'].to_dict_fields()['users']) - - def test_update_secret_acls_modify_creator_only_values(self): - """Acls update where user ids and creator-only flag is modified.""" - secret_uuid, _ = create_secret(self.app) - - _, _ = create_acls( + create_acls( self.app, 'secrets', secret_uuid, read_user_ids=['u1', 'u2'], read_creator_only=True) - resp, acls = update_acls( - self.app, 'secrets', secret_uuid, + # update acls with no user input so it should delete existing users + resp = update_acls( + self.app, 'secrets', secret_uuid, partial_update=False, read_creator_only=False) - self.assertEqual(resp.status_int, 200) - self.assertIsNotNone(acls) - self.assertEqual(1, len(acls)) - for acl_ref in resp.json: - self.assertIn('/secrets/{0}/acls'.format(secret_uuid), - acl_ref['acl_ref']) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/secrets/{0}/acl'.format(secret_uuid), + resp.json['acl_ref']) acl_map = _get_acl_map(secret_uuid, is_secret=True) self.assertFalse(acl_map['read']['creator_only']) - self.assertIn('u1', acl_map['read'].to_dict_fields()['users']) - self.assertIn('u2', acl_map['read'].to_dict_fields()['users']) + self.assertIsNone(acl_map['read'].to_dict_fields().get('users')) - def test_update_secret_acls_partial_modify_read_users_only(self): + def test_full_update_secret_acls_modify_users_only(self): + """ACLs full update where specific operation acl is modified.""" + secret_uuid, _ = create_secret(self.app) + + create_acls( + self.app, 'secrets', secret_uuid, + read_user_ids=['u1', 'u2'], read_creator_only=True) + + resp = update_acls( + self.app, 'secrets', secret_uuid, partial_update=False, + read_user_ids=['u1', 'u3', 'u5']) + + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/secrets/{0}/acl'.format(secret_uuid), + resp.json['acl_ref']) + acl_map = _get_acl_map(secret_uuid, is_secret=True) + self.assertFalse(acl_map['read']['creator_only']) + self.assertNotIn('u2', acl_map['read'].to_dict_fields()['users']) + self.assertEqual(set(['u1', 'u3', 'u5']), + set(acl_map['read'].to_dict_fields()['users'])) + + def test_full_update_secret_acls_with_read_users_only(self): + """Acls full update where specific operation acl is modified.""" + secret_uuid, _ = create_secret(self.app) + + create_acls( + self.app, 'secrets', secret_uuid, + read_user_ids=['u1', 'u2']) + + acl_map = _get_acl_map(secret_uuid, is_secret=True) + # ACL api does not support 'list' operation so making direct db update + # in acl operation data to make sure full update removes this existing + # ACL. + secret_acl = acl_map['read'] + secret_acl.operation = 'list' + secret_acl.save() + acl_map = _get_acl_map(secret_uuid, is_secret=True) + # check 'list' operation is there in db + self.assertIn('list', acl_map) + resp = update_acls( + self.app, 'secrets', secret_uuid, partial_update=False, + read_user_ids=['u1', 'u3', 'u5']) + + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/secrets/{0}/acl'.format(secret_uuid), + resp.json['acl_ref']) + acl_map = _get_acl_map(secret_uuid, is_secret=True) + # make sure 'list' operation is no longer after full update + self.assertNotIn('list', acl_map) + self.assertFalse(acl_map['read']['creator_only']) + self.assertEqual(set(['u1', 'u3', 'u5']), + set(acl_map['read'].to_dict_fields()['users'])) + self.assertNotIn('u2', acl_map['read'].to_dict_fields()['users']) + + def test_partial_update_secret_acls_with_read_users_only(self): """Acls update where specific operation acl is modified.""" secret_uuid, _ = create_secret(self.app) - _, _ = create_acls( + create_acls( self.app, 'secrets', secret_uuid, read_user_ids=['u1', 'u2']) - resp, acls = update_acls( - self.app, 'secrets', secret_uuid, + acl_map = _get_acl_map(secret_uuid, is_secret=True) + + secret_acl = acl_map['read'] + secret_acl.operation = 'list' + secret_acl.save() + acl_map = _get_acl_map(secret_uuid, is_secret=True) + # check 'list' operation is there in db + self.assertIn('list', acl_map) + resp = update_acls( + self.app, 'secrets', secret_uuid, partial_update=True, read_user_ids=['u1', 'u3', 'u5']) - self.assertEqual(resp.status_int, 200) - self.assertIsNotNone(acls) - self.assertEqual(1, len(acls)) - for acl_ref in resp.json: - self.assertIn('/secrets/{0}/acls'.format(secret_uuid), - acl_ref['acl_ref']) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/secrets/{0}/acl'.format(secret_uuid), + resp.json['acl_ref']) + acl_map = _get_acl_map(secret_uuid, is_secret=True) + # For partial update, existing other operation ACL is not tocuhed. + self.assertIn('list', acl_map) + self.assertEqual(set(['u1', 'u2']), + set(acl_map['list'].to_dict_fields()['users'])) + self.assertFalse(acl_map['read']['creator_only']) + self.assertEqual(set(['u1', 'u3', 'u5']), + set(acl_map['read'].to_dict_fields()['users'])) + + def test_partial_update_secret_acls_when_no_acls_defined_should_pass(self): + """Acls partial update pass when no acls are defined for a secret. + + Partial update (PATCH) is applicable even when no explicit ACL has been + set as by default every secret has implicit acl definition. If PUT + is used, then new ACL is created instead. + """ + secret_id, _ = create_secret(self.app) + + resp = update_acls( + self.app, 'secrets', secret_id, partial_update=True, + read_user_ids=['u1', 'u3', 'u5'], expect_errors=False) + + self.assertEqual(200, resp.status_int) + acl_map = _get_acl_map(secret_id, is_secret=True) + self.assertFalse(acl_map['read']['creator_only']) + + def test_partial_update_secret_acls_modify_creator_only_values(self): + """Acls partial update where creator-only flag is modified.""" + secret_uuid, _ = create_secret(self.app) + + create_acls( + self.app, 'secrets', secret_uuid, + read_user_ids=['u1', 'u2'], + read_creator_only=True) + + resp = update_acls( + self.app, 'secrets', secret_uuid, partial_update=True, + read_creator_only=False) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/secrets/{0}/acl'.format(secret_uuid), + resp.json['acl_ref']) acl_map = _get_acl_map(secret_uuid, is_secret=True) self.assertFalse(acl_map['read']['creator_only']) - self.assertIn('u1', acl_map['read'].to_dict_fields()['users']) - self.assertIn('u3', acl_map['read'].to_dict_fields()['users']) - self.assertIn('u5', acl_map['read'].to_dict_fields()['users']) - self.assertNotIn('u2', acl_map['read'].to_dict_fields()['users']) - - def test_update_secret_acls_invalid_secret_should_fail(self): - """Acls update should fail when invalid secret is provided.""" - secret_id, _ = create_secret(self.app) - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_user_ids=['u1', 'u2']) - - resp, acls = update_acls( - self.app, 'secrets', uuid.uuid4().hex, - read_user_ids=['u1', 'u3', 'u5'], expect_errors=True) - - self.assertEqual(resp.status_int, 404) - self.assertIsNone(acls) - - def test_update_secret_acls_when_no_acls_defined_should_fail(self): - """Acls update should fail when acls are defined for a secret.""" - secret_id, _ = create_secret(self.app) - - resp, acls = update_acls( - self.app, 'secrets', secret_id, - read_user_ids=['u1', 'u3', 'u5'], expect_errors=True) - - self.assertEqual(resp.status_int, 404) - self.assertIsNone(acls) + self.assertEqual(set(['u1', 'u2']), + set(acl_map['read'].to_dict_fields()['users'])) def test_delete_secret_acls_with_valid_secret_id(self): """Delete existing acls for a given secret.""" secret_id, _ = create_secret(self.app) - _, _ = create_acls( + create_acls( self.app, 'secrets', secret_id, read_creator_only=True) resp = self.app.delete( - '/secrets/{0}/acls'.format(secret_id), + '/secrets/{0}/acl'.format(secret_id), expect_errors=False) content = resp.json self.assertIsNone(content) # make sure there is no response - self.assertEqual(resp.status_int, 200) + self.assertEqual(200, resp.status_int) acl_map = _get_acl_map(secret_id, is_secret=True) self.assertFalse(acl_map) - def test_delete_secret_acls_invalid_secret_should_fail(self): - """Delete acls should fail when invalid secret id is provided.""" - secret_id, _ = create_secret(self.app) - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_creator_only=False) - - resp = self.app.delete( - '/secrets/{0}/acls'.format(uuid.uuid4().hex), - expect_errors=True) - self.assertEqual(resp.status_int, 404) - - def test_delete_secret_acls_no_acl_defined_should_fail(self): - """Delete acls should fail when no acls are defined for a secret.""" + def test_delete_secret_acls_no_acl_defined_should_pass(self): + """Delete acls should pass when no acls are defined for a secret.""" secret_id, _ = create_secret(self.app) resp = self.app.delete( - '/secrets/{0}/acls'.format(secret_id), - expect_errors=True) - self.assertEqual(resp.status_int, 404) + '/secrets/{0}/acl'.format(secret_id), + expect_errors=False) + self.assertEqual(200, resp.status_int) - def test_invoke_secret_acls_put_should_fail(self): + def test_invoke_secret_acls_head_should_fail(self): """Should fail as put request to secret acls URI is not supported.""" secret_id, _ = create_secret(self.app) - resp = self.app.put( - '/secrets/{0}/acls'.format(secret_id), + resp = self.app.head( + '/secrets/{0}/acl'.format(secret_id), expect_errors=True) - self.assertEqual(resp.status_int, 405) - - -class WhenTestingSecretACLResource(utils.BarbicanAPIBaseTestCase): - - def test_get_secret_acl_with_valid_acl_id(self): - """Read a specific acl by id and compare with request values.""" - secret_id, _ = create_secret(self.app) - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_creator_only=False, - read_user_ids=['u1', 'u3', 'u4']) - - acl_map = _get_acl_map(secret_id, is_secret=True) - resp = self.app.get( - '/secrets/{0}/acls/{1}'.format(secret_id, - acl_map['read']['id']), - expect_errors=False) - acl = resp.json - self.assertEqual(resp.status_int, 200) - self.assertEqual('read', acl['operation']) - self.assertFalse(acl['creator-only']) - self.assertEqual(set(['u1', 'u3', 'u4']), set(acl['users'])) - - resp = self.app.get( - '/secrets/{0}/acls/{1}'.format(secret_id, - acl_map['read']['id']), - expect_errors=False) - acl = resp.json - self.assertEqual(resp.status_int, 200) - self.assertEqual('read', acl['operation']) - self.assertFalse(acl['creator-only']) - self.assertEqual(set(['u1', 'u3', 'u4']), set(acl['users'])) - - def test_get_secret_acl_invalid_acl_should_fail(self): - """Get acl request should fail with invalid acl id.""" - secret_id, _ = create_secret(self.app) - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_creator_only=False, - read_user_ids=['u1', 'u3', 'u4']) - resp = self.app.get( - '/secrets/{0}/acls/{1}'.format(secret_id, - uuid.uuid4().hex), - expect_errors=True) - self.assertEqual(resp.status_int, 404) - - def test_get_secret_acl_no_acl_defined_should_fail(self): - """Get acl request should fail with no acls defined for secret.""" - secret_id, _ = create_secret(self.app) - resp = self.app.get( - '/secrets/{0}/acls/{1}'.format(secret_id, - uuid.uuid4().hex), - expect_errors=True) - self.assertEqual(resp.status_int, 404) - - def test_update_secret_acl_modify_all(self): - """Modify existing ACL users by using specific acl id.""" - secret_id, _ = create_secret(self.app) - - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_user_ids=['u1', 'u2']) - - acl_map = _get_acl_map(secret_id, is_secret=True) - acl_id = acl_map['read']['id'] - - resp = update_acl( - self.app, 'secrets', secret_id, acl_id, - read_user_ids=['u1', 'u2', 'u5'], read_creator_only=True) - - self.assertEqual(resp.status_int, 200) - acl_ref = resp.json - self.assertIn('/secrets/{0}/acls/{1}'.format(secret_id, acl_id), - acl_ref['acl_ref']) - - resp = self.app.get( - '/secrets/{0}/acls/{1}'.format(secret_id, acl_id), - expect_errors=False) - acl = resp.json - self.assertIsNotNone(acl) - self.assertIn('/secrets/{0}/acls/{1}'.format(secret_id, acl_id), - acl['acl_ref']) - # Check creator_only is False when not provided - self.assertTrue(acl['creator-only']) - self.assertEqual('read', acl['operation']) - self.assertEqual(set(['u1', 'u2', 'u5']), set(acl['users'])) - - def test_update_secret_acl_with_duplicate_user_ids(self): - """Modify existing ACL users by using specific acl id.""" - secret_id, _ = create_secret(self.app) - - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_user_ids=['u1', 'u2']) - - acl_map = _get_acl_map(secret_id, is_secret=True) - acl_id = acl_map['read']['id'] - - resp = update_acl( - self.app, 'secrets', secret_id, acl_id, - read_user_ids=['u1', 'u2', 'u1', 'u5'], read_creator_only=True) - - self.assertEqual(resp.status_int, 200) - - resp = self.app.get( - '/secrets/{0}/acls/{1}'.format(secret_id, acl_id), - expect_errors=False) - acl = resp.json - self.assertIsNotNone(acl) - self.assertIn('/secrets/{0}/acls/{1}'.format(secret_id, acl_id), - acl['acl_ref']) - # Check creator_only is False when not provided - self.assertTrue(acl['creator-only']) - self.assertEqual('read', acl['operation']) - self.assertEqual(set(['u1', 'u2', 'u5']), set(acl['users'])) - - def test_update_secret_acl_modify_only_users(self): - """Modifying existing acl's user list and creator-only flag.""" - secret_id, _ = create_secret(self.app) - - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_user_ids=['u1', 'u2'], - read_creator_only=True) - - acl_map = _get_acl_map(secret_id, is_secret=True) - acl_id = acl_map['read']['id'] - - # updating read, list operation and adding write operation acl - # Update should be for 'read' operation ACL only. Others are ignored. - resp = update_acl( - self.app, 'secrets', secret_id, acl_id, - read_user_ids=['u1', 'u2', 'u5']) - - self.assertEqual(resp.status_int, 200) - - resp = self.app.get( - '/secrets/{0}/acls/{1}'.format(secret_id, acl_id), - expect_errors=False) - acl = resp.json - self.assertIsNotNone(acl) - self.assertEqual(set(['u1', 'u2', 'u5']), set(acl['users'])) - self.assertTrue(acl['creator-only']) - - # Now remove existing all users from ACL list - resp = update_acl( - self.app, 'secrets', secret_id, acl_id, - read_user_ids=[]) - self.assertEqual(resp.status_int, 200) - - resp = self.app.get( - '/secrets/{0}/acls/{1}'.format(secret_id, acl_id), - expect_errors=False) - - acl = resp.json - self.assertIsNone(acl.get('users')) - self.assertTrue(acl['creator-only']) - - def test_update_secret_acl_invalid_acl_should_fail(self): - """Update should fail when invalid acl id is provided.""" - secret_id, _ = create_secret(self.app) - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_creator_only=False) - - resp = update_acl( - self.app, 'secrets', secret_id, uuid.uuid4().hex, - read_creator_only=False, expect_errors=True) - - self.assertEqual(resp.status_int, 404) - - def test_update_secret_acl_when_no_acls_defined_should_fail(self): - """Update should fail when no secret acls are defined.""" - secret_id, _ = create_secret(self.app) - resp = update_acl( - self.app, 'secrets', secret_id, uuid.uuid4().hex, - read_creator_only=False, expect_errors=True) - - self.assertEqual(resp.status_int, 404) - - def test_delete_secret_acl_with_valid_acl_id(self): - """Delete existing acls for a given secret.""" - secret_id, _ = create_secret(self.app) - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_creator_only=False) - - acl_map = _get_acl_map(secret_id, is_secret=True) - - read_acl_id = acl_map['read'].id - - resp = self.app.delete( - '/secrets/{0}/acls/{1}'.format(secret_id, read_acl_id), - expect_errors=False) - content = resp.json - self.assertIsNone(content) # make sure there is no response - self.assertEqual(resp.status_int, 200) - acl_map = _get_acl_map(secret_id, is_secret=True) - self.assertIsNone(acl_map.get('read')) # read acl should be deleted - - def test_delete_secret_acls_invalid_secret_should_fail(self): - """Delete acls should fail when invalid secret id is provided.""" - secret_id, _ = create_secret(self.app) - _, _ = create_acls( - self.app, 'secrets', secret_id, - read_creator_only=False) - - resp = self.app.delete( - '/secrets/{0}/acls/{1}'.format(secret_id, uuid.uuid4().hex), - expect_errors=True) - self.assertEqual(resp.status_int, 404) - - def test_invoke_secret_acl_put_should_fail(self): - """PUT for specific acl id is not supported.""" - secret_id, _ = create_secret(self.app) - resp = self.app.put( - '/secrets/{0}/acls/{1}'.format(secret_id, uuid.uuid4().hex), - expect_errors=True) - self.assertEqual(resp.status_int, 405) + self.assertEqual(405, resp.status_int) class WhenTestingContainerAclsResource(utils.BarbicanAPIBaseTestCase): @@ -529,33 +329,32 @@ class WhenTestingContainerAclsResource(utils.BarbicanAPIBaseTestCase): """Create container acls and compare db values with request data.""" container_id, _ = create_container(self.app) - resp, acls = create_acls( + resp = create_acls( self.app, 'containers', container_id, read_user_ids=['u1', 'u2']) - self.assertEqual(resp.status_int, 201) - self.assertIsNotNone(acls) - self.assertEqual(1, len(acls)) - for acl_ref in resp.json: - self.assertIn('/containers/{0}/acls'.format(container_id), - acl_ref['acl_ref']) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/containers/{0}/acl'.format(container_id), + resp.json['acl_ref']) + acl_map = _get_acl_map(container_id, is_secret=False) # Check creator_only is False when not provided self.assertFalse(acl_map['read']['creator_only']) + self.assertEqual(set(['u1', 'u2']), + set(acl_map['read'].to_dict_fields()['users'])) def test_create_new_container_acls_with_creator_only_false(self): """Should allow creating acls for a new container with creator-only.""" container_id, _ = create_container(self.app) - resp, acls = create_acls( + resp = create_acls( self.app, 'containers', container_id, read_creator_only=False, read_user_ids=['u1', 'u3', 'u4']) - self.assertEqual(resp.status_int, 201) - self.assertIsNotNone(acls) - self.assertEqual(1, len(acls)) - for acl_ref in resp.json: - self.assertIn('/containers/{0}/acls'.format(container_id), - acl_ref['acl_ref']) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/containers/{0}/acl'.format(container_id), + resp.json['acl_ref']) acl_map = _get_acl_map(container_id, is_secret=False) self.assertFalse(acl_map['read']['creator_only']) @@ -563,453 +362,285 @@ class WhenTestingContainerAclsResource(utils.BarbicanAPIBaseTestCase): """Should allow creating acls for a new container with creator-only.""" container_id, _ = create_container(self.app) - resp, acls = create_acls( + resp = create_acls( self.app, 'containers', container_id, read_creator_only=True, read_user_ids=['u1', 'u3', 'u4']) - self.assertEqual(resp.status_int, 201) - self.assertIsNotNone(acls) - self.assertEqual(1, len(acls)) - for acl_ref in resp.json: - self.assertIn('/containers/{0}/acls'.format(container_id), - acl_ref['acl_ref']) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/containers/{0}/acl'.format(container_id), + resp.json['acl_ref']) acl_map = _get_acl_map(container_id, is_secret=False) self.assertTrue(acl_map['read']['creator_only']) - def test_new_container_acls_with_invalid_creator_should_fail(self): + def test_container_acls_with_invalid_creator_only_value_should_fail(self): """Should fail if creator-only flag is provided as string value.""" container_id, _ = create_container(self.app) - resp, acls = create_acls( + resp = create_acls( self.app, 'containers', container_id, read_creator_only="False", read_user_ids=['u1', 'u3', 'u4'], expect_errors=True) - self.assertEqual(resp.status_int, 400) - self.assertIsNone(acls) + self.assertEqual(400, resp.status_int) - resp, acls = create_acls( + resp = create_acls( self.app, 'containers', container_id, read_creator_only="None", expect_errors=True) - self.assertEqual(resp.status_int, 400) - self.assertIsNone(acls) + self.assertEqual(400, resp.status_int) - def test_new_container_acls_with_missing_container_id_should_fail(self): - """Create acls request should fail for invalid container id.""" - resp, acls = create_acls( - self.app, 'containers', uuid.uuid4().hex, - read_creator_only="False", - read_user_ids=['u1', 'u3', 'u4'], - expect_errors=True) - self.assertEqual(resp.status_int, 404) - self.assertIsNone(acls) - - def test_existing_acl_post_request_should_fail(self): - """Should fail when adding acls for container with existing acls.""" + def test_get_container_acls_with_complete_acl_data(self): + """Read existing acls for a with complete acl data.""" container_id, _ = create_container(self.app) - resp, acls = create_acls( + create_acls( self.app, 'containers', container_id, - read_creator_only=False, - read_user_ids=['u1', 'u3', 'u4']) - self.assertEqual(resp.status_int, 201) - self.assertIsNotNone(acls) - - resp, acls = create_acls( - self.app, 'containers', container_id, - read_creator_only=False, - read_user_ids=['u1', 'u3', 'u4'], - expect_errors=True) - self.assertEqual(resp.status_int, 400) - self.assertIsNone(acls) - self.assertIn("Existing ACL cannot be updated", - resp.json['description']) - - def test_get_container_acls_with_valid_container_id(self): - """Read existing acls for a given valid container id.""" - container_id, _ = create_container(self.app) - _, _ = create_acls( - self.app, 'containers', container_id, - read_creator_only=False) + read_user_ids=['u1', 'u3'], read_creator_only=True) resp = self.app.get( - '/containers/{0}/acls'.format(container_id), + '/containers/{0}/acl'.format(container_id), expect_errors=False) - acls = resp.json - self.assertEqual(resp.status_int, 200) - self.assertEqual(1, len(acls)) - for acl_ref in acls: - self.assertIn('/containers/{0}/acls'.format(container_id), - acl_ref['acl_ref']) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) - def test_get_container_acls_invalid_container_should_fail(self): - """Get container acls should fail for invalid secret id.""" + self.assertIn('read', resp.json) + self.assertTrue(resp.json['read']['creator-only']) + self.assertIsNotNone(resp.json['read']['created']) + self.assertIsNotNone(resp.json['read']['updated']) + self.assertEqual(set(['u1', 'u3']), set(resp.json['read']['users'])) + + def test_get_container_acls_with_creator_only_data(self): + """Read existing acls for acl when only creator-only flag is set.""" container_id, _ = create_container(self.app) - _, _ = create_acls( + create_acls( + self.app, 'containers', container_id, + read_creator_only=True) + + resp = self.app.get( + '/containers/{0}/acl'.format(container_id), + expect_errors=False) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + + self.assertEqual([], resp.json['read']['users']) + self.assertTrue(resp.json['read']['creator-only']) + self.assertIsNotNone(resp.json['read']['created']) + self.assertIsNotNone(resp.json['read']['updated']) + + def test_get_container_acls_invalid_container_id_should_fail(self): + """Get container acls should fail for invalid secret id. + + This test applies to all container ACLs methods as secret entity is + populated in same manner for get, put, patch, delete methods. + """ + container_id, _ = create_container(self.app) + create_acls( self.app, 'containers', container_id, read_creator_only=False) resp = self.app.get( - '/containers/{0}/acls'.format(uuid.uuid4().hex), + '/containers/{0}/acl'.format(uuid.uuid4().hex), expect_errors=True) - self.assertEqual(resp.status_int, 404) + self.assertEqual(404, resp.status_int) - def test_get_container_acls_no_acls_defined_should_fail(self): - """Get container acls should fail when no acls defined for a secret.""" + def test_get_container_acls_invalid_non_uuid_secret_should_fail(self): + """Get container acls should fail for invalid (non-uuid) id.""" + container_id, _ = create_container(self.app) + create_acls( + self.app, 'containers', container_id, + read_creator_only=False) + + resp = self.app.get( + '/containers/{0}/acl'.format('my_container_id'), + expect_errors=True) + self.assertEqual(404, resp.status_int) + + def test_get_container_acls_no_acls_defined_return_default_acl(self): + """Get container acls should pass when no acls defined for a secret.""" container_id, _ = create_container(self.app) resp = self.app.get( - '/containers/{0}/acls'.format(container_id), + '/containers/{0}/acl'.format(container_id), expect_errors=True) - self.assertEqual(resp.status_int, 404) + self.assertEqual(200, resp.status_int) + self.assertEqual(acls.DEFAULT_ACL, resp.json) - def test_update_container_acls_modify_all_acls(self): + def test_full_update_container_acls_modify_all_acls(self): """Acls update where only user ids list is modified.""" container_id, _ = create_container(self.app) - _, _ = create_acls( - self.app, 'containers', container_id, + create_acls( + self.app, 'containers', container_id, read_creator_only=True, read_user_ids=['u1', 'u2']) - resp, acls = update_acls( - self.app, 'containers', container_id, + resp = update_acls( + self.app, 'containers', container_id, partial_update=False, read_user_ids=['u1', 'u2', 'u5']) - self.assertEqual(resp.status_int, 200) - self.assertIsNotNone(acls) - self.assertEqual(1, len(acls)) - for acl_ref in resp.json: - self.assertIn('/containers/{0}/acls'.format(container_id), - acl_ref['acl_ref']) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/containers/{0}/acl'.format(container_id), + resp.json['acl_ref']) acl_map = _get_acl_map(container_id, is_secret=False) # Check creator_only is False when not provided self.assertFalse(acl_map['read']['creator_only']) self.assertIn('u5', acl_map['read'].to_dict_fields()['users']) - def test_update_container_acls_modify_creator_only_values(self): + def test_full_update_container_acls_modify_creator_only_values(self): """Acls update where user ids and creator-only flag is modified.""" container_id, _ = create_container(self.app) - _, _ = create_acls( + create_acls( self.app, 'containers', container_id, read_user_ids=['u1', 'u2']) - resp, acls = update_acls( - self.app, 'containers', container_id, + resp = update_acls( + self.app, 'containers', container_id, partial_update=False, read_creator_only=True) - self.assertEqual(resp.status_int, 200) - self.assertIsNotNone(acls) - self.assertEqual(1, len(acls)) - for acl_ref in resp.json: - self.assertIn('/containers/{0}/acls'.format(container_id), - acl_ref['acl_ref']) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/containers/{0}/acl'.format(container_id), + resp.json['acl_ref']) acl_map = _get_acl_map(container_id, is_secret=False) self.assertTrue(acl_map['read']['creator_only']) + self.assertIsNone(acl_map['read'].to_dict_fields().get('users')) - def test_update_container_acls_invalid_secret_should_fail(self): - """Acls update should fail when invalid container is provided.""" + def test_full_update_container_acls_with_read_users_only(self): + """Acls full update where specific operation acl is modified.""" container_id, _ = create_container(self.app) - _, _ = create_acls( + + create_acls( self.app, 'containers', container_id, read_user_ids=['u1', 'u2']) - resp, acls = update_acls( - self.app, 'containers', uuid.uuid4().hex, - read_user_ids=['u1', 'u3', 'u5'], expect_errors=True) + acl_map = _get_acl_map(container_id, is_secret=False) + # ACL api does not support 'list' operation so making direct db update + # in acl operation data to make sure full update removes this existing + # ACL. + container_acl = acl_map['read'] + container_acl.operation = 'list' + container_acl.save() + acl_map = _get_acl_map(container_id, is_secret=False) + # check 'list' operation is there in db + self.assertIn('list', acl_map) + resp = update_acls( + self.app, 'containers', container_id, partial_update=False, + read_user_ids=['u1', 'u3', 'u5']) - self.assertEqual(resp.status_int, 404) - self.assertIsNone(acls) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/containers/{0}/acl'.format(container_id), + resp.json['acl_ref']) + acl_map = _get_acl_map(container_id, is_secret=False) + # make sure 'list' operation is no longer after full update + self.assertNotIn('list', acl_map) + self.assertFalse(acl_map['read']['creator_only']) + self.assertEqual(set(['u1', 'u3', 'u5']), + set(acl_map['read'].to_dict_fields()['users'])) + self.assertNotIn('u2', acl_map['read'].to_dict_fields()['users']) - def test_update_container_acls_when_no_acls_defined_should_fail(self): - """Acls update should fail when acls are defined for a container.""" + def test_partial_update_container_acls_with_read_users_only(self): + """Acls update where specific operation acl is modified.""" container_id, _ = create_container(self.app) - resp, acls = update_acls( + create_acls( self.app, 'containers', container_id, - read_user_ids=['u1', 'u3', 'u5'], expect_errors=True) + read_user_ids=['u1', 'u2']) - self.assertEqual(resp.status_int, 404) - self.assertIsNone(acls) + acl_map = _get_acl_map(container_id, is_secret=False) + + secret_acl = acl_map['read'] + secret_acl.operation = 'list' + secret_acl.save() + acl_map = _get_acl_map(container_id, is_secret=False) + # check 'list' operation is there in db + self.assertIn('list', acl_map) + resp = update_acls( + self.app, 'containers', container_id, partial_update=True, + read_user_ids=['u1', 'u3', 'u5']) + + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/containers/{0}/acl'.format(container_id), + resp.json['acl_ref']) + acl_map = _get_acl_map(container_id, is_secret=False) + # For partial update, existing other operation ACL is not tocuhed. + self.assertIn('list', acl_map) + self.assertEqual(set(['u1', 'u2']), + set(acl_map['list'].to_dict_fields()['users'])) + self.assertFalse(acl_map['read']['creator_only']) + self.assertEqual(set(['u1', 'u3', 'u5']), + set(acl_map['read'].to_dict_fields()['users'])) + + def test_partial_update_container_acls_when_no_acls_defined(self): + """Acls partial update pass when no acls are defined for container. + + Partial update (PATCH) is applicable even when no explicit ACL has been + set as by default every container has implicit acl definition. If PUT + is used, then new ACL is created instead. + """ + container_id, _ = create_container(self.app) + + resp = update_acls( + self.app, 'containers', container_id, partial_update=True, + read_user_ids=['u1', 'u3', 'u5'], expect_errors=False) + + self.assertEqual(200, resp.status_int) + acl_map = _get_acl_map(container_id, is_secret=False) + self.assertFalse(acl_map['read']['creator_only']) + + def test_partial_update_container_acls_modify_creator_only_values(self): + """Acls partial update where creator-only flag is modified.""" + container_id, _ = create_container(self.app) + + create_acls( + self.app, 'containers', container_id, + read_user_ids=['u1', 'u2'], + read_creator_only=True) + + resp = update_acls( + self.app, 'containers', container_id, partial_update=True, + read_creator_only=False) + self.assertEqual(200, resp.status_int) + self.assertIsNotNone(resp.json) + self.assertIn('/containers/{0}/acl'.format(container_id), + resp.json['acl_ref']) + acl_map = _get_acl_map(container_id, is_secret=False) + self.assertFalse(acl_map['read']['creator_only']) + self.assertEqual(set(['u1', 'u2']), + set(acl_map['read'].to_dict_fields()['users'])) def test_delete_container_acls_with_valid_container_id(self): """Delete existing acls for a given container.""" container_id, _ = create_container(self.app) - _, _ = create_acls( + create_acls( self.app, 'containers', container_id, read_creator_only=False) resp = self.app.delete( - '/containers/{0}/acls'.format(container_id), + '/containers/{0}/acl'.format(container_id), expect_errors=False) content = resp.json self.assertIsNone(content) # make sure there is no response - self.assertEqual(resp.status_int, 200) + self.assertEqual(200, resp.status_int) acl_map = _get_acl_map(container_id, is_secret=False) self.assertFalse(acl_map) - def test_delete_container_acls_invalid_container_should_fail(self): - """Delete acls should fail when invalid container id is provided.""" - container_id, _ = create_container(self.app) - _, _ = create_acls( - self.app, 'containers', container_id, - read_creator_only=False) - - resp = self.app.delete( - '/containers/{0}/acls'.format(uuid.uuid4().hex), - expect_errors=True) - self.assertEqual(resp.status_int, 404) - - def test_delete_container_acls_no_acl_defined_should_fail(self): - """Delete acls should fail when no acls are defined for a container.""" + def test_delete_container_acls_no_acl_defined_should_pass(self): + """Delete acls should pass when no acls are defined for a container.""" container_id, _ = create_container(self.app) resp = self.app.delete( - '/containers/{0}/acls'.format(container_id), - expect_errors=True) - self.assertEqual(resp.status_int, 404) + '/containers/{0}/acl'.format(container_id), + expect_errors=False) + self.assertEqual(200, resp.status_int) - def test_invoke_container_acls_put_should_fail(self): + def test_invoke_container_acls_head_should_fail(self): """PUT request to container acls URI is not supported.""" container_id, _ = create_container(self.app) - resp = self.app.put( - '/containers/{0}/acls'.format(container_id), + resp = self.app.head( + '/containers/{0}/acl/'.format(container_id), expect_errors=True) - self.assertEqual(resp.status_int, 405) - - -class WhenTestingContainerAclResource(utils.BarbicanAPIBaseTestCase): - - def test_get_container_acl_with_valid_acl_id(self): - """Read a specific acl by id and compare with request values.""" - container_id, _ = create_container(self.app) - _, _ = create_acls( - self.app, 'containers', container_id, - read_creator_only=False) - - acl_map = _get_acl_map(container_id, is_secret=False) - resp = self.app.get( - '/containers/{0}/acls/{1}'.format(container_id, - acl_map['read']['id']), - expect_errors=False) - acl = resp.json - self.assertEqual(resp.status_int, 200) - self.assertEqual('read', acl['operation']) - self.assertFalse(acl['creator-only']) - self.assertIsNone(acl.get('users')) - - def test_get_container_acl_invalid_acl_should_fail(self): - """Get acl request should fail with invalid acl id.""" - container_id, _ = create_container(self.app) - _, _ = create_acls( - self.app, 'containers', container_id, - read_creator_only=False) - resp = self.app.get( - '/containers/{0}/acls/{1}'.format(container_id, - uuid.uuid4().hex), - expect_errors=True) - self.assertEqual(resp.status_int, 404) - - def test_get_container_acl_no_acl_defined_should_fail(self): - """Get acl request should fail with no acls defined for container.""" - container_id, _ = create_container(self.app) - resp = self.app.get( - '/containers/{0}/acls/{1}'.format(container_id, - uuid.uuid4().hex), - expect_errors=True) - self.assertEqual(resp.status_int, 404) - - def test_update_container_acl_modify_all(self): - """Modify existing ACL users by using specific acl id.""" - container_id, _ = create_container(self.app) - - _, _ = create_acls( - self.app, 'containers', container_id, - read_user_ids=['u1', 'u2']) - - acl_map = _get_acl_map(container_id, is_secret=False) - acl_id = acl_map['read']['id'] - - resp = update_acl( - self.app, 'containers', container_id, acl_id, - read_user_ids=['u1', 'u2', 'u5'], read_creator_only=True) - - self.assertEqual(resp.status_int, 200) - - resp = self.app.get( - '/containers/{0}/acls/{1}'.format(container_id, - acl_id), - expect_errors=False) - acl = resp.json - self.assertIsNotNone(acl) - self.assertIn('/containers/{0}/acls/{1}'.format(container_id, acl_id), - acl['acl_ref']) - # Check creator_only is False when not provided - self.assertTrue(acl['creator-only']) - self.assertEqual('read', acl['operation']) - self.assertEqual(set(['u1', 'u2', 'u5']), set(acl['users'])) - - def test_update_container_acl_with_duplicate_user_ids(self): - """Modify existing ACL users by using specific acl id.""" - container_id, _ = create_container(self.app) - - _, _ = create_acls( - self.app, 'containers', container_id, - read_user_ids=['u1', 'u2']) - - acl_map = _get_acl_map(container_id, is_secret=False) - acl_id = acl_map['read']['id'] - - resp = update_acl( - self.app, 'containers', container_id, acl_id, - read_user_ids=['u1', 'u2', 'u1', 'u5'], read_creator_only=True) - - self.assertEqual(resp.status_int, 200) - - resp = self.app.get( - '/containers/{0}/acls/{1}'.format(container_id, - acl_id), - expect_errors=False) - acl = resp.json - self.assertIsNotNone(acl) - self.assertIn('/containers/{0}/acls/{1}'.format(container_id, acl_id), - acl['acl_ref']) - # Check creator_only is False when not provided - self.assertTrue(acl['creator-only']) - self.assertEqual('read', acl['operation']) - self.assertEqual(set(['u1', 'u2', 'u5']), set(acl['users'])) - - def test_update_container_acl_modify_only_users(self): - """Modifying existing acl's user list and creator-only flag.""" - container_id, _ = create_container(self.app) - - _, _ = create_acls( - self.app, 'containers', container_id, - read_user_ids=['u1', 'u2'], - read_creator_only=True) - - acl_map = _get_acl_map(container_id, is_secret=False) - acl_id = acl_map['read']['id'] - - # updating read, list operation and adding write operation acl - # Update should be for 'read' operation ACL only. Others are ignored. - resp = update_acl( - self.app, 'containers', container_id, acl_id, - read_user_ids=['u1', 'u2', 'u5']) - - self.assertEqual(resp.status_int, 200) - - resp = self.app.get( - '/containers/{0}/acls/{1}'.format(container_id, - acl_id), - expect_errors=False) - acl = resp.json - self.assertIsNotNone(acl) - self.assertEqual(set(['u1', 'u2', 'u5']), set(acl['users'])) - self.assertTrue(acl['creator-only']) - - # Now remove existing all users from ACL list - resp = update_acl( - self.app, 'containers', container_id, acl_id, - read_user_ids=[]) - self.assertEqual(resp.status_int, 200) - - resp = self.app.get( - '/containers/{0}/acls/{1}'.format(container_id, - acl_id), - expect_errors=False) - acl = resp.json - self.assertIsNone(acl.get('users')) - self.assertTrue(acl['creator-only']) - - def test_update_container_acl_modify_creator_only(self): - """Modifying only creator_only flag for existing acl by its id.""" - container_id, _ = create_container(self.app) - - _, _ = create_acls( - self.app, 'containers', container_id, - read_user_ids=['u1', 'u2'], - read_creator_only=True) - - acl_map = _get_acl_map(container_id, is_secret=False) - acl_id = acl_map['read']['id'] - - # updating read, list operation and adding write operation acl - # Update should be for 'read' operation ACL only. Others are ignored. - resp = update_acl( - self.app, 'containers', container_id, acl_id, - read_creator_only=False) - - self.assertEqual(resp.status_int, 200) - - resp = self.app.get( - '/containers/{0}/acls/{1}'.format(container_id, - acl_id), - expect_errors=False) - acl = resp.json - self.assertIsNotNone(acl) - self.assertEqual(set(['u1', 'u2']), set(acl['users'])) - self.assertFalse(acl['creator-only']) - - def test_update_container_acl_invalid_acl_should_fail(self): - """Update should fail when invalid acl id is provided.""" - container_id, _ = create_container(self.app) - _, _ = create_acls( - self.app, 'containers', container_id, - read_creator_only=False) - - resp = update_acl( - self.app, 'containers', container_id, uuid.uuid4().hex, - read_creator_only=False, expect_errors=True) - - self.assertEqual(resp.status_int, 404) - - def test_update_container_acl_when_no_acls_defined_should_fail(self): - """Update should fail when no container acls are defined.""" - container_id, _ = create_container(self.app) - resp = update_acl( - self.app, 'containers', container_id, uuid.uuid4().hex, - read_creator_only=False, expect_errors=True) - - self.assertEqual(resp.status_int, 404) - - def test_delete_secret_acl_with_valid_acl_id(self): - """Delete existing acls for a given container.""" - container_id, _ = create_container(self.app) - _, _ = create_acls( - self.app, 'containers', container_id, - read_creator_only=False) - - acl_map = _get_acl_map(container_id, is_secret=False) - - read_acl_id = acl_map['read'].id - - resp = self.app.delete( - '/containers/{0}/acls/{1}'.format(container_id, read_acl_id), - expect_errors=False) - content = resp.json - self.assertIsNone(content) # make sure there is no response - self.assertEqual(resp.status_int, 200) - acl_map = _get_acl_map(container_id, is_secret=False) - self.assertIsNone(acl_map.get('read')) # read acl should be deleted - - def test_delete_secret_acls_invalid_secret_should_fail(self): - """Delete acls should fail when invalid secret id is provided.""" - container_id, _ = create_container(self.app) - _, _ = create_acls( - self.app, 'containers', container_id, - read_creator_only=False) - - resp = self.app.delete( - '/containers/{0}/acls/{1}'.format(container_id, uuid.uuid4().hex), - expect_errors=True) - self.assertEqual(resp.status_int, 404) - - def test_invoke_container_acl_put_should_fail(self): - """PUT for specific acl id is not supported.""" - container_id, _ = create_container(self.app) - resp = self.app.put( - '/containers/{0}/acls/{1}'.format(container_id, uuid.uuid4().hex), - expect_errors=True) - self.assertEqual(resp.status_int, 405) + self.assertEqual(405, resp.status_int) # ----------------------- Helper Functions --------------------------- @@ -1078,23 +709,23 @@ def create_acls(app, entity_type, entity_id, read_user_ids=None, return manage_acls(app, entity_type, entity_id, read_user_ids=read_user_ids, read_creator_only=read_creator_only, - is_update=False, + is_update=False, partial_update=False, expect_errors=expect_errors) def update_acls(app, entity_type, entity_id, read_user_ids=None, - read_creator_only=None, + read_creator_only=None, partial_update=False, expect_errors=False): return manage_acls(app, entity_type, entity_id, read_user_ids=read_user_ids, read_creator_only=read_creator_only, - is_update=True, + is_update=True, partial_update=partial_update, expect_errors=expect_errors) def manage_acls(app, entity_type, entity_id, read_user_ids=None, read_creator_only=None, is_update=False, - expect_errors=False): + partial_update=None, expect_errors=False): request = {} _append_acl_to_request(request, 'read', read_user_ids, @@ -1103,41 +734,17 @@ def manage_acls(app, entity_type, entity_id, read_user_ids=None, cleaned_request = {key: val for key, val in request.items() if val is not None} - if is_update: + if is_update and partial_update: # patch for partial update resp = app.patch_json( - '/{0}/{1}/acls'.format(entity_type, entity_id), + '/{0}/{1}/acl'.format(entity_type, entity_id), cleaned_request, expect_errors=expect_errors) - else: - resp = app.post_json( - '/{0}/{1}/acls'.format(entity_type, entity_id), + else: # put (for create or complete update) + resp = app.put_json( + '/{0}/{1}/acl'.format(entity_type, entity_id), cleaned_request, expect_errors=expect_errors) - acl_ids = None - if resp.status_int in (201, 200): - acl_ids = [] - for acl in resp.json: - acl_ids.append(_get_entity_id(acl)) - - return (resp, acl_ids) - - -def update_acl(app, entity_type, entity_id, acl_id, read_user_ids=None, - read_creator_only=None, expect_errors=False): - request = {} - - _append_acl_to_request(request, 'read', read_user_ids, - read_creator_only) - - cleaned_request = {key: val for key, val in request.items() - if val is not None} - - resp = app.patch_json( - '/{0}/{1}/acls/{2}'.format(entity_type, entity_id, acl_id), - cleaned_request, - expect_errors=expect_errors) - return resp @@ -1151,10 +758,6 @@ def _append_acl_to_request(req, operation, user_ids=None, creator_only=None): req[operation] = op_dict -def _get_entity_id(acl): - return os.path.split(acl.get('acl_ref', ''))[-1] - - def _get_acl_map(entity_id, is_secret=True): """Provides map of operation: acl_entity for given entity id.""" if is_secret: diff --git a/barbican/tests/api/controllers/test_orders.py b/barbican/tests/api/controllers/test_orders.py index 568150df5..3f2ae6870 100644 --- a/barbican/tests/api/controllers/test_orders.py +++ b/barbican/tests/api/controllers/test_orders.py @@ -489,7 +489,7 @@ class WhenCreatingStoredKeyOrders(utils.BarbicanAPIBaseTestCase, ) if add_acls: - _, _ = test_acls.manage_acls( + test_acls.manage_acls( self.app, 'containers', container_id, read_user_ids=['u1', 'u3', 'u4'], read_creator_only=read_creator_only, diff --git a/etc/barbican/policy.json b/etc/barbican/policy.json index 1ea4ec388..26ba77231 100644 --- a/etc/barbican/policy.json +++ b/etc/barbican/policy.json @@ -20,6 +20,10 @@ "secret_non_private_read": "rule:all_users and rule:secret_project_match and not rule:secret_private_read", "secret_decrypt_non_private_read": "rule:all_but_audit and rule:secret_project_match and not rule:secret_private_read", "container_non_private_read": "rule:all_users and rule:container_project_match and not rule:container_private_read", + "secret_project_admin": "rule:admin and rule:secret_project_match", + "secret_project_creator": "rule:creator_role and rule:secret_project_match and rule:secret_creator_user", + "container_project_admin": "rule:admin and rule:container_project_match", + "container_project_creator": "rule:creator_role and rule:container_project_match and rule:container_creator_user", "version:get": "@", "secret:decrypt": "rule:secret_decrypt_non_private_read or rule:secret_creator_user or rule:secret_acl_read", @@ -57,18 +61,10 @@ "certificate_authority:unset_global_preferred": "rule:admin", "certificate_authority:get_global_preferred": "rule:all_users", "certificate_authority:get_preferred_ca": "rule:all_users", - "secret_acls:post": "rule:admin_or_creator_role and rule:secret_project_match", - "secret_acls:patch": "rule:admin_or_creator_role and rule:secret_project_match", - "secret_acls:delete": "rule:admin_or_creator_role and rule:secret_project_match", + "secret_acls:put_patch": "rule:secret_project_admin or rule:secret_project_creator", + "secret_acls:delete": "rule:secret_project_admin or rule:secret_project_creator", "secret_acls:get": "rule:all_but_audit and rule:secret_project_match", - "secret_acl:get": "rule:all_but_audit and rule:secret_project_match", - "secret_acl:patch": "rule:admin_or_creator_role and rule:secret_project_match", - "secret_acl:delete": "rule:admin_or_creator_role and rule:secret_project_match", - "container_acls:post": "rule:admin_or_creator_role and rule:container_project_match", - "container_acls:patch": "rule:admin_or_creator_role and rule:container_project_match", - "container_acls:delete": "rule:admin_or_creator_role and rule:container_project_match", - "container_acls:get": "rule:all_but_audit and rule:container_project_match", - "container_acl:get": "rule:all_but_audit and rule:container_project_match", - "container_acl:patch": "rule:admin_or_creator_role and rule:container_project_match", - "container_acl:delete": "rule:admin_or_creator_role and rule:container_project_match" + "container_acls:put_patch": "rule:container_project_admin or rule:container_project_creator", + "container_acls:delete": "rule:container_project_admin or rule:container_project_creator", + "container_acls:get": "rule:all_but_audit and rule:container_project_match" }