Merge "Improve external lock implementation"
This commit is contained in:
commit
d752c49ce0
@ -395,7 +395,6 @@ class Service(object):
|
||||
vcs_string = version.version_string_with_vcs()
|
||||
LOG.audit(_('Starting %(topic)s node (version %(vcs_string)s)'),
|
||||
{'topic': self.topic, 'vcs_string': vcs_string})
|
||||
utils.cleanup_file_locks()
|
||||
self.manager.init_host()
|
||||
self.model_disconnected = False
|
||||
ctxt = context.get_admin_context()
|
||||
@ -616,7 +615,6 @@ class WSGIService(object):
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
utils.cleanup_file_locks()
|
||||
if self.manager:
|
||||
self.manager.init_host()
|
||||
self.server.start()
|
||||
|
@ -21,7 +21,6 @@ import select
|
||||
|
||||
from eventlet import greenpool
|
||||
from eventlet import greenthread
|
||||
import lockfile
|
||||
|
||||
from nova import exception
|
||||
from nova import test
|
||||
@ -107,20 +106,19 @@ class LockTestCase(test.TestCase):
|
||||
self.assertEqual(saved_sem_num, len(utils._semaphores),
|
||||
"Semaphore leak detected")
|
||||
|
||||
def test_nested_external_fails(self):
|
||||
"""We can not nest external syncs"""
|
||||
def test_nested_external_works(self):
|
||||
"""We can nest external syncs"""
|
||||
sentinel = object()
|
||||
|
||||
@utils.synchronized('testlock1', external=True)
|
||||
def outer_lock():
|
||||
|
||||
@utils.synchronized('testlock2', external=True)
|
||||
def inner_lock():
|
||||
pass
|
||||
inner_lock()
|
||||
try:
|
||||
self.assertRaises(lockfile.NotMyLock, outer_lock)
|
||||
finally:
|
||||
utils.cleanup_file_locks()
|
||||
return sentinel
|
||||
return inner_lock()
|
||||
|
||||
self.assertEqual(sentinel, outer_lock())
|
||||
|
||||
def test_synchronized_externally(self):
|
||||
"""We can lock across multiple processes"""
|
||||
|
@ -533,29 +533,22 @@ class MonkeyPatchTestCase(test.TestCase):
|
||||
in nova.tests.monkey_patch_example.CALLED_FUNCTION)
|
||||
|
||||
|
||||
class TestGreenLocks(test.TestCase):
|
||||
class TestFileLocks(test.TestCase):
|
||||
def test_concurrent_green_lock_succeeds(self):
|
||||
"""Verify spawn_n greenthreads with two locks run concurrently.
|
||||
|
||||
This succeeds with spawn but fails with spawn_n because lockfile
|
||||
gets the same thread id for both spawn_n threads. Our workaround
|
||||
of using the GreenLockFile will work even if the issue is fixed.
|
||||
"""
|
||||
"""Verify spawn_n greenthreads with two locks run concurrently."""
|
||||
self.completed = False
|
||||
with utils.tempdir() as tmpdir:
|
||||
|
||||
def locka(wait):
|
||||
a = utils.GreenLockFile(os.path.join(tmpdir, 'a'))
|
||||
a.acquire()
|
||||
wait.wait()
|
||||
a.release()
|
||||
a = utils.InterProcessLock(os.path.join(tmpdir, 'a'))
|
||||
with a:
|
||||
wait.wait()
|
||||
self.completed = True
|
||||
|
||||
def lockb(wait):
|
||||
b = utils.GreenLockFile(os.path.join(tmpdir, 'b'))
|
||||
b.acquire()
|
||||
wait.wait()
|
||||
b.release()
|
||||
b = utils.InterProcessLock(os.path.join(tmpdir, 'b'))
|
||||
with b:
|
||||
wait.wait()
|
||||
|
||||
wait1 = eventlet.event.Event()
|
||||
wait2 = eventlet.event.Event()
|
||||
@ -569,159 +562,6 @@ class TestGreenLocks(test.TestCase):
|
||||
self.assertTrue(self.completed)
|
||||
|
||||
|
||||
class TestLockCleanup(test.TestCase):
|
||||
"""unit tests for utils.cleanup_file_locks()"""
|
||||
|
||||
def setUp(self):
|
||||
super(TestLockCleanup, self).setUp()
|
||||
|
||||
self.pid = os.getpid()
|
||||
self.dead_pid = self._get_dead_pid()
|
||||
self.tempdir = tempfile.mkdtemp()
|
||||
self.flags(lock_path=self.tempdir)
|
||||
self.lock_name = 'nova-testlock'
|
||||
self.lock_file = os.path.join(FLAGS.lock_path,
|
||||
self.lock_name + '.lock')
|
||||
self.hostname = socket.gethostname()
|
||||
print self.pid, self.dead_pid
|
||||
try:
|
||||
os.unlink(self.lock_file)
|
||||
except OSError as (errno, strerror):
|
||||
if errno == 2:
|
||||
pass
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tempdir)
|
||||
super(TestLockCleanup, self).tearDown()
|
||||
|
||||
def _get_dead_pid(self):
|
||||
"""get a pid for a process that does not exist"""
|
||||
|
||||
candidate_pid = self.pid - 1
|
||||
while os.path.exists(os.path.join('/proc', str(candidate_pid))):
|
||||
candidate_pid -= 1
|
||||
if candidate_pid == 1:
|
||||
return 0
|
||||
return candidate_pid
|
||||
|
||||
def _get_sentinel_name(self, hostname, pid, thread='MainThread'):
|
||||
return os.path.join(FLAGS.lock_path,
|
||||
'%s-%s.%d' % (hostname, thread, pid))
|
||||
|
||||
def _create_sentinel(self, hostname, pid, thread='MainThread'):
|
||||
name = self._get_sentinel_name(hostname, pid, thread)
|
||||
open(name, 'wb').close()
|
||||
return name
|
||||
|
||||
def test_clean_stale_locks(self):
|
||||
"""verify locks for dead processes are cleaned up"""
|
||||
|
||||
# create sentinels for two processes, us and a 'dead' one
|
||||
# no active lock
|
||||
sentinel1 = self._create_sentinel(self.hostname, self.pid)
|
||||
sentinel2 = self._create_sentinel(self.hostname, self.dead_pid)
|
||||
|
||||
utils.cleanup_file_locks()
|
||||
|
||||
self.assertTrue(os.path.exists(sentinel1))
|
||||
self.assertFalse(os.path.exists(self.lock_file))
|
||||
self.assertFalse(os.path.exists(sentinel2))
|
||||
|
||||
os.unlink(sentinel1)
|
||||
|
||||
def test_clean_stale_locks_active(self):
|
||||
"""verify locks for dead processes are cleaned with an active lock """
|
||||
|
||||
# create sentinels for two processes, us and a 'dead' one
|
||||
# create an active lock for us
|
||||
sentinel1 = self._create_sentinel(self.hostname, self.pid)
|
||||
sentinel2 = self._create_sentinel(self.hostname, self.dead_pid)
|
||||
os.link(sentinel1, self.lock_file)
|
||||
|
||||
utils.cleanup_file_locks()
|
||||
|
||||
self.assertTrue(os.path.exists(sentinel1))
|
||||
self.assertTrue(os.path.exists(self.lock_file))
|
||||
self.assertFalse(os.path.exists(sentinel2))
|
||||
|
||||
os.unlink(sentinel1)
|
||||
os.unlink(self.lock_file)
|
||||
|
||||
def test_clean_stale_with_threads(self):
|
||||
"""verify locks for multiple threads are cleaned up """
|
||||
|
||||
# create sentinels for four threads in our process, and a 'dead'
|
||||
# process. no lock.
|
||||
sentinel1 = self._create_sentinel(self.hostname, self.pid, 'Default-1')
|
||||
sentinel2 = self._create_sentinel(self.hostname, self.pid, 'Default-2')
|
||||
sentinel3 = self._create_sentinel(self.hostname, self.pid, 'Default-3')
|
||||
sentinel4 = self._create_sentinel(self.hostname, self.pid, 'Default-4')
|
||||
sentinel5 = self._create_sentinel(self.hostname, self.dead_pid,
|
||||
'Default-1')
|
||||
|
||||
utils.cleanup_file_locks()
|
||||
|
||||
self.assertTrue(os.path.exists(sentinel1))
|
||||
self.assertTrue(os.path.exists(sentinel2))
|
||||
self.assertTrue(os.path.exists(sentinel3))
|
||||
self.assertTrue(os.path.exists(sentinel4))
|
||||
self.assertFalse(os.path.exists(self.lock_file))
|
||||
self.assertFalse(os.path.exists(sentinel5))
|
||||
|
||||
os.unlink(sentinel1)
|
||||
os.unlink(sentinel2)
|
||||
os.unlink(sentinel3)
|
||||
os.unlink(sentinel4)
|
||||
|
||||
def test_clean_stale_with_threads_active(self):
|
||||
"""verify locks for multiple threads are cleaned up """
|
||||
|
||||
# create sentinels for four threads in our process, and a 'dead'
|
||||
# process
|
||||
sentinel1 = self._create_sentinel(self.hostname, self.pid, 'Default-1')
|
||||
sentinel2 = self._create_sentinel(self.hostname, self.pid, 'Default-2')
|
||||
sentinel3 = self._create_sentinel(self.hostname, self.pid, 'Default-3')
|
||||
sentinel4 = self._create_sentinel(self.hostname, self.pid, 'Default-4')
|
||||
sentinel5 = self._create_sentinel(self.hostname, self.dead_pid,
|
||||
'Default-1')
|
||||
|
||||
os.link(sentinel1, self.lock_file)
|
||||
|
||||
utils.cleanup_file_locks()
|
||||
|
||||
self.assertTrue(os.path.exists(sentinel1))
|
||||
self.assertTrue(os.path.exists(sentinel2))
|
||||
self.assertTrue(os.path.exists(sentinel3))
|
||||
self.assertTrue(os.path.exists(sentinel4))
|
||||
self.assertTrue(os.path.exists(self.lock_file))
|
||||
self.assertFalse(os.path.exists(sentinel5))
|
||||
|
||||
os.unlink(sentinel1)
|
||||
os.unlink(sentinel2)
|
||||
os.unlink(sentinel3)
|
||||
os.unlink(sentinel4)
|
||||
os.unlink(self.lock_file)
|
||||
|
||||
def test_clean_bogus_lockfiles(self):
|
||||
"""verify lockfiles are cleaned """
|
||||
|
||||
lock1 = os.path.join(FLAGS.lock_path, 'nova-testlock1.lock')
|
||||
lock2 = os.path.join(FLAGS.lock_path, 'nova-testlock2.lock')
|
||||
lock3 = os.path.join(FLAGS.lock_path, 'testlock3.lock')
|
||||
|
||||
open(lock1, 'wb').close()
|
||||
open(lock2, 'wb').close()
|
||||
open(lock3, 'wb').close()
|
||||
|
||||
utils.cleanup_file_locks()
|
||||
|
||||
self.assertFalse(os.path.exists(lock1))
|
||||
self.assertFalse(os.path.exists(lock2))
|
||||
self.assertTrue(os.path.exists(lock3))
|
||||
|
||||
os.unlink(lock3)
|
||||
|
||||
|
||||
class AuditPeriodTest(test.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
157
nova/utils.py
157
nova/utils.py
@ -22,6 +22,7 @@
|
||||
import contextlib
|
||||
import datetime
|
||||
import errno
|
||||
import fcntl
|
||||
import functools
|
||||
import hashlib
|
||||
import inspect
|
||||
@ -45,7 +46,6 @@ from eventlet import event
|
||||
from eventlet.green import subprocess
|
||||
from eventlet import greenthread
|
||||
from eventlet import semaphore
|
||||
import lockfile
|
||||
import netaddr
|
||||
|
||||
from nova.common import deprecated
|
||||
@ -581,31 +581,52 @@ def utf8(value):
|
||||
return value
|
||||
|
||||
|
||||
class GreenLockFile(lockfile.FileLock):
|
||||
"""Implementation of lockfile that allows for a lock per greenthread.
|
||||
class InterProcessLock(object):
|
||||
"""Lock implementation which allows multiple locks, working around
|
||||
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
|
||||
not require any cleanup. Since lockf is always held on a file
|
||||
descriptor rather than outside of the process, the lock gets dropped
|
||||
automatically if the process crashes, even if __exit__ is not executed.
|
||||
|
||||
Simply implements lockfile:LockBase init with an addiontall suffix
|
||||
on the unique name of the greenthread identifier
|
||||
There are no guarantees regarding usage by multiple green threads in a
|
||||
single process here. This lock works only between processes. Exclusive
|
||||
access between local threads should be achieved using the semaphores
|
||||
in the @synchronized decorator.
|
||||
|
||||
This lock relies on fcntl's F_SETLK behaviour, which means that it is not
|
||||
safe to close the file descriptor while another green thread holds the
|
||||
lock. Just opening and closing the lock file can break synchronisation,
|
||||
so lock files must be accessed only using this abstraction.
|
||||
"""
|
||||
def __init__(self, path, threaded=True):
|
||||
self.path = path
|
||||
self.lock_file = os.path.abspath(path) + ".lock"
|
||||
self.hostname = socket.gethostname()
|
||||
self.pid = os.getpid()
|
||||
if threaded:
|
||||
t = threading.current_thread()
|
||||
# Thread objects in Python 2.4 and earlier do not have ident
|
||||
# attrs. Worm around that.
|
||||
ident = getattr(t, "ident", hash(t)) or hash(t)
|
||||
gident = corolocal.get_ident()
|
||||
self.tname = "-%x-%x" % (ident & 0xffffffff, gident & 0xffffffff)
|
||||
else:
|
||||
self.tname = ""
|
||||
dirname = os.path.dirname(self.lock_file)
|
||||
self.unique_name = os.path.join(dirname,
|
||||
"%s%s.%s" % (self.hostname,
|
||||
self.tname,
|
||||
self.pid))
|
||||
|
||||
def __init__(self, name):
|
||||
self.lockfile = None
|
||||
self.fname = name
|
||||
|
||||
def __enter__(self):
|
||||
self.lockfile = open(self.fname, 'w')
|
||||
|
||||
while True:
|
||||
try:
|
||||
# using non-blocking version since green threads are not
|
||||
# patched to deal with blocking fcntl calls
|
||||
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
return self
|
||||
except IOError, e:
|
||||
if e.errno in (errno.EACCES, errno.EAGAIN):
|
||||
# external locks synchronise things like iptables
|
||||
# updates - give it some time to prevent busy spinning
|
||||
time.sleep(0.01)
|
||||
else:
|
||||
raise
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
try:
|
||||
fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
|
||||
self.lockfile.close()
|
||||
except IOError:
|
||||
LOG.exception(_("Could not release the aquired lock `%s`")
|
||||
% self.fname)
|
||||
|
||||
|
||||
_semaphores = {}
|
||||
@ -638,20 +659,6 @@ def synchronized(name, external=False):
|
||||
multiple processes. This means that if two different workers both run a
|
||||
a method decorated with @synchronized('mylock', external=True), only one
|
||||
of them will execute at a time.
|
||||
|
||||
Important limitation: you can only have one external lock running per
|
||||
thread at a time. For example the following will fail:
|
||||
|
||||
@utils.synchronized('testlock1', external=True)
|
||||
def outer_lock():
|
||||
|
||||
@utils.synchronized('testlock2', external=True)
|
||||
def inner_lock():
|
||||
pass
|
||||
inner_lock()
|
||||
|
||||
outer_lock()
|
||||
|
||||
"""
|
||||
|
||||
def wrap(f):
|
||||
@ -676,7 +683,7 @@ def synchronized(name, external=False):
|
||||
{'lock': name, 'method': f.__name__})
|
||||
lock_file_path = os.path.join(FLAGS.lock_path,
|
||||
'nova-%s' % name)
|
||||
lock = GreenLockFile(lock_file_path)
|
||||
lock = InterProcessLock(lock_file_path)
|
||||
with lock:
|
||||
LOG.debug(_('Got file lock "%(lock)s" for '
|
||||
'method "%(method)s"...'),
|
||||
@ -695,78 +702,6 @@ def synchronized(name, external=False):
|
||||
return wrap
|
||||
|
||||
|
||||
def cleanup_file_locks():
|
||||
"""clean up stale locks left behind by process failures
|
||||
|
||||
The lockfile module, used by @synchronized, can leave stale lockfiles
|
||||
behind after process failure. These locks can cause process hangs
|
||||
at startup, when a process deadlocks on a lock which will never
|
||||
be unlocked.
|
||||
|
||||
Intended to be called at service startup.
|
||||
|
||||
"""
|
||||
|
||||
# NOTE(mikeyp) this routine incorporates some internal knowledge
|
||||
# from the lockfile module, and this logic really
|
||||
# should be part of that module.
|
||||
#
|
||||
# cleanup logic:
|
||||
# 1) look for the lockfile modules's 'sentinel' files, of the form
|
||||
# hostname.[thread-.*]-pid, extract the pid.
|
||||
# if pid doesn't match a running process, delete the file since
|
||||
# it's from a dead process.
|
||||
# 2) check for the actual lockfiles. if lockfile exists with linkcount
|
||||
# of 1, it's bogus, so delete it. A link count >= 2 indicates that
|
||||
# there are probably sentinels still linked to it from active
|
||||
# processes. This check isn't perfect, but there is no way to
|
||||
# reliably tell which sentinels refer to which lock in the
|
||||
# lockfile implementation.
|
||||
|
||||
if FLAGS.disable_process_locking:
|
||||
return
|
||||
|
||||
hostname = socket.gethostname()
|
||||
sentinel_re = hostname + r'-.*\.(\d+$)'
|
||||
lockfile_re = r'nova-.*\.lock'
|
||||
files = os.listdir(FLAGS.lock_path)
|
||||
|
||||
# cleanup sentinels
|
||||
for filename in files:
|
||||
match = re.match(sentinel_re, filename)
|
||||
if match is None:
|
||||
continue
|
||||
pid = match.group(1)
|
||||
LOG.debug(_('Found sentinel %(filename)s for pid %(pid)s'),
|
||||
{'filename': filename, 'pid': pid})
|
||||
try:
|
||||
os.kill(int(pid), 0)
|
||||
except OSError, e:
|
||||
# PID wasn't found
|
||||
delete_if_exists(os.path.join(FLAGS.lock_path, filename))
|
||||
LOG.debug(_('Cleaned sentinel %(filename)s for pid %(pid)s'),
|
||||
{'filename': filename, 'pid': pid})
|
||||
|
||||
# cleanup lock files
|
||||
for filename in files:
|
||||
match = re.match(lockfile_re, filename)
|
||||
if match is None:
|
||||
continue
|
||||
try:
|
||||
stat_info = os.stat(os.path.join(FLAGS.lock_path, filename))
|
||||
except OSError as e:
|
||||
if e.errno == errno.ENOENT:
|
||||
continue
|
||||
else:
|
||||
raise
|
||||
LOG.debug(_('Found lockfile %(file)s with link count %(count)d'),
|
||||
{'file': filename, 'count': stat_info.st_nlink})
|
||||
if stat_info.st_nlink == 1:
|
||||
delete_if_exists(os.path.join(FLAGS.lock_path, filename))
|
||||
LOG.debug(_('Cleaned lockfile %(file)s with link count %(count)d'),
|
||||
{'file': filename, 'count': stat_info.st_nlink})
|
||||
|
||||
|
||||
def delete_if_exists(pathname):
|
||||
"""delete a file, but ignore file not found error"""
|
||||
|
||||
|
@ -95,6 +95,8 @@ function run_tests {
|
||||
cat run_tests.log
|
||||
fi
|
||||
fi
|
||||
# cleanup locks - not really needed, but stops pollution of the source tree
|
||||
rm -f nova-ensure_bridge nova-ensure_vlan nova-iptables nova-testlock1 nova-testlock2
|
||||
return $RESULT
|
||||
}
|
||||
|
||||
|
@ -1,63 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2012 La Honda Research Center, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""clean_file_locks.py - Cleans stale interprocess locks
|
||||
|
||||
This rountine can be used to find and delete stale lock files from
|
||||
nova's interprocess synchroization. It can be used safely while services
|
||||
are running.
|
||||
|
||||
"""
|
||||
|
||||
import logging
|
||||
import optparse
|
||||
|
||||
from nova import flags
|
||||
from nova.openstack.common import log
|
||||
from nova import utils
|
||||
|
||||
|
||||
LOG = log.getLogger('nova.utils')
|
||||
FLAGS = flags.FLAGS
|
||||
|
||||
|
||||
def parse_options():
|
||||
"""process command line options."""
|
||||
|
||||
parser = optparse.OptionParser('usage: %prog [options]')
|
||||
parser.add_option('--verbose', action='store_true',
|
||||
help='List lock files found and deleted')
|
||||
|
||||
options, args = parser.parse_args()
|
||||
|
||||
return options, args
|
||||
|
||||
|
||||
def main():
|
||||
"""Main loop."""
|
||||
options, args = parse_options()
|
||||
verbose = options.verbose
|
||||
|
||||
if verbose:
|
||||
LOG.logger.setLevel(logging.DEBUG)
|
||||
else:
|
||||
LOG.logger.setLevel(logging.INFO)
|
||||
LOG.info('Cleaning stale locks from %s' % FLAGS.lock_path)
|
||||
utils.cleanup_file_locks()
|
||||
LOG.info('Finished')
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -5,7 +5,6 @@ anyjson==0.2.4
|
||||
boto==2.1.1
|
||||
eventlet>=0.9.17
|
||||
kombu==1.0.4
|
||||
lockfile==0.8
|
||||
lxml==2.3
|
||||
python-daemon==1.5.5
|
||||
routes==1.12.3
|
||||
|
Loading…
Reference in New Issue
Block a user