From ae22f1532d8454e84fdc6f541f2458c877c1c194 Mon Sep 17 00:00:00 2001 From: Artem Goncharov Date: Fri, 30 Sep 2022 19:49:00 +0200 Subject: [PATCH] Whitelist cloud functional tests in acceptance Next bunch of functional tests adapted to be included in the acceptance tests. Inclusion of tests uncovered issue in getting quota related to the fact that not every user is capable to find/list projects. This is fixed by introduction of ForbiddenException, skipping it in the _find call and using current_project_id. Change-Id: I53e718de239de7fb6f0347ff995282da079c68f3 --- include-acceptance-regular-user.txt | 6 +- openstack/cloud/_network.py | 6 +- openstack/exceptions.py | 15 +++-- openstack/resource.py | 3 +- openstack/tests/functional/base.py | 2 + .../tests/functional/cloud/test_aggregate.py | 2 + .../tests/functional/cloud/test_compute.py | 5 ++ .../tests/functional/cloud/test_devstack.py | 10 --- .../tests/functional/cloud/test_domain.py | 2 + .../tests/functional/cloud/test_endpoints.py | 2 + .../tests/functional/cloud/test_flavor.py | 57 +++++++++++------ .../functional/cloud/test_floating_ip.py | 60 ++++++++++------- .../tests/functional/cloud/test_groups.py | 3 + .../tests/functional/cloud/test_identity.py | 2 + .../tests/functional/cloud/test_inventory.py | 3 + .../tests/functional/cloud/test_limits.py | 6 ++ .../functional/cloud/test_magnum_services.py | 4 +- .../tests/functional/cloud/test_network.py | 3 + .../tests/functional/cloud/test_object.py | 15 +++-- openstack/tests/functional/cloud/test_port.py | 64 +++++++------------ .../tests/functional/cloud/test_project.py | 3 + .../functional/cloud/test_project_cleanup.py | 3 + .../cloud/test_qos_bandwidth_limit_rule.py | 2 + .../cloud/test_qos_dscp_marking_rule.py | 2 + .../cloud/test_qos_minimum_bandwidth_rule.py | 2 + .../tests/functional/cloud/test_qos_policy.py | 2 + .../tests/functional/cloud/test_quotas.py | 43 ++++++++++--- .../tests/functional/cloud/test_router.py | 2 + .../functional/cloud/test_security_groups.py | 16 +++++ .../tests/functional/cloud/test_services.py | 2 + .../tests/functional/cloud/test_users.py | 3 + .../functional/cloud/test_volume_type.py | 2 + .../unit/cloud/test_cluster_templates.py | 4 +- .../tests/unit/cloud/test_identity_roles.py | 4 +- openstack/tests/unit/cloud/test_quotas.py | 4 +- 35 files changed, 238 insertions(+), 126 deletions(-) diff --git a/include-acceptance-regular-user.txt b/include-acceptance-regular-user.txt index 5c5a76019..62773e767 100644 --- a/include-acceptance-regular-user.txt +++ b/include-acceptance-regular-user.txt @@ -1,8 +1,12 @@ # This file contains list of tests that can work with regular user privileges # Until all tests are modified to properly identify whether they are able to # run or must skip the ones that are known to work are listed here. -openstack.tests.functional.network +### Block Storage openstack.tests.functional.block_storage.v3.test_volume # Do not enable test_backup for now, since it is not capable to determine # backup capabilities of the cloud # openstack.tests.functional.block_storage.v3.test_backup +### Cloud +openstack.tests.functional.cloud +### Network +openstack.tests.functional.network diff --git a/openstack/cloud/_network.py b/openstack/cloud/_network.py index 7358490a7..22e4b7dfe 100644 --- a/openstack/cloud/_network.py +++ b/openstack/cloud/_network.py @@ -1,3 +1,4 @@ + # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -663,9 +664,8 @@ class NetworkCloudMixin: :raises: OpenStackCloudException if it's not a valid project :returns: A network ``Quota`` object if found, else None. """ - proj = self.get_project(name_or_id) - if not proj: - raise exc.OpenStackCloudException("project does not exist") + proj = self.identity.find_project( + name_or_id, ignore_missing=False) return self.network.get_quota(proj.id, details) def get_network_extensions(self): diff --git a/openstack/exceptions.py b/openstack/exceptions.py index c2370fef5..ec13dde09 100644 --- a/openstack/exceptions.py +++ b/openstack/exceptions.py @@ -112,6 +112,11 @@ class BadRequestException(HttpException): pass +class ForbiddenException(HttpException): + """HTTP 403 Forbidden Request.""" + pass + + class ConflictException(HttpException): """HTTP 409 Conflict.""" pass @@ -187,12 +192,14 @@ def raise_from_response(response, error_message=None): if response.status_code < 400: return - if response.status_code == 409: - cls = ConflictException + if response.status_code == 400: + cls = BadRequestException + elif response.status_code == 403: + cls = ForbiddenException elif response.status_code == 404: cls = NotFoundException - elif response.status_code == 400: - cls = BadRequestException + elif response.status_code == 409: + cls = ConflictException elif response.status_code == 412: cls = PreconditionFailedException else: diff --git a/openstack/resource.py b/openstack/resource.py index 6044de88e..405507339 100644 --- a/openstack/resource.py +++ b/openstack/resource.py @@ -2226,7 +2226,8 @@ class Resource(dict): id=name_or_id, connection=session._get_connection(), **params ) return match.fetch(session, microversion=microversion, **params) - except (exceptions.NotFoundException, exceptions.BadRequestException): + except (exceptions.NotFoundException, exceptions.BadRequestException, + exceptions.ForbiddenException): # NOTE(gtema): There are few places around openstack that return # 400 if we try to GET resource and it doesn't exist. pass diff --git a/openstack/tests/functional/base.py b/openstack/tests/functional/base.py index ff5d6b10c..381aece28 100644 --- a/openstack/tests/functional/base.py +++ b/openstack/tests/functional/base.py @@ -88,6 +88,8 @@ class BaseFunctionalTest(base.TestCase): cloud=self._demo_name_alt, **kwargs) self.user_cloud_alt = connection.Connection(config=user_config_alt) _disable_keep_alive(self.user_cloud_alt) + else: + self.user_cloud_alt = None def _set_operator_cloud(self, **kwargs): operator_config = self.config.get_one(cloud=self._op_name, **kwargs) diff --git a/openstack/tests/functional/cloud/test_aggregate.py b/openstack/tests/functional/cloud/test_aggregate.py index d7bcc0fd4..e96b64f3d 100644 --- a/openstack/tests/functional/cloud/test_aggregate.py +++ b/openstack/tests/functional/cloud/test_aggregate.py @@ -23,6 +23,8 @@ from openstack.tests.functional import base class TestAggregate(base.BaseFunctionalTest): def test_aggregates(self): + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") aggregate_name = self.getUniqueString() availability_zone = self.getUniqueString() self.addCleanup(self.cleanup, aggregate_name) diff --git a/openstack/tests/functional/cloud/test_compute.py b/openstack/tests/functional/cloud/test_compute.py index f25421abd..d0fefb20f 100644 --- a/openstack/tests/functional/cloud/test_compute.py +++ b/openstack/tests/functional/cloud/test_compute.py @@ -150,6 +150,8 @@ class TestCompute(base.BaseFunctionalTest): self.assertTrue(srv is None or srv.status.lower() == 'deleted') def test_list_all_servers(self): + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") self.addCleanup(self._cleanup_servers_and_volumes, self.server_name) server = self.user_cloud.create_server( name=self.server_name, @@ -474,6 +476,9 @@ class TestCompute(base.BaseFunctionalTest): def test_get_compute_usage(self): '''Test usage functionality''' # Add a server so that we can know we have usage + if not self.operator_cloud: + # TODO(gtema) rework method not to require getting project + self.skipTest("Operator cloud is required for this test") self.addCleanup(self._cleanup_servers_and_volumes, self.server_name) self.user_cloud.create_server( name=self.server_name, diff --git a/openstack/tests/functional/cloud/test_devstack.py b/openstack/tests/functional/cloud/test_devstack.py index 1c72c6aa4..52cced22a 100644 --- a/openstack/tests/functional/cloud/test_devstack.py +++ b/openstack/tests/functional/cloud/test_devstack.py @@ -43,13 +43,3 @@ class TestDevstack(base.BaseFunctionalTest): if os.environ.get( 'OPENSTACKSDK_HAS_{env}'.format(env=self.env), '0') == '1': self.assertTrue(self.user_cloud.has_service(self.service)) - - -class TestKeystoneVersion(base.BaseFunctionalTest): - - def test_keystone_version(self): - use_keystone_v2 = os.environ.get('OPENSTACKSDK_USE_KEYSTONE_V2', False) - if use_keystone_v2 and use_keystone_v2 != '0': - self.assertEqual('2.0', self.identity_version) - else: - self.assertEqual('3', self.identity_version) diff --git a/openstack/tests/functional/cloud/test_domain.py b/openstack/tests/functional/cloud/test_domain.py index 18a070f91..447864356 100644 --- a/openstack/tests/functional/cloud/test_domain.py +++ b/openstack/tests/functional/cloud/test_domain.py @@ -25,6 +25,8 @@ class TestDomain(base.BaseFunctionalTest): def setUp(self): super(TestDomain, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") i_ver = self.operator_cloud.config.get_api_version('identity') if i_ver in ('2', '2.0'): self.skipTest('Identity service does not support domains') diff --git a/openstack/tests/functional/cloud/test_endpoints.py b/openstack/tests/functional/cloud/test_endpoints.py index 39a539709..2381f9775 100644 --- a/openstack/tests/functional/cloud/test_endpoints.py +++ b/openstack/tests/functional/cloud/test_endpoints.py @@ -34,6 +34,8 @@ class TestEndpoints(base.KeystoneBaseFunctionalTest): def setUp(self): super(TestEndpoints, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") # Generate a random name for services and regions in this test self.new_item_name = 'test_' + ''.join( diff --git a/openstack/tests/functional/cloud/test_flavor.py b/openstack/tests/functional/cloud/test_flavor.py index 12e24057d..f1a76b7ff 100644 --- a/openstack/tests/functional/cloud/test_flavor.py +++ b/openstack/tests/functional/cloud/test_flavor.py @@ -35,13 +35,15 @@ class TestFlavor(base.BaseFunctionalTest): def _cleanup_flavors(self): exception_list = list() - for f in self.operator_cloud.list_flavors(get_extra=False): - if f['name'].startswith(self.new_item_name): - try: - self.operator_cloud.delete_flavor(f['id']) - except Exception as e: - # We were unable to delete a flavor, let's try with next - exception_list.append(str(e)) + if self.operator_cloud: + for f in self.operator_cloud.list_flavors(get_extra=False): + if f['name'].startswith(self.new_item_name): + try: + self.operator_cloud.delete_flavor(f['id']) + except Exception as e: + # We were unable to delete a flavor, let's try with + # next + exception_list.append(str(e)) continue if exception_list: # Raise an error: we must make users aware that something went @@ -49,6 +51,9 @@ class TestFlavor(base.BaseFunctionalTest): raise OpenStackCloudException('\n'.join(exception_list)) def test_create_flavor(self): + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + flavor_name = self.new_item_name + '_create' flavor_kwargs = dict( name=flavor_name, ram=1024, vcpus=2, disk=10, ephemeral=5, @@ -85,24 +90,31 @@ class TestFlavor(base.BaseFunctionalTest): name=priv_flavor_name, ram=1024, vcpus=2, disk=10, is_public=False ) - # Create a public and private flavor. We expect both to be listed - # for an operator. - self.operator_cloud.create_flavor(**public_kwargs) - self.operator_cloud.create_flavor(**private_kwargs) + if self.operator_cloud: + # Create a public and private flavor. We expect both to be listed + # for an operator. + self.operator_cloud.create_flavor(**public_kwargs) + self.operator_cloud.create_flavor(**private_kwargs) - flavors = self.operator_cloud.list_flavors(get_extra=False) + flavors = self.operator_cloud.list_flavors(get_extra=False) - # Flavor list will include the standard devstack flavors. We just want - # to make sure both of the flavors we just created are present. - found = [] - for f in flavors: - # extra_specs should be added within list_flavors() - self.assertIn('extra_specs', f) - if f['name'] in (pub_flavor_name, priv_flavor_name): - found.append(f) - self.assertEqual(2, len(found)) + # Flavor list will include the standard devstack flavors. We just + # want to make sure both of the flavors we just created are + # present. + found = [] + for f in flavors: + # extra_specs should be added within list_flavors() + self.assertIn('extra_specs', f) + if f['name'] in (pub_flavor_name, priv_flavor_name): + found.append(f) + self.assertEqual(2, len(found)) + else: + self.user_cloud.list_flavors() def test_flavor_access(self): + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + priv_flavor_name = self.new_item_name + '_private' private_kwargs = dict( name=priv_flavor_name, ram=1024, vcpus=2, disk=10, is_public=False @@ -141,6 +153,9 @@ class TestFlavor(base.BaseFunctionalTest): """ Test setting and unsetting flavor extra specs """ + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + flavor_name = self.new_item_name + '_spec_test' kwargs = dict( name=flavor_name, ram=1024, vcpus=2, disk=10 diff --git a/openstack/tests/functional/cloud/test_floating_ip.py b/openstack/tests/functional/cloud/test_floating_ip.py index 8987decd0..23610c4a3 100644 --- a/openstack/tests/functional/cloud/test_floating_ip.py +++ b/openstack/tests/functional/cloud/test_floating_ip.py @@ -62,7 +62,7 @@ class TestFloatingIP(base.BaseFunctionalTest): r, subnet_id=s['id']) except Exception: pass - self.user_cloud.delete_router(r) + self.user_cloud.delete_router(r.id) except Exception as e: exception_list.append(e) tb_list.append(sys.exc_info()[2]) @@ -71,7 +71,7 @@ class TestFloatingIP(base.BaseFunctionalTest): for s in self.user_cloud.list_subnets(): if s['name'].startswith(self.new_item_name): try: - self.user_cloud.delete_subnet(s) + self.user_cloud.delete_subnet(s.id) except Exception as e: exception_list.append(e) tb_list.append(sys.exc_info()[2]) @@ -80,7 +80,7 @@ class TestFloatingIP(base.BaseFunctionalTest): for n in self.user_cloud.list_networks(): if n['name'].startswith(self.new_item_name): try: - self.user_cloud.delete_network(n) + self.user_cloud.delete_network(n.id) except Exception as e: exception_list.append(e) tb_list.append(sys.exc_info()[2]) @@ -104,7 +104,7 @@ class TestFloatingIP(base.BaseFunctionalTest): for i in self.user_cloud.list_servers(bare=True): if i.name.startswith(self.new_item_name): try: - self.user_cloud.delete_server(i, wait=True) + self.user_cloud.delete_server(i.id, wait=True) except Exception as e: exception_list.append(str(e)) continue @@ -124,7 +124,7 @@ class TestFloatingIP(base.BaseFunctionalTest): if (ip.get('fixed_ip', None) == fixed_ip or ip.get('fixed_ip_address', None) == fixed_ip): try: - self.user_cloud.delete_floating_ip(ip) + self.user_cloud.delete_floating_ip(ip.id) except Exception as e: exception_list.append(str(e)) continue @@ -235,38 +235,50 @@ class TestFloatingIP(base.BaseFunctionalTest): server_id=new_server.id, floating_ip_id=f_ip['id']) def test_list_floating_ips(self): - fip_admin = self.operator_cloud.create_floating_ip() - self.addCleanup(self.operator_cloud.delete_floating_ip, fip_admin.id) + if self.operator_cloud: + fip_admin = self.operator_cloud.create_floating_ip() + self.addCleanup( + self.operator_cloud.delete_floating_ip, fip_admin.id) fip_user = self.user_cloud.create_floating_ip() self.addCleanup(self.user_cloud.delete_floating_ip, fip_user.id) # Get all the floating ips. - fip_id_list = [ - fip.id for fip in self.operator_cloud.list_floating_ips() + if self.operator_cloud: + fip_op_id_list = [ + fip.id for fip in self.operator_cloud.list_floating_ips() + ] + fip_user_id_list = [ + fip.id for fip in self.user_cloud.list_floating_ips() ] + if self.user_cloud.has_service('network'): + self.assertIn(fip_user.id, fip_user_id_list) # Neutron returns all FIP for all projects by default - self.assertIn(fip_admin.id, fip_id_list) - self.assertIn(fip_user.id, fip_id_list) + if self.operator_cloud and fip_admin: + self.assertIn(fip_user.id, fip_op_id_list) # Ask Neutron for only a subset of all the FIPs. - filtered_fip_id_list = [ - fip.id for fip in self.operator_cloud.list_floating_ips( - {'tenant_id': self.user_cloud.current_project_id} - ) - ] - self.assertNotIn(fip_admin.id, filtered_fip_id_list) - self.assertIn(fip_user.id, filtered_fip_id_list) + if self.operator_cloud: + filtered_fip_id_list = [ + fip.id for fip in self.operator_cloud.list_floating_ips( + {'tenant_id': self.user_cloud.current_project_id} + ) + ] + self.assertNotIn(fip_admin.id, filtered_fip_id_list) + self.assertIn(fip_user.id, filtered_fip_id_list) else: - self.assertIn(fip_admin.id, fip_id_list) + if fip_admin: + self.assertIn(fip_admin.id, fip_op_id_list) # By default, Nova returns only the FIPs that belong to the # project which made the listing request. - self.assertNotIn(fip_user.id, fip_id_list) - self.assertRaisesRegex( - ValueError, "Nova-network don't support server-side.*", - self.operator_cloud.list_floating_ips, filters={'foo': 'bar'} - ) + if self.operator_cloud: + self.assertNotIn(fip_user.id, fip_op_id_list) + self.assertRaisesRegex( + ValueError, "Nova-network don't support server-side.*", + self.operator_cloud.list_floating_ips, + filters={'foo': 'bar'} + ) def test_search_floating_ips(self): fip_user = self.user_cloud.create_floating_ip() diff --git a/openstack/tests/functional/cloud/test_groups.py b/openstack/tests/functional/cloud/test_groups.py index e9ae28a84..3ef2a5626 100644 --- a/openstack/tests/functional/cloud/test_groups.py +++ b/openstack/tests/functional/cloud/test_groups.py @@ -25,6 +25,9 @@ class TestGroup(base.BaseFunctionalTest): def setUp(self): super(TestGroup, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + i_ver = self.operator_cloud.config.get_api_version('identity') if i_ver in ('2', '2.0'): self.skipTest('Identity service does not support groups') diff --git a/openstack/tests/functional/cloud/test_identity.py b/openstack/tests/functional/cloud/test_identity.py index 5c087ba93..118cf33f9 100644 --- a/openstack/tests/functional/cloud/test_identity.py +++ b/openstack/tests/functional/cloud/test_identity.py @@ -27,6 +27,8 @@ from openstack.tests.functional import base class TestIdentity(base.KeystoneBaseFunctionalTest): def setUp(self): super(TestIdentity, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") self.role_prefix = 'test_role' + ''.join( random.choice(string.ascii_lowercase) for _ in range(5)) self.user_prefix = self.getUniqueString('user') diff --git a/openstack/tests/functional/cloud/test_inventory.py b/openstack/tests/functional/cloud/test_inventory.py index 1401e6c2d..79de47a83 100644 --- a/openstack/tests/functional/cloud/test_inventory.py +++ b/openstack/tests/functional/cloud/test_inventory.py @@ -26,6 +26,9 @@ from openstack.tests.functional import base class TestInventory(base.BaseFunctionalTest): def setUp(self): super().setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + # This needs to use an admin account, otherwise a public IP # is not allocated from devstack. self.inventory = inventory.OpenStackInventory(cloud='devstack-admin') diff --git a/openstack/tests/functional/cloud/test_limits.py b/openstack/tests/functional/cloud/test_limits.py index a4ec061c9..160adaca6 100644 --- a/openstack/tests/functional/cloud/test_limits.py +++ b/openstack/tests/functional/cloud/test_limits.py @@ -33,6 +33,9 @@ class TestUsage(base.BaseFunctionalTest): def test_get_other_compute_limits(self): '''Test quotas functionality''' + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + limits = self.operator_cloud.get_compute_limits('demo') self.assertIsNotNone(limits) self.assertTrue(hasattr(limits, 'server_meta')) @@ -48,5 +51,8 @@ class TestUsage(base.BaseFunctionalTest): def test_get_other_volume_limits(self): '''Test quotas functionality''' + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + limits = self.operator_cloud.get_volume_limits('demo') self.assertFalse(hasattr(limits, 'maxTotalVolumes')) diff --git a/openstack/tests/functional/cloud/test_magnum_services.py b/openstack/tests/functional/cloud/test_magnum_services.py index 991b8aac2..80ef5ccf7 100644 --- a/openstack/tests/functional/cloud/test_magnum_services.py +++ b/openstack/tests/functional/cloud/test_magnum_services.py @@ -24,7 +24,7 @@ class TestMagnumServices(base.BaseFunctionalTest): def setUp(self): super(TestMagnumServices, self).setUp() - if not self.operator_cloud.has_service( + if not self.user_cloud.has_service( 'container-infrastructure-management' ): self.skipTest('Container service not supported by cloud') @@ -33,7 +33,7 @@ class TestMagnumServices(base.BaseFunctionalTest): '''Test magnum services functionality''' # Test that we can list services - services = self.operator_cloud.list_magnum_services() + services = self.user_cloud.list_magnum_services() self.assertEqual(1, len(services)) self.assertEqual(services[0]['id'], 1) diff --git a/openstack/tests/functional/cloud/test_network.py b/openstack/tests/functional/cloud/test_network.py index d9d9cada7..d4cfe4267 100644 --- a/openstack/tests/functional/cloud/test_network.py +++ b/openstack/tests/functional/cloud/test_network.py @@ -24,6 +24,9 @@ from openstack.tests.functional import base class TestNetwork(base.BaseFunctionalTest): def setUp(self): super(TestNetwork, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + if not self.operator_cloud.has_service('network'): self.skipTest('Network service not supported by cloud') self.network_name = self.getUniqueString('network') diff --git a/openstack/tests/functional/cloud/test_object.py b/openstack/tests/functional/cloud/test_object.py index 3ee3e3753..cc707f53d 100644 --- a/openstack/tests/functional/cloud/test_object.py +++ b/openstack/tests/functional/cloud/test_object.py @@ -40,10 +40,12 @@ class TestObject(base.BaseFunctionalTest): self.addDetail('container', content.text_content(container_name)) self.addCleanup(self.user_cloud.delete_container, container_name) self.user_cloud.create_container(container_name) - self.assertEqual(container_name, - self.user_cloud.list_containers()[0]['name']) - self.assertEqual([], - self.user_cloud.list_containers(prefix='somethin')) + container = self.user_cloud.get_container(container_name) + self.assertEqual( + container_name, container.name) + self.assertEqual( + [], + self.user_cloud.list_containers(prefix='somethin')) sizes = ( (64 * 1024, 1), # 64K, one segment (64 * 1024, 5) # 64MB, 5 segments @@ -99,8 +101,9 @@ class TestObject(base.BaseFunctionalTest): self.assertTrue( self.user_cloud.delete_object(container_name, name)) self.assertEqual([], self.user_cloud.list_objects(container_name)) - self.assertEqual(container_name, - self.user_cloud.list_containers()[0]['name']) + self.assertEqual( + container_name, + self.user_cloud.get_container(container_name).name) self.user_cloud.delete_container(container_name) def test_download_object_to_file(self): diff --git a/openstack/tests/functional/cloud/test_port.py b/openstack/tests/functional/cloud/test_port.py index b865b5205..a798a20f9 100644 --- a/openstack/tests/functional/cloud/test_port.py +++ b/openstack/tests/functional/cloud/test_port.py @@ -31,9 +31,13 @@ class TestPort(base.BaseFunctionalTest): def setUp(self): super(TestPort, self).setUp() # Skip Neutron tests if neutron is not present - if not self.operator_cloud.has_service('network'): + if not self.user_cloud.has_service('network'): self.skipTest('Network service not supported by cloud') + net_name = self.getUniqueString('CloudPortName') + self.net = self.user_cloud.network.create_network(name=net_name) + self.addCleanup(self.user_cloud.network.delete_network, self.net.id) + # Generate a unique port name to allow concurrent tests self.new_port_name = 'test_' + ''.join( random.choice(string.ascii_lowercase) for _ in range(5)) @@ -43,10 +47,10 @@ class TestPort(base.BaseFunctionalTest): def _cleanup_ports(self): exception_list = list() - for p in self.operator_cloud.list_ports(): + for p in self.user_cloud.list_ports(): if p['name'].startswith(self.new_port_name): try: - self.operator_cloud.delete_port(name_or_id=p['id']) + self.user_cloud.delete_port(name_or_id=p['id']) except Exception as e: # We were unable to delete this port, let's try with next exception_list.append(str(e)) @@ -60,12 +64,8 @@ class TestPort(base.BaseFunctionalTest): def test_create_port(self): port_name = self.new_port_name + '_create' - networks = self.operator_cloud.list_networks() - if not networks: - self.assertFalse('no sensible network available') - - port = self.operator_cloud.create_port( - network_id=networks[0]['id'], name=port_name) + port = self.user_cloud.create_port( + network_id=self.net.id, name=port_name) self.assertIsInstance(port, dict) self.assertIn('id', port) self.assertEqual(port.get('name'), port_name) @@ -73,17 +73,13 @@ class TestPort(base.BaseFunctionalTest): def test_get_port(self): port_name = self.new_port_name + '_get' - networks = self.operator_cloud.list_networks() - if not networks: - self.assertFalse('no sensible network available') - - port = self.operator_cloud.create_port( - network_id=networks[0]['id'], name=port_name) + port = self.user_cloud.create_port( + network_id=self.net.id, name=port_name) self.assertIsInstance(port, dict) self.assertIn('id', port) self.assertEqual(port.get('name'), port_name) - updated_port = self.operator_cloud.get_port(name_or_id=port['id']) + updated_port = self.user_cloud.get_port(name_or_id=port['id']) # extra_dhcp_opts is added later by Neutron... if 'extra_dhcp_opts' in updated_port and 'extra_dhcp_opts' not in port: del updated_port['extra_dhcp_opts'] @@ -92,17 +88,13 @@ class TestPort(base.BaseFunctionalTest): def test_get_port_by_id(self): port_name = self.new_port_name + '_get_by_id' - networks = self.operator_cloud.list_networks() - if not networks: - self.assertFalse('no sensible network available') - - port = self.operator_cloud.create_port( - network_id=networks[0]['id'], name=port_name) + port = self.user_cloud.create_port( + network_id=self.net.id, name=port_name) self.assertIsInstance(port, dict) self.assertIn('id', port) self.assertEqual(port.get('name'), port_name) - updated_port = self.operator_cloud.get_port_by_id(port['id']) + updated_port = self.user_cloud.get_port_by_id(port['id']) # extra_dhcp_opts is added later by Neutron... if 'extra_dhcp_opts' in updated_port and 'extra_dhcp_opts' not in port: del updated_port['extra_dhcp_opts'] @@ -112,19 +104,15 @@ class TestPort(base.BaseFunctionalTest): port_name = self.new_port_name + '_update' new_port_name = port_name + '_new' - networks = self.operator_cloud.list_networks() - if not networks: - self.assertFalse('no sensible network available') + self.user_cloud.create_port( + network_id=self.net.id, name=port_name) - self.operator_cloud.create_port( - network_id=networks[0]['id'], name=port_name) - - port = self.operator_cloud.update_port( + port = self.user_cloud.update_port( name_or_id=port_name, name=new_port_name) self.assertIsInstance(port, dict) self.assertEqual(port.get('name'), new_port_name) - updated_port = self.operator_cloud.get_port(name_or_id=port['id']) + updated_port = self.user_cloud.get_port(name_or_id=port['id']) self.assertEqual(port.get('name'), new_port_name) port.pop('revision_number', None) port.pop(u'revision_number', None) @@ -140,20 +128,16 @@ class TestPort(base.BaseFunctionalTest): def test_delete_port(self): port_name = self.new_port_name + '_delete' - networks = self.operator_cloud.list_networks() - if not networks: - self.assertFalse('no sensible network available') - - port = self.operator_cloud.create_port( - network_id=networks[0]['id'], name=port_name) + port = self.user_cloud.create_port( + network_id=self.net.id, name=port_name) self.assertIsInstance(port, dict) self.assertIn('id', port) self.assertEqual(port.get('name'), port_name) - updated_port = self.operator_cloud.get_port(name_or_id=port['id']) + updated_port = self.user_cloud.get_port(name_or_id=port['id']) self.assertIsNotNone(updated_port) - self.operator_cloud.delete_port(name_or_id=port_name) + self.user_cloud.delete_port(name_or_id=port_name) - updated_port = self.operator_cloud.get_port(name_or_id=port['id']) + updated_port = self.user_cloud.get_port(name_or_id=port['id']) self.assertIsNone(updated_port) diff --git a/openstack/tests/functional/cloud/test_project.py b/openstack/tests/functional/cloud/test_project.py index b51e1d4d7..26390f7db 100644 --- a/openstack/tests/functional/cloud/test_project.py +++ b/openstack/tests/functional/cloud/test_project.py @@ -28,6 +28,9 @@ class TestProject(base.KeystoneBaseFunctionalTest): def setUp(self): super(TestProject, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + self.new_project_name = self.getUniqueString('project') self.addCleanup(self._cleanup_projects) diff --git a/openstack/tests/functional/cloud/test_project_cleanup.py b/openstack/tests/functional/cloud/test_project_cleanup.py index f6bed34c0..9016358cc 100644 --- a/openstack/tests/functional/cloud/test_project_cleanup.py +++ b/openstack/tests/functional/cloud/test_project_cleanup.py @@ -27,6 +27,9 @@ class TestProjectCleanup(base.BaseFunctionalTest): def setUp(self): super(TestProjectCleanup, self).setUp() + if not self.user_cloud_alt: + self.skipTest("Alternate demo cloud is required for this test") + self.conn = self.user_cloud_alt self.network_name = self.getUniqueString('network') diff --git a/openstack/tests/functional/cloud/test_qos_bandwidth_limit_rule.py b/openstack/tests/functional/cloud/test_qos_bandwidth_limit_rule.py index 8b731ec5e..e91098df2 100644 --- a/openstack/tests/functional/cloud/test_qos_bandwidth_limit_rule.py +++ b/openstack/tests/functional/cloud/test_qos_bandwidth_limit_rule.py @@ -25,6 +25,8 @@ from openstack.tests.functional import base class TestQosBandwidthLimitRule(base.BaseFunctionalTest): def setUp(self): super(TestQosBandwidthLimitRule, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") if not self.operator_cloud.has_service('network'): self.skipTest('Network service not supported by cloud') if not self.operator_cloud._has_neutron_extension('qos'): diff --git a/openstack/tests/functional/cloud/test_qos_dscp_marking_rule.py b/openstack/tests/functional/cloud/test_qos_dscp_marking_rule.py index a08f128a6..aff8813b3 100644 --- a/openstack/tests/functional/cloud/test_qos_dscp_marking_rule.py +++ b/openstack/tests/functional/cloud/test_qos_dscp_marking_rule.py @@ -25,6 +25,8 @@ from openstack.tests.functional import base class TestQosDscpMarkingRule(base.BaseFunctionalTest): def setUp(self): super(TestQosDscpMarkingRule, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") if not self.operator_cloud.has_service('network'): self.skipTest('Network service not supported by cloud') if not self.operator_cloud._has_neutron_extension('qos'): diff --git a/openstack/tests/functional/cloud/test_qos_minimum_bandwidth_rule.py b/openstack/tests/functional/cloud/test_qos_minimum_bandwidth_rule.py index 36e3d7588..8cc16a893 100644 --- a/openstack/tests/functional/cloud/test_qos_minimum_bandwidth_rule.py +++ b/openstack/tests/functional/cloud/test_qos_minimum_bandwidth_rule.py @@ -25,6 +25,8 @@ from openstack.tests.functional import base class TestQosMinimumBandwidthRule(base.BaseFunctionalTest): def setUp(self): super(TestQosMinimumBandwidthRule, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") if not self.operator_cloud.has_service('network'): self.skipTest('Network service not supported by cloud') if not self.operator_cloud._has_neutron_extension('qos'): diff --git a/openstack/tests/functional/cloud/test_qos_policy.py b/openstack/tests/functional/cloud/test_qos_policy.py index f78834383..6bd3150ce 100644 --- a/openstack/tests/functional/cloud/test_qos_policy.py +++ b/openstack/tests/functional/cloud/test_qos_policy.py @@ -25,6 +25,8 @@ from openstack.tests.functional import base class TestQosPolicy(base.BaseFunctionalTest): def setUp(self): super(TestQosPolicy, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") if not self.operator_cloud.has_service('network'): self.skipTest('Network service not supported by cloud') if not self.operator_cloud._has_neutron_extension('qos'): diff --git a/openstack/tests/functional/cloud/test_quotas.py b/openstack/tests/functional/cloud/test_quotas.py index a1ae19745..2d06167a4 100644 --- a/openstack/tests/functional/cloud/test_quotas.py +++ b/openstack/tests/functional/cloud/test_quotas.py @@ -22,8 +22,16 @@ from openstack.tests.functional import base class TestComputeQuotas(base.BaseFunctionalTest): - def test_quotas(self): + def test_get_quotas(self): '''Test quotas functionality''' + self.user_cloud.get_compute_quotas( + self.user_cloud.current_project_id) + + def test_set_quotas(self): + '''Test quotas functionality''' + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + quotas = self.operator_cloud.get_compute_quotas('demo') cores = quotas['cores'] self.operator_cloud.set_compute_quotas('demo', cores=cores + 1) @@ -39,11 +47,20 @@ class TestVolumeQuotas(base.BaseFunctionalTest): def setUp(self): super(TestVolumeQuotas, self).setUp() - if not self.operator_cloud.has_service('volume'): + if not self.user_cloud.has_service('volume'): self.skipTest('volume service not supported by cloud') - def test_quotas(self): - '''Test quotas functionality''' + def test_get_quotas(self): + '''Test get quotas functionality''' + self.user_cloud.get_volume_quotas( + self.user_cloud.current_project_id + ) + + def test_set_quotas(self): + '''Test set quotas functionality''' + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + quotas = self.operator_cloud.get_volume_quotas('demo') volumes = quotas['volumes'] self.operator_cloud.set_volume_quotas('demo', volumes=volumes + 1) @@ -58,13 +75,18 @@ class TestVolumeQuotas(base.BaseFunctionalTest): class TestNetworkQuotas(base.BaseFunctionalTest): - def setUp(self): - super(TestNetworkQuotas, self).setUp() - if not self.operator_cloud.has_service('network'): - self.skipTest('network service not supported by cloud') + def test_get_quotas(self): + '''Test get quotas functionality''' + self.user_cloud.get_network_quotas( + self.user_cloud.current_project_id) def test_quotas(self): '''Test quotas functionality''' + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + if not self.operator_cloud.has_service('network'): + self.skipTest('network service not supported by cloud') + quotas = self.operator_cloud.get_network_quotas('demo') network = quotas['networks'] self.operator_cloud.set_network_quotas('demo', networks=network + 1) @@ -77,6 +99,11 @@ class TestNetworkQuotas(base.BaseFunctionalTest): self.operator_cloud.get_network_quotas('demo')['networks']) def test_get_quotas_details(self): + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + if not self.operator_cloud.has_service('network'): + self.skipTest('network service not supported by cloud') + quotas = [ 'floating_ips', 'networks', 'ports', 'rbac_policies', 'routers', 'subnets', diff --git a/openstack/tests/functional/cloud/test_router.py b/openstack/tests/functional/cloud/test_router.py index 8c18392f8..ef1dbc2d9 100644 --- a/openstack/tests/functional/cloud/test_router.py +++ b/openstack/tests/functional/cloud/test_router.py @@ -34,6 +34,8 @@ EXPECTED_GW_INFO_FIELDS = ('network_id', 'enable_snat', 'external_fixed_ips') class TestRouter(base.BaseFunctionalTest): def setUp(self): super(TestRouter, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud required for this test") if not self.operator_cloud.has_service('network'): self.skipTest('Network service not supported by cloud') diff --git a/openstack/tests/functional/cloud/test_security_groups.py b/openstack/tests/functional/cloud/test_security_groups.py index 1a249da77..54ceda6fc 100644 --- a/openstack/tests/functional/cloud/test_security_groups.py +++ b/openstack/tests/functional/cloud/test_security_groups.py @@ -22,6 +22,22 @@ from openstack.tests.functional import base class TestSecurityGroups(base.BaseFunctionalTest): def test_create_list_security_groups(self): + sg1 = self.user_cloud.create_security_group( + name="sg1", description="sg1") + self.addCleanup(self.user_cloud.delete_security_group, sg1['id']) + if self.user_cloud.has_service('network'): + # Neutron defaults to all_tenants=1 when admin + sg_list = self.user_cloud.list_security_groups() + self.assertIn(sg1['id'], [sg['id'] for sg in sg_list]) + + else: + # Nova does not list all tenants by default + sg_list = self.operator_cloud.list_security_groups() + + def test_create_list_security_groups_operator(self): + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + sg1 = self.user_cloud.create_security_group( name="sg1", description="sg1") self.addCleanup(self.user_cloud.delete_security_group, sg1['id']) diff --git a/openstack/tests/functional/cloud/test_services.py b/openstack/tests/functional/cloud/test_services.py index 8e1036e33..70c85d58d 100644 --- a/openstack/tests/functional/cloud/test_services.py +++ b/openstack/tests/functional/cloud/test_services.py @@ -33,6 +33,8 @@ class TestServices(base.KeystoneBaseFunctionalTest): def setUp(self): super(TestServices, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") # Generate a random name for services in this test self.new_service_name = 'test_' + ''.join( diff --git a/openstack/tests/functional/cloud/test_users.py b/openstack/tests/functional/cloud/test_users.py index f69b4c265..3dc94a220 100644 --- a/openstack/tests/functional/cloud/test_users.py +++ b/openstack/tests/functional/cloud/test_users.py @@ -24,6 +24,9 @@ from openstack.tests.functional import base class TestUsers(base.KeystoneBaseFunctionalTest): def setUp(self): super(TestUsers, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + self.user_prefix = self.getUniqueString('user') self.addCleanup(self._cleanup_users) diff --git a/openstack/tests/functional/cloud/test_volume_type.py b/openstack/tests/functional/cloud/test_volume_type.py index 1e7821e4d..d71ee014c 100644 --- a/openstack/tests/functional/cloud/test_volume_type.py +++ b/openstack/tests/functional/cloud/test_volume_type.py @@ -33,6 +33,8 @@ class TestVolumeType(base.BaseFunctionalTest): def setUp(self): super(TestVolumeType, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") if not self.user_cloud.has_service('volume'): self.skipTest('volume service not supported by cloud') volume_type = { diff --git a/openstack/tests/unit/cloud/test_cluster_templates.py b/openstack/tests/unit/cloud/test_cluster_templates.py index 13f02e612..15e41d836 100644 --- a/openstack/tests/unit/cloud/test_cluster_templates.py +++ b/openstack/tests/unit/cloud/test_cluster_templates.py @@ -13,7 +13,7 @@ import munch import testtools -import openstack.cloud +from openstack import exceptions from openstack.tests.unit import base @@ -198,7 +198,7 @@ class TestClusterTemplates(base.TestCase): # match the more specific HTTPError, even though it's a subclass # of OpenStackCloudException. with testtools.ExpectedException( - openstack.cloud.OpenStackCloudHTTPError): + exceptions.ForbiddenException): self.cloud.create_cluster_template('fake-cluster-template') self.assert_calls() diff --git a/openstack/tests/unit/cloud/test_identity_roles.py b/openstack/tests/unit/cloud/test_identity_roles.py index 517c54051..045552325 100644 --- a/openstack/tests/unit/cloud/test_identity_roles.py +++ b/openstack/tests/unit/cloud/test_identity_roles.py @@ -14,7 +14,7 @@ import testtools from testtools import matchers -import openstack.cloud +from openstack import exceptions from openstack.tests.unit import base @@ -248,7 +248,7 @@ class TestIdentityRoles(base.TestCase): status_code=403) ]) with testtools.ExpectedException( - openstack.cloud.exc.OpenStackCloudHTTPError + exceptions.ForbiddenException ): self.cloud.list_role_assignments() self.assert_calls() diff --git a/openstack/tests/unit/cloud/test_quotas.py b/openstack/tests/unit/cloud/test_quotas.py index 8eec55af0..00da3985d 100644 --- a/openstack/tests/unit/cloud/test_quotas.py +++ b/openstack/tests/unit/cloud/test_quotas.py @@ -212,7 +212,7 @@ class TestQuotas(base.TestCase): 'port': 500 } project = self.mock_for_keystone_projects(project_count=1, - list_get=True)[0] + id_get=True)[0] self.register_uris([ dict(method='GET', uri=self.get_mock_url( @@ -272,7 +272,7 @@ class TestQuotas(base.TestCase): 'reserved': 0} } project = self.mock_for_keystone_projects(project_count=1, - list_get=True)[0] + id_get=True)[0] self.register_uris([ dict(method='GET', uri=self.get_mock_url(