Adds lock for create from vol/snap to avoid race conditions

This patch protects create from volume/snapshot by using a
lockfile to protect the operation from concurrent deletes of
the volume/snapshot used in the create operation.

Currently, if a volume/snapshot is deleted while a volume is
being created from it that delete may complete during the
create operation thus leaving the new volume in error or stuck
state. This lock will ensure that:

(a) if a create of VolA from snap/volB is in progress, any
    delete requests for snap/volB will wait until the create
    is complete.

(b) if a delete of snap/volA is in progress, any create from
    snap/volA will wait until snap/volA delete is complete.

Co-authored-by: Takashi Natsume <natsume.takashi@lab.ntt.co.jp>
Closes-Bug: 1251334
Change-Id: Ie4bc0af789ab232593f55aa2f6b34345eb9b9929
This commit is contained in:
Edward Hope-Morley
2013-11-14 19:00:00 +00:00
parent e99cd785d8
commit 4f6e5fcc25
2 changed files with 302 additions and 4 deletions

View File

@@ -46,6 +46,7 @@ from cinder.openstack.common.notifier import test_notifier
from cinder.openstack.common import rpc
import cinder.policy
from cinder import quota
from cinder.taskflow.patterns import linear_flow
from cinder import test
from cinder.tests.brick.fake_lvm import FakeBrickLVM
from cinder.tests import conf_fixture
@@ -62,6 +63,8 @@ from cinder.volume.flows import create_volume
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volutils
import eventlet
QUOTAS = quota.QUOTAS
CONF = cfg.CONF
@@ -110,6 +113,8 @@ class BaseVolumeTestCase(test.TestCase):
self.stubs.Set(brick_lvm.LVM, '_vg_exists', lambda x: True)
self.stubs.Set(os.path, 'exists', lambda x: True)
self.volume.driver.set_initialized()
# keep ordered record of what we execute
self.called = []
def tearDown(self):
try:
@@ -443,6 +448,234 @@ class VolumeTestCase(BaseVolumeTestCase):
self.volume.delete_snapshot(self.context, snapshot_id)
self.volume.delete_volume(self.context, volume_src['id'])
def _mock_synchronized(self, name, *s_args, **s_kwargs):
def inner_sync1(f):
def inner_sync2(*args, **kwargs):
self.called.append('lock-%s' % (name))
ret = f(*args, **kwargs)
self.called.append('unlock-%s' % (name))
return ret
return inner_sync2
return inner_sync1
def test_create_volume_from_snapshot_check_locks(self):
# mock the synchroniser so we can record events
self.stubs.Set(utils, 'synchronized', self._mock_synchronized)
self.stubs.Set(self.volume.driver, 'create_volume_from_snapshot',
lambda *args, **kwargs: None)
orig_flow = linear_flow.Flow.run
def mock_flow_run(*args, **kwargs):
# ensure the lock has been taken
self.assertEqual(len(self.called), 1)
# now proceed with the flow.
ret = orig_flow(*args, **kwargs)
return ret
# create source volume
src_vol = tests_utils.create_volume(self.context, **self.volume_params)
src_vol_id = src_vol['id']
# no lock
self.volume.create_volume(self.context, src_vol_id)
snap_id = self._create_snapshot(src_vol_id)['id']
# no lock
self.volume.create_snapshot(self.context, src_vol_id, snap_id)
dst_vol = tests_utils.create_volume(self.context,
snapshot_id=snap_id,
**self.volume_params)
dst_vol_id = dst_vol['id']
admin_ctxt = context.get_admin_context()
# mock the flow runner so we can do some checks
self.stubs.Set(linear_flow.Flow, 'run', mock_flow_run)
# locked
self.volume.create_volume(self.context, volume_id=dst_vol_id,
snapshot_id=snap_id)
self.assertEqual(len(self.called), 2)
self.assertEqual(dst_vol_id, db.volume_get(admin_ctxt, dst_vol_id).id)
self.assertEqual(snap_id,
db.volume_get(admin_ctxt, dst_vol_id).snapshot_id)
# locked
self.volume.delete_volume(self.context, dst_vol_id)
self.assertEqual(len(self.called), 4)
# locked
self.volume.delete_snapshot(self.context, snap_id)
self.assertEqual(len(self.called), 6)
# locked
self.volume.delete_volume(self.context, src_vol_id)
self.assertEqual(len(self.called), 8)
self.assertEqual(self.called,
['lock-%s' % ('%s-delete_snapshot' % (snap_id)),
'unlock-%s' % ('%s-delete_snapshot' % (snap_id)),
'lock-%s' % ('%s-delete_volume' % (dst_vol_id)),
'unlock-%s' % ('%s-delete_volume' % (dst_vol_id)),
'lock-%s' % ('%s-delete_snapshot' % (snap_id)),
'unlock-%s' % ('%s-delete_snapshot' % (snap_id)),
'lock-%s' % ('%s-delete_volume' % (src_vol_id)),
'unlock-%s' % ('%s-delete_volume' % (src_vol_id))])
def test_create_volume_from_volume_check_locks(self):
# mock the synchroniser so we can record events
self.stubs.Set(utils, 'synchronized', self._mock_synchronized)
orig_flow = linear_flow.Flow.run
def mock_flow_run(*args, **kwargs):
# ensure the lock has been taken
self.assertEqual(len(self.called), 1)
# now proceed with the flow.
ret = orig_flow(*args, **kwargs)
return ret
# create source volume
src_vol = tests_utils.create_volume(self.context, **self.volume_params)
src_vol_id = src_vol['id']
# no lock
self.volume.create_volume(self.context, src_vol_id)
dst_vol = tests_utils.create_volume(self.context,
source_volid=src_vol_id,
**self.volume_params)
dst_vol_id = dst_vol['id']
admin_ctxt = context.get_admin_context()
# mock the flow runner so we can do some checks
self.stubs.Set(linear_flow.Flow, 'run', mock_flow_run)
# locked
self.volume.create_volume(self.context, volume_id=dst_vol_id,
source_volid=src_vol_id)
self.assertEqual(len(self.called), 2)
self.assertEqual(dst_vol_id, db.volume_get(admin_ctxt, dst_vol_id).id)
self.assertEqual(src_vol_id,
db.volume_get(admin_ctxt, dst_vol_id).source_volid)
# locked
self.volume.delete_volume(self.context, dst_vol_id)
self.assertEqual(len(self.called), 4)
# locked
self.volume.delete_volume(self.context, src_vol_id)
self.assertEqual(len(self.called), 6)
self.assertEqual(self.called,
['lock-%s' % ('%s-delete_volume' % (src_vol_id)),
'unlock-%s' % ('%s-delete_volume' % (src_vol_id)),
'lock-%s' % ('%s-delete_volume' % (dst_vol_id)),
'unlock-%s' % ('%s-delete_volume' % (dst_vol_id)),
'lock-%s' % ('%s-delete_volume' % (src_vol_id)),
'unlock-%s' % ('%s-delete_volume' % (src_vol_id))])
def test_create_volume_from_volume_delete_lock_taken(self):
# create source volume
src_vol = tests_utils.create_volume(self.context, **self.volume_params)
src_vol_id = src_vol['id']
# no lock
self.volume.create_volume(self.context, src_vol_id)
dst_vol = tests_utils.create_volume(self.context,
source_volid=src_vol_id,
**self.volume_params)
dst_vol_id = dst_vol['id']
admin_ctxt = context.get_admin_context()
orig_elevated = self.context.elevated
ctxt_deepcopy = self.context.deepcopy()
gthreads = []
def mock_elevated(*args, **kwargs):
# unset mock so it is only called once
self.stubs.Set(self.context, 'elevated', orig_elevated)
# we expect this to block and then fail
t = eventlet.spawn(self.volume.create_volume,
ctxt_deepcopy,
volume_id=dst_vol_id, source_volid=src_vol_id)
gthreads.append(t)
return orig_elevated(*args, **kwargs)
# mock something from early on in the delete operation and within the
# lock so that when we do the create we expect it to block.
self.stubs.Set(self.context, 'elevated', mock_elevated)
# locked
self.volume.delete_volume(self.context, src_vol_id)
# we expect the volume create to fail with the following err since the
# source volume was deleted while the create was locked. Note that the
# volume is still in the db since it was created by the test prior to
# calling manager.create_volume.
self.assertRaises(exception.VolumeNotFound, gthreads[0].wait)
def test_create_volume_from_snapshot_delete_lock_taken(self):
# create source volume
src_vol = tests_utils.create_volume(self.context, **self.volume_params)
src_vol_id = src_vol['id']
# no lock
self.volume.create_volume(self.context, src_vol_id)
# create snapshot
snap_id = self._create_snapshot(src_vol_id)['id']
# no lock
self.volume.create_snapshot(self.context, src_vol_id, snap_id)
# create vol from snapshot...
dst_vol = tests_utils.create_volume(self.context,
source_volid=src_vol_id,
**self.volume_params)
dst_vol_id = dst_vol['id']
admin_ctxt = context.get_admin_context()
orig_elevated = self.context.elevated
ctxt_deepcopy = self.context.deepcopy()
gthreads = []
def mock_elevated(*args, **kwargs):
# unset mock so it is only called once
self.stubs.Set(self.context, 'elevated', orig_elevated)
# We expect this to block and then fail
t = eventlet.spawn(self.volume.create_volume, ctxt_deepcopy,
volume_id=dst_vol_id, snapshot_id=snap_id)
gthreads.append(t)
return orig_elevated(*args, **kwargs)
# mock something from early on in the delete operation and within the
# lock so that when we do the create we expect it to block.
self.stubs.Set(self.context, 'elevated', mock_elevated)
# locked
self.volume.delete_snapshot(self.context, snap_id)
# we expect the volume create to fail with the following err since the
# snapshot was deleted while the create was locked. Note that the
# volume is still in the db since it was created by the test prior to
# calling manager.create_volume.
self.assertRaises(exception.SnapshotNotFound, gthreads[0].wait)
# locked
self.volume.delete_volume(self.context, src_vol_id)
# make sure it is gone
self.assertRaises(exception.VolumeNotFound, db.volume_get,
self.context, src_vol_id)
def test_create_volume_from_snapshot_with_encryption(self):
"""Test volume can be created from a snapshot of
an encrypted volume.

View File

@@ -138,6 +138,49 @@ MAPPING = {
'cinder.volume.drivers.huawei.HuaweiVolumeDriver'}
def locked_volume_operation(f):
"""Lock decorator for volume operations.
Takes a named lock prior to executing the operation. The lock is named with
the operation executed and the id of the volume. This lock can then be used
by other operations to avoid operation conflicts on shared volumes.
Example use:
If a volume operation uses this decorator, it will block until the named
lock is free. This is used to protect concurrent operations on the same
volume e.g. delete VolA while create volume VolB from VolA is in progress.
"""
def lvo_inner1(inst, context, volume_id, **kwargs):
@utils.synchronized("%s-%s" % (volume_id, f.__name__), external=True)
def lvo_inner2(*_args, **_kwargs):
return f(*_args, **_kwargs)
return lvo_inner2(inst, context, volume_id, **kwargs)
return lvo_inner1
def locked_snapshot_operation(f):
"""Lock decorator for snapshot operations.
Takes a named lock prior to executing the operation. The lock is named with
the operation executed and the id of the snapshot. This lock can then be
used by other operations to avoid operation conflicts on shared snapshots.
Example use:
If a snapshot operation uses this decorator, it will block until the named
lock is free. This is used to protect concurrent operations on the same
snapshot e.g. delete SnapA while create volume VolA from SnapA is in
progress.
"""
def lso_inner1(inst, context, snapshot_id, **kwargs):
@utils.synchronized("%s-%s" % (snapshot_id, f.__name__), external=True)
def lso_inner2(*_args, **_kwargs):
return f(*_args, **_kwargs)
return lso_inner2(inst, context, snapshot_id, **kwargs)
return lso_inner1
class VolumeManager(manager.SchedulerDependentManager):
"""Manages attachable block storage devices."""
@@ -264,15 +307,36 @@ class VolumeManager(manager.SchedulerDependentManager):
assert flow, _('Manager volume flow not retrieved')
flow.run(context.elevated())
if flow.state != states.SUCCESS:
raise exception.CinderException(_("Failed to successfully complete"
" manager volume workflow"))
if snapshot_id is not None:
# Make sure the snapshot is not deleted until we are done with it.
locked_action = "%s-%s" % (snapshot_id, 'delete_snapshot')
elif source_volid is not None:
# Make sure the volume is not deleted until we are done with it.
locked_action = "%s-%s" % (source_volid, 'delete_volume')
else:
locked_action = None
def _run_flow():
flow.run(context.elevated())
if flow.state != states.SUCCESS:
msg = _("Failed to successfully complete manager volume "
"workflow")
raise exception.CinderException(msg)
@utils.synchronized(locked_action, external=True)
def _run_flow_locked():
_run_flow()
if locked_action is None:
_run_flow()
else:
_run_flow_locked()
self._reset_stats()
return volume_id
@utils.require_driver_initialized
@locked_volume_operation
def delete_volume(self, context, volume_id):
"""Deletes and unexports volume."""
context = context.elevated()
@@ -401,6 +465,7 @@ class VolumeManager(manager.SchedulerDependentManager):
return snapshot_id
@utils.require_driver_initialized
@locked_snapshot_operation
def delete_snapshot(self, context, snapshot_id):
"""Deletes and unexports snapshot."""
caller_context = context