Add nvp qos extension

Implements blueprint nvp-qos-extension

Change-Id: I8ad980128407c6ddb57e5f928663e0df15cc0065
This commit is contained in:
Aaron Rosen 2013-02-13 14:49:29 -08:00
parent f3b64bc985
commit 2d1762ced0
9 changed files with 910 additions and 44 deletions

View File

@ -49,5 +49,9 @@
"create_service_type": "rule:admin_only",
"update_service_type": "rule:admin_only",
"delete_service_type": "rule:admin_only",
"get_service_type": "rule:regular_user"
"get_service_type": "rule:regular_user",
"create_qos_queue:": "rule:admin_only",
"get_qos_queue:": "rule:admin_only",
"get_qos_queues:": "rule:admin_only"
}

View File

@ -53,13 +53,15 @@ from quantum import policy
from quantum.plugins.nicira.nicira_nvp_plugin.common import config
from quantum.plugins.nicira.nicira_nvp_plugin.common import (exceptions
as nvp_exc)
from quantum.plugins.nicira.nicira_nvp_plugin.extensions import (nvp_qos
as ext_qos)
from quantum.plugins.nicira.nicira_nvp_plugin import nicira_db
from quantum.plugins.nicira.nicira_nvp_plugin import NvpApiClient
from quantum.plugins.nicira.nicira_nvp_plugin import nvplib
from quantum.plugins.nicira.nicira_nvp_plugin import nvp_cluster
from quantum.plugins.nicira.nicira_nvp_plugin.nvp_plugin_version import (
PLUGIN_VERSION)
from quantum.plugins.nicira.nicira_nvp_plugin import nicira_qos_db as qos_db
LOG = logging.getLogger("QuantumPlugin")
NVP_FLOATINGIP_NAT_RULES_ORDER = 200
NVP_EXTGW_NAT_RULES_ORDER = 255
@ -123,14 +125,14 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
l3_db.L3_NAT_db_mixin,
portsecurity_db.PortSecurityDbMixin,
securitygroups_db.SecurityGroupDbMixin,
nvp_sec.NVPSecurityGroups):
nvp_sec.NVPSecurityGroups, qos_db.NVPQoSDbMixin):
"""
NvpPluginV2 is a Quantum plugin that provides L2 Virtual Network
functionality using NVP.
"""
supported_extension_aliases = ["provider", "quotas", "port-security",
"router", "security-group"]
"router", "security-group", "nvp-qos"]
__native_bulk_support = True
# Default controller cluster
@ -355,7 +357,8 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
port_data['mac_address'],
port_data['fixed_ips'],
port_data[psec.PORTSECURITY],
port_data[ext_sg.SECURITYGROUPS])
port_data[ext_sg.SECURITYGROUPS],
port_data[ext_qos.QUEUE])
nicira_db.add_quantum_nvp_port_mapping(
context.session, port_data['id'], lport['uuid'])
d_owner = port_data['device_owner']
@ -740,9 +743,18 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
network)
# Ensure there's an id in net_data
net_data['id'] = new_net['id']
# Process port security extension
self._process_network_create_port_security(context, net_data)
# DB Operations for setting the network as external
self._process_l3_create(context, net_data, new_net['id'])
# Process QoS queue extension
if network['network'].get(ext_qos.QUEUE):
new_net[ext_qos.QUEUE] = network['network'][ext_qos.QUEUE]
# Raises if not found
self.get_qos_queue(context, new_net[ext_qos.QUEUE])
self._process_network_queue_mapping(context, new_net)
self._extend_network_qos_queue(context, new_net)
if net_data.get(pnet.NETWORK_TYPE):
net_binding = nicira_db.add_network_binding(
context.session, new_net['id'],
@ -827,28 +839,6 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
return pairs
def get_network(self, context, id, fields=None):
"""
Retrieves all attributes of the network, NOT including
the ports of that network.
:returns: a sequence of mappings with the following signature:
{'id': UUID representing the network.
'name': Human-readable name identifying the network.
'tenant_id': Owner of network. only admin user
can specify a tenant_id other than its own.
'admin_state_up': Sets admin state of network. if down,
network does not forward packets.
'status': Indicates whether network is currently
operational (limit values to "ACTIVE", "DOWN",
"BUILD", and "ERROR"?
'subnets': Subnets associated with this network. Plan
to allow fully specified subnets as part of
network create.
}
:raises: exception.NetworkNotFound
:raises: exception.QuantumException
"""
with context.session.begin(subtransactions=True):
# goto to the plugin DB and fetch the network
network = self._get_network(context, id)
@ -892,6 +882,7 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
self._extend_network_dict_provider(context, net_result)
self._extend_network_port_security_dict(context, net_result)
self._extend_network_dict_l3(context, net_result)
self._extend_network_qos_queue(context, net_result)
return self._fields(net_result, fields)
def get_networks(self, context, filters=None, fields=None):
@ -904,6 +895,8 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
self._extend_network_dict_provider(context, net)
self._extend_network_port_security_dict(context, net)
self._extend_network_dict_l3(context, net)
self._extend_network_qos_queue(context, net)
quantum_lswitches = self._filter_nets_l3(context,
quantum_lswitches,
filters)
@ -981,10 +974,15 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
if psec.PORTSECURITY in network['network']:
self._update_network_security_binding(
context, id, network['network'][psec.PORTSECURITY])
if network['network'].get(ext_qos.QUEUE):
net[ext_qos.QUEUE] = network['network'][ext_qos.QUEUE]
self._delete_network_queue_mapping(context, id)
self._process_network_queue_mapping(context, net)
self._extend_network_port_security_dict(context, net)
self._process_l3_update(context, network['network'], id)
self._extend_network_dict_provider(context, net)
self._extend_network_dict_l3(context, net)
self._extend_network_qos_queue(context, net)
return net
def get_ports(self, context, filters=None, fields=None):
@ -1128,6 +1126,10 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
self._get_security_groups_on_port(context, port))
self._process_port_create_security_group(
context, quantum_db['id'], port_data[ext_sg.SECURITYGROUPS])
# QoS extension checks
port_data[ext_qos.QUEUE] = self._check_for_queue_and_create(
context, port_data)
self._process_port_queue_mapping(context, port_data)
# provider networking extension checks
# Fetch the network and network binding from Quantum db
try:
@ -1148,8 +1150,11 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
LOG.debug(_("create_port completed on NVP for tenant "
"%(tenant_id)s: (%(id)s)"), port_data)
# remove since it will be added in extend based on policy
del port_data[ext_qos.QUEUE]
self._extend_port_port_security_dict(context, port_data)
self._extend_port_dict_security_group(context, port_data)
self._extend_port_qos_queue(context, port_data)
return port_data
def update_port(self, context, id, port):
@ -1207,6 +1212,11 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
if psec.PORTSECURITY in port['port']:
self._update_port_security_binding(
context, id, ret_port[psec.PORTSECURITY])
ret_port[ext_qos.QUEUE] = self._check_for_queue_and_create(
context, ret_port)
self._delete_port_queue_mapping(context, ret_port['id'])
self._process_port_queue_mapping(context, ret_port)
self._extend_port_port_security_dict(context, ret_port)
self._extend_port_dict_security_group(context, ret_port)
LOG.debug(_("Update port request: %s"), port)
@ -1219,8 +1229,12 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
ret_port['mac_address'],
ret_port['fixed_ips'],
ret_port[psec.PORTSECURITY],
ret_port[ext_sg.SECURITYGROUPS])
ret_port[ext_sg.SECURITYGROUPS],
ret_port[ext_qos.QUEUE])
# remove since it will be added in extend based on policy
del ret_port[ext_qos.QUEUE]
self._extend_port_qos_queue(context, ret_port)
# Update the port status from nvp. If we fail here hide it since
# the port was successfully updated but we were not able to retrieve
# the status.
@ -1244,11 +1258,15 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
port_delete_func(context, quantum_db_port)
self.disassociate_floatingips(context, id)
with context.session.begin(subtransactions=True):
queue = self._get_port_queue_bindings(context, {'port_id': [id]})
if (cfg.CONF.metadata_dhcp_host_route and
quantum_db_port.device_owner == constants.DEVICE_OWNER_DHCP):
self._ensure_metadata_host_route(
context, quantum_db_port.fixed_ips[0], is_delete=True)
super(NvpPluginV2, self).delete_port(context, id)
# Delete qos queue if possible
if queue:
self.delete_qos_queue(context, queue[0]['queue_id'], False)
def get_port(self, context, id, fields=None):
with context.session.begin(subtransactions=True):
@ -1256,6 +1274,7 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
id, fields)
self._extend_port_port_security_dict(context, quantum_db_port)
self._extend_port_dict_security_group(context, quantum_db_port)
self._extend_port_qos_queue(context, quantum_db_port)
if self._network_is_external(context,
quantum_db_port['network_id']):
@ -1869,3 +1888,40 @@ class NvpPluginV2(db_base_plugin_v2.QuantumDbPluginV2,
self.default_cluster, sgid, current_rules)
return super(NvpPluginV2, self).delete_security_group_rule(context,
sgrid)
def create_qos_queue(self, context, qos_queue, check_policy=True):
if check_policy:
self._enforce_set_auth(context, qos_queue,
ext_qos.qos_queue_create)
q = qos_queue.get('qos_queue')
self._validate_qos_queue(context, q)
q['id'] = nvplib.create_lqueue(self.default_cluster,
self._nvp_lqueue(q))
return super(NvpPluginV2, self).create_qos_queue(context, qos_queue)
def delete_qos_queue(self, context, id, raise_in_use=True):
filters = {'queue_id': [id]}
queues = self._get_port_queue_bindings(context, filters)
if queues:
if raise_in_use:
raise ext_qos.QueueInUseByPort()
else:
return
nvplib.delete_lqueue(self.default_cluster, id)
return super(NvpPluginV2, self).delete_qos_queue(context, id)
def get_qos_queue(self, context, id, fields=None):
if not self._check_view_auth(context, {'qos_queue': None},
ext_qos.qos_queue_get):
# don't want the user to find out that they guessed the right id
# so we raise not found if the policy.json file doesn't allow them
raise ext_qos.QueueNotFound(id=id)
return super(NvpPluginV2, self).get_qos_queue(context, id, fields)
def get_qos_queues(self, context, filters=None, fields=None):
if not self._check_view_auth(context, {'qos_queue': []},
ext_qos.qos_queue_list):
return []
return super(NvpPluginV2, self).get_qos_queues(context, filters,
fields)

View File

@ -0,0 +1,18 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Nicira, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Aaron Rosen, Nicira Networks, Inc.

View File

@ -0,0 +1,202 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Nicira, Inc.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Aaron Rosen, Nicira Networks, Inc.
from abc import abstractmethod
from quantum.api.v2 import attributes as attr
from quantum.api.v2 import base
from quantum.api import extensions
from quantum.common import exceptions as qexception
from quantum import manager
# For policy.json/Auth
qos_queue_create = "create_qos_queue"
qos_queue_delete = "delete_qos_queue"
qos_queue_get = "get_qos_queue"
qos_queue_list = "get_qos_queues"
class DefaultQueueCreateNotAdmin(qexception.InUse):
message = _("Need to be admin in order to create queue called default")
class DefaultQueueAlreadyExists(qexception.InUse):
message = _("Default queue already exists.")
class QueueInvalidDscp(qexception.InvalidInput):
message = _("Invalid value for dscp %(data)s must be integer.")
class QueueMinGreaterMax(qexception.InvalidInput):
message = _("Invalid bandwidth rate, min greater than max.")
class QueueInvalidBandwidth(qexception.InvalidInput):
message = _("Invalid bandwidth rate, %(data)s must be a non negative"
" integer.")
class MissingDSCPForTrusted(qexception.InvalidInput):
message = _("No DSCP field needed when QoS workload marked trusted")
class QueueNotFound(qexception.NotFound):
message = _("Queue %(id)s does not exist")
class QueueInUseByPort(qexception.InUse):
message = _("Unable to delete queue attached to port.")
class QueuePortBindingNotFound(qexception.NotFound):
message = _("Port is not associated with lqueue")
def convert_to_unsigned_int_or_none(val):
if val is None:
return
try:
val = int(val)
if val < 0:
raise ValueError
except (ValueError, TypeError):
msg = _("'%s' must be a non negative integer.") % val
raise qexception.InvalidInput(error_message=msg)
return val
# Attribute Map
RESOURCE_ATTRIBUTE_MAP = {
'qos_queues': {
'id': {'allow_post': False, 'allow_put': False,
'is_visible': True},
'default': {'allow_post': True, 'allow_put': False,
'convert_to': attr.convert_to_boolean,
'is_visible': True, 'default': False},
'name': {'allow_post': True, 'allow_put': False,
'validate': {'type:string': None},
'is_visible': True, 'default': ''},
'min': {'allow_post': True, 'allow_put': False,
'is_visible': True, 'default': '0',
'convert_to': convert_to_unsigned_int_or_none},
'max': {'allow_post': True, 'allow_put': False,
'is_visible': True, 'default': None,
'convert_to': convert_to_unsigned_int_or_none},
'qos_marking': {'allow_post': True, 'allow_put': False,
'validate': {'type:values': ['untrusted', 'trusted']},
'default': 'untrusted', 'is_visible': True},
'dscp': {'allow_post': True, 'allow_put': False,
'is_visible': True, 'default': '0',
'convert_to': convert_to_unsigned_int_or_none},
'tenant_id': {'allow_post': True, 'allow_put': False,
'required_by_policy': True,
'validate': {'type:string': None},
'is_visible': True},
},
}
QUEUE = 'queue_id'
RXTX_FACTOR = 'rxtx_factor'
EXTENDED_ATTRIBUTES_2_0 = {
'ports': {
RXTX_FACTOR: {'allow_post': True,
'allow_put': False,
'is_visible': False,
'default': 1,
'convert_to': convert_to_unsigned_int_or_none},
QUEUE: {'allow_post': False,
'allow_put': False,
'is_visible': True,
'default': False}},
'networks': {QUEUE: {'allow_post': True,
'allow_put': True,
'is_visible': True,
'default': False}}
}
class Nvp_qos(object):
"""Port Queue extension"""
@classmethod
def get_name(cls):
return "nvp-qos"
@classmethod
def get_alias(cls):
return "nvp-qos"
@classmethod
def get_description(cls):
return "NVP QoS extension."
@classmethod
def get_namespace(cls):
return "http://docs.openstack.org/ext/nvp-qos/api/v2.0"
@classmethod
def get_updated(cls):
return "2012-10-05T10:00:00-00:00"
@classmethod
def get_resources(cls):
""" Returns Ext Resources """
exts = []
plugin = manager.QuantumManager.get_plugin()
resource_name = 'qos_queue'
collection_name = resource_name.replace('_', '-') + "s"
params = RESOURCE_ATTRIBUTE_MAP.get(resource_name + "s", dict())
controller = base.create_resource(collection_name,
resource_name,
plugin, params, allow_bulk=False)
ex = extensions.ResourceExtension(collection_name,
controller)
exts.append(ex)
return exts
def get_extended_resources(self, version):
if version == "2.0":
return EXTENDED_ATTRIBUTES_2_0
else:
return {}
class QueuePluginBase(object):
@abstractmethod
def create_qos_queue(self, context, queue):
pass
@abstractmethod
def delete_qos_queue(self, context, id):
pass
@abstractmethod
def get_qos_queue(self, context, id, fields=None):
pass
@abstractmethod
def get_qos_queues(self, context, filters=None, fields=None):
pass

View File

@ -0,0 +1,300 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2013 Nicira Networks, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Aaron Rosen, Nicira, Inc
import sqlalchemy as sa
from sqlalchemy.orm import exc
from quantum.api.v2 import attributes as attr
from quantum.db import model_base
from quantum.db import models_v2
from quantum.openstack.common import uuidutils
from quantum.plugins.nicira.nicira_nvp_plugin.extensions import (nvp_qos
as ext_qos)
class QoSQueue(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
name = sa.Column(sa.String(255))
default = sa.Column(sa.Boolean, default=False)
min = sa.Column(sa.Integer, nullable=False)
max = sa.Column(sa.Integer, nullable=True)
qos_marking = sa.Column(sa.Enum('untrusted', 'trusted',
name='qosqueues_qos_marking'))
dscp = sa.Column(sa.Integer)
class PortQueueMapping(model_base.BASEV2):
port_id = sa.Column(sa.String(36),
sa.ForeignKey("ports.id", ondelete="CASCADE"),
primary_key=True)
queue_id = sa.Column(sa.String(36), sa.ForeignKey("qosqueues.id"),
primary_key=True)
class NetworkQueueMapping(model_base.BASEV2):
network_id = sa.Column(sa.String(36),
sa.ForeignKey("networks.id", ondelete="CASCADE"),
primary_key=True)
queue_id = sa.Column(sa.String(36), sa.ForeignKey("qosqueues.id",
ondelete="CASCADE"))
class NVPQoSDbMixin(ext_qos.QueuePluginBase):
"""Mixin class to add queues."""
def create_qos_queue(self, context, qos_queue):
q = qos_queue['qos_queue']
with context.session.begin(subtransactions=True):
qos_queue = QoSQueue(id=q.get('id', uuidutils.generate_uuid()),
name=q.get('name'),
tenant_id=q['tenant_id'],
default=q.get('default'),
min=q.get('min'),
max=q.get('max'),
qos_marking=q.get('qos_marking'),
dscp=q.get('dscp'))
context.session.add(qos_queue)
return self._make_qos_queue_dict(qos_queue)
def get_qos_queue(self, context, id, fields=None):
return self._make_qos_queue_dict(
self._get_qos_queue(context, id), fields)
def _get_qos_queue(self, context, id):
try:
return self._get_by_id(context, QoSQueue, id)
except exc.NoResultFound:
raise ext_qos.QueueNotFound(id=id)
def get_qos_queues(self, context, filters=None, fields=None):
return self._get_collection(context, QoSQueue,
self._make_qos_queue_dict,
filters=filters, fields=fields)
def delete_qos_queue(self, context, id):
qos_queue = self._get_qos_queue(context, id)
with context.session.begin(subtransactions=True):
context.session.delete(qos_queue)
def _process_port_queue_mapping(self, context, p):
if not p.get(ext_qos.QUEUE):
return
with context.session.begin(subtransactions=True):
db = PortQueueMapping(port_id=p['id'],
queue_id=p.get(ext_qos.QUEUE))
context.session.add(db)
def _get_port_queue_bindings(self, context, filters=None, fields=None):
return self._get_collection(context, PortQueueMapping,
self._make_port_queue_binding_dict,
filters=filters, fields=fields)
def _delete_port_queue_mapping(self, context, port_id):
query = self._model_query(context, PortQueueMapping)
try:
binding = query.filter(PortQueueMapping.port_id == port_id).one()
except exc.NoResultFound:
# return since this can happen if we are updating a port that
# did not already have a queue on it. There is no need to check
# if there is one before deleting if we return here.
return
with context.session.begin(subtransactions=True):
context.session.delete(binding)
def _process_network_queue_mapping(self, context, network):
if not network.get(ext_qos.QUEUE):
return
with context.session.begin(subtransactions=True):
db = NetworkQueueMapping(network_id=network['id'],
queue_id=network.get(ext_qos.QUEUE))
context.session.add(db)
def _get_network_queue_bindings(self, context, filters=None, fields=None):
return self._get_collection(context, NetworkQueueMapping,
self._make_network_queue_binding_dict,
filters=filters, fields=fields)
def _delete_network_queue_mapping(self, context, network_id):
query = self._model_query(context, NetworkQueueMapping)
try:
with context.session.begin(subtransactions=True):
binding = query.filter_by(network_id=network_id).first()
if binding:
context.session.delete(binding)
except exc.NoResultFound:
# return since this can happen if we are updating a port that
# did not already have a queue on it. There is no need to check
# if there is one before deleting if we return here.
return
def _extend_port_qos_queue(self, context, port):
if self._check_view_auth(context, {'qos_queue': None},
ext_qos.qos_queue_get):
filters = {'port_id': [port['id']]}
fields = ['queue_id']
port[ext_qos.QUEUE] = None
queue_id = self._get_port_queue_bindings(
context, filters, fields)
if queue_id:
port[ext_qos.QUEUE] = queue_id[0]['queue_id']
return port
def _extend_network_qos_queue(self, context, network):
if self._check_view_auth(context, {'qos_queue': None},
ext_qos.qos_queue_get):
filters = {'network_id': [network['id']]}
fields = ['queue_id']
network[ext_qos.QUEUE] = None
queue_id = self._get_network_queue_bindings(
context, filters, fields)
if queue_id:
network[ext_qos.QUEUE] = queue_id[0]['queue_id']
return network
def _make_qos_queue_dict(self, queue, fields=None):
res = {'id': queue['id'],
'name': queue.get('name'),
'default': queue.get('default'),
'tenant_id': queue['tenant_id'],
'min': queue.get('min'),
'max': queue.get('max'),
'qos_marking': queue.get('qos_marking'),
'dscp': queue.get('dscp')}
return self._fields(res, fields)
def _make_port_queue_binding_dict(self, queue, fields=None):
res = {'port_id': queue['port_id'],
'queue_id': queue['queue_id']}
return self._fields(res, fields)
def _make_network_queue_binding_dict(self, queue, fields=None):
res = {'network_id': queue['network_id'],
'queue_id': queue['queue_id']}
return self._fields(res, fields)
def _check_for_queue_and_create(self, context, port):
"""This function determines if a port should be associated with a
queue. It works by first querying NetworkQueueMapping to determine
if the network is associated with a queue. If so, then it queries
NetworkQueueMapping for all the networks that are associated with
this queue. Next, it queries against all the ports on these networks
with the port device_id. Finally it queries PortQueueMapping. If that
query returns a queue_id that is returned. Otherwise a queue is
created that is the size of the queue associated with the network and
that queue_id is returned.
If the network is not associated with a queue we then query to see
if there is a default queue in the system. If so, a copy of that is
created and the queue_id is returned.
Otherwise None is returned. None is also returned if the port does not
have a device_id or if the device_owner is network:
"""
queue_to_create = None
# If there is no device_id don't create a queue. The queue will be
# created on update port when the device_id is present. Also don't
# apply QoS to network ports.
if (not port.get('device_id') or
port['device_owner'].startswith('network:')):
return
# Check if there is a queue assocated with the network
filters = {'network_id': [port['network_id']]}
network_queue_id = self._get_network_queue_bindings(
context, filters, ['queue_id'])
if network_queue_id:
# get networks that queue is assocated with
filters = {'queue_id': [network_queue_id[0]['queue_id']]}
networks_with_same_queue = self._get_network_queue_bindings(
context, filters)
# get the ports on these networks with the same_queue and device_id
filters = {'device_id': [port.get('device_id')],
'network_id': [network['network_id'] for
network in networks_with_same_queue]}
query = self._model_query(context, models_v2.Port)
ports = self._apply_filters_to_query(query, models_v2.Port,
filters).all()
if ports:
# shared queue already exists find the queue id
filters = {'port_id': [p['id'] for p in ports]}
queues = self._get_port_queue_bindings(context, filters,
['queue_id'])
if queues:
return queues[0]['queue_id']
# get the size of the queue we want to create
queue_to_create = self._get_qos_queue(
context, network_queue_id[0]['queue_id'])
else:
# check for default queue
filters = {'default': [True]}
# context is elevated since default queue is owned by admin
queue_to_create = self.get_qos_queues(context.elevated(), filters)
if not queue_to_create:
return
queue_to_create = queue_to_create[0]
# create the queue
tenant_id = self._get_tenant_id_for_create(context, port)
if port.get(ext_qos.RXTX_FACTOR) and queue_to_create.get('max'):
queue_to_create['max'] *= int(port[ext_qos.RXTX_FACTOR])
queue = {'qos_queue': {'name': queue_to_create.get('name'),
'min': queue_to_create.get('min'),
'max': queue_to_create.get('max'),
'dscp': queue_to_create.get('dscp'),
'qos_marking':
queue_to_create.get('qos_marking'),
'tenant_id': tenant_id}}
return self.create_qos_queue(context, queue, False)['id']
def _validate_qos_queue(self, context, qos_queue):
if qos_queue.get('default'):
if context.is_admin:
if self.get_qos_queues(context, filters={'default': [True]}):
raise ext_qos.DefaultQueueAlreadyExists()
else:
raise ext_qos.DefaultQueueCreateNotAdmin()
if (qos_queue.get('qos_marking') == 'trusted' and
not qos_queue.get('dscp')):
raise ext_qos.MissingDSCPForTrusted()
max = qos_queue.get('max')
min = qos_queue.get('min')
# Max can be None
if max and min > max:
raise ext_qos.QueueMinGreaterMax()
def _nvp_lqueue(self, queue):
"""Convert fields to nvp fields."""
nvp_queue = {}
params = {'name': 'display_name',
'qos_marking': 'qos_marking',
'min': 'min_bandwidth_rate',
'max': 'max_bandwidth_rate',
'dscp': 'dscp'}
nvp_queue = dict(
(nvp_name, queue.get(api_name))
for api_name, nvp_name in params.iteritems()
if attr.is_attr_set(queue.get(api_name))
)
return nvp_queue

View File

@ -50,6 +50,9 @@ LSWITCHPORT_RESOURCE = "lport-%s" % LSWITCH_RESOURCE
LROUTER_RESOURCE = "lrouter"
LROUTERPORT_RESOURCE = "lport-%s" % LROUTER_RESOURCE
LROUTERNAT_RESOURCE = "nat-lrouter"
LQUEUE_RESOURCE = "lqueue"
# Current quantum version
QUANTUM_VERSION = "2013.1"
# Constants for NAT rules
MATCH_KEYS = ["destination_ip_addresses", "destination_port_max",
@ -573,7 +576,7 @@ def get_port(cluster, network, port, relations=None):
def _configure_extensions(lport_obj, mac_address, fixed_ips,
port_security_enabled, security_profiles):
port_security_enabled, security_profiles, queue_id):
lport_obj['allowed_address_pairs'] = []
if port_security_enabled:
for fixed_ip in fixed_ips:
@ -587,12 +590,13 @@ def _configure_extensions(lport_obj, mac_address, fixed_ips,
{"mac_address": mac_address,
"ip_address": "0.0.0.0"})
lport_obj['security_profiles'] = list(security_profiles or [])
lport_obj['queue_uuid'] = queue_id
def update_port(cluster, lswitch_uuid, lport_uuid, quantum_port_id, tenant_id,
display_name, device_id, admin_status_enabled,
mac_address=None, fixed_ips=None, port_security_enabled=None,
security_profiles=None):
security_profiles=None, queue_id=None):
# device_id can be longer than 40 so we rehash it
hashed_device_id = hashlib.sha1(device_id).hexdigest()
@ -604,7 +608,8 @@ def update_port(cluster, lswitch_uuid, lport_uuid, quantum_port_id, tenant_id,
dict(scope='vm_id', tag=hashed_device_id)])
_configure_extensions(lport_obj, mac_address, fixed_ips,
port_security_enabled, security_profiles)
port_security_enabled, security_profiles,
queue_id)
path = "/ws.v1/lswitch/" + lswitch_uuid + "/lport/" + lport_uuid
try:
@ -624,7 +629,7 @@ def update_port(cluster, lswitch_uuid, lport_uuid, quantum_port_id, tenant_id,
def create_lport(cluster, lswitch_uuid, tenant_id, quantum_port_id,
display_name, device_id, admin_status_enabled,
mac_address=None, fixed_ips=None, port_security_enabled=None,
security_profiles=None):
security_profiles=None, queue_id=None):
""" Creates a logical port on the assigned logical switch """
# device_id can be longer than 40 so we rehash it
hashed_device_id = hashlib.sha1(device_id).hexdigest()
@ -637,7 +642,8 @@ def create_lport(cluster, lswitch_uuid, tenant_id, quantum_port_id,
)
_configure_extensions(lport_obj, mac_address, fixed_ips,
port_security_enabled, security_profiles)
port_security_enabled, security_profiles,
queue_id)
path = _build_uri_path(LSWITCHPORT_RESOURCE,
parent_resource_id=lswitch_uuid)
@ -1184,3 +1190,29 @@ NVPLIB_FUNC_DICT = {
'create_lrouter_snat_rule': {2: create_lrouter_snat_rule_v2,
3: create_lrouter_snat_rule_v3}
}
# -----------------------------------------------------------------------------
# QOS API Calls
# -----------------------------------------------------------------------------
def create_lqueue(cluster, lqueue):
uri = _build_uri_path(LQUEUE_RESOURCE)
lqueue['tags'] = [{'tag': QUANTUM_VERSION, 'scope': 'quantum'}]
try:
resp_obj = do_single_request(HTTP_POST, uri, json.dumps(lqueue),
cluster=cluster)
except NvpApiClient.NvpApiException:
LOG.exception(_("Failed to create logical queue"))
raise exception.QuantumException()
return json.loads(resp_obj)['uuid']
def delete_lqueue(cluster, id):
try:
do_single_request(HTTP_DELETE,
_build_uri_path(LQUEUE_RESOURCE,
resource_id=id),
cluster=cluster)
except Exception:
LOG.exception(_("Failed to delete logical queue"))
raise exception.QuantumException()

View File

@ -0,0 +1,11 @@
{
"display_name": "%(display_name)s",
"uuid": "%(uuid)s",
"type": "LogicalSwitchConfig",
"_schema": "/ws.v1/schema/LogicalQueueConfig",
"dscp": "%(dscp)s",
"max_bandwidth_rate": "%(max_bandwidth_rate)s",
"min_bandwidth_rate": "%(min_bandwidth_rate)s",
"qos_marking": "%(qos_marking)s",
"_href": "/ws.v1/lqueue/%(uuid)s"
}

View File

@ -30,6 +30,7 @@ class FakeClient:
LPORT_RESOURCE = 'lport'
LROUTER_RESOURCE = 'lrouter'
NAT_RESOURCE = 'nat'
LQUEUE_RESOURCE = 'lqueue'
SECPROF_RESOURCE = 'securityprofile'
LSWITCH_STATUS = 'lswitchstatus'
LROUTER_STATUS = 'lrouterstatus'
@ -41,7 +42,7 @@ class FakeClient:
LROUTER_LPORT_STATUS = 'lrouter_lportstatus'
LROUTER_LPORT_ATT = 'lrouter_lportattachment'
RESOURCES = [LSWITCH_RESOURCE, LROUTER_RESOURCE,
RESOURCES = [LSWITCH_RESOURCE, LROUTER_RESOURCE, LQUEUE_RESOURCE,
LPORT_RESOURCE, NAT_RESOURCE, SECPROF_RESOURCE]
FAKE_GET_RESPONSES = {
@ -63,7 +64,8 @@ class FakeClient:
LSWITCH_LPORT_RESOURCE: "fake_post_lswitch_lport.json",
LROUTER_LPORT_RESOURCE: "fake_post_lrouter_lport.json",
LROUTER_NAT_RESOURCE: "fake_post_lrouter_nat.json",
SECPROF_RESOURCE: "fake_post_security_profile.json"
SECPROF_RESOURCE: "fake_post_security_profile.json",
LQUEUE_RESOURCE: "fake_post_lqueue.json"
}
FAKE_PUT_RESPONSES = {
@ -74,7 +76,8 @@ class FakeClient:
LROUTER_NAT_RESOURCE: "fake_post_lrouter_nat.json",
LSWITCH_LPORT_ATT: "fake_put_lswitch_lport_att.json",
LROUTER_LPORT_ATT: "fake_put_lrouter_lport_att.json",
SECPROF_RESOURCE: "fake_post_security_profile.json"
SECPROF_RESOURCE: "fake_post_security_profile.json",
LQUEUE_RESOURCE: "fake_post_lqueue.json"
}
MANAGED_RELATIONS = {
@ -92,6 +95,7 @@ class FakeClient:
_fake_lswitch_lportstatus_dict = {}
_fake_lrouter_lportstatus_dict = {}
_fake_securityprofile_dict = {}
_fake_lqueue_dict = {}
def __init__(self, fake_files_path):
self.fake_files_path = fake_files_path
@ -138,6 +142,12 @@ class FakeClient:
'gateway_ip_address', '0.0.0.0')
return fake_lrouter
def _add_lqueue(self, body):
fake_lqueue = json.loads(body)
fake_lqueue['uuid'] = uuidutils.generate_uuid()
self._fake_lqueue_dict[fake_lqueue['uuid']] = fake_lqueue
return fake_lqueue
def _add_lswitch_lport(self, body, ls_uuid):
fake_lport = json.loads(body)
new_uuid = uuidutils.generate_uuid()
@ -251,6 +261,7 @@ class FakeClient:
/ws.v1/lrouter/zzz/status
/ws.v1/lrouter/zzz/lport/www
/ws.v1/lrouter/zzz/lport/www/status
/ws.v1/lqueue/xxx
"""
# The first element will always be 'ws.v1' - so we just discard it
uri_split = path.split('/')[1:]
@ -487,3 +498,4 @@ class FakeClient:
self._fake_lrouter_lport_dict.clear()
self._fake_lswitch_lportstatus_dict.clear()
self._fake_lrouter_lportstatus_dict.clear()
self._fake_lqueue_dict.clear()

View File

@ -26,8 +26,11 @@ from quantum.extensions import providernet as pnet
from quantum.extensions import securitygroup as secgrp
from quantum import manager
from quantum.openstack.common import cfg
from quantum.plugins.nicira.nicira_nvp_plugin.extensions import (nvp_qos
as ext_qos)
from quantum.plugins.nicira.nicira_nvp_plugin import nvplib
from quantum.tests.unit.nicira import fake_nvpapiclient
from quantum.tests.unit import test_extensions
import quantum.tests.unit.test_db_plugin as test_plugin
import quantum.tests.unit.test_extension_portsecurity as psec
import quantum.tests.unit.test_extension_security_group as ext_sg
@ -35,6 +38,7 @@ import quantum.tests.unit.test_l3_plugin as test_l3_plugin
LOG = logging.getLogger(__name__)
NICIRA_PKG_PATH = 'quantum.plugins.nicira.nicira_nvp_plugin'
NICIRA_EXT_PATH = "../../plugins/nicira/nicira_nvp_plugin/extensions"
class NiciraPluginV2TestCase(test_plugin.QuantumDbPluginV2TestCase):
@ -149,11 +153,11 @@ class TestNiciraNetworksV2(test_plugin.TestNetworksV2,
def _test_create_bridge_network(self, vlan_id=None):
net_type = vlan_id and 'vlan' or 'flat'
name = 'bridge_net'
keys = [('subnets', []), ('name', name), ('admin_state_up', True),
('status', 'ACTIVE'), ('shared', False),
(pnet.NETWORK_TYPE, net_type),
(pnet.PHYSICAL_NETWORK, 'tzuuid'),
(pnet.SEGMENTATION_ID, vlan_id)]
expected = [('subnets', []), ('name', name), ('admin_state_up', True),
('status', 'ACTIVE'), ('shared', False),
(pnet.NETWORK_TYPE, net_type),
(pnet.PHYSICAL_NETWORK, 'tzuuid'),
(pnet.SEGMENTATION_ID, vlan_id)]
providernet_args = {pnet.NETWORK_TYPE: net_type,
pnet.PHYSICAL_NETWORK: 'tzuuid'}
if vlan_id:
@ -163,8 +167,8 @@ class TestNiciraNetworksV2(test_plugin.TestNetworksV2,
arg_list=(pnet.NETWORK_TYPE,
pnet.PHYSICAL_NETWORK,
pnet.SEGMENTATION_ID)) as net:
for k, v in keys:
self.assertEquals(net['network'][k], v)
for k, v in expected:
self.assertEqual(net['network'][k], v)
def test_create_bridge_network(self):
self._test_create_bridge_network()
@ -175,7 +179,7 @@ class TestNiciraNetworksV2(test_plugin.TestNetworksV2,
def test_create_bridge_vlan_network_outofrange_returns_400(self):
with self.assertRaises(webob.exc.HTTPClientError) as ctx_manager:
self._test_create_bridge_network(vlan_id=5000)
self.assertEquals(ctx_manager.exception.code, 400)
self.assertEqual(ctx_manager.exception.code, 400)
def test_list_networks_filter_by_id(self):
# We add this unit test to cover some logic specific to the
@ -259,3 +263,230 @@ class TestNiciraL3NatTestCase(test_l3_plugin.L3NatDBTestCase,
self._test_floatingip_with_assoc_fails(
'quantum.plugins.nicira.nicira_nvp_plugin.'
'QuantumPlugin.NvpPluginV2')
class NvpQoSTestExtensionManager(object):
def get_resources(self):
return ext_qos.Nvp_qos.get_resources()
def get_actions(self):
return []
def get_request_extensions(self):
return []
class TestNiciraQoSQueue(NiciraPluginV2TestCase):
def setUp(self, plugin=None):
ext_path = os.path.join(os.path.dirname(os.path.dirname(__file__)),
NICIRA_EXT_PATH)
cfg.CONF.set_override('api_extensions_path', ext_path)
super(TestNiciraQoSQueue, self).setUp()
ext_mgr = NvpQoSTestExtensionManager()
self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr)
def _create_qos_queue(self, fmt, body, **kwargs):
qos_queue = self.new_create_request('qos-queues', body)
if (kwargs.get('set_context') and 'tenant_id' in kwargs):
# create a specific auth context for this request
qos_queue.environ['quantum.context'] = context.Context(
'', kwargs['tenant_id'])
return qos_queue.get_response(self.ext_api)
@contextlib.contextmanager
def qos_queue(self, name='foo', min='0', max='10',
qos_marking=None, dscp='0', default=None, no_delete=False):
body = {'qos_queue': {'tenant_id': 'tenant',
'name': name,
'min': min,
'max': max}}
if qos_marking:
body['qos_queue']['qos_marking'] = qos_marking
if dscp:
body['qos_queue']['dscp'] = dscp
if default:
body['qos_queue']['default'] = default
res = self._create_qos_queue('json', body)
qos_queue = self.deserialize('json', res)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
try:
yield qos_queue
finally:
if not no_delete:
self._delete('qos-queues',
qos_queue['qos_queue']['id'])
def test_create_qos_queue(self):
with self.qos_queue(name='fake_lqueue', min=34, max=44,
qos_marking='untrusted', default=False) as q:
self.assertEqual(q['qos_queue']['name'], 'fake_lqueue')
self.assertEqual(q['qos_queue']['min'], 34)
self.assertEqual(q['qos_queue']['max'], 44)
self.assertEqual(q['qos_queue']['qos_marking'], 'untrusted')
self.assertFalse(q['qos_queue']['default'])
def test_create_qos_queue_default(self):
with self.qos_queue(default=True) as q:
self.assertTrue(q['qos_queue']['default'])
def test_create_qos_queue_two_default_queues_fail(self):
with self.qos_queue(default=True):
body = {'qos_queue': {'tenant_id': 'tenant',
'name': 'second_default_queue',
'default': True}}
res = self._create_qos_queue('json', body)
self.assertEqual(res.status_int, 409)
def test_create_port_with_queue(self):
with self.qos_queue(default=True) as q1:
res = self._create_network('json', 'net1', True,
arg_list=(ext_qos.QUEUE,),
queue_id=q1['qos_queue']['id'])
net1 = self.deserialize('json', res)
self.assertEqual(net1['network'][ext_qos.QUEUE],
q1['qos_queue']['id'])
device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
with self.port(device_id=device_id, do_delete=False) as p:
self.assertEqual(len(p['port'][ext_qos.QUEUE]), 36)
def test_create_shared_queue_networks(self):
with self.qos_queue(default=True, no_delete=True) as q1:
res = self._create_network('json', 'net1', True,
arg_list=(ext_qos.QUEUE,),
queue_id=q1['qos_queue']['id'])
net1 = self.deserialize('json', res)
self.assertEqual(net1['network'][ext_qos.QUEUE],
q1['qos_queue']['id'])
res = self._create_network('json', 'net2', True,
arg_list=(ext_qos.QUEUE,),
queue_id=q1['qos_queue']['id'])
net2 = self.deserialize('json', res)
self.assertEqual(net1['network'][ext_qos.QUEUE],
q1['qos_queue']['id'])
device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
res = self._create_port('json', net1['network']['id'],
device_id=device_id)
port1 = self.deserialize('json', res)
res = self._create_port('json', net2['network']['id'],
device_id=device_id)
port2 = self.deserialize('json', res)
self.assertEqual(port1['port'][ext_qos.QUEUE],
port2['port'][ext_qos.QUEUE])
self._delete('ports', port1['port']['id'])
self._delete('ports', port2['port']['id'])
def test_remove_queue_in_use_fail(self):
with self.qos_queue(no_delete=True) as q1:
res = self._create_network('json', 'net1', True,
arg_list=(ext_qos.QUEUE,),
queue_id=q1['qos_queue']['id'])
net1 = self.deserialize('json', res)
device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
res = self._create_port('json', net1['network']['id'],
device_id=device_id)
port = self.deserialize('json', res)
self._delete('qos-queues', port['port'][ext_qos.QUEUE], 409)
def test_update_network_new_queue(self):
with self.qos_queue() as q1:
res = self._create_network('json', 'net1', True,
arg_list=(ext_qos.QUEUE,),
queue_id=q1['qos_queue']['id'])
net1 = self.deserialize('json', res)
with self.qos_queue() as new_q:
data = {'network': {ext_qos.QUEUE: new_q['qos_queue']['id']}}
req = self.new_update_request('networks', data,
net1['network']['id'])
res = req.get_response(self.api)
net1 = self.deserialize('json', res)
self.assertEqual(net1['network'][ext_qos.QUEUE],
new_q['qos_queue']['id'])
def test_update_port_adding_device_id(self):
with self.qos_queue(no_delete=True) as q1:
res = self._create_network('json', 'net1', True,
arg_list=(ext_qos.QUEUE,),
queue_id=q1['qos_queue']['id'])
net1 = self.deserialize('json', res)
device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
res = self._create_port('json', net1['network']['id'])
port = self.deserialize('json', res)
self.assertEqual(port['port'][ext_qos.QUEUE], None)
data = {'port': {'device_id': device_id}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = req.get_response(self.api)
port = self.deserialize('json', res)
self.assertEqual(len(port['port'][ext_qos.QUEUE]), 36)
def test_get_port_with_qos_not_admin(self):
body = {'qos_queue': {'tenant_id': 'not_admin',
'name': 'foo', 'min': 20, 'max': 20}}
res = self._create_qos_queue('json', body, tenant_id='not_admin')
q1 = self.deserialize('json', res)
res = self._create_network('json', 'net1', True,
arg_list=(ext_qos.QUEUE, 'tenant_id',),
queue_id=q1['qos_queue']['id'],
tenant_id="not_admin")
net1 = self.deserialize('json', res)
self.assertEqual(len(net1['network'][ext_qos.QUEUE]), 36)
res = self._create_port('json', net1['network']['id'],
tenant_id='not_admin', set_context=True)
port = self.deserialize('json', res)
self.assertEqual(ext_qos.QUEUE not in port['port'], True)
def test_non_admin_cannot_create_queue(self):
body = {'qos_queue': {'tenant_id': 'not_admin',
'name': 'foo', 'min': 20, 'max': 20}}
res = self._create_qos_queue('json', body, tenant_id='not_admin',
set_context=True)
self.assertEqual(res.status_int, 403)
def test_update_port_non_admin_does_not_show_queue_id(self):
body = {'qos_queue': {'tenant_id': 'not_admin',
'name': 'foo', 'min': 20, 'max': 20}}
res = self._create_qos_queue('json', body, tenant_id='not_admin')
q1 = self.deserialize('json', res)
res = self._create_network('json', 'net1', True,
arg_list=(ext_qos.QUEUE,),
tenant_id='not_admin',
queue_id=q1['qos_queue']['id'])
net1 = self.deserialize('json', res)
res = self._create_port('json', net1['network']['id'],
tenant_id='not_admin', set_context=True)
port = self.deserialize('json', res)
device_id = "00fff4d0-e4a8-4a3a-8906-4c4cdafb59f1"
data = {'port': {'device_id': device_id}}
quantum_context = context.Context('', 'not_admin')
port = self._update('ports', port['port']['id'], data,
quantum_context=quantum_context)
self.assertEqual(ext_qos.QUEUE not in port['port'], True)
def test_rxtx_factor(self):
with self.qos_queue(max=10) as q1:
res = self._create_network('json', 'net1', True,
arg_list=(ext_qos.QUEUE,),
queue_id=q1['qos_queue']['id'])
net1 = self.deserialize('json', res)
res = self._create_port('json', net1['network']['id'],
arg_list=(ext_qos.RXTX_FACTOR,),
rxtx_factor=2, device_id='1')
port = self.deserialize('json', res)
req = self.new_show_request('qos-queues',
port['port'][ext_qos.QUEUE])
res = req.get_response(self.ext_api)
queue = self.deserialize('json', res)
self.assertEqual(queue['qos_queue']['max'], 20)