Merge "Migrate to fileutils and lockutils."
This commit is contained in:
@@ -12,7 +12,7 @@ view, each OpenStack service runs in a single thread.
|
|||||||
|
|
||||||
The use of green threads reduces the likelihood of race conditions, but does
|
The use of green threads reduces the likelihood of race conditions, but does
|
||||||
not completely eliminate them. In some cases, you may need to use the
|
not completely eliminate them. In some cases, you may need to use the
|
||||||
``@utils.synchronized(...)`` decorator to avoid races.
|
``@lockutils.synchronized(...)`` decorator to avoid races.
|
||||||
|
|
||||||
In addition, since there is only one operating system thread, a call that
|
In addition, since there is only one operating system thread, a call that
|
||||||
blocks that main thread will block the entire process.
|
blocks that main thread will block the entire process.
|
||||||
|
|||||||
@@ -91,9 +91,6 @@ core_opts = [
|
|||||||
cfg.StrOpt('state_path',
|
cfg.StrOpt('state_path',
|
||||||
default='$pybasedir',
|
default='$pybasedir',
|
||||||
help="Top-level directory for maintaining nova's state"),
|
help="Top-level directory for maintaining nova's state"),
|
||||||
cfg.StrOpt('lock_path',
|
|
||||||
default='$pybasedir',
|
|
||||||
help='Directory to use for lock files'),
|
|
||||||
]
|
]
|
||||||
|
|
||||||
debug_opts = [
|
debug_opts = [
|
||||||
|
|||||||
35
nova/openstack/common/fileutils.py
Normal file
35
nova/openstack/common/fileutils.py
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
|
||||||
|
# Copyright 2011 OpenStack LLC.
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
|
||||||
|
import errno
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
def ensure_tree(path):
|
||||||
|
"""Create a directory (and any ancestor directories required)
|
||||||
|
|
||||||
|
:param path: Directory to create
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
os.makedirs(path)
|
||||||
|
except OSError as exc:
|
||||||
|
if exc.errno == errno.EEXIST:
|
||||||
|
if not os.path.isdir(path):
|
||||||
|
raise
|
||||||
|
else:
|
||||||
|
raise
|
||||||
233
nova/openstack/common/lockutils.py
Normal file
233
nova/openstack/common/lockutils.py
Normal file
@@ -0,0 +1,233 @@
|
|||||||
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||||
|
|
||||||
|
# Copyright 2011 OpenStack LLC.
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
|
||||||
|
import errno
|
||||||
|
import functools
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import tempfile
|
||||||
|
import time
|
||||||
|
import weakref
|
||||||
|
|
||||||
|
from eventlet import greenthread
|
||||||
|
from eventlet import semaphore
|
||||||
|
|
||||||
|
from nova.openstack.common import cfg
|
||||||
|
from nova.openstack.common import fileutils
|
||||||
|
from nova.openstack.common import log as logging
|
||||||
|
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
util_opts = [
|
||||||
|
cfg.BoolOpt('disable_process_locking', default=False,
|
||||||
|
help='Whether to disable inter-process locks'),
|
||||||
|
cfg.StrOpt('lock_path',
|
||||||
|
default=os.path.abspath(os.path.join(os.path.dirname(__file__),
|
||||||
|
'../')),
|
||||||
|
help='Directory to use for lock files')
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
CONF.register_opts(util_opts)
|
||||||
|
|
||||||
|
|
||||||
|
class _InterProcessLock(object):
|
||||||
|
"""Lock implementation which allows multiple locks, working around
|
||||||
|
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
|
||||||
|
not require any cleanup. Since the lock is always held on a file
|
||||||
|
descriptor rather than outside of the process, the lock gets dropped
|
||||||
|
automatically if the process crashes, even if __exit__ is not executed.
|
||||||
|
|
||||||
|
There are no guarantees regarding usage by multiple green threads in a
|
||||||
|
single process here. This lock works only between processes. Exclusive
|
||||||
|
access between local threads should be achieved using the semaphores
|
||||||
|
in the @synchronized decorator.
|
||||||
|
|
||||||
|
Note these locks are released when the descriptor is closed, so it's not
|
||||||
|
safe to close the file descriptor while another green thread holds the
|
||||||
|
lock. Just opening and closing the lock file can break synchronisation,
|
||||||
|
so lock files must be accessed only using this abstraction.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, name):
|
||||||
|
self.lockfile = None
|
||||||
|
self.fname = name
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self.lockfile = open(self.fname, 'w')
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
# Using non-blocking locks since green threads are not
|
||||||
|
# patched to deal with blocking locking calls.
|
||||||
|
# Also upon reading the MSDN docs for locking(), it seems
|
||||||
|
# to have a laughable 10 attempts "blocking" mechanism.
|
||||||
|
self.trylock()
|
||||||
|
return self
|
||||||
|
except IOError, e:
|
||||||
|
if e.errno in (errno.EACCES, errno.EAGAIN):
|
||||||
|
# external locks synchronise things like iptables
|
||||||
|
# updates - give it some time to prevent busy spinning
|
||||||
|
time.sleep(0.01)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
|
try:
|
||||||
|
self.unlock()
|
||||||
|
self.lockfile.close()
|
||||||
|
except IOError:
|
||||||
|
LOG.exception(_("Could not release the acquired lock `%s`"),
|
||||||
|
self.fname)
|
||||||
|
|
||||||
|
def trylock(self):
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def unlock(self):
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
|
||||||
|
class _WindowsLock(_InterProcessLock):
|
||||||
|
def trylock(self):
|
||||||
|
msvcrt.locking(self.lockfile, msvcrt.LK_NBLCK, 1)
|
||||||
|
|
||||||
|
def unlock(self):
|
||||||
|
msvcrt.locking(self.lockfile, msvcrt.LK_UNLCK, 1)
|
||||||
|
|
||||||
|
|
||||||
|
class _PosixLock(_InterProcessLock):
|
||||||
|
def trylock(self):
|
||||||
|
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||||
|
|
||||||
|
def unlock(self):
|
||||||
|
fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
|
||||||
|
|
||||||
|
|
||||||
|
if os.name == 'nt':
|
||||||
|
import msvcrt
|
||||||
|
InterProcessLock = _WindowsLock
|
||||||
|
else:
|
||||||
|
import fcntl
|
||||||
|
InterProcessLock = _PosixLock
|
||||||
|
|
||||||
|
_semaphores = weakref.WeakValueDictionary()
|
||||||
|
|
||||||
|
|
||||||
|
def synchronized(name, lock_file_prefix, external=False, lock_path=None):
|
||||||
|
"""Synchronization decorator.
|
||||||
|
|
||||||
|
Decorating a method like so::
|
||||||
|
|
||||||
|
@synchronized('mylock')
|
||||||
|
def foo(self, *args):
|
||||||
|
...
|
||||||
|
|
||||||
|
ensures that only one thread will execute the bar method at a time.
|
||||||
|
|
||||||
|
Different methods can share the same lock::
|
||||||
|
|
||||||
|
@synchronized('mylock')
|
||||||
|
def foo(self, *args):
|
||||||
|
...
|
||||||
|
|
||||||
|
@synchronized('mylock')
|
||||||
|
def bar(self, *args):
|
||||||
|
...
|
||||||
|
|
||||||
|
This way only one of either foo or bar can be executing at a time.
|
||||||
|
|
||||||
|
The lock_file_prefix argument is used to provide lock files on disk with a
|
||||||
|
meaningful prefix. The prefix should end with a hyphen ('-') if specified.
|
||||||
|
|
||||||
|
The external keyword argument denotes whether this lock should work across
|
||||||
|
multiple processes. This means that if two different workers both run a
|
||||||
|
a method decorated with @synchronized('mylock', external=True), only one
|
||||||
|
of them will execute at a time.
|
||||||
|
|
||||||
|
The lock_path keyword argument is used to specify a special location for
|
||||||
|
external lock files to live. If nothing is set, then CONF.lock_path is
|
||||||
|
used as a default.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def wrap(f):
|
||||||
|
@functools.wraps(f)
|
||||||
|
def inner(*args, **kwargs):
|
||||||
|
# NOTE(soren): If we ever go natively threaded, this will be racy.
|
||||||
|
# See http://stackoverflow.com/questions/5390569/dyn
|
||||||
|
# amically-allocating-and-destroying-mutexes
|
||||||
|
sem = _semaphores.get(name, semaphore.Semaphore())
|
||||||
|
if name not in _semaphores:
|
||||||
|
# this check is not racy - we're already holding ref locally
|
||||||
|
# so GC won't remove the item and there was no IO switch
|
||||||
|
# (only valid in greenthreads)
|
||||||
|
_semaphores[name] = sem
|
||||||
|
|
||||||
|
with sem:
|
||||||
|
LOG.debug(_('Got semaphore "%(lock)s" for method '
|
||||||
|
'"%(method)s"...'), {'lock': name,
|
||||||
|
'method': f.__name__})
|
||||||
|
if external and not CONF.disable_process_locking:
|
||||||
|
LOG.debug(_('Attempting to grab file lock "%(lock)s" for '
|
||||||
|
'method "%(method)s"...'),
|
||||||
|
{'lock': name, 'method': f.__name__})
|
||||||
|
cleanup_dir = False
|
||||||
|
|
||||||
|
# We need a copy of lock_path because it is non-local
|
||||||
|
local_lock_path = lock_path
|
||||||
|
if not local_lock_path:
|
||||||
|
local_lock_path = CONF.lock_path
|
||||||
|
|
||||||
|
if not local_lock_path:
|
||||||
|
cleanup_dir = True
|
||||||
|
local_lock_path = tempfile.mkdtemp()
|
||||||
|
|
||||||
|
if not os.path.exists(local_lock_path):
|
||||||
|
cleanup_dir = True
|
||||||
|
fileutils.ensure_tree(local_lock_path)
|
||||||
|
|
||||||
|
# NOTE(mikal): the lock name cannot contain directory
|
||||||
|
# separators
|
||||||
|
safe_name = name.replace(os.sep, '_')
|
||||||
|
lock_file_name = '%s%s' % (lock_file_prefix, safe_name)
|
||||||
|
lock_file_path = os.path.join(local_lock_path,
|
||||||
|
lock_file_name)
|
||||||
|
|
||||||
|
try:
|
||||||
|
lock = InterProcessLock(lock_file_path)
|
||||||
|
with lock:
|
||||||
|
LOG.debug(_('Got file lock "%(lock)s" at %(path)s '
|
||||||
|
'for method "%(method)s"...'),
|
||||||
|
{'lock': name,
|
||||||
|
'path': lock_file_path,
|
||||||
|
'method': f.__name__})
|
||||||
|
retval = f(*args, **kwargs)
|
||||||
|
finally:
|
||||||
|
# NOTE(vish): This removes the tempdir if we needed
|
||||||
|
# to create one. This is used to cleanup
|
||||||
|
# the locks left behind by unit tests.
|
||||||
|
if cleanup_dir:
|
||||||
|
shutil.rmtree(local_lock_path)
|
||||||
|
else:
|
||||||
|
retval = f(*args, **kwargs)
|
||||||
|
|
||||||
|
return retval
|
||||||
|
return inner
|
||||||
|
return wrap
|
||||||
@@ -18,6 +18,7 @@
|
|||||||
import os
|
import os
|
||||||
|
|
||||||
from nova import flags
|
from nova import flags
|
||||||
|
from nova.openstack.common import fileutils
|
||||||
from nova import test
|
from nova import test
|
||||||
from nova.tests import fake_libvirt_utils
|
from nova.tests import fake_libvirt_utils
|
||||||
from nova.virt.libvirt import imagebackend
|
from nova.virt.libvirt import imagebackend
|
||||||
@@ -56,8 +57,8 @@ class _ImageTestCase(test.TestCase):
|
|||||||
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
|
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
|
||||||
fn = self.mox.CreateMockAnything()
|
fn = self.mox.CreateMockAnything()
|
||||||
fn(target=self.TEMPLATE_PATH)
|
fn(target=self.TEMPLATE_PATH)
|
||||||
self.mox.StubOutWithMock(imagebackend.utils, 'ensure_tree')
|
self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
|
||||||
imagebackend.utils.ensure_tree(self.TEMPLATE_DIR)
|
imagebackend.fileutils.ensure_tree(self.TEMPLATE_DIR)
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
|
|
||||||
image = self.image_class(self.INSTANCE, self.NAME)
|
image = self.image_class(self.INSTANCE, self.NAME)
|
||||||
@@ -83,7 +84,7 @@ class _ImageTestCase(test.TestCase):
|
|||||||
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
|
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
|
||||||
fn = self.mox.CreateMockAnything()
|
fn = self.mox.CreateMockAnything()
|
||||||
fn(target=self.TEMPLATE_PATH)
|
fn(target=self.TEMPLATE_PATH)
|
||||||
self.mox.StubOutWithMock(imagebackend.utils, 'ensure_tree')
|
self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
|
|
||||||
image = self.image_class(self.INSTANCE, self.NAME)
|
image = self.image_class(self.INSTANCE, self.NAME)
|
||||||
@@ -117,7 +118,8 @@ class RawTestCase(_ImageTestCase):
|
|||||||
|
|
||||||
def prepare_mocks(self):
|
def prepare_mocks(self):
|
||||||
fn = self.mox.CreateMockAnything()
|
fn = self.mox.CreateMockAnything()
|
||||||
self.mox.StubOutWithMock(imagebackend.utils.synchronized, '__call__')
|
self.mox.StubOutWithMock(imagebackend.lockutils.synchronized,
|
||||||
|
'__call__')
|
||||||
self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image')
|
self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image')
|
||||||
self.mox.StubOutWithMock(imagebackend.disk, 'extend')
|
self.mox.StubOutWithMock(imagebackend.disk, 'extend')
|
||||||
return fn
|
return fn
|
||||||
@@ -167,7 +169,8 @@ class Qcow2TestCase(_ImageTestCase):
|
|||||||
|
|
||||||
def prepare_mocks(self):
|
def prepare_mocks(self):
|
||||||
fn = self.mox.CreateMockAnything()
|
fn = self.mox.CreateMockAnything()
|
||||||
self.mox.StubOutWithMock(imagebackend.utils.synchronized, '__call__')
|
self.mox.StubOutWithMock(imagebackend.lockutils.synchronized,
|
||||||
|
'__call__')
|
||||||
self.mox.StubOutWithMock(imagebackend.libvirt_utils,
|
self.mox.StubOutWithMock(imagebackend.libvirt_utils,
|
||||||
'create_cow_image')
|
'create_cow_image')
|
||||||
self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image')
|
self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image')
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ from nova import context
|
|||||||
from nova import db
|
from nova import db
|
||||||
from nova import exception
|
from nova import exception
|
||||||
from nova import flags
|
from nova import flags
|
||||||
|
from nova.openstack.common import fileutils
|
||||||
from nova.openstack.common import importutils
|
from nova.openstack.common import importutils
|
||||||
from nova.openstack.common import jsonutils
|
from nova.openstack.common import jsonutils
|
||||||
from nova.openstack.common import log as logging
|
from nova.openstack.common import log as logging
|
||||||
@@ -448,7 +449,7 @@ class CacheConcurrencyTestCase(test.TestCase):
|
|||||||
# use for tests. So, create the path here so utils.synchronized()
|
# use for tests. So, create the path here so utils.synchronized()
|
||||||
# won't delete it out from under one of the threads.
|
# won't delete it out from under one of the threads.
|
||||||
self.lock_path = os.path.join(FLAGS.instances_path, 'locks')
|
self.lock_path = os.path.join(FLAGS.instances_path, 'locks')
|
||||||
utils.ensure_tree(self.lock_path)
|
fileutils.ensure_tree(self.lock_path)
|
||||||
|
|
||||||
def fake_exists(fname):
|
def fake_exists(fname):
|
||||||
basedir = os.path.join(FLAGS.instances_path, FLAGS.base_dir_name)
|
basedir = os.path.join(FLAGS.instances_path, FLAGS.base_dir_name)
|
||||||
|
|||||||
@@ -24,7 +24,6 @@ from eventlet import greenthread
|
|||||||
|
|
||||||
from nova import exception
|
from nova import exception
|
||||||
from nova import test
|
from nova import test
|
||||||
from nova import utils
|
|
||||||
|
|
||||||
|
|
||||||
class ExceptionTestCase(test.TestCase):
|
class ExceptionTestCase(test.TestCase):
|
||||||
@@ -63,96 +62,3 @@ class ProjectTestCase(test.TestCase):
|
|||||||
helpful_msg = (_("The following migrations are missing a downgrade:"
|
helpful_msg = (_("The following migrations are missing a downgrade:"
|
||||||
"\n\t%s") % '\n\t'.join(sorted(missing_downgrade)))
|
"\n\t%s") % '\n\t'.join(sorted(missing_downgrade)))
|
||||||
self.assert_(not missing_downgrade, helpful_msg)
|
self.assert_(not missing_downgrade, helpful_msg)
|
||||||
|
|
||||||
|
|
||||||
class LockTestCase(test.TestCase):
|
|
||||||
def test_synchronized_wrapped_function_metadata(self):
|
|
||||||
@utils.synchronized('whatever')
|
|
||||||
def foo():
|
|
||||||
"""Bar"""
|
|
||||||
pass
|
|
||||||
self.assertEquals(foo.__doc__, 'Bar', "Wrapped function's docstring "
|
|
||||||
"got lost")
|
|
||||||
self.assertEquals(foo.__name__, 'foo', "Wrapped function's name "
|
|
||||||
"got mangled")
|
|
||||||
|
|
||||||
def test_synchronized_internally(self):
|
|
||||||
"""We can lock across multiple green threads"""
|
|
||||||
saved_sem_num = len(utils._semaphores)
|
|
||||||
seen_threads = list()
|
|
||||||
|
|
||||||
@utils.synchronized('testlock2', external=False)
|
|
||||||
def f(id):
|
|
||||||
for x in range(10):
|
|
||||||
seen_threads.append(id)
|
|
||||||
greenthread.sleep(0)
|
|
||||||
|
|
||||||
threads = []
|
|
||||||
pool = greenpool.GreenPool(10)
|
|
||||||
for i in range(10):
|
|
||||||
threads.append(pool.spawn(f, i))
|
|
||||||
|
|
||||||
for thread in threads:
|
|
||||||
thread.wait()
|
|
||||||
|
|
||||||
self.assertEquals(len(seen_threads), 100)
|
|
||||||
# Looking at the seen threads, split it into chunks of 10, and verify
|
|
||||||
# that the last 9 match the first in each chunk.
|
|
||||||
for i in range(10):
|
|
||||||
for j in range(9):
|
|
||||||
self.assertEquals(seen_threads[i * 10],
|
|
||||||
seen_threads[i * 10 + 1 + j])
|
|
||||||
|
|
||||||
self.assertEqual(saved_sem_num, len(utils._semaphores),
|
|
||||||
"Semaphore leak detected")
|
|
||||||
|
|
||||||
def test_nested_external_works(self):
|
|
||||||
"""We can nest external syncs"""
|
|
||||||
with utils.tempdir() as tempdir:
|
|
||||||
self.flags(lock_path=tempdir)
|
|
||||||
sentinel = object()
|
|
||||||
|
|
||||||
@utils.synchronized('testlock1', external=True)
|
|
||||||
def outer_lock():
|
|
||||||
|
|
||||||
@utils.synchronized('testlock2', external=True)
|
|
||||||
def inner_lock():
|
|
||||||
return sentinel
|
|
||||||
return inner_lock()
|
|
||||||
|
|
||||||
self.assertEqual(sentinel, outer_lock())
|
|
||||||
|
|
||||||
def test_synchronized_externally(self):
|
|
||||||
"""We can lock across multiple processes"""
|
|
||||||
with utils.tempdir() as tempdir:
|
|
||||||
self.flags(lock_path=tempdir)
|
|
||||||
rpipe1, wpipe1 = os.pipe()
|
|
||||||
rpipe2, wpipe2 = os.pipe()
|
|
||||||
|
|
||||||
@utils.synchronized('testlock1', external=True)
|
|
||||||
def f(rpipe, wpipe):
|
|
||||||
try:
|
|
||||||
os.write(wpipe, "foo")
|
|
||||||
except OSError, e:
|
|
||||||
self.assertEquals(e.errno, errno.EPIPE)
|
|
||||||
return
|
|
||||||
|
|
||||||
rfds, _wfds, _efds = select.select([rpipe], [], [], 1)
|
|
||||||
self.assertEquals(len(rfds), 0, "The other process, which was"
|
|
||||||
" supposed to be locked, "
|
|
||||||
"wrote on its end of the "
|
|
||||||
"pipe")
|
|
||||||
os.close(rpipe)
|
|
||||||
|
|
||||||
pid = os.fork()
|
|
||||||
if pid > 0:
|
|
||||||
os.close(wpipe1)
|
|
||||||
os.close(rpipe2)
|
|
||||||
|
|
||||||
f(rpipe1, wpipe2)
|
|
||||||
else:
|
|
||||||
os.close(rpipe1)
|
|
||||||
os.close(wpipe2)
|
|
||||||
|
|
||||||
f(rpipe2, wpipe1)
|
|
||||||
os._exit(0)
|
|
||||||
|
|||||||
192
nova/utils.py
192
nova/utils.py
@@ -574,183 +574,6 @@ def utf8(value):
|
|||||||
return value
|
return value
|
||||||
|
|
||||||
|
|
||||||
class _InterProcessLock(object):
|
|
||||||
"""Lock implementation which allows multiple locks, working around
|
|
||||||
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
|
|
||||||
not require any cleanup. Since the lock is always held on a file
|
|
||||||
descriptor rather than outside of the process, the lock gets dropped
|
|
||||||
automatically if the process crashes, even if __exit__ is not executed.
|
|
||||||
|
|
||||||
There are no guarantees regarding usage by multiple green threads in a
|
|
||||||
single process here. This lock works only between processes. Exclusive
|
|
||||||
access between local threads should be achieved using the semaphores
|
|
||||||
in the @synchronized decorator.
|
|
||||||
|
|
||||||
Note these locks are released when the descriptor is closed, so it's not
|
|
||||||
safe to close the file descriptor while another green thread holds the
|
|
||||||
lock. Just opening and closing the lock file can break synchronisation,
|
|
||||||
so lock files must be accessed only using this abstraction.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, name):
|
|
||||||
self.lockfile = None
|
|
||||||
self.fname = name
|
|
||||||
|
|
||||||
def __enter__(self):
|
|
||||||
self.lockfile = open(self.fname, 'w')
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
# Using non-blocking locks since green threads are not
|
|
||||||
# patched to deal with blocking locking calls.
|
|
||||||
# Also upon reading the MSDN docs for locking(), it seems
|
|
||||||
# to have a laughable 10 attempts "blocking" mechanism.
|
|
||||||
self.trylock()
|
|
||||||
return self
|
|
||||||
except IOError, e:
|
|
||||||
if e.errno in (errno.EACCES, errno.EAGAIN):
|
|
||||||
# external locks synchronise things like iptables
|
|
||||||
# updates - give it some time to prevent busy spinning
|
|
||||||
time.sleep(0.01)
|
|
||||||
else:
|
|
||||||
raise
|
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
||||||
try:
|
|
||||||
self.unlock()
|
|
||||||
self.lockfile.close()
|
|
||||||
except IOError:
|
|
||||||
LOG.exception(_("Could not release the acquired lock `%s`")
|
|
||||||
% self.fname)
|
|
||||||
|
|
||||||
def trylock(self):
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
def unlock(self):
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
|
|
||||||
class _WindowsLock(_InterProcessLock):
|
|
||||||
def trylock(self):
|
|
||||||
msvcrt.locking(self.lockfile, msvcrt.LK_NBLCK, 1)
|
|
||||||
|
|
||||||
def unlock(self):
|
|
||||||
msvcrt.locking(self.lockfile, msvcrt.LK_UNLCK, 1)
|
|
||||||
|
|
||||||
|
|
||||||
class _PosixLock(_InterProcessLock):
|
|
||||||
def trylock(self):
|
|
||||||
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
|
||||||
|
|
||||||
def unlock(self):
|
|
||||||
fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
|
|
||||||
|
|
||||||
|
|
||||||
if os.name == 'nt':
|
|
||||||
import msvcrt
|
|
||||||
InterProcessLock = _WindowsLock
|
|
||||||
else:
|
|
||||||
import fcntl
|
|
||||||
InterProcessLock = _PosixLock
|
|
||||||
|
|
||||||
_semaphores = weakref.WeakValueDictionary()
|
|
||||||
|
|
||||||
|
|
||||||
def synchronized(name, external=False, lock_path=None):
|
|
||||||
"""Synchronization decorator.
|
|
||||||
|
|
||||||
Decorating a method like so::
|
|
||||||
|
|
||||||
@synchronized('mylock')
|
|
||||||
def foo(self, *args):
|
|
||||||
...
|
|
||||||
|
|
||||||
ensures that only one thread will execute the bar method at a time.
|
|
||||||
|
|
||||||
Different methods can share the same lock::
|
|
||||||
|
|
||||||
@synchronized('mylock')
|
|
||||||
def foo(self, *args):
|
|
||||||
...
|
|
||||||
|
|
||||||
@synchronized('mylock')
|
|
||||||
def bar(self, *args):
|
|
||||||
...
|
|
||||||
|
|
||||||
This way only one of either foo or bar can be executing at a time.
|
|
||||||
|
|
||||||
The external keyword argument denotes whether this lock should work across
|
|
||||||
multiple processes. This means that if two different workers both run a
|
|
||||||
a method decorated with @synchronized('mylock', external=True), only one
|
|
||||||
of them will execute at a time.
|
|
||||||
|
|
||||||
The lock_path keyword argument is used to specify a special location for
|
|
||||||
external lock files to live. If nothing is set, then FLAGS.lock_path is
|
|
||||||
used as a default.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def wrap(f):
|
|
||||||
@functools.wraps(f)
|
|
||||||
def inner(*args, **kwargs):
|
|
||||||
# NOTE(soren): If we ever go natively threaded, this will be racy.
|
|
||||||
# See http://stackoverflow.com/questions/5390569/dyn
|
|
||||||
# amically-allocating-and-destroying-mutexes
|
|
||||||
sem = _semaphores.get(name, semaphore.Semaphore())
|
|
||||||
if name not in _semaphores:
|
|
||||||
# this check is not racy - we're already holding ref locally
|
|
||||||
# so GC won't remove the item and there was no IO switch
|
|
||||||
# (only valid in greenthreads)
|
|
||||||
_semaphores[name] = sem
|
|
||||||
|
|
||||||
with sem:
|
|
||||||
LOG.debug(_('Got semaphore "%(lock)s" for method '
|
|
||||||
'"%(method)s"...'), {'lock': name,
|
|
||||||
'method': f.__name__})
|
|
||||||
if external and not FLAGS.disable_process_locking:
|
|
||||||
LOG.debug(_('Attempting to grab file lock "%(lock)s" for '
|
|
||||||
'method "%(method)s"...'),
|
|
||||||
{'lock': name, 'method': f.__name__})
|
|
||||||
cleanup_dir = False
|
|
||||||
|
|
||||||
# We need a copy of lock_path because it is non-local
|
|
||||||
local_lock_path = lock_path
|
|
||||||
if not local_lock_path:
|
|
||||||
local_lock_path = FLAGS.lock_path
|
|
||||||
|
|
||||||
if not local_lock_path:
|
|
||||||
cleanup_dir = True
|
|
||||||
local_lock_path = tempfile.mkdtemp()
|
|
||||||
|
|
||||||
if not os.path.exists(local_lock_path):
|
|
||||||
cleanup_dir = True
|
|
||||||
ensure_tree(local_lock_path)
|
|
||||||
|
|
||||||
# NOTE(mikal): the lock name cannot contain directory
|
|
||||||
# separators
|
|
||||||
safe_name = name.replace(os.sep, '_')
|
|
||||||
lock_file_path = os.path.join(local_lock_path,
|
|
||||||
'nova-%s' % safe_name)
|
|
||||||
try:
|
|
||||||
lock = InterProcessLock(lock_file_path)
|
|
||||||
with lock:
|
|
||||||
LOG.debug(_('Got file lock "%(lock)s" for '
|
|
||||||
'method "%(method)s"...'),
|
|
||||||
{'lock': name, 'method': f.__name__})
|
|
||||||
retval = f(*args, **kwargs)
|
|
||||||
finally:
|
|
||||||
# NOTE(vish): This removes the tempdir if we needed
|
|
||||||
# to create one. This is used to cleanup
|
|
||||||
# the locks left behind by unit tests.
|
|
||||||
if cleanup_dir:
|
|
||||||
shutil.rmtree(local_lock_path)
|
|
||||||
else:
|
|
||||||
retval = f(*args, **kwargs)
|
|
||||||
|
|
||||||
return retval
|
|
||||||
return inner
|
|
||||||
return wrap
|
|
||||||
|
|
||||||
|
|
||||||
def delete_if_exists(pathname):
|
def delete_if_exists(pathname):
|
||||||
"""delete a file, but ignore file not found error"""
|
"""delete a file, but ignore file not found error"""
|
||||||
|
|
||||||
@@ -1302,21 +1125,6 @@ class UndoManager(object):
|
|||||||
self._rollback()
|
self._rollback()
|
||||||
|
|
||||||
|
|
||||||
def ensure_tree(path):
|
|
||||||
"""Create a directory (and any ancestor directories required)
|
|
||||||
|
|
||||||
:param path: Directory to create
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
os.makedirs(path)
|
|
||||||
except OSError as exc:
|
|
||||||
if exc.errno == errno.EEXIST:
|
|
||||||
if not os.path.isdir(path):
|
|
||||||
raise
|
|
||||||
else:
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
def mkfs(fs, path, label=None):
|
def mkfs(fs, path, label=None):
|
||||||
"""Format a file or block device
|
"""Format a file or block device
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
[DEFAULT]
|
[DEFAULT]
|
||||||
|
|
||||||
# The list of modules to copy from openstack-common
|
# The list of modules to copy from openstack-common
|
||||||
modules=cfg,context,excutils,gettextutils,importutils,iniparser,jsonutils,local,log,network_utils,notifier,plugin,policy,setup,timeutils,rpc
|
modules=cfg,context,excutils,fileutils,gettextutils,importutils,iniparser,jsonutils,local,lockutils,log,network_utils,notifier,plugin,policy,setup,timeutils,rpc
|
||||||
|
|
||||||
# The base module to hold the copy of openstack.common
|
# The base module to hold the copy of openstack.common
|
||||||
base=nova
|
base=nova
|
||||||
|
|||||||
Reference in New Issue
Block a user