NSX|V add qos support for networks

Add support for the qos service in NSX|V, including:
- Attach/Detach qos policy to a new or updated network
- Allow qos configuration on a backend network only,
  and only if use_dvs_features is True
- Update the bw limitations on the edge through the dvs
- Update the networks bw limitations when a policy or rule changes
  through the QoS notification driver

Change-Id: Icee25b59e8e0f3c1c093077b631250a908e127c1
This commit is contained in:
Adit Sarfaty 2016-03-15 11:51:28 +02:00
parent 42cc581c01
commit 24a84004ff
13 changed files with 764 additions and 6 deletions

View File

@ -34,7 +34,7 @@ neutron.core_plugins =
vmware = vmware_nsx.plugin:NsxMhPlugin
neutron.service_plugins =
vmware_nsx_l2gw = vmware_nsx.services.l2gateway.common.plugin:NsxL2GatewayPlugin
vmware_nsx_qos = vmware_nsx.services.qos.plugin:NsxQosPlugin
vmware_nsxv_qos = vmware_nsx.services.qos.nsx_v.plugin:NsxVQosPlugin
vmware_nsx.neutron.nsxv.router_type_drivers =
shared = vmware_nsx.plugins.nsx_v.drivers.shared_router_driver:RouterSharedDriver
distributed = vmware_nsx.plugins.nsx_v.drivers.distributed_router_driver:RouterDistributedDriver

View File

@ -22,6 +22,7 @@ from vmware_nsx.common import exceptions as nsx_exc
from vmware_nsx.dvs import dvs_utils
LOG = logging.getLogger(__name__)
PORTGROUP_PREFIX = 'dvportgroup'
class DvsManager(object):
@ -118,6 +119,128 @@ class DvsManager(object):
return val
raise exceptions.NetworkNotFound(net_id=net_id)
def _is_vlan_network_by_moref(self, moref):
"""
This can either be a VXLAN or a VLAN network. The type is determined
by the prefix of the moref.
"""
return moref.startswith(PORTGROUP_PREFIX)
def _copy_port_group_spec(self, orig_spec):
client_factory = self._session.vim.client.factory
pg_spec = client_factory.create('ns0:DVPortgroupConfigSpec')
pg_spec.autoExpand = orig_spec['autoExpand']
pg_spec.configVersion = orig_spec['configVersion']
pg_spec.defaultPortConfig = orig_spec['defaultPortConfig']
pg_spec.name = orig_spec['name']
pg_spec.numPorts = orig_spec['numPorts']
pg_spec.policy = orig_spec['policy']
pg_spec.type = orig_spec['type']
return pg_spec
def update_port_group_spec_qos(self, pg_spec, qos_data):
outPol = pg_spec.defaultPortConfig.outShapingPolicy
if qos_data.enabled:
outPol.inherited = False
outPol.enabled.inherited = False
outPol.enabled.value = True
outPol.averageBandwidth.inherited = False
outPol.averageBandwidth.value = qos_data.averageBandwidth
outPol.peakBandwidth.inherited = False
outPol.peakBandwidth.value = qos_data.peakBandwidth
outPol.burstSize.inherited = False
outPol.burstSize.value = qos_data.burstSize
else:
outPol.inherited = True
def _reconfigure_port_group(self, pg_moref, spec_update_calback,
spec_update_data):
# Get the current configuration of the port group
pg_spec = self._session.invoke_api(vim_util,
'get_object_properties',
self._session.vim,
pg_moref, ['config'])
if len(pg_spec) == 0 or len(pg_spec[0].propSet[0]) == 0:
LOG.error(_LE('Failed to get object properties of %s'), pg_moref)
raise nsx_exc.DvsNotFound(dvs=pg_moref)
# Convert the extracted config to DVPortgroupConfigSpec
new_spec = self._copy_port_group_spec(pg_spec[0].propSet[0].val)
# Update the configuration using the callback & data
spec_update_calback(new_spec, spec_update_data)
# Update the port group configuration
task = self._session.invoke_api(self._session.vim,
'ReconfigureDVPortgroup_Task',
pg_moref, spec=new_spec)
try:
self._session.wait_for_task(task)
except Exception:
LOG.error(_LE('Failed to reconfigure DVPortGroup %s'), pg_moref)
raise nsx_exc.DvsNotFound(dvs=pg_moref)
# Update the dvs port groups config for a vxlan/vlan network
# update the spec using a callback and user data
def update_port_groups_config(self, net_id, net_moref,
spec_update_calback, spec_update_data):
is_vlan = self._is_vlan_network_by_moref(net_moref)
if is_vlan:
return self._update_net_port_groups_config(net_moref,
spec_update_calback,
spec_update_data)
else:
return self._update_vxlan_port_groups_config(net_id,
net_moref,
spec_update_calback,
spec_update_data)
# Update the dvs port groups config for a vxlan network
# Searching the port groups for a partial match to the network id & moref
# update the spec using a callback and user data
def _update_vxlan_port_groups_config(self,
net_id,
net_moref,
spec_update_calback,
spec_update_data):
port_groups = self._session.invoke_api(vim_util,
'get_object_properties',
self._session.vim,
self._dvs_moref,
['portgroup'])
found = False
if len(port_groups) and hasattr(port_groups[0], 'propSet'):
for prop in port_groups[0].propSet:
for pg_moref in prop.val[0]:
props = self._session.invoke_api(vim_util,
'get_object_properties',
self._session.vim,
pg_moref, ['name'])
if len(props) and hasattr(props[0], 'propSet'):
for prop in props[0].propSet:
if net_id in prop.val and net_moref in prop.val:
found = True
self._reconfigure_port_group(
pg_moref,
spec_update_calback,
spec_update_data)
if not found:
raise exceptions.NetworkNotFound(net_id=net_id)
# Update the dvs port groups config for a vlan network
# Finding the port group using the exact moref of the network
# update the spec using a callback and user data
def _update_net_port_groups_config(self,
net_moref,
spec_update_calback,
spec_update_data):
pg_moref = vim_util.get_moref(net_moref,
"DistributedVirtualPortgroup")
self._reconfigure_port_group(pg_moref,
spec_update_calback,
spec_update_data)
def delete_port_group(self, net_id):
"""Delete a specific port group."""
moref = self._net_id_to_moref(net_id)

View File

@ -27,10 +27,14 @@ from oslo_utils import uuidutils
from sqlalchemy.orm import exc as sa_exc
from neutron.api import extensions as neutron_extensions
from neutron.api.rpc.callbacks.consumer import registry as callbacks_registry
from neutron.api.rpc.callbacks import resources as callbacks_resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.api.v2 import attributes as attr
from neutron.callbacks import events
from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.common import rpc as n_rpc
from neutron import context as n_context
from neutron.db import agents_db
from neutron.db import allowedaddresspairs_db as addr_pair_db
@ -56,7 +60,9 @@ from neutron.extensions import securitygroup as ext_sg
from neutron.plugins.common import constants as plugin_const
from neutron.plugins.common import utils
from neutron.quota import resource_registry
from neutron.services.qos import qos_consts
from vmware_nsx.dvs import dvs
from vmware_nsx.services.qos.nsx_v import utils as qos_utils
import vmware_nsx
from vmware_nsx._i18n import _, _LE, _LI, _LW
@ -183,6 +189,11 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
else:
self._dvs = None
# Bind QoS notifications
callbacks_registry.subscribe(self._handle_qos_notification,
callbacks_resources.QOS_POLICY)
self._start_rpc_listeners()
has_metadata_cfg = (
cfg.CONF.nsxv.nova_metadata_ips
and cfg.CONF.nsxv.mgt_net_moid
@ -192,6 +203,16 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
self.metadata_proxy_handler = (
nsx_v_md_proxy.NsxVMetadataProxyHandler(self))
def _start_rpc_listeners(self):
self.conn = n_rpc.create_connection()
qos_topic = resources_rpc.resource_type_versioned_topic(
callbacks_resources.QOS_POLICY)
self.conn.create_consumer(
qos_topic, [resources_rpc.ResourcesPushRpcCallback()],
fanout=False)
return self.conn.consume_in_threads()
def _create_security_group_container(self):
name = "OpenStack Security Group container"
with locking.LockManager.get_lock('security-group-container-init'):
@ -342,6 +363,18 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
context, neutron_port_db['id'], network_id,
neutron_port_db['mac_address'])
def _validate_network_qos(self, network, backend_network):
err_msg = None
if attr.is_attr_set(network.get(qos_consts.QOS_POLICY_ID)):
if not backend_network:
err_msg = (_("Cannot configure QOS on external networks"))
if not cfg.CONF.nsxv.use_dvs_features:
err_msg = (_("Cannot configure QOS "
"without enabling use_dvs_features"))
if err_msg:
raise n_exc.InvalidInput(error_message=err_msg)
def _validate_provider_create(self, context, network):
if not attr.is_attr_set(network.get(mpnet.SEGMENTS)):
return
@ -678,6 +711,8 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
external = net_data.get(ext_net_extn.EXTERNAL)
backend_network = (not attr.is_attr_set(external) or
attr.is_attr_set(external) and not external)
self._validate_network_qos(net_data, backend_network)
if backend_network:
network_type = None
#NOTE(abhiraut): Consider refactoring code below to have more
@ -704,9 +739,11 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
h, c = self.nsx_v.vcns.create_virtual_wire(vdn_scope_id,
config_spec)
net_morefs = [c]
dvs_net_ids = [net_data['id']]
elif network_type == c_utils.NsxVNetworkTypes.PORTGROUP:
segment = net_data[mpnet.SEGMENTS][0]
net_morefs = [segment.get(pnet.PHYSICAL_NETWORK)]
dvs_net_ids = [net_data['name']]
else:
segment = net_data[mpnet.SEGMENTS][0]
physical_network = segment.get(pnet.PHYSICAL_NETWORK)
@ -714,6 +751,7 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
# If physical_network attr is not set, retrieve a list
# consisting of a single dvs-id pre-configured in nsx.ini
dvs_ids = self._get_dvs_ids(physical_network)
dvs_net_ids = []
# Save the list of netmorefs from the backend
net_morefs = []
dvs_pg_mappings = {}
@ -731,6 +769,8 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
self._delete_backend_network(net_moref)
dvs_pg_mappings[dvs_id] = net_moref
net_morefs.append(net_moref)
dvs_net_ids.append(self._get_vlan_network_name(
net_data, dvs_id))
try:
# Create SpoofGuard policy for network anti-spoofing
if cfg.CONF.nsxv.spoofguard_enabled and backend_network:
@ -801,12 +841,36 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
self._delete_backend_network(net_moref)
LOG.exception(_LE('Failed to create network'))
if backend_network:
# Update the QOS restrictions of the backend network
self._update_network_qos(context, net_data, dvs_net_ids, net_moref)
# this extra lookup is necessary to get the
# latest db model for the extension functions
net_model = self._get_network(context, new_net['id'])
self._apply_dict_extend_functions('networks', new_net, net_model)
return new_net
def _update_network_qos(self, context, net_data, dvs_net_ids, net_moref):
if attr.is_attr_set(net_data.get(qos_consts.QOS_POLICY_ID)):
# Translate the QoS rule data into Nsx values
qos_data = qos_utils.NsxVQosRule(
context=context,
qos_policy_id=net_data[qos_consts.QOS_POLICY_ID])
# update the qos data on the dvs
for dvs_net_id in dvs_net_ids:
self._dvs.update_port_groups_config(
dvs_net_id,
net_moref,
self._dvs.update_port_group_spec_qos, qos_data)
# attach the policy to the network in the neutron DB
qos_utils.update_network_policy_binding(
context,
net_data['id'],
net_data[qos_consts.QOS_POLICY_ID])
def _cleanup_dhcp_edge_before_deletion(self, context, net_id):
if self.metadata_proxy_handler:
# Find if this is the last network which is bound
@ -914,6 +978,9 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
if net_attrs.get("admin_state_up") is False:
raise NotImplementedError(_("admin_state_up=False networks "
"are not supported."))
net_morefs = nsx_db.get_nsx_switch_ids(context.session, id)
backend_network = True if len(net_morefs) > 0 else False
self._validate_network_qos(net_attrs, backend_network)
# PortSecurity validation checks
# TODO(roeyc): enacapsulate validation in a method
@ -938,7 +1005,6 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
# old state
if cfg.CONF.nsxv.spoofguard_enabled and psec_update:
policy_id = nsxv_db.get_spoofguard_policy_id(context.session, id)
net_morefs = nsx_db.get_nsx_switch_ids(context.session, id)
try:
self.nsx_v.vcns.update_spoofguard_policy(
policy_id, net_morefs, id,
@ -951,6 +1017,25 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
context, revert_update, net_res)
super(NsxVPluginV2, self).update_network(
context, id, {'network': revert_update})
# Handle QOS updates (Value can be None, meaning to delete the
# current policy)
if qos_consts.QOS_POLICY_ID in net_attrs:
# update the qos data
qos_data = qos_utils.NsxVQosRule(
context=context,
qos_policy_id=net_attrs[qos_consts.QOS_POLICY_ID])
# get the network moref/s from the db
for moref in net_morefs:
# update the qos restrictions of the network
self._dvs.update_port_groups_config(
id, moref, self._dvs.update_port_group_spec_qos, qos_data)
# attach the policy to the network in neutron DB
qos_utils.update_network_policy_binding(
context, id, net_attrs[qos_consts.QOS_POLICY_ID])
return net_res
def _validate_address_pairs(self, attrs, db_port):
@ -2495,6 +2580,9 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
error = _("Configured %s not found") % field
raise nsx_exc.NsxPluginException(err_msg=error)
def _handle_qos_notification(self, qos_policy, event_type):
qos_utils.handle_qos_notification(qos_policy, event_type, self._dvs)
# Register the callback
def _validate_network_has_subnet(resource, event, trigger, **kwargs):

View File

@ -0,0 +1,41 @@
# Copyright 2016 VMware, Inc.
#
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.services.qos import qos_plugin
from oslo_config import cfg
from oslo_log import log as logging
from vmware_nsx._i18n import _, _LI
from vmware_nsx.common import exceptions as nsx_exc
LOG = logging.getLogger(__name__)
class NsxVQosPlugin(qos_plugin.QoSPlugin):
"""Service plugin for VMware NSX-v to implement Neutron's Qos API."""
supported_extension_aliases = ["qos"]
def __init__(self):
LOG.info(_LI("Loading VMware NSX-V Qos Service Plugin"))
super(NsxVQosPlugin, self).__init__()
if not cfg.CONF.nsxv.use_dvs_features:
error = _("Cannot use the NSX-V QoS plugin without "
"enabling the dvs features")
raise nsx_exc.NsxPluginException(err_msg=error)

View File

@ -0,0 +1,113 @@
# Copyright 2016 VMware, Inc.
#
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.api.rpc.callbacks import events as callbacks_events
from neutron import context as n_context
from neutron import manager
from neutron.objects.qos import policy as qos_policy
from neutron.plugins.common import constants
from oslo_log import log as logging
from vmware_nsx.db import db as nsx_db
LOG = logging.getLogger(__name__)
class NsxVQosRule(object):
def __init__(self, context=None, qos_policy_id=None):
super(NsxVQosRule, self).__init__()
# Data structure to hold the NSX-V representation
# of the neutron qos rule.
self._qos_plugin = None
self.enabled = False
self.averageBandwidth = 0
self.peakBandwidth = 0
self.burstSize = 0
if qos_policy_id is not None:
self._init_from_policy_id(context, qos_policy_id)
def _get_qos_plugin(self):
if not self._qos_plugin:
loaded_plugins = manager.NeutronManager.get_service_plugins()
self._qos_plugin = loaded_plugins[constants.QOS]
return self._qos_plugin
# init the nsx_v qos data (outShapingPolicy) from a neutron qos policy
def _init_from_policy_id(self, context, qos_policy_id):
self.enabled = False
# read the neutron policy restrictions
if qos_policy_id is not None:
# read the QOS rule from DB
plugin = self._get_qos_plugin()
rules_obj = plugin.get_policy_bandwidth_limit_rules(
context, qos_policy_id)
if rules_obj is not None and len(rules_obj) > 0:
rule_obj = rules_obj[0]
self.enabled = True
# averageBandwidth: kbps (neutron) -> bps (nsxv)
self.averageBandwidth = rule_obj['max_kbps'] * 1024
# peakBandwidth: the same as the average value because the
# neutron qos configuration supports only 1 value
self.peakBandwidth = self.averageBandwidth
# burstSize: kbps (neutron) -> Bytes (nsxv)
self.burstSize = rule_obj['max_burst_kbps'] * 128
return self
def update_network_policy_binding(context, net_id, new_policy_id):
# detach the old policy (if exists) from the network
old_policy = qos_policy.QosPolicy.get_network_policy(
context, net_id)
if old_policy:
old_policy.detach_network(net_id)
# attach the new policy (if exists) to the network
if new_policy_id is not None:
new_policy = qos_policy.QosPolicy.get_object(
context, id=new_policy_id)
if new_policy:
new_policy.attach_network(net_id)
def handle_qos_notification(policy_obj, event_type, dvs):
# Check if QoS policy rule was created/deleted/updated
# Only if the policy rule was updated, we need to update the dvs
if (event_type == callbacks_events.UPDATED and
hasattr(policy_obj, "rules")):
# Reload the policy as admin so we will have a context
context = n_context.get_admin_context()
admin_policy = qos_policy.QosPolicy.get_object(
context, id=policy_obj.id)
# get all the bound networks of this policy
networks = admin_policy.get_bound_networks()
qos_rule = NsxVQosRule(context=context,
qos_policy_id=policy_obj.id)
for net_id in networks:
# update the new bw limitations for this network
net_morefs = nsx_db.get_nsx_switch_ids(context.session, net_id)
for moref in net_morefs:
# update the qos restrictions of the network
dvs.update_port_groups_config(
net_id,
moref,
dvs.update_port_group_spec_qos,
qos_rule)

View File

@ -28,14 +28,14 @@ from vmware_nsx.nsxlib import v3 as nsxlib
LOG = logging.getLogger(__name__)
class NsxQosPlugin(qos_plugin.QoSPlugin):
class NsxV3QosPlugin(qos_plugin.QoSPlugin):
"""Service plugin for VMware NSX to implement Neutron's Qos API."""
"""Service plugin for VMware NSX-v3 to implement Neutron's Qos API."""
supported_extension_aliases = ["qos"]
def __init__(self):
super(NsxQosPlugin, self).__init__()
super(NsxV3QosPlugin, self).__init__()
LOG.info(_LI("Loading VMware Qos Service Plugin"))
@db_base_plugin_common.convert_result_to_dict

View File

@ -101,6 +101,40 @@ class DvsTestCase(base.BaseTestCase):
'fake-uuid')
fake_get_moref.assert_called_once_with('fake-uuid')
@mock.patch.object(dvs.DvsManager, '_update_vxlan_port_groups_config')
@mock.patch.object(dvs.DvsManager, '_get_port_group_spec',
return_value='fake-spec')
@mock.patch.object(dvs.DvsManager, '_net_id_to_moref',
return_value='fake-moref')
def test_update_vxlan_net_group_conf(self, fake_get_moref,
fake_get_spec, fake_update_vxlan):
net_id = 'vxlan-uuid'
vlan = 7
self._dvs.add_port_group(net_id, vlan)
moref = self._dvs._net_id_to_moref(net_id)
fake_get_moref.assert_called_once_with(net_id)
fake_get_spec.assert_called_once_with(net_id, vlan)
self._dvs.update_port_groups_config(net_id, moref, None, None)
fake_update_vxlan.assert_called_once_with(net_id, moref, None, None)
@mock.patch.object(dvs.DvsManager, '_update_net_port_groups_config')
@mock.patch.object(dvs.DvsManager, '_get_port_group_spec',
return_value='fake-spec')
@mock.patch.object(dvs.DvsManager, '_net_id_to_moref',
return_value='dvportgroup-fake-moref')
def test_update_flat_net_conf(self, fake_get_moref,
fake_get_spec, fake_update_net):
net_id = 'flat-uuid'
vlan = 7
self._dvs.add_port_group(net_id, vlan)
moref = self._dvs._net_id_to_moref(net_id)
fake_get_moref.assert_called_once_with(net_id)
fake_get_spec.assert_called_once_with(net_id, vlan)
self._dvs.update_port_groups_config(net_id, moref, None, None)
fake_update_net.assert_called_once_with(moref, None, None)
class NeutronSimpleDvsTest(test_plugin.NeutronDbPluginV2TestCase):

View File

@ -17,6 +17,7 @@ import contextlib
from eventlet import greenthread
import mock
import netaddr
from neutron.api.rpc.callbacks import events as callbacks_events
from neutron.api.v2 import attributes
from neutron import context
from neutron.extensions import dvr as dist_router
@ -28,6 +29,7 @@ from neutron.extensions import portsecurity as psec
from neutron.extensions import providernet as pnet
from neutron.extensions import securitygroup as secgrp
from neutron import manager
from neutron.objects.qos import policy as qos_pol
from neutron.tests.unit import _test_extension_portbindings as test_bindings
import neutron.tests.unit.db.test_allowedaddresspairs_db as test_addr_pair
import neutron.tests.unit.db.test_db_base_plugin_v2 as test_plugin
@ -48,6 +50,8 @@ from vmware_nsx._i18n import _
from vmware_nsx.common import exceptions as nsxv_exc
from vmware_nsx.common import nsx_constants
from vmware_nsx.db import nsxv_db
from vmware_nsx.dvs import dvs
from vmware_nsx.dvs import dvs_utils
from vmware_nsx.extensions import routersize as router_size
from vmware_nsx.extensions import routertype as router_type
from vmware_nsx.extensions import securitygrouplogging
@ -56,6 +60,7 @@ from vmware_nsx.plugins.nsx_v.drivers import (
shared_router_driver as router_driver)
from vmware_nsx.plugins.nsx_v.vshield.common import constants as vcns_const
from vmware_nsx.plugins.nsx_v.vshield import edge_utils
from vmware_nsx.services.qos.nsx_v import utils as qos_utils
from vmware_nsx.tests import unit as vmware
from vmware_nsx.tests.unit.extensions import test_vnic_index
from vmware_nsx.tests.unit.nsx_v.vshield import fake_vcns
@ -459,6 +464,132 @@ class TestNetworksV2(test_plugin.TestNetworksV2, NsxVPluginV2TestCase):
context.get_admin_context(),
data)
def test_create_network_with_qos_no_dvs_fail(self):
# network creation should fail if the qos policy parameter exists,
# and no use_dvs_features configured
data = {'network': {
'name': 'test-qos',
'tenant_id': self._tenant_id,
'qos_policy_id': _uuid()}}
plugin = manager.NeutronManager.get_plugin()
self.assertRaises(n_exc.InvalidInput,
plugin.create_network,
context.get_admin_context(),
data)
def test_update_network_with_qos_no_dvs_fail(self):
# network update should fail if the qos policy parameter exists,
# and no use_dvs_features configured
data = {'network': {'qos_policy_id': _uuid()}}
with self.network() as net:
plugin = manager.NeutronManager.get_plugin()
self.assertRaises(n_exc.InvalidInput,
plugin.update_network,
context.get_admin_context(),
net['network']['id'], data)
def _get_core_plugin_with_dvs(self):
# enable dvs features to allow policy with QOS
cfg.CONF.set_default('use_dvs_features', True, 'nsxv')
plugin = manager.NeutronManager.get_plugin()
with mock.patch.object(dvs_utils, 'dvs_create_session'):
with mock.patch.object(dvs.DvsManager, '_get_dvs_moref'):
plugin._dvs = dvs.DvsManager()
return plugin
@mock.patch.object(dvs.DvsManager, 'update_port_groups_config')
@mock.patch.object(qos_utils.NsxVQosRule, '_init_from_policy_id')
def test_create_network_with_qos_policy(self,
fake_init_from_policy,
fake_dvs_update):
# enable dvs features to allow policy with QOS
plugin = self._get_core_plugin_with_dvs()
ctx = context.get_admin_context()
# fake policy id
policy_id = _uuid()
data = {'network': {
'name': 'test-qos',
'tenant_id': self._tenant_id,
'qos_policy_id': policy_id,
'port_security_enabled': False,
'admin_state_up': False,
'shared': False
}}
# create the network - should succeed and translate the policy id
plugin.create_network(ctx, data)
fake_init_from_policy.assert_called_once_with(ctx, policy_id)
self.assertTrue(fake_dvs_update.called)
@mock.patch.object(dvs.DvsManager, 'update_port_groups_config')
@mock.patch.object(qos_utils.NsxVQosRule, '_init_from_policy_id')
def test_update_network_with_qos_policy(self,
fake_init_from_policy,
fake_dvs_update):
# enable dvs features to allow policy with QOS
plugin = self._get_core_plugin_with_dvs()
ctx = context.get_admin_context()
# create the network without qos policy
data = {'network': {
'name': 'test-qos',
'tenant_id': self._tenant_id,
'port_security_enabled': False,
'admin_state_up': True,
'shared': False
}}
net = plugin.create_network(ctx, data)
# fake policy id
policy_id = _uuid()
data['network']['qos_policy_id'] = policy_id
# update the network - should succeed and translate the policy id
plugin.update_network(ctx, net['id'], data)
fake_init_from_policy.assert_called_once_with(ctx, policy_id)
self.assertTrue(fake_dvs_update.called)
@mock.patch.object(dvs.DvsManager, 'update_port_groups_config')
@mock.patch.object(qos_utils.NsxVQosRule, '_init_from_policy_id')
def test_network_with_updated_qos_policy(self,
fake_init_from_policy,
fake_dvs_update):
# enable dvs features to allow policy with QOS
plugin = self._get_core_plugin_with_dvs()
ctx = context.get_admin_context()
# create the network with qos policy
policy_id = _uuid()
data = {'network': {
'name': 'test-qos',
'tenant_id': self._tenant_id,
'qos_policy_id': policy_id,
'port_security_enabled': False,
'admin_state_up': True,
'shared': False
}}
net = plugin.create_network(ctx, data)
# reset fake methods called flag
fake_init_from_policy.called = False
fake_dvs_update.called = False
# fake QoS policy obj:
fake_policy = qos_pol.QosPolicy()
fake_policy.id = policy_id
fake_policy.rules = []
# call the plugin notification callback as if the network was updated
with mock.patch.object(qos_pol.QosPolicy, "get_object",
return_value=fake_policy):
with mock.patch.object(qos_pol.QosPolicy, "get_bound_networks",
return_value=[net["id"]]):
plugin._handle_qos_notification(fake_policy,
callbacks_events.UPDATED)
# make sure the policy data was read, and the dvs was updated
self.assertTrue(fake_init_from_policy.called)
self.assertTrue(fake_dvs_update.called)
class TestVnicIndex(NsxVPluginV2TestCase,
test_vnic_index.VnicIndexDbTestCase):
@ -2058,7 +2189,6 @@ class L3NatTestCaseBase(test_l3_plugin.L3NatTestCaseMixin):
def test_router_add_interface_ipv6_port_existing_network_returns_400(self):
"""Ensure unique IPv6 router ports per network id.
Adding a router port containing one or more IPv6 subnets with the same
network id as an existing router port should fail. This is so
there is no ambiguity regarding on which port to add an IPv6 subnet

View File

@ -0,0 +1,39 @@
# Copyright 2016 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.api.rpc.callbacks import events
from neutron.services.qos.notification_drivers import message_queue
from vmware_nsx.services.qos.nsx_v import utils as qos_utils
class DummyNsxVNotificationDriver(
message_queue.RpcQosServiceNotificationDriver):
def __init__(self):
super(DummyNsxVNotificationDriver, self).__init__()
self._dvs = mock.Mock()
def create_policy(self, context, policy):
# there is no notification for newly created policy
pass
def update_policy(self, context, policy):
qos_utils.handle_qos_notification(policy, events.UPDATED, self._dvs)
def delete_policy(self, context, policy):
qos_utils.handle_qos_notification(policy, events.DELETED, self._dvs)

View File

@ -0,0 +1,190 @@
# Copyright 2016 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
from oslo_utils import uuidutils
from neutron import context
from neutron import manager
from neutron.objects.qos import policy as policy_object
from neutron.objects.qos import rule as rule_object
from neutron.services.qos import qos_plugin
from neutron.tests.unit.services.qos import base
from vmware_nsx.dvs import dvs
from vmware_nsx.dvs import dvs_utils
from vmware_nsx.services.qos.nsx_v import utils as qos_utils
from vmware_nsx.tests.unit.nsx_v import test_plugin
CORE_PLUGIN = "vmware_nsx.plugins.nsx_v.plugin.NsxVPluginV2"
class TestQosNsxVNotification(test_plugin.NsxVPluginV2TestCase,
base.BaseQosTestCase):
@mock.patch.object(dvs_utils, 'dvs_create_session')
@mock.patch.object(dvs.DvsManager, '_get_dvs_moref')
def setUp(self, *mocks):
# init the nsx-v plugin for testing with DVS
self._init_dvs_config()
super(TestQosNsxVNotification, self).setUp(plugin=CORE_PLUGIN,
ext_mgr=None)
plugin_instance = manager.NeutronManager.get_plugin()
self._core_plugin = plugin_instance
# Setup the QoS plugin:
# Add a dummy notification driver that calls our handler directly
# (to skip the message queue)
cfg.CONF.set_override(
"notification_drivers",
['vmware_nsx.tests.unit.services.qos.fake_nsxv_notifier.'
'DummyNsxVNotificationDriver'],
"qos")
self.qos_plugin = qos_plugin.QoSPlugin()
mock.patch.object(qos_utils.NsxVQosRule,
'_get_qos_plugin',
return_value=self.qos_plugin).start()
# Pre defined QoS data for the tests
self.ctxt = context.Context('fake_user', 'fake_tenant')
self.policy_data = {
'policy': {'id': uuidutils.generate_uuid(),
'tenant_id': uuidutils.generate_uuid(),
'name': 'test-policy',
'description': 'Test policy description',
'shared': True}}
self.rule_data = {
'bandwidth_limit_rule': {'id': uuidutils.generate_uuid(),
'max_kbps': 100,
'max_burst_kbps': 150}}
self.policy = policy_object.QosPolicy(
self.ctxt, **self.policy_data['policy'])
self.rule = rule_object.QosBandwidthLimitRule(
self.ctxt, **self.rule_data['bandwidth_limit_rule'])
self._net_data = {'network': {
'name': 'test-qos',
'tenant_id': 'fake_tenant',
'qos_policy_id': self.policy.id,
'port_security_enabled': False,
'admin_state_up': False,
'shared': False
}}
self._rules = [self.rule_data['bandwidth_limit_rule']]
mock.patch('neutron.objects.db.api.create_object').start()
mock.patch('neutron.objects.db.api.update_object').start()
mock.patch('neutron.objects.db.api.delete_object').start()
mock.patch('neutron.objects.db.api.get_object').start()
mock.patch(
'neutron.objects.qos.policy.QosPolicy.obj_load_attr').start()
def _init_dvs_config(self):
# Ensure that DVS is enabled
# and enable the DVS features for nsxv qos support
cfg.CONF.set_override('host_ip', 'fake_ip', group='dvs')
cfg.CONF.set_override('host_username', 'fake_user', group='dvs')
cfg.CONF.set_override('host_password', 'fake_password', group='dvs')
cfg.CONF.set_override('dvs_name', 'fake_dvs', group='dvs')
cfg.CONF.set_default('use_dvs_features', True, 'nsxv')
def _create_net(self):
return self._core_plugin.create_network(self.ctxt, self._net_data)
@mock.patch.object(qos_utils, 'update_network_policy_binding')
@mock.patch.object(dvs.DvsManager, 'update_port_groups_config')
def test_create_network_with_policy_rule(self,
dvs_update_mock,
update_bindings_mock):
"""Test the DVS update when a QoS rule is attached to a network"""
# Create a policy with a rule
_policy = policy_object.QosPolicy(
self.ctxt, **self.policy_data['policy'])
setattr(_policy, "rules", [self.rule])
with mock.patch('neutron.services.qos.qos_plugin.QoSPlugin.'
'get_policy_bandwidth_limit_rules',
return_value=self._rules) as get_rules_mock:
# create the network to use this policy
net = self._create_net()
# make sure the network-policy binding was updated
update_bindings_mock.assert_called_once_with(
self.ctxt, net['id'], self.policy.id)
# make sure the qos rule was found
get_rules_mock.assert_called_with(self.ctxt, self.policy.id)
# make sure the dvs was updated
self.assertTrue(dvs_update_mock.called)
def _test_rule_action_notification(self, action):
with mock.patch.object(qos_utils, 'update_network_policy_binding'):
with mock.patch.object(dvs.DvsManager,
'update_port_groups_config') as dvs_mock:
# Create a policy with a rule
_policy = policy_object.QosPolicy(
self.ctxt, **self.policy_data['policy'])
# set the rule in the policy data
if action != 'create':
setattr(_policy, "rules", [self.rule])
with mock.patch('neutron.services.qos.qos_plugin.QoSPlugin.'
'get_policy_bandwidth_limit_rules',
return_value=self._rules) as get_rules_mock:
with mock.patch('neutron.objects.qos.policy.'
'QosPolicy.get_object',
return_value=_policy):
# create the network to use this policy
self._create_net()
# create/update/delete the rule
if action == 'create':
self.qos_plugin.create_policy_bandwidth_limit_rule(
self.ctxt, self.policy.id, self.rule_data)
elif action == 'update':
self.qos_plugin.update_policy_bandwidth_limit_rule(
self.ctxt, self.rule.id,
self.policy.id, self.rule_data)
else:
self.qos_plugin.delete_policy_bandwidth_limit_rule(
self.ctxt, self.rule.id, self.policy.id)
# make sure the qos rule was found
self.assertTrue(get_rules_mock.called)
# make sure the dvs was updated
self.assertTrue(dvs_mock.called)
def test_create_rule_notification(self):
"""Test the DVS update when a QoS rule, attached to a network,
is created
"""
self._test_rule_action_notification('create')
def test_update_rule_notification(self):
"""Test the DVS update when a QoS rule, attached to a network,
is modified
"""
self._test_rule_action_notification('update')
def test_delete_rule_notification(self):
"""Test the DVS update when a QoS rule, attached to a network,
is deleted
"""
self._test_rule_action_notification('delete')