Merge "Verify MTU is valid for ipsec_site_connection"

This commit is contained in:
Jenkins 2013-09-04 05:18:39 +00:00 committed by Gerrit Code Review
commit 0f35cddf1a
3 changed files with 176 additions and 224 deletions

View File

@ -17,6 +17,7 @@
#
# @author: Swaminathan Vasudevan, Hewlett-Packard.
import netaddr
import sqlalchemy as sa
from sqlalchemy import orm
from sqlalchemy.orm import exc
@ -37,6 +38,8 @@ from neutron.plugins.common import constants
LOG = logging.getLogger(__name__)
IP_MIN_MTU = {4: 68, 6: 1280}
class IPsecPeerCidr(model_base.BASEV2):
"""Internal representation of a IPsec Peer Cidrs."""
@ -241,6 +244,9 @@ class VPNPluginDb(VPNPluginBase, base_db.CommonDbMixin):
self._get_resource(context,
IPsecPolicy,
ipsec_sitecon['ipsecpolicy_id'])
self._check_mtu(context,
ipsec_sitecon['mtu'],
ipsec_sitecon['vpnservice_id'])
ipsec_site_conn_db = IPsecSiteConnection(
id=uuidutils.generate_uuid(),
tenant_id=tenant_id,
@ -276,6 +282,13 @@ class VPNPluginDb(VPNPluginBase, base_db.CommonDbMixin):
raise vpnaas.IPsecSiteConnectionDpdIntervalValueError(
attr='dpd_timeout')
def _check_mtu(self, context, mtu, vpnservice_id):
vpn_service_db = self._get_vpnservice(context, vpnservice_id)
subnet = vpn_service_db.subnet['cidr']
version = netaddr.IPNetwork(subnet).version
if mtu < IP_MIN_MTU[version]:
raise vpnaas.IPsecSiteConnectionMtuError(mtu=mtu, version=version)
def update_ipsec_site_connection(
self, context,
ipsec_site_conn_id, ipsec_site_connection):
@ -296,6 +309,11 @@ class VPNPluginDb(VPNPluginBase, base_db.CommonDbMixin):
'timeout', ipsec_site_conn_db.dpd_timeout)
self._check_dpd(conn)
if 'mtu' in conn:
self._check_mtu(context,
conn['mtu'],
ipsec_site_conn_db.vpnservice_id)
self.assert_update_allowed(ipsec_site_conn_db)
if "peer_cidrs" in conn:

View File

@ -44,6 +44,11 @@ class IPsecSiteConnectionDpdIntervalValueError(qexception.InvalidInput):
"equal to or less than dpd_interval")
class IPsecSiteConnectionMtuError(qexception.InvalidInput):
message = _("ipsec_site_connection MTU %(mtu)d is too small "
"for ipv%(version)s")
class IKEPolicyNotFound(qexception.NotFound):
message = _("IKEPolicy %(ikepolicy_id)s could not be found")

View File

@ -962,15 +962,19 @@ class TestVpnaas(VPNPluginDbTestCase):
dpd_interval=100,
dpd_timeout=100, expected_status_int=400)
def test_create_ipsec_site_connection(self, **extras):
"""Test case to create an ipsec_site_connection."""
ikename = "ikepolicy1"
ipsecname = "ipsecpolicy1"
vpnsname = "vpnservice1"
name = "connection1"
description = "my-ipsec-connection"
keys = {'name': name,
'description': "my-ipsec-connection",
def _test_create_ipsec_site_connection(self, key_overrides=None,
setup_overrides=None,
expected_status_int=200):
"""Create ipsec_site_connection and check results."""
params = {'ikename': 'ikepolicy1',
'ipsecname': 'ipsecpolicy1',
'vpnsname': 'vpnservice1',
'subnet_cidr': '10.2.0.0/24',
'subnet_version': 4}
if setup_overrides is not None:
params.update(setup_overrides)
keys = {'name': 'connection1',
'description': 'my-ipsec-connection',
'peer_address': '192.168.1.10',
'peer_id': '192.168.1.10',
'peer_cidrs': ['192.168.2.0/24', '192.168.3.0/24'],
@ -980,17 +984,19 @@ class TestVpnaas(VPNPluginDbTestCase):
'psk': 'abcd',
'status': 'PENDING_CREATE',
'admin_state_up': True}
if key_overrides is not None:
keys.update(key_overrides)
dpd = {'action': 'hold',
'interval': 40,
'timeout': 120}
keys.update(extras)
with contextlib.nested(
self.ikepolicy(name=ikename),
self.ipsecpolicy(name=ipsecname),
self.subnet(),
self.ikepolicy(name=params['ikename']),
self.ipsecpolicy(name=params['ipsecname']),
self.subnet(cidr=params['subnet_cidr'],
ip_version=params['subnet_version']),
self.router()) as (
ikepolicy, ipsecpolicy, subnet, router):
with self.vpnservice(name=vpnsname, subnet=subnet,
with self.vpnservice(name=params['vpnsname'], subnet=subnet,
router=router) as vpnservice1:
keys['ikepolicy_id'] = ikepolicy['ikepolicy']['id']
keys['ipsecpolicy_id'] = (
@ -999,9 +1005,10 @@ class TestVpnaas(VPNPluginDbTestCase):
keys['vpnservice_id'] = (
vpnservice1['vpnservice']['id']
)
try:
with self.ipsec_site_connection(
self.fmt,
name,
keys['name'],
keys['peer_address'],
keys['peer_id'],
keys['peer_cidrs'],
@ -1015,23 +1022,48 @@ class TestVpnaas(VPNPluginDbTestCase):
ikepolicy,
ipsecpolicy,
keys['admin_state_up'],
description=description,
**extras
description=keys['description']
) as ipsec_site_connection:
if expected_status_int != 200:
self.fail("Expected failure on create")
self._check_ipsec_site_connection(
ipsec_site_connection['ipsec_site_connection'],
keys,
dpd)
except webob.exc.HTTPClientError as ce:
self.assertEqual(ce.code, expected_status_int)
def test_create_ipsec_site_connection(self, **extras):
"""Test case to create an ipsec_site_connection."""
self._test_create_ipsec_site_connection(key_overrides=extras)
def test_create_ipsec_site_connection_invalid_mtu(self):
"""Test creating an ipsec_site_connection with invalid MTU."""
self._test_create_ipsec_site_connection(key_overrides={'mtu': 67},
expected_status_int=400)
ipv6_overrides = {
'peer_address': 'fe80::c0a8:10a',
'peer_id': 'fe80::c0a8:10a',
'peer_cidrs': ['fe80::c0a8:200/120', 'fe80::c0a8:300/120'],
'mtu': 1279}
ipv6_setup_params = {'subnet_cidr': 'fe80::a01:0/120',
'subnet_version': 6}
self._test_create_ipsec_site_connection(
key_overrides=ipv6_overrides,
setup_overrides=ipv6_setup_params,
expected_status_int=400)
def _check_ipsec_site_connection(self, ipsec_site_connection, keys, dpd):
self.assertEqual(
keys,
dict((k, v) for k, v
in ipsec_site_connection.items()
if k in keys), keys)
if k in keys))
self.assertEqual(
dpd,
dict((k, v) for k, v
in ipsec_site_connection['dpd'].items()
if k in dpd), dpd)
if k in dpd))
def test_delete_ipsec_site_connection(self):
"""Test case to delete a ipsec_site_connection."""
@ -1045,14 +1077,23 @@ class TestVpnaas(VPNPluginDbTestCase):
self.assertEqual(res.status_int, 204)
def test_update_ipsec_site_connection(self):
"""Test case for valid updates to IPSec site connection."""
dpd = {'action': 'hold',
'interval': 40,
'timeout': 120}
self._test_update_ipsec_site_connection(
update={'dpd': dpd}
)
self._test_update_ipsec_site_connection(update={'dpd': dpd})
self._test_update_ipsec_site_connection(update={'mtu': 2000})
ipv6_settings = {
'peer_address': 'fe80::c0a8:10a',
'peer_id': 'fe80::c0a8:10a',
'peer_cidrs': ['fe80::c0a8:200/120', 'fe80::c0a8:300/120'],
'subnet_cidr': 'fe80::a02:0/120',
'subnet_version': 6}
self._test_update_ipsec_site_connection(update={'mtu': 2000},
overrides=ipv6_settings)
def test_update_ipsec_site_connection_with_invalid_dpd(self):
"""Test updates to ipsec_site_connection with invalid DPD settings."""
dpd1 = {'action': 'hold',
'interval': 100,
'timeout': 100}
@ -1072,159 +1113,44 @@ class TestVpnaas(VPNPluginDbTestCase):
update={'dpd': dpd3},
expected_status_int=400)
def _test_update_ipsec_site_connection(
self, update=None, expected_status_int=200):
"""Test case to update a ipsec_site_connection."""
name = 'new_ipsec_site_connection'
ikename = 'ikepolicy1'
ipsecname = 'ipsecpolicy1'
vpnsname = 'vpnservice1'
description = 'my-ipsec-connection'
keys = {'name': 'new_ipsec_site_connection',
'description': "my-ipsec-connection",
'peer_address': '192.168.1.10',
'peer_id': '192.168.1.10',
'peer_cidrs': ['192.168.2.0/24', '192.168.3.0/24'],
'initiator': 'bi-directional',
'mtu': 1500,
'tenant_id': self._tenant_id,
'psk': 'abcd',
'status': 'ACTIVE',
'admin_state_up': True}
dpd = {'action': 'hold',
'interval': 40,
'timeout': 120}
with contextlib.nested(
self.ikepolicy(name=ikename),
self.ipsecpolicy(name=ipsecname),
self.subnet(cidr='10.2.0.0/24'),
self.router()) as (
ikepolicy, ipsecpolicy, subnet, router):
with self.vpnservice(name=vpnsname, subnet=subnet,
router=router) as vpnservice1:
keys['vpnservice_id'] = (
vpnservice1['vpnservice']['id']
)
keys['ikepolicy_id'] = (
ikepolicy['ikepolicy']['id']
)
keys['ipsecpolicy_id'] = (
ipsecpolicy['ipsecpolicy']['id']
)
with self.ipsec_site_connection(
self.fmt,
name,
keys['peer_address'],
keys['peer_id'],
keys['peer_cidrs'],
keys['mtu'],
keys['psk'],
keys['initiator'],
dpd['action'],
dpd['interval'],
dpd['timeout'],
vpnservice1,
ikepolicy,
ipsecpolicy,
keys['admin_state_up'],
description=description
) as ipsec_site_connection:
if not update:
update = {'name': name}
data = {'ipsec_site_connection': update}
self._set_active(
vpn_db.IPsecSiteConnection,
ipsec_site_connection['ipsec_site_connection']['id'])
req = self.new_update_request(
'ipsec-site-connections',
data,
ipsec_site_connection['ipsec_site_connection']['id']
)
res = req.get_response(self.ext_api)
self.assertEqual(expected_status_int, res.status_int)
if expected_status_int == 200:
res_dict = self.deserialize(
self.fmt,
res
)
for k, v in update.items():
self.assertEqual(
res_dict['ipsec_site_connection'][k], v)
def test_update_ipsec_site_connection_with_invalid_mtu(self):
"""Test updates to ipsec_site_connection with invalid MTU settings."""
self._test_update_ipsec_site_connection(
update={'mtu': 67}, expected_status_int=400)
ipv6_settings = {
'peer_address': 'fe80::c0a8:10a',
'peer_id': 'fe80::c0a8:10a',
'peer_cidrs': ['fe80::c0a8:200/120', 'fe80::c0a8:300/120'],
'subnet_cidr': 'fe80::a02:0/120',
'subnet_version': 6}
self._test_update_ipsec_site_connection(
update={'mtu': 1279},
overrides=ipv6_settings,
expected_status_int=400)
def test_update_ipsec_site_connection_with_invalid_state(self):
"""Test case to update an ipsec_site_connection in invalid state."""
name = 'new_ipsec_site_connection'
ikename = 'ikepolicy1'
ipsecname = 'ipsecpolicy1'
vpnsname = 'vpnservice1'
description = 'my-ipsec-connection'
keys = {'name': 'new_ipsec_site_connection',
'description': "my-ipsec-connection",
'peer_address': '192.168.1.10',
'peer_id': '192.168.1.10',
'peer_cidrs': ['192.168.2.0/24', '192.168.3.0/24'],
'initiator': 'bi-directional',
'mtu': 1500,
'tenant_id': self._tenant_id,
'psk': 'abcd',
'status': 'ACTIVE',
'admin_state_up': True}
dpd = {'action': 'hold',
'interval': 40,
'timeout': 120}
with contextlib.nested(
self.ikepolicy(name=ikename),
self.ipsecpolicy(name=ipsecname),
self.subnet(cidr='10.2.0.0/24'),
self.router()) as (
ikepolicy, ipsecpolicy, subnet, router):
with self.vpnservice(name=vpnsname, subnet=subnet,
router=router) as vpnservice1:
keys['vpnservice_id'] = (
vpnservice1['vpnservice']['id']
)
keys['ikepolicy_id'] = (
ikepolicy['ikepolicy']['id']
)
keys['ipsecpolicy_id'] = (
ipsecpolicy['ipsecpolicy']['id']
)
with self.ipsec_site_connection(
self.fmt,
name,
keys['peer_address'],
keys['peer_id'],
keys['peer_cidrs'],
keys['mtu'],
keys['psk'],
keys['initiator'],
dpd['action'],
dpd['interval'],
dpd['timeout'],
vpnservice1,
ikepolicy,
ipsecpolicy,
keys['admin_state_up'],
description=description
) as ipsec_site_connection:
data = {'ipsec_site_connection': {'name': name}}
req = self.new_update_request(
'ipsec-site-connections',
data,
ipsec_site_connection['ipsec_site_connection']['id']
)
res = req.get_response(self.ext_api)
self.assertEqual(400, res.status_int)
"""Test updating an ipsec_site_connection in invalid state."""
self._test_update_ipsec_site_connection(
overrides={'make_active': False},
expected_status_int=400)
def test_update_ipsec_site_connection_peer_cidrs(self):
"""Test case to update a ipsec_site_connection for peer_cidrs."""
name = 'ipsec_site_connection'
ikename = 'ikepolicy1'
ipsecname = 'ipsecpolicy1'
vpnsname = 'vpnservice1'
description = 'my-ipsec-connection'
keys = {'name': 'ipsec_site_connection',
'description': "my-ipsec-connection",
"""Test updating an ipsec_site_connection for peer_cidrs."""
new_peers = {'peer_cidrs': ['192.168.4.0/24',
'192.168.5.0/24']}
self._test_update_ipsec_site_connection(
update=new_peers)
def _test_update_ipsec_site_connection(self,
update={'name': 'new name'},
overrides=None,
expected_status_int=200):
"""Creates and then updates ipsec_site_connection."""
keys = {'name': 'new_ipsec_site_connection',
'ikename': 'ikepolicy1',
'ipsecname': 'ipsecpolicy1',
'vpnsname': 'vpnservice1',
'description': 'my-ipsec-connection',
'peer_address': '192.168.1.10',
'peer_id': '192.168.1.10',
'peer_cidrs': ['192.168.2.0/24', '192.168.3.0/24'],
@ -1233,60 +1159,63 @@ class TestVpnaas(VPNPluginDbTestCase):
'tenant_id': self._tenant_id,
'psk': 'abcd',
'status': 'ACTIVE',
'admin_state_up': True}
dpd = {'action': 'hold',
'admin_state_up': True,
'action': 'hold',
'interval': 40,
'timeout': 120}
'timeout': 120,
'subnet_cidr': '10.2.0.0/24',
'subnet_version': 4,
'make_active': True}
if overrides is not None:
keys.update(overrides)
with contextlib.nested(
self.ikepolicy(name=ikename),
self.ipsecpolicy(name=ipsecname),
self.subnet(cidr='10.2.0.0/24'),
self.ikepolicy(name=keys['ikename']),
self.ipsecpolicy(name=keys['ipsecname']),
self.subnet(cidr=keys['subnet_cidr'],
ip_version=keys['subnet_version']),
self.router()) as (
ikepolicy, ipsecpolicy, subnet, router):
with self.vpnservice(name=vpnsname, subnet=subnet,
with self.vpnservice(name=keys['vpnsname'], subnet=subnet,
router=router) as vpnservice1:
keys['vpnservice_id'] = vpnservice1['vpnservice']['id']
keys['ikepolicy_id'] = ikepolicy['ikepolicy']['id']
keys['ipsecpolicy_id'] = ipsecpolicy['ipsecpolicy']['id']
with self.ipsec_site_connection(
self.fmt,
name,
keys['name'],
keys['peer_address'],
keys['peer_id'],
keys['peer_cidrs'],
keys['mtu'],
keys['psk'],
keys['initiator'],
dpd['action'],
dpd['interval'],
dpd['timeout'],
keys['action'],
keys['interval'],
keys['timeout'],
vpnservice1,
ikepolicy,
ipsecpolicy,
keys['admin_state_up'],
description=description
description=keys['description']
) as ipsec_site_connection:
data = {'ipsec_site_connection': {
'peer_cidrs': ['192.168.2.0/24',
'192.168.3.0/24']
}}
data = {'ipsec_site_connection': update}
if keys.get('make_active', None):
self._set_active(
vpn_db.IPsecSiteConnection,
ipsec_site_connection['ipsec_site_connection']['id'])
(ipsec_site_connection['ipsec_site_connection']
['id']))
req = self.new_update_request(
'ipsec-site-connections',
data,
ipsec_site_connection[
'ipsec_site_connection']['id']
)
res = self.deserialize(
self.fmt,
req.get_response(self.ext_api)
)
self._check_ipsec_site_connection(
res['ipsec_site_connection'],
keys,
dpd)
ipsec_site_connection['ipsec_site_connection']['id'])
res = req.get_response(self.ext_api)
self.assertEqual(expected_status_int, res.status_int)
if expected_status_int == 200:
res_dict = self.deserialize(self.fmt, res)
for k, v in update.items():
self.assertEqual(
res_dict['ipsec_site_connection'][k], v)
def test_show_ipsec_site_connection(self):
"""Test case to show a ipsec_site_connection."""