Pecan: rework notifier hook for registry callbacks

[1] and [2] were added to use the registry callback notifier for dhcp and nova
notifications for the legacy wsgi layer.  This adds the same functionality
to the pecan wsgi layer.  It just so happens to clean the code up nicely
and also fixes a bug that was introduced by [3] that caused an ever
increasing number of subscriptions to the registry callback notifier.

[1] I7440becb6d30af7159ecaeba09d7a28eceb71bea
[2] I1d7d4b80ee77deefce18df22f76cab81750c0397
[3] I607635601caff0322fd0c80c9023f5c4f663ca25

Change-Id: I0a8b64e7742283a9d6c6b42ebc27887836df69ec
Closes-Bug: #1633296
This commit is contained in:
Brandon Logan 2016-10-14 18:19:54 -05:00
parent 80d4df144d
commit 7fdb98cf17
3 changed files with 93 additions and 256 deletions

View File

@ -13,15 +13,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import constants
from oslo_config import cfg
from oslo_log import log
from oslo_serialization import jsonutils
from pecan import hooks
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.common import rpc as n_rpc
from neutron import manager
from neutron.pecan_wsgi import constants as pecan_constants
from neutron.pecan_wsgi.hooks import utils
@ -37,36 +34,6 @@ class NotifierHook(hooks.PecanHook):
self._notifier_inst = n_rpc.get_notifier('network')
return self._notifier_inst
def _nova_notify(self, action, resource, *args):
action_resource = '%s_%s' % (action, resource)
if not hasattr(self, '_nova_notifier'):
# this is scoped to avoid a dependency on nova client when nova
# notifications aren't enabled
from neutron.notifiers import nova
self._nova_notifier = nova.Notifier.get_instance()
self._nova_notifier.send_network_change(action_resource, *args)
def _notify_dhcp_agent(self, context, resource_name, action, resources):
# NOTE(kevinbenton): we should remove this whole method in Ocata and
# make plugins emit the core resource events
plugin = manager.NeutronManager.get_plugin_for_resource(resource_name)
notifier_method = '%s.%s.end' % (resource_name, action)
# use plugin's dhcp notifier, if this is already instantiated
agent_notifiers = getattr(plugin, 'agent_notifiers', {})
dhcp_agent_notifier = (
agent_notifiers.get(constants.AGENT_TYPE_DHCP) or
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
)
native_map = getattr(dhcp_agent_notifier, 'uses_native_notifications',
{})
if native_map.get(resource_name, {}).get(action):
return
# The DHCP Agent does not accept bulk notifications
for resource in resources:
item = {resource_name: resource}
LOG.debug("Sending DHCP agent notification for: %s", item)
dhcp_agent_notifier.notify(context, item, notifier_method)
def before(self, state):
if state.request.method not in ('POST', 'PUT', 'DELETE'):
return
@ -98,7 +65,7 @@ class NotifierHook(hooks.PecanHook):
"resource associated with the request")
return
action = pecan_constants.ACTION_MAP.get(state.request.method)
if not action or action == 'get':
if not action or action not in ('create', 'update', 'delete'):
LOG.debug("No notification will be sent for action: %s", action)
return
if utils.is_member_action(utils.get_controller(state)):
@ -108,57 +75,31 @@ class NotifierHook(hooks.PecanHook):
"status code: %s", state.response.status_int)
return
original = {}
if (action in ('delete', 'update') and
state.request.context.get('original_resources', [])):
# We only need the original resource for updates and deletes
original = state.request.context.get('original_resources')[0]
if action == 'delete':
# The object has been deleted, so we must notify the agent with the
# data of the original object
data = {collection_name:
state.request.context.get('original_resources', [])}
# data of the original object as the payload, but we do not need
# to pass it in as the original
result = {resource_name: original}
original = {}
else:
try:
data = jsonutils.loads(state.response.body)
except ValueError:
if not state.response.body:
data = {}
resources = []
if data:
if resource_name in data:
resources = [data[resource_name]]
elif collection_name in data:
# This was a bulk request
resources = data[collection_name]
# Send a notification only if a resource can be identified in the
# response. This means that for operations such as add_router_interface
# no notification will be sent
if cfg.CONF.dhcp_agent_notification and data:
self._notify_dhcp_agent(
neutron_context, resource_name,
action, resources)
if cfg.CONF.notify_nova_on_port_data_changes:
orig = {}
if action == 'update':
orig = state.request.context.get('original_resources')[0]
elif action == 'delete':
# NOTE(kevinbenton): the nova notifier is a bit strange because
# it expects the original to be in the last argument on a
# delete rather than in the 'original_obj' position
resources = (
state.request.context.get('original_resources') or [])
for resource in resources:
self._nova_notify(action, resource_name, orig,
{resource_name: resource})
result = {}
else:
result = state.response.json
notifier_method = '%s.%s.end' % (resource_name, action)
notifier_action = utils.get_controller(state).plugin_handlers[action]
registry.notify(resource_name, events.BEFORE_RESPONSE, self,
context=neutron_context, data=result,
method_name=notifier_method, action=notifier_action,
collection=collection_name, original=original)
event = '%s.%s.end' % (resource_name, action)
if action == 'delete':
resource_id = state.request.context.get('resource_id')
payload = {resource_name + '_id': resource_id}
elif action in ('create', 'update'):
if not resources:
# create/update did not complete so no notification
return
if len(resources) > 1:
payload = {collection_name: resources}
else:
payload = {resource_name: resources[0]}
else:
return
self._notifier.info(neutron_context, event, payload)
result = {resource_name + '_id': resource_id}
self._notifier.info(neutron_context, notifier_method, result)

View File

@ -487,7 +487,7 @@ class TestRequestProcessing(TestRootController):
def setUp(self):
super(TestRequestProcessing, self).setUp()
mock.patch('neutron.pecan_wsgi.hooks.notifier.registry').start()
# request.context is thread-local storage so it has to be accessed by
# the controller. We can capture it into a list here to assert on after
# the request finishes.

View File

@ -14,17 +14,15 @@
# under the License.
import mock
from oslo_config import cfg
from oslo_policy import policy as oslo_policy
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
from neutron.api.v2 import attributes
from neutron.callbacks import events
from neutron import context
from neutron.db.quota import driver as quota_driver
from neutron import manager
from neutron.pecan_wsgi.controllers import resource
from neutron.pecan_wsgi.hooks import policy_enforcement as pe
from neutron import policy
from neutron.tests.functional.pecan_wsgi import test_functional
@ -222,174 +220,6 @@ class TestPolicyEnforcementHook(test_functional.PecanFunctionalTest):
self.assertNotIn('restricted_attr', json_response['mehs'][0])
class DHCPNotifierTestBase(test_functional.PecanFunctionalTest):
def setUp(self):
# the DHCP notifier needs to be mocked so that correct operations can
# be easily validated. For the purpose of this test it is indeed not
# necessary that the notification is actually received and processed by
# the agent
patcher = mock.patch('neutron.api.rpc.agentnotifiers.'
'dhcp_rpc_agent_api.DhcpAgentNotifyAPI.notify')
self.mock_notifier = patcher.start()
super(DHCPNotifierTestBase, self).setUp()
class TestDHCPNotifierHookNegative(DHCPNotifierTestBase):
def setUp(self):
cfg.CONF.set_override('dhcp_agent_notification', False)
super(TestDHCPNotifierHookNegative, self).setUp()
def test_dhcp_notifications_disabled(self):
self.app.post_json(
'/v2.0/networks.json',
params={'network': {'name': 'meh'}},
headers={'X-Project-Id': 'tenid'})
self.assertEqual(0, self.mock_notifier.call_count)
class TestDHCPNotifierHook(DHCPNotifierTestBase):
def test_get_does_not_trigger_notification(self):
self.do_request('/v2.0/networks', tenant_id='tenid')
self.assertEqual(0, self.mock_notifier.call_count)
def test_post_put_delete_triggers_notification(self):
ctx = context.get_admin_context()
plugin = manager.NeutronManager.get_plugin()
req_headers = {'X-Project-Id': 'tenid', 'X-Roles': 'admin'}
response = self.app.post_json(
'/v2.0/networks.json',
params={'network': {'name': 'meh'}}, headers=req_headers)
self.assertEqual(201, response.status_int)
json_body = jsonutils.loads(response.body)
net = {'network': plugin.get_network(ctx, json_body['network']['id'])}
self.assertEqual(1, self.mock_notifier.call_count)
self.assertEqual(mock.call(mock.ANY, net, 'network.create.end'),
self.mock_notifier.mock_calls[-1])
network_id = json_body['network']['id']
response = self.app.put_json(
'/v2.0/networks/%s.json' % network_id,
params={'network': {'name': 'meh-2'}},
headers=req_headers)
self.assertEqual(200, response.status_int)
json_body = jsonutils.loads(response.body)
net = {'network': plugin.get_network(ctx, json_body['network']['id'])}
self.assertEqual(2, self.mock_notifier.call_count)
self.assertEqual(mock.call(mock.ANY, net, 'network.update.end'),
self.mock_notifier.mock_calls[-1])
response = self.app.delete(
'/v2.0/networks/%s.json' % network_id, headers=req_headers)
self.assertEqual(204, response.status_int)
self.assertEqual(3, self.mock_notifier.call_count)
# No need to validate data content sent to the notifier as it's just
# going to load the object from the database
self.assertEqual(mock.call(mock.ANY, mock.ANY, 'network.delete.end'),
self.mock_notifier.mock_calls[-1])
def test_bulk_create_triggers_notifications(self):
req_headers = {'X-Project-Id': 'tenid', 'X-Roles': 'admin'}
response = self.app.post_json(
'/v2.0/networks.json',
params={'networks': [{'name': 'meh_1'},
{'name': 'meh_2'}]},
headers=req_headers)
self.assertEqual(201, response.status_int)
json_body = jsonutils.loads(response.body)
item_1 = json_body['networks'][0]
item_2 = json_body['networks'][1]
self.assertEqual(2, self.mock_notifier.call_count)
self.mock_notifier.assert_has_calls(
[mock.call(mock.ANY, {'network': item_1}, 'network.create.end'),
mock.call(mock.ANY, {'network': item_2}, 'network.create.end')])
class TestNovaNotifierHook(test_functional.PecanFunctionalTest):
def setUp(self):
patcher = mock.patch('neutron.pecan_wsgi.hooks.notifier.NotifierHook.'
'_nova_notify')
self.mock_notifier = patcher.start()
super(TestNovaNotifierHook, self).setUp()
def test_nova_notification_skips_on_failure(self):
req_headers = {'X-Project-Id': 'tenid', 'X-Roles': 'admin'}
response = self.app.put_json(
'/v2.0/networks/%s.json' % uuidutils.generate_uuid(),
params={'network': {'name': 'meh-2'}},
headers=req_headers,
expect_errors=True)
self.assertEqual(404, response.status_int)
self.assertFalse(self.mock_notifier.called)
def test_nova_notifications_disabled(self):
cfg.CONF.set_override('notify_nova_on_port_data_changes', False)
self.app.post_json(
'/v2.0/networks.json',
params={'network': {'name': 'meh'}},
headers={'X-Project-Id': 'tenid'})
self.assertFalse(self.mock_notifier.called)
def test_post_put_delete_triggers_notification(self):
req_headers = {'X-Project-Id': 'tenid', 'X-Roles': 'admin'}
response = self.app.post_json(
'/v2.0/networks.json',
params={'network': {'name': 'meh'}}, headers=req_headers)
self.assertEqual(201, response.status_int)
json_body = jsonutils.loads(response.body)
self.mock_notifier.assert_called_once_with('create', 'network', {},
json_body)
self.mock_notifier.reset_mock()
network_id = json_body['network']['id']
# NOTE(kevinbenton): the original passed into the notifier does
# not contain all of the fields of the object. Only those required
# by the policy engine are included.
controller = manager.NeutronManager.get_controller_for_resource(
'networks')
orig = pe.fetch_resource(context.get_admin_context(), controller,
'network', network_id)
response = self.app.put_json(
'/v2.0/networks/%s.json' % network_id,
params={'network': {'name': 'meh-2'}},
headers=req_headers)
self.assertEqual(200, response.status_int)
json_body = jsonutils.loads(response.body)
self.mock_notifier.assert_called_once_with('update', 'network',
orig, json_body)
self.mock_notifier.reset_mock()
orig = pe.fetch_resource(context.get_admin_context(), controller,
'network', network_id)
response = self.app.delete(
'/v2.0/networks/%s.json' % network_id, headers=req_headers)
self.assertEqual(204, response.status_int)
# No need to validate data content sent to the notifier as it's just
# going to load the object from the database
self.mock_notifier.assert_called_once_with('delete', 'network', {},
{'network': orig})
def test_bulk_create_triggers_notifications(self):
req_headers = {'X-Project-Id': 'tenid', 'X-Roles': 'admin'}
response = self.app.post_json(
'/v2.0/networks.json',
params={'networks': [{'name': 'meh_1'},
{'name': 'meh_2'}]},
headers=req_headers)
self.assertEqual(201, response.status_int)
json_body = jsonutils.loads(response.body)
item_1 = json_body['networks'][0]
item_2 = json_body['networks'][1]
self.assertEqual(
[mock.call('create', 'network', {}, {'network': item_1}),
mock.call('create', 'network', {}, {'network': item_2})],
self.mock_notifier.mock_calls)
class TestMetricsNotifierHook(test_functional.PecanFunctionalTest):
def setUp(self):
@ -507,3 +337,69 @@ class TestMetricsNotifierHook(test_functional.PecanFunctionalTest):
self.assertEqual(
[mock.call(mock.ANY, 'network.delete.start', mock.ANY)],
self.mock_notifier.mock_calls)
class TestCallbackRegistryNotifier(test_functional.PecanFunctionalTest):
def setUp(self):
super(TestCallbackRegistryNotifier, self).setUp()
patcher = mock.patch('neutron.pecan_wsgi.hooks.notifier.registry')
self.mock_notifier = patcher.start().notify
def _create(self, bulk=False):
if bulk:
body = {'networks': [{'name': 'meh-1'}, {'name': 'meh-2'}]}
else:
body = {'network': {'name': 'meh-1'}}
response = self.app.post_json(
'/v2.0/networks.json',
params=body, headers={'X-Project-Id': 'tenid'})
return response.json
def test_create(self):
self._create()
self.mock_notifier.assert_called_once_with(
'network', events.BEFORE_RESPONSE, mock.ANY, context=mock.ANY,
data=mock.ANY, method_name='network.create.end',
action='create_network', collection='networks', original={})
actual = self.mock_notifier.call_args[1]['data']
self.assertEqual('meh-1', actual['network']['name'])
def test_create_bulk(self):
self._create(bulk=True)
self.mock_notifier.assert_called_once_with(
'network', events.BEFORE_RESPONSE, mock.ANY, context=mock.ANY,
data=mock.ANY, method_name='network.create.end',
action='create_network', collection='networks', original={})
actual = self.mock_notifier.call_args[1]['data']
self.assertEqual(2, len(actual['networks']))
self.assertEqual('meh-1', actual['networks'][0]['name'])
self.assertEqual('meh-2', actual['networks'][1]['name'])
def test_update(self):
network_id = self._create()['network']['id']
self.mock_notifier.reset_mock()
self.app.put_json('/v2.0/networks/%s.json' % network_id,
params={'network': {'name': 'new-meh'}},
headers={'X-Project-Id': 'tenid'})
self.mock_notifier.assert_called_once_with(
'network', events.BEFORE_RESPONSE, mock.ANY, context=mock.ANY,
data=mock.ANY, method_name='network.update.end',
action='update_network', collection='networks', original=mock.ANY)
actual_new = self.mock_notifier.call_args[1]['data']
self.assertEqual('new-meh', actual_new['network']['name'])
actual_original = self.mock_notifier.call_args[1]['original']
self.assertEqual(network_id, actual_original['id'])
def test_delete(self):
network_id = self._create()['network']['id']
self.mock_notifier.reset_mock()
self.app.delete(
'/v2.0/networks/%s.json' % network_id,
headers={'X-Project-Id': 'tenid'})
self.mock_notifier.assert_called_once_with(
'network', events.BEFORE_RESPONSE, mock.ANY, context=mock.ANY,
data=mock.ANY, method_name='network.delete.end',
action='delete_network', collection='networks', original={})
actual = self.mock_notifier.call_args[1]['data']
self.assertEqual(network_id, actual['network']['id'])