[OVN] Sync QoS policies

The tool "neutron-ovn-db-sync-util" now syncs the Neutron QoS policies
with the OVN NB database. The tools reads the port and the floaiting IP
QoS policies and creates the corresponding OVN QoS rules.

The ovsdbapp library is bumped to version 1.15.0. This version updates
the "QoSAddCommand" to allow register updates. If the OVN NB QoS
register to be created is present in the DB and all parameters match,
no transaction is commited to the DB.

Depends-On: https://review.opendev.org/c/openstack/ovsdbapp/+/822138
Closes-Bug: #1947334

Conflicts:
    lower-constraints.txt
    neutron/plugins/ml2/drivers/ovn/mech_driver/ovsdb/extensions/qos.py
    neutron/plugins/ml2/drivers/ovn/mech_driver/ovsdb/ovn_client.py
    neutron/plugins/ml2/drivers/ovn/mech_driver/ovsdb/ovn_db_sync.py
    neutron/tests/unit/plugins/ml2/drivers/ovn/mech_driver/ovsdb/extensions/test_qos.py
    requirements.txt

Change-Id: Ib597b62017b56b41009dd4d7359e169f424272b0
(cherry picked from commit cde5657a504f6658cb1c9c98235c120030f86130)
(cherry picked from commit a51f243016c884bf9a8ed3fc4792a5227b750ed6)
This commit is contained in:
Rodolfo Alonso Hernandez 2021-10-07 16:20:21 +00:00 committed by Rodolfo Alonso
parent cbcece50d9
commit 0e9c6539b3
9 changed files with 372 additions and 72 deletions

View File

@ -200,6 +200,7 @@ def main():
'neutron.services.ovn_l3.plugin.OVNL3RouterPlugin',
'neutron.services.segments.plugin.Plugin',
'port_forwarding',
'qos'
]
else:
LOG.error('Invalid core plugin : ["%s"].', cfg.CONF.core_plugin)

View File

@ -12,8 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
from ovsdbapp.backend.ovs_idl import idlutils
from neutron.objects.qos import binding as qos_binding
from neutron.objects.qos import policy as qos_policy
from neutron.objects.qos import rule as qos_rule
@ -36,13 +34,20 @@ OVN_QOS_DEFAULT_RULE_PRIORITY = 2002
class OVNClientQosExtension(object):
"""OVN client QoS extension"""
def __init__(self, driver):
def __init__(self, driver=None, nb_idl=None):
LOG.info('Starting OVNClientQosExtension')
super(OVNClientQosExtension, self).__init__()
self._driver = driver
self._nb_idl = nb_idl
self._plugin_property = None
self._plugin_l3_property = None
@property
def nb_idl(self):
if not self._nb_idl:
self._nb_idl = self._driver._nb_idl
return self._nb_idl
@property
def _plugin(self):
if self._plugin_property is None:
@ -175,7 +180,8 @@ class OVNClientQosExtension(object):
return ovn_qos_rule
def _port_effective_qos_policy_id(self, port):
@staticmethod
def port_effective_qos_policy_id(port):
"""Return port effective QoS policy
If the port does not have any QoS policy reference or is a network
@ -191,35 +197,43 @@ class OVNClientQosExtension(object):
else:
return port['qos_network_policy_id'], 'network'
def _update_port_qos_rules(self, txn, port_id, network_id, qos_policy_id,
qos_rules):
# NOTE(ralonsoh): we don't use the transaction context because the
# QoS policy could belong to another user (network QoS policy).
admin_context = n_context.get_admin_context()
def _delete_port_qos_rules(self, txn, port_id, network_id):
# Generate generic deletion rules for both directions. In case of
# creating deletion rules, the rule content is irrelevant.
for ovn_rule in [self._ovn_qos_rule(direction, {}, port_id,
network_id, delete=True)
for direction in constants.VALID_DIRECTIONS]:
# TODO(lucasagomes): qos_del() in ovsdbapp doesn't support
# if_exists=True
try:
txn.add(self._driver._nb_idl.qos_del(**ovn_rule))
except idlutils.RowNotFound:
continue
txn.add(self.nb_idl.qos_del(**ovn_rule))
if not qos_policy_id:
return # If no QoS policy is defined, there are no QoS rules.
def _add_port_qos_rules(self, txn, port_id, network_id, qos_policy_id,
qos_rules):
# NOTE(ralonsoh): we don't use the transaction context because the
# QoS policy could belong to another user (network QoS policy).
admin_context = n_context.get_admin_context()
# TODO(ralonsoh): for update_network and update_policy operations,
# the QoS rules can be retrieved only once.
qos_rules = qos_rules or self._qos_rules(admin_context, qos_policy_id)
for direction, rules in qos_rules.items():
# "delete=not rule": that means, when we don't have rules, we
# generate a "ovn_rule" to be used as input in a "qos_del" method.
ovn_rule = self._ovn_qos_rule(direction, rules, port_id,
network_id)
if ovn_rule:
txn.add(self._driver._nb_idl.qos_add(**ovn_rule))
network_id, delete=not rules)
if rules:
# NOTE(ralonsoh): with "may_exist=True", the "qos_add" will
# create the QoS OVN rule or update the existing one.
txn.add(self.nb_idl.qos_add(**ovn_rule, may_exist=True))
else:
# Delete, if exists, the QoS rule in this direction.
txn.add(self.nb_idl.qos_del(**ovn_rule, if_exists=True))
def _update_port_qos_rules(self, txn, port_id, network_id, qos_policy_id,
qos_rules):
if not qos_policy_id:
self._delete_port_qos_rules(txn, port_id, network_id)
else:
self._add_port_qos_rules(txn, port_id, network_id, qos_policy_id,
qos_rules)
def create_port(self, txn, port):
self.update_port(txn, port, None, reset=True)
@ -242,9 +256,9 @@ class OVNClientQosExtension(object):
return
qos_policy_id = (None if delete else
self._port_effective_qos_policy_id(port)[0])
self.port_effective_qos_policy_id(port)[0])
if not reset and not delete:
original_qos_policy_id = self._port_effective_qos_policy_id(
original_qos_policy_id = self.port_effective_qos_policy_id(
original_port)[0]
if qos_policy_id == original_qos_policy_id:
return # No QoS policy change
@ -281,26 +295,30 @@ class OVNClientQosExtension(object):
return updated_port_ids
def _delete_fip_qos_rules(self, txn, fip_id, network_id):
if network_id:
lswitch_name = utils.ovn_name(network_id)
txn.add(self.nb_idl.qos_del_ext_ids(
lswitch_name,
{ovn_const.OVN_FIP_EXT_ID_KEY: fip_id}))
def create_floatingip(self, txn, floatingip):
self.update_floatingip(txn, floatingip)
def update_floatingip(self, txn, floatingip):
router_id = floatingip.get('router_id')
qos_policy_id = floatingip.get('qos_policy_id')
if floatingip['floating_network_id']:
lswitch_name = utils.ovn_name(floatingip['floating_network_id'])
txn.add(self._driver._nb_idl.qos_del_ext_ids(
lswitch_name,
{ovn_const.OVN_FIP_EXT_ID_KEY: floatingip['id']}))
if not (router_id and qos_policy_id):
return
return self._delete_fip_qos_rules(
txn, floatingip['id'], floatingip['floating_network_id'])
admin_context = n_context.get_admin_context()
router_db = self._plugin_l3._get_router(admin_context, router_id)
gw_port_id = router_db.get('gw_port_id')
if not gw_port_id:
return
return self._delete_fip_qos_rules(
txn, floatingip['id'], floatingip['floating_network_id'])
if ovn_conf.is_ovn_distributed_floating_ip():
# DVR, floating IP GW is in the same compute node as private port.
@ -311,13 +329,20 @@ class OVNClientQosExtension(object):
qos_rules = self._qos_rules(admin_context, qos_policy_id)
for direction, rules in qos_rules.items():
# "delete=not rule": that means, when we don't have rules, we
# generate a "ovn_rule" to be used as input in a "qos_del" method.
ovn_rule = self._ovn_qos_rule(
direction, rules, gw_port_id,
floatingip['floating_network_id'], fip_id=floatingip['id'],
ip_address=floatingip['floating_ip_address'],
resident_port=resident_port)
if ovn_rule:
txn.add(self._driver._nb_idl.qos_add(**ovn_rule))
resident_port=resident_port, delete=not rules)
if rules:
# NOTE(ralonsoh): with "may_exist=True", the "qos_add" will
# create the QoS OVN rule or update the existing one.
txn.add(self.nb_idl.qos_add(**ovn_rule, may_exist=True))
else:
# Delete, if exists, the QoS rule in this direction.
txn.add(self.nb_idl.qos_del(**ovn_rule, if_exists=True))
def delete_floatingip(self, txn, floatingip):
self.update_floatingip(txn, floatingip)
@ -333,7 +358,7 @@ class OVNClientQosExtension(object):
# TODO(ralonsoh): we need to benchmark this transaction in systems with
# a huge amount of ports. This can take a while and could block other
# operations.
with self._driver._nb_idl.transaction(check_error=True) as txn:
with self.nb_idl.transaction(check_error=True) as txn:
for network_id in bound_networks:
network = {'qos_policy_id': policy.id, 'id': network_id}
updated_port_ids.update(

View File

@ -73,7 +73,7 @@ class OVNClient(object):
self._l3_plugin_property = None
# TODO(ralonsoh): handle the OVN client extensions with an ext. manager
self._qos_driver = qos_extension.OVNClientQosExtension(self)
self._qos_driver = qos_extension.OVNClientQosExtension(driver=self)
self._ovn_scheduler = l3_ovn_scheduler.get_scheduler()
@property

View File

@ -31,6 +31,8 @@ from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import utils
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf
from neutron import manager
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb.extensions import qos \
as ovn_qos
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_client
from neutron.services.segments import db as segments_db
@ -103,6 +105,8 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
self.sync_port_dns_records(ctx)
self.sync_acls(ctx)
self.sync_routers_and_rports(ctx)
self.sync_port_qos_policies(ctx)
self.sync_fip_qos_policies(ctx)
def _create_port_in_ovn(self, ctx, port):
# Remove any old ACLs for the port to avoid creating duplicate ACLs.
@ -1169,6 +1173,36 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
LOG.debug('Port Groups Migration task finished')
def sync_port_qos_policies(self, ctx):
"""Sync port QoS policies.
This method reads the port QoS policy assigned or the one inherited
from the network. Does not apply to "network" owned ports.
"""
LOG.debug('Port QoS policies migration task started')
ovn_qos_ext = ovn_qos.OVNClientQosExtension(nb_idl=self.ovn_api)
with db_api.CONTEXT_READER.using(ctx), \
self.ovn_api.transaction(check_error=True) as txn:
for port in self.core_plugin.get_ports(ctx):
if not ovn_qos_ext.port_effective_qos_policy_id(port)[0]:
continue
ovn_qos_ext.create_port(txn, port)
LOG.debug('Port QoS policies migration task finished')
def sync_fip_qos_policies(self, ctx):
"""Sync floating IP QoS policies."""
LOG.debug('Floating IP QoS policies migration task started')
ovn_qos_ext = ovn_qos.OVNClientQosExtension(nb_idl=self.ovn_api)
with db_api.CONTEXT_READER.using(ctx), \
self.ovn_api.transaction(check_error=True) as txn:
for fip in self.l3_plugin.get_floatingips(ctx):
if not fip.get('qos_policy_id'):
continue
ovn_qos_ext.create_floatingip(txn, fip)
LOG.debug('Floating IP QoS policies migration task finished')
class OvnSbSynchronizer(OvnDbSynchronizer):
"""Synchronizer class for SB."""

View File

@ -46,25 +46,18 @@ QOS_RULES_2 = {
QOS_RULES_3 = {
constants.INGRESS_DIRECTION: {
qos_constants.RULE_TYPE_BANDWIDTH_LIMIT: QOS_RULE_BW_1,
qos_constants.RULE_TYPE_DSCP_MARKING: QOS_RULE_DSCP_1}
qos_constants.RULE_TYPE_BANDWIDTH_LIMIT: QOS_RULE_BW_1}
}
class _OVNClient(object):
def __init__(self, nd_idl):
self._nb_idl = nd_idl
class TestOVNClientQosExtension(base.TestOVNFunctionalBase):
def setUp(self, maintenance_worker=False):
super(TestOVNClientQosExtension, self).setUp(
maintenance_worker=maintenance_worker)
self._add_logical_switch()
_ovn_client = _OVNClient(self.nb_api)
self.qos_driver = qos_extension.OVNClientQosExtension(_ovn_client)
self.qos_driver = qos_extension.OVNClientQosExtension(
nb_idl=self.nb_api)
self.gw_port_id = 'gw_port_id'
self._mock_get_router = mock.patch.object(l3_db.L3_NAT_dbonly_mixin,
'_get_router')
@ -93,7 +86,7 @@ class TestOVNClientQosExtension(base.TestOVNFunctionalBase):
fip_id=fip_id, ip_address=ip_address)
with self.nb_api.transaction(check_error=True):
ls = self.qos_driver._driver._nb_idl.lookup(
ls = self.qos_driver.nb_idl.lookup(
'Logical_Switch', ovn_utils.ovn_name(self.network_1))
self.assertEqual(len(rules), len(ls.qos_rules))
for rule in ls.qos_rules:
@ -116,7 +109,10 @@ class TestOVNClientQosExtension(base.TestOVNFunctionalBase):
def update_and_check(qos_rules):
with self.nb_api.transaction(check_error=True) as txn:
self.mock_qos_rules.return_value = qos_rules
_qos_rules = copy.deepcopy(qos_rules)
for direction in constants.VALID_DIRECTIONS:
_qos_rules[direction] = _qos_rules.get(direction, {})
self.mock_qos_rules.return_value = _qos_rules
self.qos_driver._update_port_qos_rules(
txn, port, self.network_1, 'qos1', None)
self._check_rules(qos_rules, port, self.network_1)
@ -128,7 +124,10 @@ class TestOVNClientQosExtension(base.TestOVNFunctionalBase):
def _update_fip_and_check(self, fip, qos_rules):
with self.nb_api.transaction(check_error=True) as txn:
self.mock_qos_rules.return_value = qos_rules
_qos_rules = copy.deepcopy(qos_rules)
for direction in constants.VALID_DIRECTIONS:
_qos_rules[direction] = _qos_rules.get(direction, {})
self.mock_qos_rules.return_value = _qos_rules
self.qos_driver.update_floatingip(txn, fip)
self._check_rules(qos_rules, self.gw_port_id, self.network_1,
fip_id='fip_id', ip_address='1.2.3.4')

View File

@ -15,18 +15,6 @@
from collections import namedtuple
import netaddr
from neutron.common.ovn import acl as acl_utils
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import utils
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf as ovn_config
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_db_sync
from neutron.services.portforwarding.drivers.ovn.driver import \
OVNPortForwarding as ovn_pf
from neutron.services.segments import db as segments_db
from neutron.tests.functional import base
from neutron.tests.unit.api import test_extensions
from neutron.tests.unit.extensions import test_extraroute
from neutron.tests.unit.extensions import test_securitygroup
from neutron_lib.api.definitions import dns as dns_apidef
from neutron_lib.api.definitions import fip_pf_description as ext_pf_def
from neutron_lib.api.definitions import floating_ip_port_forwarding as pf_def
@ -34,16 +22,33 @@ from neutron_lib.api.definitions import l3
from neutron_lib.api.definitions import port_security as ps
from neutron_lib import constants
from neutron_lib import context
from neutron_lib.services.qos import constants as qos_const
from oslo_utils import uuidutils
from ovsdbapp.backend.ovs_idl import idlutils
from ovsdbapp import constants as ovsdbapp_const
from neutron.common.ovn import acl as acl_utils
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import utils
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf as ovn_config
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb.extensions \
import qos as qos_extension
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_db_sync
from neutron.services.portforwarding.drivers.ovn.driver import \
OVNPortForwarding as ovn_pf
from neutron.services.revisions import revision_plugin
from neutron.services.segments import db as segments_db
from neutron.tests.functional import base
from neutron.tests.unit.api import test_extensions
from neutron.tests.unit.extensions import test_extraroute
from neutron.tests.unit.extensions import test_securitygroup
class TestOvnNbSync(base.TestOVNFunctionalBase):
_extension_drivers = ['port_security', 'dns']
_extension_drivers = ['port_security', 'dns', 'qos', 'revision_plugin']
def setUp(self):
def setUp(self, *args):
ovn_config.cfg.CONF.set_override('dns_domain', 'ovn.test')
super(TestOvnNbSync, self).setUp(maintenance_worker=True)
ext_mgr = test_extraroute.ExtraRouteTestExtensionManager()
@ -83,11 +88,18 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
self.match_old_mac_dhcp_subnets = []
self.expected_dns_records = []
self.expected_ports_with_unknown_addr = []
self.expected_qos_records = []
self.ctx = context.get_admin_context()
ovn_config.cfg.CONF.set_override('ovn_metadata_enabled', True,
group='ovn')
ovn_config.cfg.CONF.set_override(
'enable_distributed_floating_ip', True, group='ovn')
self.rp = revision_plugin.RevisionPlugin()
self.qos_driver = qos_extension.OVNClientQosExtension(
nb_idl=self.nb_api)
def get_additional_service_plugins(self):
return {'qos': 'qos', 'segments': 'segments'}
def _api_for_resource(self, resource):
if resource in ['security-groups']:
@ -1508,6 +1520,20 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
self.assertRaises(AssertionError, self.assertCountEqual,
self.expected_dns_records, observed_dns_records)
def _validate_qos_records(self, should_match=True):
observed_qos_records = []
for qos_row in self.nb_api.tables['QoS'].rows.values():
observed_qos_records.append({
'action': qos_row.action, 'bandwidth': qos_row.bandwidth,
'direction': qos_row.direction, 'match': qos_row.match,
'external_ids': qos_row.external_ids})
if should_match:
self.assertCountEqual(self.expected_qos_records,
observed_qos_records)
else:
self.assertEqual([], observed_qos_records)
def _validate_resources(self, should_match=True):
self._validate_networks(should_match=should_match)
self._validate_metadata_ports(should_match=should_match)
@ -1562,6 +1588,152 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
def test_ovn_nb_sync_off(self):
self._test_ovn_nb_sync_helper('off', should_match_after_sync=False)
def test_sync_port_qos_policies(self):
res = self._create_network(self.fmt, 'n1', True)
net = self.deserialize(self.fmt, res)['network']
self._create_subnet(self.fmt, net['id'], '10.0.0.0/24')
res = self._create_qos_policy(self.fmt, 'qos_maxbw')
qos_maxbw = self.deserialize(self.fmt, res)['policy']
self._create_qos_rule(self.fmt, qos_maxbw['id'],
qos_const.RULE_TYPE_BANDWIDTH_LIMIT,
max_kbps=1000, max_burst_kbps=800)
self._create_qos_rule(self.fmt, qos_maxbw['id'],
qos_const.RULE_TYPE_BANDWIDTH_LIMIT,
direction=constants.INGRESS_DIRECTION,
max_kbps=700, max_burst_kbps=600)
res = self._create_qos_policy(self.fmt, 'qos_maxbw')
qos_dscp = self.deserialize(self.fmt, res)['policy']
self._create_qos_rule(self.fmt, qos_dscp['id'],
qos_const.RULE_TYPE_DSCP_MARKING, dscp_mark=14)
res = self._create_port(
self.fmt, net['id'], arg_list=('qos_policy_id', ),
name='n1-port1', device_owner='compute:nova',
qos_policy_id=qos_maxbw['id'])
port_1 = self.deserialize(self.fmt, res)['port']
res = self._create_port(
self.fmt, net['id'], arg_list=('qos_policy_id', ),
name='n1-port2', device_owner='compute:nova',
qos_policy_id=qos_dscp['id'])
port_2 = self.deserialize(self.fmt, res)['port']
# Check QoS policies have been correctly created in OVN DB.
self.expected_qos_records = [
{'action': {}, 'bandwidth': {'burst': 800, 'rate': 1000},
'direction': 'from-lport',
'match': 'inport == "%s"' % port_1['id'],
'external_ids': {ovn_const.OVN_PORT_EXT_ID_KEY: port_1['id']}},
{'action': {}, 'bandwidth': {'burst': 600, 'rate': 700},
'direction': 'to-lport',
'match': 'outport == "%s"' % port_1['id'],
'external_ids': {ovn_const.OVN_PORT_EXT_ID_KEY: port_1['id']}},
{'action': {'dscp': 14}, 'bandwidth': {},
'direction': 'from-lport',
'match': 'inport == "%s"' % port_2['id'],
'external_ids': {ovn_const.OVN_PORT_EXT_ID_KEY: port_2['id']}}]
self._validate_qos_records()
# Delete QoS policies from the OVN DB.
with self.nb_api.transaction(check_error=True) as txn:
for port in (port_1, port_2):
for ovn_rule in [self.qos_driver._ovn_qos_rule(
direction, {}, port['id'], port['network_id'],
delete=True)
for direction in constants.VALID_DIRECTIONS]:
txn.add(self.nb_api.qos_del(**ovn_rule))
self._validate_qos_records(should_match=False)
# Manually sync port QoS registers.
nb_synchronizer = ovn_db_sync.OvnNbSynchronizer(
self.plugin, self.mech_driver.nb_ovn, self.mech_driver.sb_ovn,
'log', self.mech_driver)
ctx = context.get_admin_context()
nb_synchronizer.sync_port_qos_policies(ctx)
self._validate_qos_records()
def _create_floatingip(self, fip_network_id, port_id, qos_policy_id):
body = {'tenant_id': self._tenant_id,
'floating_network_id': fip_network_id,
'port_id': port_id,
'qos_policy_id': qos_policy_id}
return self.l3_plugin.create_floatingip(self.context,
{'floatingip': body})
def test_sync_fip_qos_policies(self):
res = self._create_network(self.fmt, 'n1_ext', True,
arg_list=('router:external', ),
**{'router:external': True})
net_ext = self.deserialize(self.fmt, res)['network']
res = self._create_subnet(self.fmt, net_ext['id'], '10.0.0.0/24')
subnet_ext = self.deserialize(self.fmt, res)['subnet']
res = self._create_network(self.fmt, 'n1_int', True)
net_int = self.deserialize(self.fmt, res)['network']
self._create_subnet(self.fmt, net_int['id'], '10.10.0.0/24')
res = self._create_qos_policy(self.fmt, 'qos_maxbw')
qos_maxbw = self.deserialize(self.fmt, res)['policy']
self._create_qos_rule(self.fmt, qos_maxbw['id'],
qos_const.RULE_TYPE_BANDWIDTH_LIMIT,
max_kbps=1000, max_burst_kbps=800)
self._create_qos_rule(self.fmt, qos_maxbw['id'],
qos_const.RULE_TYPE_BANDWIDTH_LIMIT,
direction=constants.INGRESS_DIRECTION,
max_kbps=700, max_burst_kbps=600)
# Create a router with net_ext as GW network and net_int as internal
# one, and a floating IP on the external network.
data = {'name': 'r1', 'admin_state_up': True,
'tenant_id': self._tenant_id,
'external_gateway_info': {
'enable_snat': True,
'network_id': net_ext['id'],
'external_fixed_ips': [{'ip_address': '10.0.0.5',
'subnet_id': subnet_ext['id']}]}
}
router = self.l3_plugin.create_router(self.context, {'router': data})
net_int_prtr = self._make_port(self.fmt, net_int['id'],
name='n1_int-p-rtr')['port']
self.l3_plugin.add_router_interface(
self.context, router['id'], {'port_id': net_int_prtr['id']})
fip = self._create_floatingip(net_ext['id'], net_int_prtr['id'],
qos_maxbw['id'])
# Check QoS policies have been correctly created in OVN DB.
fip_match = ('%s == "%s" && ip4.%s == %s && '
'is_chassis_resident("%s")')
self.expected_qos_records = [
{'action': {}, 'bandwidth': {'burst': 600, 'rate': 700},
'direction': 'to-lport',
'external_ids': {'neutron:fip_id': fip['id']},
'match': fip_match % ('outport', router['gw_port_id'], 'dst',
fip['floating_ip_address'],
net_int_prtr['id'])},
{'action': {}, 'bandwidth': {'burst': 800, 'rate': 1000},
'direction': 'from-lport',
'external_ids': {'neutron:fip_id': fip['id']},
'match': fip_match % ('inport', router['gw_port_id'], 'src',
fip['floating_ip_address'],
net_int_prtr['id'])}]
self._validate_qos_records()
# Delete QoS policies from the OVN DB.
with self.nb_api.transaction(check_error=True) as txn:
lswitch_name = utils.ovn_name(net_ext['id'])
txn.add(self.nb_api.qos_del_ext_ids(
lswitch_name, {ovn_const.OVN_FIP_EXT_ID_KEY: fip['id']}))
self._validate_qos_records(should_match=False)
# Manually sync port QoS registers.
nb_synchronizer = ovn_db_sync.OvnNbSynchronizer(
self.plugin, self.mech_driver.nb_ovn, self.mech_driver.sb_ovn,
'log', self.mech_driver)
ctx = context.get_admin_context()
nb_synchronizer.sync_fip_qos_policies(ctx)
self._validate_qos_records()
class TestOvnSbSync(base.TestOVNFunctionalBase):

View File

@ -30,6 +30,7 @@ from neutron_lib.db import api as db_api
from neutron_lib import exceptions as lib_exc
from neutron_lib import fixture
from neutron_lib.plugins import directory
from neutron_lib.services.qos import constants as qos_const
from neutron_lib.tests import tools
from neutron_lib.utils import helpers
from neutron_lib.utils import net
@ -573,6 +574,58 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(fmt, res)
def _create_qos_rule(self, fmt, qos_policy_id, rule_type, max_kbps=None,
max_burst_kbps=None, dscp_mark=None, min_kbps=None,
direction=constants.EGRESS_DIRECTION,
expected_res_status=None, project_id=None,
set_context=False, is_admin=False):
# Accepted rule types: "bandwidth_limit", "dscp_marking" and
# "minimum_bandwidth"
self.assertIn(rule_type, [qos_const.RULE_TYPE_BANDWIDTH_LIMIT,
qos_const.RULE_TYPE_DSCP_MARKING,
qos_const.RULE_TYPE_MINIMUM_BANDWIDTH])
project_id = project_id or self._tenant_id
type_req = rule_type + '_rule'
data = {type_req: {'project_id': project_id}}
if rule_type == qos_const.RULE_TYPE_BANDWIDTH_LIMIT:
data[type_req][qos_const.MAX_KBPS] = max_kbps
data[type_req][qos_const.MAX_BURST] = max_burst_kbps
data[type_req][qos_const.DIRECTION] = direction
elif rule_type == qos_const.RULE_TYPE_DSCP_MARKING:
data[type_req][qos_const.DSCP_MARK] = dscp_mark
else:
data[type_req][qos_const.MIN_KBPS] = min_kbps
data[type_req][qos_const.DIRECTION] = direction
route = 'qos/policies/%s/%s' % (qos_policy_id, type_req + 's')
qos_rule_req = self.new_create_request(route, data, fmt)
if set_context and project_id:
# create a specific auth context for this request
qos_rule_req.environ['neutron.context'] = context.Context(
'', project_id, is_admin=is_admin)
qos_rule_res = qos_rule_req.get_response(self.api)
if expected_res_status:
self.assertEqual(expected_res_status, qos_rule_res.status_int)
return qos_rule_res
def _create_qos_policy(self, fmt, qos_policy_name=None,
expected_res_status=None, project_id=None,
set_context=False, is_admin=False):
project_id = project_id or self._tenant_id
name = qos_policy_name or uuidutils.generate_uuid()
data = {'policy': {'name': name,
'project_id': project_id}}
qos_req = self.new_create_request('policies', data, fmt)
if set_context and project_id:
# create a specific auth context for this request
qos_req.environ['neutron.context'] = context.Context(
'', project_id, is_admin=is_admin)
qos_policy_res = qos_req.get_response(self.api)
if expected_res_status:
self.assertEqual(expected_res_status, qos_policy_res.status_int)
return qos_policy_res
def _api_for_resource(self, resource):
if resource in ['networks', 'subnets', 'ports', 'subnetpools',
'security-groups']:

View File

@ -75,7 +75,8 @@ class TestOVNClientQosExtension(test_plugin.Ml2PluginV2TestCase):
self.txn = _Context()
mock_driver = mock.Mock()
mock_driver._nb_idl.transaction.return_value = self.txn
self.qos_driver = qos_extension.OVNClientQosExtension(mock_driver)
self.qos_driver = qos_extension.OVNClientQosExtension(
driver=mock_driver)
self._mock_rules = mock.patch.object(self.qos_driver,
'_update_port_qos_rules')
self.mock_rules = self._mock_rules.start()
@ -250,28 +251,28 @@ class TestOVNClientQosExtension(test_plugin.Ml2PluginV2TestCase):
def test__port_effective_qos_policy_id(self):
port = {'qos_policy_id': 'qos1'}
self.assertEqual(('qos1', 'port'),
self.qos_driver._port_effective_qos_policy_id(port))
self.qos_driver.port_effective_qos_policy_id(port))
port = {'qos_network_policy_id': 'qos1'}
self.assertEqual(('qos1', 'network'),
self.qos_driver._port_effective_qos_policy_id(port))
self.qos_driver.port_effective_qos_policy_id(port))
port = {'qos_policy_id': 'qos_port',
'qos_network_policy_id': 'qos_network'}
self.assertEqual(('qos_port', 'port'),
self.qos_driver._port_effective_qos_policy_id(port))
self.qos_driver.port_effective_qos_policy_id(port))
port = {}
self.assertEqual((None, None),
self.qos_driver._port_effective_qos_policy_id(port))
self.qos_driver.port_effective_qos_policy_id(port))
port = {'qos_policy_id': None, 'qos_network_policy_id': None}
self.assertEqual((None, None),
self.qos_driver._port_effective_qos_policy_id(port))
self.qos_driver.port_effective_qos_policy_id(port))
port = {'qos_policy_id': 'qos1', 'device_owner': 'neutron:port'}
self.assertEqual((None, None),
self.qos_driver._port_effective_qos_policy_id(port))
self.qos_driver.port_effective_qos_policy_id(port))
def test_update_port(self):
port = self.ports[0]
@ -495,6 +496,11 @@ class TestOVNClientQosExtension(test_plugin.Ml2PluginV2TestCase):
mock_update_fip.assert_called_once_with(self.txn, fip)
def test_update_floatingip(self):
# NOTE(ralonsoh): this rule will always apply:
# - If the FIP is being deleted, "qos_del_ext_ids" is called;
# "qos_add" and "qos_del" won't.
# - If the FIP is added or updated, "qos_del_ext_ids" won't be called
# and "qos_add" or "qos_del" will, depending on the rule directions.
nb_idl = self.qos_driver._driver._nb_idl
fip = self.fips[0]
original_fip = self.fips[1]
@ -504,6 +510,7 @@ class TestOVNClientQosExtension(test_plugin.Ml2PluginV2TestCase):
self.qos_driver.update_floatingip(txn, fip)
nb_idl.qos_del_ext_ids.assert_called_once()
nb_idl.qos_add.assert_not_called()
nb_idl.qos_del.assert_not_called()
nb_idl.reset_mock()
# Attach a port and a router, not QoS policy
@ -513,14 +520,19 @@ class TestOVNClientQosExtension(test_plugin.Ml2PluginV2TestCase):
self.qos_driver.update_floatingip(txn, fip)
nb_idl.qos_del_ext_ids.assert_called_once()
nb_idl.qos_add.assert_not_called()
nb_idl.qos_del.assert_not_called()
nb_idl.reset_mock()
# Add a QoS policy
fip.qos_policy_id = self.qos_policies[0].id
fip.update()
self.qos_driver.update_floatingip(txn, fip)
nb_idl.qos_del_ext_ids.assert_called_once()
nb_idl.qos_del_ext_ids.assert_not_called()
# QoS DSCP rule has only egress direction, ingress one is deleted.
# Check "OVNClientQosExtension.update_floatingip" and how the OVN QoS
# rules are added (if there is a rule in this direction) or deleted.
nb_idl.qos_add.assert_called_once()
nb_idl.qos_del.assert_called_once()
nb_idl.reset_mock()
# Remove QoS
@ -531,6 +543,7 @@ class TestOVNClientQosExtension(test_plugin.Ml2PluginV2TestCase):
self.qos_driver.update_floatingip(txn, fip)
nb_idl.qos_del_ext_ids.assert_called_once()
nb_idl.qos_add.assert_not_called()
nb_idl.qos_del.assert_not_called()
nb_idl.reset_mock()
# Add again another QoS policy
@ -539,8 +552,9 @@ class TestOVNClientQosExtension(test_plugin.Ml2PluginV2TestCase):
original_fip.qos_policy_id = None
original_fip.update()
self.qos_driver.update_floatingip(txn, fip)
nb_idl.qos_del_ext_ids.assert_called_once()
nb_idl.qos_del_ext_ids.assert_not_called()
nb_idl.qos_add.assert_called_once()
nb_idl.qos_del.assert_called_once()
nb_idl.reset_mock()
# Detach the port and the router
@ -554,6 +568,7 @@ class TestOVNClientQosExtension(test_plugin.Ml2PluginV2TestCase):
self.qos_driver.update_floatingip(txn, fip)
nb_idl.qos_del_ext_ids.assert_called_once()
nb_idl.qos_add.assert_not_called()
nb_idl.qos_del.assert_not_called()
nb_idl.reset_mock()
# Force reset (delete any QoS)
@ -562,3 +577,4 @@ class TestOVNClientQosExtension(test_plugin.Ml2PluginV2TestCase):
self.qos_driver.update_floatingip(txn, fip_dict)
nb_idl.qos_del_ext_ids.assert_called_once()
nb_idl.qos_add.assert_not_called()
nb_idl.qos_del.assert_not_called()

View File

@ -45,7 +45,7 @@ oslo.versionedobjects>=1.35.1 # Apache-2.0
osprofiler>=2.3.0 # Apache-2.0
os-ken >= 0.3.0 # Apache-2.0
ovs>=2.10.0 # Apache-2.0
ovsdbapp>=1.9.2 # Apache-2.0
ovsdbapp>=1.9.3 # Apache-2.0
packaging>=20.4 # Apache-2.0
psutil>=5.3.0 # BSD
pyroute2>=0.6.6;sys_platform!='win32' # Apache-2.0 (+ dual licensed GPL2)