Split out Neutron routers client

Splitting out a routers client for Neutron.

Partially implements blueprint consistent-service-method-names

Change-Id: I9b43d99067778913c235e62040d3a2ca7f4346e8
This commit is contained in:
Ken'ichi Ohmichi 2015-12-22 04:57:11 +00:00
parent 108877da6c
commit e35f472fb7
23 changed files with 314 additions and 245 deletions

View File

@ -74,14 +74,14 @@ class L3AgentSchedulerTestJSON(base.BaseAdminNetworkTest):
# query and setup steps are only required if the extension is available
# and only if the router's default type is distributed.
if test.is_extension_enabled('dvr', 'network'):
cls.is_dvr_router = cls.admin_client.show_router(
cls.is_dvr_router = cls.admin_routers_client.show_router(
cls.router['id'])['router'].get('distributed', False)
if cls.is_dvr_router:
cls.network = cls.create_network()
cls.subnet = cls.create_subnet(cls.network)
cls.port = cls.create_port(cls.network)
cls.client.add_router_interface(cls.router['id'],
port_id=cls.port['id'])
cls.routers_client.add_router_interface(
cls.router['id'], port_id=cls.port['id'])
# NOTE: Sometimes we have seen this test fail with dvr in,
# multinode tests, since the dhcp port is not created before
# the test gets executed and so the router is not scheduled
@ -92,15 +92,15 @@ class L3AgentSchedulerTestJSON(base.BaseAdminNetworkTest):
external_gateway_info = {
'network_id': CONF.network.public_network_id,
'enable_snat': True}
cls.admin_client.update_router_with_snat_gw_info(
cls.admin_routers_client.update_router_with_snat_gw_info(
cls.router['id'],
external_gateway_info=external_gateway_info)
@classmethod
def resource_cleanup(cls):
if cls.is_dvr_router:
cls.client.remove_router_interface(cls.router['id'],
port_id=cls.port['id'])
cls.routers_client.remove_router_interface(cls.router['id'],
port_id=cls.port['id'])
super(L3AgentSchedulerTestJSON, cls).resource_cleanup()
@test.idempotent_id('b7ce6e89-e837-4ded-9b78-9ed3c9c6a45a')
@ -114,7 +114,8 @@ class L3AgentSchedulerTestJSON(base.BaseAdminNetworkTest):
self.agent['id'],
router_id=self.router['id'])
body = (
self.admin_client.list_l3_agents_hosting_router(self.router['id']))
self.admin_routers_client.list_l3_agents_hosting_router(
self.router['id']))
for agent in body['agents']:
l3_agent_ids.append(agent['id'])
self.assertIn('agent_type', agent)

View File

@ -34,8 +34,8 @@ class RoutersTestDVR(base.BaseRouterTest):
# has a distributed attribute.
super(RoutersTestDVR, cls).resource_setup()
name = data_utils.rand_name('pretest-check')
router = cls.admin_client.create_router(name)
cls.admin_client.delete_router(router['router']['id'])
router = cls.admin_routers_client.create_router(name)
cls.admin_routers_client.delete_router(router['router']['id'])
if 'distributed' not in router['router']:
msg = "'distributed' flag not found. DVR Possibly not enabled"
raise cls.skipException(msg)
@ -53,8 +53,9 @@ class RoutersTestDVR(base.BaseRouterTest):
set to True
"""
name = data_utils.rand_name('router')
router = self.admin_client.create_router(name, distributed=True)
self.addCleanup(self.admin_client.delete_router,
router = self.admin_routers_client.create_router(name,
distributed=True)
self.addCleanup(self.admin_routers_client.delete_router,
router['router']['id'])
self.assertTrue(router['router']['distributed'])
@ -72,8 +73,9 @@ class RoutersTestDVR(base.BaseRouterTest):
as opposed to a "Distributed Virtual Router"
"""
name = data_utils.rand_name('router')
router = self.admin_client.create_router(name, distributed=False)
self.addCleanup(self.admin_client.delete_router,
router = self.admin_routers_client.create_router(name,
distributed=False)
self.addCleanup(self.admin_routers_client.delete_router,
router['router']['id'])
self.assertFalse(router['router']['distributed'])
@ -93,11 +95,12 @@ class RoutersTestDVR(base.BaseRouterTest):
"""
name = data_utils.rand_name('router')
# router needs to be in admin state down in order to be upgraded to DVR
router = self.admin_client.create_router(name, distributed=False,
admin_state_up=False)
self.addCleanup(self.admin_client.delete_router,
router = self.admin_routers_client.create_router(name,
distributed=False,
admin_state_up=False)
self.addCleanup(self.admin_routers_client.delete_router,
router['router']['id'])
self.assertFalse(router['router']['distributed'])
router = self.admin_client.update_router(router['router']['id'],
distributed=True)
router = self.admin_routers_client.update_router(
router['router']['id'], distributed=True)
self.assertTrue(router['router']['distributed'])

View File

@ -71,6 +71,7 @@ class BaseNetworkTest(tempest.test.BaseTestCase):
cls.agents_client = cls.os.network_agents_client
cls.network_extensions_client = cls.os.network_extensions_client
cls.networks_client = cls.os.networks_client
cls.routers_client = cls.os.routers_client
cls.subnetpools_client = cls.os.subnetpools_client
cls.subnets_client = cls.os.subnets_client
cls.ports_client = cls.os.ports_client
@ -231,7 +232,7 @@ class BaseNetworkTest(tempest.test.BaseTestCase):
ext_gw_info['network_id'] = external_network_id
if enable_snat is not None:
ext_gw_info['enable_snat'] = enable_snat
body = cls.client.create_router(
body = cls.routers_client.create_router(
router_name, external_gateway_info=ext_gw_info,
admin_state_up=admin_state_up, **kwargs)
router = body['router']
@ -250,8 +251,8 @@ class BaseNetworkTest(tempest.test.BaseTestCase):
@classmethod
def create_router_interface(cls, router_id, subnet_id):
"""Wrapper utility that returns a router interface."""
interface = cls.client.add_router_interface(router_id,
subnet_id=subnet_id)
interface = cls.routers_client.add_router_interface(
router_id, subnet_id=subnet_id)
return interface
@classmethod
@ -260,12 +261,12 @@ class BaseNetworkTest(tempest.test.BaseTestCase):
interfaces = body['ports']
for i in interfaces:
try:
cls.client.remove_router_interface(
cls.routers_client.remove_router_interface(
router['id'],
subnet_id=i['fixed_ips'][0]['subnet_id'])
except lib_exc.NotFound:
pass
cls.client.delete_router(router['id'])
cls.routers_client.delete_router(router['id'])
class BaseAdminNetworkTest(BaseNetworkTest):
@ -278,6 +279,7 @@ class BaseAdminNetworkTest(BaseNetworkTest):
cls.admin_client = cls.os_adm.network_client
cls.admin_agents_client = cls.os_adm.network_agents_client
cls.admin_networks_client = cls.os_adm.networks_client
cls.admin_routers_client = cls.os_adm.routers_client
cls.admin_subnets_client = cls.os_adm.subnets_client
cls.admin_ports_client = cls.os_adm.ports_client
cls.admin_quotas_client = cls.os_adm.network_quotas_client

View File

@ -33,31 +33,31 @@ class BaseRouterTest(base.BaseAdminNetworkTest):
self.addCleanup(self._cleanup_router, router)
return router
def _delete_router(self, router_id, network_client=None):
client = network_client or self.client
def _delete_router(self, router_id, routers_client=None):
client = routers_client or self.routers_client
client.delete_router(router_id)
# Asserting that the router is not found in the list
# after deletion
list_body = self.client.list_routers()
list_body = self.routers_client.list_routers()
routers_list = list()
for router in list_body['routers']:
routers_list.append(router['id'])
self.assertNotIn(router_id, routers_list)
def _add_router_interface_with_subnet_id(self, router_id, subnet_id):
interface = self.client.add_router_interface(router_id,
subnet_id=subnet_id)
interface = self.routers_client.add_router_interface(
router_id, subnet_id=subnet_id)
self.addCleanup(self._remove_router_interface_with_subnet_id,
router_id, subnet_id)
self.assertEqual(subnet_id, interface['subnet_id'])
return interface
def _remove_router_interface_with_subnet_id(self, router_id, subnet_id):
body = self.client.remove_router_interface(router_id,
subnet_id=subnet_id)
body = self.routers_client.remove_router_interface(router_id,
subnet_id=subnet_id)
self.assertEqual(subnet_id, body['subnet_id'])
def _remove_router_interface_with_port_id(self, router_id, port_id):
body = self.client.remove_router_interface(router_id,
port_id=port_id)
body = self.routers_client.remove_router_interface(router_id,
port_id=port_id)
self.assertEqual(port_id, body['port_id'])

View File

@ -68,8 +68,8 @@ class NetworksTestDHCPv6(base.BaseNetworkTest):
for port in ports:
if (port['device_owner'].startswith('network:router_interface') and
port['device_id'] in [r['id'] for r in self.routers]):
self.client.remove_router_interface(port['device_id'],
port_id=port['id'])
self.routers_client.remove_router_interface(port['device_id'],
port_id=port['id'])
else:
if port['id'] in [p['id'] for p in self.ports]:
self.ports_client.delete_port(port['id'])
@ -80,11 +80,11 @@ class NetworksTestDHCPv6(base.BaseNetworkTest):
if subnet['id'] in [s['id'] for s in self.subnets]:
self.subnets_client.delete_subnet(subnet['id'])
self._remove_from_list_by_index(self.subnets, subnet)
body = self.client.list_routers()
body = self.routers_client.list_routers()
routers = body['routers']
for router in routers:
if router['id'] in [r['id'] for r in self.routers]:
self.client.delete_router(router['id'])
self.routers_client.delete_router(router['id'])
self._remove_from_list_by_index(self.routers, router)
def _get_ips_from_subnet(self, **kwargs):

View File

@ -197,13 +197,13 @@ class PortsTestJSON(sec_base.BaseSecGroupTest):
subnet = self.create_subnet(network)
self.addCleanup(self.subnets_client.delete_subnet, subnet['id'])
router = self.create_router(data_utils.rand_name('router-'))
self.addCleanup(self.client.delete_router, router['id'])
self.addCleanup(self.routers_client.delete_router, router['id'])
port = self.ports_client.create_port(network_id=network['id'])
# Add router interface to port created above
self.client.add_router_interface(router['id'],
port_id=port['port']['id'])
self.addCleanup(self.client.remove_router_interface, router['id'],
port_id=port['port']['id'])
self.routers_client.add_router_interface(router['id'],
port_id=port['port']['id'])
self.addCleanup(self.routers_client.remove_router_interface,
router['id'], port_id=port['port']['id'])
# List ports filtered by router_id
port_list = self.ports_client.list_ports(device_id=router['id'])
ports = port_list['ports']

View File

@ -52,7 +52,7 @@ class RoutersTest(base.BaseRouterTest):
# NOTE(salv-orlando): Do not invoke self.create_router
# as we need to check the response code
name = data_utils.rand_name('router-')
create_body = self.client.create_router(
create_body = self.routers_client.create_router(
name, external_gateway_info={
"network_id": CONF.network.public_network_id},
admin_state_up=False)
@ -63,24 +63,25 @@ class RoutersTest(base.BaseRouterTest):
CONF.network.public_network_id)
self.assertEqual(create_body['router']['admin_state_up'], False)
# Show details of the created router
show_body = self.client.show_router(create_body['router']['id'])
show_body = self.routers_client.show_router(
create_body['router']['id'])
self.assertEqual(show_body['router']['name'], name)
self.assertEqual(
show_body['router']['external_gateway_info']['network_id'],
CONF.network.public_network_id)
self.assertEqual(show_body['router']['admin_state_up'], False)
# List routers and verify if created router is there in response
list_body = self.client.list_routers()
list_body = self.routers_client.list_routers()
routers_list = list()
for router in list_body['routers']:
routers_list.append(router['id'])
self.assertIn(create_body['router']['id'], routers_list)
# Update the name of router and verify if it is updated
updated_name = 'updated ' + name
update_body = self.client.update_router(create_body['router']['id'],
name=updated_name)
update_body = self.routers_client.update_router(
create_body['router']['id'], name=updated_name)
self.assertEqual(update_body['router']['name'], updated_name)
show_body = self.client.show_router(
show_body = self.routers_client.show_router(
create_body['router']['id'])
self.assertEqual(show_body['router']['name'], updated_name)
@ -95,9 +96,9 @@ class RoutersTest(base.BaseRouterTest):
self.addCleanup(self.identity_utils.delete_project, project_id)
name = data_utils.rand_name('router-')
create_body = self.admin_client.create_router(name,
tenant_id=project_id)
self.addCleanup(self.admin_client.delete_router,
create_body = self.admin_routers_client.create_router(
name, tenant_id=project_id)
self.addCleanup(self.admin_routers_client.delete_router,
create_body['router']['id'])
self.assertEqual(project_id, create_body['router']['tenant_id'])
@ -122,9 +123,9 @@ class RoutersTest(base.BaseRouterTest):
external_gateway_info = {
'network_id': CONF.network.public_network_id,
'enable_snat': enable_snat}
create_body = self.admin_client.create_router(
create_body = self.admin_routers_client.create_router(
name, external_gateway_info=external_gateway_info)
self.addCleanup(self.admin_client.delete_router,
self.addCleanup(self.admin_routers_client.delete_router,
create_body['router']['id'])
# Verify snat attributes after router creation
self._verify_router_gateway(create_body['router']['id'],
@ -137,8 +138,8 @@ class RoutersTest(base.BaseRouterTest):
subnet = self.create_subnet(network)
router = self._create_router(data_utils.rand_name('router-'))
# Add router interface with subnet id
interface = self.client.add_router_interface(router['id'],
subnet_id=subnet['id'])
interface = self.routers_client.add_router_interface(
router['id'], subnet_id=subnet['id'])
self.addCleanup(self._remove_router_interface_with_subnet_id,
router['id'], subnet['id'])
self.assertIn('subnet_id', interface.keys())
@ -158,7 +159,7 @@ class RoutersTest(base.BaseRouterTest):
port_body = self.ports_client.create_port(
network_id=network['id'])
# add router interface to port created above
interface = self.client.add_router_interface(
interface = self.routers_client.add_router_interface(
router['id'],
port_id=port_body['port']['id'])
self.addCleanup(self._remove_router_interface_with_port_id,
@ -172,7 +173,7 @@ class RoutersTest(base.BaseRouterTest):
router['id'])
def _verify_router_gateway(self, router_id, exp_ext_gw_info=None):
show_body = self.admin_client.show_router(router_id)
show_body = self.admin_routers_client.show_router(router_id)
actual_ext_gw_info = show_body['router']['external_gateway_info']
if exp_ext_gw_info is None:
self.assertIsNone(actual_ext_gw_info)
@ -198,7 +199,7 @@ class RoutersTest(base.BaseRouterTest):
@test.idempotent_id('6cc285d8-46bf-4f36-9b1a-783e3008ba79')
def test_update_router_set_gateway(self):
router = self._create_router(data_utils.rand_name('router-'))
self.client.update_router(
self.routers_client.update_router(
router['id'],
external_gateway_info={
'network_id': CONF.network.public_network_id})
@ -212,7 +213,7 @@ class RoutersTest(base.BaseRouterTest):
@test.requires_ext(extension='ext-gw-mode', service='network')
def test_update_router_set_gateway_with_snat_explicit(self):
router = self._create_router(data_utils.rand_name('router-'))
self.admin_client.update_router_with_snat_gw_info(
self.admin_routers_client.update_router_with_snat_gw_info(
router['id'],
external_gateway_info={
'network_id': CONF.network.public_network_id,
@ -227,7 +228,7 @@ class RoutersTest(base.BaseRouterTest):
@test.requires_ext(extension='ext-gw-mode', service='network')
def test_update_router_set_gateway_without_snat(self):
router = self._create_router(data_utils.rand_name('router-'))
self.admin_client.update_router_with_snat_gw_info(
self.admin_routers_client.update_router_with_snat_gw_info(
router['id'],
external_gateway_info={
'network_id': CONF.network.public_network_id,
@ -243,7 +244,8 @@ class RoutersTest(base.BaseRouterTest):
router = self._create_router(
data_utils.rand_name('router-'),
external_network_id=CONF.network.public_network_id)
self.client.update_router(router['id'], external_gateway_info={})
self.routers_client.update_router(router['id'],
external_gateway_info={})
self._verify_router_gateway(router['id'])
# No gateway port expected
list_body = self.admin_ports_client.list_ports(
@ -257,7 +259,7 @@ class RoutersTest(base.BaseRouterTest):
router = self._create_router(
data_utils.rand_name('router-'),
external_network_id=CONF.network.public_network_id)
self.admin_client.update_router_with_snat_gw_info(
self.admin_routers_client.update_router_with_snat_gw_info(
router['id'],
external_gateway_info={
'network_id': CONF.network.public_network_id,
@ -301,9 +303,9 @@ class RoutersTest(base.BaseRouterTest):
)
test_routes.sort(key=lambda x: x['destination'])
extra_route = self.client.update_extra_routes(router['id'],
routes=test_routes)
show_body = self.client.show_router(router['id'])
extra_route = self.routers_client.update_extra_routes(
router['id'], routes=test_routes)
show_body = self.routers_client.show_router(router['id'])
# Assert the number of routes
self.assertEqual(routes_num, len(extra_route['router']['routes']))
self.assertEqual(routes_num, len(show_body['router']['routes']))
@ -323,22 +325,23 @@ class RoutersTest(base.BaseRouterTest):
routes[i]['destination'])
self.assertEqual(test_routes[i]['nexthop'], routes[i]['nexthop'])
self.client.delete_extra_routes(router['id'])
show_body_after_deletion = self.client.show_router(router['id'])
self.routers_client.delete_extra_routes(router['id'])
show_body_after_deletion = self.routers_client.show_router(
router['id'])
self.assertEmpty(show_body_after_deletion['router']['routes'])
def _delete_extra_routes(self, router_id):
self.client.delete_extra_routes(router_id)
self.routers_client.delete_extra_routes(router_id)
@test.idempotent_id('a8902683-c788-4246-95c7-ad9c6d63a4d9')
def test_update_router_admin_state(self):
router = self._create_router(data_utils.rand_name('router-'))
self.assertFalse(router['admin_state_up'])
# Update router admin state
update_body = self.client.update_router(router['id'],
admin_state_up=True)
update_body = self.routers_client.update_router(router['id'],
admin_state_up=True)
self.assertTrue(update_body['router']['admin_state_up'])
show_body = self.client.show_router(router['id'])
show_body = self.routers_client.show_router(router['id'])
self.assertTrue(show_body['router']['admin_state_up'])
@test.attr(type='smoke')
@ -385,21 +388,21 @@ class DvrRoutersTest(base.BaseRouterTest):
@test.idempotent_id('141297aa-3424-455d-aa8d-f2d95731e00a')
def test_create_distributed_router(self):
name = data_utils.rand_name('router')
create_body = self.admin_client.create_router(
create_body = self.admin_routers_client.create_router(
name, distributed=True)
self.addCleanup(self._delete_router,
create_body['router']['id'],
self.admin_client)
self.admin_routers_client)
self.assertTrue(create_body['router']['distributed'])
@test.idempotent_id('644d7a4a-01a1-4b68-bb8d-0c0042cb1729')
def test_convert_centralized_router(self):
router = self._create_router(data_utils.rand_name('router'))
self.assertNotIn('distributed', router)
update_body = self.admin_client.update_router(router['id'],
distributed=True)
update_body = self.admin_routers_client.update_router(router['id'],
distributed=True)
self.assertTrue(update_body['router']['distributed'])
show_body = self.admin_client.show_router(router['id'])
show_body = self.admin_routers_client.show_router(router['id'])
self.assertTrue(show_body['router']['distributed'])
show_body = self.client.show_router(router['id'])
show_body = self.routers_client.show_router(router['id'])
self.assertNotIn('distributed', show_body['router'])

View File

@ -47,7 +47,7 @@ class RoutersNegativeTest(base.BaseRouterTest):
@test.idempotent_id('37a94fc0-a834-45b9-bd23-9a81d2fd1e22')
def test_router_add_gateway_invalid_network_returns_404(self):
self.assertRaises(lib_exc.NotFound,
self.client.update_router,
self.routers_client.update_router,
self.router['id'],
external_gateway_info={
'network_id': self.router['id']})
@ -60,7 +60,7 @@ class RoutersNegativeTest(base.BaseRouterTest):
sub_cidr = netaddr.IPNetwork(self.tenant_cidr).next()
self.create_subnet(alt_network, cidr=sub_cidr)
self.assertRaises(lib_exc.BadRequest,
self.client.update_router,
self.routers_client.update_router,
self.router['id'],
external_gateway_info={
'network_id': alt_network['id']})
@ -84,31 +84,31 @@ class RoutersNegativeTest(base.BaseRouterTest):
@test.attr(type=['negative'])
@test.idempotent_id('04df80f9-224d-47f5-837a-bf23e33d1c20')
def test_router_remove_interface_in_use_returns_409(self):
self.client.add_router_interface(self.router['id'],
subnet_id=self.subnet['id'])
self.routers_client.add_router_interface(self.router['id'],
subnet_id=self.subnet['id'])
self.assertRaises(lib_exc.Conflict,
self.client.delete_router,
self.routers_client.delete_router,
self.router['id'])
@test.attr(type=['negative'])
@test.idempotent_id('c2a70d72-8826-43a7-8208-0209e6360c47')
def test_show_non_existent_router_returns_404(self):
router = data_utils.rand_name('non_exist_router')
self.assertRaises(lib_exc.NotFound, self.client.show_router,
self.assertRaises(lib_exc.NotFound, self.routers_client.show_router,
router)
@test.attr(type=['negative'])
@test.idempotent_id('b23d1569-8b0c-4169-8d4b-6abd34fad5c7')
def test_update_non_existent_router_returns_404(self):
router = data_utils.rand_name('non_exist_router')
self.assertRaises(lib_exc.NotFound, self.client.update_router,
self.assertRaises(lib_exc.NotFound, self.routers_client.update_router,
router, name="new_name")
@test.attr(type=['negative'])
@test.idempotent_id('c7edc5ad-d09d-41e6-a344-5c0c31e2e3e4')
def test_delete_non_existent_router_returns_404(self):
router = data_utils.rand_name('non_exist_router')
self.assertRaises(lib_exc.NotFound, self.client.delete_router,
self.assertRaises(lib_exc.NotFound, self.routers_client.delete_router,
router)

View File

@ -151,7 +151,7 @@ class NeutronResourcesTestJSON(base.BaseOrchestrationTest):
def test_created_router(self):
"""Verifies created router."""
router_id = self.test_resources.get('Router')['physical_resource_id']
body = self.network_client.show_router(router_id)
body = self.routers_client.show_router(router_id)
router = body['router']
self.assertEqual(self.neutron_basic_template['resources'][
'Router']['properties']['name'], router['name'])

View File

@ -131,6 +131,7 @@ from tempest.services.identity.v3.json.users_clients import \
from tempest.services.image.v1.json.images_client import ImagesClient
from tempest.services.image.v2.json.images_client import ImagesClientV2
from tempest.services.network.json.network_client import NetworkClient
from tempest.services.network.json.routers_client import RoutersClient
from tempest.services.network.json.security_group_rules_client import \
SecurityGroupRulesClient
from tempest.services.object_storage.account_client import AccountClient
@ -308,6 +309,14 @@ class Manager(manager.Manager):
build_interval=CONF.network.build_interval,
build_timeout=CONF.network.build_timeout,
**self.default_params)
self.routers_client = RoutersClient(
self.auth_provider,
CONF.network.catalog_type,
CONF.network.region or CONF.identity.region,
endpoint_type=CONF.network.endpoint_type,
build_interval=CONF.network.build_interval,
build_timeout=CONF.network.build_timeout,
**self.default_params)
self.security_group_rules_client = SecurityGroupRulesClient(
self.auth_provider,
CONF.network.catalog_type,

View File

@ -104,6 +104,7 @@ from tempest.services.identity.v2.json import roles_client
from tempest.services.identity.v2.json import tenants_client
from tempest.services.identity.v2.json import users_client
from tempest.services.network.json import network_client
from tempest.services.network.json import routers_client
LOG = None
CONF = config.CONF
@ -171,6 +172,7 @@ def get_admin_clients(opts):
)
network_admin = None
networks_admin = None
routers_admin = None
subnets_admin = None
neutron_iso_networks = False
if (CONF.service_available.neutron and
@ -188,6 +190,12 @@ def get_admin_clients(opts):
CONF.network.region or CONF.identity.region,
endpoint_type='adminURL',
**params)
routers_admin = routers_client.RoutersClient(
_auth,
CONF.network.catalog_type,
CONF.network.region or CONF.identity.region,
endpoint_type='adminURL',
**params)
subnets_admin = subnets_client.SubnetsClient(
_auth,
CONF.network.catalog_type,
@ -195,12 +203,13 @@ def get_admin_clients(opts):
endpoint_type='adminURL',
**params)
return (identity_admin, tenants_admin, roles_admin, users_admin,
neutron_iso_networks, network_admin, networks_admin, subnets_admin)
neutron_iso_networks, network_admin, networks_admin, routers_admin,
subnets_admin)
def create_resources(opts, resources):
(identity_admin, tenants_admin, roles_admin, users_admin,
neutron_iso_networks, network_admin, networks_admin,
neutron_iso_networks, network_admin, networks_admin, routers_admin,
subnets_admin) = get_admin_clients(opts)
roles = roles_admin.list_roles()['roles']
for u in resources['users']:
@ -246,8 +255,8 @@ def create_resources(opts, resources):
for u in resources['users']:
tenant = identity.get_tenant_by_name(tenants_admin, u['tenant'])
network_name, router_name = create_network_resources(
network_admin, networks_admin, subnets_admin, tenant['id'],
u['name'])
network_admin, networks_admin, routers_admin, subnets_admin,
tenant['id'], u['name'])
u['network'] = network_name
u['router'] = router_name
LOG.info('Networks created')
@ -274,7 +283,8 @@ def create_resources(opts, resources):
def create_network_resources(network_admin_client, networks_admin_client,
subnets_admin_client, tenant_id, name):
routers_admin_client, subnets_admin_client,
tenant_id, name):
def _create_network(name):
resp_body = networks_admin_client.create_network(
@ -305,14 +315,14 @@ def create_network_resources(network_admin_client, networks_admin_client,
def _create_router(router_name):
external_net_id = dict(
network_id=CONF.network.public_network_id)
resp_body = network_admin_client.create_router(
resp_body = routers_admin_client.create_router(
router_name,
external_gateway_info=external_net_id,
tenant_id=tenant_id)
return resp_body['router']
def _add_router_interface(router_id, subnet_id):
network_admin_client.add_router_interface(router_id,
routers_admin_client.add_router_interface(router_id,
subnet_id=subnet_id)
network_name = name + "-network"

View File

@ -449,7 +449,7 @@ class NetworkFloatingIpService(NetworkService):
class NetworkRouterService(NetworkService):
def list(self):
client = self.client
client = self.routers_client
routers = client.list_routers(**self.tenant_filter)
routers = routers['routers']
if self.is_preserve:
@ -460,7 +460,7 @@ class NetworkRouterService(NetworkService):
return routers
def delete(self):
client = self.client
client = self.routers_client
routers = self.list()
for router in routers:
try:

View File

@ -134,6 +134,7 @@ from tempest.services.identity.v2.json import tenants_client
from tempest.services.identity.v2.json import users_client
from tempest.services.image.v2.json import images_client
from tempest.services.network.json import network_client
from tempest.services.network.json import routers_client
from tempest.services.object_storage import container_client
from tempest.services.object_storage import object_client
from tempest.services.telemetry.json import alarming_client
@ -270,6 +271,14 @@ class OSClient(object):
build_interval=CONF.network.build_interval,
build_timeout=CONF.network.build_timeout,
**default_params)
self.routers = routers_client.RoutersClient(
_auth,
CONF.network.catalog_type,
CONF.network.region or CONF.identity.region,
endpoint_type=CONF.network.endpoint_type,
build_interval=CONF.network.build_interval,
build_timeout=CONF.network.build_timeout,
**default_params)
self.subnets = subnets_client.SubnetsClient(
_auth,
CONF.network.catalog_type,
@ -739,7 +748,7 @@ def destroy_images(images):
def _get_router_namespace(client, network):
network_id = _get_resource_by_name(client.networks,
'networks', network)['id']
n_body = client.networks.list_routers()
n_body = client.routers.list_routers()
for router in n_body['routers']:
router_id = router['id']
r_body = client.networks.list_router_interfaces(router_id)
@ -824,7 +833,7 @@ def create_routers(routers):
client = client_for_user(router['owner'])
# only create a router if the name isn't here
body = client.networks.list_routers()
body = client.routers.list_routers()
if any(item['name'] == router['name'] for item in body['routers']):
LOG.warning("Duplicated router name: %s" % router['name'])
continue
@ -841,9 +850,9 @@ def destroy_routers(routers):
for subnet in router['subnet']:
subnet_id = _get_resource_by_name(client.networks,
'subnets', subnet)['id']
client.networks.remove_router_interface(router_id,
subnet_id=subnet_id)
client.networks.delete_router(router_id)
client.routers.remove_router_interface(router_id,
subnet_id=subnet_id)
client.routers.delete_router(router_id)
def add_router_interface(routers):
@ -856,13 +865,13 @@ def add_router_interface(routers):
subnet_id = _get_resource_by_name(client.networks,
'subnets', subnet)['id']
# connect routers to their subnets
client.networks.add_router_interface(router_id,
subnet_id=subnet_id)
client.routers.add_router_interface(router_id,
subnet_id=subnet_id)
# connect routers to external network if set to "gateway"
if router['gateway']:
if CONF.network.public_network_id:
ext_net = CONF.network.public_network_id
client.networks._update_router(
client.routers._update_router(
router_id, set_enable_snat=True,
external_gateway_info={"network_id": ext_net})
else:

View File

@ -64,6 +64,7 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
self.domains_admin_client,
self.network_admin_client,
self.networks_admin_client,
self.routers_admin_client,
self.subnets_admin_client,
self.ports_admin_client,
self.security_groups_admin_client) = self._get_admin_clients()
@ -93,13 +94,14 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
if self.identity_version == 'v2':
return (os.identity_client, os.tenants_client, os.users_client,
os.roles_client, None, os.network_client,
os.networks_client, os.subnets_client, os.ports_client,
os.security_groups_client)
os.networks_client, os.routers_client, os.subnets_client,
os.ports_client, os.security_groups_client)
else:
return (os.identity_v3_client, os.projects_client,
os.users_v3_client, os.roles_v3_client, os.domains_client,
os.network_client, os.networks_client, os.subnets_client,
os.ports_client, os.security_groups_client)
os.network_client, os.networks_client, os.routers_client,
os.subnets_client, os.ports_client,
os.security_groups_client)
def _create_creds(self, suffix="", admin=False, roles=None):
"""Create random credentials under the following schema.
@ -237,14 +239,14 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
def _create_router(self, router_name, tenant_id):
external_net_id = dict(
network_id=CONF.network.public_network_id)
resp_body = self.network_admin_client.create_router(
resp_body = self.routers_admin_client.create_router(
router_name,
external_gateway_info=external_net_id,
tenant_id=tenant_id)
return resp_body['router']
def _add_router_interface(self, router_id, subnet_id):
self.network_admin_client.add_router_interface(router_id,
self.routers_admin_client.add_router_interface(router_id,
subnet_id=subnet_id)
def get_credentials(self, credential_type):
@ -295,9 +297,9 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
return self.get_credentials(roles)
def _clear_isolated_router(self, router_id, router_name):
net_client = self.network_admin_client
client = self.routers_admin_client
try:
net_client.delete_router(router_id)
client.delete_router(router_id)
except lib_exc.NotFound:
LOG.warning('router with name: %s not found for delete' %
router_name)
@ -331,7 +333,7 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
(secgroup['name'], secgroup['id']))
def _clear_isolated_net_resources(self):
net_client = self.network_admin_client
client = self.routers_admin_client
for cred in self._creds:
creds = self._creds.get(cred)
if (not creds or not any([creds.router, creds.network,
@ -344,7 +346,7 @@ class DynamicCredentialProvider(cred_provider.CredentialProvider):
if (not self.network_resources or
(self.network_resources.get('router') and creds.subnet)):
try:
net_client.remove_router_interface(
client.remove_router_interface(
creds.router['id'],
subnet_id=creds.subnet['id'])
except lib_exc.NotFound:

View File

@ -5,3 +5,4 @@
./tempest/services/volume/base/base_backups_client.py
./tempest/services/baremetal/base.py
./tempest/services/network/json/network_client.py
./tempest/services/network/json/routers_client.py

View File

@ -66,6 +66,7 @@ class ScenarioTest(tempest.test.BaseTestCase):
cls.network_client = cls.manager.network_client
cls.networks_client = cls.manager.networks_client
cls.ports_client = cls.manager.ports_client
cls.routers_client = cls.manager.routers_client
cls.subnets_client = cls.manager.subnets_client
cls.floating_ips_client = cls.manager.floating_ips_client
cls.security_groups_client = cls.manager.security_groups_client
@ -690,17 +691,21 @@ class NetworkScenarioTest(ScenarioTest):
cls.tenant_id = cls.manager.identity_client.tenant_id
def _create_network(self, client=None, networks_client=None,
tenant_id=None, namestart='network-smoke-'):
routers_client=None, tenant_id=None,
namestart='network-smoke-'):
if not client:
client = self.network_client
if not networks_client:
networks_client = self.networks_client
if not routers_client:
routers_client = self.routers_client
if not tenant_id:
tenant_id = client.tenant_id
name = data_utils.rand_name(namestart)
result = networks_client.create_network(name=name, tenant_id=tenant_id)
network = net_resources.DeletableNetwork(
networks_client=networks_client, **result['network'])
networks_client=networks_client, routers_client=routers_client,
**result['network'])
self.assertEqual(network.name, name)
self.addCleanup(self.delete_wrapper, network.delete)
return network
@ -719,7 +724,7 @@ class NetworkScenarioTest(ScenarioTest):
def _list_routers(self, *args, **kwargs):
"""List routers using admin creds """
routers_list = self.admin_manager.network_client.list_routers(
routers_list = self.admin_manager.routers_client.list_routers(
*args, **kwargs)
return routers_list['routers']
@ -736,7 +741,8 @@ class NetworkScenarioTest(ScenarioTest):
return agents_list['agents']
def _create_subnet(self, network, client=None, subnets_client=None,
namestart='subnet-smoke', **kwargs):
routers_client=None, namestart='subnet-smoke',
**kwargs):
"""Create a subnet for the given network
within the cidr block configured for tenant networks.
@ -745,6 +751,8 @@ class NetworkScenarioTest(ScenarioTest):
client = self.network_client
if not subnets_client:
subnets_client = self.subnets_client
if not routers_client:
routers_client = self.routers_client
def cidr_in_use(cidr, tenant_id):
"""Check cidr existence
@ -792,7 +800,7 @@ class NetworkScenarioTest(ScenarioTest):
self.assertIsNotNone(result, 'Unable to allocate tenant network')
subnet = net_resources.DeletableSubnet(
network_client=client, subnets_client=subnets_client,
**result['subnet'])
routers_client=routers_client, **result['subnet'])
self.assertEqual(subnet.cidr, str_cidr)
self.addCleanup(self.delete_wrapper, subnet.delete)
return subnet
@ -992,7 +1000,7 @@ class NetworkScenarioTest(ScenarioTest):
sg_dict['tenant_id'] = tenant_id
result = client.create_security_group(**sg_dict)
secgroup = net_resources.DeletableSecurityGroup(
client=client,
client=client, routers_client=self.routers_client,
**result['security_group']
)
self.assertEqual(secgroup.name, sg_name)
@ -1128,7 +1136,7 @@ class NetworkScenarioTest(ScenarioTest):
routes traffic to the public network.
"""
if not client:
client = self.network_client
client = self.routers_client
if not tenant_id:
tenant_id = client.tenant_id
router_id = CONF.network.public_router_id
@ -1147,14 +1155,14 @@ class NetworkScenarioTest(ScenarioTest):
def _create_router(self, client=None, tenant_id=None,
namestart='router-smoke'):
if not client:
client = self.network_client
client = self.routers_client
if not tenant_id:
tenant_id = client.tenant_id
name = data_utils.rand_name(namestart)
result = client.create_router(name=name,
admin_state_up=True,
tenant_id=tenant_id)
router = net_resources.DeletableRouter(client=client,
router = net_resources.DeletableRouter(routers_client=client,
**result['router'])
self.assertEqual(router.name, name)
self.addCleanup(self.delete_wrapper, router.delete)
@ -1165,8 +1173,8 @@ class NetworkScenarioTest(ScenarioTest):
self.assertEqual(admin_state_up, router.admin_state_up)
def create_networks(self, client=None, networks_client=None,
subnets_client=None, tenant_id=None,
dns_nameservers=None):
routers_client=None, subnets_client=None,
tenant_id=None, dns_nameservers=None):
"""Create a network with a subnet connected to a router.
The baremetal driver is a special case since all nodes are
@ -1194,10 +1202,12 @@ class NetworkScenarioTest(ScenarioTest):
network = self._create_network(
client=client, networks_client=networks_client,
tenant_id=tenant_id)
router = self._get_router(client=client, tenant_id=tenant_id)
router = self._get_router(client=routers_client,
tenant_id=tenant_id)
subnet_kwargs = dict(network=network, client=client,
subnets_client=subnets_client)
subnets_client=subnets_client,
routers_client=routers_client)
# use explicit check because empty list is a valid option
if dns_nameservers is not None:
subnet_kwargs['dns_nameservers'] = dns_nameservers

View File

@ -680,7 +680,7 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
# TODO(yfried): refactor this test to be used for other agents (dhcp)
# as well
list_hosts = (self.admin_manager.network_client.
list_hosts = (self.admin_manager.routers_client.
list_l3_agents_hosting_router)
schedule_router = (self.admin_manager.network_agents_client.
create_router_on_l3_agent)
@ -693,7 +693,7 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
# NOTE(kevinbenton): we have to use the admin credentials to check
# for the distributed flag because self.router only has a tenant view.
admin = self.admin_manager.network_client.show_router(self.router.id)
admin = self.admin_manager.routers_client.show_router(self.router.id)
if admin['router'].get('distributed', False):
msg = "Rescheduling test does not apply to distributed routers."
raise self.skipException(msg)

View File

@ -323,6 +323,7 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
network, subnet, router = self.create_networks(
client=tenant.manager.network_client,
networks_client=tenant.manager.networks_client,
routers_client=tenant.manager.routers_client,
subnets_client=tenant.manager.subnets_client)
tenant.set_network(network, subnet, router)

View File

@ -93,110 +93,10 @@ class NetworkClient(base.BaseNetworkClient):
message = '(%s) %s' % (caller, message)
raise exceptions.TimeoutException(message)
def create_router(self, name, admin_state_up=True, **kwargs):
post_body = {'router': kwargs}
post_body['router']['name'] = name
post_body['router']['admin_state_up'] = admin_state_up
uri = '/routers'
return self.create_resource(uri, post_body)
def _update_router(self, router_id, set_enable_snat, **kwargs):
uri = '/routers/%s' % router_id
body = self.show_resource(uri)
update_body = {}
update_body['name'] = kwargs.get('name', body['router']['name'])
update_body['admin_state_up'] = kwargs.get(
'admin_state_up', body['router']['admin_state_up'])
cur_gw_info = body['router']['external_gateway_info']
if cur_gw_info:
# TODO(kevinbenton): setting the external gateway info is not
# allowed for a regular tenant. If the ability to update is also
# merged, a test case for this will need to be added similar to
# the SNAT case.
cur_gw_info.pop('external_fixed_ips', None)
if not set_enable_snat:
cur_gw_info.pop('enable_snat', None)
update_body['external_gateway_info'] = kwargs.get(
'external_gateway_info', body['router']['external_gateway_info'])
if 'distributed' in kwargs:
update_body['distributed'] = kwargs['distributed']
update_body = dict(router=update_body)
return self.update_resource(uri, update_body)
def update_router(self, router_id, **kwargs):
"""Update a router leaving enable_snat to its default value."""
# If external_gateway_info contains enable_snat the request will fail
# with 404 unless executed with admin client, and therefore we instruct
# _update_router to not set this attribute
# NOTE(salv-orlando): The above applies as long as Neutron's default
# policy is to restrict enable_snat usage to admins only.
return self._update_router(router_id, set_enable_snat=False, **kwargs)
def show_router(self, router_id, **fields):
uri = '/routers/%s' % router_id
return self.show_resource(uri, **fields)
def delete_router(self, router_id):
uri = '/routers/%s' % router_id
return self.delete_resource(uri)
def list_routers(self, **filters):
uri = '/routers'
return self.list_resources(uri, **filters)
def update_router_with_snat_gw_info(self, router_id, **kwargs):
"""Update a router passing also the enable_snat attribute.
This method must be execute with admin credentials, otherwise the API
call will return a 404 error.
"""
return self._update_router(router_id, set_enable_snat=True, **kwargs)
def add_router_interface(self, router_id, **kwargs):
"""Add router interface.
Available params: see http://developer.openstack.org/
api-ref-networking-v2-ext.html#addRouterInterface
"""
uri = '/routers/%s/add_router_interface' % router_id
return self.update_resource(uri, kwargs)
def remove_router_interface(self, router_id, **kwargs):
"""Remove router interface.
Available params: see http://developer.openstack.org/
api-ref-networking-v2-ext.html#removeRouterInterface
"""
uri = '/routers/%s/remove_router_interface' % router_id
return self.update_resource(uri, kwargs)
def list_router_interfaces(self, uuid):
uri = '/ports?device_id=%s' % uuid
return self.list_resources(uri)
def list_l3_agents_hosting_router(self, router_id):
uri = '/routers/%s/l3-agents' % router_id
return self.list_resources(uri)
def list_dhcp_agent_hosting_network(self, network_id):
uri = '/networks/%s/dhcp-agents' % network_id
return self.list_resources(uri)
def update_extra_routes(self, router_id, **kwargs):
"""Update Extra routes.
Available params: see http://developer.openstack.org/
api-ref-networking-v2-ext.html#updateExtraRoutes
"""
uri = '/routers/%s' % router_id
put_body = {'router': kwargs}
return self.update_resource(uri, put_body)
def delete_extra_routes(self, router_id):
uri = '/routers/%s' % router_id
put_body = {
'router': {
'routes': None
}
}
return self.update_resource(uri, put_body)

View File

@ -0,0 +1,116 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.services.network.json import base
class RoutersClient(base.BaseNetworkClient):
def create_router(self, name, admin_state_up=True, **kwargs):
post_body = {'router': kwargs}
post_body['router']['name'] = name
post_body['router']['admin_state_up'] = admin_state_up
uri = '/routers'
return self.create_resource(uri, post_body)
def _update_router(self, router_id, set_enable_snat, **kwargs):
uri = '/routers/%s' % router_id
body = self.show_resource(uri)
update_body = {}
update_body['name'] = kwargs.get('name', body['router']['name'])
update_body['admin_state_up'] = kwargs.get(
'admin_state_up', body['router']['admin_state_up'])
cur_gw_info = body['router']['external_gateway_info']
if cur_gw_info:
# TODO(kevinbenton): setting the external gateway info is not
# allowed for a regular tenant. If the ability to update is also
# merged, a test case for this will need to be added similar to
# the SNAT case.
cur_gw_info.pop('external_fixed_ips', None)
if not set_enable_snat:
cur_gw_info.pop('enable_snat', None)
update_body['external_gateway_info'] = kwargs.get(
'external_gateway_info', body['router']['external_gateway_info'])
if 'distributed' in kwargs:
update_body['distributed'] = kwargs['distributed']
update_body = dict(router=update_body)
return self.update_resource(uri, update_body)
def update_router(self, router_id, **kwargs):
"""Update a router leaving enable_snat to its default value."""
# If external_gateway_info contains enable_snat the request will fail
# with 404 unless executed with admin client, and therefore we instruct
# _update_router to not set this attribute
# NOTE(salv-orlando): The above applies as long as Neutron's default
# policy is to restrict enable_snat usage to admins only.
return self._update_router(router_id, set_enable_snat=False, **kwargs)
def show_router(self, router_id, **fields):
uri = '/routers/%s' % router_id
return self.show_resource(uri, **fields)
def delete_router(self, router_id):
uri = '/routers/%s' % router_id
return self.delete_resource(uri)
def list_routers(self, **filters):
uri = '/routers'
return self.list_resources(uri, **filters)
def update_extra_routes(self, router_id, **kwargs):
"""Update Extra routes.
Available params: see http://developer.openstack.org/
api-ref-networking-v2-ext.html#updateExtraRoutes
"""
uri = '/routers/%s' % router_id
put_body = {'router': kwargs}
return self.update_resource(uri, put_body)
def delete_extra_routes(self, router_id):
uri = '/routers/%s' % router_id
put_body = {
'router': {
'routes': None
}
}
return self.update_resource(uri, put_body)
def update_router_with_snat_gw_info(self, router_id, **kwargs):
"""Update a router passing also the enable_snat attribute.
This method must be execute with admin credentials, otherwise the API
call will return a 404 error.
"""
return self._update_router(router_id, set_enable_snat=True, **kwargs)
def add_router_interface(self, router_id, **kwargs):
"""Add router interface.
Available params: see http://developer.openstack.org/
api-ref-networking-v2-ext.html#addRouterInterface
"""
uri = '/routers/%s/add_router_interface' % router_id
return self.update_resource(uri, kwargs)
def remove_router_interface(self, router_id, **kwargs):
"""Remove router interface.
Available params: see http://developer.openstack.org/
api-ref-networking-v2-ext.html#removeRouterInterface
"""
uri = '/routers/%s/remove_router_interface' % router_id
return self.update_resource(uri, kwargs)
def list_l3_agents_hosting_router(self, router_id):
uri = '/routers/%s/l3-agents' % router_id
return self.list_resources(uri)

View File

@ -39,6 +39,7 @@ class DeletableResource(AttributeDict):
self.client = kwargs.pop('client', None)
self.network_client = kwargs.pop('network_client', None)
self.networks_client = kwargs.pop('networks_client', None)
self.routers_client = kwargs.pop('routers_client', None)
self.subnets_client = kwargs.pop('subnets_client', None)
self.ports_client = kwargs.pop('ports_client', None)
super(DeletableResource, self).__init__(*args, **kwargs)
@ -89,12 +90,12 @@ class DeletableSubnet(DeletableResource):
def add_to_router(self, router_id):
self._router_ids.add(router_id)
self.network_client.add_router_interface(router_id,
self.routers_client.add_router_interface(router_id,
subnet_id=self.id)
def delete(self):
for router_id in self._router_ids.copy():
self.network_client.remove_router_interface(router_id,
self.routers_client.remove_router_interface(router_id,
subnet_id=self.id)
self._router_ids.remove(router_id)
self.subnets_client.delete_subnet(self.id)
@ -109,14 +110,14 @@ class DeletableRouter(DeletableResource):
return self.update(external_gateway_info=dict())
def update(self, *args, **kwargs):
result = self.client.update_router(self.id,
*args,
**kwargs)
result = self.routers_client.update_router(self.id,
*args,
**kwargs)
return super(DeletableRouter, self).update(**result['router'])
def delete(self):
self.unset_gateway()
self.client.delete_router(self.id)
self.routers_client.delete_router(self.id)
class DeletableFloatingIp(DeletableResource):

View File

@ -250,7 +250,7 @@ class TestCreateResources(JavelinUnitTest):
def test_create_router(self):
self.fake_client.networks.list_routers.return_value = {'routers': []}
self.fake_client.routers.list_routers.return_value = {'routers': []}
self.useFixture(mockpatch.PatchObject(javelin, "client_for_user",
return_value=self.fake_client))
@ -260,7 +260,7 @@ class TestCreateResources(JavelinUnitTest):
mocked_function.assert_called_once_with(self.fake_object['name'])
def test_create_router_existing(self):
self.fake_client.networks.list_routers.return_value = {
self.fake_client.routers.list_routers.return_value = {
'routers': [self.fake_object]}
self.useFixture(mockpatch.PatchObject(javelin, "client_for_user",
return_value=self.fake_client))
@ -405,7 +405,7 @@ class TestDestroyResources(JavelinUnitTest):
javelin.destroy_routers([self.fake_object])
mocked_function = self.fake_client.networks.delete_router
mocked_function = self.fake_client.routers.delete_router
mocked_function.assert_called_once_with(
self.fake_object['router_id'])

View File

@ -31,6 +31,7 @@ from tempest.services.identity.v2.json import tenants_client as \
from tempest.services.identity.v2.json import users_client as \
json_users_client
from tempest.services.network.json import network_client as json_network_client
from tempest.services.network.json import routers_client
from tempest.tests import base
from tempest.tests import fake_config
from tempest.tests import fake_http
@ -154,7 +155,7 @@ class TestDynamicCredentialProvider(base.TestCase):
def _mock_router_create(self, id, name):
router_fix = self.useFixture(mockpatch.PatchObject(
json_network_client.NetworkClient,
routers_client.RoutersClient,
'create_router',
return_value={'router': {'id': id, 'name': name}}))
return router_fix
@ -295,7 +296,7 @@ class TestDynamicCredentialProvider(base.TestCase):
subnet = mock.patch.object(creds.subnets_admin_client,
'delete_subnet')
subnet_mock = subnet.start()
router = mock.patch.object(creds.network_admin_client,
router = mock.patch.object(creds.routers_admin_client,
'delete_router')
router_mock = router.start()
@ -321,7 +322,7 @@ class TestDynamicCredentialProvider(base.TestCase):
self._mock_subnet_create(creds, '1234', 'fake_subnet')
self._mock_router_create('1234', 'fake_router')
router_interface_mock = self.patch(
'tempest.services.network.json.network_client.NetworkClient.'
'tempest.services.network.json.routers_client.RoutersClient.'
'add_router_interface')
primary_creds = creds.get_primary_creds()
router_interface_mock.assert_called_once_with('1234', subnet_id='1234')
@ -353,7 +354,7 @@ class TestDynamicCredentialProvider(base.TestCase):
self._mock_subnet_create(creds, '1234', 'fake_subnet')
self._mock_router_create('1234', 'fake_router')
router_interface_mock = self.patch(
'tempest.services.network.json.network_client.NetworkClient.'
'tempest.services.network.json.routers_client.RoutersClient.'
'add_router_interface')
creds.get_primary_creds()
router_interface_mock.assert_called_once_with('1234', subnet_id='1234')
@ -386,11 +387,11 @@ class TestDynamicCredentialProvider(base.TestCase):
subnet = mock.patch.object(creds.subnets_admin_client,
'delete_subnet')
subnet_mock = subnet.start()
router = mock.patch.object(creds.network_admin_client,
router = mock.patch.object(creds.routers_admin_client,
'delete_router')
router_mock = router.start()
remove_router_interface_mock = self.patch(
'tempest.services.network.json.network_client.NetworkClient.'
'tempest.services.network.json.routers_client.RoutersClient.'
'remove_router_interface')
return_values = ({'status': 200}, {'ports': []})
port_list_mock = mock.patch.object(creds.ports_admin_client,
@ -461,7 +462,7 @@ class TestDynamicCredentialProvider(base.TestCase):
self._mock_subnet_create(creds, '1234', 'fake_alt_subnet')
self._mock_router_create('1234', 'fake_alt_router')
router_interface_mock = self.patch(
'tempest.services.network.json.network_client.NetworkClient.'
'tempest.services.network.json.routers_client.RoutersClient.'
'add_router_interface')
alt_creds = creds.get_alt_creds()
router_interface_mock.assert_called_once_with('1234', subnet_id='1234')
@ -485,7 +486,7 @@ class TestDynamicCredentialProvider(base.TestCase):
self._mock_subnet_create(creds, '1234', 'fake_admin_subnet')
self._mock_router_create('1234', 'fake_admin_router')
router_interface_mock = self.patch(
'tempest.services.network.json.network_client.NetworkClient.'
'tempest.services.network.json.routers_client.RoutersClient.'
'add_router_interface')
self._mock_list_roles('123456', 'admin')
admin_creds = creds.get_admin_creds()
@ -521,7 +522,7 @@ class TestDynamicCredentialProvider(base.TestCase):
subnet = mock.patch.object(creds.subnets_admin_client,
'delete_subnet')
subnet_mock = subnet.start()
router = mock.patch.object(creds.network_admin_client,
router = mock.patch.object(creds.routers_admin_client,
'delete_router')
router_mock = router.start()