Add support for qos

Qos in neutron is modelled using QosPolicy that has a list of rules.
Rules can be of type DSCP rule or Bandwidth Limiting rule.

In AIM, neutron.QosPolicy along with optional DSCP rule are mapped to
api_res.QosRequirement and Bandwidth limiting rule is mapped to
api_res.QosDppPol resource.

Any Qos neutron object create/modify/delete actions in turn triggers
the newly added qos_driver. The driver in turn updates the corresponding
AIM resources.

When Qos policy is updated on a network, the mechanism driver updates the
AIM endpointgroup resource if it an epg kind of network and the policy
will eventually be configured in ovs wherever the epg is deployed.

Change-Id: I2f772312362606c7b85876368761507d857981ee
This commit is contained in:
Satya Patchava 2020-08-12 05:20:24 -07:00
parent eb3b19d82d
commit bfd2506314
6 changed files with 450 additions and 1 deletions

View File

@ -115,6 +115,11 @@ class InvalidNetworkForEpgContractMaster(exceptions.BadRequest):
"an external or SVI network.")
class InvalidNetworkForQos(exceptions.BadRequest):
message = _("Cannot specify qos policy for "
"an external or SVI network.")
class InvalidPreexistingBdForNetwork(exceptions.BadRequest):
message = _("The Bridge Domain specified in apic:distinguished_names "
"either does not exist in ACI or belongs to another network "

View File

@ -63,6 +63,7 @@ from neutron_lib import exceptions as n_exceptions
from neutron_lib.plugins import constants as pconst
from neutron_lib.plugins import directory
from neutron_lib.plugins.ml2 import api
from neutron_lib.services.qos import constants as qos_consts
from neutron_lib.utils import net
from opflexagent import constants as ofcst
from opflexagent import rpc as ofrpc
@ -93,6 +94,8 @@ from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import nova_client
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import rpc
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import trunk_driver
from gbpservice.neutron.plugins.ml2plus.drivers.apic_aim import qos_driver
# REVISIT: We need the aim_mapping policy driver's config until
# advertise_mtu and nested_host_vlan are moved to the mechanism
# driver's own config. Also, the noqa comment has to be on the same
@ -279,6 +282,7 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
self.vpcport_desc_re = re.compile(ACI_VPCPORT_DESCR_FORMAT)
self.apic_router_id_pool = cfg.CONF.ml2_apic_aim.apic_router_id_pool
self.apic_router_id_subnet = netaddr.IPSet([self.apic_router_id_pool])
self.qos_driver = qos_driver.register(self)
def start_rpc_listeners(self):
LOG.info("APIC AIM MD starting RPC listeners")
@ -662,6 +666,98 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
raise exceptions.InvalidPreexistingBdForNetwork()
return aim_bd
def _handle_qos_policy(self, context, policy):
session = context.session
aim_ctx = aim_context.AimContext(session)
tenant_aname = self.name_mapper.project(session, context.tenant_id)
aim_qos_db = aim_resource.QosRequirement(
tenant_name=tenant_aname, name=policy['id'])
aim_qos_db = self.aim.get(aim_ctx, aim_qos_db)
bw_rules = []
dscp = None
egress_dpp_pol = None
ingress_dpp_pol = None
for rule in policy['rules']:
if rule.rule_type == qos_consts.RULE_TYPE_DSCP_MARKING:
dscp = rule.dscp_mark
elif rule.rule_type == qos_consts.RULE_TYPE_BANDWIDTH_LIMIT:
if rule.direction == 'egress':
egress_dpp_pol = rule.id
bw_rules.append({
'egress': True,
'burst': str(rule.max_burst_kbps),
'tenant_name': tenant_aname,
'name': rule.id, 'burst_unit': 'kilo',
'display_name': policy['name'] + '_egress',
'rate_unit': 'kilo',
'rate': rule.max_kbps})
elif rule.direction == 'ingress':
ingress_dpp_pol = rule.id
bw_rules.append({
'egress': False,
'burst': str(rule.max_burst_kbps),
'tenant_name': tenant_aname,
'name': rule.id, 'burst_unit': 'kilo',
'display_name': policy['name'] + '_ingress',
'rate_unit': 'kilo', 'rate': rule.max_kbps})
if aim_qos_db and aim_qos_db.egress_dpp_pol and \
aim_qos_db.egress_dpp_pol != egress_dpp_pol:
aim_bw = aim_resource.QosDppPol(tenant_name=tenant_aname,
name=aim_qos_db.egress_dpp_pol)
self.aim.delete(aim_ctx, aim_bw)
if aim_qos_db and aim_qos_db.ingress_dpp_pol and \
aim_qos_db.ingress_dpp_pol != ingress_dpp_pol:
aim_bw = aim_resource.QosDppPol(tenant_name=tenant_aname,
name=aim_qos_db.ingress_dpp_pol)
self.aim.delete(aim_ctx, aim_bw)
aim_qos = aim_resource.QosRequirement(
tenant_name=tenant_aname, name=policy['id'],
display_name=policy['name'],
dscp=dscp, egress_dpp_pol=egress_dpp_pol,
ingress_dpp_pol=ingress_dpp_pol)
self.aim.create(aim_ctx, aim_qos, overwrite=True)
for i in bw_rules:
res = aim_resource.QosDppPol(**i)
self.aim.create(aim_ctx, res, overwrite=True)
def create_qos_policy_precommit(self, context, policy):
"""Create a QoS policy.
:param context: neutron api request context
:type context: neutron_lib.context.Context
:param policy: policy data to be applied
:type policy: dict
:returns: a QosPolicy object
"""
self._handle_qos_policy(context, policy)
def update_qos_policy_precommit(self, context, policy):
"""Create a QoS policy.
:param context: neutron api request context
:type context: neutron_lib.context.Context
:param policy: policy data to be applied
:type policy: dict
:returns: a QosPolicy object
"""
self._handle_qos_policy(context, policy)
def delete_qos_policy_precommit(self, context, policy):
session = context.session
aim_ctx = aim_context.AimContext(session)
tenant_aname = self.name_mapper.project(session, context.tenant_id)
qos_aim = aim_resource.QosRequirement(
tenant_name=tenant_aname, name=policy['id'])
aim_qos_db = self.aim.get(aim_ctx, qos_aim)
if aim_qos_db and aim_qos_db.egress_dpp_pol:
aim_bw = aim_resource.QosDppPol(tenant_name=tenant_aname,
name=aim_qos_db.egress_dpp_pol)
self.aim.delete(aim_ctx, aim_bw)
if aim_qos_db and aim_qos_db.ingress_dpp_pol:
aim_bw = aim_resource.QosDppPol(tenant_name=tenant_aname,
name=aim_qos_db.ingress_dpp_pol)
self.aim.delete(aim_ctx, aim_bw)
self.aim.delete(aim_ctx, qos_aim, cascade=True)
def create_network_precommit(self, context):
current = context.current
LOG.debug("APIC AIM MD creating network: %s", current)
@ -680,6 +776,9 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
if (current[cisco_apic.EPG_CONTRACT_MASTERS] and (is_ext or is_svi)):
raise exceptions.InvalidNetworkForEpgContractMaster()
if (current.get(qos_consts.QOS_POLICY_ID) and (is_ext or is_svi)):
raise exceptions.InvalidNetworkForQos()
if is_ext:
l3out, ext_net, ns = self._get_aim_nat_strategy(current)
if not ext_net:
@ -813,6 +912,7 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
cisco_apic.EXTRA_CONSUMED_CONTRACTS]
epg.epg_contract_masters = current[
cisco_apic.EPG_CONTRACT_MASTERS]
epg.qos_name = current.get(qos_consts.QOS_POLICY_ID, None)
self.aim.create(aim_ctx, epg)
self._add_network_mapping_and_notify(
@ -833,6 +933,9 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
is_ext = self._is_external(current)
is_svi = self._is_svi(current)
if (current.get(qos_consts.QOS_POLICY_ID) and (is_ext or is_svi)):
raise exceptions.InvalidNetworkForQos()
# Update name if changed. REVISIT: Remove is_ext from
# condition and add UT for updating external network name.
if (not is_ext and
@ -848,6 +951,13 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
if l3out:
self.aim.update(aim_ctx, l3out, display_name=dname)
if (current.get(qos_consts.QOS_POLICY_ID) !=
original.get(qos_consts.QOS_POLICY_ID)):
epg = self._get_network_epg(mapping)
self.aim.update(
aim_ctx, epg,
qos_name=current.get(qos_consts.QOS_POLICY_ID))
# Update extra provided/consumed contracts if changed.
curr_prov = set(current[cisco_apic.EXTRA_PROVIDED_CONTRACTS])
curr_cons = set(current[cisco_apic.EXTRA_CONSUMED_CONTRACTS])
@ -6009,6 +6119,7 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
if d['type'] == 'OpenStack']
expected_epg.physical_domain_names = [
d['name'] for d in expected_epg.physical_domains]
expected_epg.qos_name = actual_epg.qos_name
else:
# REVISIT: Force rebinding of ports using this
# EPG?
@ -6334,6 +6445,9 @@ class ApicMechanismDriver(api_plus.MechanismDriver,
epg.static_paths = []
epg.epg_contract_masters = epg_contract_masters
epg.monitored = False
qos_policy_binding = net_db.get('qos_policy_binding')
epg.qos_name = (
qos_policy_binding.policy_id if qos_policy_binding else None)
mgr.expect_aim_resource(epg)
return bd, epg, vrf

View File

@ -0,0 +1,78 @@
# Copyright (c) 2020 Cisco Systems Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api.definitions import portbindings
from neutron_lib import constants
from neutron_lib.db import constants as db_consts
from neutron_lib.services.qos import base
from neutron_lib.services.qos import constants as qos_consts
from oslo_config import cfg
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
SUPPORTED_RULES = {
qos_consts.RULE_TYPE_BANDWIDTH_LIMIT: {
qos_consts.MAX_KBPS: {
'type:range': [0, db_consts.DB_INTEGER_MAX_VALUE]},
qos_consts.MAX_BURST: {
'type:range': [0, db_consts.DB_INTEGER_MAX_VALUE]},
qos_consts.DIRECTION: {
'type:values': constants.VALID_DIRECTIONS},
},
qos_consts.RULE_TYPE_DSCP_MARKING: {
qos_consts.DSCP_MARK: {'type:values': constants.VALID_DSCP_MARKS},
}
}
VIF_TYPES = [portbindings.VIF_TYPE_OVS, portbindings.VIF_TYPE_VHOST_USER]
VNIC_TYPES = [portbindings.VNIC_NORMAL]
DRIVER = None
class ACIQosDriver(base.DriverBase):
@classmethod
def create(cls, plugin_driver):
obj = ACIQosDriver(name='ACIQosDriver',
vif_types=VIF_TYPES,
vnic_types=VNIC_TYPES,
supported_rules=SUPPORTED_RULES,
requires_rpc_notifications=False)
obj._driver = plugin_driver
return obj
@property
def is_loaded(self):
return 'qos' in cfg.CONF.ml2.extension_drivers
def create_policy_precommit(self, context, policy):
self._driver.create_qos_policy_precommit(context, policy)
def update_policy_precommit(self, context, policy):
self._driver.update_qos_policy_precommit(context, policy)
def delete_policy_precommit(self, context, policy):
self._driver.delete_qos_policy_precommit(context, policy)
def register(plugin_driver):
"""Register the driver."""
global DRIVER
if not DRIVER:
DRIVER = ACIQosDriver.create(plugin_driver)
LOG.debug('ACI QoS driver registered')
return DRIVER

View File

@ -161,7 +161,9 @@ class ValidationManager(object):
for attr_name, attr_type in resource.other_attributes.items():
attr_type_type = attr_type['type']
if attr_type_type == 'string':
setattr(resource, attr_name, str(getattr(resource, attr_name)))
value = getattr(resource, attr_name)
setattr(resource, attr_name,
str(value) if value else value)
elif (attr_type_type == 'array' and
attr_type['items']['type'] == 'string'):
# REVISIT: May also need to dedup arrays of types

View File

@ -0,0 +1,250 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
# from aim.api import infra as aim_infra
from aim.api import resource as aim_res
from gbpservice.neutron.tests.unit.services.grouppolicy import (
test_aim_mapping_driver as test_aim_base)
from neutron.objects.qos import policy as policy_object
from neutron.objects.qos import rule as rule_object
from neutron_lib import context
from neutron_lib.plugins import directory
from oslo_log import log as logging
from oslo_utils import uuidutils
from gbpservice.neutron.services.grouppolicy import config
LOG = logging.getLogger(__name__)
class TestAIMQosBase(test_aim_base.AIMBaseTestCase):
def setUp(self, *args, **kwargs):
net_type = kwargs.pop('tenant_network_types', None) or ['vlan']
ml2_options = {'mechanism_drivers': ['apic_aim', 'openvswitch'],
'extension_drivers': ['apic_aim', 'port_security',
'dns', 'qos'],
'type_drivers': ['opflex', 'local', 'vlan'],
'tenant_network_types': net_type}
config.cfg.CONF.set_override(
'network_vlan_ranges', ['physnet1:100:200'], group='ml2_type_vlan')
self._aim_mech_driver = None
self._qos_driver = None
super(TestAIMQosBase, self).setUp(
*args, ml2_options=ml2_options, qos_plugin='qos', **kwargs)
self._plugin = directory.get_plugin()
self._plugin.remove_networks_from_down_agents = mock.Mock()
self._plugin.is_agent_down = mock.Mock(return_value=False)
self._ctx = context.get_admin_context()
self.ctxt = context.Context('fake_user', 'fake_tenant')
self.admin_ctxt = context.get_admin_context()
self.policy_data = {
'policy': {'id': uuidutils.generate_uuid(),
'project_id': uuidutils.generate_uuid(),
'name': 'test-policy',
'description': 'Test policy description',
'shared': True,
'is_default': False}}
self.rule_data = {
'egress_bandwidth_limit_rule': {
'id': uuidutils.generate_uuid(),
'direction': 'egress',
'max_kbps': 100,
'max_burst_kbps': 150},
'ingress_bandwidth_limit_rule': {
'id': uuidutils.generate_uuid(),
'direction': 'ingress',
'max_kbps': 101,
'max_burst_kbps': 1150},
'dscp_marking_rule': {'id': uuidutils.generate_uuid(),
'dscp_mark': 16}, }
self.policy = policy_object.QosPolicy(
self.ctxt, **self.policy_data['policy'])
self.egress_rule = rule_object.QosBandwidthLimitRule(
self.ctxt, **self.rule_data['egress_bandwidth_limit_rule'])
self.ingress_rule = rule_object.QosBandwidthLimitRule(
self.ctxt, **self.rule_data['ingress_bandwidth_limit_rule'])
self.dscp_rule = rule_object.QosDscpMarkingRule(
self.ctxt, **self.rule_data['dscp_marking_rule'])
def tearDown(self):
super(TestAIMQosBase, self).tearDown()
@property
def qos_plugin(self):
if not self._qos_plugin:
self._qos_plugin = directory.get_plugin('qos')
return self._qos_plugin
@property
def qos_driver(self):
if not self._qos_driver:
self._qos_driver = self.aim_mech.qos_driver
return self._qos_driver
@property
def aim_mech(self):
if not self._aim_mech_driver:
self._aim_mech_driver = (
self._plugin.mechanism_manager.mech_drivers['apic_aim'].obj)
return self._aim_mech_driver
class TestQosPolicy(TestAIMQosBase):
def test_create_delete_policy(self):
_policy = policy_object.QosPolicy(
self.ctxt, **self.policy_data['policy'])
self.qos_driver.create_policy_precommit(self.ctxt, _policy)
tenant_name = 'prj_' + self.ctxt.tenant_id
pol = self.aim_mgr.get(
self._aim_context,
aim_res.QosRequirement(
name=_policy.id,
tenant_name=tenant_name))
self.assertIsNotNone(pol)
self.qos_driver.delete_policy_precommit(self.ctxt, _policy)
pol = self.aim_mgr.get(
self._aim_context,
aim_res.QosRequirement(
name=_policy.id,
tenant_name=tenant_name))
self.assertIsNone(pol)
def test_create_delete_dscp(self):
_policy = policy_object.QosPolicy(
self.ctxt, **self.policy_data['policy'])
setattr(_policy, "rules", [self.dscp_rule])
self.qos_driver.create_policy_precommit(self.ctxt, _policy)
tenant_name = 'prj_' + self.ctxt.tenant_id
pol = self.aim_mgr.get(
self._aim_context,
aim_res.QosRequirement(
name=_policy.id,
tenant_name=tenant_name))
self.assertIsNotNone(pol)
self.assertEqual(pol.dscp,
self.rule_data['dscp_marking_rule']['dscp_mark'])
setattr(_policy, "rules", [])
self.qos_driver.update_policy_precommit(self.ctxt, _policy)
pol = self.aim_mgr.get(
self._aim_context,
aim_res.QosRequirement(
name=_policy.id,
tenant_name=tenant_name))
self.assertIsNone(pol.dscp)
def test_create_delete_bandwidth_rules(self):
_policy = policy_object.QosPolicy(
self.ctxt, **self.policy_data['policy'])
setattr(_policy, "rules", [self.egress_rule, self.ingress_rule])
self.qos_driver.create_policy_precommit(self.ctxt, _policy)
tenant_name = 'prj_' + self.ctxt.tenant_id
pol = self.aim_mgr.get(
self._aim_context,
aim_res.QosRequirement(
name=_policy.id,
tenant_name=tenant_name))
self.assertIsNotNone(pol)
self.assertEqual(pol.ingress_dpp_pol, self.ingress_rule.id)
self.assertEqual(pol.egress_dpp_pol, self.egress_rule.id)
pol = self.aim_mgr.get(
self._aim_context,
aim_res.QosDppPol(
name=self.egress_rule.id,
tenant_name=tenant_name))
self.assertIsNotNone(pol)
self.assertEqual(pol.burst, str(self.egress_rule.max_burst_kbps))
self.assertEqual(pol.rate, self.egress_rule.max_kbps)
pol = self.aim_mgr.get(
self._aim_context,
aim_res.QosDppPol(
name=self.ingress_rule.id,
tenant_name=tenant_name))
self.assertIsNotNone(pol)
self.assertEqual(pol.burst, str(self.ingress_rule.max_burst_kbps))
self.assertEqual(pol.rate, self.ingress_rule.max_kbps)
setattr(_policy, "rules", [])
self.qos_driver.update_policy_precommit(self.ctxt, _policy)
pol = self.aim_mgr.get(
self._aim_context,
aim_res.QosRequirement(
name=_policy.id,
tenant_name=tenant_name))
self.assertIsNone(pol.egress_dpp_pol)
self.assertIsNone(pol.ingress_dpp_pol)
pol = self.aim_mgr.get(
self._aim_context,
aim_res.QosDppPol(
name=self.egress_rule.id,
tenant_name=tenant_name))
self.assertIsNone(pol)
pol = self.aim_mgr.get(
self._aim_context,
aim_res.QosDppPol(
name=self.ingress_rule.id,
tenant_name=tenant_name))
self.assertIsNone(pol)
def _make_qos_policy(self):
qos_policy = policy_object.QosPolicy(
self.admin_ctxt, **self.policy_data['policy'])
qos_policy.create()
return qos_policy
def test_attach_detach_network_qos(self):
net_qos_obj = self._make_qos_policy()
net_qos_id = net_qos_obj.id if net_qos_obj else None
network = self._make_network(self.fmt, 'net1', True,
arg_list=tuple(list(['qos_policy_id'])),
qos_policy_id=net_qos_id)
epg = self.aim_mech._get_epg_by_network_id(self._ctx.session,
network['network']['id'])
epg = self.aim_mgr.get(self._aim_context, epg)
self.assertEqual(epg.qos_name, net_qos_id)
data = {'network': {'qos_policy_id': None}}
self._update('networks', network['network']['id'], data)
epg = self.aim_mgr.get(self._aim_context, epg)
self.assertIsNone(epg.qos_name)
def _test_invalid_network_exception(self, kwargs):
# Verify creating network with QoS fails
net_qos_obj = self._make_qos_policy()
net_qos_id = net_qos_obj.id if net_qos_obj else None
kwargs['qos_policy_id'] = net_qos_id
resp = self._create_network(
self.fmt, 'net', True, arg_list=tuple(list(kwargs.keys())),
**kwargs)
result = self.deserialize(self.fmt, resp)
self.assertEqual(
'InvalidNetworkForQos',
result['NeutronError']['type'])
def test_external_network_exception(self):
self._test_invalid_network_exception({'router:external': True})
def test_svi_network_exception(self):
self._test_invalid_network_exception({'apic:svi': True})