Revert "OVS agent reacts to events instead of polling"
This might be associated to manifestation of bug #1514935
This reverts commit 1992d52d63
.
Closes-Bug: #1514935
Change-Id: If01cc87b6735e1bc039f99c4c6121e7c5ce547d0
This commit is contained in:
parent
d5df01e5ea
commit
e7270d9505
@ -34,7 +34,6 @@ from neutron.agent.common import polling
|
|||||||
from neutron.agent.common import utils
|
from neutron.agent.common import utils
|
||||||
from neutron.agent.l2.extensions import manager as ext_manager
|
from neutron.agent.l2.extensions import manager as ext_manager
|
||||||
from neutron.agent.linux import ip_lib
|
from neutron.agent.linux import ip_lib
|
||||||
from neutron.agent.linux import polling as linux_polling
|
|
||||||
from neutron.agent import rpc as agent_rpc
|
from neutron.agent import rpc as agent_rpc
|
||||||
from neutron.agent import securitygroups_rpc as sg_rpc
|
from neutron.agent import securitygroups_rpc as sg_rpc
|
||||||
from neutron.api.rpc.handlers import dvr_rpc
|
from neutron.api.rpc.handlers import dvr_rpc
|
||||||
@ -1213,88 +1212,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||||||
port_info['removed'] = registered_ports - cur_ports
|
port_info['removed'] = registered_ports - cur_ports
|
||||||
return port_info
|
return port_info
|
||||||
|
|
||||||
def process_ports_events(self, events, registered_ports, ancillary_ports,
|
|
||||||
updated_ports=None):
|
|
||||||
port_info = {}
|
|
||||||
port_info['added'] = set()
|
|
||||||
port_info['removed'] = set()
|
|
||||||
port_info['current'] = registered_ports
|
|
||||||
|
|
||||||
ancillary_port_info = {}
|
|
||||||
ancillary_port_info['added'] = set()
|
|
||||||
ancillary_port_info['removed'] = set()
|
|
||||||
ancillary_port_info['current'] = (
|
|
||||||
ancillary_ports if ancillary_ports else set())
|
|
||||||
|
|
||||||
# if a port was added and then removed or viceversa since the agent
|
|
||||||
# can't know the order of the operations, check the status of the port
|
|
||||||
# to determine if the port was added or deleted
|
|
||||||
device_removed_or_added = [
|
|
||||||
dev for dev in events['added'] if dev in events['removed']]
|
|
||||||
for device in device_removed_or_added:
|
|
||||||
if ovs_lib.BaseOVS().port_exists(device['name']):
|
|
||||||
events['removed'].remove(device)
|
|
||||||
else:
|
|
||||||
events['added'].remove(device)
|
|
||||||
|
|
||||||
#TODO(rossella_s): scanning the ancillary bridge won't be needed
|
|
||||||
# anymore when https://review.openstack.org/#/c/203381 since the bridge
|
|
||||||
# id stored in external_ids will be used to identify the bridge the
|
|
||||||
# port belongs to
|
|
||||||
cur_ancillary_ports = set()
|
|
||||||
for bridge in self.ancillary_brs:
|
|
||||||
cur_ancillary_ports |= bridge.get_vif_port_set()
|
|
||||||
cur_ancillary_ports |= ancillary_port_info['current']
|
|
||||||
|
|
||||||
def _process_device(device, devices, ancillary_devices):
|
|
||||||
# check 'iface-id' is set otherwise is not a port
|
|
||||||
# the agent should care about
|
|
||||||
if 'attached-mac' in device.get('external_ids', []):
|
|
||||||
iface_id = self.int_br.portid_from_external_ids(
|
|
||||||
device['external_ids'])
|
|
||||||
if iface_id:
|
|
||||||
if device['ofport'] == ovs_lib.UNASSIGNED_OFPORT:
|
|
||||||
#TODO(rossella_s) it's extreme to trigger a full resync
|
|
||||||
# if a port is not ready, resync only the device that
|
|
||||||
# is not ready
|
|
||||||
raise Exception(
|
|
||||||
_("Port %s is not ready, resync needed") % device[
|
|
||||||
'name'])
|
|
||||||
# check if device belong to ancillary bridge
|
|
||||||
if iface_id in cur_ancillary_ports:
|
|
||||||
ancillary_devices.add(iface_id)
|
|
||||||
else:
|
|
||||||
devices.add(iface_id)
|
|
||||||
|
|
||||||
for device in events['added']:
|
|
||||||
_process_device(device, port_info['added'],
|
|
||||||
ancillary_port_info['added'])
|
|
||||||
for device in events['removed']:
|
|
||||||
_process_device(device, port_info['removed'],
|
|
||||||
ancillary_port_info['removed'])
|
|
||||||
|
|
||||||
if updated_ports is None:
|
|
||||||
updated_ports = set()
|
|
||||||
updated_ports.update(self.check_changed_vlans())
|
|
||||||
|
|
||||||
# Disregard devices that were never noticed by the agent
|
|
||||||
port_info['removed'] &= port_info['current']
|
|
||||||
port_info['current'] |= port_info['added']
|
|
||||||
port_info['current'] -= port_info['removed']
|
|
||||||
|
|
||||||
ancillary_port_info['removed'] &= ancillary_port_info['current']
|
|
||||||
ancillary_port_info['current'] |= ancillary_port_info['added']
|
|
||||||
ancillary_port_info['current'] -= ancillary_port_info['removed']
|
|
||||||
|
|
||||||
if updated_ports:
|
|
||||||
# Some updated ports might have been removed in the
|
|
||||||
# meanwhile, and therefore should not be processed.
|
|
||||||
# In this case the updated port won't be found among
|
|
||||||
# current ports.
|
|
||||||
updated_ports &= port_info['current']
|
|
||||||
port_info['updated'] = updated_ports
|
|
||||||
return port_info, ancillary_port_info
|
|
||||||
|
|
||||||
def scan_ports(self, registered_ports, sync, updated_ports=None):
|
def scan_ports(self, registered_ports, sync, updated_ports=None):
|
||||||
cur_ports = self.int_br.get_vif_port_set()
|
cur_ports = self.int_br.get_vif_port_set()
|
||||||
self.int_br_device_count = len(cur_ports)
|
self.int_br_device_count = len(cur_ports)
|
||||||
@ -1739,64 +1656,12 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||||||
LOG.info(_LI("Cleaning stale %s flows"), bridge.br_name)
|
LOG.info(_LI("Cleaning stale %s flows"), bridge.br_name)
|
||||||
bridge.cleanup_flows()
|
bridge.cleanup_flows()
|
||||||
|
|
||||||
def process_port_info(self, start, polling_manager, sync, ovs_restarted,
|
|
||||||
ports, ancillary_ports, updated_ports_copy,
|
|
||||||
consecutive_resyncs):
|
|
||||||
# There are polling managers that don't have get_events, e.g.
|
|
||||||
# AlwaysPoll used by windows implementations
|
|
||||||
# REVISIT (rossella_s) This needs to be reworked to hide implementation
|
|
||||||
# details regarding polling in BasePollingManager subclasses
|
|
||||||
if sync or not (hasattr(polling_manager, 'get_events')):
|
|
||||||
if sync:
|
|
||||||
LOG.info(_LI("Agent out of sync with plugin!"))
|
|
||||||
consecutive_resyncs = consecutive_resyncs + 1
|
|
||||||
if (consecutive_resyncs >=
|
|
||||||
constants.MAX_DEVICE_RETRIES):
|
|
||||||
LOG.warn(_LW(
|
|
||||||
"Clearing cache of registered ports,"
|
|
||||||
" retries to resync were > %s"),
|
|
||||||
constants.MAX_DEVICE_RETRIES)
|
|
||||||
ports.clear()
|
|
||||||
ancillary_ports.clear()
|
|
||||||
consecutive_resyncs = 0
|
|
||||||
else:
|
|
||||||
consecutive_resyncs = 0
|
|
||||||
|
|
||||||
# NOTE(rossella_s) don't empty the queue of events
|
|
||||||
# calling polling_manager.get_events() since
|
|
||||||
# the agent might miss some event (for example a port
|
|
||||||
# deletion)
|
|
||||||
reg_ports = (set() if ovs_restarted else ports)
|
|
||||||
port_info = self.scan_ports(reg_ports, sync,
|
|
||||||
updated_ports_copy)
|
|
||||||
# Treat ancillary devices if they exist
|
|
||||||
if self.ancillary_brs:
|
|
||||||
ancillary_port_info = self.scan_ancillary_ports(
|
|
||||||
ancillary_ports, sync)
|
|
||||||
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d"
|
|
||||||
" - ancillary port info retrieved. "
|
|
||||||
"Elapsed:%(elapsed).3f",
|
|
||||||
{'iter_num': self.iter_num,
|
|
||||||
'elapsed': time.time() - start})
|
|
||||||
else:
|
|
||||||
ancillary_port_info = {}
|
|
||||||
|
|
||||||
else:
|
|
||||||
consecutive_resyncs = 0
|
|
||||||
events = polling_manager.get_events()
|
|
||||||
ancillary_ports = (
|
|
||||||
ancillary_ports if self.ancillary_brs else None)
|
|
||||||
port_info, ancillary_port_info = (
|
|
||||||
self.process_ports_events(events, ports,
|
|
||||||
ancillary_ports, updated_ports_copy))
|
|
||||||
return port_info, ancillary_port_info, consecutive_resyncs
|
|
||||||
|
|
||||||
def rpc_loop(self, polling_manager=None):
|
def rpc_loop(self, polling_manager=None):
|
||||||
if not polling_manager:
|
if not polling_manager:
|
||||||
polling_manager = polling.get_polling_manager(
|
polling_manager = polling.get_polling_manager(
|
||||||
minimize_polling=False)
|
minimize_polling=False)
|
||||||
|
|
||||||
sync = False
|
sync = True
|
||||||
ports = set()
|
ports = set()
|
||||||
updated_ports_copy = set()
|
updated_ports_copy = set()
|
||||||
ancillary_ports = set()
|
ancillary_ports = set()
|
||||||
@ -1809,6 +1674,20 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||||||
start = time.time()
|
start = time.time()
|
||||||
LOG.debug("Agent rpc_loop - iteration:%d started",
|
LOG.debug("Agent rpc_loop - iteration:%d started",
|
||||||
self.iter_num)
|
self.iter_num)
|
||||||
|
if sync:
|
||||||
|
LOG.info(_LI("Agent out of sync with plugin!"))
|
||||||
|
polling_manager.force_polling()
|
||||||
|
consecutive_resyncs = consecutive_resyncs + 1
|
||||||
|
if consecutive_resyncs >= constants.MAX_DEVICE_RETRIES:
|
||||||
|
LOG.warn(_LW("Clearing cache of registered ports, retrials"
|
||||||
|
" to resync were > %s"),
|
||||||
|
constants.MAX_DEVICE_RETRIES)
|
||||||
|
ports.clear()
|
||||||
|
ancillary_ports.clear()
|
||||||
|
sync = False
|
||||||
|
consecutive_resyncs = 0
|
||||||
|
else:
|
||||||
|
consecutive_resyncs = 0
|
||||||
ovs_status = self.check_ovs_status()
|
ovs_status = self.check_ovs_status()
|
||||||
if ovs_status == constants.OVS_RESTARTED:
|
if ovs_status == constants.OVS_RESTARTED:
|
||||||
self.setup_integration_br()
|
self.setup_integration_br()
|
||||||
@ -1824,15 +1703,6 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||||||
self.patch_tun_ofport)
|
self.patch_tun_ofport)
|
||||||
self.dvr_agent.reset_dvr_parameters()
|
self.dvr_agent.reset_dvr_parameters()
|
||||||
self.dvr_agent.setup_dvr_flows()
|
self.dvr_agent.setup_dvr_flows()
|
||||||
# restart the polling manager so that it will signal as added
|
|
||||||
# all the current ports
|
|
||||||
# REVISIT (rossella_s) Define a method "reset" in
|
|
||||||
# BasePollingManager that will be implemented by AlwaysPoll as
|
|
||||||
# no action and by InterfacePollingMinimizer as start/stop
|
|
||||||
if isinstance(
|
|
||||||
polling_manager, linux_polling.InterfacePollingMinimizer):
|
|
||||||
polling_manager.stop()
|
|
||||||
polling_manager.start()
|
|
||||||
elif ovs_status == constants.OVS_DEAD:
|
elif ovs_status == constants.OVS_DEAD:
|
||||||
# Agent doesn't apply any operations when ovs is dead, to
|
# Agent doesn't apply any operations when ovs is dead, to
|
||||||
# prevent unexpected failure or crash. Sleep and continue
|
# prevent unexpected failure or crash. Sleep and continue
|
||||||
@ -1849,7 +1719,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||||||
LOG.exception(_LE("Error while synchronizing tunnels"))
|
LOG.exception(_LE("Error while synchronizing tunnels"))
|
||||||
tunnel_sync = True
|
tunnel_sync = True
|
||||||
ovs_restarted |= (ovs_status == constants.OVS_RESTARTED)
|
ovs_restarted |= (ovs_status == constants.OVS_RESTARTED)
|
||||||
if self._agent_has_updates(polling_manager) or sync:
|
if self._agent_has_updates(polling_manager) or ovs_restarted:
|
||||||
try:
|
try:
|
||||||
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
|
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
|
||||||
"starting polling. Elapsed:%(elapsed).3f",
|
"starting polling. Elapsed:%(elapsed).3f",
|
||||||
@ -1861,13 +1731,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||||||
# between these two statements, this will be thread-safe
|
# between these two statements, this will be thread-safe
|
||||||
updated_ports_copy = self.updated_ports
|
updated_ports_copy = self.updated_ports
|
||||||
self.updated_ports = set()
|
self.updated_ports = set()
|
||||||
port_info, ancillary_port_info, consecutive_resyncs = (
|
reg_ports = (set() if ovs_restarted else ports)
|
||||||
self.process_port_info(
|
port_info = self.scan_ports(reg_ports, sync,
|
||||||
start, polling_manager, sync, ovs_restarted,
|
updated_ports_copy)
|
||||||
ports, ancillary_ports, updated_ports_copy,
|
|
||||||
consecutive_resyncs)
|
|
||||||
)
|
|
||||||
|
|
||||||
self.process_deleted_ports(port_info)
|
self.process_deleted_ports(port_info)
|
||||||
ofport_changed_ports = self.update_stale_ofport_rules()
|
ofport_changed_ports = self.update_stale_ofport_rules()
|
||||||
if ofport_changed_ports:
|
if ofport_changed_ports:
|
||||||
@ -1878,6 +1744,16 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
|
|||||||
"Elapsed:%(elapsed).3f",
|
"Elapsed:%(elapsed).3f",
|
||||||
{'iter_num': self.iter_num,
|
{'iter_num': self.iter_num,
|
||||||
'elapsed': time.time() - start})
|
'elapsed': time.time() - start})
|
||||||
|
# Treat ancillary devices if they exist
|
||||||
|
if self.ancillary_brs:
|
||||||
|
ancillary_port_info = self.scan_ancillary_ports(
|
||||||
|
ancillary_ports, sync)
|
||||||
|
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
|
||||||
|
"ancillary port info retrieved. "
|
||||||
|
"Elapsed:%(elapsed).3f",
|
||||||
|
{'iter_num': self.iter_num,
|
||||||
|
'elapsed': time.time() - start})
|
||||||
|
sync = False
|
||||||
# Secure and wire/unwire VIFs and update their status
|
# Secure and wire/unwire VIFs and update their status
|
||||||
# on Neutron server
|
# on Neutron server
|
||||||
if (self._port_info_has_changes(port_info) or
|
if (self._port_info_has_changes(port_info) or
|
||||||
|
@ -118,38 +118,13 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
|
|||||||
if tunnel_types:
|
if tunnel_types:
|
||||||
self.addCleanup(self.ovs.delete_bridge, self.br_tun)
|
self.addCleanup(self.ovs.delete_bridge, self.br_tun)
|
||||||
agent.sg_agent = mock.Mock()
|
agent.sg_agent = mock.Mock()
|
||||||
agent.ancillary_brs = None
|
|
||||||
return agent
|
return agent
|
||||||
|
|
||||||
def _mock_get_events(self, agent, polling_manager, ports):
|
def start_agent(self, agent, unplug_ports=None):
|
||||||
get_events = polling_manager.get_events
|
|
||||||
p_ids = [p['id'] for p in ports]
|
|
||||||
|
|
||||||
def filter_events():
|
|
||||||
events = get_events()
|
|
||||||
filtered_ports = []
|
|
||||||
for dev in events['added']:
|
|
||||||
iface_id = agent.int_br.portid_from_external_ids(
|
|
||||||
dev.get('external_ids', []))
|
|
||||||
if iface_id in p_ids:
|
|
||||||
# if the event is not about a port that was created by
|
|
||||||
# this test, we filter the event out. Since these tests are
|
|
||||||
# not run in isolation processing all the events might make
|
|
||||||
# some test fail ( e.g. the agent might keep resycing
|
|
||||||
# because it keeps finding not ready ports that are created
|
|
||||||
# by other tests)
|
|
||||||
filtered_ports.append(dev)
|
|
||||||
return {'added': filtered_ports, 'removed': events['removed']}
|
|
||||||
polling_manager.get_events = mock.Mock(side_effect=filter_events)
|
|
||||||
|
|
||||||
def start_agent(self, agent, ports=None, unplug_ports=None):
|
|
||||||
if unplug_ports is None:
|
if unplug_ports is None:
|
||||||
unplug_ports = []
|
unplug_ports = []
|
||||||
if ports is None:
|
|
||||||
ports = []
|
|
||||||
self.setup_agent_rpc_mocks(agent, unplug_ports)
|
self.setup_agent_rpc_mocks(agent, unplug_ports)
|
||||||
polling_manager = polling.InterfacePollingMinimizer()
|
polling_manager = polling.InterfacePollingMinimizer()
|
||||||
self._mock_get_events(agent, polling_manager, ports)
|
|
||||||
self.addCleanup(polling_manager.stop)
|
self.addCleanup(polling_manager.stop)
|
||||||
polling_manager.start()
|
polling_manager.start()
|
||||||
agent_utils.wait_until_true(
|
agent_utils.wait_until_true(
|
||||||
@ -163,7 +138,6 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
|
|||||||
rpc_loop_thread.wait()
|
rpc_loop_thread.wait()
|
||||||
|
|
||||||
self.addCleanup(stop_agent, agent, t)
|
self.addCleanup(stop_agent, agent, t)
|
||||||
return polling_manager
|
|
||||||
|
|
||||||
def _create_test_port_dict(self):
|
def _create_test_port_dict(self):
|
||||||
return {'id': uuidutils.generate_uuid(),
|
return {'id': uuidutils.generate_uuid(),
|
||||||
@ -306,10 +280,10 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
|
|||||||
|
|
||||||
def setup_agent_and_ports(self, port_dicts, create_tunnels=True,
|
def setup_agent_and_ports(self, port_dicts, create_tunnels=True,
|
||||||
trigger_resync=False):
|
trigger_resync=False):
|
||||||
self.ports = port_dicts
|
|
||||||
self.agent = self.create_agent(create_tunnels=create_tunnels)
|
self.agent = self.create_agent(create_tunnels=create_tunnels)
|
||||||
self.polling_manager = self.start_agent(self.agent, ports=self.ports)
|
self.start_agent(self.agent)
|
||||||
self.network = self._create_test_network_dict()
|
self.network = self._create_test_network_dict()
|
||||||
|
self.ports = port_dicts
|
||||||
if trigger_resync:
|
if trigger_resync:
|
||||||
self._prepare_resync_trigger(self.agent)
|
self._prepare_resync_trigger(self.agent)
|
||||||
self._plug_ports(self.network, self.ports, self.agent)
|
self._plug_ports(self.network, self.ports, self.agent)
|
||||||
|
@ -73,6 +73,7 @@ class TestOVSAgent(base.OVSAgentTestFramework):
|
|||||||
port_dicts=self.create_test_ports())
|
port_dicts=self.create_test_ports())
|
||||||
self.wait_until_ports_state(self.ports, up=True)
|
self.wait_until_ports_state(self.ports, up=True)
|
||||||
self.agent.check_ovs_status.return_value = constants.OVS_RESTARTED
|
self.agent.check_ovs_status.return_value = constants.OVS_RESTARTED
|
||||||
|
# OVS restarted, the agent should reprocess all the ports
|
||||||
self.agent.plugin_rpc.update_device_list.reset_mock()
|
self.agent.plugin_rpc.update_device_list.reset_mock()
|
||||||
self.wait_until_ports_state(self.ports, up=True)
|
self.wait_until_ports_state(self.ports, up=True)
|
||||||
|
|
||||||
|
@ -401,130 +401,6 @@ class TestOvsNeutronAgent(object):
|
|||||||
updated_ports)
|
updated_ports)
|
||||||
self.assertEqual(expected, actual)
|
self.assertEqual(expected, actual)
|
||||||
|
|
||||||
def test_process_ports_events_returns_current_for_unchanged_ports(self):
|
|
||||||
with mock.patch.object(self.agent, 'check_changed_vlans',
|
|
||||||
return_value=set()):
|
|
||||||
events = {'added': [], 'removed': []}
|
|
||||||
registered_ports = {1, 3}
|
|
||||||
ancillary_ports = {2, 5}
|
|
||||||
expected_ports = {'current': registered_ports, 'added': set(),
|
|
||||||
'removed': set()}
|
|
||||||
expected_ancillary = {'current': ancillary_ports, 'added': set(),
|
|
||||||
'removed': set()}
|
|
||||||
actual = self.agent.process_ports_events(events, registered_ports,
|
|
||||||
ancillary_ports)
|
|
||||||
self.assertEqual((expected_ports, expected_ancillary), actual)
|
|
||||||
|
|
||||||
def test_process_port_events_returns_port_changes(self):
|
|
||||||
events = {'added': [{'name': 'port3', 'ofport': 3,
|
|
||||||
'external_ids': {'attached-mac': 'test-mac'}},
|
|
||||||
{'name': 'qg-port2', 'ofport': 5,
|
|
||||||
'external_ids': {'attached-mac': 'test-mac'}}],
|
|
||||||
'removed': [{'name': 'port2', 'ofport': 2,
|
|
||||||
'external_ids': {'attached-mac': 'test-mac'}},
|
|
||||||
{'name': 'qg-port1', 'ofport': 4,
|
|
||||||
'external_ids': {'attached-mac': 'test-mac'}}]}
|
|
||||||
registered_ports = {1, 2}
|
|
||||||
ancillary_ports = {4}
|
|
||||||
expected_ports = dict(
|
|
||||||
current={1, 3}, added={3}, removed={2})
|
|
||||||
expected_ancillary_ports = dict(
|
|
||||||
current={5}, added={5}, removed={4})
|
|
||||||
ancillary_bridge = mock.Mock()
|
|
||||||
ancillary_bridge.get_vif_port_set.return_value = {4, 5}
|
|
||||||
self.agent.ancillary_brs = [ancillary_bridge]
|
|
||||||
with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
|
|
||||||
side_effect=[3, 5, 2, 4]), \
|
|
||||||
mock.patch.object(self.agent, 'check_changed_vlans',
|
|
||||||
return_value=set()):
|
|
||||||
actual = self.agent.process_ports_events(
|
|
||||||
events, registered_ports, ancillary_ports)
|
|
||||||
self.assertEqual(
|
|
||||||
(expected_ports, expected_ancillary_ports), actual)
|
|
||||||
|
|
||||||
def _test_process_port_events_with_updated_ports(self, updated_ports):
|
|
||||||
events = {'added': [{'name': 'port3', 'ofport': 3,
|
|
||||||
'external_ids': {'attached-mac': 'test-mac'}},
|
|
||||||
{'name': 'qg-port2', 'ofport': 6,
|
|
||||||
'external_ids': {'attached-mac': 'test-mac'}}],
|
|
||||||
'removed': [{'name': 'port2', 'ofport': 2,
|
|
||||||
'external_ids': {'attached-mac': 'test-mac'}},
|
|
||||||
{'name': 'qg-port1', 'ofport': 5,
|
|
||||||
'external_ids': {'attached-mac': 'test-mac'}}]}
|
|
||||||
registered_ports = {1, 2, 4}
|
|
||||||
ancillary_ports = {5, 8}
|
|
||||||
expected_ports = dict(current={1, 3, 4}, added={3},
|
|
||||||
removed={2}, updated={4})
|
|
||||||
expected_ancillary = dict(current={6, 8}, added={6},
|
|
||||||
removed={5})
|
|
||||||
ancillary_bridge = mock.Mock()
|
|
||||||
ancillary_bridge.get_vif_port_set.return_value = {5, 6, 8}
|
|
||||||
self.agent.ancillary_brs = [ancillary_bridge]
|
|
||||||
with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
|
|
||||||
side_effect=[3, 6, 2, 5]), \
|
|
||||||
mock.patch.object(self.agent, 'check_changed_vlans',
|
|
||||||
return_value=set()):
|
|
||||||
|
|
||||||
actual = self.agent.process_ports_events(
|
|
||||||
events, registered_ports, ancillary_ports, updated_ports)
|
|
||||||
self.assertEqual((expected_ports, expected_ancillary), actual)
|
|
||||||
|
|
||||||
def test_process_port_events_finds_known_updated_ports(self):
|
|
||||||
self._test_process_port_events_with_updated_ports({4})
|
|
||||||
|
|
||||||
def test_process_port_events_ignores_unknown_updated_ports(self):
|
|
||||||
# the port '5' was not seen on current ports. Hence it has either
|
|
||||||
# never been wired or already removed and should be ignored
|
|
||||||
self._test_process_port_events_with_updated_ports({4, 5})
|
|
||||||
|
|
||||||
def test_process_port_events_ignores_updated_port_if_removed(self):
|
|
||||||
events = {'added': [{'name': 'port3', 'ofport': 3,
|
|
||||||
'external_ids': {'attached-mac': 'test-mac'}}],
|
|
||||||
'removed': [{'name': 'port2', 'ofport': 2,
|
|
||||||
'external_ids': {'attached-mac': 'test-mac'}}]}
|
|
||||||
registered_ports = {1, 2}
|
|
||||||
updated_ports = {1, 2}
|
|
||||||
expected_ports = dict(current={1, 3}, added={3},
|
|
||||||
removed={2}, updated={1})
|
|
||||||
expected_ancillary = dict(current=set(), added=set(), removed=set())
|
|
||||||
with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
|
|
||||||
side_effect=[3, 2]), \
|
|
||||||
mock.patch.object(self.agent, 'check_changed_vlans',
|
|
||||||
return_value=set()):
|
|
||||||
|
|
||||||
actual = self.agent.process_ports_events(
|
|
||||||
events, registered_ports, None, updated_ports)
|
|
||||||
self.assertEqual((expected_ports, expected_ancillary), actual)
|
|
||||||
|
|
||||||
def test_process_port_events_no_vif_changes_return_updated_port_only(self):
|
|
||||||
events = {'added': [], 'removed': []}
|
|
||||||
registered_ports = {1, 2, 3}
|
|
||||||
updated_ports = {2}
|
|
||||||
expected_ports = dict(current=registered_ports, updated={2},
|
|
||||||
added=set(), removed=set())
|
|
||||||
expected_ancillary = dict(current=set(), added=set(), removed=set())
|
|
||||||
with mock.patch.object(self.agent, 'check_changed_vlans',
|
|
||||||
return_value=set()):
|
|
||||||
actual = self.agent.process_ports_events(
|
|
||||||
events, registered_ports, None, updated_ports)
|
|
||||||
self.assertEqual((expected_ports, expected_ancillary), actual)
|
|
||||||
|
|
||||||
def test_process_port_events_ignores_removed_port_if_never_added(self):
|
|
||||||
events = {'added': [],
|
|
||||||
'removed': [{'name': 'port2', 'ofport': 2,
|
|
||||||
'external_ids': {'attached-mac': 'test-mac'}}]}
|
|
||||||
registered_ports = {1}
|
|
||||||
expected_ports = dict(current=registered_ports, added=set(),
|
|
||||||
removed=set())
|
|
||||||
expected_ancillary = dict(current=set(), added=set(), removed=set())
|
|
||||||
with mock.patch.object(self.agent.int_br, 'portid_from_external_ids',
|
|
||||||
side_effect=[2]), \
|
|
||||||
mock.patch.object(self.agent, 'check_changed_vlans',
|
|
||||||
return_value=set()):
|
|
||||||
actual = self.agent.process_ports_events(events, registered_ports,
|
|
||||||
None)
|
|
||||||
self.assertEqual((expected_ports, expected_ancillary), actual)
|
|
||||||
|
|
||||||
def test_update_ports_returns_changed_vlan(self):
|
def test_update_ports_returns_changed_vlan(self):
|
||||||
br = self.br_int_cls('br-int')
|
br = self.br_int_cls('br-int')
|
||||||
mac = "ca:fe:de:ad:be:ef"
|
mac = "ca:fe:de:ad:be:ef"
|
||||||
@ -1495,17 +1371,11 @@ class TestOvsNeutronAgent(object):
|
|||||||
'added': set([]),
|
'added': set([]),
|
||||||
'removed': set(['tap0'])}
|
'removed': set(['tap0'])}
|
||||||
|
|
||||||
reply_ancillary = {'current': set([]),
|
|
||||||
'added': set([]),
|
|
||||||
'removed': set([])}
|
|
||||||
|
|
||||||
with mock.patch.object(async_process.AsyncProcess, "_spawn"),\
|
with mock.patch.object(async_process.AsyncProcess, "_spawn"),\
|
||||||
mock.patch.object(async_process.AsyncProcess, "start"),\
|
|
||||||
mock.patch.object(async_process.AsyncProcess, "stop"),\
|
|
||||||
mock.patch.object(log.KeywordArgumentAdapter,
|
mock.patch.object(log.KeywordArgumentAdapter,
|
||||||
'exception') as log_exception,\
|
'exception') as log_exception,\
|
||||||
mock.patch.object(self.mod_agent.OVSNeutronAgent,
|
mock.patch.object(self.mod_agent.OVSNeutronAgent,
|
||||||
'process_ports_events') as process_p_events,\
|
'scan_ports') as scan_ports,\
|
||||||
mock.patch.object(
|
mock.patch.object(
|
||||||
self.mod_agent.OVSNeutronAgent,
|
self.mod_agent.OVSNeutronAgent,
|
||||||
'process_network_ports') as process_network_ports,\
|
'process_network_ports') as process_network_ports,\
|
||||||
@ -1523,8 +1393,7 @@ class TestOvsNeutronAgent(object):
|
|||||||
'cleanup_stale_flows') as cleanup:
|
'cleanup_stale_flows') as cleanup:
|
||||||
log_exception.side_effect = Exception(
|
log_exception.side_effect = Exception(
|
||||||
'Fake exception to get out of the loop')
|
'Fake exception to get out of the loop')
|
||||||
process_p_events.side_effect = [(reply2, reply_ancillary),
|
scan_ports.side_effect = [reply2, reply3]
|
||||||
(reply3, reply_ancillary)]
|
|
||||||
process_network_ports.side_effect = [
|
process_network_ports.side_effect = [
|
||||||
False, Exception('Fake exception to get out of the loop')]
|
False, Exception('Fake exception to get out of the loop')]
|
||||||
check_ovs_status.side_effect = args
|
check_ovs_status.side_effect = args
|
||||||
@ -1533,12 +1402,10 @@ class TestOvsNeutronAgent(object):
|
|||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
process_p_events.assert_has_calls([
|
scan_ports.assert_has_calls([
|
||||||
mock.call({'removed': [], 'added': []}, set(), None, set()),
|
mock.call(set(), True, set()),
|
||||||
mock.call({'removed': [], 'added': []}, set(['tap0']), None,
|
mock.call(set(), False, set())
|
||||||
set())
|
|
||||||
])
|
])
|
||||||
|
|
||||||
process_network_ports.assert_has_calls([
|
process_network_ports.assert_has_calls([
|
||||||
mock.call(reply2, False),
|
mock.call(reply2, False),
|
||||||
mock.call(reply3, True)
|
mock.call(reply3, True)
|
||||||
|
@ -495,24 +495,14 @@ class TunnelTest(object):
|
|||||||
self._verify_mock_calls()
|
self._verify_mock_calls()
|
||||||
|
|
||||||
def test_daemon_loop(self):
|
def test_daemon_loop(self):
|
||||||
reply_ge_1 = {'added': set(['tap0']),
|
reply2 = {'current': set(['tap0']),
|
||||||
'removed': set([])}
|
'added': set(['tap2']),
|
||||||
|
'removed': set([])}
|
||||||
|
|
||||||
reply_ge_2 = {'added': set([]),
|
reply3 = {'current': set(['tap2']),
|
||||||
|
'added': set([]),
|
||||||
'removed': set(['tap0'])}
|
'removed': set(['tap0'])}
|
||||||
|
|
||||||
reply_pe_1 = {'current': set(['tap0']),
|
|
||||||
'added': set(['tap0']),
|
|
||||||
'removed': set([])}
|
|
||||||
|
|
||||||
reply_pe_2 = {'current': set([]),
|
|
||||||
'added': set([]),
|
|
||||||
'removed': set(['tap0'])}
|
|
||||||
|
|
||||||
reply_ancillary = {'current': set([]),
|
|
||||||
'added': set([]),
|
|
||||||
'removed': set([])}
|
|
||||||
|
|
||||||
self.mock_int_bridge_expected += [
|
self.mock_int_bridge_expected += [
|
||||||
mock.call.check_canary_table(),
|
mock.call.check_canary_table(),
|
||||||
mock.call.check_canary_table()
|
mock.call.check_canary_table()
|
||||||
@ -523,7 +513,7 @@ class TunnelTest(object):
|
|||||||
with mock.patch.object(log.KeywordArgumentAdapter,
|
with mock.patch.object(log.KeywordArgumentAdapter,
|
||||||
'exception') as log_exception,\
|
'exception') as log_exception,\
|
||||||
mock.patch.object(self.mod_agent.OVSNeutronAgent,
|
mock.patch.object(self.mod_agent.OVSNeutronAgent,
|
||||||
'process_ports_events') as process_p_events,\
|
'scan_ports') as scan_ports,\
|
||||||
mock.patch.object(
|
mock.patch.object(
|
||||||
self.mod_agent.OVSNeutronAgent,
|
self.mod_agent.OVSNeutronAgent,
|
||||||
'process_network_ports') as process_network_ports,\
|
'process_network_ports') as process_network_ports,\
|
||||||
@ -538,11 +528,8 @@ class TunnelTest(object):
|
|||||||
'cleanup_stale_flows') as cleanup:
|
'cleanup_stale_flows') as cleanup:
|
||||||
log_exception.side_effect = Exception(
|
log_exception.side_effect = Exception(
|
||||||
'Fake exception to get out of the loop')
|
'Fake exception to get out of the loop')
|
||||||
|
scan_ports.side_effect = [reply2, reply3]
|
||||||
update_stale.return_value = []
|
update_stale.return_value = []
|
||||||
process_p_events.side_effect = [
|
|
||||||
(reply_pe_1, reply_ancillary), (reply_pe_2, reply_ancillary)]
|
|
||||||
interface_polling = mock.Mock()
|
|
||||||
interface_polling.get_events.side_effect = [reply_ge_1, reply_ge_2]
|
|
||||||
process_network_ports.side_effect = [
|
process_network_ports.side_effect = [
|
||||||
False, Exception('Fake exception to get out of the loop')]
|
False, Exception('Fake exception to get out of the loop')]
|
||||||
|
|
||||||
@ -552,7 +539,7 @@ class TunnelTest(object):
|
|||||||
# We start method and expect it will raise after 2nd loop
|
# We start method and expect it will raise after 2nd loop
|
||||||
# If something goes wrong, assert_has_calls below will catch it
|
# If something goes wrong, assert_has_calls below will catch it
|
||||||
try:
|
try:
|
||||||
n_agent.rpc_loop(interface_polling)
|
n_agent.daemon_loop()
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@ -560,14 +547,17 @@ class TunnelTest(object):
|
|||||||
# messages
|
# messages
|
||||||
log_exception.assert_called_once_with(
|
log_exception.assert_called_once_with(
|
||||||
"Error while processing VIF ports")
|
"Error while processing VIF ports")
|
||||||
process_p_events.assert_has_calls([
|
scan_ports.assert_has_calls([
|
||||||
mock.call(reply_ge_1, set(), None, set()),
|
mock.call(set(), True, set()),
|
||||||
mock.call(reply_ge_2, set(['tap0']), None, set())
|
mock.call(set(['tap0']), False, set())
|
||||||
])
|
])
|
||||||
process_network_ports.assert_has_calls([
|
process_network_ports.assert_has_calls([
|
||||||
mock.call({'current': set(['tap0']),
|
mock.call({'current': set(['tap0']),
|
||||||
'removed': set([]),
|
'removed': set([]),
|
||||||
'added': set(['tap0'])}, False),
|
'added': set(['tap2'])}, False),
|
||||||
|
mock.call({'current': set(['tap2']),
|
||||||
|
'removed': set(['tap0']),
|
||||||
|
'added': set([])}, False)
|
||||||
])
|
])
|
||||||
|
|
||||||
cleanup.assert_called_once_with()
|
cleanup.assert_called_once_with()
|
||||||
|
Loading…
Reference in New Issue
Block a user