Browse Source

Merge "port_forwarding: validate args before invoking db update" into stable/stein

tags/14.3.1^0
Zuul 1 month ago
committed by Gerrit Code Review
parent
commit
c65d1329e1
2 changed files with 131 additions and 1 deletions
  1. +43
    -0
      neutron/services/portforwarding/pf_plugin.py
  2. +88
    -1
      neutron/tests/unit/services/portforwarding/test_pf_plugin.py

+ 43
- 0
neutron/services/portforwarding/pf_plugin.py View File

@@ -400,6 +400,7 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
pf_obj.internal_port
})
pf_obj.update_fields(port_forwarding, reset_changes=True)
self._check_port_forwarding_update(context, pf_obj)
pf_obj.update()
except obj_exc.NeutronDbObjectDuplicateEntry:
(__, conflict_params) = self._find_existing_port_forwarding(
@@ -437,6 +438,48 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)

def _check_port_forwarding_update(self, context, pf_obj):
def _raise_port_forwarding_update_failed(conflict):
message = _("Another port forwarding entry with the same "
"attributes already exists, conflicting "
"values are %s") % conflict
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)

db_port = self.core_plugin.get_port(context, pf_obj.internal_port_id)
for fixed_ip in db_port['fixed_ips']:
if str(pf_obj.internal_ip_address) == fixed_ip['ip_address']:
break
else:
# Reached end of internal_port iteration w/out finding fixed_ip
message = _("The internal IP does not correspond to an "
"address on the internal port, which has "
"fixed_ips %s") % db_port['fixed_ips']
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)
objs = pf.PortForwarding.get_objects(
context,
floatingip_id=pf_obj.floatingip_id,
protocol=pf_obj.protocol)
for obj in objs:
if obj.id == pf_obj.id:
continue
# Ensure there are no conflicts on the outside
if (obj.floating_ip_address == pf_obj.floating_ip_address and
obj.external_port == pf_obj.external_port):
_raise_port_forwarding_update_failed(
{'floating_ip_address': str(obj.floating_ip_address),
'external_port': obj.external_port})
# Ensure there are no conflicts in the inside
# socket: internal_ip_address + internal_port
if (obj.internal_port_id == pf_obj.internal_port_id and
obj.internal_ip_address == pf_obj.internal_ip_address and
obj.internal_port == pf_obj.internal_port):
_raise_port_forwarding_update_failed(
{'internal_port_id': obj.internal_port_id,
'internal_ip_address': str(obj.internal_ip_address),
'internal_port': obj.internal_port})

def _find_existing_port_forwarding(self, context, floatingip_id,
port_forwarding, specify_params=None):
# Because the session had been flushed by NeutronDbObjectDuplicateEntry


+ 88
- 1
neutron/tests/unit/services/portforwarding/test_pf_plugin.py View File

@@ -15,6 +15,7 @@

import mock

from neutron_lib.callbacks import registry
from neutron_lib import context
from neutron_lib import exceptions as lib_exc
from neutron_lib.exceptions import l3 as lib_l3_exc
@@ -154,10 +155,14 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
self.pf_plugin.delete_floatingip_port_forwarding,
self.ctxt, 'pf_id', floatingip_id='fip_id')

@mock.patch.object(port_forwarding.PortForwarding, 'get_objects')
@mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2, 'get_port')
@mock.patch.object(registry, 'notify')
@mock.patch.object(resources_rpc.ResourcesPushRpcApi, 'push')
@mock.patch.object(port_forwarding.PortForwarding, 'get_object')
def test_update_floatingip_port_forwarding(
self, mock_pf_get_object, mock_rpc_push):
self, mock_pf_get_object, mock_rpc_push, mock_registry_notify,
mock_get_port, mock_pf_get_objects):
pf_input = {
'port_forwarding':
{'port_forwarding': {
@@ -165,7 +170,12 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
'floatingip_id': 'fip_id'}},
'floatingip_id': 'fip_id'}
pf_obj = mock.Mock()
pf_obj.internal_ip_address = "10.0.0.1"
mock_pf_get_object.return_value = pf_obj
port_dict = {'id': 'ID', 'fixed_ips': [{"subnet_id": "test-subnet-id",
"ip_address": "10.0.0.1"}]}
mock_get_port.return_value = port_dict
mock_pf_get_objects.return_value = []
self.pf_plugin.update_floatingip_port_forwarding(
self.ctxt, 'pf_id', **pf_input)
mock_pf_get_object.assert_called_once_with(self.ctxt, id='pf_id')
@@ -174,6 +184,83 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
mock_rpc_push.assert_called_once_with(
self.ctxt, mock.ANY, rpc_events.UPDATED)

@mock.patch.object(port_forwarding.PortForwarding, 'get_objects')
@mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2, 'get_port')
def test_check_port_forwarding_update(self, mock_get_port,
mock_pf_get_objects):
port_dict = {'fixed_ips': [{"ip_address": "10.0.0.1"}]}
mock_get_port.return_value = port_dict
other_pf_obj = mock.Mock(id='cmp_obj_id')
pf_obj = mock.Mock(id='pf_obj_id', internal_port_id='int_port_id',
internal_ip_address='10.0.0.1')
mock_pf_get_objects.return_value = [pf_obj, other_pf_obj]
self.pf_plugin._check_port_forwarding_update(self.ctxt, pf_obj)
mock_get_port.assert_called_once_with(self.ctxt, 'int_port_id')
mock_pf_get_objects.assert_called_once_with(self.ctxt,
floatingip_id=pf_obj.floatingip_id, protocol=pf_obj.protocol)

@mock.patch.object(port_forwarding.PortForwarding, 'get_objects')
@mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2, 'get_port')
def test_check_port_forwarding_update_invalid_address(
self, mock_get_port, mock_pf_get_objects):
port_dict = {'fixed_ips': [{"ip_address": "10.0.0.1"}]}
mock_get_port.return_value = port_dict
pf_obj = mock.Mock(id='pf_obj_id', internal_port_id='int_port_id',
internal_ip_address="99.99.99.99")
self.assertRaisesRegex(
lib_exc.BadRequest,
"not correspond to an address on the internal port",
self.pf_plugin._check_port_forwarding_update,
self.ctxt, pf_obj)
mock_get_port.assert_called_once_with(self.ctxt, 'int_port_id')
mock_pf_get_objects.assert_not_called()

@mock.patch.object(port_forwarding.PortForwarding, 'get_objects')
@mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2, 'get_port')
def test_check_port_forwarding_update_invalid_external(
self, mock_get_port, mock_pf_get_objects):
port_dict = {'fixed_ips': [{"ip_address": "10.0.0.1"}]}
mock_get_port.return_value = port_dict
pf_obj_dict = {'id': 'pf_obj_one',
'floating_ip_address': 'same_fip_addr',
'external_port': 'same_ext_port'}
other_pf_obj = mock.Mock(**pf_obj_dict)
pf_obj_dict.update(id='pf_obj_two', internal_ip_address='10.0.0.1')
pf_obj = mock.Mock(**pf_obj_dict)
mock_pf_get_objects.return_value = [pf_obj, other_pf_obj]
self.assertRaisesRegex(
lib_exc.BadRequest,
"already exist.*same_fip_addr",
self.pf_plugin._check_port_forwarding_update,
self.ctxt, pf_obj)
mock_get_port.assert_called_once_with(self.ctxt, mock.ANY)
mock_pf_get_objects.assert_called_with(
self.ctxt, floatingip_id=mock.ANY, protocol=mock.ANY)

@mock.patch.object(port_forwarding.PortForwarding, 'get_objects')
@mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2, 'get_port')
def test_check_port_forwarding_update_invalid_internal(
self, mock_get_port, mock_pf_get_objects):
same_internal_ip = "10.0.0.10"
port_dict = {'fixed_ips': [{"ip_address": same_internal_ip}]}
mock_get_port.return_value = port_dict
pf_obj_dict = {'id': 'pf_obj_one',
'internal_port_id': 'same_int_port_id',
'internal_ip_address': same_internal_ip,
'internal_port': 'same_int_port'}
other_pf_obj = mock.Mock(**pf_obj_dict)
pf_obj_dict.update(id='pf_obj_two')
pf_obj = mock.Mock(**pf_obj_dict)
mock_pf_get_objects.return_value = [pf_obj, other_pf_obj]
self.assertRaisesRegex(
lib_exc.BadRequest,
"already exist.*{}".format(same_internal_ip),
self.pf_plugin._check_port_forwarding_update,
self.ctxt, pf_obj)
mock_get_port.assert_called_once_with(self.ctxt, 'same_int_port_id')
mock_pf_get_objects.assert_called_with(
self.ctxt, floatingip_id=mock.ANY, protocol=mock.ANY)

@mock.patch.object(port_forwarding.PortForwarding, 'get_object')
def test_negative_update_floatingip_port_forwarding(
self, mock_pf_get_object):


Loading…
Cancel
Save