Ignore first local port update notification

The ovs-agent scans and processes the ports during the
first rpc_loop, and a local port update notification
is sent out afterwards. This causes these ports to
be processed again in the ovs-agent's next (second)
rpc_loop.
This patch passes the restart flag (iteration number 0)
down the local port_update call path. After this patch,
the local port_update notification will be ignored in
the first RPC loop.

Related-Bug: #1813703
Change-Id: Ic5bf718cfd056f805741892a91a8d45f7a6e0db3
This commit is contained in:
LIU Yulong 2019-03-19 21:21:45 +08:00
parent d9e61138ff
commit eaf3ff5786
4 changed files with 39 additions and 14 deletions

View File

@ -49,7 +49,7 @@ class RemoteResourceCache(object):
def start_watcher(self):
self._watcher = RemoteResourceWatcher(self)
def get_resource_by_id(self, rtype, obj_id):
def get_resource_by_id(self, rtype, obj_id, agent_restarted=False):
"""Returns None if it doesn't exist."""
if obj_id in self._deleted_ids_by_type[rtype]:
return None
@ -57,10 +57,12 @@ class RemoteResourceCache(object):
if cached_item:
return cached_item
# try server in case object existed before agent start
self._flood_cache_for_query(rtype, id=(obj_id, ))
self._flood_cache_for_query(rtype, id=(obj_id, ),
agent_restarted=agent_restarted)
return self._type_cache(rtype).get(obj_id)
def _flood_cache_for_query(self, rtype, **filter_kwargs):
def _flood_cache_for_query(self, rtype, agent_restarted=False,
**filter_kwargs):
"""Load info from server for first query.
Queries the server if this is the first time a given query for
@ -81,7 +83,8 @@ class RemoteResourceCache(object):
# been updated already and pushed to us in another thread.
LOG.debug("Ignoring stale update for %s: %s", rtype, resource)
continue
self.record_resource_update(context, rtype, resource)
self.record_resource_update(context, rtype, resource,
agent_restarted=agent_restarted)
LOG.debug("%s resources returned for queries %s", len(resources),
query_ids)
self._satisfied_server_queries.update(query_ids)
@ -159,7 +162,8 @@ class RemoteResourceCache(object):
return True
return False
def record_resource_update(self, context, rtype, resource):
def record_resource_update(self, context, rtype, resource,
agent_restarted=False):
"""Takes in an OVO and generates an event on relevant changes.
A change is deemed to be relevant if it is not stale and if any
@ -190,7 +194,8 @@ class RemoteResourceCache(object):
registry.notify(rtype, events.AFTER_UPDATE, self,
context=context, changed_fields=changed_fields,
existing=existing, updated=resource,
resource_id=resource.id)
resource_id=resource.id,
agent_restarted=agent_restarted)
def record_resource_delete(self, context, rtype, resource_id):
# deletions are final, record them so we never

View File

@ -131,7 +131,8 @@ class PluginApi(object):
devices=devices, agent_id=agent_id, host=host)
def get_devices_details_list_and_failed_devices(self, context, devices,
agent_id, host=None):
agent_id, host=None,
**kwargs):
"""Get devices details and the list of devices that failed.
This method returns the devices details. If an error is thrown when
@ -240,6 +241,7 @@ class CacheBackedPluginApi(PluginApi):
the payloads the handlers are expecting (an ID).
"""
rtype = rtype.lower() # all legacy handlers don't camelcase
agent_restarted = kwargs.pop("agent_restarted", None)
method, host_with_activation, host_with_deactivation = (
self._get_method_host(rtype, event, **kwargs))
if not hasattr(self._legacy_interface, method):
@ -256,6 +258,9 @@ class CacheBackedPluginApi(PluginApi):
else:
payload = {rtype: {'id': resource_id},
'%s_id' % rtype: resource_id}
if method == "port_update" and agent_restarted is not None:
# Mark ovs-agent restart for local port_update
payload["agent_restarted"] = agent_restarted
getattr(self._legacy_interface, method)(context, **payload)
def _get_method_host(self, rtype, event, **kwargs):
@ -300,20 +305,23 @@ class CacheBackedPluginApi(PluginApi):
return method, host_with_activation, host_with_deactivation
def get_devices_details_list_and_failed_devices(self, context, devices,
agent_id, host=None):
agent_id, host=None,
agent_restarted=False):
result = {'devices': [], 'failed_devices': []}
for device in devices:
try:
result['devices'].append(
self.get_device_details(context, device, agent_id, host))
self.get_device_details(context, device, agent_id, host,
agent_restarted))
except Exception:
LOG.exception("Failed to get details for device %s", device)
result['failed_devices'].append(device)
return result
def get_device_details(self, context, device, agent_id, host=None):
def get_device_details(self, context, device, agent_id, host=None,
agent_restarted=False):
port_obj = self.remote_resource_cache.get_resource_by_id(
resources.PORT, device)
resources.PORT, device, agent_restarted)
if not port_obj:
LOG.debug("Device %s does not exist in cache.", device)
return {'device': device}

View File

@ -481,10 +481,18 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
def port_update(self, context, **kwargs):
port = kwargs.get('port')
agent_restarted = kwargs.pop("agent_restarted", False)
# Put the port identifier in the updated_ports set.
# Even if full port details might be provided to this call,
# they are not used since there is no guarantee the notifications
# are processed in the same order as the relevant API requests
if not agent_restarted:
# When ovs-agent is just restarted, the first RPC loop will
# process all the ports as 'added'. And all of these ports will
# send a port_update notification after that processing. This
# will cause all these ports to be processed again in the next RPC
# loop as 'updated'. So here we just ignore such local update
# notifications.
self.updated_ports.add(port['id'])
if not self.conf.AGENT.baremetal_smartnic:
@ -1775,12 +1783,14 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
skipped_devices = []
need_binding_devices = []
binding_no_activated_devices = set()
agent_restarted = self.iter_num == 0
devices_details_list = (
self.plugin_rpc.get_devices_details_list_and_failed_devices(
self.context,
devices,
self.agent_id,
self.conf.host))
self.conf.host,
agent_restarted))
failed_devices = set(devices_details_list.get('failed_devices'))
devices = devices_details_list.get('devices')

View File

@ -347,7 +347,8 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase, OVSOFControllerHelper):
'failed_devices_down': []}
def setup_agent_rpc_mocks(self, agent, unplug_ports):
def mock_device_details(context, devices, agent_id, host=None):
def mock_device_details(context, devices, agent_id, host=None,
agent_restarted=False):
details = []
for port in self.ports:
if port['id'] in devices:
@ -439,6 +440,7 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase, OVSOFControllerHelper):
self.ports = port_dicts
self.agent = self.create_agent(create_tunnels=create_tunnels,
ancillary_bridge=ancillary_bridge)
self.agent.iter_num += 1
self.polling_manager = self.start_agent(self.agent, ports=self.ports)
self.network = network or self._create_test_network_dict()
if trigger_resync: