Add the checkpoint copy API for Karbor
Change-Id: I203733f9e810e16bcaba8e3ac7d11d06e4347372 Implements: blueprint support-copy-the-checkpoint-api
This commit is contained in:
parent
10de69e32d
commit
cf1fb3610d
126
karbor/api/v1/copies.py
Normal file
126
karbor/api/v1/copies.py
Normal file
@ -0,0 +1,126 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""The copy api."""
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from webob import exc
|
||||
|
||||
from karbor.api import common
|
||||
from karbor.api.openstack import wsgi
|
||||
from karbor import exception
|
||||
from karbor.i18n import _
|
||||
|
||||
from karbor import objects
|
||||
from karbor.policies import copies as copy_policy
|
||||
from karbor.services.protection import api as protection_api
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CopiesViewBuilder(common.ViewBuilder):
|
||||
"""Model a server API response as a python dictionary."""
|
||||
|
||||
def detail(self, request, copy):
|
||||
"""Detailed view of a single copy."""
|
||||
copy_ref = {
|
||||
'copy': {
|
||||
'project_id': copy.get('project_id'),
|
||||
'provider_id': copy.get('provider_id'),
|
||||
'plan_id': copy.get('plan_id'),
|
||||
'checkpoint_id': copy.get('checkpoint_id'),
|
||||
'parameters': copy.get('parameters'),
|
||||
}
|
||||
}
|
||||
return copy_ref
|
||||
|
||||
|
||||
class CopiesController(wsgi.Controller):
|
||||
"""The copy API controller for the OpenStack API."""
|
||||
|
||||
_view_builder_class = CopiesViewBuilder
|
||||
|
||||
def __init__(self):
|
||||
self.protection_api = protection_api.API()
|
||||
super(CopiesController, self).__init__()
|
||||
|
||||
def create(self, req, provider_id, body):
|
||||
"""Creates a new copy."""
|
||||
if not self.is_valid_body(body, 'copy'):
|
||||
raise exc.HTTPUnprocessableEntity()
|
||||
|
||||
LOG.debug('Create copy request body: %s', body)
|
||||
context = req.environ['karbor.context']
|
||||
context.can(copy_policy.CREATE_POLICY)
|
||||
copy = body['copy']
|
||||
plan_id = copy.get("plan_id", None)
|
||||
|
||||
if not uuidutils.is_uuid_like(plan_id):
|
||||
msg = _("Invalid plan id provided.")
|
||||
raise exception.InvalidInput(reason=msg)
|
||||
|
||||
if not uuidutils.is_uuid_like(provider_id):
|
||||
msg = _("Invalid provider id provided.")
|
||||
raise exception.InvalidInput(reason=msg)
|
||||
|
||||
parameters = copy.get("parameters", None)
|
||||
if parameters:
|
||||
if not isinstance(parameters, dict):
|
||||
msg = _("The parameters must be a dict when creating"
|
||||
" a copy.")
|
||||
raise exception.InvalidInput(reason=msg)
|
||||
|
||||
try:
|
||||
plan = objects.Plan.get_by_id(context, plan_id)
|
||||
except exception.PlanNotFound as error:
|
||||
raise exc.HTTPNotFound(explanation=error.msg)
|
||||
|
||||
if provider_id != plan.provider_id:
|
||||
msg = _("The provider id is not the same as the value "
|
||||
"in the plan.")
|
||||
raise exception.InvalidInput(reason=msg)
|
||||
|
||||
filters = {'plan_id': plan_id}
|
||||
checkpoints = self.protection_api.list_checkpoints(
|
||||
context, provider_id, marker=None, limit=None,
|
||||
sort_keys=None, sort_dirs=None, filters=filters, offset=None)
|
||||
|
||||
if not checkpoints:
|
||||
msg = _("The plan has not been protected.")
|
||||
raise exception.InvalidInput(reason=msg)
|
||||
|
||||
plan.parameters.update(parameters)
|
||||
try:
|
||||
checkpoint_copy = self.protection_api.copy(context, plan)
|
||||
except Exception:
|
||||
LOG.exception("Failed to create checkpoint copies.")
|
||||
raise
|
||||
|
||||
copy = {
|
||||
'project_id': context.project_id,
|
||||
'provider_id': plan.provider_id,
|
||||
'plan_id': plan.id,
|
||||
'checkpoint_id': checkpoint_copy,
|
||||
'parameters': parameters
|
||||
}
|
||||
|
||||
retval = self._view_builder.detail(req, copy)
|
||||
return retval
|
||||
|
||||
|
||||
def create_resource():
|
||||
return wsgi.Resource(CopiesController())
|
@ -13,6 +13,7 @@
|
||||
from oslo_service import wsgi as base_wsgi
|
||||
|
||||
from karbor.api.openstack import ProjectMapper
|
||||
from karbor.api.v1 import copies
|
||||
from karbor.api.v1 import operation_logs
|
||||
from karbor.api.v1 import plans
|
||||
from karbor.api.v1 import protectables
|
||||
@ -43,6 +44,7 @@ class APIRouter(base_wsgi.Router):
|
||||
service_resources = services.create_resource()
|
||||
quota_resources = quotas.create_resource()
|
||||
quota_class_resources = quota_classes.create_resource()
|
||||
copy_resources = copies.create_resource()
|
||||
|
||||
mapper.resource("plan", "plans",
|
||||
controller=plans_resources,
|
||||
@ -132,4 +134,10 @@ class APIRouter(base_wsgi.Router):
|
||||
controller=quota_class_resources,
|
||||
collection={},
|
||||
member={'action': 'POST'})
|
||||
mapper.connect("copy",
|
||||
"/{project_id}/providers/{provider_id}/checkpoints/"
|
||||
"action",
|
||||
controller=copy_resources,
|
||||
action='create',
|
||||
conditions={"method": ['POST']})
|
||||
super(APIRouter, self).__init__(mapper)
|
||||
|
@ -16,11 +16,13 @@ OPERATION_TYPES = (
|
||||
OPERATION_RESTORE,
|
||||
OPERATION_DELETE,
|
||||
OPERATION_VERIFY,
|
||||
OPERATION_COPY,
|
||||
) = (
|
||||
'protect',
|
||||
'restore',
|
||||
'delete',
|
||||
'verify'
|
||||
'verify',
|
||||
'copy'
|
||||
)
|
||||
|
||||
|
||||
@ -66,6 +68,9 @@ PLAN_STATUS_STARTED = 'started'
|
||||
|
||||
CHECKPOINT_STATUS_ERROR = 'error'
|
||||
CHECKPOINT_STATUS_PROTECTING = 'protecting'
|
||||
CHECKPOINT_STATUS_WAIT_COPYING = 'wait_copying'
|
||||
CHECKPOINT_STATUS_COPYING = 'copying'
|
||||
CHECKPOINT_STATUS_COPY_FINISHED = 'finished'
|
||||
CHECKPOINT_STATUS_AVAILABLE = 'available'
|
||||
CHECKPOINT_STATUS_DELETING = 'deleting'
|
||||
CHECKPOINT_STATUS_DELETED = 'deleted'
|
||||
|
@ -15,6 +15,7 @@
|
||||
import itertools
|
||||
|
||||
from karbor.policies import base
|
||||
from karbor.policies import copies
|
||||
from karbor.policies import operation_logs
|
||||
from karbor.policies import plans
|
||||
from karbor.policies import protectables
|
||||
@ -42,4 +43,5 @@ def list_rules():
|
||||
services.list_rules(),
|
||||
quotas.list_rules(),
|
||||
quota_classes.list_rules(),
|
||||
copies.list_rules(),
|
||||
)
|
||||
|
39
karbor/policies/copies.py
Normal file
39
karbor/policies/copies.py
Normal file
@ -0,0 +1,39 @@
|
||||
# Copyright (c) 2017 Huawei Technologies Co., Ltd.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_policy import policy
|
||||
|
||||
from karbor.policies import base
|
||||
|
||||
|
||||
CREATE_POLICY = 'copy:create'
|
||||
|
||||
copies_policies = [
|
||||
policy.DocumentedRuleDefault(
|
||||
name=CREATE_POLICY,
|
||||
check_str=base.RULE_ADMIN_OR_OWNER,
|
||||
description='Create a copy.',
|
||||
operations=[
|
||||
{
|
||||
'method': 'POST',
|
||||
'path': '/{project_id}/providers/{provider_id}/'
|
||||
'checkpoints/action'
|
||||
}
|
||||
]),
|
||||
]
|
||||
|
||||
|
||||
def list_rules():
|
||||
return copies_policies
|
@ -34,6 +34,9 @@ class API(base.Base):
|
||||
return self.protection_rpcapi.protect(context, plan,
|
||||
checkpoint_properties)
|
||||
|
||||
def copy(self, context, plan):
|
||||
return self.protection_rpcapi.copy(context, plan)
|
||||
|
||||
def delete(self, context, provider_id, checkpoint_id):
|
||||
return self.protection_rpcapi.delete(
|
||||
context,
|
||||
|
@ -73,6 +73,10 @@ class Checkpoint(object):
|
||||
# TODO(saggi): check for valid values and transitions
|
||||
return self._md_cache["status"]
|
||||
|
||||
@property
|
||||
def extra_info(self):
|
||||
return self._md_cache["extra_info"]
|
||||
|
||||
@property
|
||||
def project_id(self):
|
||||
return self._md_cache["project_id"]
|
||||
@ -100,6 +104,10 @@ class Checkpoint(object):
|
||||
def status(self, value):
|
||||
self._md_cache["status"] = value
|
||||
|
||||
@extra_info.setter
|
||||
def extra_info(self, value):
|
||||
self._md_cache["extra_info"] = value
|
||||
|
||||
@resource_graph.setter
|
||||
def resource_graph(self, resource_graph):
|
||||
serialized_resource_graph = graph.serialize_resource_graph(
|
||||
@ -170,14 +178,18 @@ class Checkpoint(object):
|
||||
provider_id = plan.get("provider_id")
|
||||
project_id = plan.get("project_id")
|
||||
extra_info = None
|
||||
checkpoint_status = constants.CHECKPOINT_STATUS_PROTECTING
|
||||
if checkpoint_properties:
|
||||
extra_info = checkpoint_properties.get("extra_info", None)
|
||||
status = checkpoint_properties.get("status", None)
|
||||
if status:
|
||||
checkpoint_status = status
|
||||
checkpoint_section.update_object(
|
||||
key=_INDEX_FILE_NAME,
|
||||
value={
|
||||
"version": cls.VERSION,
|
||||
"id": checkpoint_id,
|
||||
"status": constants.CHECKPOINT_STATUS_PROTECTING,
|
||||
"status": checkpoint_status,
|
||||
"owner_id": owner_id,
|
||||
"provider_id": provider_id,
|
||||
"project_id": project_id,
|
||||
|
191
karbor/services/protection/flows/copy.py
Normal file
191
karbor/services/protection/flows/copy.py
Normal file
@ -0,0 +1,191 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from karbor.common import constants
|
||||
from karbor import exception
|
||||
from karbor.resource import Resource
|
||||
from karbor.services.protection.flows import utils
|
||||
from karbor.services.protection import resource_flow
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from taskflow import task
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class InitiateCopyTask(task.Task):
|
||||
def execute(self, context, checkpoint, checkpoint_copy, operation_log,
|
||||
*args, **kwargs):
|
||||
LOG.debug("Initiate copy checkpoint_id: %s", checkpoint_copy.id)
|
||||
checkpoint_copy.status = constants.CHECKPOINT_STATUS_COPYING
|
||||
checkpoint_copy.commit()
|
||||
update_fields = {"status": checkpoint_copy.status}
|
||||
utils.update_operation_log(context, operation_log, update_fields)
|
||||
|
||||
def revert(self, context, checkpoint, checkpoint_copy, operation_log,
|
||||
*args, **kwargs):
|
||||
LOG.debug("Failed to copy checkpoint_id: %s", checkpoint_copy.id)
|
||||
checkpoint_copy.status = constants.CHECKPOINT_STATUS_ERROR
|
||||
checkpoint_copy.commit()
|
||||
update_fields = {
|
||||
"status": checkpoint_copy.status,
|
||||
"ended_at": timeutils.utcnow()
|
||||
}
|
||||
utils.update_operation_log(context, operation_log, update_fields)
|
||||
|
||||
|
||||
class CompleteCopyTask(task.Task):
|
||||
def execute(self, context, checkpoint, checkpoint_copy, operation_log):
|
||||
LOG.debug("Complete copy checkpoint_id: %s", checkpoint_copy.id)
|
||||
checkpoint_copy.status = constants.CHECKPOINT_STATUS_AVAILABLE
|
||||
if checkpoint_copy.extra_info:
|
||||
extra_info = jsonutils.loads(checkpoint_copy.extra_info)
|
||||
extra_info['copy_status'] = \
|
||||
constants.CHECKPOINT_STATUS_COPY_FINISHED
|
||||
else:
|
||||
extra_info = {
|
||||
'copy_status': constants.CHECKPOINT_STATUS_COPY_FINISHED}
|
||||
checkpoint_copy.extra_info = jsonutils.dumps(extra_info)
|
||||
checkpoint_copy.commit()
|
||||
update_fields = {
|
||||
"status": checkpoint_copy.status,
|
||||
"ended_at": timeutils.utcnow()
|
||||
}
|
||||
utils.update_operation_log(context, operation_log, update_fields)
|
||||
|
||||
|
||||
def get_flow(context, protectable_registry, workflow_engine, plan, provider,
|
||||
checkpoint, checkpoint_copy):
|
||||
resources = set(Resource(**item) for item in plan.get("resources"))
|
||||
resource_graph = protectable_registry.build_graph(context,
|
||||
resources)
|
||||
checkpoint_copy.resource_graph = resource_graph
|
||||
checkpoint_copy.commit()
|
||||
operation_log = utils.create_operation_log(context, checkpoint_copy,
|
||||
constants.OPERATION_COPY)
|
||||
flow_name = "Copy_" + plan.get('id')+checkpoint.id
|
||||
copy_flow = workflow_engine.build_flow(flow_name, 'linear')
|
||||
plugins = provider.load_plugins()
|
||||
parameters = {}
|
||||
parameters.update(plan.get('parameters', {}))
|
||||
parameters['checkpoint'] = checkpoint
|
||||
parameters['checkpoint_copy'] = checkpoint_copy
|
||||
parameters['operation_log'] = operation_log
|
||||
resources_task_flow = resource_flow.build_resource_flow(
|
||||
operation_type=constants.OPERATION_COPY,
|
||||
context=context,
|
||||
workflow_engine=workflow_engine,
|
||||
resource_graph=resource_graph,
|
||||
plugins=plugins,
|
||||
parameters=parameters,
|
||||
)
|
||||
store_dict = {'context': context,
|
||||
'checkpoint': checkpoint,
|
||||
'checkpoint_copy': checkpoint_copy,
|
||||
'operation_log': operation_log
|
||||
}
|
||||
workflow_engine.add_tasks(
|
||||
copy_flow,
|
||||
InitiateCopyTask(name='InitiateCopyTask_'+checkpoint_copy.id,
|
||||
inject=store_dict),
|
||||
resources_task_flow,
|
||||
CompleteCopyTask(name='CompleteCopyTask_'+checkpoint_copy.id,
|
||||
inject=store_dict),
|
||||
)
|
||||
return copy_flow
|
||||
|
||||
|
||||
def get_flows(context, protectable_registry, workflow_engine, plan, provider,
|
||||
checkpoints, checkpoint_collection):
|
||||
checkpoints_protect_copy = prepare_create_flows(
|
||||
context, plan, checkpoints, checkpoint_collection)
|
||||
|
||||
copy_flows = create_flows(
|
||||
context, protectable_registry, workflow_engine, plan, provider,
|
||||
checkpoints_protect_copy, checkpoint_collection)
|
||||
|
||||
return copy_flows, checkpoints_protect_copy
|
||||
|
||||
|
||||
def prepare_create_flows(context, plan, checkpoints, checkpoint_collection):
|
||||
LOG.debug("Creating checkpoint copy for plan. plan: %s", plan.id)
|
||||
checkpoints_protect_copy = []
|
||||
for checkpoint in checkpoints:
|
||||
extra_info = checkpoint.get("extra_info", None)
|
||||
copy_status = None
|
||||
if extra_info:
|
||||
extra_info = jsonutils.loads(extra_info)
|
||||
copy_status = extra_info.get('copy_status', None)
|
||||
if (checkpoint.get("status") !=
|
||||
constants.CHECKPOINT_STATUS_AVAILABLE) or (
|
||||
copy_status ==
|
||||
constants.CHECKPOINT_STATUS_COPY_FINISHED):
|
||||
continue
|
||||
checkpoint_dict = {
|
||||
'project_id': context.project_id,
|
||||
'status': constants.CHECKPOINT_STATUS_WAIT_COPYING,
|
||||
'provider_id': checkpoint.get("provider_id"),
|
||||
"protection_plan": checkpoint.get("protection_plan"),
|
||||
"extra_info": {}
|
||||
}
|
||||
checkpoint_copy = checkpoint_collection.create(plan,
|
||||
checkpoint_dict)
|
||||
checkpoint_protect_copy = {
|
||||
'checkpoint_protect_id': checkpoint.get("id"),
|
||||
'checkpoint_copy_id': checkpoint_copy.id
|
||||
}
|
||||
checkpoints_protect_copy.append(checkpoint_protect_copy)
|
||||
LOG.debug("The protect and copy checkpoints . checkpoints_copy: %s",
|
||||
checkpoints_protect_copy)
|
||||
return checkpoints_protect_copy
|
||||
|
||||
|
||||
def create_flows(context, protectable_registry, workflow_engine,
|
||||
plan, provider, checkpoints_protect_copy,
|
||||
checkpoint_collection):
|
||||
LOG.debug("Creating flows for the plan. checkpoints: %s",
|
||||
checkpoints_protect_copy)
|
||||
flow_name = "Copy_flows" + plan.get('id')
|
||||
copy_flows = workflow_engine.build_flow(flow_name, 'linear')
|
||||
for checkpoint_protect_copy in checkpoints_protect_copy:
|
||||
checkpoint_protect_id = checkpoint_protect_copy.get(
|
||||
"checkpoint_protect_id")
|
||||
checkpoint_copy_id = checkpoint_protect_copy.get(
|
||||
"checkpoint_copy_id")
|
||||
checkpoint_protect = checkpoint_collection.get(checkpoint_protect_id)
|
||||
checkpoint_copy = checkpoint_collection.get(checkpoint_copy_id)
|
||||
try:
|
||||
copy_flow = get_flow(
|
||||
context,
|
||||
protectable_registry,
|
||||
workflow_engine,
|
||||
plan,
|
||||
provider,
|
||||
checkpoint_protect,
|
||||
checkpoint_copy,
|
||||
)
|
||||
except Exception as e:
|
||||
LOG.exception("Failed to create copy flow, checkpoint: %s",
|
||||
checkpoint_protect_id)
|
||||
raise exception.FlowError(
|
||||
flow="copy",
|
||||
error=e.msg if hasattr(e, 'msg') else 'Internal error')
|
||||
workflow_engine.add_tasks(copy_flows, copy_flow)
|
||||
flows_engine = workflow_engine.get_engine(copy_flows, store={
|
||||
'context': context
|
||||
})
|
||||
LOG.debug("Creating flows for the plan. copy_flows: %s", copy_flows)
|
||||
|
||||
return flows_engine
|
@ -22,7 +22,7 @@ from oslo_utils import timeutils
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_operation_log(context, checkpoint):
|
||||
def create_operation_log(context, checkpoint, operation_type=None):
|
||||
checkpoint_dict = checkpoint.to_dict()
|
||||
extra_info = checkpoint_dict.get('extra_info', None)
|
||||
scheduled_operation_id = None
|
||||
@ -41,7 +41,9 @@ def create_operation_log(context, checkpoint):
|
||||
provider_id = protection_plan.get("provider_id")
|
||||
operation_log_properties = {
|
||||
'project_id': checkpoint_dict['project_id'],
|
||||
'operation_type': constants.OPERATION_PROTECT,
|
||||
'operation_type': (
|
||||
constants.OPERATION_PROTECT if operation_type is None
|
||||
else operation_type),
|
||||
'checkpoint_id': checkpoint_dict['id'],
|
||||
'plan_id': plan_id,
|
||||
'provider_id': provider_id,
|
||||
|
@ -17,6 +17,7 @@ from oslo_utils import importutils
|
||||
|
||||
from karbor.common import constants
|
||||
from karbor import exception
|
||||
from karbor.services.protection.flows import copy as flow_copy
|
||||
from karbor.services.protection.flows import delete as flow_delete
|
||||
from karbor.services.protection.flows import protect as flow_protect
|
||||
from karbor.services.protection.flows import restore as flow_restore
|
||||
@ -90,6 +91,20 @@ class Worker(object):
|
||||
checkpoint,
|
||||
provider,
|
||||
)
|
||||
elif operation_type == constants.OPERATION_COPY:
|
||||
plan = kwargs.get('plan', None)
|
||||
protectable_registry = kwargs.get('protectable_registry', None)
|
||||
checkpoint_collection = kwargs.get('checkpoint_collection', None)
|
||||
flow, checkpoint_copy = flow_copy.get_flows(
|
||||
context,
|
||||
protectable_registry,
|
||||
self.workflow_engine,
|
||||
plan,
|
||||
provider,
|
||||
checkpoint,
|
||||
checkpoint_collection,
|
||||
)
|
||||
return flow, checkpoint_copy
|
||||
else:
|
||||
raise exception.InvalidParameterValue(
|
||||
err='unknown operation type %s' % operation_type
|
||||
|
@ -130,6 +130,53 @@ class ProtectionManager(manager.Manager):
|
||||
self._spawn(self.worker.run_flow, flow)
|
||||
return checkpoint.id
|
||||
|
||||
@messaging.expected_exceptions(exception.InvalidPlan,
|
||||
exception.ProviderNotFound,
|
||||
exception.FlowError)
|
||||
def copy(self, context, plan):
|
||||
"""create copy of checkpoint for the given plan
|
||||
|
||||
:param plan: Define that protection plan should be done
|
||||
"""
|
||||
|
||||
LOG.info("Starting protection service:copy action.")
|
||||
LOG.debug("Creating the checkpoint copy for the plan: %s", plan)
|
||||
|
||||
if not plan:
|
||||
raise exception.InvalidPlan(
|
||||
reason=_('The protection plan is None'))
|
||||
provider_id = plan.get('provider_id', None)
|
||||
plan_id = plan.get('id', None)
|
||||
provider = self.provider_registry.show_provider(provider_id)
|
||||
checkpoints = None
|
||||
checkpoint_collection = provider.get_checkpoint_collection()
|
||||
try:
|
||||
checkpoints = self.list_checkpoints(context, provider_id,
|
||||
filters={'plan_id': plan_id})
|
||||
except Exception as e:
|
||||
LOG.exception("Failed to get checkpoints for the plan: %s",
|
||||
plan_id)
|
||||
exc = exception.FlowError(flow="copy",
|
||||
error="Failed to get checkpoints")
|
||||
six.raise_from(exc, e)
|
||||
try:
|
||||
flow, checkpoint_copy = self.worker.get_flow(
|
||||
context=context,
|
||||
protectable_registry=self.protectable_registry,
|
||||
operation_type=constants.OPERATION_COPY,
|
||||
plan=plan,
|
||||
provider=provider,
|
||||
checkpoint=checkpoints,
|
||||
checkpoint_collection=checkpoint_collection)
|
||||
except Exception as e:
|
||||
LOG.exception("Failed to create copy flow, plan: %s",
|
||||
plan_id)
|
||||
raise exception.FlowError(
|
||||
flow="copy",
|
||||
error=e.msg if hasattr(e, 'msg') else 'Internal error')
|
||||
self._spawn(self.worker.run_flow, flow)
|
||||
return checkpoint_copy
|
||||
|
||||
@messaging.expected_exceptions(exception.ProviderNotFound,
|
||||
exception.CheckpointNotFound,
|
||||
exception.CheckpointNotAvailable,
|
||||
|
@ -90,6 +90,13 @@ class ProtectionPlugin(object):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def get_copy_operation(self, resource):
|
||||
"""Returns the copy Operation for this resource
|
||||
|
||||
:returns: Operation for the resource
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def get_delete_operation(self, resource):
|
||||
"""Returns the delete Operation for this resource
|
||||
|
||||
|
@ -42,6 +42,7 @@ ResourceHooks = namedtuple('ResourceHooks', [
|
||||
OPERATION_EXTRA_ARGS = {
|
||||
constants.OPERATION_RESTORE: ['restore', 'new_resources'],
|
||||
constants.OPERATION_VERIFY: ['verify', 'new_resources'],
|
||||
constants.OPERATION_COPY: ['checkpoint', 'checkpoint_copy'],
|
||||
}
|
||||
|
||||
|
||||
@ -99,6 +100,14 @@ class ResourceFlowGraphWalkerListener(graph.GraphWalkerListener):
|
||||
'parameters': parameters,
|
||||
'resource': resource,
|
||||
}
|
||||
if self.operation_type == constants.OPERATION_COPY:
|
||||
injects['checkpoint'] = self.parameters.get(
|
||||
'checkpoint')
|
||||
injects['checkpoint_copy'] = self.parameters.get(
|
||||
'checkpoint_copy')
|
||||
injects['operation_log'] = self.parameters.get(
|
||||
'operation_log')
|
||||
|
||||
requires = OPERATION_EXTRA_ARGS.get(self.operation_type, [])
|
||||
requires.append('operation_log')
|
||||
task = self.workflow_engine.create_task(method,
|
||||
|
@ -67,6 +67,13 @@ class ProtectionAPI(object):
|
||||
plan=plan,
|
||||
checkpoint_properties=checkpoint_properties)
|
||||
|
||||
def copy(self, ctxt, plan=None):
|
||||
cctxt = self.client.prepare(version='1.0')
|
||||
return cctxt.call(
|
||||
ctxt,
|
||||
'copy',
|
||||
plan=plan)
|
||||
|
||||
def delete(self, ctxt, provider_id, checkpoint_id):
|
||||
cctxt = self.client.prepare(version='1.0')
|
||||
return cctxt.call(
|
||||
|
@ -86,6 +86,8 @@ class ResourceFlowTest(base.TestCase):
|
||||
elif operation == constants.OPERATION_VERIFY:
|
||||
kwargs['new_resources'] = {}
|
||||
kwargs['verify'] = None
|
||||
elif operation == constants.OPERATION_COPY:
|
||||
kwargs['checkpoint_copy'] = None
|
||||
self._walk_operation(mock_protection, operation, **kwargs)
|
||||
|
||||
@mock.patch('karbor.tests.unit.protection.fakes.FakeProtectionPlugin')
|
||||
@ -105,6 +107,8 @@ class ResourceFlowTest(base.TestCase):
|
||||
elif operation == constants.OPERATION_VERIFY:
|
||||
kwargs['new_resources'] = {}
|
||||
kwargs['verify'] = None
|
||||
elif operation == constants.OPERATION_COPY:
|
||||
kwargs['checkpoint_copy'] = None
|
||||
self._walk_operation(mock_protection, operation, **kwargs)
|
||||
|
||||
self.assertEqual(mock_operation.on_prepare_begin.call_count,
|
||||
@ -133,6 +137,8 @@ class ResourceFlowTest(base.TestCase):
|
||||
self.assertEqual(v, result[k])
|
||||
|
||||
for operation in constants.OPERATION_TYPES:
|
||||
if operation == constants.OPERATION_COPY:
|
||||
continue
|
||||
fake_operation = fakes.FakeOperation()
|
||||
get_operation_attr = 'get_{}_operation'.format(operation)
|
||||
getattr(
|
||||
@ -153,6 +159,8 @@ class ResourceFlowTest(base.TestCase):
|
||||
elif operation == constants.OPERATION_VERIFY:
|
||||
kwargs['new_resources'] = {}
|
||||
kwargs['verify'] = None
|
||||
elif operation == constants.OPERATION_COPY:
|
||||
kwargs['checkpoint_copy'] = None
|
||||
|
||||
self._walk_operation(mock_protection, operation,
|
||||
parameters=parameters, **kwargs)
|
||||
|
Loading…
Reference in New Issue
Block a user