From 7f3aadda217c7ea4a0356d380e4c42c1e22d44f7 Mon Sep 17 00:00:00 2001 From: Charles Farquhar Date: Tue, 28 Jul 2020 19:39:06 +0000 Subject: [PATCH] Lock sg updates while in _apply_port_filter() Add a locking mechanism so that security_groups_member_updated event processing is blocked if another thread is in the process of applying port filters. This avoids a race condition where security group membership updates are incorrectly discarded when they arrive between retrieving port details and registering the port with the firewall driver. See https://bugs.launchpad.net/neutron/+bug/1887405 for complete details. Bug: 1887405 Change-Id: I6e4abe13a7541c21399466c5eb0d61ff5780c887 (cherry picked from commit 5cc8f16a588d7e40c3e696e6739d8ada907f0116) --- neutron/agent/securitygroups_rpc.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/neutron/agent/securitygroups_rpc.py b/neutron/agent/securitygroups_rpc.py index 87365265b04..0ab4f44517b 100644 --- a/neutron/agent/securitygroups_rpc.py +++ b/neutron/agent/securitygroups_rpc.py @@ -18,6 +18,7 @@ import functools from neutron_lib.api.definitions import rbac_security_groups as rbac_sg_apidef from neutron_lib.api.definitions import stateful_security_group as stateful_sg +from oslo_concurrency import lockutils from oslo_config import cfg from oslo_log import log as logging import oslo_messaging @@ -60,6 +61,9 @@ class SecurityGroupAgentRpc(object): self.context = context self.plugin_rpc = plugin_rpc self.init_firewall(defer_refresh_firewall, integration_bridge) + # _latest_port_filter_lock will point to the lock created for the + # most recent thread to enter _apply_port_filters(). + self._latest_port_filter_lock = lockutils.ReaderWriterLock() def _get_trusted_devices(self, device_ids, devices): trusted_devices = [] @@ -75,6 +79,27 @@ class SecurityGroupAgentRpc(object): trusted_devices.append(device_id) return trusted_devices + def _port_filter_lock(func): + """Decorator to acquire a new lock while applying port filters""" + @functools.wraps(func) + def decorated_function(self, *args, **kwargs): + lock = lockutils.ReaderWriterLock() + # Tracking the most recent lock at the instance level allows + # waiters to only wait for the most recent lock to be released + # instead of waiting until all locks have been released. + self._latest_port_filter_lock = lock + with lock.write_lock(): + return func(self, *args, **kwargs) + return decorated_function + + def _port_filter_wait(func): + """Decorator to wait for the latest port filter lock to be released""" + @functools.wraps(func) + def decorated_function(self, *args, **kwargs): + with self._latest_port_filter_lock.read_lock(): + return func(self, *args, **kwargs) + return decorated_function + def init_firewall(self, defer_refresh_firewall=False, integration_bridge=None): firewall_driver = cfg.CONF.SECURITYGROUP.firewall_driver or 'noop' @@ -136,6 +161,7 @@ class SecurityGroupAgentRpc(object): LOG.info("Preparing filters for devices %s", device_ids) self._apply_port_filter(device_ids) + @_port_filter_lock def _apply_port_filter(self, device_ids, update_filter=False): step = common_constants.AGENT_RES_PROCESSING_STEP if self.use_enhanced_rpc: @@ -193,6 +219,7 @@ class SecurityGroupAgentRpc(object): 'security_group_source_groups', 'sg_member') + @_port_filter_wait def _security_group_updated(self, security_groups, attribute, action_type): devices = [] sec_grp_set = set(security_groups)