Qos Agent Extension

This patch introduces the following:
QosAgentExtension - implementation of AgentCoreResourceExtension
QosAgentDriver - interface class
This will allow any agent to implement their own low level driver for
Qos Agent Extension.

Co-Authored-By: Miguel Angel Ajo <mangelajo@redhat.com>

Change-Id: I9e388173dfe0eb43c961018bd687bc86f34c7a6a
This commit is contained in:
Moshe Levi
2015-06-24 18:44:08 +03:00
committed by Miguel Angel Ajo
parent 436bc5d9c3
commit 9544ffb615
6 changed files with 222 additions and 0 deletions

View File

View File

@@ -0,0 +1,125 @@
# Copyright (c) 2015 Mellanox Technologies, Ltd
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import collections
from oslo_utils import importutils
import six
from neutron.agent.l2 import agent_extension
from neutron.api.rpc.callbacks import resources
@six.add_metaclass(abc.ABCMeta)
class QosAgentDriver(object):
"""Define stable abstract interface for Qos Agent Driver.
Qos Agent driver defines the interface to be implemented by Agent
for applying Qos Rules on a port.
"""
@abc.abstractmethod
def initialize(self):
"""Perform Qos agent driver initialization.
"""
pass
@abc.abstractmethod
def create(self, port, rules):
"""Apply Qos rules on port for the first time.
:param port: port object.
:param rules: the list of rules to apply on port.
"""
#TODO(Qos) we may want to provide default implementations of calling
#delete and then update
pass
@abc.abstractmethod
def update(self, port, rules):
"""Apply Qos rules on port.
:param port: port object.
:param rules: the list of rules to be apply on port.
"""
pass
@abc.abstractmethod
def delete(self, port, rules):
"""Remove Qos rules from port.
:param port: port object.
:param rules: the list of rules to be removed from port.
"""
pass
class QosAgentExtension(agent_extension.AgentCoreResourceExtension):
def initialize(self, resource_rpc):
"""Perform Agent Extension initialization.
:param resource_rpc: the agent side rpc for getting
resource by type and id
"""
super(QosAgentExtension, self).initialize(resource_rpc)
#TODO(QoS) - Load it from Config
qos_driver_cls = importutils.import_class(
'neutron.plugins.ml2.drivers.openvswitch.agent.'
'extension_drivers.qos_driver.QosOVSAgentDriver')
self.qos_driver = qos_driver_cls()
self.qos_driver.initialize()
self.qos_policy_ports = collections.defaultdict(dict)
self.known_ports = set()
def handle_port(self, context, port):
"""Handle agent qos extension for port.
This method subscribes to qos_policy_id changes
with a callback and get all the qos_policy_ports and apply
them using the qos driver.
Updates and delete event should be handle by the registered
callback.
"""
port_id = port['port_id']
qos_policy_id = port.get('qos_policy_id')
if qos_policy_id is None:
#TODO(QoS): we should also handle removing policy
return
#Note(moshele) check if we have seen this port
#and it has the same policy we do nothing.
if (port_id in self.known_ports and
port_id in self.qos_policy_ports[qos_policy_id]):
return
self.qos_policy_ports[qos_policy_id][port_id] = port
self.known_ports.add(port_id)
#TODO(QoS): handle updates when implemented
# we have two options:
# 1. to add new api for subscribe
# registry.subscribe(self._process_rules_updates,
# resources.QOS_RULES, qos_policy_id)
# 2. combine get_info rpc to also subscribe to the resource
qos_rules = self.resource_rpc.get_info(
context, resources.QOS_POLICY, qos_policy_id)
self._process_rules_updates(
port, resources.QOS_POLICY, qos_policy_id,
qos_rules, 'create')
def _process_rules_updates(
self, port, resource_type, resource_id,
qos_rules, action_type):
getattr(self.qos_driver, action_type)(port, qos_rules)

View File

View File

@@ -0,0 +1,96 @@
# Copyright (c) 2015 Mellanox Technologies, Ltd
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import mock
from oslo_utils import uuidutils
from neutron.agent.l2.extensions import qos_agent
from neutron.api.rpc.callbacks import resources
from neutron.tests import base
# This is a minimalistic mock of rules to be passed/checked around
# which should be exteneded as needed to make real rules
TEST_GET_INFO_RULES = ['rule1', 'rule2']
class QosAgentExtensionTestCase(base.BaseTestCase):
def setUp(self):
super(QosAgentExtensionTestCase, self).setUp()
self.qos_agent = qos_agent.QosAgentExtension()
self.context = mock.Mock()
# Force our fake underlying QoS driver
#TODO(QoS): change config value when we tie this to a configuration
# entry.
self.import_patcher = mock.patch(
'oslo_utils.importutils.import_class',
return_value=mock.Mock())
self.import_patcher.start()
self._create_fake_resource_rpc()
self.qos_agent.initialize(self.resource_rpc_mock)
def _create_fake_resource_rpc(self):
self.get_info_mock = mock.Mock(return_value=TEST_GET_INFO_RULES)
self.resource_rpc_mock = mock.Mock()
self.resource_rpc_mock.get_info = self.get_info_mock
def _create_test_port_dict(self):
return {'port_id': uuidutils.generate_uuid(),
'qos_policy_id': uuidutils.generate_uuid()}
def test_handle_port_with_no_policy(self):
port = self._create_test_port_dict()
del port['qos_policy_id']
self.qos_agent._process_rules_updates = mock.Mock()
self.qos_agent.handle_port(self.context, port)
self.assertFalse(self.qos_agent._process_rules_updates.called)
def test_handle_unknown_port(self):
port = self._create_test_port_dict()
qos_policy_id = port['qos_policy_id']
port_id = port['port_id']
self.qos_agent.handle_port(self.context, port)
# we make sure the underlaying qos driver is called with the
# right parameters
self.qos_agent.qos_driver.create.assert_called_once_with(
port, TEST_GET_INFO_RULES)
self.assertEqual(port,
self.qos_agent.qos_policy_ports[qos_policy_id][port_id])
self.assertTrue(port_id in self.qos_agent.known_ports)
def test_handle_known_port(self):
port_obj1 = self._create_test_port_dict()
port_obj2 = copy.copy(port_obj1)
self.qos_agent.handle_port(self.context, port_obj1)
self.qos_agent.qos_driver.reset_mock()
self.qos_agent.handle_port(self.context, port_obj2)
self.assertFalse(self.qos_agent.qos_driver.create.called)
def test_handle_known_port_change_policy_id(self):
port = self._create_test_port_dict()
self.qos_agent.handle_port(self.context, port)
self.resource_rpc_mock.get_info.reset_mock()
port['qos_policy_id'] = uuidutils.generate_uuid()
self.qos_agent.handle_port(self.context, port)
self.get_info_mock.assert_called_once_with(
self.context, resources.QOS_POLICY,
port['qos_policy_id'])
#TODO(QoS): handle qos_driver.update call check when
# we do that

View File

@@ -203,6 +203,7 @@ neutron.ipam_drivers =
fake = neutron.tests.unit.ipam.fake_driver:FakeDriver
internal = neutron.ipam.drivers.neutrondb_ipam.driver:NeutronDbPool
neutron.agent.l2.extensions =
qos = neutron.agent.l2.extensions.qos_agent:QosAgentExtension
# These are for backwards compat with Icehouse notification_driver configuration values
oslo.messaging.notify.drivers =
neutron.openstack.common.notifier.log_notifier = oslo_messaging.notify._impl_log:LogDriver