Use a thread local variable to store the Nova Notifier enable flag
The Nova Notifier can be called simultaneously by several RPC callbacks from the agents (DHCP, L2), trying to update the provisioning status of a port. In order to handle each context notifier enable flag, a thread local variable is used. This will isolate the flag update if two entities inform at the same time and one RPC callback is attended during the processing of the other one. This patch also removes the debug messages added to debug this issue. Closes-Bug: #1958363 Change-Id: Ie670fba4b3afe427747732d2c3948d92311e960e
This commit is contained in:
committed by
Rodolfo Alonso
parent
e7b70521d0
commit
507989fc62
@@ -15,6 +15,7 @@
|
||||
|
||||
import contextlib
|
||||
|
||||
from eventlet.green import threading
|
||||
from keystoneauth1 import loading as ks_loading
|
||||
from neutron_lib.callbacks import events
|
||||
from neutron_lib.callbacks import registry
|
||||
@@ -45,6 +46,13 @@ NEUTRON_NOVA_EVENT_STATUS_MAP = {constants.PORT_STATUS_ACTIVE: 'completed',
|
||||
constants.PORT_STATUS_DOWN: 'completed'}
|
||||
NOVA_API_VERSION = "2.1"
|
||||
|
||||
NOTIFIER_ENABLE_DEFAULT = True
|
||||
# NOTE(ralonsoh): the Nova notifier can be called simultaneously by several RPC
|
||||
# callbacks from the agents (DHCP, L2), trying to update the provisioning
|
||||
# status of a port. In order to handle each context notifier enable flag, a
|
||||
# thread local variable is used.
|
||||
_notifier_store = threading.local()
|
||||
|
||||
|
||||
@registry.has_registry_receivers
|
||||
class Notifier(object):
|
||||
@@ -68,20 +76,14 @@ class Notifier(object):
|
||||
if ext.name == "server_external_events"]
|
||||
self.batch_notifier = batch_notifier.BatchNotifier(
|
||||
cfg.CONF.send_events_interval, self.send_events)
|
||||
self._enabled = True
|
||||
|
||||
@contextlib.contextmanager
|
||||
def context_enabled(self, enabled):
|
||||
stored_enabled = self._enabled
|
||||
LOG.debug("nova notifier stored_enabled: %s; enabled: %s",
|
||||
stored_enabled, enabled)
|
||||
try:
|
||||
self._enabled = enabled
|
||||
_notifier_store.enable = enabled
|
||||
yield
|
||||
finally:
|
||||
LOG.debug("Restoring value: %s for the _enabled flag.",
|
||||
stored_enabled)
|
||||
self._enabled = stored_enabled
|
||||
_notifier_store.enable = NOTIFIER_ENABLE_DEFAULT
|
||||
|
||||
def _get_nova_client(self):
|
||||
global_id = common_context.generate_request_id()
|
||||
@@ -180,7 +182,10 @@ class Notifier(object):
|
||||
return self._get_network_changed_event(port)
|
||||
|
||||
def _can_notify(self, port):
|
||||
if not self._enabled:
|
||||
if getattr(_notifier_store, 'enable', None) is None:
|
||||
_notifier_store.enable = NOTIFIER_ENABLE_DEFAULT
|
||||
|
||||
if not _notifier_store.enable:
|
||||
LOG.debug("Nova notifier disabled")
|
||||
return False
|
||||
|
||||
|
||||
@@ -365,14 +365,8 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
||||
send_nova_event = bool(trigger ==
|
||||
provisioning_blocks.L2_AGENT_ENTITY)
|
||||
with self.nova_notifier.context_enabled(send_nova_event):
|
||||
LOG.debug("Updating port %s status to %s; "
|
||||
"nova_notifier enabled: %s",
|
||||
port_id, const.PORT_STATUS_ACTIVE, send_nova_event)
|
||||
self.update_port_status(payload.context, port_id,
|
||||
const.PORT_STATUS_ACTIVE)
|
||||
LOG.debug("After port %s status update nova_notifier is now "
|
||||
"restored to old value: %s",
|
||||
port_id, self.nova_notifier._enabled)
|
||||
else:
|
||||
self.update_port_status(payload.context, port_id,
|
||||
const.PORT_STATUS_ACTIVE)
|
||||
|
||||
@@ -13,8 +13,10 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import queue
|
||||
from unittest import mock
|
||||
|
||||
import eventlet
|
||||
from neutron_lib import constants as n_const
|
||||
from neutron_lib import context as n_ctx
|
||||
from neutron_lib import exceptions as n_exc
|
||||
@@ -371,3 +373,35 @@ class TestNovaNotify(base.BaseTestCase):
|
||||
self.assertEqual(
|
||||
expected_event,
|
||||
self.nova_notifier.batch_notifier._pending_events.get())
|
||||
|
||||
def test_notify_concurrent_enable_flag_update(self):
|
||||
# This test assumes Neutron server uses eventlet.
|
||||
# NOTE(ralonsoh): the exceptions raise inside a thread won't stop the
|
||||
# test. The checks are stored in "_queue" and tested at the end of the
|
||||
# test execution.
|
||||
_queue = eventlet.queue.Queue()
|
||||
|
||||
def _local_executor(thread_idx):
|
||||
# This thread has not yet initialized the local "enable" flag.
|
||||
_queue.put(getattr(nova._notifier_store, 'enable', None) is None)
|
||||
eventlet.sleep(0) # Next thread execution.
|
||||
new_enable = bool(thread_idx % 2)
|
||||
with self.nova_notifier.context_enabled(new_enable):
|
||||
# At this point, the Nova Notifier should have updated the
|
||||
# "enable" flag.
|
||||
_queue.put(new_enable == nova._notifier_store.enable)
|
||||
eventlet.sleep(0) # Next thread execution.
|
||||
_queue.put(new_enable == nova._notifier_store.enable)
|
||||
_queue.put(nova.NOTIFIER_ENABLE_DEFAULT ==
|
||||
nova._notifier_store.enable)
|
||||
|
||||
num_threads = 20
|
||||
pool = eventlet.GreenPool(num_threads)
|
||||
for idx in range(num_threads):
|
||||
pool.spawn(_local_executor, idx)
|
||||
pool.waitall()
|
||||
try:
|
||||
while True:
|
||||
self.assertTrue(_queue.get(block=False))
|
||||
except queue.Empty:
|
||||
pass
|
||||
|
||||
Reference in New Issue
Block a user