Merge "Use original volume OVO instance in create flow"
This commit is contained in:
@@ -120,7 +120,7 @@ class ScheduleCreateVolumeTask(flow_utils.CinderTask):
|
||||
"payload %(payload)s"),
|
||||
{'topic': self.FAILURE_TOPIC, 'payload': payload})
|
||||
|
||||
def execute(self, context, request_spec, filter_properties):
|
||||
def execute(self, context, request_spec, filter_properties, volume):
|
||||
try:
|
||||
self.driver_api.schedule_create_volume(context, request_spec,
|
||||
filter_properties)
|
||||
@@ -141,9 +141,7 @@ class ScheduleCreateVolumeTask(flow_utils.CinderTask):
|
||||
try:
|
||||
self._handle_failure(context, request_spec, e)
|
||||
finally:
|
||||
common.error_out_volume(context, self.db_api,
|
||||
request_spec['volume_id'],
|
||||
reason=e)
|
||||
common.error_out(volume, reason=e)
|
||||
|
||||
|
||||
def get_flow(context, db_api, driver_api, request_spec=None,
|
||||
|
||||
@@ -4218,17 +4218,22 @@ class VolumeTestCase(BaseVolumeTestCase):
|
||||
fake_error_create_cloned_volume)
|
||||
volume_src = tests_utils.create_volume(self.context,
|
||||
**self.volume_params)
|
||||
self.assertEqual('creating', volume_src.status)
|
||||
self.volume.create_volume(self.context, volume_src.id,
|
||||
volume=volume_src)
|
||||
self.assertEqual('available', volume_src.status)
|
||||
volume_dst = tests_utils.create_volume(self.context,
|
||||
source_volid=volume_src['id'],
|
||||
**self.volume_params)
|
||||
self.assertEqual('creating', volume_dst.status)
|
||||
self.assertRaises(exception.CinderException,
|
||||
self.volume.create_volume,
|
||||
self.context,
|
||||
volume_dst.id,
|
||||
volume=volume_dst)
|
||||
self.assertEqual('creating', volume_src['status'])
|
||||
# Source volume's status is still available and dst is set to error
|
||||
self.assertEqual('available', volume_src.status)
|
||||
self.assertEqual('error', volume_dst.status)
|
||||
self.volume.delete_volume(self.context, volume_dst.id,
|
||||
volume=volume_dst)
|
||||
self.volume.delete_volume(self.context, volume_src.id,
|
||||
|
||||
@@ -601,7 +601,7 @@ class CreateVolumeFlowManagerTestCase(test.TestCase):
|
||||
snapshot_obj.id)
|
||||
fake_driver.create_volume_from_snapshot.assert_called_once_with(
|
||||
volume_obj, snapshot_obj)
|
||||
handle_bootable.assert_called_once_with(self.ctxt, volume_obj.id,
|
||||
handle_bootable.assert_called_once_with(self.ctxt, volume_obj,
|
||||
snapshot_id=snapshot_obj.id)
|
||||
|
||||
@mock.patch('cinder.objects.Snapshot.get_by_id')
|
||||
@@ -675,7 +675,7 @@ class CreateVolumeFlowManagerGlanceCinderBackendCase(test.TestCase):
|
||||
if format is 'raw' and not owner and location:
|
||||
fake_driver.create_cloned_volume.assert_called_once_with(
|
||||
volume, image_volume)
|
||||
handle_bootable.assert_called_once_with(self.ctxt, volume['id'],
|
||||
handle_bootable.assert_called_once_with(self.ctxt, volume,
|
||||
image_id=image_id,
|
||||
image_meta=image_meta)
|
||||
else:
|
||||
@@ -752,7 +752,7 @@ class CreateVolumeFlowManagerImageCacheTestCase(test.TestCase):
|
||||
|
||||
mock_handle_bootable.assert_called_once_with(
|
||||
self.ctxt,
|
||||
volume['id'],
|
||||
volume,
|
||||
image_id=image_id,
|
||||
image_meta=image_meta
|
||||
)
|
||||
@@ -812,7 +812,7 @@ class CreateVolumeFlowManagerImageCacheTestCase(test.TestCase):
|
||||
|
||||
mock_handle_bootable.assert_called_once_with(
|
||||
self.ctxt,
|
||||
volume['id'],
|
||||
volume,
|
||||
image_id=image_id,
|
||||
image_meta=image_meta
|
||||
)
|
||||
@@ -886,7 +886,7 @@ class CreateVolumeFlowManagerImageCacheTestCase(test.TestCase):
|
||||
|
||||
mock_handle_bootable.assert_called_once_with(
|
||||
self.ctxt,
|
||||
volume['id'],
|
||||
volume,
|
||||
image_id=image_id,
|
||||
image_meta=image_meta
|
||||
)
|
||||
@@ -953,7 +953,7 @@ class CreateVolumeFlowManagerImageCacheTestCase(test.TestCase):
|
||||
|
||||
mock_handle_bootable.assert_called_once_with(
|
||||
self.ctxt,
|
||||
volume['id'],
|
||||
volume,
|
||||
image_id=image_id,
|
||||
image_meta=image_meta
|
||||
)
|
||||
@@ -1079,7 +1079,7 @@ class CreateVolumeFlowManagerImageCacheTestCase(test.TestCase):
|
||||
|
||||
mock_handle_bootable.assert_called_once_with(
|
||||
self.ctxt,
|
||||
volume['id'],
|
||||
volume,
|
||||
image_id=image_id,
|
||||
image_meta=image_meta
|
||||
)
|
||||
|
||||
@@ -771,15 +771,14 @@ class VolumeCastTask(flow_utils.CinderTask):
|
||||
filter_properties['scheduler_hints'] = scheduler_hints
|
||||
self._cast_create_volume(context, request_spec, filter_properties)
|
||||
|
||||
def revert(self, context, result, flow_failures, **kwargs):
|
||||
def revert(self, context, result, flow_failures, volume, **kwargs):
|
||||
if isinstance(result, ft.Failure):
|
||||
return
|
||||
|
||||
# Restore the source volume status and set the volume to error status.
|
||||
volume_id = kwargs['volume_id']
|
||||
common.restore_source_status(context, self.db, kwargs)
|
||||
common.error_out_volume(context, self.db, volume_id)
|
||||
LOG.error(_LE("Volume %s: create failed"), volume_id)
|
||||
common.error_out(volume)
|
||||
LOG.error(_LE("Volume %s: create failed"), volume.id)
|
||||
exc_info = False
|
||||
if all(flow_failures[-1].exc_info):
|
||||
exc_info = flow_failures[-1].exc_info
|
||||
|
||||
@@ -107,3 +107,21 @@ def error_out_volume(context, db, volume_id, reason=None):
|
||||
def error_out_snapshot(context, db, snapshot_id, reason=None):
|
||||
reason = _clean_reason(reason)
|
||||
_update_object(context, db, 'error', reason, 'snapshot', snapshot_id)
|
||||
|
||||
|
||||
def error_out(resource, reason=None):
|
||||
"""Sets status to error for any persistent OVO."""
|
||||
reason = _clean_reason(reason)
|
||||
try:
|
||||
LOG.debug('Setting %(object_type)s %(object_id)s to error due to: '
|
||||
'%(reason)s', {'object_type': resource.obj_name(),
|
||||
'object_id': resource.id,
|
||||
'reason': reason})
|
||||
resource.status = 'error'
|
||||
resource.save()
|
||||
except Exception:
|
||||
# Don't let this cause further exceptions.
|
||||
LOG.exception(_LE("Failed setting %(object_type)s %(object_id)s to "
|
||||
" error status."),
|
||||
{'object_type': resource.obj_name(),
|
||||
'object_id': resource.id})
|
||||
|
||||
@@ -60,7 +60,7 @@ class OnFailureRescheduleTask(flow_utils.CinderTask):
|
||||
|
||||
def __init__(self, reschedule_context, db, scheduler_rpcapi,
|
||||
do_reschedule):
|
||||
requires = ['filter_properties', 'request_spec', 'volume_ref',
|
||||
requires = ['filter_properties', 'request_spec', 'volume',
|
||||
'context']
|
||||
super(OnFailureRescheduleTask, self).__init__(addons=[ACTION],
|
||||
requires=requires)
|
||||
@@ -153,7 +153,7 @@ class OnFailureRescheduleTask(flow_utils.CinderTask):
|
||||
|
||||
LOG.debug("Volume %s: re-scheduled", volume.id)
|
||||
|
||||
def revert(self, context, result, flow_failures, volume_ref, **kwargs):
|
||||
def revert(self, context, result, flow_failures, volume, **kwargs):
|
||||
# NOTE(dulek): Revert is occurring and manager need to know if
|
||||
# rescheduling happened. We're returning boolean flag that will
|
||||
# indicate that. It which will be available in flow engine store
|
||||
@@ -162,16 +162,16 @@ class OnFailureRescheduleTask(flow_utils.CinderTask):
|
||||
# If do not want to be rescheduled, just set the volume's status to
|
||||
# error and return.
|
||||
if not self.do_reschedule:
|
||||
common.error_out_volume(context, self.db, volume_ref.id)
|
||||
LOG.error(_LE("Volume %s: create failed"), volume_ref.id)
|
||||
common.error_out(volume)
|
||||
LOG.error(_LE("Volume %s: create failed"), volume.id)
|
||||
return False
|
||||
|
||||
# Check if we have a cause which can tell us not to reschedule and
|
||||
# set the volume's status to error.
|
||||
for failure in flow_failures.values():
|
||||
if failure.check(*self.no_reschedule_types):
|
||||
common.error_out_volume(context, self.db, volume_ref.id)
|
||||
LOG.error(_LE("Volume %s: create failed"), volume_ref.id)
|
||||
common.error_out(volume)
|
||||
LOG.error(_LE("Volume %s: create failed"), volume.id)
|
||||
return False
|
||||
|
||||
# Use a different context when rescheduling.
|
||||
@@ -179,13 +179,13 @@ class OnFailureRescheduleTask(flow_utils.CinderTask):
|
||||
cause = list(flow_failures.values())[0]
|
||||
context = self.reschedule_context
|
||||
try:
|
||||
self._pre_reschedule(volume_ref)
|
||||
self._reschedule(context, cause, volume=volume_ref, **kwargs)
|
||||
self._post_reschedule(volume_ref)
|
||||
self._pre_reschedule(volume)
|
||||
self._reschedule(context, cause, volume=volume, **kwargs)
|
||||
self._post_reschedule(volume)
|
||||
return True
|
||||
except exception.CinderException:
|
||||
LOG.exception(_LE("Volume %s: rescheduling failed"),
|
||||
volume_ref.id)
|
||||
volume.id)
|
||||
|
||||
return False
|
||||
|
||||
@@ -193,7 +193,7 @@ class OnFailureRescheduleTask(flow_utils.CinderTask):
|
||||
class ExtractVolumeRefTask(flow_utils.CinderTask):
|
||||
"""Extracts volume reference for given volume id."""
|
||||
|
||||
default_provides = 'volume_ref'
|
||||
default_provides = 'refreshed'
|
||||
|
||||
def __init__(self, db, host, set_error=True):
|
||||
super(ExtractVolumeRefTask, self).__init__(addons=[ACTION])
|
||||
@@ -201,21 +201,22 @@ class ExtractVolumeRefTask(flow_utils.CinderTask):
|
||||
self.host = host
|
||||
self.set_error = set_error
|
||||
|
||||
def execute(self, context, volume_id):
|
||||
def execute(self, context, volume):
|
||||
# NOTE(harlowja): this will fetch the volume from the database, if
|
||||
# the volume has been deleted before we got here then this should fail.
|
||||
#
|
||||
# In the future we might want to have a lock on the volume_id so that
|
||||
# the volume can not be deleted while its still being created?
|
||||
return objects.Volume.get_by_id(context, volume_id)
|
||||
volume.refresh()
|
||||
return volume
|
||||
|
||||
def revert(self, context, volume_id, result, **kwargs):
|
||||
def revert(self, context, volume, result, **kwargs):
|
||||
if isinstance(result, ft.Failure) or not self.set_error:
|
||||
return
|
||||
|
||||
reason = _('Volume create failed while extracting volume ref.')
|
||||
common.error_out_volume(context, self.db, volume_id, reason=reason)
|
||||
LOG.error(_LE("Volume %s: create failed"), volume_id)
|
||||
common.error_out_volume(context, self.db, volume.id, reason=reason)
|
||||
LOG.error(_LE("Volume %s: create failed"), volume.id)
|
||||
|
||||
|
||||
class ExtractVolumeSpecTask(flow_utils.CinderTask):
|
||||
@@ -232,43 +233,43 @@ class ExtractVolumeSpecTask(flow_utils.CinderTask):
|
||||
default_provides = 'volume_spec'
|
||||
|
||||
def __init__(self, db):
|
||||
requires = ['volume_ref', 'request_spec']
|
||||
requires = ['volume', 'request_spec']
|
||||
super(ExtractVolumeSpecTask, self).__init__(addons=[ACTION],
|
||||
requires=requires)
|
||||
self.db = db
|
||||
|
||||
def execute(self, context, volume_ref, request_spec):
|
||||
def execute(self, context, volume, request_spec):
|
||||
get_remote_image_service = glance.get_remote_image_service
|
||||
|
||||
volume_name = volume_ref['name']
|
||||
volume_size = utils.as_int(volume_ref['size'], quiet=False)
|
||||
volume_name = volume.name
|
||||
volume_size = utils.as_int(volume.size, quiet=False)
|
||||
|
||||
# Create a dictionary that will represent the volume to be so that
|
||||
# later tasks can easily switch between the different types and create
|
||||
# the volume according to the volume types specifications (which are
|
||||
# represented in this dictionary).
|
||||
specs = {
|
||||
'status': volume_ref['status'],
|
||||
'status': volume.status,
|
||||
'type': 'raw', # This will have the type of the volume to be
|
||||
# created, which should be one of [raw, snap,
|
||||
# source_vol, image]
|
||||
'volume_id': volume_ref['id'],
|
||||
'volume_id': volume.id,
|
||||
'volume_name': volume_name,
|
||||
'volume_size': volume_size,
|
||||
}
|
||||
|
||||
if volume_ref.get('snapshot_id'):
|
||||
if volume.snapshot_id:
|
||||
# We are making a snapshot based volume instead of a raw volume.
|
||||
specs.update({
|
||||
'type': 'snap',
|
||||
'snapshot_id': volume_ref['snapshot_id'],
|
||||
'snapshot_id': volume.snapshot_id,
|
||||
})
|
||||
elif volume_ref.get('source_volid'):
|
||||
elif volume.source_volid:
|
||||
# We are making a source based volume instead of a raw volume.
|
||||
#
|
||||
# NOTE(harlowja): This will likely fail if the source volume
|
||||
# disappeared by the time this call occurred.
|
||||
source_volid = volume_ref.get('source_volid')
|
||||
source_volid = volume.source_volid
|
||||
source_volume_ref = objects.Volume.get_by_id(context,
|
||||
source_volid)
|
||||
specs.update({
|
||||
@@ -276,7 +277,7 @@ class ExtractVolumeSpecTask(flow_utils.CinderTask):
|
||||
# This is captured incase we have to revert and we want to set
|
||||
# back the source volume status to its original status. This
|
||||
# may or may not be sketchy to do??
|
||||
'source_volstatus': source_volume_ref['status'],
|
||||
'source_volstatus': source_volume_ref.status,
|
||||
'type': 'source_vol',
|
||||
})
|
||||
elif request_spec.get('source_replicaid'):
|
||||
@@ -289,7 +290,7 @@ class ExtractVolumeSpecTask(flow_utils.CinderTask):
|
||||
source_volid)
|
||||
specs.update({
|
||||
'source_replicaid': source_volid,
|
||||
'source_replicastatus': source_volume_ref['status'],
|
||||
'source_replicastatus': source_volume_ref.status,
|
||||
'type': 'source_replica',
|
||||
})
|
||||
elif request_spec.get('image_id'):
|
||||
@@ -334,19 +335,18 @@ class NotifyVolumeActionTask(flow_utils.CinderTask):
|
||||
self.db = db
|
||||
self.event_suffix = event_suffix
|
||||
|
||||
def execute(self, context, volume_ref):
|
||||
volume_id = volume_ref['id']
|
||||
def execute(self, context, volume):
|
||||
try:
|
||||
volume_utils.notify_about_volume_usage(context, volume_ref,
|
||||
volume_utils.notify_about_volume_usage(context, volume,
|
||||
self.event_suffix,
|
||||
host=volume_ref['host'])
|
||||
host=volume.host)
|
||||
except exception.CinderException:
|
||||
# If notification sending of volume database entry reading fails
|
||||
# then we shouldn't error out the whole workflow since this is
|
||||
# not always information that must be sent for volumes to operate
|
||||
LOG.exception(_LE("Failed notifying about the volume"
|
||||
" action %(event)s for volume %(volume_id)s"),
|
||||
{'event': self.event_suffix, 'volume_id': volume_id})
|
||||
{'event': self.event_suffix, 'volume_id': volume.id})
|
||||
|
||||
|
||||
class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
@@ -355,8 +355,6 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
Reversion strategy: N/A
|
||||
"""
|
||||
|
||||
default_provides = 'volume'
|
||||
|
||||
def __init__(self, manager, db, driver, image_volume_cache=None):
|
||||
super(CreateVolumeFromSpecTask, self).__init__(addons=[ACTION])
|
||||
self.manager = manager
|
||||
@@ -364,7 +362,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
self.driver = driver
|
||||
self.image_volume_cache = image_volume_cache
|
||||
|
||||
def _handle_bootable_volume_glance_meta(self, context, volume_id,
|
||||
def _handle_bootable_volume_glance_meta(self, context, volume,
|
||||
**kwargs):
|
||||
"""Enable bootable flag and properly handle glance metadata.
|
||||
|
||||
@@ -380,7 +378,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
" %(src_id)s metadata")
|
||||
src_type = None
|
||||
src_id = None
|
||||
self._enable_bootable_flag(context, volume_id)
|
||||
self._enable_bootable_flag(context, volume)
|
||||
try:
|
||||
if kwargs.get('snapshot_id'):
|
||||
src_type = 'snapshot'
|
||||
@@ -388,31 +386,31 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
snapshot_id = src_id
|
||||
LOG.debug(log_template, {'src_type': src_type,
|
||||
'src_id': src_id,
|
||||
'vol_id': volume_id})
|
||||
'vol_id': volume.id})
|
||||
self.db.volume_glance_metadata_copy_to_volume(
|
||||
context, volume_id, snapshot_id)
|
||||
context, volume.id, snapshot_id)
|
||||
elif kwargs.get('source_volid'):
|
||||
src_type = 'source volume'
|
||||
src_id = kwargs['source_volid']
|
||||
source_volid = src_id
|
||||
LOG.debug(log_template, {'src_type': src_type,
|
||||
'src_id': src_id,
|
||||
'vol_id': volume_id})
|
||||
'vol_id': volume.id})
|
||||
self.db.volume_glance_metadata_copy_from_volume_to_volume(
|
||||
context,
|
||||
source_volid,
|
||||
volume_id)
|
||||
volume.id)
|
||||
elif kwargs.get('source_replicaid'):
|
||||
src_type = 'source replica'
|
||||
src_id = kwargs['source_replicaid']
|
||||
source_replicaid = src_id
|
||||
LOG.debug(log_template, {'src_type': src_type,
|
||||
'src_id': src_id,
|
||||
'vol_id': volume_id})
|
||||
'vol_id': volume.id})
|
||||
self.db.volume_glance_metadata_copy_from_volume_to_volume(
|
||||
context,
|
||||
source_replicaid,
|
||||
volume_id)
|
||||
volume.id)
|
||||
elif kwargs.get('image_id'):
|
||||
src_type = 'image'
|
||||
src_id = kwargs['image_id']
|
||||
@@ -420,8 +418,8 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
image_meta = kwargs.get('image_meta', {})
|
||||
LOG.debug(log_template, {'src_type': src_type,
|
||||
'src_id': src_id,
|
||||
'vol_id': volume_id})
|
||||
self._capture_volume_image_metadata(context, volume_id,
|
||||
'vol_id': volume.id})
|
||||
self._capture_volume_image_metadata(context, volume.id,
|
||||
image_id, image_meta)
|
||||
except exception.GlanceMetadataNotFound:
|
||||
# If volume is not created from image, No glance metadata
|
||||
@@ -431,14 +429,13 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
except exception.CinderException as ex:
|
||||
LOG.exception(exception_template, {'src_type': src_type,
|
||||
'src_id': src_id,
|
||||
'vol_id': volume_id})
|
||||
'vol_id': volume.id})
|
||||
raise exception.MetadataCopyFailure(reason=ex)
|
||||
|
||||
def _create_from_snapshot(self, context, volume_ref, snapshot_id,
|
||||
def _create_from_snapshot(self, context, volume, snapshot_id,
|
||||
**kwargs):
|
||||
volume_id = volume_ref['id']
|
||||
snapshot = objects.Snapshot.get_by_id(context, snapshot_id)
|
||||
model_update = self.driver.create_volume_from_snapshot(volume_ref,
|
||||
model_update = self.driver.create_volume_from_snapshot(volume,
|
||||
snapshot)
|
||||
# NOTE(harlowja): Subtasks would be useful here since after this
|
||||
# point the volume has already been created and further failures
|
||||
@@ -457,21 +454,22 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
'snapshot_ref_id': snapshot.volume_id})
|
||||
raise exception.MetadataUpdateFailure(reason=ex)
|
||||
if make_bootable:
|
||||
self._handle_bootable_volume_glance_meta(context, volume_id,
|
||||
self._handle_bootable_volume_glance_meta(context, volume,
|
||||
snapshot_id=snapshot_id)
|
||||
return model_update
|
||||
|
||||
def _enable_bootable_flag(self, context, volume_id):
|
||||
def _enable_bootable_flag(self, context, volume):
|
||||
try:
|
||||
LOG.debug('Marking volume %s as bootable.', volume_id)
|
||||
self.db.volume_update(context, volume_id, {'bootable': True})
|
||||
LOG.debug('Marking volume %s as bootable.', volume.id)
|
||||
volume.bootable = True
|
||||
volume.save()
|
||||
except exception.CinderException as ex:
|
||||
LOG.exception(_LE("Failed updating volume %(volume_id)s bootable "
|
||||
"flag to true"), {'volume_id': volume_id})
|
||||
"flag to true"), {'volume_id': volume.id})
|
||||
raise exception.MetadataUpdateFailure(reason=ex)
|
||||
|
||||
def _create_from_source_volume(self, context, volume_ref,
|
||||
source_volid, **kwargs):
|
||||
def _create_from_source_volume(self, context, volume, source_volid,
|
||||
**kwargs):
|
||||
# NOTE(harlowja): if the source volume has disappeared this will be our
|
||||
# detection of that since this database call should fail.
|
||||
#
|
||||
@@ -479,17 +477,17 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
# and we should have proper locks on the source volume while actions
|
||||
# that use the source volume are underway.
|
||||
srcvol_ref = objects.Volume.get_by_id(context, source_volid)
|
||||
model_update = self.driver.create_cloned_volume(volume_ref, srcvol_ref)
|
||||
model_update = self.driver.create_cloned_volume(volume, srcvol_ref)
|
||||
# NOTE(harlowja): Subtasks would be useful here since after this
|
||||
# point the volume has already been created and further failures
|
||||
# will not destroy the volume (although they could in the future).
|
||||
if srcvol_ref.bootable:
|
||||
self._handle_bootable_volume_glance_meta(
|
||||
context, volume_ref.id, source_volid=srcvol_ref.id)
|
||||
context, volume, source_volid=srcvol_ref.id)
|
||||
return model_update
|
||||
|
||||
def _create_from_source_replica(self, context, volume_ref,
|
||||
source_replicaid, **kwargs):
|
||||
def _create_from_source_replica(self, context, volume, source_replicaid,
|
||||
**kwargs):
|
||||
# NOTE(harlowja): if the source volume has disappeared this will be our
|
||||
# detection of that since this database call should fail.
|
||||
#
|
||||
@@ -497,7 +495,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
# and we should have proper locks on the source volume while actions
|
||||
# that use the source volume are underway.
|
||||
srcvol_ref = objects.Volume.get_by_id(context, source_replicaid)
|
||||
model_update = self.driver.create_replica_test_volume(volume_ref,
|
||||
model_update = self.driver.create_replica_test_volume(volume,
|
||||
srcvol_ref)
|
||||
# NOTE(harlowja): Subtasks would be useful here since after this
|
||||
# point the volume has already been created and further failures
|
||||
@@ -505,34 +503,33 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
if srcvol_ref.bootable:
|
||||
self._handle_bootable_volume_glance_meta(
|
||||
context,
|
||||
volume_ref['id'],
|
||||
volume,
|
||||
source_replicaid=source_replicaid)
|
||||
return model_update
|
||||
|
||||
def _copy_image_to_volume(self, context, volume_ref,
|
||||
def _copy_image_to_volume(self, context, volume,
|
||||
image_id, image_location, image_service):
|
||||
"""Downloads Glance image to the specified volume."""
|
||||
copy_image_to_volume = self.driver.copy_image_to_volume
|
||||
volume_id = volume_ref['id']
|
||||
LOG.debug("Attempting download of %(image_id)s (%(image_location)s)"
|
||||
" to volume %(volume_id)s.",
|
||||
{'image_id': image_id, 'volume_id': volume_id,
|
||||
{'image_id': image_id, 'volume_id': volume.id,
|
||||
'image_location': image_location})
|
||||
try:
|
||||
copy_image_to_volume(context, volume_ref, image_service, image_id)
|
||||
copy_image_to_volume(context, volume, image_service, image_id)
|
||||
except processutils.ProcessExecutionError as ex:
|
||||
LOG.exception(_LE("Failed to copy image %(image_id)s to volume: "
|
||||
"%(volume_id)s"),
|
||||
{'volume_id': volume_id, 'image_id': image_id})
|
||||
{'volume_id': volume.id, 'image_id': image_id})
|
||||
raise exception.ImageCopyFailure(reason=ex.stderr)
|
||||
except exception.ImageUnacceptable as ex:
|
||||
LOG.exception(_LE("Failed to copy image to volume: %(volume_id)s"),
|
||||
{'volume_id': volume_id})
|
||||
{'volume_id': volume.id})
|
||||
raise exception.ImageUnacceptable(ex)
|
||||
except Exception as ex:
|
||||
LOG.exception(_LE("Failed to copy image %(image_id)s to "
|
||||
"volume: %(volume_id)s"),
|
||||
{'volume_id': volume_id, 'image_id': image_id})
|
||||
{'volume_id': volume.id, 'image_id': image_id})
|
||||
if not isinstance(ex, exception.ImageCopyFailure):
|
||||
raise exception.ImageCopyFailure(reason=ex)
|
||||
else:
|
||||
@@ -540,7 +537,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
|
||||
LOG.debug("Downloaded image %(image_id)s (%(image_location)s)"
|
||||
" to volume %(volume_id)s successfully.",
|
||||
{'image_id': image_id, 'volume_id': volume_id,
|
||||
{'image_id': image_id, 'volume_id': volume.id,
|
||||
'image_location': image_location})
|
||||
|
||||
def _capture_volume_image_metadata(self, context, volume_id,
|
||||
@@ -632,7 +629,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
{'id': image_volume['id']})
|
||||
return None, False
|
||||
|
||||
def _create_from_image_download(self, context, volume_ref, image_location,
|
||||
def _create_from_image_download(self, context, volume, image_location,
|
||||
image_id, image_service):
|
||||
# TODO(harlowja): what needs to be rolled back in the clone if this
|
||||
# volume create fails?? Likely this should be a subflow or broken
|
||||
@@ -640,21 +637,21 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
# do we make said subflow/task which is only triggered in the
|
||||
# clone image 'path' resumable and revertable in the correct
|
||||
# manner.
|
||||
model_update = self.driver.create_volume(volume_ref)
|
||||
updates = dict(model_update or dict(), status='downloading')
|
||||
model_update = self.driver.create_volume(volume) or {}
|
||||
model_update['status'] = 'downloading'
|
||||
try:
|
||||
volume_ref = self.db.volume_update(context,
|
||||
volume_ref['id'], updates)
|
||||
volume.update(model_update)
|
||||
volume.save()
|
||||
except exception.CinderException:
|
||||
LOG.exception(_LE("Failed updating volume %(volume_id)s with "
|
||||
"%(updates)s"),
|
||||
{'volume_id': volume_ref['id'],
|
||||
'updates': updates})
|
||||
self._copy_image_to_volume(context, volume_ref,
|
||||
image_id, image_location, image_service)
|
||||
{'volume_id': volume.id,
|
||||
'updates': model_update})
|
||||
self._copy_image_to_volume(context, volume, image_id, image_location,
|
||||
image_service)
|
||||
return model_update
|
||||
|
||||
def _create_from_image_cache(self, context, internal_context, volume_ref,
|
||||
def _create_from_image_cache(self, context, internal_context, volume,
|
||||
image_id, image_meta):
|
||||
"""Attempt to create the volume using the image cache.
|
||||
|
||||
@@ -664,10 +661,10 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
"""
|
||||
LOG.debug('Attempting to retrieve cache entry for image = '
|
||||
'%(image_id)s on host %(host)s.',
|
||||
{'image_id': image_id, 'host': volume_ref['host']})
|
||||
{'image_id': image_id, 'host': volume.host})
|
||||
try:
|
||||
cache_entry = self.image_volume_cache.get_entry(internal_context,
|
||||
volume_ref,
|
||||
volume,
|
||||
image_id,
|
||||
image_meta)
|
||||
if cache_entry:
|
||||
@@ -675,7 +672,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
{'volume_id': cache_entry['volume_id']})
|
||||
model_update = self._create_from_source_volume(
|
||||
context,
|
||||
volume_ref,
|
||||
volume,
|
||||
cache_entry['volume_id']
|
||||
)
|
||||
return model_update, True
|
||||
@@ -685,18 +682,18 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
'%(exception)s'), {'exception': e})
|
||||
return None, False
|
||||
|
||||
def _create_from_image(self, context, volume_ref,
|
||||
def _create_from_image(self, context, volume,
|
||||
image_location, image_id, image_meta,
|
||||
image_service, **kwargs):
|
||||
LOG.debug("Cloning %(volume_id)s from image %(image_id)s "
|
||||
" at location %(image_location)s.",
|
||||
{'volume_id': volume_ref['id'],
|
||||
{'volume_id': volume.id,
|
||||
'image_location': image_location, 'image_id': image_id})
|
||||
|
||||
virtual_size = image_meta.get('virtual_size')
|
||||
if virtual_size:
|
||||
virtual_size = image_utils.check_virtual_size(virtual_size,
|
||||
volume_ref.size,
|
||||
volume.size,
|
||||
image_id)
|
||||
|
||||
# Create the volume from an image.
|
||||
@@ -707,7 +704,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
# dict containing provider_location for cloned volume
|
||||
# and clone status.
|
||||
model_update, cloned = self.driver.clone_image(context,
|
||||
volume_ref,
|
||||
volume,
|
||||
image_location,
|
||||
image_meta,
|
||||
image_service)
|
||||
@@ -715,7 +712,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
# Try and clone the image if we have it set as a glance location.
|
||||
if not cloned and 'cinder' in CONF.allowed_direct_url_schemes:
|
||||
model_update, cloned = self._clone_image_volume(context,
|
||||
volume_ref,
|
||||
volume,
|
||||
image_location,
|
||||
image_meta)
|
||||
# Try and use the image cache.
|
||||
@@ -729,7 +726,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
model_update, cloned = self._create_from_image_cache(
|
||||
context,
|
||||
internal_context,
|
||||
volume_ref,
|
||||
volume,
|
||||
image_id,
|
||||
image_meta
|
||||
)
|
||||
@@ -738,7 +735,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
|
||||
# Fall back to default behavior of creating volume,
|
||||
# download the image data and copy it into the volume.
|
||||
original_size = volume_ref['size']
|
||||
original_size = volume.size
|
||||
try:
|
||||
if not cloned:
|
||||
with image_utils.TemporaryImages.fetch(
|
||||
@@ -748,16 +745,16 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
data = image_utils.qemu_img_info(tmp_image)
|
||||
|
||||
virtual_size = image_utils.check_virtual_size(
|
||||
data.virtual_size, volume_ref.size, image_id)
|
||||
data.virtual_size, volume.size, image_id)
|
||||
|
||||
if should_create_cache_entry:
|
||||
if virtual_size and virtual_size != original_size:
|
||||
volume_ref.size = virtual_size
|
||||
volume_ref.save()
|
||||
volume.size = virtual_size
|
||||
volume.save()
|
||||
|
||||
model_update = self._create_from_image_download(
|
||||
context,
|
||||
volume_ref,
|
||||
volume,
|
||||
image_location,
|
||||
image_id,
|
||||
image_service
|
||||
@@ -767,10 +764,10 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
# Update the newly created volume db entry before we clone it
|
||||
# for the image-volume creation.
|
||||
if model_update:
|
||||
volume_ref.update(model_update)
|
||||
volume_ref.save()
|
||||
volume.update(model_update)
|
||||
volume.save()
|
||||
self.manager._create_image_cache_volume_entry(internal_context,
|
||||
volume_ref,
|
||||
volume,
|
||||
image_id,
|
||||
image_meta)
|
||||
finally:
|
||||
@@ -778,24 +775,24 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
# what was originally requested. If an exception has occurred we
|
||||
# still need to put this back before letting it be raised further
|
||||
# up the stack.
|
||||
if volume_ref.size != original_size:
|
||||
self.driver.extend_volume(volume_ref, original_size)
|
||||
volume_ref.size = original_size
|
||||
volume_ref.save()
|
||||
if volume.size != original_size:
|
||||
self.driver.extend_volume(volume, original_size)
|
||||
volume.size = original_size
|
||||
volume.save()
|
||||
|
||||
self._handle_bootable_volume_glance_meta(context, volume_ref.id,
|
||||
self._handle_bootable_volume_glance_meta(context, volume,
|
||||
image_id=image_id,
|
||||
image_meta=image_meta)
|
||||
return model_update
|
||||
|
||||
def _create_raw_volume(self, volume_ref, **kwargs):
|
||||
return self.driver.create_volume(volume_ref)
|
||||
def _create_raw_volume(self, volume, **kwargs):
|
||||
return self.driver.create_volume(volume)
|
||||
|
||||
def execute(self, context, volume_ref, volume_spec):
|
||||
def execute(self, context, volume, volume_spec):
|
||||
volume_spec = dict(volume_spec)
|
||||
volume_id = volume_spec.pop('volume_id', None)
|
||||
if not volume_id:
|
||||
volume_id = volume_ref['id']
|
||||
volume_id = volume.id
|
||||
|
||||
# we can't do anything if the driver didn't init
|
||||
if not self.driver.initialized:
|
||||
@@ -810,21 +807,19 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
{'volume_spec': volume_spec, 'volume_id': volume_id,
|
||||
'create_type': create_type})
|
||||
if create_type == 'raw':
|
||||
model_update = self._create_raw_volume(volume_ref=volume_ref,
|
||||
**volume_spec)
|
||||
model_update = self._create_raw_volume(volume, **volume_spec)
|
||||
elif create_type == 'snap':
|
||||
model_update = self._create_from_snapshot(context,
|
||||
volume_ref=volume_ref,
|
||||
model_update = self._create_from_snapshot(context, volume,
|
||||
**volume_spec)
|
||||
elif create_type == 'source_vol':
|
||||
model_update = self._create_from_source_volume(
|
||||
context, volume_ref=volume_ref, **volume_spec)
|
||||
context, volume, **volume_spec)
|
||||
elif create_type == 'source_replica':
|
||||
model_update = self._create_from_source_replica(
|
||||
context, volume_ref=volume_ref, **volume_spec)
|
||||
context, volume, **volume_spec)
|
||||
elif create_type == 'image':
|
||||
model_update = self._create_from_image(context,
|
||||
volume_ref=volume_ref,
|
||||
volume,
|
||||
**volume_spec)
|
||||
else:
|
||||
raise exception.VolumeTypeNotFound(volume_type_id=create_type)
|
||||
@@ -832,8 +827,8 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
# Persist any model information provided on creation.
|
||||
try:
|
||||
if model_update:
|
||||
volume_ref.update(model_update)
|
||||
volume_ref.save()
|
||||
volume.update(model_update)
|
||||
volume.save()
|
||||
except exception.CinderException:
|
||||
# If somehow the update failed we want to ensure that the
|
||||
# failure is logged (but not try rescheduling since the volume at
|
||||
@@ -843,8 +838,6 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
|
||||
{'volume_id': volume_id, 'model': model_update})
|
||||
raise
|
||||
|
||||
return volume_ref
|
||||
|
||||
|
||||
class CreateVolumeOnFinishTask(NotifyVolumeActionTask):
|
||||
"""On successful volume creation this will perform final volume actions.
|
||||
@@ -891,7 +884,7 @@ class CreateVolumeOnFinishTask(NotifyVolumeActionTask):
|
||||
'volume_id': volume.id})
|
||||
|
||||
|
||||
def get_flow(context, manager, db, driver, scheduler_rpcapi, host, volume_id,
|
||||
def get_flow(context, manager, db, driver, scheduler_rpcapi, host, volume,
|
||||
allow_reschedule, reschedule_context, request_spec,
|
||||
filter_properties, image_volume_cache=None):
|
||||
|
||||
@@ -920,7 +913,7 @@ def get_flow(context, manager, db, driver, scheduler_rpcapi, host, volume_id,
|
||||
'context': context,
|
||||
'filter_properties': filter_properties,
|
||||
'request_spec': request_spec,
|
||||
'volume_id': volume_id,
|
||||
'volume': volume,
|
||||
}
|
||||
|
||||
volume_flow.add(ExtractVolumeRefTask(db, host, set_error=False))
|
||||
|
||||
@@ -575,7 +575,7 @@ class VolumeManager(manager.SchedulerDependentManager):
|
||||
self.driver,
|
||||
self.scheduler_rpcapi,
|
||||
self.host,
|
||||
volume.id,
|
||||
volume,
|
||||
allow_reschedule,
|
||||
context,
|
||||
request_spec,
|
||||
@@ -614,7 +614,6 @@ class VolumeManager(manager.SchedulerDependentManager):
|
||||
# NOTE(dulek): Flag to indicate if volume was rescheduled. Used to
|
||||
# decide if allocated_capacity should be incremented.
|
||||
rescheduled = False
|
||||
vol_ref = None
|
||||
|
||||
try:
|
||||
if locked_action is None:
|
||||
@@ -624,7 +623,7 @@ class VolumeManager(manager.SchedulerDependentManager):
|
||||
_run_flow()
|
||||
finally:
|
||||
try:
|
||||
vol_ref = flow_engine.storage.fetch('volume_ref')
|
||||
flow_engine.storage.fetch('refreshed')
|
||||
except tfe.NotFound:
|
||||
# If there's no vol_ref, then flow is reverted. Lets check out
|
||||
# if rescheduling occurred.
|
||||
@@ -636,16 +635,12 @@ class VolumeManager(manager.SchedulerDependentManager):
|
||||
pass
|
||||
|
||||
if not rescheduled:
|
||||
if not vol_ref:
|
||||
# Flow was reverted and not rescheduled, fetching
|
||||
# volume_ref from the DB, because it will be needed.
|
||||
vol_ref = objects.Volume.get_by_id(context, volume.id)
|
||||
# NOTE(dulek): Volume wasn't rescheduled so we need to update
|
||||
# volume stats as these are decremented on delete.
|
||||
self._update_allocated_capacity(vol_ref)
|
||||
self._update_allocated_capacity(volume)
|
||||
|
||||
LOG.info(_LI("Created volume successfully."), resource=vol_ref)
|
||||
return vol_ref.id
|
||||
LOG.info(_LI("Created volume successfully."), resource=volume)
|
||||
return volume.id
|
||||
|
||||
# FIXME(bluex): replace volume_id with volume.id when volume_id is removed
|
||||
@coordination.synchronized('{volume_id}-{f_name}')
|
||||
|
||||
Reference in New Issue
Block a user