python-barbicanclient/functionaltests/client/v1/functional/test_acl.py

570 lines
22 KiB
Python

# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
from testtools import testcase
from functionaltests import utils
from functionaltests.client import base
from functionaltests.common import cleanup
from barbicanclient import exceptions
create_secret_defaults_data = {
"name": "AES key",
"expiration": "2018-02-28T19:14:44.180394",
"algorithm": "aes",
"bit_length": 256,
"mode": "cbc",
"payload": "gF6+lLoF3ohA9aPRpt+6bQ==",
"payload_content_type": "application/octet-stream",
"payload_content_encoding": "base64",
}
create_container_defaults_data = {
"name": "containername",
"secrets": {}
}
create_container_rsa_data = {
"name": "rsacontainer",
}
ACL_SUBMIT_DATA_POSITIVE = {
'secret_no_users_access_flag': {
'users': None, 'project_access': True,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'expect_users': [], 'expect_project_access': True},
'secret_users_missing_access_flag': {
'users': ['u1', 'u2'], 'project_access': None,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'expect_users': ['u1', 'u2'], 'expect_project_access': True},
'container_users_no_project_access': {
'users': ['u1', 'u2', 'u3'], 'project_access': False,
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': False},
'container_empty_users_with_project_access': {
'users': [], 'project_access': True,
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
'expect_users': [], 'expect_project_access': True},
}
ACL_SUBMIT_DATA_NEGATIVE = {
'secret_users_incorrect_access_flag': {
'users': ['u1', 'u2'], 'project_access': 'Incorrect_flag',
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'expect_users': ['u1', 'u2'], 'expect_project_access': True,
'expect_error': True, 'error_code': 400},
'container_incorrect_users_as_str': {
'users': 'u1', 'project_access': True,
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
'expect_users': ['u1'], 'expect_project_access': True,
'expect_error': True, 'error_code': None, 'error_class': ValueError},
}
ACL_DELETE_DATA = {
'secret_no_users_access_flag': {
'users': None, 'project_access': True, 'create_acl': True,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'expect_users': [], 'expect_project_access': True},
'secret_users_missing_access_flag': {
'users': ['u1', 'u2'], 'project_access': None, 'create_acl': True,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'expect_users': [], 'expect_project_access': True},
'container_users_no_project_access': {
'users': ['u1', 'u2', 'u3'], 'project_access': False,
'create_acl': True,
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
'expect_users': [], 'expect_project_access': True},
'container_empty_users_with_project_access': {
'users': [], 'project_access': True, 'create_acl': True,
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
'expect_users': [], 'expect_project_access': True},
'existing_secret_no_acl_defined': {
'users': ['u1', 'u2'], 'project_access': False, 'create_acl': False,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'expect_users': [], 'expect_project_access': True,
'expect_error': False},
'acl_operation_specific_remove': {
'users': ['u1', 'u2', 'u3'], 'project_access': False,
'create_acl': True, 'per_op_acl_remove': True,
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
'expect_users': [], 'expect_project_access': True},
}
ACL_ADD_USERS_DATA_POSITIVE = {
'secret_no_initial_users_access_flag': {
'users': None, 'project_access': True,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'add_users': ['u4'],
'expect_users': ['u4'], 'expect_project_access': True},
'secret_users_missing_access_flag': {
'users': ['u1', 'u2'], 'project_access': None,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'add_users': ['u2', 'u4'],
'expect_users': ['u1', 'u2', 'u4'], 'expect_project_access': True},
'container_users_no_project_access_empty_add': {
'users': ['u1', 'u2', 'u3'], 'project_access': False,
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
'add_users': [],
'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': False},
'container_empty_users_with_project_access_none_add_users': {
'users': [], 'project_access': True,
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
'add_users': None,
'expect_users': [], 'expect_project_access': True},
'secret_users_modify_access_flag_in_add': {
'users': ['u1', 'u2', 'u3'], 'project_access': False,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'add_users': [], 'add_project_access': True,
'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': True},
}
ACL_ADD_USERS_DATA_NEGATIVE = {
'secret_users_incorrect_access_flag_during_add': {
'users': ['u1', 'u2'], 'project_access': False,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'add_users': ['u5'], 'add_project_access': 'Incorrect',
'expect_users': ['u1', 'u2', 'u5'], 'expect_project_access': False,
'expect_error': True, 'error_code': 400},
}
ACL_REMOVE_USERS_DATA_POSITIVE = {
'secret_no_initial_users_access_flag': {
'users': None, 'project_access': True,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'remove_users': ['u4'],
'expect_users': [], 'expect_project_access': True},
'secret_users_missing_access_flag': {
'users': ['u1', 'u2'], 'project_access': None,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'remove_users': ['u2', 'u4'],
'expect_users': ['u1'], 'expect_project_access': True},
'secret_users_no_matching_users': {
'users': ['u1', 'u2'], 'project_access': None,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'remove_users': ['u3', 'u4'],
'expect_users': ['u1', 'u2'], 'expect_project_access': True},
'container_users_no_project_access_empty_add': {
'users': ['u1', 'u2', 'u3'], 'project_access': False,
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
'remove_users': [],
'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': False},
'container_empty_users_with_project_access_none_add_users': {
'users': [], 'project_access': True,
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
'remove_users': None,
'expect_users': [], 'expect_project_access': True},
'secret_users_modify_access_flag_in_remove': {
'users': ['u1', 'u2', 'u3'], 'project_access': False,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'remove_users': [], 'remove_project_access': True,
'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': True},
}
ACL_REMOVE_USERS_DATA_NEGATIVE = {
'secret_users_incorrect_access_flag_during_add': {
'users': ['u1', 'u2'], 'project_access': False,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'remove_users': ['u5'], 'remove_project_access': 'Incorrect',
'expect_users': ['u1', 'u2'], 'expect_project_access': False,
'expect_error': True, 'error_code': 400},
}
class BaseACLsTestCase(base.TestCase):
def setUp(self):
super(BaseACLsTestCase, self).setUp()
self.cleanup = cleanup.CleanUp(self.barbicanclient)
# Set up three secrets
self.secret_ref_1, self.secret_1 = self._create_a_secret()
self.secret_ref_2, self.secret_2 = self._create_a_secret()
self.secret_ref_3, self.secret_3 = self._create_a_secret()
self.secret_list = [self.secret_ref_1, self.secret_ref_2,
self.secret_ref_3]
secrets_dict = {'secret_1': self.secret_1, 'secret_2': self.secret_2,
'secret_3': self.secret_3}
create_container_defaults_data['secrets'] = secrets_dict
create_container_rsa_data['public_key'] = self.secret_1
create_container_rsa_data['private_key'] = self.secret_2
create_container_rsa_data['private_key_passphrase'] = self.secret_3
def tearDown(self):
"""Handles test cleanup.
It should be noted that delete all secrets must be called before
delete containers.
"""
self.cleanup.delete_all_entities()
super(BaseACLsTestCase, self).tearDown()
def _create_a_secret(self):
secret = self.barbicanclient.secrets.create(
**create_secret_defaults_data)
secret_ref = self.cleanup.add_entity(secret)
return secret_ref, secret
def _create_a_container(self):
container = self.barbicanclient.containers.create(
**create_container_defaults_data)
container_ref = self.cleanup.add_entity(container)
return container_ref, container
@utils.parameterized_test_case
class ACLsTestCase(BaseACLsTestCase):
@testcase.attr('negative')
def test_get_non_existent_secret_valid_uuid(self):
"""A get on a container that does not exist with valid UUID
This should return a 404.
"""
base_url = self.barbicanclient.acls._api.endpoint_override
new_uuid = str(uuid.uuid4())
url = '{0}/containers/{1}'.format(base_url, new_uuid)
e = self.assertRaises(
exceptions.HTTPClientError,
self.barbicanclient.acls.get,
url
)
self.assertEqual(404, e.status_code)
@testcase.attr('negative')
def test_delete_non_existent_secret_valid_uuid(self):
"""A delete on an ACL when secret with a valid UUID does not exist
This should return a 404.
"""
base_url = self.barbicanclient.acls._api.endpoint_override
new_uuid = str(uuid.uuid4())
url = '{0}/secrets/{1}'.format(base_url, new_uuid)
acl_data = {'entity_ref': url}
entity = self.barbicanclient.acls.create(**acl_data)
e = self.assertRaises(
exceptions.HTTPClientError,
entity.remove
)
self.assertEqual(404, e.status_code)
@utils.parameterized_dataset(ACL_SUBMIT_DATA_POSITIVE)
@testcase.attr('positive')
def test_acl_successful_submit(self, users, project_access,
entity_ref_method, acl_type,
expect_users, expect_project_access,
**kwargs):
"""Submit operation on ACL entity which stores ACL setting in Barbican.
"""
entity_ref, _ = getattr(self, entity_ref_method)()
acl_data = {'entity_ref': entity_ref, 'users': users,
'project_access': project_access}
entity = self.barbicanclient.acls.create(**acl_data)
acl_ref = self.cleanup.add_entity(entity)
self.assertIsNotNone(acl_ref)
self.assertEqual(entity_ref + "/acl", acl_ref)
acl_entity = self.barbicanclient.acls.get(entity.entity_ref)
self.assertIsNotNone(acl_entity)
# read acl as dictionary lookup
acl = acl_entity.get('read')
self.assertEqual(set(expect_users), set(acl.users))
self.assertEqual(expect_project_access, acl.project_access)
self.assertIsNotNone(acl.created)
self.assertIsNotNone(acl.updated)
self.assertEqual(acl_type, acl_entity._acl_type)
# read acl as property lookup
acl = acl_entity.read
self.assertEqual(set(expect_users), set(acl.users))
self.assertEqual(expect_project_access, acl.project_access)
@utils.parameterized_dataset(ACL_SUBMIT_DATA_NEGATIVE)
@testcase.attr('negative')
def test_acl_incorrect_submit(self, users, project_access,
entity_ref_method, acl_type, expect_users,
expect_project_access, **kwargs):
"""Incorrect Submit operation on ACL entity which stores ACL setting in
Barbican.
"""
entity_ref, _ = getattr(self, entity_ref_method)()
acl_data = {'entity_ref': entity_ref, 'users': users,
'project_access': project_access}
entity = self.barbicanclient.acls.create(**acl_data)
error_class = kwargs.get('error_class', exceptions.HTTPClientError)
e = self.assertRaises(
error_class,
entity.submit
)
if hasattr(e, 'status_code'):
self.assertEqual(e.status_code, kwargs.get('error_code'))
@utils.parameterized_dataset(ACL_DELETE_DATA)
def test_acl_delete(self, users, project_access, entity_ref_method,
create_acl, acl_type, expect_users,
expect_project_access, **kwargs):
"""remove operation on ACL entity which stores ACL setting in Barbican.
"""
entity_ref, _ = getattr(self, entity_ref_method)()
acl_data = {'entity_ref': entity_ref, 'users': users,
'project_access': project_access}
entity = self.barbicanclient.acls.create(**acl_data)
entity_ref = entity.entity_ref
if create_acl:
self.cleanup.add_entity(entity)
acl_op_remove = kwargs.get('per_op_acl_remove')
if acl_op_remove:
entity.read.remove()
else:
entity.remove()
acl_entity = self.barbicanclient.acls.get(entity_ref)
self.assertIsNotNone(acl_entity)
# read acl as dictionary lookup
acl = acl_entity.get('read')
self.assertEqual(set(expect_users), set(acl.users))
self.assertEqual(expect_project_access, acl.project_access)
self.assertIsNone(acl.created)
self.assertIsNone(acl.updated)
self.assertEqual(acl_type, acl_entity._acl_type)
# read acl as property lookup
acl = acl_entity.read
self.assertEqual(set(expect_users), set(acl.users))
self.assertEqual(expect_project_access, acl.project_access)
@utils.parameterized_dataset(ACL_ADD_USERS_DATA_POSITIVE)
@testcase.attr('positive')
def test_acl_successful_add_users(self, users, project_access,
entity_ref_method, acl_type, add_users,
expect_users, expect_project_access,
**kwargs):
"""Checks client add users behavior on existing ACL entity.
In this new users or project access flag is modified and verified for
expected behavior
"""
entity_ref, _ = getattr(self, entity_ref_method)()
acl_data = {'entity_ref': entity_ref, 'users': users,
'project_access': project_access}
entity = self.barbicanclient.acls.create(**acl_data)
acl_ref = self.cleanup.add_entity(entity)
self.assertIsNotNone(acl_ref)
self.assertEqual(entity_ref + "/acl", acl_ref)
server_acl = self.barbicanclient.acls.get(entity.entity_ref)
if server_acl.get('read').users is not None and add_users:
server_acl.get('read').users.extend(add_users)
if kwargs.get('add_project_access') is not None:
server_acl.get('read').project_access = \
kwargs.get('add_project_access')
acl_ref = server_acl.submit()
self.assertIsNotNone(acl_ref)
self.assertEqual(entity_ref + "/acl", acl_ref)
acl_entity = self.barbicanclient.acls.get(server_acl.entity_ref)
self.assertIsNotNone(acl_entity)
# read acl as dictionary lookup
acl = acl_entity.get('read')
self.assertEqual(set(expect_users), set(acl.users))
self.assertEqual(expect_project_access, acl.project_access)
self.assertIsNotNone(acl.created)
self.assertIsNotNone(acl.updated)
self.assertEqual(acl_type, acl_entity._acl_type)
# read acl as property lookup
acl = acl_entity.read
self.assertEqual(set(expect_users), set(acl.users))
self.assertEqual(expect_project_access, acl.project_access)
@utils.parameterized_dataset(ACL_ADD_USERS_DATA_NEGATIVE)
@testcase.attr('negative')
def test_acl_add_users_failure(self, users, project_access,
entity_ref_method, acl_type, add_users,
expect_users, expect_project_access,
**kwargs):
"""Checks client add users failures on existing ACL entity.
In this new users or project access flag is modified and verified for
expected behavior
"""
entity_ref, _ = getattr(self, entity_ref_method)()
acl_data = {'entity_ref': entity_ref, 'users': users,
'project_access': project_access}
entity = self.barbicanclient.acls.create(**acl_data)
acl_ref = self.cleanup.add_entity(entity)
self.assertIsNotNone(acl_ref)
self.assertEqual(entity_ref + "/acl", acl_ref)
server_acl = self.barbicanclient.acls.get(entity.entity_ref)
if server_acl.get('read').users is not None and add_users:
server_acl.get('read').users.extend(add_users)
if kwargs.get('add_project_access') is not None:
server_acl.get('read').project_access = \
kwargs.get('add_project_access')
error_class = kwargs.get('error_class', exceptions.HTTPClientError)
e = self.assertRaises(
error_class,
server_acl.submit
)
if hasattr(e, 'status_code'):
self.assertEqual(e.status_code, kwargs.get('error_code'))
@utils.parameterized_dataset(ACL_REMOVE_USERS_DATA_POSITIVE)
@testcase.attr('positive')
def test_acl_remove_users_successful(self, users, project_access,
entity_ref_method, acl_type,
remove_users, expect_users,
expect_project_access, **kwargs):
"""Checks client remove users behavior on existing ACL entity.
In this users are removed from existing users list or project access
flag is modified and then verified for expected behavior
"""
entity_ref, _ = getattr(self, entity_ref_method)()
acl_data = {'entity_ref': entity_ref, 'users': users,
'project_access': project_access}
entity = self.barbicanclient.acls.create(**acl_data)
acl_ref = self.cleanup.add_entity(entity)
self.assertIsNotNone(acl_ref)
self.assertEqual(entity_ref + "/acl", acl_ref)
server_acl = self.barbicanclient.acls.get(entity.entity_ref)
acl_users = server_acl.read.users
if acl_users and remove_users:
acl_users = set(acl_users).difference(remove_users)
server_acl.read.users = acl_users
if kwargs.get('remove_project_access') is not None:
server_acl.read.project_access = \
kwargs.get('remove_project_access')
acl_ref = server_acl.submit()
self.assertIsNotNone(acl_ref)
self.assertEqual(entity_ref + "/acl", acl_ref)
acl_entity = self.barbicanclient.acls.get(server_acl.entity_ref)
self.assertIsNotNone(acl_entity)
# read acl as dictionary lookup
acl = acl_entity.get('read')
self.assertEqual(set(expect_users), set(acl.users))
self.assertEqual(expect_project_access, acl.project_access)
self.assertIsNotNone(acl.created)
self.assertIsNotNone(acl.updated)
self.assertEqual(acl_type, acl_entity._acl_type)
# read acl as property lookup
acl = acl_entity.read
self.assertEqual(set(expect_users), set(acl.users))
self.assertEqual(expect_project_access, acl.project_access)
@utils.parameterized_dataset(ACL_REMOVE_USERS_DATA_NEGATIVE)
@testcase.attr('negative')
def test_acl_remove_users_failure(self, users, project_access,
entity_ref_method, acl_type,
remove_users, expect_users,
expect_project_access, **kwargs):
"""Checks client remove users failures on existing ACL entity.
In this users are removed from existing users list or project access
flag is modified and then verified for expected behavior
"""
entity_ref, _ = getattr(self, entity_ref_method)()
acl_data = {'entity_ref': entity_ref, 'users': users,
'project_access': project_access}
entity = self.barbicanclient.acls.create(**acl_data)
acl_ref = self.cleanup.add_entity(entity)
self.assertIsNotNone(acl_ref)
self.assertEqual(entity_ref + "/acl", acl_ref)
server_acl = self.barbicanclient.acls.get(entity.entity_ref)
acl_users = server_acl.read.users
if acl_users and remove_users:
acl_users = set(acl_users).difference(remove_users)
server_acl.read.users = acl_users
if kwargs.get('remove_project_access') is not None:
server_acl.read.project_access = \
kwargs.get('remove_project_access')
error_class = kwargs.get('error_class', exceptions.HTTPClientError)
e = self.assertRaises(
error_class,
server_acl.submit
)
if hasattr(e, 'status_code'):
self.assertEqual(e.status_code, kwargs.get('error_code'))