Ignore first local port update notification

The OVS agent scans and processes all ports during its
first rpc_loop, and a local port update notification is
sent out for each of them. This causes the same ports to
be processed again in the agent's next (second) rpc_loop.
This patch passes the restart flag (iteration number 0)
down the local port_update call chain. With this change,
the local port_update notification is ignored during the
first RPC loop.

Related-Bug: #1813703
Change-Id: Ic5bf718cfd056f805741892a91a8d45f7a6e0db3
(cherry picked from commit eaf3ff5786)
Authored by LIU Yulong on 2019-03-19 21:21:45 +08:00; committed by Swaminathan Vasudevan
parent aa60d6e837
commit e1b84c9a70
4 changed files with 39 additions and 14 deletions
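
Before the per-file diffs, the following is a minimal, self-contained sketch of the behaviour this commit introduces. The FakeAgent class and its rpc_loop_once() method are illustrative stand-ins rather than the real OVSNeutronAgent code: they only show how iteration number 0 is treated as the restart case and how local port_update notifications are dropped during that first loop.

# Illustrative stand-in for the real OVSNeutronAgent; only the logic relevant
# to this patch is sketched here.
class FakeAgent(object):
    def __init__(self):
        self.iter_num = 0
        self.updated_ports = set()

    def port_update(self, context, **kwargs):
        port = kwargs.get('port')
        agent_restarted = kwargs.pop('agent_restarted', False)
        if not agent_restarted:
            # Only queue the port for the next rpc_loop when the notification
            # did not originate from the agent's own first (restart) loop.
            self.updated_ports.add(port['id'])

    def rpc_loop_once(self, ports):
        agent_restarted = self.iter_num == 0
        for port in ports:
            # Processing a port in the first loop triggers a local
            # port_update notification; it now carries the restart flag.
            self.port_update(None, port=port, agent_restarted=agent_restarted)
        self.iter_num += 1


agent = FakeAgent()
ports = [{'id': 'port-1'}, {'id': 'port-2'}]
agent.rpc_loop_once(ports)   # first loop after (re)start: updates ignored
assert agent.updated_ports == set()
agent.rpc_loop_once(ports)   # later loops: updates queued as before
assert agent.updated_ports == {'port-1', 'port-2'}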

Changed file 1 of 4:

@@ -49,7 +49,7 @@ class RemoteResourceCache(object):
     def start_watcher(self):
         self._watcher = RemoteResourceWatcher(self)

-    def get_resource_by_id(self, rtype, obj_id):
+    def get_resource_by_id(self, rtype, obj_id, agent_restarted=False):
         """Returns None if it doesn't exist."""
         if obj_id in self._deleted_ids_by_type[rtype]:
             return None
@@ -57,10 +57,12 @@ class RemoteResourceCache(object):
         if cached_item:
             return cached_item
         # try server in case object existed before agent start
-        self._flood_cache_for_query(rtype, id=(obj_id, ))
+        self._flood_cache_for_query(rtype, id=(obj_id, ),
+                                    agent_restarted=agent_restarted)
         return self._type_cache(rtype).get(obj_id)

-    def _flood_cache_for_query(self, rtype, **filter_kwargs):
+    def _flood_cache_for_query(self, rtype, agent_restarted=False,
+                               **filter_kwargs):
         """Load info from server for first query.

         Queries the server if this is the first time a given query for
@@ -81,7 +83,8 @@ class RemoteResourceCache(object):
                 # been updated already and pushed to us in another thread.
                 LOG.debug("Ignoring stale update for %s: %s", rtype, resource)
                 continue
-            self.record_resource_update(context, rtype, resource)
+            self.record_resource_update(context, rtype, resource,
+                                        agent_restarted=agent_restarted)
         LOG.debug("%s resources returned for queries %s", len(resources),
                   query_ids)
         self._satisfied_server_queries.update(query_ids)
@@ -159,7 +162,8 @@ class RemoteResourceCache(object):
             return True
         return False

-    def record_resource_update(self, context, rtype, resource):
+    def record_resource_update(self, context, rtype, resource,
+                               agent_restarted=False):
         """Takes in an OVO and generates an event on relevant changes.

         A change is deemed to be relevant if it is not stale and if any
@@ -190,7 +194,8 @@ class RemoteResourceCache(object):
             registry.notify(rtype, events.AFTER_UPDATE, self,
                             context=context, changed_fields=changed_fields,
                             existing=existing, updated=resource,
-                            resource_id=resource.id)
+                            resource_id=resource.id,
+                            agent_restarted=agent_restarted)

     def record_resource_delete(self, context, rtype, resource_id):
         # deletions are final, record them so we never
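
The hunk above forwards the new agent_restarted keyword into the AFTER_UPDATE notification emitted via registry.notify(). The following hedged sketch shows what a subscriber to that event now receives; the handler name and the process_updated_port() helper are illustrative, not part of this change, and the callback signature follows the legacy neutron_lib.callbacks notify style used above.

from neutron_lib.callbacks import events, registry, resources


def process_updated_port(port_id):
    # Illustrative placeholder for whatever the subscriber does with the port.
    print("processing update for %s" % port_id)


def _handle_port_after_update(rtype, event, trigger, **kwargs):
    # The new keyword defaults to False for notifications that are not tied
    # to the agent's first RPC loop after a (re)start.
    if kwargs.get('agent_restarted', False):
        return
    process_updated_port(kwargs.get('resource_id'))


registry.subscribe(_handle_port_after_update, resources.PORT,
                   events.AFTER_UPDATE)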

Changed file 2 of 4:

@@ -129,7 +129,8 @@ class PluginApi(object):
             devices=devices, agent_id=agent_id, host=host)

     def get_devices_details_list_and_failed_devices(self, context, devices,
-                                                    agent_id, host=None):
+                                                    agent_id, host=None,
+                                                    **kwargs):
         """Get devices details and the list of devices that failed.

         This method returns the devices details. If an error is thrown when
@@ -228,6 +229,7 @@ class CacheBackedPluginApi(PluginApi):
         the payloads the handlers are expecting (an ID).
         """
         rtype = rtype.lower()  # all legacy handlers don't camelcase
+        agent_restarted = kwargs.pop("agent_restarted", None)
         method, host_with_activation, host_with_deactivation = (
             self._get_method_host(rtype, event, **kwargs))
         if not hasattr(self._legacy_interface, method):
@@ -244,6 +246,9 @@ class CacheBackedPluginApi(PluginApi):
         else:
             payload = {rtype: {'id': resource_id},
                        '%s_id' % rtype: resource_id}
+        if method == "port_update" and agent_restarted is not None:
+            # Mark ovs-agent restart for local port_update
+            payload["agent_restarted"] = agent_restarted
         getattr(self._legacy_interface, method)(context, **payload)

     def _get_method_host(self, rtype, event, **kwargs):
@@ -288,20 +293,23 @@ class CacheBackedPluginApi(PluginApi):
         return method, host_with_activation, host_with_deactivation

     def get_devices_details_list_and_failed_devices(self, context, devices,
-                                                    agent_id, host=None):
+                                                    agent_id, host=None,
+                                                    agent_restarted=False):
         result = {'devices': [], 'failed_devices': []}
         for device in devices:
             try:
                 result['devices'].append(
-                    self.get_device_details(context, device, agent_id, host))
+                    self.get_device_details(context, device, agent_id, host,
+                                            agent_restarted))
             except Exception:
                 LOG.exception("Failed to get details for device %s", device)
                 result['failed_devices'].append(device)
         return result

-    def get_device_details(self, context, device, agent_id, host=None):
+    def get_device_details(self, context, device, agent_id, host=None,
+                           agent_restarted=False):
         port_obj = self.remote_resource_cache.get_resource_by_id(
-            resources.PORT, device)
+            resources.PORT, device, agent_restarted)
         if not port_obj:
             LOG.debug("Device %s does not exist in cache.", device)
             return {'device': device}

Changed file 3 of 4:

@@ -438,11 +438,19 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
     def port_update(self, context, **kwargs):
         port = kwargs.get('port')
+        agent_restarted = kwargs.pop("agent_restarted", False)
         # Put the port identifier in the updated_ports set.
         # Even if full port details might be provided to this call,
         # they are not used since there is no guarantee the notifications
         # are processed in the same order as the relevant API requests
-        self.updated_ports.add(port['id'])
+        if not agent_restarted:
+            # When the ovs-agent has just restarted, the first RPC loop
+            # processes all the ports as 'added', and each of those ports
+            # then triggers a local port_update notification. Queuing these
+            # notifications would make the same ports be processed again as
+            # 'updated' in the next RPC loop, so such local notifications
+            # are ignored here.
+            self.updated_ports.add(port['id'])

     def port_delete(self, context, **kwargs):
         port_id = kwargs.get('port_id')
@@ -1633,12 +1641,14 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
         skipped_devices = []
         need_binding_devices = []
         binding_no_activated_devices = set()
+        agent_restarted = self.iter_num == 0
         devices_details_list = (
             self.plugin_rpc.get_devices_details_list_and_failed_devices(
                 self.context,
                 devices,
                 self.agent_id,
-                self.conf.host))
+                self.conf.host,
+                agent_restarted))
         failed_devices = set(devices_details_list.get('failed_devices'))
         devices = devices_details_list.get('devices')
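
Taken together with the previous two files, the restart flag now travels: rpc_loop (iter_num == 0) -> get_devices_details_list_and_failed_devices() -> get_device_details() -> RemoteResourceCache.get_resource_by_id() -> registry.notify(..., agent_restarted=...) -> the cache-backed plugin API's legacy notifier -> the local port_update() shown above, which drops the notification. The following compact, hedged sketch mirrors that call shape with stand-in classes; none of them are the real Neutron classes.

class SketchAgent(object):
    """Stand-in for the agent side: drops updates flagged as restart-local."""

    def __init__(self):
        self.iter_num = 0
        self.updated_ports = set()

    def port_update(self, context, **kwargs):
        port = kwargs.get('port')
        if not kwargs.pop('agent_restarted', False):
            self.updated_ports.add(port['id'])


class SketchCache(object):
    """Stand-in for RemoteResourceCache: a cache miss re-emits the fetched
    resource as an update, now tagged with the restart flag."""

    def __init__(self, notifier):
        self._notifier = notifier

    def get_resource_by_id(self, rtype, obj_id, agent_restarted=False):
        self._notifier(rtype, obj_id, agent_restarted=agent_restarted)
        return {'id': obj_id}


class SketchPluginApi(object):
    """Stand-in for CacheBackedPluginApi: turns cache events into the local
    port_update() call on the agent."""

    def __init__(self, agent):
        self._agent = agent
        self.cache = SketchCache(self._notifier)

    def _notifier(self, rtype, resource_id, agent_restarted=None):
        # Mirrors the plugin API change: attach the flag to the payload of
        # the local port_update call.
        payload = {rtype: {'id': resource_id}, '%s_id' % rtype: resource_id}
        if agent_restarted is not None:
            payload['agent_restarted'] = agent_restarted
        getattr(self._agent, '%s_update' % rtype)(None, **payload)

    def get_device_details(self, context, device, agent_id, host=None,
                           agent_restarted=False):
        return self.cache.get_resource_by_id('port', device, agent_restarted)


agent = SketchAgent()
plugin_rpc = SketchPluginApi(agent)
# First rpc_loop (iter_num == 0): the flag is True and the local update is
# dropped instead of being queued for the next loop.
plugin_rpc.get_device_details(None, 'port-1', 'agent-1',
                              agent_restarted=agent.iter_num == 0)
assert agent.updated_ports == set()
# Later loops: the flag is False and the update is queued as before.
agent.iter_num += 1
plugin_rpc.get_device_details(None, 'port-1', 'agent-1',
                              agent_restarted=agent.iter_num == 0)
assert agent.updated_ports == {'port-1'}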

Changed file 4 of 4:

@@ -287,7 +287,8 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
                 'failed_devices_down': []}

     def setup_agent_rpc_mocks(self, agent, unplug_ports):
-        def mock_device_details(context, devices, agent_id, host=None):
+        def mock_device_details(context, devices, agent_id, host=None,
+                                agent_restarted=False):
             details = []
             for port in self.ports:
                 if port['id'] in devices:
@@ -379,6 +380,7 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
         self.ports = port_dicts
         self.agent = self.create_agent(create_tunnels=create_tunnels,
                                        ancillary_bridge=ancillary_bridge)
+        self.agent.iter_num += 1
         self.polling_manager = self.start_agent(self.agent, ports=self.ports)
         self.network = network or self._create_test_network_dict()
         if trigger_resync: