Revert "Revert "OVS agent reacts to events instead of polling""

The original change had to be reverted because it caused
tests failing in the gate. The failures were due to the
fact that when a port was not ready, an exception was
thrown to trigger a full resync of the agent. This
behavior was meant to be temporary and was fixed in a
dependent patch that was not merged though. This revert
moves the handling of not ready ports in this patch.
It also refactors the unit tests a bit.

This reverts commit e7270d9505.

Change-Id: I6574cef3c95525ace6a98cf968ee159190681394
This commit is contained in:
rossella 2015-12-01 16:58:44 +00:00
parent 81a4aac8d4
commit ccdf211b4c
5 changed files with 380 additions and 55 deletions

View File

@ -32,7 +32,9 @@ from neutron.agent.common import ovs_lib
from neutron.agent.common import polling
from neutron.agent.common import utils
from neutron.agent.l2.extensions import manager as ext_manager
from neutron.agent.linux import async_process
from neutron.agent.linux import ip_lib
from neutron.agent.linux import polling as linux_polling
from neutron.agent import rpc as agent_rpc
from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.handlers import dvr_rpc
@ -149,7 +151,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
agent_conf = self.conf.AGENT
ovs_conf = self.conf.OVS
self.fullsync = True
self.fullsync = False
# init bridge classes with configured datapath type.
self.br_int_cls, self.br_phys_cls, self.br_tun_cls = (
functools.partial(bridge_classes[b],
@ -1175,6 +1177,100 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
port_info['removed'] = registered_ports - cur_ports
return port_info
def process_ports_events(self, events, registered_ports, ancillary_ports,
old_ports_not_ready, updated_ports=None):
port_info = {}
port_info['added'] = set()
port_info['removed'] = set()
port_info['current'] = registered_ports
ancillary_port_info = {}
ancillary_port_info['added'] = set()
ancillary_port_info['removed'] = set()
ancillary_port_info['current'] = ancillary_ports
ports_not_ready_yet = set()
# if a port was added and then removed or viceversa since the agent
# can't know the order of the operations, check the status of the port
# to determine if the port was added or deleted
ports_removed_and_added = [
p for p in events['added'] if p in events['removed']]
for p in ports_removed_and_added:
if ovs_lib.BaseOVS().port_exists(p['name']):
events['removed'].remove(p)
else:
events['added'].remove(p)
#TODO(rossella_s): scanning the ancillary bridge won't be needed
# anymore when https://review.openstack.org/#/c/203381 since the bridge
# id stored in external_ids will be used to identify the bridge the
# port belongs to
cur_ancillary_ports = set()
for bridge in self.ancillary_brs:
cur_ancillary_ports |= bridge.get_vif_port_set()
cur_ancillary_ports |= ancillary_port_info['current']
def _process_port(port, ports, ancillary_ports):
# check 'iface-id' is set otherwise is not a port
# the agent should care about
if 'attached-mac' in port.get('external_ids', []):
iface_id = self.int_br.portid_from_external_ids(
port['external_ids'])
if iface_id:
if port['ofport'] == ovs_lib.UNASSIGNED_OFPORT:
LOG.debug("Port %s not ready yet on the bridge",
iface_id)
ports_not_ready_yet.add(port['name'])
return
# check if port belongs to ancillary bridge
if iface_id in cur_ancillary_ports:
ancillary_ports.add(iface_id)
else:
ports.add(iface_id)
if old_ports_not_ready:
old_ports_not_ready_attrs = self.int_br.get_ports_attributes(
'Interface', columns=['name', 'external_ids', 'ofport'],
ports=old_ports_not_ready, if_exists=True)
now_ready_ports = set(
[p['name'] for p in old_ports_not_ready_attrs])
LOG.debug("Ports %s are now ready", now_ready_ports)
old_ports_not_ready_yet = old_ports_not_ready - now_ready_ports
removed_ports = set([p['name'] for p in events['removed']])
old_ports_not_ready_yet -= removed_ports
LOG.debug("Ports %s were not ready at last iteration and are not "
"ready yet", old_ports_not_ready_yet)
ports_not_ready_yet |= old_ports_not_ready_yet
events['added'].extend(old_ports_not_ready_attrs)
for port in events['added']:
_process_port(port, port_info['added'],
ancillary_port_info['added'])
for port in events['removed']:
_process_port(port, port_info['removed'],
ancillary_port_info['removed'])
if updated_ports is None:
updated_ports = set()
updated_ports.update(self.check_changed_vlans())
# Disregard devices that were never noticed by the agent
port_info['removed'] &= port_info['current']
port_info['current'] |= port_info['added']
port_info['current'] -= port_info['removed']
ancillary_port_info['removed'] &= ancillary_port_info['current']
ancillary_port_info['current'] |= ancillary_port_info['added']
ancillary_port_info['current'] -= ancillary_port_info['removed']
if updated_ports:
# Some updated ports might have been removed in the
# meanwhile, and therefore should not be processed.
# In this case the updated port won't be found among
# current ports.
updated_ports &= port_info['current']
port_info['updated'] = updated_ports
return port_info, ancillary_port_info, ports_not_ready_yet
def scan_ports(self, registered_ports, sync, updated_ports=None):
cur_ports = self.int_br.get_vif_port_set()
self.int_br_device_count = len(cur_ports)
@ -1618,11 +1714,64 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
LOG.info(_LI("Cleaning stale %s flows"), bridge.br_name)
bridge.cleanup_flows()
def process_port_info(self, start, polling_manager, sync, ovs_restarted,
ports, ancillary_ports, updated_ports_copy,
consecutive_resyncs, ports_not_ready_yet):
# There are polling managers that don't have get_events, e.g.
# AlwaysPoll used by windows implementations
# REVISIT (rossella_s) This needs to be reworked to hide implementation
# details regarding polling in BasePollingManager subclasses
if sync or not (hasattr(polling_manager, 'get_events')):
if sync:
LOG.info(_LI("Agent out of sync with plugin!"))
consecutive_resyncs = consecutive_resyncs + 1
if (consecutive_resyncs >=
constants.MAX_DEVICE_RETRIES):
LOG.warn(_LW(
"Clearing cache of registered ports,"
" retries to resync were > %s"),
constants.MAX_DEVICE_RETRIES)
ports.clear()
ancillary_ports.clear()
consecutive_resyncs = 0
else:
consecutive_resyncs = 0
# NOTE(rossella_s) don't empty the queue of events
# calling polling_manager.get_events() since
# the agent might miss some event (for example a port
# deletion)
reg_ports = (set() if ovs_restarted else ports)
port_info = self.scan_ports(reg_ports, sync,
updated_ports_copy)
# Treat ancillary devices if they exist
if self.ancillary_brs:
ancillary_port_info = self.scan_ancillary_ports(
ancillary_ports, sync)
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d"
" - ancillary port info retrieved. "
"Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
else:
ancillary_port_info = {}
else:
consecutive_resyncs = 0
events = polling_manager.get_events()
port_info, ancillary_port_info, ports_not_ready_yet = (
self.process_ports_events(events, ports, ancillary_ports,
ports_not_ready_yet,
updated_ports_copy))
return (port_info, ancillary_port_info, consecutive_resyncs,
ports_not_ready_yet)
def rpc_loop(self, polling_manager=None):
if not polling_manager:
polling_manager = polling.get_polling_manager(
minimize_polling=False)
sync = True
sync = False
ports = set()
updated_ports_copy = set()
ancillary_ports = set()
@ -1630,6 +1779,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
ovs_restarted = False
consecutive_resyncs = 0
need_clean_stale_flow = True
ports_not_ready_yet = set()
while self._check_and_handle_signal():
if self.fullsync:
LOG.info(_LI("rpc_loop doing a full sync."))
@ -1640,20 +1790,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
start = time.time()
LOG.debug("Agent rpc_loop - iteration:%d started",
self.iter_num)
if sync:
LOG.info(_LI("Agent out of sync with plugin!"))
polling_manager.force_polling()
consecutive_resyncs = consecutive_resyncs + 1
if consecutive_resyncs >= constants.MAX_DEVICE_RETRIES:
LOG.warn(_LW("Clearing cache of registered ports, retrials"
" to resync were > %s"),
constants.MAX_DEVICE_RETRIES)
ports.clear()
ancillary_ports.clear()
sync = False
consecutive_resyncs = 0
else:
consecutive_resyncs = 0
ovs_status = self.check_ovs_status()
if ovs_status == constants.OVS_RESTARTED:
self.setup_integration_br()
@ -1669,6 +1805,20 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
self.patch_tun_ofport)
self.dvr_agent.reset_dvr_parameters()
self.dvr_agent.setup_dvr_flows()
# restart the polling manager so that it will signal as added
# all the current ports
# REVISIT (rossella_s) Define a method "reset" in
# BasePollingManager that will be implemented by AlwaysPoll as
# no action and by InterfacePollingMinimizer as start/stop
if isinstance(
polling_manager, linux_polling.InterfacePollingMinimizer):
# There's a possible race here, when ovsdb-server is
# restarted ovsdb monitor will also be restarted
try:
polling_manager.stop()
except async_process.AsyncProcessException:
LOG.debug("OVSDB monitor was not running")
polling_manager.start()
elif ovs_status == constants.OVS_DEAD:
# Agent doesn't apply any operations when ovs is dead, to
# prevent unexpected failure or crash. Sleep and continue
@ -1685,7 +1835,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
LOG.exception(_LE("Error while synchronizing tunnels"))
tunnel_sync = True
ovs_restarted |= (ovs_status == constants.OVS_RESTARTED)
if self._agent_has_updates(polling_manager) or ovs_restarted:
if (self._agent_has_updates(polling_manager) or sync
or ports_not_ready_yet):
try:
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
"starting polling. Elapsed:%(elapsed).3f",
@ -1697,9 +1848,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
# between these two statements, this will be thread-safe
updated_ports_copy = self.updated_ports
self.updated_ports = set()
reg_ports = (set() if ovs_restarted else ports)
port_info = self.scan_ports(reg_ports, sync,
updated_ports_copy)
(port_info, ancillary_port_info, consecutive_resyncs,
ports_not_ready_yet) = (self.process_port_info(
start, polling_manager, sync, ovs_restarted,
ports, ancillary_ports, updated_ports_copy,
consecutive_resyncs, ports_not_ready_yet)
)
sync = False
self.process_deleted_ports(port_info)
ofport_changed_ports = self.update_stale_ofport_rules()
if ofport_changed_ports:
@ -1710,16 +1865,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
"Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
# Treat ancillary devices if they exist
if self.ancillary_brs:
ancillary_port_info = self.scan_ancillary_ports(
ancillary_ports, sync)
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
"ancillary port info retrieved. "
"Elapsed:%(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
sync = False
# Secure and wire/unwire VIFs and update their status
# on Neutron server
if (self._port_info_has_changes(port_info) or

View File

@ -117,13 +117,38 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
if tunnel_types:
self.addCleanup(self.ovs.delete_bridge, self.br_tun)
agent.sg_agent = mock.Mock()
agent.ancillary_brs = []
return agent
def start_agent(self, agent, unplug_ports=None):
def _mock_get_events(self, agent, polling_manager, ports):
get_events = polling_manager.get_events
p_ids = [p['id'] for p in ports]
def filter_events():
events = get_events()
filtered_ports = []
for dev in events['added']:
iface_id = agent.int_br.portid_from_external_ids(
dev.get('external_ids', []))
if iface_id in p_ids:
# if the event is not about a port that was created by
# this test, we filter the event out. Since these tests are
# not run in isolation processing all the events might make
# some test fail ( e.g. the agent might keep resycing
# because it keeps finding not ready ports that are created
# by other tests)
filtered_ports.append(dev)
return {'added': filtered_ports, 'removed': events['removed']}
polling_manager.get_events = mock.Mock(side_effect=filter_events)
def start_agent(self, agent, ports=None, unplug_ports=None):
if unplug_ports is None:
unplug_ports = []
if ports is None:
ports = []
self.setup_agent_rpc_mocks(agent, unplug_ports)
polling_manager = polling.InterfacePollingMinimizer()
self._mock_get_events(agent, polling_manager, ports)
self.addCleanup(polling_manager.stop)
polling_manager.start()
agent_utils.wait_until_true(
@ -137,6 +162,7 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
rpc_loop_thread.wait()
self.addCleanup(stop_agent, agent, t)
return polling_manager
def _create_test_port_dict(self):
return {'id': uuidutils.generate_uuid(),
@ -280,10 +306,10 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
def setup_agent_and_ports(self, port_dicts, create_tunnels=True,
trigger_resync=False):
self.agent = self.create_agent(create_tunnels=create_tunnels)
self.start_agent(self.agent)
self.network = self._create_test_network_dict()
self.ports = port_dicts
self.agent = self.create_agent(create_tunnels=create_tunnels)
self.polling_manager = self.start_agent(self.agent, ports=self.ports)
self.network = self._create_test_network_dict()
if trigger_resync:
self._prepare_resync_trigger(self.agent)
self._plug_ports(self.network, self.ports, self.agent)

View File

@ -114,7 +114,8 @@ class TestOVSAgent(base.OVSAgentTestFramework):
self.agent = self.create_agent(create_tunnels=False)
self.network = self._create_test_network_dict()
self._plug_ports(self.network, self.ports, self.agent)
self.start_agent(self.agent, unplug_ports=[self.ports[1]])
self.start_agent(self.agent, ports=self.ports,
unplug_ports=[self.ports[1]])
self.wait_until_ports_state([self.ports[0]], up=True)
self.assertRaises(
Timeout, self.wait_until_ports_state, [self.ports[1]], up=True,

View File

@ -390,6 +390,133 @@ class TestOvsNeutronAgent(object):
updated_ports)
self.assertEqual(expected, actual)
def _test_process_ports_events(self, events, registered_ports,
ancillary_ports, expected_ports,
expected_ancillary, updated_ports=None):
with mock.patch.object(self.agent, 'check_changed_vlans',
return_value=set()):
devices_not_ready_yet = set()
actual = self.agent.process_ports_events(
events, registered_ports, ancillary_ports,
devices_not_ready_yet, updated_ports)
self.assertEqual(
(expected_ports, expected_ancillary, devices_not_ready_yet),
actual)
def test_process_ports_events_returns_current_for_unchanged_ports(self):
events = {'added': [], 'removed': []}
registered_ports = {1, 3}
ancillary_ports = {2, 5}
expected_ports = {'current': registered_ports, 'added': set(),
'removed': set()}
expected_ancillary = {'current': ancillary_ports, 'added': set(),
'removed': set()}
self._test_process_ports_events(events, registered_ports,
ancillary_ports, expected_ports,
expected_ancillary)
def test_process_port_events_no_vif_changes_return_updated_port_only(self):
events = {'added': [], 'removed': []}
registered_ports = {1, 2, 3}
updated_ports = {2}
expected_ports = dict(current=registered_ports, updated={2},
added=set(), removed=set())
expected_ancillary = dict(current=set(), added=set(), removed=set())
self._test_process_ports_events(events, registered_ports,
set(), expected_ports,
expected_ancillary, updated_ports)
def test_process_port_events_ignores_removed_port_if_never_added(self):
events = {'added': [],
'removed': [{'name': 'port2', 'ofport': 2,
'external_ids': {'attached-mac': 'test-mac'}}]}
registered_ports = {1}
expected_ports = dict(current=registered_ports, added=set(),
removed=set())
expected_ancillary = dict(current=set(), added=set(), removed=set())
devices_not_ready_yet = set()
with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
side_effect=[2]), \
mock.patch.object(self.agent, 'check_changed_vlans',
return_value=set()):
actual = self.agent.process_ports_events(
events, registered_ports, set(), devices_not_ready_yet)
self.assertEqual(
(expected_ports, expected_ancillary, devices_not_ready_yet),
actual)
def test_process_port_events_port_not_ready_yet(self):
events = {'added': [{'name': 'port5', 'ofport': [],
'external_ids': {'attached-mac': 'test-mac'}}],
'removed': []}
old_devices_not_ready = {'port4'}
registered_ports = set([1, 2, 3])
expected_ports = dict(current=set([1, 2, 3, 4]),
added=set([4]), removed=set())
self.agent.ancillary_brs = []
expected_ancillary = dict(current=set(), added=set(), removed=set())
with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
side_effect=[5, 4]), \
mock.patch.object(self.agent, 'check_changed_vlans',
return_value=set()), \
mock.patch.object(self.agent.int_br, 'get_ports_attributes',
return_value=[{'name': 'port4', 'ofport': 4,
'external_ids': {
'attached-mac': 'mac4'}}]):
expected_devices_not_ready = {'port5'}
actual = self.agent.process_ports_events(
events, registered_ports, set(), old_devices_not_ready)
self.assertEqual(
(expected_ports, expected_ancillary,
expected_devices_not_ready),
actual)
def _test_process_port_events_with_updated_ports(self, updated_ports):
events = {'added': [{'name': 'port3', 'ofport': 3,
'external_ids': {'attached-mac': 'test-mac'}},
{'name': 'qg-port2', 'ofport': 6,
'external_ids': {'attached-mac': 'test-mac'}}],
'removed': [{'name': 'port2', 'ofport': 2,
'external_ids': {'attached-mac': 'test-mac'}},
{'name': 'qg-port1', 'ofport': 5,
'external_ids': {'attached-mac': 'test-mac'}}]}
registered_ports = {1, 2, 4}
ancillary_ports = {5, 8}
expected_ports = dict(current={1, 3, 4}, added={3}, removed={2})
if updated_ports:
expected_ports['updated'] = updated_ports
expected_ancillary = dict(current={6, 8}, added={6},
removed={5})
ancillary_bridge = mock.Mock()
ancillary_bridge.get_vif_port_set.return_value = {5, 6, 8}
self.agent.ancillary_brs = [ancillary_bridge]
with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
side_effect=[3, 6, 2, 5]), \
mock.patch.object(self.agent, 'check_changed_vlans',
return_value=set()):
devices_not_ready_yet = set()
actual = self.agent.process_ports_events(
events, registered_ports, ancillary_ports,
devices_not_ready_yet, updated_ports)
self.assertEqual(
(expected_ports, expected_ancillary, devices_not_ready_yet),
actual)
def test_process_port_events_returns_port_changes(self):
self._test_process_port_events_with_updated_ports(set())
def test_process_port_events_finds_known_updated_ports(self):
self._test_process_port_events_with_updated_ports({4})
def test_process_port_events_ignores_unknown_updated_ports(self):
# the port '10' was not seen on current ports. Hence it has either
# never been wired or already removed and should be ignored
self._test_process_port_events_with_updated_ports({4, 10})
def test_process_port_events_ignores_updated_port_if_removed(self):
self._test_process_port_events_with_updated_ports({4, 5})
def test_update_ports_returns_changed_vlan(self):
br = self.br_int_cls('br-int')
mac = "ca:fe:de:ad:be:ef"
@ -1355,11 +1482,17 @@ class TestOvsNeutronAgent(object):
'added': set([]),
'removed': set(['tap0'])}
reply_ancillary = {'current': set([]),
'added': set([]),
'removed': set([])}
with mock.patch.object(async_process.AsyncProcess, "_spawn"),\
mock.patch.object(async_process.AsyncProcess, "start"),\
mock.patch.object(async_process.AsyncProcess, "stop"),\
mock.patch.object(log.KeywordArgumentAdapter,
'exception') as log_exception,\
mock.patch.object(self.mod_agent.OVSNeutronAgent,
'scan_ports') as scan_ports,\
'process_ports_events') as process_p_events,\
mock.patch.object(
self.mod_agent.OVSNeutronAgent,
'process_network_ports') as process_network_ports,\
@ -1377,7 +1510,11 @@ class TestOvsNeutronAgent(object):
'cleanup_stale_flows') as cleanup:
log_exception.side_effect = Exception(
'Fake exception to get out of the loop')
scan_ports.side_effect = [reply2, reply3]
devices_not_ready = set()
process_p_events.side_effect = [(reply2, reply_ancillary,
devices_not_ready),
(reply3, reply_ancillary,
devices_not_ready)]
process_network_ports.side_effect = [
False, Exception('Fake exception to get out of the loop')]
check_ovs_status.side_effect = args
@ -1386,10 +1523,13 @@ class TestOvsNeutronAgent(object):
except Exception:
pass
scan_ports.assert_has_calls([
mock.call(set(), True, set()),
mock.call(set(), False, set())
process_p_events.assert_has_calls([
mock.call({'removed': [], 'added': []}, set(), set(), set(),
set()),
mock.call({'removed': [], 'added': []}, set(['tap0']), set(),
set(), set())
])
process_network_ports.assert_has_calls([
mock.call(reply2, False),
mock.call(reply3, True)

View File

@ -509,14 +509,24 @@ class TunnelTest(object):
self._verify_mock_calls()
def test_daemon_loop(self):
reply2 = {'current': set(['tap0']),
'added': set(['tap2']),
'removed': set([])}
reply_ge_1 = {'added': set(['tap0']),
'removed': set([])}
reply3 = {'current': set(['tap2']),
'added': set([]),
reply_ge_2 = {'added': set([]),
'removed': set(['tap0'])}
reply_pe_1 = {'current': set(['tap0']),
'added': set(['tap0']),
'removed': set([])}
reply_pe_2 = {'current': set([]),
'added': set([]),
'removed': set(['tap0'])}
reply_ancillary = {'current': set([]),
'added': set([]),
'removed': set([])}
self.mock_int_bridge_expected += [
mock.call.check_canary_table(),
mock.call.check_canary_table()
@ -527,7 +537,7 @@ class TunnelTest(object):
with mock.patch.object(log.KeywordArgumentAdapter,
'exception') as log_exception,\
mock.patch.object(self.mod_agent.OVSNeutronAgent,
'scan_ports') as scan_ports,\
'process_ports_events') as process_p_events,\
mock.patch.object(
self.mod_agent.OVSNeutronAgent,
'process_network_ports') as process_network_ports,\
@ -542,8 +552,13 @@ class TunnelTest(object):
'cleanup_stale_flows') as cleanup:
log_exception.side_effect = Exception(
'Fake exception to get out of the loop')
scan_ports.side_effect = [reply2, reply3]
update_stale.return_value = []
devices_not_ready = set()
process_p_events.side_effect = [
(reply_pe_1, reply_ancillary, devices_not_ready),
(reply_pe_2, reply_ancillary, devices_not_ready)]
interface_polling = mock.Mock()
interface_polling.get_events.side_effect = [reply_ge_1, reply_ge_2]
process_network_ports.side_effect = [
False, Exception('Fake exception to get out of the loop')]
@ -553,7 +568,7 @@ class TunnelTest(object):
# We start method and expect it will raise after 2nd loop
# If something goes wrong, assert_has_calls below will catch it
try:
n_agent.daemon_loop()
n_agent.rpc_loop(interface_polling)
except Exception:
pass
@ -561,17 +576,15 @@ class TunnelTest(object):
# messages
log_exception.assert_called_once_with(
"Error while processing VIF ports")
scan_ports.assert_has_calls([
mock.call(set(), True, set()),
mock.call(set(['tap0']), False, set())
process_p_events.assert_has_calls([
mock.call(reply_ge_1, set(), set(), devices_not_ready, set()),
mock.call(reply_ge_2, set(['tap0']), set(), devices_not_ready,
set())
])
process_network_ports.assert_has_calls([
mock.call({'current': set(['tap0']),
'removed': set([]),
'added': set(['tap2'])}, False),
mock.call({'current': set(['tap2']),
'removed': set(['tap0']),
'added': set([])}, False)
'added': set(['tap0'])}, False),
])
cleanup.assert_called_once_with()