Fix ObjectChangeHandler thread usage
Instead of spawning new thread for each OVO change,
keep one long running thread fetching changes
from queue.
Conflicts:
neutron/plugins/ml2/ovo_rpc.py
Closes-Bug: #1926417
Change-Id: I390cabeaf6ebbc1c6206fe2cc226ef437462c7fd
(cherry picked from commit 9ae22b1fef
)
This commit is contained in:
parent
607dacdd18
commit
9384908c40
|
@ -11,17 +11,18 @@
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import atexit
|
||||||
|
import queue
|
||||||
|
import signal
|
||||||
|
import threading
|
||||||
import traceback
|
import traceback
|
||||||
|
import weakref
|
||||||
import futurist
|
|
||||||
from futurist import waiters
|
|
||||||
|
|
||||||
from neutron_lib.callbacks import events
|
from neutron_lib.callbacks import events
|
||||||
from neutron_lib.callbacks import registry
|
from neutron_lib.callbacks import registry
|
||||||
from neutron_lib.callbacks import resources
|
from neutron_lib.callbacks import resources
|
||||||
from neutron_lib import context as n_ctx
|
from neutron_lib import context as n_ctx
|
||||||
from neutron_lib.db import api as db_api
|
from neutron_lib.db import api as db_api
|
||||||
from oslo_concurrency import lockutils
|
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
|
|
||||||
from neutron._i18n import _
|
from neutron._i18n import _
|
||||||
|
@ -36,27 +37,32 @@ LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class _ObjectChangeHandler(object):
|
class _ObjectChangeHandler(object):
|
||||||
|
MAX_IDLE_FOR = 1
|
||||||
|
_TO_CLEAN = weakref.WeakSet()
|
||||||
|
|
||||||
def __init__(self, resource, object_class, resource_push_api):
|
def __init__(self, resource, object_class, resource_push_api):
|
||||||
self._resource = resource
|
self._resource = resource
|
||||||
self._obj_class = object_class
|
self._obj_class = object_class
|
||||||
self._resource_push_api = resource_push_api
|
self._resource_push_api = resource_push_api
|
||||||
self._resources_to_push = {}
|
self._resources_to_push = queue.Queue()
|
||||||
|
|
||||||
# NOTE(annp): uWSGI seems not happy with eventlet.GreenPool.
|
|
||||||
# So switching to ThreadPool
|
|
||||||
self._worker_pool = futurist.ThreadPoolExecutor()
|
|
||||||
self.fts = []
|
|
||||||
|
|
||||||
self._semantic_warned = False
|
self._semantic_warned = False
|
||||||
for event in (events.AFTER_CREATE, events.AFTER_UPDATE,
|
for event in (events.AFTER_CREATE, events.AFTER_UPDATE,
|
||||||
events.AFTER_DELETE):
|
events.AFTER_DELETE):
|
||||||
registry.subscribe(self.handle_event, resource, event)
|
registry.subscribe(self.handle_event, resource, event)
|
||||||
|
self._stop = threading.Event()
|
||||||
|
self._worker = threading.Thread(
|
||||||
|
target=self.dispatch_events,
|
||||||
|
name='ObjectChangeHandler[%s]' % self._resource,
|
||||||
|
daemon=True)
|
||||||
|
self._worker.start()
|
||||||
|
self._TO_CLEAN.add(self)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self._stop.set()
|
||||||
|
|
||||||
def wait(self):
|
def wait(self):
|
||||||
"""Waits for all outstanding events to be dispatched."""
|
"""Waits for all outstanding events to be dispatched."""
|
||||||
done, not_done = waiters.wait_for_all(self.fts)
|
self._resources_to_push.join()
|
||||||
if not not_done:
|
|
||||||
del self.fts[:]
|
|
||||||
|
|
||||||
def _is_session_semantic_violated(self, context, resource, event):
|
def _is_session_semantic_violated(self, context, resource, event):
|
||||||
"""Return True and print an ugly error on transaction violation.
|
"""Return True and print an ugly error on transaction violation.
|
||||||
|
@ -90,18 +96,16 @@ class _ObjectChangeHandler(object):
|
||||||
resource_id = self._extract_resource_id(kwargs)
|
resource_id = self._extract_resource_id(kwargs)
|
||||||
# we preserve the context so we can trace a receive on the agent back
|
# we preserve the context so we can trace a receive on the agent back
|
||||||
# to the server-side event that triggered it
|
# to the server-side event that triggered it
|
||||||
self._resources_to_push[resource_id] = context.to_dict()
|
self._resources_to_push.put((resource_id, context.to_dict()))
|
||||||
# spawn worker so we don't block main AFTER_UPDATE thread
|
|
||||||
self.fts.append(self._worker_pool.submit(self.dispatch_events))
|
|
||||||
|
|
||||||
@lockutils.synchronized('event-dispatch')
|
|
||||||
def dispatch_events(self):
|
def dispatch_events(self):
|
||||||
# this is guarded by a lock to ensure we don't get too many concurrent
|
|
||||||
# dispatchers hitting the database simultaneously.
|
|
||||||
to_dispatch, self._resources_to_push = self._resources_to_push, {}
|
|
||||||
# TODO(kevinbenton): now that we are batching these, convert to a
|
# TODO(kevinbenton): now that we are batching these, convert to a
|
||||||
# single get_objects call for all of them
|
# single get_objects call for all of them
|
||||||
for resource_id, context_dict in to_dispatch.items():
|
LOG.debug('Thread %(name)s started', {'name': self._worker.name})
|
||||||
|
while not self._stop.is_set():
|
||||||
|
try:
|
||||||
|
resource_id, context_dict = self._resources_to_push.get(
|
||||||
|
timeout=self.MAX_IDLE_FOR)
|
||||||
context = n_ctx.Context.from_dict(context_dict)
|
context = n_ctx.Context.from_dict(context_dict)
|
||||||
# attempt to get regardless of event type so concurrent delete
|
# attempt to get regardless of event type so concurrent delete
|
||||||
# after create/update is the same code-path as a delete event
|
# after create/update is the same code-path as a delete event
|
||||||
|
@ -118,6 +122,16 @@ class _ObjectChangeHandler(object):
|
||||||
else:
|
else:
|
||||||
rpc_event = rpc_events.UPDATED
|
rpc_event = rpc_events.UPDATED
|
||||||
self._resource_push_api.push(context, [obj], rpc_event)
|
self._resource_push_api.push(context, [obj], rpc_event)
|
||||||
|
self._resources_to_push.task_done()
|
||||||
|
except queue.Empty:
|
||||||
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
LOG.exception(
|
||||||
|
"Exception while dispatching %(res)s events: %(e)s",
|
||||||
|
{'res': self._resource, 'e': e})
|
||||||
|
LOG.debug('Thread %(name)s finished with %(msgs)s unsent messages',
|
||||||
|
{'name': self._worker.name,
|
||||||
|
'msgs': self._resources_to_push.unfinished_tasks})
|
||||||
|
|
||||||
def _extract_resource_id(self, callback_kwargs):
|
def _extract_resource_id(self, callback_kwargs):
|
||||||
id_kwarg = '%s_id' % self._resource
|
id_kwarg = '%s_id' % self._resource
|
||||||
|
@ -127,6 +141,13 @@ class _ObjectChangeHandler(object):
|
||||||
return callback_kwargs[self._resource]['id']
|
return callback_kwargs[self._resource]['id']
|
||||||
raise RuntimeError(_("Couldn't find resource ID in callback event"))
|
raise RuntimeError(_("Couldn't find resource ID in callback event"))
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def clean_up(cls, *args, **kwargs):
|
||||||
|
"""Ensure all threads that were created were destroyed cleanly."""
|
||||||
|
while cls._TO_CLEAN:
|
||||||
|
worker = cls._TO_CLEAN.pop()
|
||||||
|
worker.stop()
|
||||||
|
|
||||||
|
|
||||||
class OVOServerRpcInterface(object):
|
class OVOServerRpcInterface(object):
|
||||||
"""ML2 server-side RPC interface.
|
"""ML2 server-side RPC interface.
|
||||||
|
@ -157,3 +178,8 @@ class OVOServerRpcInterface(object):
|
||||||
"""Wait for all handlers to finish processing async events."""
|
"""Wait for all handlers to finish processing async events."""
|
||||||
for handler in self._resource_handlers.values():
|
for handler in self._resource_handlers.values():
|
||||||
handler.wait()
|
handler.wait()
|
||||||
|
|
||||||
|
|
||||||
|
atexit.register(_ObjectChangeHandler.clean_up)
|
||||||
|
signal.signal(signal.SIGINT, _ObjectChangeHandler.clean_up)
|
||||||
|
signal.signal(signal.SIGTERM, _ObjectChangeHandler.clean_up)
|
||||||
|
|
Loading…
Reference in New Issue