From ccdf211b4cf224d415520c7d70b7f53952674414 Mon Sep 17 00:00:00 2001 From: rossella Date: Tue, 1 Dec 2015 16:58:44 +0000 Subject: [PATCH] Revert "Revert "OVS agent reacts to events instead of polling"" The original change had to be reverted because it caused tests failing in the gate. The failures were due to the fact that when a port was not ready, an exception was thrown to trigger a full resync of the agent. This behavior was meant to be temporary and was fixed in a dependent patch that was not merged though. This revert moves the handling of not ready ports in this patch. It also refactors the unit tests a bit. This reverts commit e7270d9505fe09ec6d687e7796a814bf883b5416. Change-Id: I6574cef3c95525ace6a98cf968ee159190681394 --- .../openvswitch/agent/ovs_neutron_agent.py | 205 +++++++++++++++--- neutron/tests/functional/agent/l2/base.py | 34 ++- .../functional/agent/test_l2_ovs_agent.py | 3 +- .../agent/test_ovs_neutron_agent.py | 150 ++++++++++++- .../openvswitch/agent/test_ovs_tunnel.py | 43 ++-- 5 files changed, 380 insertions(+), 55 deletions(-) diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py index e7a27b2a59e..b86f8ad264c 100644 --- a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py @@ -32,7 +32,9 @@ from neutron.agent.common import ovs_lib from neutron.agent.common import polling from neutron.agent.common import utils from neutron.agent.l2.extensions import manager as ext_manager +from neutron.agent.linux import async_process from neutron.agent.linux import ip_lib +from neutron.agent.linux import polling as linux_polling from neutron.agent import rpc as agent_rpc from neutron.agent import securitygroups_rpc as sg_rpc from neutron.api.rpc.handlers import dvr_rpc @@ -149,7 +151,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, agent_conf = self.conf.AGENT ovs_conf = self.conf.OVS - self.fullsync = True + self.fullsync = False # init bridge classes with configured datapath type. self.br_int_cls, self.br_phys_cls, self.br_tun_cls = ( functools.partial(bridge_classes[b], @@ -1175,6 +1177,100 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, port_info['removed'] = registered_ports - cur_ports return port_info + def process_ports_events(self, events, registered_ports, ancillary_ports, + old_ports_not_ready, updated_ports=None): + port_info = {} + port_info['added'] = set() + port_info['removed'] = set() + port_info['current'] = registered_ports + + ancillary_port_info = {} + ancillary_port_info['added'] = set() + ancillary_port_info['removed'] = set() + ancillary_port_info['current'] = ancillary_ports + ports_not_ready_yet = set() + + # if a port was added and then removed or viceversa since the agent + # can't know the order of the operations, check the status of the port + # to determine if the port was added or deleted + ports_removed_and_added = [ + p for p in events['added'] if p in events['removed']] + for p in ports_removed_and_added: + if ovs_lib.BaseOVS().port_exists(p['name']): + events['removed'].remove(p) + else: + events['added'].remove(p) + + #TODO(rossella_s): scanning the ancillary bridge won't be needed + # anymore when https://review.openstack.org/#/c/203381 since the bridge + # id stored in external_ids will be used to identify the bridge the + # port belongs to + cur_ancillary_ports = set() + for bridge in self.ancillary_brs: + cur_ancillary_ports |= bridge.get_vif_port_set() + cur_ancillary_ports |= ancillary_port_info['current'] + + def _process_port(port, ports, ancillary_ports): + # check 'iface-id' is set otherwise is not a port + # the agent should care about + if 'attached-mac' in port.get('external_ids', []): + iface_id = self.int_br.portid_from_external_ids( + port['external_ids']) + if iface_id: + if port['ofport'] == ovs_lib.UNASSIGNED_OFPORT: + LOG.debug("Port %s not ready yet on the bridge", + iface_id) + ports_not_ready_yet.add(port['name']) + return + # check if port belongs to ancillary bridge + if iface_id in cur_ancillary_ports: + ancillary_ports.add(iface_id) + else: + ports.add(iface_id) + if old_ports_not_ready: + old_ports_not_ready_attrs = self.int_br.get_ports_attributes( + 'Interface', columns=['name', 'external_ids', 'ofport'], + ports=old_ports_not_ready, if_exists=True) + now_ready_ports = set( + [p['name'] for p in old_ports_not_ready_attrs]) + LOG.debug("Ports %s are now ready", now_ready_ports) + old_ports_not_ready_yet = old_ports_not_ready - now_ready_ports + removed_ports = set([p['name'] for p in events['removed']]) + old_ports_not_ready_yet -= removed_ports + LOG.debug("Ports %s were not ready at last iteration and are not " + "ready yet", old_ports_not_ready_yet) + ports_not_ready_yet |= old_ports_not_ready_yet + events['added'].extend(old_ports_not_ready_attrs) + + for port in events['added']: + _process_port(port, port_info['added'], + ancillary_port_info['added']) + for port in events['removed']: + _process_port(port, port_info['removed'], + ancillary_port_info['removed']) + + if updated_ports is None: + updated_ports = set() + updated_ports.update(self.check_changed_vlans()) + + # Disregard devices that were never noticed by the agent + port_info['removed'] &= port_info['current'] + port_info['current'] |= port_info['added'] + port_info['current'] -= port_info['removed'] + + ancillary_port_info['removed'] &= ancillary_port_info['current'] + ancillary_port_info['current'] |= ancillary_port_info['added'] + ancillary_port_info['current'] -= ancillary_port_info['removed'] + + if updated_ports: + # Some updated ports might have been removed in the + # meanwhile, and therefore should not be processed. + # In this case the updated port won't be found among + # current ports. + updated_ports &= port_info['current'] + port_info['updated'] = updated_ports + return port_info, ancillary_port_info, ports_not_ready_yet + def scan_ports(self, registered_ports, sync, updated_ports=None): cur_ports = self.int_br.get_vif_port_set() self.int_br_device_count = len(cur_ports) @@ -1618,11 +1714,64 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, LOG.info(_LI("Cleaning stale %s flows"), bridge.br_name) bridge.cleanup_flows() + def process_port_info(self, start, polling_manager, sync, ovs_restarted, + ports, ancillary_ports, updated_ports_copy, + consecutive_resyncs, ports_not_ready_yet): + # There are polling managers that don't have get_events, e.g. + # AlwaysPoll used by windows implementations + # REVISIT (rossella_s) This needs to be reworked to hide implementation + # details regarding polling in BasePollingManager subclasses + if sync or not (hasattr(polling_manager, 'get_events')): + if sync: + LOG.info(_LI("Agent out of sync with plugin!")) + consecutive_resyncs = consecutive_resyncs + 1 + if (consecutive_resyncs >= + constants.MAX_DEVICE_RETRIES): + LOG.warn(_LW( + "Clearing cache of registered ports," + " retries to resync were > %s"), + constants.MAX_DEVICE_RETRIES) + ports.clear() + ancillary_ports.clear() + consecutive_resyncs = 0 + else: + consecutive_resyncs = 0 + + # NOTE(rossella_s) don't empty the queue of events + # calling polling_manager.get_events() since + # the agent might miss some event (for example a port + # deletion) + reg_ports = (set() if ovs_restarted else ports) + port_info = self.scan_ports(reg_ports, sync, + updated_ports_copy) + # Treat ancillary devices if they exist + if self.ancillary_brs: + ancillary_port_info = self.scan_ancillary_ports( + ancillary_ports, sync) + LOG.debug("Agent rpc_loop - iteration:%(iter_num)d" + " - ancillary port info retrieved. " + "Elapsed:%(elapsed).3f", + {'iter_num': self.iter_num, + 'elapsed': time.time() - start}) + else: + ancillary_port_info = {} + + else: + consecutive_resyncs = 0 + events = polling_manager.get_events() + port_info, ancillary_port_info, ports_not_ready_yet = ( + self.process_ports_events(events, ports, ancillary_ports, + ports_not_ready_yet, + updated_ports_copy)) + return (port_info, ancillary_port_info, consecutive_resyncs, + ports_not_ready_yet) + def rpc_loop(self, polling_manager=None): if not polling_manager: polling_manager = polling.get_polling_manager( minimize_polling=False) - sync = True + + sync = False ports = set() updated_ports_copy = set() ancillary_ports = set() @@ -1630,6 +1779,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, ovs_restarted = False consecutive_resyncs = 0 need_clean_stale_flow = True + ports_not_ready_yet = set() while self._check_and_handle_signal(): if self.fullsync: LOG.info(_LI("rpc_loop doing a full sync.")) @@ -1640,20 +1790,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, start = time.time() LOG.debug("Agent rpc_loop - iteration:%d started", self.iter_num) - if sync: - LOG.info(_LI("Agent out of sync with plugin!")) - polling_manager.force_polling() - consecutive_resyncs = consecutive_resyncs + 1 - if consecutive_resyncs >= constants.MAX_DEVICE_RETRIES: - LOG.warn(_LW("Clearing cache of registered ports, retrials" - " to resync were > %s"), - constants.MAX_DEVICE_RETRIES) - ports.clear() - ancillary_ports.clear() - sync = False - consecutive_resyncs = 0 - else: - consecutive_resyncs = 0 ovs_status = self.check_ovs_status() if ovs_status == constants.OVS_RESTARTED: self.setup_integration_br() @@ -1669,6 +1805,20 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.patch_tun_ofport) self.dvr_agent.reset_dvr_parameters() self.dvr_agent.setup_dvr_flows() + # restart the polling manager so that it will signal as added + # all the current ports + # REVISIT (rossella_s) Define a method "reset" in + # BasePollingManager that will be implemented by AlwaysPoll as + # no action and by InterfacePollingMinimizer as start/stop + if isinstance( + polling_manager, linux_polling.InterfacePollingMinimizer): + # There's a possible race here, when ovsdb-server is + # restarted ovsdb monitor will also be restarted + try: + polling_manager.stop() + except async_process.AsyncProcessException: + LOG.debug("OVSDB monitor was not running") + polling_manager.start() elif ovs_status == constants.OVS_DEAD: # Agent doesn't apply any operations when ovs is dead, to # prevent unexpected failure or crash. Sleep and continue @@ -1685,7 +1835,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, LOG.exception(_LE("Error while synchronizing tunnels")) tunnel_sync = True ovs_restarted |= (ovs_status == constants.OVS_RESTARTED) - if self._agent_has_updates(polling_manager) or ovs_restarted: + if (self._agent_has_updates(polling_manager) or sync + or ports_not_ready_yet): try: LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - " "starting polling. Elapsed:%(elapsed).3f", @@ -1697,9 +1848,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, # between these two statements, this will be thread-safe updated_ports_copy = self.updated_ports self.updated_ports = set() - reg_ports = (set() if ovs_restarted else ports) - port_info = self.scan_ports(reg_ports, sync, - updated_ports_copy) + (port_info, ancillary_port_info, consecutive_resyncs, + ports_not_ready_yet) = (self.process_port_info( + start, polling_manager, sync, ovs_restarted, + ports, ancillary_ports, updated_ports_copy, + consecutive_resyncs, ports_not_ready_yet) + ) + sync = False self.process_deleted_ports(port_info) ofport_changed_ports = self.update_stale_ofport_rules() if ofport_changed_ports: @@ -1710,16 +1865,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, "Elapsed:%(elapsed).3f", {'iter_num': self.iter_num, 'elapsed': time.time() - start}) - # Treat ancillary devices if they exist - if self.ancillary_brs: - ancillary_port_info = self.scan_ancillary_ports( - ancillary_ports, sync) - LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - " - "ancillary port info retrieved. " - "Elapsed:%(elapsed).3f", - {'iter_num': self.iter_num, - 'elapsed': time.time() - start}) - sync = False # Secure and wire/unwire VIFs and update their status # on Neutron server if (self._port_info_has_changes(port_info) or diff --git a/neutron/tests/functional/agent/l2/base.py b/neutron/tests/functional/agent/l2/base.py index 37159555190..39f1a755d82 100644 --- a/neutron/tests/functional/agent/l2/base.py +++ b/neutron/tests/functional/agent/l2/base.py @@ -117,13 +117,38 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase): if tunnel_types: self.addCleanup(self.ovs.delete_bridge, self.br_tun) agent.sg_agent = mock.Mock() + agent.ancillary_brs = [] return agent - def start_agent(self, agent, unplug_ports=None): + def _mock_get_events(self, agent, polling_manager, ports): + get_events = polling_manager.get_events + p_ids = [p['id'] for p in ports] + + def filter_events(): + events = get_events() + filtered_ports = [] + for dev in events['added']: + iface_id = agent.int_br.portid_from_external_ids( + dev.get('external_ids', [])) + if iface_id in p_ids: + # if the event is not about a port that was created by + # this test, we filter the event out. Since these tests are + # not run in isolation processing all the events might make + # some test fail ( e.g. the agent might keep resycing + # because it keeps finding not ready ports that are created + # by other tests) + filtered_ports.append(dev) + return {'added': filtered_ports, 'removed': events['removed']} + polling_manager.get_events = mock.Mock(side_effect=filter_events) + + def start_agent(self, agent, ports=None, unplug_ports=None): if unplug_ports is None: unplug_ports = [] + if ports is None: + ports = [] self.setup_agent_rpc_mocks(agent, unplug_ports) polling_manager = polling.InterfacePollingMinimizer() + self._mock_get_events(agent, polling_manager, ports) self.addCleanup(polling_manager.stop) polling_manager.start() agent_utils.wait_until_true( @@ -137,6 +162,7 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase): rpc_loop_thread.wait() self.addCleanup(stop_agent, agent, t) + return polling_manager def _create_test_port_dict(self): return {'id': uuidutils.generate_uuid(), @@ -280,10 +306,10 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase): def setup_agent_and_ports(self, port_dicts, create_tunnels=True, trigger_resync=False): - self.agent = self.create_agent(create_tunnels=create_tunnels) - self.start_agent(self.agent) - self.network = self._create_test_network_dict() self.ports = port_dicts + self.agent = self.create_agent(create_tunnels=create_tunnels) + self.polling_manager = self.start_agent(self.agent, ports=self.ports) + self.network = self._create_test_network_dict() if trigger_resync: self._prepare_resync_trigger(self.agent) self._plug_ports(self.network, self.ports, self.agent) diff --git a/neutron/tests/functional/agent/test_l2_ovs_agent.py b/neutron/tests/functional/agent/test_l2_ovs_agent.py index b57a8f93158..ffdc0ff04e9 100644 --- a/neutron/tests/functional/agent/test_l2_ovs_agent.py +++ b/neutron/tests/functional/agent/test_l2_ovs_agent.py @@ -114,7 +114,8 @@ class TestOVSAgent(base.OVSAgentTestFramework): self.agent = self.create_agent(create_tunnels=False) self.network = self._create_test_network_dict() self._plug_ports(self.network, self.ports, self.agent) - self.start_agent(self.agent, unplug_ports=[self.ports[1]]) + self.start_agent(self.agent, ports=self.ports, + unplug_ports=[self.ports[1]]) self.wait_until_ports_state([self.ports[0]], up=True) self.assertRaises( Timeout, self.wait_until_ports_state, [self.ports[1]], up=True, diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py index 205dda82473..b555943e68e 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py @@ -390,6 +390,133 @@ class TestOvsNeutronAgent(object): updated_ports) self.assertEqual(expected, actual) + def _test_process_ports_events(self, events, registered_ports, + ancillary_ports, expected_ports, + expected_ancillary, updated_ports=None): + with mock.patch.object(self.agent, 'check_changed_vlans', + return_value=set()): + devices_not_ready_yet = set() + actual = self.agent.process_ports_events( + events, registered_ports, ancillary_ports, + devices_not_ready_yet, updated_ports) + self.assertEqual( + (expected_ports, expected_ancillary, devices_not_ready_yet), + actual) + + def test_process_ports_events_returns_current_for_unchanged_ports(self): + events = {'added': [], 'removed': []} + registered_ports = {1, 3} + ancillary_ports = {2, 5} + expected_ports = {'current': registered_ports, 'added': set(), + 'removed': set()} + expected_ancillary = {'current': ancillary_ports, 'added': set(), + 'removed': set()} + self._test_process_ports_events(events, registered_ports, + ancillary_ports, expected_ports, + expected_ancillary) + + def test_process_port_events_no_vif_changes_return_updated_port_only(self): + events = {'added': [], 'removed': []} + registered_ports = {1, 2, 3} + updated_ports = {2} + expected_ports = dict(current=registered_ports, updated={2}, + added=set(), removed=set()) + expected_ancillary = dict(current=set(), added=set(), removed=set()) + self._test_process_ports_events(events, registered_ports, + set(), expected_ports, + expected_ancillary, updated_ports) + + def test_process_port_events_ignores_removed_port_if_never_added(self): + events = {'added': [], + 'removed': [{'name': 'port2', 'ofport': 2, + 'external_ids': {'attached-mac': 'test-mac'}}]} + registered_ports = {1} + expected_ports = dict(current=registered_ports, added=set(), + removed=set()) + expected_ancillary = dict(current=set(), added=set(), removed=set()) + devices_not_ready_yet = set() + with mock.patch.object(self.agent.int_br, 'portid_from_external_ids', + side_effect=[2]), \ + mock.patch.object(self.agent, 'check_changed_vlans', + return_value=set()): + actual = self.agent.process_ports_events( + events, registered_ports, set(), devices_not_ready_yet) + self.assertEqual( + (expected_ports, expected_ancillary, devices_not_ready_yet), + actual) + + def test_process_port_events_port_not_ready_yet(self): + events = {'added': [{'name': 'port5', 'ofport': [], + 'external_ids': {'attached-mac': 'test-mac'}}], + 'removed': []} + old_devices_not_ready = {'port4'} + registered_ports = set([1, 2, 3]) + expected_ports = dict(current=set([1, 2, 3, 4]), + added=set([4]), removed=set()) + self.agent.ancillary_brs = [] + expected_ancillary = dict(current=set(), added=set(), removed=set()) + with mock.patch.object(self.agent.int_br, 'portid_from_external_ids', + side_effect=[5, 4]), \ + mock.patch.object(self.agent, 'check_changed_vlans', + return_value=set()), \ + mock.patch.object(self.agent.int_br, 'get_ports_attributes', + return_value=[{'name': 'port4', 'ofport': 4, + 'external_ids': { + 'attached-mac': 'mac4'}}]): + expected_devices_not_ready = {'port5'} + actual = self.agent.process_ports_events( + events, registered_ports, set(), old_devices_not_ready) + self.assertEqual( + (expected_ports, expected_ancillary, + expected_devices_not_ready), + actual) + + def _test_process_port_events_with_updated_ports(self, updated_ports): + events = {'added': [{'name': 'port3', 'ofport': 3, + 'external_ids': {'attached-mac': 'test-mac'}}, + {'name': 'qg-port2', 'ofport': 6, + 'external_ids': {'attached-mac': 'test-mac'}}], + 'removed': [{'name': 'port2', 'ofport': 2, + 'external_ids': {'attached-mac': 'test-mac'}}, + {'name': 'qg-port1', 'ofport': 5, + 'external_ids': {'attached-mac': 'test-mac'}}]} + registered_ports = {1, 2, 4} + ancillary_ports = {5, 8} + expected_ports = dict(current={1, 3, 4}, added={3}, removed={2}) + if updated_ports: + expected_ports['updated'] = updated_ports + expected_ancillary = dict(current={6, 8}, added={6}, + removed={5}) + ancillary_bridge = mock.Mock() + ancillary_bridge.get_vif_port_set.return_value = {5, 6, 8} + self.agent.ancillary_brs = [ancillary_bridge] + with mock.patch.object(self.agent.int_br, 'portid_from_external_ids', + side_effect=[3, 6, 2, 5]), \ + mock.patch.object(self.agent, 'check_changed_vlans', + return_value=set()): + + devices_not_ready_yet = set() + actual = self.agent.process_ports_events( + events, registered_ports, ancillary_ports, + devices_not_ready_yet, updated_ports) + self.assertEqual( + (expected_ports, expected_ancillary, devices_not_ready_yet), + actual) + + def test_process_port_events_returns_port_changes(self): + self._test_process_port_events_with_updated_ports(set()) + + def test_process_port_events_finds_known_updated_ports(self): + self._test_process_port_events_with_updated_ports({4}) + + def test_process_port_events_ignores_unknown_updated_ports(self): + # the port '10' was not seen on current ports. Hence it has either + # never been wired or already removed and should be ignored + self._test_process_port_events_with_updated_ports({4, 10}) + + def test_process_port_events_ignores_updated_port_if_removed(self): + self._test_process_port_events_with_updated_ports({4, 5}) + def test_update_ports_returns_changed_vlan(self): br = self.br_int_cls('br-int') mac = "ca:fe:de:ad:be:ef" @@ -1355,11 +1482,17 @@ class TestOvsNeutronAgent(object): 'added': set([]), 'removed': set(['tap0'])} + reply_ancillary = {'current': set([]), + 'added': set([]), + 'removed': set([])} + with mock.patch.object(async_process.AsyncProcess, "_spawn"),\ + mock.patch.object(async_process.AsyncProcess, "start"),\ + mock.patch.object(async_process.AsyncProcess, "stop"),\ mock.patch.object(log.KeywordArgumentAdapter, 'exception') as log_exception,\ mock.patch.object(self.mod_agent.OVSNeutronAgent, - 'scan_ports') as scan_ports,\ + 'process_ports_events') as process_p_events,\ mock.patch.object( self.mod_agent.OVSNeutronAgent, 'process_network_ports') as process_network_ports,\ @@ -1377,7 +1510,11 @@ class TestOvsNeutronAgent(object): 'cleanup_stale_flows') as cleanup: log_exception.side_effect = Exception( 'Fake exception to get out of the loop') - scan_ports.side_effect = [reply2, reply3] + devices_not_ready = set() + process_p_events.side_effect = [(reply2, reply_ancillary, + devices_not_ready), + (reply3, reply_ancillary, + devices_not_ready)] process_network_ports.side_effect = [ False, Exception('Fake exception to get out of the loop')] check_ovs_status.side_effect = args @@ -1386,10 +1523,13 @@ class TestOvsNeutronAgent(object): except Exception: pass - scan_ports.assert_has_calls([ - mock.call(set(), True, set()), - mock.call(set(), False, set()) + process_p_events.assert_has_calls([ + mock.call({'removed': [], 'added': []}, set(), set(), set(), + set()), + mock.call({'removed': [], 'added': []}, set(['tap0']), set(), + set(), set()) ]) + process_network_ports.assert_has_calls([ mock.call(reply2, False), mock.call(reply3, True) diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_tunnel.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_tunnel.py index 76210ce31da..5bf9cd20fc2 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_tunnel.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_tunnel.py @@ -509,14 +509,24 @@ class TunnelTest(object): self._verify_mock_calls() def test_daemon_loop(self): - reply2 = {'current': set(['tap0']), - 'added': set(['tap2']), - 'removed': set([])} + reply_ge_1 = {'added': set(['tap0']), + 'removed': set([])} - reply3 = {'current': set(['tap2']), - 'added': set([]), + reply_ge_2 = {'added': set([]), 'removed': set(['tap0'])} + reply_pe_1 = {'current': set(['tap0']), + 'added': set(['tap0']), + 'removed': set([])} + + reply_pe_2 = {'current': set([]), + 'added': set([]), + 'removed': set(['tap0'])} + + reply_ancillary = {'current': set([]), + 'added': set([]), + 'removed': set([])} + self.mock_int_bridge_expected += [ mock.call.check_canary_table(), mock.call.check_canary_table() @@ -527,7 +537,7 @@ class TunnelTest(object): with mock.patch.object(log.KeywordArgumentAdapter, 'exception') as log_exception,\ mock.patch.object(self.mod_agent.OVSNeutronAgent, - 'scan_ports') as scan_ports,\ + 'process_ports_events') as process_p_events,\ mock.patch.object( self.mod_agent.OVSNeutronAgent, 'process_network_ports') as process_network_ports,\ @@ -542,8 +552,13 @@ class TunnelTest(object): 'cleanup_stale_flows') as cleanup: log_exception.side_effect = Exception( 'Fake exception to get out of the loop') - scan_ports.side_effect = [reply2, reply3] update_stale.return_value = [] + devices_not_ready = set() + process_p_events.side_effect = [ + (reply_pe_1, reply_ancillary, devices_not_ready), + (reply_pe_2, reply_ancillary, devices_not_ready)] + interface_polling = mock.Mock() + interface_polling.get_events.side_effect = [reply_ge_1, reply_ge_2] process_network_ports.side_effect = [ False, Exception('Fake exception to get out of the loop')] @@ -553,7 +568,7 @@ class TunnelTest(object): # We start method and expect it will raise after 2nd loop # If something goes wrong, assert_has_calls below will catch it try: - n_agent.daemon_loop() + n_agent.rpc_loop(interface_polling) except Exception: pass @@ -561,17 +576,15 @@ class TunnelTest(object): # messages log_exception.assert_called_once_with( "Error while processing VIF ports") - scan_ports.assert_has_calls([ - mock.call(set(), True, set()), - mock.call(set(['tap0']), False, set()) + process_p_events.assert_has_calls([ + mock.call(reply_ge_1, set(), set(), devices_not_ready, set()), + mock.call(reply_ge_2, set(['tap0']), set(), devices_not_ready, + set()) ]) process_network_ports.assert_has_calls([ mock.call({'current': set(['tap0']), 'removed': set([]), - 'added': set(['tap2'])}, False), - mock.call({'current': set(['tap2']), - 'removed': set(['tap0']), - 'added': set([])}, False) + 'added': set(['tap0'])}, False), ]) cleanup.assert_called_once_with()