diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py index 8078b4dafb7..230fbb20dfa 100644 --- a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py @@ -34,7 +34,6 @@ from neutron.agent.common import polling from neutron.agent.common import utils from neutron.agent.l2.extensions import manager as ext_manager from neutron.agent.linux import ip_lib -from neutron.agent.linux import polling as linux_polling from neutron.agent import rpc as agent_rpc from neutron.agent import securitygroups_rpc as sg_rpc from neutron.api.rpc.handlers import dvr_rpc @@ -1213,88 +1212,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, port_info['removed'] = registered_ports - cur_ports return port_info - def process_ports_events(self, events, registered_ports, ancillary_ports, - updated_ports=None): - port_info = {} - port_info['added'] = set() - port_info['removed'] = set() - port_info['current'] = registered_ports - - ancillary_port_info = {} - ancillary_port_info['added'] = set() - ancillary_port_info['removed'] = set() - ancillary_port_info['current'] = ( - ancillary_ports if ancillary_ports else set()) - - # if a port was added and then removed or viceversa since the agent - # can't know the order of the operations, check the status of the port - # to determine if the port was added or deleted - device_removed_or_added = [ - dev for dev in events['added'] if dev in events['removed']] - for device in device_removed_or_added: - if ovs_lib.BaseOVS().port_exists(device['name']): - events['removed'].remove(device) - else: - events['added'].remove(device) - - #TODO(rossella_s): scanning the ancillary bridge won't be needed - # anymore when https://review.openstack.org/#/c/203381 since the bridge - # id stored in external_ids will be used to identify the bridge the - # port belongs to - cur_ancillary_ports = set() - for bridge in self.ancillary_brs: - cur_ancillary_ports |= bridge.get_vif_port_set() - cur_ancillary_ports |= ancillary_port_info['current'] - - def _process_device(device, devices, ancillary_devices): - # check 'iface-id' is set otherwise is not a port - # the agent should care about - if 'attached-mac' in device.get('external_ids', []): - iface_id = self.int_br.portid_from_external_ids( - device['external_ids']) - if iface_id: - if device['ofport'] == ovs_lib.UNASSIGNED_OFPORT: - #TODO(rossella_s) it's extreme to trigger a full resync - # if a port is not ready, resync only the device that - # is not ready - raise Exception( - _("Port %s is not ready, resync needed") % device[ - 'name']) - # check if device belong to ancillary bridge - if iface_id in cur_ancillary_ports: - ancillary_devices.add(iface_id) - else: - devices.add(iface_id) - - for device in events['added']: - _process_device(device, port_info['added'], - ancillary_port_info['added']) - for device in events['removed']: - _process_device(device, port_info['removed'], - ancillary_port_info['removed']) - - if updated_ports is None: - updated_ports = set() - updated_ports.update(self.check_changed_vlans()) - - # Disregard devices that were never noticed by the agent - port_info['removed'] &= port_info['current'] - port_info['current'] |= port_info['added'] - port_info['current'] -= port_info['removed'] - - ancillary_port_info['removed'] &= ancillary_port_info['current'] - ancillary_port_info['current'] |= ancillary_port_info['added'] - ancillary_port_info['current'] -= ancillary_port_info['removed'] - - if updated_ports: - # Some updated ports might have been removed in the - # meanwhile, and therefore should not be processed. - # In this case the updated port won't be found among - # current ports. - updated_ports &= port_info['current'] - port_info['updated'] = updated_ports - return port_info, ancillary_port_info - def scan_ports(self, registered_ports, sync, updated_ports=None): cur_ports = self.int_br.get_vif_port_set() self.int_br_device_count = len(cur_ports) @@ -1739,64 +1656,12 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, LOG.info(_LI("Cleaning stale %s flows"), bridge.br_name) bridge.cleanup_flows() - def process_port_info(self, start, polling_manager, sync, ovs_restarted, - ports, ancillary_ports, updated_ports_copy, - consecutive_resyncs): - # There are polling managers that don't have get_events, e.g. - # AlwaysPoll used by windows implementations - # REVISIT (rossella_s) This needs to be reworked to hide implementation - # details regarding polling in BasePollingManager subclasses - if sync or not (hasattr(polling_manager, 'get_events')): - if sync: - LOG.info(_LI("Agent out of sync with plugin!")) - consecutive_resyncs = consecutive_resyncs + 1 - if (consecutive_resyncs >= - constants.MAX_DEVICE_RETRIES): - LOG.warn(_LW( - "Clearing cache of registered ports," - " retries to resync were > %s"), - constants.MAX_DEVICE_RETRIES) - ports.clear() - ancillary_ports.clear() - consecutive_resyncs = 0 - else: - consecutive_resyncs = 0 - - # NOTE(rossella_s) don't empty the queue of events - # calling polling_manager.get_events() since - # the agent might miss some event (for example a port - # deletion) - reg_ports = (set() if ovs_restarted else ports) - port_info = self.scan_ports(reg_ports, sync, - updated_ports_copy) - # Treat ancillary devices if they exist - if self.ancillary_brs: - ancillary_port_info = self.scan_ancillary_ports( - ancillary_ports, sync) - LOG.debug("Agent rpc_loop - iteration:%(iter_num)d" - " - ancillary port info retrieved. " - "Elapsed:%(elapsed).3f", - {'iter_num': self.iter_num, - 'elapsed': time.time() - start}) - else: - ancillary_port_info = {} - - else: - consecutive_resyncs = 0 - events = polling_manager.get_events() - ancillary_ports = ( - ancillary_ports if self.ancillary_brs else None) - port_info, ancillary_port_info = ( - self.process_ports_events(events, ports, - ancillary_ports, updated_ports_copy)) - return port_info, ancillary_port_info, consecutive_resyncs - def rpc_loop(self, polling_manager=None): if not polling_manager: polling_manager = polling.get_polling_manager( minimize_polling=False) - sync = False + sync = True ports = set() updated_ports_copy = set() ancillary_ports = set() @@ -1809,6 +1674,20 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, start = time.time() LOG.debug("Agent rpc_loop - iteration:%d started", self.iter_num) + if sync: + LOG.info(_LI("Agent out of sync with plugin!")) + polling_manager.force_polling() + consecutive_resyncs = consecutive_resyncs + 1 + if consecutive_resyncs >= constants.MAX_DEVICE_RETRIES: + LOG.warn(_LW("Clearing cache of registered ports, retrials" + " to resync were > %s"), + constants.MAX_DEVICE_RETRIES) + ports.clear() + ancillary_ports.clear() + sync = False + consecutive_resyncs = 0 + else: + consecutive_resyncs = 0 ovs_status = self.check_ovs_status() if ovs_status == constants.OVS_RESTARTED: self.setup_integration_br() @@ -1824,15 +1703,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, self.patch_tun_ofport) self.dvr_agent.reset_dvr_parameters() self.dvr_agent.setup_dvr_flows() - # restart the polling manager so that it will signal as added - # all the current ports - # REVISIT (rossella_s) Define a method "reset" in - # BasePollingManager that will be implemented by AlwaysPoll as - # no action and by InterfacePollingMinimizer as start/stop - if isinstance( - polling_manager, linux_polling.InterfacePollingMinimizer): - polling_manager.stop() - polling_manager.start() elif ovs_status == constants.OVS_DEAD: # Agent doesn't apply any operations when ovs is dead, to # prevent unexpected failure or crash. Sleep and continue @@ -1849,7 +1719,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, LOG.exception(_LE("Error while synchronizing tunnels")) tunnel_sync = True ovs_restarted |= (ovs_status == constants.OVS_RESTARTED) - if self._agent_has_updates(polling_manager) or sync: + if self._agent_has_updates(polling_manager) or ovs_restarted: try: LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - " "starting polling. Elapsed:%(elapsed).3f", @@ -1861,13 +1731,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, # between these two statements, this will be thread-safe updated_ports_copy = self.updated_ports self.updated_ports = set() - port_info, ancillary_port_info, consecutive_resyncs = ( - self.process_port_info( - start, polling_manager, sync, ovs_restarted, - ports, ancillary_ports, updated_ports_copy, - consecutive_resyncs) - ) - + reg_ports = (set() if ovs_restarted else ports) + port_info = self.scan_ports(reg_ports, sync, + updated_ports_copy) self.process_deleted_ports(port_info) ofport_changed_ports = self.update_stale_ofport_rules() if ofport_changed_ports: @@ -1878,6 +1744,16 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin, "Elapsed:%(elapsed).3f", {'iter_num': self.iter_num, 'elapsed': time.time() - start}) + # Treat ancillary devices if they exist + if self.ancillary_brs: + ancillary_port_info = self.scan_ancillary_ports( + ancillary_ports, sync) + LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - " + "ancillary port info retrieved. " + "Elapsed:%(elapsed).3f", + {'iter_num': self.iter_num, + 'elapsed': time.time() - start}) + sync = False # Secure and wire/unwire VIFs and update their status # on Neutron server if (self._port_info_has_changes(port_info) or diff --git a/neutron/tests/functional/agent/l2/base.py b/neutron/tests/functional/agent/l2/base.py index 9fa7540d467..39e511b11df 100644 --- a/neutron/tests/functional/agent/l2/base.py +++ b/neutron/tests/functional/agent/l2/base.py @@ -118,38 +118,13 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase): if tunnel_types: self.addCleanup(self.ovs.delete_bridge, self.br_tun) agent.sg_agent = mock.Mock() - agent.ancillary_brs = None return agent - def _mock_get_events(self, agent, polling_manager, ports): - get_events = polling_manager.get_events - p_ids = [p['id'] for p in ports] - - def filter_events(): - events = get_events() - filtered_ports = [] - for dev in events['added']: - iface_id = agent.int_br.portid_from_external_ids( - dev.get('external_ids', [])) - if iface_id in p_ids: - # if the event is not about a port that was created by - # this test, we filter the event out. Since these tests are - # not run in isolation processing all the events might make - # some test fail ( e.g. the agent might keep resycing - # because it keeps finding not ready ports that are created - # by other tests) - filtered_ports.append(dev) - return {'added': filtered_ports, 'removed': events['removed']} - polling_manager.get_events = mock.Mock(side_effect=filter_events) - - def start_agent(self, agent, ports=None, unplug_ports=None): + def start_agent(self, agent, unplug_ports=None): if unplug_ports is None: unplug_ports = [] - if ports is None: - ports = [] self.setup_agent_rpc_mocks(agent, unplug_ports) polling_manager = polling.InterfacePollingMinimizer() - self._mock_get_events(agent, polling_manager, ports) self.addCleanup(polling_manager.stop) polling_manager.start() agent_utils.wait_until_true( @@ -163,7 +138,6 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase): rpc_loop_thread.wait() self.addCleanup(stop_agent, agent, t) - return polling_manager def _create_test_port_dict(self): return {'id': uuidutils.generate_uuid(), @@ -306,10 +280,10 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase): def setup_agent_and_ports(self, port_dicts, create_tunnels=True, trigger_resync=False): - self.ports = port_dicts self.agent = self.create_agent(create_tunnels=create_tunnels) - self.polling_manager = self.start_agent(self.agent, ports=self.ports) + self.start_agent(self.agent) self.network = self._create_test_network_dict() + self.ports = port_dicts if trigger_resync: self._prepare_resync_trigger(self.agent) self._plug_ports(self.network, self.ports, self.agent) diff --git a/neutron/tests/functional/agent/test_l2_ovs_agent.py b/neutron/tests/functional/agent/test_l2_ovs_agent.py index ab59680ac21..b57a8f93158 100644 --- a/neutron/tests/functional/agent/test_l2_ovs_agent.py +++ b/neutron/tests/functional/agent/test_l2_ovs_agent.py @@ -73,6 +73,7 @@ class TestOVSAgent(base.OVSAgentTestFramework): port_dicts=self.create_test_ports()) self.wait_until_ports_state(self.ports, up=True) self.agent.check_ovs_status.return_value = constants.OVS_RESTARTED + # OVS restarted, the agent should reprocess all the ports self.agent.plugin_rpc.update_device_list.reset_mock() self.wait_until_ports_state(self.ports, up=True) diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py index d4cf3877bf5..322e4b921af 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_neutron_agent.py @@ -401,130 +401,6 @@ class TestOvsNeutronAgent(object): updated_ports) self.assertEqual(expected, actual) - def test_process_ports_events_returns_current_for_unchanged_ports(self): - with mock.patch.object(self.agent, 'check_changed_vlans', - return_value=set()): - events = {'added': [], 'removed': []} - registered_ports = {1, 3} - ancillary_ports = {2, 5} - expected_ports = {'current': registered_ports, 'added': set(), - 'removed': set()} - expected_ancillary = {'current': ancillary_ports, 'added': set(), - 'removed': set()} - actual = self.agent.process_ports_events(events, registered_ports, - ancillary_ports) - self.assertEqual((expected_ports, expected_ancillary), actual) - - def test_process_port_events_returns_port_changes(self): - events = {'added': [{'name': 'port3', 'ofport': 3, - 'external_ids': {'attached-mac': 'test-mac'}}, - {'name': 'qg-port2', 'ofport': 5, - 'external_ids': {'attached-mac': 'test-mac'}}], - 'removed': [{'name': 'port2', 'ofport': 2, - 'external_ids': {'attached-mac': 'test-mac'}}, - {'name': 'qg-port1', 'ofport': 4, - 'external_ids': {'attached-mac': 'test-mac'}}]} - registered_ports = {1, 2} - ancillary_ports = {4} - expected_ports = dict( - current={1, 3}, added={3}, removed={2}) - expected_ancillary_ports = dict( - current={5}, added={5}, removed={4}) - ancillary_bridge = mock.Mock() - ancillary_bridge.get_vif_port_set.return_value = {4, 5} - self.agent.ancillary_brs = [ancillary_bridge] - with mock.patch.object(self.agent.int_br, 'portid_from_external_ids', - side_effect=[3, 5, 2, 4]), \ - mock.patch.object(self.agent, 'check_changed_vlans', - return_value=set()): - actual = self.agent.process_ports_events( - events, registered_ports, ancillary_ports) - self.assertEqual( - (expected_ports, expected_ancillary_ports), actual) - - def _test_process_port_events_with_updated_ports(self, updated_ports): - events = {'added': [{'name': 'port3', 'ofport': 3, - 'external_ids': {'attached-mac': 'test-mac'}}, - {'name': 'qg-port2', 'ofport': 6, - 'external_ids': {'attached-mac': 'test-mac'}}], - 'removed': [{'name': 'port2', 'ofport': 2, - 'external_ids': {'attached-mac': 'test-mac'}}, - {'name': 'qg-port1', 'ofport': 5, - 'external_ids': {'attached-mac': 'test-mac'}}]} - registered_ports = {1, 2, 4} - ancillary_ports = {5, 8} - expected_ports = dict(current={1, 3, 4}, added={3}, - removed={2}, updated={4}) - expected_ancillary = dict(current={6, 8}, added={6}, - removed={5}) - ancillary_bridge = mock.Mock() - ancillary_bridge.get_vif_port_set.return_value = {5, 6, 8} - self.agent.ancillary_brs = [ancillary_bridge] - with mock.patch.object(self.agent.int_br, 'portid_from_external_ids', - side_effect=[3, 6, 2, 5]), \ - mock.patch.object(self.agent, 'check_changed_vlans', - return_value=set()): - - actual = self.agent.process_ports_events( - events, registered_ports, ancillary_ports, updated_ports) - self.assertEqual((expected_ports, expected_ancillary), actual) - - def test_process_port_events_finds_known_updated_ports(self): - self._test_process_port_events_with_updated_ports({4}) - - def test_process_port_events_ignores_unknown_updated_ports(self): - # the port '5' was not seen on current ports. Hence it has either - # never been wired or already removed and should be ignored - self._test_process_port_events_with_updated_ports({4, 5}) - - def test_process_port_events_ignores_updated_port_if_removed(self): - events = {'added': [{'name': 'port3', 'ofport': 3, - 'external_ids': {'attached-mac': 'test-mac'}}], - 'removed': [{'name': 'port2', 'ofport': 2, - 'external_ids': {'attached-mac': 'test-mac'}}]} - registered_ports = {1, 2} - updated_ports = {1, 2} - expected_ports = dict(current={1, 3}, added={3}, - removed={2}, updated={1}) - expected_ancillary = dict(current=set(), added=set(), removed=set()) - with mock.patch.object(self.agent.int_br, 'portid_from_external_ids', - side_effect=[3, 2]), \ - mock.patch.object(self.agent, 'check_changed_vlans', - return_value=set()): - - actual = self.agent.process_ports_events( - events, registered_ports, None, updated_ports) - self.assertEqual((expected_ports, expected_ancillary), actual) - - def test_process_port_events_no_vif_changes_return_updated_port_only(self): - events = {'added': [], 'removed': []} - registered_ports = {1, 2, 3} - updated_ports = {2} - expected_ports = dict(current=registered_ports, updated={2}, - added=set(), removed=set()) - expected_ancillary = dict(current=set(), added=set(), removed=set()) - with mock.patch.object(self.agent, 'check_changed_vlans', - return_value=set()): - actual = self.agent.process_ports_events( - events, registered_ports, None, updated_ports) - self.assertEqual((expected_ports, expected_ancillary), actual) - - def test_process_port_events_ignores_removed_port_if_never_added(self): - events = {'added': [], - 'removed': [{'name': 'port2', 'ofport': 2, - 'external_ids': {'attached-mac': 'test-mac'}}]} - registered_ports = {1} - expected_ports = dict(current=registered_ports, added=set(), - removed=set()) - expected_ancillary = dict(current=set(), added=set(), removed=set()) - with mock.patch.object(self.agent.int_br, 'portid_from_external_ids', - side_effect=[2]), \ - mock.patch.object(self.agent, 'check_changed_vlans', - return_value=set()): - actual = self.agent.process_ports_events(events, registered_ports, - None) - self.assertEqual((expected_ports, expected_ancillary), actual) - def test_update_ports_returns_changed_vlan(self): br = self.br_int_cls('br-int') mac = "ca:fe:de:ad:be:ef" @@ -1495,17 +1371,11 @@ class TestOvsNeutronAgent(object): 'added': set([]), 'removed': set(['tap0'])} - reply_ancillary = {'current': set([]), - 'added': set([]), - 'removed': set([])} - with mock.patch.object(async_process.AsyncProcess, "_spawn"),\ - mock.patch.object(async_process.AsyncProcess, "start"),\ - mock.patch.object(async_process.AsyncProcess, "stop"),\ mock.patch.object(log.KeywordArgumentAdapter, 'exception') as log_exception,\ mock.patch.object(self.mod_agent.OVSNeutronAgent, - 'process_ports_events') as process_p_events,\ + 'scan_ports') as scan_ports,\ mock.patch.object( self.mod_agent.OVSNeutronAgent, 'process_network_ports') as process_network_ports,\ @@ -1523,8 +1393,7 @@ class TestOvsNeutronAgent(object): 'cleanup_stale_flows') as cleanup: log_exception.side_effect = Exception( 'Fake exception to get out of the loop') - process_p_events.side_effect = [(reply2, reply_ancillary), - (reply3, reply_ancillary)] + scan_ports.side_effect = [reply2, reply3] process_network_ports.side_effect = [ False, Exception('Fake exception to get out of the loop')] check_ovs_status.side_effect = args @@ -1533,12 +1402,10 @@ class TestOvsNeutronAgent(object): except Exception: pass - process_p_events.assert_has_calls([ - mock.call({'removed': [], 'added': []}, set(), None, set()), - mock.call({'removed': [], 'added': []}, set(['tap0']), None, - set()) + scan_ports.assert_has_calls([ + mock.call(set(), True, set()), + mock.call(set(), False, set()) ]) - process_network_ports.assert_has_calls([ mock.call(reply2, False), mock.call(reply3, True) diff --git a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_tunnel.py b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_tunnel.py index 7bae6df2ca6..079d2dea083 100644 --- a/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_tunnel.py +++ b/neutron/tests/unit/plugins/ml2/drivers/openvswitch/agent/test_ovs_tunnel.py @@ -495,24 +495,14 @@ class TunnelTest(object): self._verify_mock_calls() def test_daemon_loop(self): - reply_ge_1 = {'added': set(['tap0']), - 'removed': set([])} + reply2 = {'current': set(['tap0']), + 'added': set(['tap2']), + 'removed': set([])} - reply_ge_2 = {'added': set([]), + reply3 = {'current': set(['tap2']), + 'added': set([]), 'removed': set(['tap0'])} - reply_pe_1 = {'current': set(['tap0']), - 'added': set(['tap0']), - 'removed': set([])} - - reply_pe_2 = {'current': set([]), - 'added': set([]), - 'removed': set(['tap0'])} - - reply_ancillary = {'current': set([]), - 'added': set([]), - 'removed': set([])} - self.mock_int_bridge_expected += [ mock.call.check_canary_table(), mock.call.check_canary_table() @@ -523,7 +513,7 @@ class TunnelTest(object): with mock.patch.object(log.KeywordArgumentAdapter, 'exception') as log_exception,\ mock.patch.object(self.mod_agent.OVSNeutronAgent, - 'process_ports_events') as process_p_events,\ + 'scan_ports') as scan_ports,\ mock.patch.object( self.mod_agent.OVSNeutronAgent, 'process_network_ports') as process_network_ports,\ @@ -538,11 +528,8 @@ class TunnelTest(object): 'cleanup_stale_flows') as cleanup: log_exception.side_effect = Exception( 'Fake exception to get out of the loop') + scan_ports.side_effect = [reply2, reply3] update_stale.return_value = [] - process_p_events.side_effect = [ - (reply_pe_1, reply_ancillary), (reply_pe_2, reply_ancillary)] - interface_polling = mock.Mock() - interface_polling.get_events.side_effect = [reply_ge_1, reply_ge_2] process_network_ports.side_effect = [ False, Exception('Fake exception to get out of the loop')] @@ -552,7 +539,7 @@ class TunnelTest(object): # We start method and expect it will raise after 2nd loop # If something goes wrong, assert_has_calls below will catch it try: - n_agent.rpc_loop(interface_polling) + n_agent.daemon_loop() except Exception: pass @@ -560,14 +547,17 @@ class TunnelTest(object): # messages log_exception.assert_called_once_with( "Error while processing VIF ports") - process_p_events.assert_has_calls([ - mock.call(reply_ge_1, set(), None, set()), - mock.call(reply_ge_2, set(['tap0']), None, set()) + scan_ports.assert_has_calls([ + mock.call(set(), True, set()), + mock.call(set(['tap0']), False, set()) ]) process_network_ports.assert_has_calls([ mock.call({'current': set(['tap0']), 'removed': set([]), - 'added': set(['tap0'])}, False), + 'added': set(['tap2'])}, False), + mock.call({'current': set(['tap2']), + 'removed': set(['tap0']), + 'added': set([])}, False) ]) cleanup.assert_called_once_with()