From eaf3ff57863a7af2a33ab189910666f6c3450019 Mon Sep 17 00:00:00 2001 From: LIU Yulong Date: Tue, 19 Mar 2019 21:21:45 +0800 Subject: [PATCH] Ignore first local port update notification Ovs-agent will scan and process the ports during the first rpc_loop, and a local port update notification will be sent out. This will cause these ports to be processed again in the ovs-agent next (second) rpc_loop. This patch passes the restart flag (iteration num 0) to the local port_update call trace. After this patch, the local port_update notification will be ignored in the first RPC loop. Related-Bug: #1813703 Change-Id: Ic5bf718cfd056f805741892a91a8d45f7a6e0db3 --- neutron/agent/resource_cache.py | 17 +++++++++++------ neutron/agent/rpc.py | 18 +++++++++++++----- .../openvswitch/agent/ovs_neutron_agent.py | 14 ++++++++++++-- neutron/tests/functional/agent/l2/base.py | 4 +++- 4 files changed, 39 insertions(+), 14 deletions(-) diff --git a/neutron/agent/resource_cache.py b/neutron/agent/resource_cache.py index 439b1fd2fdb..5f37dcead7f 100644 --- a/neutron/agent/resource_cache.py +++ b/neutron/agent/resource_cache.py @@ -49,7 +49,7 @@ class RemoteResourceCache(object): def start_watcher(self): self._watcher = RemoteResourceWatcher(self) - def get_resource_by_id(self, rtype, obj_id): + def get_resource_by_id(self, rtype, obj_id, agent_restarted=False): """Returns None if it doesn't exist.""" if obj_id in self._deleted_ids_by_type[rtype]: return None @@ -57,10 +57,12 @@ class RemoteResourceCache(object): if cached_item: return cached_item # try server in case object existed before agent start - self._flood_cache_for_query(rtype, id=(obj_id, )) + self._flood_cache_for_query(rtype, id=(obj_id, ), + agent_restarted=agent_restarted) return self._type_cache(rtype).get(obj_id) - def _flood_cache_for_query(self, rtype, **filter_kwargs): + def _flood_cache_for_query(self, rtype, agent_restarted=False, + **filter_kwargs): """Load info from server for first query. Queries the server if this is the first time a given query for @@ -81,7 +83,8 @@ class RemoteResourceCache(object): # been updated already and pushed to us in another thread. LOG.debug("Ignoring stale update for %s: %s", rtype, resource) continue - self.record_resource_update(context, rtype, resource) + self.record_resource_update(context, rtype, resource, + agent_restarted=agent_restarted) LOG.debug("%s resources returned for queries %s", len(resources), query_ids) self._satisfied_server_queries.update(query_ids) @@ -159,7 +162,8 @@ class RemoteResourceCache(object): return True return False - def record_resource_update(self, context, rtype, resource): + def record_resource_update(self, context, rtype, resource, + agent_restarted=False): """Takes in an OVO and generates an event on relevant changes. A change is deemed to be relevant if it is not stale and if any @@ -190,7 +194,8 @@ class RemoteResourceCache(object): registry.notify(rtype, events.AFTER_UPDATE, self, context=context, changed_fields=changed_fields, existing=existing, updated=resource, - resource_id=resource.id) + resource_id=resource.id, + agent_restarted=agent_restarted) def record_resource_delete(self, context, rtype, resource_id): # deletions are final, record them so we never diff --git a/neutron/agent/rpc.py b/neutron/agent/rpc.py index 8c0f9bda3dc..08175a7668f 100644 --- a/neutron/agent/rpc.py +++ b/neutron/agent/rpc.py @@ -131,7 +131,8 @@ class PluginApi(object): devices=devices, agent_id=agent_id, host=host) def get_devices_details_list_and_failed_devices(self, context, devices, - agent_id, host=None): + agent_id, host=None, + **kwargs): """Get devices details and the list of devices that failed. This method returns the devices details. If an error is thrown when @@ -240,6 +241,7 @@ class CacheBackedPluginApi(PluginApi): the payloads the handlers are expecting (an ID). """ rtype = rtype.lower() # all legacy handlers don't camelcase + agent_restarted = kwargs.pop("agent_restarted", None) method, host_with_activation, host_with_deactivation = ( self._get_method_host(rtype, event, **kwargs)) if not hasattr(self._legacy_interface, method): @@ -256,6 +258,9 @@ class CacheBackedPluginApi(PluginApi): else: payload = {rtype: {'id': resource_id}, '%s_id' % rtype: resource_id} + if method == "port_update" and agent_restarted is not None: + # Mark ovs-agent restart for local port_update + payload["agent_restarted"] = agent_restarted getattr(self._legacy_interface, method)(context, **payload) def _get_method_host(self, rtype, event, **kwargs): @@ -300,20 +305,23 @@ class CacheBackedPluginApi(PluginApi): return method, host_with_activation, host_with_deactivation def get_devices_details_list_and_failed_devices(self, context, devices, - agent_id, host=None): + agent_id, host=None, + agent_restarted=False): result = {'devices': [], 'failed_devices': []} for device in devices: try: result['devices'].append( - self.get_device_details(context, device, agent_id, host)) + self.get_device_details(context, device, agent_id, host, + agent_restarted)) except Exception: LOG.exception("Failed to get details for device %s", device) result['failed_devices'].append(device) return result - def get_device_details(self, context, device, agent_id, host=None): + def get_device_details(self, context, device, agent_id, host=None, + agent_restarted=False): port_obj = self.remote_resource_cache.get_resource_by_id( - resources.PORT, device) + resources.PORT, device, agent_restarted) if not port_obj: LOG.debug("Device %s does not exist in cache.", device) return {'device': device} diff --git a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py index 74da51e245c..01cfcb0a08b 100644 --- a/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/ml2/drivers/openvswitch/agent/ovs_neutron_agent.py @@ -481,11 +481,19 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, def port_update(self, context, **kwargs): port = kwargs.get('port') + agent_restarted = kwargs.pop("agent_restarted", False) # Put the port identifier in the updated_ports set. # Even if full port details might be provided to this call, # they are not used since there is no guarantee the notifications # are processed in the same order as the relevant API requests - self.updated_ports.add(port['id']) + if not agent_restarted: + # When ovs-agent is just restarted, the first RPC loop will + # process all the port as 'added'. And all of these ports will + # send a port_update notification after that processing. This + # will cause all these ports to be processed again in next RPC + # loop as 'updated'. So here we just ignore such local update + # notification. + self.updated_ports.add(port['id']) if not self.conf.AGENT.baremetal_smartnic: return @@ -1775,12 +1783,14 @@ class OVSNeutronAgent(l2population_rpc.L2populationRpcCallBackTunnelMixin, skipped_devices = [] need_binding_devices = [] binding_no_activated_devices = set() + agent_restarted = self.iter_num == 0 devices_details_list = ( self.plugin_rpc.get_devices_details_list_and_failed_devices( self.context, devices, self.agent_id, - self.conf.host)) + self.conf.host, + agent_restarted)) failed_devices = set(devices_details_list.get('failed_devices')) devices = devices_details_list.get('devices') diff --git a/neutron/tests/functional/agent/l2/base.py b/neutron/tests/functional/agent/l2/base.py index 34b67d9acb6..cb2ddbb7609 100644 --- a/neutron/tests/functional/agent/l2/base.py +++ b/neutron/tests/functional/agent/l2/base.py @@ -347,7 +347,8 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase, OVSOFControllerHelper): 'failed_devices_down': []} def setup_agent_rpc_mocks(self, agent, unplug_ports): - def mock_device_details(context, devices, agent_id, host=None): + def mock_device_details(context, devices, agent_id, host=None, + agent_restarted=False): details = [] for port in self.ports: if port['id'] in devices: @@ -439,6 +440,7 @@ class OVSAgentTestFramework(base.BaseOVSLinuxTestCase, OVSOFControllerHelper): self.ports = port_dicts self.agent = self.create_agent(create_tunnels=create_tunnels, ancillary_bridge=ancillary_bridge) + self.agent.iter_num += 1 self.polling_manager = self.start_agent(self.agent, ports=self.ports) self.network = network or self._create_test_network_dict() if trigger_resync: