Whitelist cloud functional tests in acceptance

Next bunch of functional tests adapted to be included in the acceptance
tests.

Inclusion of tests uncovered issue in getting quota related to the fact
that not every user is capable to find/list projects. This is fixed by
introduction of ForbiddenException, skipping it in the _find call and
using current_project_id.

Change-Id: I53e718de239de7fb6f0347ff995282da079c68f3
This commit is contained in:
Artem Goncharov 2022-09-30 19:49:00 +02:00
parent 3159933778
commit ae22f1532d
35 changed files with 238 additions and 126 deletions

View File

@ -1,8 +1,12 @@
# This file contains list of tests that can work with regular user privileges
# Until all tests are modified to properly identify whether they are able to
# run or must skip the ones that are known to work are listed here.
openstack.tests.functional.network
### Block Storage
openstack.tests.functional.block_storage.v3.test_volume
# Do not enable test_backup for now, since it is not capable to determine
# backup capabilities of the cloud
# openstack.tests.functional.block_storage.v3.test_backup
### Cloud
openstack.tests.functional.cloud
### Network
openstack.tests.functional.network

View File

@ -1,3 +1,4 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@ -663,9 +664,8 @@ class NetworkCloudMixin:
:raises: OpenStackCloudException if it's not a valid project
:returns: A network ``Quota`` object if found, else None.
"""
proj = self.get_project(name_or_id)
if not proj:
raise exc.OpenStackCloudException("project does not exist")
proj = self.identity.find_project(
name_or_id, ignore_missing=False)
return self.network.get_quota(proj.id, details)
def get_network_extensions(self):

View File

@ -112,6 +112,11 @@ class BadRequestException(HttpException):
pass
class ForbiddenException(HttpException):
"""HTTP 403 Forbidden Request."""
pass
class ConflictException(HttpException):
"""HTTP 409 Conflict."""
pass
@ -187,12 +192,14 @@ def raise_from_response(response, error_message=None):
if response.status_code < 400:
return
if response.status_code == 409:
cls = ConflictException
if response.status_code == 400:
cls = BadRequestException
elif response.status_code == 403:
cls = ForbiddenException
elif response.status_code == 404:
cls = NotFoundException
elif response.status_code == 400:
cls = BadRequestException
elif response.status_code == 409:
cls = ConflictException
elif response.status_code == 412:
cls = PreconditionFailedException
else:

View File

@ -2226,7 +2226,8 @@ class Resource(dict):
id=name_or_id, connection=session._get_connection(), **params
)
return match.fetch(session, microversion=microversion, **params)
except (exceptions.NotFoundException, exceptions.BadRequestException):
except (exceptions.NotFoundException, exceptions.BadRequestException,
exceptions.ForbiddenException):
# NOTE(gtema): There are few places around openstack that return
# 400 if we try to GET resource and it doesn't exist.
pass

View File

@ -88,6 +88,8 @@ class BaseFunctionalTest(base.TestCase):
cloud=self._demo_name_alt, **kwargs)
self.user_cloud_alt = connection.Connection(config=user_config_alt)
_disable_keep_alive(self.user_cloud_alt)
else:
self.user_cloud_alt = None
def _set_operator_cloud(self, **kwargs):
operator_config = self.config.get_one(cloud=self._op_name, **kwargs)

View File

@ -23,6 +23,8 @@ from openstack.tests.functional import base
class TestAggregate(base.BaseFunctionalTest):
def test_aggregates(self):
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
aggregate_name = self.getUniqueString()
availability_zone = self.getUniqueString()
self.addCleanup(self.cleanup, aggregate_name)

View File

@ -150,6 +150,8 @@ class TestCompute(base.BaseFunctionalTest):
self.assertTrue(srv is None or srv.status.lower() == 'deleted')
def test_list_all_servers(self):
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
server = self.user_cloud.create_server(
name=self.server_name,
@ -474,6 +476,9 @@ class TestCompute(base.BaseFunctionalTest):
def test_get_compute_usage(self):
'''Test usage functionality'''
# Add a server so that we can know we have usage
if not self.operator_cloud:
# TODO(gtema) rework method not to require getting project
self.skipTest("Operator cloud is required for this test")
self.addCleanup(self._cleanup_servers_and_volumes, self.server_name)
self.user_cloud.create_server(
name=self.server_name,

View File

@ -43,13 +43,3 @@ class TestDevstack(base.BaseFunctionalTest):
if os.environ.get(
'OPENSTACKSDK_HAS_{env}'.format(env=self.env), '0') == '1':
self.assertTrue(self.user_cloud.has_service(self.service))
class TestKeystoneVersion(base.BaseFunctionalTest):
def test_keystone_version(self):
use_keystone_v2 = os.environ.get('OPENSTACKSDK_USE_KEYSTONE_V2', False)
if use_keystone_v2 and use_keystone_v2 != '0':
self.assertEqual('2.0', self.identity_version)
else:
self.assertEqual('3', self.identity_version)

View File

@ -25,6 +25,8 @@ class TestDomain(base.BaseFunctionalTest):
def setUp(self):
super(TestDomain, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
i_ver = self.operator_cloud.config.get_api_version('identity')
if i_ver in ('2', '2.0'):
self.skipTest('Identity service does not support domains')

View File

@ -34,6 +34,8 @@ class TestEndpoints(base.KeystoneBaseFunctionalTest):
def setUp(self):
super(TestEndpoints, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
# Generate a random name for services and regions in this test
self.new_item_name = 'test_' + ''.join(

View File

@ -35,13 +35,15 @@ class TestFlavor(base.BaseFunctionalTest):
def _cleanup_flavors(self):
exception_list = list()
for f in self.operator_cloud.list_flavors(get_extra=False):
if f['name'].startswith(self.new_item_name):
try:
self.operator_cloud.delete_flavor(f['id'])
except Exception as e:
# We were unable to delete a flavor, let's try with next
exception_list.append(str(e))
if self.operator_cloud:
for f in self.operator_cloud.list_flavors(get_extra=False):
if f['name'].startswith(self.new_item_name):
try:
self.operator_cloud.delete_flavor(f['id'])
except Exception as e:
# We were unable to delete a flavor, let's try with
# next
exception_list.append(str(e))
continue
if exception_list:
# Raise an error: we must make users aware that something went
@ -49,6 +51,9 @@ class TestFlavor(base.BaseFunctionalTest):
raise OpenStackCloudException('\n'.join(exception_list))
def test_create_flavor(self):
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
flavor_name = self.new_item_name + '_create'
flavor_kwargs = dict(
name=flavor_name, ram=1024, vcpus=2, disk=10, ephemeral=5,
@ -85,24 +90,31 @@ class TestFlavor(base.BaseFunctionalTest):
name=priv_flavor_name, ram=1024, vcpus=2, disk=10, is_public=False
)
# Create a public and private flavor. We expect both to be listed
# for an operator.
self.operator_cloud.create_flavor(**public_kwargs)
self.operator_cloud.create_flavor(**private_kwargs)
if self.operator_cloud:
# Create a public and private flavor. We expect both to be listed
# for an operator.
self.operator_cloud.create_flavor(**public_kwargs)
self.operator_cloud.create_flavor(**private_kwargs)
flavors = self.operator_cloud.list_flavors(get_extra=False)
flavors = self.operator_cloud.list_flavors(get_extra=False)
# Flavor list will include the standard devstack flavors. We just want
# to make sure both of the flavors we just created are present.
found = []
for f in flavors:
# extra_specs should be added within list_flavors()
self.assertIn('extra_specs', f)
if f['name'] in (pub_flavor_name, priv_flavor_name):
found.append(f)
self.assertEqual(2, len(found))
# Flavor list will include the standard devstack flavors. We just
# want to make sure both of the flavors we just created are
# present.
found = []
for f in flavors:
# extra_specs should be added within list_flavors()
self.assertIn('extra_specs', f)
if f['name'] in (pub_flavor_name, priv_flavor_name):
found.append(f)
self.assertEqual(2, len(found))
else:
self.user_cloud.list_flavors()
def test_flavor_access(self):
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
priv_flavor_name = self.new_item_name + '_private'
private_kwargs = dict(
name=priv_flavor_name, ram=1024, vcpus=2, disk=10, is_public=False
@ -141,6 +153,9 @@ class TestFlavor(base.BaseFunctionalTest):
"""
Test setting and unsetting flavor extra specs
"""
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
flavor_name = self.new_item_name + '_spec_test'
kwargs = dict(
name=flavor_name, ram=1024, vcpus=2, disk=10

View File

@ -62,7 +62,7 @@ class TestFloatingIP(base.BaseFunctionalTest):
r, subnet_id=s['id'])
except Exception:
pass
self.user_cloud.delete_router(r)
self.user_cloud.delete_router(r.id)
except Exception as e:
exception_list.append(e)
tb_list.append(sys.exc_info()[2])
@ -71,7 +71,7 @@ class TestFloatingIP(base.BaseFunctionalTest):
for s in self.user_cloud.list_subnets():
if s['name'].startswith(self.new_item_name):
try:
self.user_cloud.delete_subnet(s)
self.user_cloud.delete_subnet(s.id)
except Exception as e:
exception_list.append(e)
tb_list.append(sys.exc_info()[2])
@ -80,7 +80,7 @@ class TestFloatingIP(base.BaseFunctionalTest):
for n in self.user_cloud.list_networks():
if n['name'].startswith(self.new_item_name):
try:
self.user_cloud.delete_network(n)
self.user_cloud.delete_network(n.id)
except Exception as e:
exception_list.append(e)
tb_list.append(sys.exc_info()[2])
@ -104,7 +104,7 @@ class TestFloatingIP(base.BaseFunctionalTest):
for i in self.user_cloud.list_servers(bare=True):
if i.name.startswith(self.new_item_name):
try:
self.user_cloud.delete_server(i, wait=True)
self.user_cloud.delete_server(i.id, wait=True)
except Exception as e:
exception_list.append(str(e))
continue
@ -124,7 +124,7 @@ class TestFloatingIP(base.BaseFunctionalTest):
if (ip.get('fixed_ip', None) == fixed_ip
or ip.get('fixed_ip_address', None) == fixed_ip):
try:
self.user_cloud.delete_floating_ip(ip)
self.user_cloud.delete_floating_ip(ip.id)
except Exception as e:
exception_list.append(str(e))
continue
@ -235,38 +235,50 @@ class TestFloatingIP(base.BaseFunctionalTest):
server_id=new_server.id, floating_ip_id=f_ip['id'])
def test_list_floating_ips(self):
fip_admin = self.operator_cloud.create_floating_ip()
self.addCleanup(self.operator_cloud.delete_floating_ip, fip_admin.id)
if self.operator_cloud:
fip_admin = self.operator_cloud.create_floating_ip()
self.addCleanup(
self.operator_cloud.delete_floating_ip, fip_admin.id)
fip_user = self.user_cloud.create_floating_ip()
self.addCleanup(self.user_cloud.delete_floating_ip, fip_user.id)
# Get all the floating ips.
fip_id_list = [
fip.id for fip in self.operator_cloud.list_floating_ips()
if self.operator_cloud:
fip_op_id_list = [
fip.id for fip in self.operator_cloud.list_floating_ips()
]
fip_user_id_list = [
fip.id for fip in self.user_cloud.list_floating_ips()
]
if self.user_cloud.has_service('network'):
self.assertIn(fip_user.id, fip_user_id_list)
# Neutron returns all FIP for all projects by default
self.assertIn(fip_admin.id, fip_id_list)
self.assertIn(fip_user.id, fip_id_list)
if self.operator_cloud and fip_admin:
self.assertIn(fip_user.id, fip_op_id_list)
# Ask Neutron for only a subset of all the FIPs.
filtered_fip_id_list = [
fip.id for fip in self.operator_cloud.list_floating_ips(
{'tenant_id': self.user_cloud.current_project_id}
)
]
self.assertNotIn(fip_admin.id, filtered_fip_id_list)
self.assertIn(fip_user.id, filtered_fip_id_list)
if self.operator_cloud:
filtered_fip_id_list = [
fip.id for fip in self.operator_cloud.list_floating_ips(
{'tenant_id': self.user_cloud.current_project_id}
)
]
self.assertNotIn(fip_admin.id, filtered_fip_id_list)
self.assertIn(fip_user.id, filtered_fip_id_list)
else:
self.assertIn(fip_admin.id, fip_id_list)
if fip_admin:
self.assertIn(fip_admin.id, fip_op_id_list)
# By default, Nova returns only the FIPs that belong to the
# project which made the listing request.
self.assertNotIn(fip_user.id, fip_id_list)
self.assertRaisesRegex(
ValueError, "Nova-network don't support server-side.*",
self.operator_cloud.list_floating_ips, filters={'foo': 'bar'}
)
if self.operator_cloud:
self.assertNotIn(fip_user.id, fip_op_id_list)
self.assertRaisesRegex(
ValueError, "Nova-network don't support server-side.*",
self.operator_cloud.list_floating_ips,
filters={'foo': 'bar'}
)
def test_search_floating_ips(self):
fip_user = self.user_cloud.create_floating_ip()

View File

@ -25,6 +25,9 @@ class TestGroup(base.BaseFunctionalTest):
def setUp(self):
super(TestGroup, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
i_ver = self.operator_cloud.config.get_api_version('identity')
if i_ver in ('2', '2.0'):
self.skipTest('Identity service does not support groups')

View File

@ -27,6 +27,8 @@ from openstack.tests.functional import base
class TestIdentity(base.KeystoneBaseFunctionalTest):
def setUp(self):
super(TestIdentity, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
self.role_prefix = 'test_role' + ''.join(
random.choice(string.ascii_lowercase) for _ in range(5))
self.user_prefix = self.getUniqueString('user')

View File

@ -26,6 +26,9 @@ from openstack.tests.functional import base
class TestInventory(base.BaseFunctionalTest):
def setUp(self):
super().setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
# This needs to use an admin account, otherwise a public IP
# is not allocated from devstack.
self.inventory = inventory.OpenStackInventory(cloud='devstack-admin')

View File

@ -33,6 +33,9 @@ class TestUsage(base.BaseFunctionalTest):
def test_get_other_compute_limits(self):
'''Test quotas functionality'''
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
limits = self.operator_cloud.get_compute_limits('demo')
self.assertIsNotNone(limits)
self.assertTrue(hasattr(limits, 'server_meta'))
@ -48,5 +51,8 @@ class TestUsage(base.BaseFunctionalTest):
def test_get_other_volume_limits(self):
'''Test quotas functionality'''
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
limits = self.operator_cloud.get_volume_limits('demo')
self.assertFalse(hasattr(limits, 'maxTotalVolumes'))

View File

@ -24,7 +24,7 @@ class TestMagnumServices(base.BaseFunctionalTest):
def setUp(self):
super(TestMagnumServices, self).setUp()
if not self.operator_cloud.has_service(
if not self.user_cloud.has_service(
'container-infrastructure-management'
):
self.skipTest('Container service not supported by cloud')
@ -33,7 +33,7 @@ class TestMagnumServices(base.BaseFunctionalTest):
'''Test magnum services functionality'''
# Test that we can list services
services = self.operator_cloud.list_magnum_services()
services = self.user_cloud.list_magnum_services()
self.assertEqual(1, len(services))
self.assertEqual(services[0]['id'], 1)

View File

@ -24,6 +24,9 @@ from openstack.tests.functional import base
class TestNetwork(base.BaseFunctionalTest):
def setUp(self):
super(TestNetwork, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
if not self.operator_cloud.has_service('network'):
self.skipTest('Network service not supported by cloud')
self.network_name = self.getUniqueString('network')

View File

@ -40,10 +40,12 @@ class TestObject(base.BaseFunctionalTest):
self.addDetail('container', content.text_content(container_name))
self.addCleanup(self.user_cloud.delete_container, container_name)
self.user_cloud.create_container(container_name)
self.assertEqual(container_name,
self.user_cloud.list_containers()[0]['name'])
self.assertEqual([],
self.user_cloud.list_containers(prefix='somethin'))
container = self.user_cloud.get_container(container_name)
self.assertEqual(
container_name, container.name)
self.assertEqual(
[],
self.user_cloud.list_containers(prefix='somethin'))
sizes = (
(64 * 1024, 1), # 64K, one segment
(64 * 1024, 5) # 64MB, 5 segments
@ -99,8 +101,9 @@ class TestObject(base.BaseFunctionalTest):
self.assertTrue(
self.user_cloud.delete_object(container_name, name))
self.assertEqual([], self.user_cloud.list_objects(container_name))
self.assertEqual(container_name,
self.user_cloud.list_containers()[0]['name'])
self.assertEqual(
container_name,
self.user_cloud.get_container(container_name).name)
self.user_cloud.delete_container(container_name)
def test_download_object_to_file(self):

View File

@ -31,9 +31,13 @@ class TestPort(base.BaseFunctionalTest):
def setUp(self):
super(TestPort, self).setUp()
# Skip Neutron tests if neutron is not present
if not self.operator_cloud.has_service('network'):
if not self.user_cloud.has_service('network'):
self.skipTest('Network service not supported by cloud')
net_name = self.getUniqueString('CloudPortName')
self.net = self.user_cloud.network.create_network(name=net_name)
self.addCleanup(self.user_cloud.network.delete_network, self.net.id)
# Generate a unique port name to allow concurrent tests
self.new_port_name = 'test_' + ''.join(
random.choice(string.ascii_lowercase) for _ in range(5))
@ -43,10 +47,10 @@ class TestPort(base.BaseFunctionalTest):
def _cleanup_ports(self):
exception_list = list()
for p in self.operator_cloud.list_ports():
for p in self.user_cloud.list_ports():
if p['name'].startswith(self.new_port_name):
try:
self.operator_cloud.delete_port(name_or_id=p['id'])
self.user_cloud.delete_port(name_or_id=p['id'])
except Exception as e:
# We were unable to delete this port, let's try with next
exception_list.append(str(e))
@ -60,12 +64,8 @@ class TestPort(base.BaseFunctionalTest):
def test_create_port(self):
port_name = self.new_port_name + '_create'
networks = self.operator_cloud.list_networks()
if not networks:
self.assertFalse('no sensible network available')
port = self.operator_cloud.create_port(
network_id=networks[0]['id'], name=port_name)
port = self.user_cloud.create_port(
network_id=self.net.id, name=port_name)
self.assertIsInstance(port, dict)
self.assertIn('id', port)
self.assertEqual(port.get('name'), port_name)
@ -73,17 +73,13 @@ class TestPort(base.BaseFunctionalTest):
def test_get_port(self):
port_name = self.new_port_name + '_get'
networks = self.operator_cloud.list_networks()
if not networks:
self.assertFalse('no sensible network available')
port = self.operator_cloud.create_port(
network_id=networks[0]['id'], name=port_name)
port = self.user_cloud.create_port(
network_id=self.net.id, name=port_name)
self.assertIsInstance(port, dict)
self.assertIn('id', port)
self.assertEqual(port.get('name'), port_name)
updated_port = self.operator_cloud.get_port(name_or_id=port['id'])
updated_port = self.user_cloud.get_port(name_or_id=port['id'])
# extra_dhcp_opts is added later by Neutron...
if 'extra_dhcp_opts' in updated_port and 'extra_dhcp_opts' not in port:
del updated_port['extra_dhcp_opts']
@ -92,17 +88,13 @@ class TestPort(base.BaseFunctionalTest):
def test_get_port_by_id(self):
port_name = self.new_port_name + '_get_by_id'
networks = self.operator_cloud.list_networks()
if not networks:
self.assertFalse('no sensible network available')
port = self.operator_cloud.create_port(
network_id=networks[0]['id'], name=port_name)
port = self.user_cloud.create_port(
network_id=self.net.id, name=port_name)
self.assertIsInstance(port, dict)
self.assertIn('id', port)
self.assertEqual(port.get('name'), port_name)
updated_port = self.operator_cloud.get_port_by_id(port['id'])
updated_port = self.user_cloud.get_port_by_id(port['id'])
# extra_dhcp_opts is added later by Neutron...
if 'extra_dhcp_opts' in updated_port and 'extra_dhcp_opts' not in port:
del updated_port['extra_dhcp_opts']
@ -112,19 +104,15 @@ class TestPort(base.BaseFunctionalTest):
port_name = self.new_port_name + '_update'
new_port_name = port_name + '_new'
networks = self.operator_cloud.list_networks()
if not networks:
self.assertFalse('no sensible network available')
self.user_cloud.create_port(
network_id=self.net.id, name=port_name)
self.operator_cloud.create_port(
network_id=networks[0]['id'], name=port_name)
port = self.operator_cloud.update_port(
port = self.user_cloud.update_port(
name_or_id=port_name, name=new_port_name)
self.assertIsInstance(port, dict)
self.assertEqual(port.get('name'), new_port_name)
updated_port = self.operator_cloud.get_port(name_or_id=port['id'])
updated_port = self.user_cloud.get_port(name_or_id=port['id'])
self.assertEqual(port.get('name'), new_port_name)
port.pop('revision_number', None)
port.pop(u'revision_number', None)
@ -140,20 +128,16 @@ class TestPort(base.BaseFunctionalTest):
def test_delete_port(self):
port_name = self.new_port_name + '_delete'
networks = self.operator_cloud.list_networks()
if not networks:
self.assertFalse('no sensible network available')
port = self.operator_cloud.create_port(
network_id=networks[0]['id'], name=port_name)
port = self.user_cloud.create_port(
network_id=self.net.id, name=port_name)
self.assertIsInstance(port, dict)
self.assertIn('id', port)
self.assertEqual(port.get('name'), port_name)
updated_port = self.operator_cloud.get_port(name_or_id=port['id'])
updated_port = self.user_cloud.get_port(name_or_id=port['id'])
self.assertIsNotNone(updated_port)
self.operator_cloud.delete_port(name_or_id=port_name)
self.user_cloud.delete_port(name_or_id=port_name)
updated_port = self.operator_cloud.get_port(name_or_id=port['id'])
updated_port = self.user_cloud.get_port(name_or_id=port['id'])
self.assertIsNone(updated_port)

View File

@ -28,6 +28,9 @@ class TestProject(base.KeystoneBaseFunctionalTest):
def setUp(self):
super(TestProject, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
self.new_project_name = self.getUniqueString('project')
self.addCleanup(self._cleanup_projects)

View File

@ -27,6 +27,9 @@ class TestProjectCleanup(base.BaseFunctionalTest):
def setUp(self):
super(TestProjectCleanup, self).setUp()
if not self.user_cloud_alt:
self.skipTest("Alternate demo cloud is required for this test")
self.conn = self.user_cloud_alt
self.network_name = self.getUniqueString('network')

View File

@ -25,6 +25,8 @@ from openstack.tests.functional import base
class TestQosBandwidthLimitRule(base.BaseFunctionalTest):
def setUp(self):
super(TestQosBandwidthLimitRule, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
if not self.operator_cloud.has_service('network'):
self.skipTest('Network service not supported by cloud')
if not self.operator_cloud._has_neutron_extension('qos'):

View File

@ -25,6 +25,8 @@ from openstack.tests.functional import base
class TestQosDscpMarkingRule(base.BaseFunctionalTest):
def setUp(self):
super(TestQosDscpMarkingRule, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
if not self.operator_cloud.has_service('network'):
self.skipTest('Network service not supported by cloud')
if not self.operator_cloud._has_neutron_extension('qos'):

View File

@ -25,6 +25,8 @@ from openstack.tests.functional import base
class TestQosMinimumBandwidthRule(base.BaseFunctionalTest):
def setUp(self):
super(TestQosMinimumBandwidthRule, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
if not self.operator_cloud.has_service('network'):
self.skipTest('Network service not supported by cloud')
if not self.operator_cloud._has_neutron_extension('qos'):

View File

@ -25,6 +25,8 @@ from openstack.tests.functional import base
class TestQosPolicy(base.BaseFunctionalTest):
def setUp(self):
super(TestQosPolicy, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
if not self.operator_cloud.has_service('network'):
self.skipTest('Network service not supported by cloud')
if not self.operator_cloud._has_neutron_extension('qos'):

View File

@ -22,8 +22,16 @@ from openstack.tests.functional import base
class TestComputeQuotas(base.BaseFunctionalTest):
def test_quotas(self):
def test_get_quotas(self):
'''Test quotas functionality'''
self.user_cloud.get_compute_quotas(
self.user_cloud.current_project_id)
def test_set_quotas(self):
'''Test quotas functionality'''
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
quotas = self.operator_cloud.get_compute_quotas('demo')
cores = quotas['cores']
self.operator_cloud.set_compute_quotas('demo', cores=cores + 1)
@ -39,11 +47,20 @@ class TestVolumeQuotas(base.BaseFunctionalTest):
def setUp(self):
super(TestVolumeQuotas, self).setUp()
if not self.operator_cloud.has_service('volume'):
if not self.user_cloud.has_service('volume'):
self.skipTest('volume service not supported by cloud')
def test_quotas(self):
'''Test quotas functionality'''
def test_get_quotas(self):
'''Test get quotas functionality'''
self.user_cloud.get_volume_quotas(
self.user_cloud.current_project_id
)
def test_set_quotas(self):
'''Test set quotas functionality'''
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
quotas = self.operator_cloud.get_volume_quotas('demo')
volumes = quotas['volumes']
self.operator_cloud.set_volume_quotas('demo', volumes=volumes + 1)
@ -58,13 +75,18 @@ class TestVolumeQuotas(base.BaseFunctionalTest):
class TestNetworkQuotas(base.BaseFunctionalTest):
def setUp(self):
super(TestNetworkQuotas, self).setUp()
if not self.operator_cloud.has_service('network'):
self.skipTest('network service not supported by cloud')
def test_get_quotas(self):
'''Test get quotas functionality'''
self.user_cloud.get_network_quotas(
self.user_cloud.current_project_id)
def test_quotas(self):
'''Test quotas functionality'''
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
if not self.operator_cloud.has_service('network'):
self.skipTest('network service not supported by cloud')
quotas = self.operator_cloud.get_network_quotas('demo')
network = quotas['networks']
self.operator_cloud.set_network_quotas('demo', networks=network + 1)
@ -77,6 +99,11 @@ class TestNetworkQuotas(base.BaseFunctionalTest):
self.operator_cloud.get_network_quotas('demo')['networks'])
def test_get_quotas_details(self):
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
if not self.operator_cloud.has_service('network'):
self.skipTest('network service not supported by cloud')
quotas = [
'floating_ips', 'networks', 'ports',
'rbac_policies', 'routers', 'subnets',

View File

@ -34,6 +34,8 @@ EXPECTED_GW_INFO_FIELDS = ('network_id', 'enable_snat', 'external_fixed_ips')
class TestRouter(base.BaseFunctionalTest):
def setUp(self):
super(TestRouter, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud required for this test")
if not self.operator_cloud.has_service('network'):
self.skipTest('Network service not supported by cloud')

View File

@ -22,6 +22,22 @@ from openstack.tests.functional import base
class TestSecurityGroups(base.BaseFunctionalTest):
def test_create_list_security_groups(self):
sg1 = self.user_cloud.create_security_group(
name="sg1", description="sg1")
self.addCleanup(self.user_cloud.delete_security_group, sg1['id'])
if self.user_cloud.has_service('network'):
# Neutron defaults to all_tenants=1 when admin
sg_list = self.user_cloud.list_security_groups()
self.assertIn(sg1['id'], [sg['id'] for sg in sg_list])
else:
# Nova does not list all tenants by default
sg_list = self.operator_cloud.list_security_groups()
def test_create_list_security_groups_operator(self):
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
sg1 = self.user_cloud.create_security_group(
name="sg1", description="sg1")
self.addCleanup(self.user_cloud.delete_security_group, sg1['id'])

View File

@ -33,6 +33,8 @@ class TestServices(base.KeystoneBaseFunctionalTest):
def setUp(self):
super(TestServices, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
# Generate a random name for services in this test
self.new_service_name = 'test_' + ''.join(

View File

@ -24,6 +24,9 @@ from openstack.tests.functional import base
class TestUsers(base.KeystoneBaseFunctionalTest):
def setUp(self):
super(TestUsers, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
self.user_prefix = self.getUniqueString('user')
self.addCleanup(self._cleanup_users)

View File

@ -33,6 +33,8 @@ class TestVolumeType(base.BaseFunctionalTest):
def setUp(self):
super(TestVolumeType, self).setUp()
if not self.operator_cloud:
self.skipTest("Operator cloud is required for this test")
if not self.user_cloud.has_service('volume'):
self.skipTest('volume service not supported by cloud')
volume_type = {

View File

@ -13,7 +13,7 @@
import munch
import testtools
import openstack.cloud
from openstack import exceptions
from openstack.tests.unit import base
@ -198,7 +198,7 @@ class TestClusterTemplates(base.TestCase):
# match the more specific HTTPError, even though it's a subclass
# of OpenStackCloudException.
with testtools.ExpectedException(
openstack.cloud.OpenStackCloudHTTPError):
exceptions.ForbiddenException):
self.cloud.create_cluster_template('fake-cluster-template')
self.assert_calls()

View File

@ -14,7 +14,7 @@
import testtools
from testtools import matchers
import openstack.cloud
from openstack import exceptions
from openstack.tests.unit import base
@ -248,7 +248,7 @@ class TestIdentityRoles(base.TestCase):
status_code=403)
])
with testtools.ExpectedException(
openstack.cloud.exc.OpenStackCloudHTTPError
exceptions.ForbiddenException
):
self.cloud.list_role_assignments()
self.assert_calls()

View File

@ -212,7 +212,7 @@ class TestQuotas(base.TestCase):
'port': 500
}
project = self.mock_for_keystone_projects(project_count=1,
list_get=True)[0]
id_get=True)[0]
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(
@ -272,7 +272,7 @@ class TestQuotas(base.TestCase):
'reserved': 0}
}
project = self.mock_for_keystone_projects(project_count=1,
list_get=True)[0]
id_get=True)[0]
self.register_uris([
dict(method='GET',
uri=self.get_mock_url(