# Copyright 2017 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
|
|
import copy
|
|
from unittest import mock
|
|
|
|
from neutron_lib import constants as lib_const
|
|
from neutron_lib import context
|
|
from oslo_utils import uuidutils
|
|
|
|
from neutron.agent.l3 import agent as l3_agent
|
|
from neutron.agent.l3.extensions.qos import gateway_ip as gateway_ip_qos
|
|
from neutron.agent.l3 import l3_agent_extension_api as l3_ext_api
|
|
from neutron.agent.l3 import router_info as l3router
|
|
from neutron.api.rpc.callbacks.consumer import registry
|
|
from neutron.api.rpc.callbacks import resources
|
|
from neutron.api.rpc.handlers import resources_rpc
|
|
from neutron.objects.qos import policy
|
|
from neutron.objects.qos import rule
|
|
from neutron.tests.unit.agent.l3 import test_agent
|
|
|
|
# Short alias for the oslo.utils UUID generator used to build fixture ids.
_uuid = uuidutils.generate_uuid

# Address placed in the external gateway port's ``fixed_ips`` and expected
# as the IP argument of every ``set_ip_rate_limit`` call in these tests.
TEST_QOS_GW_IP = '172.24.4.1'

# Host name handed to the L3 agent and recorded on the floating IP fixture.
HOSTNAME = 'myhost'
|
|
|
|
|
|
class QosExtensionBaseTestCase(test_agent.BasicRouterOperationsFramework):
    """Common fixture for the router gateway IP QoS extension tests.

    ``setUp`` prepares:

    * the extension under test (``self.gw_ip_qos_ext``), an admin context
      and a mocked RPC connection;
    * four QoS policies, indexed by id in ``self.qos_policies``:
      ``policy`` (ingress + egress bandwidth limits), ``policy2`` (ingress
      limit only), ``policy3`` (egress limit only) and ``policy4`` (a DSCP
      marking rule);
    * two extra ingress bandwidth limit rules that are not attached to any
      policy here (``new_ingress_rule`` and
      ``ingress_rule_only_has_max_kbps``);
    * an L3 agent with one non-HA, non-distributed router whose gateway
      port and floating IP both reference ``policy``;
    * a patched ``L3AgentExtensionAPI.get_router_info`` that returns
      ``self.router_info`` for any router id.
    """

    def setUp(self):
        super().setUp()

        self.gw_ip_qos_ext = gateway_ip_qos.RouterGatewayIPQosAgentExtension()
        self.context = context.get_admin_context()
        self.connection = mock.Mock()

        def _make_policy(name):
            return policy.QosPolicy(context=None,
                                    name=name, id=_uuid())

        def _make_bw_limit(direction, max_kbps, max_burst_kbps):
            # All bandwidth limit rules of this fixture carry the id of
            # self.policy, regardless of which policy lists them.
            return rule.QosBandwidthLimitRule(
                context=None, id=_uuid(),
                qos_policy_id=self.policy.id,
                max_kbps=max_kbps,
                max_burst_kbps=max_burst_kbps,
                direction=direction)

        # Policy with both an ingress and an egress bandwidth limit.
        self.policy = _make_policy('test1')
        self.ingress_rule = _make_bw_limit(
            lib_const.INGRESS_DIRECTION, 1111, 2222)
        self.egress_rule = _make_bw_limit(
            lib_const.EGRESS_DIRECTION, 3333, 4444)
        self.policy.rules = [self.ingress_rule, self.egress_rule]

        # Stand-alone ingress rules available to tests as replacements.
        self.new_ingress_rule = _make_bw_limit(
            lib_const.INGRESS_DIRECTION, 5555, 6666)
        self.ingress_rule_only_has_max_kbps = _make_bw_limit(
            lib_const.INGRESS_DIRECTION, 5555, 0)

        # Single-direction policies reusing the rules created above.
        self.policy2 = _make_policy('test2')
        self.policy2.rules = [self.ingress_rule]

        self.policy3 = _make_policy('test3')
        self.policy3.rules = [self.egress_rule]

        # Policy holding only a DSCP marking rule.
        self.policy4 = _make_policy('test4')
        self.dscp = rule.QosDscpMarkingRule(context=None, id=_uuid(),
                                            qos_policy_id=self.policy4.id,
                                            dscp_mark=32)
        self.dscp.obj_reset_changes()
        self.policy4.rules = [self.dscp]

        known_policies = (self.policy, self.policy2,
                          self.policy3, self.policy4)
        self.qos_policies = {item.id: item for item in known_policies}

        self.agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)

        gw_port = {
            'id': _uuid(),
            'fixed_ips': [{'ip_address': TEST_QOS_GW_IP}],
            'qos_policy_id': self.policy.id,
            'enable_snat': True,
        }
        self.ex_gw_port = gw_port

        floating_ip = {
            'id': _uuid(),
            'floating_ip_address': '172.24.4.9',
            'fixed_ip_address': '192.168.0.1',
            'floating_network_id': _uuid(),
            'port_id': _uuid(),
            'host': HOSTNAME,
            'qos_policy_id': self.policy.id,
        }
        self.fip = floating_ip

        # The same gateway port dict serves as both 'gw_port' and
        # 'external_gateway_info'.
        self.router = {
            'id': _uuid(),
            'gw_port': self.ex_gw_port,
            'ha': False,
            'distributed': False,
            lib_const.FLOATINGIP_KEY: [self.fip],
            'external_gateway_info': self.ex_gw_port,
        }

        router_id = self.router['id']
        self.router_info = l3router.RouterInfo(self.agent, router_id,
                                               self.router, **self.ri_kwargs)
        self.router_info.ex_gw_port = self.ex_gw_port
        self.agent.router_info[router_id] = self.router_info

        # NOTE(review): the patcher is started but never stopped here;
        # presumably a base test class registers mock.patch.stopall as a
        # cleanup -- confirm.
        router_info_patcher = mock.patch(
            'neutron.agent.l3.l3_agent_extension_api.'
            'L3AgentExtensionAPI.get_router_info')
        self.get_router_info = router_info_patcher.start()
        # Resolve self.router_info at call time, whatever id is requested.
        self.get_router_info.side_effect = (
            lambda router_id: self.router_info)

        self.agent_api = l3_ext_api.L3AgentExtensionAPI(None, None)
        self.gw_ip_qos_ext.consume_api(self.agent_api)
|
|
|
|
|
|
class RouterGatewayIPQosAgentExtensionInitializeTestCase(
        QosExtensionBaseTestCase):
    """Checks the RPC wiring performed by the extension's ``initialize``."""

    @mock.patch.object(registry, 'register')
    @mock.patch.object(resources_rpc, 'ResourcesPushRpcCallback')
    def test_initialize_subscribed_to_rpc(self, rpc_mock, subscribe_mock):
        """initialize() opens a connection, adds a consumer and registers.

        Expects one no-argument ``neutron_lib.rpc.Connection`` call, a
        fanout consumer on the versioned QoS policy topic whose endpoint
        list holds the push callback, and a registry registration for the
        QoS policy resource type.
        """
        # Decorators are applied bottom-up: rpc_mock replaces
        # ResourcesPushRpcCallback and subscribe_mock replaces
        # registry.register.
        call_to_patch = 'neutron_lib.rpc.Connection'
        connection_patch = mock.patch(call_to_patch,
                                      return_value=self.connection)
        with connection_patch as create_connection:
            self.gw_ip_qos_ext.initialize(
                self.connection, lib_const.L3_AGENT_MODE)

            create_connection.assert_has_calls([mock.call()])

            qos_topic = resources_rpc.resource_type_versioned_topic(
                resources.QOS_POLICY)
            expected_consumer = mock.call(
                qos_topic, [rpc_mock()], fanout=True)
            self.connection.create_consumer.assert_has_calls(
                [expected_consumer])

            subscribe_mock.assert_called_with(mock.ANY, resources.QOS_POLICY)
|
|
|
|
|
|
class RouterGatewayIPQosAgentExtensionTestCase(
        QosExtensionBaseTestCase):
    """Router add/update/delete and policy-update tests for the extension.

    Each test swaps the extension's ``_get_tc_wrapper`` for one returning a
    plain mock and inspects the ``set_ip_rate_limit`` calls made on it.
    """

    def setUp(self):
        """Initialize the extension and stub out the policy pull RPC."""
        super().setUp()
        self.gw_ip_qos_ext.initialize(
            self.connection, lib_const.L3_AGENT_MODE)
        self._set_pull_mock()

    def _set_pull_mock(self):
        """Serve ``ResourcesPullRpcApi.pull`` from ``self.qos_policies``."""

        def _pull_mock(context, resource_type, resource_id):
            # Only the resource id is used; unknown ids raise KeyError.
            return self.qos_policies[resource_id]

        pull_patcher = mock.patch(
            'neutron.api.rpc.handlers.resources_rpc.'
            'ResourcesPullRpcApi.pull')
        self.pull = pull_patcher.start()
        self.pull.side_effect = _pull_mock

    def _assert_gw_ip_rate_limits(self, tc_wrapper, ingress, egress):
        """Assert both directions were rate limited on the gateway IP.

        :param tc_wrapper: mock standing in for the tc wrapper.
        :param ingress: ``(max_kbps, max_burst_kbps)`` expected for ingress.
        :param egress: ``(max_kbps, max_burst_kbps)`` expected for egress.
        """
        expected_calls = [
            mock.call(lib_const.INGRESS_DIRECTION,
                      TEST_QOS_GW_IP, *ingress),
            mock.call(lib_const.EGRESS_DIRECTION,
                      TEST_QOS_GW_IP, *egress),
        ]
        tc_wrapper.set_ip_rate_limit.assert_has_calls(
            expected_calls, any_order=True)

    def _test_gateway_ip_add(self, func):
        """Run ``func(context, router)`` and verify the limits it applies.

        Also checks that the router id is mapped to ``self.policy.id`` in
        the extension's ``gateway_ip_qos_map``.
        """
        fake_tc = mock.Mock()
        with mock.patch.object(self.gw_ip_qos_ext, '_get_tc_wrapper',
                               return_value=fake_tc):
            func(self.context, self.router)
            self._assert_gw_ip_rate_limits(
                fake_tc, ingress=(1111, 2222), egress=(3333, 4444))

            expected_map = {self.router_info.router_id: self.policy.id}
            qos_map = self.gw_ip_qos_ext.gateway_ip_qos_map
            self.assertEqual(expected_map, qos_map.resource_2_qos_policies)

    def test_add_router(self):
        self._test_gateway_ip_add(self.gw_ip_qos_ext.add_router)

    def test_update_router(self):
        self._test_gateway_ip_add(self.gw_ip_qos_ext.update_router)

    def test_delete_router(self):
        """Deleting a router drops its entry from the policy map."""
        fake_tc = mock.Mock()
        with mock.patch.object(self.gw_ip_qos_ext, '_get_tc_wrapper',
                               return_value=fake_tc):
            self.gw_ip_qos_ext.add_router(self.context, self.router)
            self._assert_gw_ip_rate_limits(
                fake_tc, ingress=(1111, 2222), egress=(3333, 4444))

            self.gw_ip_qos_ext.delete_router(self.context, self.router)
            qos_map = self.gw_ip_qos_ext.gateway_ip_qos_map
            self.assertIsNone(
                qos_map.get_resource_policy(self.router_info.router_id))

    def test__process_update_policy(self):
        """A pushed policy update re-applies limits with the new rules."""
        fake_tc = mock.Mock()
        with mock.patch.object(self.gw_ip_qos_ext, '_get_tc_wrapper',
                               return_value=fake_tc):
            self.gw_ip_qos_ext.add_router(self.context, self.router)
            self._assert_gw_ip_rate_limits(
                fake_tc, ingress=(1111, 2222), egress=(3333, 4444))

            # Same policy id, but with a different ingress rule.
            new_policy = copy.deepcopy(self.policy)
            new_policy.rules = [self.new_ingress_rule, self.egress_rule]
            self.gw_ip_qos_ext._process_update_policy(new_policy)

            # The router stays bound to the same policy id.
            expected_map = {self.router_info.router_id: self.policy.id}
            qos_map = self.gw_ip_qos_ext.gateway_ip_qos_map
            self.assertEqual(expected_map, qos_map.resource_2_qos_policies)

            self._assert_gw_ip_rate_limits(
                fake_tc, ingress=(5555, 6666), egress=(3333, 4444))
|