Merge "External locking for image caching."
This commit is contained in:
commit
9e172ce806
|
@ -35,7 +35,8 @@ class _ImageTestCase(test.TestCase):
|
|||
|
||||
def setUp(self):
|
||||
super(_ImageTestCase, self).setUp()
|
||||
self.flags(instances_path=self.INSTANCES_PATH)
|
||||
self.flags(disable_process_locking=True,
|
||||
instances_path=self.INSTANCES_PATH)
|
||||
self.INSTANCE = 'instance'
|
||||
self.NAME = 'fake.vm'
|
||||
self.TEMPLATE = 'template'
|
||||
|
|
|
@ -146,10 +146,11 @@ class XenAPIVolumeTestCase(stubs.XenAPITestBase):
|
|||
self.user_id = 'fake'
|
||||
self.project_id = 'fake'
|
||||
self.context = context.RequestContext(self.user_id, self.project_id)
|
||||
self.flags(xenapi_connection_url='test_url',
|
||||
xenapi_connection_password='test_pass',
|
||||
self.flags(disable_process_locking=True,
|
||||
firewall_driver='nova.virt.xenapi.firewall.'
|
||||
'Dom0IptablesFirewallDriver')
|
||||
'Dom0IptablesFirewallDriver',
|
||||
xenapi_connection_url='test_url',
|
||||
xenapi_connection_password='test_pass')
|
||||
db_fakes.stub_out_db_instance_api(self.stubs)
|
||||
self.instance_values = {'id': 1,
|
||||
'project_id': self.user_id,
|
||||
|
@ -260,11 +261,12 @@ class XenAPIVMTestCase(stubs.XenAPITestBase):
|
|||
def setUp(self):
|
||||
super(XenAPIVMTestCase, self).setUp()
|
||||
self.network = importutils.import_object(FLAGS.network_manager)
|
||||
self.flags(xenapi_connection_url='test_url',
|
||||
xenapi_connection_password='test_pass',
|
||||
self.flags(disable_process_locking=True,
|
||||
instance_name_template='%d',
|
||||
firewall_driver='nova.virt.xenapi.firewall.'
|
||||
'Dom0IptablesFirewallDriver')
|
||||
'Dom0IptablesFirewallDriver',
|
||||
xenapi_connection_url='test_url',
|
||||
xenapi_connection_password='test_pass',)
|
||||
xenapi_fake.create_local_srs()
|
||||
xenapi_fake.create_local_pifs()
|
||||
db_fakes.stub_out_db_instance_api(self.stubs)
|
||||
|
|
|
@ -661,7 +661,7 @@ else:
|
|||
_semaphores = weakref.WeakValueDictionary()
|
||||
|
||||
|
||||
def synchronized(name, external=False):
|
||||
def synchronized(name, external=False, lock_path=None):
|
||||
"""Synchronization decorator.
|
||||
|
||||
Decorating a method like so::
|
||||
|
@ -688,6 +688,10 @@ def synchronized(name, external=False):
|
|||
multiple processes. This means that if two different workers both run a
|
||||
a method decorated with @synchronized('mylock', external=True), only one
|
||||
of them will execute at a time.
|
||||
|
||||
The lock_path keyword argument is used to specify a special location for
|
||||
external lock files to live. If nothing is set, then FLAGS.lock_path is
|
||||
used as a default.
|
||||
"""
|
||||
|
||||
def wrap(f):
|
||||
|
@ -703,9 +707,6 @@ def synchronized(name, external=False):
|
|||
# (only valid in greenthreads)
|
||||
_semaphores[name] = sem
|
||||
|
||||
LOG.debug(_('Attempting to grab semaphore "%(lock)s" for method '
|
||||
'"%(method)s"...'), {'lock': name,
|
||||
'method': f.__name__})
|
||||
with sem:
|
||||
LOG.debug(_('Got semaphore "%(lock)s" for method '
|
||||
'"%(method)s"...'), {'lock': name,
|
||||
|
@ -714,8 +715,25 @@ def synchronized(name, external=False):
|
|||
LOG.debug(_('Attempting to grab file lock "%(lock)s" for '
|
||||
'method "%(method)s"...'),
|
||||
{'lock': name, 'method': f.__name__})
|
||||
lock_path = FLAGS.lock_path or tempfile.mkdtemp()
|
||||
lock_file_path = os.path.join(lock_path, 'nova-%s' % name)
|
||||
cleanup_dir = False
|
||||
|
||||
def wrap_mkdtemp():
|
||||
cleanup_dir = True
|
||||
return tempfile.mkdtemp()
|
||||
|
||||
# We need a copy of lock_path because it is non-local
|
||||
local_lock_path = lock_path
|
||||
if not local_lock_path:
|
||||
local_lock_path = FLAGS.lock_path or wrap_mkdtemp()
|
||||
|
||||
if not os.path.exists(local_lock_path):
|
||||
ensure_tree(local_lock_path)
|
||||
|
||||
# NOTE(mikal): the lock name cannot contain directory
|
||||
# separators
|
||||
safe_name = name.replace(os.sep, '_')
|
||||
lock_file_path = os.path.join(local_lock_path,
|
||||
'nova-%s' % safe_name)
|
||||
lock = InterProcessLock(lock_file_path)
|
||||
try:
|
||||
with lock:
|
||||
|
@ -727,8 +745,8 @@ def synchronized(name, external=False):
|
|||
# NOTE(vish): This removes the tempdir if we needed
|
||||
# to create one. This is used to cleanup
|
||||
# the locks left behind by unit tests.
|
||||
if not FLAGS.lock_path:
|
||||
shutil.rmtree(lock_path)
|
||||
if cleanup_dir:
|
||||
shutil.rmtree(local_lock_path)
|
||||
else:
|
||||
retval = f(*args, **kwargs)
|
||||
|
||||
|
|
|
@ -61,6 +61,11 @@ class Image(object):
|
|||
self.driver_format = driver_format
|
||||
self.is_block_dev = is_block_dev
|
||||
|
||||
# NOTE(mikal): We need a lock directory which is shared along with
|
||||
# instance files, to cover the scenario where multiple compute nodes
|
||||
# are trying to create a base file at the same time
|
||||
self.lock_path = os.path.join(FLAGS.instances_path, 'locks')
|
||||
|
||||
@abc.abstractmethod
|
||||
def create_image(self, prepare_template, base, size, *args, **kwargs):
|
||||
"""Create image from template.
|
||||
|
@ -106,7 +111,7 @@ class Image(object):
|
|||
:fname: Template name
|
||||
:size: Size of created image in bytes (optional)
|
||||
"""
|
||||
@utils.synchronized(fname)
|
||||
@utils.synchronized(fname, external=True, lock_path=self.lock_path)
|
||||
def call_if_not_exists(target, *args, **kwargs):
|
||||
if not os.path.exists(target):
|
||||
fn(target=target, *args, **kwargs)
|
||||
|
@ -129,7 +134,7 @@ class Raw(Image):
|
|||
instance, name)
|
||||
|
||||
def create_image(self, prepare_template, base, size, *args, **kwargs):
|
||||
@utils.synchronized(base)
|
||||
@utils.synchronized(base, external=True, lock_path=self.lock_path)
|
||||
def copy_raw_image(base, target, size):
|
||||
libvirt_utils.copy_image(base, target)
|
||||
if size:
|
||||
|
@ -153,7 +158,7 @@ class Qcow2(Image):
|
|||
instance, name)
|
||||
|
||||
def create_image(self, prepare_template, base, size, *args, **kwargs):
|
||||
@utils.synchronized(base)
|
||||
@utils.synchronized(base, external=True, lock_path=self.lock_path)
|
||||
def copy_qcow2_image(base, target, size):
|
||||
qcow2_base = base
|
||||
if size:
|
||||
|
@ -189,7 +194,7 @@ class Lvm(Image):
|
|||
self.sparse = FLAGS.libvirt_sparse_logical_volumes
|
||||
|
||||
def create_image(self, prepare_template, base, size, *args, **kwargs):
|
||||
@utils.synchronized(base)
|
||||
@utils.synchronized(base, external=True, lock_path=self.lock_path)
|
||||
def create_lvm_image(base, size):
|
||||
base_size = disk.get_disk_size(base)
|
||||
resize = size > base_size
|
||||
|
|
Loading…
Reference in New Issue