Removing duplicated classes from lbaas.py and vpn.py

This patch removes duplicated classes AttributeDict from
api/lbaas.py and api/vpn.py. Additional attributes are
added as needed in 'list' and 'get' methods.

Closes-Bug: #1267779
Change-Id: I47d8eb5fc9bd0767ac85a7c65d394ab2b7a2ea5d
This commit is contained in:
Tatiana Mazur 2014-01-10 22:38:14 +04:00
parent e578ec1a33
commit 5391659fdf
9 changed files with 367 additions and 398 deletions

View File

@ -16,6 +16,8 @@
from __future__ import absolute_import
from django.utils.datastructures import SortedDict
from openstack_dashboard.api import neutron
neutronclient = neutron.neutronclient
@ -36,43 +38,6 @@ class Pool(neutron.NeutronAPIDictWrapper):
apiresource['provider'] = None
super(Pool, self).__init__(apiresource)
class AttributeDict(dict):
def __getattr__(self, attr):
return self[attr]
def __setattr__(self, attr, value):
self[attr] = value
def readable(self, request):
pFormatted = {'id': self.id,
'name': self.name,
'description': self.description,
'status': self.status,
'protocol': self.protocol,
'health_monitors': self.health_monitors,
'provider': self.provider}
try:
pFormatted['subnet_id'] = self.subnet_id
pFormatted['subnet_name'] = neutron.subnet_get(
request, self.subnet_id).cidr
except Exception:
pFormatted['subnet_id'] = self.subnet_id
pFormatted['subnet_name'] = self.subnet_id
if self.vip_id is not None:
try:
pFormatted['vip_id'] = self.vip_id
pFormatted['vip_name'] = vip_get(
request, self.vip_id).name
except Exception:
pFormatted['vip_id'] = self.vip_id
pFormatted['vip_name'] = self.vip_id
else:
pFormatted['vip_id'] = None
pFormatted['vip_name'] = None
return self.AttributeDict(pFormatted)
class Member(neutron.NeutronAPIDictWrapper):
"""Wrapper for neutron load balancer member."""
@ -80,28 +45,6 @@ class Member(neutron.NeutronAPIDictWrapper):
def __init__(self, apiresource):
super(Member, self).__init__(apiresource)
class AttributeDict(dict):
def __getattr__(self, attr):
return self[attr]
def __setattr__(self, attr, value):
self[attr] = value
def readable(self, request):
mFormatted = {'id': self.id,
'status': self.status,
'address': self.address,
'protocol_port': self.protocol_port}
try:
mFormatted['pool_id'] = self.pool_id
mFormatted['pool_name'] = pool_get(
request, self.pool_id).name
except Exception:
mFormatted['pool_id'] = self.pool_id
mFormatted['pool_name'] = self.pool_id
return self.AttributeDict(mFormatted)
class PoolStats(neutron.NeutronAPIDictWrapper):
"""Wrapper for neutron load balancer pool stats."""
@ -189,13 +132,49 @@ def pool_create(request, **kwargs):
return Pool(pool)
def _get_vip_name(request, pool, vip_dict):
if pool['vip_id'] is not None:
try:
if vip_dict:
return vip_dict.get(pool['vip_id']).name
else:
return vip_get(request, pool['vip_id']).name
except Exception:
return pool['vip_id']
else:
return None
def pool_list(request, **kwargs):
return _pool_list(request, expand_subnet=True, expand_vip=True, **kwargs)
def _pool_list(request, expand_subnet=False, expand_vip=False, **kwargs):
pools = neutronclient(request).list_pools(**kwargs).get('pools')
if expand_subnet:
subnets = neutron.subnet_list(request)
subnet_dict = SortedDict((s.id, s) for s in subnets)
for p in pools:
p['subnet_name'] = subnet_dict.get(p['subnet_id']).cidr
if expand_vip:
vips = vip_list(request)
vip_dict = SortedDict((v.id, v) for v in vips)
for p in pools:
p['vip_name'] = _get_vip_name(request, p, vip_dict)
return [Pool(p) for p in pools]
def pool_get(request, pool_id):
return _pool_get(request, pool_id, expand_subnet=True, expand_vip=True)
def _pool_get(request, pool_id, expand_subnet=False, expand_vip=False):
pool = neutronclient(request).show_pool(pool_id).get('pool')
if expand_subnet:
pool['subnet_name'] = neutron.subnet_get(request,
pool['subnet_id']).cidr
if expand_vip:
pool['vip_name'] = _get_vip_name(request, pool, vip_dict=False)
return Pool(pool)
@ -288,12 +267,27 @@ def member_create(request, **kwargs):
def member_list(request, **kwargs):
return _member_list(request, expand_pool=True, **kwargs)
def _member_list(request, expand_pool, **kwargs):
members = neutronclient(request).list_members(**kwargs).get('members')
if expand_pool:
pools = _pool_list(request)
pool_dict = SortedDict((p.id, p) for p in pools)
for m in members:
m['pool_name'] = pool_dict.get(m['pool_id']).name
return [Member(m) for m in members]
def member_get(request, member_id):
return _member_get(request, member_id, expand_pool=True)
def _member_get(request, member_id, expand_pool):
member = neutronclient(request).show_member(member_id).get('member')
if expand_pool:
member['pool_name'] = _pool_get(request, member['pool_id']).name
return Member(member)

View File

@ -18,6 +18,8 @@
from __future__ import absolute_import
from django.utils.datastructures import SortedDict
from horizon.utils.memoized import memoized # noqa
from openstack_dashboard.api import neutron
@ -32,30 +34,6 @@ class IKEPolicy(neutron.NeutronAPIDictWrapper):
def __init__(self, apiresource):
super(IKEPolicy, self).__init__(apiresource)
class AttributeDict(dict):
def __getattr__(self, attr):
return self[attr]
def __setattr__(self, attr, value):
self[attr] = value
def readable(self, request):
pFormatted = {'id': self.id,
'name': self.name,
'description': self.description,
'auth_algorithm': self.auth_algorithm,
'encryption_algorithm': self.encryption_algorithm,
'pfs': self.pfs,
}
try:
conns = ipsecsiteconnection_list(request)
pFormatted['ipsecsiteconns'] = [c.id for c in conns
if c.ikepolicy_id == self.id]
except Exception:
pFormatted['ipsecsiteconns'] = None
return self.AttributeDict(pFormatted)
class IPSecPolicy(neutron.NeutronAPIDictWrapper):
@ -64,30 +42,6 @@ class IPSecPolicy(neutron.NeutronAPIDictWrapper):
def __init__(self, apiresource):
super(IPSecPolicy, self).__init__(apiresource)
class AttributeDict(dict):
def __getattr__(self, attr):
return self[attr]
def __setattr__(self, attr, value):
self[attr] = value
def readable(self, request):
pFormatted = {'id': self.id,
'name': self.name,
'description': self.description,
'auth_algorithm': self.auth_algorithm,
'encryption_algorithm': self.encryption_algorithm,
'pfs': self.pfs,
}
try:
conns = ipsecsiteconnection_list(request)
pFormatted['ipsecsiteconns'] = [c.id for c in conns
if c.ipsecpolicy_id == self.id]
except Exception:
pFormatted['ipsecsiteconns'] = None
return self.AttributeDict(pFormatted)
class IPSecSiteConnection(neutron.NeutronAPIDictWrapper):
@ -96,45 +50,6 @@ class IPSecSiteConnection(neutron.NeutronAPIDictWrapper):
def __init__(self, apiresource):
super(IPSecSiteConnection, self).__init__(apiresource)
class AttributeDict(dict):
def __getattr__(self, attr):
return self[attr]
def __setattr__(self, attr, value):
self[attr] = value
def readable(self, request):
cFormatted = {'id': self.id,
'name': self.name,
'description': self.description,
'status': self.status,
}
try:
cFormatted['ikepolicy_id'] = self.ikepolicy_id
cFormatted['ikepolicy_name'] = ikepolicy_get(
request, self.ikepolicy_id).name
except Exception:
cFormatted['ikepolicy_id'] = self.ikepolicy_id
cFormatted['ikepolicy_name'] = self.ikepolicy_id
try:
cFormatted['ipsecpolicy_id'] = self.ipsecpolicy_id
cFormatted['ipsecpolicy_name'] = ipsecpolicy_get(
request, self.ipsecpolicy_id).name
except Exception:
cFormatted['ipsecpolicy_id'] = self.ipsecpolicy_id
cFormatted['ipsecpolicy_name'] = self.ipsecpolicy_id
try:
cFormatted['vpnservice_id'] = self.vpnservice_id
cFormatted['vpnservice_name'] = vpnservice_get(
request, self.vpnservice_id).name
except Exception:
cFormatted['vpnservice_id'] = self.vpnservice_id
cFormatted['vpnservice_name'] = self.vpnservice_id
return self.AttributeDict(cFormatted)
class VPNService(neutron.NeutronAPIDictWrapper):
@ -143,45 +58,6 @@ class VPNService(neutron.NeutronAPIDictWrapper):
def __init__(self, apiresource):
super(VPNService, self).__init__(apiresource)
class AttributeDict(dict):
def __getattr__(self, attr):
return self[attr]
def __setattr__(self, attr, value):
self[attr] = value
def readable(self, request):
sFormatted = {'id': self.id,
'name': self.name,
'description': self.description,
'admin_state_up': self.admin_state_up,
'status': self.status,
}
try:
sFormatted['subnet_id'] = self.subnet_id
sFormatted['subnet_name'] = neutron.subnet_get(
request, self.subnet_id).cidr
except Exception:
sFormatted['subnet_id'] = self.subnet_id
sFormatted['subnet_name'] = self.subnet_id
try:
sFormatted['router_id'] = self.router_id
sFormatted['router_name'] = neutron.router_get(
request, self.router_id).name
except Exception:
sFormatted['router_id'] = self.router_id
sFormatted['router_name'] = self.router_id
try:
conns = ipsecsiteconnection_list(request)
sFormatted['ipsecsiteconns'] = [c.id for c in conns
if c.vpnservice_id == self.id]
except Exception:
sFormatted['ipsecsiteconns'] = None
return self.AttributeDict(sFormatted)
def vpnservice_create(request, **kwargs):
"""Create VPNService
@ -206,14 +82,51 @@ def vpnservice_create(request, **kwargs):
def vpnservice_list(request, **kwargs):
return _vpnservice_list(request, expand_subnet=True, expand_router=True,
expand_conns=True, **kwargs)
def _vpnservice_list(request, expand_subnet=False, expand_router=False,
expand_conns=False, **kwargs):
vpnservices = neutronclient(request).list_vpnservices(
**kwargs).get('vpnservices')
if expand_subnet:
subnets = neutron.subnet_list(request)
subnet_dict = SortedDict((s.id, s) for s in subnets)
for s in vpnservices:
s['subnet_name'] = subnet_dict.get(s['subnet_id']).cidr
if expand_router:
routers = neutron.router_list(request)
router_dict = SortedDict((r.id, r) for r in routers)
for s in vpnservices:
s['router_name'] = router_dict.get(s['router_id']).name
if expand_conns:
ipsecsiteconns = _ipsecsiteconnection_list(request, **kwargs)
for s in vpnservices:
s['ipsecsiteconns'] = [c.id for c in ipsecsiteconns
if c.vpnservice_id == s['id']]
return [VPNService(v) for v in vpnservices]
def vpnservice_get(request, vpnservice_id):
return _vpnservice_get(request, vpnservice_id, expand_subnet=True,
expand_router=True, expand_conns=True)
def _vpnservice_get(request, vpnservice_id, expand_subnet=False,
expand_router=False, expand_conns=False):
vpnservice = neutronclient(request).show_vpnservice(vpnservice_id).get(
'vpnservice')
if expand_subnet:
vpnservice['subnet_name'] = neutron.subnet_get(
request, vpnservice['subnet_id']).cidr
if expand_router:
vpnservice['router_name'] = neutron.router_get(
request, vpnservice['router_id']).name
if expand_conns:
ipsecsiteconns = _ipsecsiteconnection_list(request)
vpnservice['ipsecsiteconns'] = [c.id for c in ipsecsiteconns
if c.vpnservice_id == vpnservice['id']]
return VPNService(vpnservice)
@ -256,14 +169,31 @@ def ikepolicy_create(request, **kwargs):
def ikepolicy_list(request, **kwargs):
return _ikepolicy_list(request, expand_conns=True, **kwargs)
def _ikepolicy_list(request, expand_conns=False, **kwargs):
ikepolicies = neutronclient(request).list_ikepolicies(
**kwargs).get('ikepolicies')
if expand_conns:
ipsecsiteconns = _ipsecsiteconnection_list(request, **kwargs)
for p in ikepolicies:
p['ipsecsiteconns'] = [c.id for c in ipsecsiteconns
if c.ikepolicy_id == p['id']]
return [IKEPolicy(v) for v in ikepolicies]
def ikepolicy_get(request, ikepolicy_id):
return _ikepolicy_get(request, ikepolicy_id, expand_conns=True)
def _ikepolicy_get(request, ikepolicy_id, expand_conns=False):
ikepolicy = neutronclient(request).show_ikepolicy(
ikepolicy_id).get('ikepolicy')
if expand_conns:
ipsecsiteconns = _ipsecsiteconnection_list(request)
ikepolicy['ipsecsiteconns'] = [c.id for c in ipsecsiteconns
if c.ikepolicy_id == ikepolicy['id']]
return IKEPolicy(ikepolicy)
@ -306,14 +236,31 @@ def ipsecpolicy_create(request, **kwargs):
def ipsecpolicy_list(request, **kwargs):
return _ipsecpolicy_list(request, expand_conns=True, **kwargs)
def _ipsecpolicy_list(request, expand_conns=False, **kwargs):
ipsecpolicies = neutronclient(request).list_ipsecpolicies(
**kwargs).get('ipsecpolicies')
if expand_conns:
ipsecsiteconns = _ipsecsiteconnection_list(request, **kwargs)
for p in ipsecpolicies:
p['ipsecsiteconns'] = [c.id for c in ipsecsiteconns
if c.ipsecpolicy_id == p['id']]
return [IPSecPolicy(v) for v in ipsecpolicies]
def ipsecpolicy_get(request, ipsecpolicy_id):
return _ipsecpolicy_get(request, ipsecpolicy_id, expand_conns=True)
def _ipsecpolicy_get(request, ipsecpolicy_id, expand_conns=False):
ipsecpolicy = neutronclient(request).show_ipsecpolicy(
ipsecpolicy_id).get('ipsecpolicy')
if expand_conns:
ipsecsiteconns = _ipsecsiteconnection_list(request)
ipsecpolicy['ipsecsiteconns'] = [c.id for c in ipsecsiteconns
if c.ipsecpolicy_id == ipsecpolicy['id']]
return IPSecPolicy(ipsecpolicy)
@ -367,14 +314,56 @@ def ipsecsiteconnection_create(request, **kwargs):
@memoized
def ipsecsiteconnection_list(request, **kwargs):
return _ipsecsiteconnection_list(request, expand_ikepolicies=True,
expand_ipsecpolicies=True,
expand_vpnservices=True, **kwargs)
@memoized
def _ipsecsiteconnection_list(request, expand_ikepolicies=False,
expand_ipsecpolicies=False,
expand_vpnservices=False, **kwargs):
ipsecsiteconnections = neutronclient(request).list_ipsec_site_connections(
**kwargs).get('ipsec_site_connections')
if expand_ikepolicies:
ikepolicies = _ikepolicy_list(request, **kwargs)
policy_dict = SortedDict((p.id, p) for p in ikepolicies)
for c in ipsecsiteconnections:
c['ikepolicy_name'] = policy_dict.get(c['ikepolicy_id']).name
if expand_ipsecpolicies:
ipsecpolicies = _ipsecpolicy_list(request, **kwargs)
policy_dict = SortedDict((p.id, p) for p in ipsecpolicies)
for c in ipsecsiteconnections:
c['ipsecpolicy_name'] = policy_dict.get(c['ipsecpolicy_id']).name
if expand_vpnservices:
vpnservices = _vpnservice_list(request, **kwargs)
service_dict = SortedDict((s.id, s) for s in vpnservices)
for c in ipsecsiteconnections:
c['vpnservice_name'] = service_dict.get(c['vpnservice_id']).name
return [IPSecSiteConnection(v) for v in ipsecsiteconnections]
def ipsecsiteconnection_get(request, ipsecsiteconnection_id):
return _ipsecsiteconnection_get(request, ipsecsiteconnection_id,
expand_ikepolicies=True,
expand_ipsecpolicies=True,
expand_vpnservices=True)
def _ipsecsiteconnection_get(request, ipsecsiteconnection_id,
expand_ikepolicies, expand_ipsecpolicies,
expand_vpnservices):
ipsecsiteconnection = neutronclient(request).show_ipsec_site_connection(
ipsecsiteconnection_id).get('ipsec_site_connection')
if expand_ikepolicies:
ipsecsiteconnection['ikepolicy_name'] = _ikepolicy_get(
request, ipsecsiteconnection['ikepolicy_id']).name
if expand_ipsecpolicies:
ipsecsiteconnection['ipsecpolicy_name'] = _ipsecpolicy_get(
request, ipsecsiteconnection['ipsecpolicy_id']).name
if expand_vpnservices:
ipsecsiteconnection['vpnservice_name'] = _vpnservice_get(
request, ipsecsiteconnection['vpnservice_id'])
return IPSecSiteConnection(ipsecsiteconnection)

View File

@ -36,13 +36,13 @@ class PoolsTab(tabs.TableTab):
tenant_id = self.request.user.tenant_id
pools = api.lbaas.pool_list(self.tab_group.request,
tenant_id=tenant_id)
poolsFormatted = [p.readable(self.tab_group.request) for
p in pools]
except Exception:
poolsFormatted = []
pools = []
exceptions.handle(self.tab_group.request,
_('Unable to retrieve pools list.'))
return poolsFormatted
for p in pools:
p.set_id_as_name_if_empty()
return pools
class MembersTab(tabs.TableTab):
@ -56,13 +56,13 @@ class MembersTab(tabs.TableTab):
tenant_id = self.request.user.tenant_id
members = api.lbaas.member_list(self.tab_group.request,
tenant_id=tenant_id)
membersFormatted = [m.readable(self.tab_group.request) for
m in members]
except Exception:
membersFormatted = []
members = []
exceptions.handle(self.tab_group.request,
_('Unable to retrieve member list.'))
return membersFormatted
for m in members:
m.set_id_as_name_if_empty()
return members
class MonitorsTab(tabs.TableTab):

View File

@ -58,27 +58,15 @@ class LoadBalancerTests(test.TestCase):
def set_up_expect(self):
# retrieve pools
vip1, vip2 = self.vips.list()[:2]
api.lbaas.pool_list(
IsA(http.HttpRequest), tenant_id=self.tenant.id) \
.AndReturn(self.pools.list())
api.lbaas.vip_get(IsA(http.HttpRequest), vip1.id).AndReturn(vip1)
api.lbaas.vip_get(IsA(http.HttpRequest), vip2.id).AndReturn(vip2)
# retrieves members
api.lbaas.member_list(
IsA(http.HttpRequest), tenant_id=self.tenant.id) \
.AndReturn(self.members.list())
pool1, pool2 = self.pools.list()[:2]
api.lbaas.pool_get(IsA(http.HttpRequest),
self.members.list()[0].pool_id).AndReturn(pool1)
api.lbaas.pool_get(IsA(http.HttpRequest),
self.members.list()[1].pool_id).AndReturn(pool2)
# retrieves monitors
api.lbaas.pool_health_monitor_list(
IsA(http.HttpRequest), tenant_id=self.tenant.id).MultipleTimes() \
@ -95,8 +83,7 @@ class LoadBalancerTests(test.TestCase):
IsA(http.HttpRequest), tenant_id=self.tenant.id) \
.AndRaise(self.exceptions.neutron)
@test.create_stubs({api.lbaas: ('pool_list', 'vip_get',
'member_list', 'pool_get',
@test.create_stubs({api.lbaas: ('pool_list', 'member_list',
'pool_health_monitor_list')})
def test_index_pools(self):
self.set_up_expect()
@ -111,8 +98,7 @@ class LoadBalancerTests(test.TestCase):
self.assertEqual(len(res.context['table'].data),
len(self.pools.list()))
@test.create_stubs({api.lbaas: ('pool_list', 'vip_get',
'member_list', 'pool_get',
@test.create_stubs({api.lbaas: ('pool_list', 'member_list',
'pool_health_monitor_list')})
def test_index_members(self):
self.set_up_expect()
@ -127,9 +113,8 @@ class LoadBalancerTests(test.TestCase):
self.assertEqual(len(res.context['memberstable_table'].data),
len(self.members.list()))
@test.create_stubs({api.lbaas: ('pool_list', 'vip_get',
'pool_health_monitor_list',
'member_list', 'pool_get')})
@test.create_stubs({api.lbaas: ('pool_list', 'member_list',
'pool_health_monitor_list')})
def test_index_monitors(self):
self.set_up_expect()
@ -870,8 +855,7 @@ class LoadBalancerTests(test.TestCase):
'<DeletePMAssociationStep: deletepmassociationaction>', ]
self.assertQuerysetEqual(workflow.steps, expected_objs)
@test.create_stubs({api.lbaas: ('pool_list', 'vip_get',
'member_list', 'pool_get',
@test.create_stubs({api.lbaas: ('pool_list', 'member_list',
'pool_health_monitor_list',
'pool_delete')})
def test_delete_pool(self):
@ -885,14 +869,14 @@ class LoadBalancerTests(test.TestCase):
self.assertNoFormErrors(res)
@test.create_stubs({api.lbaas: ('pool_list', 'vip_get',
'member_list', 'pool_get',
@test.create_stubs({api.lbaas: ('pool_list', 'member_list',
'pool_health_monitor_list',
'vip_delete')})
'pool_get', 'vip_delete')})
def test_delete_vip(self):
self.set_up_expect()
pool = self.pools.first()
vip = self.vips.first()
api.lbaas.pool_get(IsA(http.HttpRequest), pool.id).AndReturn(pool)
api.lbaas.vip_delete(IsA(http.HttpRequest), vip.id)
self.mox.ReplayAll()
@ -901,8 +885,7 @@ class LoadBalancerTests(test.TestCase):
self.assertNoFormErrors(res)
@test.create_stubs({api.lbaas: ('pool_list', 'vip_get',
'member_list', 'pool_get',
@test.create_stubs({api.lbaas: ('pool_list', 'member_list',
'pool_health_monitor_list',
'member_delete')})
def test_delete_member(self):
@ -916,8 +899,7 @@ class LoadBalancerTests(test.TestCase):
self.assertNoFormErrors(res)
@test.create_stubs({api.lbaas: ('pool_list', 'vip_get',
'member_list', 'pool_get',
@test.create_stubs({api.lbaas: ('pool_list', 'member_list',
'pool_health_monitor_list',
'pool_health_monitor_delete')})
def test_delete_monitor(self):

View File

@ -60,13 +60,13 @@ class IPSecSiteConnectionsTab(tabs.TableTab):
tenant_id = self.request.user.tenant_id
ipsecsiteconnections = api.vpn.ipsecsiteconnection_list(
self.tab_group.request, tenant_id=tenant_id)
ipsecsiteconnectionsFormatted = [s.readable(self.tab_group.request)
for s in ipsecsiteconnections]
except Exception:
ipsecsiteconnectionsFormatted = []
ipsecsiteconnections = []
exceptions.handle(self.tab_group.request,
_('Unable to retrieve IPSec Site Connections list.'))
return ipsecsiteconnectionsFormatted
for c in ipsecsiteconnections:
c.set_id_as_name_if_empty()
return ipsecsiteconnections
class VPNServicesTab(tabs.TableTab):
@ -80,13 +80,13 @@ class VPNServicesTab(tabs.TableTab):
tenant_id = self.request.user.tenant_id
vpnservices = api.vpn.vpnservice_list(
self.tab_group.request, tenant_id=tenant_id)
vpnservicesFormatted = [s.readable(self.tab_group.request)
for s in vpnservices]
except Exception:
vpnservicesFormatted = []
vpnservices = []
exceptions.handle(self.tab_group.request,
_('Unable to retrieve VPN Services list.'))
return vpnservicesFormatted
for s in vpnservices:
s.set_id_as_name_if_empty()
return vpnservices
class IKEPoliciesTab(tabs.TableTab):
@ -100,13 +100,13 @@ class IKEPoliciesTab(tabs.TableTab):
tenant_id = self.request.user.tenant_id
ikepolicies = api.vpn.ikepolicy_list(
self.tab_group.request, tenant_id=tenant_id)
ikepoliciesFormatted = [s.readable(self.tab_group.request)
for s in ikepolicies]
except Exception:
ikepoliciesFormatted = []
ikepolicies = []
exceptions.handle(self.tab_group.request,
_('Unable to retrieve IKE Policies list.'))
return ikepoliciesFormatted
for p in ikepolicies:
p.set_id_as_name_if_empty()
return ikepolicies
class IPSecPoliciesTab(tabs.TableTab):
@ -120,13 +120,13 @@ class IPSecPoliciesTab(tabs.TableTab):
tenant_id = self.request.user.tenant_id
ipsecpolicies = api.vpn.ipsecpolicy_list(
self.tab_group.request, tenant_id=tenant_id)
ipsecpoliciesFormatted = [s.readable(self.tab_group.request)
for s in ipsecpolicies]
except Exception:
ipsecpoliciesFormatted = []
ipsecpolicies = []
exceptions.handle(self.tab_group.request,
_('Unable to retrieve IPSec Policies list.'))
return ipsecpoliciesFormatted
for p in ipsecpolicies:
p.set_id_as_name_if_empty()
return ipsecpolicies
class VPNTabs(tabs.TabGroup):

View File

@ -54,52 +54,23 @@ class VPNTests(test.TestCase):
def set_up_expect(self):
# retrieves vpnservices
vpnservice1, vpnservice2 = self.vpnservices.list()[:2]
api.vpn.vpnservice_list(
IsA(http.HttpRequest), tenant_id=self.tenant.id) \
.AndReturn(self.vpnservices.list())
api.vpn.vpnservice_get(
IsA(http.HttpRequest), vpnservice1.id).AndReturn(vpnservice1)
api.vpn.vpnservice_get(
IsA(http.HttpRequest), vpnservice2.id).AndReturn(vpnservice2)
# retrieves ikepolicies
api.vpn.ikepolicy_list(
IsA(http.HttpRequest), tenant_id=self.tenant.id) \
.AndReturn(self.ikepolicies.list())
ikepolicy1, ikepolicy2 = self.ikepolicies.list()[:2]
api.vpn.ikepolicy_get(
IsA(http.HttpRequest), ikepolicy1.id).AndReturn(ikepolicy1)
api.vpn.ikepolicy_get(
IsA(http.HttpRequest), ikepolicy2.id).AndReturn(ikepolicy2)
# retrieves ipsecpolicies
api.vpn.ipsecpolicy_list(
IsA(http.HttpRequest), tenant_id=self.tenant.id) \
.AndReturn(self.ipsecpolicies.list())
ipsecpolicy1, ipsecpolicy2 = self.ipsecpolicies.list()[:2]
api.vpn.ipsecpolicy_get(
IsA(http.HttpRequest), ipsecpolicy1.id).AndReturn(ipsecpolicy1)
api.vpn.ipsecpolicy_get(
IsA(http.HttpRequest), ipsecpolicy2.id).AndReturn(ipsecpolicy2)
# retrieves ipsecsiteconnections
# In unit tests memoized decorator is not called when methods with
# memoized decorators are stubbed out.
# That's the reason MultipleTimes is needed here.
api.vpn.ipsecsiteconnection_list(
IsA(http.HttpRequest), tenant_id=self.tenant.id).MultipleTimes() \
.AndReturn(self.ipsecsiteconnections.list())
api.vpn.ipsecsiteconnection_list(
IsA(http.HttpRequest)).MultipleTimes() \
IsA(http.HttpRequest), tenant_id=self.tenant.id) \
.AndReturn(self.ipsecsiteconnections.list())
def set_up_expect_with_exception(self):
@ -118,9 +89,7 @@ class VPNTests(test.TestCase):
@test.create_stubs({api.vpn: ('ikepolicy_list', 'ipsecpolicy_list',
'vpnservice_list',
'ipsecsiteconnection_list', 'ikepolicy_get',
'ipsecpolicy_get', 'vpnservice_get',
'ipsecsiteconnection_get')})
'ipsecsiteconnection_list')})
def test_index_vpnservices(self):
self.set_up_expect()
@ -136,9 +105,7 @@ class VPNTests(test.TestCase):
@test.create_stubs({api.vpn: ('ikepolicy_list', 'ipsecpolicy_list',
'vpnservice_list',
'ipsecsiteconnection_list', 'ikepolicy_get',
'ipsecpolicy_get', 'vpnservice_get',
'ipsecsiteconnection_get')})
'ipsecsiteconnection_list')})
def test_index_ikepolicies(self):
self.set_up_expect()
@ -154,9 +121,7 @@ class VPNTests(test.TestCase):
@test.create_stubs({api.vpn: ('ikepolicy_list', 'ipsecpolicy_list',
'vpnservice_list',
'ipsecsiteconnection_list', 'ikepolicy_get',
'ipsecpolicy_get', 'vpnservice_get',
'ipsecsiteconnection_get')})
'ipsecsiteconnection_list')})
def test_index_ipsecpolicies(self):
self.set_up_expect()
@ -172,9 +137,7 @@ class VPNTests(test.TestCase):
@test.create_stubs({api.vpn: ('ikepolicy_list', 'ipsecpolicy_list',
'vpnservice_list',
'ipsecsiteconnection_list', 'ikepolicy_get',
'ipsecpolicy_get', 'vpnservice_get',
'ipsecsiteconnection_get')})
'ipsecsiteconnection_list')})
def test_index_ipsecsiteconnections(self):
self.set_up_expect()
@ -258,15 +221,11 @@ class VPNTests(test.TestCase):
@test.create_stubs({api.neutron: ('network_list_for_tenant',
'router_list')})
def test_add_vpnservice_get(self):
subnet = self.subnets.first()
networks = [{'subnets': [subnet, ]}, ]
networks = [{'subnets': [self.subnets.first(), ]}, ]
routers = self.routers.list()
api.neutron.network_list_for_tenant(
IsA(http.HttpRequest), self.tenant.id).AndReturn(networks)
routers = self.routers.list()
api.neutron.router_list(
IsA(http.HttpRequest), tenant_id=self.tenant.id).AndReturn(routers)
@ -286,17 +245,13 @@ class VPNTests(test.TestCase):
api.vpn: ('vpnservice_create', )})
def test_add_vpnservice_post(self):
vpnservice = self.vpnservices.first()
networks = [{'subnets': [self.subnets.first(), ]}, ]
routers = self.routers.list()
api.neutron.router_list(
IsA(http.HttpRequest), tenant_id=self.tenant.id).AndReturn(routers)
subnet = self.subnets.first()
networks = [{'subnets': [subnet, ]}, ]
api.neutron.network_list_for_tenant(
IsA(http.HttpRequest), self.tenant.id).AndReturn(networks)
api.neutron.router_list(
IsA(http.HttpRequest), tenant_id=self.tenant.id).AndReturn(routers)
api.vpn.vpnservice_create(
IsA(http.HttpRequest),
@ -324,17 +279,13 @@ class VPNTests(test.TestCase):
'network_list_for_tenant')})
def test_add_vpnservice_post_error(self):
vpnservice = self.vpnservices.first()
networks = [{'subnets': [self.subnets.first(), ]}, ]
routers = self.routers.list()
api.neutron.router_list(
IsA(http.HttpRequest), tenant_id=self.tenant.id).AndReturn(routers)
subnet = self.subnets.first()
networks = [{'subnets': [subnet, ]}, ]
api.neutron.network_list_for_tenant(
IsA(http.HttpRequest), self.tenant.id).AndReturn(networks)
api.neutron.router_list(
IsA(http.HttpRequest), tenant_id=self.tenant.id).AndReturn(routers)
self.mox.ReplayAll()
@ -735,9 +686,7 @@ class VPNTests(test.TestCase):
@test.create_stubs({api.vpn: ('ikepolicy_list', 'ipsecpolicy_list',
'vpnservice_list',
'ipsecsiteconnection_list', 'ikepolicy_get',
'ipsecpolicy_get', 'vpnservice_get',
'ipsecsiteconnection_get',
'ipsecsiteconnection_list',
'vpnservice_delete',)})
def test_delete_vpnservice(self):
self.set_up_expect()
@ -757,9 +706,7 @@ class VPNTests(test.TestCase):
@test.create_stubs({api.vpn: ('ikepolicy_list', 'ipsecpolicy_list',
'vpnservice_list',
'ipsecsiteconnection_list', 'ikepolicy_get',
'ipsecpolicy_get', 'vpnservice_get',
'ipsecsiteconnection_get',
'ipsecsiteconnection_list',
'ikepolicy_delete',)})
def test_delete_ikepolicy(self):
self.set_up_expect()
@ -779,9 +726,7 @@ class VPNTests(test.TestCase):
@test.create_stubs({api.vpn: ('ikepolicy_list', 'ipsecpolicy_list',
'vpnservice_list',
'ipsecsiteconnection_list', 'ikepolicy_get',
'ipsecpolicy_get', 'vpnservice_get',
'ipsecsiteconnection_get',
'ipsecsiteconnection_list',
'ipsecpolicy_delete',)})
def test_delete_ipsecpolicy(self):
self.set_up_expect()
@ -802,9 +747,7 @@ class VPNTests(test.TestCase):
@test.create_stubs({api.vpn: ('ikepolicy_list', 'ipsecpolicy_list',
'vpnservice_list',
'ipsecsiteconnection_list', 'ikepolicy_get',
'ipsecpolicy_get', 'vpnservice_get',
'ipsecsiteconnection_get',
'ipsecsiteconnection_list',
'ipsecsiteconnection_delete',)})
def test_delete_ipsecsiteconnection(self):
self.set_up_expect()

View File

@ -86,6 +86,7 @@ class LbaasApiTests(test.APITestCase):
ret_val = api.lbaas.vip_list(self.request)
for v in ret_val:
self.assertIsInstance(v, api.lbaas.Vip)
self.assertTrue(v.id)
@test.create_stubs({neutronclient: ('show_vip',)})
def test_vip_get(self):
@ -163,37 +164,37 @@ class LbaasApiTests(test.APITestCase):
ret_val = api.lbaas.pool_create(self.request, **form_data)
self.assertIsInstance(ret_val, api.lbaas.Pool)
@test.create_stubs({neutronclient: ('list_pools',)})
@test.create_stubs({neutronclient: ('list_pools', 'list_vips'),
api.neutron: ('subnet_list',)})
def test_pool_list(self):
pools = {'pools': [{
'id': 'abcdef-c3eb-4fee-9763-12de3338041e',
'name': 'pool1name',
'description': 'pool1description',
'subnet_id': '12381d38-c3eb-4fee-9763-12de3338041e',
'protocol': 'HTTP',
'lb_method': 'ROUND_ROBIN',
'admin_state_up': True}, ]}
pools = {'pools': self.api_pools.list()}
subnets = self.subnets.list()
vips = {'vips': self.api_vips.list()}
neutronclient.list_pools().AndReturn(pools)
api.neutron.subnet_list(self.request).AndReturn(subnets)
neutronclient.list_vips().AndReturn(vips)
self.mox.ReplayAll()
ret_val = api.lbaas.pool_list(self.request)
for v in ret_val:
self.assertIsInstance(v, api.lbaas.Pool)
self.assertTrue(v.id)
@test.create_stubs({neutronclient: ('show_pool',)})
@test.create_stubs({neutronclient: ('show_pool', 'show_vip'),
api.neutron: ('subnet_get',)})
def test_pool_get(self):
pool = {'pool': {'id': 'abcdef-c3eb-4fee-9763-12de3338041e',
'name': 'pool1name',
'description': 'pool1description',
'subnet_id': '12381d38-c3eb-4fee-9763-12de3338041e',
'protocol': 'HTTP',
'lb_method': 'ROUND_ROBIN',
'admin_state_up': True
}}
neutronclient.show_pool(pool['pool']['id']).AndReturn(pool)
pool = self.pools.first()
subnet = self.subnets.first()
pool_dict = {'pool': self.api_pools.first()}
vip_dict = {'vip': self.api_vips.first()}
neutronclient.show_pool(pool.id).AndReturn(pool_dict)
api.neutron.subnet_get(self.request, subnet.id).AndReturn(subnet)
neutronclient.show_vip(pool.vip_id).AndReturn(vip_dict)
self.mox.ReplayAll()
ret_val = api.lbaas.pool_get(self.request, pool['pool']['id'])
ret_val = api.lbaas.pool_get(self.request, pool.id)
self.assertIsInstance(ret_val, api.lbaas.Pool)
@test.create_stubs({neutronclient: ('update_pool',)})
@ -264,6 +265,7 @@ class LbaasApiTests(test.APITestCase):
ret_val = api.lbaas.pool_health_monitor_list(self.request)
for v in ret_val:
self.assertIsInstance(v, api.lbaas.PoolMonitor)
self.assertTrue(v.id)
@test.create_stubs({neutronclient: ('show_health_monitor',)})
def test_pool_health_monitor_get(self):
@ -308,35 +310,31 @@ class LbaasApiTests(test.APITestCase):
ret_val = api.lbaas.member_create(self.request, **form_data)
self.assertIsInstance(ret_val, api.lbaas.Member)
@test.create_stubs({neutronclient: ('list_members',)})
@test.create_stubs({neutronclient: ('list_members', 'list_pools')})
def test_member_list(self):
members = {'members': [
{'id': 'abcdef-c3eb-4fee-9763-12de3338041e',
'pool_id': 'abcdef-c3eb-4fee-9763-12de3338041e',
'address': '10.0.1.2',
'protocol_port': '80',
'weight': '10',
'admin_state_up': True
}, ]}
members = {'members': self.api_members.list()}
pools = {'pools': self.api_pools.list()}
neutronclient.list_members().AndReturn(members)
neutronclient.list_pools().AndReturn(pools)
self.mox.ReplayAll()
ret_val = api.lbaas.member_list(self.request)
for v in ret_val:
self.assertIsInstance(v, api.lbaas.Member)
self.assertTrue(v.id)
@test.create_stubs({neutronclient: ('show_member',)})
@test.create_stubs({neutronclient: ('show_member', 'show_pool')})
def test_member_get(self):
member = {'member': {'id': 'abcdef-c3eb-4fee-9763-12de3338041e',
'pool_id': 'abcdef-c3eb-4fee-9763-12de3338041e',
'address': '10.0.1.2',
'protocol_port': '80',
'weight': '10',
'admin_state_up': True}}
neutronclient.show_member(member['member']['id']).AndReturn(member)
member = self.members.first()
member_dict = {'member': self.api_members.first()}
pool_dict = {'pool': self.api_pools.first()}
neutronclient.show_member(member.id).AndReturn(member_dict)
neutronclient.show_pool(member.pool_id).AndReturn(pool_dict)
self.mox.ReplayAll()
ret_val = api.lbaas.member_get(self.request, member['member']['id'])
ret_val = api.lbaas.member_get(self.request, member.id)
self.assertIsInstance(ret_val, api.lbaas.Member)
@test.create_stubs({neutronclient: ('update_member',)})

View File

@ -44,12 +44,22 @@ class VPNaasApiTests(test.APITestCase):
ret_val = api.vpn.vpnservice_create(self.request, **form_data)
self.assertIsInstance(ret_val, api.vpn.VPNService)
@test.create_stubs({neutronclient: ('list_vpnservices',)})
@test.create_stubs({neutronclient: ('list_vpnservices',
'list_ipsec_site_connections'),
api.neutron: ('subnet_list', 'router_list')})
def test_vpnservice_list(self):
vpnservices = {'vpnservices': self.vpnservices.list()}
vpnservices_dict = {'vpnservices': self.api_vpnservices.list()}
subnets = self.subnets.list()
routers = self.routers.list()
ipsecsiteconnections_dict = {
'ipsec_site_connections': self.api_ipsecsiteconnections.list()}
neutronclient.list_vpnservices().AndReturn(vpnservices_dict)
api.neutron.subnet_list(self.request).AndReturn(subnets)
api.neutron.router_list(self.request).AndReturn(routers)
neutronclient.list_ipsec_site_connections().AndReturn(
ipsecsiteconnections_dict)
self.mox.ReplayAll()
@ -59,18 +69,27 @@ class VPNaasApiTests(test.APITestCase):
self.assertTrue(v.name, d.name)
self.assertTrue(v.id)
@test.create_stubs({neutronclient: ('show_vpnservice',)})
@test.create_stubs({neutronclient: ('show_vpnservice',
'list_ipsec_site_connections'),
api.neutron: ('subnet_get', 'router_get')})
def test_vpnservice_get(self):
vpnservice1 = self.api_vpnservices.first()
vpnservice = {'vpnservice': vpnservice1}
vpnservice = self.vpnservices.first()
vpnservice_dict = {'vpnservice': self.api_vpnservices.first()}
subnet = self.subnets.first()
router = self.routers.first()
ipsecsiteconnections_dict = {
'ipsec_site_connections': self.api_ipsecsiteconnections.list()}
neutronclient.show_vpnservice(
vpnservice['vpnservice']['id']).AndReturn(vpnservice)
vpnservice.id).AndReturn(vpnservice_dict)
api.neutron.subnet_get(self.request, subnet.id).AndReturn(subnet)
api.neutron.router_get(self.request, router.id).AndReturn(router)
neutronclient.list_ipsec_site_connections().AndReturn(
ipsecsiteconnections_dict)
self.mox.ReplayAll()
ret_val = api.vpn.vpnservice_get(self.request,
vpnservice['vpnservice']['id'])
ret_val = api.vpn.vpnservice_get(self.request, vpnservice.id)
self.assertIsInstance(ret_val, api.vpn.VPNService)
@test.create_stubs({neutronclient: ('create_ikepolicy',)})
@ -95,12 +114,17 @@ class VPNaasApiTests(test.APITestCase):
ret_val = api.vpn.ikepolicy_create(self.request, **form_data)
self.assertIsInstance(ret_val, api.vpn.IKEPolicy)
@test.create_stubs({neutronclient: ('list_ikepolicies',)})
@test.create_stubs({neutronclient: ('list_ikepolicies',
'list_ipsec_site_connections')})
def test_ikepolicy_list(self):
ikepolicies = {'ikepolicies': self.ikepolicies.list()}
ikepolicies_dict = {'ikepolicies': self.api_ikepolicies.list()}
ipsecsiteconnections_dict = {
'ipsec_site_connections': self.api_ipsecsiteconnections.list()}
neutronclient.list_ikepolicies().AndReturn(ikepolicies_dict)
neutronclient.list_ipsec_site_connections().AndReturn(
ipsecsiteconnections_dict)
self.mox.ReplayAll()
@ -110,18 +134,22 @@ class VPNaasApiTests(test.APITestCase):
self.assertTrue(v.name, d.name)
self.assertTrue(v.id)
@test.create_stubs({neutronclient: ('show_ikepolicy',)})
@test.create_stubs({neutronclient: ('show_ikepolicy',
'list_ipsec_site_connections')})
def test_ikepolicy_get(self):
ikepolicy1 = self.api_ikepolicies.first()
ikepolicy = {'ikepolicy': ikepolicy1}
ikepolicy = self.ikepolicies.first()
ikepolicy_dict = {'ikepolicy': self.api_ikepolicies.first()}
ipsecsiteconnections_dict = {
'ipsec_site_connections': self.api_ipsecsiteconnections.list()}
neutronclient.show_ikepolicy(
ikepolicy['ikepolicy']['id']).AndReturn(ikepolicy)
ikepolicy.id).AndReturn(ikepolicy_dict)
neutronclient.list_ipsec_site_connections().AndReturn(
ipsecsiteconnections_dict)
self.mox.ReplayAll()
ret_val = api.vpn.ikepolicy_get(self.request,
ikepolicy['ikepolicy']['id'])
ret_val = api.vpn.ikepolicy_get(self.request, ikepolicy.id)
self.assertIsInstance(ret_val, api.vpn.IKEPolicy)
@test.create_stubs({neutronclient: ('create_ipsecpolicy',)})
@ -146,12 +174,17 @@ class VPNaasApiTests(test.APITestCase):
ret_val = api.vpn.ipsecpolicy_create(self.request, **form_data)
self.assertIsInstance(ret_val, api.vpn.IPSecPolicy)
@test.create_stubs({neutronclient: ('list_ipsecpolicies',)})
@test.create_stubs({neutronclient: ('list_ipsecpolicies',
'list_ipsec_site_connections')})
def test_ipsecpolicy_list(self):
ipsecpolicies = {'ipsecpolicies': self.ipsecpolicies.list()}
ipsecpolicies_dict = {'ipsecpolicies': self.api_ipsecpolicies.list()}
ipsecsiteconnections_dict = {
'ipsec_site_connections': self.api_ipsecsiteconnections.list()}
neutronclient.list_ipsecpolicies().AndReturn(ipsecpolicies_dict)
neutronclient.list_ipsec_site_connections().AndReturn(
ipsecsiteconnections_dict)
self.mox.ReplayAll()
@ -161,18 +194,22 @@ class VPNaasApiTests(test.APITestCase):
self.assertTrue(v.name, d.name)
self.assertTrue(v.id)
@test.create_stubs({neutronclient: ('show_ipsecpolicy',)})
@test.create_stubs({neutronclient: ('show_ipsecpolicy',
'list_ipsec_site_connections')})
def test_ipsecpolicy_get(self):
ipsecpolicy1 = self.api_ipsecpolicies.first()
ipsecpolicy = {'ipsecpolicy': ipsecpolicy1}
ipsecpolicy = self.ipsecpolicies.first()
ipsecpolicy_dict = {'ipsecpolicy': self.api_ipsecpolicies.first()}
ipsecsiteconnections_dict = {
'ipsec_site_connections': self.api_ipsecsiteconnections.list()}
neutronclient.show_ipsecpolicy(
ipsecpolicy['ipsecpolicy']['id']).AndReturn(ipsecpolicy)
ipsecpolicy.id).AndReturn(ipsecpolicy_dict)
neutronclient.list_ipsec_site_connections().AndReturn(
ipsecsiteconnections_dict)
self.mox.ReplayAll()
ret_val = api.vpn.ipsecpolicy_get(self.request,
ipsecpolicy['ipsecpolicy']['id'])
ret_val = api.vpn.ipsecpolicy_get(self.request, ipsecpolicy.id)
self.assertIsInstance(ret_val, api.vpn.IPSecPolicy)
@test.create_stubs({neutronclient: ('create_ipsec_site_connection',)})
@ -205,15 +242,24 @@ class VPNaasApiTests(test.APITestCase):
self.request, **form_data)
self.assertIsInstance(ret_val, api.vpn.IPSecSiteConnection)
@test.create_stubs({neutronclient: ('list_ipsec_site_connections',)})
@test.create_stubs({neutronclient: ('list_ipsec_site_connections',
'list_ikepolicies',
'list_ipsecpolicies',
'list_vpnservices')})
def test_ipsecsiteconnection_list(self):
ipsecsiteconnections = {
'ipsec_site_connections': self.ipsecsiteconnections.list()}
ipsecsiteconnections_dict = {
'ipsec_site_connections': self.api_ipsecsiteconnections.list()}
ikepolicies_dict = {'ikepolicies': self.api_ikepolicies.list()}
ipsecpolicies_dict = {'ipsecpolicies': self.api_ipsecpolicies.list()}
vpnservices_dict = {'vpnservices': self.api_vpnservices.list()}
neutronclient.list_ipsec_site_connections().AndReturn(
ipsecsiteconnections_dict)
neutronclient.list_ikepolicies().AndReturn(ikepolicies_dict)
neutronclient.list_ipsecpolicies().AndReturn(ipsecpolicies_dict)
neutronclient.list_vpnservices().AndReturn(vpnservices_dict)
self.mox.ReplayAll()
@ -224,17 +270,28 @@ class VPNaasApiTests(test.APITestCase):
self.assertTrue(v.name, d.name)
self.assertTrue(v.id)
@test.create_stubs({neutronclient: ('show_ipsec_site_connection',)})
@test.create_stubs({neutronclient: ('show_ipsec_site_connection',
'show_ikepolicy', 'show_ipsecpolicy',
'show_vpnservice')})
def test_ipsecsiteconnection_get(self):
ipsecsiteconnection1 = self.api_ipsecsiteconnections.first()
ipsecsiteconnection = {'ipsec_site_connection': ipsecsiteconnection1}
ipsecsiteconnection = self.ipsecsiteconnections.first()
connection_dict = {'ipsec_site_connection':
self.api_ipsecsiteconnections.first()}
ikepolicy_dict = {'ikepolicy': self.api_ikepolicies.first()}
ipsecpolicy_dict = {'ipsecpolicy': self.api_ipsecpolicies.first()}
vpnservice_dict = {'vpnservice': self.api_vpnservices.first()}
neutronclient.show_ipsec_site_connection(
ipsecsiteconnection['ipsec_site_connection']['id']).AndReturn(
ipsecsiteconnection)
ipsecsiteconnection.id).AndReturn(connection_dict)
neutronclient.show_ikepolicy(
ipsecsiteconnection.ikepolicy_id).AndReturn(ikepolicy_dict)
neutronclient.show_ipsecpolicy(
ipsecsiteconnection.ipsecpolicy_id).AndReturn(ipsecpolicy_dict)
neutronclient.show_vpnservice(
ipsecsiteconnection.vpnservice_id).AndReturn(vpnservice_dict)
self.mox.ReplayAll()
ret_val = api.vpn.ipsecsiteconnection_get(self.request,
ipsecsiteconnection['ipsec_site_connection']['id'])
ipsecsiteconnection.id)
self.assertIsInstance(ret_val, api.vpn.IPSecSiteConnection)

View File

@ -444,6 +444,21 @@ def data(TEST):
TEST.api_pools.add(pool_dict)
TEST.pools.add(lbaas.Pool(pool_dict))
# 2nd pool
pool_dict = {'id': '8913dde8-4915-4b90-8d3e-b95eeedb0d50',
'tenant_id': '1',
'vip_id': 'f0881d38-c3eb-4fee-9763-12de3338041d',
'name': 'pool2',
'description': 'pool description',
'subnet_id': TEST.subnets.first().id,
'protocol': 'HTTPS',
'lb_method': 'ROUND_ROBIN',
'health_monitors': ['d4a0500f-db2b-4cc4-afcf-ec026febff97'],
'status': 'PENDING_CREATE',
'admin_state_up': True}
TEST.api_pools.add(pool_dict)
TEST.pools.add(lbaas.Pool(pool_dict))
# 1st vip
vip_dict = {'id': 'abcdef-c3eb-4fee-9763-12de3338041e',
'name': 'vip1',
@ -506,21 +521,6 @@ def data(TEST):
TEST.api_members.add(member_dict)
TEST.members.add(lbaas.Member(member_dict))
# 2nd pool
pool_dict = {'id': '8913dde8-4915-4b90-8d3e-b95eeedb0d50',
'tenant_id': '1',
'vip_id': 'f0881d38-c3eb-4fee-9763-12de3338041d',
'name': 'pool2',
'description': 'pool description',
'subnet_id': TEST.subnets.first().id,
'protocol': 'HTTPS',
'lb_method': 'ROUND_ROBIN',
'health_monitors': ['d4a0500f-db2b-4cc4-afcf-ec026febff97'],
'status': 'PENDING_CREATE',
'admin_state_up': True}
TEST.api_pools.add(pool_dict)
TEST.pools.add(lbaas.Pool(pool_dict))
# 1st monitor
monitor_dict = {'id': 'd4a0500f-db2b-4cc4-afcf-ec026febff96',
'type': 'ping',
@ -629,7 +629,8 @@ def data(TEST):
'vpn_type': 'ipsec',
'ipsecsiteconnections': [],
'admin_state_up': True,
'status': 'Active'}
'status': 'Active',
'ipsecsiteconns': TEST.ipsecsiteconnections.list()}
TEST.api_vpnservices.add(vpnservice_dict)
TEST.vpnservices.add(vpn.VPNService(vpnservice_dict))
@ -643,7 +644,8 @@ def data(TEST):
'vpn_type': 'ipsec',
'ipsecsiteconnections': [],
'admin_state_up': True,
'status': 'Active'}
'status': 'Active',
'ipsecsiteconns': []}
TEST.api_vpnservices.add(vpnservice_dict)
TEST.vpnservices.add(vpn.VPNService(vpnservice_dict))
@ -657,7 +659,8 @@ def data(TEST):
'ike_version': 'v1',
'lifetime': {'units': 'seconds', 'value': 3600},
'phase1_negotiation_mode': 'main',
'pfs': 'group5'}
'pfs': 'group5',
'ipsecsiteconns': TEST.ipsecsiteconnections.list()}
TEST.api_ikepolicies.add(ikepolicy_dict)
TEST.ikepolicies.add(vpn.IKEPolicy(ikepolicy_dict))
@ -671,7 +674,8 @@ def data(TEST):
'ike_version': 'v1',
'lifetime': {'units': 'seconds', 'value': 3600},
'phase1_negotiation_mode': 'main',
'pfs': 'group5'}
'pfs': 'group5',
'ipsecsiteconns': []}
TEST.api_ikepolicies.add(ikepolicy_dict)
TEST.ikepolicies.add(vpn.IKEPolicy(ikepolicy_dict))
@ -685,7 +689,8 @@ def data(TEST):
'encryption_algorithm': '3des',
'lifetime': {'units': 'seconds', 'value': 3600},
'pfs': 'group5',
'transform_protocol': 'esp'}
'transform_protocol': 'esp',
'ipsecsiteconns': TEST.ipsecsiteconnections.list()}
TEST.api_ipsecpolicies.add(ipsecpolicy_dict)
TEST.ipsecpolicies.add(vpn.IPSecPolicy(ipsecpolicy_dict))
@ -699,7 +704,8 @@ def data(TEST):
'encryption_algorithm': '3des',
'lifetime': {'units': 'seconds', 'value': 3600},
'pfs': 'group5',
'transform_protocol': 'esp'}
'transform_protocol': 'esp',
'ipsecsiteconns': []}
TEST.api_ipsecpolicies.add(ipsecpolicy_dict)
TEST.ipsecpolicies.add(vpn.IPSecPolicy(ipsecpolicy_dict))