From 507989fc622a6633926c4288827f482bae6054dd Mon Sep 17 00:00:00 2001 From: Rodolfo Alonso Hernandez Date: Sat, 29 Jan 2022 10:10:22 +0000 Subject: [PATCH] Use a thread local variable to store the Nova Notifier enable flag The Nova Notifier can be called simultaneously by several RPC callbacks from the agents (DHCP, L2), trying to update the provisioning status of a port. In order to handle each context notifier enable flag, a thread local variable is used. This will isolate the flag update if two entities inform at the same time and one RPC callback is attended during the processing of the other one. This patch also removes the debug messages added to debug this issue. Closes-Bug: #1958363 Change-Id: Ie670fba4b3afe427747732d2c3948d92311e960e --- neutron/notifiers/nova.py | 23 +++++++++------ neutron/plugins/ml2/plugin.py | 6 ---- neutron/tests/unit/notifiers/test_nova.py | 34 +++++++++++++++++++++++ 3 files changed, 48 insertions(+), 15 deletions(-) diff --git a/neutron/notifiers/nova.py b/neutron/notifiers/nova.py index 0f45e9837ad..a9b29b346ff 100644 --- a/neutron/notifiers/nova.py +++ b/neutron/notifiers/nova.py @@ -15,6 +15,7 @@ import contextlib +from eventlet.green import threading from keystoneauth1 import loading as ks_loading from neutron_lib.callbacks import events from neutron_lib.callbacks import registry @@ -45,6 +46,13 @@ NEUTRON_NOVA_EVENT_STATUS_MAP = {constants.PORT_STATUS_ACTIVE: 'completed', constants.PORT_STATUS_DOWN: 'completed'} NOVA_API_VERSION = "2.1" +NOTIFIER_ENABLE_DEFAULT = True +# NOTE(ralonsoh): the Nova notifier can be called simultaneously by several RPC +# callbacks from the agents (DHCP, L2), trying to update the provisioning +# status of a port. In order to handle each context notifier enable flag, a +# thread local variable is used. +_notifier_store = threading.local() + @registry.has_registry_receivers class Notifier(object): @@ -68,20 +76,14 @@ class Notifier(object): if ext.name == "server_external_events"] self.batch_notifier = batch_notifier.BatchNotifier( cfg.CONF.send_events_interval, self.send_events) - self._enabled = True @contextlib.contextmanager def context_enabled(self, enabled): - stored_enabled = self._enabled - LOG.debug("nova notifier stored_enabled: %s; enabled: %s", - stored_enabled, enabled) try: - self._enabled = enabled + _notifier_store.enable = enabled yield finally: - LOG.debug("Restoring value: %s for the _enabled flag.", - stored_enabled) - self._enabled = stored_enabled + _notifier_store.enable = NOTIFIER_ENABLE_DEFAULT def _get_nova_client(self): global_id = common_context.generate_request_id() @@ -180,7 +182,10 @@ class Notifier(object): return self._get_network_changed_event(port) def _can_notify(self, port): - if not self._enabled: + if getattr(_notifier_store, 'enable', None) is None: + _notifier_store.enable = NOTIFIER_ENABLE_DEFAULT + + if not _notifier_store.enable: LOG.debug("Nova notifier disabled") return False diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index 75c3f598c74..8bb30a52e36 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -365,14 +365,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, send_nova_event = bool(trigger == provisioning_blocks.L2_AGENT_ENTITY) with self.nova_notifier.context_enabled(send_nova_event): - LOG.debug("Updating port %s status to %s; " - "nova_notifier enabled: %s", - port_id, const.PORT_STATUS_ACTIVE, send_nova_event) self.update_port_status(payload.context, port_id, const.PORT_STATUS_ACTIVE) - LOG.debug("After port %s status update nova_notifier is now " - "restored to old value: %s", - port_id, self.nova_notifier._enabled) else: self.update_port_status(payload.context, port_id, const.PORT_STATUS_ACTIVE) diff --git a/neutron/tests/unit/notifiers/test_nova.py b/neutron/tests/unit/notifiers/test_nova.py index 5d065336a93..47d654194b8 100644 --- a/neutron/tests/unit/notifiers/test_nova.py +++ b/neutron/tests/unit/notifiers/test_nova.py @@ -13,8 +13,10 @@ # License for the specific language governing permissions and limitations # under the License. +import queue from unittest import mock +import eventlet from neutron_lib import constants as n_const from neutron_lib import context as n_ctx from neutron_lib import exceptions as n_exc @@ -371,3 +373,35 @@ class TestNovaNotify(base.BaseTestCase): self.assertEqual( expected_event, self.nova_notifier.batch_notifier._pending_events.get()) + + def test_notify_concurrent_enable_flag_update(self): + # This test assumes Neutron server uses eventlet. + # NOTE(ralonsoh): the exceptions raise inside a thread won't stop the + # test. The checks are stored in "_queue" and tested at the end of the + # test execution. + _queue = eventlet.queue.Queue() + + def _local_executor(thread_idx): + # This thread has not yet initialized the local "enable" flag. + _queue.put(getattr(nova._notifier_store, 'enable', None) is None) + eventlet.sleep(0) # Next thread execution. + new_enable = bool(thread_idx % 2) + with self.nova_notifier.context_enabled(new_enable): + # At this point, the Nova Notifier should have updated the + # "enable" flag. + _queue.put(new_enable == nova._notifier_store.enable) + eventlet.sleep(0) # Next thread execution. + _queue.put(new_enable == nova._notifier_store.enable) + _queue.put(nova.NOTIFIER_ENABLE_DEFAULT == + nova._notifier_store.enable) + + num_threads = 20 + pool = eventlet.GreenPool(num_threads) + for idx in range(num_threads): + pool.spawn(_local_executor, idx) + pool.waitall() + try: + while True: + self.assertTrue(_queue.get(block=False)) + except queue.Empty: + pass