diff --git a/doc/source/devref/index.rst b/doc/source/devref/index.rst index e00b1d891f2..e7bc843796b 100644 --- a/doc/source/devref/index.rst +++ b/doc/source/devref/index.rst @@ -46,6 +46,7 @@ Neutron Internals plugin-api db_layer rpc_api + rpc_callbacks layer3 l2_agents quality_of_service diff --git a/doc/source/devref/rpc_callbacks.rst b/doc/source/devref/rpc_callbacks.rst new file mode 100644 index 00000000000..01bc9b6c9c6 --- /dev/null +++ b/doc/source/devref/rpc_callbacks.rst @@ -0,0 +1,229 @@ +================================= +Neutron Messaging Callback System +================================= + +Neutron already has a callback system [link-to: callbacks.rst] for +in-process resource callbacks where publishers and subscribers are able +to publish, subscribe and extend resources. + +This system is different, and is intended to be used for inter-process +callbacks, via the messaging fanout mechanisms. + +In Neutron, agents may need to subscribe to specific resource details which +may change over time. The purpose of this messaging callback system +is to allow agent subscription to those resources without the need to extend +or modify existing RPC calls, or to create new RPC messages. + +A few resources which can benefit from this system: + +* security group members, +* security group rules, +* QoS policies. + +Using a remote publisher/subscriber pattern, the information about such +resources could be published using fanout queues to all interested nodes, +minimizing messaging requests from agents to server since the agents +get subscribed for their whole lifecycle (unless they unsubscribe). + +Within an agent, there could be multiple subscriber callbacks to the same +resource events; the resource updates would be dispatched to the subscriber +callbacks from a single message. Any update would come in a single message, +doing only a single oslo versioned objects deserialization on each receiving +agent. 
+ +This publishing/subscription mechanism is highly dependent on the format +of the resources passed around. This is why the library only allows +versioned objects to be published and subscribed. Oslo versioned objects +allow object version down/up conversion. [#vo_mkcompat]_ [#vo_mkcptests]_ + +For the VO's versioning schema look here: [#vo_versioning]_ + + + +versioned_objects serialization/deserialization with the +obj_to_primitive(target_version=..) and primitive_to_obj() [#ov_serdes]_ +methods is used internally to convert/retrieve objects before/after messaging. + +Considering rolling upgrades, there are several scenarios to look at: + +* publisher (generally neutron-server or a service) and subscriber (agent) + know the same version of the objects, so they serialize, and deserialize + without issues. + +* publisher knows (and sends) an older version of the object, subscriber + will get the object updated to latest version on arrival before any + callback is called. + +* publisher sends a newer version of the object, subscriber won't be able + to deserialize the object, in this case (PLEASE DISCUSS), we can think of the + following strategies: + +a) During upgrades, we pin neutron-server to a compatible version for resource + fanout updates, and server sends both the old, and the newer version to + different topics/queues. Old agents receive the updates on the old version + topic, new agents receive updates on the new version topic. + When the whole system is upgraded, we un-pin the compatible version fanout. + + A variant of this could be using a single fanout queue, and sending the + pinned version of the object to all. Newer agents can deserialize to the + latest version and upgrade any fields internally. Again at the end, we + unpin the version and restart the service. + +b) The subscriber will rpc call the publisher to start publishing also a downgraded + version of the object on every update on a separate queue. 
The complication + of this variant is the need to ignore new version objects as long as we keep + receiving the downgraded ones, and otherwise resend the request to send the + downgraded objects after a certain timeout (thinking of the case where the + request for downgraded queue is done, but the publisher restarted). + This approach is more complicated to implement, but more automated from the + administrator point of view. We may want to look into it as a second step. + +c) The subscriber will send a registry.get_info for the latest specific version + it knows of. This can have scalability issues during upgrade as any outdated + agent will require a flow of two messages (request, and response). This is + indeed very bad at scale if you have hundreds or thousands of agents. + +Option a seems like a reasonable strategy, similar to what nova does now with +versioned objects. + +Serialized versioned objects look like:: + + {'versioned_object.version': '1.0', + 'versioned_object.name': 'QoSProfile', + 'versioned_object.data': {'rules': [ + {'versioned_object.version': '1.0', + 'versioned_object.name': 'QoSRule', + 'versioned_object.data': {'name': u'a'}, + 'versioned_object.namespace': 'versionedobjects'} + ], + 'uuid': u'abcde', + 'name': u'aaa'}, + 'versioned_object.namespace': 'versionedobjects'} + +Topic names for the fanout queues +================================= + +if we adopted option a: +neutron-<resource_type>_<resource_id>-<vo_version> +[neutron-<resource_type>_<resource_id>-<vo_compatible_version>] + +if we adopted option b for rolling upgrades: +neutron-<resource_type>-<resource_id> +neutron-<resource_type>-<resource_id>-<vo_version> + +for option c, just: +neutron-<resource_type>-<resource_id> + +Subscribing to resources +======================== + +Imagine that you have agent A, which just got to handle a new port, which +has an associated security group, and QoS policy. 
+ +The agent code processing port updates may look like:: + + from neutron.rpc_resources import events + from neutron.rpc_resources import resources + from neutron.rpc_resources import registry + + + def process_resource_updates(resource_type, resource_id, resource_list, action_type): + + # send to the right handler which will update any control plane + # details related to the updated resource... + + + def port_update(...): + + # here we extract sg_id and qos_policy_id from port.. + + registry.subscribe(resources.SG_RULES, sg_id, + callback=process_resource_updates) + sg_rules = registry.get_info(resources.SG_RULES, sg_id) + + registry.subscribe(resources.SG_MEMBERS, sg_id, + callback=process_resource_updates) + sg_members = registry.get_info(resources.SG_MEMBERS, sg_id) + + registry.subscribe(resources.QOS_RULES, qos_policy_id, + callback=process_resource_updates) + qos_rules = registry.get_info(resources.QOS_RULES, qos_policy_id, + callback=process_resource_updates) + + cleanup_subscriptions() + + + def cleanup_subscriptions() + sg_ids = determine_unreferenced_sg_ids() + qos_policy_id = determine_unreferenced_qos_policy_ids() + registry.unsubscribe_info(resource.SG_RULES, sg_ids) + registry.unsubscribe_info(resource.SG_MEMBERS, sg_ids) + registry.unsubscribe_info(resource.QOS_RULES, qos_policy_id) + +Another unsubscription strategy could be to lazily unsubscribe resources when +we receive updates for them, and we discover that they are not needed anymore. + +Deleted resources are automatically unsubscribed as we receive the delete event. + +NOTE(irenab): this could be extended to core resources like ports, making use +of the standard neutron in-process callbacks at server side and propagating +AFTER_UPDATE events, for example, but we may need to wait until those callbacks +are used with proper versioned objects. 
+ + +Unsubscribing to resources +========================== + +There are a few options to unsubscribe registered callbacks: + +* unsubscribe_resource_id(): it selectively unsubscribes a specific + resource type + id. +* unsubscribe_resource_type(): it unsubscribes from a specific resource type, + any ID. +* unsubscribe_all(): it unsubscribes all subscribed resources and ids. + + +Sending resource updates +======================== + +On the server side, resource updates could come from anywhere, a service plugin, +an extension, anything that updates the resource and that is of any interest +to the agents. + +The server/publisher side may look like:: + + from neutron.rpc_resources import events + from neutron.rpc_resources import resources + from neutron.rpc_resources import registry as rpc_registry + + def add_qos_x_rule(...): + update_the_db(...) + send_rpc_updates_on_qos_policy(qos_policy_id) + + def del_qos_x_rule(...): + update_the_db(...) + send_rpc_deletion_of_qos_policy(qos_policy_id) + + def send_rpc_updates_on_qos_policy(qos_policy_id): + rules = get_qos_policy_rules_versioned_object(qos_policy_id) + rpc_registry.notify(resources.QOS_RULES, qos_policy_id, rules, events.UPDATE) + + def send_rpc_deletion_of_qos_policy(qos_policy_id): + rpc_registry.notify(resources.QOS_RULES, qos_policy_id, None, events.DELETE) + + # This part is added for the registry mechanism, to be able to request + # older versions of the notified objects if any outdated agent requires + # them. + def retrieve_older_version_callback(qos_policy_id, version): + return get_qos_policy_rules_versioned_object(qos_policy_id, version) + + rpc_registry.register_retrieve_callback(resources.QOS_RULES, + retrieve_older_version_callback) + +References +========== +.. [#ov_serdes] https://github.com/openstack/oslo.versionedobjects/blob/master/oslo_versionedobjects/tests/test_objects.py#L621 +.. 
[#vo_mkcompat] https://github.com/openstack/oslo.versionedobjects/blob/master/oslo_versionedobjects/base.py#L460 +.. [#vo_mkcptests] https://github.com/openstack/oslo.versionedobjects/blob/master/oslo_versionedobjects/tests/test_objects.py#L111 +.. [#vo_versioning] https://github.com/openstack/oslo.versionedobjects/blob/master/oslo_versionedobjects/base.py#L236 diff --git a/neutron/api/rpc/callbacks/__init__.py b/neutron/api/rpc/callbacks/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/neutron/api/rpc/callbacks/events.py b/neutron/api/rpc/callbacks/events.py new file mode 100644 index 00000000000..ff8193d9ed1 --- /dev/null +++ b/neutron/api/rpc/callbacks/events.py @@ -0,0 +1,19 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +UPDATED = 'updated' +DELETED = 'deleted' + +VALID = ( + UPDATED, + DELETED +) diff --git a/neutron/api/rpc/callbacks/registry.py b/neutron/api/rpc/callbacks/registry.py new file mode 100644 index 00000000000..fcf663e5d76 --- /dev/null +++ b/neutron/api/rpc/callbacks/registry.py @@ -0,0 +1,68 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
from neutron.api.rpc.callbacks import resource_manager

# TODO(ajo): consider adding locking
CALLBACK_MANAGER = None


def _get_resources_callback_manager():
    """Return the module-wide callbacks manager, creating it on first use."""
    global CALLBACK_MANAGER
    if CALLBACK_MANAGER is not None:
        return CALLBACK_MANAGER
    CALLBACK_MANAGER = resource_manager.ResourcesCallbacksManager()
    return CALLBACK_MANAGER


# Resource provider (implementation) callback registration functions
def get_info(resource_type, resource_id, **kwargs):
    """Fetch a resource through the provider registered for its type.

    The provider is invoked as
    ``callback(resource_type, resource_id, **kwargs)``.

    :param resource_type: the resource type whose provider is looked up.
    :param resource_id: identifier handed to the provider.
    :returns: the provider's return value (intended to be an oslo versioned
              object), or None when no provider is registered for the type.
    """
    manager = _get_resources_callback_manager()
    provider = manager.get_callback(resource_type)
    if not provider:
        return None
    return provider(resource_type, resource_id, **kwargs)


def register_provider(callback, resource_type):
    """Register ``callback`` as the provider for ``resource_type``."""
    manager = _get_resources_callback_manager()
    manager.register(callback, resource_type)


# Resource RPC callbacks for pub/sub
# Agent side
def subscribe(callback, resource_type, resource_id):
    """Placeholder: always raises NotImplementedError."""
    # TODO(QoS): we have to finish the real update notifications
    raise NotImplementedError("we should finish update notifications")


def unsubscribe(callback, resource_type, resource_id):
    """Placeholder: always raises NotImplementedError."""
    # TODO(QoS): we have to finish the real update notifications
    raise NotImplementedError("we should finish update notifications")


def unsubscribe_all():
    """Placeholder: always raises NotImplementedError."""
    # TODO(QoS): we have to finish the real update notifications
    raise NotImplementedError("we should finish update notifications")


# Server side
def notify(resource_type, event, obj):
    """Placeholder: always raises NotImplementedError."""
    # TODO(QoS): we have to finish the real update notifications
    raise NotImplementedError("we should finish update notifications")


def clear():
    """Drop every registered provider from the module-wide manager."""
    manager = _get_resources_callback_manager()
    manager.clear()
import collections

from oslo_log import log as logging

from neutron.api.rpc.callbacks import resources
from neutron.callbacks import exceptions

LOG = logging.getLogger(__name__)


class ResourcesCallbacksManager(object):
    """Registry mapping each resource type to a single provider callback.

    At most one callback is stored per resource type; registering again for
    the same type replaces the previously stored callback.
    """

    def __init__(self):
        self.clear()

    def register(self, callback, resource):
        """Register the callback for a resource.

        Only one callback can be registered per resource; a later
        registration overwrites an earlier one.

        :param callback: the callback. It must raise or return a dict.
        :param resource: the resource. It must be a valid resource.
        :raises exceptions.Invalid: if resource is not in resources.VALID.
        """
        LOG.debug("register: %(callback)s %(resource)s",
                  {'callback': callback, 'resource': resource})
        if resource not in resources.VALID:
            raise exceptions.Invalid(element='resource', value=resource)

        self._callbacks[resource] = callback

    def unregister(self, resource):
        """Remove the callback registered for a resource, if any.

        :param resource: the resource. It must be a valid resource.
        :raises exceptions.Invalid: if resource is not in resources.VALID.
        """
        LOG.debug("Unregister: %(resource)s",
                  {'resource': resource})
        if resource not in resources.VALID:
            raise exceptions.Invalid(element='resource', value=resource)
        # Drop the entry instead of storing None so the mapping only ever
        # holds real callbacks; unregistering twice is a no-op.
        self._callbacks.pop(resource, None)

    def clear(self):
        """Brings the manager to a clean slate."""
        # A plain dict on purpose: a defaultdict(dict) would hand back an
        # empty dict (and silently insert it) for resources nobody registered.
        self._callbacks = {}

    def get_callback(self, resource):
        """Return the callback if found, None otherwise.

        :param resource: the resource. It must be a valid resource.
        :raises exceptions.Invalid: if resource is not in resources.VALID.
        """
        if resource not in resources.VALID:
            raise exceptions.Invalid(element='resource', value=resource)

        return self._callbacks.get(resource)
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +QOS_POLICY = 'qos-policy' +QOS_RULE = 'qos-rule' + +VALID = ( + QOS_POLICY, + QOS_RULE, +) diff --git a/neutron/services/qos/qos_plugin.py b/neutron/services/qos/qos_plugin.py index bc866ae01b1..a60abcc7237 100644 --- a/neutron/services/qos/qos_plugin.py +++ b/neutron/services/qos/qos_plugin.py @@ -13,7 +13,81 @@ # License for the specific language governing permissions and limitations # under the License. +from neutron import manager + +from neutron.api.rpc.callbacks import registry as rpc_registry +from neutron.api.rpc.callbacks import resources from neutron.extensions import qos +from neutron.i18n import _LW +from neutron.plugins.common import constants + +from oslo_log import log as logging + + +LOG = logging.getLogger(__name__) + + +#TODO(QoS): remove this stub when db is ready +def _get_qos_policy_cb_stub(resource, policy_id, **kwargs): + """Hardcoded stub for testing until we get the db working.""" + qos_policy = { + "tenant_id": "8d4c70a21fed4aeba121a1a429ba0d04", + "id": "46ebaec0-0570-43ac-82f6-60d2b03168c4", + "name": "10Mbit", + "description": "This policy limits the ports to 10Mbit max.", + "shared": False, + "rules": [{ + "id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793", + "max_kbps": "10000", + "max_burst_kbps": "0", + "type": "bandwidth_limit" + }] + } + return qos_policy + + +def _get_qos_policy_cb(resource, policy_id, **kwargs): + qos_plugin = manager.NeutronManager.get_service_plugins().get( + constants.QOS) + context = kwargs.get('context') + if context is None: + LOG.warning(_LW( + 'Received %(resource)s %(policy_id)s without context'), 
+ {'resource': resource, 'policy_id': policy_id} + ) + return + + qos_policy = qos_plugin.get_qos_policy(context, policy_id) + return qos_policy + + +#TODO(QoS): remove this stub when db is ready +def _get_qos_bandwidth_limit_rule_cb_stub(resource, rule_id, **kwargs): + """Hardcoded for testing until we get the db working.""" + bandwidth_limit = { + "id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793", + "qos_policy_id": "46ebaec0-0570-43ac-82f6-60d2b03168c4", + "max_kbps": "10000", + "max_burst_kbps": "0", + } + return bandwidth_limit + + +def _get_qos_bandwidth_limit_rule_cb(resource, rule_id, **kwargs): + qos_plugin = manager.NeutronManager.get_service_plugins().get( + constants.QOS) + context = kwargs.get('context') + if context is None: + LOG.warning(_LW( + 'Received %(resource)s %(rule_id,)s without context '), + {'resource': resource, 'rule_id,': rule_id} + ) + return + + bandwidth_limit = qos_plugin.get_qos_bandwidth_limit_rule( + context, + rule_id) + return bandwidth_limit class QoSPlugin(qos.QoSPluginBase): @@ -28,16 +102,31 @@ class QoSPlugin(qos.QoSPluginBase): def __init__(self): super(QoSPlugin, self).__init__() - #self.register_rpc() + self.register_resource_providers() #self.register_port_callbacks() #self.register_net_callbacks() + self._inline_test() - def register_rpc(self): - # RPC support - # TODO(ajo): register ourselves to the generic RPC framework - # so we will provide QoS information for ports and - # networks. 
- pass + def _inline_test(self): + #TODO(gampel) remove inline unitesting + self.ctx = None + kwargs = {'context': self.ctx} + qos_policy = rpc_registry.get_info( + resources.QOS_POLICY, + "46ebaec0-0570-43ac-82f6-60d2b03168c4", + **kwargs) + + LOG.debug("qos_policy test : %s)", + qos_policy) + + def register_resource_providers(self): + rpc_registry.register_provider( + _get_qos_bandwidth_limit_rule_cb_stub, + resources.QOS_RULE) + + rpc_registry.register_provider( + _get_qos_policy_cb_stub, + resources.QOS_POLICY) def register_port_callbacks(self): # TODO(qos): Register the callbacks to properly manage diff --git a/neutron/tests/unit/api/rpc/callbacks/__init__.py b/neutron/tests/unit/api/rpc/callbacks/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/neutron/tests/unit/api/rpc/callbacks/test_resource_manager.py b/neutron/tests/unit/api/rpc/callbacks/test_resource_manager.py new file mode 100644 index 00000000000..f68e02da7ff --- /dev/null +++ b/neutron/tests/unit/api/rpc/callbacks/test_resource_manager.py @@ -0,0 +1,78 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
from neutron.api.rpc.callbacks import registry as rpc_registry
from neutron.api.rpc.callbacks import resources


from neutron.tests import base


class ResourcesCallbackRequestTestCase(base.BaseTestCase):
    """Check that registered providers are reachable through get_info()."""

    def setUp(self):
        super(ResourcesCallbackRequestTestCase, self).setUp()
        self.resource_id = '46ebaec0-0570-43ac-82f6-60d2b03168c4'
        self.qos_rule_id = '5f126d84-551a-4dcf-bb01-0e9c0df0c793'
        # The registry keeps its manager in a module-level global, so the
        # providers registered by a test would otherwise stay registered for
        # every test that runs afterwards in the same process.
        self.addCleanup(rpc_registry.clear)

    def test_resource_callback_request(self):

        # TODO(QoS) convert it to the version object format
        def _get_qos_policy_cb(resource, policy_id, **kwargs):
            qos_policy = {
                "tenant_id": "8d4c70a21fed4aeba121a1a429ba0d04",
                "id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
                "name": "10Mbit",
                "description": "This policy limits the ports to 10Mbit max.",
                "shared": False,
                "rules": [{
                    "id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793",
                    "max_kbps": "10000",
                    "max_burst_kbps": "0",
                    "type": "bandwidth_limit"
                }]
            }
            return qos_policy

        # TODO(QoS) convert it to the version object format
        def _get_qos_bandwidth_limit_rule_cb(resource, rule_id, **kwargs):
            bandwidth_limit = {
                "id": "5f126d84-551a-4dcf-bb01-0e9c0df0c793",
                "qos_policy_id": "46ebaec0-0570-43ac-82f6-60d2b03168c4",
                "max_kbps": "10000",
                "max_burst_kbps": "0",
            }
            return bandwidth_limit

        rpc_registry.register_provider(
            _get_qos_bandwidth_limit_rule_cb,
            resources.QOS_RULE)

        rpc_registry.register_provider(
            _get_qos_policy_cb,
            resources.QOS_POLICY)

        self.ctx = None
        kwargs = {'context': self.ctx}

        # Each lookup must be routed to the provider registered for its type.
        qos_policy = rpc_registry.get_info(
            resources.QOS_POLICY,
            self.resource_id,
            **kwargs)
        self.assertEqual(self.resource_id, qos_policy['id'])

        qos_rule = rpc_registry.get_info(
            resources.QOS_RULE,
            self.qos_rule_id,
            **kwargs)
        self.assertEqual(self.qos_rule_id, qos_rule['id'])