Browse Source

Merge "Fix ObjectChangeHandler thread usage" into stable/train

changes/43/819443/2
Zuul 4 months ago committed by Gerrit Code Review
parent
commit
14ec66a4a9
  1. 104
      neutron/plugins/ml2/ovo_rpc.py

104
neutron/plugins/ml2/ovo_rpc.py

@ -11,17 +11,18 @@
# License for the specific language governing permissions and limitations
# under the License.
import atexit
import queue
import signal
import threading
import traceback
import futurist
from futurist import waiters
import weakref
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import context as n_ctx
from neutron_lib.db import api as db_api
from oslo_concurrency import lockutils
from oslo_log import log as logging
from neutron._i18n import _
@ -35,28 +36,39 @@ from neutron.objects import subnet
LOG = logging.getLogger(__name__)
def _setup_change_handlers_cleanup():
atexit.register(_ObjectChangeHandler.clean_up)
signal.signal(signal.SIGINT, _ObjectChangeHandler.clean_up)
signal.signal(signal.SIGTERM, _ObjectChangeHandler.clean_up)
class _ObjectChangeHandler(object):
MAX_IDLE_FOR = 1
_TO_CLEAN = weakref.WeakSet()
def __init__(self, resource, object_class, resource_push_api):
self._resource = resource
self._obj_class = object_class
self._resource_push_api = resource_push_api
self._resources_to_push = {}
# NOTE(annp): uWSGI seems not happy with eventlet.GreenPool.
# So switching to ThreadPool
self._worker_pool = futurist.ThreadPoolExecutor()
self.fts = []
self._resources_to_push = queue.Queue()
self._semantic_warned = False
for event in (events.AFTER_CREATE, events.AFTER_UPDATE,
events.AFTER_DELETE):
registry.subscribe(self.handle_event, resource, event)
self._stop = threading.Event()
self._worker = threading.Thread(
target=self.dispatch_events,
name='ObjectChangeHandler[%s]' % self._resource)
self._worker.setDaemon(True)
self._worker.start()
self._TO_CLEAN.add(self)
def stop(self):
self._stop.set()
def wait(self):
"""Waits for all outstanding events to be dispatched."""
done, not_done = waiters.wait_for_all(self.fts)
if not not_done:
del self.fts[:]
self._resources_to_push.join()
def _is_session_semantic_violated(self, context, resource, event):
"""Return True and print an ugly error on transaction violation.
@ -90,34 +102,42 @@ class _ObjectChangeHandler(object):
resource_id = self._extract_resource_id(kwargs)
# we preserve the context so we can trace a receive on the agent back
# to the server-side event that triggered it
self._resources_to_push[resource_id] = context.to_dict()
# spawn worker so we don't block main AFTER_UPDATE thread
self.fts.append(self._worker_pool.submit(self.dispatch_events))
self._resources_to_push.put((resource_id, context.to_dict()))
@lockutils.synchronized('event-dispatch')
def dispatch_events(self):
# this is guarded by a lock to ensure we don't get too many concurrent
# dispatchers hitting the database simultaneously.
to_dispatch, self._resources_to_push = self._resources_to_push, {}
# TODO(kevinbenton): now that we are batching these, convert to a
# single get_objects call for all of them
for resource_id, context_dict in to_dispatch.items():
context = n_ctx.Context.from_dict(context_dict)
# attempt to get regardless of event type so concurrent delete
# after create/update is the same code-path as a delete event
with db_api.get_context_manager().independent.reader.using(
context):
obj = self._obj_class.get_object(context, id=resource_id)
# CREATE events are always treated as UPDATE events to ensure
# listeners are written to handle out-of-order messages
if obj is None:
rpc_event = rpc_events.DELETED
# construct a fake object with the right ID so we can
# have a payload for the delete message.
obj = self._obj_class(id=resource_id)
else:
rpc_event = rpc_events.UPDATED
self._resource_push_api.push(context, [obj], rpc_event)
LOG.debug('Thread %(name)s started', {'name': self._worker.name})
while not self._stop.is_set():
try:
resource_id, context_dict = self._resources_to_push.get(
timeout=self.MAX_IDLE_FOR)
context = n_ctx.Context.from_dict(context_dict)
# attempt to get regardless of event type so concurrent delete
# after create/update is the same code-path as a delete event
with db_api.get_context_manager().independent.reader.using(
context):
obj = self._obj_class.get_object(context, id=resource_id)
# CREATE events are always treated as UPDATE events to ensure
# listeners are written to handle out-of-order messages
if obj is None:
rpc_event = rpc_events.DELETED
# construct a fake object with the right ID so we can
# have a payload for the delete message.
obj = self._obj_class(id=resource_id)
else:
rpc_event = rpc_events.UPDATED
self._resource_push_api.push(context, [obj], rpc_event)
self._resources_to_push.task_done()
except queue.Empty:
pass
except Exception as e:
LOG.exception(
"Exception while dispatching %(res)s events: %(e)s",
{'res': self._resource, 'e': e})
LOG.debug('Thread %(name)s finished with %(msgs)s unsent messages',
{'name': self._worker.name,
'msgs': self._resources_to_push.unfinished_tasks})
def _extract_resource_id(self, callback_kwargs):
id_kwarg = '%s_id' % self._resource
@ -127,6 +147,13 @@ class _ObjectChangeHandler(object):
return callback_kwargs[self._resource]['id']
raise RuntimeError(_("Couldn't find resource ID in callback event"))
@classmethod
def clean_up(cls, *args, **kwargs):
"""Ensure all threads that were created were destroyed cleanly."""
while cls._TO_CLEAN:
worker = cls._TO_CLEAN.pop()
worker.stop()
class OVOServerRpcInterface(object):
"""ML2 server-side RPC interface.
@ -137,6 +164,7 @@ class OVOServerRpcInterface(object):
def __init__(self):
self._rpc_pusher = resources_rpc.ResourcesPushRpcApi()
self._setup_change_handlers()
_setup_change_handlers_cleanup()
LOG.debug("ML2 OVO RPC backend initialized.")
def _setup_change_handlers(self):

Loading…
Cancel
Save