Use reader role for network tests

OpenStack services have implemented the 'reader' role in their
RBAC policy by default. But the Tempest tests for the reader role
APIs are calling these APIs with an admin or member user.

This patch update all the test files in tempest/api/network to use
the reader role for the GET requests (list, show, get operations).
This ensures proper RBAC testing by using the project_reader
credentials for read operations while maintaining the primary
credentials for write operations.

Changes include:
- Added credentials = ['primary', 'project_reader'] to test classes
- Set up reader_client in setup_clients method
- Replaced all GET requests to use reader_client instead of regular client
- Updated files: test_agent_management_negative.py,
  test_allowed_address_pair.py, test_dhcp_ipv6.py, test_extensions.py,
  test_extra_dhcp_options.py, test_floating_ips.py, test_networks.py,
  test_networks_negative.py, test_ports.py, test_routers.py,
  test_routers_negative.py, test_security_groups.py,
  test_security_groups_negative.py, test_service_providers.py,
  test_subnetpools_extensions.py, test_tags.py, test_versions.py

Implements-blueprint: test-neutron-with-srbac-defaults
Change-Id: I507358721afd4eca550e8b21b61ede1398248fc5
Signed-off-by: Manpreet Kaur <kaurmanpreet2620@gmail.com>
This commit is contained in:
Manpreet Kaur
2026-01-11 18:33:26 +00:00
parent 1b54854f98
commit 9e36fa64f9
17 changed files with 343 additions and 77 deletions

View File

@@ -14,15 +14,28 @@
# under the License.
from tempest.api.network import base
from tempest import config
from tempest.lib import decorators
CONF = config.CONF
class AgentManagementNegativeTest(base.BaseNetworkTest):
credentials = ['primary', 'project_reader']
@classmethod
def setup_clients(cls):
super(AgentManagementNegativeTest, cls).setup_clients()
if CONF.enforce_scope.neutron:
cls.reader_client = cls.os_project_reader.network_agents_client
else:
cls.reader_client = cls.agents_client
@decorators.idempotent_id('e335be47-b9a1-46fd-be30-0874c0b751e6')
@decorators.attr(type=['negative'])
def test_list_agents_non_admin(self):
"""Validate that non-admin user cannot list agents."""
# Listing agents requires admin_only permissions.
body = self.agents_client.list_agents()
body = self.reader_client.list_agents()
self.assertEmpty(body["agents"])

View File

@@ -39,6 +39,7 @@ class AllowedAddressPairTestJSON(base.BaseNetworkTest):
api_extensions
"""
credentials = ['primary', 'project_reader']
@classmethod
def skip_checks(cls):
@@ -47,6 +48,14 @@ class AllowedAddressPairTestJSON(base.BaseNetworkTest):
msg = "Allowed Address Pairs extension not enabled."
raise cls.skipException(msg)
@classmethod
def setup_clients(cls):
super(AllowedAddressPairTestJSON, cls).setup_clients()
if CONF.enforce_scope.neutron:
cls.reader_client = cls.os_project_reader.ports_client
else:
cls.reader_client = cls.ports_client
@classmethod
def resource_setup(cls):
super(AllowedAddressPairTestJSON, cls).resource_setup()
@@ -73,7 +82,7 @@ class AllowedAddressPairTestJSON(base.BaseNetworkTest):
self.ports_client.delete_port, port_id)
# Confirm port was created with allowed address pair attribute
body = self.ports_client.list_ports()
body = self.reader_client.list_ports()
ports = body['ports']
port = [p for p in ports if p['id'] == port_id]
msg = 'Created port not found in list of ports returned by Neutron'

View File

@@ -41,6 +41,8 @@ class NetworksTestDHCPv6(base.BaseNetworkTest):
addressing in subnets with router
"""
credentials = ['primary', 'project_reader']
@classmethod
def skip_checks(cls):
super(NetworksTestDHCPv6, cls).skip_checks()
@@ -52,6 +54,18 @@ class NetworksTestDHCPv6(base.BaseNetworkTest):
if msg:
raise cls.skipException(msg)
@classmethod
def setup_clients(cls):
super(NetworksTestDHCPv6, cls).setup_clients()
if CONF.enforce_scope.neutron:
cls.reader_ports_client = cls.os_project_reader.ports_client
cls.reader_subnets_client = cls.os_project_reader.subnets_client
cls.reader_routers_client = cls.os_project_reader.routers_client
else:
cls.reader_ports_client = cls.ports_client
cls.reader_subnets_client = cls.subnets_client
cls.reader_routers_client = cls.routers_client
@classmethod
def resource_setup(cls):
super(NetworksTestDHCPv6, cls).resource_setup()
@@ -67,7 +81,7 @@ class NetworksTestDHCPv6(base.BaseNetworkTest):
del things_list[index]
def _clean_network(self):
body = self.ports_client.list_ports()
body = self.reader_ports_client.list_ports()
ports = body['ports']
for port in ports:
if (net_info.is_router_interface_port(port) and
@@ -78,13 +92,13 @@ class NetworksTestDHCPv6(base.BaseNetworkTest):
if port['id'] in [p['id'] for p in self.ports]:
self.ports_client.delete_port(port['id'])
self._remove_from_list_by_index(self.ports, port)
body = self.subnets_client.list_subnets()
body = self.reader_subnets_client.list_subnets()
subnets = body['subnets']
for subnet in subnets:
if subnet['id'] in [s['id'] for s in self.subnets]:
self.subnets_client.delete_subnet(subnet['id'])
self._remove_from_list_by_index(self.subnets, subnet)
body = self.routers_client.list_routers()
body = self.reader_routers_client.list_routers()
routers = body['routers']
for router in routers:
if router['id'] in [r['id'] for r in self.routers]:
@@ -221,7 +235,7 @@ class NetworksTestDHCPv6(base.BaseNetworkTest):
subnet_slaac]]
self.ports_client.delete_port(port['id'])
self.ports.pop()
body = self.ports_client.list_ports()
body = self.reader_ports_client.list_ports()
ports_id_list = [i['id'] for i in body['ports']]
self.assertNotIn(port['id'], ports_id_list)
self._clean_network()
@@ -398,7 +412,7 @@ class NetworksTestDHCPv6(base.BaseNetworkTest):
self.routers.append(router)
port = self.create_router_interface(router['id'],
subnet['id'])
body = self.ports_client.show_port(port['port_id'])
body = self.reader_ports_client.show_port(port['port_id'])
return subnet, body['port']
@decorators.idempotent_id('e98f65db-68f4-4330-9fea-abd8c5192d4d')

View File

@@ -16,8 +16,11 @@
from tempest.api.network import base
from tempest.common import utils
from tempest import config
from tempest.lib import decorators
CONF = config.CONF
class ExtensionsTestJSON(base.BaseNetworkTest):
"""Tests the following operations in the Neutron API:
@@ -29,6 +32,16 @@ class ExtensionsTestJSON(base.BaseNetworkTest):
etc/tempest.conf.
"""
credentials = ['primary', 'project_reader']
@classmethod
def setup_clients(cls):
super(ExtensionsTestJSON, cls).setup_clients()
if CONF.enforce_scope.neutron:
cls.reader_client = cls.os_project_reader.network_extensions_client
else:
cls.reader_client = cls.network_extensions_client
@decorators.attr(type='smoke')
@decorators.idempotent_id('ef28c7e6-e646-4979-9d67-deb207bc5564')
def test_list_show_extensions(self):
@@ -42,14 +55,14 @@ class ExtensionsTestJSON(base.BaseNetworkTest):
expected_alias = [ext for ext in expected_alias if
utils.is_extension_enabled(ext, 'network')]
actual_alias = list()
extensions = self.network_extensions_client.list_extensions()
extensions = self.reader_client.list_extensions()
list_extensions = extensions['extensions']
# Show and verify the details of the available extensions
for ext in list_extensions:
ext_name = ext['name']
ext_alias = ext['alias']
actual_alias.append(ext['alias'])
ext_details = self.network_extensions_client.show_extension(
ext_details = self.reader_client.show_extension(
ext_alias)
ext_details = ext_details['extension']

View File

@@ -36,6 +36,8 @@ class ExtraDHCPOptionsTestJSON(base.BaseNetworkTest):
section of etc/tempest.conf
"""
credentials = ['primary', 'project_reader']
@classmethod
def skip_checks(cls):
super(ExtraDHCPOptionsTestJSON, cls).skip_checks()
@@ -43,6 +45,14 @@ class ExtraDHCPOptionsTestJSON(base.BaseNetworkTest):
msg = "Extra DHCP Options extension not enabled."
raise cls.skipException(msg)
@classmethod
def setup_clients(cls):
super(ExtraDHCPOptionsTestJSON, cls).setup_clients()
if CONF.enforce_scope.neutron:
cls.reader_ports_client = cls.os_project_reader.ports_client
else:
cls.reader_ports_client = cls.ports_client
@classmethod
def resource_setup(cls):
super(ExtraDHCPOptionsTestJSON, cls).resource_setup()
@@ -72,7 +82,7 @@ class ExtraDHCPOptionsTestJSON(base.BaseNetworkTest):
self.ports_client.delete_port, port_id)
# Confirm port created has Extra DHCP Options
body = self.ports_client.list_ports()
body = self.reader_ports_client.list_ports()
ports = body['ports']
port = [p for p in ports if p['id'] == port_id]
self.assertTrue(port)
@@ -88,7 +98,7 @@ class ExtraDHCPOptionsTestJSON(base.BaseNetworkTest):
name=name,
extra_dhcp_opts=self.extra_dhcp_opts)
# Confirm extra dhcp options were added to the port
body = self.ports_client.show_port(self.port['id'])
body = self.reader_ports_client.show_port(self.port['id'])
self._confirm_extra_dhcp_options(body['port'], self.extra_dhcp_opts)
def _confirm_extra_dhcp_options(self, port, extra_dhcp_opts):

View File

@@ -42,6 +42,8 @@ class FloatingIPTestJSON(base.BaseNetworkTest):
public_network_id which is the id for the external network present
"""
credentials = ['primary', 'project_reader']
@classmethod
def skip_checks(cls):
super(FloatingIPTestJSON, cls).skip_checks()
@@ -54,6 +56,14 @@ class FloatingIPTestJSON(base.BaseNetworkTest):
if not CONF.network_feature_enabled.floating_ips:
raise cls.skipException("Floating ips are not available")
@classmethod
def setup_clients(cls):
super(FloatingIPTestJSON, cls).setup_clients()
if CONF.enforce_scope.neutron:
cls.reader_client = cls.os_project_reader.floating_ips_client
else:
cls.reader_client = cls.floating_ips_client
@classmethod
def resource_setup(cls):
super(FloatingIPTestJSON, cls).resource_setup()
@@ -92,7 +102,7 @@ class FloatingIPTestJSON(base.BaseNetworkTest):
self.assertIn(created_floating_ip['fixed_ip_address'],
[ip['ip_address'] for ip in self.ports[0]['fixed_ips']])
# Verifies the details of a floating_ip
floating_ip = self.floating_ips_client.show_floatingip(
floating_ip = self.reader_client.show_floatingip(
created_floating_ip['id'])
shown_floating_ip = floating_ip['floatingip']
self.assertEqual(shown_floating_ip['id'], created_floating_ip['id'])
@@ -105,7 +115,7 @@ class FloatingIPTestJSON(base.BaseNetworkTest):
self.assertEqual(shown_floating_ip['port_id'], self.ports[0]['id'])
# Verify the floating ip exists in the list of all floating_ips
floating_ips = self.floating_ips_client.list_floatingips()
floating_ips = self.reader_client.list_floatingips()
floatingip_id_list = list()
for f in floating_ips['floatingips']:
floatingip_id_list.append(f['id'])
@@ -162,7 +172,7 @@ class FloatingIPTestJSON(base.BaseNetworkTest):
# Delete port
self.ports_client.delete_port(created_port['id'])
# Verifies the details of the floating_ip
floating_ip = self.floating_ips_client.show_floatingip(
floating_ip = self.reader_client.show_floatingip(
created_floating_ip['id'])
shown_floating_ip = floating_ip['floatingip']
# Confirm the fields are back to None

View File

@@ -155,6 +155,24 @@ class NetworksTest(BaseNetworkTestResources):
project_network_v6_mask_bits is the equivalent for ipv6 subnets
"""
credentials = ['primary', 'project_reader']
@classmethod
def setup_clients(cls):
super(NetworksTest, cls).setup_clients()
if CONF.enforce_scope.neutron:
cls.reader_networks_client = cls.os_project_reader.networks_client
cls.reader_ports_client = cls.os_project_reader.ports_client
cls.reader_subnets_client = cls.os_project_reader.subnets_client
cls.reader_network_extensions_client = (
cls.os_project_reader.network_extensions_client)
else:
cls.reader_networks_client = cls.networks_client
cls.reader_ports_client = cls.ports_client
cls.reader_subnets_client = cls.subnets_client
cls.reader_network_extensions_client = (
cls.network_extensions_client)
@decorators.attr(type='smoke')
@decorators.idempotent_id('0e269138-0da6-4efc-a46d-578161e7b221')
def test_create_update_delete_network_subnet(self):
@@ -185,7 +203,7 @@ class NetworksTest(BaseNetworkTestResources):
@decorators.idempotent_id('2bf13842-c93f-4a69-83ed-717d2ec3b44e')
def test_show_network(self):
"""Verify the details of a network"""
body = self.networks_client.show_network(self.network['id'])
body = self.reader_networks_client.show_network(self.network['id'])
network = body['network']
for key in ['id', 'name']:
self.assertEqual(network[key], self.network[key])
@@ -196,8 +214,8 @@ class NetworksTest(BaseNetworkTestResources):
fields = ['id', 'name']
if utils.is_extension_enabled('net-mtu', 'network'):
fields.append('mtu')
body = self.networks_client.show_network(self.network['id'],
fields=fields)
body = self.reader_networks_client.show_network(self.network['id'],
fields=fields)
network = body['network']
self.assertEqual(sorted(network.keys()), sorted(fields))
for field_name in fields:
@@ -209,7 +227,7 @@ class NetworksTest(BaseNetworkTestResources):
@decorators.idempotent_id('f7ffdeda-e200-4a7a-bcbe-05716e86bf43')
def test_list_networks(self):
"""Verify the network exists in the list of all networks"""
body = self.networks_client.list_networks()
body = self.reader_networks_client.list_networks()
networks = [network['id'] for network in body['networks']
if network['id'] == self.network['id']]
self.assertNotEmpty(networks, "Created network not found in the list")
@@ -220,7 +238,7 @@ class NetworksTest(BaseNetworkTestResources):
fields = ['id', 'name']
if utils.is_extension_enabled('net-mtu', 'network'):
fields.append('mtu')
body = self.networks_client.list_networks(fields=fields)
body = self.reader_networks_client.list_networks(fields=fields)
networks = body['networks']
self.assertNotEmpty(networks, "Network list returned is empty")
for network in networks:
@@ -230,7 +248,7 @@ class NetworksTest(BaseNetworkTestResources):
@decorators.idempotent_id('bd635d81-6030-4dd1-b3b9-31ba0cfdf6cc')
def test_show_subnet(self):
"""Verify the details of a subnet"""
body = self.subnets_client.show_subnet(self.subnet['id'])
body = self.reader_subnets_client.show_subnet(self.subnet['id'])
subnet = body['subnet']
self.assertNotEmpty(subnet, "Subnet returned has no fields")
for key in ['id', 'cidr']:
@@ -241,8 +259,8 @@ class NetworksTest(BaseNetworkTestResources):
def test_show_subnet_fields(self):
"""Verify specific fields of a subnet"""
fields = ['id', 'network_id']
body = self.subnets_client.show_subnet(self.subnet['id'],
fields=fields)
body = self.reader_subnets_client.show_subnet(self.subnet['id'],
fields=fields)
subnet = body['subnet']
self.assertEqual(sorted(subnet.keys()), sorted(fields))
for field_name in fields:
@@ -252,7 +270,7 @@ class NetworksTest(BaseNetworkTestResources):
@decorators.idempotent_id('db68ba48-f4ea-49e9-81d1-e367f6d0b20a')
def test_list_subnets(self):
"""Verify the subnet exists in the list of all subnets"""
body = self.subnets_client.list_subnets()
body = self.reader_subnets_client.list_subnets()
subnets = [subnet['id'] for subnet in body['subnets']
if subnet['id'] == self.subnet['id']]
self.assertNotEmpty(subnets, "Created subnet not found in the list")
@@ -261,7 +279,7 @@ class NetworksTest(BaseNetworkTestResources):
def test_list_subnets_fields(self):
"""Verify specific fields of subnets"""
fields = ['id', 'network_id']
body = self.subnets_client.list_subnets(fields=fields)
body = self.reader_subnets_client.list_subnets(fields=fields)
subnets = body['subnets']
self.assertNotEmpty(subnets, "Subnet list returned is empty")
for subnet in subnets:
@@ -284,7 +302,8 @@ class NetworksTest(BaseNetworkTestResources):
self.networks_client.delete_network(net_id)
# Verify that the subnet got automatically deleted.
self.assertRaises(lib_exc.NotFound, self.subnets_client.show_subnet,
self.assertRaises(lib_exc.NotFound,
self.reader_subnets_client.show_subnet,
subnet_id)
@decorators.idempotent_id('d2d596e2-8e76-47a9-ac51-d4648009f4d3')
@@ -373,7 +392,8 @@ class NetworksTest(BaseNetworkTestResources):
public_network_id = CONF.network.public_network_id
# find external network matching public_network_id
body = self.networks_client.list_networks(**{'router:external': True})
body = self.reader_networks_client.list_networks(
**{'router:external': True})
external_network = next((network for network in body['networks']
if network['id'] == public_network_id), None)
self.assertIsNotNone(external_network, "Public network %s not found "
@@ -388,10 +408,12 @@ class NetworksTest(BaseNetworkTestResources):
# only check the public network ID because the other networks may
# belong to other tests and their state may have changed during this
# test
body = self.subnets_client.list_subnets(network_id=public_network_id)
body = self.reader_subnets_client.list_subnets(
network_id=public_network_id)
extensions = [
ext['alias'] for ext in
self.network_extensions_client.list_extensions()['extensions']]
self.reader_network_extensions_client.list_extensions()[
'extensions']]
is_sen_ext = 'subnet-external-network' in extensions
# check subnet visibility of external_network
@@ -412,12 +434,14 @@ class NetworksTest(BaseNetworkTestResources):
body = self.create_network(description='d1')
self.assertEqual('d1', body['description'])
net_id = body['id']
body = self.networks_client.list_networks(id=net_id)['networks'][0]
body = self.reader_networks_client.list_networks(
id=net_id)['networks'][0]
self.assertEqual('d1', body['description'])
body = self.networks_client.update_network(body['id'],
description='d2')
self.assertEqual('d2', body['network']['description'])
body = self.networks_client.list_networks(id=net_id)['networks'][0]
body = self.reader_networks_client.list_networks(
id=net_id)['networks'][0]
self.assertEqual('d2', body['description'])
@@ -439,11 +463,25 @@ class BulkNetworkOpsTest(base.BaseNetworkTest):
the block defined by project-network_cidr
"""
credentials = ['primary', 'project_reader']
@classmethod
def setup_clients(cls):
super(BulkNetworkOpsTest, cls).setup_clients()
if CONF.enforce_scope.neutron:
cls.reader_networks_client = cls.os_project_reader.networks_client
cls.reader_ports_client = cls.os_project_reader.ports_client
cls.reader_subnets_client = cls.os_project_reader.subnets_client
else:
cls.reader_networks_client = cls.networks_client
cls.reader_ports_client = cls.ports_client
cls.reader_subnets_client = cls.subnets_client
def _delete_networks(self, created_networks):
for n in created_networks:
self.networks_client.delete_network(n['id'])
# Asserting that the networks are not found in the list after deletion
body = self.networks_client.list_networks()
body = self.reader_networks_client.list_networks()
networks_list = [network['id'] for network in body['networks']]
for n in created_networks:
self.assertNotIn(n['id'], networks_list)
@@ -452,7 +490,7 @@ class BulkNetworkOpsTest(base.BaseNetworkTest):
for n in created_subnets:
self.subnets_client.delete_subnet(n['id'])
# Asserting that the subnets are not found in the list after deletion
body = self.subnets_client.list_subnets()
body = self.reader_subnets_client.list_subnets()
subnets_list = [subnet['id'] for subnet in body['subnets']]
for n in created_subnets:
self.assertNotIn(n['id'], subnets_list)
@@ -461,7 +499,7 @@ class BulkNetworkOpsTest(base.BaseNetworkTest):
for n in created_ports:
self.ports_client.delete_port(n['id'])
# Asserting that the ports are not found in the list after deletion
body = self.ports_client.list_ports()
body = self.reader_ports_client.list_ports()
ports_list = [port['id'] for port in body['ports']]
for n in created_ports:
self.assertNotIn(n['id'], ports_list)
@@ -480,7 +518,7 @@ class BulkNetworkOpsTest(base.BaseNetworkTest):
created_networks = body['networks']
self.addCleanup(self._delete_networks, created_networks)
# Asserting that the networks are found in the list after creation
body = self.networks_client.list_networks()
body = self.reader_networks_client.list_networks()
networks_list = [network['id'] for network in body['networks']]
for n in created_networks:
self.assertIsNotNone(n['id'])
@@ -512,7 +550,7 @@ class BulkNetworkOpsTest(base.BaseNetworkTest):
created_subnets = body['subnets']
self.addCleanup(self._delete_subnets, created_subnets)
# Asserting that the subnets are found in the list after creation
body = self.subnets_client.list_subnets()
body = self.reader_subnets_client.list_subnets()
subnets_list = [subnet['id'] for subnet in body['subnets']]
for n in created_subnets:
self.assertIsNotNone(n['id'])
@@ -541,7 +579,7 @@ class BulkNetworkOpsTest(base.BaseNetworkTest):
created_ports = body['ports']
self.addCleanup(self._delete_ports, created_ports)
# Asserting that the ports are found in the list after creation
body = self.ports_client.list_ports()
body = self.reader_ports_client.list_ports()
ports_list = [port['id'] for port in body['ports']]
for n in created_ports:
self.assertIsNotNone(n['id'])
@@ -555,6 +593,16 @@ class BulkNetworkOpsIpV6Test(BulkNetworkOpsTest):
class NetworksIpV6Test(NetworksTest):
_ip_version = 6
credentials = ['primary', 'project_reader']
@classmethod
def setup_clients(cls):
super(NetworksIpV6Test, cls).setup_clients()
if CONF.enforce_scope.neutron:
cls.reader_subnets_client = cls.os_project_reader.subnets_client
else:
cls.reader_subnets_client = cls.subnets_client
@decorators.idempotent_id('e41a4888-65a6-418c-a095-f7c2ef4ad59a')
def test_create_delete_subnet_with_gw(self):
"""Verify creating and deleting subnet with gateway"""
@@ -600,7 +648,7 @@ class NetworksIpV6Test(NetworksTest):
# Verifies Subnet GW is None in IPv4
self.assertIsNone(subnet2['gateway_ip'])
# Verifies all 2 subnets in the same network
body = self.subnets_client.list_subnets()
body = self.reader_subnets_client.list_subnets()
subnets = [sub['id'] for sub in body['subnets']
if sub['network_id'] == network['id']]
test_subnet_ids = [sub['id'] for sub in (subnet1, subnet2)]
@@ -613,6 +661,16 @@ class NetworksIpV6TestAttrs(BaseNetworkTestResources):
_ip_version = 6
credentials = ['primary', 'project_reader']
@classmethod
def setup_clients(cls):
super(NetworksIpV6TestAttrs, cls).setup_clients()
if CONF.enforce_scope.neutron:
cls.reader_subnets_client = cls.os_project_reader.subnets_client
else:
cls.reader_subnets_client = cls.subnets_client
@classmethod
def skip_checks(cls):
super(NetworksIpV6TestAttrs, cls).skip_checks()
@@ -651,7 +709,7 @@ class NetworksIpV6TestAttrs(BaseNetworkTestResources):
port = self.create_port(slaac_network)
self.assertIsNotNone(port['fixed_ips'][0]['ip_address'])
self.subnets_client.delete_subnet(subnet_slaac['id'])
subnets = self.subnets_client.list_subnets()
subnets = self.reader_subnets_client.list_subnets()
subnet_ids = [subnet['id'] for subnet in subnets['subnets']]
self.assertNotIn(subnet_slaac['id'], subnet_ids,
"Subnet wasn't deleted")

View File

@@ -26,12 +26,27 @@ CONF = config.CONF
class NetworksNegativeTestJSON(base.BaseNetworkTest):
"""Negative tests of network"""
credentials = ['primary', 'project_reader']
@classmethod
def setup_clients(cls):
super(NetworksNegativeTestJSON, cls).setup_clients()
if CONF.enforce_scope.neutron:
cls.reader_ports_client = cls.os_project_reader.ports_client
cls.reader_subnets_client = cls.os_project_reader.subnets_client
cls.reader_networks_client = cls.os_project_reader.networks_client
else:
cls.reader_ports_client = cls.ports_client
cls.reader_subnets_client = cls.subnets_client
cls.reader_networks_client = cls.networks_client
@decorators.attr(type=['negative'])
@decorators.idempotent_id('9293e937-824d-42d2-8d5b-e985ea67002a')
def test_show_non_existent_network(self):
"""Test showing non existent network"""
non_exist_id = data_utils.rand_uuid()
self.assertRaises(lib_exc.NotFound, self.networks_client.show_network,
self.assertRaises(lib_exc.NotFound,
self.reader_networks_client.show_network,
non_exist_id)
@decorators.attr(type=['negative'])
@@ -39,7 +54,8 @@ class NetworksNegativeTestJSON(base.BaseNetworkTest):
def test_show_non_existent_subnet(self):
"""Test showing non existent subnet"""
non_exist_id = data_utils.rand_uuid()
self.assertRaises(lib_exc.NotFound, self.subnets_client.show_subnet,
self.assertRaises(lib_exc.NotFound,
self.reader_subnets_client.show_subnet,
non_exist_id)
@decorators.attr(type=['negative'])
@@ -47,7 +63,8 @@ class NetworksNegativeTestJSON(base.BaseNetworkTest):
def test_show_non_existent_port(self):
"""Test showing non existent port"""
non_exist_id = data_utils.rand_uuid()
self.assertRaises(lib_exc.NotFound, self.ports_client.show_port,
self.assertRaises(lib_exc.NotFound,
self.reader_ports_client.show_port,
non_exist_id)
@decorators.attr(type=['negative'])

View File

@@ -40,6 +40,16 @@ class PortsTestJSON(sec_base.BaseSecGroupTest):
port update
"""
credentials = ['primary', 'project_reader']
@classmethod
def setup_clients(cls):
super(PortsTestJSON, cls).setup_clients()
if CONF.enforce_scope.neutron:
cls.reader_client = cls.os_project_reader.ports_client
else:
cls.reader_client = cls.ports_client
@classmethod
def resource_setup(cls):
super(PortsTestJSON, cls).resource_setup()
@@ -48,7 +58,7 @@ class PortsTestJSON(sec_base.BaseSecGroupTest):
def _delete_port(self, port_id):
self.ports_client.delete_port(port_id)
body = self.ports_client.list_ports()
body = self.reader_client.list_ports()
ports_list = body['ports']
self.assertFalse(port_id in [n['id'] for n in ports_list])
@@ -153,7 +163,7 @@ class PortsTestJSON(sec_base.BaseSecGroupTest):
@decorators.idempotent_id('c9a685bd-e83f-499c-939f-9f7863ca259f')
def test_show_port(self):
"""Verify the details of port"""
body = self.ports_client.show_port(self.port['id'])
body = self.reader_client.show_port(self.port['id'])
port = body['port']
self.assertIn('id', port)
# NOTE(rfolco): created_at and updated_at may get inconsistent values
@@ -170,8 +180,8 @@ class PortsTestJSON(sec_base.BaseSecGroupTest):
def test_show_port_fields(self):
"""Verify specific fields of a port"""
fields = ['id', 'mac_address']
body = self.ports_client.show_port(self.port['id'],
fields=fields)
body = self.reader_client.show_port(self.port['id'],
fields=fields)
port = body['port']
self.assertEqual(sorted(port.keys()), sorted(fields))
for field_name in fields:
@@ -181,7 +191,7 @@ class PortsTestJSON(sec_base.BaseSecGroupTest):
@decorators.idempotent_id('cf95b358-3e92-4a29-a148-52445e1ac50e')
def test_list_ports(self):
"""Verify the port exists in the list of all ports"""
body = self.ports_client.list_ports()
body = self.reader_client.list_ports()
ports = [port['id'] for port in body['ports']
if port['id'] == self.port['id']]
self.assertNotEmpty(ports, "Created port not found in the list")
@@ -212,7 +222,7 @@ class PortsTestJSON(sec_base.BaseSecGroupTest):
# List ports filtered by fixed_ips
port_1_fixed_ip = port_1['port']['fixed_ips'][0]['ip_address']
fixed_ips = 'ip_address=' + port_1_fixed_ip
port_list = self.ports_client.list_ports(fixed_ips=fixed_ips)
port_list = self.reader_client.list_ports(fixed_ips=fixed_ips)
# Check that we got the desired port
ports = port_list['ports']
project_ids = set([port['project_id'] for port in ports])
@@ -281,7 +291,7 @@ class PortsTestJSON(sec_base.BaseSecGroupTest):
ips_filter = 'ip_address_substr=' + ip_address_1[:-1]
else:
ips_filter = 'ip_address_substr=' + ip_address_1
ports = self.ports_client.list_ports(fixed_ips=ips_filter)['ports']
ports = self.reader_client.list_ports(fixed_ips=ips_filter)['ports']
# Check that we got the desired port
port_ids = [port['id'] for port in ports]
fixed_ips = [port['fixed_ips'] for port in ports]
@@ -302,7 +312,7 @@ class PortsTestJSON(sec_base.BaseSecGroupTest):
while substr not in ip_address_2:
substr = substr[:-1]
ips_filter = 'ip_address_substr=' + substr
ports = self.ports_client.list_ports(fixed_ips=ips_filter)['ports']
ports = self.reader_client.list_ports(fixed_ips=ips_filter)['ports']
# Check that we got both port
port_ids = [port['id'] for port in ports]
fixed_ips = [port['fixed_ips'] for port in ports]
@@ -339,7 +349,7 @@ class PortsTestJSON(sec_base.BaseSecGroupTest):
self.routers_client.remove_router_interface,
router['id'], port_id=port['port']['id'])
# List ports filtered by router_id
port_list = self.ports_client.list_ports(device_id=router['id'])
port_list = self.reader_client.list_ports(device_id=router['id'])
ports = port_list['ports']
self.assertEqual(len(ports), 1)
self.assertEqual(ports[0]['id'], port['port']['id'])
@@ -349,7 +359,7 @@ class PortsTestJSON(sec_base.BaseSecGroupTest):
def test_list_ports_fields(self):
"""Verify specific fields of ports"""
fields = ['id', 'mac_address']
body = self.ports_client.list_ports(fields=fields)
body = self.reader_client.list_ports(fields=fields)
ports = body['ports']
self.assertNotEmpty(ports, "Port list returned is empty")
# Asserting the fields returned are correct
@@ -501,7 +511,7 @@ class PortsTestJSON(sec_base.BaseSecGroupTest):
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.ports_client.delete_port, body['port']['id'])
port = body['port']
body = self.ports_client.show_port(port['id'])
body = self.reader_client.show_port(port['id'])
show_port = body['port']
self.assertEqual(free_mac_address,
show_port['mac_address'])

View File

@@ -37,6 +37,18 @@ class RoutersTest(base.BaseNetworkTest):
self.assertEqual(subnet_id, interface['subnet_id'])
return interface
credentials = ['primary', 'project_reader']
@classmethod
def setup_clients(cls):
super(RoutersTest, cls).setup_clients()
if CONF.enforce_scope.neutron:
cls.reader_routers_client = cls.os_project_reader.routers_client
cls.reader_ports_client = cls.os_project_reader.ports_client
else:
cls.reader_routers_client = cls.routers_client
cls.reader_ports_client = cls.ports_client
@classmethod
def skip_checks(cls):
super(RoutersTest, cls).skip_checks()
@@ -65,7 +77,7 @@ class RoutersTest(base.BaseNetworkTest):
router['external_gateway_info']['network_id'],
CONF.network.public_network_id)
# Show details of the created router
router_show = self.routers_client.show_router(
router_show = self.reader_routers_client.show_router(
router['id'])['router']
self.assertEqual(router_show['name'], router['name'])
self.assertEqual(
@@ -79,7 +91,7 @@ class RoutersTest(base.BaseNetworkTest):
router_update = self.routers_client.update_router(
router['id'], name=updated_name)['router']
self.assertEqual(router_update['name'], updated_name)
router_show = self.routers_client.show_router(
router_show = self.reader_routers_client.show_router(
router['id'])['router']
self.assertEqual(router_show['name'], updated_name)
@@ -107,7 +119,7 @@ class RoutersTest(base.BaseNetworkTest):
self.assertIn('subnet_id', interface.keys())
self.assertIn('port_id', interface.keys())
# Verify router id is equal to device id in port details
show_port_body = self.ports_client.show_port(
show_port_body = self.reader_ports_client.show_port(
interface['port_id'])
self.assertEqual(show_port_body['port']['device_id'],
router['id'])
@@ -140,7 +152,7 @@ class RoutersTest(base.BaseNetworkTest):
self.assertIn('subnet_id', interface.keys())
self.assertIn('port_id', interface.keys())
# Verify router id is equal to device id in port details
show_port_body = self.ports_client.show_port(
show_port_body = self.reader_ports_client.show_port(
interface['port_id'])
self.assertEqual(show_port_body['port']['device_id'],
router['id'])
@@ -194,7 +206,7 @@ class RoutersTest(base.BaseNetworkTest):
test_routes.sort(key=lambda x: x['destination'])
extra_route = self.routers_client.update_router(
router['id'], routes=test_routes)
show_body = self.routers_client.show_router(router['id'])
show_body = self.reader_routers_client.show_router(router['id'])
# Assert the number of routes
self.assertEqual(routes_num, len(extra_route['router']['routes']))
self.assertEqual(routes_num, len(show_body['router']['routes']))
@@ -215,7 +227,7 @@ class RoutersTest(base.BaseNetworkTest):
self.assertEqual(test_routes[i]['nexthop'], routes[i]['nexthop'])
self._delete_extra_routes(router['id'])
show_body_after_deletion = self.routers_client.show_router(
show_body_after_deletion = self.reader_routers_client.show_router(
router['id'])
self.assertEmpty(show_body_after_deletion['router']['routes'])
@@ -232,7 +244,7 @@ class RoutersTest(base.BaseNetworkTest):
update_body = self.routers_client.update_router(router['id'],
admin_state_up=True)
self.assertTrue(update_body['router']['admin_state_up'])
show_body = self.routers_client.show_router(router['id'])
show_body = self.reader_routers_client.show_router(router['id'])
self.assertTrue(show_body['router']['admin_state_up'])
@decorators.attr(type='smoke')
@@ -288,7 +300,7 @@ class RoutersTest(base.BaseNetworkTest):
subnet['id'])
self.assertIn('port_id', interface)
self.assertIn('subnet_id', interface)
port = self.ports_client.show_port(interface['port_id'])
port = self.reader_ports_client.show_port(interface['port_id'])
self.assertEqual(port['port']['id'], interface['port_id'])
router_port = self.ports_client.update_port(port['port']['id'],
fixed_ips=fixed_ip)
@@ -296,7 +308,7 @@ class RoutersTest(base.BaseNetworkTest):
router_port['port']['fixed_ips'][0]['subnet_id'])
def _verify_router_interface(self, router_id, subnet_id, port_id):
show_port_body = self.ports_client.show_port(port_id)
show_port_body = self.reader_ports_client.show_port(port_id)
interface_port = show_port_body['port']
self.assertEqual(router_id, interface_port['device_id'])
self.assertEqual(subnet_id,

View File

@@ -26,6 +26,16 @@ CONF = config.CONF
class RoutersNegativeTest(base.BaseNetworkTest):
"""Negative tests of routers"""
credentials = ['primary', 'project_reader']
@classmethod
def setup_clients(cls):
super(RoutersNegativeTest, cls).setup_clients()
if CONF.enforce_scope.neutron:
cls.reader_client = cls.os_project_reader.routers_client
else:
cls.reader_client = cls.routers_client
@classmethod
def skip_checks(cls):
super(RoutersNegativeTest, cls).skip_checks()
@@ -105,7 +115,7 @@ class RoutersNegativeTest(base.BaseNetworkTest):
"""Test showing non existent router"""
router = data_utils.rand_name(
name='non_exist_router', prefix=CONF.resource_name_prefix)
self.assertRaises(lib_exc.NotFound, self.routers_client.show_router,
self.assertRaises(lib_exc.NotFound, self.reader_client.show_router,
router)
@decorators.attr(type=['negative'])

View File

@@ -26,6 +26,21 @@ CONF = config.CONF
class SecGroupTest(base.BaseSecGroupTest):
"""Test security groups"""
credentials = ['primary', 'project_reader']
@classmethod
def setup_clients(cls):
super(SecGroupTest, cls).setup_clients()
if CONF.enforce_scope.neutron:
cls.reader_security_groups_client = (
cls.os_project_reader.security_groups_client)
cls.reader_security_group_rules_client = (
cls.os_project_reader.security_group_rules_client)
else:
cls.reader_security_groups_client = cls.security_groups_client
cls.reader_security_group_rules_client = (
cls.security_group_rules_client)
@classmethod
def skip_checks(cls):
super(SecGroupTest, cls).skip_checks()
@@ -72,7 +87,7 @@ class SecGroupTest(base.BaseSecGroupTest):
@decorators.idempotent_id('e30abd17-fef9-4739-8617-dc26da88e686')
def test_list_security_groups(self):
"""Verify that default security group exist"""
body = self.security_groups_client.list_security_groups()
body = self.reader_security_groups_client.list_security_groups()
security_groups = body['security_groups']
found = None
for n in security_groups:
@@ -88,7 +103,7 @@ class SecGroupTest(base.BaseSecGroupTest):
group_create_body, _ = self._create_security_group()
# List security groups and verify if created group is there in response
list_body = self.security_groups_client.list_security_groups()
list_body = self.reader_security_groups_client.list_security_groups()
secgroup_list = list()
for secgroup in list_body['security_groups']:
secgroup_list.append(secgroup['id'])
@@ -106,7 +121,7 @@ class SecGroupTest(base.BaseSecGroupTest):
self.assertEqual(update_body['security_group']['description'],
new_description)
# Show details of the updated security group
show_body = self.security_groups_client.show_security_group(
show_body = self.reader_security_groups_client.show_security_group(
group_create_body['security_group']['id'])
self.assertEqual(show_body['security_group']['name'], new_name)
self.assertEqual(show_body['security_group']['description'],
@@ -136,7 +151,8 @@ class SecGroupTest(base.BaseSecGroupTest):
# List rules and verify created rule is not in response
rule_list_body = (
self.security_group_rules_client.list_security_group_rules())
self.reader_security_group_rules_client
.list_security_group_rules())
rule_list = [rule['id']
for rule in rule_list_body['security_group_rules']]
self.assertNotIn(rule_id, rule_list)
@@ -170,7 +186,8 @@ class SecGroupTest(base.BaseSecGroupTest):
# List rules and verify created rule is in response
rule_list_body = (
self.security_group_rules_client.list_security_group_rules())
self.reader_security_group_rules_client
.list_security_group_rules())
rule_list = [rule['id']
for rule in rule_list_body['security_group_rules']]
self.assertIn(rule_create_body['security_group_rule']['id'],

View File

@@ -26,6 +26,21 @@ CONF = config.CONF
class NegativeSecGroupTest(base.BaseSecGroupTest):
"""Negative tests of security groups"""
credentials = ['primary', 'project_reader']
@classmethod
def setup_clients(cls):
super(NegativeSecGroupTest, cls).setup_clients()
if CONF.enforce_scope.neutron:
cls.reader_security_groups_client = (
cls.os_project_reader.security_groups_client)
cls.reader_security_group_rules_client = (
cls.os_project_reader.security_group_rules_client)
else:
cls.reader_security_groups_client = cls.security_groups_client
cls.reader_security_group_rules_client = (
cls.security_group_rules_client)
@classmethod
def skip_checks(cls):
super(NegativeSecGroupTest, cls).skip_checks()
@@ -39,7 +54,8 @@ class NegativeSecGroupTest(base.BaseSecGroupTest):
"""Test showing non existent security group"""
non_exist_id = data_utils.rand_uuid()
self.assertRaises(
lib_exc.NotFound, self.security_groups_client.show_security_group,
lib_exc.NotFound,
self.reader_security_groups_client.show_security_group,
non_exist_id)
@decorators.attr(type=['negative'])
@@ -49,7 +65,7 @@ class NegativeSecGroupTest(base.BaseSecGroupTest):
non_exist_id = data_utils.rand_uuid()
self.assertRaises(
lib_exc.NotFound,
self.security_group_rules_client.show_security_group_rule,
self.reader_security_group_rules_client.show_security_group_rule,
non_exist_id)
@decorators.attr(type=['negative'])

View File

@@ -12,12 +12,25 @@
from tempest.api.network import base
from tempest.common import utils
from tempest import config
from tempest.lib import decorators
CONF = config.CONF
class ServiceProvidersTest(base.BaseNetworkTest):
"""Test network service providers"""
credentials = ['primary', 'project_reader']
@classmethod
def setup_clients(cls):
super(ServiceProvidersTest, cls).setup_clients()
if CONF.enforce_scope.neutron:
cls.reader_client = cls.os_project_reader.service_providers_client
else:
cls.reader_client = cls.service_providers_client
@classmethod
def skip_checks(cls):
super(ServiceProvidersTest, cls).skip_checks()
@@ -28,6 +41,6 @@ class ServiceProvidersTest(base.BaseNetworkTest):
@decorators.idempotent_id('2cbbeea9-f010-40f6-8df5-4eaa0c918ea6')
def test_service_providers_list(self):
"""Test listing network service providers"""
body = self.service_providers_client.list_service_providers()
body = self.reader_client.list_service_providers()
self.assertIn('service_providers', body)
self.assertIsInstance(body['service_providers'], list)

View File

@@ -39,6 +39,8 @@ class SubnetPoolsTestJSON(base.BaseNetworkTest):
"""
credentials = ['primary', 'project_reader']
@classmethod
def skip_checks(cls):
super(SubnetPoolsTestJSON, cls).skip_checks()
@@ -46,6 +48,14 @@ class SubnetPoolsTestJSON(base.BaseNetworkTest):
msg = "subnet_allocation extension not enabled."
raise cls.skipException(msg)
@classmethod
def setup_clients(cls):
super(SubnetPoolsTestJSON, cls).setup_clients()
if CONF.enforce_scope.neutron:
cls.reader_client = cls.os_project_reader.subnetpools_client
else:
cls.reader_client = cls.subnetpools_client
@decorators.attr(type='smoke')
@decorators.idempotent_id('62595970-ab1c-4b7f-8fcc-fddfe55e9811')
def test_create_list_show_update_delete_subnetpools(self):
@@ -62,7 +72,7 @@ class SubnetPoolsTestJSON(base.BaseNetworkTest):
subnetpool_id)
self.assertEqual(subnetpool_name, body["subnetpool"]["name"])
# get detail about subnet pool
body = self.subnetpools_client.show_subnetpool(subnetpool_id)
body = self.reader_client.show_subnetpool(subnetpool_id)
self.assertEqual(subnetpool_name, body["subnetpool"]["name"])
# update the subnet pool
subnetpool_name = data_utils.rand_name(
@@ -73,5 +83,5 @@ class SubnetPoolsTestJSON(base.BaseNetworkTest):
# delete subnet pool
body = self.subnetpools_client.delete_subnetpool(subnetpool_id)
self.assertRaises(lib_exc.NotFound,
self.subnetpools_client.show_subnetpool,
self.reader_client.show_subnetpool,
subnetpool_id)

View File

@@ -37,6 +37,8 @@ class TagsTest(base.BaseNetworkTest):
tags on their networks. The extension supports networks only.
"""
credentials = ['primary', 'project_reader']
@classmethod
def skip_checks(cls):
super(TagsTest, cls).skip_checks()
@@ -44,6 +46,14 @@ class TagsTest(base.BaseNetworkTest):
msg = "tag extension not enabled."
raise cls.skipException(msg)
@classmethod
def setup_clients(cls):
super(TagsTest, cls).setup_clients()
if CONF.enforce_scope.neutron:
cls.reader_client = cls.os_project_reader.tags_client
else:
cls.reader_client = cls.tags_client
@classmethod
def resource_setup(cls):
super(TagsTest, cls).resource_setup()
@@ -61,7 +71,7 @@ class TagsTest(base.BaseNetworkTest):
tag_name)
# Validate that listing tags on a network resource works.
retrieved_tags = self.tags_client.list_tags(
retrieved_tags = self.reader_client.list_tags(
'networks', self.network['id'])['tags']
self.assertEqual([tag_name], retrieved_tags)
@@ -115,6 +125,8 @@ class TagsExtTest(base.BaseNetworkTest):
# the singular case for the corresponding class resource object.
SUPPORTED_RESOURCES = ['subnets', 'ports', 'routers', 'subnetpools']
credentials = ['primary', 'project_reader']
@classmethod
def skip_checks(cls):
super(TagsExtTest, cls).skip_checks()
@@ -126,6 +138,14 @@ class TagsExtTest(base.BaseNetworkTest):
"are enabled.")
raise cls.skipException(msg)
@classmethod
def setup_clients(cls):
super(TagsExtTest, cls).setup_clients()
if CONF.enforce_scope.neutron:
cls.reader_client = cls.os_project_reader.tags_client
else:
cls.reader_client = cls.tags_client
@classmethod
def resource_setup(cls):
super(TagsExtTest, cls).resource_setup()
@@ -169,7 +189,7 @@ class TagsExtTest(base.BaseNetworkTest):
for i, resource in enumerate(self.SUPPORTED_RESOURCES):
# Ensure that a tag was created for each resource.
resource_object = getattr(self, resource[:-1])
retrieved_tags = self.tags_client.list_tags(
retrieved_tags = self.reader_client.list_tags(
resource, resource_object['id'])['tags']
self.assertEqual(1, len(retrieved_tags))
self.assertEqual(tag_names[i], retrieved_tags[0])
@@ -181,7 +201,7 @@ class TagsExtTest(base.BaseNetworkTest):
# Delete the tag and ensure it was deleted.
self.tags_client.delete_tag(
resource, resource_object['id'], tag_names[i])
retrieved_tags = self.tags_client.list_tags(
retrieved_tags = self.reader_client.list_tags(
resource, resource_object['id'])['tags']
self.assertEmpty(retrieved_tags)

View File

@@ -13,10 +13,24 @@
# under the License.
from tempest.api.network import base
from tempest import config
from tempest.lib import decorators
CONF = config.CONF
class NetworksApiDiscovery(base.BaseNetworkTest):
credentials = ['primary', 'project_reader']
@classmethod
def setup_clients(cls):
super(NetworksApiDiscovery, cls).setup_clients()
if CONF.enforce_scope.neutron:
cls.reader_client = cls.os_project_reader.network_versions_client
else:
cls.reader_client = cls.network_versions_client
@decorators.attr(type='smoke')
@decorators.idempotent_id('cac8a836-c2e0-4304-b556-cd299c7281d1')
def test_api_version_resources(self):
@@ -28,7 +42,7 @@ class NetworksApiDiscovery(base.BaseNetworkTest):
schema.
"""
result = self.network_versions_client.list_versions()
result = self.reader_client.list_versions()
expected_versions = ('v2.0',)
expected_resources = ('id', 'links', 'status')
received_list = result.values()
@@ -45,7 +59,7 @@ class NetworksApiDiscovery(base.BaseNetworkTest):
"""Test that GET /v2.0/ returns expected resources."""
current_version = 'v2.0'
expected_resources = ('subnet', 'network', 'port')
result = self.network_versions_client.show_version(current_version)
result = self.reader_client.show_version(current_version)
actual_resources = [r['name'] for r in result['resources']]
for resource in expected_resources:
self.assertIn(resource, actual_resources)