diff --git a/glance_tempest_plugin/tests/rbac/v2/base.py b/glance_tempest_plugin/tests/rbac/v2/base.py index 3d65514..96c3f94 100644 --- a/glance_tempest_plugin/tests/rbac/v2/base.py +++ b/glance_tempest_plugin/tests/rbac/v2/base.py @@ -82,14 +82,9 @@ class RbacBaseTests(base.BaseV2ImageTest): def namespace(name, owner, visibility='private', protected=False): - ns = dict() - ns['namespace'] = name - ns['visibility'] = visibility - ns['description'] = data_utils.arbitrary_string() - ns['display_name'] = ns['namespace'] - ns['owner'] = owner - ns['protected'] = protected - return ns + return {'namespace': name, 'visibility': visibility, + 'description': data_utils.arbitrary_string(), + 'display_name': name, 'owner': owner, 'protected': protected} class RbacMetadefBase(RbacBaseTests): @@ -97,21 +92,18 @@ class RbacMetadefBase(RbacBaseTests): """Create private and public namespaces for different projects.""" project_namespaces = [] alt_namespaces = [] - project_id = self.os_project_admin.namespaces_client.project_id - alt_project_id = \ - self.os_project_alt_admin.namespaces_client.project_id for visibility in ['public', 'private']: project_ns = "%s_%s_%s" % ( - project_id, + self.project_id, visibility, data_utils.rand_name('namespace')) - alt_ns = "%s_%s_%s" % (alt_project_id, visibility, + alt_ns = "%s_%s_%s" % (self.alt_project_id, visibility, data_utils.rand_name('namespace')) project_namespace = \ self.os_project_admin.namespaces_client.create_namespace( - **namespace(project_ns, project_id, + **namespace(project_ns, self.project_id, visibility=visibility)) project_namespaces.append(project_namespace) self.addCleanup( @@ -121,7 +113,7 @@ class RbacMetadefBase(RbacBaseTests): alt_namespace = \ self.os_project_admin.namespaces_client.create_namespace( - **namespace(alt_ns, alt_project_id, + **namespace(alt_ns, self.alt_project_id, visibility=visibility)) alt_namespaces.append(alt_namespace) self.addCleanup( diff --git a/glance_tempest_plugin/tests/rbac/v2/metadefs/test_objects.py b/glance_tempest_plugin/tests/rbac/v2/metadefs/test_objects.py new file mode 100644 index 0000000..6ed0d6d --- /dev/null +++ b/glance_tempest_plugin/tests/rbac/v2/metadefs/test_objects.py @@ -0,0 +1,294 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + +from tempest.common import tempest_fixtures as fixtures +from tempest.lib.common.utils import data_utils +from tempest.lib import exceptions + +from glance_tempest_plugin.tests.rbac.v2 import base as rbac_base + + +class MetadefV2RbacObjectsTest(rbac_base.RbacMetadefBase): + def setUp(self): + # NOTE(pdeore): As we are using global data there is a possibility + # of invalid results if these tests are executed concurrently, to avoid + # such conflicts we are using locking to execute metadef tests + # serially. + self.useFixture(fixtures.LockFixture('metadef_namespaces')) + super(MetadefV2RbacObjectsTest, self).setUp() + + @classmethod + def setup_clients(cls): + super(MetadefV2RbacObjectsTest, cls).setup_clients() + if 'project_member' in cls.credentials: + persona = 'project_member' + alt_persona = 'project_alt_member' + elif 'project_reader' in cls.credentials: + persona = 'project_reader' + alt_persona = 'project_alt_reader' + else: + persona = 'project_admin' + alt_persona = 'project_alt_admin' + cls.persona = getattr(cls, 'os_%s' % persona) + cls.alt_persona = getattr(cls, 'os_%s' % alt_persona) + cls.project_id = cls.persona.namespace_objects_client.project_id + cls.alt_project_id = \ + cls.alt_persona.namespace_objects_client.project_id + cls.objects_client = cls.persona.namespace_objects_client + + def create_objects(self): + # Create namespace for two different projects + namespaces = self.create_namespaces() + + client = self.os_project_admin.namespace_objects_client + namespace_objects = [] + for ns in namespaces: + if ns['namespace'].startswith(self.project_id): + client = self.os_project_alt_admin.namespace_objects_client + object_name = "object_of_%s" % (ns['namespace']) + namespace_object = client.create_namespace_object( + ns['namespace'], name=object_name, + description=data_utils.arbitrary_string()) + + obj = {'namespace': ns, 'object': namespace_object} + namespace_objects.append(obj) + + return namespace_objects + + def assertObjectsList(self, actual_obj, client, owner=None): + ns = actual_obj['namespace'] + if owner: + if not (ns['visibility'] == 'public' or + ns['owner'] == owner): + self.do_request('list_namespace_objects', + expected_status=exceptions.NotFound, + namespace=ns['namespace'], + client=client) + else: + resp = self.do_request('list_namespace_objects', + expected_status=200, + namespace=ns['namespace'], + client=client) + self.assertIn(actual_obj['object']['name'], + resp['objects'][0]['name']) + else: + resp = self.do_request('list_namespace_objects', + expected_status=200, + namespace=ns['namespace'], + client=client) + self.assertIn(actual_obj['object']['name'], + resp['objects'][0]['name']) + + +class MetadefV2RbacObjectsTemplate(metaclass=abc.ABCMeta): + + @abc.abstractmethod + def test_create_object(self): + """Test add_metadef_object policy.""" + pass + + @abc.abstractmethod + def test_get_object(self): + """Test get_metadef_object policy.""" + pass + + @abc.abstractmethod + def test_list_objects(self): + """Test list_metadef_objects policy.""" + pass + + @abc.abstractmethod + def test_update_object(self): + """Test update_metadef_object policy.""" + pass + + @abc.abstractmethod + def test_delete_object(self): + """Test delete_metadef_object policy.""" + pass + + +class ProjectAdminTests(MetadefV2RbacObjectsTest, + MetadefV2RbacObjectsTemplate): + + credentials = ['project_admin', 'project_alt_admin', 'primary'] + + def test_get_object(self): + ns_objects = self.create_objects() + + # Get all metadef objects with admin role + for obj in ns_objects: + resp = self.do_request( + 'show_namespace_object', + expected_status=200, + client=self.objects_client, + namespace=obj['namespace']['namespace'], + object_name=obj['object']['name']) + self.assertEqual(obj['object']['name'], resp['name']) + + def test_list_objects(self): + ns_objects = self.create_objects() + + # list all metadef objects with admin role + for obj in ns_objects: + self.assertObjectsList(obj, self.objects_client) + + def test_update_object(self): + ns_objects = self.create_objects() + + # update all metadef objects with admin role of 'project' + for obj in ns_objects: + resp = self.do_request( + 'update_namespace_object', + expected_status=200, + namespace=obj['namespace']['namespace'], + client=self.objects_client, + object_name=obj['object']['name'], + name=obj['object']['name'], + description=data_utils.arbitrary_string(base_text="updated")) + self.assertNotEqual(obj['object']['description'], + resp['description']) + + def test_delete_object(self): + ns_objects = self.create_objects() + # delete all metadef objects with admin role of 'project' + for obj in ns_objects: + self.do_request('delete_namespace_object', + expected_status=204, + namespace=obj['namespace']['namespace'], + object_name=obj['object']['name'], + client=self.objects_client) + + # Verify the object is deleted successfully + self.do_request('show_namespace_object', + expected_status=exceptions.NotFound, + client=self.objects_client, + namespace=obj['namespace']['namespace'], + object_name=obj['object']['name']) + + def test_create_object(self): + # As this is been covered in other tests for admin role, + # skipping to test only create objects seperately. + pass + + +class ProjectMemberTests(MetadefV2RbacObjectsTest, + MetadefV2RbacObjectsTemplate): + + credentials = ['project_member', 'project_alt_member', 'project_admin', + 'project_alt_admin', 'primary'] + + def test_create_object(self): + namespaces = self.create_namespaces() + + def assertCreateObjects(namespace, owner, client): + object_name = "object_of_%s" % (namespace['namespace']) + expected_status = exceptions.Forbidden + if (namespace['visibility'] == 'private' and + namespace['owner'] != owner): + expected_status = exceptions.NotFound + + self.do_request('create_namespace_object', + expected_status=expected_status, + namespace=namespace['namespace'], + name=object_name, + client=client) + + # Make sure non admin role of 'project' forbidden to + # create objects + for namespace in namespaces: + assertCreateObjects(namespace, self.project_id, + self.objects_client) + + def test_get_object(self): + + def assertObjectGet(actual_obj, owner, client): + ns = actual_obj['namespace'] + expected_status = 200 + if (ns['visibility'] == 'private' and + ns['owner'] != owner): + expected_status = exceptions.NotFound + + self.do_request('show_namespace_object', + expected_status=expected_status, + namespace=actual_obj['namespace']['namespace'], + object_name=actual_obj['object']['name'], + client=client) + + ns_objects = self.create_objects() + + # Get object - member role from 'project' can access all + # objects of it's own & only objects having public namespace of + # 'alt_project' + for obj in ns_objects: + assertObjectGet(obj, self.project_id, self.objects_client) + + def test_list_objects(self): + ns_objects = self.create_objects() + + # list objects - member role from 'project' can access all + # objects of it's own & only objects having public namespace of + # 'alt_project' + for obj in ns_objects: + self.assertObjectsList(obj, self.objects_client, self.project_id) + + def test_update_object(self): + + def assertObjectUpdate(actual_object, owner, client): + ns = actual_object['namespace'] + expected_status = exceptions.Forbidden + if (ns['visibility'] == 'private' and + ns['owner'] != owner): + expected_status = exceptions.NotFound + + self.do_request('update_namespace_object', + expected_status=expected_status, + name=actual_object['object']['name'], + description=data_utils.arbitrary_string(), + namespace=actual_object['namespace']['namespace'], + object_name=actual_object['object']['name'], + client=client) + + ns_objects = self.create_objects() + # Make sure non admin role of 'project' not allowed to + # update objects + for obj in ns_objects: + assertObjectUpdate(obj, self.project_id, self.objects_client) + + def test_delete_object(self): + + def assertObjectDelete(actual_obj, owner, client): + ns = actual_obj['namespace'] + expected_status = exceptions.Forbidden + if (ns['visibility'] == 'private' and + ns['owner'] != owner): + expected_status = exceptions.NotFound + + self.do_request('delete_namespace_object', + expected_status=expected_status, + namespace=actual_obj['namespace']['namespace'], + object_name=actual_obj['object']['name'], + client=client) + + ns_objects = self.create_objects() + # Make sure non admin role of 'project' not allowed to + # delete objects + for obj in ns_objects: + assertObjectDelete(obj, self.project_id, self.objects_client) + + +class ProjectReaderTests(ProjectMemberTests): + + credentials = ['project_reader', 'project_alt_reader', 'project_admin', + 'project_alt_admin', 'primary']