Merge "use payloads for NETWORK callback events"

This commit is contained in:
Zuul 2021-05-16 08:50:19 +00:00 committed by Gerrit Code Review
commit 3b3398b8c5
14 changed files with 164 additions and 74 deletions

View File

@ -112,12 +112,15 @@ class DhcpAgentNotifyAPI(object):
self.uses_native_notifications[resource] = {'create': False,
'update': False,
'delete': False}
registry.subscribe(self._native_event_send_dhcp_notification,
resource, events.AFTER_CREATE)
registry.subscribe(self._native_event_send_dhcp_notification,
resource, events.AFTER_UPDATE)
registry.subscribe(self._native_event_send_dhcp_notification,
resource, events.AFTER_DELETE)
callback = self._native_event_send_dhcp_notification
# TODO(boden): remove shim below once all events use payloads
if resource == resources.NETWORK:
callback = self._native_event_send_dhcp_notification_payload
registry.subscribe(callback, resource, events.AFTER_CREATE)
registry.subscribe(callback, resource, events.AFTER_UPDATE)
registry.subscribe(callback, resource, events.AFTER_DELETE)
@property
def plugin(self):
@ -283,6 +286,26 @@ class DhcpAgentNotifyAPI(object):
'fixed_ips': kwargs['port']['fixed_ips']},
kwargs['port']['network_id'])
def _native_event_send_dhcp_notification_payload(
self, resource, event, trigger, payload=None):
# TODO(boden): collapse the native event methods back into one
action = event.replace('after_', '')
# we unsubscribe the _send_dhcp_notification method now that we know
# the loaded core plugin emits native resource events
if resource not in self._unsubscribed_resources:
self.uses_native_notifications[resource][action] = True
if all(self.uses_native_notifications[resource].values()):
# only unsubscribe the API level listener if we are
# receiving all event types for this resource
self._unsubscribed_resources.append(resource)
registry.unsubscribe_by_resource(self._send_dhcp_notification,
resource)
method_name = '.'.join((resource, action, 'end'))
data = {resource: payload.latest_state}
self.notify(payload.context, data, method_name)
def _native_event_send_dhcp_notification(self, resource, event, trigger,
context, **kwargs):
action = event.replace('after_', '')

View File

@ -506,17 +506,19 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
with db_api.CONTEXT_WRITER.using(context):
network_db = self._get_network(context, id)
network = self._make_network_dict(network_db, context=context)
registry.notify(resources.NETWORK, events.PRECOMMIT_DELETE,
self, context=context, network_id=id,
network=network)
registry.publish(resources.NETWORK, events.PRECOMMIT_DELETE,
self, payload=events.DBEventPayload(
context, resource_id=id,
states=(network,)))
# We expire network_db here because precommit deletion
# might have left the relationship stale, for example,
# if we deleted a segment.
context.session.expire(network_db)
network_db = self._get_network(context, id)
context.session.delete(network_db)
registry.notify(resources.NETWORK, events.AFTER_DELETE,
self, context=context, network=network)
registry.publish(resources.NETWORK, events.AFTER_DELETE,
self, payload=events.DBEventPayload(
context, resource_id=id, states=(network,)))
@db_api.retry_if_session_inactive()
def get_network(self, context, id, fields=None):

View File

@ -417,12 +417,13 @@ class DVRResourceOperationHandler(object):
@registry.receives(resources.NETWORK, [events.AFTER_DELETE])
def delete_fip_namespaces_for_ext_net(self, rtype, event, trigger,
context, network, **kwargs):
payload=None):
network = payload.latest_state
if network.get(extnet_apidef.EXTERNAL):
# Send the information to all the L3 Agent hosts
# to clean up the fip namespace as it is no longer required.
self.l3plugin.l3_rpc_notifier.delete_fipnamespace_for_ext_net(
context, network['id'])
payload.context, payload.resource_id)
def _get_ports_for_allowed_address_pair_ip(self, context, network_id,
fixed_ip):

View File

@ -890,9 +890,8 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase,
@registry.receives(resources.PORT, [events.BEFORE_CREATE,
events.BEFORE_UPDATE])
@registry.receives(resources.NETWORK, [events.BEFORE_CREATE])
def _ensure_default_security_group_handler(self, resource, event, trigger,
context, **kwargs):
def _ensure_default_security_group_handler_port(
self, resource, event, trigger, context, **kwargs):
if event == events.BEFORE_UPDATE:
tenant_id = kwargs['original_' + resource]['tenant_id']
else:
@ -900,6 +899,15 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase,
if tenant_id:
self._ensure_default_security_group(context, tenant_id)
@registry.receives(resources.NETWORK, [events.BEFORE_CREATE])
def _ensure_default_security_group_handler_net(
self, resource, event, trigger, payload=None):
# TODO(boden): refactor into single callback method
project_id = payload.latest_state['tenant_id']
if project_id:
self._ensure_default_security_group(payload.context, project_id)
def _ensure_default_security_group(self, context, tenant_id):
"""Create a default security group if one doesn't exist.

View File

@ -37,6 +37,8 @@ LOG = logging.getLogger(__name__)
class _ObjectChangeHandler(object):
_PAYLOAD_RESOURCES = (resources.NETWORK,)
def __init__(self, resource, object_class, resource_push_api):
self._resource = resource
self._obj_class = object_class
@ -51,7 +53,12 @@ class _ObjectChangeHandler(object):
self._semantic_warned = False
for event in (events.AFTER_CREATE, events.AFTER_UPDATE,
events.AFTER_DELETE):
registry.subscribe(self.handle_event, resource, event)
handler = self.handle_event
# TODO(boden): remove shim below once all events use payloads
if resource in self._PAYLOAD_RESOURCES:
handler = self.handle_payload_event
registry.subscribe(handler, resource, event)
def wait(self):
"""Waits for all outstanding events to be dispatched."""
@ -79,6 +86,18 @@ class _ObjectChangeHandler(object):
self._semantic_warned = True
return True
def handle_payload_event(self, resource, event,
trigger, payload=None):
if self._is_session_semantic_violated(
payload.context, resource, event):
return
resource_id = payload.resource_id
# we preserve the context so we can trace a receive on the agent back
# to the server-side event that triggered it
self._resources_to_push[resource_id] = payload.context.to_dict()
# spawn worker so we don't block main AFTER_UPDATE thread
self.fts.append(self._worker_pool.submit(self.dispatch_events))
def handle_event(self, resource, event, trigger,
context, *args, **kwargs):
"""Callback handler for resource change that pushes change to RPC.

View File

@ -1042,8 +1042,9 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
def _before_create_network(self, context, network):
net_data = network[net_def.RESOURCE_NAME]
registry.notify(resources.NETWORK, events.BEFORE_CREATE, self,
context=context, network=net_data)
registry.publish(resources.NETWORK, events.BEFORE_CREATE, self,
payload=events.DBEventPayload(
context, desired_state=net_data))
def _create_network_db(self, context, network):
net_data = network[net_def.RESOURCE_NAME]
@ -1080,8 +1081,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
net_data[az_def.AZ_HINTS])
net_db[az_def.AZ_HINTS] = az_hints
result[az_def.AZ_HINTS] = az_hints
registry.notify(resources.NETWORK, events.PRECOMMIT_CREATE, self,
context=context, request=net_data, network=result)
registry.publish(resources.NETWORK, events.PRECOMMIT_CREATE, self,
payload=events.DBEventPayload(
context, states=(result,),
resource_id=result['id'],
request_body=net_data))
resource_extend.apply_funcs('networks', result, net_db)
mech_context = driver_context.NetworkContext(self, context,
@ -1097,8 +1101,10 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
return self._after_create_network(context, result, mech_context)
def _after_create_network(self, context, result, mech_context):
kwargs = {'context': context, 'network': result}
registry.notify(resources.NETWORK, events.AFTER_CREATE, self, **kwargs)
registry.publish(resources.NETWORK, events.AFTER_CREATE, self,
payload=events.DBEventPayload(
context, states=(result,),
resource_id=result['id']))
try:
self.mechanism_manager.create_network_postcommit(mech_context)
except ml2_exc.MechanismDriverError:
@ -1166,9 +1172,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
# by re-calling update_network with the previous attributes. For
# now the error is propagated to the caller, which is expected to
# either undo/retry the operation or delete the resource.
kwargs = {'context': context, 'network': updated_network,
'original_network': original_network}
registry.notify(resources.NETWORK, events.AFTER_UPDATE, self, **kwargs)
registry.publish(resources.NETWORK, events.AFTER_UPDATE, self,
payload=events.DBEventPayload(
context,
states=(original_network, updated_network,),
resource_id=updated_network['id']))
self.mechanism_manager.update_network_postcommit(mech_context)
if need_network_update_notify:
self.notifier.network_update(context, updated_network)
@ -1227,31 +1235,33 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
@registry.receives(resources.NETWORK, [events.PRECOMMIT_DELETE],
priority=0)
def _network_delete_precommit_handler(self, rtype, event, trigger,
context, network_id, **kwargs):
network = (kwargs.get('network') or
self.get_network(context, network_id))
payload=None):
context = payload.context
network_id = payload.resource_id
network = payload.latest_state if payload.states else \
self.get_network(context, network_id)
mech_context = driver_context.NetworkContext(self,
context,
network)
# TODO(kevinbenton): move this mech context into something like
# a 'delete context' so it's not polluting the real context object
setattr(context, '_mech_context', mech_context)
setattr(payload.context, '_mech_context', mech_context)
self.mechanism_manager.delete_network_precommit(
mech_context)
@registry.receives(resources.NETWORK, [events.AFTER_DELETE])
def _network_delete_after_delete_handler(self, rtype, event, trigger,
context, network, **kwargs):
payload=None):
try:
self.mechanism_manager.delete_network_postcommit(
context._mech_context)
payload.context._mech_context)
except ml2_exc.MechanismDriverError:
# TODO(apech) - One or more mechanism driver failed to
# delete the network. Ideally we'd notify the caller of
# the fact that an error occurred.
LOG.error("mechanism_manager.delete_network_postcommit"
" failed")
self.notifier.network_delete(context, network['id'])
self.notifier.network_delete(payload.context, payload.resource_id)
def _before_create_subnet(self, context, subnet):
subnet_data = subnet[subnet_def.RESOURCE_NAME]

View File

@ -41,20 +41,13 @@ CHECK_REQUIREMENTS = 'dry-run'
def _ensure_external_network_default_value_callback(
resource, event, trigger, **kwargs):
resource, event, trigger, payload=None):
"""Ensure the is_default db field matches the create/update request."""
# TODO(boden): remove shim once all callbacks use payloads
if 'payload' in kwargs:
_request = kwargs['payload'].request_body
_context = kwargs['payload'].context
_network = kwargs['payload'].desired_state
_orig = kwargs['payload'].states[0]
else:
_request = kwargs['request']
_context = kwargs['context']
_network = kwargs['network']
_orig = kwargs.get('original_network')
_request = payload.request_body
_context = payload.context
_network = payload.desired_state or payload.latest_state
_orig = payload.states[0]
@db_api.retry_if_session_inactive()
def _do_ensure_external_network_default_value_callback(

View File

@ -350,9 +350,9 @@ class QoSPlugin(qos.QoSPluginBase):
self.validate_policy_for_port(context, policy, updated_port)
def _validate_create_network_callback(self, resource, event, trigger,
**kwargs):
context = kwargs['context']
network_id = kwargs['network']['id']
payload=None):
context = payload.context
network_id = payload.resource_id
network = network_object.Network.get_object(context, id=network_id)
policy_id = network.qos_policy_id

View File

@ -29,7 +29,6 @@ from oslo_db import exception as db_exc
from oslo_log import helpers as log_helpers
from oslo_utils import uuidutils
from neutron.common import utils as common_utils
from neutron.db import segments_db as db
from neutron.extensions import segment as extension
from neutron import manager
@ -344,8 +343,9 @@ def _add_segment_host_mapping_for_segment(resource, event, trigger,
def _delete_segments_for_network(resource, event, trigger,
context, network_id, **kwargs):
admin_ctx = common_utils.get_elevated_context(context)
payload=None, **kwargs):
network_id = payload.resource_id
admin_ctx = payload.context.elevated()
global segments_plugin
if not segments_plugin:
segments_plugin = manager.NeutronManager.load_class_for_provider(

View File

@ -263,7 +263,7 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
def test__native_notification_unsubscribes(self):
self.assertFalse(self.notifier._unsubscribed_resources)
for res in (resources.PORT, resources.NETWORK, resources.SUBNET):
for res in (resources.PORT, resources.SUBNET):
self.notifier._unsubscribed_resources = []
kwargs = {res: {}}
registry.notify(res, events.AFTER_CREATE, self,
@ -281,6 +281,23 @@ class TestDhcpAgentNotifyAPI(base.BaseTestCase):
context=mock.Mock(), **kwargs)
self.assertEqual([res], self.notifier._unsubscribed_resources)
for res in [resources.NETWORK]:
self.notifier._unsubscribed_resources = []
registry.publish(res, events.AFTER_CREATE, self,
payload=events.DBEventPayload(mock.Mock()))
# don't unsubscribe until all three types are observed
self.assertEqual([], self.notifier._unsubscribed_resources)
registry.publish(res, events.AFTER_UPDATE, self,
payload=events.DBEventPayload(mock.Mock()))
self.assertEqual([], self.notifier._unsubscribed_resources)
registry.publish(res, events.AFTER_DELETE, self,
payload=events.DBEventPayload(mock.Mock()))
self.assertEqual([res], self.notifier._unsubscribed_resources)
# after first time, no further unsubscribing should happen
registry.publish(res, events.AFTER_CREATE, self,
payload=events.DBEventPayload(mock.Mock()))
self.assertEqual([res], self.notifier._unsubscribed_resources)
def test__only_status_changed(self):
p1 = {'id': 1, 'status': 'DOWN', 'updated_at': '10:00:00',
'revision_number': 1}

View File

@ -420,9 +420,7 @@ class TestSegment(SegmentTestCase):
dsn.assert_called_with(resources.NETWORK,
events.PRECOMMIT_DELETE,
mock.ANY,
context=mock.ANY,
network_id=mock.ANY,
network=mock.ANY)
payload=mock.ANY)
class TestSegmentML2(SegmentTestCase):

View File

@ -239,10 +239,10 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2,
with self.network() as n:
after_create.assert_called_once_with(
resources.NETWORK, events.AFTER_CREATE, mock.ANY,
context=mock.ANY, network=mock.ANY)
kwargs = after_create.mock_calls[0][2]
payload=mock.ANY)
payload = after_create.mock_calls[0][2]['payload']
self.assertEqual(n['network']['id'],
kwargs['network']['id'])
payload.resource_id)
def test_network_precommit_create_callback(self):
precommit_create = mock.Mock()
@ -251,7 +251,7 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2,
with self.network():
precommit_create.assert_called_once_with(
resources.NETWORK, events.PRECOMMIT_CREATE, mock.ANY,
context=mock.ANY, network=mock.ANY, request=mock.ANY)
payload=mock.ANY)
def test_network_precommit_create_callback_aborts(self):
precommit_create = mock.Mock()
@ -289,11 +289,11 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2,
self.deserialize(self.fmt, req.get_response(self.api))
after_update.assert_called_once_with(
resources.NETWORK, events.AFTER_UPDATE, mock.ANY,
context=mock.ANY, network=mock.ANY, original_network=mock.ANY)
kwargs = after_update.mock_calls[0][2]
payload=mock.ANY)
payload = after_update.mock_calls[0][2]['payload']
self.assertEqual(n['network']['name'],
kwargs['original_network']['name'])
self.assertEqual('updated', kwargs['network']['name'])
payload.states[0]['name'])
self.assertEqual('updated', payload.latest_state['name'])
def test_network_after_delete_callback(self):
after_delete = mock.Mock()
@ -304,10 +304,10 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2,
req.get_response(self.api)
after_delete.assert_called_once_with(
resources.NETWORK, events.AFTER_DELETE, mock.ANY,
context=mock.ANY, network=mock.ANY)
kwargs = after_delete.mock_calls[0][2]
payload=mock.ANY)
payload = after_delete.mock_calls[0][2]['payload']
self.assertEqual(n['network']['id'],
kwargs['network']['id'])
payload.resource_id)
def test_create_port_obj_bulk(self):
cfg.CONF.set_override('base_mac', "12:34:56:00")
@ -350,8 +350,13 @@ class TestMl2NetworksV2(test_plugin.TestNetworksV2,
# capture session states during each before and after event
before = []
after = []
b_func = lambda *a, **k: before.append(k['context'].session.is_active)
a_func = lambda *a, **k: after.append(k['context'].session.is_active)
def b_func(r, c, v, payload=None):
before.append(payload.context.session.is_active)
def a_func(r, c, v, payload=None):
after.append(payload.context.session.is_active)
registry.subscribe(b_func, resources.NETWORK, events.BEFORE_CREATE)
registry.subscribe(a_func, resources.NETWORK, events.AFTER_CREATE)
data = [{'tenant_id': self._tenant_id}] * 4

View File

@ -14,6 +14,7 @@
from unittest import mock
from neutron_lib.api.definitions import constants as api_const
from neutron_lib.callbacks import events
from neutron_lib import constants
from neutron_lib import context
from neutron_lib import exceptions as n_exc
@ -65,7 +66,10 @@ class AutoAllocateTestCase(testlib_api.SqlTestCase):
return_value=network_mock
) as get_external_net:
db._ensure_external_network_default_value_callback(
"NETWORK", "precommit_update", "test_plugin", **kwargs)
"NETWORK", "precommit_update", "test_plugin",
payload=events.DBEventPayload(
self.ctx, request_body=kwargs['request'],
states=(kwargs['original_network'], kwargs['network'])))
get_external_nets.assert_called_once_with(
self.ctx, _pager=mock.ANY, is_default=True)
get_external_net.assert_called_once_with(
@ -94,7 +98,10 @@ class AutoAllocateTestCase(testlib_api.SqlTestCase):
return_value=network_mock
) as get_external_net:
db._ensure_external_network_default_value_callback(
"NETWORK", "precommit_update", "test_plugin", **kwargs)
"NETWORK", "precommit_update", "test_plugin",
payload=events.DBEventPayload(
self.ctx, request_body=kwargs['request'],
states=(kwargs['network'],)))
get_external_nets.assert_not_called()
get_external_net.assert_not_called()
network_mock.update.assert_not_called()
@ -125,7 +132,10 @@ class AutoAllocateTestCase(testlib_api.SqlTestCase):
return_value=network_mock
) as get_external_net:
db._ensure_external_network_default_value_callback(
"NETWORK", "precommit_update", "test_plugin", **kwargs)
"NETWORK", "precommit_update", "test_plugin",
payload=events.DBEventPayload(
self.ctx, request_body=kwargs['request'],
states=(kwargs['original_network'], kwargs['network'])))
get_external_nets.assert_called_once_with(
self.ctx, _pager=mock.ANY, is_default=True)
get_external_net.assert_not_called()
@ -158,7 +168,10 @@ class AutoAllocateTestCase(testlib_api.SqlTestCase):
) as get_external_net:
self.assertRaises(exceptions.DefaultExternalNetworkExists,
db._ensure_external_network_default_value_callback,
"NETWORK", "precommit_update", "test_plugin", **kwargs)
"NETWORK", "precommit_update", "test_plugin",
payload=events.DBEventPayload(
self.ctx, request_body=kwargs['request'],
states=(kwargs['original_network'], kwargs['network'])))
get_external_nets.assert_called_once_with(
self.ctx, _pager=mock.ANY, is_default=True)
get_external_net.assert_not_called()

View File

@ -1274,8 +1274,9 @@ class TestQosPluginDB(base.BaseQosTestCase):
'validate_policy_for_network') \
as mock_validate_policy:
self.qos_plugin._validate_create_network_callback(
'NETWORK', 'precommit_create', 'test_plugin', **kwargs)
"NETWORK", "precommit_create", "test_plugin",
payload=events.DBEventPayload(
self.context, resource_id=kwargs['network']['id'],))
qos_policy = None
if network_qos:
qos_policy = net_qos_obj