Load the QoS notification driver from the configuration file

The agent based RPC notification driver for message queue is the default.

Added support for multiple notification drivers.

DocImpact

Partially-Implements: blueprint quantum-qos-api
Change-Id: I4108c3d111067d8217bc4112c05e1bde0125e0ef
This commit is contained in:
Ihar Hrachyshka 2015-07-27 16:25:24 +02:00
parent 01e9b77103
commit ec1e812e34
7 changed files with 221 additions and 10 deletions

View File

@ -0,0 +1,74 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
from neutron.i18n import _LI
from neutron import manager
QOS_DRIVER_NAMESPACE = 'neutron.qos.service_notification_drivers'
QOS_PLUGIN_OPTS = [
cfg.ListOpt('service_notification_drivers',
default='message_queue',
help=_('Drivers list to use to send the update notification')),
]
cfg.CONF.register_opts(QOS_PLUGIN_OPTS, "qos")
LOG = logging.getLogger(__name__)
class QosServiceNotificationDriverManager(object):
def __init__(self):
self.notification_drivers = []
self._load_drivers(cfg.CONF.qos.service_notification_drivers)
def update_policy(self, qos_policy):
for driver in self.notification_drivers:
driver.update_policy(qos_policy)
def delete_policy(self, qos_policy):
for driver in self.notification_drivers:
driver.delete_policy(qos_policy)
def create_policy(self, qos_policy):
for driver in self.notification_drivers:
driver.create_policy(qos_policy)
def _load_drivers(self, notification_drivers):
"""Load all the instances of the configured QoS notification drivers
:param notification_drivers: comma separated string
"""
if not notification_drivers:
raise SystemExit(_('A QoS driver must be specified'))
LOG.debug("Loading QoS notification drivers: %s", notification_drivers)
for notification_driver in notification_drivers:
driver_ins = self._load_driver_instance(notification_driver)
self.notification_drivers.append(driver_ins)
def _load_driver_instance(self, notification_driver):
"""Returns an instance of the configured QoS notification driver
:returns: An instance of Driver for the QoS notification
"""
mgr = manager.NeutronManager
driver = mgr.load_class_for_provider(QOS_DRIVER_NAMESPACE,
notification_driver)
driver_instance = driver()
LOG.info(
_LI("Loading %(name)s (%(description)s) notification driver "
"for QoS plugin"),
{"name": notification_driver,
"description": driver_instance.get_description()})
return driver_instance

View File

@ -41,12 +41,13 @@ class RpcQosServiceNotificationDriver(
"""RPC message queue service notification driver for QoS."""
def __init__(self):
LOG.debug(
"Initializing RPC Messaging Queue notification driver for QoS")
rpc_registry.register_provider(
_get_qos_policy_cb,
resources.QOS_POLICY)
def get_description(self):
return "Message queue updates"
def create_policy(self, policy):
#No need to update agents on create
pass

View File

@ -18,6 +18,11 @@ import six
class QosServiceNotificationDriverBase(object):
"""QoS service notification driver base class."""
@abc.abstractmethod
def get_description(self):
"""Get the notification driver description.
"""
@abc.abstractmethod
def create_policy(self, policy):
"""Create the QoS policy."""

View File

@ -20,7 +20,7 @@ from neutron.extensions import qos
from neutron.objects.qos import policy as policy_object
from neutron.objects.qos import rule as rule_object
from neutron.objects.qos import rule_type as rule_type_object
from neutron.services.qos.notification_drivers import message_queue
from neutron.services.qos.notification_drivers import manager as driver_mgr
LOG = logging.getLogger(__name__)
@ -38,27 +38,26 @@ class QoSPlugin(qos.QoSPluginBase):
def __init__(self):
super(QoSPlugin, self).__init__()
#TODO(QoS) load from configuration option
self.notification_driver = (
message_queue.RpcQosServiceNotificationDriver())
self.notification_driver_manager = (
driver_mgr.QosServiceNotificationDriverManager())
def create_policy(self, context, policy):
policy = policy_object.QosPolicy(context, **policy['policy'])
policy.create()
self.notification_driver.create_policy(policy)
self.notification_driver_manager.create_policy(policy)
return policy.to_dict()
def update_policy(self, context, policy_id, policy):
policy = policy_object.QosPolicy(context, **policy['policy'])
policy.id = policy_id
policy.update()
self.notification_driver.update_policy(policy)
self.notification_driver_manager.update_policy(policy)
return policy.to_dict()
def delete_policy(self, context, policy_id):
policy = policy_object.QosPolicy(context)
policy.id = policy_id
self.notification_driver.delete_policy(policy)
self.notification_driver_manager.delete_policy(policy)
policy.delete()
def _get_policy_obj(self, context, policy_id):
@ -66,7 +65,7 @@ class QoSPlugin(qos.QoSPluginBase):
def _update_policy_on_driver(self, context, policy_id):
policy = self._get_policy_obj(context, policy_id)
self.notification_driver.update_policy(policy)
self.notification_driver_manager.update_policy(policy)
@db_base_plugin_common.filter_fields
def get_policy(self, context, policy_id, fields=None):

View File

@ -0,0 +1,30 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.services.qos.notification_drivers import qos_base
class DummyQosServiceNotificationDriver(
qos_base.QosServiceNotificationDriverBase):
"""Dummy service notification driver for QoS."""
def get_description(self):
return "Dummy"
def create_policy(self, policy):
pass
def update_policy(self, policy):
pass
def delete_policy(self, policy):
pass

View File

@ -0,0 +1,100 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.callbacks import resources
from neutron import context
from neutron.objects.qos import policy as policy_object
from neutron.services.qos.notification_drivers import manager as driver_mgr
from neutron.services.qos.notification_drivers import message_queue
from neutron.tests import base
DUMMY_DRIVER = ("neutron.tests.unit.services.qos.notification_drivers."
"dummy.DummyQosServiceNotificationDriver")
def _load_multiple_drivers():
cfg.CONF.set_override(
"service_notification_drivers",
["message_queue", DUMMY_DRIVER],
"qos")
class TestQosDriversManager(base.BaseTestCase):
def setUp(self):
super(TestQosDriversManager, self).setUp()
self.config_parse()
self.setup_coreplugin()
self.registry_p = mock.patch(
'neutron.api.rpc.callbacks.registry.notify')
self.registry_m = self.registry_p.start()
self.driver_manager = driver_mgr.QosServiceNotificationDriverManager()
config = cfg.ConfigOpts()
config.register_opts(driver_mgr.QOS_PLUGIN_OPTS, "qos")
self.policy_data = {'policy': {
'id': 7777777,
'tenant_id': 888888,
'name': 'test-policy',
'description': 'test policy description',
'shared': True}}
self.policy = policy_object.QosPolicy(context,
**self.policy_data['policy'])
ctxt = None
self.kwargs = {'context': ctxt}
def _validate_registry_params(self, event_type, policy):
self.assertTrue(self.registry_m.called, policy)
self.registry_m.assert_called_with(
resources.QOS_POLICY,
event_type,
policy)
def test_create_policy_default_configuration(self):
#RPC driver should be loaded by default
self.driver_manager.create_policy(self.policy)
self.assertFalse(self.registry_m.called)
def test_update_policy_default_configuration(self):
#RPC driver should be loaded by default
self.driver_manager.update_policy(self.policy)
self._validate_registry_params(events.UPDATED, self.policy)
def test_delete_policy_default_configuration(self):
#RPC driver should be loaded by default
self.driver_manager.delete_policy(self.policy)
self._validate_registry_params(events.DELETED, self.policy)
def _test_multi_drivers_configuration_op(self, op):
_load_multiple_drivers()
# create a new manager with new configuration
driver_manager = driver_mgr.QosServiceNotificationDriverManager()
handler = '%s_policy' % op
with mock.patch('.'.join([DUMMY_DRIVER, handler])) as dummy_mock:
rpc_driver = message_queue.RpcQosServiceNotificationDriver
with mock.patch.object(rpc_driver, handler) as rpc_mock:
getattr(driver_manager, handler)(self.policy)
for mock_ in (dummy_mock, rpc_mock):
mock_.assert_called_with(self.policy)
def test_multi_drivers_configuration_create(self):
self._test_multi_drivers_configuration_op('create')
def test_multi_drivers_configuration_update(self):
self._test_multi_drivers_configuration_op('update')
def test_multi_drivers_configuration_delete(self):
self._test_multi_drivers_configuration_op('delete')

View File

@ -155,6 +155,8 @@ neutron.service_providers =
# These are for backwards compat with Juno vpnaas service provider configuration values
neutron.services.vpn.service_drivers.cisco_ipsec.CiscoCsrIPsecVPNDriver = neutron_vpnaas.services.vpn.service_drivers.cisco_ipsec:CiscoCsrIPsecVPNDriver
neutron.services.vpn.service_drivers.ipsec.IPsecVPNDriver = neutron_vpnaas.services.vpn.service_drivers.ipsec:IPsecVPNDriver
neutron.qos.service_notification_drivers =
message_queue = neutron.services.qos.notification_drivers.message_queue:RpcQosServiceNotificationDriver
neutron.ml2.type_drivers =
flat = neutron.plugins.ml2.drivers.type_flat:FlatTypeDriver
local = neutron.plugins.ml2.drivers.type_local:LocalTypeDriver