Make RPC event cast synchronous with the event

The methods ``NeutronObject.get_object`` and ``ResourcesPushRpcApi.push``
sometimes yield the GIL during their execution. Because of that, the
thread in charge of sending the RPC information (introduced in [1]) does
not finish until another operation is pushed.
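For illustration only (this is not the Neutron code; the names below are
made up), a minimal sketch of the queue-and-thread model and of why the
cast can lag behind the API call that produced the event:

    import queue
    import threading

    # Pre-patch model: the API worker only enqueues the event; a daemon
    # thread performs the RPC cast later. Under cooperative scheduling
    # the thread may not run again until some later operation yields.
    to_push = queue.Queue()

    def dispatch_events():
        while True:
            resource_id = to_push.get()
            print('RPC cast for %s' % resource_id)  # stands in for push()
            to_push.task_done()

    threading.Thread(target=dispatch_events, daemon=True).start()

    to_push.put('port-1')  # the API call returns here, cast still pending
    to_push.join()         # only now is the cast guaranteed to be done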

By making the RPC cast synchronous with the update/delete events, it is
ensured that both operations finish together and that the agents receive
the RPC event on time, right after the event happens.
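Sketched with the same made-up names, the synchronous model casts the
RPC inside the event callback itself, so the cast is complete by the
time the handler returns:

    # Post-patch model: dispatch happens inline, in the same thread that
    # handles the AFTER_CREATE/AFTER_UPDATE/AFTER_DELETE callback.
    def dispatch_event(resource_id):
        print('RPC cast for %s' % resource_id)  # stands in for push()

    def handle_event(resource_id):
        dispatch_event(resource_id)  # returns only after the cast

    handle_event('port-1')  # cast already delivered when this returns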

This issue has been hitting more frequently since the migration to the
WSGI server, due to [2]. Once the eventlet library has been removed from
OpenStack, it will be possible to return to the previous model (using a
long-lived thread to handle the RPC updates to the agents). This is
noted in the code as a TODO.

This patch temporarily reverts [3]; that code should be restored as
well.

[1] https://review.opendev.org/c/openstack/neutron/+/788510
[2] https://review.opendev.org/c/openstack/neutron/+/925376
[3] https://review.opendev.org/c/openstack/neutron/+/824508

Closes-Bug: #2077790
Related-Bug: #2075147
Change-Id: I7b806e6de74164ad9730480a115a76d30e7f15fc
author Rodolfo Alonso Hernandez 2024-08-24 10:35:03 +00:00 committed by Rodolfo Alonso
parent f19bbae345
commit ae90e2ccbf
2 changed files with 28 additions and 82 deletions
Changed directories: neutron/plugins/ml2, neutron/tests/unit/plugins/ml2

@@ -11,11 +11,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import atexit
-import queue
-import threading
 import traceback
-import weakref
 
 from neutron_lib.callbacks import events
 from neutron_lib.callbacks import registry
@@ -23,7 +19,6 @@ from neutron_lib.callbacks import resources
 from neutron_lib import context as n_ctx
 from neutron_lib.db import api as db_api
 from oslo_log import log as logging
-from oslo_service import service
 
 from neutron.api.rpc.callbacks import events as rpc_events
 from neutron.api.rpc.handlers import resources_rpc
@@ -36,42 +31,22 @@ from neutron.objects import subnet
 LOG = logging.getLogger(__name__)
 
 
-def _setup_change_handlers_cleanup():
-    atexit.register(_ObjectChangeHandler.clean_up)
-    sh = service.SignalHandler()
-    sh.add_handler("SIGINT", _ObjectChangeHandler.clean_up)
-    sh.add_handler("SIGTERM", _ObjectChangeHandler.clean_up)
-
-
+# TODO(ralonsoh): in [1], the ``_ObjectChangeHandler`` was changed to send the
+# RPC resource update in the same thread of the API call. Once ``eventlet`` is
+# deprecated, it could be possible to revert to the previous architecture using
+# preemptive threads.
+# [1] https://review.opendev.org/c/openstack/neutron/+/926922
 class _ObjectChangeHandler(object):
-    MAX_IDLE_FOR = 1
-    _TO_CLEAN = weakref.WeakSet()
 
     def __init__(self, resource, object_class, resource_push_api):
         self._resource = resource
         self._obj_class = object_class
         self._resource_push_api = resource_push_api
-        self._resources_to_push = queue.Queue()
         self._semantic_warned = False
         for event in (events.AFTER_CREATE, events.AFTER_UPDATE,
                       events.AFTER_DELETE):
             registry.subscribe(self.handle_event, resource, event)
-        self._stop = threading.Event()
-        self._worker = threading.Thread(
-            target=self.dispatch_events,
-            name='ObjectChangeHandler[%s]' % self._resource,
-            daemon=True)
-        self._worker.start()
-        self._TO_CLEAN.add(self)
-
-    def stop(self):
-        self._stop.set()
-
-    def wait(self):
-        """Waits for all outstanding events to be dispatched."""
-        self._resources_to_push.join()
 
     def _is_session_semantic_violated(self, context, resource, event):
         """Return True and print an ugly error on transaction violation.
@@ -104,49 +79,30 @@ class _ObjectChangeHandler(object):
         resource_id = payload.resource_id
         # we preserve the context so we can trace a receive on the agent back
         # to the server-side event that triggered it
-        self._resources_to_push.put((resource_id, payload.context.to_dict()))
+        self.dispatch_event(resource_id, payload.context.to_dict())
 
-    def dispatch_events(self):
-        # TODO(kevinbenton): now that we are batching these, convert to a
-        # single get_objects call for all of them
-        LOG.debug('Thread %(name)s started', {'name': self._worker.name})
-        while not self._stop.is_set():
-            try:
-                resource_id, context_dict = self._resources_to_push.get(
-                    timeout=self.MAX_IDLE_FOR)
-                context = n_ctx.Context.from_dict(context_dict)
-                # attempt to get regardless of event type so concurrent delete
-                # after create/update is the same code-path as a delete event
-                with db_api.get_context_manager().independent.reader.using(
-                        context):
-                    obj = self._obj_class.get_object(context, id=resource_id)
-                # CREATE events are always treated as UPDATE events to ensure
-                # listeners are written to handle out-of-order messages
-                if obj is None:
-                    rpc_event = rpc_events.DELETED
-                    # construct a fake object with the right ID so we can
-                    # have a payload for the delete message.
-                    obj = self._obj_class(id=resource_id)
-                else:
-                    rpc_event = rpc_events.UPDATED
-                self._resource_push_api.push(context, [obj], rpc_event)
-                self._resources_to_push.task_done()
-            except queue.Empty:
-                pass
-            except Exception as e:
-                LOG.exception(
-                    "Exception while dispatching %(res)s events: %(e)s",
-                    {'res': self._resource, 'e': e})
-        LOG.debug('Thread %(name)s finished with %(msgs)s unsent messages',
-                  {'name': self._worker.name,
-                   'msgs': self._resources_to_push.unfinished_tasks})
-
-    @classmethod
-    def clean_up(cls, *args, **kwargs):
-        """Ensure all threads that were created were destroyed cleanly."""
-        while cls._TO_CLEAN:
-            worker = cls._TO_CLEAN.pop()
-            worker.stop()
+    def dispatch_event(self, resource_id, context_dict):
+        try:
+            context = n_ctx.Context.from_dict(context_dict)
+            # attempt to get regardless of event type so concurrent delete
+            # after create/update is the same code-path as a delete event
+            with db_api.get_context_manager().independent.reader.using(
+                    context):
+                obj = self._obj_class.get_object(context, id=resource_id)
+            # CREATE events are always treated as UPDATE events to ensure
+            # listeners are written to handle out-of-order messages
+            if obj is None:
+                rpc_event = rpc_events.DELETED
+                # construct a fake object with the right ID so we can
+                # have a payload for the delete message.
+                obj = self._obj_class(id=resource_id)
+            else:
+                rpc_event = rpc_events.UPDATED
+            self._resource_push_api.push(context, [obj], rpc_event)
+        except Exception as e:
+            LOG.exception(
+                "Exception while dispatching %(res)s events: %(e)s",
+                {'res': self._resource, 'e': e})
 
 
 class OVOServerRpcInterface(object):
@@ -158,10 +114,6 @@ class OVOServerRpcInterface(object):
     def __init__(self, enable_signals=True):
         self._rpc_pusher = resources_rpc.ResourcesPushRpcApi()
         self._setup_change_handlers()
-        # When running behind wsgi server (like apache2/mod_wsgi)
-        # we should not register signals
-        if enable_signals:
-            _setup_change_handlers_cleanup()
         LOG.debug("ML2 OVO RPC backend initialized.")
 
     def _setup_change_handlers(self):
def _setup_change_handlers(self):
@@ -178,8 +130,3 @@ class OVOServerRpcInterface(object):
             res: _ObjectChangeHandler(res, obj_class, self._rpc_pusher)
             for res, obj_class in resource_objclass_map.items()
         }
-
-    def wait(self):
-        """Wait for all handlers to finish processing async events."""
-        for handler in self._resource_handlers.values():
-            handler.wait()

@@ -41,7 +41,6 @@ class OVOServerRpcInterfaceTestCase(test_plugin.Ml2PluginV2TestCase):
 
     def _assert_object_received(self, ovotype, oid=None, event=None,
                                 count=1):
-        self.plugin.ovo_notifier.wait()
         match = 0
         for obj, evt in self.received:
             if isinstance(obj, ovotype):