Merge "Lock sg updates while in _apply_port_filter()" into stable/ussuri
This commit is contained in:
commit
19f96ffa99
|
@ -18,6 +18,7 @@ import functools
|
|||
|
||||
from neutron_lib.api.definitions import rbac_security_groups as rbac_sg_apidef
|
||||
from neutron_lib.api.definitions import stateful_security_group as stateful_sg
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
|
@ -60,6 +61,9 @@ class SecurityGroupAgentRpc(object):
|
|||
self.context = context
|
||||
self.plugin_rpc = plugin_rpc
|
||||
self.init_firewall(defer_refresh_firewall, integration_bridge)
|
||||
# _latest_port_filter_lock will point to the lock created for the
|
||||
# most recent thread to enter _apply_port_filters().
|
||||
self._latest_port_filter_lock = lockutils.ReaderWriterLock()
|
||||
|
||||
def _get_trusted_devices(self, device_ids, devices):
|
||||
trusted_devices = []
|
||||
|
@ -75,6 +79,27 @@ class SecurityGroupAgentRpc(object):
|
|||
trusted_devices.append(device_id)
|
||||
return trusted_devices
|
||||
|
||||
def _port_filter_lock(func):
|
||||
"""Decorator to acquire a new lock while applying port filters"""
|
||||
@functools.wraps(func)
|
||||
def decorated_function(self, *args, **kwargs):
|
||||
lock = lockutils.ReaderWriterLock()
|
||||
# Tracking the most recent lock at the instance level allows
|
||||
# waiters to only wait for the most recent lock to be released
|
||||
# instead of waiting until all locks have been released.
|
||||
self._latest_port_filter_lock = lock
|
||||
with lock.write_lock():
|
||||
return func(self, *args, **kwargs)
|
||||
return decorated_function
|
||||
|
||||
def _port_filter_wait(func):
|
||||
"""Decorator to wait for the latest port filter lock to be released"""
|
||||
@functools.wraps(func)
|
||||
def decorated_function(self, *args, **kwargs):
|
||||
with self._latest_port_filter_lock.read_lock():
|
||||
return func(self, *args, **kwargs)
|
||||
return decorated_function
|
||||
|
||||
def init_firewall(self, defer_refresh_firewall=False,
|
||||
integration_bridge=None):
|
||||
firewall_driver = cfg.CONF.SECURITYGROUP.firewall_driver or 'noop'
|
||||
|
@ -136,6 +161,7 @@ class SecurityGroupAgentRpc(object):
|
|||
LOG.info("Preparing filters for devices %s", device_ids)
|
||||
self._apply_port_filter(device_ids)
|
||||
|
||||
@_port_filter_lock
|
||||
def _apply_port_filter(self, device_ids, update_filter=False):
|
||||
step = common_constants.AGENT_RES_PROCESSING_STEP
|
||||
if self.use_enhanced_rpc:
|
||||
|
@ -193,6 +219,7 @@ class SecurityGroupAgentRpc(object):
|
|||
'security_group_source_groups',
|
||||
'sg_member')
|
||||
|
||||
@_port_filter_wait
|
||||
def _security_group_updated(self, security_groups, attribute, action_type):
|
||||
devices = []
|
||||
sec_grp_set = set(security_groups)
|
||||
|
|
Loading…
Reference in New Issue