diff --git a/neutron/plugins/ml2/ovo_rpc.py b/neutron/plugins/ml2/ovo_rpc.py index 95e944f6256..6354fae3a29 100644 --- a/neutron/plugins/ml2/ovo_rpc.py +++ b/neutron/plugins/ml2/ovo_rpc.py @@ -11,11 +11,7 @@ # License for the specific language governing permissions and limitations # under the License. -import atexit -import queue -import threading import traceback -import weakref from neutron_lib.callbacks import events from neutron_lib.callbacks import registry @@ -23,7 +19,6 @@ from neutron_lib.callbacks import resources from neutron_lib import context as n_ctx from neutron_lib.db import api as db_api from oslo_log import log as logging -from oslo_service import service from neutron.api.rpc.callbacks import events as rpc_events from neutron.api.rpc.handlers import resources_rpc @@ -36,42 +31,22 @@ from neutron.objects import subnet LOG = logging.getLogger(__name__) -def _setup_change_handlers_cleanup(): - atexit.register(_ObjectChangeHandler.clean_up) - sh = service.SignalHandler() - sh.add_handler("SIGINT", _ObjectChangeHandler.clean_up) - sh.add_handler("SIGTERM", _ObjectChangeHandler.clean_up) - - +# TODO(ralonsoh): in [1], the ``_ObjectChangeHandler`` was changed to send the +# RPC resource update in the same thread of the API call. Once ``eventlet`` is +# deprecated, it could be possible to revert to the previous architecture using +# preemptive threads. +# [1] https://review.opendev.org/c/openstack/neutron/+/926922 class _ObjectChangeHandler(object): - MAX_IDLE_FOR = 1 - _TO_CLEAN = weakref.WeakSet() def __init__(self, resource, object_class, resource_push_api): self._resource = resource self._obj_class = object_class self._resource_push_api = resource_push_api - self._resources_to_push = queue.Queue() self._semantic_warned = False for event in (events.AFTER_CREATE, events.AFTER_UPDATE, events.AFTER_DELETE): registry.subscribe(self.handle_event, resource, event) - self._stop = threading.Event() - self._worker = threading.Thread( - target=self.dispatch_events, - name='ObjectChangeHandler[%s]' % self._resource, - daemon=True) - self._worker.start() - self._TO_CLEAN.add(self) - - def stop(self): - self._stop.set() - - def wait(self): - """Waits for all outstanding events to be dispatched.""" - self._resources_to_push.join() - def _is_session_semantic_violated(self, context, resource, event): """Return True and print an ugly error on transaction violation. @@ -104,49 +79,30 @@ class _ObjectChangeHandler(object): resource_id = payload.resource_id # we preserve the context so we can trace a receive on the agent back # to the server-side event that triggered it - self._resources_to_push.put((resource_id, payload.context.to_dict())) + self.dispatch_event(resource_id, payload.context.to_dict()) - def dispatch_events(self): - # TODO(kevinbenton): now that we are batching these, convert to a - # single get_objects call for all of them - LOG.debug('Thread %(name)s started', {'name': self._worker.name}) - while not self._stop.is_set(): - try: - resource_id, context_dict = self._resources_to_push.get( - timeout=self.MAX_IDLE_FOR) - context = n_ctx.Context.from_dict(context_dict) - # attempt to get regardless of event type so concurrent delete - # after create/update is the same code-path as a delete event - with db_api.get_context_manager().independent.reader.using( - context): - obj = self._obj_class.get_object(context, id=resource_id) - # CREATE events are always treated as UPDATE events to ensure - # listeners are written to handle out-of-order messages - if obj is None: - rpc_event = rpc_events.DELETED - # construct a fake object with the right ID so we can - # have a payload for the delete message. - obj = self._obj_class(id=resource_id) - else: - rpc_event = rpc_events.UPDATED - self._resource_push_api.push(context, [obj], rpc_event) - self._resources_to_push.task_done() - except queue.Empty: - pass - except Exception as e: - LOG.exception( - "Exception while dispatching %(res)s events: %(e)s", - {'res': self._resource, 'e': e}) - LOG.debug('Thread %(name)s finished with %(msgs)s unsent messages', - {'name': self._worker.name, - 'msgs': self._resources_to_push.unfinished_tasks}) - - @classmethod - def clean_up(cls, *args, **kwargs): - """Ensure all threads that were created were destroyed cleanly.""" - while cls._TO_CLEAN: - worker = cls._TO_CLEAN.pop() - worker.stop() + def dispatch_event(self, resource_id, context_dict): + try: + context = n_ctx.Context.from_dict(context_dict) + # attempt to get regardless of event type so concurrent delete + # after create/update is the same code-path as a delete event + with db_api.get_context_manager().independent.reader.using( + context): + obj = self._obj_class.get_object(context, id=resource_id) + # CREATE events are always treated as UPDATE events to ensure + # listeners are written to handle out-of-order messages + if obj is None: + rpc_event = rpc_events.DELETED + # construct a fake object with the right ID so we can + # have a payload for the delete message. + obj = self._obj_class(id=resource_id) + else: + rpc_event = rpc_events.UPDATED + self._resource_push_api.push(context, [obj], rpc_event) + except Exception as e: + LOG.exception( + "Exception while dispatching %(res)s events: %(e)s", + {'res': self._resource, 'e': e}) class OVOServerRpcInterface(object): @@ -158,10 +114,6 @@ class OVOServerRpcInterface(object): def __init__(self, enable_signals=True): self._rpc_pusher = resources_rpc.ResourcesPushRpcApi() self._setup_change_handlers() - # When running behind wsgi server (like apache2/mod_wsgi) - # we should not register signals - if enable_signals: - _setup_change_handlers_cleanup() LOG.debug("ML2 OVO RPC backend initialized.") def _setup_change_handlers(self): @@ -178,8 +130,3 @@ class OVOServerRpcInterface(object): res: _ObjectChangeHandler(res, obj_class, self._rpc_pusher) for res, obj_class in resource_objclass_map.items() } - - def wait(self): - """Wait for all handlers to finish processing async events.""" - for handler in self._resource_handlers.values(): - handler.wait() diff --git a/neutron/tests/unit/plugins/ml2/test_ovo_rpc.py b/neutron/tests/unit/plugins/ml2/test_ovo_rpc.py index b984e61a1ac..673c589c09d 100644 --- a/neutron/tests/unit/plugins/ml2/test_ovo_rpc.py +++ b/neutron/tests/unit/plugins/ml2/test_ovo_rpc.py @@ -41,7 +41,6 @@ class OVOServerRpcInterfaceTestCase(test_plugin.Ml2PluginV2TestCase): def _assert_object_received(self, ovotype, oid=None, event=None, count=1): - self.plugin.ovo_notifier.wait() match = 0 for obj, evt in self.received: if isinstance(obj, ovotype):