From 9384908c40c731a5e297b0cd1f41821ed23c638c Mon Sep 17 00:00:00 2001 From: Szymon Wroblewski Date: Mon, 26 Apr 2021 17:25:42 +0200 Subject: [PATCH] Fix ObjectChangeHandler thread usage Instead of spawning new thread for each OVO change, keep one long running thread fetching changes from queue. Conflicts: neutron/plugins/ml2/ovo_rpc.py Closes-Bug: #1926417 Change-Id: I390cabeaf6ebbc1c6206fe2cc226ef437462c7fd (cherry picked from commit 9ae22b1fefabe8bf18d5dd26f49821b673bd39d4) --- neutron/plugins/ml2/ovo_rpc.py | 102 +++++++++++++++++++++------------ 1 file changed, 64 insertions(+), 38 deletions(-) diff --git a/neutron/plugins/ml2/ovo_rpc.py b/neutron/plugins/ml2/ovo_rpc.py index a526900e117..043c9099772 100644 --- a/neutron/plugins/ml2/ovo_rpc.py +++ b/neutron/plugins/ml2/ovo_rpc.py @@ -11,17 +11,18 @@ # License for the specific language governing permissions and limitations # under the License. +import atexit +import queue +import signal +import threading import traceback - -import futurist -from futurist import waiters +import weakref from neutron_lib.callbacks import events from neutron_lib.callbacks import registry from neutron_lib.callbacks import resources from neutron_lib import context as n_ctx from neutron_lib.db import api as db_api -from oslo_concurrency import lockutils from oslo_log import log as logging from neutron._i18n import _ @@ -36,27 +37,32 @@ LOG = logging.getLogger(__name__) class _ObjectChangeHandler(object): + MAX_IDLE_FOR = 1 + _TO_CLEAN = weakref.WeakSet() + def __init__(self, resource, object_class, resource_push_api): self._resource = resource self._obj_class = object_class self._resource_push_api = resource_push_api - self._resources_to_push = {} - - # NOTE(annp): uWSGI seems not happy with eventlet.GreenPool. - # So switching to ThreadPool - self._worker_pool = futurist.ThreadPoolExecutor() - self.fts = [] - + self._resources_to_push = queue.Queue() self._semantic_warned = False for event in (events.AFTER_CREATE, events.AFTER_UPDATE, events.AFTER_DELETE): registry.subscribe(self.handle_event, resource, event) + self._stop = threading.Event() + self._worker = threading.Thread( + target=self.dispatch_events, + name='ObjectChangeHandler[%s]' % self._resource, + daemon=True) + self._worker.start() + self._TO_CLEAN.add(self) + + def stop(self): + self._stop.set() def wait(self): """Waits for all outstanding events to be dispatched.""" - done, not_done = waiters.wait_for_all(self.fts) - if not not_done: - del self.fts[:] + self._resources_to_push.join() def _is_session_semantic_violated(self, context, resource, event): """Return True and print an ugly error on transaction violation. @@ -90,34 +96,42 @@ class _ObjectChangeHandler(object): resource_id = self._extract_resource_id(kwargs) # we preserve the context so we can trace a receive on the agent back # to the server-side event that triggered it - self._resources_to_push[resource_id] = context.to_dict() - # spawn worker so we don't block main AFTER_UPDATE thread - self.fts.append(self._worker_pool.submit(self.dispatch_events)) + self._resources_to_push.put((resource_id, context.to_dict())) - @lockutils.synchronized('event-dispatch') def dispatch_events(self): - # this is guarded by a lock to ensure we don't get too many concurrent - # dispatchers hitting the database simultaneously. - to_dispatch, self._resources_to_push = self._resources_to_push, {} # TODO(kevinbenton): now that we are batching these, convert to a # single get_objects call for all of them - for resource_id, context_dict in to_dispatch.items(): - context = n_ctx.Context.from_dict(context_dict) - # attempt to get regardless of event type so concurrent delete - # after create/update is the same code-path as a delete event - with db_api.get_context_manager().independent.reader.using( - context): - obj = self._obj_class.get_object(context, id=resource_id) - # CREATE events are always treated as UPDATE events to ensure - # listeners are written to handle out-of-order messages - if obj is None: - rpc_event = rpc_events.DELETED - # construct a fake object with the right ID so we can - # have a payload for the delete message. - obj = self._obj_class(id=resource_id) - else: - rpc_event = rpc_events.UPDATED - self._resource_push_api.push(context, [obj], rpc_event) + LOG.debug('Thread %(name)s started', {'name': self._worker.name}) + while not self._stop.is_set(): + try: + resource_id, context_dict = self._resources_to_push.get( + timeout=self.MAX_IDLE_FOR) + context = n_ctx.Context.from_dict(context_dict) + # attempt to get regardless of event type so concurrent delete + # after create/update is the same code-path as a delete event + with db_api.get_context_manager().independent.reader.using( + context): + obj = self._obj_class.get_object(context, id=resource_id) + # CREATE events are always treated as UPDATE events to ensure + # listeners are written to handle out-of-order messages + if obj is None: + rpc_event = rpc_events.DELETED + # construct a fake object with the right ID so we can + # have a payload for the delete message. + obj = self._obj_class(id=resource_id) + else: + rpc_event = rpc_events.UPDATED + self._resource_push_api.push(context, [obj], rpc_event) + self._resources_to_push.task_done() + except queue.Empty: + pass + except Exception as e: + LOG.exception( + "Exception while dispatching %(res)s events: %(e)s", + {'res': self._resource, 'e': e}) + LOG.debug('Thread %(name)s finished with %(msgs)s unsent messages', + {'name': self._worker.name, + 'msgs': self._resources_to_push.unfinished_tasks}) def _extract_resource_id(self, callback_kwargs): id_kwarg = '%s_id' % self._resource @@ -127,6 +141,13 @@ class _ObjectChangeHandler(object): return callback_kwargs[self._resource]['id'] raise RuntimeError(_("Couldn't find resource ID in callback event")) + @classmethod + def clean_up(cls, *args, **kwargs): + """Ensure all threads that were created were destroyed cleanly.""" + while cls._TO_CLEAN: + worker = cls._TO_CLEAN.pop() + worker.stop() + class OVOServerRpcInterface(object): """ML2 server-side RPC interface. @@ -157,3 +178,8 @@ class OVOServerRpcInterface(object): """Wait for all handlers to finish processing async events.""" for handler in self._resource_handlers.values(): handler.wait() + + +atexit.register(_ObjectChangeHandler.clean_up) +signal.signal(signal.SIGINT, _ObjectChangeHandler.clean_up) +signal.signal(signal.SIGTERM, _ObjectChangeHandler.clean_up)