Browse Source

Lock sg updates while in _apply_port_filter()

Add a locking mechanism so that security_groups_member_updated event
processing is blocked if another thread is in the process of applying
port filters.  This avoids a race condition where security group
membership updates are incorrectly discarded when they arrive between
retrieving port details and registering the port with the firewall
driver.

See https://bugs.launchpad.net/neutron/+bug/1887405 for complete
details.

Bug: 1887405
Change-Id: I6e4abe13a7541c21399466c5eb0d61ff5780c887
(cherry picked from commit 5cc8f16a58)
changes/93/779793/1
Charles Farquhar 1 year ago
committed by Hemanth N
parent
commit
36ac9bd4c6
  1. 27
      neutron/agent/securitygroups_rpc.py

27
neutron/agent/securitygroups_rpc.py

@ -18,6 +18,7 @@ import functools
from neutron_lib.api.definitions import rbac_security_groups as rbac_sg_apidef
from neutron_lib.api.definitions import stateful_security_group as stateful_sg
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
@ -62,6 +63,9 @@ class SecurityGroupAgentRpc(object):
self.context = context
self.plugin_rpc = plugin_rpc
self.init_firewall(defer_refresh_firewall, integration_bridge)
# _latest_port_filter_lock will point to the lock created for the
# most recent thread to enter _apply_port_filters().
self._latest_port_filter_lock = lockutils.ReaderWriterLock()
def _get_trusted_devices(self, device_ids, devices):
trusted_devices = []
@ -77,6 +81,27 @@ class SecurityGroupAgentRpc(object):
trusted_devices.append(device_id)
return trusted_devices
def _port_filter_lock(func):
"""Decorator to acquire a new lock while applying port filters"""
@functools.wraps(func)
def decorated_function(self, *args, **kwargs):
lock = lockutils.ReaderWriterLock()
# Tracking the most recent lock at the instance level allows
# waiters to only wait for the most recent lock to be released
# instead of waiting until all locks have been released.
self._latest_port_filter_lock = lock
with lock.write_lock():
return func(self, *args, **kwargs)
return decorated_function
def _port_filter_wait(func):
"""Decorator to wait for the latest port filter lock to be released"""
@functools.wraps(func)
def decorated_function(self, *args, **kwargs):
with self._latest_port_filter_lock.read_lock():
return func(self, *args, **kwargs)
return decorated_function
def init_firewall(self, defer_refresh_firewall=False,
integration_bridge=None):
firewall_driver = cfg.CONF.SECURITYGROUP.firewall_driver or 'noop'
@ -138,6 +163,7 @@ class SecurityGroupAgentRpc(object):
LOG.info("Preparing filters for devices %s", device_ids)
self._apply_port_filter(device_ids)
@_port_filter_lock
def _apply_port_filter(self, device_ids, update_filter=False):
step = common_constants.AGENT_RES_PROCESSING_STEP
if self.use_enhanced_rpc:
@ -195,6 +221,7 @@ class SecurityGroupAgentRpc(object):
'security_group_source_groups',
'sg_member')
@_port_filter_wait
def _security_group_updated(self, security_groups, attribute, action_type):
devices = []
sec_grp_set = set(security_groups)

Loading…
Cancel
Save