Merge "Refactor API create_volume flow"
This commit is contained in:
commit
06e5b315ef
|
@ -277,18 +277,14 @@ class API(base.Base):
|
|||
'multiattach': multiattach,
|
||||
}
|
||||
try:
|
||||
if cgsnapshot:
|
||||
flow_engine = create_volume.get_flow_no_rpc(self.db,
|
||||
self.image_service,
|
||||
availability_zones,
|
||||
create_what)
|
||||
else:
|
||||
flow_engine = create_volume.get_flow(self.scheduler_rpcapi,
|
||||
self.volume_rpcapi,
|
||||
self.db,
|
||||
self.image_service,
|
||||
availability_zones,
|
||||
create_what)
|
||||
sched_rpcapi = self.scheduler_rpcapi if not cgsnapshot else None
|
||||
volume_rpcapi = self.volume_rpcapi if not cgsnapshot else None
|
||||
flow_engine = create_volume.get_flow(self.db,
|
||||
self.image_service,
|
||||
availability_zones,
|
||||
create_what,
|
||||
sched_rpcapi,
|
||||
volume_rpcapi)
|
||||
except Exception:
|
||||
msg = _('Failed to create api volume flow.')
|
||||
LOG.exception(msg)
|
||||
|
|
|
@ -72,117 +72,64 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
|
|||
self.availability_zones = availability_zones
|
||||
|
||||
@staticmethod
|
||||
def _extract_consistencygroup(consistencygroup):
|
||||
"""Extracts the consistencygroup id from the provided consistencygroup.
|
||||
def _extract_resource(resource, allowed_vals, exc, resource_name,
|
||||
props=('status',)):
|
||||
"""Extracts the resource id from the provided resource.
|
||||
|
||||
This function validates the input consistencygroup dict and checks that
|
||||
the status of that consistencygroup is valid for creating a volume in.
|
||||
This method validates the input resource dict and checks that the
|
||||
properties which names are passed in `props` argument match
|
||||
corresponding lists in `allowed` argument. In case of mismatch
|
||||
exception of type exc is raised.
|
||||
|
||||
:param resource: Resource dict.
|
||||
:param allowed_vals: Tuple of allowed values lists.
|
||||
:param exc: Exception type to raise.
|
||||
:param resource_name: Name of resource - used to construct log message.
|
||||
:param props: Tuple of resource properties names to validate.
|
||||
:return: Id of a resource.
|
||||
"""
|
||||
|
||||
consistencygroup_id = None
|
||||
if consistencygroup is not None:
|
||||
if consistencygroup['status'] not in CG_PROCEED_STATUS:
|
||||
msg = _("Originating consistencygroup status must be one"
|
||||
" of '%s' values")
|
||||
msg = msg % (", ".join(CG_PROCEED_STATUS))
|
||||
raise exception.InvalidConsistencyGroup(reason=msg)
|
||||
consistencygroup_id = consistencygroup['id']
|
||||
return consistencygroup_id
|
||||
resource_id = None
|
||||
if resource:
|
||||
for prop, allowed_states in zip(props, allowed_vals):
|
||||
if resource[prop] not in allowed_states:
|
||||
msg = _("Originating %(res)s %(prop)s must be one of"
|
||||
"'%(vals)s' values")
|
||||
msg = msg % {'res': resource_name,
|
||||
'prop': prop,
|
||||
'vals': ', '.join(allowed_states)}
|
||||
# TODO(harlowja): what happens if the status changes after
|
||||
# this initial resource status check occurs??? Seems like
|
||||
# someone could delete the resource after this check passes
|
||||
# but before the volume is officially created?
|
||||
raise exc(reason=msg)
|
||||
resource_id = resource['id']
|
||||
return resource_id
|
||||
|
||||
@staticmethod
|
||||
def _extract_cgsnapshot(cgsnapshot):
|
||||
"""Extracts the cgsnapshot id from the provided cgsnapshot.
|
||||
def _extract_consistencygroup(self, consistencygroup):
|
||||
return self._extract_resource(consistencygroup, (CG_PROCEED_STATUS,),
|
||||
exception.InvalidConsistencyGroup,
|
||||
'consistencygroup')
|
||||
|
||||
This function validates the input cgsnapshot dict and checks that
|
||||
the status of that cgsnapshot is valid for creating a cg from.
|
||||
"""
|
||||
def _extract_cgsnapshot(self, cgsnapshot):
|
||||
return self._extract_resource(cgsnapshot, (CGSNAPSHOT_PROCEED_STATUS,),
|
||||
exception.InvalidCgSnapshot,
|
||||
'CGSNAPSHOT')
|
||||
|
||||
cgsnapshot_id = None
|
||||
if cgsnapshot:
|
||||
if cgsnapshot['status'] not in CGSNAPSHOT_PROCEED_STATUS:
|
||||
msg = _("Originating CGSNAPSHOT status must be one"
|
||||
" of '%s' values")
|
||||
msg = msg % (", ".join(CGSNAPSHOT_PROCEED_STATUS))
|
||||
raise exception.InvalidCgSnapshot(reason=msg)
|
||||
cgsnapshot_id = cgsnapshot['id']
|
||||
return cgsnapshot_id
|
||||
def _extract_snapshot(self, snapshot):
|
||||
return self._extract_resource(snapshot, (SNAPSHOT_PROCEED_STATUS,),
|
||||
exception.InvalidSnapshot, 'snapshot')
|
||||
|
||||
@staticmethod
|
||||
def _extract_snapshot(snapshot):
|
||||
"""Extracts the snapshot id from the provided snapshot (if provided).
|
||||
def _extract_source_volume(self, source_volume):
|
||||
return self._extract_resource(source_volume, (SRC_VOL_PROCEED_STATUS,),
|
||||
exception.InvalidVolume, 'source volume')
|
||||
|
||||
This function validates the input snapshot dict and checks that the
|
||||
status of that snapshot is valid for creating a volume from.
|
||||
"""
|
||||
|
||||
snapshot_id = None
|
||||
if snapshot is not None:
|
||||
if snapshot['status'] not in SNAPSHOT_PROCEED_STATUS:
|
||||
msg = _("Originating snapshot status must be one"
|
||||
" of %s values")
|
||||
msg = msg % (", ".join(SNAPSHOT_PROCEED_STATUS))
|
||||
# TODO(harlowja): what happens if the status changes after this
|
||||
# initial snapshot status check occurs??? Seems like someone
|
||||
# could delete the snapshot after this check passes but before
|
||||
# the volume is officially created?
|
||||
raise exception.InvalidSnapshot(reason=msg)
|
||||
snapshot_id = snapshot['id']
|
||||
return snapshot_id
|
||||
|
||||
@staticmethod
|
||||
def _extract_source_volume(source_volume):
|
||||
"""Extracts the volume id from the provided volume (if provided).
|
||||
|
||||
This function validates the input source_volume dict and checks that
|
||||
the status of that source_volume is valid for creating a volume from.
|
||||
"""
|
||||
|
||||
source_volid = None
|
||||
if source_volume is not None:
|
||||
if source_volume['status'] not in SRC_VOL_PROCEED_STATUS:
|
||||
msg = _("Unable to create a volume from an originating source"
|
||||
" volume when its status is not one of %s"
|
||||
" values")
|
||||
msg = msg % (", ".join(SRC_VOL_PROCEED_STATUS))
|
||||
# TODO(harlowja): what happens if the status changes after this
|
||||
# initial volume status check occurs??? Seems like someone
|
||||
# could delete the volume after this check passes but before
|
||||
# the volume is officially created?
|
||||
raise exception.InvalidVolume(reason=msg)
|
||||
source_volid = source_volume['id']
|
||||
return source_volid
|
||||
|
||||
@staticmethod
|
||||
def _extract_source_replica(source_replica):
|
||||
"""Extracts the volume id from the provided replica (if provided).
|
||||
|
||||
This function validates the input replica_volume dict and checks that
|
||||
the status of that replica_volume is valid for creating a volume from.
|
||||
"""
|
||||
|
||||
source_replicaid = None
|
||||
if source_replica is not None:
|
||||
if source_replica['status'] not in SRC_VOL_PROCEED_STATUS:
|
||||
msg = _("Unable to create a volume from an originating source"
|
||||
" volume when its status is not one of %s"
|
||||
" values")
|
||||
msg = msg % (", ".join(SRC_VOL_PROCEED_STATUS))
|
||||
# TODO(harlowja): what happens if the status changes after this
|
||||
# initial volume status check occurs??? Seems like someone
|
||||
# could delete the volume after this check passes but before
|
||||
# the volume is officially created?
|
||||
raise exception.InvalidVolume(reason=msg)
|
||||
replication_status = source_replica['replication_status']
|
||||
if replication_status not in REPLICA_PROCEED_STATUS:
|
||||
msg = _("Unable to create a volume from a replica"
|
||||
" when replication status is not one of %s"
|
||||
" values")
|
||||
msg = msg % (", ".join(REPLICA_PROCEED_STATUS))
|
||||
# TODO(ronenkat): what happens if the replication status
|
||||
# changes after this initial volume status check occurs???
|
||||
raise exception.InvalidVolume(reason=msg)
|
||||
source_replicaid = source_replica['id']
|
||||
return source_replicaid
|
||||
def _extract_source_replica(self, source_replica):
|
||||
return self._extract_resource(source_replica, (SRC_VOL_PROCEED_STATUS,
|
||||
REPLICA_PROCEED_STATUS),
|
||||
exception.InvalidVolume,
|
||||
'replica', ('status',
|
||||
'replication_status'))
|
||||
|
||||
@staticmethod
|
||||
def _extract_size(size, source_volume, snapshot):
|
||||
|
@ -328,6 +275,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
|
|||
else:
|
||||
# For backwards compatibility use the storage_availability_zone
|
||||
availability_zone = CONF.storage_availability_zone
|
||||
|
||||
if availability_zone not in self.availability_zones:
|
||||
msg = _("Availability zone '%s' is invalid") % (availability_zone)
|
||||
LOG.warning(msg)
|
||||
|
@ -379,27 +327,22 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
|
|||
return encryption_key_id
|
||||
|
||||
def _get_volume_type_id(self, volume_type, source_volume, snapshot):
|
||||
volume_type_id = None
|
||||
if not volume_type and source_volume:
|
||||
volume_type_id = source_volume['volume_type_id']
|
||||
return source_volume['volume_type_id']
|
||||
elif snapshot is not None:
|
||||
if volume_type:
|
||||
current_volume_type_id = volume_type.get('id')
|
||||
if (current_volume_type_id !=
|
||||
snapshot['volume_type_id']):
|
||||
if current_volume_type_id != snapshot['volume_type_id']:
|
||||
msg = _LW("Volume type will be changed to "
|
||||
"be the same as the source volume.")
|
||||
LOG.warning(msg)
|
||||
volume_type_id = snapshot['volume_type_id']
|
||||
return snapshot['volume_type_id']
|
||||
else:
|
||||
volume_type_id = volume_type.get('id')
|
||||
|
||||
return volume_type_id
|
||||
return volume_type.get('id')
|
||||
|
||||
def execute(self, context, size, snapshot, image_id, source_volume,
|
||||
availability_zone, volume_type, metadata,
|
||||
key_manager, source_replica,
|
||||
consistencygroup, cgsnapshot):
|
||||
availability_zone, volume_type, metadata, key_manager,
|
||||
source_replica, consistencygroup, cgsnapshot):
|
||||
|
||||
utils.check_exclusive_options(snapshot=snapshot,
|
||||
imageRef=image_id,
|
||||
|
@ -425,7 +368,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
|
|||
# their volume type matches the source volume is too convoluted. We
|
||||
# should copy encryption metadata from the encrypted volume type to the
|
||||
# volume upon creation and propagate that information to each snapshot.
|
||||
# This strategy avoid any dependency upon the encrypted volume type.
|
||||
# This strategy avoids any dependency upon the encrypted volume type.
|
||||
def_vol_type = volume_types.get_default_volume_type()
|
||||
if not volume_type and not source_volume and not snapshot:
|
||||
volume_type = def_vol_type
|
||||
|
@ -532,14 +475,15 @@ class EntryCreateTask(flow_utils.CinderTask):
|
|||
}
|
||||
|
||||
def revert(self, context, result, optional_args, **kwargs):
|
||||
# We never produced a result and therefore can't destroy anything.
|
||||
if isinstance(result, ft.Failure):
|
||||
# We never produced a result and therefore can't destroy anything.
|
||||
return
|
||||
|
||||
if optional_args['is_quota_committed']:
|
||||
# Committed quota doesn't rollback as the volume has already been
|
||||
# created at this point, and the quota has already been absorbed.
|
||||
# If quota got commited we shouldn't rollback as the volume has
|
||||
# already been created and the quota has already been absorbed.
|
||||
return
|
||||
|
||||
vol_id = result['volume_id']
|
||||
try:
|
||||
self.db.volume_destroy(context.elevated(), vol_id)
|
||||
|
@ -594,7 +538,7 @@ class QuotaReserveTask(flow_utils.CinderTask):
|
|||
usages = e.kwargs['usages']
|
||||
|
||||
def _consumed(name):
|
||||
return (usages[name]['reserved'] + usages[name]['in_use'])
|
||||
return usages[name]['reserved'] + usages[name]['in_use']
|
||||
|
||||
def _is_over(name):
|
||||
for over in overs:
|
||||
|
@ -675,6 +619,7 @@ class QuotaCommitTask(flow_utils.CinderTask):
|
|||
# We never produced a result and therefore can't destroy anything.
|
||||
if isinstance(result, ft.Failure):
|
||||
return
|
||||
|
||||
volume = result['volume_properties']
|
||||
try:
|
||||
reserve_opts = {'volumes': -1, 'gigabytes': -volume['size']}
|
||||
|
@ -695,10 +640,11 @@ class QuotaCommitTask(flow_utils.CinderTask):
|
|||
class VolumeCastTask(flow_utils.CinderTask):
|
||||
"""Performs a volume create cast to the scheduler or to the volume manager.
|
||||
|
||||
This which will signal a transition of the api workflow to another child
|
||||
and/or related workflow on another component.
|
||||
This will signal a transition of the api workflow to another child and/or
|
||||
related workflow on another component.
|
||||
|
||||
Reversion strategy: N/A
|
||||
Reversion strategy: rollback source volume status and error out newly
|
||||
created volume.
|
||||
"""
|
||||
|
||||
def __init__(self, scheduler_rpcapi, volume_rpcapi, db):
|
||||
|
@ -718,23 +664,23 @@ class VolumeCastTask(flow_utils.CinderTask):
|
|||
volume_id = request_spec['volume_id']
|
||||
snapshot_id = request_spec['snapshot_id']
|
||||
image_id = request_spec['image_id']
|
||||
group_id = request_spec['consistencygroup_id']
|
||||
cgroup_id = request_spec['consistencygroup_id']
|
||||
host = None
|
||||
cgsnapshot_id = request_spec['cgsnapshot_id']
|
||||
|
||||
if group_id:
|
||||
group = self.db.consistencygroup_get(context, group_id)
|
||||
if group:
|
||||
host = group.get('host', None)
|
||||
if cgroup_id:
|
||||
cgroup = self.db.consistencygroup_get(context, cgroup_id)
|
||||
if cgroup:
|
||||
host = cgroup.get('host', None)
|
||||
elif snapshot_id and CONF.snapshot_same_host:
|
||||
# NOTE(Rongze Zhu): A simple solution for bug 1008866.
|
||||
#
|
||||
# If snapshot_id is set, make the call create volume directly to
|
||||
# the volume host where the snapshot resides instead of passing it
|
||||
# through the scheduler. So snapshot can be copy to new volume.
|
||||
# If snapshot_id is set and CONF.snapshot_same_host is True, make
|
||||
# the call create volume directly to the volume host where the
|
||||
# snapshot resides instead of passing it through the scheduler, so
|
||||
# snapshot can be copied to the new volume.
|
||||
snapshot = objects.Snapshot.get_by_id(context, snapshot_id)
|
||||
source_volume_ref = self.db.volume_get(context,
|
||||
snapshot.volume_id)
|
||||
source_volume_ref = self.db.volume_get(context, snapshot.volume_id)
|
||||
host = source_volume_ref['host']
|
||||
elif source_volid:
|
||||
source_volume_ref = self.db.volume_get(context, source_volid)
|
||||
|
@ -772,7 +718,7 @@ class VolumeCastTask(flow_utils.CinderTask):
|
|||
image_id=image_id,
|
||||
source_volid=source_volid,
|
||||
source_replicaid=source_replicaid,
|
||||
consistencygroup_id=group_id)
|
||||
consistencygroup_id=cgroup_id)
|
||||
|
||||
def execute(self, context, **kwargs):
|
||||
scheduler_hints = kwargs.pop('scheduler_hints', None)
|
||||
|
@ -797,9 +743,8 @@ class VolumeCastTask(flow_utils.CinderTask):
|
|||
LOG.error(_LE('Unexpected build error:'), exc_info=exc_info)
|
||||
|
||||
|
||||
def get_flow(scheduler_rpcapi, volume_rpcapi, db_api,
|
||||
image_service_api, availability_zones,
|
||||
create_what):
|
||||
def get_flow(db_api, image_service_api, availability_zones, create_what,
|
||||
scheduler_rpcapi=None, volume_rpcapi=None):
|
||||
"""Constructs and returns the api entrypoint flow.
|
||||
|
||||
This flow will do the following:
|
||||
|
@ -825,39 +770,10 @@ def get_flow(scheduler_rpcapi, volume_rpcapi, db_api,
|
|||
EntryCreateTask(db_api),
|
||||
QuotaCommitTask())
|
||||
|
||||
# This will cast it out to either the scheduler or volume manager via
|
||||
# the rpc apis provided.
|
||||
api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db_api))
|
||||
|
||||
# Now load (but do not run) the flow using the provided initial data.
|
||||
return taskflow.engines.load(api_flow, store=create_what)
|
||||
|
||||
|
||||
def get_flow_no_rpc(db_api, image_service_api, availability_zones,
|
||||
create_what):
|
||||
"""Constructs and returns the api entrypoint flow.
|
||||
|
||||
This flow will do the following:
|
||||
|
||||
1. Inject keys & values for dependent tasks.
|
||||
2. Extracts and validates the input keys & values.
|
||||
3. Reserves the quota (reverts quota on any failures).
|
||||
4. Creates the database entry.
|
||||
5. Commits the quota.
|
||||
"""
|
||||
|
||||
flow_name = ACTION.replace(":", "_") + "_api"
|
||||
api_flow = linear_flow.Flow(flow_name)
|
||||
|
||||
api_flow.add(ExtractVolumeRequestTask(
|
||||
image_service_api,
|
||||
availability_zones,
|
||||
rebind={'size': 'raw_size',
|
||||
'availability_zone': 'raw_availability_zone',
|
||||
'volume_type': 'raw_volume_type'}))
|
||||
api_flow.add(QuotaReserveTask(),
|
||||
EntryCreateTask(db_api),
|
||||
QuotaCommitTask())
|
||||
if scheduler_rpcapi and volume_rpcapi:
|
||||
# This will cast it out to either the scheduler or volume manager via
|
||||
# the rpc apis provided.
|
||||
api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db_api))
|
||||
|
||||
# Now load (but do not run) the flow using the provided initial data.
|
||||
return taskflow.engines.load(api_flow, store=create_what)
|
||||
|
|
Loading…
Reference in New Issue