Ignore first local port update notification
During its first rpc_loop, the ovs-agent scans and processes
the ports, and a local port update notification is sent out
as a result. This causes the same ports to be processed
again in the ovs-agent's next (second) rpc_loop.
This patch passes the restart flag (true when the iteration
number is 0) down the local port_update call chain. With this
patch, the local port_update notification is ignored during
the first RPC loop.
Related-Bug: #1813703
Change-Id: Ic5bf718cfd056f805741892a91a8d45f7a6e0db3
(cherry picked from commit eaf3ff5786)
changes/48/670148/1
parent
55a503b4c9
commit
56c070c5a3
|
@ -48,7 +48,7 @@ class RemoteResourceCache(object):
|
|||
def start_watcher(self):
|
||||
self._watcher = RemoteResourceWatcher(self)
|
||||
|
||||
def get_resource_by_id(self, rtype, obj_id):
|
||||
def get_resource_by_id(self, rtype, obj_id, agent_restarted=False):
|
||||
"""Returns None if it doesn't exist."""
|
||||
if obj_id in self._deleted_ids_by_type[rtype]:
|
||||
return None
|
||||
|
@ -56,10 +56,12 @@ class RemoteResourceCache(object):
|
|||
if cached_item:
|
||||
return cached_item
|
||||
# try server in case object existed before agent start
|
||||
self._flood_cache_for_query(rtype, id=(obj_id, ))
|
||||
self._flood_cache_for_query(rtype, id=(obj_id, ),
|
||||
agent_restarted=agent_restarted)
|
||||
return self._type_cache(rtype).get(obj_id)
|
||||
|
||||
def _flood_cache_for_query(self, rtype, **filter_kwargs):
|
||||
def _flood_cache_for_query(self, rtype, agent_restarted=False,
|
||||
**filter_kwargs):
|
||||
"""Load info from server for first query.
|
||||
|
||||
Queries the server if this is the first time a given query for
|
||||
|
@ -80,7 +82,8 @@ class RemoteResourceCache(object):
|
|||
# been updated already and pushed to us in another thread.
|
||||
LOG.debug("Ignoring stale update for %s: %s", rtype, resource)
|
||||
continue
|
||||
self.record_resource_update(context, rtype, resource)
|
||||
self.record_resource_update(context, rtype, resource,
|
||||
agent_restarted=agent_restarted)
|
||||
LOG.debug("%s resources returned for queries %s", len(resources),
|
||||
query_ids)
|
||||
self._satisfied_server_queries.update(query_ids)
|
||||
|
@ -158,7 +161,8 @@ class RemoteResourceCache(object):
|
|||
return True
|
||||
return False
|
||||
|
||||
def record_resource_update(self, context, rtype, resource):
|
||||
def record_resource_update(self, context, rtype, resource,
|
||||
agent_restarted=False):
|
||||
"""Takes in an OVO and generates an event on relevant changes.
|
||||
|
||||
A change is deemed to be relevant if it is not stale and if any
|
||||
|
@ -189,7 +193,8 @@ class RemoteResourceCache(object):
|
|||
registry.notify(rtype, events.AFTER_UPDATE, self,
|
||||
context=context, changed_fields=changed_fields,
|
||||
existing=existing, updated=resource,
|
||||
resource_id=resource.id)
|
||||
resource_id=resource.id,
|
||||
agent_restarted=agent_restarted)
|
||||
|
||||
def record_resource_delete(self, context, rtype, resource_id):
|
||||
# deletions are final, record them so we never
|
||||
|
|
|
@ -125,7 +125,8 @@ class PluginApi(object):
|
|||
devices=devices, agent_id=agent_id, host=host)
|
||||
|
||||
def get_devices_details_list_and_failed_devices(self, context, devices,
|
||||
agent_id, host=None):
|
||||
agent_id, host=None,
|
||||
**kwargs):
|
||||
"""Get devices details and the list of devices that failed.
|
||||
|
||||
This method returns the devices details. If an error is thrown when
|
||||
|
@ -224,6 +225,7 @@ class CacheBackedPluginApi(PluginApi):
|
|||
the payloads the handlers are expecting (an ID).
|
||||
"""
|
||||
rtype = rtype.lower() # all legacy handlers don't camelcase
|
||||
agent_restarted = kwargs.pop("agent_restarted", None)
|
||||
method, host_with_activation, host_with_deactivation = (
|
||||
self._get_method_host(rtype, event, **kwargs))
|
||||
if not hasattr(self._legacy_interface, method):
|
||||
|
@ -240,6 +242,9 @@ class CacheBackedPluginApi(PluginApi):
|
|||
else:
|
||||
payload = {rtype: {'id': resource_id},
|
||||
'%s_id' % rtype: resource_id}
|
||||
if method == "port_update" and agent_restarted is not None:
|
||||
# Mark ovs-agent restart for local port_update
|
||||
payload["agent_restarted"] = agent_restarted
|
||||
getattr(self._legacy_interface, method)(context, **payload)
|
||||
|
||||
def _get_method_host(self, rtype, event, **kwargs):
|
||||
|
@ -284,20 +289,23 @@ class CacheBackedPluginApi(PluginApi):
|
|||
return method, host_with_activation, host_with_deactivation
|
||||
|
||||
def get_devices_details_list_and_failed_devices(self, context, devices,
|
||||
agent_id, host=None):
|
||||
agent_id, host=None,
|
||||
agent_restarted=False):
|
||||
result = {'devices': [], 'failed_devices': []}
|
||||
for device in devices:
|
||||
try:
|
||||
result['devices'].append(
|
||||
self.get_device_details(context, device, agent_id, host))
|
||||
self.get_device_details(context, device, agent_id, host,
|
||||
agent_restarted))
|
||||
except Exception:
|
||||
LOG.exception("Failed to get details for device %s", device)
|
||||
result['failed_devices'].append(device)
|
||||
return result
|
||||
|
||||
def get_device_details(self, context, device, agent_id, host=None):
|
||||
def get_device_details(self, context, device, agent_id, host=None,
|
||||
agent_restarted=False):
|
||||
port_obj = self.remote_resource_cache.get_resource_by_id(
|
||||
resources.PORT, device)
|
||||
resources.PORT, device, agent_restarted)
|
||||
if not port_obj:
|
||||
LOG.debug("Device %s does not exist in cache.", device)
|
||||
return {'device': device}
|
||||
|
|
|
@ -402,11 +402,19 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
|||
|
||||
def port_update(self, context, **kwargs):
|
||||
port = kwargs.get('port')
|
||||
agent_restarted = kwargs.pop("agent_restarted", False)
|
||||
# Put the port identifier in the updated_ports set.
|
||||
# Even if full port details might be provided to this call,
|
||||
# they are not used since there is no guarantee the notifications
|
||||
# are processed in the same order as the relevant API requests
|
||||
self.updated_ports.add(port['id'])
|
||||
if not agent_restarted:
|
||||
# When ovs-agent is just restarted, the first RPC loop will
|
||||
# process all the port as 'added'. And all of these ports will
|
||||
# send a port_update notification after that processing. This
|
||||
# will cause all these ports to be processed again in next RPC
|
||||
# loop as 'updated'. So here we just ignore such local update
|
||||
# notification.
|
||||
self.updated_ports.add(port['id'])
|
||||
|
||||
def port_delete(self, context, **kwargs):
|
||||
port_id = kwargs.get('port_id')
|
||||
|
@ -1586,12 +1594,14 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin,
|
|||
skipped_devices = []
|
||||
need_binding_devices = []
|
||||
binding_no_activated_devices = set()
|
||||
agent_restarted = self.iter_num == 0
|
||||
devices_details_list = (
|
||||
self.plugin_rpc.get_devices_details_list_and_failed_devices(
|
||||
self.context,
|
||||
devices,
|
||||
self.agent_id,
|
||||
self.conf.host))
|
||||
self.conf.host,
|
||||
agent_restarted))
|
||||
failed_devices = set(devices_details_list.get('failed_devices'))
|
||||
|
||||
devices = devices_details_list.get('devices')
|
||||
|
|
|
@ -283,7 +283,8 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
|
|||
'failed_devices_down': []}
|
||||
|
||||
def setup_agent_rpc_mocks(self, agent, unplug_ports):
|
||||
def mock_device_details(context, devices, agent_id, host=None):
|
||||
def mock_device_details(context, devices, agent_id, host=None,
|
||||
agent_restarted=False):
|
||||
details = []
|
||||
for port in self.ports:
|
||||
if port['id'] in devices:
|
||||
|
@ -375,6 +376,7 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase):
|
|||
self.ports = port_dicts
|
||||
self.agent = self.create_agent(create_tunnels=create_tunnels,
|
||||
ancillary_bridge=ancillary_bridge)
|
||||
self.agent.iter_num += 1
|
||||
self.polling_manager = self.start_agent(self.agent, ports=self.ports)
|
||||
self.network = network or self._create_test_network_dict()
|
||||
if trigger_resync:
|
||||
|
|
Loading…
Reference in New Issue