Handle add/remove subports events loss due to agent failures
Upon restart the agent reconciles the logical state with the physical state by removing/adding physical subports that used to be/are currently present in the logical view. This patch adds a functional test to demonstrate that there's no need to handle the resync in the trunk driver, since the OVS agent already takes care of it. Change-Id: I164153c79313f2ae7a1fca0414736d5987656185 Partially-implements: blueprint vlan-aware-vms Partial-bug: #1623708
This commit is contained in:
parent
8d6b7f8635
commit
08f2af18f9
|
@ -67,6 +67,10 @@ def is_trunk_bridge(port_name):
|
|||
return port_name.startswith(t_const.TRUNK_BR_PREFIX)
|
||||
|
||||
|
||||
def is_subport(port_name):
    """True if *port_name* is named like a trunk subport device."""
    subport_prefix = tman.SubPort.DEV_PREFIX
    return port_name.startswith(subport_prefix)
|
||||
|
||||
|
||||
def is_trunk_service_port(port_name):
|
||||
"""True if the port is any of the ports used to realize a trunk."""
|
||||
return is_trunk_bridge(port_name) or port_name[:2] in (
|
||||
|
@ -74,9 +78,8 @@ def is_trunk_service_port(port_name):
|
|||
tman.SubPort.DEV_PREFIX)
|
||||
|
||||
|
||||
def bridge_has_instance_port(bridge):
|
||||
"""True if there is an OVS port that doesn't have bridge or patch ports
|
||||
prefix.
|
||||
def bridge_has_port(bridge, is_port_predicate):
|
||||
"""True if there is an OVS port for which is_port_predicate is True.
|
||||
"""
|
||||
try:
|
||||
ifaces = bridge.get_iface_name_list()
|
||||
|
@ -87,8 +90,21 @@ def bridge_has_instance_port(bridge):
|
|||
'err': e})
|
||||
return False
|
||||
|
||||
return any(iface for iface in ifaces
|
||||
if not is_trunk_service_port(iface))
|
||||
return any(iface for iface in ifaces if is_port_predicate(iface))
|
||||
|
||||
|
||||
def bridge_has_instance_port(bridge):
    """True if there is an OVS port that doesn't have bridge or patch ports
    prefix.

    :param bridge: OVSBridge object to inspect.
    :returns: True if at least one port on the bridge is not a trunk
        service port (i.e. it is presumably an instance port).
    """
    # PEP 8 (E731): assigning a lambda to a name loses the function name in
    # tracebacks and defeats the point of a lambda; use a def instead.
    def is_instance_port(port_name):
        return not is_trunk_service_port(port_name)

    return bridge_has_port(bridge, is_instance_port)
|
||||
|
||||
|
||||
def bridge_has_service_port(bridge):
    """Tell whether the bridge hosts any OVS port used to realize a trunk."""
    return bridge_has_port(bridge, is_trunk_service_port)
|
||||
|
||||
|
||||
class OVSDBHandler(object):
|
||||
|
@ -167,10 +183,13 @@ class OVSDBHandler(object):
|
|||
bridge.destroy()
|
||||
return
|
||||
|
||||
# Check if the trunk was provisioned in a previous run. This can happen
|
||||
# at agent startup when existing trunks are notified as added events.
|
||||
rewire = bridge_has_service_port(bridge)
|
||||
# Once we get hold of the trunk parent port, we can provision
|
||||
# the OVS dataplane for the trunk.
|
||||
try:
|
||||
self._wire_trunk(bridge, self._get_parent_port(bridge))
|
||||
self._wire_trunk(bridge, self._get_parent_port(bridge), rewire)
|
||||
except oslo_messaging.MessagingException as e:
|
||||
LOG.error(_LE("Got messaging error while processing trunk bridge "
|
||||
"%(bridge_name)s: %(err)s"),
|
||||
|
@ -217,6 +236,23 @@ class OVSDBHandler(object):
|
|||
bridge_name = utils.gen_trunk_br_name(trunk_id)
|
||||
return ovs_lib.BaseOVS().bridge_exists(bridge_name)
|
||||
|
||||
def get_connected_subports_for_trunk(self, trunk_id):
    """Return the list of subports present on the trunk bridge.

    :param trunk_id: ID of the trunk whose bridge is inspected.
    :returns: list of port UUIDs recovered from the interfaces'
        external_ids (via get_port_uuid_from_external_ids); an empty
        list if the bridge no longer exists or OVSDB cannot be queried.
    """
    bridge = ovs_lib.OVSBridge(utils.gen_trunk_br_name(trunk_id))
    # The bridge may have been deleted concurrently (e.g. trunk removal);
    # treat that as "no subports" rather than failing.
    if not bridge.bridge_exists(bridge.br_name):
        return []
    try:
        ports = bridge.get_ports_attributes(
            'Interface', columns=['name', 'external_ids'])
        # Only interfaces named like subport devices belong to the trunk's
        # physical wiring; map each one back to its port UUID.
        return [
            self.trunk_manager.get_port_uuid_from_external_ids(port)
            for port in ports if is_subport(port['name'])
        ]
    except (RuntimeError, tman.TrunkManagerError) as e:
        # Best effort: log the failure and report no subports so callers
        # can proceed without the physical view.
        LOG.error(_LE("Failed to get subports for bridge %(bridge)s: "
                      "%(err)s"), {'bridge': bridge.br_name, 'err': e})
        return []
|
||||
|
||||
def wire_subports_for_trunk(self, context, trunk_id, subports,
|
||||
trunk_bridge=None, parent_port=None):
|
||||
"""Create OVS ports associated to the logical subports."""
|
||||
|
@ -302,14 +338,19 @@ class OVSDBHandler(object):
|
|||
"Can't find parent port for trunk bridge %s" %
|
||||
trunk_bridge.br_name)
|
||||
|
||||
def _wire_trunk(self, trunk_br, port):
|
||||
def _wire_trunk(self, trunk_br, port, rewire=False):
|
||||
"""Wire trunk bridge with integration bridge.
|
||||
|
||||
The method calls into trunk manager to create patch ports for trunk and
|
||||
patch ports for all subports associated with this trunk.
|
||||
patch ports for all subports associated with this trunk. If rewire is
|
||||
True, a diff is performed between desired state (the one got from the
|
||||
server) and actual state (the patch ports present on the trunk bridge)
|
||||
and subports are wired/unwired accordingly.
|
||||
|
||||
:param trunk_br: OVSBridge object representing the trunk bridge.
|
||||
:param port: Parent port dict.
|
||||
:param rewire: True if local trunk state must be reconciled with
|
||||
server's state.
|
||||
"""
|
||||
ctx = self.context
|
||||
try:
|
||||
|
@ -337,8 +378,19 @@ class OVSDBHandler(object):
|
|||
self.report_trunk_status(ctx, trunk.id, constants.ERROR_STATUS)
|
||||
return
|
||||
|
||||
# We need to remove stale subports
|
||||
if rewire:
|
||||
old_subport_ids = self.get_connected_subports_for_trunk(trunk.id)
|
||||
subports = {p['port_id'] for p in trunk.sub_ports}
|
||||
subports_to_delete = set(old_subport_ids) - subports
|
||||
if subports_to_delete:
|
||||
self.unwire_subports_for_trunk(trunk.id, subports_to_delete)
|
||||
|
||||
# NOTE(status_police): inform the server whether the operation
|
||||
# was a partial or complete success. Do not inline status.
|
||||
# NOTE: in case of rewiring we readd ports that are already present on
|
||||
# the bridge because e.g. the segmentation ID might have changed (e.g.
|
||||
# agent crashed, port was removed and readded with a different seg ID)
|
||||
status = self.wire_subports_for_trunk(
|
||||
ctx, trunk.id, trunk.sub_ports,
|
||||
trunk_bridge=trunk_br, parent_port=port)
|
||||
|
|
|
@ -133,7 +133,8 @@ class TrunkParentPort(object):
|
|||
# control over the wiring logic for trunk ports be required.
|
||||
patch_int_attrs = get_patch_peer_attrs(
|
||||
self.patch_port_trunk_name, self.port_mac, self.port_id)
|
||||
patch_trunk_attrs = get_patch_peer_attrs(self.patch_port_int_name)
|
||||
patch_trunk_attrs = get_patch_peer_attrs(self.patch_port_int_name,
|
||||
self.port_mac, self.port_id)
|
||||
|
||||
with self.ovsdb_transaction() as txn:
|
||||
txn.add(ovsdb.add_port(br_int.br_name,
|
||||
|
|
|
@ -145,6 +145,10 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
|
|||
return {'added': filtered_ports, 'removed': events['removed']}
|
||||
polling_manager.get_events = mock.Mock(side_effect=filter_events)
|
||||
|
||||
def stop_agent(self, agent, rpc_loop_thread):
    """Stop the agent's daemon loop and wait for its thread to exit.

    :param agent: OVS agent whose run_daemon_loop flag is cleared so the
        rpc_loop terminates at the next iteration.
    :param rpc_loop_thread: greenthread running agent.rpc_loop; waited on
        so the loop has fully stopped before the test proceeds.
    """
    agent.run_daemon_loop = False
    rpc_loop_thread.wait()
|
||||
|
||||
def start_agent(self, agent, ports=None, unplug_ports=None):
|
||||
if unplug_ports is None:
|
||||
unplug_ports = []
|
||||
|
@ -159,13 +163,10 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
|
|||
polling_manager._monitor.is_active)
|
||||
agent.check_ovs_status = mock.Mock(
|
||||
return_value=constants.OVS_NORMAL)
|
||||
t = eventlet.spawn(agent.rpc_loop, polling_manager)
|
||||
self.agent_thread = eventlet.spawn(agent.rpc_loop,
|
||||
polling_manager)
|
||||
|
||||
def stop_agent(agent, rpc_loop_thread):
|
||||
agent.run_daemon_loop = False
|
||||
rpc_loop_thread.wait()
|
||||
|
||||
self.addCleanup(stop_agent, agent, t)
|
||||
self.addCleanup(self.stop_agent, agent, self.agent_thread)
|
||||
return polling_manager
|
||||
|
||||
def _create_test_port_dict(self):
|
||||
|
|
|
@ -145,3 +145,48 @@ class OVSDBHandlerTestCase(base.OVSAgentTestFramework):
|
|||
ports = self.create_test_ports(amount=1)
|
||||
self.trunk_dict['port_id'] = ports[0]['id']
|
||||
self._test_trunk_creation_helper(ports)
|
||||
|
||||
def test_resync(self):
    # Create three ports: one trunk parent plus two subports, and build
    # the trunk's logical state out of them.
    ports = self.create_test_ports(amount=3)
    self.trunk_dict['port_id'] = ports[0]['id']
    self.trunk_dict['sub_ports'] = [trunk_obj.SubPort(
        id=uuidutils.generate_uuid(),
        port_id=ports[i]['id'],
        mac_address=ports[i]['mac_address'],
        segmentation_id=i,
        trunk_id=self.trunk_dict['id'])
        for i in range(1, 3)]

    self.setup_agent_and_ports(port_dicts=ports)
    self.wait_until_ports_state(self.ports, up=True)
    # Force a full agent resync and verify the ports come back up,
    # i.e. the trunk wiring survives a resync cycle.
    self.agent.fullsync = True
    self.wait_until_ports_state(self.ports, up=True)
|
||||
|
||||
def test_restart(self):
    # Create three ports: one trunk parent plus two subports, and build
    # the trunk's logical state out of them.
    ports = self.create_test_ports(amount=3)
    self.trunk_dict['port_id'] = ports[0]['id']
    self.trunk_dict['sub_ports'] = [trunk_obj.SubPort(
        id=uuidutils.generate_uuid(),
        port_id=ports[i]['id'],
        mac_address=ports[i]['mac_address'],
        segmentation_id=i,
        trunk_id=self.trunk_dict['id'])
        for i in range(1, 3)]

    self.setup_agent_and_ports(port_dicts=ports)
    self.wait_until_ports_state(self.ports, up=True)

    # restart and simulate a subport delete: while the agent is down the
    # server-side trunk loses its second subport, so on startup the agent
    # must unwire the now-stale physical subport.
    deleted_port = self.ports[2]
    deleted_sp = trunk_manager.SubPort(
        self.trunk_dict['id'], deleted_port['id'])
    self.stop_agent(self.agent, self.agent_thread)
    self.polling_manager.stop()
    self.trunk_dict['sub_ports'] = self.trunk_dict['sub_ports'][:1]
    self.setup_agent_and_ports(port_dicts=ports[:2])
    # NOTE: the port_dicts passed in setup_agent_and_ports is stored in
    # self.ports so we are waiting here only for ports[:2]
    self.wait_until_ports_state(self.ports, up=True)
    # The deleted subport's patch port must disappear from the trunk
    # bridge once the agent has reconciled logical and physical state.
    common_utils.wait_until_true(
        lambda: (deleted_sp.patch_port_trunk_name not in
                 self.trunk_br.get_port_name_list()))
|
||||
|
|
Loading…
Reference in New Issue