Implement cinder volume backup deletion

Change-Id: Ie9587a1574634a8518d2fab1ca815ae55404bdd1
Closes-Bug:#1587267
This commit is contained in:
chenying 2016-05-31 15:02:38 +08:00
parent 7833f19638
commit 924bc621e9
11 changed files with 188 additions and 12 deletions

View File

@ -43,6 +43,9 @@ RESOURCE_TYPES = (PROJECT_RESOURCE_TYPE,
CHECKPOINT_STATUS_ERROR = 'error'
CHECKPOINT_STATUS_PROTECTING = 'protecting'
CHECKPOINT_STATUS_AVAILABLE = 'available'
CHECKPOINT_STATUS_DELETING = 'deleting'
CHECKPOINT_STATUS_DELETED = 'deleted'
CHECKPOINT_STATUS_ERROR_DELETING = 'error-deleting'
# resource status
RESOURCE_STATUS_ERROR = 'error'

View File

@ -334,3 +334,7 @@ class InvalidOriginalId(Invalid):
class CheckpointNotAvailable(SmaugException):
message = _("The checkpoint %(checkpoint_id)s is not available")
class CheckpointNotBeDeleted(SmaugException):
message = _("The checkpoint %(checkpoint_id)s can not be deleted.")

View File

@ -20,7 +20,7 @@ from taskflow.utils import misc
sync_status_opts = [
cfg.IntOpt('sync_status_interval',
default=600,
default=60,
help='update protection status interval')
]

View File

@ -23,7 +23,7 @@ from taskflow import task
sync_status_opts = [
cfg.IntOpt('sync_status_interval',
default=600,
default=60,
help='update protection status interval')
]

View File

@ -0,0 +1,91 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
from smaug.common import constants
from smaug.i18n import _
from taskflow import task
sync_status_opts = [
cfg.IntOpt('sync_status_interval',
default=60,
help='update protection status interval')
]
CONF = cfg.CONF
CONF.register_opts(sync_status_opts)
LOG = logging.getLogger(__name__)
class SyncCheckpointStatusTask(task.Task):
def __init__(self, checkpoint, status_getters):
super(SyncCheckpointStatusTask, self).__init__()
self._status_getters = status_getters
self._checkpoint = checkpoint
def execute(self):
LOG.info(_("Start sync checkpoint status,checkpoint_id:%s"),
self._checkpoint.id)
sync_status = loopingcall.FixedIntervalLoopingCall(
self._sync_status, self._checkpoint, self._status_getters)
sync_status.start(interval=CONF.sync_status_interval)
def _sync_status(self, checkpoint, status_getters):
status = {}
for s in status_getters:
resource_id = s.get('resource_id')
get_resource_stats = s.get('get_resource_stats')
status[resource_id] = get_resource_stats(checkpoint,
resource_id)
list_status = list(set(status.values()))
LOG.info(_("Start sync checkpoint status,checkpoint_id:"
"%(checkpoint_id)s, resource_status:"
"%(resource_status)s") %
{"checkpoint_id": checkpoint.id,
"resource_status": status})
if constants.RESOURCE_STATUS_ERROR in list_status:
checkpoint.status = constants.CHECKPOINT_STATUS_ERROR_DELETING
checkpoint.commit()
elif [constants.RESOURCE_STATUS_DELETED] == list_status:
checkpoint.status = constants.CHECKPOINT_STATUS_DELETED
checkpoint.commit()
LOG.info(_("Stop sync checkpoint status,checkpoint_id:"
"%(checkpoint_id)s,checkpoint status:"
"%(checkpoint_status)s") %
{"checkpoint_id": checkpoint.id,
"checkpoint_status": checkpoint.status})
raise loopingcall.LoopingCallDone()
def get_flow(context, workflow_engine, operation_type, checkpoint, provider):
ctx = {'context': context,
'checkpoint': checkpoint,
'workflow_engine': workflow_engine,
'operation_type': operation_type,
}
LOG.info(_("Start get checkpoint flow,checkpoint_id:%s"),
checkpoint.id)
flow_name = "delete_checkpoint_" + checkpoint.id
delete_flow = workflow_engine.build_flow(flow_name, 'linear')
result = provider.build_task_flow(ctx)
status_getters = result.get('status_getters')
resource_flow = result.get('task_flow')
workflow_engine.add_tasks(delete_flow,
resource_flow,
SyncCheckpointStatusTask(checkpoint,
status_getters))
flow_engine = workflow_engine.get_engine(delete_flow)
return flow_engine

View File

@ -19,6 +19,7 @@ from smaug.common import constants
from smaug.i18n import _LE
from smaug.services.protection.flows import create_protection
from smaug.services.protection.flows import create_restoration
from smaug.services.protection.flows import delete_checkpoint
workflow_opts = [
cfg.StrOpt(
@ -69,6 +70,16 @@ class Worker(object):
restore)
return restoration_flow
def get_delete_checkpoint_flow(self, context, operation_type, checkpoint,
provider):
delete_checkpoint_flow = delete_checkpoint.get_flow(
context,
self.workflow_engine,
operation_type,
checkpoint,
provider)
return delete_checkpoint_flow
def run_flow(self, flow_engine):
self.workflow_engine.run_engine(flow_engine)

View File

@ -158,12 +158,47 @@ class ProtectionManager(manager.Manager):
error=_("Failed to run flow"))
def delete(self, context, provider_id, checkpoint_id):
# TODO(wangliuan)
LOG.info(_LI("Starting protection service:delete action"))
LOG.debug('provider_id :%s checkpoint_id:%s', provider_id,
checkpoint_id)
provider = self.provider_registry.show_provider(provider_id)
try:
checkpoint_collection = provider.get_checkpoint_collection()
checkpoint = checkpoint_collection.get(checkpoint_id)
except Exception:
LOG.error(_LE("get checkpoint failed, checkpoint_id:%s"),
checkpoint_id)
raise exception.InvalidInput(
reason="Invalid checkpoint_id or provider_id")
return True
if checkpoint.status in [
constants.CHECKPOINT_STATUS_ERROR,
constants.CHECKPOINT_STATUS_PROTECTING
]:
raise exception.CheckpointNotBeDeleted(
checkpoint_id=checkpoint_id)
checkpoint.status = constants.CHECKPOINT_STATUS_DELETING
checkpoint.commit()
try:
delete_checkpoint_flow = self.worker.get_delete_checkpoint_flow(
context,
constants.OPERATION_DELETE,
checkpoint,
provider)
except Exception:
LOG.exception(
_LE("Failed to create delete checkpoint flow, checkpoint:%s."),
checkpoint_id)
raise exception.SmaugException(_(
"Failed to create delete checkpoint flow."
))
try:
self.worker.run_flow(delete_checkpoint_flow)
return True
except Exception:
LOG.exception(_LE("Failed to run delete checkpoint flow"))
raise
def start(self, plan):
# TODO(wangliuan)
@ -180,7 +215,7 @@ class ProtectionManager(manager.Manager):
return_stub = [
{
"id": "2220f8b1-975d-4621-a872-fa9afb43cb6c",
"id": "b42503e6-8e9c-4e4f-94e8-745d4eabdc51",
"project_id": "446a04d8-6ff5-4e0e-99a4-827a6389e9ff",
"status": "comitted",
"provider_id": "efc6a88b-9096-4bb6-8634-cda182a6e12a",
@ -213,7 +248,7 @@ class ProtectionManager(manager.Manager):
LOG.info(_LI("checkpoint_id:%s"), checkpoint_id)
return_stub = {
"id": "2220f8b1-975d-4621-a872-fa9afb43cb6c",
"id": "6f193601-39f8-4a81-993b-4d847393a0ee",
"project_id": "446a04d8-6ff5-4e0e-99a4-827a6389e9ff",
"status": "committed",
"protection_plan": {
@ -231,7 +266,7 @@ class ProtectionManager(manager.Manager):
}
]
},
"provider_id": "efc6a88b-9096-4bb6-8634-cda182a6e12a"
"provider_id": "cf56bd3e-97a7-4078-b6d5-f36246333fd9"
}
return return_stub

View File

@ -70,6 +70,11 @@ class BaseProtectionPlugin(ProtectionPlugin):
parameters['heat_template'] = context.heat_template
inject = parameters
requires = parameters.keys()
elif operation == constants.OPERATION_DELETE:
parameters['checkpoint'] = context.checkpoint
inject = parameters
requires = parameters.keys()
requires.append('checkpoint')
task_callback = self.task_callback_map.get(operation, None)
if task_callback is not None:

View File

@ -13,6 +13,7 @@
import six
from uuid import uuid4
from cinderclient.exceptions import NotFound
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
@ -113,7 +114,6 @@ class CinderProtectionPlugin(BaseProtectionPlugin):
bank_section = checkpoint.get_resource_bank_section(resource_id)
cinder_client = self._cinder_client(cntxt)
LOG.info(_("deleting volume backup, volume_id: %s."), resource_id)
try:
bank_section.update_object("status",
@ -121,7 +121,7 @@ class CinderProtectionPlugin(BaseProtectionPlugin):
resource_definition = bank_section.get_object("metadata")
backup_id = resource_definition["backup_id"]
cinder_client.backups.delete(backup_id)
bank_section.delete_object("metadata", resource_definition)
bank_section.delete_object("metadata")
self.protection_resource_map[resource_id] = {
"bank_section": bank_section,
"backup_id": backup_id,
@ -145,6 +145,7 @@ class CinderProtectionPlugin(BaseProtectionPlugin):
backup_id = resource_info["backup_id"]
bank_section = resource_info["bank_section"]
cinder_client = resource_info["cinder_client"]
operation = resource_info["operation"]
try:
backup = cinder_client.backups.get(backup_id)
if backup.status == "available":
@ -157,8 +158,16 @@ class CinderProtectionPlugin(BaseProtectionPlugin):
self.protection_resource_map.pop(resource_id)
else:
continue
except Exception:
LOG.info(_("deleting volume backup finished."))
except Exception as exc:
if operation == "delete" and type(exc) == NotFound:
bank_section.update_object(
"status",
constants.RESOURCE_STATUS_DELETED)
LOG.info(_("deleting volume backup finished."
"backup id: %s"), backup_id)
else:
LOG.error(_LE("deleting volume backup error.exc:%s."),
six.text_type(exc))
self.protection_resource_map.pop(resource_id)
def restore_backup(self, cntxt, checkpoint, **kwargs):

View File

@ -190,6 +190,19 @@ class PluggableProtectionProvider(object):
parameters=parameters,
heat_template=heat_template
)
if operation == constants.OPERATION_DELETE:
checkpoint = ctx['checkpoint']
task_flow = workflow_engine.build_flow(
flow_name=checkpoint.id)
resource_graph = checkpoint.resource_graph
resource_context = ResourceGraphContext(
cntxt=cntxt,
checkpoint=checkpoint,
operation=operation,
workflow_engine=workflow_engine,
task_flow=task_flow,
plugin_map=self._plugin_map
)
# TODO(luobin): for other type operations
@ -204,6 +217,10 @@ class PluggableProtectionProvider(object):
"resource_graph": resource_graph}
if operation == constants.OPERATION_RESTORE:
return {"task_flow": walker_listener.context.task_flow}
if operation == constants.OPERATION_DELETE:
return {"task_flow": walker_listener.context.task_flow,
"status_getters": walker_listener.context.status_getters
}
# TODO(luobin): for other type operations

View File

@ -75,7 +75,8 @@ class ResourceGraphWalkerListener(GraphWalkerListener):
# do something in protection_plugin
protection_plugin.on_resource_start(context)
if self.context.operation == constants.OPERATION_PROTECT:
if self.context.operation == constants.OPERATION_PROTECT \
or self.context.operation == constants.OPERATION_DELETE:
if not already_visited:
self.context.status_getters.append(
{"resource_id": resource.id,