From ae90e2ccbfa45a8e864ec6f7fca2f28fa90d8062 Mon Sep 17 00:00:00 2001 From: Rodolfo Alonso Hernandez <ralonsoh@redhat.com> Date: Sat, 24 Aug 2024 10:35:03 +0000 Subject: [PATCH] Make RPC event cast synchronous with the event Sometimes, the methods ``NeutronObject.get_object`` and ``ResourcesPushRpcApi.push`` yield the GIL during the execution. Because of that, the thread in charge of sending the RPC information doesn't finish until other operation is pushed (implemented in [1]). By making the RPC cast synchronous with the update/delete events, it is ensured that both operations will finish and the agents will receive the RPC event on time, just after the event happens. This issue is hitting more frequently in the migration to the WSGI server, due to [2]. Once the eventlet library has been deprecated from OpenStack, it will be possible to use the previous model (using a long thread to handle the RCP updates to the agents). It is commented in the code as a TODO. This patch is temporarily reverting [3]. This code should be restored too. [1]https://review.opendev.org/c/openstack/neutron/+/788510 [2]https://review.opendev.org/c/openstack/neutron/+/925376 [3]https://review.opendev.org/c/openstack/neutron/+/824508 Closes-Bug: #2077790 Related-Bug: #2075147 Change-Id: I7b806e6de74164ad9730480a115a76d30e7f15fc --- neutron/plugins/ml2/ovo_rpc.py | 109 +++++------------- .../tests/unit/plugins/ml2/test_ovo_rpc.py | 1 - 2 files changed, 28 insertions(+), 82 deletions(-) diff --git a/neutron/plugins/ml2/ovo_rpc.py b/neutron/plugins/ml2/ovo_rpc.py index 95e944f6256..6354fae3a29 100644 --- a/neutron/plugins/ml2/ovo_rpc.py +++ b/neutron/plugins/ml2/ovo_rpc.py @@ -11,11 +11,7 @@ # License for the specific language governing permissions and limitations # under the License. -import atexit -import queue -import threading import traceback -import weakref from neutron_lib.callbacks import events from neutron_lib.callbacks import registry @@ -23,7 +19,6 @@ from neutron_lib.callbacks import resources from neutron_lib import context as n_ctx from neutron_lib.db import api as db_api from oslo_log import log as logging -from oslo_service import service from neutron.api.rpc.callbacks import events as rpc_events from neutron.api.rpc.handlers import resources_rpc @@ -36,42 +31,22 @@ from neutron.objects import subnet LOG = logging.getLogger(__name__) -def _setup_change_handlers_cleanup(): - atexit.register(_ObjectChangeHandler.clean_up) - sh = service.SignalHandler() - sh.add_handler("SIGINT", _ObjectChangeHandler.clean_up) - sh.add_handler("SIGTERM", _ObjectChangeHandler.clean_up) - - +# TODO(ralonsoh): in [1], the ``_ObjectChangeHandler`` was changed to send the +# RPC resource update in the same thread of the API call. Once ``eventlet`` is +# deprecated, it could be possible to revert to the previous architecture using +# preemptive threads. +# [1] https://review.opendev.org/c/openstack/neutron/+/926922 class _ObjectChangeHandler(object): - MAX_IDLE_FOR = 1 - _TO_CLEAN = weakref.WeakSet() def __init__(self, resource, object_class, resource_push_api): self._resource = resource self._obj_class = object_class self._resource_push_api = resource_push_api - self._resources_to_push = queue.Queue() self._semantic_warned = False for event in (events.AFTER_CREATE, events.AFTER_UPDATE, events.AFTER_DELETE): registry.subscribe(self.handle_event, resource, event) - self._stop = threading.Event() - self._worker = threading.Thread( - target=self.dispatch_events, - name='ObjectChangeHandler[%s]' % self._resource, - daemon=True) - self._worker.start() - self._TO_CLEAN.add(self) - - def stop(self): - self._stop.set() - - def wait(self): - """Waits for all outstanding events to be dispatched.""" - self._resources_to_push.join() - def _is_session_semantic_violated(self, context, resource, event): """Return True and print an ugly error on transaction violation. @@ -104,49 +79,30 @@ class _ObjectChangeHandler(object): resource_id = payload.resource_id # we preserve the context so we can trace a receive on the agent back # to the server-side event that triggered it - self._resources_to_push.put((resource_id, payload.context.to_dict())) + self.dispatch_event(resource_id, payload.context.to_dict()) - def dispatch_events(self): - # TODO(kevinbenton): now that we are batching these, convert to a - # single get_objects call for all of them - LOG.debug('Thread %(name)s started', {'name': self._worker.name}) - while not self._stop.is_set(): - try: - resource_id, context_dict = self._resources_to_push.get( - timeout=self.MAX_IDLE_FOR) - context = n_ctx.Context.from_dict(context_dict) - # attempt to get regardless of event type so concurrent delete - # after create/update is the same code-path as a delete event - with db_api.get_context_manager().independent.reader.using( - context): - obj = self._obj_class.get_object(context, id=resource_id) - # CREATE events are always treated as UPDATE events to ensure - # listeners are written to handle out-of-order messages - if obj is None: - rpc_event = rpc_events.DELETED - # construct a fake object with the right ID so we can - # have a payload for the delete message. - obj = self._obj_class(id=resource_id) - else: - rpc_event = rpc_events.UPDATED - self._resource_push_api.push(context, [obj], rpc_event) - self._resources_to_push.task_done() - except queue.Empty: - pass - except Exception as e: - LOG.exception( - "Exception while dispatching %(res)s events: %(e)s", - {'res': self._resource, 'e': e}) - LOG.debug('Thread %(name)s finished with %(msgs)s unsent messages', - {'name': self._worker.name, - 'msgs': self._resources_to_push.unfinished_tasks}) - - @classmethod - def clean_up(cls, *args, **kwargs): - """Ensure all threads that were created were destroyed cleanly.""" - while cls._TO_CLEAN: - worker = cls._TO_CLEAN.pop() - worker.stop() + def dispatch_event(self, resource_id, context_dict): + try: + context = n_ctx.Context.from_dict(context_dict) + # attempt to get regardless of event type so concurrent delete + # after create/update is the same code-path as a delete event + with db_api.get_context_manager().independent.reader.using( + context): + obj = self._obj_class.get_object(context, id=resource_id) + # CREATE events are always treated as UPDATE events to ensure + # listeners are written to handle out-of-order messages + if obj is None: + rpc_event = rpc_events.DELETED + # construct a fake object with the right ID so we can + # have a payload for the delete message. + obj = self._obj_class(id=resource_id) + else: + rpc_event = rpc_events.UPDATED + self._resource_push_api.push(context, [obj], rpc_event) + except Exception as e: + LOG.exception( + "Exception while dispatching %(res)s events: %(e)s", + {'res': self._resource, 'e': e}) class OVOServerRpcInterface(object): @@ -158,10 +114,6 @@ class OVOServerRpcInterface(object): def __init__(self, enable_signals=True): self._rpc_pusher = resources_rpc.ResourcesPushRpcApi() self._setup_change_handlers() - # When running behind wsgi server (like apache2/mod_wsgi) - # we should not register signals - if enable_signals: - _setup_change_handlers_cleanup() LOG.debug("ML2 OVO RPC backend initialized.") def _setup_change_handlers(self): @@ -178,8 +130,3 @@ class OVOServerRpcInterface(object): res: _ObjectChangeHandler(res, obj_class, self._rpc_pusher) for res, obj_class in resource_objclass_map.items() } - - def wait(self): - """Wait for all handlers to finish processing async events.""" - for handler in self._resource_handlers.values(): - handler.wait() diff --git a/neutron/tests/unit/plugins/ml2/test_ovo_rpc.py b/neutron/tests/unit/plugins/ml2/test_ovo_rpc.py index b984e61a1ac..673c589c09d 100644 --- a/neutron/tests/unit/plugins/ml2/test_ovo_rpc.py +++ b/neutron/tests/unit/plugins/ml2/test_ovo_rpc.py @@ -41,7 +41,6 @@ class OVOServerRpcInterfaceTestCase(test_plugin.Ml2PluginV2TestCase): def _assert_object_received(self, ovotype, oid=None, event=None, count=1): - self.plugin.ovo_notifier.wait() match = 0 for obj, evt in self.received: if isinstance(obj, ovotype):