diff --git a/include-acceptance-regular-user.txt b/include-acceptance-regular-user.txt index 5c5a76019..62773e767 100644 --- a/include-acceptance-regular-user.txt +++ b/include-acceptance-regular-user.txt @@ -1,8 +1,12 @@ # This file contains list of tests that can work with regular user privileges # Until all tests are modified to properly identify whether they are able to # run or must skip the ones that are known to work are listed here. -openstack.tests.functional.network +### Block Storage openstack.tests.functional.block_storage.v3.test_volume # Do not enable test_backup for now, since it is not capable to determine # backup capabilities of the cloud # openstack.tests.functional.block_storage.v3.test_backup +### Cloud +openstack.tests.functional.cloud +### Network +openstack.tests.functional.network diff --git a/openstack/cloud/_network.py b/openstack/cloud/_network.py index 7358490a7..22e4b7dfe 100644 --- a/openstack/cloud/_network.py +++ b/openstack/cloud/_network.py @@ -1,3 +1,4 @@ + # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at @@ -663,9 +664,8 @@ class NetworkCloudMixin: :raises: OpenStackCloudException if it's not a valid project :returns: A network ``Quota`` object if found, else None. """ - proj = self.get_project(name_or_id) - if not proj: - raise exc.OpenStackCloudException("project does not exist") + proj = self.identity.find_project( + name_or_id, ignore_missing=False) return self.network.get_quota(proj.id, details) def get_network_extensions(self): diff --git a/openstack/exceptions.py b/openstack/exceptions.py index c2370fef5..ec13dde09 100644 --- a/openstack/exceptions.py +++ b/openstack/exceptions.py @@ -112,6 +112,11 @@ class BadRequestException(HttpException): pass +class ForbiddenException(HttpException): + """HTTP 403 Forbidden Request.""" + pass + + class ConflictException(HttpException): """HTTP 409 Conflict.""" pass @@ -187,12 +192,14 @@ def raise_from_response(response, error_message=None): if response.status_code < 400: return - if response.status_code == 409: - cls = ConflictException + if response.status_code == 400: + cls = BadRequestException + elif response.status_code == 403: + cls = ForbiddenException elif response.status_code == 404: cls = NotFoundException - elif response.status_code == 400: - cls = BadRequestException + elif response.status_code == 409: + cls = ConflictException elif response.status_code == 412: cls = PreconditionFailedException else: diff --git a/openstack/resource.py b/openstack/resource.py index 6044de88e..405507339 100644 --- a/openstack/resource.py +++ b/openstack/resource.py @@ -2226,7 +2226,8 @@ class Resource(dict): id=name_or_id, connection=session._get_connection(), **params ) return match.fetch(session, microversion=microversion, **params) - except (exceptions.NotFoundException, exceptions.BadRequestException): + except (exceptions.NotFoundException, exceptions.BadRequestException, + exceptions.ForbiddenException): # NOTE(gtema): There are few places around openstack that return # 400 if we try to GET resource and it doesn't exist. pass diff --git a/openstack/tests/functional/base.py b/openstack/tests/functional/base.py index ff5d6b10c..381aece28 100644 --- a/openstack/tests/functional/base.py +++ b/openstack/tests/functional/base.py @@ -88,6 +88,8 @@ class BaseFunctionalTest(base.TestCase): cloud=self._demo_name_alt, **kwargs) self.user_cloud_alt = connection.Connection(config=user_config_alt) _disable_keep_alive(self.user_cloud_alt) + else: + self.user_cloud_alt = None def _set_operator_cloud(self, **kwargs): operator_config = self.config.get_one(cloud=self._op_name, **kwargs) diff --git a/openstack/tests/functional/cloud/test_aggregate.py b/openstack/tests/functional/cloud/test_aggregate.py index d7bcc0fd4..e96b64f3d 100644 --- a/openstack/tests/functional/cloud/test_aggregate.py +++ b/openstack/tests/functional/cloud/test_aggregate.py @@ -23,6 +23,8 @@ from openstack.tests.functional import base class TestAggregate(base.BaseFunctionalTest): def test_aggregates(self): + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") aggregate_name = self.getUniqueString() availability_zone = self.getUniqueString() self.addCleanup(self.cleanup, aggregate_name) diff --git a/openstack/tests/functional/cloud/test_compute.py b/openstack/tests/functional/cloud/test_compute.py index f25421abd..d0fefb20f 100644 --- a/openstack/tests/functional/cloud/test_compute.py +++ b/openstack/tests/functional/cloud/test_compute.py @@ -150,6 +150,8 @@ class TestCompute(base.BaseFunctionalTest): self.assertTrue(srv is None or srv.status.lower() == 'deleted') def test_list_all_servers(self): + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") self.addCleanup(self._cleanup_servers_and_volumes, self.server_name) server = self.user_cloud.create_server( name=self.server_name, @@ -474,6 +476,9 @@ class TestCompute(base.BaseFunctionalTest): def test_get_compute_usage(self): '''Test usage functionality''' # Add a server so that we can know we have usage + if not self.operator_cloud: + # TODO(gtema) rework method not to require getting project + self.skipTest("Operator cloud is required for this test") self.addCleanup(self._cleanup_servers_and_volumes, self.server_name) self.user_cloud.create_server( name=self.server_name, diff --git a/openstack/tests/functional/cloud/test_devstack.py b/openstack/tests/functional/cloud/test_devstack.py index 1c72c6aa4..52cced22a 100644 --- a/openstack/tests/functional/cloud/test_devstack.py +++ b/openstack/tests/functional/cloud/test_devstack.py @@ -43,13 +43,3 @@ class TestDevstack(base.BaseFunctionalTest): if os.environ.get( 'OPENSTACKSDK_HAS_{env}'.format(env=self.env), '0') == '1': self.assertTrue(self.user_cloud.has_service(self.service)) - - -class TestKeystoneVersion(base.BaseFunctionalTest): - - def test_keystone_version(self): - use_keystone_v2 = os.environ.get('OPENSTACKSDK_USE_KEYSTONE_V2', False) - if use_keystone_v2 and use_keystone_v2 != '0': - self.assertEqual('2.0', self.identity_version) - else: - self.assertEqual('3', self.identity_version) diff --git a/openstack/tests/functional/cloud/test_domain.py b/openstack/tests/functional/cloud/test_domain.py index 18a070f91..447864356 100644 --- a/openstack/tests/functional/cloud/test_domain.py +++ b/openstack/tests/functional/cloud/test_domain.py @@ -25,6 +25,8 @@ class TestDomain(base.BaseFunctionalTest): def setUp(self): super(TestDomain, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") i_ver = self.operator_cloud.config.get_api_version('identity') if i_ver in ('2', '2.0'): self.skipTest('Identity service does not support domains') diff --git a/openstack/tests/functional/cloud/test_endpoints.py b/openstack/tests/functional/cloud/test_endpoints.py index 39a539709..2381f9775 100644 --- a/openstack/tests/functional/cloud/test_endpoints.py +++ b/openstack/tests/functional/cloud/test_endpoints.py @@ -34,6 +34,8 @@ class TestEndpoints(base.KeystoneBaseFunctionalTest): def setUp(self): super(TestEndpoints, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") # Generate a random name for services and regions in this test self.new_item_name = 'test_' + ''.join( diff --git a/openstack/tests/functional/cloud/test_flavor.py b/openstack/tests/functional/cloud/test_flavor.py index 12e24057d..f1a76b7ff 100644 --- a/openstack/tests/functional/cloud/test_flavor.py +++ b/openstack/tests/functional/cloud/test_flavor.py @@ -35,13 +35,15 @@ class TestFlavor(base.BaseFunctionalTest): def _cleanup_flavors(self): exception_list = list() - for f in self.operator_cloud.list_flavors(get_extra=False): - if f['name'].startswith(self.new_item_name): - try: - self.operator_cloud.delete_flavor(f['id']) - except Exception as e: - # We were unable to delete a flavor, let's try with next - exception_list.append(str(e)) + if self.operator_cloud: + for f in self.operator_cloud.list_flavors(get_extra=False): + if f['name'].startswith(self.new_item_name): + try: + self.operator_cloud.delete_flavor(f['id']) + except Exception as e: + # We were unable to delete a flavor, let's try with + # next + exception_list.append(str(e)) continue if exception_list: # Raise an error: we must make users aware that something went @@ -49,6 +51,9 @@ class TestFlavor(base.BaseFunctionalTest): raise OpenStackCloudException('\n'.join(exception_list)) def test_create_flavor(self): + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + flavor_name = self.new_item_name + '_create' flavor_kwargs = dict( name=flavor_name, ram=1024, vcpus=2, disk=10, ephemeral=5, @@ -85,24 +90,31 @@ class TestFlavor(base.BaseFunctionalTest): name=priv_flavor_name, ram=1024, vcpus=2, disk=10, is_public=False ) - # Create a public and private flavor. We expect both to be listed - # for an operator. - self.operator_cloud.create_flavor(**public_kwargs) - self.operator_cloud.create_flavor(**private_kwargs) + if self.operator_cloud: + # Create a public and private flavor. We expect both to be listed + # for an operator. + self.operator_cloud.create_flavor(**public_kwargs) + self.operator_cloud.create_flavor(**private_kwargs) - flavors = self.operator_cloud.list_flavors(get_extra=False) + flavors = self.operator_cloud.list_flavors(get_extra=False) - # Flavor list will include the standard devstack flavors. We just want - # to make sure both of the flavors we just created are present. - found = [] - for f in flavors: - # extra_specs should be added within list_flavors() - self.assertIn('extra_specs', f) - if f['name'] in (pub_flavor_name, priv_flavor_name): - found.append(f) - self.assertEqual(2, len(found)) + # Flavor list will include the standard devstack flavors. We just + # want to make sure both of the flavors we just created are + # present. + found = [] + for f in flavors: + # extra_specs should be added within list_flavors() + self.assertIn('extra_specs', f) + if f['name'] in (pub_flavor_name, priv_flavor_name): + found.append(f) + self.assertEqual(2, len(found)) + else: + self.user_cloud.list_flavors() def test_flavor_access(self): + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + priv_flavor_name = self.new_item_name + '_private' private_kwargs = dict( name=priv_flavor_name, ram=1024, vcpus=2, disk=10, is_public=False @@ -141,6 +153,9 @@ class TestFlavor(base.BaseFunctionalTest): """ Test setting and unsetting flavor extra specs """ + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + flavor_name = self.new_item_name + '_spec_test' kwargs = dict( name=flavor_name, ram=1024, vcpus=2, disk=10 diff --git a/openstack/tests/functional/cloud/test_floating_ip.py b/openstack/tests/functional/cloud/test_floating_ip.py index 8987decd0..23610c4a3 100644 --- a/openstack/tests/functional/cloud/test_floating_ip.py +++ b/openstack/tests/functional/cloud/test_floating_ip.py @@ -62,7 +62,7 @@ class TestFloatingIP(base.BaseFunctionalTest): r, subnet_id=s['id']) except Exception: pass - self.user_cloud.delete_router(r) + self.user_cloud.delete_router(r.id) except Exception as e: exception_list.append(e) tb_list.append(sys.exc_info()[2]) @@ -71,7 +71,7 @@ class TestFloatingIP(base.BaseFunctionalTest): for s in self.user_cloud.list_subnets(): if s['name'].startswith(self.new_item_name): try: - self.user_cloud.delete_subnet(s) + self.user_cloud.delete_subnet(s.id) except Exception as e: exception_list.append(e) tb_list.append(sys.exc_info()[2]) @@ -80,7 +80,7 @@ class TestFloatingIP(base.BaseFunctionalTest): for n in self.user_cloud.list_networks(): if n['name'].startswith(self.new_item_name): try: - self.user_cloud.delete_network(n) + self.user_cloud.delete_network(n.id) except Exception as e: exception_list.append(e) tb_list.append(sys.exc_info()[2]) @@ -104,7 +104,7 @@ class TestFloatingIP(base.BaseFunctionalTest): for i in self.user_cloud.list_servers(bare=True): if i.name.startswith(self.new_item_name): try: - self.user_cloud.delete_server(i, wait=True) + self.user_cloud.delete_server(i.id, wait=True) except Exception as e: exception_list.append(str(e)) continue @@ -124,7 +124,7 @@ class TestFloatingIP(base.BaseFunctionalTest): if (ip.get('fixed_ip', None) == fixed_ip or ip.get('fixed_ip_address', None) == fixed_ip): try: - self.user_cloud.delete_floating_ip(ip) + self.user_cloud.delete_floating_ip(ip.id) except Exception as e: exception_list.append(str(e)) continue @@ -235,38 +235,50 @@ class TestFloatingIP(base.BaseFunctionalTest): server_id=new_server.id, floating_ip_id=f_ip['id']) def test_list_floating_ips(self): - fip_admin = self.operator_cloud.create_floating_ip() - self.addCleanup(self.operator_cloud.delete_floating_ip, fip_admin.id) + if self.operator_cloud: + fip_admin = self.operator_cloud.create_floating_ip() + self.addCleanup( + self.operator_cloud.delete_floating_ip, fip_admin.id) fip_user = self.user_cloud.create_floating_ip() self.addCleanup(self.user_cloud.delete_floating_ip, fip_user.id) # Get all the floating ips. - fip_id_list = [ - fip.id for fip in self.operator_cloud.list_floating_ips() + if self.operator_cloud: + fip_op_id_list = [ + fip.id for fip in self.operator_cloud.list_floating_ips() + ] + fip_user_id_list = [ + fip.id for fip in self.user_cloud.list_floating_ips() ] + if self.user_cloud.has_service('network'): + self.assertIn(fip_user.id, fip_user_id_list) # Neutron returns all FIP for all projects by default - self.assertIn(fip_admin.id, fip_id_list) - self.assertIn(fip_user.id, fip_id_list) + if self.operator_cloud and fip_admin: + self.assertIn(fip_user.id, fip_op_id_list) # Ask Neutron for only a subset of all the FIPs. - filtered_fip_id_list = [ - fip.id for fip in self.operator_cloud.list_floating_ips( - {'tenant_id': self.user_cloud.current_project_id} - ) - ] - self.assertNotIn(fip_admin.id, filtered_fip_id_list) - self.assertIn(fip_user.id, filtered_fip_id_list) + if self.operator_cloud: + filtered_fip_id_list = [ + fip.id for fip in self.operator_cloud.list_floating_ips( + {'tenant_id': self.user_cloud.current_project_id} + ) + ] + self.assertNotIn(fip_admin.id, filtered_fip_id_list) + self.assertIn(fip_user.id, filtered_fip_id_list) else: - self.assertIn(fip_admin.id, fip_id_list) + if fip_admin: + self.assertIn(fip_admin.id, fip_op_id_list) # By default, Nova returns only the FIPs that belong to the # project which made the listing request. - self.assertNotIn(fip_user.id, fip_id_list) - self.assertRaisesRegex( - ValueError, "Nova-network don't support server-side.*", - self.operator_cloud.list_floating_ips, filters={'foo': 'bar'} - ) + if self.operator_cloud: + self.assertNotIn(fip_user.id, fip_op_id_list) + self.assertRaisesRegex( + ValueError, "Nova-network don't support server-side.*", + self.operator_cloud.list_floating_ips, + filters={'foo': 'bar'} + ) def test_search_floating_ips(self): fip_user = self.user_cloud.create_floating_ip() diff --git a/openstack/tests/functional/cloud/test_groups.py b/openstack/tests/functional/cloud/test_groups.py index e9ae28a84..3ef2a5626 100644 --- a/openstack/tests/functional/cloud/test_groups.py +++ b/openstack/tests/functional/cloud/test_groups.py @@ -25,6 +25,9 @@ class TestGroup(base.BaseFunctionalTest): def setUp(self): super(TestGroup, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + i_ver = self.operator_cloud.config.get_api_version('identity') if i_ver in ('2', '2.0'): self.skipTest('Identity service does not support groups') diff --git a/openstack/tests/functional/cloud/test_identity.py b/openstack/tests/functional/cloud/test_identity.py index 5c087ba93..118cf33f9 100644 --- a/openstack/tests/functional/cloud/test_identity.py +++ b/openstack/tests/functional/cloud/test_identity.py @@ -27,6 +27,8 @@ from openstack.tests.functional import base class TestIdentity(base.KeystoneBaseFunctionalTest): def setUp(self): super(TestIdentity, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") self.role_prefix = 'test_role' + ''.join( random.choice(string.ascii_lowercase) for _ in range(5)) self.user_prefix = self.getUniqueString('user') diff --git a/openstack/tests/functional/cloud/test_inventory.py b/openstack/tests/functional/cloud/test_inventory.py index 1401e6c2d..79de47a83 100644 --- a/openstack/tests/functional/cloud/test_inventory.py +++ b/openstack/tests/functional/cloud/test_inventory.py @@ -26,6 +26,9 @@ from openstack.tests.functional import base class TestInventory(base.BaseFunctionalTest): def setUp(self): super().setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + # This needs to use an admin account, otherwise a public IP # is not allocated from devstack. self.inventory = inventory.OpenStackInventory(cloud='devstack-admin') diff --git a/openstack/tests/functional/cloud/test_limits.py b/openstack/tests/functional/cloud/test_limits.py index a4ec061c9..160adaca6 100644 --- a/openstack/tests/functional/cloud/test_limits.py +++ b/openstack/tests/functional/cloud/test_limits.py @@ -33,6 +33,9 @@ class TestUsage(base.BaseFunctionalTest): def test_get_other_compute_limits(self): '''Test quotas functionality''' + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + limits = self.operator_cloud.get_compute_limits('demo') self.assertIsNotNone(limits) self.assertTrue(hasattr(limits, 'server_meta')) @@ -48,5 +51,8 @@ class TestUsage(base.BaseFunctionalTest): def test_get_other_volume_limits(self): '''Test quotas functionality''' + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + limits = self.operator_cloud.get_volume_limits('demo') self.assertFalse(hasattr(limits, 'maxTotalVolumes')) diff --git a/openstack/tests/functional/cloud/test_magnum_services.py b/openstack/tests/functional/cloud/test_magnum_services.py index 991b8aac2..80ef5ccf7 100644 --- a/openstack/tests/functional/cloud/test_magnum_services.py +++ b/openstack/tests/functional/cloud/test_magnum_services.py @@ -24,7 +24,7 @@ class TestMagnumServices(base.BaseFunctionalTest): def setUp(self): super(TestMagnumServices, self).setUp() - if not self.operator_cloud.has_service( + if not self.user_cloud.has_service( 'container-infrastructure-management' ): self.skipTest('Container service not supported by cloud') @@ -33,7 +33,7 @@ class TestMagnumServices(base.BaseFunctionalTest): '''Test magnum services functionality''' # Test that we can list services - services = self.operator_cloud.list_magnum_services() + services = self.user_cloud.list_magnum_services() self.assertEqual(1, len(services)) self.assertEqual(services[0]['id'], 1) diff --git a/openstack/tests/functional/cloud/test_network.py b/openstack/tests/functional/cloud/test_network.py index d9d9cada7..d4cfe4267 100644 --- a/openstack/tests/functional/cloud/test_network.py +++ b/openstack/tests/functional/cloud/test_network.py @@ -24,6 +24,9 @@ from openstack.tests.functional import base class TestNetwork(base.BaseFunctionalTest): def setUp(self): super(TestNetwork, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + if not self.operator_cloud.has_service('network'): self.skipTest('Network service not supported by cloud') self.network_name = self.getUniqueString('network') diff --git a/openstack/tests/functional/cloud/test_object.py b/openstack/tests/functional/cloud/test_object.py index 3ee3e3753..cc707f53d 100644 --- a/openstack/tests/functional/cloud/test_object.py +++ b/openstack/tests/functional/cloud/test_object.py @@ -40,10 +40,12 @@ class TestObject(base.BaseFunctionalTest): self.addDetail('container', content.text_content(container_name)) self.addCleanup(self.user_cloud.delete_container, container_name) self.user_cloud.create_container(container_name) - self.assertEqual(container_name, - self.user_cloud.list_containers()[0]['name']) - self.assertEqual([], - self.user_cloud.list_containers(prefix='somethin')) + container = self.user_cloud.get_container(container_name) + self.assertEqual( + container_name, container.name) + self.assertEqual( + [], + self.user_cloud.list_containers(prefix='somethin')) sizes = ( (64 * 1024, 1), # 64K, one segment (64 * 1024, 5) # 64MB, 5 segments @@ -99,8 +101,9 @@ class TestObject(base.BaseFunctionalTest): self.assertTrue( self.user_cloud.delete_object(container_name, name)) self.assertEqual([], self.user_cloud.list_objects(container_name)) - self.assertEqual(container_name, - self.user_cloud.list_containers()[0]['name']) + self.assertEqual( + container_name, + self.user_cloud.get_container(container_name).name) self.user_cloud.delete_container(container_name) def test_download_object_to_file(self): diff --git a/openstack/tests/functional/cloud/test_port.py b/openstack/tests/functional/cloud/test_port.py index b865b5205..a798a20f9 100644 --- a/openstack/tests/functional/cloud/test_port.py +++ b/openstack/tests/functional/cloud/test_port.py @@ -31,9 +31,13 @@ class TestPort(base.BaseFunctionalTest): def setUp(self): super(TestPort, self).setUp() # Skip Neutron tests if neutron is not present - if not self.operator_cloud.has_service('network'): + if not self.user_cloud.has_service('network'): self.skipTest('Network service not supported by cloud') + net_name = self.getUniqueString('CloudPortName') + self.net = self.user_cloud.network.create_network(name=net_name) + self.addCleanup(self.user_cloud.network.delete_network, self.net.id) + # Generate a unique port name to allow concurrent tests self.new_port_name = 'test_' + ''.join( random.choice(string.ascii_lowercase) for _ in range(5)) @@ -43,10 +47,10 @@ class TestPort(base.BaseFunctionalTest): def _cleanup_ports(self): exception_list = list() - for p in self.operator_cloud.list_ports(): + for p in self.user_cloud.list_ports(): if p['name'].startswith(self.new_port_name): try: - self.operator_cloud.delete_port(name_or_id=p['id']) + self.user_cloud.delete_port(name_or_id=p['id']) except Exception as e: # We were unable to delete this port, let's try with next exception_list.append(str(e)) @@ -60,12 +64,8 @@ class TestPort(base.BaseFunctionalTest): def test_create_port(self): port_name = self.new_port_name + '_create' - networks = self.operator_cloud.list_networks() - if not networks: - self.assertFalse('no sensible network available') - - port = self.operator_cloud.create_port( - network_id=networks[0]['id'], name=port_name) + port = self.user_cloud.create_port( + network_id=self.net.id, name=port_name) self.assertIsInstance(port, dict) self.assertIn('id', port) self.assertEqual(port.get('name'), port_name) @@ -73,17 +73,13 @@ class TestPort(base.BaseFunctionalTest): def test_get_port(self): port_name = self.new_port_name + '_get' - networks = self.operator_cloud.list_networks() - if not networks: - self.assertFalse('no sensible network available') - - port = self.operator_cloud.create_port( - network_id=networks[0]['id'], name=port_name) + port = self.user_cloud.create_port( + network_id=self.net.id, name=port_name) self.assertIsInstance(port, dict) self.assertIn('id', port) self.assertEqual(port.get('name'), port_name) - updated_port = self.operator_cloud.get_port(name_or_id=port['id']) + updated_port = self.user_cloud.get_port(name_or_id=port['id']) # extra_dhcp_opts is added later by Neutron... if 'extra_dhcp_opts' in updated_port and 'extra_dhcp_opts' not in port: del updated_port['extra_dhcp_opts'] @@ -92,17 +88,13 @@ class TestPort(base.BaseFunctionalTest): def test_get_port_by_id(self): port_name = self.new_port_name + '_get_by_id' - networks = self.operator_cloud.list_networks() - if not networks: - self.assertFalse('no sensible network available') - - port = self.operator_cloud.create_port( - network_id=networks[0]['id'], name=port_name) + port = self.user_cloud.create_port( + network_id=self.net.id, name=port_name) self.assertIsInstance(port, dict) self.assertIn('id', port) self.assertEqual(port.get('name'), port_name) - updated_port = self.operator_cloud.get_port_by_id(port['id']) + updated_port = self.user_cloud.get_port_by_id(port['id']) # extra_dhcp_opts is added later by Neutron... if 'extra_dhcp_opts' in updated_port and 'extra_dhcp_opts' not in port: del updated_port['extra_dhcp_opts'] @@ -112,19 +104,15 @@ class TestPort(base.BaseFunctionalTest): port_name = self.new_port_name + '_update' new_port_name = port_name + '_new' - networks = self.operator_cloud.list_networks() - if not networks: - self.assertFalse('no sensible network available') + self.user_cloud.create_port( + network_id=self.net.id, name=port_name) - self.operator_cloud.create_port( - network_id=networks[0]['id'], name=port_name) - - port = self.operator_cloud.update_port( + port = self.user_cloud.update_port( name_or_id=port_name, name=new_port_name) self.assertIsInstance(port, dict) self.assertEqual(port.get('name'), new_port_name) - updated_port = self.operator_cloud.get_port(name_or_id=port['id']) + updated_port = self.user_cloud.get_port(name_or_id=port['id']) self.assertEqual(port.get('name'), new_port_name) port.pop('revision_number', None) port.pop(u'revision_number', None) @@ -140,20 +128,16 @@ class TestPort(base.BaseFunctionalTest): def test_delete_port(self): port_name = self.new_port_name + '_delete' - networks = self.operator_cloud.list_networks() - if not networks: - self.assertFalse('no sensible network available') - - port = self.operator_cloud.create_port( - network_id=networks[0]['id'], name=port_name) + port = self.user_cloud.create_port( + network_id=self.net.id, name=port_name) self.assertIsInstance(port, dict) self.assertIn('id', port) self.assertEqual(port.get('name'), port_name) - updated_port = self.operator_cloud.get_port(name_or_id=port['id']) + updated_port = self.user_cloud.get_port(name_or_id=port['id']) self.assertIsNotNone(updated_port) - self.operator_cloud.delete_port(name_or_id=port_name) + self.user_cloud.delete_port(name_or_id=port_name) - updated_port = self.operator_cloud.get_port(name_or_id=port['id']) + updated_port = self.user_cloud.get_port(name_or_id=port['id']) self.assertIsNone(updated_port) diff --git a/openstack/tests/functional/cloud/test_project.py b/openstack/tests/functional/cloud/test_project.py index b51e1d4d7..26390f7db 100644 --- a/openstack/tests/functional/cloud/test_project.py +++ b/openstack/tests/functional/cloud/test_project.py @@ -28,6 +28,9 @@ class TestProject(base.KeystoneBaseFunctionalTest): def setUp(self): super(TestProject, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + self.new_project_name = self.getUniqueString('project') self.addCleanup(self._cleanup_projects) diff --git a/openstack/tests/functional/cloud/test_project_cleanup.py b/openstack/tests/functional/cloud/test_project_cleanup.py index f6bed34c0..9016358cc 100644 --- a/openstack/tests/functional/cloud/test_project_cleanup.py +++ b/openstack/tests/functional/cloud/test_project_cleanup.py @@ -27,6 +27,9 @@ class TestProjectCleanup(base.BaseFunctionalTest): def setUp(self): super(TestProjectCleanup, self).setUp() + if not self.user_cloud_alt: + self.skipTest("Alternate demo cloud is required for this test") + self.conn = self.user_cloud_alt self.network_name = self.getUniqueString('network') diff --git a/openstack/tests/functional/cloud/test_qos_bandwidth_limit_rule.py b/openstack/tests/functional/cloud/test_qos_bandwidth_limit_rule.py index 8b731ec5e..e91098df2 100644 --- a/openstack/tests/functional/cloud/test_qos_bandwidth_limit_rule.py +++ b/openstack/tests/functional/cloud/test_qos_bandwidth_limit_rule.py @@ -25,6 +25,8 @@ from openstack.tests.functional import base class TestQosBandwidthLimitRule(base.BaseFunctionalTest): def setUp(self): super(TestQosBandwidthLimitRule, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") if not self.operator_cloud.has_service('network'): self.skipTest('Network service not supported by cloud') if not self.operator_cloud._has_neutron_extension('qos'): diff --git a/openstack/tests/functional/cloud/test_qos_dscp_marking_rule.py b/openstack/tests/functional/cloud/test_qos_dscp_marking_rule.py index a08f128a6..aff8813b3 100644 --- a/openstack/tests/functional/cloud/test_qos_dscp_marking_rule.py +++ b/openstack/tests/functional/cloud/test_qos_dscp_marking_rule.py @@ -25,6 +25,8 @@ from openstack.tests.functional import base class TestQosDscpMarkingRule(base.BaseFunctionalTest): def setUp(self): super(TestQosDscpMarkingRule, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") if not self.operator_cloud.has_service('network'): self.skipTest('Network service not supported by cloud') if not self.operator_cloud._has_neutron_extension('qos'): diff --git a/openstack/tests/functional/cloud/test_qos_minimum_bandwidth_rule.py b/openstack/tests/functional/cloud/test_qos_minimum_bandwidth_rule.py index 36e3d7588..8cc16a893 100644 --- a/openstack/tests/functional/cloud/test_qos_minimum_bandwidth_rule.py +++ b/openstack/tests/functional/cloud/test_qos_minimum_bandwidth_rule.py @@ -25,6 +25,8 @@ from openstack.tests.functional import base class TestQosMinimumBandwidthRule(base.BaseFunctionalTest): def setUp(self): super(TestQosMinimumBandwidthRule, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") if not self.operator_cloud.has_service('network'): self.skipTest('Network service not supported by cloud') if not self.operator_cloud._has_neutron_extension('qos'): diff --git a/openstack/tests/functional/cloud/test_qos_policy.py b/openstack/tests/functional/cloud/test_qos_policy.py index f78834383..6bd3150ce 100644 --- a/openstack/tests/functional/cloud/test_qos_policy.py +++ b/openstack/tests/functional/cloud/test_qos_policy.py @@ -25,6 +25,8 @@ from openstack.tests.functional import base class TestQosPolicy(base.BaseFunctionalTest): def setUp(self): super(TestQosPolicy, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") if not self.operator_cloud.has_service('network'): self.skipTest('Network service not supported by cloud') if not self.operator_cloud._has_neutron_extension('qos'): diff --git a/openstack/tests/functional/cloud/test_quotas.py b/openstack/tests/functional/cloud/test_quotas.py index a1ae19745..2d06167a4 100644 --- a/openstack/tests/functional/cloud/test_quotas.py +++ b/openstack/tests/functional/cloud/test_quotas.py @@ -22,8 +22,16 @@ from openstack.tests.functional import base class TestComputeQuotas(base.BaseFunctionalTest): - def test_quotas(self): + def test_get_quotas(self): '''Test quotas functionality''' + self.user_cloud.get_compute_quotas( + self.user_cloud.current_project_id) + + def test_set_quotas(self): + '''Test quotas functionality''' + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + quotas = self.operator_cloud.get_compute_quotas('demo') cores = quotas['cores'] self.operator_cloud.set_compute_quotas('demo', cores=cores + 1) @@ -39,11 +47,20 @@ class TestVolumeQuotas(base.BaseFunctionalTest): def setUp(self): super(TestVolumeQuotas, self).setUp() - if not self.operator_cloud.has_service('volume'): + if not self.user_cloud.has_service('volume'): self.skipTest('volume service not supported by cloud') - def test_quotas(self): - '''Test quotas functionality''' + def test_get_quotas(self): + '''Test get quotas functionality''' + self.user_cloud.get_volume_quotas( + self.user_cloud.current_project_id + ) + + def test_set_quotas(self): + '''Test set quotas functionality''' + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + quotas = self.operator_cloud.get_volume_quotas('demo') volumes = quotas['volumes'] self.operator_cloud.set_volume_quotas('demo', volumes=volumes + 1) @@ -58,13 +75,18 @@ class TestVolumeQuotas(base.BaseFunctionalTest): class TestNetworkQuotas(base.BaseFunctionalTest): - def setUp(self): - super(TestNetworkQuotas, self).setUp() - if not self.operator_cloud.has_service('network'): - self.skipTest('network service not supported by cloud') + def test_get_quotas(self): + '''Test get quotas functionality''' + self.user_cloud.get_network_quotas( + self.user_cloud.current_project_id) def test_quotas(self): '''Test quotas functionality''' + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + if not self.operator_cloud.has_service('network'): + self.skipTest('network service not supported by cloud') + quotas = self.operator_cloud.get_network_quotas('demo') network = quotas['networks'] self.operator_cloud.set_network_quotas('demo', networks=network + 1) @@ -77,6 +99,11 @@ class TestNetworkQuotas(base.BaseFunctionalTest): self.operator_cloud.get_network_quotas('demo')['networks']) def test_get_quotas_details(self): + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + if not self.operator_cloud.has_service('network'): + self.skipTest('network service not supported by cloud') + quotas = [ 'floating_ips', 'networks', 'ports', 'rbac_policies', 'routers', 'subnets', diff --git a/openstack/tests/functional/cloud/test_router.py b/openstack/tests/functional/cloud/test_router.py index 8c18392f8..ef1dbc2d9 100644 --- a/openstack/tests/functional/cloud/test_router.py +++ b/openstack/tests/functional/cloud/test_router.py @@ -34,6 +34,8 @@ EXPECTED_GW_INFO_FIELDS = ('network_id', 'enable_snat', 'external_fixed_ips') class TestRouter(base.BaseFunctionalTest): def setUp(self): super(TestRouter, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud required for this test") if not self.operator_cloud.has_service('network'): self.skipTest('Network service not supported by cloud') diff --git a/openstack/tests/functional/cloud/test_security_groups.py b/openstack/tests/functional/cloud/test_security_groups.py index 1a249da77..54ceda6fc 100644 --- a/openstack/tests/functional/cloud/test_security_groups.py +++ b/openstack/tests/functional/cloud/test_security_groups.py @@ -22,6 +22,22 @@ from openstack.tests.functional import base class TestSecurityGroups(base.BaseFunctionalTest): def test_create_list_security_groups(self): + sg1 = self.user_cloud.create_security_group( + name="sg1", description="sg1") + self.addCleanup(self.user_cloud.delete_security_group, sg1['id']) + if self.user_cloud.has_service('network'): + # Neutron defaults to all_tenants=1 when admin + sg_list = self.user_cloud.list_security_groups() + self.assertIn(sg1['id'], [sg['id'] for sg in sg_list]) + + else: + # Nova does not list all tenants by default + sg_list = self.operator_cloud.list_security_groups() + + def test_create_list_security_groups_operator(self): + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + sg1 = self.user_cloud.create_security_group( name="sg1", description="sg1") self.addCleanup(self.user_cloud.delete_security_group, sg1['id']) diff --git a/openstack/tests/functional/cloud/test_services.py b/openstack/tests/functional/cloud/test_services.py index 8e1036e33..70c85d58d 100644 --- a/openstack/tests/functional/cloud/test_services.py +++ b/openstack/tests/functional/cloud/test_services.py @@ -33,6 +33,8 @@ class TestServices(base.KeystoneBaseFunctionalTest): def setUp(self): super(TestServices, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") # Generate a random name for services in this test self.new_service_name = 'test_' + ''.join( diff --git a/openstack/tests/functional/cloud/test_users.py b/openstack/tests/functional/cloud/test_users.py index f69b4c265..3dc94a220 100644 --- a/openstack/tests/functional/cloud/test_users.py +++ b/openstack/tests/functional/cloud/test_users.py @@ -24,6 +24,9 @@ from openstack.tests.functional import base class TestUsers(base.KeystoneBaseFunctionalTest): def setUp(self): super(TestUsers, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") + self.user_prefix = self.getUniqueString('user') self.addCleanup(self._cleanup_users) diff --git a/openstack/tests/functional/cloud/test_volume_type.py b/openstack/tests/functional/cloud/test_volume_type.py index 1e7821e4d..d71ee014c 100644 --- a/openstack/tests/functional/cloud/test_volume_type.py +++ b/openstack/tests/functional/cloud/test_volume_type.py @@ -33,6 +33,8 @@ class TestVolumeType(base.BaseFunctionalTest): def setUp(self): super(TestVolumeType, self).setUp() + if not self.operator_cloud: + self.skipTest("Operator cloud is required for this test") if not self.user_cloud.has_service('volume'): self.skipTest('volume service not supported by cloud') volume_type = { diff --git a/openstack/tests/unit/cloud/test_cluster_templates.py b/openstack/tests/unit/cloud/test_cluster_templates.py index 13f02e612..15e41d836 100644 --- a/openstack/tests/unit/cloud/test_cluster_templates.py +++ b/openstack/tests/unit/cloud/test_cluster_templates.py @@ -13,7 +13,7 @@ import munch import testtools -import openstack.cloud +from openstack import exceptions from openstack.tests.unit import base @@ -198,7 +198,7 @@ class TestClusterTemplates(base.TestCase): # match the more specific HTTPError, even though it's a subclass # of OpenStackCloudException. with testtools.ExpectedException( - openstack.cloud.OpenStackCloudHTTPError): + exceptions.ForbiddenException): self.cloud.create_cluster_template('fake-cluster-template') self.assert_calls() diff --git a/openstack/tests/unit/cloud/test_identity_roles.py b/openstack/tests/unit/cloud/test_identity_roles.py index 517c54051..045552325 100644 --- a/openstack/tests/unit/cloud/test_identity_roles.py +++ b/openstack/tests/unit/cloud/test_identity_roles.py @@ -14,7 +14,7 @@ import testtools from testtools import matchers -import openstack.cloud +from openstack import exceptions from openstack.tests.unit import base @@ -248,7 +248,7 @@ class TestIdentityRoles(base.TestCase): status_code=403) ]) with testtools.ExpectedException( - openstack.cloud.exc.OpenStackCloudHTTPError + exceptions.ForbiddenException ): self.cloud.list_role_assignments() self.assert_calls() diff --git a/openstack/tests/unit/cloud/test_quotas.py b/openstack/tests/unit/cloud/test_quotas.py index 8eec55af0..00da3985d 100644 --- a/openstack/tests/unit/cloud/test_quotas.py +++ b/openstack/tests/unit/cloud/test_quotas.py @@ -212,7 +212,7 @@ class TestQuotas(base.TestCase): 'port': 500 } project = self.mock_for_keystone_projects(project_count=1, - list_get=True)[0] + id_get=True)[0] self.register_uris([ dict(method='GET', uri=self.get_mock_url( @@ -272,7 +272,7 @@ class TestQuotas(base.TestCase): 'reserved': 0} } project = self.mock_for_keystone_projects(project_count=1, - list_get=True)[0] + id_get=True)[0] self.register_uris([ dict(method='GET', uri=self.get_mock_url(