Make ipsec_site_connection dpd_timeout == dpd_interval return 400
dpd_timeout == dpd_interval is a invalid case, so in this commit, we modify validation and added test. Fixes bug 1219440 Change-Id: I14fb9aa7df890f9c5a27f18f20d7dc1a316b2d79
This commit is contained in:
parent
60bc053946
commit
e57934761d
@ -229,9 +229,7 @@ class VPNPluginDb(VPNPluginBase, base_db.CommonDbMixin):
|
||||
ipsec_sitecon['dpd_interval'] = dpd.get('interval', 30)
|
||||
ipsec_sitecon['dpd_timeout'] = dpd.get('timeout', 120)
|
||||
tenant_id = self._get_tenant_id_for_create(context, ipsec_sitecon)
|
||||
if ipsec_sitecon['dpd_timeout'] < ipsec_sitecon['dpd_interval']:
|
||||
raise vpnaas.IPsecSiteConnectionDpdIntervalValueError(
|
||||
attribute_a='dpd_timeout')
|
||||
self._check_dpd(ipsec_sitecon)
|
||||
with context.session.begin(subtransactions=True):
|
||||
#Check permissions
|
||||
self._get_resource(context,
|
||||
@ -273,31 +271,40 @@ class VPNPluginDb(VPNPluginBase, base_db.CommonDbMixin):
|
||||
context.session.add(peer_cidr_db)
|
||||
return self._make_ipsec_site_connection_dict(ipsec_site_conn_db)
|
||||
|
||||
def _check_dpd(self, ipsec_sitecon):
|
||||
if ipsec_sitecon['dpd_timeout'] <= ipsec_sitecon['dpd_interval']:
|
||||
raise vpnaas.IPsecSiteConnectionDpdIntervalValueError(
|
||||
attr='dpd_timeout')
|
||||
|
||||
def update_ipsec_site_connection(
|
||||
self, context,
|
||||
ipsec_site_conn_id, ipsec_site_connection):
|
||||
ipsec_sitecon = ipsec_site_connection['ipsec_site_connection']
|
||||
dpd = ipsec_sitecon.get('dpd', {})
|
||||
if dpd.get('action'):
|
||||
ipsec_sitecon['dpd_action'] = dpd.get('action')
|
||||
if dpd.get('interval'):
|
||||
ipsec_sitecon['dpd_interval'] = dpd.get('interval')
|
||||
if dpd.get('timeout'):
|
||||
ipsec_sitecon['dpd_timeout'] = dpd.get('timeout')
|
||||
conn = ipsec_site_connection['ipsec_site_connection']
|
||||
changed_peer_cidrs = False
|
||||
with context.session.begin(subtransactions=True):
|
||||
ipsec_site_conn_db = self._get_resource(
|
||||
context,
|
||||
IPsecSiteConnection,
|
||||
ipsec_site_conn_id)
|
||||
dpd = conn.get('dpd', {})
|
||||
if dpd.get('action'):
|
||||
conn['dpd_action'] = dpd.get('action')
|
||||
if dpd.get('interval') or dpd.get('timeout'):
|
||||
conn['dpd_interval'] = dpd.get(
|
||||
'interval', ipsec_site_conn_db.dpd_interval)
|
||||
conn['dpd_timeout'] = dpd.get(
|
||||
'timeout', ipsec_site_conn_db.dpd_timeout)
|
||||
self._check_dpd(conn)
|
||||
|
||||
self.assert_update_allowed(ipsec_site_conn_db)
|
||||
if "peer_cidrs" in ipsec_sitecon:
|
||||
|
||||
if "peer_cidrs" in conn:
|
||||
changed_peer_cidrs = True
|
||||
old_peer_cidr_list = ipsec_site_conn_db['peer_cidrs']
|
||||
old_peer_cidr_dict = dict(
|
||||
(peer_cidr['cidr'], peer_cidr)
|
||||
for peer_cidr in old_peer_cidr_list)
|
||||
new_peer_cidr_set = set(ipsec_sitecon["peer_cidrs"])
|
||||
new_peer_cidr_set = set(conn["peer_cidrs"])
|
||||
old_peer_cidr_set = set(old_peer_cidr_dict)
|
||||
|
||||
new_peer_cidrs = list(new_peer_cidr_set)
|
||||
@ -308,9 +315,9 @@ class VPNPluginDb(VPNPluginBase, base_db.CommonDbMixin):
|
||||
cidr=peer_cidr,
|
||||
ipsec_site_connection_id=ipsec_site_conn_id)
|
||||
context.session.add(pcidr)
|
||||
del ipsec_sitecon["peer_cidrs"]
|
||||
if ipsec_sitecon:
|
||||
ipsec_site_conn_db.update(ipsec_sitecon)
|
||||
del conn["peer_cidrs"]
|
||||
if conn:
|
||||
ipsec_site_conn_db.update(conn)
|
||||
result = self._make_ipsec_site_connection_dict(ipsec_site_conn_db)
|
||||
if changed_peer_cidrs:
|
||||
result['peer_cidrs'] = new_peer_cidrs
|
||||
|
@ -956,6 +956,11 @@ class TestVpnaas(VPNPluginDbTestCase):
|
||||
name=name,
|
||||
dpd_interval=30,
|
||||
dpd_timeout=20, expected_status_int=400)
|
||||
self._create_ipsec_site_connection(
|
||||
fmt=self.fmt,
|
||||
name=name,
|
||||
dpd_interval=100,
|
||||
dpd_timeout=100, expected_status_int=400)
|
||||
|
||||
def test_create_ipsec_site_connection(self, **extras):
|
||||
"""Test case to create an ipsec_site_connection."""
|
||||
@ -1040,6 +1045,35 @@ class TestVpnaas(VPNPluginDbTestCase):
|
||||
self.assertEqual(res.status_int, 204)
|
||||
|
||||
def test_update_ipsec_site_connection(self):
|
||||
dpd = {'action': 'hold',
|
||||
'interval': 40,
|
||||
'timeout': 120}
|
||||
self._test_update_ipsec_site_connection(
|
||||
update={'dpd': dpd}
|
||||
)
|
||||
|
||||
def test_update_ipsec_site_connection_with_invalid_dpd(self):
|
||||
dpd1 = {'action': 'hold',
|
||||
'interval': 100,
|
||||
'timeout': 100}
|
||||
self._test_update_ipsec_site_connection(
|
||||
update={'dpd': dpd1},
|
||||
expected_status_int=400)
|
||||
dpd2 = {'action': 'hold',
|
||||
'interval': 100,
|
||||
'timeout': 60}
|
||||
self._test_update_ipsec_site_connection(
|
||||
update={'dpd': dpd2},
|
||||
expected_status_int=400)
|
||||
dpd3 = {'action': 'hold',
|
||||
'interval': -50,
|
||||
'timeout': -100}
|
||||
self._test_update_ipsec_site_connection(
|
||||
update={'dpd': dpd3},
|
||||
expected_status_int=400)
|
||||
|
||||
def _test_update_ipsec_site_connection(
|
||||
self, update=None, expected_status_int=200):
|
||||
"""Test case to update a ipsec_site_connection."""
|
||||
name = 'new_ipsec_site_connection'
|
||||
ikename = 'ikepolicy1'
|
||||
@ -1095,7 +1129,9 @@ class TestVpnaas(VPNPluginDbTestCase):
|
||||
keys['admin_state_up'],
|
||||
description=description
|
||||
) as ipsec_site_connection:
|
||||
data = {'ipsec_site_connection': {'name': name}}
|
||||
if not update:
|
||||
update = {'name': name}
|
||||
data = {'ipsec_site_connection': update}
|
||||
self._set_active(
|
||||
vpn_db.IPsecSiteConnection,
|
||||
ipsec_site_connection['ipsec_site_connection']['id'])
|
||||
@ -1104,12 +1140,16 @@ class TestVpnaas(VPNPluginDbTestCase):
|
||||
data,
|
||||
ipsec_site_connection['ipsec_site_connection']['id']
|
||||
)
|
||||
res = self.deserialize(
|
||||
self.fmt,
|
||||
req.get_response(self.ext_api)
|
||||
)
|
||||
for k, v in keys.items():
|
||||
self.assertEqual(res['ipsec_site_connection'][k], v)
|
||||
res = req.get_response(self.ext_api)
|
||||
self.assertEqual(expected_status_int, res.status_int)
|
||||
if expected_status_int == 200:
|
||||
res_dict = self.deserialize(
|
||||
self.fmt,
|
||||
res
|
||||
)
|
||||
for k, v in update.items():
|
||||
self.assertEqual(
|
||||
res_dict['ipsec_site_connection'][k], v)
|
||||
|
||||
def test_update_ipsec_site_connection_with_invalid_state(self):
|
||||
"""Test case to update an ipsec_site_connection in invalid state."""
|
||||
|
Loading…
Reference in New Issue
Block a user