[aim-mapping] Queue DHCP and Nova notifications

Currently, whenever resources are created by GBP (either GBP resources or
Neutron resources), DHCP and Nova notifications are sent immediately
after a specific resource creation completes, i.e. the transaction
handling that resource is complete. However, this transaction might
actually be a sub-transaction that is nested within other transactions
and the resource state is committed to the DB, only when the outermost
transaction completes.

This creates a problem in the aim-mapping policy driver which wraps every
user-facing operation in a big nested transaction, but the notifications
for implicitly created resources are sent before the outermost transaction
is committed.

The bug in question is one such manifestation of the problem where the
subnet create notification is sent to the DHCP agent as a part of the PTG
creation orchestration. The DHCP agent tries to validate the subnet by
checking the presence of the associated Neutron network, but neither the
subnet nor the network is yet committed to the DB since the outermost
transaction for the PTG creation is not yet complete.

This patch introduces a queueing framework for queueing DHCP agent and Nova
notifications, and then sending them once the outermost transaction (parent)
completes (by registering for Sqlalchemy transaction event hooks). The order
of the notifications is maintained when the notifications are eventually
dispatched.

This queueing scheme is turned ON only for the aim-mapping driver (as a part
of its initialization). Any other policy drivers wanting to leverage this
queueing feature should set the following attribute:

BATCH_NOTIFICATIONS

of the:

gbpservice/network/neutronv2/local_api.py

module to True.

Change-Id: I23a12eda7f98970431c8cbea4acad18f762b201d
Closes-bug: 1631833
This commit is contained in:
Sumit Naiksatam
2016-10-09 20:54:52 -07:00
parent ca0429730a
commit e553a03e23
4 changed files with 379 additions and 4 deletions

View File

@@ -34,6 +34,52 @@ from gbpservice.neutron.services.grouppolicy.common import exceptions as exc
LOG = logging.getLogger(__name__)
def _get_outer_transaction(transaction):
if not transaction:
return
if not transaction._parent:
return transaction
else:
return _get_outer_transaction(transaction._parent)
BATCH_NOTIFICATIONS = False
NOTIFICATION_QUEUE = {}
NOTIFIER_REF = 'notifier_object_reference'
NOTIFIER_METHOD = 'notifier_method_name'
NOTIFICATION_ARGS = 'notification_args'
def _queue_notification(transaction_key, notifier_obj, notifier_method, args):
entry = {NOTIFIER_REF: notifier_obj, NOTIFIER_METHOD: notifier_method,
NOTIFICATION_ARGS: args}
if transaction_key not in NOTIFICATION_QUEUE:
NOTIFICATION_QUEUE[transaction_key] = [entry]
else:
NOTIFICATION_QUEUE[transaction_key].append(entry)
def send_or_queue_notification(transaction_key, notifier_obj, notifier_method,
args):
if not transaction_key:
getattr(notifier_obj, notifier_method)(*args)
return
_queue_notification(transaction_key, notifier_obj, notifier_method, args)
def post_notifications_from_queue(transaction_key):
queue = NOTIFICATION_QUEUE[transaction_key]
for entry in queue:
getattr(entry[NOTIFIER_REF],
entry[NOTIFIER_METHOD])(*entry[NOTIFICATION_ARGS])
del NOTIFICATION_QUEUE[transaction_key]
def discard_notifications_after_rollback(transaction_key):
NOTIFICATION_QUEUE.pop(transaction_key, None)
class dummy_context_mgr(object):
def __enter__(self):
@@ -135,12 +181,22 @@ class LocalAPI(object):
# explicit resource creation request, and hence the above
# method will be invoked in the API layer.
if do_notify:
self._nova_notifier.send_network_change(action, {},
{resource: obj})
if BATCH_NOTIFICATIONS and not clean_session:
outer_transaction = (_get_outer_transaction(
context._session.transaction))
else:
outer_transaction = None
args = [action, {}, {resource: obj}]
send_or_queue_notification(
outer_transaction, self._nova_notifier,
'send_network_change', args)
# REVISIT(rkukura): Do create.end notification?
if cfg.CONF.dhcp_agent_notification:
self._dhcp_agent_notifier.notify(
context, {resource: obj}, resource + '.create.end')
args = [context, {resource: obj},
resource + '.create.end']
send_or_queue_notification(
outer_transaction, self._dhcp_agent_notifier,
'notify', args)
return obj
def _update_resource(self, plugin, context, resource, resource_id, attrs,

View File

@@ -15,11 +15,13 @@ from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.common import constants as l3_constants
from neutron.db import api as db_api
from neutron.db import l3_db
from neutron.db import models_v2
from neutron.db import securitygroups_db
from neutron.plugins.ml2 import db as ml2_db
from oslo_log import log
from sqlalchemy import event
LOG = log.getLogger(__name__)
@@ -181,3 +183,55 @@ def get_port_from_device_mac(context, device_mac):
return qry.first()
ml2_db.get_port_from_device_mac = get_port_from_device_mac
LOCAL_API_NOTIFICATION_QUEUE = None
PUSH_NOTIFICATIONS_METHOD = None
DISCARD_NOTIFICATIONS_METHOD = None
def get_session(autocommit=True, expire_on_commit=False, use_slave=False):
# The folowing are declared as global so that they can
# used in the inner functions that follow.
global LOCAL_API_NOTIFICATION_QUEUE
global PUSH_NOTIFICATIONS_METHOD
global DISCARD_NOTIFICATIONS_METHOD
# This conditional logic is to ensure that local_api
# is imported only once.
if 'local_api' not in locals():
from gbpservice.network.neutronv2 import local_api
LOCAL_API_NOTIFICATION_QUEUE = local_api.NOTIFICATION_QUEUE
PUSH_NOTIFICATIONS_METHOD = (
local_api.post_notifications_from_queue)
DISCARD_NOTIFICATIONS_METHOD = (
local_api.discard_notifications_after_rollback)
# The following two lines are copied from the original
# implementation of db_api.get_session() and should be updated
# if the original implementation changes.
facade = db_api._create_facade_lazily()
new_session = facade.get_session(autocommit=autocommit,
expire_on_commit=expire_on_commit,
use_slave=use_slave)
def gbp_after_transaction(session, transaction):
global LOCAL_API_NOTIFICATION_QUEUE
if transaction and not transaction._parent and (
not transaction.is_active and not transaction.nested):
if transaction in LOCAL_API_NOTIFICATION_QUEUE:
# push the queued notifications only when the
# outermost transaction completes
PUSH_NOTIFICATIONS_METHOD(transaction)
def gbp_after_rollback(session):
# We discard all queued notifiactions if the transaction fails.
DISCARD_NOTIFICATIONS_METHOD(session.transaction)
if local_api.BATCH_NOTIFICATIONS:
event.listen(new_session, "after_transaction_end",
gbp_after_transaction)
event.listen(new_session, "after_rollback",
gbp_after_rollback)
return new_session
db_api.get_session = get_session

View File

@@ -13,6 +13,7 @@
from neutron import manager
from oslo_log import helpers as log
from gbpservice.network.neutronv2 import local_api
from gbpservice.neutron.services.grouppolicy.common import exceptions as exc
from gbpservice.neutron.services.grouppolicy.drivers import (
implicit_policy as ipd)
@@ -33,6 +34,7 @@ class CommonNeutronBase(ipd.ImplicitPolicyBase, rmd.OwnedResourcesOperations,
# REVISIT: Check if this is still required
self._cached_agent_notifier = None
self._gbp_plugin = None
local_api.BATCH_NOTIFICATIONS = True
super(CommonNeutronBase, self).initialize()
@property

View File

@@ -19,12 +19,16 @@ from aim.api import status as aim_status
from aim import context as aim_context
from aim.db import model_base as aim_model_base
from keystoneclient.v3 import client as ksc_client
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron import context as nctx
from neutron.db import api as db_api
from neutron.notifiers import nova
from neutron.tests.unit.extensions import test_address_scope
from opflexagent import constants as ocst
from oslo_utils import uuidutils
import webob.exc
from gbpservice.network.neutronv2 import local_api
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import model
from gbpservice.neutron.services.grouppolicy.common import (
constants as gp_const)
@@ -1640,3 +1644,262 @@ class TestPolicyRuleSetRollback(TestPolicyRuleSetBase):
# restore mock
self.dummy.delete_policy_rule_set_precommit = orig_func
class NotificationTest(AIMBaseTestCase):
def setUp(self, policy_drivers=None, core_plugin=None, ml2_options=None,
l3_plugin=None, sc_plugin=None, **kwargs):
self.fake_uuid = 0
self.mac_prefix = '12:34:56:78:5d:'
self.queue_notification_call_count = 0
self.max_notification_queue_length = 0
self.post_notifications_from_queue_call_count = 0
self.orig_generate_uuid = uuidutils.generate_uuid
self.orig_is_uuid_like = uuidutils.is_uuid_like
# The following three functions are patched so that
# the same worflow can be run more than once in a single
# test and will result in objects created that are
# identical in all their attribute values.
# The workflow is exercised once with batching turned
# OFF, and once with batching turned ON.
def generate_uuid():
self.fake_uuid += 1
return str(self.fake_uuid)
def is_uuid_like(val):
return True
def _generate_mac():
lsb = 10 + self.fake_uuid
return self.mac_prefix + str(lsb)
uuidutils.generate_uuid = generate_uuid
uuidutils.is_uuid_like = is_uuid_like
super(NotificationTest, self).setUp(
policy_drivers=policy_drivers, core_plugin=core_plugin,
ml2_options=ml2_options, l3_plugin=l3_plugin,
sc_plugin=sc_plugin, **kwargs)
self.orig_generate_mac = self._plugin._generate_mac
self._plugin._generate_mac = _generate_mac
self.orig_queue_notification = local_api._queue_notification
# The two functions are patched below to instrument how
# many times the functions are called and also to track
# the queue length.
def _queue_notification(
transaction_key, notifier_obj, notifier_method, args):
self.queue_notification_call_count += 1
self.orig_queue_notification(
transaction_key, notifier_obj, notifier_method, args)
if local_api.NOTIFICATION_QUEUE:
key = local_api.NOTIFICATION_QUEUE.keys()[0]
length = len(local_api.NOTIFICATION_QUEUE[key])
if length > self.max_notification_queue_length:
self.max_notification_queue_length = length
local_api._queue_notification = _queue_notification
self.orig_post_notifications_from_queue = (
local_api.post_notifications_from_queue)
def post_notifications_from_queue(transaction_key):
self.post_notifications_from_queue_call_count += 1
self.orig_post_notifications_from_queue(transaction_key)
local_api.post_notifications_from_queue = (
post_notifications_from_queue)
def tearDown(self):
super(NotificationTest, self).tearDown()
self._plugin._generate_mac = self.orig_generate_mac
uuidutils.generate_uuid = self.orig_generate_uuid
uuidutils.is_uuid_like = self.orig_is_uuid_like
local_api.BATCH_NOTIFICATIONS = False
local_api._queue_notification = self.orig_queue_notification
local_api.post_notifications_from_queue = (
self.orig_post_notifications_from_queue)
def _expected_dhcp_agent_call_list(self):
# This testing strategy assumes the sequence of notifications
# that result from the sequence of operations currently
# performed. If the internal orchestration logic changes resulting
# in a change in the sequence of operations, the following
# list should be updated accordingly.
# The 2nd argument is the resource object that is created,
# and can be potentially verified for further detail
calls = [
mock.call().notify(mock.ANY, mock.ANY,
"address_scope.create.end"),
mock.call().notify(mock.ANY, mock.ANY,
"subnetpool.create.end"),
mock.call().notify(mock.ANY, mock.ANY, "router.create.end"),
mock.call().notify(mock.ANY, mock.ANY, "network.create.end"),
mock.call().notify(mock.ANY, mock.ANY, "subnet.create.end"),
mock.call().notify(mock.ANY, mock.ANY,
"policy_target_group.create.end"),
mock.call().notify(mock.ANY, mock.ANY, "port.create.end"),
mock.call().notify(mock.ANY, mock.ANY,
"policy_target.create.end"),
mock.call().notify(mock.ANY, mock.ANY, "port.delete.end"),
mock.call().notify(mock.ANY, mock.ANY,
"policy_target.delete.end"),
mock.call().notify(mock.ANY, mock.ANY, "port.delete.end"),
mock.call().notify(mock.ANY, mock.ANY, "subnet.delete.end"),
mock.call().notify(mock.ANY, mock.ANY, "network.delete.end"),
mock.call().notify(mock.ANY, mock.ANY,
"subnetpool.delete.end"),
mock.call().notify(mock.ANY, mock.ANY,
"address_scope.delete.end"),
mock.call().notify(mock.ANY, mock.ANY, "router.delete.end"),
mock.call().notify(mock.ANY, mock.ANY,
"policy_target_group.delete.end"),
mock.call().notify(mock.ANY, mock.ANY,
"security_group.delete.end")]
return calls
def _expected_nova_call_list(self):
# This testing strategy assumes the sequence of notifications
# that result from the sequence of operations currently
# performed. If the internal orchestration logic changes resulting
# in a change in the sequence of operations, the following
# list should be updated accordingly.
# The 2nd argument is the resource object that is created,
# and can be potentially verified for further detail
calls = [
mock.call().notify("create_address_scope", mock.ANY, mock.ANY),
mock.call().notify("create_subnetpool", mock.ANY, mock.ANY),
mock.call().notify("create_router", mock.ANY, mock.ANY),
mock.call().notify("create_network", mock.ANY, mock.ANY),
mock.call().notify("create_subnet", mock.ANY, mock.ANY),
mock.call().notify("create_policy_target_group",
mock.ANY, mock.ANY),
mock.call().notify("create_port", mock.ANY, mock.ANY),
mock.call().notify("create_policy_target", mock.ANY, mock.ANY),
mock.call().notify("delete_port", mock.ANY, mock.ANY),
mock.call().notify("delete_policy_target", mock.ANY, mock.ANY),
mock.call().notify("delete_subnet", mock.ANY, mock.ANY),
mock.call().notify("delete_network", mock.ANY, mock.ANY),
mock.call().notify("delete_subnetpool", mock.ANY, mock.ANY),
mock.call().notify("delete_address_scope", mock.ANY, mock.ANY),
mock.call().notify("delete_router", mock.ANY, mock.ANY),
mock.call().notify("delete_policy_target_group",
mock.ANY, mock.ANY),
mock.call().notify("delete_security_group",
mock.ANY, mock.ANY)]
return calls
def _test_notifier(self, notifier, expected_calls,
batch_notifications=False):
local_api.BATCH_NOTIFICATIONS = batch_notifications
ptg = self.create_policy_target_group(name="ptg1")
ptg_id = ptg['policy_target_group']['id']
pt = self.create_policy_target(
name="pt1", policy_target_group_id=ptg_id)['policy_target']
self.assertEqual(pt['policy_target_group_id'], ptg_id)
self.new_delete_request(
'policy_targets', pt['id']).get_response(self.ext_api)
self.new_delete_request(
'policy_target_groups', ptg_id).get_response(self.ext_api)
sg_rules = self._plugin.get_security_group_rules(
self._neutron_context)
sg_id = sg_rules[0]['security_group_id']
self.new_delete_request(
'security-groups', sg_id).get_response(self.ext_api)
notifier.assert_has_calls(expected_calls(), any_order=False)
# test that no notifications have been left out
self.assertEqual({}, local_api.NOTIFICATION_QUEUE)
def test_dhcp_notifier(self):
with mock.patch.object(dhcp_rpc_agent_api.DhcpAgentNotifyAPI,
'notify') as dhcp_notifier_no_batch:
self._test_notifier(dhcp_notifier_no_batch,
self._expected_dhcp_agent_call_list, False)
self.assertEqual(0, self.queue_notification_call_count)
self.assertEqual(0, self.max_notification_queue_length)
self.assertEqual(0, self.post_notifications_from_queue_call_count)
self.fake_uuid = 0
with mock.patch.object(dhcp_rpc_agent_api.DhcpAgentNotifyAPI,
'notify') as dhcp_notifier_with_batch:
self._test_notifier(dhcp_notifier_with_batch,
self._expected_dhcp_agent_call_list, True)
self.assertLess(0, self.queue_notification_call_count)
self.assertLess(0, self.max_notification_queue_length)
# Two resources (PTG and PT) are created in the _test_notifier
# function via the tenant API, hence two batches of notifications
# should be sent
self.assertEqual(2, self.post_notifications_from_queue_call_count)
for n1, n2 in zip(dhcp_notifier_no_batch.call_args_list,
dhcp_notifier_with_batch.call_args_list):
# test the resource objects are identical with and without batch
self.assertEqual(n1[0][1], n2[0][1])
# test that all the same events are pushed with and without batch
self.assertEqual(n1[0][2], n2[0][2])
def test_nova_notifier(self):
with mock.patch.object(nova.Notifier,
'send_network_change') as nova_notifier_nobatch:
self._test_notifier(nova_notifier_nobatch,
self._expected_nova_call_list, False)
self.assertEqual(0, self.queue_notification_call_count)
self.assertEqual(0, self.max_notification_queue_length)
self.assertEqual(0, self.post_notifications_from_queue_call_count)
self.fake_uuid = 0
with mock.patch.object(nova.Notifier,
'send_network_change') as nova_notifier_batch:
self._test_notifier(nova_notifier_batch,
self._expected_nova_call_list, True)
self.assertLess(0, self.queue_notification_call_count)
self.assertLess(0, self.max_notification_queue_length)
# Two resources (PTG and PT) are created in the _test_notifier
# function via the tenant API, hence two batches of notifications
# should be sent
self.assertEqual(2, self.post_notifications_from_queue_call_count)
for n1, n2 in zip(nova_notifier_nobatch.call_args_list,
nova_notifier_batch.call_args_list):
# test the resource objects are identical with and without batch
self.assertEqual(n1[0][1], n2[0][1])
# test that all the same events are pushed with and without batch
self.assertEqual(n1[0][2], n2[0][2])
def test_notifiers_with_transaction_rollback(self):
# No notifications should get pushed in this case
orig_func = self.dummy.create_policy_target_group_precommit
self.dummy.create_policy_target_group_precommit = mock.Mock(
side_effect=Exception)
local_api.BATCH_NOTIFICATIONS = True
with mock.patch.object(dhcp_rpc_agent_api.DhcpAgentNotifyAPI,
'notify') as dhcp_notifier:
with mock.patch.object(nova.Notifier,
'send_network_change') as nova_notifier:
self.create_policy_target_group(name="ptg1",
expected_res_status=500)
# test that notifier was not called
self.assertEqual([], dhcp_notifier.call_args_list)
self.assertEqual([], nova_notifier.call_args_list)
# test that notification queue has been flushed
self.assertEqual({}, local_api.NOTIFICATION_QUEUE)
# test that the push notifications func itself was not called
self.assertEqual(
0, self.post_notifications_from_queue_call_count)
# restore mock
self.dummy.create_policy_target_group_precommit = orig_func
def test_notifier_queue_populated(self):
local_api.BATCH_NOTIFICATIONS = True
with mock.patch.object(local_api, 'post_notifications_from_queue'):
self.create_policy_target_group(name="ptg1")
self.assertEqual(1, len(local_api.NOTIFICATION_QUEUE))
key = local_api.NOTIFICATION_QUEUE.keys()[0]
self.assertLess(0, len(local_api.NOTIFICATION_QUEUE[key]))
local_api.NOTIFICATION_QUEUE = {}