[L3][QoS] L3 agent side Floating IP rate limit

This patch implements the L3 agent side floating IP rate limit.
For all routers, if floating IP has qos rules, the corresponding
TC filters will be added to:
1. for legacy/HA router, the device is qg-device of qrouter-namespace,
   aka router gateway in network node.
2. for dvr (HA) router in compute node, the device is rfp-device, the
   namespace is qrouter-namespace.
3. for dvr (HA) router in network node, the device is qg-device in
   snat-namespace.

Partially-Implements blueprint: floating-ip-rate-limit

Change-Id: Ie8a5fe4ebaeccfb3998732dd972663c54542b5bf
This commit is contained in:
LIU Yulong 2017-04-05 18:04:13 +08:00
parent dada32e95b
commit d0aaae2800
9 changed files with 1035 additions and 8 deletions

View File

View File

@ -0,0 +1,359 @@
# Copyright 2017 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from neutron_lib.agent import l3_extension
from neutron_lib import constants
from neutron_lib.db import constants as db_consts
from neutron_lib.services.qos import constants as qos_consts
from oslo_concurrency import lockutils
from oslo_log import log as logging
from neutron.agent.linux import ip_lib
from neutron.agent.linux import l3_tc_lib as tc_lib
from neutron.api.rpc.callbacks.consumer import registry
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.common import constants as n_const
from neutron.common import rpc as n_rpc
LOG = logging.getLogger(__name__)
SUPPORTED_RULES = {
qos_consts.RULE_TYPE_BANDWIDTH_LIMIT: {
qos_consts.MAX_KBPS: {
'type:range': [0, db_consts.DB_INTEGER_MAX_VALUE]},
qos_consts.MAX_BURST: {
'type:range': [0, db_consts.DB_INTEGER_MAX_VALUE]},
qos_consts.DIRECTION: {
'type:values': constants.VALID_DIRECTIONS}
}
}
# We use the default values to illustrate:
# 1. QoS policy does not have some direction `bandwidth_limit`, then we use
# the default value.
# 2. default value 0 will be treated as no limit.
# 3. if one floating IP's rate was changed from x to 0, the extension will do
# a tc filter clean procedure.
FIP_DEFAULT_RATE = 0
FIP_DEFAULT_BURST = 0
class RouterFipRateLimitMaps(object):
def __init__(self):
self.qos_policy_fips = collections.defaultdict(dict)
self.known_policies = {}
self.fip_policies = {}
"""
The router_floating_ips will be:
router_floating_ips = {
router_id_1: set(fip1, fip2),
router_id_2: set(), # default
}
"""
self.router_floating_ips = {}
"""
The rate limits dict will be:
xxx_ratelimits = {
fip_1: (rate, burst),
fip_2: (FIP_DEFAULT_RATE, FIP_DEFAULT_BURST), # default
fip_3: (1, 2),
fip_4: (3, 4),
}
"""
self.ingress_ratelimits = {}
self.egress_ratelimits = {}
def update_policy(self, policy):
self.known_policies[policy.id] = policy
def get_policy(self, policy_id):
return self.known_policies.get(policy_id)
def get_fips(self, policy):
return self.qos_policy_fips[policy.id].values()
def get_fip_policy(self, fip):
policy_id = self.fip_policies.get(fip)
return self.get_policy(policy_id)
def set_fip_policy(self, fip, policy):
"""Attach a fip to policy and return any previous policy on fip."""
old_policy = self.get_fip_policy(fip)
self.update_policy(policy)
self.fip_policies[fip] = policy.id
self.qos_policy_fips[policy.id][fip] = fip
if old_policy and old_policy.id != policy.id:
del self.qos_policy_fips[old_policy.id][fip]
def clean_by_fip(self, fip):
"""Detach fip from policy and cleanup data we don't need anymore."""
if fip in self.fip_policies:
del self.fip_policies[fip]
for qos_policy_id, fip_dict in self.qos_policy_fips.items():
if fip in fip_dict:
del fip_dict[fip]
if not fip_dict:
self._clean_policy_info(qos_policy_id)
return
LOG.debug("Floating IP QoS extension did not have "
"information on floating IP %s", fip)
def _clean_policy_info(self, qos_policy_id):
del self.qos_policy_fips[qos_policy_id]
del self.known_policies[qos_policy_id]
def find_fip_router_id(self, fip):
for router_id, ips in self.router_floating_ips.items():
if fip in ips:
return router_id
class FipQosAgentExtension(l3_extension.L3AgentExtension):
SUPPORTED_RESOURCE_TYPES = [resources.QOS_POLICY]
def initialize(self, connection, driver_type):
"""Initialize agent extension."""
self.resource_rpc = resources_rpc.ResourcesPullRpcApi()
self.fip_qos_map = RouterFipRateLimitMaps()
self._register_rpc_consumers()
def consume_api(self, agent_api):
self.agent_api = agent_api
@lockutils.synchronized('qos-fip')
def _handle_notification(self, context, resource_type,
qos_policies, event_type):
if event_type == events.UPDATED:
for qos_policy in qos_policies:
self._process_update_policy(qos_policy)
def _policy_rules_modified(self, old_policy, policy):
return not (len(old_policy.rules) == len(policy.rules) and
all(i in old_policy.rules for i in policy.rules))
def _process_update_policy(self, qos_policy):
old_qos_policy = self.fip_qos_map.get_policy(qos_policy.id)
if old_qos_policy:
if self._policy_rules_modified(old_qos_policy, qos_policy):
for fip in self.fip_qos_map.get_fips(qos_policy):
router_id = self.fip_qos_map.find_fip_router_id(fip)
router_info = self._get_router_info(router_id)
if not router_info:
continue
device = self._get_rate_limit_ip_device(router_info)
if not device:
LOG.debug("Router %s does not have a floating IP "
"related device, skipping.", router_id)
continue
rates = self.get_policy_rates(qos_policy)
self.process_ip_rates(fip, device, rates)
self.fip_qos_map.update_policy(qos_policy)
def _process_reset_fip(self, fip):
self.fip_qos_map.clean_by_fip(fip)
def _register_rpc_consumers(self):
registry.register(self._handle_notification, resources.QOS_POLICY)
self._connection = n_rpc.create_connection()
endpoints = [resources_rpc.ResourcesPushRpcCallback()]
topic = resources_rpc.resource_type_versioned_topic(
resources.QOS_POLICY)
self._connection.create_consumer(topic, endpoints, fanout=True)
self._connection.consume_in_threads()
def _get_tc_wrapper(self, device):
return tc_lib.FloatingIPTcCommand(device.name,
namespace=device.namespace)
def process_ip_rate_limit(self, ip, direction, device, rate, burst):
rate_limits_direction = direction + "_ratelimits"
rate_limits = getattr(self.fip_qos_map, rate_limits_direction, {})
old_rate, old_burst = rate_limits.get(ip, (FIP_DEFAULT_RATE,
FIP_DEFAULT_BURST))
if old_rate == rate and old_burst == burst:
# Two possibilities here:
# 1. Floating IP rate limit does not change.
# 2. Floating IP bandwidth does not limit.
return
tc_wrapper = self._get_tc_wrapper(device)
if rate == FIP_DEFAULT_RATE and burst == FIP_DEFAULT_BURST:
# According to the agreements of default value definition,
# floating IP bandwidth was changed to default value (no limit).
# NOTE: l3_tc_lib will ignore exception FilterIDForIPNotFound.
tc_wrapper.clear_ip_rate_limit(direction, ip)
rate_limits.pop(ip, None)
return
# Finally just set it, l3_tc_lib will clean the old rules if exists.
tc_wrapper.set_ip_rate_limit(direction, ip, rate, burst)
rate_limits[ip] = (rate, burst)
def _get_rate_limit_ip_device(self, router_info):
ex_gw_port = router_info.get_ex_gw_port()
if not ex_gw_port:
return
agent_mode = router_info.agent_conf.agent_mode
is_distributed_router = router_info.router.get('distributed')
if is_distributed_router and agent_mode == (
constants.L3_AGENT_MODE_DVR_SNAT):
# DVR edge (or DVR edge ha) router
if not router_info._is_this_snat_host():
return
name = router_info.get_snat_external_device_interface_name(
ex_gw_port)
else:
# DVR local router
# Legacy/HA router
name = router_info.get_external_device_interface_name(ex_gw_port)
if not name:
# DVR local router in dvr_no_external agent mode may not have
# such rfp-device.
return
namespace = router_info.get_gw_ns_name()
return ip_lib.IPDevice(name, namespace=namespace)
def _remove_ip_rate_limit_cache(self, ip, direction):
rate_limits_direction = direction + "_ratelimits"
rate_limits = getattr(self.fip_qos_map, rate_limits_direction, {})
rate_limits.pop(ip, None)
def _remove_fip_rate_limit(self, device, fip_ip):
tc_wrapper = self._get_tc_wrapper(device)
for direction in constants.VALID_DIRECTIONS:
if device.exists():
tc_wrapper.clear_ip_rate_limit(direction, fip_ip)
self._remove_ip_rate_limit_cache(fip_ip, direction)
def get_fip_qos_rates(self, context, fip, policy_id):
if policy_id is None:
self._process_reset_fip(fip)
# process_ip_rate_limit will treat value 0 as
# cleaning the tc filters if exits or no action.
return {constants.INGRESS_DIRECTION: {"rate": FIP_DEFAULT_RATE,
"burst": FIP_DEFAULT_BURST},
constants.EGRESS_DIRECTION: {"rate": FIP_DEFAULT_RATE,
"burst": FIP_DEFAULT_BURST}}
policy = self.resource_rpc.pull(
context, resources.QOS_POLICY, policy_id)
self.fip_qos_map.set_fip_policy(fip, policy)
return self.get_policy_rates(policy)
def get_policy_rates(self, policy):
rates = {}
for rule in policy.rules:
# NOTE(liuyulong): for now, the L3 agent floating IP QoS
# extension only uses ``bandwidth_limit`` rules..
if rule.rule_type in SUPPORTED_RULES:
if rule.direction not in rates:
rates[rule.direction] = {"rate": rule.max_kbps,
"burst": rule.max_burst_kbps}
# The return rates dict must contain all directions. If there is no
# one specific direction QoS rule, use the default values.
for direction in constants.VALID_DIRECTIONS:
if direction not in rates:
LOG.debug("Policy %(id)s does not have '%(direction)s' "
"bandwidth_limit rule, use default value instead.",
{"id": policy.id,
"direction": direction})
rates[direction] = {"rate": FIP_DEFAULT_RATE,
"burst": FIP_DEFAULT_BURST}
return rates
def process_ip_rates(self, fip, device, rates):
for direction in constants.VALID_DIRECTIONS:
rate = rates.get(direction)
self.process_ip_rate_limit(
fip, direction, device,
rate['rate'], rate['burst'])
def process_floating_ip_addresses(self, context, router_info):
# Loop all the router floating ips, the corresponding floating IP tc
# rules will be configured:
# 1. for legacy and HA router, it will be all floating IPs to qg-device
# of qrouter-namespace in (all ha router hosted) network node.
# 2. for dvr router, we can do this simple. No matter the agent
# type is dvr or dvr_snat, we can just set all the
# floating IP tc rules to the corresponding device:
# 2.1 for dvr local router in compute node:
# the namespace is qrouter-x, and the device is rfp-device.
# 2.2 for dvr edge (ha) router in network node:
# the namespace is snat-x, and the device is qg-device.
# 3. for dvr local router, if agent_mod is dvr_no_external, no
# floating IP rules will be configured.
is_distributed_router = router_info.router.get('distributed')
agent_mode = router_info.agent_conf.agent_mode
if is_distributed_router and agent_mode == (
n_const.L3_AGENT_MODE_DVR_NO_EXTERNAL):
# condition 3: dvr local router and dvr_no_external agent
return
device = self._get_rate_limit_ip_device(router_info)
if not device:
return
floating_ips = router_info.get_floating_ips()
current_fips = self.fip_qos_map.router_floating_ips.get(
router_info.router_id, set())
new_fips = set()
for fip in floating_ips:
fip_addr = fip['floating_ip_address']
new_fips.add(fip_addr)
rates = self.get_fip_qos_rates(context,
fip_addr,
fip.get(qos_consts.QOS_POLICY_ID))
self.process_ip_rates(fip_addr, device, rates)
self.fip_qos_map.router_floating_ips[router_info.router_id] = new_fips
fips_removed = current_fips - new_fips
for fip in fips_removed:
self._remove_fip_rate_limit(device, fip)
self._process_reset_fip(fip)
def _get_router_info(self, router_id):
router_info = self.agent_api.get_router_info(router_id)
if router_info:
return router_info
LOG.debug("Router %s is not managed by this agent. "
"It was possibly deleted concurrently.",
router_id)
@lockutils.synchronized('qos-fip')
def add_router(self, context, data):
router_info = self._get_router_info(data['id'])
if router_info:
self.process_floating_ip_addresses(context, router_info)
@lockutils.synchronized('qos-fip')
def update_router(self, context, data):
router_info = self._get_router_info(data['id'])
if router_info:
self.process_floating_ip_addresses(context, router_info)
def delete_router(self, context, data):
# NOTE(liuyulong): to delete the router, you need to disassociate the
# floating IP first, so the update_router has done the cache clean.
pass
def ha_state_change(self, context, data):
pass

View File

@ -16,6 +16,7 @@ import copy
import netaddr
from neutron_lib import constants as lib_constants
from neutron_lib.services.qos import constants as qos_consts
from oslo_utils import uuidutils
from six import moves
@ -110,12 +111,15 @@ def prepare_router_data(ip_version=4, enable_snat=None, num_internal_ports=1,
'gw_port': ex_gw_port}
if enable_floating_ip:
router[lib_constants.FLOATINGIP_KEY] = [{
'id': _uuid(),
'port_id': _uuid(),
'status': 'DOWN',
'floating_ip_address': '19.4.4.2',
'fixed_ip_address': '10.0.0.1'}]
fip = {'id': _uuid(),
'port_id': _uuid(),
'status': 'DOWN',
'floating_ip_address': '19.4.4.2',
'fixed_ip_address': '10.0.0.1'}
qos_policy_id = kwargs.get(qos_consts.QOS_POLICY_ID)
if qos_policy_id:
fip[qos_consts.QOS_POLICY_ID] = qos_policy_id
router[lib_constants.FLOATINGIP_KEY] = [fip]
router_append_interface(router, count=num_internal_ports,
ip_version=ip_version, dual_stack=dual_stack)

View File

@ -0,0 +1,251 @@
# Copyright 2017 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron_lib import constants
from oslo_utils import uuidutils
from neutron.agent.l3 import agent as neutron_l3_agent
from neutron.agent.l3.extensions import fip_qos
from neutron.common import constants as n_const
from neutron.common import exceptions
from neutron.common import utils as common_utils
from neutron.objects.qos import policy
from neutron.objects.qos import rule
from neutron.tests.functional.agent.l3 import framework
from neutron.tests.functional.agent.l3 import test_dvr_router
_uuid = uuidutils.generate_uuid
TEST_POLICY_ID1 = _uuid()
TEST_POLICY_ID2 = _uuid()
TEST_POLICY_ID3 = _uuid()
class L3AgentFipQoSExtensionTestFramework(framework.L3AgentTestFramework):
test_bw_limit_rule_1 = rule.QosBandwidthLimitRule(
context=None,
qos_policy_id=TEST_POLICY_ID1,
id=_uuid(),
max_kbps=111,
max_burst_kbps=222,
direction=constants.INGRESS_DIRECTION)
test_bw_limit_rule_2 = rule.QosBandwidthLimitRule(
context=None,
qos_policy_id=TEST_POLICY_ID1,
id=_uuid(),
max_kbps=333,
max_burst_kbps=444,
direction=constants.EGRESS_DIRECTION)
test_bw_limit_rule_3 = rule.QosBandwidthLimitRule(
context=None,
qos_policy_id=TEST_POLICY_ID2,
id=_uuid(),
max_kbps=555,
max_burst_kbps=666,
direction=constants.INGRESS_DIRECTION)
test_bw_limit_rule_4 = rule.QosBandwidthLimitRule(
context=None,
qos_policy_id=TEST_POLICY_ID3,
id=_uuid(),
max_kbps=777,
max_burst_kbps=888,
direction=constants.EGRESS_DIRECTION)
def setUp(self):
super(L3AgentFipQoSExtensionTestFramework, self).setUp()
self.conf.set_override('extensions', ['fip_qos'], 'agent')
self.agent = neutron_l3_agent.L3NATAgentWithStateReport('agent1',
self.conf)
self._set_pull_mock()
self.set_test_qos_rules(TEST_POLICY_ID1,
[self.test_bw_limit_rule_1,
self.test_bw_limit_rule_2])
self.set_test_qos_rules(TEST_POLICY_ID2,
[self.test_bw_limit_rule_3])
self.set_test_qos_rules(TEST_POLICY_ID3,
[self.test_bw_limit_rule_4])
self.fip_qos_ext = fip_qos.FipQosAgentExtension()
def _set_pull_mock(self):
self.qos_policies = {}
def _pull_mock(context, resource_type, resource_id):
return self.qos_policies[resource_id]
self.pull = mock.patch(
'neutron.api.rpc.handlers.resources_rpc.'
'ResourcesPullRpcApi.pull').start()
self.pull.side_effect = _pull_mock
def set_test_qos_rules(self, policy_id, policy_rules):
"""This function sets the policy test rules to be exposed."""
qos_policy = policy.QosPolicy(
context=None,
project_id=_uuid(),
id=policy_id,
name="Test Policy Name",
description="This is a policy for testing purposes",
shared=False,
rules=policy_rules)
qos_policy.obj_reset_changes()
self.qos_policies[policy_id] = qos_policy
def _assert_bandwidth_limit_rule_is_set(self, router, ip, rule):
device = self.fip_qos_ext._get_rate_limit_ip_device(router)
tc_wrapper = self.fip_qos_ext._get_tc_wrapper(device)
def get_filter_id():
try:
return tc_wrapper.get_filter_id_for_ip(rule.direction, ip)
except exceptions.FilterIDForIPNotFound:
pass
common_utils.wait_until_true(get_filter_id)
def _assert_bandwidth_limit_rule_not_set(self, router, ip, rule,
dvr_no_external=False):
device = self.fip_qos_ext._get_rate_limit_ip_device(router)
if dvr_no_external:
self.assertIsNone(device)
else:
tc_wrapper = self.fip_qos_ext._get_tc_wrapper(device)
filter_id = tc_wrapper.get_filter_id_for_ip(rule.direction, ip)
self.assertIsNone(filter_id)
class TestL3AgentFipQosExtension(L3AgentFipQoSExtensionTestFramework):
def _test_centralized_routers(self, enable_ha=False,
ingress=True, egress=True):
qos_policy_id = TEST_POLICY_ID1
if ingress and not egress:
qos_policy_id = TEST_POLICY_ID2
elif egress and not ingress:
qos_policy_id = TEST_POLICY_ID3
router_info = self.generate_router_info(
enable_ha=enable_ha,
qos_policy_id=qos_policy_id)
ri = self.manage_router(self.agent, router_info)
if qos_policy_id == TEST_POLICY_ID1:
self._assert_bandwidth_limit_rule_is_set(
ri, '19.4.4.2', self.test_bw_limit_rule_1)
self._assert_bandwidth_limit_rule_is_set(
ri, '19.4.4.2', self.test_bw_limit_rule_2)
elif qos_policy_id == TEST_POLICY_ID2:
self._assert_bandwidth_limit_rule_is_set(
ri, '19.4.4.2', self.test_bw_limit_rule_3)
elif qos_policy_id == TEST_POLICY_ID3:
self._assert_bandwidth_limit_rule_is_set(
ri, '19.4.4.2', self.test_bw_limit_rule_4)
def test_legacy_router_fip_qos(self):
self._test_centralized_routers()
def test_legacy_router_fip_qos_ingress(self):
self._test_centralized_routers(ingress=True, egress=False)
def test_legacy_router_fip_qos_egress(self):
self._test_centralized_routers(ingress=False, egress=True)
def test_ha_router_fip_qos(self):
self._test_centralized_routers(enable_ha=True)
def test_ha_router_fip_qos_ingress(self):
self._test_centralized_routers(enable_ha=True,
ingress=True, egress=False)
def test_ha_router_fip_qos_egress(self):
self._test_centralized_routers(enable_ha=True,
ingress=False, egress=True)
class TestL3AgentFipQosExtensionDVR(
test_dvr_router.TestDvrRouter,
L3AgentFipQoSExtensionTestFramework):
def test_dvr_local_router_no_fip(self):
self.agent.conf.agent_mode = constants.L3_AGENT_MODE_DVR
router_info = self.generate_dvr_router_info(
enable_floating_ip=False)
ri = self.manage_router(self.agent, router_info)
self._assert_bandwidth_limit_rule_not_set(
ri, '19.4.4.2', self.test_bw_limit_rule_1)
self._assert_bandwidth_limit_rule_not_set(
ri, '19.4.4.2', self.test_bw_limit_rule_2)
def _test_dvr_fip_qos(self, enable_ha=False):
self.agent.conf.agent_mode = constants.L3_AGENT_MODE_DVR
router_info = self.generate_dvr_router_info(
enable_ha=enable_ha,
enable_gw=True, qos_policy_id=TEST_POLICY_ID1)
ri = self.manage_router(self.agent, router_info)
self._assert_bandwidth_limit_rule_is_set(
ri, '19.4.4.2', self.test_bw_limit_rule_1)
self._assert_bandwidth_limit_rule_is_set(
ri, '19.4.4.2', self.test_bw_limit_rule_2)
def test_dvr_local_router_fip_qos(self):
self._test_dvr_fip_qos()
def test_ha_dvr_local_router_fip_qos(self):
self._test_dvr_fip_qos(enable_ha=True)
def _test_agent_mode_dvr_no_external(self, enable_ha=False):
self.agent.conf.agent_mode = n_const.L3_AGENT_MODE_DVR_NO_EXTERNAL
router_info = self.generate_dvr_router_info(
enable_ha=enable_ha,
enable_floating_ip=True, enable_centralized_fip=True,
enable_snat=True, snat_bound_fip=True,
qos_policy_id=TEST_POLICY_ID1)
ri = self.manage_router(self.agent, router_info)
self._assert_bandwidth_limit_rule_not_set(
ri, '19.4.4.2', self.test_bw_limit_rule_1,
dvr_no_external=True)
self._assert_bandwidth_limit_rule_not_set(
ri, '19.4.4.2', self.test_bw_limit_rule_2,
dvr_no_external=True)
def test_dvr_no_external_no_qos(self):
self._test_agent_mode_dvr_no_external()
def test_ha_dvr_no_external_no_qos(self):
self._test_agent_mode_dvr_no_external(enable_ha=True)
def _test_dvr_fip_snat_bound_agent_mode_dvr_snat(self, enable_ha=False):
self.agent.conf.agent_mode = constants.L3_AGENT_MODE_DVR_SNAT
router_info = self.generate_dvr_router_info(
enable_ha=enable_ha,
snat_bound_fip=True,
enable_gw=True,
qos_policy_id=TEST_POLICY_ID1)
ri = self.manage_router(self.agent, router_info)
self._assert_bandwidth_limit_rule_is_set(
ri, '19.4.4.2', self.test_bw_limit_rule_1)
self._assert_bandwidth_limit_rule_is_set(
ri, '19.4.4.2', self.test_bw_limit_rule_2)
def test_dvr_dvr_fip_snat_qos(self):
self._test_dvr_fip_snat_bound_agent_mode_dvr_snat()
def test_ha_dvr_dvr_fip_snat_qos(self):
self._test_dvr_fip_snat_bound_agent_mode_dvr_snat(enable_ha=True)
class LinuxBridgeL3AgentFipQosExtensionTestCase(TestL3AgentFipQosExtension):
INTERFACE_DRIVER = 'neutron.agent.linux.interface.BridgeInterfaceDriver'

View File

@ -103,7 +103,8 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
def generate_router_info(self, enable_ha, ip_version=4, extra_routes=True,
enable_fip=True, enable_snat=True,
num_internal_ports=1,
dual_stack=False, v6_ext_gw_with_sub=True):
dual_stack=False, v6_ext_gw_with_sub=True,
qos_policy_id=None):
if ip_version == 6 and not dual_stack:
enable_snat = False
enable_fip = False
@ -118,7 +119,8 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
extra_routes=extra_routes,
dual_stack=dual_stack,
v6_ext_gw_with_sub=(
v6_ext_gw_with_sub))
v6_ext_gw_with_sub),
qos_policy_id=qos_policy_id)
def _test_conntrack_disassociate_fip(self, ha):
'''Test that conntrack immediately drops stateful connection

View File

@ -0,0 +1,409 @@
# Copyright 2017 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron_lib import constants as lib_const
from neutron_lib import context
from neutron_lib.services.qos import constants as qos_consts
from oslo_utils import uuidutils
from neutron.agent.l3 import agent as l3_agent
from neutron.agent.l3.extensions import fip_qos
from neutron.agent.l3 import l3_agent_extension_api as l3_ext_api
from neutron.agent.l3 import router_info as l3router
from neutron.api.rpc.callbacks.consumer import registry
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.objects.qos import policy
from neutron.objects.qos import rule
from neutron.tests import base
from neutron.tests.unit.agent.l3 import test_agent
_uuid = uuidutils.generate_uuid
TEST_POLICY = policy.QosPolicy(context=None,
name='test1', id=_uuid())
TEST_POLICY2 = policy.QosPolicy(context=None,
name='test2', id=_uuid())
TEST_QOS_FIP = "3.3.3.3"
TEST_FIP = "1.1.1.1"
TEST_FIP2 = "2.2.2.2"
HOSTNAME = 'myhost'
class QosExtensionBaseTestCase(test_agent.BasicRouterOperationsFramework):
def setUp(self):
super(QosExtensionBaseTestCase, self).setUp()
self.fip_qos_ext = fip_qos.FipQosAgentExtension()
self.context = context.get_admin_context()
self.connection = mock.Mock()
self.policy = policy.QosPolicy(context=None,
name='test1', id=_uuid())
self.ingress_rule = (
rule.QosBandwidthLimitRule(context=None, id=_uuid(),
qos_policy_id=self.policy.id,
max_kbps=1111,
max_burst_kbps=2222,
direction=lib_const.INGRESS_DIRECTION))
self.egress_rule = (
rule.QosBandwidthLimitRule(context=None, id=_uuid(),
qos_policy_id=self.policy.id,
max_kbps=3333,
max_burst_kbps=4444,
direction=lib_const.EGRESS_DIRECTION))
self.policy.rules = [self.ingress_rule, self.egress_rule]
self.new_ingress_rule = (
rule.QosBandwidthLimitRule(context=None, id=_uuid(),
qos_policy_id=self.policy.id,
max_kbps=5555,
max_burst_kbps=6666,
direction=lib_const.INGRESS_DIRECTION))
self.ingress_rule_only_has_max_kbps = (
rule.QosBandwidthLimitRule(context=None, id=_uuid(),
qos_policy_id=self.policy.id,
max_kbps=5555,
max_burst_kbps=0,
direction=lib_const.INGRESS_DIRECTION))
self.policy2 = policy.QosPolicy(context=None,
name='test2', id=_uuid())
self.policy2.rules = [self.ingress_rule]
self.policy3 = policy.QosPolicy(context=None,
name='test3', id=_uuid())
self.policy3.rules = [self.egress_rule]
self.policy4 = policy.QosPolicy(context=None,
name='test4', id=_uuid())
self.dscp = rule.QosDscpMarkingRule(context=None, id=_uuid(),
qos_policy_id=self.policy4.id,
dscp_mark=32)
self.dscp.obj_reset_changes()
self.policy4.rules = [self.dscp]
self.qos_policies = {self.policy.id: self.policy,
self.policy2.id: self.policy2,
self.policy3.id: self.policy3,
self.policy4.id: self.policy4}
self.agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
self.ex_gw_port = {'id': _uuid()}
self.fip = {'id': _uuid(),
'floating_ip_address': TEST_QOS_FIP,
'fixed_ip_address': '192.168.0.1',
'floating_network_id': _uuid(),
'port_id': _uuid(),
'host': HOSTNAME,
'qos_policy_id': self.policy.id}
self.router = {'id': _uuid(),
'gw_port': self.ex_gw_port,
'ha': False,
'distributed': False,
lib_const.FLOATINGIP_KEY: [self.fip]}
self.router_info = l3router.RouterInfo(self.agent, _uuid(),
self.router, **self.ri_kwargs)
self.router_info.ex_gw_port = self.ex_gw_port
self.agent.router_info[self.router['id']] = self.router_info
def _mock_get_router_info(router_id):
return self.router_info
self.get_router_info = mock.patch(
'neutron.agent.l3.l3_agent_extension_api.'
'L3AgentExtensionAPI.get_router_info').start()
self.get_router_info.side_effect = _mock_get_router_info
self.agent_api = l3_ext_api.L3AgentExtensionAPI(None)
self.fip_qos_ext.consume_api(self.agent_api)
class FipQosExtensionInitializeTestCase(QosExtensionBaseTestCase):
@mock.patch.object(registry, 'register')
@mock.patch.object(resources_rpc, 'ResourcesPushRpcCallback')
def test_initialize_subscribed_to_rpc(self, rpc_mock, subscribe_mock):
call_to_patch = 'neutron.common.rpc.create_connection'
with mock.patch(call_to_patch,
return_value=self.connection) as create_connection:
self.fip_qos_ext.initialize(
self.connection, lib_const.L3_AGENT_MODE)
create_connection.assert_has_calls([mock.call()])
self.connection.create_consumer.assert_has_calls(
[mock.call(
resources_rpc.resource_type_versioned_topic(
resources.QOS_POLICY),
[rpc_mock()],
fanout=True)]
)
subscribe_mock.assert_called_with(mock.ANY, resources.QOS_POLICY)
class FipQosExtensionTestCase(QosExtensionBaseTestCase):
def setUp(self):
super(FipQosExtensionTestCase, self).setUp()
self.fip_qos_ext.initialize(
self.connection, lib_const.L3_AGENT_MODE)
self._set_pull_mock()
def _set_pull_mock(self):
def _pull_mock(context, resource_type, resource_id):
return self.qos_policies[resource_id]
self.pull = mock.patch(
'neutron.api.rpc.handlers.resources_rpc.'
'ResourcesPullRpcApi.pull').start()
self.pull.side_effect = _pull_mock
def _test_new_fip_add(self, func):
tc_wrapper = mock.Mock()
with mock.patch.object(self.fip_qos_ext, '_get_tc_wrapper',
return_value=tc_wrapper):
func(self.context, self.router)
tc_wrapper.set_ip_rate_limit.assert_has_calls(
[mock.call(lib_const.INGRESS_DIRECTION,
TEST_QOS_FIP, 1111, 2222),
mock.call(lib_const.EGRESS_DIRECTION,
TEST_QOS_FIP, 3333, 4444)],
any_order=True)
def test_add_router(self):
self._test_new_fip_add(self.fip_qos_ext.add_router)
def test_update_router(self):
self._test_new_fip_add(self.fip_qos_ext.update_router)
def test_update_router_fip_policy_changed(self):
tc_wrapper = mock.Mock()
with mock.patch.object(self.fip_qos_ext, '_get_tc_wrapper',
return_value=tc_wrapper):
self.fip_qos_ext.update_router(self.context, self.router)
tc_wrapper.set_ip_rate_limit.assert_has_calls(
[mock.call(lib_const.INGRESS_DIRECTION,
TEST_QOS_FIP, 1111, 2222),
mock.call(lib_const.EGRESS_DIRECTION,
TEST_QOS_FIP, 3333, 4444)],
any_order=True)
# the policy of floating IP has been changed to
# which only has one egress rule
self.fip[qos_consts.QOS_POLICY_ID] = self.policy3.id
self.fip_qos_ext.update_router(self.context, self.router)
tc_wrapper.clear_ip_rate_limit.assert_has_calls(
[mock.call(lib_const.INGRESS_DIRECTION,
TEST_QOS_FIP)])
def test_update_router_fip_policy_changed_to_none(self):
tc_wrapper = mock.Mock()
with mock.patch.object(self.fip_qos_ext, '_get_tc_wrapper',
return_value=tc_wrapper):
self.fip_qos_ext.update_router(self.context, self.router)
tc_wrapper.set_ip_rate_limit.assert_has_calls(
[mock.call(lib_const.INGRESS_DIRECTION,
TEST_QOS_FIP, 1111, 2222),
mock.call(lib_const.EGRESS_DIRECTION,
TEST_QOS_FIP, 3333, 4444)],
any_order=True)
# floating IP remove the qos_policy bonding
self.fip[qos_consts.QOS_POLICY_ID] = None
self.fip_qos_ext.update_router(self.context, self.router)
tc_wrapper.clear_ip_rate_limit.assert_has_calls(
[mock.call(lib_const.INGRESS_DIRECTION,
TEST_QOS_FIP),
mock.call(lib_const.EGRESS_DIRECTION,
TEST_QOS_FIP)],
any_order=True)
def test__process_update_policy(self):
tc_wrapper = mock.Mock()
with mock.patch.object(self.fip_qos_ext, '_get_tc_wrapper',
return_value=tc_wrapper):
self.fip_qos_ext.update_router(self.context, self.router)
tc_wrapper.set_ip_rate_limit.assert_has_calls(
[mock.call(lib_const.INGRESS_DIRECTION,
TEST_QOS_FIP, 1111, 2222),
mock.call(lib_const.EGRESS_DIRECTION,
TEST_QOS_FIP, 3333, 4444)],
any_order=True)
# the rules of floating IP policy has been changed
self.fip_qos_ext._policy_rules_modified = mock.Mock(
return_value=True)
self.policy.rules = [self.new_ingress_rule, self.egress_rule]
self.fip_qos_ext._process_update_policy(self.policy)
tc_wrapper.set_ip_rate_limit.assert_has_calls(
[mock.call(lib_const.INGRESS_DIRECTION,
TEST_QOS_FIP, 5555, 6666)])
def _test_qos_policy_scenarios(self, fip_removed=True,
qos_rules_removed=False):
tc_wrapper = mock.Mock()
with mock.patch.object(self.fip_qos_ext, '_get_tc_wrapper',
return_value=tc_wrapper):
self.fip_qos_ext.update_router(self.context, self.router)
tc_wrapper.set_ip_rate_limit.assert_has_calls(
[mock.call(lib_const.INGRESS_DIRECTION,
TEST_QOS_FIP, 1111, 2222),
mock.call(lib_const.EGRESS_DIRECTION,
TEST_QOS_FIP, 3333, 4444)],
any_order=True)
if fip_removed:
# floating IP dissociated, then it does not belong to
# this router
self.router[lib_const.FLOATINGIP_KEY] = []
if qos_rules_removed:
self.policy.rules = []
self.fip_qos_ext.update_router(self.context, self.router)
tc_wrapper.clear_ip_rate_limit.assert_has_calls(
[mock.call(lib_const.INGRESS_DIRECTION,
TEST_QOS_FIP),
mock.call(lib_const.EGRESS_DIRECTION,
TEST_QOS_FIP)],
any_order=True)
def test_update_router_fip_removed(self):
self._test_qos_policy_scenarios()
def test_fip_qos_changed_to_none(self):
self._test_qos_policy_scenarios(qos_rules_removed=True)
def _test_only_one_direction_rule(self, func, policy, direction):
tc_wrapper = mock.Mock()
with mock.patch.object(
self.fip_qos_ext.resource_rpc, 'pull',
return_value=policy):
with mock.patch.object(self.fip_qos_ext, '_get_tc_wrapper',
return_value=tc_wrapper):
func(self.context, self.router)
if direction == lib_const.INGRESS_DIRECTION:
calls = [mock.call(lib_const.INGRESS_DIRECTION,
TEST_QOS_FIP, 1111, 2222)]
else:
calls = [mock.call(lib_const.EGRESS_DIRECTION,
TEST_QOS_FIP, 3333, 4444)]
tc_wrapper.set_ip_rate_limit.assert_has_calls(calls)
def test_add_router_only_ingress(self):
self._test_only_one_direction_rule(self.fip_qos_ext.add_router,
self.policy2,
lib_const.INGRESS_DIRECTION)
def test_add_router_only_egress(self):
self._test_only_one_direction_rule(self.fip_qos_ext.add_router,
self.policy3,
lib_const.EGRESS_DIRECTION)
def test_update_router_only_ingress(self):
self._test_only_one_direction_rule(self.fip_qos_ext.add_router,
self.policy2,
lib_const.INGRESS_DIRECTION)
def test_update_router_only_egress(self):
self._test_only_one_direction_rule(self.fip_qos_ext.add_router,
self.policy3,
lib_const.EGRESS_DIRECTION)
def test_rule_only_has_max_kbps(self):
tc_wrapper = mock.Mock()
with mock.patch.object(self.fip_qos_ext, '_get_tc_wrapper',
return_value=tc_wrapper):
self.fip_qos_ext.update_router(self.context, self.router)
tc_wrapper.set_ip_rate_limit.assert_has_calls(
[mock.call(lib_const.INGRESS_DIRECTION,
TEST_QOS_FIP, 1111, 2222),
mock.call(lib_const.EGRESS_DIRECTION,
TEST_QOS_FIP, 3333, 4444)],
any_order=True)
# policy ingress rule changed to only has one max_kbps value
self.policy.rules = [self.ingress_rule_only_has_max_kbps,
self.egress_rule]
self.fip_qos_ext.update_router(self.context, self.router)
tc_wrapper.set_ip_rate_limit.assert_has_calls(
[mock.call(lib_const.INGRESS_DIRECTION,
TEST_QOS_FIP, 5555, 0)])
def test_qos_policy_has_no_bandwidth_limit_rule(self):
tc_wrapper = mock.Mock()
with mock.patch.object(self.fip_qos_ext, '_get_tc_wrapper',
return_value=tc_wrapper):
self.fip['qos_policy_id'] = self.policy4.id
self.fip_qos_ext.add_router(self.context, self.router)
tc_wrapper.set_ip_rate_limit.assert_not_called()
class RouterFipRateLimitMapsTestCase(base.BaseTestCase):
def setUp(self):
super(RouterFipRateLimitMapsTestCase, self).setUp()
self.policy_map = fip_qos.RouterFipRateLimitMaps()
def test_update_policy(self):
self.policy_map.update_policy(TEST_POLICY)
self.assertEqual(TEST_POLICY,
self.policy_map.known_policies[TEST_POLICY.id])
def _set_fips(self):
self.policy_map.set_fip_policy(TEST_FIP, TEST_POLICY)
self.policy_map.set_fip_policy(TEST_FIP2, TEST_POLICY2)
def test_set_fip_policy(self):
self._set_fips()
self.assertEqual(TEST_POLICY,
self.policy_map.known_policies[TEST_POLICY.id])
self.assertIn(TEST_FIP,
self.policy_map.qos_policy_fips[TEST_POLICY.id])
def test_get_fip_policy(self):
self._set_fips()
self.assertEqual(TEST_POLICY,
self.policy_map.get_fip_policy(TEST_FIP))
self.assertEqual(TEST_POLICY2,
self.policy_map.get_fip_policy(TEST_FIP2))
def test_get_fips(self):
self._set_fips()
self.assertEqual([TEST_FIP],
list(self.policy_map.get_fips(TEST_POLICY)))
self.assertEqual([TEST_FIP2],
list(self.policy_map.get_fips(TEST_POLICY2)))
def test_clean_by_fip(self):
self._set_fips()
self.policy_map.clean_by_fip(TEST_FIP)
self.assertNotIn(TEST_POLICY.id, self.policy_map.known_policies)
self.assertNotIn(TEST_FIP, self.policy_map.fip_policies)
self.assertIn(TEST_POLICY2.id, self.policy_map.known_policies)
def test_clean_by_fip_for_unknown_fip(self):
self.policy_map._clean_policy_info = mock.Mock()
self.policy_map.clean_by_fip(TEST_FIP)
self.policy_map._clean_policy_info.assert_not_called()
def test_find_fip_router_id(self):
router_id = _uuid()
self.policy_map.router_floating_ips[router_id] = set([TEST_FIP,
TEST_FIP2])
self.assertIsNone(self.policy_map.find_fip_router_id("8.8.8.8"))
self.assertEqual(router_id,
self.policy_map.find_fip_router_id(TEST_FIP))

View File

@ -115,6 +115,8 @@ neutron.agent.l2.extensions =
qos = neutron.agent.l2.extensions.qos:QosAgentExtension
fdb = neutron.agent.l2.extensions.fdb_population:FdbPopulationAgentExtension
log = neutron.services.logapi.agent.log_extension:LoggingExtension
neutron.agent.l3.extensions =
fip_qos = neutron.agent.l3.extensions.fip_qos:FipQosAgentExtension
neutron.qos.agent_drivers =
ovs = neutron.plugins.ml2.drivers.openvswitch.agent.extension_drivers.qos_driver:QosOVSAgentDriver
sriov = neutron.plugins.ml2.drivers.mech_sriov.agent.extension_drivers.qos_driver:QosSRIOVAgentDriver