port_forwarding: validate args before invoking db update

Add validator to update_floatingip_port_forwarding so codepath does not
attempt performing invalid database operation. With that, operation fails
right away, with a hint on the offending argument(s).

This is a backport that combines changes from 2 changes that go together:
https://review.opendev.org/#/c/738145/
https://review.opendev.org/#/c/744993/

Note: pep8 failed with following error:
  ./neutron/tests/unit/services/portforwarding/test_pf_plugin.py:237:9:
  ./neutron/tests/unit/services/portforwarding/test_pf_plugin.py:261:9:
    N322  Possible use of no-op mock method. please use assert_called_once_with.
        mock_pf_get_objects.assert_called_once()
        ^
So additional changes were needed for backport.

Change-Id: I8284b22c5d691bfd9eadeb8590c3d4b27d261b04
Closes-Bug: #1878299
(cherry picked from commit f379740348)
(cherry picked from commit 838399f0a4)
This commit is contained in:
Flavio Fernandes 2020-06-25 23:34:32 -04:00
parent 47ec363f5f
commit 4ac33d1a9b
2 changed files with 131 additions and 1 deletions

View File

@ -397,6 +397,7 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
pf_obj.internal_port
})
pf_obj.update_fields(port_forwarding, reset_changes=True)
self._check_port_forwarding_update(context, pf_obj)
pf_obj.update()
except obj_exc.NeutronDbObjectDuplicateEntry:
(__, conflict_params) = self._find_existing_port_forwarding(
@ -434,6 +435,48 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)
def _check_port_forwarding_update(self, context, pf_obj):
def _raise_port_forwarding_update_failed(conflict):
message = _("Another port forwarding entry with the same "
"attributes already exists, conflicting "
"values are %s") % conflict
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)
db_port = self.core_plugin.get_port(context, pf_obj.internal_port_id)
for fixed_ip in db_port['fixed_ips']:
if str(pf_obj.internal_ip_address) == fixed_ip['ip_address']:
break
else:
# Reached end of internal_port iteration w/out finding fixed_ip
message = _("The internal IP does not correspond to an "
"address on the internal port, which has "
"fixed_ips %s") % db_port['fixed_ips']
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)
objs = pf.PortForwarding.get_objects(
context,
floatingip_id=pf_obj.floatingip_id,
protocol=pf_obj.protocol)
for obj in objs:
if obj.id == pf_obj.id:
continue
# Ensure there are no conflicts on the outside
if (obj.floating_ip_address == pf_obj.floating_ip_address and
obj.external_port == pf_obj.external_port):
_raise_port_forwarding_update_failed(
{'floating_ip_address': str(obj.floating_ip_address),
'external_port': obj.external_port})
# Ensure there are no conflicts in the inside
# socket: internal_ip_address + internal_port
if (obj.internal_port_id == pf_obj.internal_port_id and
obj.internal_ip_address == pf_obj.internal_ip_address and
obj.internal_port == pf_obj.internal_port):
_raise_port_forwarding_update_failed(
{'internal_port_id': obj.internal_port_id,
'internal_ip_address': str(obj.internal_ip_address),
'internal_port': obj.internal_port})
def _find_existing_port_forwarding(self, context, floatingip_id,
port_forwarding, specify_params=None):
# Because the session had been flushed by NeutronDbObjectDuplicateEntry

View File

@ -15,6 +15,7 @@
import mock
from neutron_lib.callbacks import registry
from neutron_lib import context
from neutron_lib import exceptions as lib_exc
from neutron_lib.exceptions import l3 as lib_l3_exc
@ -154,10 +155,14 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
self.pf_plugin.delete_floatingip_port_forwarding,
self.ctxt, 'pf_id', floatingip_id='fip_id')
@mock.patch.object(port_forwarding.PortForwarding, 'get_objects')
@mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2, 'get_port')
@mock.patch.object(registry, 'notify')
@mock.patch.object(resources_rpc.ResourcesPushRpcApi, 'push')
@mock.patch.object(port_forwarding.PortForwarding, 'get_object')
def test_update_floatingip_port_forwarding(
self, mock_pf_get_object, mock_rpc_push):
self, mock_pf_get_object, mock_rpc_push, mock_registry_notify,
mock_get_port, mock_pf_get_objects):
pf_input = {
'port_forwarding':
{'port_forwarding': {
@ -165,7 +170,12 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
'floatingip_id': 'fip_id'}},
'floatingip_id': 'fip_id'}
pf_obj = mock.Mock()
pf_obj.internal_ip_address = "10.0.0.1"
mock_pf_get_object.return_value = pf_obj
port_dict = {'id': 'ID', 'fixed_ips': [{"subnet_id": "test-subnet-id",
"ip_address": "10.0.0.1"}]}
mock_get_port.return_value = port_dict
mock_pf_get_objects.return_value = []
self.pf_plugin.update_floatingip_port_forwarding(
self.ctxt, 'pf_id', **pf_input)
mock_pf_get_object.assert_called_once_with(self.ctxt, id='pf_id')
@ -174,6 +184,83 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
mock_rpc_push.assert_called_once_with(
self.ctxt, mock.ANY, rpc_events.UPDATED)
@mock.patch.object(port_forwarding.PortForwarding, 'get_objects')
@mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2, 'get_port')
def test_check_port_forwarding_update(self, mock_get_port,
mock_pf_get_objects):
port_dict = {'fixed_ips': [{"ip_address": "10.0.0.1"}]}
mock_get_port.return_value = port_dict
other_pf_obj = mock.Mock(id='cmp_obj_id')
pf_obj = mock.Mock(id='pf_obj_id', internal_port_id='int_port_id',
internal_ip_address='10.0.0.1')
mock_pf_get_objects.return_value = [pf_obj, other_pf_obj]
self.pf_plugin._check_port_forwarding_update(self.ctxt, pf_obj)
mock_get_port.assert_called_once_with(self.ctxt, 'int_port_id')
mock_pf_get_objects.assert_called_once_with(self.ctxt,
floatingip_id=pf_obj.floatingip_id, protocol=pf_obj.protocol)
@mock.patch.object(port_forwarding.PortForwarding, 'get_objects')
@mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2, 'get_port')
def test_check_port_forwarding_update_invalid_address(
self, mock_get_port, mock_pf_get_objects):
port_dict = {'fixed_ips': [{"ip_address": "10.0.0.1"}]}
mock_get_port.return_value = port_dict
pf_obj = mock.Mock(id='pf_obj_id', internal_port_id='int_port_id',
internal_ip_address="99.99.99.99")
self.assertRaisesRegex(
lib_exc.BadRequest,
"not correspond to an address on the internal port",
self.pf_plugin._check_port_forwarding_update,
self.ctxt, pf_obj)
mock_get_port.assert_called_once_with(self.ctxt, 'int_port_id')
mock_pf_get_objects.assert_not_called()
@mock.patch.object(port_forwarding.PortForwarding, 'get_objects')
@mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2, 'get_port')
def test_check_port_forwarding_update_invalid_external(
self, mock_get_port, mock_pf_get_objects):
port_dict = {'fixed_ips': [{"ip_address": "10.0.0.1"}]}
mock_get_port.return_value = port_dict
pf_obj_dict = {'id': 'pf_obj_one',
'floating_ip_address': 'same_fip_addr',
'external_port': 'same_ext_port'}
other_pf_obj = mock.Mock(**pf_obj_dict)
pf_obj_dict.update(id='pf_obj_two', internal_ip_address='10.0.0.1')
pf_obj = mock.Mock(**pf_obj_dict)
mock_pf_get_objects.return_value = [pf_obj, other_pf_obj]
self.assertRaisesRegex(
lib_exc.BadRequest,
"already exist.*same_fip_addr",
self.pf_plugin._check_port_forwarding_update,
self.ctxt, pf_obj)
mock_get_port.assert_called_once_with(self.ctxt, mock.ANY)
mock_pf_get_objects.assert_called_with(
self.ctxt, floatingip_id=mock.ANY, protocol=mock.ANY)
@mock.patch.object(port_forwarding.PortForwarding, 'get_objects')
@mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2, 'get_port')
def test_check_port_forwarding_update_invalid_internal(
self, mock_get_port, mock_pf_get_objects):
same_internal_ip = "10.0.0.10"
port_dict = {'fixed_ips': [{"ip_address": same_internal_ip}]}
mock_get_port.return_value = port_dict
pf_obj_dict = {'id': 'pf_obj_one',
'internal_port_id': 'same_int_port_id',
'internal_ip_address': same_internal_ip,
'internal_port': 'same_int_port'}
other_pf_obj = mock.Mock(**pf_obj_dict)
pf_obj_dict.update(id='pf_obj_two')
pf_obj = mock.Mock(**pf_obj_dict)
mock_pf_get_objects.return_value = [pf_obj, other_pf_obj]
self.assertRaisesRegex(
lib_exc.BadRequest,
"already exist.*{}".format(same_internal_ip),
self.pf_plugin._check_port_forwarding_update,
self.ctxt, pf_obj)
mock_get_port.assert_called_once_with(self.ctxt, 'same_int_port_id')
mock_pf_get_objects.assert_called_with(
self.ctxt, floatingip_id=mock.ANY, protocol=mock.ANY)
@mock.patch.object(port_forwarding.PortForwarding, 'get_object')
def test_negative_update_floatingip_port_forwarding(
self, mock_pf_get_object):