Merge "[L3][QoS] Agent side router gateway IP rate limit"
This commit is contained in:
commit
36b27b16f2
|
@ -22,3 +22,7 @@ function configure_qos {
|
|||
function configure_l3_agent_extension_fip_qos {
|
||||
plugin_agent_add_l3_agent_extension "fip_qos"
|
||||
}
|
||||
|
||||
function configure_l3_agent_extension_gateway_ip_qos {
|
||||
plugin_agent_add_l3_agent_extension "gateway_ip_qos"
|
||||
}
|
||||
|
|
|
@ -72,6 +72,7 @@ if [[ "$1" == "stack" ]]; then
|
|||
if is_service_enabled q-l3 neutron-l3; then
|
||||
if is_service_enabled q-qos neutron-qos; then
|
||||
configure_l3_agent_extension_fip_qos
|
||||
configure_l3_agent_extension_gateway_ip_qos
|
||||
fi
|
||||
if is_service_enabled q-port-forwarding neutron-port-forwarding; then
|
||||
configure_port_forwarding
|
||||
|
|
|
@ -0,0 +1,191 @@
|
|||
# Copyright 2018 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import netaddr
|
||||
|
||||
from neutron_lib.agent import l3_extension
|
||||
from neutron_lib import constants
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_log import log as logging
|
||||
|
||||
|
||||
from neutron.agent.l3.extensions.qos import base as qos_base
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.api.rpc.callbacks import events
|
||||
from neutron.api.rpc.callbacks import resources
|
||||
from neutron.api.rpc.handlers import resources_rpc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RouterGatewayIPQosAgentExtension(qos_base.L3QosAgentExtensionBase,
|
||||
l3_extension.L3AgentExtension):
|
||||
|
||||
def initialize(self, connection, driver_type):
|
||||
"""Initialize agent extension."""
|
||||
self.resource_rpc = resources_rpc.ResourcesPullRpcApi()
|
||||
self._register_rpc_consumers()
|
||||
self.gateway_ip_qos_map = qos_base.RateLimitMaps()
|
||||
|
||||
@lockutils.synchronized('qos-gateway-ip')
|
||||
def _handle_notification(self, context, resource_type,
|
||||
qos_policies, event_type):
|
||||
if event_type == events.UPDATED:
|
||||
for qos_policy in qos_policies:
|
||||
self._process_update_policy(qos_policy)
|
||||
|
||||
def _process_router_gateway_after_policy_update(
|
||||
self, router_id, qos_policy):
|
||||
router_info = self._get_router_info(router_id)
|
||||
if not router_info:
|
||||
return
|
||||
ex_gw_port = router_info.get_ex_gw_port()
|
||||
if not ex_gw_port:
|
||||
return
|
||||
interface_name = router_info.get_external_device_name(
|
||||
ex_gw_port['id'])
|
||||
device = self._get_gateway_tc_rule_device(
|
||||
router_info, interface_name)
|
||||
if not device.exists():
|
||||
return
|
||||
tc_wrapper = self._get_tc_wrapper(device)
|
||||
# Clear all old gateway IP tc rules first.
|
||||
self._empty_router_gateway_rate_limits(router_info, tc_wrapper)
|
||||
rates = self.get_policy_rates(qos_policy)
|
||||
self.gateway_ip_qos_map.set_resource_policy(
|
||||
router_info.router_id, qos_policy)
|
||||
self._set_gateway_tc_rules(
|
||||
router_info, tc_wrapper,
|
||||
ex_gw_port, rates)
|
||||
|
||||
def _process_update_policy(self, qos_policy):
|
||||
old_qos_policy = self.gateway_ip_qos_map.get_policy(qos_policy.id)
|
||||
if old_qos_policy:
|
||||
if self._policy_rules_modified(old_qos_policy, qos_policy):
|
||||
router_ids = self.gateway_ip_qos_map.get_resources(
|
||||
qos_policy)
|
||||
for router_id in list(router_ids):
|
||||
self._process_router_gateway_after_policy_update(
|
||||
router_id, qos_policy)
|
||||
self.gateway_ip_qos_map.update_policy(qos_policy)
|
||||
|
||||
@lockutils.synchronized('qos-gateway-ip')
|
||||
def add_router(self, context, data):
|
||||
router_info = self._get_router_info(data['id'])
|
||||
if router_info:
|
||||
self.process_gateway_rate_limit(context, router_info)
|
||||
|
||||
@lockutils.synchronized('qos-gateway-ip')
|
||||
def update_router(self, context, data):
|
||||
router_info = self._get_router_info(data['id'])
|
||||
if router_info:
|
||||
self.process_gateway_rate_limit(context, router_info)
|
||||
|
||||
def delete_router(self, context, data):
|
||||
# Remove the router and policy map in case the router deletion with
|
||||
# gateway.
|
||||
self.gateway_ip_qos_map.clean_by_resource(data['id'])
|
||||
|
||||
def ha_state_change(self, context, data):
|
||||
pass
|
||||
|
||||
def process_gateway_rate_limit(self, context, router_info):
|
||||
is_distributed_router = router_info.router.get('distributed')
|
||||
agent_mode = router_info.agent_conf.agent_mode
|
||||
LOG.debug("Start processing gateway IP QoS for "
|
||||
"router %(router_id)s, router "
|
||||
"distributed: %(distributed)s, "
|
||||
"agent mode: %(agent_mode)s",
|
||||
{"router_id": router_info.router_id,
|
||||
"distributed": is_distributed_router,
|
||||
"agent_mode": agent_mode})
|
||||
if is_distributed_router and agent_mode in (
|
||||
constants.L3_AGENT_MODE_DVR,
|
||||
constants.L3_AGENT_MODE_DVR_NO_EXTERNAL):
|
||||
# Dvr local router and dvr_no_external agent do not process
|
||||
# gateway IPs.
|
||||
return
|
||||
|
||||
self._handle_router_gateway_rate_limit(context, router_info)
|
||||
|
||||
def _empty_router_gateway_rate_limits(self, router_info, tc_wrapper):
|
||||
self.gateway_ip_qos_map.clean_by_resource(router_info.router_id)
|
||||
for ip in router_info.qos_gateway_ips:
|
||||
for direction in constants.VALID_DIRECTIONS:
|
||||
tc_wrapper.clear_ip_rate_limit(direction, ip)
|
||||
router_info.qos_gateway_ips.clear()
|
||||
|
||||
def _handle_router_gateway_rate_limit(self, context, router_info):
|
||||
ex_gw_port = router_info.get_ex_gw_port()
|
||||
if not ex_gw_port:
|
||||
return
|
||||
|
||||
interface_name = router_info.get_external_device_name(
|
||||
ex_gw_port['id'])
|
||||
device = self._get_gateway_tc_rule_device(router_info, interface_name)
|
||||
if not device.exists():
|
||||
return
|
||||
|
||||
tc_wrapper = self._get_tc_wrapper(device)
|
||||
# Clear all old gateway IP tc rules first.
|
||||
self._empty_router_gateway_rate_limits(router_info, tc_wrapper)
|
||||
|
||||
rates = self._get_rates_by_policy(context, router_info)
|
||||
if not rates:
|
||||
return
|
||||
|
||||
self._set_gateway_tc_rules(router_info, tc_wrapper, ex_gw_port, rates)
|
||||
|
||||
def _get_gateway_tc_rule_device(self, router_info, interface_name):
|
||||
is_distributed_router = router_info.router.get('distributed')
|
||||
agent_mode = router_info.agent_conf.agent_mode
|
||||
namespace = router_info.ns_name
|
||||
if (is_distributed_router and
|
||||
agent_mode == constants.L3_AGENT_MODE_DVR_SNAT):
|
||||
namespace = router_info.snat_namespace.name
|
||||
return ip_lib.IPDevice(interface_name, namespace=namespace)
|
||||
|
||||
def _get_rates_by_policy(self, context, router_info):
|
||||
gateway_info = router_info.router.get('external_gateway_info')
|
||||
if not gateway_info:
|
||||
return
|
||||
|
||||
policy_id = gateway_info.get('qos_policy_id')
|
||||
if not policy_id:
|
||||
return
|
||||
|
||||
policy = self.resource_rpc.pull(
|
||||
context, resources.QOS_POLICY, policy_id)
|
||||
self.gateway_ip_qos_map.set_resource_policy(
|
||||
router_info.router_id, policy)
|
||||
return self.get_policy_rates(policy)
|
||||
|
||||
def _set_gateway_tc_rules(self, router_info, tc_wrapper,
|
||||
ex_gw_port, rates):
|
||||
for ip_addr in ex_gw_port['fixed_ips']:
|
||||
ex_gw_ip = ip_addr['ip_address']
|
||||
ip_ver = netaddr.IPAddress(ex_gw_ip).version
|
||||
if ip_ver == constants.IP_VERSION_4:
|
||||
self._set_gateway_ip_rate_limit(tc_wrapper, ex_gw_ip, rates)
|
||||
router_info.qos_gateway_ips.add(ex_gw_ip)
|
||||
|
||||
def _set_gateway_ip_rate_limit(self, tc_wrapper, ex_gw_ip, rates):
|
||||
for direction in constants.VALID_DIRECTIONS:
|
||||
rate = rates.get(direction)
|
||||
if (rate['rate'] == qos_base.IP_DEFAULT_RATE and
|
||||
rate['burst'] == qos_base.IP_DEFAULT_BURST):
|
||||
continue
|
||||
tc_wrapper.set_ip_rate_limit(direction, ex_gw_ip,
|
||||
rate['rate'], rate['burst'])
|
|
@ -82,6 +82,7 @@ class RouterInfo(object):
|
|||
self.radvd = None
|
||||
self.centralized_port_forwarding_fip_set = set()
|
||||
self.fip_managed_by_port_forwardings = None
|
||||
self.qos_gateway_ips = set()
|
||||
|
||||
def initialize(self, process_monitor):
|
||||
"""Initialize the router on the system.
|
||||
|
|
|
@ -0,0 +1,197 @@
|
|||
# Copyright 2018 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
from neutron_lib import constants
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from neutron.agent.l3 import agent as neutron_l3_agent
|
||||
from neutron.agent.l3.extensions.qos import gateway_ip as gateway_ip_qos
|
||||
from neutron.common import exceptions
|
||||
from neutron.common import utils as common_utils
|
||||
from neutron.objects.qos import policy
|
||||
from neutron.objects.qos import rule
|
||||
from neutron.tests.functional.agent.l3 import framework
|
||||
from neutron.tests.functional.agent.l3 import test_dvr_router
|
||||
|
||||
_uuid = uuidutils.generate_uuid
|
||||
INGRESS_EGRESS_POLICY_ID = _uuid()
|
||||
INGRESS_POLICY_ID = _uuid()
|
||||
EGRESS_POLICY_ID = _uuid()
|
||||
|
||||
|
||||
class RouterGatewayIPQosAgentExtensionTestFramework(
|
||||
framework.L3AgentTestFramework):
|
||||
|
||||
test_bw_limit_rule_1 = rule.QosBandwidthLimitRule(
|
||||
context=None,
|
||||
qos_policy_id=INGRESS_EGRESS_POLICY_ID,
|
||||
id=_uuid(),
|
||||
max_kbps=111,
|
||||
max_burst_kbps=222,
|
||||
direction=constants.INGRESS_DIRECTION)
|
||||
test_bw_limit_rule_2 = rule.QosBandwidthLimitRule(
|
||||
context=None,
|
||||
qos_policy_id=INGRESS_EGRESS_POLICY_ID,
|
||||
id=_uuid(),
|
||||
max_kbps=333,
|
||||
max_burst_kbps=444,
|
||||
direction=constants.EGRESS_DIRECTION)
|
||||
test_bw_limit_rule_3 = rule.QosBandwidthLimitRule(
|
||||
context=None,
|
||||
qos_policy_id=INGRESS_POLICY_ID,
|
||||
id=_uuid(),
|
||||
max_kbps=555,
|
||||
max_burst_kbps=666,
|
||||
direction=constants.INGRESS_DIRECTION)
|
||||
test_bw_limit_rule_4 = rule.QosBandwidthLimitRule(
|
||||
context=None,
|
||||
qos_policy_id=EGRESS_POLICY_ID,
|
||||
id=_uuid(),
|
||||
max_kbps=777,
|
||||
max_burst_kbps=888,
|
||||
direction=constants.EGRESS_DIRECTION)
|
||||
|
||||
def setUp(self):
|
||||
super(RouterGatewayIPQosAgentExtensionTestFramework, self).setUp()
|
||||
self.conf.set_override('extensions', ['gateway_ip_qos'], 'agent')
|
||||
self.agent = neutron_l3_agent.L3NATAgentWithStateReport('agent1',
|
||||
self.conf)
|
||||
self._set_pull_mock()
|
||||
self.set_test_qos_rules(INGRESS_EGRESS_POLICY_ID,
|
||||
[self.test_bw_limit_rule_1,
|
||||
self.test_bw_limit_rule_2])
|
||||
self.set_test_qos_rules(INGRESS_POLICY_ID,
|
||||
[self.test_bw_limit_rule_3])
|
||||
self.set_test_qos_rules(EGRESS_POLICY_ID,
|
||||
[self.test_bw_limit_rule_4])
|
||||
self.gateway_ip_qos_ext = (
|
||||
gateway_ip_qos.RouterGatewayIPQosAgentExtension())
|
||||
|
||||
def _set_pull_mock(self):
|
||||
|
||||
self.qos_policies = {}
|
||||
|
||||
def _pull_mock(context, resource_type, resource_id):
|
||||
return self.qos_policies[resource_id]
|
||||
|
||||
self.pull = mock.patch(
|
||||
'neutron.api.rpc.handlers.resources_rpc.'
|
||||
'ResourcesPullRpcApi.pull').start()
|
||||
self.pull.side_effect = _pull_mock
|
||||
|
||||
def set_test_qos_rules(self, policy_id, policy_rules):
|
||||
"""This function sets the policy test rules to be exposed."""
|
||||
|
||||
qos_policy = policy.QosPolicy(
|
||||
context=None,
|
||||
project_id=_uuid(),
|
||||
id=policy_id,
|
||||
name="Test Policy Name",
|
||||
description="This is a policy for testing purposes",
|
||||
shared=False,
|
||||
rules=policy_rules)
|
||||
|
||||
qos_policy.obj_reset_changes()
|
||||
self.qos_policies[policy_id] = qos_policy
|
||||
|
||||
def _assert_bandwidth_limit_rule_is_set(self, router, ip, rule):
|
||||
ex_gw_port = router.get_ex_gw_port()
|
||||
interface_name = router.get_external_device_name(ex_gw_port['id'])
|
||||
device = self.gateway_ip_qos_ext._get_gateway_tc_rule_device(
|
||||
router, interface_name)
|
||||
tc_wrapper = self.gateway_ip_qos_ext._get_tc_wrapper(device)
|
||||
|
||||
def get_filter_id():
|
||||
try:
|
||||
return tc_wrapper.get_filter_id_for_ip(rule.direction, ip)
|
||||
except exceptions.FilterIDForIPNotFound:
|
||||
pass
|
||||
|
||||
common_utils.wait_until_true(get_filter_id)
|
||||
|
||||
|
||||
class TestRouterGatewayIPQosAgentExtension(
|
||||
RouterGatewayIPQosAgentExtensionTestFramework):
|
||||
|
||||
def _test_centralized_routers(self, enable_ha=False,
|
||||
ingress=True, egress=True):
|
||||
qos_policy_id = INGRESS_EGRESS_POLICY_ID
|
||||
if ingress and not egress:
|
||||
qos_policy_id = INGRESS_POLICY_ID
|
||||
elif egress and not ingress:
|
||||
qos_policy_id = EGRESS_POLICY_ID
|
||||
router_info = self.generate_router_info(
|
||||
enable_ha=enable_ha,
|
||||
qos_policy_id=qos_policy_id)
|
||||
ri = self.manage_router(self.agent, router_info)
|
||||
if qos_policy_id == INGRESS_EGRESS_POLICY_ID:
|
||||
self._assert_bandwidth_limit_rule_is_set(
|
||||
ri, '19.4.4.4', self.test_bw_limit_rule_1)
|
||||
self._assert_bandwidth_limit_rule_is_set(
|
||||
ri, '19.4.4.4', self.test_bw_limit_rule_2)
|
||||
elif qos_policy_id == INGRESS_POLICY_ID:
|
||||
self._assert_bandwidth_limit_rule_is_set(
|
||||
ri, '19.4.4.4', self.test_bw_limit_rule_3)
|
||||
elif qos_policy_id == EGRESS_POLICY_ID:
|
||||
self._assert_bandwidth_limit_rule_is_set(
|
||||
ri, '19.4.4.4', self.test_bw_limit_rule_4)
|
||||
|
||||
def test_legacy_router_gateway_ip_qos(self):
|
||||
self._test_centralized_routers()
|
||||
|
||||
def test_legacy_router_gateway_ip_qos_ingress(self):
|
||||
self._test_centralized_routers(ingress=True, egress=False)
|
||||
|
||||
def test_legacy_router_gateway_ip_qos_egress(self):
|
||||
self._test_centralized_routers(ingress=False, egress=True)
|
||||
|
||||
def test_ha_router_gateway_ip_qos(self):
|
||||
self._test_centralized_routers(enable_ha=True)
|
||||
|
||||
def test_ha_router_gateway_ip_qos_ingress(self):
|
||||
self._test_centralized_routers(enable_ha=True,
|
||||
ingress=True, egress=False)
|
||||
|
||||
def test_ha_router_gateway_ip_qos_egress(self):
|
||||
self._test_centralized_routers(enable_ha=True,
|
||||
ingress=False, egress=True)
|
||||
|
||||
|
||||
class TestRouterGatewayIPQosAgentExtensionDVR(
|
||||
test_dvr_router.TestDvrRouter,
|
||||
RouterGatewayIPQosAgentExtensionTestFramework):
|
||||
|
||||
def _test_dvr_gateway_ip_qos(self, enable_ha=False):
|
||||
self.agent.conf.agent_mode = constants.L3_AGENT_MODE_DVR_SNAT
|
||||
router_info = self.generate_dvr_router_info(
|
||||
enable_ha=enable_ha, enable_snat=True,
|
||||
enable_gw=True, qos_policy_id=INGRESS_EGRESS_POLICY_ID)
|
||||
ri = self.manage_router(self.agent, router_info)
|
||||
self._assert_bandwidth_limit_rule_is_set(
|
||||
ri, '19.4.4.4', self.test_bw_limit_rule_1)
|
||||
self._assert_bandwidth_limit_rule_is_set(
|
||||
ri, '19.4.4.4', self.test_bw_limit_rule_2)
|
||||
|
||||
def test_dvr_edge_router_gateway_ip_qos(self):
|
||||
self._test_dvr_gateway_ip_qos()
|
||||
|
||||
def test_ha_dvr_edge_router_gateway_ip_qos(self):
|
||||
self._test_dvr_gateway_ip_qos(enable_ha=True)
|
||||
|
||||
|
||||
class LinuxBridgeRouterGatewayIPQosAgentExtensionTestCase(
|
||||
TestRouterGatewayIPQosAgentExtension):
|
||||
INTERFACE_DRIVER = 'neutron.agent.linux.interface.BridgeInterfaceDriver'
|
|
@ -0,0 +1,221 @@
|
|||
# Copyright 2017 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
|
||||
import mock
|
||||
from neutron_lib import constants as lib_const
|
||||
from neutron_lib import context
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from neutron.agent.l3 import agent as l3_agent
|
||||
from neutron.agent.l3.extensions.qos import gateway_ip as gateway_ip_qos
|
||||
from neutron.agent.l3 import l3_agent_extension_api as l3_ext_api
|
||||
from neutron.agent.l3 import router_info as l3router
|
||||
from neutron.api.rpc.callbacks.consumer import registry
|
||||
from neutron.api.rpc.callbacks import resources
|
||||
from neutron.api.rpc.handlers import resources_rpc
|
||||
from neutron.objects.qos import policy
|
||||
from neutron.objects.qos import rule
|
||||
from neutron.tests.unit.agent.l3 import test_agent
|
||||
|
||||
_uuid = uuidutils.generate_uuid
|
||||
|
||||
TEST_QOS_GW_IP = "172.24.4.1"
|
||||
|
||||
HOSTNAME = 'myhost'
|
||||
|
||||
|
||||
class QosExtensionBaseTestCase(test_agent.BasicRouterOperationsFramework):
|
||||
|
||||
def setUp(self):
|
||||
super(QosExtensionBaseTestCase, self).setUp()
|
||||
|
||||
self.gw_ip_qos_ext = gateway_ip_qos.RouterGatewayIPQosAgentExtension()
|
||||
self.context = context.get_admin_context()
|
||||
self.connection = mock.Mock()
|
||||
|
||||
self.policy = policy.QosPolicy(context=None,
|
||||
name='test1', id=_uuid())
|
||||
self.ingress_rule = (
|
||||
rule.QosBandwidthLimitRule(context=None, id=_uuid(),
|
||||
qos_policy_id=self.policy.id,
|
||||
max_kbps=1111,
|
||||
max_burst_kbps=2222,
|
||||
direction=lib_const.INGRESS_DIRECTION))
|
||||
self.egress_rule = (
|
||||
rule.QosBandwidthLimitRule(context=None, id=_uuid(),
|
||||
qos_policy_id=self.policy.id,
|
||||
max_kbps=3333,
|
||||
max_burst_kbps=4444,
|
||||
direction=lib_const.EGRESS_DIRECTION))
|
||||
self.policy.rules = [self.ingress_rule, self.egress_rule]
|
||||
|
||||
self.new_ingress_rule = (
|
||||
rule.QosBandwidthLimitRule(context=None, id=_uuid(),
|
||||
qos_policy_id=self.policy.id,
|
||||
max_kbps=5555,
|
||||
max_burst_kbps=6666,
|
||||
direction=lib_const.INGRESS_DIRECTION))
|
||||
self.ingress_rule_only_has_max_kbps = (
|
||||
rule.QosBandwidthLimitRule(context=None, id=_uuid(),
|
||||
qos_policy_id=self.policy.id,
|
||||
max_kbps=5555,
|
||||
max_burst_kbps=0,
|
||||
direction=lib_const.INGRESS_DIRECTION))
|
||||
|
||||
self.policy2 = policy.QosPolicy(context=None,
|
||||
name='test2', id=_uuid())
|
||||
self.policy2.rules = [self.ingress_rule]
|
||||
|
||||
self.policy3 = policy.QosPolicy(context=None,
|
||||
name='test3', id=_uuid())
|
||||
self.policy3.rules = [self.egress_rule]
|
||||
|
||||
self.policy4 = policy.QosPolicy(context=None,
|
||||
name='test4', id=_uuid())
|
||||
self.dscp = rule.QosDscpMarkingRule(context=None, id=_uuid(),
|
||||
qos_policy_id=self.policy4.id,
|
||||
dscp_mark=32)
|
||||
self.dscp.obj_reset_changes()
|
||||
self.policy4.rules = [self.dscp]
|
||||
|
||||
self.qos_policies = {self.policy.id: self.policy,
|
||||
self.policy2.id: self.policy2,
|
||||
self.policy3.id: self.policy3,
|
||||
self.policy4.id: self.policy4}
|
||||
|
||||
self.agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
|
||||
self.ex_gw_port = {'id': _uuid(),
|
||||
'fixed_ips': [
|
||||
{'ip_address': TEST_QOS_GW_IP}],
|
||||
'qos_policy_id': self.policy.id,
|
||||
'enable_snat': True}
|
||||
self.fip = {'id': _uuid(),
|
||||
'floating_ip_address': '172.24.4.9',
|
||||
'fixed_ip_address': '192.168.0.1',
|
||||
'floating_network_id': _uuid(),
|
||||
'port_id': _uuid(),
|
||||
'host': HOSTNAME,
|
||||
'qos_policy_id': self.policy.id}
|
||||
self.router = {'id': _uuid(),
|
||||
'gw_port': self.ex_gw_port,
|
||||
'ha': False,
|
||||
'distributed': False,
|
||||
lib_const.FLOATINGIP_KEY: [self.fip],
|
||||
'external_gateway_info': self.ex_gw_port}
|
||||
self.router_info = l3router.RouterInfo(self.agent, self.router['id'],
|
||||
self.router, **self.ri_kwargs)
|
||||
self.router_info.ex_gw_port = self.ex_gw_port
|
||||
self.agent.router_info[self.router['id']] = self.router_info
|
||||
|
||||
def _mock_get_router_info(router_id):
|
||||
return self.router_info
|
||||
|
||||
self.get_router_info = mock.patch(
|
||||
'neutron.agent.l3.l3_agent_extension_api.'
|
||||
'L3AgentExtensionAPI.get_router_info').start()
|
||||
self.get_router_info.side_effect = _mock_get_router_info
|
||||
|
||||
self.agent_api = l3_ext_api.L3AgentExtensionAPI(None)
|
||||
self.gw_ip_qos_ext.consume_api(self.agent_api)
|
||||
|
||||
|
||||
class RouterGatewayIPQosAgentExtensionInitializeTestCase(
|
||||
QosExtensionBaseTestCase):
|
||||
|
||||
@mock.patch.object(registry, 'register')
|
||||
@mock.patch.object(resources_rpc, 'ResourcesPushRpcCallback')
|
||||
def test_initialize_subscribed_to_rpc(self, rpc_mock, subscribe_mock):
|
||||
call_to_patch = 'neutron.common.rpc.Connection'
|
||||
with mock.patch(call_to_patch,
|
||||
return_value=self.connection) as create_connection:
|
||||
self.gw_ip_qos_ext.initialize(
|
||||
self.connection, lib_const.L3_AGENT_MODE)
|
||||
create_connection.assert_has_calls([mock.call()])
|
||||
self.connection.create_consumer.assert_has_calls(
|
||||
[mock.call(
|
||||
resources_rpc.resource_type_versioned_topic(
|
||||
resources.QOS_POLICY),
|
||||
[rpc_mock()],
|
||||
fanout=True)]
|
||||
)
|
||||
subscribe_mock.assert_called_with(mock.ANY, resources.QOS_POLICY)
|
||||
|
||||
|
||||
class RouterGatewayIPQosAgentExtensionTestCase(
|
||||
QosExtensionBaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(RouterGatewayIPQosAgentExtensionTestCase, self).setUp()
|
||||
self.gw_ip_qos_ext.initialize(
|
||||
self.connection, lib_const.L3_AGENT_MODE)
|
||||
self._set_pull_mock()
|
||||
|
||||
def _set_pull_mock(self):
|
||||
|
||||
def _pull_mock(context, resource_type, resource_id):
|
||||
return self.qos_policies[resource_id]
|
||||
|
||||
self.pull = mock.patch(
|
||||
'neutron.api.rpc.handlers.resources_rpc.'
|
||||
'ResourcesPullRpcApi.pull').start()
|
||||
self.pull.side_effect = _pull_mock
|
||||
|
||||
def _test_gateway_ip_add(self, func):
|
||||
tc_wrapper = mock.Mock()
|
||||
with mock.patch.object(self.gw_ip_qos_ext, '_get_tc_wrapper',
|
||||
return_value=tc_wrapper):
|
||||
func(self.context, self.router)
|
||||
tc_wrapper.set_ip_rate_limit.assert_has_calls(
|
||||
[mock.call(lib_const.INGRESS_DIRECTION,
|
||||
TEST_QOS_GW_IP, 1111, 2222),
|
||||
mock.call(lib_const.EGRESS_DIRECTION,
|
||||
TEST_QOS_GW_IP, 3333, 4444)],
|
||||
any_order=True)
|
||||
|
||||
self.assertEqual(
|
||||
{self.router_info.router_id: self.policy.id},
|
||||
self.gw_ip_qos_ext.gateway_ip_qos_map.resource_policies)
|
||||
|
||||
def test_add_router(self):
|
||||
self._test_gateway_ip_add(self.gw_ip_qos_ext.add_router)
|
||||
|
||||
def test_update_router(self):
|
||||
self._test_gateway_ip_add(self.gw_ip_qos_ext.update_router)
|
||||
|
||||
def test__process_update_policy(self):
|
||||
tc_wrapper = mock.Mock()
|
||||
with mock.patch.object(self.gw_ip_qos_ext, '_get_tc_wrapper',
|
||||
return_value=tc_wrapper):
|
||||
self.gw_ip_qos_ext.add_router(self.context, self.router)
|
||||
tc_wrapper.set_ip_rate_limit.assert_has_calls(
|
||||
[mock.call(lib_const.INGRESS_DIRECTION,
|
||||
TEST_QOS_GW_IP, 1111, 2222),
|
||||
mock.call(lib_const.EGRESS_DIRECTION,
|
||||
TEST_QOS_GW_IP, 3333, 4444)],
|
||||
any_order=True)
|
||||
new_policy = copy.deepcopy(self.policy)
|
||||
new_policy.rules = [self.new_ingress_rule, self.egress_rule]
|
||||
self.gw_ip_qos_ext._process_update_policy(new_policy)
|
||||
self.assertEqual(
|
||||
{self.router_info.router_id: self.policy.id},
|
||||
self.gw_ip_qos_ext.gateway_ip_qos_map.resource_policies)
|
||||
tc_wrapper.set_ip_rate_limit.assert_has_calls(
|
||||
[mock.call(lib_const.INGRESS_DIRECTION,
|
||||
TEST_QOS_GW_IP, 5555, 6666),
|
||||
mock.call(lib_const.EGRESS_DIRECTION,
|
||||
TEST_QOS_GW_IP, 3333, 4444)],
|
||||
any_order=True)
|
|
@ -110,6 +110,7 @@ neutron.agent.l2.extensions =
|
|||
log = neutron.services.logapi.agent.log_extension:LoggingExtension
|
||||
neutron.agent.l3.extensions =
|
||||
fip_qos = neutron.agent.l3.extensions.qos.fip:FipQosAgentExtension
|
||||
gateway_ip_qos = neutron.agent.l3.extensions.qos.gateway_ip:RouterGatewayIPQosAgentExtension
|
||||
port_forwarding = neutron.agent.l3.extensions.port_forwarding:PortForwardingAgentExtension
|
||||
snat_log = neutron.agent.l3.extensions.snat_log:SNATLoggingExtension
|
||||
neutron.services.logapi.drivers =
|
||||
|
|
Loading…
Reference in New Issue