use payloads for NETWORK callback events

This patch switches over to the payload style of callbacks for
NETWORK based events. As part of this change a few shims are needed
to handle cases where some callbacks don't yet use payloads and others
do. Once we move over to payloads for all callbacks the shims can be
removed.

NeutronLibImpact

Change-Id: I889364b5d184d47a79fe6ed604ce13a4b334acfa
changes/11/670611/13
Nurmatov Mamatisa 2 years ago
parent ce22171b05
commit 4aa5de254d

@ -112,12 +112,15 @@ class DhcpAgentNotifyAPI(object):
self.uses_native_notifications[resource] = {'create': False,
'update': False,
'delete': False}
registry.subscribe(self._native_event_send_dhcp_notification,
resource, events.AFTER_CREATE)
registry.subscribe(self._native_event_send_dhcp_notification,
resource, events.AFTER_UPDATE)
registry.subscribe(self._native_event_send_dhcp_notification,
resource, events.AFTER_DELETE)
callback = self._native_event_send_dhcp_notification
# TODO(boden): remove shim below once all events use payloads
if resource == resources.NETWORK:
callback = self._native_event_send_dhcp_notification_payload
registry.subscribe(callback, resource, events.AFTER_CREATE)
registry.subscribe(callback, resource, events.AFTER_UPDATE)
registry.subscribe(callback, resource, events.AFTER_DELETE)
@property
def plugin(self):
@ -283,6 +286,26 @@ class DhcpAgentNotifyAPI(object):
'fixed_ips': kwargs['port']['fixed_ips']},
kwargs['port']['network_id'])
def _native_event_send_dhcp_notification_payload(
self, resource, event, trigger, payload=None):
# TODO(boden): collapse the native event methods back into one
action = event.replace('after_', '')
# we unsubscribe the _send_dhcp_notification method now that we know
# the loaded core plugin emits native resource events
if resource not in self._unsubscribed_resources:
self.uses_native_notifications[resource][action] = True
if all(self.uses_native_notifications[resource].values()):
# only unsubscribe the API level listener if we are
# receiving all event types for this resource
self._unsubscribed_resources.append(resource)
registry.unsubscribe_by_resource(self._send_dhcp_notification,
resource)
method_name = '.'.join((resource, action, 'end'))
data = {resource: payload.latest_state}
self.notify(payload.context, data, method_name)
def _native_event_send_dhcp_notification(self, resource, event, trigger,
context, **kwargs):
action = event.replace('after_', '')

@ -506,17 +506,19 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
with db_api.CONTEXT_WRITER.using(context):
network_db = self._get_network(context, id)
network = self._make_network_dict(network_db, context=context)
registry.notify(resources.NETWORK, events.PRECOMMIT_DELETE,
self, context=context, network_id=id,
network=network)
registry.publish(resources.NETWORK, events.PRECOMMIT_DELETE,
self, payload=events.DBEventPayload(
context, resource_id=id,
states=(network,)))
# We expire network_db here because precommit deletion
# might have left the relationship stale, for example,
# if we deleted a segment.
context.session.expire(network_db)
network_db = self._get_network(context, id)
context.session.delete(network_db)
registry.notify(resources.NETWORK, events.AFTER_DELETE,
self, context=context, network=network)
registry.publish(resources.NETWORK, events.AFTER_DELETE,
self, payload=events.DBEventPayload(
context, resource_id=id, states=(network,)))
@db_api.retry_if_session_inactive()
def get_network(self, context, id, fields=None):

@ -417,12 +417,13 @@ class DVRResourceOperationHandler(object):
@registry.receives(resources.NETWORK, [events.AFTER_DELETE])
def delete_fip_namespaces_for_ext_net(self, rtype, event, trigger,
context, network, **kwargs):
payload=None):
network = payload.latest_state
if network.get(extnet_apidef.EXTERNAL):
# Send the information to all the L3 Agent hosts
# to clean up the fip namespace as it is no longer required.
self.l3plugin.l3_rpc_notifier.delete_fipnamespace_for_ext_net(
context, network['id'])
payload.context, payload.resource_id)
def _get_ports_for_allowed_address_pair_ip(self, context, network_id,
fixed_ip):

@ -890,9 +890,8 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase,
@registry.receives(resources.PORT, [events.BEFORE_CREATE,
events.BEFORE_UPDATE])
@registry.receives(resources.NETWORK, [events.BEFORE_CREATE])
def _ensure_default_security_group_handler(self, resource, event, trigger,
context, **kwargs):
def _ensure_default_security_group_handler_port(
self, resource, event, trigger, context, **kwargs):
if event == events.BEFORE_UPDATE:
tenant_id = kwargs['original_' + resource]['tenant_id']
else:
@ -900,6 +899,15 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase,
if tenant_id:
self._ensure_default_security_group(context, tenant_id)
@registry.receives(resources.NETWORK, [events.BEFORE_CREATE])
def _ensure_default_security_group_handler_net(
self, resource, event, trigger, payload=None):
# TODO(boden): refactor into single callback method
project_id = payload.latest_state['tenant_id']
if project_id:
self._ensure_default_security_group(payload.context, project_id)
def _ensure_default_security_group(self, context, tenant_id):
"""Create a default security group if one doesn't exist.

@ -37,6 +37,8 @@ LOG = logging.getLogger(__name__)
class _ObjectChangeHandler(object):
_PAYLOAD_RESOURCES = (resources.NETWORK,)
def __init__(self, resource, object_class, resource_push_api):
self._resource = resource
self._obj_class = object_class
@ -51,7 +53,12 @@ class _ObjectChangeHandler(object):
self._semantic_warned = False
for event in (events.AFTER_CREATE, events.AFTER_UPDATE,
events.AFTER_DELETE):
registry.subscribe(self.handle_event, resource, event)
handler = self.handle_event
# TODO(boden): remove shim below once all events use payloads
if resource in self._PAYLOAD_RESOURCES:
handler = self.handle_payload_event
registry.subscribe(handler, resource, event)
def wait(self):
"""Waits for all outstanding events to be dispatched."""
@ -79,6 +86,18 @@ class _ObjectChangeHandler(object):
self._semantic_warned = True
return True
def handle_payload_event(self, resource, event,
trigger, payload=None):
if self._is_session_semantic_violated(
payload.context, resource, event):
return
resource_id = payload.resource_id
# we preserve the context so we can trace a receive on the agent back
# to the server-side event that triggered it
self._resources_to_push[resource_id] = payload.context.to_dict()
# spawn worker so we don't block main AFTER_UPDATE thread
self.fts.append(self._worker_pool.submit(self.dispatch_events))
def handle_event(self, resource, event, trigger,
context, *args, **kwargs):
"""Callback handler for resource change that pushes change to RPC.

@ -1042,8 +1042,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def _before_create_network(self, context, network):
net_data = network[net_def.RESOURCE_NAME]
registry.notify(resources.NETWORK, events.BEFORE_CREATE, self,
context=context, network=net_data)
registry.publish(resources.NETWORK, events.BEFORE_CREATE, self,
payload=events.DBEventPayload(
context, desired_state=net_data))
def _create_network_db(self, context, network):
net_data = network[net_def.RESOURCE_NAME]
@ -1080,8 +1081,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
net_data[az_def.AZ_HINTS])
net_db[az_def.AZ_HINTS] = az_hints
result[az_def.AZ_HINTS] = az_hints
registry.notify(resources.NETWORK, events.PRECOMMIT_CREATE, self,
context=context, request=net_data, network=result)
registry.publish(resources.NETWORK, events.PRECOMMIT_CREATE, self,
payload=events.DBEventPayload(
context, states=(result,),
resource_id=result['id'],
request_body=net_data))
resource_extend.apply_funcs('networks', result, net_db)
mech_context = driver_context.NetworkContext(self, context,
@ -1097,8 +1101,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
return self._after_create_network(context, result, mech_context)
def _after_create_network(self, context, result, mech_context):
kwargs = {'context': context, 'network': result}
registry.notify(resources.NETWORK, events.AFTER_CREATE, self, **kwargs)
registry.publish(resources.NETWORK, events.AFTER_CREATE, self,
payload=events.DBEventPayload(
context, states=(result,),
resource_id=result['id']))
try:
self.mechanism_manager.create_network_postcommit(mech_context)
except ml2_exc.MechanismDriverError:
@ -1166,9 +1172,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# by re-calling update_network with the previous attributes. For
# now the error is propagated to the caller, which is expected to
# either undo/retry the operation or delete the resource.
kwargs = {'context': context, 'network': updated_network,
'original_network': original_network}
registry.notify(resources.NETWORK, events.AFTER_UPDATE, self, **kwargs)
registry.publish(resources.NETWORK, events.AFTER_UPDATE, self,
payload=events.DBEventPayload(
context,
states=(original_network, updated_network,),
resource_id=updated_network['id']))
self.mechanism_manager.update_network_postcommit(mech_context)
if need_network_update_notify:
self.notifier.network_update(context, updated_network)
@ -1227,31 +1235,33 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@registry.receives(resources.NETWORK, [events.PRECOMMIT_DELETE],
priority=0)
def _network_delete_precommit_handler(self, rtype, event, trigger,
context, network_id, **kwargs):
network = (kwargs.get('network') or
self.get_network(context, network_id))
payload=None):
context = payload.context
network_id = payload.resource_id
network = payload.latest_state if payload.states else \
self.get_network(context, network_id)
mech_context = driver_context.NetworkContext(self,
context,
network)
# TODO(kevinbenton): move this mech context into something like
# a 'delete context' so it's not polluting the real context object
setattr(context, '_mech_context', mech_context)
setattr(payload.context, '_mech_context', mech_context)
self.mechanism_manager.delete_network_precommit(
mech_context)
@registry.receives(resources.NETWORK, [events.AFTER_DELETE])
def _network_delete_after_delete_handler(self, rtype, event, trigger,
context, network, **kwargs):
payload=None):
try:
self.mechanism_manager.delete_network_postcommit(
context._mech_context)
payload.context._mech_context)
except ml2_exc.MechanismDriverError:
# TODO(apech) - One or more mechanism driver failed to
# delete the network. Ideally we'd notify the caller of
# the fact that an error occurred.
LOG.error("mechanism_manager.delete_network_postcommit"
" failed")
self.notifier.network_delete(context, network['id'])
self.notifier.network_delete(payload.context, payload.resource_id)
def _before_create_subnet(self, context, subnet):
subnet_data = subnet[subnet_def.RESOURCE_NAME]

@ -41,20 +41,13 @@ CHECK_REQUIREMENTS = 'dry-run'
def _ensure_external_network_default_value_callback(
resource, event, trigger, **kwargs):
resource, event, trigger, payload=None):
"""Ensure the is_default db field matches the create/update request."""
# TODO(boden): remove shim once all callbacks use payloads
if 'payload' in kwargs:
_request = kwargs['payload'].request_body
_context = kwargs['payload'].context
_network = kwargs['payload'].desired_state
_orig = kwargs['payload'].states[0]
else:
_request = kwargs['request']
_context = kwargs['context']
_network = kwargs['network']
_orig = kwargs.get('original_network')
_request = payload.request_body
_context = payload.context
_network = payload.desired_state or payload.latest_state
_orig = payload.states[0]
@db_api.retry_if_session_inactive()
def _do_ensure_external_network_default_value_callback(

@ -350,9 +350,9 @@ class QoSPlugin(qos.QoSPluginBase):
self.validate_policy_for_port(context, policy, updated_port)
def _validate_create_network_callback(self, resource, event, trigger,
**kwargs):
context = kwargs['context']
network_id = kwargs['network']['id']
payload=None):
context = payload.context
network_id = payload.resource_id
network = network_object.Network.get_object(context, id=network_id)
policy_id = network.qos_policy_id

@ -29,7 +29,6 @@ from oslo_db import exception as db_exc
from oslo_log import helpers as log_helpers
from oslo_utils import uuidutils
from neutron.common import utils as common_utils
from neutron.db import segments_db as db
from neutron.extensions import segment as extension
from neutron import manager
@ -336,8 +335,9 @@ def _add_segment_host_mapping_for_segment(resource, event, trigger,
def _delete_segments_for_network(resource, event, trigger,
context, network_id, **kwargs):
admin_ctx = common_utils.get_elevated_context(context)
payload=None, **kwargs):
network_id = payload.resource_id
admin_ctx = payload.context.elevated()
global segments_plugin
if not segments_plugin:
segments_plugin = manager.NeutronManager.load_class_for_provider(

@ -263,7 +263,7 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
def test__native_notification_unsubscribes(self):
self.assertFalse(self.notifier._unsubscribed_resources)
for res in (resources.PORT, resources.NETWORK, resources.SUBNET):
for res in (resources.PORT, resources.SUBNET):
self.notifier._unsubscribed_resources = []
kwargs = {res: {}}
registry.notify(res, events.AFTER_CREATE, self,
@ -281,6 +281,23 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
context=mock.Mock(), **kwargs)
self.assertEqual([res], self.notifier._unsubscribed_resources)
for res in [resources.NETWORK]:
self.notifier._unsubscribed_resources = []
registry.publish(res, events.AFTER_CREATE, self,
payload=events.DBEventPayload(mock.Mock()))
# don't unsubscribe until all three types are observed
self.assertEqual([], self.notifier._unsubscribed_resources)
registry.publish(res, events.AFTER_UPDATE, self,
payload=events.DBEventPayload(mock.Mock()))
self.assertEqual([], self.notifier._unsubscribed_resources)
registry.publish(res, events.AFTER_DELETE, self,
payload=events.DBEventPayload(mock.Mock()))
self.assertEqual([res], self.notifier._unsubscribed_resources)
# after first time, no further unsubscribing should happen
registry.publish(res, events.AFTER_CREATE, self,
payload=events.DBEventPayload(mock.Mock()))
self.assertEqual([res], self.notifier._unsubscribed_resources)
def test__only_status_changed(self):
p1 = {'id': 1, 'status': 'DOWN', 'updated_at': '10:00:00',
'revision_number': 1}

@ -419,9 +419,7 @@ class TestSegment(SegmentTestCase):
dsn.assert_called_with(resources.NETWORK,
events.PRECOMMIT_DELETE,
mock.ANY,
context=mock.ANY,
network_id=mock.ANY,
network=mock.ANY)
payload=mock.ANY)
class TestSegmentML2(SegmentTestCase):

@ -239,10 +239,10 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2,
with self.network() as n:
after_create.assert_called_once_with(
resources.NETWORK, events.AFTER_CREATE, mock.ANY,
context=mock.ANY, network=mock.ANY)
kwargs = after_create.mock_calls[0][2]
payload=mock.ANY)
payload = after_create.mock_calls[0][2]['payload']
self.assertEqual(n['network']['id'],
kwargs['network']['id'])
payload.resource_id)
def test_network_precommit_create_callback(self):
precommit_create = mock.Mock()
@ -251,7 +251,7 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2,
with self.network():
precommit_create.assert_called_once_with(
resources.NETWORK, events.PRECOMMIT_CREATE, mock.ANY,
context=mock.ANY, network=mock.ANY, request=mock.ANY)
payload=mock.ANY)
def test_network_precommit_create_callback_aborts(self):
precommit_create = mock.Mock()
@ -289,11 +289,11 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2,
self.deserialize(self.fmt, req.get_response(self.api))
after_update.assert_called_once_with(
resources.NETWORK, events.AFTER_UPDATE, mock.ANY,
context=mock.ANY, network=mock.ANY, original_network=mock.ANY)
kwargs = after_update.mock_calls[0][2]
payload=mock.ANY)
payload = after_update.mock_calls[0][2]['payload']
self.assertEqual(n['network']['name'],
kwargs['original_network']['name'])
self.assertEqual('updated', kwargs['network']['name'])
payload.states[0]['name'])
self.assertEqual('updated', payload.latest_state['name'])
def test_network_after_delete_callback(self):
after_delete = mock.Mock()
@ -304,10 +304,10 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2,
req.get_response(self.api)
after_delete.assert_called_once_with(
resources.NETWORK, events.AFTER_DELETE, mock.ANY,
context=mock.ANY, network=mock.ANY)
kwargs = after_delete.mock_calls[0][2]
payload=mock.ANY)
payload = after_delete.mock_calls[0][2]['payload']
self.assertEqual(n['network']['id'],
kwargs['network']['id'])
payload.resource_id)
def test_create_port_obj_bulk(self):
cfg.CONF.set_override('base_mac', "12:34:56:00")
@ -350,8 +350,13 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2,
# capture session states during each before and after event
before = []
after = []
b_func = lambda *a, **k: before.append(k['context'].session.is_active)
a_func = lambda *a, **k: after.append(k['context'].session.is_active)
def b_func(r, c, v, payload=None):
before.append(payload.context.session.is_active)
def a_func(r, c, v, payload=None):
after.append(payload.context.session.is_active)
registry.subscribe(b_func, resources.NETWORK, events.BEFORE_CREATE)
registry.subscribe(a_func, resources.NETWORK, events.AFTER_CREATE)
data = [{'tenant_id': self._tenant_id}] * 4

@ -14,6 +14,7 @@
from unittest import mock
from neutron_lib.api.definitions import constants as api_const
from neutron_lib.callbacks import events
from neutron_lib import constants
from neutron_lib import context
from neutron_lib import exceptions as n_exc
@ -65,7 +66,10 @@ class AutoAllocateTestCase(testlib_api.SqlTestCase):
return_value=network_mock
) as get_external_net:
db._ensure_external_network_default_value_callback(
"NETWORK", "precommit_update", "test_plugin", **kwargs)
"NETWORK", "precommit_update", "test_plugin",
payload=events.DBEventPayload(
self.ctx, request_body=kwargs['request'],
states=(kwargs['original_network'], kwargs['network'])))
get_external_nets.assert_called_once_with(
self.ctx, _pager=mock.ANY, is_default=True)
get_external_net.assert_called_once_with(
@ -94,7 +98,10 @@ class AutoAllocateTestCase(testlib_api.SqlTestCase):
return_value=network_mock
) as get_external_net:
db._ensure_external_network_default_value_callback(
"NETWORK", "precommit_update", "test_plugin", **kwargs)
"NETWORK", "precommit_update", "test_plugin",
payload=events.DBEventPayload(
self.ctx, request_body=kwargs['request'],
states=(kwargs['network'],)))
get_external_nets.assert_not_called()
get_external_net.assert_not_called()
network_mock.update.assert_not_called()
@ -125,7 +132,10 @@ class AutoAllocateTestCase(testlib_api.SqlTestCase):
return_value=network_mock
) as get_external_net:
db._ensure_external_network_default_value_callback(
"NETWORK", "precommit_update", "test_plugin", **kwargs)
"NETWORK", "precommit_update", "test_plugin",
payload=events.DBEventPayload(
self.ctx, request_body=kwargs['request'],
states=(kwargs['original_network'], kwargs['network'])))
get_external_nets.assert_called_once_with(
self.ctx, _pager=mock.ANY, is_default=True)
get_external_net.assert_not_called()
@ -158,7 +168,10 @@ class AutoAllocateTestCase(testlib_api.SqlTestCase):
) as get_external_net:
self.assertRaises(exceptions.DefaultExternalNetworkExists,
db._ensure_external_network_default_value_callback,
"NETWORK", "precommit_update", "test_plugin", **kwargs)
"NETWORK", "precommit_update", "test_plugin",
payload=events.DBEventPayload(
self.ctx, request_body=kwargs['request'],
states=(kwargs['original_network'], kwargs['network'])))
get_external_nets.assert_called_once_with(
self.ctx, _pager=mock.ANY, is_default=True)
get_external_net.assert_not_called()

@ -1274,8 +1274,9 @@ class TestQosPluginDB(base.BaseQosTestCase):
'validate_policy_for_network') \
as mock_validate_policy:
self.qos_plugin._validate_create_network_callback(
'NETWORK', 'precommit_create', 'test_plugin', **kwargs)
"NETWORK", "precommit_create", "test_plugin",
payload=events.DBEventPayload(
self.context, resource_id=kwargs['network']['id'],))
qos_policy = None
if network_qos:
qos_policy = net_qos_obj

Loading…
Cancel
Save