Stop memoizing on request argument in neutron api

neutron api should memoize more persistanly on objects that do not
change when rendering each view, i.e. Supported extensions list

Implements blueprint: improve-horizon-caching

Change-Id: Iec38e865a704752309ba7035d19370dc0dbc31e0
This commit is contained in:
Luis Daniel Castellanos 2016-05-12 16:21:52 -05:00
parent 5401d9245a
commit 989607e4fb
5 changed files with 101 additions and 61 deletions

View File

@ -1178,9 +1178,9 @@ def _server_get_addresses(request, server, ports, floating_ips, network_names):
def list_extensions(request):
extensions_list = neutronclient(request).list_extensions()
if 'extensions' in extensions_list:
return extensions_list['extensions']
return tuple(extensions_list['extensions'])
else:
return {}
return ()
@memoized

View File

@ -182,7 +182,7 @@ class AggregatesViewTests(test.BaseAdminViewTests):
'availability_zone_list',
'tenant_absolute_limits',),
api.cinder: ('tenant_absolute_limits',),
api.neutron: ('list_extensions',),
api.neutron: ('is_extension_supported',),
api.network: ('tenant_floating_ip_list',
'security_group_list'),
api.keystone: ('tenant_list',)})
@ -191,8 +191,9 @@ class AggregatesViewTests(test.BaseAdminViewTests):
MultipleTimes().AndReturn(self.limits['absolute'])
api.cinder.tenant_absolute_limits(IsA(http.HttpRequest)). \
MultipleTimes().AndReturn(self.cinder_limits['absolute'])
api.neutron.list_extensions(IsA(http.HttpRequest)). \
AndReturn(self.api_extensions.list())
api.neutron.\
is_extension_supported(IsA(http.HttpRequest), 'security-group'). \
MultipleTimes().AndReturn(True)
api.network.tenant_floating_ip_list(IsA(http.HttpRequest)) \
.AndReturn(self.floating_ips.list())
api.network.security_group_list(IsA(http.HttpRequest)) \

View File

@ -361,20 +361,19 @@ class NetworkTests(test.BaseAdminViewTests):
self.assertItemsEqual(subnets, [self.subnets.first()])
@test.create_stubs({api.neutron: ('profile_list',
'list_extensions',),
'is_extension_supported',),
api.keystone: ('tenant_list',)})
def test_network_create_get(self,
test_with_profile=False):
tenants = self.tenants.list()
extensions = self.api_extensions.list()
api.keystone.tenant_list(IsA(
http.HttpRequest)).AndReturn([tenants, False])
if test_with_profile:
net_profiles = self.net_profiles.list()
api.neutron.profile_list(IsA(http.HttpRequest),
'network').AndReturn(net_profiles)
api.neutron.list_extensions(
IsA(http.HttpRequest)).AndReturn(extensions)
api.neutron.is_extension_supported(IsA(http.HttpRequest), 'provider').\
AndReturn(True)
self.mox.ReplayAll()
url = reverse('horizon:admin:networks:create')
@ -389,14 +388,14 @@ class NetworkTests(test.BaseAdminViewTests):
@test.create_stubs({api.neutron: ('network_create',
'profile_list',
'list_extensions',),
'is_extension_supported',),
api.keystone: ('tenant_list',)})
def test_network_create_post(self,
test_with_profile=False):
tenants = self.tenants.list()
tenant_id = self.tenants.first().id
network = self.networks.first()
extensions = self.api_extensions.list()
api.keystone.tenant_list(IsA(http.HttpRequest))\
.AndReturn([tenants, False])
params = {'name': network.name,
@ -411,8 +410,8 @@ class NetworkTests(test.BaseAdminViewTests):
api.neutron.profile_list(IsA(http.HttpRequest),
'network').AndReturn(net_profiles)
params['net_profile_id'] = net_profile_id
api.neutron.list_extensions(
IsA(http.HttpRequest)).AndReturn(extensions)
api.neutron.is_extension_supported(IsA(http.HttpRequest), 'provider').\
MultipleTimes().AndReturn(True)
api.neutron.network_create(IsA(http.HttpRequest), **params)\
.AndReturn(network)
self.mox.ReplayAll()
@ -438,14 +437,14 @@ class NetworkTests(test.BaseAdminViewTests):
@test.create_stubs({api.neutron: ('network_create',
'profile_list',
'list_extensions',),
'is_extension_supported',),
api.keystone: ('tenant_list',)})
def test_network_create_post_network_exception(self,
test_with_profile=False):
tenants = self.tenants.list()
tenant_id = self.tenants.first().id
network = self.networks.first()
extensions = self.api_extensions.list()
api.keystone.tenant_list(IsA(http.HttpRequest)).AndReturn([tenants,
False])
params = {'name': network.name,
@ -460,8 +459,8 @@ class NetworkTests(test.BaseAdminViewTests):
api.neutron.profile_list(IsA(http.HttpRequest),
'network').AndReturn(net_profiles)
params['net_profile_id'] = net_profile_id
api.neutron.list_extensions(
IsA(http.HttpRequest)).AndReturn(extensions)
api.neutron.is_extension_supported(IsA(http.HttpRequest), 'provider').\
MultipleTimes().AndReturn(True)
api.neutron.network_create(IsA(http.HttpRequest),
**params).AndRaise(self.exceptions.neutron)
self.mox.ReplayAll()
@ -486,18 +485,18 @@ class NetworkTests(test.BaseAdminViewTests):
self.test_network_create_post_network_exception(
test_with_profile=True)
@test.create_stubs({api.neutron: ('list_extensions',),
@test.create_stubs({api.neutron: ('is_extension_supported',),
api.keystone: ('tenant_list',)})
def test_network_create_vlan_segmentation_id_invalid(self):
tenants = self.tenants.list()
tenant_id = self.tenants.first().id
network = self.networks.first()
extensions = self.api_extensions.list()
api.keystone.tenant_list(
IsA(http.HttpRequest)
).MultipleTimes().AndReturn([tenants, False])
api.neutron.list_extensions(
IsA(http.HttpRequest)).AndReturn(extensions)
api.neutron.is_extension_supported(IsA(http.HttpRequest), 'provider')\
.MultipleTimes().AndReturn(True)
self.mox.ReplayAll()
form_data = {'tenant_id': tenant_id,
@ -514,20 +513,19 @@ class NetworkTests(test.BaseAdminViewTests):
self.assertFormErrors(res, 1)
self.assertContains(res, "1 through 4094")
@test.create_stubs({api.neutron: ('list_extensions',),
@test.create_stubs({api.neutron: ('is_extension_supported',),
api.keystone: ('tenant_list',)})
def test_network_create_gre_segmentation_id_invalid(self):
tenants = self.tenants.list()
tenant_id = self.tenants.first().id
network = self.networks.first()
extensions = self.api_extensions.list()
api.keystone.tenant_list(
IsA(http.HttpRequest)
).MultipleTimes().AndReturn([tenants, False])
api.neutron.list_extensions(
IsA(http.HttpRequest)).AndReturn(extensions)
api.neutron.is_extension_supported(IsA(http.HttpRequest), 'provider').\
MultipleTimes().AndReturn(True)
self.mox.ReplayAll()
form_data = {'tenant_id': tenant_id,
@ -544,7 +542,7 @@ class NetworkTests(test.BaseAdminViewTests):
self.assertFormErrors(res, 1)
self.assertContains(res, "1 through %s" % ((2 ** 32) - 1))
@test.create_stubs({api.neutron: ('list_extensions',),
@test.create_stubs({api.neutron: ('is_extension_supported',),
api.keystone: ('tenant_list',)})
@test.update_settings(
OPENSTACK_NEUTRON_NETWORK={
@ -553,13 +551,13 @@ class NetworkTests(test.BaseAdminViewTests):
tenants = self.tenants.list()
tenant_id = self.tenants.first().id
network = self.networks.first()
extensions = self.api_extensions.list()
api.keystone.tenant_list(
IsA(http.HttpRequest)
).MultipleTimes().AndReturn([tenants, False])
api.neutron.list_extensions(
IsA(http.HttpRequest)).AndReturn(extensions)
api.neutron.is_extension_supported(IsA(http.HttpRequest), 'provider')\
.MultipleTimes().AndReturn(True)
self.mox.ReplayAll()
form_data = {'tenant_id': tenant_id,
@ -576,18 +574,18 @@ class NetworkTests(test.BaseAdminViewTests):
self.assertFormErrors(res, 1)
self.assertContains(res, "10 through 20")
@test.create_stubs({api.neutron: ('list_extensions',),
@test.create_stubs({api.neutron: ('is_extension_supported',),
api.keystone: ('tenant_list',)})
@test.update_settings(
OPENSTACK_NEUTRON_NETWORK={
'supported_provider_types': []})
def test_network_create_no_provider_types(self):
tenants = self.tenants.list()
extensions = self.api_extensions.list()
api.keystone.tenant_list(IsA(http.HttpRequest)).AndReturn([tenants,
False])
api.neutron.list_extensions(
IsA(http.HttpRequest)).AndReturn(extensions)
api.neutron.is_extension_supported(IsA(http.HttpRequest), 'provider').\
AndReturn(True)
self.mox.ReplayAll()
url = reverse('horizon:admin:networks:create')
@ -599,18 +597,17 @@ class NetworkTests(test.BaseAdminViewTests):
'<input type="hidden" name="network_type" id="id_network_type" />',
html=True)
@test.create_stubs({api.neutron: ('list_extensions',),
@test.create_stubs({api.neutron: ('is_extension_supported',),
api.keystone: ('tenant_list',)})
@test.update_settings(
OPENSTACK_NEUTRON_NETWORK={
'supported_provider_types': ['local', 'flat', 'gre']})
def test_network_create_unsupported_provider_types(self):
tenants = self.tenants.list()
extensions = self.api_extensions.list()
api.keystone.tenant_list(IsA(http.HttpRequest)).AndReturn([tenants,
False])
api.neutron.list_extensions(
IsA(http.HttpRequest)).AndReturn(extensions)
api.neutron.is_extension_supported(IsA(http.HttpRequest), 'provider').\
AndReturn(True)
self.mox.ReplayAll()
url = reverse('horizon:admin:networks:create')

View File

@ -40,6 +40,7 @@ class NetworkClientTestCase(test.APITestCase):
self.assertIsInstance(nc.floating_ips, api.nova.FloatingIpManager)
self.assertIsInstance(nc.secgroups, api.nova.SecurityGroupManager)
@test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_networkclient_neutron(self):
self.mox.StubOutWithMock(api.base, 'is_service_enabled')
api.base.is_service_enabled(IsA(http.HttpRequest), 'network') \
@ -47,22 +48,24 @@ class NetworkClientTestCase(test.APITestCase):
api.base.is_service_enabled(IsA(http.HttpRequest), 'compute') \
.AndReturn(True)
self.neutronclient = self.stub_neutronclient()
self.neutronclient.list_extensions() \
.AndReturn({'extensions': self.api_extensions.list()})
api.neutron.is_extension_supported(IsA(http.HttpRequest),
'security-group').AndReturn(True)
self.mox.ReplayAll()
nc = api.network.NetworkClient(self.request)
self.assertIsInstance(nc.floating_ips, api.neutron.FloatingIpManager)
self.assertIsInstance(nc.secgroups, api.neutron.SecurityGroupManager)
@test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_networkclient_neutron_with_nova_security_group(self):
self.mox.StubOutWithMock(api.base, 'is_service_enabled')
api.base.is_service_enabled(IsA(http.HttpRequest), 'network') \
.AndReturn(True)
api.neutron.is_extension_supported(IsA(http.HttpRequest),
'security-group').AndReturn(False)
api.base.is_service_enabled(IsA(http.HttpRequest), 'compute') \
.AndReturn(True)
self.neutronclient = self.stub_neutronclient()
self.neutronclient.list_extensions().AndReturn({'extensions': []})
self.mox.ReplayAll()
nc = api.network.NetworkClient(self.request)
@ -382,9 +385,12 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
for (exprule, retrule) in six.moves.zip(exp_rules, ret_sg.rules):
self._cmp_sg_rule(exprule, retrule)
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_security_group_list(self):
sgs = self.api_q_secgroups.list()
tenant_id = self.request.user.tenant_id
# api.neutron.is_extension_supported(self.request, 'security-group').\
# AndReturn(True)
# use deepcopy to ensure self.api_q_secgroups is not modified.
self.qclient.list_security_groups(tenant_id=tenant_id) \
.AndReturn({'security_groups': copy.deepcopy(sgs)})
@ -395,6 +401,7 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
for (exp, ret) in six.moves.zip(sgs, rets):
self._cmp_sg(exp, ret)
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_security_group_get(self):
secgroup = self.api_q_secgroups.first()
sg_ids = set([secgroup['id']] +
@ -403,6 +410,8 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
if rule['remote_group_id']])
related_sgs = [sg for sg in self.api_q_secgroups.list()
if sg['id'] in sg_ids]
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
# use deepcopy to ensure self.api_q_secgroups is not modified.
self.qclient.show_security_group(secgroup['id']) \
.AndReturn({'security_group': copy.deepcopy(secgroup)})
@ -412,12 +421,15 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
ret = api.network.security_group_get(self.request, secgroup['id'])
self._cmp_sg(secgroup, ret)
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_security_group_create(self):
secgroup = self.api_q_secgroups.list()[1]
body = {'security_group':
{'name': secgroup['name'],
'description': secgroup['description'],
'tenant_id': self.request.user.project_id}}
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
self.qclient.create_security_group(body) \
.AndReturn({'security_group': copy.deepcopy(secgroup)})
self.mox.ReplayAll()
@ -425,6 +437,7 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
secgroup['description'])
self._cmp_sg(secgroup, ret)
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_security_group_update(self):
secgroup = self.api_q_secgroups.list()[1]
secgroup = copy.deepcopy(secgroup)
@ -433,6 +446,8 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
body = {'security_group':
{'name': secgroup['name'],
'description': secgroup['description']}}
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
self.qclient.update_security_group(secgroup['id'], body) \
.AndReturn({'security_group': secgroup})
self.mox.ReplayAll()
@ -442,12 +457,16 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
secgroup['description'])
self._cmp_sg(secgroup, ret)
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_security_group_delete(self):
secgroup = self.api_q_secgroups.first()
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
self.qclient.delete_security_group(secgroup['id'])
self.mox.ReplayAll()
api.network.security_group_delete(self.request, secgroup['id'])
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_security_group_rule_create(self):
sg_rule = [r for r in self.api_q_secgroup_rules.list()
if r['protocol'] == 'tcp' and r['remote_ip_prefix']][0]
@ -459,6 +478,8 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
del post_rule['id']
del post_rule['tenant_id']
post_body = {'security_group_rule': post_rule}
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
self.qclient.create_security_group_rule(post_body) \
.AndReturn({'security_group_rule': copy.deepcopy(sg_rule)})
self.qclient.list_security_groups(id=set([sg_id]),
@ -473,8 +494,11 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
sg_rule['remote_ip_prefix'], sg_rule['remote_group_id'])
self._cmp_sg_rule(sg_rule, ret)
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_security_group_rule_delete(self):
sg_rule = self.api_q_secgroup_rules.first()
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
self.qclient.delete_security_group_rule(sg_rule['id'])
self.mox.ReplayAll()
api.network.security_group_rule_delete(self.request, sg_rule['id'])
@ -495,7 +519,6 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
def test_server_security_groups(self):
cur_sg_ids = [sg['id'] for sg in self.api_q_secgroups.list()[:2]]
instance_id, instance_ports = self._get_instance(cur_sg_ids)
self.qclient.list_ports(device_id=instance_id) \
.AndReturn({'ports': instance_ports})
secgroups = copy.deepcopy(self.api_q_secgroups.list())
@ -509,7 +532,6 @@ class NetworkApiNeutronSecurityGroupTests(NetworkApiNeutronTestBase):
cur_sg_ids = [self.api_q_secgroups.first()['id']]
new_sg_ids = [sg['id'] for sg in self.api_q_secgroups.list()[:2]]
instance_id, instance_ports = self._get_instance(cur_sg_ids)
self.qclient.list_ports(device_id=instance_id) \
.AndReturn({'ports': instance_ports})
for p in instance_ports:
@ -544,10 +566,13 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
self.mox.ReplayAll()
self.assertFalse(api.network.floating_ip_supported(self.request))
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_floating_ip_pools_list(self):
search_opts = {'router:external': True}
ext_nets = [n for n in self.api_networks.list()
if n['router:external']]
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
self.qclient.list_networks(**search_opts) \
.AndReturn({'networks': ext_nets})
self.mox.ReplayAll()
@ -560,6 +585,7 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
def test_floating_ip_list(self):
fips = self.api_q_floating_ips.list()
filters = {'tenant_id': self.request.user.tenant_id}
self.qclient.list_floatingips(**filters) \
.AndReturn({'floatingips': fips})
self.qclient.list_ports(**filters) \
@ -611,6 +637,7 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
def _test_floating_ip_get_associated(self, assoc_port, exp_instance_type):
fip = self.api_q_floating_ips.list()[1]
self.qclient.show_floatingip(fip['id']).AndReturn({'floatingip': fip})
self.qclient.show_port(assoc_port['id']) \
.AndReturn({'port': assoc_port})
@ -635,6 +662,7 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
def test_floating_ip_get_unassociated(self):
fip = self.api_q_floating_ips.list()[0]
self.qclient.show_floatingip(fip['id']).AndReturn({'floatingip': fip})
self.mox.ReplayAll()
@ -662,8 +690,11 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
self.assertIsNone(ret.instance_id)
self.assertIsNone(ret.instance_type)
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_floating_ip_release(self):
fip = self.api_q_floating_ips.first()
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
self.qclient.delete_floatingip(fip['id'])
self.mox.ReplayAll()
@ -684,6 +715,7 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
def test_floating_ip_disassociate(self):
fip = self.api_q_floating_ips.list()[1]
self.qclient.update_floatingip(fip['id'],
{'floatingip': {'port_id': None}})
self.mox.ReplayAll()
@ -709,6 +741,7 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
'enable_fip_topology_check': True,
}
)
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_floating_ip_target_list(self):
ports = self.api_ports.list()
# Port on the first subnet is connected to a router
@ -723,6 +756,10 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
(set(shared_subnet_ids) & set(self._subs_from_port(p)))))
]
filters = {'tenant_id': self.request.user.tenant_id}
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
# api.neutron.is_extension_supported(self.request, 'lbaas'). \
# AndReturn(True)
self.qclient.list_ports(**filters).AndReturn({'ports': ports})
servers = self.servers.list()
novaclient = self.stub_novaclient()
@ -752,10 +789,13 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
self.assertEqual(exp[0], ret.id)
self.assertEqual(exp[1], ret.name)
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_floating_ip_target_get_by_instance(self):
ports = self.api_ports.list()
candidates = [p for p in ports if p['device_id'] == '1']
search_opts = {'device_id': '1'}
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
self.qclient.list_ports(**search_opts).AndReturn({'ports': candidates})
self.mox.ReplayAll()
@ -774,20 +814,26 @@ class NetworkApiNeutronFloatingIpTests(NetworkApiNeutronTestBase):
self.assertEqual(self._get_target_id(candidates[0]), ret[0])
self.assertEqual(len(candidates), len(ret))
# @test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_floating_ip_target_get_by_instance_with_preloaded_target(self):
target_list = [{'name': 'name11', 'id': 'id11', 'instance_id': 'vm1'},
{'name': 'name21', 'id': 'id21', 'instance_id': 'vm2'},
{'name': 'name22', 'id': 'id22', 'instance_id': 'vm2'}]
# api.neutron.is_extension_supported(self.request, 'security-group'). \
# AndReturn(True)
self.mox.ReplayAll()
ret = api.network.floating_ip_target_get_by_instance(
self.request, 'vm2', target_list)
self.assertEqual('id21', ret)
# @test.create_stubs({api.neutron: ('is_extension_supported', )})
def test_target_floating_ip_port_by_instance_with_preloaded_target(self):
target_list = [{'name': 'name11', 'id': 'id11', 'instance_id': 'vm1'},
{'name': 'name21', 'id': 'id21', 'instance_id': 'vm2'},
{'name': 'name22', 'id': 'id22', 'instance_id': 'vm2'}]
# api.neutron.is_extension_supported(self.request, 'security-group').\
# AndReturn(True)
self.mox.ReplayAll()
ret = api.network.floating_ip_target_list_by_instance(

View File

@ -461,10 +461,12 @@ class NeutronApiTests(test.APITestCase):
api.neutron.router_remove_interface(
self.request, router_id, port_id=fake_port)
@test.create_stubs({api.neutron: ('is_extension_supported',)})
def test_is_extension_supported(self):
neutronclient = self.stub_neutronclient()
neutronclient.list_extensions().MultipleTimes() \
.AndReturn({'extensions': self.api_extensions.list()})
api.neutron.is_extension_supported(self.request, "quotas")\
.AndReturn(True)
api.neutron.is_extension_supported(self.request, "doesntexist") \
.AndReturn(False)
self.mox.ReplayAll()
self.assertTrue(
@ -524,13 +526,10 @@ class NeutronApiTests(test.APITestCase):
@override_settings(OPENSTACK_NEUTRON_NETWORK={'enable_distributed_router':
True},
POLICY_CHECK_FUNCTION=None)
@test.create_stubs({api.neutron: ('is_extension_supported',)})
def _test_get_dvr_permission_dvr_supported(self, dvr_enabled):
neutronclient = self.stub_neutronclient()
extensions = self.api_extensions.list()
if not dvr_enabled:
extensions = [ext for ext in extensions if ext['alias'] != 'dvr']
neutronclient.list_extensions() \
.AndReturn({'extensions': extensions})
api.neutron.is_extension_supported(self.request, 'dvr').\
AndReturn(dvr_enabled)
self.mox.ReplayAll()
self.assertEqual(dvr_enabled,
api.neutron.get_feature_permission(self.request,
@ -545,6 +544,7 @@ class NeutronApiTests(test.APITestCase):
@override_settings(OPENSTACK_NEUTRON_NETWORK={'enable_distributed_router':
True},
POLICY_CHECK_FUNCTION=policy.check)
@test.create_stubs({api.neutron: ('is_extension_supported',)})
def _test_get_dvr_permission_with_policy_check(self, policy_check_allowed,
operation):
self.mox.StubOutWithMock(policy, 'check')
@ -554,9 +554,8 @@ class NeutronApiTests(test.APITestCase):
role = (("network", "get_router:distributed"),)
policy.check(role, self.request).AndReturn(policy_check_allowed)
if policy_check_allowed:
neutronclient = self.stub_neutronclient()
neutronclient.list_extensions() \
.AndReturn({'extensions': self.api_extensions.list()})
api.neutron.is_extension_supported(self.request, 'dvr').\
AndReturn(policy_check_allowed)
self.mox.ReplayAll()
self.assertEqual(policy_check_allowed,
api.neutron.get_feature_permission(self.request,
@ -603,16 +602,13 @@ class NeutronApiTests(test.APITestCase):
@override_settings(OPENSTACK_NEUTRON_NETWORK={'enable_ha_router': True},
POLICY_CHECK_FUNCTION=policy.check)
@test.create_stubs({api.neutron: ('is_extension_supported', )})
def _test_get_router_ha_permission_with_policy_check(self, ha_enabled):
self.mox.StubOutWithMock(policy, 'check')
role = (("network", "create_router:ha"),)
policy.check(role, self.request).AndReturn(True)
neutronclient = self.stub_neutronclient()
if ha_enabled:
extensions = self.api_extensions.list()
else:
extensions = {}
neutronclient.list_extensions().AndReturn({'extensions': extensions})
api.neutron.is_extension_supported(self.request, 'l3-ha')\
.AndReturn(ha_enabled)
self.mox.ReplayAll()
self.assertEqual(ha_enabled,
api.neutron.get_feature_permission(self.request,