Part 3: Adding ACL functional tests.

Added functional tests for CLI commands.
Added functional tests for client API.

Modified ACL client API functional test not to use behavior class
as per recent change in secrets, containers, and orders.

Change-Id: Ibacf33e834daa2c70ef1b0032495bbe443957cac
This commit is contained in:
Arun Kant 2015-08-02 22:06:24 -07:00 committed by Arun Kant
parent 020e24061b
commit af771fecf4
6 changed files with 1057 additions and 1 deletions

View File

@ -0,0 +1,196 @@
# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import base_behaviors
class ACLBehaviors(base_behaviors.BaseBehaviors):
_args_map_list = {'users': ['--user', '-u'],
'operation_type': ['--operation-type', '-o']
}
def __init__(self):
super(ACLBehaviors, self).__init__()
self.LOG = logging.getLogger(type(self).__name__)
self.acl_entity_set = set()
def _add_ref_arg(self, argv, entity_ref):
argv.extend([entity_ref])
return argv
def _add_per_acl_args(self, argv, users=[], project_access=None,
operation_type=None, use_short_arg=False):
index = 1 if use_short_arg else 0
if users is not None:
if users:
for user in users:
argv.extend([self._args_map_list['users'][index], user])
else: # empty list case
argv.extend([self._args_map_list['users'][index]])
if project_access is not None:
if project_access:
argv.extend(['--project-access'])
else:
argv.extend(['--no-project-access'])
if operation_type and operation_type is not 'read':
argv.extend([self._args_map_list['operation_type'][index],
operation_type])
return argv
def acl_delete(self, entity_ref):
""" Delete a secret or container acl
:param entity_ref Reference to secret or container entity
:return If error returns stderr string otherwise returns None.
"""
argv = ['acl', 'delete']
self.add_auth_and_endpoint(argv)
self._add_ref_arg(argv, entity_ref)
_, stderr = self.issue_barbican_command(argv)
self.acl_entity_set.discard(entity_ref)
if stderr:
return stderr
def acl_get(self, entity_ref):
"""Get a 'read' ACL setting for a secret or a container.
:param entity_ref Reference to secret or container entity
:return dict of 'read' operation ACL settings if found otherwise empty
dict in case of error.
"""
argv = ['acl', 'get']
self.add_auth_and_endpoint(argv)
self._add_ref_arg(argv, entity_ref)
stdout, stderr = self.issue_barbican_command(argv)
if '4xx Client error: Not Found' in stderr:
return {}
acl_list = self._prettytable_to_list(stdout)
return acl_list[0] # return first ACL which is for 'read' op type
def acl_submit(self, entity_ref, users=None,
project_access=None, use_short_arg=False,
operation_type='read'):
"""Submits a secret or container ACL
:param entity_ref Reference to secret or container entity
:param users List of users for ACL
:param project_access: Flag to pass for project access behavior
:param use_short_arg: Flag to indicate if use short arguments in cli.
Default is False
:param operation_type: ACL operation type. Default is 'read' as
Barbican currently supports only that type of operation.
:return dict of 'read' operation ACL settings if found otherwise empty
dict in case of error.
"""
argv = ['acl', 'submit']
self.add_auth_and_endpoint(argv)
self._add_per_acl_args(argv, users=users,
project_access=project_access,
use_short_arg=use_short_arg,
operation_type=operation_type)
self._add_ref_arg(argv, entity_ref)
stdout, stderr = self.issue_barbican_command(argv)
if '4xx Client error: Not Found' in stderr:
return {}
acl_list = self._prettytable_to_list(stdout)
self.acl_entity_set.add(entity_ref)
return acl_list[0] # return first ACL which is for 'read' op type
def acl_add(self, entity_ref, users=None,
project_access=None, use_short_arg=False,
operation_type='read'):
"""Add to a secret or container ACL
:param entity_ref Reference to secret or container entity
:param users List of users to be added in ACL
:param project_access: Flag to pass for project access behavior
:param use_short_arg: Flag to indicate if use short arguments in cli.
Default is False
:param operation_type: ACL operation type. Default is 'read' as
Barbican currently supports only that type of operation.
:return dict of 'read' operation ACL settings if found otherwise empty
dict in case of error.
"""
argv = ['acl', 'user', 'add']
self.add_auth_and_endpoint(argv)
self._add_per_acl_args(argv, users=users,
project_access=project_access,
use_short_arg=use_short_arg,
operation_type=operation_type)
self._add_ref_arg(argv, entity_ref)
stdout, stderr = self.issue_barbican_command(argv)
if '4xx Client error: Not Found' in stderr:
return {}
self.acl_entity_set.add(entity_ref)
acl_list = self._prettytable_to_list(stdout)
return acl_list
def acl_remove(self, entity_ref, users=None,
project_access=None, use_short_arg=False,
operation_type='read'):
"""Remove users from a secret or container ACL
:param entity_ref Reference to secret or container entity
:param users List of users to be removed from ACL
:param project_access: Flag to pass for project access behavior
:param use_short_arg: Flag to indicate if use short arguments in cli.
Default is False
:param operation_type: ACL operation type. Default is 'read' as
Barbican currently supports only that type of operation.
:return dict of 'read' operation ACL settings if found otherwise empty
dict in case of error.
"""
argv = ['acl', 'user', 'remove']
self.add_auth_and_endpoint(argv)
self._add_per_acl_args(argv, users=users,
project_access=project_access,
use_short_arg=use_short_arg,
operation_type=operation_type)
self._add_ref_arg(argv, entity_ref)
stdout, stderr = self.issue_barbican_command(argv)
if '4xx Client error: Not Found' in stderr:
return {}
acl_list = self._prettytable_to_list(stdout)
return acl_list
def delete_all_created_acls(self):
""" Delete all ACLs that we created """
entities_to_delete = [entry for entry in self.acl_entity_set]
for entity_ref in entities_to_delete:
self.acl_delete(entity_ref)

View File

@ -0,0 +1,244 @@
# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from functionaltests import utils
from functionaltests.cli.base import CmdLineTestCase
from functionaltests.cli.v1.behaviors import acl_behaviors
from functionaltests.cli.v1.behaviors import container_behaviors
from functionaltests.cli.v1.behaviors import secret_behaviors
from testtools import testcase
ARGS_TYPE = {'short_arg_false': [False],
'short_arg_true': [True]}
@utils.parameterized_test_case
class ACLTestCase(CmdLineTestCase):
def setUp(self):
super(ACLTestCase, self).setUp()
self.secret_behaviors = secret_behaviors.SecretBehaviors()
self.container_behaviors = container_behaviors.ContainerBehaviors()
self.acl_behaviors = acl_behaviors.ACLBehaviors()
def tearDown(self):
super(ACLTestCase, self).tearDown()
self.acl_behaviors.delete_all_created_acls()
self.container_behaviors.delete_all_created_containers()
self.secret_behaviors.delete_all_created_secrets()
@utils.parameterized_dataset(ARGS_TYPE)
@testcase.attr('positive')
def test_acl_submit(self, use_short_arg):
secret_ref = self.secret_behaviors.store_secret()
container_ref = self.container_behaviors.create_container(
secret_hrefs=[secret_ref])
data = self.acl_behaviors.acl_submit(entity_ref=secret_ref,
users=['u1', 'u2'],
use_short_arg=use_short_arg)
self.assertIsNotNone(data)
self.assertIn('u1', data['Users'])
self.assertIn('u2', data['Users'])
self.assertEqual('True', data['Project Access'])
self.assertIn(secret_ref, data['Secret ACL Ref'])
data = self.acl_behaviors.acl_submit(entity_ref=container_ref,
users=['u2', 'u3'],
use_short_arg=use_short_arg)
self.assertIsNotNone(data)
self.assertIn('u3', data['Users'])
self.assertNotIn('u1', data['Users'])
self.assertEqual('True', data['Project Access'])
self.assertIn(container_ref, data['Container ACL Ref'])
@utils.parameterized_dataset(ARGS_TYPE)
@testcase.attr('positive')
def test_acl_submit_for_overwriting_existing_users(self, use_short_arg):
secret_ref = self.secret_behaviors.store_secret()
container_ref = self.container_behaviors.create_container(
secret_hrefs=[secret_ref])
data = self.acl_behaviors.acl_submit(entity_ref=secret_ref,
users=['u1', 'u2'],
project_access=False,
use_short_arg=use_short_arg)
self.assertIsNotNone(data)
self.assertIn('u1', data['Users'])
self.assertIn('u2', data['Users'])
self.assertEqual('False', data['Project Access'])
self.assertIn(secret_ref, data['Secret ACL Ref'])
data = self.acl_behaviors.acl_submit(entity_ref=container_ref,
users=[],
project_access=True,
use_short_arg=use_short_arg)
self.assertIsNotNone(data)
self.assertNotIn('u1', data['Users'])
self.assertNotIn('u2', data['Users'])
self.assertEqual('True', data['Project Access'])
self.assertIn(container_ref, data['Container ACL Ref'])
@utils.parameterized_dataset(ARGS_TYPE)
@testcase.attr('positive')
def test_acl_add(self, use_short_arg):
secret_ref = self.secret_behaviors.store_secret()
data = self.acl_behaviors.acl_submit(entity_ref=secret_ref,
project_access=False,
users=['u1', 'u2'])
self.assertIsNotNone(data)
self.assertEqual('False', data['Project Access'])
acls = self.acl_behaviors.acl_add(entity_ref=secret_ref,
users=['u2', 'u3'],
use_short_arg=use_short_arg)
data = acls[0] # get 'read' operation ACL data
self.assertIsNotNone(data)
self.assertIn('u1', data['Users'])
self.assertIn('u3', data['Users'])
self.assertEqual('False', data['Project Access'])
self.assertIn(secret_ref, data['Secret ACL Ref'])
# make sure there is no change in existing users with blank users add
acls = self.acl_behaviors.acl_add(entity_ref=secret_ref,
users=[], project_access=False,
use_short_arg=use_short_arg)
data = acls[0] # get 'read' operation ACL data
self.assertIsNotNone(data)
self.assertIn('u1', data['Users'])
self.assertIn('u2', data['Users'])
self.assertIn('u3', data['Users'])
acls = self.acl_behaviors.acl_add(entity_ref=secret_ref,
users=None, project_access=True,
use_short_arg=use_short_arg)
data = acls[0] # get 'read' operation ACL data
self.assertIsNotNone(data)
self.assertIn('u2', data['Users'])
self.assertEqual('True', data['Project Access'])
@utils.parameterized_dataset(ARGS_TYPE)
@testcase.attr('positive')
def test_acl_remove(self, use_short_arg):
secret_ref = self.secret_behaviors.store_secret()
container_ref = self.container_behaviors.create_container(
secret_hrefs=[secret_ref])
secret_ref = self.secret_behaviors.store_secret()
data = self.acl_behaviors.acl_submit(entity_ref=container_ref,
project_access=False,
users=['u1', 'u2'])
self.assertIsNotNone(data)
self.assertEqual('False', data['Project Access'])
acls = self.acl_behaviors.acl_remove(entity_ref=container_ref,
users=['u2', 'u3'],
use_short_arg=use_short_arg)
data = acls[0] # get 'read' operation ACL data
self.assertIsNotNone(data)
self.assertIn('u1', data['Users'])
self.assertNotIn('u2', data['Users'])
self.assertEqual('False', data['Project Access'])
self.assertIn(container_ref, data['Container ACL Ref'])
# make sure there is no change in existing users with blank users
# remove
acls = self.acl_behaviors.acl_remove(entity_ref=container_ref,
users=[], project_access=False,
use_short_arg=use_short_arg)
data = acls[0] # get 'read' operation ACL data
self.assertIsNotNone(data)
self.assertIn('u1', data['Users'])
self.assertEqual('False', data['Project Access'])
@testcase.attr('positive')
def test_acl_get(self):
secret_ref = self.secret_behaviors.store_secret()
container_ref = self.container_behaviors.create_container(
secret_hrefs=[secret_ref])
data = self.acl_behaviors.acl_submit(entity_ref=secret_ref,
users=['u1', 'u2'])
self.assertIsNotNone(data)
data = self.acl_behaviors.acl_get(entity_ref=secret_ref)
self.assertIn('u2', data['Users'])
self.assertEqual('True', data['Project Access'])
self.assertEqual(secret_ref + "/acl", data['Secret ACL Ref'])
data = self.acl_behaviors.acl_get(entity_ref=secret_ref + "///")
self.assertIn('u2', data['Users'])
self.assertEqual('True', data['Project Access'])
self.assertEqual(secret_ref + "/acl", data['Secret ACL Ref'])
data = self.acl_behaviors.acl_submit(entity_ref=container_ref,
project_access=False,
users=['u4', 'u5'])
self.assertIsNotNone(data)
data = self.acl_behaviors.acl_get(entity_ref=container_ref)
self.assertIn('u4', data['Users'])
self.assertIn('u5', data['Users'])
self.assertEqual('False', data['Project Access'])
self.assertEqual(container_ref + '/acl', data['Container ACL Ref'])
@testcase.attr('positive')
def test_acl_delete(self):
secret_ref = self.secret_behaviors.store_secret()
data = self.acl_behaviors.acl_submit(entity_ref=secret_ref,
users=['u1', 'u2'])
self.assertIsNotNone(data)
self.acl_behaviors.acl_delete(entity_ref=secret_ref)
data = self.acl_behaviors.acl_get(entity_ref=secret_ref)
self.assertEqual('[]', data['Users'])
self.assertEqual('True', data['Project Access'])
self.assertEqual(secret_ref + "/acl", data['Secret ACL Ref'])
# deleting again should be okay as secret or container always has
# default ACL settings
self.acl_behaviors.acl_delete(entity_ref=secret_ref + '////')
data = self.acl_behaviors.acl_get(entity_ref=secret_ref)
self.assertEqual('[]', data['Users'])
self.assertEqual('True', data['Project Access'])
@testcase.attr('negative')
def test_acl_entity_ref_input_with_acl_uri(self):
secret_ref = self.secret_behaviors.store_secret()
container_ref = self.container_behaviors.create_container(
secret_hrefs=[secret_ref])
data = self.acl_behaviors.acl_submit(entity_ref=secret_ref,
users=['u1', 'u2'])
self.assertIsNotNone(data)
err = self.acl_behaviors.acl_delete(entity_ref=container_ref + '/acl')
# above container ACL ref is passed instead of expected container_ref
self.assertIn('Container ACL URI', err)
err = self.acl_behaviors.acl_delete(entity_ref=secret_ref + '/acl')
# above secret ACL ref is passed instead of expected secret_ref
self.assertIn('Secret ACL URI', err)
err = self.acl_behaviors.acl_delete(entity_ref=None)
self.assertIn("Secret or container href", err)

View File

@ -0,0 +1,569 @@
# Copyright (c) 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
from testtools import testcase
from functionaltests import utils
from functionaltests.client import base
from functionaltests.common import cleanup
from barbicanclient import exceptions
create_secret_defaults_data = {
"name": "AES key",
"expiration": "2018-02-28T19:14:44.180394",
"algorithm": "aes",
"bit_length": 256,
"mode": "cbc",
"payload": "gF6+lLoF3ohA9aPRpt+6bQ==",
"payload_content_type": "application/octet-stream",
"payload_content_encoding": "base64",
}
create_container_defaults_data = {
"name": "containername",
"secrets": {}
}
create_container_rsa_data = {
"name": "rsacontainer",
}
ACL_SUBMIT_DATA_POSITIVE = {
'secret_no_users_access_flag': {
'users': None, 'project_access': True,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'expect_users': [], 'expect_project_access': True},
'secret_users_missing_access_flag': {
'users': ['u1', 'u2'], 'project_access': None,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'expect_users': ['u1', 'u2'], 'expect_project_access': True},
'container_users_no_project_access': {
'users': ['u1', 'u2', 'u3'], 'project_access': False,
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': False},
'container_empty_users_with_project_access': {
'users': [], 'project_access': True,
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
'expect_users': [], 'expect_project_access': True},
}
ACL_SUBMIT_DATA_NEGATIVE = {
'secret_users_incorrect_access_flag': {
'users': ['u1', 'u2'], 'project_access': 'Incorrect_flag',
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'expect_users': ['u1', 'u2'], 'expect_project_access': True,
'expect_error': True, 'error_code': 400},
'container_incorrect_users_as_str': {
'users': 'u1', 'project_access': True,
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
'expect_users': ['u1'], 'expect_project_access': True,
'expect_error': True, 'error_code': None, 'error_class': ValueError},
}
ACL_DELETE_DATA = {
'secret_no_users_access_flag': {
'users': None, 'project_access': True, 'create_acl': True,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'expect_users': [], 'expect_project_access': True},
'secret_users_missing_access_flag': {
'users': ['u1', 'u2'], 'project_access': None, 'create_acl': True,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'expect_users': [], 'expect_project_access': True},
'container_users_no_project_access': {
'users': ['u1', 'u2', 'u3'], 'project_access': False,
'create_acl': True,
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
'expect_users': [], 'expect_project_access': True},
'container_empty_users_with_project_access': {
'users': [], 'project_access': True, 'create_acl': True,
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
'expect_users': [], 'expect_project_access': True},
'existing_secret_no_acl_defined': {
'users': ['u1', 'u2'], 'project_access': False, 'create_acl': False,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'expect_users': [], 'expect_project_access': True,
'expect_error': False},
'acl_operation_specific_remove': {
'users': ['u1', 'u2', 'u3'], 'project_access': False,
'create_acl': True, 'per_op_acl_remove': True,
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
'expect_users': [], 'expect_project_access': True},
}
ACL_ADD_USERS_DATA_POSITIVE = {
'secret_no_initial_users_access_flag': {
'users': None, 'project_access': True,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'add_users': ['u4'],
'expect_users': ['u4'], 'expect_project_access': True},
'secret_users_missing_access_flag': {
'users': ['u1', 'u2'], 'project_access': None,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'add_users': ['u2', 'u4'],
'expect_users': ['u1', 'u2', 'u4'], 'expect_project_access': True},
'container_users_no_project_access_empty_add': {
'users': ['u1', 'u2', 'u3'], 'project_access': False,
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
'add_users': [],
'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': False},
'container_empty_users_with_project_access_none_add_users': {
'users': [], 'project_access': True,
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
'add_users': None,
'expect_users': [], 'expect_project_access': True},
'secret_users_modify_access_flag_in_add': {
'users': ['u1', 'u2', 'u3'], 'project_access': False,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'add_users': [], 'add_project_access': True,
'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': True},
}
ACL_ADD_USERS_DATA_NEGATIVE = {
'secret_users_incorrect_access_flag_during_add': {
'users': ['u1', 'u2'], 'project_access': False,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'add_users': ['u5'], 'add_project_access': 'Incorrect',
'expect_users': ['u1', 'u2', 'u5'], 'expect_project_access': False,
'expect_error': True, 'error_code': 400},
}
ACL_REMOVE_USERS_DATA_POSITIVE = {
'secret_no_initial_users_access_flag': {
'users': None, 'project_access': True,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'remove_users': ['u4'],
'expect_users': [], 'expect_project_access': True},
'secret_users_missing_access_flag': {
'users': ['u1', 'u2'], 'project_access': None,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'remove_users': ['u2', 'u4'],
'expect_users': ['u1'], 'expect_project_access': True},
'secret_users_no_matching_users': {
'users': ['u1', 'u2'], 'project_access': None,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'remove_users': ['u3', 'u4'],
'expect_users': ['u1', 'u2'], 'expect_project_access': True},
'container_users_no_project_access_empty_add': {
'users': ['u1', 'u2', 'u3'], 'project_access': False,
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
'remove_users': [],
'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': False},
'container_empty_users_with_project_access_none_add_users': {
'users': [], 'project_access': True,
'entity_ref_method': '_create_a_container', 'acl_type': 'container',
'remove_users': None,
'expect_users': [], 'expect_project_access': True},
'secret_users_modify_access_flag_in_remove': {
'users': ['u1', 'u2', 'u3'], 'project_access': False,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'remove_users': [], 'remove_project_access': True,
'expect_users': ['u1', 'u2', 'u3'], 'expect_project_access': True},
}
ACL_REMOVE_USERS_DATA_NEGATIVE = {
'secret_users_incorrect_access_flag_during_add': {
'users': ['u1', 'u2'], 'project_access': False,
'entity_ref_method': '_create_a_secret', 'acl_type': 'secret',
'remove_users': ['u5'], 'remove_project_access': 'Incorrect',
'expect_users': ['u1', 'u2'], 'expect_project_access': False,
'expect_error': True, 'error_code': 400},
}
class BaseACLsTestCase(base.TestCase):
def setUp(self):
super(BaseACLsTestCase, self).setUp()
self.cleanup = cleanup.CleanUp(self.barbicanclient)
# Set up three secrets
self.secret_ref_1, self.secret_1 = self._create_a_secret()
self.secret_ref_2, self.secret_2 = self._create_a_secret()
self.secret_ref_3, self.secret_3 = self._create_a_secret()
self.secret_list = [self.secret_ref_1, self.secret_ref_2,
self.secret_ref_3]
secrets_dict = {'secret_1': self.secret_1, 'secret_2': self.secret_2,
'secret_3': self.secret_3}
create_container_defaults_data['secrets'] = secrets_dict
create_container_rsa_data['public_key'] = self.secret_1
create_container_rsa_data['private_key'] = self.secret_2
create_container_rsa_data['private_key_passphrase'] = self.secret_3
def tearDown(self):
"""Handles test cleanup.
It should be noted that delete all secrets must be called before
delete containers.
"""
self.cleanup.delete_all_entities()
super(BaseACLsTestCase, self).tearDown()
def _create_a_secret(self):
secret = self.barbicanclient.secrets.create(
**create_secret_defaults_data)
secret_ref = self.cleanup.add_entity(secret)
return secret_ref, secret
def _create_a_container(self):
container = self.barbicanclient.containers.create(
**create_container_defaults_data)
container_ref = self.cleanup.add_entity(container)
return container_ref, container
@utils.parameterized_test_case
class ACLsTestCase(BaseACLsTestCase):
@testcase.attr('negative')
def test_get_non_existent_secret_valid_uuid(self):
"""A get on a container that does not exist with valid UUID
This should return a 404.
"""
base_url = self.barbicanclient.acls._api.endpoint_override
new_uuid = str(uuid.uuid4())
url = '{0}/containers/{1}'.format(base_url, new_uuid)
e = self.assertRaises(
exceptions.HTTPClientError,
self.barbicanclient.acls.get,
url
)
self.assertEqual(e.status_code, 404)
@testcase.attr('negative')
def test_delete_non_existent_secret_valid_uuid(self):
"""A delete on a ACL when secret with a valid UUID does not exist
This should return a 404.
"""
base_url = self.barbicanclient.acls._api.endpoint_override
new_uuid = str(uuid.uuid4())
url = '{0}/secrets/{1}'.format(base_url, new_uuid)
acl_data = {'entity_ref': url}
entity = self.barbicanclient.acls.create(**acl_data)
e = self.assertRaises(
exceptions.HTTPClientError,
entity.remove
)
self.assertEqual(e.status_code, 404)
@utils.parameterized_dataset(ACL_SUBMIT_DATA_POSITIVE)
@testcase.attr('positive')
def test_acl_successful_submit(self, users, project_access,
entity_ref_method, acl_type,
expect_users, expect_project_access,
**kwargs):
"""Submit operation on ACL entity which stores ACL setting in Barbican.
"""
entity_ref, _ = getattr(self, entity_ref_method)()
acl_data = {'entity_ref': entity_ref, 'users': users,
'project_access': project_access}
entity = self.barbicanclient.acls.create(**acl_data)
acl_ref = self.cleanup.add_entity(entity)
self.assertIsNotNone(acl_ref)
self.assertEqual(entity_ref + "/acl", acl_ref)
acl_entity = self.barbicanclient.acls.get(entity.entity_ref)
self.assertIsNotNone(acl_entity)
# read acl as dictionary lookup
acl = acl_entity.get('read')
self.assertEqual(set(expect_users), set(acl.users))
self.assertEqual(expect_project_access, acl.project_access)
self.assertIsNotNone(acl.created)
self.assertIsNotNone(acl.updated)
self.assertEqual(acl_type, acl_entity._acl_type)
# read acl as property lookup
acl = acl_entity.read
self.assertEqual(set(expect_users), set(acl.users))
self.assertEqual(expect_project_access, acl.project_access)
@utils.parameterized_dataset(ACL_SUBMIT_DATA_NEGATIVE)
@testcase.attr('negative')
def test_acl_incorrect_submit(self, users, project_access,
entity_ref_method, acl_type, expect_users,
expect_project_access, **kwargs):
"""Incorrect Submit operation on ACL entity which stores ACL setting in
Barbican.
"""
entity_ref, _ = getattr(self, entity_ref_method)()
acl_data = {'entity_ref': entity_ref, 'users': users,
'project_access': project_access}
entity = self.barbicanclient.acls.create(**acl_data)
error_class = kwargs.get('error_class', exceptions.HTTPClientError)
e = self.assertRaises(
error_class,
entity.submit
)
if hasattr(e, 'status_code'):
self.assertEqual(e.status_code, kwargs.get('error_code'))
@utils.parameterized_dataset(ACL_DELETE_DATA)
def test_acl_delete(self, users, project_access, entity_ref_method,
create_acl, acl_type, expect_users,
expect_project_access, **kwargs):
"""remove operation on ACL entity which stores ACL setting in Barbican.
"""
entity_ref, _ = getattr(self, entity_ref_method)()
acl_data = {'entity_ref': entity_ref, 'users': users,
'project_access': project_access}
entity = self.barbicanclient.acls.create(**acl_data)
entity_ref = entity.entity_ref
if create_acl:
self.cleanup.add_entity(entity)
acl_op_remove = kwargs.get('per_op_acl_remove')
if acl_op_remove:
entity.read.remove()
else:
entity.remove()
acl_entity = self.barbicanclient.acls.get(entity_ref)
self.assertIsNotNone(acl_entity)
# read acl as dictionary lookup
acl = acl_entity.get('read')
self.assertEqual(set(expect_users), set(acl.users))
self.assertEqual(expect_project_access, acl.project_access)
self.assertIsNone(acl.created)
self.assertIsNone(acl.updated)
self.assertEqual(acl_type, acl_entity._acl_type)
# read acl as property lookup
acl = acl_entity.read
self.assertEqual(set(expect_users), set(acl.users))
self.assertEqual(expect_project_access, acl.project_access)
@utils.parameterized_dataset(ACL_ADD_USERS_DATA_POSITIVE)
@testcase.attr('positive')
def test_acl_successful_add_users(self, users, project_access,
entity_ref_method, acl_type, add_users,
expect_users, expect_project_access,
**kwargs):
"""Checks client add users behavior on existing ACL entity.
In this new users or project access flag is modified and verified for
expected behavior
"""
entity_ref, _ = getattr(self, entity_ref_method)()
acl_data = {'entity_ref': entity_ref, 'users': users,
'project_access': project_access}
entity = self.barbicanclient.acls.create(**acl_data)
acl_ref = self.cleanup.add_entity(entity)
self.assertIsNotNone(acl_ref)
self.assertEqual(entity_ref + "/acl", acl_ref)
server_acl = self.barbicanclient.acls.get(entity.entity_ref)
if server_acl.get('read').users is not None and add_users:
server_acl.get('read').users.extend(add_users)
if kwargs.get('add_project_access') is not None:
server_acl.get('read').project_access = \
kwargs.get('add_project_access')
acl_ref = server_acl.submit()
self.assertIsNotNone(acl_ref)
self.assertEqual(entity_ref + "/acl", acl_ref)
acl_entity = self.barbicanclient.acls.get(server_acl.entity_ref)
self.assertIsNotNone(acl_entity)
# read acl as dictionary lookup
acl = acl_entity.get('read')
self.assertEqual(set(expect_users), set(acl.users))
self.assertEqual(expect_project_access, acl.project_access)
self.assertIsNotNone(acl.created)
self.assertIsNotNone(acl.updated)
self.assertEqual(acl_type, acl_entity._acl_type)
# read acl as property lookup
acl = acl_entity.read
self.assertEqual(set(expect_users), set(acl.users))
self.assertEqual(expect_project_access, acl.project_access)
@utils.parameterized_dataset(ACL_ADD_USERS_DATA_NEGATIVE)
@testcase.attr('negative')
def test_acl_add_users_failure(self, users, project_access,
entity_ref_method, acl_type, add_users,
expect_users, expect_project_access,
**kwargs):
"""Checks client add users failures on existing ACL entity.
In this new users or project access flag is modified and verified for
expected behavior
"""
entity_ref, _ = getattr(self, entity_ref_method)()
acl_data = {'entity_ref': entity_ref, 'users': users,
'project_access': project_access}
entity = self.barbicanclient.acls.create(**acl_data)
acl_ref = self.cleanup.add_entity(entity)
self.assertIsNotNone(acl_ref)
self.assertEqual(entity_ref + "/acl", acl_ref)
server_acl = self.barbicanclient.acls.get(entity.entity_ref)
if server_acl.get('read').users is not None and add_users:
server_acl.get('read').users.extend(add_users)
if kwargs.get('add_project_access') is not None:
server_acl.get('read').project_access = \
kwargs.get('add_project_access')
error_class = kwargs.get('error_class', exceptions.HTTPClientError)
e = self.assertRaises(
error_class,
server_acl.submit
)
if hasattr(e, 'status_code'):
self.assertEqual(e.status_code, kwargs.get('error_code'))
@utils.parameterized_dataset(ACL_REMOVE_USERS_DATA_POSITIVE)
@testcase.attr('positive')
def test_acl_remove_users_successful(self, users, project_access,
entity_ref_method, acl_type,
remove_users, expect_users,
expect_project_access, **kwargs):
"""Checks client remove users behavior on existing ACL entity.
In this users are removed from existing users list or project access
flag is modified and then verified for expected behavior
"""
entity_ref, _ = getattr(self, entity_ref_method)()
acl_data = {'entity_ref': entity_ref, 'users': users,
'project_access': project_access}
entity = self.barbicanclient.acls.create(**acl_data)
acl_ref = self.cleanup.add_entity(entity)
self.assertIsNotNone(acl_ref)
self.assertEqual(entity_ref + "/acl", acl_ref)
server_acl = self.barbicanclient.acls.get(entity.entity_ref)
acl_users = server_acl.read.users
if acl_users and remove_users:
acl_users = set(acl_users).difference(remove_users)
server_acl.read.users = acl_users
if kwargs.get('remove_project_access') is not None:
server_acl.read.project_access = \
kwargs.get('remove_project_access')
acl_ref = server_acl.submit()
self.assertIsNotNone(acl_ref)
self.assertEqual(entity_ref + "/acl", acl_ref)
acl_entity = self.barbicanclient.acls.get(server_acl.entity_ref)
self.assertIsNotNone(acl_entity)
# read acl as dictionary lookup
acl = acl_entity.get('read')
self.assertEqual(set(expect_users), set(acl.users))
self.assertEqual(expect_project_access, acl.project_access)
self.assertIsNotNone(acl.created)
self.assertIsNotNone(acl.updated)
self.assertEqual(acl_type, acl_entity._acl_type)
# read acl as property lookup
acl = acl_entity.read
self.assertEqual(set(expect_users), set(acl.users))
self.assertEqual(expect_project_access, acl.project_access)
@utils.parameterized_dataset(ACL_REMOVE_USERS_DATA_NEGATIVE)
@testcase.attr('negative')
def test_acl_remove_users_failure(self, users, project_access,
entity_ref_method, acl_type,
remove_users, expect_users,
expect_project_access, **kwargs):
"""Checks client remove users failures on existing ACL entity.
In this users are removed from existing users list or project access
flag is modified and then verified for expected behavior
"""
entity_ref, _ = getattr(self, entity_ref_method)()
acl_data = {'entity_ref': entity_ref, 'users': users,
'project_access': project_access}
entity = self.barbicanclient.acls.create(**acl_data)
acl_ref = self.cleanup.add_entity(entity)
self.assertIsNotNone(acl_ref)
self.assertEqual(entity_ref + "/acl", acl_ref)
server_acl = self.barbicanclient.acls.get(entity.entity_ref)
acl_users = server_acl.read.users
if acl_users and remove_users:
acl_users = set(acl_users).difference(remove_users)
server_acl.read.users = acl_users
if kwargs.get('remove_project_access') is not None:
server_acl.read.project_access = \
kwargs.get('remove_project_access')
error_class = kwargs.get('error_class', exceptions.HTTPClientError)
e = self.assertRaises(
error_class,
server_acl.submit
)
if hasattr(e, 'status_code'):
self.assertEqual(e.status_code, kwargs.get('error_code'))

View File

@ -234,6 +234,23 @@ class GenericContainersTestCase(BaseContainersTestCase):
container_resp = self.barbicanclient.containers.get(container_ref)
self.assertIsNotNone(container_resp.secret_refs.get(name))
@testcase.attr('positive')
def test_container_read_with_acls(self):
"""Access default ACL settings data on recently created container.
By default, 'read' ACL settings are there for a container.
"""
test_model = self.barbicanclient.containers.create(
**create_container_defaults_data)
container_ref = self.cleanup.add_entity(test_model)
self.assertIsNotNone(container_ref)
container_entity = self.barbicanclient.containers.get(container_ref)
self.assertIsNotNone(container_entity.acls)
self.assertIsNotNone(container_entity.acls.read)
self.assertEqual([], container_entity.acls.read.users)
@utils.parameterized_test_case
class RSAContainersTestCase(BaseContainersTestCase):

View File

@ -98,6 +98,23 @@ class SecretsTestCase(base.TestCase):
resp = self.barbicanclient.secrets.get(secret_ref)
self.assertEqual(resp.algorithm, secret.algorithm)
@testcase.attr('positive')
def test_secret_read_with_acls(self):
"""Access default ACL settings data on recently created secret.
By default, 'read' ACL settings are there for a secret.
"""
test_model = self.barbicanclient.secrets.create(
**secret_create_defaults_data)
secret_ref = self.cleanup.add_entity(test_model)
self.assertIsNotNone(secret_ref)
secret_entity = self.barbicanclient.secrets.get(secret_ref)
self.assertIsNotNone(secret_entity.acls)
self.assertIsNotNone(secret_entity.acls.read)
self.assertEqual([], secret_entity.acls.read.users)
@testcase.attr('positive')
def test_secret_create_defaults_non_standard_mode(self):
"""Create a secret with a non standard mode.

View File

@ -21,6 +21,7 @@ class CleanUp(object):
self.created_entities = {
'secret': [],
'container': [],
'acl': [],
'order': []
}
@ -29,6 +30,7 @@ class CleanUp(object):
def delete_all_entities(self):
"""Helper method to delete all containers and secrets used for
testing"""
self._delete_all_acls()
self._delete_all_containers()
self._delete_all_orders()
self._delete_all_secrets()
@ -38,7 +40,10 @@ class CleanUp(object):
and keeps track of entity for removal after tests are
run"""
entity_type = str(type(entity)).lower()
if 'secret' in entity_type:
if 'acl' in entity_type:
entity_ref = entity.submit()
entity_type = 'acl'
elif 'secret' in entity_type:
entity_ref = entity.store()
entity_type = 'secret'
elif 'container' in entity_type:
@ -62,6 +67,14 @@ class CleanUp(object):
for secret_ref in self.created_entities['secret']:
self.barbicanclient.secrets.delete(secret_ref)
def _delete_all_acls(self):
"""Helper method to delete all acls used for testing"""
for acl_ref in self.created_entities['acl']:
entity_ref = acl_ref.replace("/acl", "")
blank_acl_entity = self.barbicanclient.acls.create(
entity_ref=entity_ref)
blank_acl_entity.remove()
def _delete_all_orders(self):
"""Helper method to delete all orders and secrets used for testing"""
for order_ref in self.created_entities['order']: