Don't sync all devices when some fail

With the new RPC calls get_devices_details_and_failed_devices
and update_device_list the agent gets a list of devices
for which some operation failed. The agent can now make use
of this information and instead of syncing all the devices
can sync only those which failed.
With the current change if a device keeps failing, the agent
will try to sync it forever. In a following patch I will limit
the number of retrials.

Partially-Implements: blueprint restructure-l2-agent
Change-Id: I295dc79031a0547f8687c5835c7ba7bbc43df36d
This commit is contained in:
rossella 2015-07-06 01:02:12 +02:00
parent d6d43b32ca
commit 584faf47e2
5 changed files with 369 additions and 158 deletions

View File

@ -41,7 +41,6 @@ from neutron.agent import securitygroups_rpc as sg_rpc
from neutron.api.rpc.handlers import dvr_rpc
from neutron.common import config
from neutron.common import constants as n_const
from neutron.common import exceptions
from neutron.common import ipv6_utils as ipv6
from neutron.common import topics
from neutron.common import utils as n_utils
@ -70,10 +69,6 @@ class _mac_mydialect(netaddr.mac_unix):
word_fmt = '%.2x'
class DeviceListRetrievalError(exceptions.NeutronException):
message = _("Unable to retrieve port details for devices: %(devices)s ")
class LocalVLANMapping(object):
def __init__(self, vlan, network_type, physical_network, segmentation_id,
@ -803,6 +798,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
def _bind_devices(self, need_binding_ports):
devices_up = []
devices_down = []
failed_devices = []
port_names = [p['vif_port'].port_name for p in need_binding_ports]
port_info = self.int_br.get_ports_attributes(
"Port", columns=["name", "tag"], ports=port_names, if_exists=True)
@ -841,22 +837,19 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
else:
LOG.debug("Setting status for %s to DOWN", device)
devices_down.append(device)
failed_devices = []
if devices_up or devices_down:
devices_set = self.plugin_rpc.update_device_list(
self.context, devices_up, devices_down, self.agent_id,
self.conf.host)
failed_devices = (devices_set.get('failed_devices_up') +
devices_set.get('failed_devices_down'))
if failed_devices:
LOG.error(_LE("Configuration for devices %s failed!"),
failed_devices)
#TODO(rossella_s) handle better the resync in next patches,
# this is just to preserve the current behavior
raise DeviceListRetrievalError(devices=failed_devices)
if failed_devices:
LOG.error(_LE("Configuration for devices %s failed!"),
failed_devices)
LOG.info(_LI("Configuration for devices up %(up)s and devices "
"down %(down)s completed."),
{'up': devices_up, 'down': devices_down})
return set(failed_devices)
@staticmethod
def setup_arp_spoofing_protection(bridge, vif, port_details):
@ -1184,8 +1177,25 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
port_info['removed'] = registered_ports - cur_ports
return port_info
def _update_port_info_failed_devices_stats(self, port_info,
failed_devices):
# remove failed devices that don't need to be retried
failed_devices['added'] -= port_info['removed']
failed_devices['removed'] -= port_info['added']
# Disregard devices that were never noticed by the agent
port_info['removed'] &= port_info['current']
# retry failed devices
port_info['added'] |= failed_devices['added']
LOG.debug("retrying failed devices %s", failed_devices['added'])
port_info['removed'] |= failed_devices['removed']
# Update current ports
port_info['current'] |= port_info['added']
port_info['current'] -= port_info['removed']
def process_ports_events(self, events, registered_ports, ancillary_ports,
old_ports_not_ready, updated_ports=None):
old_ports_not_ready, failed_devices,
failed_ancillary_devices, updated_ports=None):
port_info = {}
port_info['added'] = set()
port_info['removed'] = set()
@ -1195,8 +1205,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
ancillary_port_info['added'] = set()
ancillary_port_info['removed'] = set()
ancillary_port_info['current'] = ancillary_ports
ports_not_ready_yet = set()
ports_not_ready_yet = set()
# if a port was added and then removed or viceversa since the agent
# can't know the order of the operations, check the status of the port
# to determine if the port was added or deleted
@ -1256,19 +1266,14 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
_process_port(port, port_info['removed'],
ancillary_port_info['removed'])
self._update_port_info_failed_devices_stats(port_info, failed_devices)
self._update_port_info_failed_devices_stats(ancillary_port_info,
failed_ancillary_devices)
if updated_ports is None:
updated_ports = set()
updated_ports.update(self.check_changed_vlans())
# Disregard devices that were never noticed by the agent
port_info['removed'] &= port_info['current']
port_info['current'] |= port_info['added']
port_info['current'] -= port_info['removed']
ancillary_port_info['removed'] &= ancillary_port_info['current']
ancillary_port_info['current'] |= ancillary_port_info['added']
ancillary_port_info['current'] -= ancillary_port_info['removed']
if updated_ports:
# Some updated ports might have been removed in the
# meanwhile, and therefore should not be processed.
@ -1416,10 +1421,7 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
devices,
self.agent_id,
self.conf.host))
if devices_details_list.get('failed_devices'):
#TODO(rossella_s) handle better the resync in next patches,
# this is just to preserve the current behavior
raise DeviceListRetrievalError(devices=devices)
failed_devices = set(devices_details_list.get('failed_devices'))
devices = devices_details_list.get('devices')
vif_by_id = self.int_br.get_vifs_by_ids(
@ -1462,7 +1464,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
LOG.warn(_LW("Device %s not defined on plugin"), device)
if (port and port.ofport != -1):
self.port_dead(port)
return skipped_devices, need_binding_devices, security_disabled_devices
return (skipped_devices, need_binding_devices,
security_disabled_devices, failed_devices)
def _update_port_network(self, port_id, network_id):
self._clean_network_ports(port_id)
@ -1475,13 +1478,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
devices,
self.agent_id,
self.conf.host))
if devices_details_list.get('failed_devices'):
#TODO(rossella_s) handle better the resync in next patches,
# this is just to preserve the current behavior
raise DeviceListRetrievalError(devices=devices)
failed_devices = set(devices_details_list.get('failed_devices'))
devices_added = [
d['device'] for d in devices_details_list.get('devices')]
LOG.info(_LI("Ancillary Ports %s added"), devices_added)
# update plugin about port status
devices_set_up = (
@ -1490,13 +1489,13 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
[],
self.agent_id,
self.conf.host))
if devices_set_up.get('failed_devices_up'):
#TODO(rossella_s) handle better the resync in next patches,
# this is just to preserve the current behavior
raise DeviceListRetrievalError()
failed_devices |= set(devices_set_up.get('failed_devices_up'))
LOG.info(_LI("Ancillary Ports %(added)s added, failed devices "
"%(failed)s"), {'added': devices,
'failed': failed_devices})
return failed_devices
def treat_devices_removed(self, devices):
resync = False
self.sg_agent.remove_devices_filter(devices)
LOG.info(_LI("Ports %s removed"), devices)
devices_down = self.plugin_rpc.update_device_list(self.context,
@ -1504,37 +1503,33 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
devices,
self.agent_id,
self.conf.host)
failed_devices = devices_down.get('failed_devices_down')
if failed_devices:
LOG.debug("Port removal failed for %(devices)s ", failed_devices)
resync = True
failed_devices = set(devices_down.get('failed_devices_down'))
LOG.debug("Port removal failed for %s", failed_devices)
for device in devices:
self.port_unbound(device)
return resync
return failed_devices
def treat_ancillary_devices_removed(self, devices):
resync = False
LOG.info(_LI("Ancillary ports %s removed"), devices)
devices_down = self.plugin_rpc.update_device_list(self.context,
[],
devices,
self.agent_id,
self.conf.host)
failed_devices = devices_down.get('failed_devices_down')
LOG.info(_LI("Devices down %s "), devices_down)
failed_devices = set(devices_down.get('failed_devices_down'))
if failed_devices:
LOG.debug("Port removal failed for %(devices)s ", failed_devices)
resync = True
LOG.debug("Port removal failed for %(devices)s", failed_devices)
for detail in devices_down.get('devices_down'):
if detail['exists']:
LOG.info(_LI("Port %s updated."), detail['device'])
# Nothing to do regarding local networking
else:
LOG.debug("Device %s not defined on plugin", detail['device'])
return resync
return failed_devices
def process_network_ports(self, port_info, ovs_restarted):
resync_a = False
resync_b = False
failed_devices = {'added': set(), 'removed': set()}
# TODO(salv-orlando): consider a solution for ensuring notifications
# are processed exactly in the same order in which they were
# received. This is tricky because there are two notification
@ -1552,31 +1547,23 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
security_disabled_ports = []
if devices_added_updated:
start = time.time()
try:
(skipped_devices, need_binding_devices,
security_disabled_ports) = (
self.treat_devices_added_or_updated(
devices_added_updated, ovs_restarted))
LOG.debug("process_network_ports - iteration:%(iter_num)d - "
"treat_devices_added_or_updated completed. "
"Skipped %(num_skipped)d devices of "
"%(num_current)d devices currently available. "
"Time elapsed: %(elapsed).3f",
{'iter_num': self.iter_num,
'num_skipped': len(skipped_devices),
'num_current': len(port_info['current']),
'elapsed': time.time() - start})
# Update the list of current ports storing only those which
# have been actually processed.
port_info['current'] = (port_info['current'] -
set(skipped_devices))
except DeviceListRetrievalError:
# Need to resync as there was an error with server
# communication.
LOG.exception(_LE("process_network_ports - iteration:%d - "
"failure while retrieving port details "
"from server"), self.iter_num)
resync_a = True
(skipped_devices, need_binding_devices,
security_disabled_ports, failed_devices['added']) = (
self.treat_devices_added_or_updated(
devices_added_updated, ovs_restarted))
LOG.debug("process_network_ports - iteration:%(iter_num)d - "
"treat_devices_added_or_updated completed. "
"Skipped %(num_skipped)d devices of "
"%(num_current)d devices currently available. "
"Time elapsed: %(elapsed).3f",
{'iter_num': self.iter_num,
'num_skipped': len(skipped_devices),
'num_current': len(port_info['current']),
'elapsed': time.time() - start})
# Update the list of current ports storing only those which
# have been actually processed.
port_info['current'] = (port_info['current'] -
set(skipped_devices))
# TODO(salv-orlando): Optimize avoiding applying filters
# unnecessarily, (eg: when there are no IP address changes)
@ -1585,49 +1572,43 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
added_ports -= set(security_disabled_ports)
self.sg_agent.setup_port_filters(added_ports,
port_info.get('updated', set()))
self._bind_devices(need_binding_devices)
failed_devices['added'] |= self._bind_devices(need_binding_devices)
if 'removed' in port_info and port_info['removed']:
start = time.time()
resync_b = self.treat_devices_removed(port_info['removed'])
failed_devices['removed'] |= self.treat_devices_removed(
port_info['removed'])
LOG.debug("process_network_ports - iteration:%(iter_num)d - "
"treat_devices_removed completed in %(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
# If one of the above operations fails => resync with plugin
return (resync_a | resync_b)
return failed_devices
def process_ancillary_network_ports(self, port_info):
resync_a = False
resync_b = False
failed_devices = {'added': set(), 'removed': set()}
if 'added' in port_info and port_info['added']:
start = time.time()
try:
self.treat_ancillary_devices_added(port_info['added'])
LOG.debug("process_ancillary_network_ports - iteration: "
"%(iter_num)d - treat_ancillary_devices_added "
"completed in %(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
except DeviceListRetrievalError:
# Need to resync as there was an error with server
# communication.
LOG.exception(_LE("process_ancillary_network_ports - "
"iteration:%d - failure while retrieving "
"port details from server"), self.iter_num)
resync_a = True
failed_added = self.treat_ancillary_devices_added(
port_info['added'])
LOG.debug("process_ancillary_network_ports - iteration: "
"%(iter_num)d - treat_ancillary_devices_added "
"completed in %(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
failed_devices['added'] = failed_added
if 'removed' in port_info and port_info['removed']:
start = time.time()
resync_b = self.treat_ancillary_devices_removed(
failed_removed = self.treat_ancillary_devices_removed(
port_info['removed'])
failed_devices['removed'] = failed_removed
LOG.debug("process_ancillary_network_ports - iteration: "
"%(iter_num)d - treat_ancillary_devices_removed "
"completed in %(elapsed).3f",
{'iter_num': self.iter_num,
'elapsed': time.time() - start})
# If one of the above operations fails => resync with plugin
return (resync_a | resync_b)
return failed_devices
def get_ip_in_hex(self, ip_address):
try:
@ -1724,7 +1705,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
def process_port_info(self, start, polling_manager, sync, ovs_restarted,
ports, ancillary_ports, updated_ports_copy,
consecutive_resyncs, ports_not_ready_yet):
consecutive_resyncs, ports_not_ready_yet,
failed_devices, failed_ancillary_devices):
# There are polling managers that don't have get_events, e.g.
# AlwaysPoll used by windows implementations
# REVISIT (rossella_s) This needs to be reworked to hide implementation
@ -1744,6 +1726,10 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
consecutive_resyncs = 0
else:
consecutive_resyncs = 0
# TODO(rossella_s): For implementations that use AlwaysPoll
# resync if a device failed. This can be improved in future
sync = (any(failed_devices.values()) or
any(failed_ancillary_devices.values()))
# NOTE(rossella_s) don't empty the queue of events
# calling polling_manager.get_events() since
@ -1770,6 +1756,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
port_info, ancillary_port_info, ports_not_ready_yet = (
self.process_ports_events(events, ports, ancillary_ports,
ports_not_ready_yet,
failed_devices,
failed_ancillary_devices,
updated_ports_copy))
return (port_info, ancillary_port_info, consecutive_resyncs,
ports_not_ready_yet)
@ -1788,6 +1776,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
consecutive_resyncs = 0
need_clean_stale_flow = True
ports_not_ready_yet = set()
failed_devices = {'added': set(), 'removed': set()}
failed_ancillary_devices = {'added': set(), 'removed': set()}
while self._check_and_handle_signal():
if self.fullsync:
LOG.info(_LI("rpc_loop doing a full sync."))
@ -1838,8 +1829,11 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
LOG.exception(_LE("Error while synchronizing tunnels"))
tunnel_sync = True
ovs_restarted |= (ovs_status == constants.OVS_RESTARTED)
devices_need_retry = (any(failed_devices.values()) or
any(failed_ancillary_devices.values()) or
ports_not_ready_yet)
if (self._agent_has_updates(polling_manager) or sync
or ports_not_ready_yet):
or devices_need_retry):
try:
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
"starting polling. Elapsed:%(elapsed).3f",
@ -1855,8 +1849,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
ports_not_ready_yet) = (self.process_port_info(
start, polling_manager, sync, ovs_restarted,
ports, ancillary_ports, updated_ports_copy,
consecutive_resyncs, ports_not_ready_yet)
)
consecutive_resyncs, ports_not_ready_yet,
failed_devices, failed_ancillary_devices))
sync = False
self.process_deleted_ports(port_info)
ofport_changed_ports = self.update_stale_ofport_rules()
@ -1875,10 +1869,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
ovs_restarted):
LOG.debug("Starting to process devices in:%s",
port_info)
# If treat devices fails - must resync with plugin
sync = self.process_network_ports(port_info,
ovs_restarted)
if not sync and need_clean_stale_flow:
failed_devices = self.process_network_ports(
port_info, ovs_restarted)
if need_clean_stale_flow:
self.cleanup_stale_flows()
need_clean_stale_flow = False
LOG.debug("Agent rpc_loop - iteration:%(iter_num)d - "
@ -1889,8 +1882,9 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
ports = port_info['current']
if self.ancillary_brs:
sync |= self.process_ancillary_network_ports(
ancillary_port_info)
failed_ancillary_devices = (
self.process_ancillary_network_ports(
ancillary_port_info))
LOG.debug("Agent rpc_loop - iteration: "
"%(iter_num)d - ancillary ports "
"processed. Elapsed:%(elapsed).3f",
@ -1901,9 +1895,8 @@ class OVSNeutronAgent(sg_rpc.SecurityGroupAgentRpcCallbackMixin,
polling_manager.polling_completed()
# Keep this flag in the last line of "try" block,
# so we can sure that no other Exception occurred.
if not sync:
ovs_restarted = False
self._dispose_local_vlan_hints()
ovs_restarted = False
self._dispose_local_vlan_hints()
except Exception:
LOG.exception(_LE("Error while processing VIF ports"))
# Put the ports back in self.updated_port

View File

@ -99,7 +99,7 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
'br_tun': br_tun.OVSTunnelBridge
}
def create_agent(self, create_tunnels=True):
def create_agent(self, create_tunnels=True, ancillary_bridge=None):
if create_tunnels:
tunnel_types = [p_const.TYPE_VXLAN]
else:
@ -120,6 +120,8 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
self.addCleanup(self.ovs.delete_bridge, self.br_phys)
agent.sg_agent = mock.Mock()
agent.ancillary_brs = []
if ancillary_bridge:
agent.ancillary_brs.append(ancillary_bridge)
return agent
def _mock_get_events(self, agent, polling_manager, ports):
@ -182,12 +184,14 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
return {'id': uuidutils.generate_uuid(),
'tenant_id': uuidutils.generate_uuid()}
def _plug_ports(self, network, ports, agent, ip_len=24):
def _plug_ports(self, network, ports, agent, ip_len=24,
bridge=None):
for port in ports:
bridge = bridge or agent.int_br
self.driver.plug(
network.get('id'), port.get('id'), port.get('vif_name'),
port.get('mac_address'),
agent.int_br.br_name, namespace=self.namespace)
bridge.br_name, namespace=self.namespace)
ip_cidrs = ["%s/%s" % (port.get('fixed_ips')[0][
'ip_address'], ip_len)]
self.driver.init_l3(port.get('vif_name'), ip_cidrs,
@ -249,7 +253,13 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
else:
rpc_devices = [
dev for args in call.call_args_list for dev in args[0][2]]
return not (set(expected_devices) - set(rpc_devices))
for dev in rpc_devices:
if dev in expected_devices:
expected_devices.remove(dev)
# reset mock otherwise if the mock is called again the same call param
# will be processed again
call.reset_mock()
return not expected_devices
def create_test_ports(self, amount=3, **kwargs):
ports = []
@ -299,6 +309,53 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
self.agent.plugin_rpc.update_device_list.side_effect = (
mock_device_raise_exception)
def _prepare_failed_dev_up_trigger(self, agent):
def mock_failed_devices_up(context, devices_up, devices_down,
agent_id, host=None):
failed_devices = []
devices = list(devices_up)
# first port fails
if self.ports[0]['id'] in devices_up:
# reassign side_effect so that next RPC call will succeed
agent.plugin_rpc.update_device_list.side_effect = (
self._mock_update_device)
devices.remove(self.ports[0]['id'])
failed_devices.append(self.ports[0]['id'])
return {'devices_up': devices,
'failed_devices_up': failed_devices,
'devices_down': [],
'failed_devices_down': []}
self.agent.plugin_rpc.update_device_list.side_effect = (
mock_failed_devices_up)
def _prepare_failed_dev_down_trigger(self, agent):
def mock_failed_devices_down(context, devices_up, devices_down,
agent_id, host=None):
# first port fails
failed_port_id = self.ports[0]['id']
failed_devices_down = []
dev_down = [
{'device': p['id'], 'exists': True}
for p in self.ports if p['id'] in devices_down and (
p['id'] != failed_port_id)]
# check if it's the call to set devices down and if the device
# that is supposed to fail is in the call then modify the
# side_effect so that next RPC call will succeed.
if devices_down and failed_port_id in devices_down:
agent.plugin_rpc.update_device_list.side_effect = (
self._mock_update_device)
failed_devices_down.append(failed_port_id)
return {'devices_up': devices_up,
'failed_devices_up': [],
'devices_down': dev_down,
'failed_devices_down': failed_devices_down}
self.agent.plugin_rpc.update_device_list.side_effect = (
mock_failed_devices_down)
def wait_until_ports_state(self, ports, up, timeout=60):
port_ids = [p['id'] for p in ports]
agent_utils.wait_until_true(
@ -307,11 +364,21 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
timeout=timeout)
def setup_agent_and_ports(self, port_dicts, create_tunnels=True,
trigger_resync=False):
ancillary_bridge=None,
trigger_resync=False,
failed_dev_up=False,
failed_dev_down=False):
self.ports = port_dicts
self.agent = self.create_agent(create_tunnels=create_tunnels)
self.agent = self.create_agent(create_tunnels=create_tunnels,
ancillary_bridge=ancillary_bridge)
self.polling_manager = self.start_agent(self.agent, ports=self.ports)
self.network = self._create_test_network_dict()
if trigger_resync:
self._prepare_resync_trigger(self.agent)
self._plug_ports(self.network, self.ports, self.agent)
elif failed_dev_up:
self._prepare_failed_dev_up_trigger(self.agent)
elif failed_dev_down:
self._prepare_failed_dev_down_trigger(self.agent)
self._plug_ports(self.network, self.ports, self.agent,
bridge=ancillary_bridge)

View File

@ -77,6 +77,79 @@ class TestOVSAgent(base.OVSAgentTestFramework):
self.agent.plugin_rpc.update_device_list.reset_mock()
self.wait_until_ports_state(self.ports, up=True)
def test_resync_dev_up_after_failure(self):
self.setup_agent_and_ports(
port_dicts=self.create_test_ports(),
failed_dev_up=True)
# in the RPC mock the first port fails and should
# be re-synced
expected_ports = self.ports + [self.ports[0]]
self.wait_until_ports_state(expected_ports, up=True)
def test_resync_dev_down_after_failure(self):
self.setup_agent_and_ports(
port_dicts=self.create_test_ports(),
failed_dev_down=True)
self.wait_until_ports_state(self.ports, up=True)
for port in self.ports:
self.agent.int_br.delete_port(port['vif_name'])
# in the RPC mock the first port fails and should
# be re-synced
expected_ports = self.ports + [self.ports[0]]
self.wait_until_ports_state(expected_ports, up=False)
def test_ancillary_port_creation_and_deletion(self):
external_bridge = self.useFixture(
net_helpers.OVSBridgeFixture()).bridge
self.setup_agent_and_ports(
port_dicts=self.create_test_ports(),
ancillary_bridge=external_bridge)
self.wait_until_ports_state(self.ports, up=True)
for port in self.ports:
external_bridge.delete_port(port['vif_name'])
self.wait_until_ports_state(self.ports, up=False)
def test_resync_ancillary_devices(self):
external_bridge = self.useFixture(
net_helpers.OVSBridgeFixture()).bridge
self.setup_agent_and_ports(
port_dicts=self.create_test_ports(),
ancillary_bridge=external_bridge,
trigger_resync=True)
self.wait_until_ports_state(self.ports, up=True)
def test_resync_ancillary_dev_up_after_failure(self):
external_bridge = self.useFixture(
net_helpers.OVSBridgeFixture()).bridge
self.setup_agent_and_ports(
port_dicts=self.create_test_ports(),
ancillary_bridge=external_bridge,
failed_dev_up=True)
# in the RPC mock the first port fails and should
# be re-synced
expected_ports = self.ports + [self.ports[0]]
self.wait_until_ports_state(expected_ports, up=True)
def test_resync_ancillary_dev_down_after_failure(self):
external_bridge = self.useFixture(
net_helpers.OVSBridgeFixture()).bridge
self.setup_agent_and_ports(
port_dicts=self.create_test_ports(),
ancillary_bridge=external_bridge,
failed_dev_down=True)
self.wait_until_ports_state(self.ports, up=True)
for port in self.ports:
external_bridge.delete_port(port['vif_name'])
# in the RPC mock the first port fails and should
# be re-synced
expected_ports = self.ports + [self.ports[0]]
self.wait_until_ports_state(expected_ports, up=False)
def test_port_vlan_tags(self):
self.setup_agent_and_ports(
port_dicts=self.create_test_ports(),

View File

@ -389,13 +389,18 @@ class TestOvsNeutronAgent(object):
def _test_process_ports_events(self, events, registered_ports,
ancillary_ports, expected_ports,
expected_ancillary, updated_ports=None):
expected_ancillary, updated_ports=None,
):
with mock.patch.object(self.agent, 'check_changed_vlans',
return_value=set()):
devices_not_ready_yet = set()
failed_devices = {'added': set(), 'removed': set()}
failed_ancillary_devices = {
'added': set(), 'removed': set()}
actual = self.agent.process_ports_events(
events, registered_ports, ancillary_ports,
devices_not_ready_yet, updated_ports)
devices_not_ready_yet, failed_devices,
failed_ancillary_devices, updated_ports)
self.assertEqual(
(expected_ports, expected_ancillary, devices_not_ready_yet),
actual)
@ -436,8 +441,13 @@ class TestOvsNeutronAgent(object):
side_effect=[2]), \
mock.patch.object(self.agent, 'check_changed_vlans',
return_value=set()):
failed_devices = {'added': set(), 'removed': set()}
failed_ancillary_devices = {
'added': set(), 'removed': set()}
ports_not_ready_yet = set()
actual = self.agent.process_ports_events(
events, registered_ports, set(), devices_not_ready_yet)
events, registered_ports, set(), ports_not_ready_yet,
failed_devices, failed_ancillary_devices)
self.assertEqual(
(expected_ports, expected_ancillary, devices_not_ready_yet),
actual)
@ -461,12 +471,15 @@ class TestOvsNeutronAgent(object):
'external_ids': {
'attached-mac': 'mac4'}}]):
expected_devices_not_ready = {'port5'}
failed_devices = {'added': set(), 'removed': set()}
failed_ancillary_devices = {
'added': set(), 'removed': set()}
actual = self.agent.process_ports_events(
events, registered_ports, set(), old_devices_not_ready)
events, registered_ports, set(), old_devices_not_ready,
failed_devices, failed_ancillary_devices)
self.assertEqual(
(expected_ports, expected_ancillary,
expected_devices_not_ready),
actual)
expected_devices_not_ready), actual)
def _test_process_port_events_with_updated_ports(self, updated_ports):
events = {'added': [{'name': 'port3', 'ofport': 3,
@ -493,9 +506,13 @@ class TestOvsNeutronAgent(object):
return_value=set()):
devices_not_ready_yet = set()
failed_devices = {'added': set(), 'removed': set()}
failed_ancillary_devices = {
'added': set(), 'removed': set()}
actual = self.agent.process_ports_events(
events, registered_ports, ancillary_ports,
devices_not_ready_yet, updated_ports)
devices_not_ready_yet, failed_devices,
failed_ancillary_devices, updated_ports)
self.assertEqual(
(expected_ports, expected_ancillary, devices_not_ready_yet),
actual)
@ -610,7 +627,7 @@ class TestOvsNeutronAgent(object):
with mock.patch.object(self.agent.plugin_rpc,
'get_devices_details_list_and_failed_devices',
return_value={'devices': [details],
'failed_devices': None}),\
'failed_devices': []}),\
mock.patch.object(self.agent.int_br,
'get_vifs_by_ids',
return_value={details['device']: port}),\
@ -623,8 +640,8 @@ class TestOvsNeutronAgent(object):
'get_port_tag_dict',
return_value={}),\
mock.patch.object(self.agent, func_name) as func:
skip_devs, need_bound_devices, insecure_ports = (
self.agent.treat_devices_added_or_updated([{}], False))
skip_devs, need_bound_devices, insecure_ports, _ = (
self.agent.treat_devices_added_or_updated([], False))
# The function should not raise
self.assertFalse(skip_devs)
return func.called
@ -668,7 +685,7 @@ class TestOvsNeutronAgent(object):
with mock.patch.object(self.agent.plugin_rpc,
'get_devices_details_list_and_failed_devices',
return_value={'devices': [details],
'failed_devices': None}),\
'failed_devices': []}),\
mock.patch.object(self.agent.ext_manager,
'handle_port', new=fake_handle_port),\
mock.patch.object(self.agent.int_br,
@ -677,7 +694,7 @@ class TestOvsNeutronAgent(object):
mock.patch.object(self.agent, 'treat_vif_port',
return_value=False):
self.agent.treat_devices_added_or_updated([{}], False)
self.agent.treat_devices_added_or_updated([], False)
def test_treat_devices_added_updated_skips_if_port_not_found(self):
dev_mock = mock.MagicMock()
@ -685,7 +702,7 @@ class TestOvsNeutronAgent(object):
with mock.patch.object(self.agent.plugin_rpc,
'get_devices_details_list_and_failed_devices',
return_value={'devices': [dev_mock],
'failed_devices': None}),\
'failed_devices': []}),\
mock.patch.object(self.agent.int_br,
'get_port_tag_dict',
return_value={}),\
@ -694,10 +711,29 @@ class TestOvsNeutronAgent(object):
return_value={}),\
mock.patch.object(self.agent,
'treat_vif_port') as treat_vif_port:
skip_devs = self.agent.treat_devices_added_or_updated([{}], False)
skip_devs = self.agent.treat_devices_added_or_updated([], False)
# The function should return False for resync and no device
# processed
self.assertEqual((['the_skipped_one'], [], []), skip_devs)
self.assertEqual((['the_skipped_one'], [], [], set()), skip_devs)
self.assertFalse(treat_vif_port.called)
def test_treat_devices_added_failed_devices(self):
dev_mock = 'the_failed_one'
with mock.patch.object(self.agent.plugin_rpc,
'get_devices_details_list_and_failed_devices',
return_value={'devices': [],
'failed_devices': [dev_mock]}),\
mock.patch.object(self.agent.int_br,
'get_vifs_by_ids',
return_value={}),\
mock.patch.object(self.agent,
'treat_vif_port') as treat_vif_port:
failed_devices = {'added': set(), 'removed': set()}
(_, _, _, failed_devices['added']) = (
self.agent.treat_devices_added_or_updated([], False))
# The function should return False for resync and no device
# processed
self.assertEqual(set([dev_mock]), failed_devices.get('added'))
self.assertFalse(treat_vif_port.called)
def test_treat_devices_added_updated_put_port_down(self):
@ -717,7 +753,7 @@ class TestOvsNeutronAgent(object):
with mock.patch.object(self.agent.plugin_rpc,
'get_devices_details_list_and_failed_devices',
return_value={'devices': [fake_details_dict],
'failed_devices': None}),\
'failed_devices': []}),\
mock.patch.object(self.agent.int_br,
'get_vifs_by_ids',
return_value={'xxx': mock.MagicMock()}),\
@ -725,8 +761,8 @@ class TestOvsNeutronAgent(object):
return_value={}),\
mock.patch.object(self.agent,
'treat_vif_port') as treat_vif_port:
skip_devs, need_bound_devices, insecure_ports = (
self.agent.treat_devices_added_or_updated([{}], False))
skip_devs, need_bound_devices, insecure_ports, _ = (
self.agent.treat_devices_added_or_updated([], False))
# The function should return False for resync
self.assertFalse(skip_devs)
self.assertTrue(treat_vif_port.called)
@ -749,6 +785,19 @@ class TestOvsNeutronAgent(object):
def test_treat_devices_removed_ignores_missing_port(self):
self._mock_treat_devices_removed(False)
def test_treat_devices_removed_failed_devices(self):
dev_mock = 'the_failed_one'
with mock.patch.object(self.agent.plugin_rpc,
'update_device_list',
return_value={'devices_up': [],
'devices_down': [],
'failed_devices_up': [],
'failed_devices_down': [
dev_mock]}):
failed_devices = {'added': set(), 'removed': set()}
failed_devices['removed'] = self.agent.treat_devices_removed([{}])
self.assertEqual(set([dev_mock]), failed_devices.get('removed'))
def test_bind_port_with_missing_network(self):
vif_port = mock.Mock()
vif_port.name.return_value = 'port'
@ -756,19 +805,24 @@ class TestOvsNeutronAgent(object):
'vif_port': vif_port}])
def _test_process_network_ports(self, port_info):
failed_devices = {'added': set(), 'removed': set()}
with mock.patch.object(self.agent.sg_agent,
"setup_port_filters") as setup_port_filters,\
mock.patch.object(
self.agent,
"treat_devices_added_or_updated",
return_value=([], [], [])) as device_added_updated,\
self.agent, "treat_devices_added_or_updated",
return_value=(
[], [], [],
failed_devices['added'])) as device_added_updated,\
mock.patch.object(self.agent.int_br, "get_ports_attributes",
return_value=[]),\
mock.patch.object(self.agent,
"treat_devices_removed",
return_value=False) as device_removed:
self.assertFalse(self.agent.process_network_ports(port_info,
False))
return_value=(
failed_devices[
'removed'])) as device_removed:
self.assertEqual(
failed_devices,
self.agent.process_network_ports(port_info, False))
setup_port_filters.assert_called_once_with(
port_info.get('added', set()),
port_info.get('updated', set()))
@ -801,14 +855,18 @@ class TestOvsNeutronAgent(object):
'updated': set(['tap1']),
'removed': set([]),
'added': set(['eth1'])}
failed_dev = {'added': set(), 'removed': set()}
with mock.patch.object(self.agent.sg_agent,
"setup_port_filters") as setup_port_filters,\
mock.patch.object(
self.agent,
"treat_devices_added_or_updated",
return_value=([], [], ['eth1'])) as device_added_updated:
self.assertFalse(self.agent.process_network_ports(port_info,
False))
return_value=(
[], [], ['eth1'],
failed_dev['added'])) as device_added_updated:
self.assertEqual(
failed_dev,
self.agent.process_network_ports(port_info, False))
device_added_updated.assert_called_once_with(
set(['eth1', 'tap1']), False)
setup_port_filters.assert_called_once_with(
@ -1488,8 +1546,11 @@ class TestOvsNeutronAgent(object):
devices_not_ready),
(reply3, reply_ancillary,
devices_not_ready)]
failed_devices = {'added': set(), 'removed': set()}
failed_ancillary_devices = {'added': set(), 'removed': set()}
process_network_ports.side_effect = [
False, Exception('Fake exception to get out of the loop')]
failed_devices,
Exception('Fake exception to get out of the loop')]
check_ovs_status.side_effect = args
try:
self.agent.daemon_loop()
@ -1498,9 +1559,11 @@ class TestOvsNeutronAgent(object):
process_p_events.assert_has_calls([
mock.call({'removed': [], 'added': []}, set(), set(), set(),
failed_devices, failed_ancillary_devices,
set()),
mock.call({'removed': [], 'added': []}, set(['tap0']), set(),
set(), set())
set(), failed_devices, failed_ancillary_devices,
set())
])
process_network_ports.assert_has_calls([
@ -1540,7 +1603,7 @@ class TestOvsNeutronAgent(object):
mock.patch.object(
self.mod_agent.OVSNeutronAgent,
'_check_and_handle_signal') as check_and_handle_signal:
process_network_ports.return_value = True
process_network_ports.side_effect = Exception("Trigger resync")
check_ovs_status.return_value = constants.OVS_NORMAL
check_and_handle_signal.side_effect = [True, False]
self.agent.daemon_loop()
@ -2321,7 +2384,9 @@ class TestOvsDvrNeutronAgent(object):
mock.patch.object(self.agent.dvr_agent, 'tun_br', new=tun_br),\
mock.patch.dict(self.agent.dvr_agent.phys_brs,
{self._physical_network: phys_br}):
self.agent.treat_devices_removed([self._port.vif_id])
failed_devices = {'added': set(), 'removed': set()}
failed_devices['removed'] = self.agent.treat_devices_removed(
[self._port.vif_id])
lvid = self.agent.local_vlan_map[self._net_uuid].vlan
if ip_version == 4:
expected = [
@ -2435,7 +2500,9 @@ class TestOvsDvrNeutronAgent(object):
mock.patch.object(self.agent, 'tun_br', new=tun_br),\
mock.patch.object(self.agent.dvr_agent, 'int_br', new=int_br),\
mock.patch.object(self.agent.dvr_agent, 'tun_br', new=tun_br):
self.agent.treat_devices_removed([self._compute_port.vif_id])
failed_devices = {'added': set(), 'removed': set()}
failed_devices['removed'] = self.agent.treat_devices_removed(
[self._compute_port.vif_id])
int_br.assert_has_calls([
mock.call.delete_dvr_to_src_mac(
network_type='vxlan',
@ -2531,7 +2598,9 @@ class TestOvsDvrNeutronAgent(object):
mock.patch.object(self.agent, 'tun_br', new=tun_br),\
mock.patch.object(self.agent.dvr_agent, 'int_br', new=int_br),\
mock.patch.object(self.agent.dvr_agent, 'tun_br', new=tun_br):
self.agent.treat_devices_removed([self._port.vif_id])
failed_devices = {'added': set(), 'removed': set()}
failed_devices['removed'] = self.agent.treat_devices_removed(
[self._port.vif_id])
expected_on_int_br = [
mock.call.delete_dvr_to_src_mac(
network_type='vxlan',

View File

@ -509,11 +509,15 @@ class TunnelTest(object):
self._verify_mock_calls()
def test_daemon_loop(self):
reply_ge_1 = {'added': set(['tap0']),
'removed': set([])}
reply_ge_1 = {'added': [{'name': 'tap0', 'ofport': 3,
'external_ids': {
'attached-mac': 'test_mac'}}],
'removed': []}
reply_ge_2 = {'added': set([]),
'removed': set(['tap0'])}
reply_ge_2 = {'added': [],
'removed': [{'name': 'tap0', 'ofport': 3,
'external_ids': {
'attached-mac': 'test_mac'}}]}
reply_pe_1 = {'current': set(['tap0']),
'added': set(['tap0']),
@ -559,8 +563,11 @@ class TunnelTest(object):
(reply_pe_2, reply_ancillary, devices_not_ready)]
interface_polling = mock.Mock()
interface_polling.get_events.side_effect = [reply_ge_1, reply_ge_2]
failed_devices = {'removed': set([]), 'added': set([])}
failed_ancillary_devices = {'removed': set([]), 'added': set([])}
process_network_ports.side_effect = [
False, Exception('Fake exception to get out of the loop')]
failed_devices,
Exception('Fake exception to get out of the loop')]
n_agent = self._build_agent()
@ -577,8 +584,10 @@ class TunnelTest(object):
log_exception.assert_called_once_with(
"Error while processing VIF ports")
process_p_events.assert_has_calls([
mock.call(reply_ge_1, set(), set(), devices_not_ready, set()),
mock.call(reply_ge_1, set(), set(), devices_not_ready,
failed_devices, failed_ancillary_devices, set()),
mock.call(reply_ge_2, set(['tap0']), set(), devices_not_ready,
failed_devices, failed_ancillary_devices,
set())
])
process_network_ports.assert_has_calls([