chore: Update openstack.common, add lockutils
This patch updates openstack.common to HEAD and also adds lockutils to openstack-common.conf in anticipation of pulling in the cache module when it lands. Change-Id: I714e03c4eba9e98b92144f2b2f227799d104f1bd
This commit is contained in:
parent
014039f03b
commit
1f4bf7c2f8
101
marconi/openstack/common/excutils.py
Normal file
101
marconi/openstack/common/excutils.py
Normal file
@ -0,0 +1,101 @@
|
|||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
|
||||||
|
# Copyright 2011 OpenStack Foundation.
|
||||||
|
# Copyright 2012, Red Hat, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""
|
||||||
|
Exception related utilities.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import traceback
|
||||||
|
|
||||||
|
import six
|
||||||
|
|
||||||
|
from marconi.openstack.common.gettextutils import _ # noqa
|
||||||
|
|
||||||
|
|
||||||
|
class save_and_reraise_exception(object):
|
||||||
|
"""Save current exception, run some code and then re-raise.
|
||||||
|
|
||||||
|
In some cases the exception context can be cleared, resulting in None
|
||||||
|
being attempted to be re-raised after an exception handler is run. This
|
||||||
|
can happen when eventlet switches greenthreads or when running an
|
||||||
|
exception handler, code raises and catches an exception. In both
|
||||||
|
cases the exception context will be cleared.
|
||||||
|
|
||||||
|
To work around this, we save the exception state, run handler code, and
|
||||||
|
then re-raise the original exception. If another exception occurs, the
|
||||||
|
saved exception is logged and the new exception is re-raised.
|
||||||
|
|
||||||
|
In some cases the caller may not want to re-raise the exception, and
|
||||||
|
for those circumstances this context provides a reraise flag that
|
||||||
|
can be used to suppress the exception. For example:
|
||||||
|
|
||||||
|
except Exception:
|
||||||
|
with save_and_reraise_exception() as ctxt:
|
||||||
|
decide_if_need_reraise()
|
||||||
|
if not should_be_reraised:
|
||||||
|
ctxt.reraise = False
|
||||||
|
"""
|
||||||
|
def __init__(self):
|
||||||
|
self.reraise = True
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self.type_, self.value, self.tb, = sys.exc_info()
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
if exc_type is not None:
|
||||||
|
logging.error(_('Original exception being dropped: %s'),
|
||||||
|
traceback.format_exception(self.type_,
|
||||||
|
self.value,
|
||||||
|
self.tb))
|
||||||
|
return False
|
||||||
|
if self.reraise:
|
||||||
|
six.reraise(self.type_, self.value, self.tb)
|
||||||
|
|
||||||
|
|
||||||
|
def forever_retry_uncaught_exceptions(infunc):
|
||||||
|
def inner_func(*args, **kwargs):
|
||||||
|
last_log_time = 0
|
||||||
|
last_exc_message = None
|
||||||
|
exc_count = 0
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
return infunc(*args, **kwargs)
|
||||||
|
except Exception as exc:
|
||||||
|
this_exc_message = unicode(exc)
|
||||||
|
if this_exc_message == last_exc_message:
|
||||||
|
exc_count += 1
|
||||||
|
else:
|
||||||
|
exc_count = 1
|
||||||
|
# Do not log any more frequently than once a minute unless
|
||||||
|
# the exception message changes
|
||||||
|
cur_time = int(time.time())
|
||||||
|
if (cur_time - last_log_time > 60 or
|
||||||
|
this_exc_message != last_exc_message):
|
||||||
|
logging.exception(
|
||||||
|
_('Unexpected exception occurred %d time(s)... '
|
||||||
|
'retrying.') % exc_count)
|
||||||
|
last_log_time = cur_time
|
||||||
|
last_exc_message = this_exc_message
|
||||||
|
exc_count = 0
|
||||||
|
# This should be a very rare event. In case it isn't, do
|
||||||
|
# a sleep.
|
||||||
|
time.sleep(1)
|
||||||
|
return inner_func
|
111
marconi/openstack/common/fileutils.py
Normal file
111
marconi/openstack/common/fileutils.py
Normal file
@ -0,0 +1,111 @@
|
|||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
|
||||||
|
# Copyright 2011 OpenStack Foundation.
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
|
||||||
|
import contextlib
|
||||||
|
import errno
|
||||||
|
import os
|
||||||
|
|
||||||
|
from marconi.openstack.common import excutils
|
||||||
|
from marconi.openstack.common.gettextutils import _ # noqa
|
||||||
|
from marconi.openstack.common import log as logging
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_FILE_CACHE = {}
|
||||||
|
|
||||||
|
|
||||||
|
def ensure_tree(path):
|
||||||
|
"""Create a directory (and any ancestor directories required)
|
||||||
|
|
||||||
|
:param path: Directory to create
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
os.makedirs(path)
|
||||||
|
except OSError as exc:
|
||||||
|
if exc.errno == errno.EEXIST:
|
||||||
|
if not os.path.isdir(path):
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
def read_cached_file(filename, force_reload=False):
|
||||||
|
"""Read from a file if it has been modified.
|
||||||
|
|
||||||
|
:param force_reload: Whether to reload the file.
|
||||||
|
:returns: A tuple with a boolean specifying if the data is fresh
|
||||||
|
or not.
|
||||||
|
"""
|
||||||
|
global _FILE_CACHE
|
||||||
|
|
||||||
|
if force_reload and filename in _FILE_CACHE:
|
||||||
|
del _FILE_CACHE[filename]
|
||||||
|
|
||||||
|
reloaded = False
|
||||||
|
mtime = os.path.getmtime(filename)
|
||||||
|
cache_info = _FILE_CACHE.setdefault(filename, {})
|
||||||
|
|
||||||
|
if not cache_info or mtime > cache_info.get('mtime', 0):
|
||||||
|
LOG.debug(_("Reloading cached file %s") % filename)
|
||||||
|
with open(filename) as fap:
|
||||||
|
cache_info['data'] = fap.read()
|
||||||
|
cache_info['mtime'] = mtime
|
||||||
|
reloaded = True
|
||||||
|
return (reloaded, cache_info['data'])
|
||||||
|
|
||||||
|
|
||||||
|
def delete_if_exists(path, remove=os.unlink):
|
||||||
|
"""Delete a file, but ignore file not found error.
|
||||||
|
|
||||||
|
:param path: File to delete
|
||||||
|
:param remove: Optional function to remove passed path
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
remove(path)
|
||||||
|
except OSError as e:
|
||||||
|
if e.errno != errno.ENOENT:
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def remove_path_on_error(path, remove=delete_if_exists):
|
||||||
|
"""Protect code that wants to operate on PATH atomically.
|
||||||
|
Any exception will cause PATH to be removed.
|
||||||
|
|
||||||
|
:param path: File to work with
|
||||||
|
:param remove: Optional function to remove passed path
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
except Exception:
|
||||||
|
with excutils.save_and_reraise_exception():
|
||||||
|
remove(path)
|
||||||
|
|
||||||
|
|
||||||
|
def file_open(*args, **kwargs):
|
||||||
|
"""Open file
|
||||||
|
|
||||||
|
see built-in file() documentation for more details
|
||||||
|
|
||||||
|
Note: The reason this is kept in a separate module is to easily
|
||||||
|
be able to provide a stub module that doesn't alter system
|
||||||
|
state at all (for unit tests)
|
||||||
|
"""
|
||||||
|
return file(*args, **kwargs)
|
@ -26,10 +26,13 @@ Usual usage in an openstack.common module:
|
|||||||
|
|
||||||
import copy
|
import copy
|
||||||
import gettext
|
import gettext
|
||||||
import logging.handlers
|
import logging
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
import UserString
|
try:
|
||||||
|
import UserString as _userString
|
||||||
|
except ImportError:
|
||||||
|
import collections as _userString
|
||||||
|
|
||||||
from babel import localedata
|
from babel import localedata
|
||||||
import six
|
import six
|
||||||
@ -37,11 +40,27 @@ import six
|
|||||||
_localedir = os.environ.get('marconi'.upper() + '_LOCALEDIR')
|
_localedir = os.environ.get('marconi'.upper() + '_LOCALEDIR')
|
||||||
_t = gettext.translation('marconi', localedir=_localedir, fallback=True)
|
_t = gettext.translation('marconi', localedir=_localedir, fallback=True)
|
||||||
|
|
||||||
_AVAILABLE_LANGUAGES = []
|
_AVAILABLE_LANGUAGES = {}
|
||||||
|
USE_LAZY = False
|
||||||
|
|
||||||
|
|
||||||
|
def enable_lazy():
|
||||||
|
"""Convenience function for configuring _() to use lazy gettext
|
||||||
|
|
||||||
|
Call this at the start of execution to enable the gettextutils._
|
||||||
|
function to use lazy gettext functionality. This is useful if
|
||||||
|
your project is importing _ directly instead of using the
|
||||||
|
gettextutils.install() way of importing the _ function.
|
||||||
|
"""
|
||||||
|
global USE_LAZY
|
||||||
|
USE_LAZY = True
|
||||||
|
|
||||||
|
|
||||||
def _(msg):
|
def _(msg):
|
||||||
return _t.ugettext(msg)
|
if USE_LAZY:
|
||||||
|
return Message(msg, 'marconi')
|
||||||
|
else:
|
||||||
|
return _t.ugettext(msg)
|
||||||
|
|
||||||
|
|
||||||
def install(domain, lazy=False):
|
def install(domain, lazy=False):
|
||||||
@ -95,7 +114,7 @@ def install(domain, lazy=False):
|
|||||||
unicode=True)
|
unicode=True)
|
||||||
|
|
||||||
|
|
||||||
class Message(UserString.UserString, object):
|
class Message(_userString.UserString, object):
|
||||||
"""Class used to encapsulate translatable messages."""
|
"""Class used to encapsulate translatable messages."""
|
||||||
def __init__(self, msg, domain):
|
def __init__(self, msg, domain):
|
||||||
# _msg is the gettext msgid and should never change
|
# _msg is the gettext msgid and should never change
|
||||||
@ -236,7 +255,7 @@ class Message(UserString.UserString, object):
|
|||||||
if name in ops:
|
if name in ops:
|
||||||
return getattr(self.data, name)
|
return getattr(self.data, name)
|
||||||
else:
|
else:
|
||||||
return UserString.UserString.__getattribute__(self, name)
|
return _userString.UserString.__getattribute__(self, name)
|
||||||
|
|
||||||
|
|
||||||
def get_available_languages(domain):
|
def get_available_languages(domain):
|
||||||
@ -244,8 +263,8 @@ def get_available_languages(domain):
|
|||||||
|
|
||||||
:param domain: the domain to get languages for
|
:param domain: the domain to get languages for
|
||||||
"""
|
"""
|
||||||
if _AVAILABLE_LANGUAGES:
|
if domain in _AVAILABLE_LANGUAGES:
|
||||||
return _AVAILABLE_LANGUAGES
|
return copy.copy(_AVAILABLE_LANGUAGES[domain])
|
||||||
|
|
||||||
localedir = '%s_LOCALEDIR' % domain.upper()
|
localedir = '%s_LOCALEDIR' % domain.upper()
|
||||||
find = lambda x: gettext.find(domain,
|
find = lambda x: gettext.find(domain,
|
||||||
@ -254,7 +273,7 @@ def get_available_languages(domain):
|
|||||||
|
|
||||||
# NOTE(mrodden): en_US should always be available (and first in case
|
# NOTE(mrodden): en_US should always be available (and first in case
|
||||||
# order matters) since our in-line message strings are en_US
|
# order matters) since our in-line message strings are en_US
|
||||||
_AVAILABLE_LANGUAGES.append('en_US')
|
language_list = ['en_US']
|
||||||
# NOTE(luisg): Babel <1.0 used a function called list(), which was
|
# NOTE(luisg): Babel <1.0 used a function called list(), which was
|
||||||
# renamed to locale_identifiers() in >=1.0, the requirements master list
|
# renamed to locale_identifiers() in >=1.0, the requirements master list
|
||||||
# requires >=0.9.6, uncapped, so defensively work with both. We can remove
|
# requires >=0.9.6, uncapped, so defensively work with both. We can remove
|
||||||
@ -264,13 +283,14 @@ def get_available_languages(domain):
|
|||||||
locale_identifiers = list_identifiers()
|
locale_identifiers = list_identifiers()
|
||||||
for i in locale_identifiers:
|
for i in locale_identifiers:
|
||||||
if find(i) is not None:
|
if find(i) is not None:
|
||||||
_AVAILABLE_LANGUAGES.append(i)
|
language_list.append(i)
|
||||||
return _AVAILABLE_LANGUAGES
|
_AVAILABLE_LANGUAGES[domain] = language_list
|
||||||
|
return copy.copy(language_list)
|
||||||
|
|
||||||
|
|
||||||
def get_localized_message(message, user_locale):
|
def get_localized_message(message, user_locale):
|
||||||
"""Gets a localized version of the given message in the given locale."""
|
"""Gets a localized version of the given message in the given locale."""
|
||||||
if (isinstance(message, Message)):
|
if isinstance(message, Message):
|
||||||
if user_locale:
|
if user_locale:
|
||||||
message.locale = user_locale
|
message.locale = user_locale
|
||||||
return unicode(message)
|
return unicode(message)
|
||||||
|
@ -38,14 +38,18 @@ import functools
|
|||||||
import inspect
|
import inspect
|
||||||
import itertools
|
import itertools
|
||||||
import json
|
import json
|
||||||
import types
|
try:
|
||||||
import xmlrpclib
|
import xmlrpclib
|
||||||
|
except ImportError:
|
||||||
|
# NOTE(jd): xmlrpclib is not shipped with Python 3
|
||||||
|
xmlrpclib = None
|
||||||
|
|
||||||
import netaddr
|
|
||||||
import six
|
import six
|
||||||
|
|
||||||
|
from marconi.openstack.common import importutils
|
||||||
from marconi.openstack.common import timeutils
|
from marconi.openstack.common import timeutils
|
||||||
|
|
||||||
|
netaddr = importutils.try_import("netaddr")
|
||||||
|
|
||||||
_nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
|
_nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
|
||||||
inspect.isfunction, inspect.isgeneratorfunction,
|
inspect.isfunction, inspect.isgeneratorfunction,
|
||||||
@ -53,7 +57,8 @@ _nasty_type_tests = [inspect.ismodule, inspect.isclass, inspect.ismethod,
|
|||||||
inspect.iscode, inspect.isbuiltin, inspect.isroutine,
|
inspect.iscode, inspect.isbuiltin, inspect.isroutine,
|
||||||
inspect.isabstract]
|
inspect.isabstract]
|
||||||
|
|
||||||
_simple_types = (types.NoneType, int, basestring, bool, float, long)
|
_simple_types = (six.string_types + six.integer_types
|
||||||
|
+ (type(None), bool, float))
|
||||||
|
|
||||||
|
|
||||||
def to_primitive(value, convert_instances=False, convert_datetime=True,
|
def to_primitive(value, convert_instances=False, convert_datetime=True,
|
||||||
@ -125,7 +130,7 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
|
|||||||
# It's not clear why xmlrpclib created their own DateTime type, but
|
# It's not clear why xmlrpclib created their own DateTime type, but
|
||||||
# for our purposes, make it a datetime type which is explicitly
|
# for our purposes, make it a datetime type which is explicitly
|
||||||
# handled
|
# handled
|
||||||
if isinstance(value, xmlrpclib.DateTime):
|
if xmlrpclib and isinstance(value, xmlrpclib.DateTime):
|
||||||
value = datetime.datetime(*tuple(value.timetuple())[:6])
|
value = datetime.datetime(*tuple(value.timetuple())[:6])
|
||||||
|
|
||||||
if convert_datetime and isinstance(value, datetime.datetime):
|
if convert_datetime and isinstance(value, datetime.datetime):
|
||||||
@ -138,7 +143,7 @@ def to_primitive(value, convert_instances=False, convert_datetime=True,
|
|||||||
# Likely an instance of something. Watch for cycles.
|
# Likely an instance of something. Watch for cycles.
|
||||||
# Ignore class member vars.
|
# Ignore class member vars.
|
||||||
return recursive(value.__dict__, level=level + 1)
|
return recursive(value.__dict__, level=level + 1)
|
||||||
elif isinstance(value, netaddr.IPAddress):
|
elif netaddr and isinstance(value, netaddr.IPAddress):
|
||||||
return six.text_type(value)
|
return six.text_type(value)
|
||||||
else:
|
else:
|
||||||
if any(test(value) for test in _nasty_type_tests):
|
if any(test(value) for test in _nasty_type_tests):
|
||||||
|
277
marconi/openstack/common/lockutils.py
Normal file
277
marconi/openstack/common/lockutils.py
Normal file
@ -0,0 +1,277 @@
|
|||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
|
||||||
|
# Copyright 2011 OpenStack Foundation.
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
|
||||||
|
import contextlib
|
||||||
|
import errno
|
||||||
|
import functools
|
||||||
|
import os
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
import weakref
|
||||||
|
|
||||||
|
from oslo.config import cfg
|
||||||
|
|
||||||
|
from marconi.openstack.common import fileutils
|
||||||
|
from marconi.openstack.common.gettextutils import _ # noqa
|
||||||
|
from marconi.openstack.common import local
|
||||||
|
from marconi.openstack.common import log as logging
|
||||||
|
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
util_opts = [
|
||||||
|
cfg.BoolOpt('disable_process_locking', default=False,
|
||||||
|
help='Whether to disable inter-process locks'),
|
||||||
|
cfg.StrOpt('lock_path',
|
||||||
|
help=('Directory to use for lock files.'))
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
CONF.register_opts(util_opts)
|
||||||
|
|
||||||
|
|
||||||
|
def set_defaults(lock_path):
|
||||||
|
cfg.set_defaults(util_opts, lock_path=lock_path)
|
||||||
|
|
||||||
|
|
||||||
|
class _InterProcessLock(object):
|
||||||
|
"""Lock implementation which allows multiple locks, working around
|
||||||
|
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
|
||||||
|
not require any cleanup. Since the lock is always held on a file
|
||||||
|
descriptor rather than outside of the process, the lock gets dropped
|
||||||
|
automatically if the process crashes, even if __exit__ is not executed.
|
||||||
|
|
||||||
|
There are no guarantees regarding usage by multiple green threads in a
|
||||||
|
single process here. This lock works only between processes. Exclusive
|
||||||
|
access between local threads should be achieved using the semaphores
|
||||||
|
in the @synchronized decorator.
|
||||||
|
|
||||||
|
Note these locks are released when the descriptor is closed, so it's not
|
||||||
|
safe to close the file descriptor while another green thread holds the
|
||||||
|
lock. Just opening and closing the lock file can break synchronisation,
|
||||||
|
so lock files must be accessed only using this abstraction.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, name):
|
||||||
|
self.lockfile = None
|
||||||
|
self.fname = name
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self.lockfile = open(self.fname, 'w')
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
# Using non-blocking locks since green threads are not
|
||||||
|
# patched to deal with blocking locking calls.
|
||||||
|
# Also upon reading the MSDN docs for locking(), it seems
|
||||||
|
# to have a laughable 10 attempts "blocking" mechanism.
|
||||||
|
self.trylock()
|
||||||
|
return self
|
||||||
|
except IOError as e:
|
||||||
|
if e.errno in (errno.EACCES, errno.EAGAIN):
|
||||||
|
# external locks synchronise things like iptables
|
||||||
|
# updates - give it some time to prevent busy spinning
|
||||||
|
time.sleep(0.01)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
try:
|
||||||
|
self.unlock()
|
||||||
|
self.lockfile.close()
|
||||||
|
except IOError:
|
||||||
|
LOG.exception(_("Could not release the acquired lock `%s`"),
|
||||||
|
self.fname)
|
||||||
|
|
||||||
|
def trylock(self):
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def unlock(self):
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
|
||||||
|
class _WindowsLock(_InterProcessLock):
|
||||||
|
def trylock(self):
|
||||||
|
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1)
|
||||||
|
|
||||||
|
def unlock(self):
|
||||||
|
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1)
|
||||||
|
|
||||||
|
|
||||||
|
class _PosixLock(_InterProcessLock):
|
||||||
|
def trylock(self):
|
||||||
|
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||||
|
|
||||||
|
def unlock(self):
|
||||||
|
fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
|
||||||
|
|
||||||
|
|
||||||
|
if os.name == 'nt':
|
||||||
|
import msvcrt
|
||||||
|
InterProcessLock = _WindowsLock
|
||||||
|
else:
|
||||||
|
import fcntl
|
||||||
|
InterProcessLock = _PosixLock
|
||||||
|
|
||||||
|
_semaphores = weakref.WeakValueDictionary()
|
||||||
|
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
|
||||||
|
"""Context based lock
|
||||||
|
|
||||||
|
This function yields a `threading.Semaphore` instance (if we don't use
|
||||||
|
eventlet.monkey_patch(), else `semaphore.Semaphore`) unless external is
|
||||||
|
True, in which case, it'll yield an InterProcessLock instance.
|
||||||
|
|
||||||
|
:param lock_file_prefix: The lock_file_prefix argument is used to provide
|
||||||
|
lock files on disk with a meaningful prefix.
|
||||||
|
|
||||||
|
:param external: The external keyword argument denotes whether this lock
|
||||||
|
should work across multiple processes. This means that if two different
|
||||||
|
workers both run a a method decorated with @synchronized('mylock',
|
||||||
|
external=True), only one of them will execute at a time.
|
||||||
|
|
||||||
|
:param lock_path: The lock_path keyword argument is used to specify a
|
||||||
|
special location for external lock files to live. If nothing is set, then
|
||||||
|
CONF.lock_path is used as a default.
|
||||||
|
"""
|
||||||
|
# NOTE(soren): If we ever go natively threaded, this will be racy.
|
||||||
|
# See http://stackoverflow.com/questions/5390569/dyn
|
||||||
|
# amically-allocating-and-destroying-mutexes
|
||||||
|
sem = _semaphores.get(name, threading.Semaphore())
|
||||||
|
if name not in _semaphores:
|
||||||
|
# this check is not racy - we're already holding ref locally
|
||||||
|
# so GC won't remove the item and there was no IO switch
|
||||||
|
# (only valid in greenthreads)
|
||||||
|
_semaphores[name] = sem
|
||||||
|
|
||||||
|
with sem:
|
||||||
|
LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name})
|
||||||
|
|
||||||
|
# NOTE(mikal): I know this looks odd
|
||||||
|
if not hasattr(local.strong_store, 'locks_held'):
|
||||||
|
local.strong_store.locks_held = []
|
||||||
|
local.strong_store.locks_held.append(name)
|
||||||
|
|
||||||
|
try:
|
||||||
|
if external and not CONF.disable_process_locking:
|
||||||
|
LOG.debug(_('Attempting to grab file lock "%(lock)s"'),
|
||||||
|
{'lock': name})
|
||||||
|
|
||||||
|
# We need a copy of lock_path because it is non-local
|
||||||
|
local_lock_path = lock_path or CONF.lock_path
|
||||||
|
if not local_lock_path:
|
||||||
|
raise cfg.RequiredOptError('lock_path')
|
||||||
|
|
||||||
|
if not os.path.exists(local_lock_path):
|
||||||
|
fileutils.ensure_tree(local_lock_path)
|
||||||
|
LOG.info(_('Created lock path: %s'), local_lock_path)
|
||||||
|
|
||||||
|
def add_prefix(name, prefix):
|
||||||
|
if not prefix:
|
||||||
|
return name
|
||||||
|
sep = '' if prefix.endswith('-') else '-'
|
||||||
|
return '%s%s%s' % (prefix, sep, name)
|
||||||
|
|
||||||
|
# NOTE(mikal): the lock name cannot contain directory
|
||||||
|
# separators
|
||||||
|
lock_file_name = add_prefix(name.replace(os.sep, '_'),
|
||||||
|
lock_file_prefix)
|
||||||
|
|
||||||
|
lock_file_path = os.path.join(local_lock_path, lock_file_name)
|
||||||
|
|
||||||
|
try:
|
||||||
|
lock = InterProcessLock(lock_file_path)
|
||||||
|
with lock as lock:
|
||||||
|
LOG.debug(_('Got file lock "%(lock)s" at %(path)s'),
|
||||||
|
{'lock': name, 'path': lock_file_path})
|
||||||
|
yield lock
|
||||||
|
finally:
|
||||||
|
LOG.debug(_('Released file lock "%(lock)s" at %(path)s'),
|
||||||
|
{'lock': name, 'path': lock_file_path})
|
||||||
|
else:
|
||||||
|
yield sem
|
||||||
|
|
||||||
|
finally:
|
||||||
|
local.strong_store.locks_held.remove(name)
|
||||||
|
|
||||||
|
|
||||||
|
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
|
||||||
|
"""Synchronization decorator.
|
||||||
|
|
||||||
|
Decorating a method like so::
|
||||||
|
|
||||||
|
@synchronized('mylock')
|
||||||
|
def foo(self, *args):
|
||||||
|
...
|
||||||
|
|
||||||
|
ensures that only one thread will execute the foo method at a time.
|
||||||
|
|
||||||
|
Different methods can share the same lock::
|
||||||
|
|
||||||
|
@synchronized('mylock')
|
||||||
|
def foo(self, *args):
|
||||||
|
...
|
||||||
|
|
||||||
|
@synchronized('mylock')
|
||||||
|
def bar(self, *args):
|
||||||
|
...
|
||||||
|
|
||||||
|
This way only one of either foo or bar can be executing at a time.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def wrap(f):
|
||||||
|
@functools.wraps(f)
|
||||||
|
def inner(*args, **kwargs):
|
||||||
|
with lock(name, lock_file_prefix, external, lock_path):
|
||||||
|
LOG.debug(_('Got semaphore / lock "%(function)s"'),
|
||||||
|
{'function': f.__name__})
|
||||||
|
return f(*args, **kwargs)
|
||||||
|
|
||||||
|
LOG.debug(_('Semaphore / lock released "%(function)s"'),
|
||||||
|
{'function': f.__name__})
|
||||||
|
return inner
|
||||||
|
return wrap
|
||||||
|
|
||||||
|
|
||||||
|
def synchronized_with_prefix(lock_file_prefix):
|
||||||
|
"""Partial object generator for the synchronization decorator.
|
||||||
|
|
||||||
|
Redefine @synchronized in each project like so::
|
||||||
|
|
||||||
|
(in nova/utils.py)
|
||||||
|
from nova.openstack.common import lockutils
|
||||||
|
|
||||||
|
synchronized = lockutils.synchronized_with_prefix('nova-')
|
||||||
|
|
||||||
|
|
||||||
|
(in nova/foo.py)
|
||||||
|
from nova import utils
|
||||||
|
|
||||||
|
@utils.synchronized('mylock')
|
||||||
|
def bar(self, *args):
|
||||||
|
...
|
||||||
|
|
||||||
|
The lock_file_prefix argument is used to provide lock files on disk with a
|
||||||
|
meaningful prefix.
|
||||||
|
"""
|
||||||
|
|
||||||
|
return functools.partial(synchronized, lock_file_prefix=lock_file_prefix)
|
@ -1,7 +1,10 @@
|
|||||||
[DEFAULT]
|
[DEFAULT]
|
||||||
|
module=excutils
|
||||||
|
module=fileutils
|
||||||
module=gettextutils
|
module=gettextutils
|
||||||
module=importutils
|
module=importutils
|
||||||
module=jsonutils
|
module=jsonutils
|
||||||
|
module=lockutils
|
||||||
module=log
|
module=log
|
||||||
module=setup
|
module=setup
|
||||||
module=timeutils
|
module=timeutils
|
||||||
|
Loading…
Reference in New Issue
Block a user