Pass context and resource_type in RPC callback

Maintaining the context is important for preserving the request ID, and by
extension operator/developer sanity, while debugging. The resource_type is
also useful to have since a single function can be subscribed to multiple
resource types.

This keeps, but deprecates, the existing 'subscribe' method for backwards
compatibility with callbacks that don't support receiving the context and
resource type. A new 'register' method is added for callbacks that can
accept the context and resource type.

Change-Id: I06c8302951c99039b532acd9f2a68d5b989fdab5
Kevin Benton 2016-11-29 07:01:07 -08:00
parent f448cccabb
commit cf6ffb78f6
13 changed files with 93 additions and 41 deletions
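
As a quick illustration of the change, a consumer is now wired up like this
(a minimal sketch; the consumer registry import path and the empty handler
body are assumptions, not part of the diff below)::

    from neutron.api.rpc.callbacks.consumer import registry  # assumed path
    from neutron.api.rpc.callbacks import resources


    def process_resource_updates(context, resource_type, resource_list,
                                 event_type):
        # New-style callback: receives the triggering context and the
        # resource type along with the pushed resources and the event type.
        pass


    # New-style registration; the deprecated equivalent was:
    #   registry.subscribe(process_resource_updates, resources.SEC_GROUP)
    registry.register(process_resource_updates, resources.SEC_GROUP)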

View File

@@ -211,16 +211,16 @@ The agent code processing port updates may look like::
from neutron.api.rpc.callbacks import resources
def process_resource_updates(resource_type, resource_list, event_type):
def process_resource_updates(context, resource_type, resource_list, event_type):
# send to the right handler which will update any control plane
# details related to the updated resources...
def subscribe_resources():
registry.subscribe(process_resource_updates, resources.SEC_GROUP)
registry.register(process_resource_updates, resources.SEC_GROUP)
registry.subscribe(process_resource_updates, resources.QOS_POLICY)
registry.register(process_resource_updates, resources.QOS_POLICY)
def port_update(port):
@@ -232,11 +232,12 @@ The agent code processing port updates may look like::
The relevant function is:
* subscribe(callback, resource_type): subscribes callback to a resource type.
* register(callback, resource_type): subscribes callback to a resource type.
The callback function will receive the following arguments:
* context: the neutron context that triggered the notification.
* resource_type: the type of resource which is receiving the update.
* resource_list: list of resources which have been pushed by server.
* event_type: will be one of CREATED, UPDATED, or DELETED, see
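
For example, a handler built around the documented arguments might dispatch
on event_type like this (a sketch; the events import path and the logging
body are illustrative assumptions)::

    import logging

    from neutron.api.rpc.callbacks import events  # assumed module path

    LOG = logging.getLogger(__name__)


    def process_resource_updates(context, resource_type, resource_list,
                                 event_type):
        # 'context' carries the request ID of the server-side operation
        # that triggered the push, which is useful for log correlation.
        for resource in resource_list:
            if event_type == events.DELETED:
                LOG.debug("Removing %s %s", resource_type, resource)
                # drop control-plane state for the resource here
            else:  # events.CREATED or events.UPDATED
                LOG.debug("Refreshing %s %s", resource_type, resource)
                # create/refresh control-plane state here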

View File

@@ -215,12 +215,13 @@ class QosAgentExtension(l2_agent_extension.L2AgentExtension):
for resource_type in self.SUPPORTED_RESOURCE_TYPES:
# We assume that the neutron server always broadcasts the latest
# version known to the agent
registry.subscribe(self._handle_notification, resource_type)
registry.register(self._handle_notification, resource_type)
topic = resources_rpc.resource_type_versioned_topic(resource_type)
connection.create_consumer(topic, endpoints, fanout=True)
@lockutils.synchronized('qos-port')
def _handle_notification(self, qos_policies, event_type):
def _handle_notification(self, context, resource_type,
qos_policies, event_type):
# server does not allow to remove a policy that is attached to any
# port, so we ignore DELETED events. Also, if we receive a CREATED
# event for a policy, it means that there are no ports so far that are

View File

@@ -10,6 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import debtcollector
from neutron.api.rpc.callbacks import resource_manager
@@ -19,7 +21,22 @@ def _get_manager():
return resource_manager.ConsumerResourceCallbacksManager()
@debtcollector.removals.remove(
message="This will be removed in the future. Please register callbacks "
"using the 'register' method in this model and adjust the "
"callback to accept the context and resource type as arguments.",
version="Ocata"
)
def subscribe(callback, resource_type):
# temporary hack to differentiate between callback types until the
# 'subscribe' method is removed
callback.__dict__['_ACCEPTS_CONTEXT'] = False
_get_manager().register(callback, resource_type)
def register(callback, resource_type):
# TODO(kevinbenton): remove this on debt collection
callback.__dict__['_ACCEPTS_CONTEXT'] = True
_get_manager().register(callback, resource_type)
@@ -27,12 +44,17 @@ def unsubscribe(callback, resource_type):
_get_manager().unregister(callback, resource_type)
def push(resource_type, resource_list, event_type):
def push(context, resource_type, resource_list, event_type):
"""Push resource list into all registered callbacks for the event type."""
callbacks = _get_manager().get_callbacks(resource_type)
for callback in callbacks:
callback(resource_list, event_type)
if callback._ACCEPTS_CONTEXT:
callback(context, resource_type, resource_list, event_type)
else:
# backwards compat for callback listeners that don't take
# context and resource_type
callback(resource_list, event_type)
def clear():
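
The compatibility dispatch in push() can be seen in isolation with a
standalone sketch (illustrative only; real consumers never set the flag
themselves, they go through register()/subscribe())::

    def legacy_cb(resource_list, event_type):
        print('legacy:', resource_list, event_type)


    def new_cb(context, resource_type, resource_list, event_type):
        print('new:', context, resource_type, resource_list, event_type)


    # subscribe() marks callbacks this way; register() sets the flag to True.
    legacy_cb._ACCEPTS_CONTEXT = False
    new_cb._ACCEPTS_CONTEXT = True

    for cb in (legacy_cb, new_cb):
        if cb._ACCEPTS_CONTEXT:
            cb('ctx', 'QOS_POLICY', ['policy-1'], 'updated')
        else:
            cb(['policy-1'], 'updated')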

View File

@@ -266,4 +266,4 @@ class ResourcesPushRpcCallback(object):
for resource in resource_list]
resource_type = resources.get_resource_type(resource_objs[0])
cons_registry.push(resource_type, resource_objs, event_type)
cons_registry.push(context, resource_type, resource_objs, event_type)

View File

@@ -20,7 +20,6 @@ from neutron.api.rpc.handlers import resources_rpc
from neutron.callbacks import events as local_events
from neutron.callbacks import registry
from neutron.callbacks import resources as local_resources
from neutron import context as n_ctx
from neutron.services.trunk import constants as t_const
from neutron.services.trunk.drivers.linuxbridge.agent import trunk_plumber
from neutron.services.trunk.rpc import agent as trunk_rpc
@@ -54,9 +53,8 @@ class LinuxBridgeTrunkDriver(trunk_rpc.TrunkSkeleton):
local_events.AFTER_DELETE)
super(LinuxBridgeTrunkDriver, self).__init__()
def handle_trunks(self, trunks, event_type):
def handle_trunks(self, context, resource_type, trunks, event_type):
"""Trunk data model change from the server."""
context = n_ctx.get_admin_context()
for trunk in trunks:
if event_type in (events.UPDATED, events.CREATED):
self._tapi.put_trunk(trunk.port_id, trunk)
@@ -65,9 +63,8 @@ class LinuxBridgeTrunkDriver(trunk_rpc.TrunkSkeleton):
self._tapi.put_trunk(trunk.port_id, None)
self._plumber.delete_trunk_subports(trunk)
def handle_subports(self, subports, event_type):
def handle_subports(self, context, resource_type, subports, event_type):
"""Subport data model change from the server."""
context = n_ctx.get_admin_context()
affected_trunks = set()
if event_type == events.DELETED:
method = self._tapi.delete_trunk_subport

View File

@@ -37,14 +37,14 @@ class OVSTrunkSkeleton(agent.TrunkSkeleton):
self.ovsdb_handler = ovsdb_handler
registry.unsubscribe(self.handle_trunks, resources.TRUNK)
def handle_trunks(self, trunk, event_type):
def handle_trunks(self, context, resource_type, trunk, event_type):
"""This method is not required by the OVS Agent driver.
Trunk notifications are handled via local OVSDB events.
"""
raise NotImplementedError()
def handle_subports(self, subports, event_type):
def handle_subports(self, context, resource_type, subports, event_type):
# Subports are always created with the same trunk_id and there is
# always at least one item in subports list
trunk_id = subports[0].trunk_id

View File

@@ -42,8 +42,8 @@ class TrunkSkeleton(object):
"""Skeleton proxy code for server->agent communication."""
def __init__(self):
registry.subscribe(self.handle_trunks, resources.TRUNK)
registry.subscribe(self.handle_subports, resources.SUBPORT)
registry.register(self.handle_trunks, resources.TRUNK)
registry.register(self.handle_subports, resources.SUBPORT)
self._connection = n_rpc.create_connection()
endpoints = [resources_rpc.ResourcesPushRpcCallback()]
@@ -54,7 +54,7 @@ class TrunkSkeleton(object):
self._connection.consume_in_threads()
@abc.abstractmethod
def handle_trunks(self, trunks, event_type):
def handle_trunks(self, context, resource_type, trunks, event_type):
"""Handle trunk events."""
# if common logic may be extracted out, consider making a base
# version of this method that can be overridden by the inherited
@@ -63,7 +63,7 @@ class TrunkSkeleton(object):
# either be ignored or cached for future use.
@abc.abstractmethod
def handle_subports(self, subports, event_type):
def handle_subports(self, context, resource_type, subports, event_type):
"""Handle subports event."""
# if common logic may be extracted out, consider making a base
# version of this method that can be overridden by the inherited
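
A hypothetical driver implementing the updated abstract handler signatures
could look like this (a sketch; the class name and bodies are illustrative,
only the signatures come from this change)::

    from neutron.services.trunk.rpc import agent as trunk_rpc


    class ExampleTrunkDriver(trunk_rpc.TrunkSkeleton):

        def handle_trunks(self, context, resource_type, trunks, event_type):
            for trunk in trunks:
                pass  # react to trunk CREATED/UPDATED/DELETED events

        def handle_subports(self, context, resource_type, subports,
                            event_type):
            for subport in subports:
                pass  # react to subport events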

View File

@@ -229,7 +229,9 @@ class TestOVSAgentQosExtension(OVSAgentQoSExtensionTestFramework):
policy_copy.rules[0].max_kbps = 500
policy_copy.rules[0].max_burst_kbps = 5
policy_copy.rules[1].dscp_mark = TEST_DSCP_MARK_2
consumer_reg.push(resources.QOS_POLICY, [policy_copy], events.UPDATED)
context = mock.Mock()
consumer_reg.push(context, resources.QOS_POLICY,
[policy_copy], events.UPDATED)
self.wait_until_bandwidth_limit_rule_applied(self.ports[0],
policy_copy.rules[0])
self._assert_bandwidth_limit_rule_is_set(self.ports[0],
@@ -265,6 +267,8 @@ class TestOVSAgentQosExtension(OVSAgentQoSExtensionTestFramework):
policy_copy = copy.deepcopy(self.qos_policies[TEST_POLICY_ID1])
policy_copy.rules = list()
consumer_reg.push(resources.QOS_POLICY, [policy_copy], events.UPDATED)
context = mock.Mock()
consumer_reg.push(context, resources.QOS_POLICY, [policy_copy],
events.UPDATED)
self.wait_until_bandwidth_limit_rule_applied(port_dict, None)

View File

@@ -230,7 +230,8 @@ class QosExtensionRpcTestCase(QosExtensionBaseTestCase):
self.qos_ext, '_process_update_policy') as update_mock:
for event_type in set(events.VALID) - {events.UPDATED}:
self.qos_ext._handle_notification(object(), event_type)
self.qos_ext._handle_notification(mock.Mock(), 'QOS',
object(), event_type)
self.assertFalse(update_mock.called)
def test__handle_notification_passes_update_events(self):
@@ -238,7 +239,8 @@ class QosExtensionRpcTestCase(QosExtensionBaseTestCase):
self.qos_ext, '_process_update_policy') as update_mock:
policy_obj = mock.Mock()
self.qos_ext._handle_notification([policy_obj], events.UPDATED)
self.qos_ext._handle_notification(mock.Mock(), 'QOS',
[policy_obj], events.UPDATED)
update_mock.assert_called_with(policy_obj)
def test__process_update_policy(self):
@@ -298,7 +300,7 @@ class QosExtensionRpcTestCase(QosExtensionBaseTestCase):
class QosExtensionInitializeTestCase(QosExtensionBaseTestCase):
@mock.patch.object(registry, 'subscribe')
@mock.patch.object(registry, 'register')
@mock.patch.object(resources_rpc, 'ResourcesPushRpcCallback')
def test_initialize_subscribed_to_rpc(self, rpc_mock, subscribe_mock):
self.qos_ext.initialize(

View File

@@ -30,6 +30,12 @@ class ConsumerRegistryTestCase(base.BaseTestCase):
registry.subscribe(callback, 'TYPE')
manager_mock().register.assert_called_with(callback, 'TYPE')
@mock.patch.object(registry, '_get_manager')
def test_register(self, manager_mock):
callback = lambda: None
registry.register(callback, 'TYPE')
manager_mock().register.assert_called_with(callback, 'TYPE')
@mock.patch.object(registry, '_get_manager')
def test_unsubscribe(self, manager_mock):
callback = lambda: None
@@ -47,10 +53,17 @@ class ConsumerRegistryTestCase(base.BaseTestCase):
resource_ = object()
event_type_ = object()
context = mock.Mock()
callback1 = mock.Mock()
callback2 = mock.Mock()
callbacks = {callback1, callback2}
legacy_callback = mock.Mock()
registry.register(callback1, 'x')
registry.register(callback2, 'x')
registry.subscribe(legacy_callback, 'x')
callbacks = {callback1, callback2, legacy_callback}
manager_mock().get_callbacks.return_value = callbacks
registry.push(resource_type_, [resource_], event_type_)
for callback in callbacks:
callback.assert_called_with([resource_], event_type_)
registry.push(context, resource_type_, [resource_], event_type_)
for callback in (callback1, callback2):
callback.assert_called_with(context, resource_type_,
[resource_], event_type_)
legacy_callback.assert_called_with([resource_], event_type_)

View File

@@ -286,7 +286,8 @@ class ResourcesPushRpcCallbackTestCase(ResourcesRpcBaseTestCase):
resource_list=[resource.obj_to_primitive()
for resource in self.resource_objs],
event_type=TEST_EVENT)
reg_push_mock.assert_called_once_with(self.resource_objs[0].obj_name(),
reg_push_mock.assert_called_once_with(self.context,
self.resource_objs[0].obj_name(),
self.resource_objs,
TEST_EVENT)
@@ -299,6 +300,7 @@ class ResourcesPushRpcCallbackTestCase(ResourcesRpcBaseTestCase):
self.callbacks.push(self.context,
resource=self.resource_objs[0].obj_to_primitive(),
event_type=TEST_EVENT)
reg_push_mock.assert_called_once_with(self.resource_objs[0].obj_name(),
reg_push_mock.assert_called_once_with(self.context,
self.resource_objs[0].obj_name(),
[self.resource_objs[0]],
TEST_EVENT)

View File

@@ -52,7 +52,8 @@ class LinuxBridgeTrunkDriverTestCase(base.BaseTestCase):
def _test_handle_trunks_wire_event(self, event):
self.plumber.trunk_on_host.return_value = True
self.lbd.handle_trunks([self.trunk], event)
self.lbd.handle_trunks(mock.Mock(), 'TRUNKS',
[self.trunk], event)
self.tapi.put_trunk.assert_called_once_with(
self.trunk.port_id, self.trunk)
self.tapi.bind_subports_to_host.assert_called_once_with(
@@ -60,21 +61,24 @@ class LinuxBridgeTrunkDriverTestCase(base.BaseTestCase):
self.assertFalse(self.plumber.delete_trunk_subports.called)
def test_handle_trunks_deleted(self):
self.lbd.handle_trunks([self.trunk], events.DELETED)
self.lbd.handle_trunks(mock.Mock(), 'TRUNKS',
[self.trunk], events.DELETED)
self.tapi.put_trunk.assert_called_once_with(
self.trunk.port_id, None)
self.plumber.delete_trunk_subports.assert_called_once_with(self.trunk)
def test_handle_subports_deleted(self):
self.tapi.get_trunk_by_id.return_value = self.trunk
self.lbd.handle_subports(self.trunk.sub_ports, events.DELETED)
self.lbd.handle_subports(mock.Mock(), 'TRUNKS',
self.trunk.sub_ports, events.DELETED)
self.assertEqual(20, len(self.tapi.delete_trunk_subport.mock_calls))
# should have tried to wire trunk at the end with state
self.plumber.trunk_on_host.assert_called_once_with(self.trunk)
def test_handle_subports_created(self):
self.tapi.get_trunk_by_id.return_value = self.trunk
self.lbd.handle_subports(self.trunk.sub_ports, events.CREATED)
self.lbd.handle_subports(mock.Mock(), 'TRUNKS',
self.trunk.sub_ports, events.CREATED)
self.assertEqual(20, len(self.tapi.put_trunk_subport.mock_calls))
# should have tried to wire trunk at the end with state
self.plumber.trunk_on_host.assert_called_once_with(self.trunk)

View File

@@ -69,7 +69,8 @@ class OvsTrunkSkeletonTest(base.BaseTestCase):
trunk_rpc.update_subport_bindings.side_effect = (
fake_update_subport_bindings)
self.skeleton.handle_subports(self.subports, events.CREATED)
self.skeleton.handle_subports(mock.Mock(), 'SUBPORTS',
self.subports, events.CREATED)
expected_calls = [
mock.call(subport.trunk_id, subport.port_id, mock.ANY,
subport.segmentation_id)
@@ -79,7 +80,8 @@ class OvsTrunkSkeletonTest(base.BaseTestCase):
@mock.patch('neutron.agent.common.ovs_lib.OVSBridge')
def test_handle_subports_deleted(self, br):
"""Test handler calls into trunk manager for deleting subports."""
self.skeleton.handle_subports(self.subports, events.DELETED)
self.skeleton.handle_subports(mock.Mock(), 'SUBPORTS',
self.subports, events.DELETED)
expected_calls = [
mock.call(subport.trunk_id, subport.port_id)
for subport in self.subports]
@@ -88,7 +90,8 @@ class OvsTrunkSkeletonTest(base.BaseTestCase):
def test_handle_subports_not_for_this_agent(self):
with mock.patch.object(self.skeleton, 'ovsdb_handler') as handler_m:
handler_m.manages_this_trunk.return_value = False
self.skeleton.handle_subports(self.subports, mock.ANY)
self.skeleton.handle_subports(mock.Mock(), 'SUBPORTS',
self.subports, mock.ANY)
self.assertFalse(self.trunk_manager.wire_subports_for_trunk.called)
self.assertFalse(self.trunk_manager.unwire_subports_for_trunk.called)
@@ -102,7 +105,8 @@ class OvsTrunkSkeletonTest(base.BaseTestCase):
mock.patch.object(
self.skeleton.ovsdb_handler,
'unwire_subports_for_trunk') as g:
self.skeleton.handle_subports(self.subports, events.UPDATED)
self.skeleton.handle_subports(mock.Mock(), 'SUBPORTS',
self.subports, events.UPDATED)
self.assertFalse(f.called)
self.assertFalse(g.called)
self.assertFalse(trunk_rpc.update_trunk_status.called)
@@ -111,12 +115,14 @@ class OvsTrunkSkeletonTest(base.BaseTestCase):
trunk_rpc = self.skeleton.ovsdb_handler.trunk_rpc
trunk_rpc.update_subport_bindings.side_effect = (
oslo_messaging.MessagingException)
self.skeleton.handle_subports(self.subports, events.CREATED)
self.skeleton.handle_subports(mock.Mock(), 'SUBPORTS',
self.subports, events.CREATED)
self.assertTrue(trunk_rpc.update_subport_bindings.called)
def _test_handle_subports_trunk_on_trunk_update(self, event):
trunk_rpc = self.skeleton.ovsdb_handler.trunk_rpc
self.skeleton.handle_subports(self.subports, event)
self.skeleton.handle_subports(mock.Mock(), 'SUBPORTS',
self.subports, event)
# Make sure trunk state is reported to the server
self.assertTrue(trunk_rpc.update_trunk_status.called)