External locking for image caching.

If the instance storage is shared between compute nodes, then you
need external locking which is also shared to avoid clobbering each
other's attempts to cache base images. Resolves bug 1014227.

Change-Id: Ic2ac87840904fa199c17774dae9556ad6c7a3eaf
This commit is contained in:
Michael Still 2012-08-23 22:43:06 +10:00
parent 64cec81943
commit 1523fab5ee
4 changed files with 45 additions and 19 deletions

View File

@ -35,7 +35,8 @@ class _ImageTestCase(test.TestCase):
def setUp(self): def setUp(self):
super(_ImageTestCase, self).setUp() super(_ImageTestCase, self).setUp()
self.flags(instances_path=self.INSTANCES_PATH) self.flags(disable_process_locking=True,
instances_path=self.INSTANCES_PATH)
self.INSTANCE = 'instance' self.INSTANCE = 'instance'
self.NAME = 'fake.vm' self.NAME = 'fake.vm'
self.TEMPLATE = 'template' self.TEMPLATE = 'template'

View File

@ -146,10 +146,11 @@ class XenAPIVolumeTestCase(stubs.XenAPITestBase):
self.user_id = 'fake' self.user_id = 'fake'
self.project_id = 'fake' self.project_id = 'fake'
self.context = context.RequestContext(self.user_id, self.project_id) self.context = context.RequestContext(self.user_id, self.project_id)
self.flags(xenapi_connection_url='test_url', self.flags(disable_process_locking=True,
xenapi_connection_password='test_pass',
firewall_driver='nova.virt.xenapi.firewall.' firewall_driver='nova.virt.xenapi.firewall.'
'Dom0IptablesFirewallDriver') 'Dom0IptablesFirewallDriver',
xenapi_connection_url='test_url',
xenapi_connection_password='test_pass')
db_fakes.stub_out_db_instance_api(self.stubs) db_fakes.stub_out_db_instance_api(self.stubs)
self.instance_values = {'id': 1, self.instance_values = {'id': 1,
'project_id': self.user_id, 'project_id': self.user_id,
@ -260,11 +261,12 @@ class XenAPIVMTestCase(stubs.XenAPITestBase):
def setUp(self): def setUp(self):
super(XenAPIVMTestCase, self).setUp() super(XenAPIVMTestCase, self).setUp()
self.network = importutils.import_object(FLAGS.network_manager) self.network = importutils.import_object(FLAGS.network_manager)
self.flags(xenapi_connection_url='test_url', self.flags(disable_process_locking=True,
xenapi_connection_password='test_pass',
instance_name_template='%d', instance_name_template='%d',
firewall_driver='nova.virt.xenapi.firewall.' firewall_driver='nova.virt.xenapi.firewall.'
'Dom0IptablesFirewallDriver') 'Dom0IptablesFirewallDriver',
xenapi_connection_url='test_url',
xenapi_connection_password='test_pass',)
xenapi_fake.create_local_srs() xenapi_fake.create_local_srs()
xenapi_fake.create_local_pifs() xenapi_fake.create_local_pifs()
db_fakes.stub_out_db_instance_api(self.stubs) db_fakes.stub_out_db_instance_api(self.stubs)

View File

@ -661,7 +661,7 @@ else:
_semaphores = weakref.WeakValueDictionary() _semaphores = weakref.WeakValueDictionary()
def synchronized(name, external=False): def synchronized(name, external=False, lock_path=None):
"""Synchronization decorator. """Synchronization decorator.
Decorating a method like so:: Decorating a method like so::
@ -688,6 +688,10 @@ def synchronized(name, external=False):
multiple processes. This means that if two different workers both run a multiple processes. This means that if two different workers both run a
a method decorated with @synchronized('mylock', external=True), only one a method decorated with @synchronized('mylock', external=True), only one
of them will execute at a time. of them will execute at a time.
The lock_path keyword argument is used to specify a special location for
external lock files to live. If nothing is set, then FLAGS.lock_path is
used as a default.
""" """
def wrap(f): def wrap(f):
@ -703,9 +707,6 @@ def synchronized(name, external=False):
# (only valid in greenthreads) # (only valid in greenthreads)
_semaphores[name] = sem _semaphores[name] = sem
LOG.debug(_('Attempting to grab semaphore "%(lock)s" for method '
'"%(method)s"...'), {'lock': name,
'method': f.__name__})
with sem: with sem:
LOG.debug(_('Got semaphore "%(lock)s" for method ' LOG.debug(_('Got semaphore "%(lock)s" for method '
'"%(method)s"...'), {'lock': name, '"%(method)s"...'), {'lock': name,
@ -714,8 +715,25 @@ def synchronized(name, external=False):
LOG.debug(_('Attempting to grab file lock "%(lock)s" for ' LOG.debug(_('Attempting to grab file lock "%(lock)s" for '
'method "%(method)s"...'), 'method "%(method)s"...'),
{'lock': name, 'method': f.__name__}) {'lock': name, 'method': f.__name__})
lock_path = FLAGS.lock_path or tempfile.mkdtemp() cleanup_dir = False
lock_file_path = os.path.join(lock_path, 'nova-%s' % name)
def wrap_mkdtemp():
cleanup_dir = True
return tempfile.mkdtemp()
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path
if not local_lock_path:
local_lock_path = FLAGS.lock_path or wrap_mkdtemp()
if not os.path.exists(local_lock_path):
ensure_tree(local_lock_path)
# NOTE(mikal): the lock name cannot contain directory
# separators
safe_name = name.replace(os.sep, '_')
lock_file_path = os.path.join(local_lock_path,
'nova-%s' % safe_name)
lock = InterProcessLock(lock_file_path) lock = InterProcessLock(lock_file_path)
try: try:
with lock: with lock:
@ -727,8 +745,8 @@ def synchronized(name, external=False):
# NOTE(vish): This removes the tempdir if we needed # NOTE(vish): This removes the tempdir if we needed
# to create one. This is used to cleanup # to create one. This is used to cleanup
# the locks left behind by unit tests. # the locks left behind by unit tests.
if not FLAGS.lock_path: if cleanup_dir:
shutil.rmtree(lock_path) shutil.rmtree(local_lock_path)
else: else:
retval = f(*args, **kwargs) retval = f(*args, **kwargs)

View File

@ -61,6 +61,11 @@ class Image(object):
self.driver_format = driver_format self.driver_format = driver_format
self.is_block_dev = is_block_dev self.is_block_dev = is_block_dev
# NOTE(mikal): We need a lock directory which is shared along with
# instance files, to cover the scenario where multiple compute nodes
# are trying to create a base file at the same time
self.lock_path = os.path.join(FLAGS.instances_path, 'locks')
@abc.abstractmethod @abc.abstractmethod
def create_image(self, prepare_template, base, size, *args, **kwargs): def create_image(self, prepare_template, base, size, *args, **kwargs):
"""Create image from template. """Create image from template.
@ -106,7 +111,7 @@ class Image(object):
:fname: Template name :fname: Template name
:size: Size of created image in bytes (optional) :size: Size of created image in bytes (optional)
""" """
@utils.synchronized(fname) @utils.synchronized(fname, external=True, lock_path=self.lock_path)
def call_if_not_exists(target, *args, **kwargs): def call_if_not_exists(target, *args, **kwargs):
if not os.path.exists(target): if not os.path.exists(target):
fn(target=target, *args, **kwargs) fn(target=target, *args, **kwargs)
@ -129,7 +134,7 @@ class Raw(Image):
instance, name) instance, name)
def create_image(self, prepare_template, base, size, *args, **kwargs): def create_image(self, prepare_template, base, size, *args, **kwargs):
@utils.synchronized(base) @utils.synchronized(base, external=True, lock_path=self.lock_path)
def copy_raw_image(base, target, size): def copy_raw_image(base, target, size):
libvirt_utils.copy_image(base, target) libvirt_utils.copy_image(base, target)
if size: if size:
@ -153,7 +158,7 @@ class Qcow2(Image):
instance, name) instance, name)
def create_image(self, prepare_template, base, size, *args, **kwargs): def create_image(self, prepare_template, base, size, *args, **kwargs):
@utils.synchronized(base) @utils.synchronized(base, external=True, lock_path=self.lock_path)
def copy_qcow2_image(base, target, size): def copy_qcow2_image(base, target, size):
qcow2_base = base qcow2_base = base
if size: if size:
@ -189,7 +194,7 @@ class Lvm(Image):
self.sparse = FLAGS.libvirt_sparse_logical_volumes self.sparse = FLAGS.libvirt_sparse_logical_volumes
def create_image(self, prepare_template, base, size, *args, **kwargs): def create_image(self, prepare_template, base, size, *args, **kwargs):
@utils.synchronized(base) @utils.synchronized(base, external=True, lock_path=self.lock_path)
def create_lvm_image(base, size): def create_lvm_image(base, size):
base_size = disk.get_disk_size(base) base_size = disk.get_disk_size(base)
resize = size > base_size resize = size > base_size