diff --git a/doc/source/devref/rpc_callbacks.rst b/doc/source/devref/rpc_callbacks.rst index 01bc9b6c9c6..f72672482b3 100644 --- a/doc/source/devref/rpc_callbacks.rst +++ b/doc/source/devref/rpc_callbacks.rst @@ -4,7 +4,7 @@ Neutron Messaging Callback System Neutron already has a callback system [link-to: callbacks.rst] for in-process resource callbacks where publishers and subscribers are able -to publish, subscribe and extend resources. +to publish and subscribe for resource events. This system is different, and is intended to be used for inter-process callbacks, via the messaging fanout mechanisms. @@ -16,12 +16,11 @@ modify existing RPC calls, or creating new RPC messages. A few resource which can benefit of this system: -* security groups members -* security group rules, -* QoS policies. +* QoS policies; +* Security Groups. Using a remote publisher/subscriber pattern, the information about such -resources could be published using fanout queues to all interested nodes, +resources could be published using fanout messages to all interested nodes, minimizing messaging requests from agents to server since the agents get subscribed for their whole lifecycle (unless they unsubscribe). @@ -38,8 +37,6 @@ allow object version down/up conversion. #[vo_mkcompat]_ #[vo_mkcptests]_ For the VO's versioning schema look here: #[vo_versioning]_ - - versioned_objects serialization/deserialization with the obj_to_primitive(target_version=..) and primitive_to_obj() #[ov_serdes]_ methods is used internally to convert/retrieve objects before/after messaging. @@ -58,42 +55,21 @@ Considering rolling upgrades, there are several scenarios to look at: to deserialize the object, in this case (PLEASE DISCUSS), we can think of two strategies: -a) During upgrades, we pin neutron-server to a compatible version for resource - fanout updates, and server sends both the old, and the newer version to - different topic, queues. Old agents receive the updates on the old version - topic, new agents receive updates on the new version topic. - When the whole system upgraded, we un-pin the compatible version fanout. - A variant of this could be using a single fanout queue, and sending the - pinned version of the object to all. Newer agents can deserialize to the - latest version and upgrade any fields internally. Again at the end, we - unpin the version and restart the service. - -b) The subscriber will rpc call the publisher to start publishing also a downgraded - version of the object on every update on a separate queue. The complication - of this version, is the need to ignore new version objects as long as we keep - receiving the downgraded ones, and otherwise resend the request to send the - downgraded objects after a certain timeout (thinking of the case where the - request for downgraded queue is done, but the publisher restarted). - This approach is more complicated to implement, but more automated from the - administrator point of view. We may want to look into it as a second step - from a - -c) The subscriber will send a registry.get_info for the latest specific version - he knows off. This can have scalability issues during upgrade as any outdated - agent will require a flow of two messages (request, and response). This is - indeed very bad at scale if you have hundreds or thousands of agents. - -Option a seems like a reasonable strategy, similar to what nova does now with -versioned objects. +The strategy for upgrades will be: + During upgrades, we pin neutron-server to a compatible version for resource + fanout updates, and the server sends both the old, and the newer version. + The new agents process updates, taking the newer version of the resource + fanout updates. When the whole system upgraded, we un-pin the compatible + version fanout. Serialized versioned objects look like:: {'versioned_object.version': '1.0', - 'versioned_object.name': 'QoSProfile', + 'versioned_object.name': 'QoSPolicy', 'versioned_object.data': {'rules': [ {'versioned_object.version': '1.0', - 'versioned_object.name': 'QoSRule', + 'versioned_object.name': 'QoSBandwidthLimitRule', 'versioned_object.data': {'name': u'a'}, 'versioned_object.namespace': 'versionedobjects'} ], @@ -101,19 +77,18 @@ Serialized versioned objects look like:: 'name': u'aaa'}, 'versioned_object.namespace': 'versionedobjects'} -Topic names for the fanout queues -================================= +Topic names for every resource type RPC endpoint +================================================ -if we adopted option a: -neutron-_- -[neutron-_-] +neutron-vo-- -if we adopted option b for rolling upgrades: -neutron-- -neutron--- +In the future, we may want to get oslo messaging to support subscribing +topics dynamically, then we may want to use: -for option c, just: -neutron-- +neutron-vo--- instead, + +or something equivalent which would allow fine granularity for the receivers +to only get interesting information to them. Subscribing to resources ======================== @@ -123,103 +98,86 @@ has an associated security group, and QoS policy. The agent code processing port updates may look like:: - from neutron.rpc_resources import events - from neutron.rpc_resources import resources - from neutron.rpc_resources import registry + from neutron.api.rpc.callbacks.consumer import registry + from neutron.api.rpc.callbacks import events + from neutron.api.rpc.callbacks import resources - def process_resource_updates(resource_type, resource_id, resource_list, action_type): + def process_resource_updates(resource_type, resource, event_type): # send to the right handler which will update any control plane # details related to the updated resource... - def port_update(...): + def subscribe_resources(): + registry.subscribe(process_resource_updates, resources.SEC_GROUP) + + registry.subscribe(process_resource_updates, resources.QOS_POLICY) + + def port_update(port): # here we extract sg_id and qos_policy_id from port.. - registry.subscribe(resources.SG_RULES, sg_id, - callback=process_resource_updates) - sg_rules = registry.get_info(resources.SG_RULES, sg_id) - - registry.subscribe(resources.SG_MEMBERS, sg_id, - callback=process_resource_updates) - sg_members = registry.get_info(resources.SG_MEMBERS, sg_id) - - registry.subscribe(resources.QOS_RULES, qos_policy_id, - callback=process_resource_updates) - qos_rules = registry.get_info(resources.QOS_RULES, qos_policy_id, - callback=process_resource_updates) - - cleanup_subscriptions() + sec_group = registry.pull(resources.SEC_GROUP, sg_id) + qos_policy = registry.pull(resources.QOS_POLICY, qos_policy_id) - def cleanup_subscriptions() - sg_ids = determine_unreferenced_sg_ids() - qos_policy_id = determine_unreferenced_qos_policy_ids() - registry.unsubscribe_info(resource.SG_RULES, sg_ids) - registry.unsubscribe_info(resource.SG_MEMBERS, sg_ids) - registry.unsubscribe_info(resource.QOS_RULES, qos_policy_id) +The relevant function is: -Another unsubscription strategy could be to lazily unsubscribe resources when -we receive updates for them, and we discover that they are not needed anymore. - -Deleted resources are automatically unsubscribed as we receive the delete event. - -NOTE(irenab): this could be extended to core resources like ports, making use -of the standard neutron in-process callbacks at server side and propagating -AFTER_UPDATE events, for example, but we may need to wait until those callbacks -are used with proper versioned objects. +* subscribe(callback, resource_type): subscribes callback to a resource type. -Unsubscribing to resources -========================== +The callback function will receive the following arguments: -There are a few options to unsubscribe registered callbacks: +* resource_type: the type of resource which is receiving the update. +* resource: resource of supported object +* event_type: will be one of CREATED, UPDATED, or DELETED, see + neutron.api.rpc.callbacks.events for details. -* unsubscribe_resource_id(): it selectively unsubscribes an specific - resource type + id. -* unsubscribe_resource_type(): it unsubscribes from an specific resource type, - any ID. -* unsubscribe_all(): it unsubscribes all subscribed resources and ids. +With the underlaying oslo_messaging support for dynamic topics on the receiver +we cannot implement a per "resource type + resource id" topic, rabbitmq seems +to handle 10000's of topics without suffering, but creating 100's of +oslo_messaging receivers on different topics seems to crash. + +We may want to look into that later, to avoid agents receiving resource updates +which are uninteresting to them. + +Unsubscribing from resources +============================ + +To unsubscribe registered callbacks: + +* unsubscribe(callback, resource_type): unsubscribe from specific resource type. +* unsubscribe_all(): unsubscribe from all resources. -Sending resource updates -======================== +Sending resource events +======================= On the server side, resource updates could come from anywhere, a service plugin, -an extension, anything that updates the resource and that it's of any interest -to the agents. +an extension, anything that updates, creates, or destroys the resource and that +is of any interest to subscribed agents. The server/publisher side may look like:: - from neutron.rpc_resources import events - from neutron.rpc_resources import resources - from neutron.rpc_resources import registry as rpc_registry + from neutron.api.rpc.callbacks.producer import registry + from neutron.api.rpc.callbacks import events - def add_qos_x_rule(...): + def create_qos_policy(...): + policy = fetch_policy(...) update_the_db(...) - send_rpc_updates_on_qos_policy(qos_policy_id) + registry.push(policy, events.CREATED) - def del_qos_x_rule(...): + def update_qos_policy(...): + policy = fetch_policy(...) update_the_db(...) - send_rpc_deletion_of_qos_policy(qos_policy_id) + registry.push(policy, events.UPDATED) - def send_rpc_updates_on_qos_policy(qos_policy_id): - rules = get_qos_policy_rules_versioned_object(qos_policy_id) - rpc_registry.notify(resources.QOS_RULES, qos_policy_id, rules, events.UPDATE) + def delete_qos_policy(...): + policy = fetch_policy(...) + update_the_db(...) + registry.push(policy, events.DELETED) - def send_rpc_deletion_of_qos_policy(qos_policy_id): - rpc_registry.notify(resources.QOS_RULES, qos_policy_id, None, events.DELETE) - - # This part is added for the registry mechanism, to be able to request - # older versions of the notified objects if any oudated agent requires - # them. - def retrieve_older_version_callback(qos_policy_id, version): - return get_qos_policy_rules_versioned_object(qos_policy_id, version) - - rpc_registry.register_retrieve_callback(resource.QOS_RULES, - retrieve_older_version_callback) References ========== diff --git a/neutron/agent/l2/extensions/qos.py b/neutron/agent/l2/extensions/qos.py index f3442c8ea2f..6483d5aa9f0 100644 --- a/neutron/agent/l2/extensions/qos.py +++ b/neutron/agent/l2/extensions/qos.py @@ -76,7 +76,7 @@ class QosAgentExtension(agent_extension.AgentCoreResourceExtension): """ super(QosAgentExtension, self).initialize() - self.resource_rpc = resources_rpc.ResourcesServerRpcApi() + self.resource_rpc = resources_rpc.ResourcesPullRpcApi() self.qos_driver = manager.NeutronManager.load_class_for_provider( 'neutron.qos.agent_drivers', cfg.CONF.qos.agent_driver)() self.qos_driver.initialize() @@ -111,8 +111,8 @@ class QosAgentExtension(agent_extension.AgentCoreResourceExtension): # 1. to add new api for subscribe # registry.subscribe(self._process_policy_updates, # resources.QOS_POLICY, qos_policy_id) - # 2. combine get_info rpc to also subscribe to the resource - qos_policy = self.resource_rpc.get_info( + # 2. combine pull rpc to also subscribe to the resource + qos_policy = self.resource_rpc.pull( context, resources.QOS_POLICY, qos_policy_id) diff --git a/neutron/api/rpc/callbacks/consumer/__init__.py b/neutron/api/rpc/callbacks/consumer/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/neutron/api/rpc/callbacks/consumer/registry.py b/neutron/api/rpc/callbacks/consumer/registry.py new file mode 100644 index 00000000000..454e423a083 --- /dev/null +++ b/neutron/api/rpc/callbacks/consumer/registry.py @@ -0,0 +1,44 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log as logging + +from neutron.api.rpc.callbacks import resource_manager + + +LOG = logging.getLogger(__name__) + + +#TODO(ajo): consider adding locking to _get_manager, it's +# safe for eventlet, but not for normal threading. +def _get_manager(): + return resource_manager.ConsumerResourceCallbacksManager() + + +def subscribe(callback, resource_type): + _get_manager().register(callback, resource_type) + + +def unsubscribe(callback, resource_type): + _get_manager().unregister(callback, resource_type) + + +def push(resource_type, resource, event_type): + """Push resource events into all registered callbacks for the type.""" + + callbacks = _get_manager().get_callbacks(resource_type) + for callback in callbacks: + callback(resource_type, resource, event_type) + + +def clear(): + _get_manager().clear() diff --git a/neutron/api/rpc/callbacks/events.py b/neutron/api/rpc/callbacks/events.py index ff8193d9ed1..485a1bc801e 100644 --- a/neutron/api/rpc/callbacks/events.py +++ b/neutron/api/rpc/callbacks/events.py @@ -10,10 +10,12 @@ # License for the specific language governing permissions and limitations # under the License. +CREATED = 'created' UPDATED = 'updated' DELETED = 'deleted' VALID = ( + CREATED, UPDATED, DELETED ) diff --git a/neutron/api/rpc/callbacks/exceptions.py b/neutron/api/rpc/callbacks/exceptions.py new file mode 100644 index 00000000000..9e17474db08 --- /dev/null +++ b/neutron/api/rpc/callbacks/exceptions.py @@ -0,0 +1,25 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.common import exceptions + + +class CallbackWrongResourceType(exceptions.NeutronException): + message = _('Callback for %(resource_type)s returned wrong resource type') + + +class CallbackNotFound(exceptions.NeutronException): + message = _('Callback for %(resource_type)s not found') + + +class CallbacksMaxLimitReached(exceptions.NeutronException): + message = _("Cannot add multiple callbacks for %(resource_type)s") diff --git a/neutron/api/rpc/callbacks/producer/__init__.py b/neutron/api/rpc/callbacks/producer/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/neutron/api/rpc/callbacks/producer/registry.py b/neutron/api/rpc/callbacks/producer/registry.py new file mode 100644 index 00000000000..b19a8bfd501 --- /dev/null +++ b/neutron/api/rpc/callbacks/producer/registry.py @@ -0,0 +1,62 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log as logging + +from neutron.api.rpc.callbacks import exceptions +from neutron.api.rpc.callbacks import resource_manager +from neutron.objects import base + + +LOG = logging.getLogger(__name__) + + +# TODO(ajo): consider adding locking: it's safe for eventlet but not +# for other types of threading. +def _get_manager(): + return resource_manager.ProducerResourceCallbacksManager() + + +def provide(callback, resource_type): + """Register a callback as a producer for the resource type. + + This callback will be used to produce resources of corresponding type for + interested parties. + """ + _get_manager().register(callback, resource_type) + + +def unprovide(callback, resource_type): + """Unregister a callback for corresponding resource type.""" + _get_manager().unregister(callback, resource_type) + + +def clear(): + """Clear all callbacks.""" + _get_manager().clear() + + +def pull(resource_type, resource_id, **kwargs): + """Get resource object that corresponds to resource id. + + The function will return an object that is provided by resource producer. + + :returns: NeutronObject + """ + callback = _get_manager().get_callback(resource_type) + obj = callback(resource_type, resource_id, **kwargs) + if obj: + if (not isinstance(obj, base.NeutronObject) or + resource_type != obj.obj_name()): + raise exceptions.CallbackWrongResourceType( + resource_type=resource_type) + return obj diff --git a/neutron/api/rpc/callbacks/registry.py b/neutron/api/rpc/callbacks/registry.py deleted file mode 100644 index de132983d31..00000000000 --- a/neutron/api/rpc/callbacks/registry.py +++ /dev/null @@ -1,87 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from neutron.api.rpc.callbacks import resource_manager -from neutron.api.rpc.callbacks import resources -from neutron.common import exceptions - - -# TODO(ajo): consider adding locking -CALLBACK_MANAGER = None - - -def _get_resources_callback_manager(): - global CALLBACK_MANAGER - if CALLBACK_MANAGER is None: - CALLBACK_MANAGER = resource_manager.ResourcesCallbacksManager() - return CALLBACK_MANAGER - - -class CallbackReturnedWrongObjectType(exceptions.NeutronException): - message = _('Callback for %(resource_type)s returned wrong object type') - - -class CallbackNotFound(exceptions.NeutronException): - message = _('Callback for %(resource_type)s not found') - - -#resource implementation callback registration functions -def get_info(resource_type, resource_id, **kwargs): - """Get information about resource type with resource id. - - The function will check the providers for a specific remotable - resource and get the resource. - - :returns: NeutronObject - """ - callback = _get_resources_callback_manager().get_callback(resource_type) - if not callback: - raise CallbackNotFound(resource_type=resource_type) - - obj = callback(resource_type, resource_id, **kwargs) - if obj: - expected_cls = resources.get_resource_cls(resource_type) - if not isinstance(obj, expected_cls): - raise CallbackReturnedWrongObjectType( - resource_type=resource_type) - return obj - - -def register_provider(callback, resource_type): - _get_resources_callback_manager().register(callback, resource_type) - - -# resource RPC callback for pub/sub -#Agent side -def subscribe(callback, resource_type, resource_id): - #TODO(QoS): we have to finish the real update notifications - raise NotImplementedError("we should finish update notifications") - - -def unsubscribe(callback, resource_type, resource_id): - #TODO(QoS): we have to finish the real update notifications - raise NotImplementedError("we should finish update notifications") - - -def unsubscribe_all(): - #TODO(QoS): we have to finish the real update notifications - raise NotImplementedError("we should finish update notifications") - - -#Server side -def notify(resource_type, event, obj): - #TODO(QoS): we have to finish the real update notifications - raise NotImplementedError("we should finish update notifications") - - -def clear(): - _get_resources_callback_manager().clear() diff --git a/neutron/api/rpc/callbacks/resource_manager.py b/neutron/api/rpc/callbacks/resource_manager.py index f28326fef72..63f89803358 100644 --- a/neutron/api/rpc/callbacks/resource_manager.py +++ b/neutron/api/rpc/callbacks/resource_manager.py @@ -10,58 +10,130 @@ # License for the specific language governing permissions and limitations # under the License. +import abc import collections from oslo_log import log as logging +import six +from neutron.api.rpc.callbacks import exceptions as rpc_exc from neutron.api.rpc.callbacks import resources from neutron.callbacks import exceptions LOG = logging.getLogger(__name__) +# TODO(QoS): split the registry/resources_rpc modules into two separate things: +# one for pull and one for push APIs -class ResourcesCallbacksManager(object): + +def _validate_resource_type(resource_type): + if not resources.is_valid_resource_type(resource_type): + raise exceptions.Invalid(element='resource', value=resource_type) + + +@six.add_metaclass(abc.ABCMeta) +class ResourceCallbacksManager(object): """A callback system that allows information providers in a loose manner. """ - def __init__(self): - self.clear() + # This hook is to allow tests to get new objects for the class + _singleton = True + + def __new__(cls, *args, **kwargs): + if not cls._singleton: + return super(ResourceCallbacksManager, cls).__new__(cls) + + if not hasattr(cls, '_instance'): + cls._instance = super(ResourceCallbacksManager, cls).__new__(cls) + return cls._instance + + @abc.abstractmethod + def _add_callback(self, callback, resource_type): + pass + + @abc.abstractmethod + def _delete_callback(self, callback, resource_type): + pass def register(self, callback, resource_type): """Register a callback for a resource type. - Only one callback can be registered for a resource type. - :param callback: the callback. It must raise or return NeutronObject. :param resource_type: must be a valid resource type. """ - LOG.debug("register: %(callback)s %(resource_type)s", - {'callback': callback, 'resource_type': resource_type}) - if not resources.is_valid_resource_type(resource_type): - raise exceptions.Invalid(element='resource', value=resource_type) + LOG.debug("Registering callback for %s", resource_type) + _validate_resource_type(resource_type) + self._add_callback(callback, resource_type) - self._callbacks[resource_type] = callback - - def unregister(self, resource_type): + def unregister(self, callback, resource_type): """Unregister callback from the registry. - :param resource: must be a valid resource type. + :param callback: the callback. + :param resource_type: must be a valid resource type. """ - LOG.debug("Unregister: %s", resource_type) - if not resources.is_valid_resource_type(resource_type): - raise exceptions.Invalid(element='resource', value=resource_type) - self._callbacks[resource_type] = None + LOG.debug("Unregistering callback for %s", resource_type) + _validate_resource_type(resource_type) + self._delete_callback(callback, resource_type) + @abc.abstractmethod def clear(self): """Brings the manager to a clean state.""" - self._callbacks = collections.defaultdict(dict) + + def get_subscribed_types(self): + return list(self._callbacks.keys()) + + +class ProducerResourceCallbacksManager(ResourceCallbacksManager): + + _callbacks = dict() + + def _add_callback(self, callback, resource_type): + if resource_type in self._callbacks: + raise rpc_exc.CallbacksMaxLimitReached(resource_type=resource_type) + self._callbacks[resource_type] = callback + + def _delete_callback(self, callback, resource_type): + try: + del self._callbacks[resource_type] + except KeyError: + raise rpc_exc.CallbackNotFound(resource_type=resource_type) + + def clear(self): + self._callbacks = dict() def get_callback(self, resource_type): + _validate_resource_type(resource_type) + try: + return self._callbacks[resource_type] + except KeyError: + raise rpc_exc.CallbackNotFound(resource_type=resource_type) + + +class ConsumerResourceCallbacksManager(ResourceCallbacksManager): + + _callbacks = collections.defaultdict(set) + + def _add_callback(self, callback, resource_type): + self._callbacks[resource_type].add(callback) + + def _delete_callback(self, callback, resource_type): + try: + self._callbacks[resource_type].remove(callback) + if not self._callbacks[resource_type]: + del self._callbacks[resource_type] + except KeyError: + raise rpc_exc.CallbackNotFound(resource_type=resource_type) + + def clear(self): + self._callbacks = collections.defaultdict(set) + + def get_callbacks(self, resource_type): """Return the callback if found, None otherwise. :param resource_type: must be a valid resource type. """ - if not resources.is_valid_resource_type(resource_type): - raise exceptions.Invalid(element='resource', value=resource_type) - - return self._callbacks[resource_type] + _validate_resource_type(resource_type) + callbacks = self._callbacks[resource_type] + if not callbacks: + raise rpc_exc.CallbackNotFound(resource_type=resource_type) + return callbacks diff --git a/neutron/api/rpc/handlers/resources_rpc.py b/neutron/api/rpc/handlers/resources_rpc.py index 6c801e5dc2a..eed2dfde076 100755 --- a/neutron/api/rpc/handlers/resources_rpc.py +++ b/neutron/api/rpc/handlers/resources_rpc.py @@ -17,7 +17,7 @@ from oslo_log import helpers as log_helpers from oslo_log import log as logging import oslo_messaging -from neutron.api.rpc.callbacks import registry +from neutron.api.rpc.callbacks.producer import registry from neutron.api.rpc.callbacks import resources from neutron.common import constants from neutron.common import exceptions @@ -46,14 +46,20 @@ def _validate_resource_type(resource_type): raise InvalidResourceTypeClass(resource_type=resource_type) -class ResourcesServerRpcApi(object): +class ResourcesPullRpcApi(object): """Agent-side RPC (stub) for agent-to-plugin interaction. This class implements the client side of an rpc interface. The server side - can be found below: ResourcesServerRpcCallback. For more information on + can be found below: ResourcesPullRpcCallback. For more information on this RPC interface, see doc/source/devref/rpc_callbacks.rst. """ + def __new__(cls): + # make it a singleton + if not hasattr(cls, '_instance'): + cls._instance = super(ResourcesPullRpcApi, cls).__new__(cls) + return cls._instance + def __init__(self): target = oslo_messaging.Target( topic=topics.PLUGIN, version='1.0', @@ -61,7 +67,7 @@ class ResourcesServerRpcApi(object): self.client = n_rpc.get_client(target) @log_helpers.log_method_call - def get_info(self, context, resource_type, resource_id): + def pull(self, context, resource_type, resource_id): _validate_resource_type(resource_type) # we've already validated the resource type, so we are pretty sure the @@ -69,7 +75,7 @@ class ResourcesServerRpcApi(object): resource_type_cls = resources.get_resource_cls(resource_type) cctxt = self.client.prepare() - primitive = cctxt.call(context, 'get_info', + primitive = cctxt.call(context, 'pull', resource_type=resource_type, version=resource_type_cls.VERSION, resource_id=resource_id) @@ -82,11 +88,11 @@ class ResourcesServerRpcApi(object): return obj -class ResourcesServerRpcCallback(object): +class ResourcesPullRpcCallback(object): """Plugin-side RPC (implementation) for agent-to-plugin interaction. This class implements the server side of an rpc interface. The client side - can be found above: ResourcesServerRpcApi. For more information on + can be found above: ResourcesPullRpcApi. For more information on this RPC interface, see doc/source/devref/rpc_callbacks.rst. """ @@ -96,14 +102,10 @@ class ResourcesServerRpcCallback(object): target = oslo_messaging.Target( version='1.0', namespace=constants.RPC_NAMESPACE_RESOURCES) - def get_info(self, context, resource_type, version, resource_id): + def pull(self, context, resource_type, version, resource_id): _validate_resource_type(resource_type) - obj = registry.get_info( - resource_type, - resource_id, - context=context) - + obj = registry.pull(resource_type, resource_id, context=context) if obj: # don't request a backport for the latest known version if version == obj.VERSION: diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index cdcd3a61a2c..85b9f483760 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -164,7 +164,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, dhcp_rpc.DhcpRpcCallback(), agents_db.AgentExtRpcCallback(), metadata_rpc.MetadataRpcCallback(), - resources_rpc.ResourcesServerRpcCallback() + resources_rpc.ResourcesPullRpcCallback() ] def _setup_dhcp(self): diff --git a/neutron/services/qos/notification_drivers/message_queue.py b/neutron/services/qos/notification_drivers/message_queue.py index d430730a6d0..aa804f72306 100644 --- a/neutron/services/qos/notification_drivers/message_queue.py +++ b/neutron/services/qos/notification_drivers/message_queue.py @@ -12,8 +12,7 @@ from oslo_log import log as logging -from neutron.api.rpc.callbacks import events -from neutron.api.rpc.callbacks import registry as rpc_registry +from neutron.api.rpc.callbacks.producer import registry from neutron.api.rpc.callbacks import resources from neutron.i18n import _LW from neutron.objects.qos import policy as policy_object @@ -41,9 +40,7 @@ class RpcQosServiceNotificationDriver( """RPC message queue service notification driver for QoS.""" def __init__(self): - rpc_registry.register_provider( - _get_qos_policy_cb, - resources.QOS_POLICY) + registry.provide(_get_qos_policy_cb, resources.QOS_POLICY) def get_description(self): return "Message queue updates" @@ -53,19 +50,9 @@ class RpcQosServiceNotificationDriver( pass def update_policy(self, policy): - # TODO(QoS): this is temporary until we get notify() implemented - try: - rpc_registry.notify(resources.QOS_POLICY, - events.UPDATED, - policy) - except NotImplementedError: - pass + # TODO(QoS): implement notification + pass def delete_policy(self, policy): - # TODO(QoS): this is temporary until we get notify() implemented - try: - rpc_registry.notify(resources.QOS_POLICY, - events.DELETED, - policy) - except NotImplementedError: - pass + # TODO(QoS): implement notification + pass diff --git a/neutron/services/qos/qos_plugin.py b/neutron/services/qos/qos_plugin.py index 9073d712bc9..0b91d46b9c2 100644 --- a/neutron/services/qos/qos_plugin.py +++ b/neutron/services/qos/qos_plugin.py @@ -60,8 +60,8 @@ class QoSPlugin(qos.QoSPluginBase): def delete_policy(self, context, policy_id): policy = policy_object.QosPolicy(context) policy.id = policy_id - self.notification_driver_manager.delete_policy(policy) policy.delete() + self.notification_driver_manager.delete_policy(policy) def _get_policy_obj(self, context, policy_id): obj = policy_object.QosPolicy.get_by_id(context, policy_id) diff --git a/neutron/tests/unit/agent/l2/extensions/test_qos.py b/neutron/tests/unit/agent/l2/extensions/test_qos.py index 8772394bdb1..006044bf369 100755 --- a/neutron/tests/unit/agent/l2/extensions/test_qos.py +++ b/neutron/tests/unit/agent/l2/extensions/test_qos.py @@ -23,7 +23,7 @@ from neutron.tests import base # This is a minimalistic mock of rules to be passed/checked around # which should be exteneded as needed to make real rules -TEST_GET_INFO_RULES = ['rule1', 'rule2'] +TEST_GET_RESOURCE_RULES = ['rule1', 'rule2'] class QosAgentExtensionTestCase(base.BaseTestCase): @@ -40,11 +40,10 @@ class QosAgentExtensionTestCase(base.BaseTestCase): ).start() self.qos_ext.initialize() - self._create_fake_resource_rpc() - def _create_fake_resource_rpc(self): - self.get_info_mock = mock.Mock(return_value=TEST_GET_INFO_RULES) - self.qos_ext.resource_rpc.get_info = self.get_info_mock + self.pull_mock = mock.patch.object( + self.qos_ext.resource_rpc, 'pull', + return_value=TEST_GET_RESOURCE_RULES).start() def _create_test_port_dict(self): return {'port_id': uuidutils.generate_uuid(), @@ -65,7 +64,7 @@ class QosAgentExtensionTestCase(base.BaseTestCase): # we make sure the underlaying qos driver is called with the # right parameters self.qos_ext.qos_driver.create.assert_called_once_with( - port, TEST_GET_INFO_RULES) + port, TEST_GET_RESOURCE_RULES) self.assertEqual(port, self.qos_ext.qos_policy_ports[qos_policy_id][port_id]) self.assertTrue(port_id in self.qos_ext.known_ports) @@ -81,10 +80,10 @@ class QosAgentExtensionTestCase(base.BaseTestCase): def test_handle_known_port_change_policy_id(self): port = self._create_test_port_dict() self.qos_ext.handle_port(self.context, port) - self.qos_ext.resource_rpc.get_info.reset_mock() + self.qos_ext.resource_rpc.pull.reset_mock() port['qos_policy_id'] = uuidutils.generate_uuid() self.qos_ext.handle_port(self.context, port) - self.get_info_mock.assert_called_once_with( + self.pull_mock.assert_called_once_with( self.context, resources.QOS_POLICY, port['qos_policy_id']) #TODO(QoS): handle qos_driver.update call check when diff --git a/neutron/tests/unit/api/rpc/callbacks/consumer/__init__.py b/neutron/tests/unit/api/rpc/callbacks/consumer/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/neutron/tests/unit/api/rpc/callbacks/consumer/test_registry.py b/neutron/tests/unit/api/rpc/callbacks/consumer/test_registry.py new file mode 100644 index 00000000000..5d18e539fd7 --- /dev/null +++ b/neutron/tests/unit/api/rpc/callbacks/consumer/test_registry.py @@ -0,0 +1,56 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from neutron.api.rpc.callbacks.consumer import registry +from neutron.tests import base + + +class ConsumerRegistryTestCase(base.BaseTestCase): + + def setUp(self): + super(ConsumerRegistryTestCase, self).setUp() + + def test__get_manager_is_singleton(self): + self.assertIs(registry._get_manager(), registry._get_manager()) + + @mock.patch.object(registry, '_get_manager') + def test_subscribe(self, manager_mock): + callback = lambda: None + registry.subscribe(callback, 'TYPE') + manager_mock().register.assert_called_with(callback, 'TYPE') + + @mock.patch.object(registry, '_get_manager') + def test_unsubscribe(self, manager_mock): + callback = lambda: None + registry.unsubscribe(callback, 'TYPE') + manager_mock().unregister.assert_called_with(callback, 'TYPE') + + @mock.patch.object(registry, '_get_manager') + def test_clear(self, manager_mock): + registry.clear() + manager_mock().clear.assert_called_with() + + @mock.patch.object(registry, '_get_manager') + def test_push(self, manager_mock): + resource_type_ = object() + resource_ = object() + event_type_ = object() + + callback1 = mock.Mock() + callback2 = mock.Mock() + callbacks = {callback1, callback2} + manager_mock().get_callbacks.return_value = callbacks + registry.push(resource_type_, resource_, event_type_) + for callback in callbacks: + callback.assert_called_with(resource_type_, resource_, event_type_) diff --git a/neutron/tests/unit/api/rpc/callbacks/producer/__init__.py b/neutron/tests/unit/api/rpc/callbacks/producer/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/neutron/tests/unit/api/rpc/callbacks/producer/test_registry.py b/neutron/tests/unit/api/rpc/callbacks/producer/test_registry.py new file mode 100644 index 00000000000..5b7b049c60a --- /dev/null +++ b/neutron/tests/unit/api/rpc/callbacks/producer/test_registry.py @@ -0,0 +1,81 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.api.rpc.callbacks import exceptions +from neutron.api.rpc.callbacks.producer import registry +from neutron.api.rpc.callbacks import resources +from neutron.objects.qos import policy +from neutron.tests.unit.services.qos import base + + +class ProducerRegistryTestCase(base.BaseQosTestCase): + + def test_pull_returns_callback_result(self): + policy_obj = policy.QosPolicy(context=None) + + def _fake_policy_cb(*args, **kwargs): + return policy_obj + + registry.provide(_fake_policy_cb, resources.QOS_POLICY) + + self.assertEqual( + policy_obj, + registry.pull(resources.QOS_POLICY, 'fake_id')) + + def test_pull_does_not_raise_on_none(self): + def _none_cb(*args, **kwargs): + pass + + registry.provide(_none_cb, resources.QOS_POLICY) + + obj = registry.pull(resources.QOS_POLICY, 'fake_id') + self.assertIsNone(obj) + + def test_pull_raises_on_wrong_object_type(self): + def _wrong_type_cb(*args, **kwargs): + return object() + + registry.provide(_wrong_type_cb, resources.QOS_POLICY) + + self.assertRaises( + exceptions.CallbackWrongResourceType, + registry.pull, resources.QOS_POLICY, 'fake_id') + + def test_pull_raises_on_callback_not_found(self): + self.assertRaises( + exceptions.CallbackNotFound, + registry.pull, resources.QOS_POLICY, 'fake_id') + + def test__get_manager_is_singleton(self): + self.assertIs(registry._get_manager(), registry._get_manager()) + + def test_unprovide(self): + def _fake_policy_cb(*args, **kwargs): + pass + + registry.provide(_fake_policy_cb, resources.QOS_POLICY) + registry.unprovide(_fake_policy_cb, resources.QOS_POLICY) + + self.assertRaises( + exceptions.CallbackNotFound, + registry.pull, resources.QOS_POLICY, 'fake_id') + + def test_clear_unprovides_all_producers(self): + def _fake_policy_cb(*args, **kwargs): + pass + + registry.provide(_fake_policy_cb, resources.QOS_POLICY) + registry.clear() + + self.assertRaises( + exceptions.CallbackNotFound, + registry.pull, resources.QOS_POLICY, 'fake_id') diff --git a/neutron/tests/unit/api/rpc/callbacks/test_registry.py b/neutron/tests/unit/api/rpc/callbacks/test_registry.py deleted file mode 100644 index 3c12b38dc74..00000000000 --- a/neutron/tests/unit/api/rpc/callbacks/test_registry.py +++ /dev/null @@ -1,63 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import mock - -from neutron.api.rpc.callbacks import registry -from neutron.api.rpc.callbacks import resource_manager -from neutron.api.rpc.callbacks import resources -from neutron.objects.qos import policy -from neutron.tests import base - - -class GetInfoTestCase(base.BaseTestCase): - def setUp(self): - super(GetInfoTestCase, self).setUp() - mgr = resource_manager.ResourcesCallbacksManager() - mgr_p = mock.patch.object( - registry, '_get_resources_callback_manager', return_value=mgr) - mgr_p.start() - - def test_returns_callback_result(self): - policy_obj = policy.QosPolicy(context=None) - - def _fake_policy_cb(*args, **kwargs): - return policy_obj - - registry.register_provider(_fake_policy_cb, resources.QOS_POLICY) - - self.assertEqual(policy_obj, - registry.get_info(resources.QOS_POLICY, 'fake_id')) - - def test_does_not_raise_on_none(self): - def _wrong_type_cb(*args, **kwargs): - pass - - registry.register_provider(_wrong_type_cb, resources.QOS_POLICY) - - obj = registry.get_info(resources.QOS_POLICY, 'fake_id') - self.assertIsNone(obj) - - def test_raises_on_wrong_object_type(self): - def _wrong_type_cb(*args, **kwargs): - return object() - - registry.register_provider(_wrong_type_cb, resources.QOS_POLICY) - - self.assertRaises( - registry.CallbackReturnedWrongObjectType, - registry.get_info, resources.QOS_POLICY, 'fake_id') - - def test_raises_on_callback_not_found(self): - self.assertRaises( - registry.CallbackNotFound, - registry.get_info, resources.QOS_POLICY, 'fake_id') diff --git a/neutron/tests/unit/api/rpc/callbacks/test_resource_manager.py b/neutron/tests/unit/api/rpc/callbacks/test_resource_manager.py index bc708dbbd28..79d5ed55c5a 100644 --- a/neutron/tests/unit/api/rpc/callbacks/test_resource_manager.py +++ b/neutron/tests/unit/api/rpc/callbacks/test_resource_manager.py @@ -10,52 +10,131 @@ # License for the specific language governing permissions and limitations # under the License. +import mock -from neutron.api.rpc.callbacks import registry as rpc_registry -from neutron.api.rpc.callbacks import resources -from neutron.objects.qos import policy -from neutron.objects.qos import rule +from neutron.api.rpc.callbacks import exceptions as rpc_exc +from neutron.api.rpc.callbacks import resource_manager +from neutron.callbacks import exceptions as exceptions +from neutron.tests.unit.services.qos import base + +IS_VALID_RESOURCE_TYPE = ( + 'neutron.api.rpc.callbacks.resources.is_valid_resource_type') -from neutron.tests import base +class ResourceCallbacksManagerTestCaseMixin(object): + + def test_register_fails_on_invalid_type(self): + self.assertRaises( + exceptions.Invalid, + self.mgr.register, lambda: None, 'TYPE') + + @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True) + def test_clear_unregisters_all_callbacks(self, *mocks): + self.mgr.register(lambda: None, 'TYPE1') + self.mgr.register(lambda: None, 'TYPE2') + self.mgr.clear() + self.assertEqual([], self.mgr.get_subscribed_types()) + + def test_unregister_fails_on_invalid_type(self): + self.assertRaises( + exceptions.Invalid, + self.mgr.unregister, lambda: None, 'TYPE') + + @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True) + def test_unregister_fails_on_unregistered_callback(self, *mocks): + self.assertRaises( + rpc_exc.CallbackNotFound, + self.mgr.unregister, lambda: None, 'TYPE') + + @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True) + def test_unregister_unregisters_callback(self, *mocks): + callback = lambda: None + self.mgr.register(callback, 'TYPE') + self.mgr.unregister(callback, 'TYPE') + self.assertEqual([], self.mgr.get_subscribed_types()) + + @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True) + def test___init___does_not_reset_callbacks(self, *mocks): + callback = lambda: None + self.mgr.register(callback, 'TYPE') + resource_manager.ProducerResourceCallbacksManager() + self.assertEqual(['TYPE'], self.mgr.get_subscribed_types()) -class ResourcesCallbackRequestTestCase(base.BaseTestCase): +class ProducerResourceCallbacksManagerTestCase( + base.BaseQosTestCase, ResourceCallbacksManagerTestCaseMixin): def setUp(self): - super(ResourcesCallbackRequestTestCase, self).setUp() - self.resource_id = '46ebaec0-0570-43ac-82f6-60d2b03168c4' - self.qos_rule_id = '5f126d84-551a-4dcf-bb01-0e9c0df0c793' + super(ProducerResourceCallbacksManagerTestCase, self).setUp() + self.mgr = self.prod_mgr - def test_resource_callback_request(self): + @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True) + def test_register_registers_callback(self, *mocks): + callback = lambda: None + self.mgr.register(callback, 'TYPE') + self.assertEqual(callback, self.mgr.get_callback('TYPE')) - def _get_qos_policy_cb(resource, policy_id, **kwargs): - context = kwargs.get('context') - qos_policy = policy.QosPolicy(context, - tenant_id="8d4c70a21fed4aeba121a1a429ba0d04", - id="46ebaec0-0570-43ac-82f6-60d2b03168c4", - name="10Mbit", - description="This policy limits the ports to 10Mbit max.", - shared=False, - rules=[ - rule.QosBandwidthLimitRule(context, - id="5f126d84-551a-4dcf-bb01-0e9c0df0c793", - max_kbps=10000, - max_burst_kbps=0) - ] - ) - qos_policy.obj_reset_changes() - return qos_policy + @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True) + def test_register_fails_on_multiple_calls(self, *mocks): + self.mgr.register(lambda: None, 'TYPE') + self.assertRaises( + rpc_exc.CallbacksMaxLimitReached, + self.mgr.register, lambda: None, 'TYPE') - rpc_registry.register_provider( - _get_qos_policy_cb, - resources.QOS_POLICY) + def test_get_callback_fails_on_invalid_type(self): + self.assertRaises( + exceptions.Invalid, + self.mgr.get_callback, 'TYPE') - self.ctx = None - kwargs = {'context': self.ctx} + @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True) + def test_get_callback_fails_on_unregistered_callback( + self, *mocks): + self.assertRaises( + rpc_exc.CallbackNotFound, + self.mgr.get_callback, 'TYPE') - qos_policy = rpc_registry.get_info( - resources.QOS_POLICY, - self.resource_id, - **kwargs) - self.assertEqual(self.resource_id, qos_policy['id']) + @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True) + def test_get_callback_returns_proper_callback(self, *mocks): + callback1 = lambda: None + callback2 = lambda: None + self.mgr.register(callback1, 'TYPE1') + self.mgr.register(callback2, 'TYPE2') + self.assertEqual(callback1, self.mgr.get_callback('TYPE1')) + self.assertEqual(callback2, self.mgr.get_callback('TYPE2')) + + +class ConsumerResourceCallbacksManagerTestCase( + base.BaseQosTestCase, ResourceCallbacksManagerTestCaseMixin): + + def setUp(self): + super(ConsumerResourceCallbacksManagerTestCase, self).setUp() + self.mgr = self.cons_mgr + + @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True) + def test_register_registers_callback(self, *mocks): + callback = lambda: None + self.mgr.register(callback, 'TYPE') + self.assertEqual({callback}, self.mgr.get_callbacks('TYPE')) + + @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True) + def test_register_succeeds_on_multiple_calls(self, *mocks): + callback1 = lambda: None + callback2 = lambda: None + self.mgr.register(callback1, 'TYPE') + self.mgr.register(callback2, 'TYPE') + + @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True) + def test_get_callbacks_fails_on_unregistered_callback( + self, *mocks): + self.assertRaises( + rpc_exc.CallbackNotFound, + self.mgr.get_callbacks, 'TYPE') + + @mock.patch(IS_VALID_RESOURCE_TYPE, return_value=True) + def test_get_callbacks_returns_proper_callbacks(self, *mocks): + callback1 = lambda: None + callback2 = lambda: None + self.mgr.register(callback1, 'TYPE1') + self.mgr.register(callback2, 'TYPE2') + self.assertEqual(set([callback1]), self.mgr.get_callbacks('TYPE1')) + self.assertEqual(set([callback2]), self.mgr.get_callbacks('TYPE2')) diff --git a/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py b/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py index 3d1104c408d..f7b52201f6f 100755 --- a/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py +++ b/neutron/tests/unit/api/rpc/handlers/test_resources_rpc.py @@ -42,55 +42,59 @@ class ResourcesRpcBaseTestCase(base.BaseTestCase): return policy_obj -class ResourcesServerRpcApiTestCase(ResourcesRpcBaseTestCase): +class ResourcesPullRpcApiTestCase(ResourcesRpcBaseTestCase): def setUp(self): - super(ResourcesServerRpcApiTestCase, self).setUp() + super(ResourcesPullRpcApiTestCase, self).setUp() self.client_p = mock.patch.object(resources_rpc.n_rpc, 'get_client') self.client = self.client_p.start() - self.rpc = resources_rpc.ResourcesServerRpcApi() + self.rpc = resources_rpc.ResourcesPullRpcApi() self.mock_cctxt = self.rpc.client.prepare.return_value - def test_get_info(self): + def test_is_singleton(self): + self.assertEqual(id(self.rpc), + id(resources_rpc.ResourcesPullRpcApi())) + + def test_pull(self): policy_dict = self._create_test_policy_dict() expected_policy_obj = self._create_test_policy(policy_dict) qos_policy_id = policy_dict['id'] self.mock_cctxt.call.return_value = ( expected_policy_obj.obj_to_primitive()) - get_info_result = self.rpc.get_info( + pull_result = self.rpc.pull( self.context, resources.QOS_POLICY, qos_policy_id) self.mock_cctxt.call.assert_called_once_with( - self.context, 'get_info', resource_type=resources.QOS_POLICY, + self.context, 'pull', resource_type=resources.QOS_POLICY, version=policy.QosPolicy.VERSION, resource_id=qos_policy_id) - self.assertEqual(expected_policy_obj, get_info_result) + self.assertEqual(expected_policy_obj, pull_result) - def test_get_info_invalid_resource_type_cls(self): + def test_pull_invalid_resource_type_cls(self): self.assertRaises( - resources_rpc.InvalidResourceTypeClass, self.rpc.get_info, + resources_rpc.InvalidResourceTypeClass, self.rpc.pull, self.context, 'foo_type', 'foo_id') - def test_get_info_resource_not_found(self): + def test_pull_resource_not_found(self): policy_dict = self._create_test_policy_dict() qos_policy_id = policy_dict['id'] self.mock_cctxt.call.return_value = None self.assertRaises( - resources_rpc.ResourceNotFound, self.rpc.get_info, self.context, - resources.QOS_POLICY, qos_policy_id) + resources_rpc.ResourceNotFound, self.rpc.pull, + self.context, resources.QOS_POLICY, qos_policy_id) -class ResourcesServerRpcCallbackTestCase(ResourcesRpcBaseTestCase): +class ResourcesPullRpcCallbackTestCase(ResourcesRpcBaseTestCase): def setUp(self): - super(ResourcesServerRpcCallbackTestCase, self).setUp() - self.callbacks = resources_rpc.ResourcesServerRpcCallback() + super(ResourcesPullRpcCallbackTestCase, self).setUp() + self.callbacks = resources_rpc.ResourcesPullRpcCallback() - def test_get_info(self): + def test_pull(self): policy_dict = self._create_test_policy_dict() policy_obj = self._create_test_policy(policy_dict) qos_policy_id = policy_dict['id'] - with mock.patch.object(resources_rpc.registry, 'get_info', + with mock.patch.object(resources_rpc.registry, 'pull', return_value=policy_obj) as registry_mock: - primitive = self.callbacks.get_info( + primitive = self.callbacks.pull( self.context, resource_type=resources.QOS_POLICY, version=policy.QosPolicy.VERSION, resource_id=qos_policy_id) @@ -101,26 +105,26 @@ class ResourcesServerRpcCallbackTestCase(ResourcesRpcBaseTestCase): self.assertEqual(policy_obj.obj_to_primitive(), primitive) @mock.patch.object(policy.QosPolicy, 'obj_to_primitive') - def test_get_info_no_backport_for_latest_version(self, to_prim_mock): + def test_pull_no_backport_for_latest_version(self, to_prim_mock): policy_dict = self._create_test_policy_dict() policy_obj = self._create_test_policy(policy_dict) qos_policy_id = policy_dict['id'] - with mock.patch.object(resources_rpc.registry, 'get_info', + with mock.patch.object(resources_rpc.registry, 'pull', return_value=policy_obj): - self.callbacks.get_info( + self.callbacks.pull( self.context, resource_type=resources.QOS_POLICY, version=policy.QosPolicy.VERSION, resource_id=qos_policy_id) to_prim_mock.assert_called_with(target_version=None) @mock.patch.object(policy.QosPolicy, 'obj_to_primitive') - def test_get_info_backports_to_older_version(self, to_prim_mock): + def test_pull_backports_to_older_version(self, to_prim_mock): policy_dict = self._create_test_policy_dict() policy_obj = self._create_test_policy(policy_dict) qos_policy_id = policy_dict['id'] - with mock.patch.object(resources_rpc.registry, 'get_info', + with mock.patch.object(resources_rpc.registry, 'pull', return_value=policy_obj): - self.callbacks.get_info( + self.callbacks.pull( self.context, resource_type=resources.QOS_POLICY, version='0.9', # less than initial version 1.0 resource_id=qos_policy_id) diff --git a/neutron/tests/unit/services/qos/base.py b/neutron/tests/unit/services/qos/base.py new file mode 100644 index 00000000000..e731340bd76 --- /dev/null +++ b/neutron/tests/unit/services/qos/base.py @@ -0,0 +1,38 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from neutron.api.rpc.callbacks.consumer import registry as cons_registry +from neutron.api.rpc.callbacks.producer import registry as prod_registry +from neutron.api.rpc.callbacks import resource_manager +from neutron.tests import base + + +class BaseQosTestCase(base.BaseTestCase): + def setUp(self): + super(BaseQosTestCase, self).setUp() + + with mock.patch.object( + resource_manager.ResourceCallbacksManager, '_singleton', + new_callable=mock.PropertyMock(return_value=False)): + + self.cons_mgr = resource_manager.ConsumerResourceCallbacksManager() + self.prod_mgr = resource_manager.ProducerResourceCallbacksManager() + for mgr in (self.cons_mgr, self.prod_mgr): + mgr.clear() + + mock.patch.object( + cons_registry, '_get_manager', return_value=self.cons_mgr).start() + + mock.patch.object( + prod_registry, '_get_manager', return_value=self.prod_mgr).start() diff --git a/neutron/tests/unit/services/qos/notification_drivers/test_manager.py b/neutron/tests/unit/services/qos/notification_drivers/test_manager.py index 6f67fa605b9..efc1cbbbb03 100644 --- a/neutron/tests/unit/services/qos/notification_drivers/test_manager.py +++ b/neutron/tests/unit/services/qos/notification_drivers/test_manager.py @@ -14,12 +14,11 @@ import mock from oslo_config import cfg from neutron.api.rpc.callbacks import events -from neutron.api.rpc.callbacks import resources from neutron import context from neutron.objects.qos import policy as policy_object from neutron.services.qos.notification_drivers import manager as driver_mgr from neutron.services.qos.notification_drivers import message_queue -from neutron.tests import base +from neutron.tests.unit.services.qos import base DUMMY_DRIVER = ("neutron.tests.unit.services.qos.notification_drivers." "dummy.DummyQosServiceNotificationDriver") @@ -32,16 +31,12 @@ def _load_multiple_drivers(): "qos") -class TestQosDriversManager(base.BaseTestCase): +class TestQosDriversManagerBase(base.BaseQosTestCase): def setUp(self): - super(TestQosDriversManager, self).setUp() + super(TestQosDriversManagerBase, self).setUp() self.config_parse() self.setup_coreplugin() - self.registry_p = mock.patch( - 'neutron.api.rpc.callbacks.registry.notify') - self.registry_m = self.registry_p.start() - self.driver_manager = driver_mgr.QosServiceNotificationDriverManager() config = cfg.ConfigOpts() config.register_opts(driver_mgr.QOS_PLUGIN_OPTS, "qos") self.policy_data = {'policy': { @@ -56,17 +51,20 @@ class TestQosDriversManager(base.BaseTestCase): ctxt = None self.kwargs = {'context': ctxt} + +class TestQosDriversManager(TestQosDriversManagerBase): + + def setUp(self): + super(TestQosDriversManager, self).setUp() + self.driver_manager = driver_mgr.QosServiceNotificationDriverManager() + def _validate_registry_params(self, event_type, policy): - self.assertTrue(self.registry_m.called, policy) - self.registry_m.assert_called_with( - resources.QOS_POLICY, - event_type, - policy) + #TODO(QoS): actually validate the notification once implemented + pass def test_create_policy_default_configuration(self): #RPC driver should be loaded by default self.driver_manager.create_policy(self.policy) - self.assertFalse(self.registry_m.called) def test_update_policy_default_configuration(self): #RPC driver should be loaded by default @@ -78,9 +76,11 @@ class TestQosDriversManager(base.BaseTestCase): self.driver_manager.delete_policy(self.policy) self._validate_registry_params(events.DELETED, self.policy) + +class TestQosDriversManagerMulti(TestQosDriversManagerBase): + def _test_multi_drivers_configuration_op(self, op): _load_multiple_drivers() - # create a new manager with new configuration driver_manager = driver_mgr.QosServiceNotificationDriverManager() handler = '%s_policy' % op with mock.patch('.'.join([DUMMY_DRIVER, handler])) as dummy_mock: diff --git a/neutron/tests/unit/services/qos/notification_drivers/test_message_queue.py b/neutron/tests/unit/services/qos/notification_drivers/test_message_queue.py index a4f163f54b2..710451307a9 100644 --- a/neutron/tests/unit/services/qos/notification_drivers/test_message_queue.py +++ b/neutron/tests/unit/services/qos/notification_drivers/test_message_queue.py @@ -10,27 +10,20 @@ # License for the specific language governing permissions and limitations # under the License. -import mock - from neutron.api.rpc.callbacks import events -from neutron.api.rpc.callbacks import resources from neutron import context from neutron.objects.qos import policy as policy_object from neutron.objects.qos import rule as rule_object from neutron.services.qos.notification_drivers import message_queue -from neutron.tests import base +from neutron.tests.unit.services.qos import base DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2' -class TestQosRpcNotificationDriver(base.BaseTestCase): +class TestQosRpcNotificationDriver(base.BaseQosTestCase): def setUp(self): super(TestQosRpcNotificationDriver, self).setUp() - - registry_p = mock.patch( - 'neutron.api.rpc.callbacks.registry.notify') - self.registry_m = registry_p.start() self.driver = message_queue.RpcQosServiceNotificationDriver() self.policy_data = {'policy': { @@ -52,21 +45,18 @@ class TestQosRpcNotificationDriver(base.BaseTestCase): context, **self.rule_data['bandwidth_limit_rule']) - def _validate_registry_params(self, event_type, policy): - self.assertTrue(self.registry_m.called, policy) - self.registry_m.assert_called_once_with( - resources.QOS_POLICY, - event_type, - policy) + def _validate_push_params(self, event_type, policy): + # TODO(QoS): actually validate push works once implemented + pass def test_create_policy(self): self.driver.create_policy(self.policy) - self.assertFalse(self.registry_m.called) + self._validate_push_params(events.CREATED, self.policy) def test_update_policy(self): self.driver.update_policy(self.policy) - self._validate_registry_params(events.UPDATED, self.policy) + self._validate_push_params(events.UPDATED, self.policy) def test_delete_policy(self): self.driver.delete_policy(self.policy) - self._validate_registry_params(events.DELETED, self.policy) + self._validate_push_params(events.DELETED, self.policy) diff --git a/neutron/tests/unit/services/qos/test_qos_plugin.py b/neutron/tests/unit/services/qos/test_qos_plugin.py index 92ef36a0039..1f530512a19 100644 --- a/neutron/tests/unit/services/qos/test_qos_plugin.py +++ b/neutron/tests/unit/services/qos/test_qos_plugin.py @@ -13,8 +13,6 @@ import mock from oslo_config import cfg -from neutron.api.rpc.callbacks import events -from neutron.api.rpc.callbacks import resources from neutron.common import exceptions as n_exc from neutron import context from neutron import manager @@ -22,13 +20,13 @@ from neutron.objects import base as base_object from neutron.objects.qos import policy as policy_object from neutron.objects.qos import rule as rule_object from neutron.plugins.common import constants -from neutron.tests import base +from neutron.tests.unit.services.qos import base DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2' -class TestQosPlugin(base.BaseTestCase): +class TestQosPlugin(base.BaseQosTestCase): def setUp(self): super(TestQosPlugin, self).setUp() @@ -40,15 +38,18 @@ class TestQosPlugin(base.BaseTestCase): mock.patch('neutron.db.api.get_object').start() mock.patch( 'neutron.objects.qos.policy.QosPolicy.obj_load_attr').start() - self.registry_p = mock.patch( - 'neutron.api.rpc.callbacks.registry.notify') - self.registry_m = self.registry_p.start() + cfg.CONF.set_override("core_plugin", DB_PLUGIN_KLASS) cfg.CONF.set_override("service_plugins", ["qos"]) mgr = manager.NeutronManager.get_instance() self.qos_plugin = mgr.get_service_plugins().get( constants.QOS) + + self.notif_driver_p = mock.patch.object( + self.qos_plugin, 'notification_driver_manager') + self.notif_driver_m = self.notif_driver_p.start() + self.ctxt = context.Context('fake_user', 'fake_tenant') self.policy_data = { 'policy': {'id': 7777777, @@ -68,50 +69,48 @@ class TestQosPlugin(base.BaseTestCase): self.rule = rule_object.QosBandwidthLimitRule( context, **self.rule_data['bandwidth_limit_rule']) - def _validate_registry_params(self, event_type): - self.registry_m.assert_called_once_with( - resources.QOS_POLICY, - event_type, - mock.ANY) + def _validate_notif_driver_params(self, method_name): + method = getattr(self.notif_driver_m, method_name) + self.assertTrue(method.called) self.assertIsInstance( - self.registry_m.call_args[0][2], policy_object.QosPolicy) + method.call_args[0][0], policy_object.QosPolicy) def test_add_policy(self): self.qos_plugin.create_policy(self.ctxt, self.policy_data) - self.assertFalse(self.registry_m.called) + self._validate_notif_driver_params('create_policy') def test_update_policy(self): fields = base_object.get_updatable_fields( policy_object.QosPolicy, self.policy_data['policy']) self.qos_plugin.update_policy( self.ctxt, self.policy.id, {'policy': fields}) - self._validate_registry_params(events.UPDATED) + self._validate_notif_driver_params('update_policy') @mock.patch('neutron.db.api.get_object', return_value=None) def test_delete_policy(self, *mocks): self.qos_plugin.delete_policy(self.ctxt, self.policy.id) - self._validate_registry_params(events.DELETED) + self._validate_notif_driver_params('delete_policy') def test_create_policy_rule(self): with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id', return_value=self.policy): self.qos_plugin.create_policy_bandwidth_limit_rule( self.ctxt, self.policy.id, self.rule_data) - self._validate_registry_params(events.UPDATED) + self._validate_notif_driver_params('update_policy') def test_update_policy_rule(self): with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id', return_value=self.policy): self.qos_plugin.update_policy_bandwidth_limit_rule( self.ctxt, self.rule.id, self.policy.id, self.rule_data) - self._validate_registry_params(events.UPDATED) + self._validate_notif_driver_params('update_policy') def test_delete_policy_rule(self): with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id', return_value=self.policy): self.qos_plugin.delete_policy_bandwidth_limit_rule( self.ctxt, self.rule.id, self.policy.id) - self._validate_registry_params(events.UPDATED) + self._validate_notif_driver_params('update_policy') def test_get_policy_for_nonexistent_policy(self): with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',