Use neutron-lib payloads for PORT_FORWARDING

This patch switches the code over to the payload style of callbacks [1]
for PORT_FORWARDING events.

[1]https://docs.openstack.org/neutron-lib/latest/contributor/callbacks.html

Change-Id: Id0c4bacb076da08c430276945e856c793ef662e5
This commit is contained in:
Nurmatov Mamatisa 2021-07-30 09:45:45 +03:00
parent 578f74ca87
commit eb430e052d
6 changed files with 116 additions and 138 deletions

View File

@ -1,28 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# TODO(flaviof): remove this once moved over to neutron-lib payloads
class PortForwardingPayload(object):
"""Payload for port-forwarding-related callback registry notifications."""
def __init__(self, context, current_pf=None, original_pf=None):
self.context = context
self.current_pf = current_pf
self.original_pf = original_pf
def __eq__(self, other):
return (isinstance(other, self.__class__) and
self.__dict__ == other.__dict__)
def __ne__(self, other):
return not self.__eq__(other)

View File

@ -135,15 +135,8 @@ class OVNPortForwarding(object):
def _get_fip_objs(self, context, payload):
floatingip_ids = set()
# Note on floatingip_id from payload: depending on the event that
# generated the payload provided, expect pf_payload.current_pf (in
# DELETE events) or pf_payload.original_pf (CREATE events) to be None.
# To be agnostic of what event this is, simply build a set from both.
for pf_payload in payload:
if pf_payload.current_pf:
floatingip_ids.add(pf_payload.current_pf.floatingip_id)
if pf_payload.original_pf:
floatingip_ids.add(pf_payload.original_pf.floatingip_id)
for fip in payload.states:
floatingip_ids.add(fip.floatingip_id)
return {fip_id: self._l3_plugin.get_floatingip(context, fip_id)
for fip_id in floatingip_ids}
@ -176,30 +169,28 @@ class OVNPortForwarding(object):
def _handle_notification(self, _resource, event_type, _pf_plugin, payload):
if not payload:
return
context = payload[0].context
context = payload.context
ovn_nb = self._l3_plugin._ovn
with ovn_nb.transaction(check_error=True) as ovn_txn:
if event_type == events.AFTER_CREATE:
for pf_payload in payload:
self._handler.port_forwarding_created(ovn_txn, ovn_nb,
pf_payload.current_pf)
self._l3_plugin.update_floatingip_status(
context, pf_payload.current_pf.floatingip_id,
self._handler.port_forwarding_created(ovn_txn, ovn_nb,
payload.latest_state)
self._l3_plugin.update_floatingip_status(
context, payload.latest_state.floatingip_id,
const.FLOATINGIP_STATUS_ACTIVE)
elif event_type == events.AFTER_UPDATE:
for pf_payload in payload:
self._handler.port_forwarding_updated(ovn_txn, ovn_nb,
pf_payload.current_pf, pf_payload.original_pf)
self._handler.port_forwarding_updated(
ovn_txn, ovn_nb,
payload.latest_state, payload.states[0])
elif event_type == events.AFTER_DELETE:
for pf_payload in payload:
pfs = _pf_plugin.get_floatingip_port_forwardings(
context, pf_payload.original_pf.floatingip_id)
self._handler.port_forwarding_deleted(ovn_txn, ovn_nb,
pf_payload.original_pf)
if not pfs:
self._l3_plugin.update_floatingip_status(
context, pf_payload.original_pf.floatingip_id,
const.FLOATINGIP_STATUS_DOWN)
pfs = _pf_plugin.get_floatingip_port_forwardings(
context, payload.states[0].floatingip_id)
self._handler.port_forwarding_deleted(ovn_txn, ovn_nb,
payload.states[0])
if not pfs:
self._l3_plugin.update_floatingip_status(
context, payload.states[0].floatingip_id,
const.FLOATINGIP_STATUS_DOWN)
# Collect the revision numbers of all floating ips visited and
# update the corresponding load balancer entries affected.

View File

@ -46,7 +46,6 @@ from neutron.extensions import floating_ip_port_forwarding as fip_pf
from neutron.objects import base as base_obj
from neutron.objects import port_forwarding as pf
from neutron.objects import router as l3_obj
from neutron.services.portforwarding import callbacks
from neutron.services.portforwarding.common import exceptions as pf_exc
from neutron.services.portforwarding import constants as pf_consts
@ -270,11 +269,12 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
if self._rpc_notifications_required:
self.push_api.push(context, remove_port_forwarding_list,
rpc_events.DELETED)
registry_notify_payload = [
callbacks.PortForwardingPayload(context, original_pf=pf_obj) for
pf_obj in remove_port_forwarding_list]
registry.notify(pf_consts.PORT_FORWARDING, events.AFTER_DELETE, self,
payload=registry_notify_payload)
for pf_obj in remove_port_forwarding_list:
payload = events.DBEventPayload(context, states=(pf_obj,))
registry.publish(pf_consts.PORT_FORWARDING,
events.AFTER_DELETE,
self,
payload=payload)
def _get_internal_ip_subnet(self, request_ip, fixed_ips):
request_ip = netaddr.IPNetwork(request_ip)
@ -396,10 +396,10 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
raise lib_exc.BadRequest(resource=apidef.RESOURCE_NAME,
msg=message)
registry.notify(pf_consts.PORT_FORWARDING, events.AFTER_CREATE,
self,
payload=[callbacks.PortForwardingPayload(context,
current_pf=pf_obj)])
registry.publish(pf_consts.PORT_FORWARDING, events.AFTER_CREATE,
self,
payload=events.DBEventPayload(context,
states=(pf_obj,)))
if self._rpc_notifications_required:
self.push_api.push(context, [pf_obj], rpc_events.CREATED)
@ -456,9 +456,11 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
msg=message)
if self._rpc_notifications_required:
self.push_api.push(context, [pf_obj], rpc_events.UPDATED)
registry.notify(pf_consts.PORT_FORWARDING, events.AFTER_UPDATE, self,
payload=[callbacks.PortForwardingPayload(context,
current_pf=pf_obj, original_pf=original_pf_obj)])
registry.publish(pf_consts.PORT_FORWARDING, events.AFTER_UPDATE,
self,
payload=events.DBEventPayload(
context,
states=(original_pf_obj, pf_obj)))
return pf_obj
def _check_router_match(self, context, fip_obj, router_id, pf_dict):
@ -600,9 +602,10 @@ class PortForwardingPlugin(fip_pf.PortForwardingPluginBase):
pf_obj.delete()
if self._rpc_notifications_required:
self.push_api.push(context, [pf_obj], rpc_events.DELETED)
registry.notify(pf_consts.PORT_FORWARDING, events.AFTER_DELETE, self,
payload=[callbacks.PortForwardingPayload(
context, original_pf=pf_obj)])
registry.publish(pf_consts.PORT_FORWARDING, events.AFTER_DELETE,
self,
payload=events.DBEventPayload(context,
states=(pf_obj,)))
def sync_port_forwarding_fip(self, context, routers):
if not routers:

View File

@ -990,7 +990,7 @@ class TestMaintenance(_TestMaintenanceHelper):
p1 = self._create_port('testp1', net1['id'])
p1_ip = p1['fixed_ips'][0]['ip_address']
with mock.patch('neutron_lib.callbacks.registry.notify') as m_notify:
with mock.patch('neutron_lib.callbacks.registry.publish') as m_publish:
# > Create
fip_pf_args = {
pf_def.EXTERNAL_PORT: 2222,
@ -1000,7 +1000,7 @@ class TestMaintenance(_TestMaintenanceHelper):
pf_def.INTERNAL_IP_ADDRESS: p1_ip}
pf_obj = self.pf_plugin.create_floatingip_port_forwarding(
self.context, fip_id, **fip_attrs(fip_pf_args))
m_notify.assert_called_once()
m_publish.assert_called_once()
# Assert load balancer for port forwarding was not created
self.assertFalse(self._find_pf_lb(router_id, fip_id))
@ -1015,10 +1015,10 @@ class TestMaintenance(_TestMaintenanceHelper):
fip_pf_args = {pf_def.EXTERNAL_PORT: 5353,
pf_def.INTERNAL_PORT: 53,
pf_def.PROTOCOL: 'udp'}
m_notify.reset_mock()
m_publish.reset_mock()
self.pf_plugin.update_floatingip_port_forwarding(
self.context, pf_obj['id'], fip_id, **fip_attrs(fip_pf_args))
m_notify.assert_called_once()
m_publish.assert_called_once()
# Assert load balancer for port forwarding is stale
_verify_lb(self, 'tcp', 2222, 22)
@ -1030,10 +1030,10 @@ class TestMaintenance(_TestMaintenanceHelper):
_verify_lb(self, 'udp', 5353, 53)
# > Delete
m_notify.reset_mock()
m_publish.reset_mock()
self.pf_plugin.delete_floatingip_port_forwarding(
self.context, pf_obj['id'], fip_id)
m_notify.assert_called_once()
m_publish.assert_called_once()
# Assert load balancer for port forwarding is stale
_verify_lb(self, 'udp', 5353, 53)

View File

@ -52,19 +52,22 @@ class TestOVNPortForwardingBase(base.BaseTestCase):
def _fake_pf_payload_entry(self, curr_pf_id, orig_pf_id=None, **kwargs):
mock_pf_payload = mock.Mock()
fake_pf_obj = self._fake_pf_obj(**kwargs)
states = []
if 'context' not in kwargs:
mock_pf_payload.context = self.context
if curr_pf_id:
mock_pf_payload.current_pf = fake_pf_obj
mock_pf_payload.current_pf.floatingip_id = curr_pf_id
else:
mock_pf_payload.current_pf = None
if orig_pf_id:
mock_pf_payload.original_pf = fake_pf_obj
mock_pf_payload.original_pf.floatingip_id = orig_pf_id
fake_pf_obj = self._fake_pf_obj(**kwargs)
fake_pf_obj.floatingip_id = orig_pf_id
states.append(fake_pf_obj)
if curr_pf_id:
fake_pf_obj = self._fake_pf_obj(**kwargs)
mock_pf_payload.latest_state = fake_pf_obj
mock_pf_payload.latest_state.floatingip_id = curr_pf_id
states.append(fake_pf_obj)
else:
mock_pf_payload.original_pf = None
mock_pf_payload.latest_state = None
mock_pf_payload.states = states
return mock_pf_payload
@ -223,13 +226,10 @@ class TestOVNPortForwarding(TestOVNPortForwardingBase):
pf_objs[index].router_id)
def test_get_fip_objs(self):
pf_payload = [self._fake_pf_payload_entry(1),
self._fake_pf_payload_entry(2),
self._fake_pf_payload_entry(None, 1),
self._fake_pf_payload_entry(1, 3)]
payload = self._fake_pf_payload_entry(1, 2)
self.l3_plugin.get_floatingip = lambda _, fip_id: fip_id * 10
fip_objs = self._ovn_pf._get_fip_objs(self.context, pf_payload)
self.assertEqual({3: 30, 2: 20, 1: 10}, fip_objs)
fip_objs = self._ovn_pf._get_fip_objs(self.context, payload)
self.assertEqual({1: 10, 2: 20}, fip_objs)
def _handle_notification_common(self, event_type, payload=None,
fip_objs=None):
@ -237,6 +237,7 @@ class TestOVNPortForwarding(TestOVNPortForwardingBase):
payload = []
if not fip_objs:
fip_objs = {}
self.fake_db_rev.reset_mock()
with mock.patch.object(self._ovn_pf, '_get_fip_objs',
return_value=fip_objs) as mock_get_fip_objs:
self._ovn_pf._handle_notification(None, event_type,
@ -256,59 +257,70 @@ class TestOVNPortForwarding(TestOVNPortForwardingBase):
def test_handle_notification_noop(self):
self._handle_notification_common(events.AFTER_CREATE)
weird_event_type = 666
fake_payload = [self._fake_pf_payload_entry(None)]
fake_payload = self._fake_pf_payload_entry(None)
self._handle_notification_common(weird_event_type, fake_payload)
def test_handle_notification_basic(self):
fake_payload_entry = self._fake_pf_payload_entry(1)
self._handle_notification_common(events.AFTER_CREATE,
[fake_payload_entry])
fake_payload_entry)
self.handler.port_forwarding_created.assert_called_once_with(
mock.ANY, self.l3_plugin._ovn, fake_payload_entry.current_pf)
mock.ANY, self.l3_plugin._ovn, fake_payload_entry.latest_state)
def test_handle_notification_create(self):
fip_objs = {1: {'description': 'one'},
3: {'description': 'three', 'revision_number': '321'}}
fake_payload = [self._fake_pf_payload_entry(id) for id in range(1, 4)]
self._handle_notification_common(events.AFTER_CREATE, fake_payload,
fip_objs)
calls = [mock.call(mock.ANY, self.l3_plugin._ovn, entry.current_pf)
for entry in fake_payload]
self.handler.port_forwarding_created.assert_has_calls(calls)
update_calls = [mock.call(
self.context, entry.current_pf.floatingip_id,
const.FLOATINGIP_STATUS_ACTIVE) for entry in fake_payload]
self.l3_plugin.update_floatingip_status.assert_has_calls(update_calls)
for id in range(1, 4):
fake_payload = self._fake_pf_payload_entry(id)
self._handle_notification_common(events.AFTER_CREATE,
fake_payload,
fip_objs)
calls = [mock.call(mock.ANY, self.l3_plugin._ovn,
fake_payload.latest_state)]
self.handler.port_forwarding_created.assert_has_calls(calls)
update_calls = [mock.call(
self.context, fake_payload.latest_state.floatingip_id,
const.FLOATINGIP_STATUS_ACTIVE)]
self.l3_plugin.update_floatingip_status.assert_has_calls(
update_calls)
def test_handle_notification_update(self):
fip_objs = {100: {'description': 'hundred'}, 101: {}}
fake_payload = [self._fake_pf_payload_entry(100, 100),
self._fake_pf_payload_entry(101, 101)]
fake_payload = self._fake_pf_payload_entry(100, 100)
self._handle_notification_common(events.AFTER_UPDATE, fake_payload,
fip_objs)
calls = [mock.call(mock.ANY, self.l3_plugin._ovn, entry.current_pf,
entry.original_pf) for entry in fake_payload]
calls = [mock.call(mock.ANY, self.l3_plugin._ovn,
fake_payload.latest_state,
fake_payload.states[0])]
self.handler.port_forwarding_updated.assert_has_calls(calls)
fake_payload = self._fake_pf_payload_entry(101, 101)
self._handle_notification_common(events.AFTER_UPDATE, fake_payload,
fip_objs)
calls = [mock.call(mock.ANY, self.l3_plugin._ovn,
fake_payload.latest_state,
fake_payload.states[0])]
self.handler.port_forwarding_updated.assert_has_calls(calls)
def test_handle_notification_delete(self):
fip_objs = {1: {'description': 'one'},
2: {'description': 'two', 'revision_number': '222'}}
fake_payload = [self._fake_pf_payload_entry(None, id)
for id in range(1, 4)]
with mock.patch.object(
self.pf_plugin, 'get_floatingip_port_forwardings',
return_value=[]):
self._handle_notification_common(
events.AFTER_DELETE, fake_payload, fip_objs)
calls = [mock.call(
mock.ANY, self.l3_plugin._ovn, entry.original_pf)
for entry in fake_payload]
self.handler.port_forwarding_deleted.assert_has_calls(calls)
update_calls = [mock.call(
self.context, entry.original_pf.floatingip_id,
const.FLOATINGIP_STATUS_DOWN) for entry in fake_payload]
self.l3_plugin.update_floatingip_status.assert_has_calls(
update_calls)
for id in range(1, 4):
fake_payload = self._fake_pf_payload_entry(None, id)
with mock.patch.object(
self.pf_plugin, 'get_floatingip_port_forwardings',
return_value=[]):
self._handle_notification_common(events.AFTER_DELETE,
fake_payload, fip_objs)
calls = [mock.call(
mock.ANY, self.l3_plugin._ovn, fake_payload.states[0])]
self.handler.port_forwarding_deleted.assert_has_calls(calls)
update_calls = [mock.call(
self.context, fake_payload.states[0].floatingip_id,
const.FLOATINGIP_STATUS_DOWN)]
self.l3_plugin.update_floatingip_status.assert_has_calls(
update_calls)
def test_maintenance_create_or_update(self):
pf_objs = [self._fake_pf_obj()]

View File

@ -108,14 +108,14 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
get_objects_mock.assert_called_once_with(
self.ctxt, _pager=mock.ANY, floatingip_id=None)
@mock.patch.object(registry, 'notify')
@mock.patch.object(registry, 'publish')
@mock.patch.object(resources_rpc.ResourcesPushRpcApi, 'push')
@mock.patch.object(port_forwarding.PortForwarding, 'get_object')
@mock.patch.object(port_forwarding.PortForwarding, 'get_objects')
@mock.patch.object(router.FloatingIP, 'get_object')
def test_delete_floatingip_port_forwarding(
self, fip_get_object_mock, pf_get_objects_mock,
pf_get_object_mock, push_api_mock, registry_notify_mock):
pf_get_object_mock, push_api_mock, mock_registry_publish):
# After delete, not empty resource list
pf_get_objects_mock.return_value = [mock.Mock(id='pf_id'),
@ -129,7 +129,7 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
pf_obj.delete.assert_called()
push_api_mock.assert_called_once_with(
self.ctxt, mock.ANY, rpc_events.DELETED)
registry_notify_mock.assert_called_once_with(
mock_registry_publish.assert_called_once_with(
pf_consts.PORT_FORWARDING,
events.AFTER_DELETE, self.pf_plugin, payload=mock.ANY)
@ -137,7 +137,7 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
pf_get_objects_mock.reset_mock()
pf_get_object_mock.reset_mock()
push_api_mock.reset_mock()
registry_notify_mock.reset_mock()
mock_registry_publish.reset_mock()
pf_obj = mock.Mock(id='need_to_delete_pf_id', floatingip_id='fip_id')
fip_obj = mock.Mock(id='fip_id')
fip_get_object_mock.return_value = fip_obj
@ -155,7 +155,7 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
fip_obj.update.assert_called()
push_api_mock.assert_called_once_with(
self.ctxt, mock.ANY, rpc_events.DELETED)
registry_notify_mock.assert_called_once_with(
mock_registry_publish.assert_called_once_with(
pf_consts.PORT_FORWARDING,
events.AFTER_DELETE, self.pf_plugin, payload=mock.ANY)
@ -170,11 +170,11 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
@mock.patch.object(port_forwarding.PortForwarding, 'get_objects')
@mock.patch.object(db_base_plugin_v2.NeutronDbPluginV2, 'get_port')
@mock.patch.object(registry, 'notify')
@mock.patch.object(registry, 'publish')
@mock.patch.object(resources_rpc.ResourcesPushRpcApi, 'push')
@mock.patch.object(port_forwarding.PortForwarding, 'get_object')
def test_update_floatingip_port_forwarding(
self, mock_pf_get_object, mock_rpc_push, mock_registry_notify,
self, mock_pf_get_object, mock_rpc_push, mock_registry_publish,
mock_get_port, mock_pf_get_objects):
pf_input = {
'port_forwarding':
@ -196,7 +196,7 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
self.assertTrue(pf_obj.update)
mock_rpc_push.assert_called_once_with(
self.ctxt, mock.ANY, rpc_events.UPDATED)
mock_registry_notify.assert_called_once_with(
mock_registry_publish.assert_called_once_with(
pf_consts.PORT_FORWARDING,
events.AFTER_UPDATE, self.pf_plugin, payload=mock.ANY)
@ -293,7 +293,7 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
@mock.patch.object(pf_plugin.PortForwardingPlugin,
'_check_port_has_binding_floating_ip')
@mock.patch.object(obj_base.NeutronDbObject, 'update_objects')
@mock.patch.object(registry, 'notify')
@mock.patch.object(registry, 'publish')
@mock.patch.object(resources_rpc.ResourcesPushRpcApi, 'push')
@mock.patch.object(pf_plugin.PortForwardingPlugin, '_check_router_match')
@mock.patch.object(pf_plugin.PortForwardingPlugin,
@ -303,7 +303,7 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
@mock.patch('neutron.objects.port_forwarding.PortForwarding')
def test_create_floatingip_port_forwarding(
self, mock_port_forwarding, mock_fip_get_object, mock_find_router,
mock_check_router_match, mock_push_api, mock_registry_notify,
mock_check_router_match, mock_push_api, mock_registry_publish,
mock_update_objects, mock_check_bind_fip):
# Update fip
pf_input = {
@ -326,7 +326,7 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
self.assertTrue(pf_obj.create.called)
mock_push_api.assert_called_once_with(
self.ctxt, mock.ANY, rpc_events.CREATED)
mock_registry_notify.assert_called_once_with(
mock_registry_publish.assert_called_once_with(
pf_consts.PORT_FORWARDING,
events.AFTER_CREATE, self.pf_plugin, payload=mock.ANY)
@ -336,7 +336,7 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
mock_port_forwarding.reset_mock()
mock_update_objects.reset_mock()
mock_push_api.reset_mock()
mock_registry_notify.reset_mock()
mock_registry_publish.reset_mock()
mock_port_forwarding.return_value = pf_obj
fip_obj.router_id = 'router_id'
fip_obj.fixed_port_id = ''
@ -348,7 +348,7 @@ class TestPortForwardingPlugin(testlib_api.SqlTestCase):
self.assertFalse(mock_update_objects.called)
mock_push_api.assert_called_once_with(
self.ctxt, mock.ANY, rpc_events.CREATED)
mock_registry_notify.assert_called_once_with(
mock_registry_publish.assert_called_once_with(
pf_consts.PORT_FORWARDING,
events.AFTER_CREATE, self.pf_plugin, payload=mock.ANY)