# cinder/volume/flows/manager/manage_existing_snapshot.py
# Copyright (c) 2015 Huawei Technologies Co., Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
import taskflow.engines
from taskflow.patterns import linear_flow
from taskflow.types import failure as ft
from taskflow.utils import misc
from cinder import exception
from cinder import flow_utils
from cinder.i18n import _, _LE, _LI
from cinder import objects
from cinder import quota
from cinder.volume.flows import common as flow_common
from cinder.volume import utils as volume_utils
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
QUOTAS = quota.QUOTAS
ACTION = 'snapshot:manage_existing'
class ExtractSnapshotRefTask(flow_utils.CinderTask):
    """Extracts snapshot reference for given snapshot id."""

    default_provides = 'snapshot_ref'

    def __init__(self, db):
        super(ExtractSnapshotRefTask, self).__init__(addons=[ACTION])
        self.db = db

    def execute(self, context, snapshot_id):
        # NOTE(wanghao): this will fetch the snapshot from the database, if
        # the snapshot has been deleted before we got here then this should
        # fail.
        #
        # In the future we might want to have a lock on the snapshot_id so that
        # the snapshot can not be deleted while its still being created?
        snapshot_ref = objects.Snapshot.get_by_id(context, snapshot_id)
        LOG.debug("ExtractSnapshotRefTask return"
                  " snapshot_ref: %s", snapshot_ref)
        return snapshot_ref

    def revert(self, context, snapshot_id, result, **kwargs):
        # If execute itself failed there is nothing to clean up.
        #
        # NOTE: use the taskflow.types.failure alias (ft) here, matching
        # QuotaCommitTask.revert; taskflow.utils.misc no longer exposes
        # Failure in newer taskflow releases.
        if isinstance(result, ft.Failure):
            return
        flow_common.error_out_snapshot(context, self.db, snapshot_id)
        LOG.error(_LE("Snapshot %s: create failed"), snapshot_id)
class NotifySnapshotActionTask(flow_utils.CinderTask):
    """Performs a notification about the given snapshot when called.

    Reversion strategy: N/A
    """

    def __init__(self, db, event_suffix, host):
        super(NotifySnapshotActionTask, self).__init__(addons=[ACTION,
                                                               event_suffix])
        self.db = db
        self.event_suffix = event_suffix
        self.host = host

    def execute(self, context, snapshot_ref):
        snap_id = snapshot_ref['id']
        try:
            volume_utils.notify_about_snapshot_usage(
                context, snapshot_ref, self.event_suffix, host=self.host)
        except exception.CinderException:
            # Notification failures are tolerated: this information is not
            # required for the snapshot itself to operate, so the workflow
            # should not be errored out because of them.
            LOG.exception(_LE("Failed notifying about the snapshot "
                              "action %(event)s for snapshot %(snp_id)s."),
                          {'event': self.event_suffix,
                           'snp_id': snap_id})
class PrepareForQuotaReservationTask(flow_utils.CinderTask):
    """Gets the snapshot size from the driver."""

    default_provides = set(['size', 'snapshot_properties'])

    def __init__(self, db, driver):
        super(PrepareForQuotaReservationTask, self).__init__(addons=[ACTION])
        self.db = db
        self.driver = driver

    def execute(self, context, snapshot_ref, manage_existing_ref):
        snapshot_id = snapshot_ref['id']
        # Guard: a driver that never initialized can not size the snapshot,
        # so mark the snapshot errored and abort the flow.
        if not self.driver.initialized:
            backend = self.driver.configuration.safe_get(
                'volume_backend_name')
            LOG.error(_LE("Unable to manage existing snapshot. "
                          "Volume driver %s not initialized."), backend)
            reason = _("Volume driver %s "
                       "not initialized.") % backend
            flow_common.error_out_snapshot(context, self.db, snapshot_id,
                                           reason=reason)
            raise exception.DriverNotInitialized()

        size = self.driver.manage_existing_snapshot_get_size(
            snapshot=snapshot_ref, existing_ref=manage_existing_ref)
        return {'size': size, 'snapshot_properties': snapshot_ref}
class QuotaReserveTask(flow_utils.CinderTask):
    """Reserves a single snapshot with the given size.

    Reversion strategy: rollback the quota reservation.

    Warning Warning: if the process that is running this reserve and commit
    process fails (or is killed before the quota is rolled back or committed
    it does appear like the quota will never be rolled back). This makes
    software upgrades hard (inflight operations will need to be stopped or
    allowed to complete before the upgrade can occur). *In the future* when
    taskflow has persistence built-in this should be easier to correct via
    an automated or manual process.
    """

    default_provides = set(['reservations'])

    def __init__(self):
        super(QuotaReserveTask, self).__init__(addons=[ACTION])

    def execute(self, context, size, optional_args):
        try:
            if CONF.no_snapshot_gb_quota:
                # Snapshot gigabytes do not count against quota; only the
                # snapshot count is reserved.
                reserve_opts = {'snapshots': 1}
            else:
                reserve_opts = {'snapshots': 1, 'gigabytes': size}
            reservations = QUOTAS.reserve(context, **reserve_opts)
            return {
                'reservations': reservations,
            }
        except exception.OverQuota as e:
            # Delegate over-quota logging/raising to the shared helper.
            overs = e.kwargs['overs']
            quotas = e.kwargs['quotas']
            usages = e.kwargs['usages']
            volume_utils.process_reserve_over_quota(context, overs, usages,
                                                    quotas, size)

    def revert(self, context, result, optional_args, **kwargs):
        # We never produced a result and therefore can't destroy anything.
        #
        # NOTE: use the taskflow.types.failure alias (ft), matching
        # QuotaCommitTask.revert; taskflow.utils.misc no longer exposes
        # Failure in newer taskflow releases.
        if isinstance(result, ft.Failure):
            return
        if optional_args['is_quota_committed']:
            # The reservations have already been committed and can not be
            # rolled back at this point.
            return
        # We actually produced an output that we can revert so lets attempt
        # to use said output to rollback the reservation.
        reservations = result['reservations']
        try:
            QUOTAS.rollback(context, reservations)
        except exception.CinderException:
            # We are already reverting, therefore we should silence this
            # exception since a second exception being active will be bad.
            LOG.exception(_LE("Failed rolling back quota for"
                              " %s reservations."), reservations)
class QuotaCommitTask(flow_utils.CinderTask):
    """Commits the reservation.

    Reversion strategy: N/A (the rollback will be handled by the task that did
    the initial reservation (see: QuotaReserveTask).

    Warning Warning: if the process that is running this reserve and commit
    process fails (or is killed before the quota is rolled back or committed
    it does appear like the quota will never be rolled back). This makes
    software upgrades hard (inflight operations will need to be stopped or
    allowed to complete before the upgrade can occur). *In the future* when
    taskflow has persistence built-in this should be easier to correct via
    an automated or manual process.
    """

    def __init__(self):
        super(QuotaCommitTask, self).__init__(addons=[ACTION])

    def execute(self, context, reservations, snapshot_properties,
                optional_args):
        QUOTAS.commit(context, reservations)
        # Record the commit so QuotaReserveTask.revert knows that rolling
        # back the reservation is no longer possible.
        optional_args['is_quota_committed'] = True
        return {'snapshot_properties': snapshot_properties}

    def revert(self, context, result, **kwargs):
        # Nothing was committed, so there is nothing to undo.
        if isinstance(result, ft.Failure):
            return
        snapshot = result['snapshot_properties']
        try:
            # Reserve negative deltas to give back what was committed.
            negative_opts = {'snapshots': -1,
                             'gigabytes': -snapshot['volume_size']}
            reservations = QUOTAS.reserve(context,
                                          project_id=context.project_id,
                                          **negative_opts)
            if reservations:
                QUOTAS.commit(context, reservations,
                              project_id=context.project_id)
        except Exception:
            LOG.exception(_LE("Failed to update quota while deleting "
                              "snapshots: %s"), snapshot['id'])
class ManageExistingTask(flow_utils.CinderTask):
    """Brings an existing snapshot under Cinder management."""

    default_provides = set(['snapshot', 'new_status'])

    def __init__(self, db, driver):
        super(ManageExistingTask, self).__init__(addons=[ACTION])
        self.db = db
        self.driver = driver

    def execute(self, context, snapshot_ref, manage_existing_ref, size):
        # A driver may return None/empty; normalize to a dict before adding
        # the size discovered earlier in the flow.
        model_update = self.driver.manage_existing_snapshot(
            snapshot=snapshot_ref,
            existing_ref=manage_existing_ref) or {}
        model_update['size'] = size
        try:
            snapshot_object = objects.Snapshot.get_by_id(context,
                                                         snapshot_ref['id'])
            snapshot_object.update(model_update)
            snapshot_object.save()
        except exception.CinderException:
            LOG.exception(_LE("Failed updating model of snapshot "
                              "%(snapshot_id)s with creation provided model "
                              "%(model)s."),
                          {'snapshot_id': snapshot_ref['id'],
                           'model': model_update})
            raise
        return {'snapshot': snapshot_ref, 'new_status': 'available'}
class CreateSnapshotOnFinishTask(NotifySnapshotActionTask):
    """Perform final snapshot actions.

    When a snapshot is created successfully it is expected that MQ
    notifications and database updates will occur to 'signal' to others that
    the snapshot is now ready for usage. This task does those notifications and
    updates in a reliable manner (not re-raising exceptions if said actions can
    not be triggered).

    Reversion strategy: N/A
    """

    def __init__(self, db, event_suffix, host):
        super(CreateSnapshotOnFinishTask, self).__init__(db, event_suffix,
                                                         host)

    def execute(self, context, snapshot, new_status):
        LOG.debug("Begin to call CreateSnapshotOnFinishTask execute.")
        snapshot_id = snapshot['id']
        LOG.debug("New status: %s", new_status)
        update = {'status': new_status}
        try:
            # TODO(harlowja): is it acceptable to only log if this fails??
            # or are there other side-effects that this will cause if the
            # status isn't updated correctly (aka it will likely be stuck in
            # 'building' if this fails)??
            snapshot_object = objects.Snapshot.get_by_id(context, snapshot_id)
            snapshot_object.update(update)
            snapshot_object.save()
            # Now use the parent to notify.
            super(CreateSnapshotOnFinishTask, self).execute(context, snapshot)
        except exception.CinderException:
            LOG.exception(_LE("Failed updating snapshot %(snapshot_id)s with "
                              "%(update)s."), {'snapshot_id': snapshot_id,
                                               'update': update})
        # Even if the update fails, the snapshot is ready.
        LOG.info(_LI("Snapshot %s created successfully."), snapshot_id)
def get_flow(context, db, driver, host, snapshot_id, ref):
    """Constructs and returns the manager entry point flow."""
    # NOTE: the original debug format string was broken -- it read
    # "(snapshot_id)s" without the leading '%', so the snapshot id was
    # never interpolated; it also misspelled "parameter" and lacked a
    # space after "db=%(db)s," at the concatenation boundary.
    LOG.debug("Input parameter: context=%(context)s, db=%(db)s, "
              "driver=%(driver)s, host=%(host)s, "
              "snapshot_id=%(snapshot_id)s, ref=%(ref)s.",
              {'context': context,
               'db': db,
               'driver': driver,
               'host': host,
               'snapshot_id': snapshot_id,
               'ref': ref})
    flow_name = ACTION.replace(":", "_") + "_manager"
    snapshot_flow = linear_flow.Flow(flow_name)

    # This injects the initial starting flow values into the workflow so that
    # the dependency order of the tasks provides/requires can be correctly
    # determined.
    create_what = {
        'context': context,
        'snapshot_id': snapshot_id,
        'manage_existing_ref': ref,
        'optional_args': {'is_quota_committed': False},
    }
    notify_start_msg = "manage_existing_snapshot.start"
    notify_end_msg = "manage_existing_snapshot.end"
    snapshot_flow.add(ExtractSnapshotRefTask(db),
                      NotifySnapshotActionTask(db, notify_start_msg,
                                               host=host),
                      PrepareForQuotaReservationTask(db, driver),
                      QuotaReserveTask(),
                      ManageExistingTask(db, driver),
                      QuotaCommitTask(),
                      CreateSnapshotOnFinishTask(db, notify_end_msg,
                                                 host=host))
    LOG.debug("Begin to return taskflow.engines."
              "load(snapshot_flow,store=create_what).")
    # Now load (but do not run) the flow using the provided initial data.
    return taskflow.engines.load(snapshot_flow, store=create_what)