[L3][QoS] Neutron server side Floating IP QoS

Add qos_policy_id to floating IP, then the user could set/update
the binding QoS policy of a floating IP.

APIImpact: a new parameter qos_policy_id was added to floating IP
related API.

Partial-Bug: #1596611
Partially-Implements blueprint: floating-ip-rate-limit
Change-Id: I4efe9e49d268dffeb3df4de4ea1780152218633b
This commit is contained in:
LIU Yulong 2016-04-26 17:24:11 +08:00 committed by LIU Yulong
parent 921cfa9cdb
commit 8fcda21a06
15 changed files with 586 additions and 12 deletions

View File

@ -50,6 +50,16 @@ class NetworkQosBindingNotFound(e.NotFound):
"could not be found.")
class FloatingIPQosBindingNotFound(e.NotFound):
message = _("QoS binding for floating IP %(fip_id)s and policy "
"%(policy_id)s could not be found.")
class FloatingIPQosBindingError(e.NeutronException):
message = _("QoS binding for floating IP %(fip_id)s and policy "
"%(policy_id)s could not be created: %(db_error)s.")
class NetworkQosBindingError(e.NeutronException):
message = _("QoS binding for network %(net_id)s and policy %(policy_id)s "
"could not be created: %(db_error)s.")

View File

@ -49,6 +49,7 @@ from neutron.db.models import l3 as l3_models
from neutron.db import models_v2
from neutron.db import standardattrdescription_db as st_attr
from neutron.extensions import l3
from neutron.extensions import qos_fip
from neutron.objects import ports as port_obj
from neutron.objects import router as l3_obj
from neutron.plugins.common import utils as p_utils
@ -85,6 +86,8 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
_dns_integration = None
_fip_qos = None
def __new__(cls):
inst = super(L3_NAT_dbonly_mixin, cls).__new__(cls)
inst._start_janitor()
@ -110,6 +113,14 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
'dns-domain-ports'))
return self._dns_integration
@property
def _is_fip_qos_supported(self):
if self._fip_qos is None:
# Check L3 service plugin
self._fip_qos = utils.is_extension_supported(
self, qos_fip.FIP_QOS_ALIAS)
return self._fip_qos
@property
def _core_plugin(self):
return directory.get_plugin()
@ -1308,6 +1319,8 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
if self._is_dns_integration_supported:
dns_data = self._process_dns_floatingip_create_precommit(
context, floatingip_dict, fip)
if self._is_fip_qos_supported:
self._process_extra_fip_qos_create(context, fip_id, fip)
self._core_plugin.update_port(context.elevated(), external_port['id'],
{'port': {'device_id': fip_id}})
@ -1342,6 +1355,11 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
if self._is_dns_integration_supported:
dns_data = self._process_dns_floatingip_update_precommit(
context, floatingip_dict)
if self._is_fip_qos_supported:
self._process_extra_fip_qos_update(context,
floatingip_db,
fip,
old_floatingip)
registry.notify(resources.FLOATING_IP,
events.AFTER_UPDATE,
self._update_fip_assoc,

85
neutron/db/l3_fip_qos.py Normal file
View File

@ -0,0 +1,85 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from neutron_lib.services.qos import constants as qos_consts
from neutron.common import exceptions as n_exc
from neutron.db import _resource_extend as resource_extend
from neutron.extensions import l3
from neutron.objects.db import api as obj_db_api
from neutron.objects.qos import policy as policy_object
@resource_extend.has_resource_extenders
class FloatingQoSDbMixin(object):
"""Mixin class to enable floating IP's QoS extra attributes."""
@staticmethod
@resource_extend.extends([l3.FLOATINGIPS])
def _extend_extra_fip_dict(fip_res, fip_db):
if fip_db.get('qos_policy_binding'):
fip_res[qos_consts.QOS_POLICY_ID] = (
fip_db.qos_policy_binding.policy_id)
else:
fip_res[qos_consts.QOS_POLICY_ID] = None
return fip_res
def _get_policy_obj(self, context, policy_id):
obj = policy_object.QosPolicy.get_object(context, id=policy_id)
if obj is None:
raise n_exc.QosPolicyNotFound(policy_id=policy_id)
return obj
def _create_fip_qos_db(self, context, fip_id, policy_id):
policy = self._get_policy_obj(context, policy_id)
policy.attach_floatingip(fip_id)
binding_db_obj = obj_db_api.get_object(
context, policy.fip_binding_model, fip_id=fip_id)
return binding_db_obj
def _delete_fip_qos_db(self, context, fip_id, policy_id):
policy = self._get_policy_obj(context, policy_id)
policy.detach_floatingip(fip_id)
def _process_extra_fip_qos_create(self, context, fip_id, fip):
qos_policy_id = fip.get(qos_consts.QOS_POLICY_ID)
if not qos_policy_id:
return
self._create_fip_qos_db(context, fip_id, qos_policy_id)
def _process_extra_fip_qos_update(
self, context, floatingip_db, fip, old_floatingip):
if qos_consts.QOS_POLICY_ID not in fip:
# No qos_policy_id in API input, do nothing
return
new_qos_policy_id = fip.get(qos_consts.QOS_POLICY_ID)
old_qos_policy_id = old_floatingip.get(qos_consts.QOS_POLICY_ID)
if old_qos_policy_id == new_qos_policy_id:
return
if old_qos_policy_id:
self._delete_fip_qos_db(context,
floatingip_db['id'],
old_qos_policy_id)
if floatingip_db.qos_policy_binding:
floatingip_db.qos_policy_binding['policy_id'] = new_qos_policy_id
if not new_qos_policy_id:
return
qos_policy_binding = self._create_fip_qos_db(
context,
floatingip_db['id'],
new_qos_policy_id)
if not floatingip_db.qos_policy_binding:
floatingip_db.qos_policy_binding = qos_policy_binding

View File

@ -1 +1 @@
7d32f979895f
594422d373ee

View File

@ -0,0 +1,43 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""fip qos
Revision ID: 594422d373ee
Revises: 7d32f979895f
Create Date: 2016-04-26 17:16:10.323756
"""
# revision identifiers, used by Alembic.
revision = '594422d373ee'
down_revision = '7d32f979895f'
from alembic import op
import sqlalchemy as sa
from neutron_lib.db import constants as db_const
def upgrade():
op.create_table(
'qos_fip_policy_bindings',
sa.Column('policy_id',
sa.String(length=db_const.UUID_FIELD_SIZE),
sa.ForeignKey('qos_policies.id', ondelete='CASCADE'),
nullable=False),
sa.Column('fip_id',
sa.String(length=db_const.UUID_FIELD_SIZE),
sa.ForeignKey('floatingips.id', ondelete='CASCADE'),
nullable=False, unique=True))

View File

@ -18,6 +18,7 @@ from neutron_lib.db import constants as db_const
from neutron_lib.db import model_base
import sqlalchemy as sa
from neutron.db.models import l3
from neutron.db import models_v2
from neutron.db import rbac_db_models
from neutron.db import standard_attr
@ -55,6 +56,26 @@ class QosNetworkPolicyBinding(model_base.BASEV2):
cascade='delete', lazy='joined'))
class QosFIPPolicyBinding(model_base.BASEV2):
__tablename__ = 'qos_fip_policy_bindings'
policy_id = sa.Column(sa.String(db_const.UUID_FIELD_SIZE),
sa.ForeignKey('qos_policies.id',
ondelete='CASCADE'),
nullable=False,
primary_key=True)
fip_id = sa.Column(sa.String(db_const.UUID_FIELD_SIZE),
sa.ForeignKey('floatingips.id',
ondelete='CASCADE'),
nullable=False,
unique=True,
primary_key=True)
revises_on_change = ('floatingip', )
floatingip = sa.orm.relationship(
l3.FloatingIP, load_on_pending=True,
backref=sa.orm.backref("qos_policy_binding", uselist=False,
cascade='delete', lazy='joined'))
class QosPortPolicyBinding(model_base.BASEV2):
__tablename__ = 'qos_port_policy_bindings'
policy_id = sa.Column(sa.String(36),

View File

@ -0,0 +1,57 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api.definitions import l3
from neutron_lib.api import extensions
from neutron_lib.services.qos import constants as qos_consts
FIP_QOS_ALIAS = "qos-fip"
EXTENDED_ATTRIBUTES_2_0 = {
l3.FLOATINGIPS: {
qos_consts.QOS_POLICY_ID: {
'allow_post': True,
'allow_put': True,
'is_visible': True,
'default': None,
'validate': {'type:uuid_or_none': None}}
}
}
class Qos_fip(extensions.ExtensionDescriptor):
"""Extension class supporting floating IP QoS in all router."""
@classmethod
def get_name(cls):
return "Floating IP QoS"
@classmethod
def get_alias(cls):
return FIP_QOS_ALIAS
@classmethod
def get_description(cls):
return "The floating IP Quality of Service extension"
@classmethod
def get_updated(cls):
return "2017-07-20T00:00:00-00:00"
def get_required_extensions(self):
return ["router", "qos"]
def get_extended_resources(self, version):
if version == "2.0":
return EXTENDED_ATTRIBUTES_2_0
else:
return {}

View File

@ -49,3 +49,19 @@ class QosPolicyNetworkBinding(base.NeutronDbObject):
primary_keys = ['network_id']
fields_no_update = ['policy_id', 'network_id']
@base.NeutronObjectRegistry.register
class QosPolicyFloatingIPBinding(base.NeutronDbObject):
# Version 1.0: Initial version
VERSION = '1.0'
db_model = qos_db_model.QosFIPPolicyBinding
fields = {
'policy_id': common_types.UUIDField(),
'fip_id': common_types.UUIDField()
}
primary_keys = ['policy_id', 'fip_id']
fields_no_update = ['policy_id', 'fip_id']

View File

@ -23,6 +23,7 @@ from oslo_versionedobjects import fields as obj_fields
from neutron.common import exceptions
from neutron.db import api as db_api
from neutron.db.models import l3
from neutron.db import models_v2
from neutron.db.qos import models as qos_db_model
from neutron.db.rbac_db_models import QosPolicyRBAC
@ -43,7 +44,8 @@ class QosPolicy(rbac_db.NeutronRbacObject):
# Version 1.4: Changed tenant_id to project_id
# Version 1.5: Direction for bandwidth limit rule added
# Version 1.6: Added "is_default" field
VERSION = '1.6'
# Version 1.7: Added floating IP bindings
VERSION = '1.7'
# required by RbacNeutronMetaclass
rbac_db_model = QosPolicyRBAC
@ -51,6 +53,7 @@ class QosPolicy(rbac_db.NeutronRbacObject):
port_binding_model = qos_db_model.QosPortPolicyBinding
network_binding_model = qos_db_model.QosNetworkPolicyBinding
fip_binding_model = qos_db_model.QosFIPPolicyBinding
fields = {
'id': common_types.UUIDField(),
@ -68,7 +71,8 @@ class QosPolicy(rbac_db.NeutronRbacObject):
extra_filter_names = {'is_default'}
binding_models = {'port': binding.QosPolicyPortBinding,
'network': binding.QosPolicyNetworkBinding}
'network': binding.QosPolicyNetworkBinding,
'fip': binding.QosPolicyFloatingIPBinding}
def obj_load_attr(self, attrname):
if attrname == 'rules':
@ -164,6 +168,11 @@ class QosPolicy(rbac_db.NeutronRbacObject):
return cls._get_object_policy(context, cls.port_binding_model,
port_id=port_id)
@classmethod
def get_fip_policy(cls, context, fip_id):
return cls._get_object_policy(context, cls.fip_binding_model,
fip_id=fip_id)
# TODO(QoS): Consider extending base to trigger registered methods for us
def create(self):
with db_api.autonested_transaction(self.obj_context.session):
@ -218,6 +227,16 @@ class QosPolicy(rbac_db.NeutronRbacObject):
port_id=port_id,
db_error=e)
def attach_floatingip(self, fip_id):
fip_binding_obj = binding.QosPolicyFloatingIPBinding(
self.obj_context, policy_id=self.id, fip_id=fip_id)
try:
fip_binding_obj.create()
except db_exc.DBReferenceError as e:
raise exceptions.FloatingIPQosBindingError(policy_id=self.id,
fip_id=fip_id,
db_error=e)
def detach_network(self, network_id):
deleted = binding.QosPolicyNetworkBinding.delete_objects(
self.obj_context, network_id=network_id)
@ -232,6 +251,13 @@ class QosPolicy(rbac_db.NeutronRbacObject):
raise exceptions.PortQosBindingNotFound(port_id=port_id,
policy_id=self.id)
def detach_floatingip(self, fip_id):
deleted = binding.QosPolicyFloatingIPBinding.delete_objects(
self.obj_context, fip_id=fip_id)
if not deleted:
raise exceptions.FloatingIPQosBindingNotFound(fip_id=fip_id,
policy_id=self.id)
def set_default(self):
if not self.get_default():
qos_default_policy = QosPolicyDefault(self.obj_context,
@ -268,6 +294,13 @@ class QosPolicy(rbac_db.NeutronRbacObject):
self.obj_context, policy_id=self.id)
]
def get_bound_floatingips(self):
return [
fb.fip_id
for fb in binding.QosPolicyFloatingIPBinding.get_objects(
self.obj_context, policy_id=self.id)
]
@classmethod
def _get_bound_tenant_ids(cls, session, binding_db, bound_db,
binding_db_id_column, policy_id):
@ -286,6 +319,8 @@ class QosPolicy(rbac_db.NeutronRbacObject):
qosnet = qos_db_model.QosNetworkPolicyBinding
port = models_v2.Port
qosport = qos_db_model.QosPortPolicyBinding
fip = l3.FloatingIP
qosfip = qos_db_model.QosFIPPolicyBinding
bound_tenants = []
with db_api.autonested_transaction(context.session):
bound_tenants.extend(cls._get_bound_tenant_ids(
@ -293,6 +328,9 @@ class QosPolicy(rbac_db.NeutronRbacObject):
bound_tenants.extend(
cls._get_bound_tenant_ids(context.session, qosport, port,
qosport.port_id, policy_id))
bound_tenants.extend(
cls._get_bound_tenant_ids(context.session, qosfip, fip,
qosfip.fip_id, policy_id))
return set(bound_tenants)
def obj_make_compatible(self, primitive, target_version):

View File

@ -31,6 +31,7 @@ from neutron.db import dns_db
from neutron.db import extraroute_db
from neutron.db import l3_dvr_ha_scheduler_db
from neutron.db import l3_dvrscheduler_db
from neutron.db import l3_fip_qos
from neutron.db import l3_gwmode_db
from neutron.db import l3_hamode_db
from neutron.db import l3_hascheduler_db
@ -51,6 +52,13 @@ def disable_dvr_extension_by_config(aliases):
aliases.remove('dvr')
def disable_qos_fip_extension_by_plugins(aliases):
qos_class = 'neutron.services.qos.qos_plugin.QoSPlugin'
if all(p not in cfg.CONF.service_plugins for p in ['qos', qos_class]):
if 'qos-fip' in aliases:
aliases.remove('qos-fip')
@resource_extend.has_resource_extenders
class L3RouterPlugin(service_base.ServicePluginBase,
common_db_mixin.CommonDbMixin,
@ -58,7 +66,8 @@ class L3RouterPlugin(service_base.ServicePluginBase,
l3_hamode_db.L3_HA_NAT_db_mixin,
l3_gwmode_db.L3_NAT_db_mixin,
l3_dvr_ha_scheduler_db.L3_DVR_HA_scheduler_db_mixin,
dns_db.DNSDbMixin):
dns_db.DNSDbMixin,
l3_fip_qos.FloatingQoSDbMixin):
"""Implementation of the Neutron L3 Router Service Plugin.
@ -72,7 +81,7 @@ class L3RouterPlugin(service_base.ServicePluginBase,
_supported_extension_aliases = ["dvr", "router", "ext-gw-mode",
"extraroute", "l3_agent_scheduler",
"l3-ha", "router_availability_zone",
"l3-flavors"]
"l3-flavors", "qos-fip"]
__native_pagination_support = True
__native_sorting_support = True
@ -101,6 +110,7 @@ class L3RouterPlugin(service_base.ServicePluginBase,
if not hasattr(self, '_aliases'):
aliases = self._supported_extension_aliases[:]
disable_dvr_extension_by_config(aliases)
disable_qos_fip_extension_by_plugins(aliases)
self._aliases = aliases
return self._aliases

View File

@ -0,0 +1,250 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from neutron_lib import context
from neutron_lib.services.qos import constants as qos_consts
from oslo_config import cfg
from oslo_utils import uuidutils
from neutron.common import exceptions as n_exception
from neutron.db import l3_fip_qos
from neutron.extensions import l3
from neutron.extensions import qos_fip
from neutron.objects.qos import policy
from neutron.tests.unit.extensions import test_l3
class FloatingIPQoSTestExtensionManager(object):
def get_resources(self):
l3.RESOURCE_ATTRIBUTE_MAP['floatingips'].update(
qos_fip.EXTENDED_ATTRIBUTES_2_0['floatingips'])
return l3.L3.get_resources()
def get_actions(self):
return []
def get_request_extensions(self):
return []
class TestFloatingIPQoSIntPlugin(
test_l3.TestL3NatIntPlugin,
l3_fip_qos.FloatingQoSDbMixin):
supported_extension_aliases = ["external-net", "router",
qos_fip.FIP_QOS_ALIAS]
class TestFloatingIPQoSL3NatServicePlugin(
test_l3.TestL3NatServicePlugin,
l3_fip_qos.FloatingQoSDbMixin):
supported_extension_aliases = ["router", qos_fip.FIP_QOS_ALIAS]
class FloatingIPQoSDBTestCaseBase(object):
def test_create_fip_with_qos_policy_id(self):
ctx = context.get_admin_context()
policy_obj = policy.QosPolicy(ctx,
id=uuidutils.generate_uuid(),
project_id='tenant', name='pol1',
rules=[])
policy_obj.create()
with self.subnet(cidr='11.0.0.0/24') as s:
network_id = s['subnet']['network_id']
self._set_net_external(network_id)
fip = self._make_floatingip(
self.fmt,
network_id,
qos_policy_id=policy_obj.id)
self.assertEqual(policy_obj.id,
fip['floatingip'][qos_consts.QOS_POLICY_ID])
def test_fip_has_qos_policy_id_remove_policy(self):
ctx = context.get_admin_context()
policy_obj = policy.QosPolicy(ctx,
id=uuidutils.generate_uuid(),
project_id='tenant', name='pol1',
rules=[])
policy_obj.create()
with self.subnet(cidr='11.0.0.0/24') as s:
network_id = s['subnet']['network_id']
self._set_net_external(network_id)
fip = self._make_floatingip(
self.fmt,
network_id,
qos_policy_id=policy_obj.id)
self.assertEqual(policy_obj.id,
fip['floatingip'][qos_consts.QOS_POLICY_ID])
self.assertRaises(n_exception.QosPolicyInUse, policy_obj.delete)
def test_floatingip_update_qos_policy_id(self):
ctx = context.get_admin_context()
policy_obj_1 = policy.QosPolicy(ctx,
id=uuidutils.generate_uuid(),
project_id='tenant', name='pol2',
rules=[])
policy_obj_1.create()
policy_obj_2 = policy.QosPolicy(ctx,
id=uuidutils.generate_uuid(),
project_id='tenant', name='pol3',
rules=[])
policy_obj_2.create()
with self.subnet(cidr='11.0.0.0/24') as s:
network_id = s['subnet']['network_id']
self._set_net_external(network_id)
fip = self._make_floatingip(
self.fmt,
network_id,
qos_policy_id=policy_obj_1.id)
self.assertEqual(policy_obj_1.id,
fip['floatingip'][qos_consts.QOS_POLICY_ID])
body = self._show('floatingips', fip['floatingip']['id'])
self.assertEqual(policy_obj_1.id,
body['floatingip'][qos_consts.QOS_POLICY_ID])
body = self._update(
'floatingips', fip['floatingip']['id'],
{'floatingip': {qos_consts.QOS_POLICY_ID: policy_obj_2.id}})
self.assertEqual(policy_obj_2.id,
body['floatingip'][qos_consts.QOS_POLICY_ID])
def test_floatingip_adding_qos_policy_id_by_update(self):
ctx = context.get_admin_context()
policy_obj = policy.QosPolicy(ctx,
id=uuidutils.generate_uuid(),
project_id='tenant', name='pol4',
rules=[])
policy_obj.create()
with self.subnet(cidr='11.0.0.0/24') as s:
network_id = s['subnet']['network_id']
self._set_net_external(network_id)
fip = self._make_floatingip(
self.fmt,
network_id)
self.assertIsNone(fip['floatingip'].get(qos_consts.QOS_POLICY_ID))
body = self._update(
'floatingips', fip['floatingip']['id'],
{'floatingip': {qos_consts.QOS_POLICY_ID: policy_obj.id}})
body = self._show('floatingips', body['floatingip']['id'])
self.assertEqual(policy_obj.id,
body['floatingip'][qos_consts.QOS_POLICY_ID])
def test_floatingip_remove_qos_policy_id(self):
ctx = context.get_admin_context()
policy_obj = policy.QosPolicy(ctx,
id=uuidutils.generate_uuid(),
project_id='tenant', name='pol5',
rules=[])
policy_obj.create()
with self.subnet(cidr='11.0.0.0/24') as s:
network_id = s['subnet']['network_id']
self._set_net_external(network_id)
fip = self._make_floatingip(
self.fmt,
network_id,
qos_policy_id=policy_obj.id)
self.assertEqual(policy_obj.id,
fip['floatingip'][qos_consts.QOS_POLICY_ID])
self._update(
'floatingips', fip['floatingip']['id'],
{'floatingip': {qos_consts.QOS_POLICY_ID: None}})
body = self._show('floatingips', fip['floatingip']['id'])
self.assertIsNone(
body['floatingip'].get(qos_consts.QOS_POLICY_ID))
def test_floatingip_update_change_nothing(self):
ctx = context.get_admin_context()
policy_obj = policy.QosPolicy(ctx,
id=uuidutils.generate_uuid(),
project_id='tenant', name='pol2',
rules=[])
policy_obj.create()
with self.subnet(cidr='11.0.0.0/24') as s:
network_id = s['subnet']['network_id']
self._set_net_external(network_id)
fip = self._make_floatingip(
self.fmt,
network_id)
self.assertIsNone(fip['floatingip'].get(qos_consts.QOS_POLICY_ID))
# Updating policy_id from None to None
body = self._update(
'floatingips', fip['floatingip']['id'],
{'floatingip': {qos_consts.QOS_POLICY_ID: None}})
self.assertIsNone(
body['floatingip'].get(qos_consts.QOS_POLICY_ID))
body = self._show('floatingips', fip['floatingip']['id'])
self.assertIsNone(
body['floatingip'].get(qos_consts.QOS_POLICY_ID))
body = self._update(
'floatingips', fip['floatingip']['id'],
{'floatingip': {qos_consts.QOS_POLICY_ID: policy_obj.id}})
self.assertEqual(policy_obj.id,
body['floatingip'][qos_consts.QOS_POLICY_ID])
# Updating again with same policy_id
body = self._update(
'floatingips', fip['floatingip']['id'],
{'floatingip': {qos_consts.QOS_POLICY_ID: policy_obj.id}})
self.assertEqual(policy_obj.id,
body['floatingip'][qos_consts.QOS_POLICY_ID])
class FloatingIPQoSDBIntTestCase(test_l3.L3BaseForIntTests,
test_l3.L3NatTestCaseMixin,
FloatingIPQoSDBTestCaseBase):
def setUp(self, plugin=None):
if not plugin:
plugin = ('neutron.tests.unit.extensions.test_qos_fip.'
'TestFloatingIPQoSIntPlugin')
service_plugins = {'qos': 'neutron.services.qos.qos_plugin.QoSPlugin'}
# for these tests we need to enable overlapping ips
cfg.CONF.set_default('allow_overlapping_ips', True)
cfg.CONF.set_default('max_routes', 3)
ext_mgr = FloatingIPQoSTestExtensionManager()
super(test_l3.L3BaseForIntTests, self).setUp(
plugin=plugin,
ext_mgr=ext_mgr,
service_plugins=service_plugins)
self.setup_notification_driver()
class FloatingIPQoSDBSepTestCase(test_l3.L3BaseForSepTests,
test_l3.L3NatTestCaseMixin,
FloatingIPQoSDBTestCaseBase):
def setUp(self):
# the plugin without L3 support
plugin = 'neutron.tests.unit.extensions.test_l3.TestNoL3NatPlugin'
# the L3 service plugin
l3_plugin = ('neutron.tests.unit.extensions.test_qos_fip.'
'TestFloatingIPQoSL3NatServicePlugin')
service_plugins = {'l3_plugin_name': l3_plugin,
'qos': 'neutron.services.qos.qos_plugin.QoSPlugin'}
# for these tests we need to enable overlapping ips
cfg.CONF.set_default('allow_overlapping_ips', True)
cfg.CONF.set_default('max_routes', 3)
ext_mgr = FloatingIPQoSTestExtensionManager()
super(test_l3.L3BaseForSepTests, self).setUp(
plugin=plugin,
ext_mgr=ext_mgr,
service_plugins=service_plugins)
self.setup_notification_driver()

View File

@ -49,3 +49,22 @@ class QosPolicyNetworkBindingDbObjectTestCase(test_base.BaseDbObjectTestCase,
for db_obj in self.db_objs:
self._create_test_qos_policy(id=db_obj['policy_id'])
self._create_test_network(network_id=db_obj['network_id'])
class QosPolicyFloatingIPBindingObjectTestCase(
test_base.BaseObjectIfaceTestCase):
_test_class = binding.QosPolicyFloatingIPBinding
class QosPolicyFloatingIPBindingDbObjectTestCase(
test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
_test_class = binding.QosPolicyFloatingIPBinding
def setUp(self):
super(QosPolicyFloatingIPBindingDbObjectTestCase, self).setUp()
for db_obj in self.db_objs:
self._create_test_qos_policy(id=db_obj['policy_id'])
self._create_test_fip_id(fip_id=db_obj['fip_id'])

View File

@ -1443,17 +1443,19 @@ class BaseDbObjectTestCase(_BaseObjectTestCase,
ext_net.create()
return ext_net.network_id
def _create_test_fip_id(self):
def _create_test_fip_id(self, fip_id=None):
fake_fip = '172.23.3.0'
ext_net_id = self._create_external_network_id()
# TODO(manjeets) replace this with fip ovo
# once it is implemented
return obj_db_api.create_object(
self.context, l3_model.FloatingIP,
{'floating_ip_address': fake_fip,
values = {'floating_ip_address': fake_fip,
'floating_network_id': ext_net_id,
'floating_port_id': self._create_test_port_id(
network_id=ext_net_id)}).id
network_id=ext_net_id)}
if fip_id:
values['id'] = fip_id
return obj_db_api.create_object(
self.context, l3_model.FloatingIP, values).id
def _create_test_subnet_id(self, network_id=None):
if not network_id:

View File

@ -74,8 +74,9 @@ object_data = {
'QosMinimumBandwidthRule': '1.3-314c3419f4799067cc31cc319080adff',
'QosRuleType': '1.3-7286188edeb3a0386f9cf7979b9700fc',
'QosRuleTypeDriver': '1.0-7d8cb9f0ef661ac03700eae97118e3db',
'QosPolicy': '1.6-4adb0cde3102c10d8970ec9487fd7fe7',
'QosPolicy': '1.7-4adb0cde3102c10d8970ec9487fd7fe7',
'QosPolicyDefault': '1.0-59e5060eedb1f06dd0935a244d27d11c',
'QosPolicyFloatingIPBinding': '1.0-5625df4205a18778cd6aa40f99be024e',
'QosPolicyNetworkBinding': '1.0-df53a1e0f675aab8d27a1ccfed38dc42',
'QosPolicyPortBinding': '1.0-66cb364ac99aa64523ade07f9f868ea6',
'Quota': '1.0-6bb6a0f1bd5d66a2134ffa1a61873097',

View File

@ -0,0 +1,4 @@
---
features:
- Implementation of floating IP QoS. A new parameter ``qos_policy_id``
was added to floating IP related API.