use synchronized lock decorator from neutron-lib

neutron-lib contains the synchronized lockutils decorator as well as
the SYNCHRONIZED_PREFIX global. This patch consumes them from
neutron-lib and removes them from neutron.

NeutronLibImpact

Change-Id: I729da348e340509f2d09f8a6436716e2398f1583
This commit is contained in:
Boden R 2017-07-25 11:28:36 -06:00
parent c0fa798423
commit 60f8048c7c
12 changed files with 38 additions and 37 deletions
neutron
agent
common
manager.py
notifiers
services
metering/agents
trunk/drivers/linuxbridge/agent
tests/common/exclusive_resources

@ -20,6 +20,7 @@ import eventlet
from neutron_lib import constants
from neutron_lib import context
from neutron_lib import exceptions
from neutron_lib.utils import runtime
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
@ -65,7 +66,7 @@ def _wait_if_syncing(f):
def _net_lock(network_id):
"""Returns a context manager lock based on network_id."""
lock_name = 'dhcp-agent-network-lock-%s' % network_id
return lockutils.lock(lock_name, utils.SYNCHRONIZED_PREFIX)
return lockutils.lock(lock_name, runtime.SYNCHRONIZED_PREFIX)
class DhcpAgent(manager.Manager):

@ -16,6 +16,7 @@ import contextlib
import os
from neutron_lib import constants as lib_constants
from neutron_lib.utils import runtime
from oslo_concurrency import lockutils
from oslo_log import log as logging
from oslo_utils import excutils
@ -112,7 +113,7 @@ class FipNamespace(namespaces.Namespace):
# Use a namespace and port-specific lock semaphore to allow for
# concurrency
lock_name = 'port-lock-' + self.name + '-' + interface_name
with lockutils.lock(lock_name, common_utils.SYNCHRONIZED_PREFIX):
with lockutils.lock(lock_name, runtime.SYNCHRONIZED_PREFIX):
try:
yield
except Exception:

@ -19,6 +19,7 @@ import signal
import netaddr
from neutron_lib.api.definitions import portbindings
from neutron_lib import constants as n_consts
from neutron_lib.utils import runtime
from oslo_log import log as logging
from neutron.agent.l3 import namespaces
@ -440,7 +441,7 @@ class HaRouter(router.RouterInfo):
self.ha_port['status'] == n_consts.PORT_STATUS_ACTIVE):
self.enable_keepalived()
@common_utils.synchronized('enable_radvd')
@runtime.synchronized('enable_radvd')
def enable_radvd(self, internal_ports=None):
if (self.keepalived_manager.get_process().active and
self.ha_state == 'master'):

@ -14,9 +14,9 @@
import copy
import netaddr
from neutron_lib.utils import runtime
from neutron.agent.linux import utils as linux_utils
from neutron.common import utils
IPSET_ADD_BULK_THRESHOLD = 5
NET_PREFIX = 'N'
@ -83,7 +83,7 @@ class IpsetManager(object):
self.set_members_mutate(set_name, ethertype, member_ips)
return add_ips, del_ips
@utils.synchronized('ipset', external=True)
@runtime.synchronized('ipset', external=True)
def set_members_mutate(self, set_name, ethertype, member_ips):
if not self.set_name_exists(set_name):
# The initial creation is handled with create/refresh to
@ -105,7 +105,7 @@ class IpsetManager(object):
else:
self._refresh_set(set_name, member_ips, ethertype)
@utils.synchronized('ipset', external=True)
@runtime.synchronized('ipset', external=True)
def destroy(self, id, ethertype, forced=False):
set_name = self.get_name(id, ethertype)
self._destroy(set_name, forced)

@ -25,6 +25,7 @@ import os
import re
import sys
from neutron_lib.utils import runtime
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
@ -35,7 +36,6 @@ from neutron.agent.linux import ip_lib
from neutron.agent.linux import iptables_comments as ic
from neutron.agent.linux import utils as linux_utils
from neutron.common import exceptions as n_exc
from neutron.common import utils
from neutron.conf.agent import common as config
LOG = logging.getLogger(__name__)
@ -442,7 +442,7 @@ class IptablesManager(object):
# NOTE(ihrachys) we may get rid of the lock once all supported
# platforms get iptables with 999eaa241212d3952ddff39a99d0d55a74e3639e
# ("iptables-restore: support acquiring the lock.")
with lockutils.lock(lock_name, utils.SYNCHRONIZED_PREFIX, True):
with lockutils.lock(lock_name, runtime.SYNCHRONIZED_PREFIX, True):
first = self._apply_synchronized()
if not cfg.CONF.AGENT.debug_iptables_rules:
return first

@ -21,6 +21,7 @@ from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants as n_const
from neutron_lib.utils import runtime
from oslo_log import log as logging
from oslo_utils import netutils
import six
@ -60,7 +61,7 @@ class PrefixDelegation(object):
def _is_pd_master_router(self, router):
return router['master']
@utils.synchronized("l3-agent-pd")
@runtime.synchronized("l3-agent-pd")
def enable_subnet(self, router_id, subnet_id, prefix, ri_ifname, mac):
router = self.routers.get(router_id)
if router is None:
@ -85,7 +86,7 @@ class PrefixDelegation(object):
if pd_info.client_started:
pd_info.driver.disable(self.pmon, router['ns_name'])
@utils.synchronized("l3-agent-pd")
@runtime.synchronized("l3-agent-pd")
def disable_subnet(self, router_id, subnet_id):
prefix_update = {}
router = self.routers.get(router_id)
@ -101,7 +102,7 @@ class PrefixDelegation(object):
self.notifier(self.context, prefix_update)
del router['subnets'][subnet_id]
@utils.synchronized("l3-agent-pd")
@runtime.synchronized("l3-agent-pd")
def update_subnet(self, router_id, subnet_id, prefix):
router = self.routers.get(router_id)
if router is not None:
@ -112,7 +113,7 @@ class PrefixDelegation(object):
pd_info.prefix = prefix
return old_prefix
@utils.synchronized("l3-agent-pd")
@runtime.synchronized("l3-agent-pd")
def add_gw_interface(self, router_id, gw_ifname):
router = self.routers.get(router_id)
if not router:
@ -157,14 +158,14 @@ class PrefixDelegation(object):
LOG.debug("Update server with prefixes: %s", prefix_update)
self.notifier(self.context, prefix_update)
@utils.synchronized("l3-agent-pd")
@runtime.synchronized("l3-agent-pd")
def remove_gw_interface(self, router_id):
router = self.routers.get(router_id)
if router is not None:
router['gw_interface'] = None
self.delete_router_pd(router)
@utils.synchronized("l3-agent-pd")
@runtime.synchronized("l3-agent-pd")
def get_preserve_ips(self, router_id):
preserve_ips = []
router = self.routers.get(router_id)
@ -173,13 +174,13 @@ class PrefixDelegation(object):
preserve_ips.append(pd_info.get_bind_lla_with_mask())
return preserve_ips
@utils.synchronized("l3-agent-pd")
@runtime.synchronized("l3-agent-pd")
def sync_router(self, router_id):
router = self.routers.get(router_id)
if router is not None and router['gw_interface'] is None:
self.delete_router_pd(router)
@utils.synchronized("l3-agent-pd")
@runtime.synchronized("l3-agent-pd")
def remove_stale_ri_ifname(self, router_id, stale_ifname):
router = self.routers.get(router_id)
if router is not None:
@ -260,7 +261,7 @@ class PrefixDelegation(object):
return not lla['tentative']
return False
@utils.synchronized("l3-agent-pd")
@runtime.synchronized("l3-agent-pd")
def process_ha_state(self, router_id, master):
router = self.routers.get(router_id)
if router is None or router['master'] == master:
@ -280,7 +281,7 @@ class PrefixDelegation(object):
switch_over=True)
pd_info.client_started = False
@utils.synchronized("l3-agent-pd")
@runtime.synchronized("l3-agent-pd")
def process_prefix_update(self):
LOG.debug("Processing IPv6 PD Prefix Update")
@ -348,7 +349,7 @@ class PrefixDelegation(object):
subnets[pd_info.subnet_id] = new_pd_info
@utils.synchronized("l3-agent-pd")
@runtime.synchronized("l3-agent-pd")
def remove_router(resource, event, l3_agent, **kwargs):
router_id = kwargs['router'].router_id
router = l3_agent.pd.routers.get(router_id)
@ -364,7 +365,7 @@ def get_router_entry(ns_name, master):
'subnets': {}}
@utils.synchronized("l3-agent-pd")
@runtime.synchronized("l3-agent-pd")
def add_router(resource, event, l3_agent, **kwargs):
added_router = kwargs['router']
router = l3_agent.pd.routers.get(added_router.router_id)
@ -379,7 +380,7 @@ def add_router(resource, event, l3_agent, **kwargs):
router['master'] = master
@utils.synchronized("l3-agent-pd")
@runtime.synchronized("l3-agent-pd")
def update_router(resource, event, l3_agent, **kwargs):
updated_router = kwargs['router']
router = l3_agent.pd.routers.get(updated_router.router_id)

@ -37,7 +37,6 @@ from eventlet.green import subprocess
import netaddr
from neutron_lib import constants as n_const
from neutron_lib.utils import helpers
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log as logging
@ -53,14 +52,11 @@ from neutron.db import api as db_api
TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
LOG = logging.getLogger(__name__)
SYNCHRONIZED_PREFIX = 'neutron-'
DEFAULT_THROTTLER_VALUE = 2
_SEPARATOR_REGEX = re.compile(r'[/\\]+')
synchronized = lockutils.synchronized_with_prefix(SYNCHRONIZED_PREFIX)
class WaitTimeout(Exception):
"""Default exception coming from wait_until_true() function."""

@ -17,6 +17,7 @@ from collections import defaultdict
from neutron_lib.plugins import constants as lib_const
from neutron_lib.plugins import directory
from neutron_lib.utils import runtime
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
@ -223,7 +224,7 @@ class NeutronManager(object):
"desc": plugin_inst.get_plugin_description()})
@classmethod
@utils.synchronized("manager")
@runtime.synchronized("manager")
def _create_instance(cls):
if not cls.has_instance():
cls._instance = cls()

@ -11,10 +11,9 @@
# under the License.
import eventlet
from neutron_lib.utils import runtime
from oslo_utils import uuidutils
from neutron.common import utils
class BatchNotifier(object):
def __init__(self, batch_interval, callback):
@ -47,7 +46,7 @@ class BatchNotifier(object):
self.pending_events.append(event)
@utils.synchronized(self._lock_identifier)
@runtime.synchronized(self._lock_identifier)
def synced_send():
self._notify()
# sleeping after send while holding the lock allows subsequent

@ -16,6 +16,7 @@ import sys
from neutron_lib import constants
from neutron_lib import context
from neutron_lib.utils import runtime
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
@ -30,7 +31,6 @@ from neutron.common import config as common_config
from neutron.common import constants as n_const
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
from neutron.conf.agent import common as config
from neutron.conf.services import metering_agent
from neutron import manager
@ -160,7 +160,7 @@ class MeteringAgent(MeteringPluginRpc, manager.Manager):
self._purge_metering_info()
self.last_report = ts
@utils.synchronized('metering-agent')
@runtime.synchronized('metering-agent')
def _invoke_driver(self, context, meterings, func_name):
try:
return getattr(self.metering_driver, func_name)(context, meterings)

@ -12,12 +12,12 @@
# under the License.
import re
from neutron_lib.utils import runtime
from oslo_concurrency import lockutils
from oslo_log import log as logging
from oslo_utils import excutils
from neutron.agent.linux import ip_lib
from neutron.common import utils
from neutron.plugins.ml2.drivers.linuxbridge.agent.common import utils as lutil
LOG = logging.getLogger(__name__)
@ -77,7 +77,7 @@ class Plumber(object):
def _trunk_lock(self, trunk_dev):
lock_name = 'trunk-%s' % trunk_dev
return lockutils.lock(lock_name, utils.SYNCHRONIZED_PREFIX)
return lockutils.lock(lock_name, runtime.SYNCHRONIZED_PREFIX)
def _create_vlan_subint(self, trunk_name, devname, vlan_id):
ip_wrap = ip_lib.IPWrapper(namespace=self.namespace)

@ -15,11 +15,10 @@
import os
import fixtures
from neutron_lib.utils import runtime
from oslo_log import log as logging
from oslo_utils import fileutils
from neutron.common import utils
LOG = logging.getLogger(__name__)
MAX_ATTEMPTS = 100
@ -68,7 +67,8 @@ class ResourceAllocator(object):
self._validator = validator if validator else is_valid
self._resource_name = resource_name
@utils.synchronized('resource_allocator', external=True, lock_path='/tmp')
@runtime.synchronized('resource_allocator', external=True,
lock_path='/tmp')
def allocate(self):
allocations = self._get_allocations()
@ -86,7 +86,8 @@ class ResourceAllocator(object):
'Could not allocate a new resource of type %s from pool %s' %
(self._resource_name, allocations))
@utils.synchronized('resource_allocator', external=True, lock_path='/tmp')
@runtime.synchronized('resource_allocator', external=True,
lock_path='/tmp')
def release(self, resource):
allocations = self._get_allocations()
allocations.remove(resource)