Merge "Making notification-queue use thread-safe" into stable/mitaka

This commit is contained in:
Jenkins
2017-01-24 01:33:15 +00:00
committed by Gerrit Code Review
4 changed files with 68 additions and 50 deletions

View File

@@ -45,23 +45,23 @@ def get_outer_transaction(transaction):
BATCH_NOTIFICATIONS = False
NOTIFICATION_QUEUE = {}
NOTIFIER_REF = 'notifier_object_reference'
NOTIFIER_METHOD = 'notifier_method_name'
NOTIFICATION_ARGS = 'notification_args'
def _queue_notification(transaction_key, notifier_obj, notifier_method, args):
def _queue_notification(session, transaction_key, notifier_obj,
notifier_method, args):
entry = {NOTIFIER_REF: notifier_obj, NOTIFIER_METHOD: notifier_method,
NOTIFICATION_ARGS: args}
if transaction_key not in NOTIFICATION_QUEUE:
NOTIFICATION_QUEUE[transaction_key] = [entry]
if transaction_key not in session.notification_queue:
session.notification_queue[transaction_key] = [entry]
else:
NOTIFICATION_QUEUE[transaction_key].append(entry)
session.notification_queue[transaction_key].append(entry)
def send_or_queue_notification(transaction_key, notifier_obj, notifier_method,
args):
def send_or_queue_notification(session, transaction_key, notifier_obj,
notifier_method, args):
if not transaction_key or 'subnet.delete.end' in args:
# We make an exception for the dhcp agent notification
# for subnet delete since the implementation for sending
@@ -72,19 +72,20 @@ def send_or_queue_notification(transaction_key, notifier_obj, notifier_method,
getattr(notifier_obj, notifier_method)(*args)
return
_queue_notification(transaction_key, notifier_obj, notifier_method, args)
_queue_notification(session, transaction_key, notifier_obj,
notifier_method, args)
def post_notifications_from_queue(transaction_key):
queue = NOTIFICATION_QUEUE[transaction_key]
def post_notifications_from_queue(session, transaction_key):
queue = session.notification_queue[transaction_key]
for entry in queue:
getattr(entry[NOTIFIER_REF],
entry[NOTIFIER_METHOD])(*entry[NOTIFICATION_ARGS])
del NOTIFICATION_QUEUE[transaction_key]
del session.notification_queue[transaction_key]
def discard_notifications_after_rollback(transaction_key):
NOTIFICATION_QUEUE.pop(transaction_key, None)
def discard_notifications_after_rollback(session):
session.notification_queue.pop(session.transaction, None)
class dummy_context_mgr(object):
@@ -164,14 +165,14 @@ class LocalAPI(object):
args = [action, orig_obj, {resource: obj}]
else:
args = [action, {}, {resource: obj}]
send_or_queue_notification(
send_or_queue_notification(context._session,
outer_transaction, self._nova_notifier,
'send_network_change', args)
# REVISIT(rkukura): Do create.end notification?
if cfg.CONF.dhcp_agent_notification:
action_state = ".%s.end" % action.split('_')[0]
args = [context, {resource: obj}, resource + action_state]
send_or_queue_notification(
send_or_queue_notification(context._session,
outer_transaction, self._dhcp_agent_notifier,
'notify', args)

View File

@@ -184,7 +184,6 @@ def get_port_from_device_mac(context, device_mac):
ml2_db.get_port_from_device_mac = get_port_from_device_mac
LOCAL_API_NOTIFICATION_QUEUE = None
PUSH_NOTIFICATIONS_METHOD = None
DISCARD_NOTIFICATIONS_METHOD = None
@@ -192,18 +191,14 @@ DISCARD_NOTIFICATIONS_METHOD = None
def get_session(autocommit=True, expire_on_commit=False, use_slave=False):
# The folowing are declared as global so that they can
# used in the inner functions that follow.
global LOCAL_API_NOTIFICATION_QUEUE
global PUSH_NOTIFICATIONS_METHOD
global DISCARD_NOTIFICATIONS_METHOD
# This conditional logic is to ensure that local_api
# is imported only once.
if 'local_api' not in locals():
from gbpservice.network.neutronv2 import local_api
LOCAL_API_NOTIFICATION_QUEUE = local_api.NOTIFICATION_QUEUE
PUSH_NOTIFICATIONS_METHOD = (
local_api.post_notifications_from_queue)
DISCARD_NOTIFICATIONS_METHOD = (
local_api.discard_notifications_after_rollback)
from gbpservice.network.neutronv2 import local_api
PUSH_NOTIFICATIONS_METHOD = (
local_api.post_notifications_from_queue)
DISCARD_NOTIFICATIONS_METHOD = (
local_api.discard_notifications_after_rollback)
# The following two lines are copied from the original
# implementation of db_api.get_session() and should be updated
# if the original implementation changes.
@@ -212,18 +207,19 @@ def get_session(autocommit=True, expire_on_commit=False, use_slave=False):
expire_on_commit=expire_on_commit,
use_slave=use_slave)
new_session.notification_queue = {}
def gbp_after_transaction(session, transaction):
global LOCAL_API_NOTIFICATION_QUEUE
if transaction and not transaction._parent and (
not transaction.is_active and not transaction.nested):
if transaction in (LOCAL_API_NOTIFICATION_QUEUE or []):
if transaction in session.notification_queue:
# push the queued notifications only when the
# outermost transaction completes
PUSH_NOTIFICATIONS_METHOD(transaction)
PUSH_NOTIFICATIONS_METHOD(session, transaction)
def gbp_after_rollback(session):
# We discard all queued notifiactions if the transaction fails.
DISCARD_NOTIFICATIONS_METHOD(session.transaction)
DISCARD_NOTIFICATIONS_METHOD(session)
if local_api.BATCH_NOTIFICATIONS:
event.listen(new_session, "after_transaction_end",

View File

@@ -1991,7 +1991,8 @@ class ApicMechanismDriver(api_plus.MechanismDriver):
LOG.debug("Enqueing notify for port %s", port['id'])
txn = local_api.get_outer_transaction(
plugin_context.session.transaction)
local_api.send_or_queue_notification(txn, self.notifier,
local_api.send_or_queue_notification(plugin_context.session,
txn, self.notifier,
'port_update',
[plugin_context, port])

View File

@@ -2837,6 +2837,7 @@ class NotificationTest(AIMBaseTestCase):
self.mac_prefix = '12:34:56:78:5d:'
self.queue_notification_call_count = 0
self.max_notification_queue_length = 0
self.notification_queue = None
self.post_notifications_from_queue_call_count = 0
self.orig_generate_uuid = uuidutils.generate_uuid
self.orig_is_uuid_like = uuidutils.is_uuid_like
@@ -2867,34 +2868,58 @@ class NotificationTest(AIMBaseTestCase):
sc_plugin=sc_plugin, **kwargs)
self.orig_generate_mac = self._plugin._generate_mac
self._plugin._generate_mac = _generate_mac
self.orig_queue_notification = local_api._queue_notification
# The two functions are patched below to instrument how
# The functions are patched below to instrument how
# many times the functions are called and also to track
# the queue length.
def _queue_notification(
def _queue_notification(session,
transaction_key, notifier_obj, notifier_method, args):
self.queue_notification_call_count += 1
self.orig_queue_notification(
self.orig_queue_notification(session,
transaction_key, notifier_obj, notifier_method, args)
if local_api.NOTIFICATION_QUEUE:
key = local_api.NOTIFICATION_QUEUE.keys()[0]
length = len(local_api.NOTIFICATION_QUEUE[key])
if session.notification_queue:
key = session.notification_queue.keys()[0]
length = len(session.notification_queue[key])
if length > self.max_notification_queue_length:
self.max_notification_queue_length = length
self.notification_queue = session.notification_queue
local_api._queue_notification = _queue_notification
self.orig_send_or_queue_notification = (
local_api.send_or_queue_notification)
def send_or_queue_notification(
session, transaction_key, notifier_obj, notifier_method, args):
self.orig_send_or_queue_notification(session,
transaction_key, notifier_obj, notifier_method, args)
self.notification_queue = session.notification_queue
local_api.send_or_queue_notification = send_or_queue_notification
self.orig_post_notifications_from_queue = (
local_api.post_notifications_from_queue)
def post_notifications_from_queue(transaction_key):
def post_notifications_from_queue(session, transaction_key):
self.post_notifications_from_queue_call_count += 1
self.orig_post_notifications_from_queue(transaction_key)
self.orig_post_notifications_from_queue(session, transaction_key)
self.notification_queue = session.notification_queue
local_api.post_notifications_from_queue = (
post_notifications_from_queue)
self.orig_discard_notifications_after_rollback = (
local_api.discard_notifications_after_rollback)
def discard_notifications_after_rollback(session):
self.orig_discard_notifications_after_rollback(session)
self.notification_queue = session.notification_queue
local_api.discard_notifications_after_rollback = (
discard_notifications_after_rollback)
def tearDown(self):
super(NotificationTest, self).tearDown()
self._plugin._generate_mac = self.orig_generate_mac
@@ -2902,8 +2927,12 @@ class NotificationTest(AIMBaseTestCase):
uuidutils.is_uuid_like = self.orig_is_uuid_like
local_api.BATCH_NOTIFICATIONS = False
local_api._queue_notification = self.orig_queue_notification
local_api.send_or_queue_notification = (
self.orig_send_or_queue_notification)
local_api.post_notifications_from_queue = (
self.orig_post_notifications_from_queue)
local_api.discard_notifications_after_rollback = (
self.orig_discard_notifications_after_rollback)
def _expected_dhcp_agent_call_list(self):
# This testing strategy assumes the sequence of notifications
@@ -3021,7 +3050,7 @@ class NotificationTest(AIMBaseTestCase):
'security-groups', sg_id).get_response(self.ext_api)
notifier.assert_has_calls(expected_calls(), any_order=False)
# test that no notifications have been left out
self.assertEqual({}, local_api.NOTIFICATION_QUEUE)
self.assertEqual({}, self.notification_queue)
def _disable_checks(self, no_batch_event, with_batch_event):
# this is a temporarty workaround to avoid having to repeatedly
@@ -3119,22 +3148,13 @@ class NotificationTest(AIMBaseTestCase):
self.assertEqual([], dhcp_notifier.call_args_list)
self.assertEqual([], nova_notifier.call_args_list)
# test that notification queue has been flushed
self.assertEqual({}, local_api.NOTIFICATION_QUEUE)
self.assertEqual({}, self.notification_queue)
# test that the push notifications func itself was not called
self.assertEqual(
0, self.post_notifications_from_queue_call_count)
# restore mock
self.dummy.create_policy_target_group_precommit = orig_func
def test_notifier_queue_populated(self):
local_api.BATCH_NOTIFICATIONS = True
with mock.patch.object(local_api, 'post_notifications_from_queue'):
self.create_policy_target_group(name="ptg1")
self.assertEqual(1, len(local_api.NOTIFICATION_QUEUE))
key = local_api.NOTIFICATION_QUEUE.keys()[0]
self.assertLess(0, len(local_api.NOTIFICATION_QUEUE[key]))
local_api.NOTIFICATION_QUEUE = {}
class TestExternalSegment(AIMBaseTestCase):