diff --git a/neutron/notifiers/nova.py b/neutron/notifiers/nova.py index 0f45e9837ad..a9b29b346ff 100644 --- a/neutron/notifiers/nova.py +++ b/neutron/notifiers/nova.py @@ -15,6 +15,7 @@ import contextlib +from eventlet.green import threading from keystoneauth1 import loading as ks_loading from neutron_lib.callbacks import events from neutron_lib.callbacks import registry @@ -45,6 +46,13 @@ NEUTRON_NOVA_EVENT_STATUS_MAP = {constants.PORT_STATUS_ACTIVE: 'completed', constants.PORT_STATUS_DOWN: 'completed'} NOVA_API_VERSION = "2.1" +NOTIFIER_ENABLE_DEFAULT = True +# NOTE(ralonsoh): the Nova notifier can be called simultaneously by several RPC +# callbacks from the agents (DHCP, L2), trying to update the provisioning +# status of a port. In order to handle each context notifier enable flag, a +# thread local variable is used. +_notifier_store = threading.local() + @registry.has_registry_receivers class Notifier(object): @@ -68,20 +76,14 @@ class Notifier(object): if ext.name == "server_external_events"] self.batch_notifier = batch_notifier.BatchNotifier( cfg.CONF.send_events_interval, self.send_events) - self._enabled = True @contextlib.contextmanager def context_enabled(self, enabled): - stored_enabled = self._enabled - LOG.debug("nova notifier stored_enabled: %s; enabled: %s", - stored_enabled, enabled) try: - self._enabled = enabled + _notifier_store.enable = enabled yield finally: - LOG.debug("Restoring value: %s for the _enabled flag.", - stored_enabled) - self._enabled = stored_enabled + _notifier_store.enable = NOTIFIER_ENABLE_DEFAULT def _get_nova_client(self): global_id = common_context.generate_request_id() @@ -180,7 +182,10 @@ class Notifier(object): return self._get_network_changed_event(port) def _can_notify(self, port): - if not self._enabled: + if getattr(_notifier_store, 'enable', None) is None: + _notifier_store.enable = NOTIFIER_ENABLE_DEFAULT + + if not _notifier_store.enable: LOG.debug("Nova notifier disabled") return False diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index 75c3f598c74..8bb30a52e36 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -365,14 +365,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, send_nova_event = bool(trigger == provisioning_blocks.L2_AGENT_ENTITY) with self.nova_notifier.context_enabled(send_nova_event): - LOG.debug("Updating port %s status to %s; " - "nova_notifier enabled: %s", - port_id, const.PORT_STATUS_ACTIVE, send_nova_event) self.update_port_status(payload.context, port_id, const.PORT_STATUS_ACTIVE) - LOG.debug("After port %s status update nova_notifier is now " - "restored to old value: %s", - port_id, self.nova_notifier._enabled) else: self.update_port_status(payload.context, port_id, const.PORT_STATUS_ACTIVE) diff --git a/neutron/tests/unit/notifiers/test_nova.py b/neutron/tests/unit/notifiers/test_nova.py index 5d065336a93..47d654194b8 100644 --- a/neutron/tests/unit/notifiers/test_nova.py +++ b/neutron/tests/unit/notifiers/test_nova.py @@ -13,8 +13,10 @@ # License for the specific language governing permissions and limitations # under the License. +import queue from unittest import mock +import eventlet from neutron_lib import constants as n_const from neutron_lib import context as n_ctx from neutron_lib import exceptions as n_exc @@ -371,3 +373,35 @@ class TestNovaNotify(base.BaseTestCase): self.assertEqual( expected_event, self.nova_notifier.batch_notifier._pending_events.get()) + + def test_notify_concurrent_enable_flag_update(self): + # This test assumes Neutron server uses eventlet. + # NOTE(ralonsoh): the exceptions raise inside a thread won't stop the + # test. The checks are stored in "_queue" and tested at the end of the + # test execution. + _queue = eventlet.queue.Queue() + + def _local_executor(thread_idx): + # This thread has not yet initialized the local "enable" flag. + _queue.put(getattr(nova._notifier_store, 'enable', None) is None) + eventlet.sleep(0) # Next thread execution. + new_enable = bool(thread_idx % 2) + with self.nova_notifier.context_enabled(new_enable): + # At this point, the Nova Notifier should have updated the + # "enable" flag. + _queue.put(new_enable == nova._notifier_store.enable) + eventlet.sleep(0) # Next thread execution. + _queue.put(new_enable == nova._notifier_store.enable) + _queue.put(nova.NOTIFIER_ENABLE_DEFAULT == + nova._notifier_store.enable) + + num_threads = 20 + pool = eventlet.GreenPool(num_threads) + for idx in range(num_threads): + pool.spawn(_local_executor, idx) + pool.waitall() + try: + while True: + self.assertTrue(_queue.get(block=False)) + except queue.Empty: + pass