neutron/neutron/tests/unit/agent/l3/test_router_info.py

107 lines
4.0 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron.agent.common import config as agent_config
from neutron.agent.l3 import router_info
from neutron.openstack.common import uuidutils
from neutron.tests import base
_uuid = uuidutils.generate_uuid
class TestRouterInfo(base.BaseTestCase):
def setUp(self):
super(TestRouterInfo, self).setUp()
conf = agent_config.setup_conf()
conf.use_namespaces = True
self.ip_cls_p = mock.patch('neutron.agent.linux.ip_lib.IPWrapper')
ip_cls = self.ip_cls_p.start()
self.mock_ip = mock.MagicMock()
ip_cls.return_value = self.mock_ip
self.ri_kwargs = {'agent_conf': conf,
'interface_driver': mock.sentinel.interface_driver}
def _check_agent_method_called(self, calls):
self.mock_ip.netns.execute.assert_has_calls(
[mock.call(call, check_exit_code=False) for call in calls],
any_order=True)
def test_routing_table_update(self):
ri = router_info.RouterInfo(_uuid(), {}, **self.ri_kwargs)
ri.router = {}
fake_route1 = {'destination': '135.207.0.0/16',
'nexthop': '1.2.3.4'}
fake_route2 = {'destination': '135.207.111.111/32',
'nexthop': '1.2.3.4'}
ri._update_routing_table('replace', fake_route1)
expected = [['ip', 'route', 'replace', 'to', '135.207.0.0/16',
'via', '1.2.3.4']]
self._check_agent_method_called(expected)
ri._update_routing_table('delete', fake_route1)
expected = [['ip', 'route', 'delete', 'to', '135.207.0.0/16',
'via', '1.2.3.4']]
self._check_agent_method_called(expected)
ri._update_routing_table('replace', fake_route2)
expected = [['ip', 'route', 'replace', 'to', '135.207.111.111/32',
'via', '1.2.3.4']]
self._check_agent_method_called(expected)
ri._update_routing_table('delete', fake_route2)
expected = [['ip', 'route', 'delete', 'to', '135.207.111.111/32',
'via', '1.2.3.4']]
self._check_agent_method_called(expected)
def test_routes_updated(self):
ri = router_info.RouterInfo(_uuid(), {}, **self.ri_kwargs)
ri.router = {}
fake_old_routes = []
fake_new_routes = [{'destination': "110.100.31.0/24",
'nexthop': "10.100.10.30"},
{'destination': "110.100.30.0/24",
'nexthop': "10.100.10.30"}]
ri.routes = fake_old_routes
ri.router['routes'] = fake_new_routes
ri.routes_updated()
expected = [['ip', 'route', 'replace', 'to', '110.100.30.0/24',
'via', '10.100.10.30'],
['ip', 'route', 'replace', 'to', '110.100.31.0/24',
'via', '10.100.10.30']]
self._check_agent_method_called(expected)
fake_new_routes = [{'destination': "110.100.30.0/24",
'nexthop': "10.100.10.30"}]
ri.router['routes'] = fake_new_routes
ri.routes_updated()
expected = [['ip', 'route', 'delete', 'to', '110.100.31.0/24',
'via', '10.100.10.30']]
self._check_agent_method_called(expected)
fake_new_routes = []
ri.router['routes'] = fake_new_routes
ri.routes_updated()
expected = [['ip', 'route', 'delete', 'to', '110.100.30.0/24',
'via', '10.100.10.30']]
self._check_agent_method_called(expected)