Handle qos_policy on network/port create/update

Added handling for qos_policy_id field in the network and port
entities via ML2 extension driver.
The QoS profile will be associated to the network/port when requested as
part of the entity creation or update.

Allow ML2 extension manager to not register for any api extension
(new use case).

===

Extend the resources using the QoS extension class

Since the QoS extension for plugins is handles by this class, it makes
sense for it to handle also property extension of resources.

For ML2 this means that that extend_{network,port}_dict functions will
handle the extension of resources by calling QosExtensionHandler.
This logic can easily be reused by other plugins.

Note: we should make sure that resource extension does not require db
access, otherwise we see DBDeadLock errors and random tempest failures.
To achieve this, we define a new SQLAlchemy joined relationship on
policy bindings to make networks and ports receive those bindings on
their fetch from database. After that, the only work to do left for
resource extension handler is to copy the fetched policy into resource
dictionary.

===

Also enable new qos ml2 extension until we configure it in gate via
project-config and devstack-gate to make sure it's enabled and tested.

Co-Authored-By: Ihar Hrachyshka <ihrachys@redhat.com>
Partially-implements: blueprint quantum-qos-api
Change-Id: I1b7d4611215a471d5c24eb3d7208dcddb7e015f4
This commit is contained in:
Mike Kolesnik 2015-07-15 10:44:15 +03:00 committed by Ihar Hrachyshka
parent d8259702aa
commit 0395f14203
10 changed files with 310 additions and 61 deletions

View File

@ -44,6 +44,10 @@ class QosNetworkPolicyBinding(model_base.BASEV2):
nullable=False,
unique=True,
primary_key=True)
network = sa.orm.relationship(
models_v2.Network,
backref=sa.orm.backref("qos_policy_binding", uselist=False,
cascade='delete', lazy='joined'))
class QosPortPolicyBinding(model_base.BASEV2):
@ -59,6 +63,10 @@ class QosPortPolicyBinding(model_base.BASEV2):
nullable=False,
unique=True,
primary_key=True)
port = sa.orm.relationship(
models_v2.Port,
backref=sa.orm.backref("qos_policy_binding", uselist=False,
cascade='delete', lazy='joined'))
class QosRule(model_base.BASEV2, models_v2.HasId):

View File

@ -911,12 +911,14 @@ class ExtensionDriver(object):
"""
pass
@abc.abstractproperty
@property
def extension_alias(self):
"""Supported extension alias.
Return the alias identifying the core API extension supported
by this driver.
by this driver. Do not declare if API extension handling will
be left to a service plugin, and we just need to provide
core resource extension and updates.
"""
pass

View File

@ -0,0 +1,50 @@
# Copyright (c) 2015 Red Hat Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from neutron.plugins.ml2 import driver_api as api
from neutron.services.qos import qos_extension
LOG = logging.getLogger(__name__)
class QosExtensionDriver(api.ExtensionDriver):
def initialize(self):
self.qos_ext_handler = qos_extension.QosResourceExtensionHandler()
LOG.debug("QosExtensionDriver initialization complete")
def process_create_network(self, context, data, result):
self.qos_ext_handler.process_resource(
context, qos_extension.NETWORK, data, result)
process_update_network = process_create_network
def process_create_port(self, context, data, result):
self.qos_ext_handler.process_resource(
context, qos_extension.PORT, data, result)
process_update_port = process_create_port
def extend_network_dict(self, session, db_data, result):
result.update(
self.qos_ext_handler.extract_resource_fields(qos_extension.NETWORK,
db_data))
def extend_port_dict(self, session, db_data, result):
result.update(
self.qos_ext_handler.extract_resource_fields(qos_extension.PORT,
db_data))

View File

@ -723,10 +723,14 @@ class ExtensionManager(stevedore.named.NamedExtensionManager):
# the order in which the drivers are called.
self.ordered_ext_drivers = []
#TODO(QoS): enforce qos extension until we enable it in devstack-gate
drivers = cfg.CONF.ml2.extension_drivers
if 'qos' not in drivers:
drivers += ['qos']
LOG.info(_LI("Configured extension driver names: %s"),
cfg.CONF.ml2.extension_drivers)
drivers)
super(ExtensionManager, self).__init__('neutron.ml2.extension_drivers',
cfg.CONF.ml2.extension_drivers,
drivers,
invoke_on_load=True,
name_order=True)
LOG.info(_LI("Loaded extension driver names: %s"), self.names())
@ -753,9 +757,10 @@ class ExtensionManager(stevedore.named.NamedExtensionManager):
exts = []
for driver in self.ordered_ext_drivers:
alias = driver.obj.extension_alias
exts.append(alias)
LOG.info(_LI("Got %(alias)s extension from driver '%(drv)s'"),
{'alias': alias, 'drv': driver.name})
if alias:
exts.append(alias)
LOG.info(_LI("Got %(alias)s extension from driver '%(drv)s'"),
{'alias': alias, 'drv': driver.name})
return exts
def _call_on_ext_drivers(self, method_name, plugin_context, data, result):

View File

@ -64,6 +64,7 @@ from neutron.extensions import extra_dhcp_opt as edo_ext
from neutron.extensions import portbindings
from neutron.extensions import portsecurity as psec
from neutron.extensions import providernet as provider
from neutron.extensions import qos
from neutron.extensions import vlantransparent
from neutron.i18n import _LE, _LI, _LW
from neutron import manager
@ -1140,6 +1141,12 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
original_port[psec.PORTSECURITY] !=
updated_port[psec.PORTSECURITY]):
need_port_update_notify = True
# TODO(QoS): Move out to the extension framework somehow.
# Follow https://review.openstack.org/#/c/169223 for a solution.
if (qos.QOS_POLICY_ID in attrs and
original_port[qos.QOS_POLICY_ID] !=
updated_port[qos.QOS_POLICY_ID]):
need_port_update_notify = True
if addr_pair.ADDRESS_PAIRS in attrs:
need_port_update_notify |= (

View File

@ -0,0 +1,82 @@
# Copyright (c) 2015 Red Hat Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.extensions import qos
from neutron import manager
from neutron.objects.qos import policy as policy_object
from neutron.plugins.common import constants as plugin_constants
NETWORK = 'network'
PORT = 'port'
# TODO(QoS): Add interface to define how this should look like
class QosResourceExtensionHandler(object):
@property
def plugin_loaded(self):
if not hasattr(self, '_plugin_loaded'):
service_plugins = manager.NeutronManager.get_service_plugins()
self._plugin_loaded = plugin_constants.QOS in service_plugins
return self._plugin_loaded
def _get_policy_obj(self, context, policy_id):
return policy_object.QosPolicy.get_by_id(context, policy_id)
def _update_port_policy(self, context, port, port_changes):
old_policy = policy_object.QosPolicy.get_port_policy(
context, port['id'])
if old_policy:
#TODO(QoS): this means two transactions. One for detaching
# one for re-attaching, we may want to update
# within a single transaction instead, or put
# a whole transaction on top, or handle the switch
# at db api level automatically within transaction.
old_policy.detach_port(port['id'])
qos_policy_id = port_changes.get(qos.QOS_POLICY_ID)
if qos_policy_id is not None:
policy = self._get_policy_obj(context, qos_policy_id)
policy.attach_port(port['id'])
port[qos.QOS_POLICY_ID] = qos_policy_id
def _update_network_policy(self, context, network, network_changes):
old_policy = policy_object.QosPolicy.get_network_policy(
context, network['id'])
if old_policy:
old_policy.detach_network(network['id'])
qos_policy_id = network_changes.get(qos.QOS_POLICY_ID)
if qos_policy_id:
policy = self._get_policy_obj(context, qos_policy_id)
policy.attach_network(network['id'])
network[qos.QOS_POLICY_ID] = qos_policy_id
def _exec(self, method_name, context, kwargs):
return getattr(self, method_name)(context=context, **kwargs)
def process_resource(self, context, resource_type, requested_resource,
actual_resource):
if qos.QOS_POLICY_ID in requested_resource and self.plugin_loaded:
self._exec('_update_%s_policy' % resource_type, context,
{resource_type: actual_resource,
"%s_changes" % resource_type: requested_resource})
def extract_resource_fields(self, resource_type, resource):
if not self.plugin_loaded:
return {}
binding = resource['qos_policy_binding']
return {qos.QOS_POLICY_ID: binding['policy_id'] if binding else None}

View File

@ -17,9 +17,6 @@ from neutron import manager
from neutron.api.rpc.callbacks import registry as rpc_registry
from neutron.api.rpc.callbacks import resources as rpc_resources
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.extensions import qos
from neutron.i18n import _LW
from neutron.objects.qos import policy as policy_object
@ -108,8 +105,6 @@ class QoSPlugin(qos.QoSPluginBase):
def __init__(self):
super(QoSPlugin, self).__init__()
self.register_resource_providers()
self.register_port_callbacks()
self.register_net_callbacks()
def register_resource_providers(self):
rpc_registry.register_provider(
@ -120,55 +115,6 @@ class QoSPlugin(qos.QoSPluginBase):
_get_qos_policy_cb_stub,
rpc_resources.QOS_POLICY)
def register_port_callbacks(self):
registry.subscribe(
self._extend_port_policy_data, resources.PORT, events.AFTER_READ)
def _extend_port_policy_data(self, resource, event, trigger, **kwargs):
context = kwargs['context']
port = kwargs['port']
policy = policy_object.QosPolicy.get_port_policy(context, port['id'])
port['qos_policy_id'] = policy.id if policy else None
def update_port_policy(self, context, port):
old_policy = policy_object.QosPolicy.get_port_policy(
context, port['id'])
if old_policy is not None:
#TODO(QoS): this means two transactions. One for detaching
# one for re-attaching, we may want to update
# within a single transaction instead, or put
# a whole transaction on top, or handle the switch
# at db api level automatically within transaction.
old_policy.detach_port(port['id'])
qos_policy_id = port.get('qos_policy_id')
if qos_policy_id is not None:
policy = self._get_policy_obj(context, qos_policy_id)
policy.attach_port(port['id'])
def register_net_callbacks(self):
registry.subscribe(self._extend_network_policy_data,
resources.NETWORK,
events.AFTER_READ)
def _extend_network_policy_data(self, resource, event, trigger, **kwargs):
context = kwargs['context']
network = kwargs['network']
policy = policy_object.QosPolicy.get_network_policy(
context, network['id'])
network['qos_policy_id'] = policy.id if policy else None
def update_network_policy(self, context, network):
old_policy = policy_object.QosPolicy.get_network_policy(
context, network['id'])
if old_policy:
old_policy.detach_network(network['id'])
qos_policy_id = network.get('qos_policy_id')
if qos_policy_id:
policy = self._get_policy_obj(context, qos_policy_id)
policy.attach_network(network['id'])
def create_policy(self, context, policy):
policy = policy_object.QosPolicy(context, **policy['policy'])
policy.create()

View File

@ -0,0 +1,148 @@
# Copyright (c) 2015 Red Hat Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.extensions import qos
from neutron.plugins.common import constants as plugin_constants
from neutron.services.qos import qos_extension
from neutron.tests import base
def _get_test_dbdata(qos_policy_id):
return {'id': None, 'qos_policy_binding': {'policy_id': qos_policy_id,
'network_id': 'fake_net_id'}}
class QosResourceExtensionHandlerTestCase(base.BaseTestCase):
def setUp(self):
super(QosResourceExtensionHandlerTestCase, self).setUp()
self.ext_handler = qos_extension.QosResourceExtensionHandler()
policy_p = mock.patch('neutron.objects.qos.policy.QosPolicy')
self.policy_m = policy_p.start()
def test_process_resource_no_qos_policy_id(self):
self.ext_handler.process_resource(None, qos_extension.PORT, {}, None)
self.assertFalse(self.policy_m.called)
def _mock_plugin_loaded(self, plugin_loaded):
plugins = {}
if plugin_loaded:
plugins[plugin_constants.QOS] = None
return mock.patch('neutron.manager.NeutronManager.get_service_plugins',
return_value=plugins)
def test_process_resource_no_qos_plugin_loaded(self):
with self._mock_plugin_loaded(False):
self.ext_handler.process_resource(None, qos_extension.PORT,
{qos.QOS_POLICY_ID: None}, None)
self.assertFalse(self.policy_m.called)
def test_process_resource_port_new_policy(self):
with self._mock_plugin_loaded(True):
qos_policy_id = mock.Mock()
actual_port = {'id': mock.Mock(),
qos.QOS_POLICY_ID: qos_policy_id}
qos_policy = mock.MagicMock()
self.policy_m.get_by_id = mock.Mock(return_value=qos_policy)
self.ext_handler.process_resource(
None, qos_extension.PORT, {qos.QOS_POLICY_ID: qos_policy_id},
actual_port)
qos_policy.attach_port.assert_called_once_with(actual_port['id'])
def test_process_resource_port_updated_policy(self):
with self._mock_plugin_loaded(True):
qos_policy_id = mock.Mock()
port_id = mock.Mock()
actual_port = {'id': port_id,
qos.QOS_POLICY_ID: qos_policy_id}
old_qos_policy = mock.MagicMock()
self.policy_m.get_port_policy = mock.Mock(
return_value=old_qos_policy)
new_qos_policy = mock.MagicMock()
self.policy_m.get_by_id = mock.Mock(return_value=new_qos_policy)
self.ext_handler.process_resource(
None, qos_extension.PORT, {qos.QOS_POLICY_ID: qos_policy_id},
actual_port)
old_qos_policy.detach_port.assert_called_once_with(port_id)
new_qos_policy.attach_port.assert_called_once_with(port_id)
def test_process_resource_network_new_policy(self):
with self._mock_plugin_loaded(True):
qos_policy_id = mock.Mock()
actual_network = {'id': mock.Mock(),
qos.QOS_POLICY_ID: qos_policy_id}
qos_policy = mock.MagicMock()
self.policy_m.get_by_id = mock.Mock(return_value=qos_policy)
self.ext_handler.process_resource(
None, qos_extension.NETWORK,
{qos.QOS_POLICY_ID: qos_policy_id}, actual_network)
qos_policy.attach_network.assert_called_once_with(
actual_network['id'])
def test_process_resource_network_updated_policy(self):
with self._mock_plugin_loaded(True):
qos_policy_id = mock.Mock()
network_id = mock.Mock()
actual_network = {'id': network_id,
qos.QOS_POLICY_ID: qos_policy_id}
old_qos_policy = mock.MagicMock()
self.policy_m.get_network_policy = mock.Mock(
return_value=old_qos_policy)
new_qos_policy = mock.MagicMock()
self.policy_m.get_by_id = mock.Mock(return_value=new_qos_policy)
self.ext_handler.process_resource(
None, qos_extension.NETWORK,
{qos.QOS_POLICY_ID: qos_policy_id}, actual_network)
old_qos_policy.detach_network.assert_called_once_with(network_id)
new_qos_policy.attach_network.assert_called_once_with(network_id)
def test_extract_resource_fields_plugin_not_loaded(self):
with self._mock_plugin_loaded(False):
fields = self.ext_handler.extract_resource_fields(None, None)
self.assertEqual({}, fields)
def _test_extract_resource_fields_for_port(self, qos_policy_id):
with self._mock_plugin_loaded(True):
fields = self.ext_handler.extract_resource_fields(
qos_extension.PORT, _get_test_dbdata(qos_policy_id))
self.assertEqual({qos.QOS_POLICY_ID: qos_policy_id}, fields)
def test_extract_resource_fields_no_port_policy(self):
self._test_extract_resource_fields_for_port(None)
def test_extract_resource_fields_port_policy_exists(self):
qos_policy_id = mock.Mock()
self._test_extract_resource_fields_for_port(qos_policy_id)
def _test_extract_resource_fields_for_network(self, qos_policy_id):
with self._mock_plugin_loaded(True):
fields = self.ext_handler.extract_resource_fields(
qos_extension.NETWORK, _get_test_dbdata(qos_policy_id))
self.assertEqual({qos.QOS_POLICY_ID: qos_policy_id}, fields)
def test_extract_resource_fields_no_network_policy(self):
self._test_extract_resource_fields_for_network(None)
def test_extract_resource_fields_network_policy_exists(self):
qos_policy_id = mock.Mock()
qos_policy = mock.Mock()
qos_policy.id = qos_policy_id
self._test_extract_resource_fields_for_network(qos_policy_id)

View File

@ -197,6 +197,7 @@ neutron.ml2.extension_drivers =
testdb = neutron.tests.unit.plugins.ml2.drivers.ext_test:TestDBExtensionDriver
port_security = neutron.plugins.ml2.extensions.port_security:PortSecurityExtensionDriver
cisco_n1kv_ext = neutron.plugins.ml2.drivers.cisco.n1kv.n1kv_ext_driver:CiscoN1kvExtensionDriver
qos = neutron.plugins.ml2.extensions.qos:QosExtensionDriver
neutron.openstack.common.cache.backends =
memory = neutron.openstack.common.cache._backends.memory:MemoryBackend
neutron.ipam_drivers =