Replace locks in volume manager

Use Tooz locks from coordination module in volume manager.

Partially implements: blueprint cinder-volume-active-active-support
Co-Authored-By: Szymon Borkowski <szymon.borkowski@intel.com>
Change-Id: I25b088a6a7c5d89dd64b5365c0441cf8e94f33fe
Szymon Wroblewski, 2015-05-26 17:22:06 +02:00 (committed by Szymon Wróblewski)
parent 1c7b0a50da
commit fa4442ef89
3 changed files with 111 additions and 189 deletions
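
For reference, the coordination module is Cinder's wrapper around the Tooz
distributed-locking library. A minimal sketch of the primitive it builds on,
assuming Tooz's file driver (the backend URL, member id, and lock name below
are illustrative, not Cinder's configuration):

    from tooz import coordination

    coordinator = coordination.get_coordinator('file:///tmp/cinder-locks',
                                               b'cinder-volume-host-1')
    coordinator.start()
    try:
        # Lock names follow the templates used in the hunks below,
        # e.g. '<volume id>-delete_volume'.
        with coordinator.get_lock(b'vol-1-delete_volume'):
            pass  # critical section: one holder across all volume services
    finally:
        coordinator.stop()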

--- a/cinder/test.py
+++ b/cinder/test.py

@@ -40,6 +40,7 @@ from oslotest import moxstubout
 import testtools
 
 from cinder.common import config  # noqa Need to register global_opts
+from cinder import coordination
 from cinder.db import migration
 from cinder.db.sqlalchemy import api as sqla_api
 from cinder import i18n
@@ -232,6 +233,9 @@ class TestCase(testtools.TestCase):
             # clear out the cache.
             sqla_api._GET_METHODS = {}
 
+        coordination.COORDINATOR.start()
+        self.addCleanup(coordination.COORDINATOR.stop)
+
     def _restore_obj_registry(self):
         objects_base.CinderObjectRegistry._registry._obj_classes = \
             self._base_test_obj_backup
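
The base TestCase now gives every test a running coordinator. In isolation the
pattern looks like this (a sketch; the real setUp in cinder/test.py does much
more than shown):

    import testtools

    from cinder import coordination

    class ExampleTestCase(testtools.TestCase):
        def setUp(self):
            super(ExampleTestCase, self).setUp()
            # Start the module-global coordinator for this test and
            # guarantee it is stopped again, even if the test fails.
            coordination.COORDINATOR.start()
            self.addCleanup(coordination.COORDINATOR.stop)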

--- a/cinder/tests/unit/test_volume.py
+++ b/cinder/tests/unit/test_volume.py

@@ -43,6 +43,7 @@ from taskflow.engines.action_engine import engine
 from cinder.api import common
 from cinder.brick.local_dev import lvm as brick_lvm
 from cinder import context
+from cinder import coordination
 from cinder import db
 from cinder import exception
 from cinder.image import image_utils
@@ -1566,17 +1567,16 @@ class VolumeTestCase(BaseVolumeTestCase):
     def _fake_execute(self, *cmd, **kwargs):
         pass
 
+    @mock.patch.object(coordination.Coordinator, 'get_lock')
     @mock.patch.object(cinder.volume.drivers.lvm.LVMVolumeDriver,
                        'create_volume_from_snapshot')
     def test_create_volume_from_snapshot_check_locks(
-            self, mock_lvm_create):
-        # mock the synchroniser so we can record events
-        self.stubs.Set(utils, 'synchronized', self._mock_synchronized)
+            self, mock_lvm_create, mock_lock):
         orig_flow = engine.ActionEngine.run
 
         def mock_flow_run(*args, **kwargs):
             # ensure the lock has been taken
-            self.assertEqual(1, len(self.called))
+            mock_lock.assert_called_with('%s-delete_snapshot' % snap_id)
             # now proceed with the flow.
             ret = orig_flow(*args, **kwargs)
             return ret
@@ -1607,45 +1607,35 @@ class VolumeTestCase(BaseVolumeTestCase):
         self.volume.create_volume(self.context, volume_id=dst_vol_id,
                                   request_spec={'snapshot_id': snap_id},
                                   volume=dst_vol)
-        self.assertEqual(2, len(self.called))
+        mock_lock.assert_called_with('%s-delete_snapshot' % snap_id)
         self.assertEqual(dst_vol_id, db.volume_get(admin_ctxt, dst_vol_id).id)
         self.assertEqual(snap_id,
                          db.volume_get(admin_ctxt, dst_vol_id).snapshot_id)
 
         # locked
         self.volume.delete_volume(self.context, dst_vol_id, volume=dst_vol)
-        self.assertEqual(4, len(self.called))
+        mock_lock.assert_called_with('%s-delete_volume' % dst_vol_id)
 
         # locked
         self.volume.delete_snapshot(self.context, snapshot_obj)
-        self.assertEqual(6, len(self.called))
+        mock_lock.assert_called_with('%s-delete_snapshot' % snap_id)
 
         # locked
         self.volume.delete_volume(self.context, src_vol_id, volume=src_vol)
-        self.assertEqual(8, len(self.called))
-        self.assertEqual(['lock-%s' % ('%s-delete_snapshot' % (snap_id)),
-                          'unlock-%s' % ('%s-delete_snapshot' % (snap_id)),
-                          'lock-%s' % ('%s-delete_volume' % (dst_vol_id)),
-                          'unlock-%s' % ('%s-delete_volume' % (dst_vol_id)),
-                          'lock-%s' % ('%s-delete_snapshot' % (snap_id)),
-                          'unlock-%s' % ('%s-delete_snapshot' % (snap_id)),
-                          'lock-%s' % ('%s-delete_volume' % (src_vol_id)),
-                          'unlock-%s' % ('%s-delete_volume' % (src_vol_id))],
-                         self.called)
+        mock_lock.assert_called_with('%s-delete_volume' % src_vol_id)
 
         self.assertTrue(mock_lvm_create.called)
 
-    def test_create_volume_from_volume_check_locks(self):
-        # mock the synchroniser so we can record events
-        self.stubs.Set(utils, 'synchronized', self._mock_synchronized)
+    @mock.patch.object(coordination.Coordinator, 'get_lock')
+    def test_create_volume_from_volume_check_locks(self, mock_lock):
         self.stubs.Set(utils, 'execute', self._fake_execute)
         orig_flow = engine.ActionEngine.run
 
         def mock_flow_run(*args, **kwargs):
             # ensure the lock has been taken
-            self.assertEqual(1, len(self.called))
+            mock_lock.assert_called_with('%s-delete_volume' % src_vol_id)
             # now proceed with the flow.
             ret = orig_flow(*args, **kwargs)
             return ret
@@ -1656,6 +1646,7 @@ class VolumeTestCase(BaseVolumeTestCase):
         # no lock
         self.volume.create_volume(self.context, src_vol_id, volume=src_vol)
+        self.assertEqual(0, mock_lock.call_count)
 
         dst_vol = tests_utils.create_volume(self.context,
                                             source_volid=src_vol_id,
@@ -1670,26 +1661,18 @@ class VolumeTestCase(BaseVolumeTestCase):
         self.volume.create_volume(self.context, volume_id=dst_vol_id,
                                   request_spec={'source_volid': src_vol_id},
                                   volume=dst_vol)
-        self.assertEqual(2, len(self.called))
+        mock_lock.assert_called_with('%s-delete_volume' % src_vol_id)
         self.assertEqual(dst_vol_id, db.volume_get(admin_ctxt, dst_vol_id).id)
         self.assertEqual(src_vol_id,
                          db.volume_get(admin_ctxt, dst_vol_id).source_volid)
 
         # locked
         self.volume.delete_volume(self.context, dst_vol_id, volume=dst_vol)
-        self.assertEqual(4, len(self.called))
+        mock_lock.assert_called_with('%s-delete_volume' % dst_vol_id)
 
         # locked
         self.volume.delete_volume(self.context, src_vol_id, volume=src_vol)
-        self.assertEqual(6, len(self.called))
-        self.assertEqual(['lock-%s' % ('%s-delete_volume' % (src_vol_id)),
-                          'unlock-%s' % ('%s-delete_volume' % (src_vol_id)),
-                          'lock-%s' % ('%s-delete_volume' % (dst_vol_id)),
-                          'unlock-%s' % ('%s-delete_volume' % (dst_vol_id)),
-                          'lock-%s' % ('%s-delete_volume' % (src_vol_id)),
-                          'unlock-%s' % ('%s-delete_volume' % (src_vol_id))],
-                         self.called)
+        mock_lock.assert_called_with('%s-delete_volume' % src_vol_id)
 
     def test_create_volume_from_volume_delete_lock_taken(self):
         # create source volume
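
The rewritten tests assert on resolved lock names such as
'%s-delete_volume' % src_vol_id instead of recording lock/unlock events. As a
toy illustration of how a template like '{volume_id}-{f_name}' can expand to
such a name (the real helper is coordination.synchronized; synchronized_sketch,
fake_get_lock, and the use of inspect.getcallargs are assumptions made for
this example):

    import contextlib
    import functools
    import inspect

    @contextlib.contextmanager
    def fake_get_lock(name):
        # Stand-in for Coordinator.get_lock(); just records the name.
        print('acquire %s' % name)
        yield
        print('release %s' % name)

    def synchronized_sketch(template):
        def wrap(f):
            @functools.wraps(f)
            def inner(*args, **kwargs):
                # Resolve '{volume_id}', '{snapshot.id}', '{f_name}', ...
                # from the call's actual arguments.
                call_args = inspect.getcallargs(f, *args, **kwargs)
                call_args['f_name'] = f.__name__
                with fake_get_lock(template.format(**call_args)):
                    return f(*args, **kwargs)
            return inner
        return wrap

    class FakeManager(object):
        @synchronized_sketch('{volume_id}-{f_name}')
        def delete_volume(self, context, volume_id, volume=None):
            pass

    FakeManager().delete_volume(None, 'vol-1')  # acquire vol-1-delete_volume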

--- a/cinder/volume/manager.py
+++ b/cinder/volume/manager.py

@@ -56,6 +56,7 @@ from taskflow import exceptions as tfe
 from cinder import compute
 from cinder import context
+from cinder import coordination
 from cinder import exception
 from cinder import flow_utils
 from cinder.i18n import _, _LE, _LI, _LW
@@ -153,68 +154,6 @@ MAPPING = {
 }
 
 
-def locked_volume_operation(f):
-    """Lock decorator for volume operations.
-
-    Takes a named lock prior to executing the operation. The lock is named with
-    the operation executed and the id of the volume. This lock can then be used
-    by other operations to avoid operation conflicts on shared volumes.
-
-    Example use:
-
-    If a volume operation uses this decorator, it will block until the named
-    lock is free. This is used to protect concurrent operations on the same
-    volume e.g. delete VolA while create volume VolB from VolA is in progress.
-    """
-    def lvo_inner1(inst, context, volume_id, **kwargs):
-        @utils.synchronized("%s-%s" % (volume_id, f.__name__), external=True)
-        def lvo_inner2(*_args, **_kwargs):
-            return f(*_args, **_kwargs)
-        return lvo_inner2(inst, context, volume_id, **kwargs)
-    return lvo_inner1
-
-
-def locked_detach_operation(f):
-    """Lock decorator for volume detach operations.
-
-    Takes a named lock prior to executing the detach call. The lock is named
-    with the operation executed and the id of the volume. This lock can then
-    be used by other operations to avoid operation conflicts on shared volumes.
-
-    This locking mechanism is only for detach calls. We can't use the
-    locked_volume_operation, because detach requires an additional
-    attachment_id in the parameter list.
-    """
-    def ldo_inner1(inst, context, volume_id, attachment_id=None, **kwargs):
-        @utils.synchronized("%s-%s" % (volume_id, f.__name__), external=True)
-        def ldo_inner2(*_args, **_kwargs):
-            return f(*_args, **_kwargs)
-        return ldo_inner2(inst, context, volume_id, attachment_id, **kwargs)
-    return ldo_inner1
-
-
-def locked_snapshot_operation(f):
-    """Lock decorator for snapshot operations.
-
-    Takes a named lock prior to executing the operation. The lock is named with
-    the operation executed and the id of the snapshot. This lock can then be
-    used by other operations to avoid operation conflicts on shared snapshots.
-
-    Example use:
-
-    If a snapshot operation uses this decorator, it will block until the named
-    lock is free. This is used to protect concurrent operations on the same
-    snapshot e.g. delete SnapA while create volume VolA from SnapA is in
-    progress.
-    """
-    def lso_inner1(inst, context, snapshot, **kwargs):
-        @utils.synchronized("%s-%s" % (snapshot.id, f.__name__), external=True)
-        def lso_inner2(*_args, **_kwargs):
-            return f(*_args, **_kwargs)
-        return lso_inner2(inst, context, snapshot, **kwargs)
-    return lso_inner1
-
-
 class VolumeManager(manager.SchedulerDependentManager):
     """Manages attachable block storage devices."""
@@ -618,10 +557,6 @@ class VolumeManager(manager.SchedulerDependentManager):
             with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
                 flow_engine.run()
 
-        @utils.synchronized(locked_action, external=True)
-        def _run_flow_locked():
-            _run_flow()
-
         # NOTE(dulek): Flag to indicate if volume was rescheduled. Used to
         # decide if allocated_capacity should be incremented.
         rescheduled = False
@@ -631,7 +566,8 @@ class VolumeManager(manager.SchedulerDependentManager):
             if locked_action is None:
                 _run_flow()
             else:
-                _run_flow_locked()
+                with coordination.Lock(locked_action):
+                    _run_flow()
         finally:
             try:
                 vol_ref = flow_engine.storage.fetch('volume_ref')
@@ -657,7 +593,8 @@ class VolumeManager(manager.SchedulerDependentManager):
         LOG.info(_LI("Created volume successfully."), resource=vol_ref)
         return vol_ref.id
 
-    @locked_volume_operation
+    # FIXME(bluex): replace volume_id with volume.id when volume_id is removed
+    @coordination.synchronized('{volume_id}-{f_name}')
     def delete_volume(self, context, volume_id,
                       unmanage_only=False,
                       volume=None,
@@ -877,7 +814,7 @@ class VolumeManager(manager.SchedulerDependentManager):
                  resource=snapshot)
         return snapshot.id
 
-    @locked_snapshot_operation
+    @coordination.synchronized('{snapshot.id}-{f_name}')
    def delete_snapshot(self, context, snapshot, unmanage_only=False):
         """Deletes and unexports snapshot."""
         context = context.elevated()
@@ -943,108 +880,106 @@ class VolumeManager(manager.SchedulerDependentManager):
         LOG.info(_LI("Delete snapshot completed successfully"),
                  resource=snapshot)
 
+    @coordination.synchronized('{volume_id}')
     def attach_volume(self, context, volume_id, instance_uuid, host_name,
                       mountpoint, mode):
         """Updates db to show volume is attached."""
-        @utils.synchronized(volume_id, external=True)
-        def do_attach():
-            # check the volume status before attaching
-            volume = self.db.volume_get(context, volume_id)
-            volume_metadata = self.db.volume_admin_metadata_get(
-                context.elevated(), volume_id)
-            if volume['status'] == 'attaching':
-                if (volume_metadata.get('attached_mode') and
-                        volume_metadata.get('attached_mode') != mode):
-                    raise exception.InvalidVolume(
-                        reason=_("being attached by different mode"))
-
-            if (volume['status'] == 'in-use' and not volume['multiattach']
-                    and not volume['migration_status']):
-                raise exception.InvalidVolume(
-                    reason=_("volume is already attached"))
-
-            host_name_sanitized = utils.sanitize_hostname(
-                host_name) if host_name else None
-            if instance_uuid:
-                attachments = \
-                    self.db.volume_attachment_get_all_by_instance_uuid(
-                        context, volume_id, instance_uuid)
-            else:
-                attachments = (
-                    self.db.volume_attachment_get_all_by_host(
-                        context,
-                        volume_id,
-                        host_name_sanitized))
-            if attachments:
-                self.db.volume_update(context, volume_id,
-                                      {'status': 'in-use'})
-                return
-
-            self._notify_about_volume_usage(context, volume,
-                                            "attach.start")
-            values = {'volume_id': volume_id,
-                      'attach_status': 'attaching', }
-
-            attachment = self.db.volume_attach(context.elevated(), values)
-            volume_metadata = self.db.volume_admin_metadata_update(
-                context.elevated(), volume_id,
-                {"attached_mode": mode}, False)
-
-            attachment_id = attachment['id']
-            if instance_uuid and not uuidutils.is_uuid_like(instance_uuid):
-                self.db.volume_attachment_update(context, attachment_id,
-                                                 {'attach_status':
-                                                  'error_attaching'})
-                raise exception.InvalidUUID(uuid=instance_uuid)
-
-            volume = self.db.volume_get(context, volume_id)
-
-            if volume_metadata.get('readonly') == 'True' and mode != 'ro':
-                self.db.volume_update(context, volume_id,
-                                      {'status': 'error_attaching'})
-                self.message_api.create(
-                    context, defined_messages.ATTACH_READONLY_VOLUME,
-                    context.project_id, resource_type=resource_types.VOLUME,
-                    resource_uuid=volume_id)
-                raise exception.InvalidVolumeAttachMode(mode=mode,
-                                                        volume_id=volume_id)
-
-            try:
-                # NOTE(flaper87): Verify the driver is enabled
-                # before going forward. The exception will be caught
-                # and the volume status updated.
-                utils.require_driver_initialized(self.driver)
-
-                LOG.debug('Attaching volume %(volume_id)s to instance '
-                          '%(instance)s at mountpoint %(mount)s on host '
-                          '%(host)s.',
-                          {'volume_id': volume_id, 'instance': instance_uuid,
-                           'mount': mountpoint, 'host': host_name_sanitized},
-                          resource=volume)
-                self.driver.attach_volume(context,
-                                          volume,
-                                          instance_uuid,
-                                          host_name_sanitized,
-                                          mountpoint)
-            except Exception:
-                with excutils.save_and_reraise_exception():
-                    self.db.volume_attachment_update(
-                        context, attachment_id,
-                        {'attach_status': 'error_attaching'})
-
-            volume = self.db.volume_attached(context.elevated(),
-                                             attachment_id,
-                                             instance_uuid,
-                                             host_name_sanitized,
-                                             mountpoint,
-                                             mode)
-            self._notify_about_volume_usage(context, volume, "attach.end")
-            LOG.info(_LI("Attach volume completed successfully."),
-                     resource=volume)
-            return self.db.volume_attachment_get(context, attachment_id)
-        return do_attach()
+        # check the volume status before attaching
+        volume = self.db.volume_get(context, volume_id)
+        volume_metadata = self.db.volume_admin_metadata_get(
+            context.elevated(), volume_id)
+        if volume['status'] == 'attaching':
+            if (volume_metadata.get('attached_mode') and
+                    volume_metadata.get('attached_mode') != mode):
+                raise exception.InvalidVolume(
+                    reason=_("being attached by different mode"))
+
+        if (volume['status'] == 'in-use' and not volume['multiattach']
+                and not volume['migration_status']):
+            raise exception.InvalidVolume(
+                reason=_("volume is already attached"))
+
+        host_name_sanitized = utils.sanitize_hostname(
+            host_name) if host_name else None
+        if instance_uuid:
+            attachments = \
+                self.db.volume_attachment_get_all_by_instance_uuid(
+                    context, volume_id, instance_uuid)
+        else:
+            attachments = (
+                self.db.volume_attachment_get_all_by_host(
+                    context,
+                    volume_id,
+                    host_name_sanitized))
+        if attachments:
+            self.db.volume_update(context, volume_id,
+                                  {'status': 'in-use'})
+            return
+
+        self._notify_about_volume_usage(context, volume,
+                                        "attach.start")
+        values = {'volume_id': volume_id,
+                  'attach_status': 'attaching', }
+
+        attachment = self.db.volume_attach(context.elevated(), values)
+        volume_metadata = self.db.volume_admin_metadata_update(
+            context.elevated(), volume_id,
+            {"attached_mode": mode}, False)
+
+        attachment_id = attachment['id']
+        if instance_uuid and not uuidutils.is_uuid_like(instance_uuid):
+            self.db.volume_attachment_update(context, attachment_id,
+                                             {'attach_status':
+                                              'error_attaching'})
+            raise exception.InvalidUUID(uuid=instance_uuid)
+
+        volume = self.db.volume_get(context, volume_id)
+
+        if volume_metadata.get('readonly') == 'True' and mode != 'ro':
+            self.db.volume_update(context, volume_id,
+                                  {'status': 'error_attaching'})
+            self.message_api.create(
+                context, defined_messages.ATTACH_READONLY_VOLUME,
+                context.project_id, resource_type=resource_types.VOLUME,
+                resource_uuid=volume_id)
+            raise exception.InvalidVolumeAttachMode(mode=mode,
+                                                    volume_id=volume_id)
+
+        try:
+            # NOTE(flaper87): Verify the driver is enabled
+            # before going forward. The exception will be caught
+            # and the volume status updated.
+            utils.require_driver_initialized(self.driver)
+
+            LOG.debug('Attaching volume %(volume_id)s to instance '
+                      '%(instance)s at mountpoint %(mount)s on host '
+                      '%(host)s.',
+                      {'volume_id': volume_id, 'instance': instance_uuid,
+                       'mount': mountpoint, 'host': host_name_sanitized},
+                      resource=volume)
+            self.driver.attach_volume(context,
+                                      volume,
+                                      instance_uuid,
+                                      host_name_sanitized,
+                                      mountpoint)
+        except Exception:
+            with excutils.save_and_reraise_exception():
+                self.db.volume_attachment_update(
+                    context, attachment_id,
+                    {'attach_status': 'error_attaching'})
+
+        volume = self.db.volume_attached(context.elevated(),
+                                         attachment_id,
+                                         instance_uuid,
+                                         host_name_sanitized,
+                                         mountpoint,
+                                         mode)
+        self._notify_about_volume_usage(context, volume, "attach.end")
+        LOG.info(_LI("Attach volume completed successfully."),
+                 resource=volume)
+        return self.db.volume_attachment_get(context, attachment_id)
 
-    @locked_detach_operation
+    @coordination.synchronized('{volume_id}-{f_name}')
     def detach_volume(self, context, volume_id, attachment_id=None):
         """Updates db to show volume is detached."""
         # TODO(vish): refactor this into a more general "unreserve"
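
Besides the decorators, create_volume's conditional locking (the @@ -631,7
hunk above) collapses the nested _run_flow_locked() helper into a plain
context manager. A condensed, runnable sketch of that control flow, with
coordination.Lock simulated here rather than taken from Cinder:

    import contextlib

    @contextlib.contextmanager
    def Lock(name):
        # Stand-in for coordination.Lock: assumed to acquire a Tooz lock
        # on enter and release it on exit.
        yield

    def _run_flow():
        print('flow ran')

    # locked_action is None unless the new volume is built from an existing
    # resource whose deletion must be serialized against.
    locked_action = 'vol-1-delete_snapshot'
    if locked_action is None:
        _run_flow()
    else:
        with Lock(locked_action):
            _run_flow()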