Merge "Lock sg updates while in _apply_port_filter()" into stable/victoria

This commit is contained in:
Zuul 2021-03-11 23:21:58 +00:00 committed by Gerrit Code Review
commit b7416f18f7
1 changed files with 27 additions and 0 deletions

View File

@ -18,6 +18,7 @@ import functools
from neutron_lib.api.definitions import rbac_security_groups as rbac_sg_apidef from neutron_lib.api.definitions import rbac_security_groups as rbac_sg_apidef
from neutron_lib.api.definitions import stateful_security_group as stateful_sg from neutron_lib.api.definitions import stateful_security_group as stateful_sg
from oslo_concurrency import lockutils
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
import oslo_messaging import oslo_messaging
@ -62,6 +63,9 @@ class SecurityGroupAgentRpc(object):
self.context = context self.context = context
self.plugin_rpc = plugin_rpc self.plugin_rpc = plugin_rpc
self.init_firewall(defer_refresh_firewall, integration_bridge) self.init_firewall(defer_refresh_firewall, integration_bridge)
# _latest_port_filter_lock will point to the lock created for the
# most recent thread to enter _apply_port_filters().
self._latest_port_filter_lock = lockutils.ReaderWriterLock()
def _get_trusted_devices(self, device_ids, devices): def _get_trusted_devices(self, device_ids, devices):
trusted_devices = [] trusted_devices = []
@ -77,6 +81,27 @@ class SecurityGroupAgentRpc(object):
trusted_devices.append(device_id) trusted_devices.append(device_id)
return trusted_devices return trusted_devices
def _port_filter_lock(func):
"""Decorator to acquire a new lock while applying port filters"""
@functools.wraps(func)
def decorated_function(self, *args, **kwargs):
lock = lockutils.ReaderWriterLock()
# Tracking the most recent lock at the instance level allows
# waiters to only wait for the most recent lock to be released
# instead of waiting until all locks have been released.
self._latest_port_filter_lock = lock
with lock.write_lock():
return func(self, *args, **kwargs)
return decorated_function
def _port_filter_wait(func):
"""Decorator to wait for the latest port filter lock to be released"""
@functools.wraps(func)
def decorated_function(self, *args, **kwargs):
with self._latest_port_filter_lock.read_lock():
return func(self, *args, **kwargs)
return decorated_function
def init_firewall(self, defer_refresh_firewall=False, def init_firewall(self, defer_refresh_firewall=False,
integration_bridge=None): integration_bridge=None):
firewall_driver = cfg.CONF.SECURITYGROUP.firewall_driver or 'noop' firewall_driver = cfg.CONF.SECURITYGROUP.firewall_driver or 'noop'
@ -138,6 +163,7 @@ class SecurityGroupAgentRpc(object):
LOG.info("Preparing filters for devices %s", device_ids) LOG.info("Preparing filters for devices %s", device_ids)
self._apply_port_filter(device_ids) self._apply_port_filter(device_ids)
@_port_filter_lock
def _apply_port_filter(self, device_ids, update_filter=False): def _apply_port_filter(self, device_ids, update_filter=False):
step = common_constants.AGENT_RES_PROCESSING_STEP step = common_constants.AGENT_RES_PROCESSING_STEP
if self.use_enhanced_rpc: if self.use_enhanced_rpc:
@ -195,6 +221,7 @@ class SecurityGroupAgentRpc(object):
'security_group_source_groups', 'security_group_source_groups',
'sg_member') 'sg_member')
@_port_filter_wait
def _security_group_updated(self, security_groups, attribute, action_type): def _security_group_updated(self, security_groups, attribute, action_type):
devices = [] devices = []
sec_grp_set = set(security_groups) sec_grp_set = set(security_groups)