Migrate to fileutils and lockutils.

Migrate nova to using openstack-common's file and lock utilities.
Resolves bug 1063230.

Change-Id: I1a4c87856bc08cd33b61d7098ed856baa4583654
This commit is contained in:
Michael Still 2012-10-23 14:25:25 -07:00
parent 86b91474d1
commit 0d4e6dbe6f
28 changed files with 364 additions and 395 deletions

View File

@ -12,7 +12,7 @@ view, each OpenStack service runs in a single thread.
The use of green threads reduces the likelihood of race conditions, but does
not completely eliminate them. In some cases, you may need to use the
``@utils.synchronized(...)`` decorator to avoid races.
``@lockutils.synchronized(...)`` decorator to avoid races.
In addition, since there is only one operating system thread, a call that
blocks that main thread will block the entire process.

View File

@ -25,6 +25,7 @@ from nova import db
from nova import exception
from nova import flags
from nova import network
from nova.openstack.common import fileutils
from nova.openstack.common import log as logging
from nova.openstack.common import timeutils
from nova import utils
@ -69,7 +70,7 @@ class CloudpipeController(object):
# NOTE(vish): One of the drawbacks of doing this in the api is
# the keys will only be on the api node that launched
# the cloudpipe.
utils.ensure_tree(FLAGS.keys_path)
fileutils.ensure_tree(FLAGS.keys_path)
def _get_all_cloudpipes(self, context):
"""Get all cloudpipes"""

View File

@ -33,6 +33,7 @@ from nova import db
from nova import exception
from nova import flags
from nova.openstack.common import cfg
from nova.openstack.common import fileutils
from nova.openstack.common import log as logging
from nova import utils
@ -150,7 +151,7 @@ class CloudPipe(object):
key_name)
private_key = result['private_key']
key_dir = os.path.join(FLAGS.keys_path, context.user_id)
utils.ensure_tree(key_dir)
fileutils.ensure_tree(key_dir)
key_path = os.path.join(key_dir, '%s.pem' % key_name)
with open(key_path, 'w') as f:
f.write(private_key)

View File

@ -64,6 +64,7 @@ from nova.openstack.common import cfg
from nova.openstack.common import excutils
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.openstack.common import lockutils
from nova.openstack.common import log as logging
from nova.openstack.common.notifier import api as notifier
from nova.openstack.common import rpc
@ -844,7 +845,7 @@ class ComputeManager(manager.SchedulerDependentManager):
if injected_files is None:
injected_files = []
@utils.synchronized(instance['uuid'])
@lockutils.synchronized(instance['uuid'], 'nova-')
def do_run_instance():
self._run_instance(context, request_spec,
filter_properties, requested_networks, injected_files,
@ -954,7 +955,7 @@ class ComputeManager(manager.SchedulerDependentManager):
if not bdms:
bdms = self._get_instance_volume_bdms(context, instance["uuid"])
@utils.synchronized(instance['uuid'])
@lockutils.synchronized(instance['uuid'], 'nova-')
def do_terminate_instance(instance, bdms):
try:
self._delete_instance(context, instance, bdms)
@ -2027,7 +2028,7 @@ class ComputeManager(manager.SchedulerDependentManager):
@wrap_instance_fault
def reserve_block_device_name(self, context, instance, device, volume_id):
@utils.synchronized(instance['uuid'])
@lockutils.synchronized(instance['uuid'], 'nova-')
def do_reserve():
result = compute_utils.get_device_name_for_instance(context,
instance,

View File

@ -27,6 +27,7 @@ from nova import notifications
from nova.openstack.common import cfg
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.openstack.common import lockutils
from nova.openstack.common import log as logging
from nova import utils
@ -121,7 +122,7 @@ class ResourceTracker(object):
claim = self.begin_resource_claim(context, instance_ref, limits)
return ResourceContextManager(context, claim, self)
@utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
@lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
def begin_resource_claim(self, context, instance_ref, limits=None):
"""Indicate that some resources are needed for an upcoming compute
instance build operation.
@ -293,7 +294,7 @@ class ResourceTracker(object):
return can_claim_cpu
@utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
@lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
def finish_resource_claim(self, claim):
"""Indicate that the compute operation that previously claimed the
resources identified by 'claim' has now completed and the resources
@ -308,7 +309,7 @@ class ResourceTracker(object):
if self.claims.pop(claim.claim_id, None):
LOG.debug(_("Finishing claim: %s") % claim)
@utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
@lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
def abort_resource_claim(self, context, claim):
"""Indicate that the operation that claimed the resources identified by
'claim_id' has either failed or been aborted and the resources are no
@ -328,7 +329,7 @@ class ResourceTracker(object):
self._update_usage_from_instance(self.compute_node, claim.instance)
self._update(context, self.compute_node)
@utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
@lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
def update_usage(self, context, instance):
"""Update the resource usage and stats after a change in an
instance
@ -347,7 +348,7 @@ class ResourceTracker(object):
def disabled(self):
return self.compute_node is None
@utils.synchronized(COMPUTE_RESOURCE_SEMAPHORE)
@lockutils.synchronized(COMPUTE_RESOURCE_SEMAPHORE, 'nova-')
def update_available_resource(self, context):
"""Override in-memory calculations of compute node resource usage based
on data audited from the hypervisor layer.

View File

@ -33,6 +33,7 @@ from nova import db
from nova import exception
from nova import flags
from nova.openstack.common import cfg
from nova.openstack.common import fileutils
from nova.openstack.common import log as logging
from nova.openstack.common import timeutils
from nova import utils
@ -112,7 +113,7 @@ def ensure_ca_filesystem():
'genrootca.sh')
start = os.getcwd()
utils.ensure_tree(ca_dir)
fileutils.ensure_tree(ca_dir)
os.chdir(ca_dir)
utils.execute("sh", genrootca_sh_path)
os.chdir(start)
@ -301,7 +302,7 @@ def _sign_csr(csr_text, ca_folder):
start = os.getcwd()
# Change working dir to CA
utils.ensure_tree(ca_folder)
fileutils.ensure_tree(ca_folder)
os.chdir(ca_folder)
utils.execute('openssl', 'ca', '-batch', '-out', outbound, '-config',
'./openssl.cnf', '-infiles', inbound)

View File

@ -91,9 +91,6 @@ core_opts = [
cfg.StrOpt('state_path',
default='$pybasedir',
help="Top-level directory for maintaining nova's state"),
cfg.StrOpt('lock_path',
default='$pybasedir',
help='Directory to use for lock files'),
]
debug_opts = [

View File

@ -28,7 +28,9 @@ from nova import db
from nova import exception
from nova import flags
from nova.openstack.common import cfg
from nova.openstack.common import fileutils
from nova.openstack.common import importutils
from nova.openstack.common import lockutils
from nova.openstack.common import log as logging
from nova import utils
@ -344,7 +346,7 @@ class IptablesManager(object):
self._apply()
@utils.synchronized('iptables', external=True)
@lockutils.synchronized('iptables', 'nova-', external=True)
def _apply(self):
"""Apply the current in-memory set of iptables rules.
@ -791,7 +793,7 @@ def kill_dhcp(dev):
# NOTE(ja): Sending a HUP only reloads the hostfile, so any
# configuration options (like dchp-range, vlan, ...)
# aren't reloaded.
@utils.synchronized('dnsmasq_start')
@lockutils.synchronized('dnsmasq_start', 'nova-')
def restart_dhcp(context, dev, network_ref):
"""(Re)starts a dnsmasq server for a given network.
@ -858,7 +860,7 @@ def restart_dhcp(context, dev, network_ref):
_add_dnsmasq_accept_rules(dev)
@utils.synchronized('radvd_start')
@lockutils.synchronized('radvd_start', 'nova-')
def update_ra(context, dev, network_ref):
conffile = _ra_file(dev, 'conf')
conf_str = """
@ -957,7 +959,7 @@ def _device_exists(device):
def _dhcp_file(dev, kind):
"""Return path to a pid, leases or conf file for a bridge/device."""
utils.ensure_tree(FLAGS.networks_path)
fileutils.ensure_tree(FLAGS.networks_path)
return os.path.abspath('%s/nova-%s.%s' % (FLAGS.networks_path,
dev,
kind))
@ -965,7 +967,7 @@ def _dhcp_file(dev, kind):
def _ra_file(dev, kind):
"""Return path to a pid or conf file for a bridge/device."""
utils.ensure_tree(FLAGS.networks_path)
fileutils.ensure_tree(FLAGS.networks_path)
return os.path.abspath('%s/nova-ra-%s.%s' % (FLAGS.networks_path,
dev,
kind))
@ -1116,7 +1118,7 @@ class LinuxBridgeInterfaceDriver(LinuxNetInterfaceDriver):
return interface
@classmethod
@utils.synchronized('ensure_vlan', external=True)
@lockutils.synchronized('ensure_vlan', 'nova-', external=True)
def ensure_vlan(_self, vlan_num, bridge_interface, mac_address=None):
"""Create a vlan unless it already exists."""
interface = 'vlan%s' % vlan_num
@ -1141,7 +1143,7 @@ class LinuxBridgeInterfaceDriver(LinuxNetInterfaceDriver):
return interface
@classmethod
@utils.synchronized('ensure_bridge', external=True)
@lockutils.synchronized('ensure_bridge', 'nova-', external=True)
def ensure_bridge(_self, bridge, interface, net_attrs=None, gateway=True):
"""Create a bridge unless it already exists.

View File

@ -66,6 +66,7 @@ from nova.openstack.common import cfg
from nova.openstack.common import excutils
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.openstack.common import lockutils
from nova.openstack.common import log as logging
from nova.openstack.common.notifier import api as notifier
from nova.openstack.common import rpc
@ -854,7 +855,7 @@ class NetworkManager(manager.SchedulerDependentManager):
def _import_ipam_lib(self, ipam_lib):
self.ipam = importutils.import_module(ipam_lib).get_ipam_lib(self)
@utils.synchronized('get_dhcp')
@lockutils.synchronized('get_dhcp', 'nova-')
def _get_dhcp_ip(self, context, network_ref, host=None):
"""Get the proper dhcp address to listen on."""
# NOTE(vish): this is for compatibility

View File

@ -46,6 +46,7 @@ import webob
from nova import flags
from nova.openstack.common import cfg
from nova.openstack.common import fileutils
from nova import utils
from nova import wsgi
@ -93,7 +94,7 @@ class S3Application(wsgi.Router):
mapper.connect('/{bucket_name}/',
controller=lambda *a, **kw: BucketHandler(self)(*a, **kw))
self.directory = os.path.abspath(root_directory)
utils.ensure_tree(self.directory)
fileutils.ensure_tree(self.directory)
self.bucket_depth = bucket_depth
super(S3Application, self).__init__(mapper)
@ -285,7 +286,7 @@ class BucketHandler(BaseRequestHandler):
os.path.exists(path)):
self.set_status(403)
return
utils.ensure_tree(path)
fileutils.ensure_tree(path)
self.finish()
def delete(self, bucket_name):
@ -334,7 +335,7 @@ class ObjectHandler(BaseRequestHandler):
self.set_status(403)
return
directory = os.path.dirname(path)
utils.ensure_tree(directory)
fileutils.ensure_tree(directory)
object_file = open(path, "w")
object_file.write(self.request.body)
object_file.close()

View File

@ -0,0 +1,35 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import errno
import os
def ensure_tree(path):
"""Create a directory (and any ancestor directories required)
:param path: Directory to create
"""
try:
os.makedirs(path)
except OSError as exc:
if exc.errno == errno.EEXIST:
if not os.path.isdir(path):
raise
else:
raise

View File

@ -0,0 +1,233 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import errno
import functools
import os
import shutil
import tempfile
import time
import weakref
from eventlet import greenthread
from eventlet import semaphore
from nova.openstack.common import cfg
from nova.openstack.common import fileutils
from nova.openstack.common import log as logging
LOG = logging.getLogger(__name__)
util_opts = [
cfg.BoolOpt('disable_process_locking', default=False,
help='Whether to disable inter-process locks'),
cfg.StrOpt('lock_path',
default=os.path.abspath(os.path.join(os.path.dirname(__file__),
'../')),
help='Directory to use for lock files')
]
CONF = cfg.CONF
CONF.register_opts(util_opts)
class _InterProcessLock(object):
"""Lock implementation which allows multiple locks, working around
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
not require any cleanup. Since the lock is always held on a file
descriptor rather than outside of the process, the lock gets dropped
automatically if the process crashes, even if __exit__ is not executed.
There are no guarantees regarding usage by multiple green threads in a
single process here. This lock works only between processes. Exclusive
access between local threads should be achieved using the semaphores
in the @synchronized decorator.
Note these locks are released when the descriptor is closed, so it's not
safe to close the file descriptor while another green thread holds the
lock. Just opening and closing the lock file can break synchronisation,
so lock files must be accessed only using this abstraction.
"""
def __init__(self, name):
self.lockfile = None
self.fname = name
def __enter__(self):
self.lockfile = open(self.fname, 'w')
while True:
try:
# Using non-blocking locks since green threads are not
# patched to deal with blocking locking calls.
# Also upon reading the MSDN docs for locking(), it seems
# to have a laughable 10 attempts "blocking" mechanism.
self.trylock()
return self
except IOError, e:
if e.errno in (errno.EACCES, errno.EAGAIN):
# external locks synchronise things like iptables
# updates - give it some time to prevent busy spinning
time.sleep(0.01)
else:
raise
def __exit__(self, exc_type, exc_val, exc_tb):
try:
self.unlock()
self.lockfile.close()
except IOError:
LOG.exception(_("Could not release the acquired lock `%s`"),
self.fname)
def trylock(self):
raise NotImplementedError()
def unlock(self):
raise NotImplementedError()
class _WindowsLock(_InterProcessLock):
def trylock(self):
msvcrt.locking(self.lockfile, msvcrt.LK_NBLCK, 1)
def unlock(self):
msvcrt.locking(self.lockfile, msvcrt.LK_UNLCK, 1)
class _PosixLock(_InterProcessLock):
def trylock(self):
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
def unlock(self):
fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
if os.name == 'nt':
import msvcrt
InterProcessLock = _WindowsLock
else:
import fcntl
InterProcessLock = _PosixLock
_semaphores = weakref.WeakValueDictionary()
def synchronized(name, lock_file_prefix, external=False, lock_path=None):
"""Synchronization decorator.
Decorating a method like so::
@synchronized('mylock')
def foo(self, *args):
...
ensures that only one thread will execute the bar method at a time.
Different methods can share the same lock::
@synchronized('mylock')
def foo(self, *args):
...
@synchronized('mylock')
def bar(self, *args):
...
This way only one of either foo or bar can be executing at a time.
The lock_file_prefix argument is used to provide lock files on disk with a
meaningful prefix. The prefix should end with a hyphen ('-') if specified.
The external keyword argument denotes whether this lock should work across
multiple processes. This means that if two different workers both run a
a method decorated with @synchronized('mylock', external=True), only one
of them will execute at a time.
The lock_path keyword argument is used to specify a special location for
external lock files to live. If nothing is set, then CONF.lock_path is
used as a default.
"""
def wrap(f):
@functools.wraps(f)
def inner(*args, **kwargs):
# NOTE(soren): If we ever go natively threaded, this will be racy.
# See http://stackoverflow.com/questions/5390569/dyn
# amically-allocating-and-destroying-mutexes
sem = _semaphores.get(name, semaphore.Semaphore())
if name not in _semaphores:
# this check is not racy - we're already holding ref locally
# so GC won't remove the item and there was no IO switch
# (only valid in greenthreads)
_semaphores[name] = sem
with sem:
LOG.debug(_('Got semaphore "%(lock)s" for method '
'"%(method)s"...'), {'lock': name,
'method': f.__name__})
if external and not CONF.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s" for '
'method "%(method)s"...'),
{'lock': name, 'method': f.__name__})
cleanup_dir = False
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path
if not local_lock_path:
local_lock_path = CONF.lock_path
if not local_lock_path:
cleanup_dir = True
local_lock_path = tempfile.mkdtemp()
if not os.path.exists(local_lock_path):
cleanup_dir = True
fileutils.ensure_tree(local_lock_path)
# NOTE(mikal): the lock name cannot contain directory
# separators
safe_name = name.replace(os.sep, '_')
lock_file_name = '%s%s' % (lock_file_prefix, safe_name)
lock_file_path = os.path.join(local_lock_path,
lock_file_name)
try:
lock = InterProcessLock(lock_file_path)
with lock:
LOG.debug(_('Got file lock "%(lock)s" at %(path)s '
'for method "%(method)s"...'),
{'lock': name,
'path': lock_file_path,
'method': f.__name__})
retval = f(*args, **kwargs)
finally:
# NOTE(vish): This removes the tempdir if we needed
# to create one. This is used to cleanup
# the locks left behind by unit tests.
if cleanup_dir:
shutil.rmtree(local_lock_path)
else:
retval = f(*args, **kwargs)
return retval
return inner
return wrap

View File

@ -23,6 +23,7 @@ from nova import context
from nova import db
from nova import flags
from nova.network import linux_net
from nova.openstack.common import fileutils
from nova.openstack.common import importutils
from nova.openstack.common import log as logging
from nova import test
@ -236,18 +237,18 @@ class LinuxNetworkTestCase(test.TestCase):
self.flags(use_single_default_gateway=True)
self.mox.StubOutWithMock(self.driver, 'write_to_file')
self.mox.StubOutWithMock(utils, 'ensure_tree')
self.mox.StubOutWithMock(fileutils, 'ensure_tree')
self.mox.StubOutWithMock(os, 'chmod')
self.driver.write_to_file(mox.IgnoreArg(), mox.IgnoreArg())
self.driver.write_to_file(mox.IgnoreArg(), mox.IgnoreArg())
utils.ensure_tree(mox.IgnoreArg())
utils.ensure_tree(mox.IgnoreArg())
utils.ensure_tree(mox.IgnoreArg())
utils.ensure_tree(mox.IgnoreArg())
utils.ensure_tree(mox.IgnoreArg())
utils.ensure_tree(mox.IgnoreArg())
utils.ensure_tree(mox.IgnoreArg())
fileutils.ensure_tree(mox.IgnoreArg())
fileutils.ensure_tree(mox.IgnoreArg())
fileutils.ensure_tree(mox.IgnoreArg())
fileutils.ensure_tree(mox.IgnoreArg())
fileutils.ensure_tree(mox.IgnoreArg())
fileutils.ensure_tree(mox.IgnoreArg())
fileutils.ensure_tree(mox.IgnoreArg())
os.chmod(mox.IgnoreArg(), mox.IgnoreArg())
os.chmod(mox.IgnoreArg(), mox.IgnoreArg())
@ -259,18 +260,18 @@ class LinuxNetworkTestCase(test.TestCase):
self.flags(use_single_default_gateway=True)
self.mox.StubOutWithMock(self.driver, 'write_to_file')
self.mox.StubOutWithMock(utils, 'ensure_tree')
self.mox.StubOutWithMock(fileutils, 'ensure_tree')
self.mox.StubOutWithMock(os, 'chmod')
self.driver.write_to_file(mox.IgnoreArg(), mox.IgnoreArg())
self.driver.write_to_file(mox.IgnoreArg(), mox.IgnoreArg())
utils.ensure_tree(mox.IgnoreArg())
utils.ensure_tree(mox.IgnoreArg())
utils.ensure_tree(mox.IgnoreArg())
utils.ensure_tree(mox.IgnoreArg())
utils.ensure_tree(mox.IgnoreArg())
utils.ensure_tree(mox.IgnoreArg())
utils.ensure_tree(mox.IgnoreArg())
fileutils.ensure_tree(mox.IgnoreArg())
fileutils.ensure_tree(mox.IgnoreArg())
fileutils.ensure_tree(mox.IgnoreArg())
fileutils.ensure_tree(mox.IgnoreArg())
fileutils.ensure_tree(mox.IgnoreArg())
fileutils.ensure_tree(mox.IgnoreArg())
fileutils.ensure_tree(mox.IgnoreArg())
os.chmod(mox.IgnoreArg(), mox.IgnoreArg())
os.chmod(mox.IgnoreArg(), mox.IgnoreArg())

View File

@ -18,6 +18,7 @@
import os
from nova import flags
from nova.openstack.common import fileutils
from nova import test
from nova.tests import fake_libvirt_utils
from nova.virt.libvirt import imagebackend
@ -56,8 +57,8 @@ class _ImageTestCase(test.TestCase):
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
fn = self.mox.CreateMockAnything()
fn(target=self.TEMPLATE_PATH)
self.mox.StubOutWithMock(imagebackend.utils, 'ensure_tree')
imagebackend.utils.ensure_tree(self.TEMPLATE_DIR)
self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
imagebackend.fileutils.ensure_tree(self.TEMPLATE_DIR)
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
@ -83,7 +84,7 @@ class _ImageTestCase(test.TestCase):
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
fn = self.mox.CreateMockAnything()
fn(target=self.TEMPLATE_PATH)
self.mox.StubOutWithMock(imagebackend.utils, 'ensure_tree')
self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
self.mox.ReplayAll()
image = self.image_class(self.INSTANCE, self.NAME)
@ -117,7 +118,8 @@ class RawTestCase(_ImageTestCase):
def prepare_mocks(self):
fn = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(imagebackend.utils.synchronized, '__call__')
self.mox.StubOutWithMock(imagebackend.lockutils.synchronized,
'__call__')
self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image')
self.mox.StubOutWithMock(imagebackend.disk, 'extend')
return fn
@ -167,7 +169,8 @@ class Qcow2TestCase(_ImageTestCase):
def prepare_mocks(self):
fn = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(imagebackend.utils.synchronized, '__call__')
self.mox.StubOutWithMock(imagebackend.lockutils.synchronized,
'__call__')
self.mox.StubOutWithMock(imagebackend.libvirt_utils,
'create_cow_image')
self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image')

View File

@ -37,6 +37,7 @@ from nova import context
from nova import db
from nova import exception
from nova import flags
from nova.openstack.common import fileutils
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.openstack.common import log as logging
@ -448,7 +449,7 @@ class CacheConcurrencyTestCase(test.TestCase):
# use for tests. So, create the path here so utils.synchronized()
# won't delete it out from under one of the threads.
self.lock_path = os.path.join(FLAGS.instances_path, 'locks')
utils.ensure_tree(self.lock_path)
fileutils.ensure_tree(self.lock_path)
def fake_exists(fname):
basedir = os.path.join(FLAGS.instances_path, FLAGS.base_dir_name)

View File

@ -24,7 +24,6 @@ from eventlet import greenthread
from nova import exception
from nova import test
from nova import utils
class ExceptionTestCase(test.TestCase):
@ -63,96 +62,3 @@ class ProjectTestCase(test.TestCase):
helpful_msg = (_("The following migrations are missing a downgrade:"
"\n\t%s") % '\n\t'.join(sorted(missing_downgrade)))
self.assert_(not missing_downgrade, helpful_msg)
class LockTestCase(test.TestCase):
def test_synchronized_wrapped_function_metadata(self):
@utils.synchronized('whatever')
def foo():
"""Bar"""
pass
self.assertEquals(foo.__doc__, 'Bar', "Wrapped function's docstring "
"got lost")
self.assertEquals(foo.__name__, 'foo', "Wrapped function's name "
"got mangled")
def test_synchronized_internally(self):
"""We can lock across multiple green threads"""
saved_sem_num = len(utils._semaphores)
seen_threads = list()
@utils.synchronized('testlock2', external=False)
def f(id):
for x in range(10):
seen_threads.append(id)
greenthread.sleep(0)
threads = []
pool = greenpool.GreenPool(10)
for i in range(10):
threads.append(pool.spawn(f, i))
for thread in threads:
thread.wait()
self.assertEquals(len(seen_threads), 100)
# Looking at the seen threads, split it into chunks of 10, and verify
# that the last 9 match the first in each chunk.
for i in range(10):
for j in range(9):
self.assertEquals(seen_threads[i * 10],
seen_threads[i * 10 + 1 + j])
self.assertEqual(saved_sem_num, len(utils._semaphores),
"Semaphore leak detected")
def test_nested_external_works(self):
"""We can nest external syncs"""
with utils.tempdir() as tempdir:
self.flags(lock_path=tempdir)
sentinel = object()
@utils.synchronized('testlock1', external=True)
def outer_lock():
@utils.synchronized('testlock2', external=True)
def inner_lock():
return sentinel
return inner_lock()
self.assertEqual(sentinel, outer_lock())
def test_synchronized_externally(self):
"""We can lock across multiple processes"""
with utils.tempdir() as tempdir:
self.flags(lock_path=tempdir)
rpipe1, wpipe1 = os.pipe()
rpipe2, wpipe2 = os.pipe()
@utils.synchronized('testlock1', external=True)
def f(rpipe, wpipe):
try:
os.write(wpipe, "foo")
except OSError, e:
self.assertEquals(e.errno, errno.EPIPE)
return
rfds, _wfds, _efds = select.select([rpipe], [], [], 1)
self.assertEquals(len(rfds), 0, "The other process, which was"
" supposed to be locked, "
"wrote on its end of the "
"pipe")
os.close(rpipe)
pid = os.fork()
if pid > 0:
os.close(wpipe1)
os.close(rpipe2)
f(rpipe1, wpipe2)
else:
os.close(rpipe1)
os.close(wpipe2)
f(rpipe2, wpipe1)
os._exit(0)

View File

@ -543,35 +543,6 @@ class MonkeyPatchTestCase(test.TestCase):
in nova.tests.monkey_patch_example.CALLED_FUNCTION)
class TestFileLocks(test.TestCase):
def test_concurrent_green_lock_succeeds(self):
"""Verify spawn_n greenthreads with two locks run concurrently."""
self.completed = False
with utils.tempdir() as tmpdir:
def locka(wait):
a = utils.InterProcessLock(os.path.join(tmpdir, 'a'))
with a:
wait.wait()
self.completed = True
def lockb(wait):
b = utils.InterProcessLock(os.path.join(tmpdir, 'b'))
with b:
wait.wait()
wait1 = eventlet.event.Event()
wait2 = eventlet.event.Event()
pool = greenpool.GreenPool()
pool.spawn_n(locka, wait1)
pool.spawn_n(lockb, wait2)
wait2.send()
eventlet.sleep(0)
wait1.send()
pool.waitall()
self.assertTrue(self.completed)
class AuditPeriodTest(test.TestCase):
def setUp(self):
@ -778,11 +749,3 @@ class MkfsTestCase(test.TestCase):
utils.mkfs('ext4', '/my/block/dev')
utils.mkfs('swap', '/my/swap/block/dev')
class EnsureTree(test.TestCase):
def test_ensure_tree(self):
with utils.tempdir() as tmpdir:
testdir = '%s/foo/bar/baz' % (tmpdir,)
utils.ensure_tree(testdir)
self.assertTrue(os.path.isdir(testdir))

View File

@ -586,183 +586,6 @@ def utf8(value):
return value
class _InterProcessLock(object):
"""Lock implementation which allows multiple locks, working around
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
not require any cleanup. Since the lock is always held on a file
descriptor rather than outside of the process, the lock gets dropped
automatically if the process crashes, even if __exit__ is not executed.
There are no guarantees regarding usage by multiple green threads in a
single process here. This lock works only between processes. Exclusive
access between local threads should be achieved using the semaphores
in the @synchronized decorator.
Note these locks are released when the descriptor is closed, so it's not
safe to close the file descriptor while another green thread holds the
lock. Just opening and closing the lock file can break synchronisation,
so lock files must be accessed only using this abstraction.
"""
def __init__(self, name):
self.lockfile = None
self.fname = name
def __enter__(self):
self.lockfile = open(self.fname, 'w')
while True:
try:
# Using non-blocking locks since green threads are not
# patched to deal with blocking locking calls.
# Also upon reading the MSDN docs for locking(), it seems
# to have a laughable 10 attempts "blocking" mechanism.
self.trylock()
return self
except IOError, e:
if e.errno in (errno.EACCES, errno.EAGAIN):
# external locks synchronise things like iptables
# updates - give it some time to prevent busy spinning
time.sleep(0.01)
else:
raise
def __exit__(self, exc_type, exc_val, exc_tb):
try:
self.unlock()
self.lockfile.close()
except IOError:
LOG.exception(_("Could not release the acquired lock `%s`")
% self.fname)
def trylock(self):
raise NotImplementedError()
def unlock(self):
raise NotImplementedError()
class _WindowsLock(_InterProcessLock):
def trylock(self):
msvcrt.locking(self.lockfile, msvcrt.LK_NBLCK, 1)
def unlock(self):
msvcrt.locking(self.lockfile, msvcrt.LK_UNLCK, 1)
class _PosixLock(_InterProcessLock):
def trylock(self):
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
def unlock(self):
fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
if os.name == 'nt':
import msvcrt
InterProcessLock = _WindowsLock
else:
import fcntl
InterProcessLock = _PosixLock
_semaphores = weakref.WeakValueDictionary()
def synchronized(name, external=False, lock_path=None):
"""Synchronization decorator.
Decorating a method like so::
@synchronized('mylock')
def foo(self, *args):
...
ensures that only one thread will execute the bar method at a time.
Different methods can share the same lock::
@synchronized('mylock')
def foo(self, *args):
...
@synchronized('mylock')
def bar(self, *args):
...
This way only one of either foo or bar can be executing at a time.
The external keyword argument denotes whether this lock should work across
multiple processes. This means that if two different workers both run a
a method decorated with @synchronized('mylock', external=True), only one
of them will execute at a time.
The lock_path keyword argument is used to specify a special location for
external lock files to live. If nothing is set, then FLAGS.lock_path is
used as a default.
"""
def wrap(f):
@functools.wraps(f)
def inner(*args, **kwargs):
# NOTE(soren): If we ever go natively threaded, this will be racy.
# See http://stackoverflow.com/questions/5390569/dyn
# amically-allocating-and-destroying-mutexes
sem = _semaphores.get(name, semaphore.Semaphore())
if name not in _semaphores:
# this check is not racy - we're already holding ref locally
# so GC won't remove the item and there was no IO switch
# (only valid in greenthreads)
_semaphores[name] = sem
with sem:
LOG.debug(_('Got semaphore "%(lock)s" for method '
'"%(method)s"...'), {'lock': name,
'method': f.__name__})
if external and not FLAGS.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s" for '
'method "%(method)s"...'),
{'lock': name, 'method': f.__name__})
cleanup_dir = False
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path
if not local_lock_path:
local_lock_path = FLAGS.lock_path
if not local_lock_path:
cleanup_dir = True
local_lock_path = tempfile.mkdtemp()
if not os.path.exists(local_lock_path):
cleanup_dir = True
ensure_tree(local_lock_path)
# NOTE(mikal): the lock name cannot contain directory
# separators
safe_name = name.replace(os.sep, '_')
lock_file_path = os.path.join(local_lock_path,
'nova-%s' % safe_name)
try:
lock = InterProcessLock(lock_file_path)
with lock:
LOG.debug(_('Got file lock "%(lock)s" for '
'method "%(method)s"...'),
{'lock': name, 'method': f.__name__})
retval = f(*args, **kwargs)
finally:
# NOTE(vish): This removes the tempdir if we needed
# to create one. This is used to cleanup
# the locks left behind by unit tests.
if cleanup_dir:
shutil.rmtree(local_lock_path)
else:
retval = f(*args, **kwargs)
return retval
return inner
return wrap
def delete_if_exists(pathname):
"""delete a file, but ignore file not found error"""
@ -1314,21 +1137,6 @@ class UndoManager(object):
self._rollback()
def ensure_tree(path):
"""Create a directory (and any ancestor directories required)
:param path: Directory to create
"""
try:
os.makedirs(path)
except OSError as exc:
if exc.errno == errno.EEXIST:
if not os.path.isdir(path):
raise
else:
raise
def mkfs(fs, path, label=None):
"""Format a file or block device

View File

@ -42,6 +42,8 @@ from nova import exception
from nova import flags
from nova import notifications
from nova.openstack.common import cfg
from nova.openstack.common import fileutils
from nova.openstack.common import lockutils
from nova.openstack.common import log as logging
from nova import utils
from nova.virt.baremetal import dom
@ -303,10 +305,10 @@ class BareMetalDriver(driver.ComputeDriver):
if not os.path.exists(target):
base_dir = os.path.join(FLAGS.instances_path, '_base')
if not os.path.exists(base_dir):
utils.ensure_tree(base_dir)
fileutils.ensure_tree(base_dir)
base = os.path.join(base_dir, fname)
@utils.synchronized(fname)
@lockutils.synchronized(fname, 'nova-')
def call_if_not_exists(base, fetch_func, *args, **kwargs):
if not os.path.exists(base):
fetch_func(target=base, *args, **kwargs)
@ -331,7 +333,7 @@ class BareMetalDriver(driver.ComputeDriver):
fname + suffix)
# ensure directories exist and are writable
utils.ensure_tree(basepath(suffix=''))
fileutils.ensure_tree(basepath(suffix=''))
utils.execute('chmod', '0777', basepath(suffix=''))
LOG.info(_('instance %s: Creating image'), inst['name'],
@ -339,7 +341,7 @@ class BareMetalDriver(driver.ComputeDriver):
if FLAGS.baremetal_type == 'lxc':
container_dir = '%s/rootfs' % basepath(suffix='')
utils.ensure_tree(container_dir)
fileutils.ensure_tree(container_dir)
# NOTE(vish): No need add the suffix to console.log
libvirt_utils.write_to_file(basepath('console.log', ''), '', 007)

View File

@ -24,6 +24,7 @@ import tempfile
from nova import exception
from nova import flags
from nova.openstack.common import cfg
from nova.openstack.common import fileutils
from nova.openstack.common import log as logging
from nova import utils
from nova import version
@ -66,7 +67,7 @@ class ConfigDriveBuilder(object):
def _add_file(self, path, data):
filepath = os.path.join(self.tempdir, path)
dirname = os.path.dirname(filepath)
utils.ensure_tree(dirname)
fileutils.ensure_tree(dirname)
with open(filepath, 'w') as f:
f.write(data)

View File

@ -24,8 +24,8 @@ from nova import network
from nova.network import linux_net
from nova.openstack.common import cfg
from nova.openstack.common import importutils
from nova.openstack.common import lockutils
from nova.openstack.common import log as logging
from nova import utils
from nova.virt import netutils
@ -430,7 +430,7 @@ class IptablesFirewallDriver(FirewallDriver):
self.do_refresh_instance_rules(instance)
self.iptables.apply()
@utils.synchronized('iptables', external=True)
@lockutils.synchronized('iptables', 'nova-', external=True)
def _inner_do_refresh_rules(self, instance, ipv4_rules,
ipv6_rules):
self.remove_filters_for_instance(instance)
@ -453,7 +453,7 @@ class IptablesFirewallDriver(FirewallDriver):
self._do_refresh_provider_fw_rules()
self.iptables.apply()
@utils.synchronized('iptables', external=True)
@lockutils.synchronized('iptables', 'nova-', external=True)
def _do_refresh_provider_fw_rules(self):
"""Internal, synchronized version of refresh_provider_fw_rules."""
self._purge_provider_fw_rules()

View File

@ -27,8 +27,8 @@ from nova import db
from nova import exception
from nova import flags
from nova.openstack.common import cfg
from nova.openstack.common import lockutils
from nova.openstack.common import log as logging
from nova import utils
from nova.virt.hyperv import baseops
from nova.virt.hyperv import constants
from nova.virt.hyperv import vmutils
@ -595,7 +595,7 @@ class VMOps(baseops.BaseOps):
If cow is True, it will make a CoW image instead of a copy.
"""
@utils.synchronized(fname)
@lockutils.synchronized(fname, 'nova-')
def call_if_not_exists(path, fn, *args, **kwargs):
if not os.path.exists(path):
fn(target=path, *args, **kwargs)

View File

@ -67,6 +67,7 @@ from nova import flags
from nova.image import glance
from nova.openstack.common import cfg
from nova.openstack.common import excutils
from nova.openstack.common import fileutils
from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.openstack.common import log as logging
@ -841,7 +842,7 @@ class LibvirtDriver(driver.ComputeDriver):
# Export the snapshot to a raw image
snapshot_directory = FLAGS.libvirt_snapshots_directory
utils.ensure_tree(snapshot_directory)
fileutils.ensure_tree(snapshot_directory)
with utils.tempdir(dir=snapshot_directory) as tmpdir:
try:
out_path = os.path.join(tmpdir, snapshot_name)
@ -1258,7 +1259,7 @@ class LibvirtDriver(driver.ComputeDriver):
return image(fname, image_type='raw')
# ensure directories exist and are writable
utils.ensure_tree(basepath(suffix=''))
fileutils.ensure_tree(basepath(suffix=''))
LOG.info(_('Creating image'), instance=instance)
libvirt_utils.write_to_file(basepath('libvirt.xml'), libvirt_xml)
@ -1267,7 +1268,7 @@ class LibvirtDriver(driver.ComputeDriver):
container_dir = os.path.join(FLAGS.instances_path,
instance['name'],
'rootfs')
utils.ensure_tree(container_dir)
fileutils.ensure_tree(container_dir)
# NOTE(dprince): for rescue console.log may already exist... chown it.
self._chown_console_log_for_instance(instance['name'])

View File

@ -22,6 +22,8 @@ import os
from nova import flags
from nova.openstack.common import cfg
from nova.openstack.common import excutils
from nova.openstack.common import fileutils
from nova.openstack.common import lockutils
from nova import utils
from nova.virt.disk import api as disk
from nova.virt.libvirt import config
@ -112,7 +114,8 @@ class Image(object):
:filename: Name of the file in the image directory
:size: Size of created image in bytes (optional)
"""
@utils.synchronized(filename, external=True, lock_path=self.lock_path)
@lockutils.synchronized(filename, 'nova-', external=True,
lock_path=self.lock_path)
def call_if_not_exists(target, *args, **kwargs):
if not os.path.exists(target):
fetch_func(target=target, *args, **kwargs)
@ -120,7 +123,7 @@ class Image(object):
if not os.path.exists(self.path):
base_dir = os.path.join(FLAGS.instances_path, '_base')
if not os.path.exists(base_dir):
utils.ensure_tree(base_dir)
fileutils.ensure_tree(base_dir)
base = os.path.join(base_dir, filename)
self.create_image(call_if_not_exists, base, size,
@ -143,7 +146,8 @@ class Raw(Image):
instance, name)
def create_image(self, prepare_template, base, size, *args, **kwargs):
@utils.synchronized(base, external=True, lock_path=self.lock_path)
@lockutils.synchronized(base, 'nova-', external=True,
lock_path=self.lock_path)
def copy_raw_image(base, target, size):
libvirt_utils.copy_image(base, target)
if size:
@ -170,7 +174,8 @@ class Qcow2(Image):
instance, name)
def create_image(self, prepare_template, base, size, *args, **kwargs):
@utils.synchronized(base, external=True, lock_path=self.lock_path)
@lockutils.synchronized(base, 'nova-', external=True,
lock_path=self.lock_path)
def copy_qcow2_image(base, target, size):
qcow2_base = base
if size:
@ -216,7 +221,8 @@ class Lvm(Image):
self.sparse = FLAGS.libvirt_sparse_logical_volumes
def create_image(self, prepare_template, base, size, *args, **kwargs):
@utils.synchronized(base, external=True, lock_path=self.lock_path)
@lockutils.synchronized(base, 'nova-', external=True,
lock_path=self.lock_path)
def create_lvm_image(base, size):
base_size = disk.get_disk_size(base)
resize = size > base_size

View File

@ -28,6 +28,7 @@ from lxml import etree
from nova import exception
from nova import flags
from nova.openstack.common import cfg
from nova.openstack.common import fileutils
from nova.openstack.common import jsonutils
from nova.openstack.common import log as logging
from nova import utils
@ -523,7 +524,7 @@ def write_stored_info(target, field=None, value=None):
return
info_file = get_info_filename(target)
utils.ensure_tree(os.path.dirname(info_file))
fileutils.ensure_tree(os.path.dirname(info_file))
d = read_stored_info(info_file)
d[field] = value

View File

@ -22,6 +22,7 @@ import time
from nova import exception
from nova import flags
from nova.openstack.common import lockutils
from nova.openstack.common import log as logging
from nova import utils
from nova.virt.libvirt import config
@ -123,7 +124,7 @@ class LibvirtISCSIVolumeDriver(LibvirtVolumeDriver):
'-v', property_value)
return self._run_iscsiadm(iscsi_properties, iscsi_command, **kwargs)
@utils.synchronized('connect_volume')
@lockutils.synchronized('connect_volume', 'nova-')
def connect_volume(self, connection_info, mount_device):
"""Attach the volume to instance_name"""
iscsi_properties = connection_info['data']
@ -193,7 +194,7 @@ class LibvirtISCSIVolumeDriver(LibvirtVolumeDriver):
sup = super(LibvirtISCSIVolumeDriver, self)
return sup.connect_volume(connection_info, mount_device)
@utils.synchronized('connect_volume')
@lockutils.synchronized('connect_volume', 'nova-')
def disconnect_volume(self, connection_info, mount_device):
"""Detach the volume from instance_name"""
sup = super(LibvirtISCSIVolumeDriver, self)

View File

@ -24,6 +24,7 @@ import os
from nova import exception
from nova import flags
from nova.openstack.common import cfg
from nova.openstack.common import fileutils
from nova.openstack.common import log as logging
from nova import utils
@ -109,7 +110,7 @@ class TgtAdm(TargetAdmin):
# Note(jdg) tid and lun aren't used by TgtAdm but remain for
# compatibility
utils.ensure_tree(FLAGS.volumes_dir)
fileutils.ensure_tree(FLAGS.volumes_dir)
vol_id = name.split(':')[1]
volume_conf = """

View File

@ -1,7 +1,7 @@
[DEFAULT]
# The list of modules to copy from openstack-common
modules=cfg,context,excutils,gettextutils,importutils,iniparser,jsonutils,local,log,network_utils,notifier,plugin,policy,setup,timeutils,rpc
modules=cfg,context,excutils,fileutils,gettextutils,importutils,iniparser,jsonutils,local,lockutils,log,network_utils,notifier,plugin,policy,setup,timeutils,rpc
# The base module to hold the copy of openstack.common
base=nova