Added functional tests for CLI commands. Added functional tests for client API. Modified ACL client API functional test not to use behavior class as per recent change in secrets, containers, and orders. Change-Id: Ibacf33e834daa2c70ef1b0032495bbe443957cac
570 lines
22 KiB
Python
570 lines
22 KiB
Python
# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import uuid
|
|
|
|
from testtools import testcase
|
|
|
|
from functionaltests import utils
|
|
from functionaltests.client import base
|
|
from functionaltests.common import cleanup
|
|
|
|
from barbicanclient import exceptions
|
|
|
|
|
|
create_secret_defaults_data = {
|
|
"name": "AES key",
|
|
"expiration": "2018-02-28T19:14:44.180394",
|
|
"algorithm": "aes",
|
|
"bit_length": 256,
|
|
"mode": "cbc",
|
|
"payload": "gF6+lLoF3ohA9aPRpt+6bQ==",
|
|
"payload_content_type": "application/octet-stream",
|
|
"payload_content_encoding": "base64",
|
|
}
|
|
|
|
create_container_defaults_data = {
|
|
"name": "containername",
|
|
"secrets": {}
|
|
}
|
|
|
|
create_container_rsa_data = {
|
|
"name": "rsacontainer",
|
|
}
|
|
|
|
|
|
ACL_SUBMIT_DATA_POSITIVE = {
|
|
'secret_no_users_access_flag': {
|
|
'users': None, 'project_access': True,
|
|
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
|
|
'expect_users': [], 'expect_project_access': True},
|
|
|
|
'secret_users_missing_access_flag': {
|
|
'users': ['u1', 'u2'], 'project_access': None,
|
|
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
|
|
'expect_users': ['u1', 'u2'], 'expect_project_access': True},
|
|
|
|
'container_users_no_project_access': {
|
|
'users': ['u1', 'u2', 'u3'], 'project_access': False,
|
|
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
|
|
'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': False},
|
|
|
|
'container_empty_users_with_project_access': {
|
|
'users': [], 'project_access': True,
|
|
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
|
|
'expect_users': [], 'expect_project_access': True},
|
|
}
|
|
|
|
ACL_SUBMIT_DATA_NEGATIVE = {
|
|
'secret_users_incorrect_access_flag': {
|
|
'users': ['u1', 'u2'], 'project_access': 'Incorrect_flag',
|
|
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
|
|
'expect_users': ['u1', 'u2'], 'expect_project_access': True,
|
|
'expect_error': True, 'error_code': 400},
|
|
|
|
'container_incorrect_users_as_str': {
|
|
'users': 'u1', 'project_access': True,
|
|
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
|
|
'expect_users': ['u1'], 'expect_project_access': True,
|
|
'expect_error': True, 'error_code': None, 'error_class': ValueError},
|
|
}
|
|
|
|
ACL_DELETE_DATA = {
|
|
'secret_no_users_access_flag': {
|
|
'users': None, 'project_access': True, 'create_acl': True,
|
|
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
|
|
'expect_users': [], 'expect_project_access': True},
|
|
|
|
'secret_users_missing_access_flag': {
|
|
'users': ['u1', 'u2'], 'project_access': None, 'create_acl': True,
|
|
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
|
|
'expect_users': [], 'expect_project_access': True},
|
|
|
|
'container_users_no_project_access': {
|
|
'users': ['u1', 'u2', 'u3'], 'project_access': False,
|
|
'create_acl': True,
|
|
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
|
|
'expect_users': [], 'expect_project_access': True},
|
|
|
|
'container_empty_users_with_project_access': {
|
|
'users': [], 'project_access': True, 'create_acl': True,
|
|
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
|
|
'expect_users': [], 'expect_project_access': True},
|
|
|
|
'existing_secret_no_acl_defined': {
|
|
'users': ['u1', 'u2'], 'project_access': False, 'create_acl': False,
|
|
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
|
|
'expect_users': [], 'expect_project_access': True,
|
|
'expect_error': False},
|
|
|
|
'acl_operation_specific_remove': {
|
|
'users': ['u1', 'u2', 'u3'], 'project_access': False,
|
|
'create_acl': True, 'per_op_acl_remove': True,
|
|
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
|
|
'expect_users': [], 'expect_project_access': True},
|
|
}
|
|
|
|
ACL_ADD_USERS_DATA_POSITIVE = {
|
|
'secret_no_initial_users_access_flag': {
|
|
'users': None, 'project_access': True,
|
|
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
|
|
'add_users': ['u4'],
|
|
'expect_users': ['u4'], 'expect_project_access': True},
|
|
|
|
'secret_users_missing_access_flag': {
|
|
'users': ['u1', 'u2'], 'project_access': None,
|
|
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
|
|
'add_users': ['u2', 'u4'],
|
|
'expect_users': ['u1', 'u2', 'u4'], 'expect_project_access': True},
|
|
|
|
'container_users_no_project_access_empty_add': {
|
|
'users': ['u1', 'u2', 'u3'], 'project_access': False,
|
|
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
|
|
'add_users': [],
|
|
'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': False},
|
|
|
|
'container_empty_users_with_project_access_none_add_users': {
|
|
'users': [], 'project_access': True,
|
|
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
|
|
'add_users': None,
|
|
'expect_users': [], 'expect_project_access': True},
|
|
|
|
'secret_users_modify_access_flag_in_add': {
|
|
'users': ['u1', 'u2', 'u3'], 'project_access': False,
|
|
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
|
|
'add_users': [], 'add_project_access': True,
|
|
'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': True},
|
|
|
|
}
|
|
|
|
ACL_ADD_USERS_DATA_NEGATIVE = {
|
|
'secret_users_incorrect_access_flag_during_add': {
|
|
'users': ['u1', 'u2'], 'project_access': False,
|
|
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
|
|
'add_users': ['u5'], 'add_project_access': 'Incorrect',
|
|
'expect_users': ['u1', 'u2', 'u5'], 'expect_project_access': False,
|
|
'expect_error': True, 'error_code': 400},
|
|
}
|
|
|
|
ACL_REMOVE_USERS_DATA_POSITIVE = {
|
|
'secret_no_initial_users_access_flag': {
|
|
'users': None, 'project_access': True,
|
|
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
|
|
'remove_users': ['u4'],
|
|
'expect_users': [], 'expect_project_access': True},
|
|
|
|
'secret_users_missing_access_flag': {
|
|
'users': ['u1', 'u2'], 'project_access': None,
|
|
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
|
|
'remove_users': ['u2', 'u4'],
|
|
'expect_users': ['u1'], 'expect_project_access': True},
|
|
|
|
'secret_users_no_matching_users': {
|
|
'users': ['u1', 'u2'], 'project_access': None,
|
|
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
|
|
'remove_users': ['u3', 'u4'],
|
|
'expect_users': ['u1', 'u2'], 'expect_project_access': True},
|
|
|
|
'container_users_no_project_access_empty_add': {
|
|
'users': ['u1', 'u2', 'u3'], 'project_access': False,
|
|
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
|
|
'remove_users': [],
|
|
'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': False},
|
|
|
|
'container_empty_users_with_project_access_none_add_users': {
|
|
'users': [], 'project_access': True,
|
|
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
|
|
'remove_users': None,
|
|
'expect_users': [], 'expect_project_access': True},
|
|
|
|
'secret_users_modify_access_flag_in_remove': {
|
|
'users': ['u1', 'u2', 'u3'], 'project_access': False,
|
|
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
|
|
'remove_users': [], 'remove_project_access': True,
|
|
'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': True},
|
|
}
|
|
|
|
ACL_REMOVE_USERS_DATA_NEGATIVE = {
|
|
'secret_users_incorrect_access_flag_during_add': {
|
|
'users': ['u1', 'u2'], 'project_access': False,
|
|
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
|
|
'remove_users': ['u5'], 'remove_project_access': 'Incorrect',
|
|
'expect_users': ['u1', 'u2'], 'expect_project_access': False,
|
|
'expect_error': True, 'error_code': 400},
|
|
}
|
|
|
|
|
|
class BaseACLsTestCase(base.TestCase):
|
|
|
|
def setUp(self):
|
|
super(BaseACLsTestCase, self).setUp()
|
|
|
|
self.cleanup = cleanup.CleanUp(self.barbicanclient)
|
|
# Set up three secrets
|
|
self.secret_ref_1, self.secret_1 = self._create_a_secret()
|
|
self.secret_ref_2, self.secret_2 = self._create_a_secret()
|
|
self.secret_ref_3, self.secret_3 = self._create_a_secret()
|
|
|
|
self.secret_list = [self.secret_ref_1, self.secret_ref_2,
|
|
self.secret_ref_3]
|
|
|
|
secrets_dict = {'secret_1': self.secret_1, 'secret_2': self.secret_2,
|
|
'secret_3': self.secret_3}
|
|
|
|
create_container_defaults_data['secrets'] = secrets_dict
|
|
|
|
create_container_rsa_data['public_key'] = self.secret_1
|
|
create_container_rsa_data['private_key'] = self.secret_2
|
|
create_container_rsa_data['private_key_passphrase'] = self.secret_3
|
|
|
|
def tearDown(self):
|
|
"""Handles test cleanup.
|
|
|
|
It should be noted that delete all secrets must be called before
|
|
delete containers.
|
|
"""
|
|
self.cleanup.delete_all_entities()
|
|
super(BaseACLsTestCase, self).tearDown()
|
|
|
|
def _create_a_secret(self):
|
|
secret = self.barbicanclient.secrets.create(
|
|
**create_secret_defaults_data)
|
|
secret_ref = self.cleanup.add_entity(secret)
|
|
|
|
return secret_ref, secret
|
|
|
|
def _create_a_container(self):
|
|
container = self.barbicanclient.containers.create(
|
|
**create_container_defaults_data)
|
|
container_ref = self.cleanup.add_entity(container)
|
|
|
|
return container_ref, container
|
|
|
|
|
|
@utils.parameterized_test_case
|
|
class ACLsTestCase(BaseACLsTestCase):
|
|
|
|
@testcase.attr('negative')
|
|
def test_get_non_existent_secret_valid_uuid(self):
|
|
"""A get on a container that does not exist with valid UUID
|
|
|
|
This should return a 404.
|
|
"""
|
|
base_url = self.barbicanclient.acls._api.endpoint_override
|
|
new_uuid = str(uuid.uuid4())
|
|
url = '{0}/containers/{1}'.format(base_url, new_uuid)
|
|
|
|
e = self.assertRaises(
|
|
exceptions.HTTPClientError,
|
|
self.barbicanclient.acls.get,
|
|
url
|
|
)
|
|
|
|
self.assertEqual(e.status_code, 404)
|
|
|
|
@testcase.attr('negative')
|
|
def test_delete_non_existent_secret_valid_uuid(self):
|
|
"""A delete on a ACL when secret with a valid UUID does not exist
|
|
|
|
This should return a 404.
|
|
"""
|
|
base_url = self.barbicanclient.acls._api.endpoint_override
|
|
new_uuid = str(uuid.uuid4())
|
|
url = '{0}/secrets/{1}'.format(base_url, new_uuid)
|
|
|
|
acl_data = {'entity_ref': url}
|
|
entity = self.barbicanclient.acls.create(**acl_data)
|
|
|
|
e = self.assertRaises(
|
|
exceptions.HTTPClientError,
|
|
entity.remove
|
|
)
|
|
|
|
self.assertEqual(e.status_code, 404)
|
|
|
|
@utils.parameterized_dataset(ACL_SUBMIT_DATA_POSITIVE)
|
|
@testcase.attr('positive')
|
|
def test_acl_successful_submit(self, users, project_access,
|
|
entity_ref_method, acl_type,
|
|
expect_users, expect_project_access,
|
|
**kwargs):
|
|
"""Submit operation on ACL entity which stores ACL setting in Barbican.
|
|
|
|
"""
|
|
entity_ref, _ = getattr(self, entity_ref_method)()
|
|
|
|
acl_data = {'entity_ref': entity_ref, 'users': users,
|
|
'project_access': project_access}
|
|
entity = self.barbicanclient.acls.create(**acl_data)
|
|
|
|
acl_ref = self.cleanup.add_entity(entity)
|
|
self.assertIsNotNone(acl_ref)
|
|
self.assertEqual(entity_ref + "/acl", acl_ref)
|
|
|
|
acl_entity = self.barbicanclient.acls.get(entity.entity_ref)
|
|
self.assertIsNotNone(acl_entity)
|
|
|
|
# read acl as dictionary lookup
|
|
acl = acl_entity.get('read')
|
|
self.assertEqual(set(expect_users), set(acl.users))
|
|
self.assertEqual(expect_project_access, acl.project_access)
|
|
self.assertIsNotNone(acl.created)
|
|
self.assertIsNotNone(acl.updated)
|
|
self.assertEqual(acl_type, acl_entity._acl_type)
|
|
|
|
# read acl as property lookup
|
|
acl = acl_entity.read
|
|
self.assertEqual(set(expect_users), set(acl.users))
|
|
self.assertEqual(expect_project_access, acl.project_access)
|
|
|
|
@utils.parameterized_dataset(ACL_SUBMIT_DATA_NEGATIVE)
|
|
@testcase.attr('negative')
|
|
def test_acl_incorrect_submit(self, users, project_access,
|
|
entity_ref_method, acl_type, expect_users,
|
|
expect_project_access, **kwargs):
|
|
"""Incorrect Submit operation on ACL entity which stores ACL setting in
|
|
Barbican.
|
|
|
|
"""
|
|
entity_ref, _ = getattr(self, entity_ref_method)()
|
|
|
|
acl_data = {'entity_ref': entity_ref, 'users': users,
|
|
'project_access': project_access}
|
|
entity = self.barbicanclient.acls.create(**acl_data)
|
|
|
|
error_class = kwargs.get('error_class', exceptions.HTTPClientError)
|
|
e = self.assertRaises(
|
|
error_class,
|
|
entity.submit
|
|
)
|
|
if hasattr(e, 'status_code'):
|
|
self.assertEqual(e.status_code, kwargs.get('error_code'))
|
|
|
|
@utils.parameterized_dataset(ACL_DELETE_DATA)
|
|
def test_acl_delete(self, users, project_access, entity_ref_method,
|
|
create_acl, acl_type, expect_users,
|
|
expect_project_access, **kwargs):
|
|
"""remove operation on ACL entity which stores ACL setting in Barbican.
|
|
|
|
"""
|
|
entity_ref, _ = getattr(self, entity_ref_method)()
|
|
|
|
acl_data = {'entity_ref': entity_ref, 'users': users,
|
|
'project_access': project_access}
|
|
entity = self.barbicanclient.acls.create(**acl_data)
|
|
|
|
entity_ref = entity.entity_ref
|
|
if create_acl:
|
|
self.cleanup.add_entity(entity)
|
|
|
|
acl_op_remove = kwargs.get('per_op_acl_remove')
|
|
if acl_op_remove:
|
|
entity.read.remove()
|
|
else:
|
|
entity.remove()
|
|
|
|
acl_entity = self.barbicanclient.acls.get(entity_ref)
|
|
self.assertIsNotNone(acl_entity)
|
|
|
|
# read acl as dictionary lookup
|
|
acl = acl_entity.get('read')
|
|
self.assertEqual(set(expect_users), set(acl.users))
|
|
self.assertEqual(expect_project_access, acl.project_access)
|
|
self.assertIsNone(acl.created)
|
|
self.assertIsNone(acl.updated)
|
|
self.assertEqual(acl_type, acl_entity._acl_type)
|
|
|
|
# read acl as property lookup
|
|
acl = acl_entity.read
|
|
self.assertEqual(set(expect_users), set(acl.users))
|
|
self.assertEqual(expect_project_access, acl.project_access)
|
|
|
|
@utils.parameterized_dataset(ACL_ADD_USERS_DATA_POSITIVE)
|
|
@testcase.attr('positive')
|
|
def test_acl_successful_add_users(self, users, project_access,
|
|
entity_ref_method, acl_type, add_users,
|
|
expect_users, expect_project_access,
|
|
**kwargs):
|
|
"""Checks client add users behavior on existing ACL entity.
|
|
|
|
In this new users or project access flag is modified and verified for
|
|
expected behavior
|
|
"""
|
|
entity_ref, _ = getattr(self, entity_ref_method)()
|
|
|
|
acl_data = {'entity_ref': entity_ref, 'users': users,
|
|
'project_access': project_access}
|
|
entity = self.barbicanclient.acls.create(**acl_data)
|
|
|
|
acl_ref = self.cleanup.add_entity(entity)
|
|
self.assertIsNotNone(acl_ref)
|
|
self.assertEqual(entity_ref + "/acl", acl_ref)
|
|
|
|
server_acl = self.barbicanclient.acls.get(entity.entity_ref)
|
|
|
|
if server_acl.get('read').users is not None and add_users:
|
|
server_acl.get('read').users.extend(add_users)
|
|
|
|
if kwargs.get('add_project_access') is not None:
|
|
server_acl.get('read').project_access = \
|
|
kwargs.get('add_project_access')
|
|
|
|
acl_ref = server_acl.submit()
|
|
self.assertIsNotNone(acl_ref)
|
|
self.assertEqual(entity_ref + "/acl", acl_ref)
|
|
|
|
acl_entity = self.barbicanclient.acls.get(server_acl.entity_ref)
|
|
self.assertIsNotNone(acl_entity)
|
|
|
|
# read acl as dictionary lookup
|
|
acl = acl_entity.get('read')
|
|
self.assertEqual(set(expect_users), set(acl.users))
|
|
self.assertEqual(expect_project_access, acl.project_access)
|
|
self.assertIsNotNone(acl.created)
|
|
self.assertIsNotNone(acl.updated)
|
|
self.assertEqual(acl_type, acl_entity._acl_type)
|
|
|
|
# read acl as property lookup
|
|
acl = acl_entity.read
|
|
self.assertEqual(set(expect_users), set(acl.users))
|
|
self.assertEqual(expect_project_access, acl.project_access)
|
|
|
|
@utils.parameterized_dataset(ACL_ADD_USERS_DATA_NEGATIVE)
|
|
@testcase.attr('negative')
|
|
def test_acl_add_users_failure(self, users, project_access,
|
|
entity_ref_method, acl_type, add_users,
|
|
expect_users, expect_project_access,
|
|
**kwargs):
|
|
"""Checks client add users failures on existing ACL entity.
|
|
|
|
In this new users or project access flag is modified and verified for
|
|
expected behavior
|
|
"""
|
|
entity_ref, _ = getattr(self, entity_ref_method)()
|
|
|
|
acl_data = {'entity_ref': entity_ref, 'users': users,
|
|
'project_access': project_access}
|
|
entity = self.barbicanclient.acls.create(**acl_data)
|
|
|
|
acl_ref = self.cleanup.add_entity(entity)
|
|
self.assertIsNotNone(acl_ref)
|
|
self.assertEqual(entity_ref + "/acl", acl_ref)
|
|
|
|
server_acl = self.barbicanclient.acls.get(entity.entity_ref)
|
|
|
|
if server_acl.get('read').users is not None and add_users:
|
|
server_acl.get('read').users.extend(add_users)
|
|
|
|
if kwargs.get('add_project_access') is not None:
|
|
server_acl.get('read').project_access = \
|
|
kwargs.get('add_project_access')
|
|
|
|
error_class = kwargs.get('error_class', exceptions.HTTPClientError)
|
|
e = self.assertRaises(
|
|
error_class,
|
|
server_acl.submit
|
|
)
|
|
if hasattr(e, 'status_code'):
|
|
self.assertEqual(e.status_code, kwargs.get('error_code'))
|
|
|
|
@utils.parameterized_dataset(ACL_REMOVE_USERS_DATA_POSITIVE)
|
|
@testcase.attr('positive')
|
|
def test_acl_remove_users_successful(self, users, project_access,
|
|
entity_ref_method, acl_type,
|
|
remove_users, expect_users,
|
|
expect_project_access, **kwargs):
|
|
"""Checks client remove users behavior on existing ACL entity.
|
|
|
|
In this users are removed from existing users list or project access
|
|
flag is modified and then verified for expected behavior
|
|
"""
|
|
entity_ref, _ = getattr(self, entity_ref_method)()
|
|
|
|
acl_data = {'entity_ref': entity_ref, 'users': users,
|
|
'project_access': project_access}
|
|
entity = self.barbicanclient.acls.create(**acl_data)
|
|
|
|
acl_ref = self.cleanup.add_entity(entity)
|
|
self.assertIsNotNone(acl_ref)
|
|
self.assertEqual(entity_ref + "/acl", acl_ref)
|
|
|
|
server_acl = self.barbicanclient.acls.get(entity.entity_ref)
|
|
acl_users = server_acl.read.users
|
|
if acl_users and remove_users:
|
|
acl_users = set(acl_users).difference(remove_users)
|
|
server_acl.read.users = acl_users
|
|
|
|
if kwargs.get('remove_project_access') is not None:
|
|
server_acl.read.project_access = \
|
|
kwargs.get('remove_project_access')
|
|
|
|
acl_ref = server_acl.submit()
|
|
self.assertIsNotNone(acl_ref)
|
|
self.assertEqual(entity_ref + "/acl", acl_ref)
|
|
|
|
acl_entity = self.barbicanclient.acls.get(server_acl.entity_ref)
|
|
self.assertIsNotNone(acl_entity)
|
|
|
|
# read acl as dictionary lookup
|
|
acl = acl_entity.get('read')
|
|
self.assertEqual(set(expect_users), set(acl.users))
|
|
self.assertEqual(expect_project_access, acl.project_access)
|
|
self.assertIsNotNone(acl.created)
|
|
self.assertIsNotNone(acl.updated)
|
|
self.assertEqual(acl_type, acl_entity._acl_type)
|
|
|
|
# read acl as property lookup
|
|
acl = acl_entity.read
|
|
self.assertEqual(set(expect_users), set(acl.users))
|
|
self.assertEqual(expect_project_access, acl.project_access)
|
|
|
|
@utils.parameterized_dataset(ACL_REMOVE_USERS_DATA_NEGATIVE)
|
|
@testcase.attr('negative')
|
|
def test_acl_remove_users_failure(self, users, project_access,
|
|
entity_ref_method, acl_type,
|
|
remove_users, expect_users,
|
|
expect_project_access, **kwargs):
|
|
"""Checks client remove users failures on existing ACL entity.
|
|
|
|
In this users are removed from existing users list or project access
|
|
flag is modified and then verified for expected behavior
|
|
"""
|
|
entity_ref, _ = getattr(self, entity_ref_method)()
|
|
|
|
acl_data = {'entity_ref': entity_ref, 'users': users,
|
|
'project_access': project_access}
|
|
entity = self.barbicanclient.acls.create(**acl_data)
|
|
|
|
acl_ref = self.cleanup.add_entity(entity)
|
|
self.assertIsNotNone(acl_ref)
|
|
self.assertEqual(entity_ref + "/acl", acl_ref)
|
|
|
|
server_acl = self.barbicanclient.acls.get(entity.entity_ref)
|
|
acl_users = server_acl.read.users
|
|
if acl_users and remove_users:
|
|
acl_users = set(acl_users).difference(remove_users)
|
|
server_acl.read.users = acl_users
|
|
|
|
if kwargs.get('remove_project_access') is not None:
|
|
server_acl.read.project_access = \
|
|
kwargs.get('remove_project_access')
|
|
|
|
error_class = kwargs.get('error_class', exceptions.HTTPClientError)
|
|
e = self.assertRaises(
|
|
error_class,
|
|
server_acl.submit
|
|
)
|
|
if hasattr(e, 'status_code'):
|
|
self.assertEqual(e.status_code, kwargs.get('error_code'))
|