NSX|V3 add qos support for ports

Add support for the qos service in NSX|V3, including:
- Attach/Detach qos policy to a new or updated port
- Allow qos configuration on ports with internal networks only
- Update the switch profile with the BW limitations and tags
  through the qos notification driver
- Add a new mapping db table to link the policy id and the
  nsx-v3 switch profile id

For this to work, the following configuration should appear under the 'qos' section in the neutron.conf:
notification_drivers = vmware_nsxv3_message_queue

Change-Id: I4016de756cebe0032e61d3c2a5250527e44b49e4
This commit is contained in:
Adit Sarfaty 2016-03-23 11:56:44 +02:00
parent 17c839a7af
commit 6fbf7ff64c
14 changed files with 877 additions and 27 deletions

View File

@ -33,6 +33,8 @@ neutron.core_plugins =
neutron.service_plugins =
vmware_nsx_l2gw = vmware_nsx.services.l2gateway.common.plugin:NsxL2GatewayPlugin
vmware_nsxv_qos = vmware_nsx.services.qos.nsx_v.plugin:NsxVQosPlugin
neutron.qos.notification_drivers =
vmware_nsxv3_message_queue = vmware_nsx.services.qos.nsx_v3.message_queue:NsxV3QosNotificationDriver
vmware_nsx.neutron.nsxv.router_type_drivers =
shared = vmware_nsx.plugins.nsx_v.drivers.shared_router_driver:RouterSharedDriver
distributed = vmware_nsx.plugins.nsx_v.drivers.distributed_router_driver:RouterDistributedDriver

View File

@ -171,3 +171,12 @@ class SecurityGroupMaximumCapacityReached(NsxPluginException):
class NsxResourceNotFound(n_exc.NotFound):
message = _("%(res_name)s %(res_id)s not found on the backend.")
class NsxQosPolicyMappingNotFound(n_exc.NotFound):
message = _('Unable to find mapping for QoS policy: %(policy)s')
class NsxQosSmallBw(n_exc.InvalidInput):
message = _("Invalid input for max_kbps. Reason: The minimal legal value "
"for max_kbps is 1024")

View File

@ -238,3 +238,22 @@ def get_l2gw_connection_mapping(session, connection_id):
filter_by(connection_id=connection_id).one())
except exc.NoResultFound:
raise nsx_exc.NsxL2GWConnectionMappingNotFound(conn=connection_id)
# NSXv3 QoS policy id <-> switch Id mapping
def add_qos_policy_profile_mapping(session, qos_policy_id, switch_profile_id):
with session.begin(subtransactions=True):
mapping = nsx_models.QosPolicySwitchProfile(
qos_policy_id=qos_policy_id,
switch_profile_id=switch_profile_id)
session.add(mapping)
return mapping
def get_switch_profile_by_qos_policy(session, qos_policy_id):
try:
entry = (session.query(nsx_models.QosPolicySwitchProfile).
filter_by(qos_policy_id=qos_policy_id).one())
return entry.switch_profile_id
except exc.NoResultFound:
raise nsx_exc.NsxQosPolicyMappingNotFound(policy=qos_policy_id)

View File

@ -1 +1 @@
967462f585e1
b7f41687cbad

View File

@ -0,0 +1,37 @@
# Copyright 2016 VMware, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""nsxv3_qos_policy_mapping
Revision ID: b7f41687cbad
Revises: 967462f585e1
Create Date: 2016-03-17 06:12:09.450116
"""
# revision identifiers, used by Alembic.
revision = 'b7f41687cbad'
down_revision = '967462f585e1'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table(
'neutron_nsx_qos_policy_mappings',
sa.Column('qos_policy_id', sa.String(36), nullable=False),
sa.Column('switch_profile_id', sa.String(36), nullable=False),
sa.ForeignKeyConstraint(['qos_policy_id'], ['qos_policies.id'],
ondelete='CASCADE'),
sa.PrimaryKeyConstraint('qos_policy_id'))

View File

@ -319,3 +319,13 @@ class NsxL2GWConnectionMapping(model_base.BASEV2):
sa.ForeignKey("ports.id", ondelete="CASCADE"),
nullable=False)
bridge_endpoint_id = sa.Column(sa.String(36), nullable=False)
class QosPolicySwitchProfile(model_base.BASEV2):
# Maps neutron qos policy identifiers to NSX-V3 switch profile identifiers
__tablename__ = 'neutron_nsx_qos_policy_mappings'
qos_policy_id = sa.Column(sa.String(36),
sa.ForeignKey('qos_policies.id',
ondelete='CASCADE'),
primary_key=True)
switch_profile_id = sa.Column(sa.String(36), nullable=False)

View File

@ -178,12 +178,18 @@ def update_logical_router_advertisement(logical_router_id, **kwargs):
return update_resource_with_retry(resource, kwargs)
def create_qos_switching_profile(tags, qos_marking=None, dscp=None, name=None,
description=None):
resource = 'switching-profiles'
def _build_qos_switching_profile_args(tags, qos_marking=None, dscp=None,
name=None, description=None):
body = {"resource_type": "QosSwitchingProfile",
"tags": tags}
# TODO(abhide): Add TrafficShaper configuration.
return _update_qos_switching_profile_args(
body, qos_marking=qos_marking, dscp=dscp,
name=name, description=description)
def _update_qos_switching_profile_args(body, qos_marking=None, dscp=None,
name=None, description=None):
if qos_marking:
body["dscp"] = {}
body["dscp"]["mode"] = qos_marking.upper()
@ -193,9 +199,75 @@ def create_qos_switching_profile(tags, qos_marking=None, dscp=None, name=None,
body["display_name"] = name
if description:
body["description"] = description
return body
def _enable_shaping_in_args(body, burst_size=None, peak_bandwidth=None,
average_bandwidth=None):
for shaper in body["shaper_configuration"]:
# Neutron currently supports only shaping of Egress traffic
if shaper["resource_type"] == "EgressRateShaper":
shaper["enabled"] = True
if burst_size:
shaper["burst_size_bytes"] = burst_size
if peak_bandwidth:
shaper["peak_bandwidth_mbps"] = peak_bandwidth
if average_bandwidth:
shaper["average_bandwidth_mbps"] = average_bandwidth
break
return body
def _disable_shaping_in_args(body):
for shaper in body["shaper_configuration"]:
# Neutron currently supports only shaping of Egress traffic
if shaper["resource_type"] == "EgressRateShaper":
shaper["enabled"] = False
shaper["burst_size_bytes"] = 0
shaper["peak_bandwidth_mbps"] = 0
shaper["average_bandwidth_mbps"] = 0
break
return body
def create_qos_switching_profile(tags, qos_marking=None, dscp=None, name=None,
description=None):
resource = 'switching-profiles'
body = _build_qos_switching_profile_args(tags, qos_marking, dscp,
name, description)
return client.create_resource(resource, body)
def update_qos_switching_profile(profile_id, tags, qos_marking=None,
dscp=None, name=None, description=None):
resource = 'switching-profiles/%s' % profile_id
# get the current configuration
body = get_qos_switching_profile(profile_id)
# update the relevant fields
body = _update_qos_switching_profile_args(body, qos_marking, dscp,
name, description)
return update_resource_with_retry(resource, body)
def update_qos_switching_profile_shaping(profile_id, shaping_enabled=False,
burst_size=None, peak_bandwidth=None,
average_bandwidth=None):
resource = 'switching-profiles/%s' % profile_id
# get the current configuration
body = get_qos_switching_profile(profile_id)
# update the relevant fields
if shaping_enabled:
body = _enable_shaping_in_args(body,
burst_size=burst_size,
peak_bandwidth=peak_bandwidth,
average_bandwidth=average_bandwidth)
else:
body = _disable_shaping_in_args(body)
return update_resource_with_retry(resource, body)
def get_qos_switching_profile(profile_id):
resource = 'switching-profiles/%s' % profile_id
return client.get_resource(resource)

View File

@ -16,8 +16,11 @@ import netaddr
import six
from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api
from neutron.api.rpc.callbacks.consumer import registry as callbacks_registry
from neutron.api.rpc.callbacks import resources as callbacks_resources
from neutron.api.rpc.handlers import dhcp_rpc
from neutron.api.rpc.handlers import metadata_rpc
from neutron.api.rpc.handlers import resources_rpc
from neutron.api.v2 import attributes
from neutron.callbacks import events
from neutron.callbacks import exceptions as callback_exc
@ -53,6 +56,7 @@ from neutron.extensions import securitygroup as ext_sg
from neutron.plugins.common import constants as plugin_const
from neutron.plugins.common import utils as n_utils
from neutron.quota import resource_registry
from neutron.services.qos import qos_consts
from neutron_lib import constants as const
from neutron_lib import exceptions as n_exc
from oslo_config import cfg
@ -79,6 +83,7 @@ from vmware_nsx.nsxlib.v3 import dfw_api as firewall
from vmware_nsx.nsxlib.v3 import resources as nsx_resources
from vmware_nsx.nsxlib.v3 import router
from vmware_nsx.nsxlib.v3 import security
from vmware_nsx.services.qos.nsx_v3 import utils as qos_utils
LOG = log.getLogger(__name__)
@ -87,7 +92,10 @@ NSX_V3_NO_PSEC_PROFILE_NAME = 'nsx-default-spoof-guard-vif-profile'
NSX_V3_DHCP_PROFILE_NAME = 'neutron_port_dhcp_profile'
class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin,
# NOTE(asarfaty): the order of inheritance here is important. in order for the
# QoS notification to work, the AgentScheduler init must be called first
class NsxV3Plugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
addr_pair_db.AllowedAddressPairsMixin,
db_base_plugin_v2.NeutronDbPluginV2,
extend_sg_rule.ExtendedSecurityGroupRuleMixin,
securitygroups_db.SecurityGroupDbMixin,
@ -97,7 +105,6 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin,
l3_gwmode_db.L3_NAT_db_mixin,
portbindings_db.PortBindingMixin,
portsecurity_db.PortSecurityDbMixin,
agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
extradhcpopt_db.ExtraDhcpOptMixin,
dns_db.DNSDbMixin):
@ -170,6 +177,12 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin,
self._switching_profiles,
self._switching_profiles.find_by_display_name(
NSX_V3_NO_PSEC_PROFILE_NAME)[0])[0]
# Bind QoS notifications
callbacks_registry.subscribe(qos_utils.handle_qos_notification,
callbacks_resources.QOS_POLICY)
self.start_rpc_listeners_called = False
LOG.debug("Initializing NSX v3 DHCP switching profile")
self._dhcp_profile = None
try:
@ -333,6 +346,10 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin,
)
def start_rpc_listeners(self):
if self.start_rpc_listeners_called:
# If called more than once - we should not create it again
return self.conn.consume_in_threads()
self._setup_rpc()
self.topic = topics.PLUGIN
self.conn = n_rpc.create_connection()
@ -340,6 +357,13 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin,
self.conn.create_consumer(topics.REPORTS,
[agents_db.AgentExtRpcCallback()],
fanout=False)
qos_topic = resources_rpc.resource_type_versioned_topic(
callbacks_resources.QOS_POLICY)
self.conn.create_consumer(qos_topic,
[resources_rpc.ResourcesPushRpcCallback()],
fanout=False)
self.start_rpc_listeners_called = True
return self.conn.consume_in_threads()
def _validate_provider_create(self, context, network_data):
@ -483,8 +507,18 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin,
network[pnet.PHYSICAL_NETWORK] = bindings[0].phy_uuid
network[pnet.SEGMENTATION_ID] = bindings[0].vlan_id
# NSX-V3 networks cannot be associated with QoS policies
def _validate_no_qos(self, net_data):
err_msg = None
if attributes.is_attr_set(net_data.get(qos_consts.QOS_POLICY_ID)):
err_msg = _("Cannot configure QOS on networks")
if err_msg:
raise n_exc.InvalidInput(error_message=err_msg)
def create_network(self, context, network):
net_data = network['network']
self._validate_no_qos(net_data)
external = net_data.get(ext_net_extn.EXTERNAL)
is_backend_network = False
if attributes.is_attr_set(external) and external:
@ -621,6 +655,7 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin,
def update_network(self, context, id, network):
original_net = super(NsxV3Plugin, self).get_network(context, id)
net_data = network['network']
self._validate_no_qos(net_data)
# Neutron does not support changing provider network values
pnet._raise_if_updates_provider_attributes(net_data)
updated_net = super(NsxV3Plugin, self).update_network(context, id,
@ -772,6 +807,21 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin,
name = port_data['name']
return name
def _get_qos_profile_id(self, context, policy_id):
switch_profile_id = nsx_db.get_switch_profile_by_qos_policy(
context.session, policy_id)
qos_profile = nsxlib.get_qos_switching_profile(switch_profile_id)
if qos_profile:
profile_ids = self._switching_profiles.build_switch_profile_ids(
self._switching_profiles, qos_profile)
if profile_ids and len(profile_ids) > 0:
# We have only 1 QoS profile, so this array is of size 1
return profile_ids[0]
# Didn't find it
err_msg = _("Could not find QoS switching profile for policy "
"%s") % policy_id
raise n_exc.InvalidInput(error_message=err_msg)
def _create_port_at_the_backend(self, context, port_data,
l2gw_port_check, psec_is_on):
device_owner = port_data.get('device_owner')
@ -812,19 +862,39 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin,
if device_owner == const.DEVICE_OWNER_DHCP:
profiles.append(self._dhcp_profile)
# Add QoS switching profile, if exists
qos_policy_id = None
if attributes.is_attr_set(port_data.get(qos_consts.QOS_POLICY_ID)):
qos_policy_id = port_data[qos_consts.QOS_POLICY_ID]
qos_profile_id = self._get_qos_profile_id(context, qos_policy_id)
profiles.append(qos_profile_id)
name = self._get_port_name(context, port_data)
nsx_net_id = port_data[pbin.VIF_DETAILS]['nsx-logical-switch-id']
result = self._port_client.create(
nsx_net_id, vif_uuid,
tags=tags,
name=name,
admin_state=port_data['admin_state_up'],
address_bindings=address_bindings,
attachment_type=attachment_type,
parent_name=parent_name, parent_tag=tag,
switch_profile_ids=profiles)
try:
result = self._port_client.create(
nsx_net_id, vif_uuid,
tags=tags,
name=name,
admin_state=port_data['admin_state_up'],
address_bindings=address_bindings,
attachment_type=attachment_type,
parent_name=parent_name, parent_tag=tag,
switch_profile_ids=profiles)
except nsx_exc.ManagerError as inst:
# we may fail if the QoS is not supported for this port
# (for example - transport zone with KVM)
LOG.exception(_LE("Unable to create port on the backend: %s"),
inst)
msg = _("Unable to create port on the backend")
raise nsx_exc.NsxPluginException(err_msg=msg)
# Attach the policy to the port in the neutron DB
if qos_policy_id:
qos_utils.update_port_policy_binding(context,
port_data['id'],
qos_policy_id)
return result
def _validate_address_pairs(self, address_pairs):
@ -879,6 +949,15 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin,
if lport_id:
self._port_client.delete(lport_id)
def _assert_on_external_net_with_qos(self, port_data):
# Prevent creating/update port with QoS policy
# on external networks.
if attributes.is_attr_set(port_data.get(qos_consts.QOS_POLICY_ID)):
err_msg = _("Unable to update/create a port with an external "
"network and a QoS policy")
LOG.warning(err_msg)
raise n_exc.InvalidInput(error_message=err_msg)
def create_port(self, context, port, l2gw_port_check=False):
port_data = port['port']
dhcp_opts = port_data.get(ext_edo.EXTRADHCPOPTS, [])
@ -889,6 +968,7 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin,
context, port_data['network_id'])
if is_external_net:
self._assert_on_external_net_with_compute(port_data)
self._assert_on_external_net_with_qos(port_data)
neutron_db = super(NsxV3Plugin, self).create_port(context, port)
port["port"].update(neutron_db)
@ -1103,15 +1183,49 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin,
if updated_device_owner == const.DEVICE_OWNER_DHCP:
switch_profile_ids.append(self._dhcp_profile)
self._port_client.update(
lport_id, vif_uuid, name=name,
attachment_type=attachment_type,
admin_state=updated_port.get('admin_state_up'),
address_bindings=address_bindings,
switch_profile_ids=switch_profile_ids,
resources=resources,
parent_name=parent_name,
parent_tag=tag)
# Update QoS switch profile
qos_policy_id, qos_profile_id = self._get_port_qos_ids(context,
updated_port)
if qos_profile_id is not None:
switch_profile_ids.append(qos_profile_id)
try:
self._port_client.update(
lport_id, vif_uuid, name=name,
attachment_type=attachment_type,
admin_state=updated_port.get('admin_state_up'),
address_bindings=address_bindings,
switch_profile_ids=switch_profile_ids,
resources=resources,
parent_name=parent_name,
parent_tag=tag)
except nsx_exc.ManagerError as inst:
# we may fail if the QoS is not supported for this port
# (for example - transport zone with KVM)
LOG.exception(_LE("Unable to update port on the backend: %s"),
inst)
msg = _("Unable to update port on the backend")
raise nsx_exc.NsxPluginException(err_msg=msg)
# Attach/Detach the QoS policies to the port in the neutron DB
qos_utils.update_port_policy_binding(context,
updated_port['id'],
qos_policy_id)
def _get_port_qos_ids(self, context, updated_port):
# when a port is updated, get the current QoS policy/profile ids
policy_id = None
profile_id = None
if (qos_consts.QOS_POLICY_ID in updated_port):
policy_id = updated_port[qos_consts.QOS_POLICY_ID]
else:
# Look for the previous QoS policy
policy_id = qos_utils.get_port_policy_id(
context, updated_port['id'])
if policy_id is not None:
profile_id = self._get_qos_profile_id(context, policy_id)
return policy_id, profile_id
def update_port(self, context, id, port):
switch_profile_ids = None
@ -1124,6 +1238,7 @@ class NsxV3Plugin(addr_pair_db.AllowedAddressPairsMixin,
context, original_port['network_id'])
if is_external_net:
self._assert_on_external_net_with_compute(port['port'])
self._assert_on_external_net_with_qos(port['port'])
updated_port = super(NsxV3Plugin, self).update_port(context,
id, port)

View File

@ -0,0 +1,29 @@
# Copyright 2016 VMware, Inc.
#
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.api.rpc.callbacks import events
from neutron.services.qos.notification_drivers import message_queue
class NsxV3QosNotificationDriver(
message_queue.RpcQosServiceNotificationDriver):
"""NSXv3 message queue service notification driver for QoS.
Overriding the create_policy method in order to add a notification
message in this case too.
"""
def create_policy(self, context, policy):
self.notification_api.push(context, policy, events.CREATED)

View File

@ -0,0 +1,181 @@
# Copyright 2016 VMware, Inc.
#
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.api.rpc.callbacks import events as callbacks_events
from neutron.api.v2 import attributes
from neutron import context as n_context
from neutron.objects.qos import policy as qos_policy
from oslo_log import log as logging
from vmware_nsx._i18n import _
from vmware_nsx.common import exceptions as nsx_exc
from vmware_nsx.common import utils
from vmware_nsx.db import db as nsx_db
from vmware_nsx.nsxlib import v3 as nsxlib
LOG = logging.getLogger(__name__)
MAX_KBPS_MIN_VALUE = 1024
def update_port_policy_binding(context, port_id, new_policy_id):
# detach the old policy (if exists) from the port
old_policy = qos_policy.QosPolicy.get_port_policy(
context, port_id)
if old_policy:
if old_policy.id == new_policy_id:
return
old_policy.detach_port(port_id)
# attach the new policy (if exists) to the port
if new_policy_id is not None:
new_policy = qos_policy.QosPolicy.get_object(
context, id=new_policy_id)
if new_policy:
new_policy.attach_port(port_id)
def get_port_policy_id(context, port_id):
policy = qos_policy.QosPolicy.get_port_policy(
context, port_id)
if policy:
return policy.id
return
def handle_qos_notification(policy_obj, event_type):
handler = QosNotificationsHandler()
context = n_context.get_admin_context()
# Reload the policy as admin so we will have a context
if (event_type != callbacks_events.DELETED):
policy = qos_policy.QosPolicy.get_object(context, id=policy_obj.id)
# Check if QoS policy rule was created/deleted/updated
if (event_type == callbacks_events.CREATED):
handler.create_policy(context, policy)
elif (event_type == callbacks_events.UPDATED):
if (hasattr(policy_obj, "rules")):
# With rules - the policy rule was created / deleted / updated
rules = policy_obj["rules"]
if not len(rules):
# the rule was deleted
handler.delete_policy_bandwidth_limit_rule(
context, policy_obj.id)
else:
# New or updated rule
handler.create_or_update_policy_bandwidth_limit_rule(
context, policy_obj.id, rules[0])
else:
# Without rules - need to update only name / description
handler.update_policy(context, policy_obj.id, policy)
elif (event_type == callbacks_events.DELETED):
handler.delete_policy(context, policy_obj.id)
else:
msg = _("Unknown QoS notification event %s") % event_type
raise nsx_exc.NsxPluginException(err_msg=msg)
class QosNotificationsHandler(object):
def __init__(self):
super(QosNotificationsHandler, self).__init__()
def _get_tags(self, context, policy):
policy_dict = {'id': policy.id, 'tenant_id': policy.tenant_id}
return utils.build_v3_tags_payload(
policy_dict, resource_type='os-neutron-qos-id',
project_name=context.tenant_name)
def create_policy(self, context, policy):
policy_id = policy.id
tags = self._get_tags(context, policy)
result = nsxlib.create_qos_switching_profile(
tags=tags, name=policy.name,
description=policy.description)
if not result or not attributes.is_attr_set(result.get('id')):
msg = _("Unable to create QoS switching profile on the backend")
raise nsx_exc.NsxPluginException(err_msg=msg)
profile_id = result['id']
# Add the mapping entry of the policy_id <-> profile_id
nsx_db.add_qos_policy_profile_mapping(context.session,
policy_id,
profile_id)
def delete_policy(self, context, policy_id):
profile_id = nsx_db.get_switch_profile_by_qos_policy(
context.session, policy_id)
nsxlib.delete_qos_switching_profile(profile_id)
def update_policy(self, context, policy_id, policy):
profile_id = nsx_db.get_switch_profile_by_qos_policy(
context.session, policy_id)
tags = self._get_tags(context, policy)
nsxlib.update_qos_switching_profile(
profile_id,
tags=tags,
name=policy.name,
description=policy.description)
def _get_bw_values_from_rule(self, bw_rule):
"""Translate the neutron bandwidth_limit_rule values, into the
values expected by the NSX-v3 QoS switch profile,
and validate that those are legal
"""
# validate the max_kbps - it must be at least 1Mbps for the
# switch profile configuration to succeed.
if (bw_rule.max_kbps < MAX_KBPS_MIN_VALUE):
raise nsx_exc.NsxQosSmallBw()
# 'None' value means we will keep the old value
burst_size = peak_bandwidth = average_bandwidth = None
# translate kbps -> bytes
burst_size = int(bw_rule.max_burst_kbps) * 128
# translate kbps -> Mbps
peak_bandwidth = int(float(bw_rule.max_kbps) / 1024)
# neutron QoS does not support this parameter
average_bandwidth = peak_bandwidth
return burst_size, peak_bandwidth, average_bandwidth
def create_or_update_policy_bandwidth_limit_rule(self, context, policy_id,
bw_rule):
"""Update the QoS switch profile with the BW limitations of a
new or updated bandwidth limit rule
"""
profile_id = nsx_db.get_switch_profile_by_qos_policy(
context.session, policy_id)
burst_size, peak_bw, average_bw = self._get_bw_values_from_rule(
bw_rule)
nsxlib.update_qos_switching_profile_shaping(
profile_id,
shaping_enabled=True,
burst_size=burst_size,
peak_bandwidth=peak_bw,
average_bandwidth=average_bw)
def delete_policy_bandwidth_limit_rule(self, context, policy_id):
profile_id = nsx_db.get_switch_profile_by_qos_policy(
context.session, policy_id)
nsxlib.update_qos_switching_profile_shaping(
profile_id, shaping_enabled=False)

View File

@ -216,6 +216,11 @@ class TestPortsV2(test_plugin.TestPortsV2, NsxV3PluginTestCaseMixin,
VIF_TYPE = portbindings.VIF_TYPE_OVS
HAS_PORT_FILTER = True
def setUp(self):
super(TestPortsV2, self).setUp()
self.plugin = manager.NeutronManager.get_plugin()
self.ctx = context.get_admin_context()
def test_update_port_delete_ip(self):
# This test case overrides the default because the nsx plugin
# implements port_security/security groups and it is not allowed
@ -258,6 +263,54 @@ class TestPortsV2(test_plugin.TestPortsV2, NsxV3PluginTestCaseMixin,
self.assertEqual(webob.exc.HTTPBadRequest.code,
res.status_int)
def test_create_port_with_qos(self):
with self.network() as network:
policy_id = uuidutils.generate_uuid()
data = {'port': {
'network_id': network['network']['id'],
'tenant_id': self._tenant_id,
'qos_policy_id': policy_id,
'name': 'qos_port',
'admin_state_up': True,
'device_id': 'fake_device',
'device_owner': 'fake_owner',
'fixed_ips': [],
'mac_address': '00:00:00:00:00:01'}
}
with mock.patch.object(self.plugin, '_get_qos_profile_id'):
port = self.plugin.create_port(self.ctx, data)
self.assertEqual(policy_id, port['qos_policy_id'])
def test_update_port_with_qos(self):
with self.network() as network:
data = {'port': {
'network_id': network['network']['id'],
'tenant_id': self._tenant_id,
'name': 'qos_port',
'admin_state_up': True,
'device_id': 'fake_device',
'device_owner': 'fake_owner',
'fixed_ips': [],
'mac_address': '00:00:00:00:00:01'}
}
port = self.plugin.create_port(self.ctx, data)
policy_id = uuidutils.generate_uuid()
data['port']['qos_policy_id'] = policy_id
with mock.patch.object(self.plugin, '_get_qos_profile_id'):
res = self.plugin.update_port(self.ctx, port['id'], data)
self.assertEqual(policy_id, res['qos_policy_id'])
def test_create_ext_port_with_qos_fail(self):
with self._create_l3_ext_network() as network:
with self.subnet(network=network, cidr='10.0.0.0/24'):
policy_id = uuidutils.generate_uuid()
data = {'port': {'network_id': network['network']['id'],
'tenant_id': self._tenant_id,
'qos_policy_id': policy_id}}
# Cannot add qos policy to a port on ext network
self.assertRaises(n_exc.InvalidInput,
self.plugin.create_port, self.ctx, data)
class DHCPOptsTestCase(test_dhcpopts.TestExtraDhcpOpt,
NsxV3PluginTestCaseMixin):

View File

@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import mock
from oslo_log import log
from oslo_serialization import jsonutils
@ -26,7 +28,8 @@ LOG = log.getLogger(__name__)
class NsxLibQosTestCase(nsxlib_testcase.NsxClientTestCase):
def _body(self, qos_marking=None, dscp=None):
def _body(self, qos_marking=None, dscp=None,
description=test_constants_v3.FAKE_NAME):
body = {
"resource_type": "QosSwitchingProfile",
"tags": []
@ -37,7 +40,30 @@ class NsxLibQosTestCase(nsxlib_testcase.NsxClientTestCase):
if dscp:
body["dscp"]["priority"] = dscp
body["display_name"] = test_constants_v3.FAKE_NAME
body["description"] = test_constants_v3.FAKE_NAME
body["description"] = description
return body
def _body_with_shaping(self, shaping_enabled=False,
burst_size=None,
peak_bandwidth=None,
average_bandwidth=None,
description=test_constants_v3.FAKE_NAME):
body = test_constants_v3.FAKE_QOS_PROFILE
body["display_name"] = test_constants_v3.FAKE_NAME
body["description"] = description
for shaper in body["shaper_configuration"]:
# Neutron currently support only shaping of Egress traffic
if shaper["resource_type"] == "EgressRateShaper":
shaper["enabled"] = shaping_enabled
if burst_size:
shaper["burst_size_bytes"] = burst_size
if peak_bandwidth:
shaper["peak_bandwidth_mbps"] = peak_bandwidth
if average_bandwidth:
shaper["average_bandwidth_mbps"] = average_bandwidth
break
return body
@ -75,6 +101,90 @@ class NsxLibQosTestCase(nsxlib_testcase.NsxClientTestCase):
data=jsonutils.dumps(self._body(qos_marking='trusted', dscp=0),
sort_keys=True))
def test_update_qos_switching_profile(self):
"""
Test updating a qos-switching profile returns the correct response
"""
api = self.mocked_rest_fns(nsxlib, 'client')
original_profile = self._body()
new_description = "Test"
with mock.patch.object(nsxlib.client, 'get_resource',
return_value=original_profile):
# update the description of the profile
nsxlib.update_qos_switching_profile(
test_constants_v3.FAKE_QOS_PROFILE['id'],
tags=[],
description=new_description)
test_client.assert_json_call(
'put', api,
'https://1.2.3.4/api/v1/switching-profiles/%s'
% test_constants_v3.FAKE_QOS_PROFILE['id'],
data=jsonutils.dumps(self._body(description=new_description),
sort_keys=True))
def test_enable_qos_switching_profile_shaping(self):
"""
Test updating a qos-switching profile returns the correct response
"""
api = self.mocked_rest_fns(nsxlib, 'client')
original_profile = self._body_with_shaping()
burst_size = 100
peak_bandwidth = 200
average_bandwidth = 300
with mock.patch.object(nsxlib.client, 'get_resource',
return_value=original_profile):
# update the bw shaping of the profile
nsxlib.update_qos_switching_profile_shaping(
test_constants_v3.FAKE_QOS_PROFILE['id'],
shaping_enabled=True,
burst_size=burst_size,
peak_bandwidth=peak_bandwidth,
average_bandwidth=average_bandwidth)
test_client.assert_json_call(
'put', api,
'https://1.2.3.4/api/v1/switching-profiles/%s'
% test_constants_v3.FAKE_QOS_PROFILE['id'],
data=jsonutils.dumps(
self._body_with_shaping(
shaping_enabled=True,
burst_size=burst_size,
peak_bandwidth=peak_bandwidth,
average_bandwidth=average_bandwidth),
sort_keys=True))
def test_disable_qos_switching_profile_shaping(self):
"""
Test updating a qos-switching profile returns the correct response
"""
api = self.mocked_rest_fns(nsxlib, 'client')
burst_size = 100
peak_bandwidth = 200
average_bandwidth = 300
original_profile = self._body_with_shaping(
shaping_enabled=True,
burst_size=burst_size,
peak_bandwidth=peak_bandwidth,
average_bandwidth=average_bandwidth)
with mock.patch.object(nsxlib.client, 'get_resource',
return_value=original_profile):
# update the bw shaping of the profile
nsxlib.update_qos_switching_profile_shaping(
test_constants_v3.FAKE_QOS_PROFILE['id'],
shaping_enabled=False)
test_client.assert_json_call(
'put', api,
'https://1.2.3.4/api/v1/switching-profiles/%s'
% test_constants_v3.FAKE_QOS_PROFILE['id'],
data=jsonutils.dumps(
self._body_with_shaping(),
sort_keys=True))
def test_delete_qos_switching_profile(self):
"""
Test deleting qos-switching-profile

View File

@ -0,0 +1,31 @@
# Copyright 2016 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.api.rpc.callbacks import events
from neutron.services.qos.notification_drivers import message_queue
from vmware_nsx.services.qos.nsx_v3 import utils as qos_utils
class DummyNotificationDriver(
message_queue.RpcQosServiceNotificationDriver):
def create_policy(self, context, policy):
qos_utils.handle_qos_notification(policy, events.CREATED)
def update_policy(self, context, policy):
qos_utils.handle_qos_notification(policy, events.UPDATED)
def delete_policy(self, context, policy):
qos_utils.handle_qos_notification(policy, events.DELETED)

View File

@ -0,0 +1,182 @@
# Copyright 2016 VMware, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_config import cfg
from oslo_utils import uuidutils
from neutron import context
from neutron.objects import base as base_object
from neutron.objects.qos import policy as policy_object
from neutron.objects.qos import rule as rule_object
from neutron.services.qos import qos_plugin
from neutron.tests.unit.services.qos import base
from vmware_nsx.common import utils
from vmware_nsx.db import db as nsx_db
from vmware_nsx.nsxlib import v3 as nsxlib
from vmware_nsx.tests.unit.nsxlib.v3 import nsxlib_testcase
class TestQosNsxV3Notification(nsxlib_testcase.NsxClientTestCase,
base.BaseQosTestCase):
def setUp(self):
super(TestQosNsxV3Notification, self).setUp()
self.setup_coreplugin()
# Add a dummy notification driver that calls our handler directly
# (to skip the message queue)
cfg.CONF.set_override(
"notification_drivers",
['vmware_nsx.tests.unit.services.qos.fake_notifier.'
'DummyNotificationDriver'],
"qos")
self.qos_plugin = qos_plugin.QoSPlugin()
self.ctxt = context.Context('fake_user', 'fake_tenant')
self.policy_data = {
'policy': {'id': uuidutils.generate_uuid(),
'tenant_id': uuidutils.generate_uuid(),
'name': 'test-policy',
'description': 'Test policy description',
'shared': True}}
self.rule_data = {
'bandwidth_limit_rule': {'id': uuidutils.generate_uuid(),
'max_kbps': 2000,
'max_burst_kbps': 150}}
self.policy = policy_object.QosPolicy(
self.ctxt, **self.policy_data['policy'])
self.rule = rule_object.QosBandwidthLimitRule(
self.ctxt, **self.rule_data['bandwidth_limit_rule'])
self.fake_profile_id = 'fake_profile'
self.fake_profile = {'id': self.fake_profile_id}
mock.patch('neutron.objects.db.api.create_object').start()
mock.patch('neutron.objects.db.api.update_object').start()
mock.patch('neutron.objects.db.api.delete_object').start()
mock.patch(
'neutron.objects.qos.policy.QosPolicy.obj_load_attr').start()
mock.patch.object(nsx_db, 'get_switch_profile_by_qos_policy',
return_value=self.fake_profile_id).start()
@mock.patch(
'neutron.objects.rbac_db.RbacNeutronDbObjectMixin'
'.create_rbac_policy')
@mock.patch.object(nsx_db, 'add_qos_policy_profile_mapping')
def test_policy_create_profile(self, fake_db_add, fake_rbac_create):
# test the switch profile creation when a QoS policy is created
with mock.patch.object(nsxlib, 'create_qos_switching_profile',
return_value=self.fake_profile) as create_profile:
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_object',
return_value=self.policy):
with mock.patch('neutron.objects.qos.policy.QosPolicy.create'):
policy = self.qos_plugin.create_policy(self.ctxt,
self.policy_data)
expected_tags = utils.build_v3_tags_payload(
policy,
resource_type='os-neutron-qos-id',
project_name=self.ctxt.tenant_name)
create_profile.assert_called_once_with(
description=self.policy_data["policy"]["description"],
name=self.policy_data["policy"]["name"],
tags=expected_tags)
# verify that the policy->profile mapping entry was added
self.assertTrue(fake_db_add.called)
@mock.patch(
'neutron.objects.rbac_db.RbacNeutronDbObjectMixin'
'.create_rbac_policy')
def test_policy_update_profile(self, *mocks):
# test the switch profile update when a QoS policy is updated
fields = base_object.get_updatable_fields(
policy_object.QosPolicy, self.policy_data['policy'])
with mock.patch.object(nsxlib,
'update_qos_switching_profile') as update_profile:
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_object',
return_value=self.policy):
with mock.patch('neutron.objects.qos.policy.QosPolicy.update'):
self.qos_plugin.update_policy(
self.ctxt, self.policy.id, {'policy': fields})
# verify that the profile was updated with the correct data
self.policy_data["policy"]["id"] = self.policy.id
expected_tags = utils.build_v3_tags_payload(
self.policy_data["policy"],
resource_type='os-neutron-qos-id',
project_name=self.ctxt.tenant_name)
update_profile.assert_called_once_with(
self.fake_profile_id,
description=self.policy_data["policy"]["description"],
name=self.policy_data["policy"]["name"],
tags=expected_tags
)
@mock.patch.object(policy_object.QosPolicy, 'reload_rules')
def test_rule_create_profile(self, *mocks):
# test the switch profile update when a QoS rule is created
_policy = policy_object.QosPolicy(
self.ctxt, **self.policy_data['policy'])
# add a rule to the policy
setattr(_policy, "rules", [self.rule])
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_object',
return_value=_policy):
with mock.patch.object(nsxlib,
'update_qos_switching_profile_shaping') as update_profile:
self.qos_plugin.update_policy_bandwidth_limit_rule(
self.ctxt, self.rule.id, _policy.id, self.rule_data)
# validate the data on the profile
rule_dict = self.rule_data['bandwidth_limit_rule']
expected_bw = rule_dict['max_kbps'] / 1024
expected_burst = rule_dict['max_burst_kbps'] * 128
update_profile.assert_called_once_with(
self.fake_profile_id,
average_bandwidth=expected_bw,
burst_size=expected_burst,
peak_bandwidth=expected_bw,
shaping_enabled=True
)
def test_rule_delete_profile(self):
# test the switch profile update when a QoS rule is deleted
_policy = policy_object.QosPolicy(
self.ctxt, **self.policy_data['policy'])
# The mock will return the policy without the rule,
# as if it was deleted
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_object',
return_value=_policy):
with mock.patch.object(nsxlib,
'update_qos_switching_profile_shaping') as update_profile:
setattr(_policy, "rules", [self.rule])
self.qos_plugin.delete_policy_bandwidth_limit_rule(
self.ctxt, self.rule.id, self.policy.id)
# validate the data on the profile
update_profile.assert_called_once_with(
self.fake_profile_id,
shaping_enabled=False
)
@mock.patch('neutron.objects.db.api.get_object', return_value=None)
def test_policy_delete_profile(self, *mocks):
# test the switch profile deletion when a QoS policy is deleted
with mock.patch.object(nsxlib, 'delete_qos_switching_profile',
return_value=self.fake_profile) as delete_profile:
self.qos_plugin.delete_policy(self.ctxt, self.policy.id)
delete_profile.assert_called_once_with(self.fake_profile_id)