From 01e9b771031bd83c12f5ca2f1725927e70050763 Mon Sep 17 00:00:00 2001 From: Eran Gampel Date: Wed, 1 Jul 2015 18:32:30 +0300 Subject: [PATCH] Add pluggable backend driver for QoS Service notification Added a reference driver for the agent based solutions RPC sending the messages over the message queue Partially-Implements: blueprint quantum-qos-api Change-Id: I725c876739ff85b4db8fb053de0362ce367ae78c --- .../qos/notification_drivers/__init__.py | 0 .../qos/notification_drivers/message_queue.py | 70 ++++++++++++ .../qos/notification_drivers/qos_base.py | 37 +++++++ neutron/services/qos/qos_plugin.py | 52 ++++----- .../qos/notification_drivers/__init__.py | 0 .../test_message_queue.py | 72 ++++++++++++ .../unit/services/qos/test_qos_plugin.py | 103 ++++++++++++++++++ 7 files changed, 303 insertions(+), 31 deletions(-) create mode 100644 neutron/services/qos/notification_drivers/__init__.py create mode 100644 neutron/services/qos/notification_drivers/message_queue.py create mode 100644 neutron/services/qos/notification_drivers/qos_base.py create mode 100644 neutron/tests/unit/services/qos/notification_drivers/__init__.py create mode 100644 neutron/tests/unit/services/qos/notification_drivers/test_message_queue.py create mode 100644 neutron/tests/unit/services/qos/test_qos_plugin.py diff --git a/neutron/services/qos/notification_drivers/__init__.py b/neutron/services/qos/notification_drivers/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/neutron/services/qos/notification_drivers/message_queue.py b/neutron/services/qos/notification_drivers/message_queue.py new file mode 100644 index 00000000000..2cce2746ad2 --- /dev/null +++ b/neutron/services/qos/notification_drivers/message_queue.py @@ -0,0 +1,70 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import log as logging + +from neutron.api.rpc.callbacks import events +from neutron.api.rpc.callbacks import registry as rpc_registry +from neutron.api.rpc.callbacks import resources +from neutron.i18n import _LW +from neutron.objects.qos import policy as policy_object +from neutron.services.qos.notification_drivers import qos_base + + +LOG = logging.getLogger(__name__) + + +def _get_qos_policy_cb(resource, policy_id, **kwargs): + context = kwargs.get('context') + if context is None: + LOG.warning(_LW( + 'Received %(resource)s %(policy_id)s without context'), + {'resource': resource, 'policy_id': policy_id} + ) + return + + policy = policy_object.QosPolicy.get_by_id(context, policy_id) + return policy + + +class RpcQosServiceNotificationDriver( + qos_base.QosServiceNotificationDriverBase): + """RPC message queue service notification driver for QoS.""" + + def __init__(self): + LOG.debug( + "Initializing RPC Messaging Queue notification driver for QoS") + rpc_registry.register_provider( + _get_qos_policy_cb, + resources.QOS_POLICY) + + def create_policy(self, policy): + #No need to update agents on create + pass + + def update_policy(self, policy): + # TODO(QoS): this is temporary until we get notify() implemented + try: + rpc_registry.notify(resources.QOS_POLICY, + events.UPDATED, + policy) + except NotImplementedError: + pass + + def delete_policy(self, policy): + # TODO(QoS): this is temporary until we get notify() implemented + try: + rpc_registry.notify(resources.QOS_POLICY, + events.DELETED, + policy) + except NotImplementedError: + pass diff --git a/neutron/services/qos/notification_drivers/qos_base.py b/neutron/services/qos/notification_drivers/qos_base.py new file mode 100644 index 00000000000..86d792c06e7 --- /dev/null +++ b/neutron/services/qos/notification_drivers/qos_base.py @@ -0,0 +1,37 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import abc + +import six + + +@six.add_metaclass(abc.ABCMeta) +class QosServiceNotificationDriverBase(object): + """QoS service notification driver base class.""" + + @abc.abstractmethod + def create_policy(self, policy): + """Create the QoS policy.""" + + @abc.abstractmethod + def update_policy(self, policy): + """Update the QoS policy. + + Apply changes to the QoS policy. + """ + + @abc.abstractmethod + def delete_policy(self, policy): + """Delete the QoS policy. + + Remove all rules for this policy and free up all the resources. + """ diff --git a/neutron/services/qos/qos_plugin.py b/neutron/services/qos/qos_plugin.py index fb84aa9de15..92d58131b1a 100644 --- a/neutron/services/qos/qos_plugin.py +++ b/neutron/services/qos/qos_plugin.py @@ -12,40 +12,20 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - -from neutron import manager - -from neutron.api.rpc.callbacks import registry as rpc_registry -from neutron.api.rpc.callbacks import resources as rpc_resources -from neutron.db import db_base_plugin_common -from neutron.extensions import qos -from neutron.i18n import _LW -from neutron.objects.qos import policy as policy_object -from neutron.objects.qos import rule as rule_object -from neutron.objects.qos import rule_type as rule_type_object -from neutron.plugins.common import constants - from oslo_log import log as logging +from neutron.db import db_base_plugin_common +from neutron.extensions import qos +from neutron.objects.qos import policy as policy_object +from neutron.objects.qos import rule as rule_object +from neutron.objects.qos import rule_type as rule_type_object +from neutron.services.qos.notification_drivers import message_queue + + LOG = logging.getLogger(__name__) -def _get_qos_policy_cb(resource_type, policy_id, **kwargs): - qos_plugin = manager.NeutronManager.get_service_plugins().get( - constants.QOS) - context = kwargs.get('context') - if context is None: - LOG.warning(_LW( - 'Received %(resource_type)s %(policy_id)s without context'), - {'resource_type': resource_type, 'policy_id': policy_id} - ) - return - - qos_policy = qos_plugin.get_qos_policy(context, policy_id) - return qos_policy - - class QoSPlugin(qos.QoSPluginBase): """Implementation of the Neutron QoS Service Plugin. @@ -58,29 +38,36 @@ class QoSPlugin(qos.QoSPluginBase): def __init__(self): super(QoSPlugin, self).__init__() - rpc_registry.register_provider( - _get_qos_policy_cb, - rpc_resources.QOS_POLICY) + #TODO(QoS) load from configuration option + self.notification_driver = ( + message_queue.RpcQosServiceNotificationDriver()) def create_policy(self, context, policy): policy = policy_object.QosPolicy(context, **policy['policy']) policy.create() + self.notification_driver.create_policy(policy) return policy.to_dict() def update_policy(self, context, policy_id, policy): policy = policy_object.QosPolicy(context, **policy['policy']) policy.id = policy_id policy.update() + self.notification_driver.update_policy(policy) return policy.to_dict() def delete_policy(self, context, policy_id): policy = policy_object.QosPolicy(context) policy.id = policy_id + self.notification_driver.delete_policy(policy) policy.delete() def _get_policy_obj(self, context, policy_id): return policy_object.QosPolicy.get_by_id(context, policy_id) + def _update_policy_on_driver(self, context, policy_id): + policy = self._get_policy_obj(context, policy_id) + self.notification_driver.update_policy(policy) + @db_base_plugin_common.filter_fields def get_policy(self, context, policy_id, fields=None): return self._get_policy_obj(context, policy_id).to_dict() @@ -107,6 +94,7 @@ class QoSPlugin(qos.QoSPluginBase): context, qos_policy_id=policy_id, **bandwidth_limit_rule['bandwidth_limit_rule']) rule.create() + self._update_policy_on_driver(context, policy_id) return rule.to_dict() def update_policy_bandwidth_limit_rule(self, context, rule_id, policy_id, @@ -115,12 +103,14 @@ class QoSPlugin(qos.QoSPluginBase): context, **bandwidth_limit_rule['bandwidth_limit_rule']) rule.id = rule_id rule.update() + self._update_policy_on_driver(context, policy_id) return rule.to_dict() def delete_policy_bandwidth_limit_rule(self, context, rule_id, policy_id): rule = rule_object.QosBandwidthLimitRule(context) rule.id = rule_id rule.delete() + self._update_policy_on_driver(context, policy_id) @db_base_plugin_common.filter_fields def get_policy_bandwidth_limit_rule(self, context, rule_id, diff --git a/neutron/tests/unit/services/qos/notification_drivers/__init__.py b/neutron/tests/unit/services/qos/notification_drivers/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/neutron/tests/unit/services/qos/notification_drivers/test_message_queue.py b/neutron/tests/unit/services/qos/notification_drivers/test_message_queue.py new file mode 100644 index 00000000000..a4f163f54b2 --- /dev/null +++ b/neutron/tests/unit/services/qos/notification_drivers/test_message_queue.py @@ -0,0 +1,72 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from neutron.api.rpc.callbacks import events +from neutron.api.rpc.callbacks import resources +from neutron import context +from neutron.objects.qos import policy as policy_object +from neutron.objects.qos import rule as rule_object +from neutron.services.qos.notification_drivers import message_queue +from neutron.tests import base + +DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2' + + +class TestQosRpcNotificationDriver(base.BaseTestCase): + + def setUp(self): + super(TestQosRpcNotificationDriver, self).setUp() + + registry_p = mock.patch( + 'neutron.api.rpc.callbacks.registry.notify') + self.registry_m = registry_p.start() + self.driver = message_queue.RpcQosServiceNotificationDriver() + + self.policy_data = {'policy': { + 'id': 7777777, + 'tenant_id': 888888, + 'name': 'testi-policy', + 'description': 'test policyi description', + 'shared': True}} + + self.rule_data = {'bandwidth_limit_rule': { + 'id': 7777777, + 'max_kbps': 100, + 'max_burst_kbps': 150}} + + self.policy = policy_object.QosPolicy(context, + **self.policy_data['policy']) + + self.rule = rule_object.QosBandwidthLimitRule( + context, + **self.rule_data['bandwidth_limit_rule']) + + def _validate_registry_params(self, event_type, policy): + self.assertTrue(self.registry_m.called, policy) + self.registry_m.assert_called_once_with( + resources.QOS_POLICY, + event_type, + policy) + + def test_create_policy(self): + self.driver.create_policy(self.policy) + self.assertFalse(self.registry_m.called) + + def test_update_policy(self): + self.driver.update_policy(self.policy) + self._validate_registry_params(events.UPDATED, self.policy) + + def test_delete_policy(self): + self.driver.delete_policy(self.policy) + self._validate_registry_params(events.DELETED, self.policy) diff --git a/neutron/tests/unit/services/qos/test_qos_plugin.py b/neutron/tests/unit/services/qos/test_qos_plugin.py new file mode 100644 index 00000000000..d4927b67778 --- /dev/null +++ b/neutron/tests/unit/services/qos/test_qos_plugin.py @@ -0,0 +1,103 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +from oslo_config import cfg + +from neutron.api.rpc.callbacks import events +from neutron.api.rpc.callbacks import resources +from neutron import context +from neutron import manager +from neutron.objects.qos import policy as policy_object +from neutron.objects.qos import rule as rule_object +from neutron.plugins.common import constants +from neutron.tests import base + + +DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2' + + +class TestQosPlugin(base.BaseTestCase): + + def setUp(self): + super(TestQosPlugin, self).setUp() + self.setup_coreplugin() + + mock.patch('neutron.db.api.create_object').start() + mock.patch('neutron.db.api.update_object').start() + mock.patch('neutron.db.api.delete_object').start() + mock.patch('neutron.db.api.get_object').start() + mock.patch( + 'neutron.objects.qos.policy.QosPolicy.obj_load_attr').start() + self.registry_p = mock.patch( + 'neutron.api.rpc.callbacks.registry.notify') + self.registry_m = self.registry_p.start() + cfg.CONF.set_override("core_plugin", DB_PLUGIN_KLASS) + cfg.CONF.set_override("service_plugins", ["qos"]) + + mgr = manager.NeutronManager.get_instance() + self.qos_plugin = mgr.get_service_plugins().get( + constants.QOS) + self.ctxt = context.Context('fake_user', 'fake_tenant') + self.policy_data = { + 'policy': {'id': 7777777, + 'tenant_id': 888888, + 'name': 'test-policy', + 'description': 'Test policy description', + 'shared': True}} + + self.rule_data = { + 'bandwidth_limit_rule': {'id': 7777777, + 'max_kbps': 100, + 'max_burst_kbps': 150}} + + self.policy = policy_object.QosPolicy( + context, **self.policy_data['policy']) + + self.rule = rule_object.QosBandwidthLimitRule( + context, **self.rule_data['bandwidth_limit_rule']) + + def _validate_registry_params(self, event_type): + self.registry_m.assert_called_once_with( + resources.QOS_POLICY, + event_type, + mock.ANY) + self.assertIsInstance( + self.registry_m.call_args[0][2], policy_object.QosPolicy) + + def test_qos_plugin_add_policy(self): + self.qos_plugin.create_policy(self.ctxt, self.policy_data) + self.assertFalse(self.registry_m.called) + + def test_qos_plugin_update_policy(self): + self.qos_plugin.update_policy( + self.ctxt, self.policy.id, self.policy_data) + self._validate_registry_params(events.UPDATED) + + def test_qos_plugin_delete_policy(self): + self.qos_plugin.delete_policy(self.ctxt, self.policy.id) + self._validate_registry_params(events.DELETED) + + def test_qos_plugin_create_policy_rule(self): + self.qos_plugin.create_policy_bandwidth_limit_rule( + self.ctxt, self.policy.id, self.rule_data) + self._validate_registry_params(events.UPDATED) + + def test_qos_plugin_update_policy_rule(self): + self.qos_plugin.update_policy_bandwidth_limit_rule( + self.ctxt, self.rule.id, self.policy.id, self.rule_data) + self._validate_registry_params(events.UPDATED) + + def test_qos_plugin_delete_policy_rule(self): + self.qos_plugin.delete_policy_bandwidth_limit_rule( + self.ctxt, self.rule.id, self.policy.id) + self._validate_registry_params(events.UPDATED)