|
|
|
@ -11,17 +11,18 @@
|
|
|
|
|
# License for the specific language governing permissions and limitations |
|
|
|
|
# under the License. |
|
|
|
|
|
|
|
|
|
import atexit |
|
|
|
|
import queue |
|
|
|
|
import signal |
|
|
|
|
import threading |
|
|
|
|
import traceback |
|
|
|
|
|
|
|
|
|
import futurist |
|
|
|
|
from futurist import waiters |
|
|
|
|
import weakref |
|
|
|
|
|
|
|
|
|
from neutron_lib.callbacks import events |
|
|
|
|
from neutron_lib.callbacks import registry |
|
|
|
|
from neutron_lib.callbacks import resources |
|
|
|
|
from neutron_lib import context as n_ctx |
|
|
|
|
from neutron_lib.db import api as db_api |
|
|
|
|
from oslo_concurrency import lockutils |
|
|
|
|
from oslo_log import log as logging |
|
|
|
|
|
|
|
|
|
from neutron._i18n import _ |
|
|
|
@ -36,27 +37,32 @@ LOG = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class _ObjectChangeHandler(object): |
|
|
|
|
MAX_IDLE_FOR = 1 |
|
|
|
|
_TO_CLEAN = weakref.WeakSet() |
|
|
|
|
|
|
|
|
|
def __init__(self, resource, object_class, resource_push_api): |
|
|
|
|
self._resource = resource |
|
|
|
|
self._obj_class = object_class |
|
|
|
|
self._resource_push_api = resource_push_api |
|
|
|
|
self._resources_to_push = {} |
|
|
|
|
|
|
|
|
|
# NOTE(annp): uWSGI seems not happy with eventlet.GreenPool. |
|
|
|
|
# So switching to ThreadPool |
|
|
|
|
self._worker_pool = futurist.ThreadPoolExecutor() |
|
|
|
|
self.fts = [] |
|
|
|
|
|
|
|
|
|
self._resources_to_push = queue.Queue() |
|
|
|
|
self._semantic_warned = False |
|
|
|
|
for event in (events.AFTER_CREATE, events.AFTER_UPDATE, |
|
|
|
|
events.AFTER_DELETE): |
|
|
|
|
registry.subscribe(self.handle_event, resource, event) |
|
|
|
|
self._stop = threading.Event() |
|
|
|
|
self._worker = threading.Thread( |
|
|
|
|
target=self.dispatch_events, |
|
|
|
|
name='ObjectChangeHandler[%s]' % self._resource, |
|
|
|
|
daemon=True) |
|
|
|
|
self._worker.start() |
|
|
|
|
self._TO_CLEAN.add(self) |
|
|
|
|
|
|
|
|
|
def stop(self): |
|
|
|
|
self._stop.set() |
|
|
|
|
|
|
|
|
|
def wait(self): |
|
|
|
|
"""Waits for all outstanding events to be dispatched.""" |
|
|
|
|
done, not_done = waiters.wait_for_all(self.fts) |
|
|
|
|
if not not_done: |
|
|
|
|
del self.fts[:] |
|
|
|
|
self._resources_to_push.join() |
|
|
|
|
|
|
|
|
|
def _is_session_semantic_violated(self, context, resource, event): |
|
|
|
|
"""Return True and print an ugly error on transaction violation. |
|
|
|
@ -90,34 +96,42 @@ class _ObjectChangeHandler(object):
|
|
|
|
|
resource_id = self._extract_resource_id(kwargs) |
|
|
|
|
# we preserve the context so we can trace a receive on the agent back |
|
|
|
|
# to the server-side event that triggered it |
|
|
|
|
self._resources_to_push[resource_id] = context.to_dict() |
|
|
|
|
# spawn worker so we don't block main AFTER_UPDATE thread |
|
|
|
|
self.fts.append(self._worker_pool.submit(self.dispatch_events)) |
|
|
|
|
self._resources_to_push.put((resource_id, context.to_dict())) |
|
|
|
|
|
|
|
|
|
@lockutils.synchronized('event-dispatch') |
|
|
|
|
def dispatch_events(self): |
|
|
|
|
# this is guarded by a lock to ensure we don't get too many concurrent |
|
|
|
|
# dispatchers hitting the database simultaneously. |
|
|
|
|
to_dispatch, self._resources_to_push = self._resources_to_push, {} |
|
|
|
|
# TODO(kevinbenton): now that we are batching these, convert to a |
|
|
|
|
# single get_objects call for all of them |
|
|
|
|
for resource_id, context_dict in to_dispatch.items(): |
|
|
|
|
context = n_ctx.Context.from_dict(context_dict) |
|
|
|
|
# attempt to get regardless of event type so concurrent delete |
|
|
|
|
# after create/update is the same code-path as a delete event |
|
|
|
|
with db_api.get_context_manager().independent.reader.using( |
|
|
|
|
context): |
|
|
|
|
obj = self._obj_class.get_object(context, id=resource_id) |
|
|
|
|
# CREATE events are always treated as UPDATE events to ensure |
|
|
|
|
# listeners are written to handle out-of-order messages |
|
|
|
|
if obj is None: |
|
|
|
|
rpc_event = rpc_events.DELETED |
|
|
|
|
# construct a fake object with the right ID so we can |
|
|
|
|
# have a payload for the delete message. |
|
|
|
|
obj = self._obj_class(id=resource_id) |
|
|
|
|
else: |
|
|
|
|
rpc_event = rpc_events.UPDATED |
|
|
|
|
self._resource_push_api.push(context, [obj], rpc_event) |
|
|
|
|
LOG.debug('Thread %(name)s started', {'name': self._worker.name}) |
|
|
|
|
while not self._stop.is_set(): |
|
|
|
|
try: |
|
|
|
|
resource_id, context_dict = self._resources_to_push.get( |
|
|
|
|
timeout=self.MAX_IDLE_FOR) |
|
|
|
|
context = n_ctx.Context.from_dict(context_dict) |
|
|
|
|
# attempt to get regardless of event type so concurrent delete |
|
|
|
|
# after create/update is the same code-path as a delete event |
|
|
|
|
with db_api.get_context_manager().independent.reader.using( |
|
|
|
|
context): |
|
|
|
|
obj = self._obj_class.get_object(context, id=resource_id) |
|
|
|
|
# CREATE events are always treated as UPDATE events to ensure |
|
|
|
|
# listeners are written to handle out-of-order messages |
|
|
|
|
if obj is None: |
|
|
|
|
rpc_event = rpc_events.DELETED |
|
|
|
|
# construct a fake object with the right ID so we can |
|
|
|
|
# have a payload for the delete message. |
|
|
|
|
obj = self._obj_class(id=resource_id) |
|
|
|
|
else: |
|
|
|
|
rpc_event = rpc_events.UPDATED |
|
|
|
|
self._resource_push_api.push(context, [obj], rpc_event) |
|
|
|
|
self._resources_to_push.task_done() |
|
|
|
|
except queue.Empty: |
|
|
|
|
pass |
|
|
|
|
except Exception as e: |
|
|
|
|
LOG.exception( |
|
|
|
|
"Exception while dispatching %(res)s events: %(e)s", |
|
|
|
|
{'res': self._resource, 'e': e}) |
|
|
|
|
LOG.debug('Thread %(name)s finished with %(msgs)s unsent messages', |
|
|
|
|
{'name': self._worker.name, |
|
|
|
|
'msgs': self._resources_to_push.unfinished_tasks}) |
|
|
|
|
|
|
|
|
|
def _extract_resource_id(self, callback_kwargs): |
|
|
|
|
id_kwarg = '%s_id' % self._resource |
|
|
|
@ -127,6 +141,13 @@ class _ObjectChangeHandler(object):
|
|
|
|
|
return callback_kwargs[self._resource]['id'] |
|
|
|
|
raise RuntimeError(_("Couldn't find resource ID in callback event")) |
|
|
|
|
|
|
|
|
|
@classmethod |
|
|
|
|
def clean_up(cls, *args, **kwargs): |
|
|
|
|
"""Ensure all threads that were created were destroyed cleanly.""" |
|
|
|
|
while cls._TO_CLEAN: |
|
|
|
|
worker = cls._TO_CLEAN.pop() |
|
|
|
|
worker.stop() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class OVOServerRpcInterface(object): |
|
|
|
|
"""ML2 server-side RPC interface. |
|
|
|
@ -157,3 +178,8 @@ class OVOServerRpcInterface(object):
|
|
|
|
|
"""Wait for all handlers to finish processing async events.""" |
|
|
|
|
for handler in self._resource_handlers.values(): |
|
|
|
|
handler.wait() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
atexit.register(_ObjectChangeHandler.clean_up) |
|
|
|
|
signal.signal(signal.SIGINT, _ObjectChangeHandler.clean_up) |
|
|
|
|
signal.signal(signal.SIGTERM, _ObjectChangeHandler.clean_up) |
|
|
|
|