# Copyright (C) 2014 VA Linux Systems Japan K.K. # Based on test for openvswitch agent(test_ovs_neutron_agent.py). # # Copyright (c) 2012 OpenStack Foundation. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. # # @author: Fumihiko Kakuma, VA Linux Systems Japan K.K. # @author: YAMAMOTO Takashi, VA Linux Systems Japan K.K. import contextlib import mock import netaddr from oslo.config import cfg import testtools from neutron.agent.linux import ip_lib from neutron.agent.linux import ovs_lib from neutron.agent.linux import utils from neutron.openstack.common import importutils from neutron.plugins.common import constants as p_const from neutron.plugins.openvswitch.common import constants from neutron.tests import base from neutron.tests.unit.ofagent import fake_oflib NOTIFIER = ('neutron.plugins.ml2.rpc.AgentNotifierApi') OVS_LINUX_KERN_VERS_WITHOUT_VXLAN = "3.12.0" class OFAAgentTestCase(base.BaseTestCase): _AGENT_NAME = 'neutron.plugins.ofagent.agent.ofa_neutron_agent' def setUp(self): super(OFAAgentTestCase, self).setUp() self.fake_oflib_of = fake_oflib.patch_fake_oflib_of().start() self.mod_agent = importutils.import_module(self._AGENT_NAME) cfg.CONF.set_default('firewall_driver', 'neutron.agent.firewall.NoopFirewallDriver', group='SECURITYGROUP') self.ryuapp = mock.Mock() cfg.CONF.register_cli_opts([ cfg.StrOpt('ofp-listen-host', default='', help='openflow listen host'), cfg.IntOpt('ofp-tcp-listen-port', default=6633, help='openflow tcp listen port') ]) cfg.CONF.set_override('root_helper', 'fake_helper', group='AGENT') class CreateAgentConfigMap(OFAAgentTestCase): def test_create_agent_config_map_succeeds(self): self.assertTrue(self.mod_agent.create_agent_config_map(cfg.CONF)) def test_create_agent_config_map_fails_for_invalid_tunnel_config(self): # An ip address is required for tunneling but there is no default, # verify this for both gre and vxlan tunnels. cfg.CONF.set_override('tunnel_types', [p_const.TYPE_GRE], group='AGENT') with testtools.ExpectedException(ValueError): self.mod_agent.create_agent_config_map(cfg.CONF) cfg.CONF.set_override('tunnel_types', [p_const.TYPE_VXLAN], group='AGENT') with testtools.ExpectedException(ValueError): self.mod_agent.create_agent_config_map(cfg.CONF) def test_create_agent_config_map_enable_tunneling(self): # Verify setting only enable_tunneling will default tunnel_type to GRE cfg.CONF.set_override('tunnel_types', None, group='AGENT') cfg.CONF.set_override('enable_tunneling', True, group='OVS') cfg.CONF.set_override('local_ip', '10.10.10.10', group='OVS') cfgmap = self.mod_agent.create_agent_config_map(cfg.CONF) self.assertEqual(cfgmap['tunnel_types'], [p_const.TYPE_GRE]) def test_create_agent_config_map_fails_no_local_ip(self): # An ip address is required for tunneling but there is no default cfg.CONF.set_override('enable_tunneling', True, group='OVS') with testtools.ExpectedException(ValueError): self.mod_agent.create_agent_config_map(cfg.CONF) def test_create_agent_config_map_fails_for_invalid_tunnel_type(self): cfg.CONF.set_override('tunnel_types', ['foobar'], group='AGENT') with testtools.ExpectedException(ValueError): self.mod_agent.create_agent_config_map(cfg.CONF) def test_create_agent_config_map_multiple_tunnel_types(self): cfg.CONF.set_override('local_ip', '10.10.10.10', group='OVS') cfg.CONF.set_override('tunnel_types', [p_const.TYPE_GRE, p_const.TYPE_VXLAN], group='AGENT') cfgmap = self.mod_agent.create_agent_config_map(cfg.CONF) self.assertEqual(cfgmap['tunnel_types'], [p_const.TYPE_GRE, p_const.TYPE_VXLAN]) class TestOFANeutronAgentOVSBridge(OFAAgentTestCase): def setUp(self): super(TestOFANeutronAgentOVSBridge, self).setUp() self.br_name = 'bridge1' self.root_helper = 'fake_helper' self.ovs = self.mod_agent.OVSBridge( self.br_name, self.root_helper, self.ryuapp) def test_find_datapath_id(self): with mock.patch.object(self.ovs, 'get_datapath_id', return_value='12345'): self.ovs.find_datapath_id() self.assertEqual(self.ovs.datapath_id, '12345') def _fake_get_datapath(self, app, datapath_id): if self.ovs.retry_count >= 2: datapath = mock.Mock() datapath.ofproto_parser = mock.Mock() return datapath self.ovs.retry_count += 1 return None def test_get_datapath_normal(self): self.ovs.retry_count = 0 with mock.patch.object(self.mod_agent.ryu_api, 'get_datapath', new=self._fake_get_datapath): self.ovs.datapath_id = '0x64' self.ovs.get_datapath(retry_max=4) self.assertEqual(self.ovs.retry_count, 2) def test_get_datapath_retry_out_by_default_time(self): cfg.CONF.set_override('get_datapath_retry_times', 3, group='AGENT') with mock.patch.object(self.mod_agent.ryu_api, 'get_datapath', return_value=None) as mock_get_datapath: with testtools.ExpectedException(SystemExit): self.ovs.datapath_id = '0x64' self.ovs.get_datapath(retry_max=3) self.assertEqual(mock_get_datapath.call_count, 3) def test_get_datapath_retry_out_by_specified_time(self): with mock.patch.object(self.mod_agent.ryu_api, 'get_datapath', return_value=None) as mock_get_datapath: with testtools.ExpectedException(SystemExit): self.ovs.datapath_id = '0x64' self.ovs.get_datapath(retry_max=2) self.assertEqual(mock_get_datapath.call_count, 2) def test_setup_ofp_default_par(self): with contextlib.nested( mock.patch.object(self.ovs, 'set_protocols'), mock.patch.object(self.ovs, 'set_controller'), mock.patch.object(self.ovs, 'find_datapath_id'), mock.patch.object(self.ovs, 'get_datapath'), ) as (mock_set_protocols, mock_set_controller, mock_find_datapath_id, mock_get_datapath): self.ovs.setup_ofp() mock_set_protocols.assert_called_with('OpenFlow13') mock_set_controller.assert_called_with(['tcp:127.0.0.1:6633']) mock_get_datapath.assert_called_with( cfg.CONF.AGENT.get_datapath_retry_times) self.assertEqual(mock_find_datapath_id.call_count, 1) def test_setup_ofp_specify_par(self): controller_names = ['tcp:192.168.10.10:1234', 'tcp:172.17.16.20:5555'] with contextlib.nested( mock.patch.object(self.ovs, 'set_protocols'), mock.patch.object(self.ovs, 'set_controller'), mock.patch.object(self.ovs, 'find_datapath_id'), mock.patch.object(self.ovs, 'get_datapath'), ) as (mock_set_protocols, mock_set_controller, mock_find_datapath_id, mock_get_datapath): self.ovs.setup_ofp(controller_names=controller_names, protocols='OpenFlow133', retry_max=11) mock_set_protocols.assert_called_with('OpenFlow133') mock_set_controller.assert_called_with(controller_names) mock_get_datapath.assert_called_with(11) self.assertEqual(mock_find_datapath_id.call_count, 1) def test_setup_ofp_with_except(self): with contextlib.nested( mock.patch.object(self.ovs, 'set_protocols', side_effect=RuntimeError), mock.patch.object(self.ovs, 'set_controller'), mock.patch.object(self.ovs, 'find_datapath_id'), mock.patch.object(self.ovs, 'get_datapath'), ) as (mock_set_protocols, mock_set_controller, mock_find_datapath_id, mock_get_datapath): with testtools.ExpectedException(SystemExit): self.ovs.setup_ofp() class TestOFANeutronAgent(OFAAgentTestCase): def setUp(self): super(TestOFANeutronAgent, self).setUp() notifier_p = mock.patch(NOTIFIER) notifier_cls = notifier_p.start() self.notifier = mock.Mock() notifier_cls.return_value = self.notifier # Avoid rpc initialization for unit tests cfg.CONF.set_override('rpc_backend', 'neutron.openstack.common.rpc.impl_fake') kwargs = self.mod_agent.create_agent_config_map(cfg.CONF) class MockFixedIntervalLoopingCall(object): def __init__(self, f): self.f = f def start(self, interval=0): self.f() def _mk_test_dp(name): ofp = importutils.import_module('ryu.ofproto.ofproto_v1_3') ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser') dp = mock.Mock() dp.ofproto = ofp dp.ofproto_parser = ofpp dp.__repr__ = lambda _self: name return dp def _mk_test_br(name): dp = _mk_test_dp(name) br = mock.Mock() br.datapath = dp br.ofproto = dp.ofproto br.ofparser = dp.ofproto_parser return br with contextlib.nested( mock.patch.object(self.mod_agent.OFANeutronAgent, 'setup_integration_br', return_value=mock.Mock()), mock.patch.object(self.mod_agent.OFANeutronAgent, 'setup_ancillary_bridges', return_value=[]), mock.patch.object(self.mod_agent.OVSBridge, 'get_local_port_mac', return_value='00:00:00:00:00:01'), mock.patch('neutron.agent.linux.utils.get_interface_mac', return_value='00:00:00:00:00:01'), mock.patch('neutron.openstack.common.loopingcall.' 'FixedIntervalLoopingCall', new=MockFixedIntervalLoopingCall)): self.agent = self.mod_agent.OFANeutronAgent(self.ryuapp, **kwargs) self.agent.tun_br = _mk_test_br('tun_br') self.datapath = mock.Mock() self.ofparser = mock.Mock() self.agent.phys_brs['phys-net1'] = _mk_test_br('phys_br1') self.agent.phys_ofports['phys-net1'] = 777 self.agent.int_ofports['phys-net1'] = 666 self.datapath.ofparser = self.ofparser self.ofparser.OFPMatch = mock.Mock() self.ofparser.OFPMatch.return_value = mock.Mock() self.ofparser.OFPFlowMod = mock.Mock() self.ofparser.OFPFlowMod.return_value = mock.Mock() self.agent.int_br.ofparser = self.ofparser self.agent.int_br.datapath = _mk_test_dp('int_br') self.agent.sg_agent = mock.Mock() def _mock_port_bound(self, ofport=None, new_local_vlan=None, old_local_vlan=None): port = mock.Mock() port.ofport = ofport net_uuid = 'my-net-uuid' if old_local_vlan is not None: self.agent.local_vlan_map[net_uuid] = ( self.mod_agent.LocalVLANMapping( old_local_vlan, None, None, None)) with contextlib.nested( mock.patch.object(self.mod_agent.OVSBridge, 'set_db_attribute', return_value=True), mock.patch.object(self.mod_agent.OVSBridge, 'db_get_val', return_value=str(old_local_vlan)), mock.patch.object(self.agent, 'ryu_send_msg') ) as (set_ovs_db_func, get_ovs_db_func, ryu_send_msg_func): self.agent.port_bound(port, net_uuid, 'local', None, None) get_ovs_db_func.assert_called_once_with("Port", mock.ANY, "tag") if new_local_vlan != old_local_vlan: set_ovs_db_func.assert_called_once_with( "Port", mock.ANY, "tag", str(new_local_vlan)) if ofport != -1: ryu_send_msg_func.assert_called_once_with( self.ofparser.OFPFlowMod.return_value) else: self.assertFalse(ryu_send_msg_func.called) else: self.assertFalse(set_ovs_db_func.called) self.assertFalse(ryu_send_msg_func.called) def test_port_bound_deletes_flows_for_valid_ofport(self): self._mock_port_bound(ofport=1, new_local_vlan=1) def test_port_bound_ignores_flows_for_invalid_ofport(self): self._mock_port_bound(ofport=-1, new_local_vlan=1) def test_port_bound_does_not_rewire_if_already_bound(self): self._mock_port_bound(ofport=-1, new_local_vlan=1, old_local_vlan=1) def _test_port_dead(self, cur_tag=None): port = mock.Mock() port.ofport = 1 with contextlib.nested( mock.patch.object(self.mod_agent.OVSBridge, 'set_db_attribute', return_value=True), mock.patch.object(self.mod_agent.OVSBridge, 'db_get_val', return_value=cur_tag), mock.patch.object(self.agent, 'ryu_send_msg') ) as (set_ovs_db_func, get_ovs_db_func, ryu_send_msg_func): self.agent.port_dead(port) get_ovs_db_func.assert_called_once_with("Port", mock.ANY, "tag") if cur_tag == self.mod_agent.DEAD_VLAN_TAG: self.assertFalse(set_ovs_db_func.called) self.assertFalse(ryu_send_msg_func.called) else: set_ovs_db_func.assert_called_once_with( "Port", mock.ANY, "tag", str(self.mod_agent.DEAD_VLAN_TAG)) ryu_send_msg_func.assert_called_once_with( self.ofparser.OFPFlowMod.return_value) def test_port_dead(self): self._test_port_dead() def test_port_dead_with_port_already_dead(self): self._test_port_dead(self.mod_agent.DEAD_VLAN_TAG) def mock_scan_ports(self, vif_port_set=None, registered_ports=None, updated_ports=None, port_tags_dict=None): port_tags_dict = port_tags_dict or {} with contextlib.nested( mock.patch.object(self.agent.int_br, 'get_vif_port_set', return_value=vif_port_set), mock.patch.object(self.agent.int_br, 'get_port_tag_dict', return_value=port_tags_dict) ): return self.agent.scan_ports(registered_ports, updated_ports) def test_scan_ports_returns_current_only_for_unchanged_ports(self): vif_port_set = set([1, 3]) registered_ports = set([1, 3]) expected = {'current': vif_port_set} actual = self.mock_scan_ports(vif_port_set, registered_ports) self.assertEqual(expected, actual) def test_scan_ports_returns_port_changes(self): vif_port_set = set([1, 3]) registered_ports = set([1, 2]) expected = dict(current=vif_port_set, added=set([3]), removed=set([2])) actual = self.mock_scan_ports(vif_port_set, registered_ports) self.assertEqual(expected, actual) def _test_scan_ports_with_updated_ports(self, updated_ports): vif_port_set = set([1, 3, 4]) registered_ports = set([1, 2, 4]) expected = dict(current=vif_port_set, added=set([3]), removed=set([2]), updated=set([4])) actual = self.mock_scan_ports(vif_port_set, registered_ports, updated_ports) self.assertEqual(expected, actual) def test_scan_ports_finds_known_updated_ports(self): self._test_scan_ports_with_updated_ports(set([4])) def test_scan_ports_ignores_unknown_updated_ports(self): # the port '5' was not seen on current ports. Hence it has either # never been wired or already removed and should be ignored self._test_scan_ports_with_updated_ports(set([4, 5])) def test_scan_ports_ignores_updated_port_if_removed(self): vif_port_set = set([1, 3]) registered_ports = set([1, 2]) updated_ports = set([1, 2]) expected = dict(current=vif_port_set, added=set([3]), removed=set([2]), updated=set([1])) actual = self.mock_scan_ports(vif_port_set, registered_ports, updated_ports) self.assertEqual(expected, actual) def test_scan_ports_no_vif_changes_returns_updated_port_only(self): vif_port_set = set([1, 2, 3]) registered_ports = set([1, 2, 3]) updated_ports = set([2]) expected = dict(current=vif_port_set, updated=set([2])) actual = self.mock_scan_ports(vif_port_set, registered_ports, updated_ports) self.assertEqual(expected, actual) def test_update_ports_returns_lost_vlan_port(self): br = self.mod_agent.OVSBridge('br-int', 'fake_helper', self.ryuapp) mac = "ca:fe:de:ad:be:ef" port = ovs_lib.VifPort(1, 1, 1, mac, br) lvm = self.mod_agent.LocalVLANMapping( 1, '1', None, 1, {port.vif_id: port}) local_vlan_map = {'1': lvm} vif_port_set = set([1, 3]) registered_ports = set([1, 2]) port_tags_dict = {1: []} expected = dict( added=set([3]), current=vif_port_set, removed=set([2]), updated=set([1]) ) with mock.patch.dict(self.agent.local_vlan_map, local_vlan_map): actual = self.mock_scan_ports( vif_port_set, registered_ports, port_tags_dict=port_tags_dict) self.assertEqual(expected, actual) def test_treat_devices_added_returns_true_for_missing_device(self): with contextlib.nested( mock.patch.object(self.agent.plugin_rpc, 'get_device_details', side_effect=Exception()), mock.patch.object(self.agent.int_br, 'get_vif_port_by_id', return_value=mock.Mock())): self.assertTrue(self.agent.treat_devices_added_or_updated([{}])) def _mock_treat_devices_added_updated(self, details, port, func_name): """Mock treat devices added or updated. :param details: the details to return for the device :param port: the port that get_vif_port_by_id should return :param func_name: the function that should be called :returns: whether the named function was called """ with contextlib.nested( mock.patch.object(self.agent.plugin_rpc, 'get_device_details', return_value=details), mock.patch.object(self.agent.int_br, 'get_vif_port_by_id', return_value=port), mock.patch.object(self.agent.plugin_rpc, 'update_device_up'), mock.patch.object(self.agent.plugin_rpc, 'update_device_down'), mock.patch.object(self.agent, func_name) ) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, func): self.assertFalse(self.agent.treat_devices_added_or_updated([{}])) return func.called def test_treat_devices_added_updated_ignores_invalid_ofport(self): port = mock.Mock() port.ofport = -1 self.assertFalse(self._mock_treat_devices_added_updated( mock.MagicMock(), port, 'port_dead')) def test_treat_devices_added_updated_marks_unknown_port_as_dead(self): port = mock.Mock() port.ofport = 1 self.assertTrue(self._mock_treat_devices_added_updated( mock.MagicMock(), port, 'port_dead')) def test_treat_devices_added_does_not_process_missing_port(self): with contextlib.nested( mock.patch.object(self.agent.plugin_rpc, 'get_device_details'), mock.patch.object(self.agent.int_br, 'get_vif_port_by_id', return_value=None) ) as (get_dev_fn, get_vif_func): self.assertFalse(get_dev_fn.called) def test_treat_devices_added_updated_updates_known_port(self): details = mock.MagicMock() details.__contains__.side_effect = lambda x: True self.assertTrue(self._mock_treat_devices_added_updated( details, mock.Mock(), 'treat_vif_port')) def test_treat_devices_added_updated_put_port_down(self): fake_details_dict = {'admin_state_up': False, 'port_id': 'xxx', 'device': 'xxx', 'network_id': 'yyy', 'physical_network': 'foo', 'segmentation_id': 'bar', 'network_type': 'baz'} with contextlib.nested( mock.patch.object(self.agent.plugin_rpc, 'get_device_details', return_value=fake_details_dict), mock.patch.object(self.agent.int_br, 'get_vif_port_by_id', return_value=mock.MagicMock()), mock.patch.object(self.agent.plugin_rpc, 'update_device_up'), mock.patch.object(self.agent.plugin_rpc, 'update_device_down'), mock.patch.object(self.agent, 'treat_vif_port') ) as (get_dev_fn, get_vif_func, upd_dev_up, upd_dev_down, treat_vif_port): self.assertFalse(self.agent.treat_devices_added_or_updated([{}])) self.assertTrue(treat_vif_port.called) self.assertTrue(upd_dev_down.called) def test_treat_devices_removed_returns_true_for_missing_device(self): with mock.patch.object(self.agent.plugin_rpc, 'update_device_down', side_effect=Exception()): self.assertTrue(self.agent.treat_devices_removed([{}])) def _mock_treat_devices_removed(self, port_exists): details = dict(exists=port_exists) with mock.patch.object(self.agent.plugin_rpc, 'update_device_down', return_value=details): with mock.patch.object(self.agent, 'port_unbound') as port_unbound: self.assertFalse(self.agent.treat_devices_removed([{}])) self.assertTrue(port_unbound.called) def test_treat_devices_removed_unbinds_port(self): self._mock_treat_devices_removed(True) def test_treat_devices_removed_ignores_missing_port(self): self._mock_treat_devices_removed(False) def _test_process_network_ports(self, port_info): with contextlib.nested( mock.patch.object(self.agent.sg_agent, "setup_port_filters"), mock.patch.object(self.agent, "treat_devices_added_or_updated", return_value=False), mock.patch.object(self.agent, "treat_devices_removed", return_value=False) ) as (setup_port_filters, device_added_updated, device_removed): self.assertFalse(self.agent.process_network_ports(port_info)) setup_port_filters.assert_called_once_with( port_info['added'], port_info.get('updated', set())) device_added_updated.assert_called_once_with( port_info['added'] | port_info.get('updated', set())) device_removed.assert_called_once_with(port_info['removed']) def test_process_network_ports(self): self._test_process_network_ports( {'current': set(['tap0']), 'removed': set(['eth0']), 'added': set(['eth1'])}) def test_process_network_port_with_updated_ports(self): self._test_process_network_ports( {'current': set(['tap0', 'tap1']), 'updated': set(['tap1', 'eth1']), 'removed': set(['eth0']), 'added': set(['eth1'])}) def test_report_state(self): with mock.patch.object(self.agent.state_rpc, "report_state") as report_st: self.agent.int_br_device_count = 5 self.agent._report_state() report_st.assert_called_with(self.agent.context, self.agent.agent_state) self.assertNotIn("start_flag", self.agent.agent_state) self.assertEqual( self.agent.agent_state["configurations"]["devices"], self.agent.int_br_device_count ) def test_network_delete(self): with mock.patch.object(self.agent, "reclaim_local_vlan") as recl_fn: self.agent.network_delete("unused_context", network_id="123") self.assertFalse(recl_fn.called) self.agent.local_vlan_map["123"] = "LVM object" self.agent.network_delete("unused_context", network_id="123") recl_fn.assert_called_with("123") def test_port_update(self): port = {"id": "123", "network_id": "124", "admin_state_up": False} self.agent.port_update("unused_context", port=port, network_type="vlan", segmentation_id="1", physical_network="physnet") self.assertEqual(set(['123']), self.agent.updated_ports) def test_setup_physical_bridges(self): with contextlib.nested( mock.patch.object(ip_lib, "device_exists"), mock.patch.object(utils, "execute"), mock.patch.object(self.mod_agent.OVSBridge, "add_port"), mock.patch.object(self.mod_agent.OVSBridge, "delete_port"), mock.patch.object(self.mod_agent.OVSBridge, "set_protocols"), mock.patch.object(self.mod_agent.OVSBridge, "set_controller"), mock.patch.object(self.mod_agent.OVSBridge, "get_datapath_id", return_value='0xa'), mock.patch.object(self.agent.int_br, "add_port"), mock.patch.object(self.agent.int_br, "delete_port"), mock.patch.object(ip_lib.IPWrapper, "add_veth"), mock.patch.object(ip_lib.IpLinkCommand, "delete"), mock.patch.object(ip_lib.IpLinkCommand, "set_up"), mock.patch.object(ip_lib.IpLinkCommand, "set_mtu"), mock.patch.object(self.mod_agent.ryu_api, "get_datapath", return_value=self.datapath) ) as (devex_fn, utilsexec_fn, ovs_addport_fn, ovs_delport_fn, ovs_set_protocols_fn, ovs_set_controller_fn, ovs_datapath_id_fn, br_addport_fn, br_delport_fn, addveth_fn, linkdel_fn, linkset_fn, linkmtu_fn, ryu_api_fn): devex_fn.return_value = True parent = mock.MagicMock() parent.attach_mock(utilsexec_fn, 'utils_execute') parent.attach_mock(linkdel_fn, 'link_delete') parent.attach_mock(addveth_fn, 'add_veth') addveth_fn.return_value = (ip_lib.IPDevice("int-br-eth1"), ip_lib.IPDevice("phy-br-eth1")) ovs_addport_fn.return_value = "25" br_addport_fn.return_value = "11" self.agent.setup_physical_bridges({"physnet1": "br-eth"}) expected_calls = [mock.call.link_delete(), mock.call.utils_execute(['/sbin/udevadm', 'settle', '--timeout=10']), mock.call.add_veth('int-br-eth', 'phy-br-eth')] parent.assert_has_calls(expected_calls, any_order=False) self.assertEqual(self.agent.int_ofports["physnet1"], "11") self.assertEqual(self.agent.phys_ofports["physnet1"], "25") def test_port_unbound(self): with mock.patch.object(self.agent, "reclaim_local_vlan") as reclvl_fn: self.agent.enable_tunneling = True lvm = mock.Mock() lvm.network_type = "gre" lvm.vif_ports = {"vif1": mock.Mock()} self.agent.local_vlan_map["netuid12345"] = lvm self.agent.port_unbound("vif1", "netuid12345") self.assertTrue(reclvl_fn.called) reclvl_fn.called = False lvm.vif_ports = {} self.agent.port_unbound("vif1", "netuid12345") self.assertEqual(reclvl_fn.call_count, 2) lvm.vif_ports = {"vif1": mock.Mock()} self.agent.port_unbound("vif3", "netuid12345") self.assertEqual(reclvl_fn.call_count, 2) def test_daemon_loop_uses_polling_manager(self): with mock.patch( 'neutron.agent.linux.polling.get_polling_manager' ) as mock_get_pm: fake_pm = mock.Mock() mock_get_pm.return_value = fake_pm fake_pm.__enter__ = mock.Mock() fake_pm.__exit__ = mock.Mock() with mock.patch.object( self.agent, 'ovsdb_monitor_loop' ) as mock_loop: self.agent.daemon_loop() mock_get_pm.assert_called_once_with(True, 'fake_helper', constants.DEFAULT_OVSDBMON_RESPAWN) mock_loop.assert_called_once_with(polling_manager=fake_pm.__enter__()) def test_setup_tunnel_port_error_negative(self): with contextlib.nested( mock.patch.object(self.agent.tun_br, 'add_tunnel_port', return_value='-1'), mock.patch.object(self.mod_agent.LOG, 'error') ) as (add_tunnel_port_fn, log_error_fn): ofport = self.agent.setup_tunnel_port( 'gre-1', 'remote_ip', p_const.TYPE_GRE) add_tunnel_port_fn.assert_called_once_with( 'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE, self.agent.vxlan_udp_port, self.agent.dont_fragment) log_error_fn.assert_called_once_with( _("Failed to set-up %(type)s tunnel port to %(ip)s"), {'type': p_const.TYPE_GRE, 'ip': 'remote_ip'}) self.assertEqual(ofport, 0) def test_setup_tunnel_port_error_not_int(self): with contextlib.nested( mock.patch.object(self.agent.tun_br, 'add_tunnel_port', return_value=None), mock.patch.object(self.mod_agent.LOG, 'exception'), mock.patch.object(self.mod_agent.LOG, 'error') ) as (add_tunnel_port_fn, log_exc_fn, log_error_fn): ofport = self.agent.setup_tunnel_port( 'gre-1', 'remote_ip', p_const.TYPE_GRE) add_tunnel_port_fn.assert_called_once_with( 'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE, self.agent.vxlan_udp_port, self.agent.dont_fragment) log_exc_fn.assert_called_once_with( _("ofport should have a value that can be " "interpreted as an integer")) log_error_fn.assert_called_once_with( _("Failed to set-up %(type)s tunnel port to %(ip)s"), {'type': p_const.TYPE_GRE, 'ip': 'remote_ip'}) self.assertEqual(ofport, 0) def _create_tunnel_port_name(self, tunnel_ip, tunnel_type): tunnel_ip_hex = '%08x' % netaddr.IPAddress(tunnel_ip, version=4) return '%s-%s' % (tunnel_type, tunnel_ip_hex) def test_tunnel_sync_with_valid_ip_address_and_gre_type(self): tunnel_ip = '100.101.102.103' self.agent.tunnel_types = ['gre'] tun_name = self._create_tunnel_port_name(tunnel_ip, self.agent.tunnel_types[0]) fake_tunnel_details = {'tunnels': [{'ip_address': tunnel_ip}]} with contextlib.nested( mock.patch.object(self.agent.plugin_rpc, 'tunnel_sync', return_value=fake_tunnel_details), mock.patch.object(self.agent, 'setup_tunnel_port') ) as (tunnel_sync_rpc_fn, setup_tunnel_port_fn): self.agent.tunnel_sync() expected_calls = [mock.call(tun_name, tunnel_ip, self.agent.tunnel_types[0])] setup_tunnel_port_fn.assert_has_calls(expected_calls) def test_tunnel_sync_with_valid_ip_address_and_vxlan_type(self): tunnel_ip = '100.101.31.15' self.agent.tunnel_types = ['vxlan'] tun_name = self._create_tunnel_port_name(tunnel_ip, self.agent.tunnel_types[0]) fake_tunnel_details = {'tunnels': [{'ip_address': tunnel_ip}]} with contextlib.nested( mock.patch.object(self.agent.plugin_rpc, 'tunnel_sync', return_value=fake_tunnel_details), mock.patch.object(self.agent, 'setup_tunnel_port') ) as (tunnel_sync_rpc_fn, setup_tunnel_port_fn): self.agent.tunnel_sync() expected_calls = [mock.call(tun_name, tunnel_ip, self.agent.tunnel_types[0])] setup_tunnel_port_fn.assert_has_calls(expected_calls) def test_tunnel_sync_invalid_ip_address(self): tunnel_ip = '100.100.100.100' self.agent.tunnel_types = ['vxlan'] tun_name = self._create_tunnel_port_name(tunnel_ip, self.agent.tunnel_types[0]) fake_tunnel_details = {'tunnels': [{'ip_address': '300.300.300.300'}, {'ip_address': tunnel_ip}]} with contextlib.nested( mock.patch.object(self.agent.plugin_rpc, 'tunnel_sync', return_value=fake_tunnel_details), mock.patch.object(self.agent, 'setup_tunnel_port') ) as (tunnel_sync_rpc_fn, setup_tunnel_port_fn): self.agent.tunnel_sync() setup_tunnel_port_fn.assert_called_once_with( tun_name, tunnel_ip, self.agent.tunnel_types[0]) def test_tunnel_update(self): tunnel_ip = '10.10.10.10' self.agent.tunnel_types = ['gre'] tun_name = self._create_tunnel_port_name(tunnel_ip, self.agent.tunnel_types[0]) kwargs = {'tunnel_ip': tunnel_ip, 'tunnel_type': self.agent.tunnel_types[0]} self.agent.setup_tunnel_port = mock.Mock() self.agent.enable_tunneling = True self.agent.l2_pop = False self.agent.tunnel_update(context=None, **kwargs) expected_calls = [mock.call(tun_name, tunnel_ip, self.agent.tunnel_types[0])] self.agent.setup_tunnel_port.assert_has_calls(expected_calls) def test__provision_local_vlan_inbound_for_tunnel(self): with mock.patch.object(self.agent, 'ryu_send_msg') as sendmsg: self.agent._provision_local_vlan_inbound_for_tunnel(1, 'gre', 3) ofp = importutils.import_module('ryu.ofproto.ofproto_v1_3') ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser') expected_msg = ofpp.OFPFlowMod( self.agent.tun_br.datapath, instructions=[ ofpp.OFPInstructionActions( ofp.OFPIT_APPLY_ACTIONS, [ ofpp.OFPActionPushVlan(), ofpp.OFPActionSetField(vlan_vid=1 | ofp.OFPVID_PRESENT), ]), ofpp.OFPInstructionGotoTable(table_id=10), ], match=ofpp.OFPMatch(tunnel_id=3), priority=1, table_id=2) sendmsg.assert_has_calls([mock.call(expected_msg)]) def test__provision_local_vlan_outbound(self): with mock.patch.object(self.agent, 'ryu_send_msg') as sendmsg: self.agent._provision_local_vlan_outbound(888, 999, 'phys-net1') ofp = importutils.import_module('ryu.ofproto.ofproto_v1_3') ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser') expected_msg = ofpp.OFPFlowMod( self.agent.phys_brs['phys-net1'].datapath, instructions=[ ofpp.OFPInstructionActions( ofp.OFPIT_APPLY_ACTIONS, [ ofpp.OFPActionSetField(vlan_vid=999), ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0), ] ) ], match=ofpp.OFPMatch( in_port=777, vlan_vid=888 | ofp.OFPVID_PRESENT ), priority=4) sendmsg.assert_has_calls([mock.call(expected_msg)]) def test__provision_local_vlan_inbound(self): with mock.patch.object(self.agent, 'ryu_send_msg') as sendmsg: self.agent._provision_local_vlan_inbound(888, 999, 'phys-net1') ofp = importutils.import_module('ryu.ofproto.ofproto_v1_3') ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser') expected_msg = ofpp.OFPFlowMod( self.agent.int_br.datapath, instructions=[ ofpp.OFPInstructionActions( ofp.OFPIT_APPLY_ACTIONS, [ ofpp.OFPActionSetField( vlan_vid=888 | ofp.OFPVID_PRESENT ), ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0), ] ) ], match=ofpp.OFPMatch(in_port=666, vlan_vid=999), priority=3) sendmsg.assert_has_calls([mock.call(expected_msg)]) def test__reclaim_local_vlan_outbound(self): lvm = mock.Mock() lvm.network_type = p_const.TYPE_VLAN lvm.segmentation_id = 555 lvm.vlan = 444 lvm.physical_network = 'phys-net1' with mock.patch.object(self.agent, 'ryu_send_msg') as sendmsg: self.agent._reclaim_local_vlan_outbound(lvm) ofp = importutils.import_module('ryu.ofproto.ofproto_v1_3') ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser') expected_msg = ofpp.OFPFlowMod( self.agent.phys_brs['phys-net1'].datapath, command=ofp.OFPFC_DELETE, match=ofpp.OFPMatch( in_port=777, vlan_vid=444 | ofp.OFPVID_PRESENT ), out_group=ofp.OFPG_ANY, out_port=ofp.OFPP_ANY, table_id=ofp.OFPTT_ALL) sendmsg.assert_has_calls([mock.call(expected_msg)]) def test__reclaim_local_vlan_inbound(self): lvm = mock.Mock() lvm.network_type = p_const.TYPE_VLAN lvm.segmentation_id = 555 lvm.vlan = 444 lvm.physical_network = 'phys-net1' with mock.patch.object(self.agent, 'ryu_send_msg') as sendmsg: self.agent._reclaim_local_vlan_inbound(lvm) ofp = importutils.import_module('ryu.ofproto.ofproto_v1_3') ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser') expected_msg = ofpp.OFPFlowMod( self.agent.int_br.datapath, command=ofp.OFPFC_DELETE, match=ofpp.OFPMatch( in_port=666, vlan_vid=555 | ofp.OFPVID_PRESENT ), out_group=ofp.OFPG_ANY, out_port=ofp.OFPP_ANY, table_id=ofp.OFPTT_ALL) sendmsg.assert_has_calls([mock.call(expected_msg)]) def test__provision_local_vlan_outbound_flat(self): ofp = importutils.import_module('ryu.ofproto.ofproto_v1_3') ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser') with mock.patch.object(self.agent, 'ryu_send_msg') as sendmsg: self.agent._provision_local_vlan_outbound(888, ofp.OFPVID_NONE, 'phys-net1') expected_msg = ofpp.OFPFlowMod( self.agent.phys_brs['phys-net1'].datapath, instructions=[ ofpp.OFPInstructionActions( ofp.OFPIT_APPLY_ACTIONS, [ ofpp.OFPActionPopVlan(), ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0), ] ) ], match=ofpp.OFPMatch( in_port=777, vlan_vid=888 | ofp.OFPVID_PRESENT ), priority=4) sendmsg.assert_has_calls([mock.call(expected_msg)]) def test__provision_local_vlan_inbound_flat(self): ofp = importutils.import_module('ryu.ofproto.ofproto_v1_3') ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser') with mock.patch.object(self.agent, 'ryu_send_msg') as sendmsg: self.agent._provision_local_vlan_inbound(888, ofp.OFPVID_NONE, 'phys-net1') expected_msg = ofpp.OFPFlowMod( self.agent.int_br.datapath, instructions=[ ofpp.OFPInstructionActions( ofp.OFPIT_APPLY_ACTIONS, [ ofpp.OFPActionPushVlan(), ofpp.OFPActionSetField( vlan_vid=888 | ofp.OFPVID_PRESENT ), ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0), ] ) ], match=ofpp.OFPMatch(in_port=666, vlan_vid=ofp.OFPVID_NONE), priority=3) sendmsg.assert_has_calls([mock.call(expected_msg)]) def test__reclaim_local_vlan_outbound_flat(self): lvm = mock.Mock() lvm.network_type = p_const.TYPE_FLAT lvm.segmentation_id = 555 lvm.vlan = 444 lvm.physical_network = 'phys-net1' with mock.patch.object(self.agent, 'ryu_send_msg') as sendmsg: self.agent._reclaim_local_vlan_outbound(lvm) ofp = importutils.import_module('ryu.ofproto.ofproto_v1_3') ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser') expected_msg = ofpp.OFPFlowMod( self.agent.phys_brs['phys-net1'].datapath, command=ofp.OFPFC_DELETE, match=ofpp.OFPMatch( in_port=777, vlan_vid=444 | ofp.OFPVID_PRESENT ), out_group=ofp.OFPG_ANY, out_port=ofp.OFPP_ANY, table_id=ofp.OFPTT_ALL) sendmsg.assert_has_calls([mock.call(expected_msg)]) def test__reclaim_local_vlan_inbound_flat(self): lvm = mock.Mock() lvm.network_type = p_const.TYPE_FLAT lvm.segmentation_id = 555 lvm.vlan = 444 lvm.physical_network = 'phys-net1' with mock.patch.object(self.agent, 'ryu_send_msg') as sendmsg: self.agent._reclaim_local_vlan_inbound(lvm) ofp = importutils.import_module('ryu.ofproto.ofproto_v1_3') ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser') expected_msg = ofpp.OFPFlowMod( self.agent.int_br.datapath, command=ofp.OFPFC_DELETE, match=ofpp.OFPMatch( in_port=666, vlan_vid=ofp.OFPVID_NONE ), out_group=ofp.OFPG_ANY, out_port=ofp.OFPP_ANY, table_id=ofp.OFPTT_ALL) sendmsg.assert_has_calls([mock.call(expected_msg)]) class AncillaryBridgesTest(OFAAgentTestCase): def setUp(self): super(AncillaryBridgesTest, self).setUp() notifier_p = mock.patch(NOTIFIER) notifier_cls = notifier_p.start() self.notifier = mock.Mock() notifier_cls.return_value = self.notifier # Avoid rpc initialization for unit tests cfg.CONF.set_override('rpc_backend', 'neutron.openstack.common.rpc.impl_fake') cfg.CONF.set_override('report_interval', 0, 'AGENT') self.kwargs = self.mod_agent.create_agent_config_map(cfg.CONF) def _test_ancillary_bridges(self, bridges, ancillary): device_ids = ancillary[:] def pullup_side_effect(self, *args): result = device_ids.pop(0) return result with contextlib.nested( mock.patch.object(self.mod_agent.OFANeutronAgent, 'setup_integration_br', return_value=mock.Mock()), mock.patch('neutron.agent.linux.utils.get_interface_mac', return_value='00:00:00:00:00:01'), mock.patch.object(self.mod_agent.OVSBridge, 'get_local_port_mac', return_value='00:00:00:00:00:01'), mock.patch('neutron.agent.linux.ovs_lib.get_bridges', return_value=bridges), mock.patch( 'neutron.agent.linux.ovs_lib.get_bridge_external_bridge_id', side_effect=pullup_side_effect)): self.agent = self.mod_agent.OFANeutronAgent( self.ryuapp, **self.kwargs) self.assertEqual(len(ancillary), len(self.agent.ancillary_brs)) if ancillary: bridges = [br.br_name for br in self.agent.ancillary_brs] for br in ancillary: self.assertIn(br, bridges) def test_ancillary_bridges_single(self): bridges = ['br-int', 'br-ex'] self._test_ancillary_bridges(bridges, ['br-ex']) def test_ancillary_bridges_none(self): bridges = ['br-int'] self._test_ancillary_bridges(bridges, []) def test_ancillary_bridges_multiple(self): bridges = ['br-int', 'br-ex1', 'br-ex2'] self._test_ancillary_bridges(bridges, ['br-ex1', 'br-ex2'])