Stabilize Protection Plugin API
Protection plugin now needs to implement methods for each operation (protect, delete, restore). Each protection plugin is created in the context of the flow, while each operation is created in the context of the resource. A protection plugin is responsible for returning an Operation class for each operation. Such Operation class defines the behavior of the protection plugin by implementing these optional hooks: - on_prepare_begin hook is invoked before any hook of this resource and dependent resources has begun - on_prepare_finish hook is invoked after any prepare hooks of dependent resources are complete. - on_main hook is invoked after the resource prepare hooks are complete - on_complete hook is invoked once the resource's main hook is complete, and the dependent resources' on_complete hooks are complete HeatTemplate is now created and supplied from a task instead of being created and passed to resource tasks before the restore operation began. Change-Id: I847eec6990b2d24a66a12542d242fbfb682272fe Co-Authored-By: Saggi Mizrahi <saggi.mizrahi@huawei.com> Implements: blueprint protection-plugin-is-design
This commit is contained in:
parent
c7f966933b
commit
a80b9cc283
|
@ -0,0 +1,19 @@
|
|||
[provider]
|
||||
name = Noop
|
||||
description = This provider uses a Noop Protection Plugin for every resource
|
||||
id = b766f37c-d011-4026-8228-28730d734a3f
|
||||
|
||||
plugin=karbor-noop-protection-plugin
|
||||
bank=karbor-swift-bank-plugin
|
||||
|
||||
[swift_client]
|
||||
swift_auth_url=http://127.0.0.1:5000/v2.0/
|
||||
swift_auth_version=2
|
||||
swift_user=admin
|
||||
swift_key=password
|
||||
swift_tenant_name=admin
|
||||
|
||||
[swift_bank_plugin]
|
||||
lease_expire_window=120
|
||||
lease_renew_window=100
|
||||
lease_validity_window=100
|
|
@ -388,12 +388,12 @@ class ProvidersController(wsgi.Controller):
|
|||
"resources": plan.get("resources"),
|
||||
}
|
||||
}
|
||||
checkpoint = self.protection_api.protect(context, plan)
|
||||
if checkpoint is not None:
|
||||
checkpoint_properties['id'] = checkpoint.get('checkpoint_id')
|
||||
else:
|
||||
msg = _("Get checkpoint failed.")
|
||||
raise exc.HTTPNotFound(explanation=msg)
|
||||
try:
|
||||
checkpoint_id = self.protection_api.protect(context, plan)
|
||||
except Exception as error:
|
||||
msg = _("Create checkpoint failed: %s") % error.msg
|
||||
raise exc.HTTPBadRequest(explanation=msg)
|
||||
checkpoint_properties['id'] = checkpoint_id
|
||||
|
||||
returnval = self._checkpoint_view_builder.detail(
|
||||
req, checkpoint_properties)
|
||||
|
|
|
@ -264,7 +264,7 @@ class RestoresController(wsgi.Controller):
|
|||
'checkpoint_id': restore.get('checkpoint_id'),
|
||||
'restore_target': restore.get('restore_target'),
|
||||
'parameters': parameters,
|
||||
'status': constants.RESOURCE_STATUS_STARTED,
|
||||
'status': constants.RESTORE_STATUS_IN_PROGRESS,
|
||||
}
|
||||
|
||||
restoreobj = objects.Restore(context=context,
|
||||
|
@ -276,10 +276,9 @@ class RestoresController(wsgi.Controller):
|
|||
try:
|
||||
self.protection_api.restore(context, restoreobj, restore_auth)
|
||||
except Exception:
|
||||
status_update = constants.OPERATION_EXE_STATE_FAILED
|
||||
# update the status of restore
|
||||
update_dict = {
|
||||
"status": status_update
|
||||
"status": constants.RESTORE_STATUS_FAILURE
|
||||
}
|
||||
check_policy(context, 'update', restoreobj)
|
||||
restoreobj = self._restore_update(context,
|
||||
|
|
|
@ -75,3 +75,7 @@ OPERATION_EXE_STATE_IN_PROGRESS = 'in_progress'
|
|||
OPERATION_EXE_STATE_SUCCESS = 'success'
|
||||
OPERATION_EXE_STATE_FAILED = 'failed'
|
||||
OPERATION_EXE_STATE_DROPPED_OUT_OF_WINDOW = 'dropped_out_of_window'
|
||||
|
||||
RESTORE_STATUS_SUCCESS = 'success'
|
||||
RESTORE_STATUS_FAILURE = 'fail'
|
||||
RESTORE_STATUS_IN_PROGRESS = 'in_progress'
|
||||
|
|
|
@ -262,6 +262,11 @@ class ProtectableTypeNotFound(NotFound):
|
|||
" not be found.")
|
||||
|
||||
|
||||
class ProtectionPluginNotFound(NotFound):
|
||||
message = _("Protection Plugin for %(type)s could"
|
||||
" not be found.")
|
||||
|
||||
|
||||
class ProviderNotFound(NotFound):
|
||||
message = _("Provider %(provider_id)s could"
|
||||
" not be found.")
|
||||
|
|
|
@ -151,7 +151,7 @@ class Checkpoint(object):
|
|||
value={
|
||||
"version": cls.VERSION,
|
||||
"id": checkpoint_id,
|
||||
"status": "protecting",
|
||||
"status": constants.CHECKPOINT_STATUS_PROTECTING,
|
||||
"owner_id": owner_id,
|
||||
"provider_id": plan.get("provider_id"),
|
||||
"project_id": plan.get("project_id"),
|
||||
|
|
|
@ -11,94 +11,66 @@
|
|||
# under the License.
|
||||
|
||||
from karbor.common import constants
|
||||
from karbor.i18n import _LI
|
||||
from oslo_config import cfg
|
||||
from karbor.resource import Resource
|
||||
from karbor.services.protection.protectable_registry import ProtectableRegistry
|
||||
from karbor.services.protection import resource_flow
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import loopingcall
|
||||
from taskflow import task
|
||||
|
||||
sync_status_opts = [
|
||||
cfg.IntOpt('sync_status_interval',
|
||||
default=60,
|
||||
help='update protection status interval')
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(sync_status_opts)
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CreateCheckpointTask(task.Task):
|
||||
def __init__(self, plan, provider, resource_graph):
|
||||
provides = 'checkpoint'
|
||||
super(CreateCheckpointTask, self).__init__(provides=provides)
|
||||
self._plan = plan
|
||||
self._provider = provider
|
||||
self._resource_graph = resource_graph
|
||||
class InitiateProtectTask(task.Task):
|
||||
def __init__(self):
|
||||
super(InitiateProtectTask, self).__init__()
|
||||
|
||||
def execute(self):
|
||||
checkpoint_collection = self._provider.get_checkpoint_collection()
|
||||
checkpoint = checkpoint_collection.create(self._plan)
|
||||
checkpoint.resource_graph = self._resource_graph
|
||||
def execute(self, checkpoint, *args, **kwargs):
|
||||
LOG.debug("Initiate protect checkpoint_id: %s", checkpoint.id)
|
||||
checkpoint.status = constants.CHECKPOINT_STATUS_PROTECTING
|
||||
checkpoint.commit()
|
||||
|
||||
def revert(self, checkpoint, *args, **kwargs):
|
||||
LOG.debug("Failed to protect checkpoint_id: %s", checkpoint.id)
|
||||
checkpoint.status = constants.CHECKPOINT_STATUS_ERROR
|
||||
checkpoint.commit()
|
||||
return checkpoint
|
||||
|
||||
|
||||
class SyncCheckpointStatusTask(task.Task):
|
||||
def __init__(self, status_getters):
|
||||
requires = ['checkpoint']
|
||||
super(SyncCheckpointStatusTask, self).__init__(requires=requires)
|
||||
self._status_getters = status_getters
|
||||
class CompleteProtectTask(task.Task):
|
||||
def __init__(self):
|
||||
super(CompleteProtectTask, self).__init__()
|
||||
|
||||
def execute(self, checkpoint):
|
||||
LOG.info(_LI("Start sync checkpoint status,checkpoint_id: %s"),
|
||||
checkpoint.id)
|
||||
sync_status = loopingcall.FixedIntervalLoopingCall(
|
||||
self._sync_status, checkpoint, self._status_getters)
|
||||
sync_status.start(interval=CONF.sync_status_interval)
|
||||
|
||||
def _sync_status(self, checkpoint, status_getters):
|
||||
statuses = set()
|
||||
for s in status_getters:
|
||||
resource_id = s.get('resource_id')
|
||||
get_resource_stats = s.get('get_resource_stats')
|
||||
statuses.add(get_resource_stats(checkpoint, resource_id))
|
||||
if constants.RESOURCE_STATUS_ERROR in statuses:
|
||||
checkpoint.status = constants.CHECKPOINT_STATUS_ERROR
|
||||
checkpoint.commit()
|
||||
elif constants.RESOURCE_STATUS_PROTECTING in statuses:
|
||||
checkpoint.status = constants.CHECKPOINT_STATUS_PROTECTING
|
||||
checkpoint.commit()
|
||||
elif constants.RESOURCE_STATUS_UNDEFINED in statuses:
|
||||
checkpoint.status = constants.CHECKPOINT_STATUS_PROTECTING
|
||||
checkpoint.commit()
|
||||
else:
|
||||
checkpoint.status = constants.CHECKPOINT_STATUS_AVAILABLE
|
||||
checkpoint.commit()
|
||||
LOG.info(_LI("Stop sync checkpoint status,checkpoint_id: "
|
||||
"%(checkpoint_id)s,checkpoint status: "
|
||||
"%(checkpoint_status)s"),
|
||||
{"checkpoint_id": checkpoint.id,
|
||||
"checkpoint_status": checkpoint.status})
|
||||
raise loopingcall.LoopingCallDone()
|
||||
LOG.debug("Complete protect checkpoint_id: %s", checkpoint.id)
|
||||
checkpoint.status = constants.CHECKPOINT_STATUS_AVAILABLE
|
||||
checkpoint.commit()
|
||||
|
||||
|
||||
def get_flow(context, workflow_engine, operation_type, plan, provider):
|
||||
ctx = {'context': context,
|
||||
'plan': plan,
|
||||
'workflow_engine': workflow_engine,
|
||||
'operation_type': operation_type}
|
||||
flow_name = "create_protection_" + plan.get('id')
|
||||
def get_flow(context, workflow_engine, operation_type, plan, provider,
|
||||
checkpoint):
|
||||
protectable_registry = ProtectableRegistry()
|
||||
resources = set(Resource(**item) for item in plan.get("resources"))
|
||||
resource_graph = protectable_registry.build_graph(context,
|
||||
resources)
|
||||
checkpoint.resource_graph = resource_graph
|
||||
checkpoint.commit()
|
||||
flow_name = "Protect_" + plan.get('id')
|
||||
protection_flow = workflow_engine.build_flow(flow_name, 'linear')
|
||||
result = provider.build_task_flow(ctx)
|
||||
status_getters = result.get('status_getters')
|
||||
resource_flow = result.get('task_flow')
|
||||
resource_graph = result.get('resource_graph')
|
||||
workflow_engine.add_tasks(protection_flow,
|
||||
CreateCheckpointTask(plan, provider,
|
||||
resource_graph),
|
||||
resource_flow,
|
||||
SyncCheckpointStatusTask(status_getters))
|
||||
flow_engine = workflow_engine.get_engine(protection_flow)
|
||||
plugins = provider.load_plugins()
|
||||
resources_task_flow = resource_flow.build_resource_flow(
|
||||
operation_type=operation_type,
|
||||
context=context,
|
||||
workflow_engine=workflow_engine,
|
||||
resource_graph=resource_graph,
|
||||
plugins=plugins,
|
||||
parameters=plan.get('parameters'),
|
||||
)
|
||||
workflow_engine.add_tasks(
|
||||
protection_flow,
|
||||
InitiateProtectTask(),
|
||||
resources_task_flow,
|
||||
CompleteProtectTask(),
|
||||
)
|
||||
flow_engine = workflow_engine.get_engine(protection_flow, store={
|
||||
'checkpoint': checkpoint
|
||||
})
|
||||
return flow_engine
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import loopingcall
|
||||
|
@ -17,13 +18,14 @@ from oslo_utils import uuidutils
|
|||
|
||||
from karbor.common import constants
|
||||
from karbor.i18n import _LE, _LI
|
||||
from karbor.services.protection.client_factory import ClientFactory
|
||||
from karbor.services.protection.restore_heat import HeatTemplate
|
||||
from karbor.services.protection import client_factory
|
||||
from karbor.services.protection import resource_flow
|
||||
from karbor.services.protection import restore_heat
|
||||
from taskflow import task
|
||||
|
||||
sync_status_opts = [
|
||||
cfg.IntOpt('sync_status_interval',
|
||||
default=60,
|
||||
default=20,
|
||||
help='update protection status interval')
|
||||
]
|
||||
|
||||
|
@ -33,106 +35,137 @@ CONF.register_opts(sync_status_opts)
|
|||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CreateStackTask(task.Task):
|
||||
def __init__(self, heat_client, template):
|
||||
provides = 'stack_id'
|
||||
super(CreateStackTask, self).__init__(provides=provides)
|
||||
self._heat_client = heat_client
|
||||
self._template = template
|
||||
class InitiateRestoreTask(task.Task):
|
||||
def __init__(self):
|
||||
super(InitiateRestoreTask, self).__init__()
|
||||
|
||||
def execute(self):
|
||||
def execute(self, restore, *args, **kwargs):
|
||||
LOG.debug("Initiate restore restore_id: %s", restore.id)
|
||||
restore['status'] = constants.RESTORE_STATUS_IN_PROGRESS
|
||||
restore.save()
|
||||
|
||||
def revert(self, restore, *args, **kwargs):
|
||||
LOG.debug("Failed to restore restore_id: %s", restore.id)
|
||||
restore['status'] = constants.RESTORE_STATUS_FAILURE
|
||||
restore.save()
|
||||
|
||||
|
||||
class CompleteRestoreTask(task.Task):
|
||||
def __init__(self):
|
||||
super(CompleteRestoreTask, self).__init__()
|
||||
|
||||
def execute(self, restore, *args, **kwargs):
|
||||
LOG.debug("Complete restore restore_id: %s", restore.id)
|
||||
restore['status'] = constants.RESTORE_STATUS_SUCCESS
|
||||
restore.save()
|
||||
|
||||
|
||||
class CreateHeatTask(task.Task):
|
||||
default_provides = ['heat_client', 'heat_template']
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(CreateHeatTask, self).__init__(*args, **kwargs)
|
||||
|
||||
def execute(self, context, heat_conf):
|
||||
LOG.info(_LI('Creating Heat template. Target: "%s"')
|
||||
% heat_conf.get('auth_url', '(None)'))
|
||||
heat_client = client_factory.ClientFactory.create_client(
|
||||
'heat', context=context, **heat_conf)
|
||||
|
||||
heat_template = restore_heat.HeatTemplate()
|
||||
|
||||
return (heat_client, heat_template)
|
||||
|
||||
|
||||
class CreateStackTask(task.Task):
|
||||
default_provides = 'stack_id'
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(CreateStackTask, self).__init__(*args, **kwargs)
|
||||
|
||||
def execute(self, heat_client, heat_template):
|
||||
stack_name = "restore_%s" % uuidutils.generate_uuid()
|
||||
LOG.info(_LI("creating stack, stack_name:%s"), stack_name)
|
||||
LOG.info(_LI('Creating Heat stack, stack_name: %s'), stack_name)
|
||||
try:
|
||||
body = self._heat_client.stacks.create(
|
||||
body = heat_client.stacks.create(
|
||||
stack_name=stack_name,
|
||||
template=self._template.to_dict())
|
||||
template=heat_template.to_dict())
|
||||
LOG.debug('Created stack with id: %s', body['stack']['id'])
|
||||
return body['stack']['id']
|
||||
except Exception:
|
||||
LOG.error(_LE("use heat to create stack failed"))
|
||||
raise
|
||||
|
||||
|
||||
class SyncStackStatusTask(task.Task):
|
||||
def __init__(self, checkpoint, heat_client, restore):
|
||||
requires = ['stack_id']
|
||||
super(SyncStackStatusTask, self).__init__(requires=requires)
|
||||
self._heat_client = heat_client
|
||||
self._checkpoint = checkpoint
|
||||
self._restore = restore
|
||||
class SyncRestoreStatusTask(task.Task):
|
||||
|
||||
def execute(self, stack_id):
|
||||
LOG.info(_LI("syncing stack status, stack_id: %s"), stack_id)
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(SyncRestoreStatusTask, self).__init__(*args, **kwargs)
|
||||
|
||||
def execute(self, stack_id, heat_client):
|
||||
LOG.info(_LI('Syncing Heat stack status, stack_id: %s'), stack_id)
|
||||
sync_status_loop = loopingcall.FixedIntervalLoopingCall(
|
||||
self._sync_status, self._checkpoint, stack_id)
|
||||
self._sync_status, heat_client, stack_id)
|
||||
sync_status_loop.start(interval=CONF.sync_status_interval)
|
||||
sync_status_loop.wait()
|
||||
|
||||
def _sync_status(self, checkpoint, stack_id):
|
||||
def _sync_status(self, heat_client, stack_id):
|
||||
try:
|
||||
stack = self._heat_client.stacks.get(stack_id)
|
||||
stack_status = getattr(stack, 'stack_status')
|
||||
if stack_status == 'CREATE_IN_PROGRESS':
|
||||
return
|
||||
|
||||
if stack_status == 'CREATE_FAILED':
|
||||
status = constants.OPERATION_EXE_STATE_FAILED
|
||||
elif stack_status == 'CREATE_COMPLETE':
|
||||
status = constants.OPERATION_EXE_STATE_SUCCESS
|
||||
|
||||
status_dict = {
|
||||
"status": status
|
||||
}
|
||||
self._restore.update(status_dict)
|
||||
self._restore.save()
|
||||
raise loopingcall.LoopingCallDone()
|
||||
stack = heat_client.stacks.get(stack_id)
|
||||
except Exception:
|
||||
LOG.info(_LI("stop sync stack status, stack_id: %s"), stack_id)
|
||||
LOG.debug('Heat error getting stack, stack_id: %s', stack_id)
|
||||
raise
|
||||
stack_status = getattr(stack, 'stack_status')
|
||||
if stack_status == 'CREATE_IN_PROGRESS':
|
||||
LOG.debug('Heat stack status: in progress, stack_id: %s',
|
||||
stack_id)
|
||||
return
|
||||
if stack_status == 'CREATE_COMPLETE':
|
||||
LOG.info(_LI('Heat stack status: complete, stack_id: %s'),
|
||||
stack_id)
|
||||
else:
|
||||
LOG.info(_LI('Heat stack status: failure, stack_id: %s'), stack_id)
|
||||
raise
|
||||
raise loopingcall.LoopingCallDone()
|
||||
|
||||
|
||||
def get_flow(context, workflow_engine, operation_type, checkpoint, provider,
|
||||
restore, restore_auth):
|
||||
target = restore.get('restore_target', None)
|
||||
|
||||
# TODO(luobin): create a heat_client
|
||||
# Initialize a heat client using current login tenant when the
|
||||
# restore_target and restore_auth is not provided.
|
||||
heat_conf = {}
|
||||
if target is not None:
|
||||
username = None
|
||||
password = None
|
||||
heat_conf["auth_url"] = target
|
||||
if restore_auth is not None:
|
||||
auth_type = restore_auth.get("type", None)
|
||||
if auth_type == "password":
|
||||
username = restore_auth["username"]
|
||||
password = restore_auth["password"]
|
||||
kwargs = {"auth_url": target,
|
||||
"username": username,
|
||||
"password": password}
|
||||
else:
|
||||
kwargs = {}
|
||||
heat_client = ClientFactory.create_client("heat",
|
||||
context=context,
|
||||
**kwargs)
|
||||
heat_conf["username"] = restore_auth["username"]
|
||||
heat_conf["password"] = restore_auth["password"]
|
||||
|
||||
# TODO(luobin): create a heat_template
|
||||
heat_template = HeatTemplate()
|
||||
|
||||
ctx = {'context': context,
|
||||
'checkpoint': checkpoint,
|
||||
'workflow_engine': workflow_engine,
|
||||
'operation_type': operation_type,
|
||||
'restore': restore,
|
||||
'heat_template': heat_template}
|
||||
|
||||
flow_name = "create_restoration_" + checkpoint.id
|
||||
resource_graph = checkpoint.resource_graph
|
||||
parameters = restore.parameters
|
||||
flow_name = "Restore_" + checkpoint.id
|
||||
restoration_flow = workflow_engine.build_flow(flow_name, 'linear')
|
||||
result = provider.build_task_flow(ctx)
|
||||
resource_flow = result.get('task_flow')
|
||||
plugins = provider.load_plugins()
|
||||
resources_task_flow = resource_flow.build_resource_flow(
|
||||
operation_type=operation_type,
|
||||
context=context,
|
||||
workflow_engine=workflow_engine,
|
||||
resource_graph=resource_graph,
|
||||
plugins=plugins,
|
||||
parameters=parameters
|
||||
)
|
||||
|
||||
workflow_engine.add_tasks(
|
||||
restoration_flow,
|
||||
resource_flow,
|
||||
CreateStackTask(heat_client, heat_template),
|
||||
SyncStackStatusTask(checkpoint, heat_client, restore)
|
||||
InitiateRestoreTask(),
|
||||
CreateHeatTask(inject={'context': context, 'heat_conf': heat_conf}),
|
||||
resources_task_flow,
|
||||
CreateStackTask(),
|
||||
SyncRestoreStatusTask(),
|
||||
CompleteRestoreTask()
|
||||
)
|
||||
flow_engine = workflow_engine.get_engine(restoration_flow)
|
||||
flow_engine = workflow_engine.get_engine(restoration_flow,
|
||||
store={'checkpoint': checkpoint,
|
||||
'restore': restore})
|
||||
return flow_engine
|
||||
|
|
|
@ -12,78 +12,58 @@
|
|||
|
||||
from karbor.common import constants
|
||||
from karbor.i18n import _LI
|
||||
from oslo_config import cfg
|
||||
from karbor.services.protection import resource_flow
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import loopingcall
|
||||
from taskflow import task
|
||||
|
||||
sync_status_opts = [
|
||||
cfg.IntOpt('sync_status_interval',
|
||||
default=60,
|
||||
help='update protection status interval')
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(sync_status_opts)
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SyncCheckpointStatusTask(task.Task):
|
||||
def __init__(self, checkpoint, status_getters):
|
||||
super(SyncCheckpointStatusTask, self).__init__()
|
||||
self._status_getters = status_getters
|
||||
self._checkpoint = checkpoint
|
||||
class InitiateDeleteTask(task.Task):
|
||||
def __init__(self):
|
||||
super(InitiateDeleteTask, self).__init__()
|
||||
|
||||
def execute(self):
|
||||
LOG.info(_LI("Start sync checkpoint status,checkpoint_id:%s"),
|
||||
self._checkpoint.id)
|
||||
sync_status = loopingcall.FixedIntervalLoopingCall(
|
||||
self._sync_status, self._checkpoint, self._status_getters)
|
||||
sync_status.start(interval=CONF.sync_status_interval)
|
||||
def execute(self, checkpoint, *args, **kwargs):
|
||||
LOG.debug("Initiate delete checkpoint_id: %s", checkpoint.id)
|
||||
checkpoint.status = constants.CHECKPOINT_STATUS_DELETING
|
||||
checkpoint.commit()
|
||||
|
||||
def _sync_status(self, checkpoint, status_getters):
|
||||
statuses = set()
|
||||
for s in status_getters:
|
||||
resource_id = s.get('resource_id')
|
||||
get_resource_stats = s.get('get_resource_stats')
|
||||
statuses.add(get_resource_stats(checkpoint, resource_id))
|
||||
LOG.info(_LI("Start sync checkpoint status,checkpoint_id:"
|
||||
"%(checkpoint_id)s, resource_status:"
|
||||
"%(resource_status)s") %
|
||||
{"checkpoint_id": checkpoint.id,
|
||||
"resource_status": statuses})
|
||||
if constants.RESOURCE_STATUS_ERROR in statuses:
|
||||
checkpoint.status = constants.CHECKPOINT_STATUS_ERROR_DELETING
|
||||
checkpoint.commit()
|
||||
raise loopingcall.LoopingCallDone()
|
||||
elif statuses == {constants.RESOURCE_STATUS_DELETED, }:
|
||||
checkpoint.delete()
|
||||
LOG.info(_LI("Stop sync checkpoint status,checkpoint_id: "
|
||||
"%(checkpoint_id)s,checkpoint status: "
|
||||
"%(checkpoint_status)s"),
|
||||
{"checkpoint_id": checkpoint.id,
|
||||
"checkpoint_status": checkpoint.status})
|
||||
raise loopingcall.LoopingCallDone()
|
||||
def revert(self, checkpoint, *args, **kwargs):
|
||||
LOG.debug("Failed to delete checkpoint_id: %s", checkpoint.id)
|
||||
checkpoint.status = constants.CHECKPOINT_STATUS_ERROR_DELETING
|
||||
checkpoint.commit()
|
||||
|
||||
|
||||
class CompleteDeleteTask(task.Task):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(CompleteDeleteTask, self).__init__(*args, **kwargs)
|
||||
|
||||
def execute(self, checkpoint):
|
||||
LOG.debug("Complete delete checkpoint_id: %s", checkpoint.id)
|
||||
checkpoint.delete()
|
||||
|
||||
|
||||
def get_flow(context, workflow_engine, operation_type, checkpoint, provider):
|
||||
|
||||
ctx = {'context': context,
|
||||
'checkpoint': checkpoint,
|
||||
'workflow_engine': workflow_engine,
|
||||
'operation_type': operation_type,
|
||||
}
|
||||
LOG.info(_LI("Start get checkpoint flow,checkpoint_id: %s"),
|
||||
LOG.info(_LI("Start get checkpoint flow, checkpoint_id: %s"),
|
||||
checkpoint.id)
|
||||
flow_name = "delete_checkpoint_" + checkpoint.id
|
||||
flow_name = "Delete_Checkpoint_" + checkpoint.id
|
||||
delete_flow = workflow_engine.build_flow(flow_name, 'linear')
|
||||
result = provider.build_task_flow(ctx)
|
||||
status_getters = result.get('status_getters')
|
||||
resource_flow = result.get('task_flow')
|
||||
workflow_engine.add_tasks(delete_flow,
|
||||
resource_flow,
|
||||
SyncCheckpointStatusTask(checkpoint,
|
||||
status_getters))
|
||||
flow_engine = workflow_engine.get_engine(delete_flow)
|
||||
resource_graph = checkpoint.resource_graph
|
||||
plugins = provider.load_plugins()
|
||||
resources_task_flow = resource_flow.build_resource_flow(
|
||||
operation_type=operation_type,
|
||||
context=context,
|
||||
workflow_engine=workflow_engine,
|
||||
resource_graph=resource_graph,
|
||||
plugins=plugins,
|
||||
parameters=None
|
||||
)
|
||||
workflow_engine.add_tasks(
|
||||
delete_flow,
|
||||
InitiateDeleteTask(),
|
||||
resources_task_flow,
|
||||
CompleteDeleteTask(),
|
||||
)
|
||||
flow_engine = workflow_engine.get_engine(delete_flow,
|
||||
store={'checkpoint': checkpoint})
|
||||
return flow_engine
|
||||
|
|
|
@ -52,13 +52,14 @@ class Worker(object):
|
|||
if operation_type == constants.OPERATION_PROTECT:
|
||||
plan = kwargs.get('plan', None)
|
||||
provider = kwargs.get('provider', None)
|
||||
checkpoint = kwargs.get('checkpoint')
|
||||
protection_flow = create_protection.get_flow(context,
|
||||
self.workflow_engine,
|
||||
operation_type,
|
||||
plan,
|
||||
provider)
|
||||
provider,
|
||||
checkpoint)
|
||||
return protection_flow
|
||||
# TODO(wangliuan)implement the other operation
|
||||
|
||||
def get_restoration_flow(self, context, operation_type, checkpoint,
|
||||
provider, restore, restore_auth):
|
||||
|
|
|
@ -15,6 +15,8 @@ Protection Service
|
|||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from eventlet import greenpool
|
||||
from eventlet import greenthread
|
||||
import six
|
||||
|
||||
from oslo_config import cfg
|
||||
|
@ -35,7 +37,12 @@ LOG = logging.getLogger(__name__)
|
|||
protection_manager_opts = [
|
||||
cfg.StrOpt('provider_registry',
|
||||
default='karbor.services.protection.provider.ProviderRegistry',
|
||||
help='the provider registry')
|
||||
help='the provider registry'),
|
||||
cfg.IntOpt('max_concurrent_operations',
|
||||
default=0,
|
||||
help='number of maximum concurrent operation (protect, restore,'
|
||||
' delete) flows. 0 means no hard limit'
|
||||
)
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
@ -60,6 +67,16 @@ class ProtectionManager(manager.Manager):
|
|||
self.protectable_registry = ProtectableRegistry()
|
||||
self.protectable_registry.load_plugins()
|
||||
self.worker = flow_manager.Worker()
|
||||
self._greenpool = None
|
||||
self._greenpool_size = CONF.max_concurrent_operations
|
||||
if self._greenpool_size != 0:
|
||||
self._greenpool = greenpool.GreenPool(self._greenpool_size)
|
||||
|
||||
def _spawn(self, func, *args, **kwargs):
|
||||
if self._greenpool is not None:
|
||||
return self._greenpool.spawn_n(func, *args, **kwargs)
|
||||
else:
|
||||
return greenthread.spawn_n(func, *args, **kwargs)
|
||||
|
||||
def init_host(self, **kwargs):
|
||||
"""Handle initialization if this is a standalone service"""
|
||||
|
@ -84,32 +101,32 @@ class ProtectionManager(manager.Manager):
|
|||
provider_id = plan.get('provider_id', None)
|
||||
plan_id = plan.get('id', None)
|
||||
provider = self.provider_registry.show_provider(provider_id)
|
||||
checkpoint_collection = provider.get_checkpoint_collection()
|
||||
try:
|
||||
checkpoint = checkpoint_collection.create(plan)
|
||||
except Exception as e:
|
||||
LOG.exception(_LE("Failed to create checkpoint, plan: %s"),
|
||||
plan_id)
|
||||
exc = exception.FlowError(flow="protect",
|
||||
error="Error creating checkpoint")
|
||||
six.raise_from(exc, e)
|
||||
try:
|
||||
protection_flow = self.worker.get_flow(context,
|
||||
constants.OPERATION_PROTECT,
|
||||
plan=plan,
|
||||
provider=provider)
|
||||
except Exception:
|
||||
provider=provider,
|
||||
checkpoint=checkpoint)
|
||||
except Exception as e:
|
||||
LOG.exception(_LE("Failed to create protection flow, plan: %s"),
|
||||
plan_id)
|
||||
raise exception.FlowError(
|
||||
flow="protect",
|
||||
error=_("Failed to create flow"))
|
||||
try:
|
||||
self.worker.run_flow(protection_flow)
|
||||
except Exception:
|
||||
LOG.exception(_LE("Failed to run protection flow, plan: %s"),
|
||||
plan_id)
|
||||
error=e.msg if hasattr(e, 'msg') else 'Internal error')
|
||||
self._spawn(self.worker.run_flow, protection_flow)
|
||||
return checkpoint.id
|
||||
|
||||
raise exception.FlowError(
|
||||
flow="protect",
|
||||
error=_("Failed to run flow"))
|
||||
finally:
|
||||
checkpoint = self.worker.flow_outputs(protection_flow,
|
||||
target='checkpoint')
|
||||
return {'checkpoint_id': checkpoint.id}
|
||||
|
||||
@messaging.expected_exceptions(exception.InvalidInput,
|
||||
@messaging.expected_exceptions(exception.ProviderNotFound,
|
||||
exception.CheckpointNotFound,
|
||||
exception.CheckpointNotAvailable,
|
||||
exception.FlowError)
|
||||
def restore(self, context, restore, restore_auth):
|
||||
|
@ -118,13 +135,11 @@ class ProtectionManager(manager.Manager):
|
|||
checkpoint_id = restore["checkpoint_id"]
|
||||
provider_id = restore["provider_id"]
|
||||
provider = self.provider_registry.show_provider(provider_id)
|
||||
try:
|
||||
checkpoint_collection = provider.get_checkpoint_collection()
|
||||
checkpoint = checkpoint_collection.get(checkpoint_id)
|
||||
except Exception:
|
||||
LOG.error(_LE("Invalid checkpoint id: %s"), checkpoint_id)
|
||||
raise exception.InvalidInput(
|
||||
reason=_("Invalid checkpoint id"))
|
||||
if not provider:
|
||||
raise exception.ProviderNotFound(provider_id=provider_id)
|
||||
|
||||
checkpoint_collection = provider.get_checkpoint_collection()
|
||||
checkpoint = checkpoint_collection.get(checkpoint_id)
|
||||
|
||||
if checkpoint.status != constants.CHECKPOINT_STATUS_AVAILABLE:
|
||||
raise exception.CheckpointNotAvailable(
|
||||
|
@ -145,15 +160,7 @@ class ProtectionManager(manager.Manager):
|
|||
raise exception.FlowError(
|
||||
flow="restore",
|
||||
error=_("Failed to create flow"))
|
||||
try:
|
||||
self.worker.run_flow(restoration_flow)
|
||||
except Exception:
|
||||
LOG.exception(
|
||||
_LE("Failed to run restoration flow checkpoint: %s"),
|
||||
checkpoint_id)
|
||||
raise exception.FlowError(
|
||||
flow="restore",
|
||||
error=_("Failed to run flow"))
|
||||
self._spawn(self.worker.run_flow, restoration_flow)
|
||||
|
||||
def delete(self, context, provider_id, checkpoint_id):
|
||||
LOG.info(_LI("Starting protection service:delete action"))
|
||||
|
@ -191,12 +198,7 @@ class ProtectionManager(manager.Manager):
|
|||
raise exception.KarborException(_(
|
||||
"Failed to create delete checkpoint flow."
|
||||
))
|
||||
try:
|
||||
self.worker.run_flow(delete_checkpoint_flow)
|
||||
return True
|
||||
except Exception:
|
||||
LOG.exception(_LE("Failed to run delete checkpoint flow"))
|
||||
raise
|
||||
self._spawn(self.worker.run_flow, delete_checkpoint_flow)
|
||||
|
||||
def start(self, plan):
|
||||
# TODO(wangliuan)
|
||||
|
|
|
@ -9,60 +9,129 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import abc
|
||||
|
||||
import six
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Operation(object):
|
||||
def __init__(self):
|
||||
super(Operation, self).__init__()
|
||||
|
||||
def on_prepare_begin(self, checkpoint, resource, context, parameters,
|
||||
**kwargs):
|
||||
"""on_prepare_begin hook runs before any child resource's hooks run
|
||||
|
||||
Optional
|
||||
:param checkpoint: checkpoint object for this operation
|
||||
:param resource: a resource object for this operation
|
||||
:param context: current operation context (viable for clients)
|
||||
:param parameters: dictionary representing operation parameters
|
||||
:param heat_template: HeatTemplate for restore operation only
|
||||
"""
|
||||
pass
|
||||
|
||||
def on_prepare_finish(self, checkpoint, resource, context, parameters,
|
||||
**kwargs):
|
||||
"""on_prepare_finish hook runs after all child resources' prepare hooks
|
||||
|
||||
Optional
|
||||
:param checkpoint: checkpoint object for this operation
|
||||
:param resource: a resource object for this operation
|
||||
:param context: current operation context (viable for clients)
|
||||
:param parameters: dictionary representing operation parameters
|
||||
:param heat_template: HeatTemplate for restore operation only
|
||||
"""
|
||||
pass
|
||||
|
||||
def on_main(self, checkpoint, resource, context, parameters, **kwargs):
|
||||
"""on_main hook runs in parallel to other resources' on_main hooks
|
||||
|
||||
Your main operation heavy lifting should probably be here.
|
||||
Optional
|
||||
:param checkpoint: checkpoint object for this operation
|
||||
:param resource: a resource object for this operation
|
||||
:param context: current operation context (viable for clients)
|
||||
:param parameters: dictionary representing operation parameters
|
||||
:param heat_template: HeatTemplate for restore operation only
|
||||
"""
|
||||
pass
|
||||
|
||||
def on_complete(self, checkpoint, resource, context, parameters, **kwargs):
|
||||
"""on_complete hook runs after all dependent resource's hooks
|
||||
|
||||
Optional
|
||||
:param checkpoint: checkpoint object for this operation
|
||||
:param resource: a resource object for this operation
|
||||
:param context: current operation context (viable for clients)
|
||||
:param parameters: dictionary representing operation parameters
|
||||
:param heat_template: HeatTemplate for restore operation only
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class ProtectionPlugin(object):
|
||||
def __init__(self, config=None):
|
||||
self._config = config
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_supported_resources_types(self):
|
||||
# TODO(wangliuan)
|
||||
pass
|
||||
def get_protect_operation(self, resource):
|
||||
"""Returns the protect Operation for this resource
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_options_schema(self, resources_type):
|
||||
# TODO(wangliuan)
|
||||
pass
|
||||
:returns: Operation for the resource
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_saved_info_schema(self, resources_type):
|
||||
# TODO(wangliuan)
|
||||
pass
|
||||
def get_restore_operation(self, resource):
|
||||
"""Returns the restore Operation for this resource
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_restore_schema(self, resources_type):
|
||||
# TODO(wangliuan)
|
||||
pass
|
||||
:returns: Operation for the resource
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_saved_info(self, metadata_store, resource):
|
||||
# TODO(wangliuan)
|
||||
pass
|
||||
def get_delete_operation(self, resource):
|
||||
"""Returns the delete Operation for this resource
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_resource_stats(self, checkpoint, resource_id):
|
||||
# TODO(wangliuan)
|
||||
pass
|
||||
:returns: Operation for the resource
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def on_resource_start(self, context):
|
||||
# TODO(wangliuan)
|
||||
pass
|
||||
@classmethod
|
||||
def get_supported_resources_types(cls):
|
||||
"""Returns a list of resource types this plugin supports
|
||||
|
||||
@abc.abstractmethod
|
||||
def on_resource_end(self, context):
|
||||
# TODO(wangliuan)
|
||||
pass
|
||||
:returns: a list of resource types
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
def get_options_schema(cls, resource_type):
|
||||
"""Returns the protect options schema for a resource type
|
||||
|
||||
:returns: a dictionary representing the schema
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
def get_saved_info_schema(cls, resource_type):
|
||||
"""Returns the saved info schema for a resource type
|
||||
|
||||
:returns: a dictionary representing the schema
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
def get_restore_schema(cls, resource_type):
|
||||
"""Returns the restore schema for a resource type
|
||||
|
||||
:returns: a dictionary representing the schema
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@classmethod
|
||||
def get_saved_info(cls, metadata_store, resource):
|
||||
"""Returns the saved info for a resource
|
||||
|
||||
:returns: a dictionary representing the saved info
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
|
|
@ -98,12 +98,3 @@ class BaseProtectionPlugin(ProtectionPlugin):
|
|||
child_task, parent_task)
|
||||
else:
|
||||
task_stack.pop()
|
||||
|
||||
def get_resource_stats(self, checkpoint, resource_id):
|
||||
# Get the status of this resource
|
||||
bank_section = checkpoint.get_resource_bank_section(resource_id)
|
||||
try:
|
||||
status = bank_section.get_object("status")
|
||||
return status
|
||||
except Exception:
|
||||
return constants.RESOURCE_STATUS_UNDEFINED
|
||||
|
|
|
@ -50,25 +50,20 @@ class GlanceProtectionPlugin(BaseProtectionPlugin):
|
|||
def _add_to_threadpool(self, func, *args, **kwargs):
|
||||
self._tp.spawn_n(func, *args, **kwargs)
|
||||
|
||||
def get_resource_stats(self, checkpoint, resource_id):
|
||||
# Get the status of this resource
|
||||
bank_section = checkpoint.get_resource_bank_section(resource_id)
|
||||
try:
|
||||
status = bank_section.get_object("status")
|
||||
return status
|
||||
except Exception:
|
||||
return "undefined"
|
||||
|
||||
def get_options_schema(self, resources_type):
|
||||
@classmethod
|
||||
def get_options_schema(cls, resources_type):
|
||||
return image_schemas.OPTIONS_SCHEMA
|
||||
|
||||
def get_restore_schema(self, resources_type):
|
||||
@classmethod
|
||||
def get_restore_schema(cls, resources_type):
|
||||
return image_schemas.RESTORE_SCHEMA
|
||||
|
||||
def get_saved_info_schema(self, resources_type):
|
||||
@classmethod
|
||||
def get_saved_info_schema(cls, resources_type):
|
||||
return image_schemas.SAVED_INFO_SCHEMA
|
||||
|
||||
def get_saved_info(self, metadata_store, resource):
|
||||
@classmethod
|
||||
def get_saved_info(cls, metadata_store, resource):
|
||||
pass
|
||||
|
||||
def _glance_client(self, cntxt):
|
||||
|
@ -228,5 +223,6 @@ class GlanceProtectionPlugin(BaseProtectionPlugin):
|
|||
resource_id=image_id,
|
||||
resource_type=constants.IMAGE_RESOURCE_TYPE)
|
||||
|
||||
def get_supported_resources_types(self):
|
||||
return self._SUPPORT_RESOURCE_TYPES
|
||||
@classmethod
|
||||
def get_supported_resources_types(cls):
|
||||
return cls._SUPPORT_RESOURCE_TYPES
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from karbor.common import constants
|
||||
from karbor.services.protection import protection_plugin
|
||||
from oslo_log import log as logging
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NoopOperation(protection_plugin.Operation):
|
||||
def __init__(self):
|
||||
super(NoopOperation, self).__init__()
|
||||
|
||||
def on_prepare_begin(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def on_prepare_finish(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def on_main(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
def on_complete(self, *args, **kwargs):
|
||||
pass
|
||||
|
||||
|
||||
class NoopProtectionPlugin(protection_plugin.ProtectionPlugin):
|
||||
def __init__(self, config=None):
|
||||
super(NoopProtectionPlugin, self).__init__(config)
|
||||
|
||||
def get_protect_operation(self, resource):
|
||||
return NoopOperation()
|
||||
|
||||
def get_restore_operation(self, resource):
|
||||
return NoopOperation()
|
||||
|
||||
def get_delete_operation(self, resource):
|
||||
return NoopOperation()
|
||||
|
||||
@classmethod
|
||||
def get_supported_resources_types(cls):
|
||||
return constants.RESOURCE_TYPES
|
||||
|
||||
@classmethod
|
||||
def get_options_schema(cls, resource_type):
|
||||
return {}
|
||||
|
||||
@classmethod
|
||||
def get_saved_info_schema(cls, resource_type):
|
||||
return {}
|
||||
|
||||
@classmethod
|
||||
def get_restore_schema(cls, resource_type):
|
||||
return {}
|
||||
|
||||
@classmethod
|
||||
def get_saved_info(cls, metadata_store, resource):
|
||||
return None
|
|
@ -53,16 +53,20 @@ class NovaProtectionPlugin(BaseProtectionPlugin):
|
|||
def _add_to_threadpool(self, func, *args, **kwargs):
|
||||
self._tp.spawn_n(func, *args, **kwargs)
|
||||
|
||||
def get_options_schema(self, resource_type):
|
||||
@classmethod
|
||||
def get_options_schema(cls, resource_type):
|
||||
return server_plugin_schemas.OPTIONS_SCHEMA
|
||||
|
||||
def get_restore_schema(self, resource_type):
|
||||
@classmethod
|
||||
def get_restore_schema(cls, resource_type):
|
||||
return server_plugin_schemas.RESTORE_SCHEMA
|
||||
|
||||
def get_saved_info_schema(self, resource_type):
|
||||
@classmethod
|
||||
def get_saved_info_schema(cls, resource_type):
|
||||
return server_plugin_schemas.SAVED_INFO_SCHEMA
|
||||
|
||||
def get_saved_info(self, metadata_store, resource):
|
||||
@classmethod
|
||||
def get_saved_info(cls, metadata_store, resource):
|
||||
# TODO(luobin)
|
||||
pass
|
||||
|
||||
|
@ -487,5 +491,6 @@ class NovaProtectionPlugin(BaseProtectionPlugin):
|
|||
resource_id=resource_id,
|
||||
resource_type=constants.SERVER_RESOURCE_TYPE)
|
||||
|
||||
def get_supported_resources_types(self):
|
||||
return self._SUPPORT_RESOURCE_TYPES
|
||||
@classmethod
|
||||
def get_supported_resources_types(cls):
|
||||
return cls._SUPPORT_RESOURCE_TYPES
|
||||
|
|
|
@ -52,19 +52,24 @@ class CinderProtectionPlugin(BaseProtectionPlugin):
|
|||
sync_status_loop.start(interval=self.protection_sync_interval,
|
||||
initial_delay=self.protection_sync_interval)
|
||||
|
||||
def get_supported_resources_types(self):
|
||||
return self._SUPPORT_RESOURCE_TYPES
|
||||
@classmethod
|
||||
def get_supported_resources_types(cls):
|
||||
return cls._SUPPORT_RESOURCE_TYPES
|
||||
|
||||
def get_options_schema(self, resources_type):
|
||||
@classmethod
|
||||
def get_options_schema(cls, resources_type):
|
||||
return cinder_schemas.OPTIONS_SCHEMA
|
||||
|
||||
def get_restore_schema(self, resources_type):
|
||||
@classmethod
|
||||
def get_restore_schema(cls, resources_type):
|
||||
return cinder_schemas.RESTORE_SCHEMA
|
||||
|
||||
def get_saved_info_schema(self, resources_type):
|
||||
@classmethod
|
||||
def get_saved_info_schema(cls, resources_type):
|
||||
return cinder_schemas.SAVED_INFO_SCHEMA
|
||||
|
||||
def get_saved_info(self, metadata_store, resource):
|
||||
@classmethod
|
||||
def get_saved_info(cls, metadata_store, resource):
|
||||
# TODO(hurong)
|
||||
pass
|
||||
|
||||
|
|
|
@ -11,18 +11,12 @@
|
|||
# under the License.
|
||||
|
||||
import os
|
||||
import six
|
||||
|
||||
from karbor.common import constants
|
||||
from karbor import exception
|
||||
from karbor.i18n import _, _LE
|
||||
from karbor.resource import Resource
|
||||
from karbor.services.protection import bank_plugin
|
||||
from karbor.services.protection.checkpoint import CheckpointCollection
|
||||
from karbor.services.protection.graph import GraphWalker
|
||||
from karbor.services.protection.protectable_registry import ProtectableRegistry
|
||||
from karbor.services.protection.resource_graph import ResourceGraphContext
|
||||
from karbor.services.protection.resource_graph \
|
||||
import ResourceGraphWalkerListener
|
||||
from karbor import utils
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
@ -84,7 +78,7 @@ class PluggableProtectionProvider(object):
|
|||
for plugin_name in self._config.provider.plugin:
|
||||
if not plugin_name:
|
||||
raise ImportError(_("Empty protection plugin"))
|
||||
self._load_plugin(plugin_name)
|
||||
self._register_plugin(plugin_name)
|
||||
|
||||
@property
|
||||
def id(self):
|
||||
|
@ -110,6 +104,12 @@ class PluggableProtectionProvider(object):
|
|||
def plugins(self):
|
||||
return self._plugin_map
|
||||
|
||||
def load_plugins(self):
|
||||
return {
|
||||
plugin_type: plugin_class(self._config)
|
||||
for plugin_type, plugin_class in six.iteritems(self.plugins)
|
||||
}
|
||||
|
||||
def _load_bank(self, bank_name):
|
||||
try:
|
||||
plugin = utils.load_plugin(PROTECTION_NAMESPACE, bank_name,
|
||||
|
@ -121,17 +121,16 @@ class PluggableProtectionProvider(object):
|
|||
else:
|
||||
self._bank_plugin = plugin
|
||||
|
||||
def _load_plugin(self, plugin_name):
|
||||
def _register_plugin(self, plugin_name):
|
||||
try:
|
||||
plugin = utils.load_plugin(PROTECTION_NAMESPACE, plugin_name,
|
||||
self._config)
|
||||
plugin = utils.load_class(PROTECTION_NAMESPACE, plugin_name)
|
||||
except Exception:
|
||||
LOG.exception(_LE("Load protection plugin: '%s' failed."),
|
||||
plugin_name)
|
||||
raise
|
||||
else:
|
||||
self._plugin_map[plugin_name] = plugin
|
||||
for resource in plugin.get_supported_resources_types():
|
||||
self._plugin_map[resource] = plugin
|
||||
if hasattr(plugin, 'get_options_schema'):
|
||||
self._extended_info_schema['options_schema'][resource] \
|
||||
= plugin.get_options_schema(resource)
|
||||
|
@ -155,88 +154,6 @@ class PluggableProtectionProvider(object):
|
|||
limit=limit, marker=marker, plan_id=plan_id,
|
||||
start_date=start_date, end_date=end_date, sort_dir=sort_dir)
|
||||
|
||||
def build_task_flow(self, ctx):
|
||||
cntxt = ctx["context"]
|
||||
workflow_engine = ctx["workflow_engine"]
|
||||
operation = ctx["operation_type"]
|
||||
|
||||
resource_context = None
|
||||
resource_graph = None
|
||||
|
||||
if operation == constants.OPERATION_PROTECT:
|
||||
plan = ctx["plan"]
|
||||
task_flow = workflow_engine.build_flow(flow_name=plan.get('id'))
|
||||
resources = plan.get('resources')
|
||||
parameters = plan.get('parameters')
|
||||
graph_resources = []
|
||||
for resource in resources:
|
||||
graph_resources.append(Resource(type=resource['type'],
|
||||
id=resource['id'],
|
||||
name=resource['name']))
|
||||
# TODO(luobin): pass registry in ctx
|
||||
registry = ProtectableRegistry()
|
||||
registry.load_plugins()
|
||||
resource_graph = registry.build_graph(cntxt, graph_resources)
|
||||
resource_context = ResourceGraphContext(
|
||||
cntxt=cntxt,
|
||||
operation=operation,
|
||||
workflow_engine=workflow_engine,
|
||||
task_flow=task_flow,
|
||||
plugin_map=self._plugin_map,
|
||||
parameters=parameters
|
||||
)
|
||||
if operation == constants.OPERATION_RESTORE:
|
||||
restore = ctx['restore']
|
||||
task_flow = workflow_engine.build_flow(
|
||||
flow_name=restore.get('id'))
|
||||
checkpoint = ctx["checkpoint"]
|
||||
resource_graph = checkpoint.resource_graph
|
||||
parameters = restore.get('parameters')
|
||||
heat_template = ctx["heat_template"]
|
||||
resource_context = ResourceGraphContext(
|
||||
cntxt=cntxt,
|
||||
checkpoint=checkpoint,
|
||||
operation=operation,
|
||||
workflow_engine=workflow_engine,
|
||||
task_flow=task_flow,
|
||||
plugin_map=self._plugin_map,
|
||||
parameters=parameters,
|
||||
heat_template=heat_template
|
||||
)
|
||||
if operation == constants.OPERATION_DELETE:
|
||||
checkpoint = ctx['checkpoint']
|
||||
task_flow = workflow_engine.build_flow(
|
||||
flow_name=checkpoint.id)
|
||||
resource_graph = checkpoint.resource_graph
|
||||
resource_context = ResourceGraphContext(
|
||||
cntxt=cntxt,
|
||||
checkpoint=checkpoint,
|
||||
operation=operation,
|
||||
workflow_engine=workflow_engine,
|
||||
task_flow=task_flow,
|
||||
plugin_map=self._plugin_map
|
||||
)
|
||||
|
||||
# TODO(luobin): for other type operations
|
||||
|
||||
walker_listener = ResourceGraphWalkerListener(resource_context)
|
||||
graph_walker = GraphWalker()
|
||||
graph_walker.register_listener(walker_listener)
|
||||
graph_walker.walk_graph(resource_graph)
|
||||
|
||||
if operation == constants.OPERATION_PROTECT:
|
||||
return {"task_flow": walker_listener.context.task_flow,
|
||||
"status_getters": walker_listener.context.status_getters,
|
||||
"resource_graph": resource_graph}
|
||||
if operation == constants.OPERATION_RESTORE:
|
||||
return {"task_flow": walker_listener.context.task_flow}
|
||||
if operation == constants.OPERATION_DELETE:
|
||||
return {"task_flow": walker_listener.context.task_flow,
|
||||
"status_getters": walker_listener.context.status_getters
|
||||
}
|
||||
|
||||
# TODO(luobin): for other type operations
|
||||
|
||||
|
||||
class ProviderRegistry(object):
|
||||
def __init__(self):
|
||||
|
|
|
@ -0,0 +1,183 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from collections import namedtuple
|
||||
|
||||
from karbor.common import constants
|
||||
from karbor import exception
|
||||
from karbor.i18n import _LI
|
||||
from karbor.services.protection import graph
|
||||
from oslo_log import log as logging
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
HOOKS = (
|
||||
HOOK_PRE_BEGIN,
|
||||
HOOK_PRE_FINISH,
|
||||
HOOK_MAIN,
|
||||
HOOK_COMPLETE
|
||||
) = (
|
||||
'on_prepare_begin',
|
||||
'on_prepare_finish',
|
||||
'on_main',
|
||||
'on_complete'
|
||||
)
|
||||
|
||||
ResourceHooks = namedtuple('ResourceHooks', [
|
||||
HOOK_PRE_BEGIN,
|
||||
HOOK_PRE_FINISH,
|
||||
HOOK_MAIN,
|
||||
HOOK_COMPLETE,
|
||||
])
|
||||
|
||||
|
||||
OPERATION_EXTRA_ARGS = {
|
||||
constants.OPERATION_RESTORE: ['heat_template'],
|
||||
}
|
||||
|
||||
|
||||
def noop_handle(*args, **kwargs):
|
||||
pass
|
||||
|
||||
|
||||
class ResourceFlowGraphWalkerListener(graph.GraphWalkerListener):
|
||||
def __init__(self, resource_flow, operation_type, context, parameters,
|
||||
plugins, workflow_engine):
|
||||
super(ResourceFlowGraphWalkerListener, self).__init__()
|
||||
self.operation_type = operation_type
|
||||
self.context = context
|
||||
self.parameters = parameters or {}
|
||||
self.plugins = plugins
|
||||
self.workflow_engine = workflow_engine
|
||||
self.flow = resource_flow
|
||||
|
||||
self.node_tasks = {}
|
||||
self.task_stack = []
|
||||
self.current_resource = None
|
||||
|
||||
def _create_hook_tasks(self, operation_obj, resource):
|
||||
pre_begin_task = self._create_hook_task(operation_obj, resource,
|
||||
HOOK_PRE_BEGIN)
|
||||
pre_finish_task = self._create_hook_task(operation_obj, resource,
|
||||
HOOK_PRE_FINISH)
|
||||
main_task = self._create_hook_task(operation_obj, resource,
|
||||
HOOK_MAIN)
|
||||
post_task = self._create_hook_task(operation_obj, resource,
|
||||
HOOK_COMPLETE)
|
||||
|
||||
return ResourceHooks(pre_begin_task, pre_finish_task, main_task,
|
||||
post_task)
|
||||
|
||||
def _create_hook_task(self, operation_obj, resource, hook_type):
|
||||
method = getattr(operation_obj, hook_type, noop_handle)
|
||||
assert callable(method), 'Resource {} method "{}" is not callable' \
|
||||
.format(resource.type, hook_type)
|
||||
|
||||
task_name = "{operation_type}_{hook_type}_{type}_{id}".format(
|
||||
type=resource.type,
|
||||
id=resource.id,
|
||||
hook_type=hook_type,
|
||||
operation_type=self.operation_type,
|
||||
)
|
||||
|
||||
parameters = {}
|
||||
parameters.update(self.parameters.get(resource.type, {}))
|
||||
resource_id = '{}#{}'.format(resource.type, resource.id)
|
||||
parameters.update(self.parameters.get(resource_id, {}))
|
||||
injects = {
|
||||
'context': self.context,
|
||||
'parameters': parameters,
|
||||
'resource': resource,
|
||||
}
|
||||
requires = list(injects)
|
||||
requires.append('checkpoint')
|
||||
requires.extend(OPERATION_EXTRA_ARGS.get(self.operation_type, []))
|
||||
|
||||
task = self.workflow_engine.create_task(method,
|
||||
name=task_name,
|
||||
inject=injects,
|
||||
requires=requires)
|
||||
return task
|
||||
|
||||
def on_node_enter(self, node, already_visited):
|
||||
resource = node.value
|
||||
LOG.debug(
|
||||
"Enter node (type: %(type)s id: %(id)s visited: %(visited)s)",
|
||||
{"type": resource.type, "id": resource.id, "visited":
|
||||
already_visited}
|
||||
)
|
||||
self.current_resource = resource
|
||||
if already_visited:
|
||||
self.task_stack.append(self.node_tasks[resource.id])
|
||||
return
|
||||
|
||||
if resource.type not in self.plugins:
|
||||
raise exception.ProtectionPluginNotFound(type=resource.type)
|
||||
|
||||
protection_plugin = self.plugins[resource.type]
|
||||
operation_getter_name = 'get_{}_operation'.format(self.operation_type)
|
||||
operation_getter = getattr(protection_plugin, operation_getter_name)
|
||||
assert callable(operation_getter)
|
||||
operation_obj = operation_getter(resource)
|
||||
hooks = self._create_hook_tasks(operation_obj, resource)
|
||||
LOG.debug("added operation %s hooks", self.operation_type)
|
||||
self.node_tasks[resource.id] = hooks
|
||||
self.task_stack.append(hooks)
|
||||
self.workflow_engine.add_tasks(self.flow, hooks.on_prepare_begin,
|
||||
hooks.on_prepare_finish, hooks.on_main,
|
||||
hooks.on_complete)
|
||||
self.workflow_engine.link_task(self.flow, hooks.on_prepare_begin,
|
||||
hooks.on_prepare_finish)
|
||||
self.workflow_engine.link_task(self.flow, hooks.on_prepare_finish,
|
||||
hooks.on_main)
|
||||
self.workflow_engine.link_task(self.flow, hooks.on_main,
|
||||
hooks.on_complete)
|
||||
|
||||
def on_node_exit(self, node):
|
||||
resource = node.value
|
||||
LOG.debug(
|
||||
"Exit node (type: %(type)s id: %(id)s)",
|
||||
{"type": resource.type, "id": resource.id}
|
||||
)
|
||||
child_hooks = self.task_stack.pop()
|
||||
if len(self.task_stack) > 0:
|
||||
parent_hooks = self.task_stack[-1]
|
||||
self.workflow_engine.link_task(self.flow,
|
||||
parent_hooks.on_prepare_begin,
|
||||
child_hooks.on_prepare_begin)
|
||||
self.workflow_engine.link_task(self.flow,
|
||||
child_hooks.on_prepare_finish,
|
||||
parent_hooks.on_prepare_finish)
|
||||
self.workflow_engine.link_task(self.flow, child_hooks.on_complete,
|
||||
parent_hooks.on_complete)
|
||||
|
||||
|
||||
def build_resource_flow(operation_type, context, workflow_engine,
|
||||
plugins, resource_graph, parameters):
|
||||
LOG.info(_LI("Build resource flow for operation %s"), operation_type)
|
||||
|
||||
resource_graph_flow = workflow_engine.build_flow(
|
||||
'ResourceGraphFlow_{}'.format(operation_type),
|
||||
'graph',
|
||||
)
|
||||
resource_walker = ResourceFlowGraphWalkerListener(resource_graph_flow,
|
||||
operation_type,
|
||||
context,
|
||||
parameters,
|
||||
plugins,
|
||||
workflow_engine)
|
||||
walker = graph.GraphWalker()
|
||||
walker.register_listener(resource_walker)
|
||||
LOG.debug("Starting resource graph walk (operation %s)", operation_type)
|
||||
walker.walk_graph(resource_graph)
|
||||
LOG.debug("Finished resource graph walk (operation %s)", operation_type)
|
||||
return resource_graph_flow
|
|
@ -1,105 +0,0 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from karbor.common import constants
|
||||
from karbor.i18n import _, _LE, _LI
|
||||
from karbor.services.protection.graph import GraphWalkerListener
|
||||
from oslo_log import log as logging
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ResourceGraphContext(object):
|
||||
def __init__(self, cntxt, is_first_visited=False, operation="protect",
|
||||
parameters=None, plugin_map=None, node=None,
|
||||
workflow_engine=None, task_flow=None, checkpoint=None,
|
||||
heat_template=None):
|
||||
self.cntxt = cntxt
|
||||
self.is_first_visited = is_first_visited
|
||||
self.operation = operation
|
||||
self.parameters = parameters
|
||||
self.plugin_map = plugin_map
|
||||
self.node = node
|
||||
self.workflow_engine = workflow_engine
|
||||
self.task_flow = task_flow
|
||||
self.task_stack = []
|
||||
|
||||
# used for protection
|
||||
self.status_getters = []
|
||||
|
||||
# used for restoration
|
||||
self.checkpoint = checkpoint
|
||||
self.heat_template = heat_template
|
||||
|
||||
def get_node_context(self, node, is_first_visited=None):
|
||||
node_context = ResourceGraphContext(
|
||||
self.cntxt,
|
||||
is_first_visited=is_first_visited,
|
||||
operation=self.operation,
|
||||
parameters=self.parameters,
|
||||
plugin_map=self.plugin_map,
|
||||
node=node,
|
||||
workflow_engine=self.workflow_engine,
|
||||
task_flow=self.task_flow,
|
||||
checkpoint=self.checkpoint,
|
||||
heat_template=self.heat_template
|
||||
)
|
||||
node_context.task_stack = self.task_stack
|
||||
node_context.status_getters = self.status_getters
|
||||
return node_context
|
||||
|
||||
|
||||
class ResourceGraphWalkerListener(GraphWalkerListener):
|
||||
def __init__(self, context):
|
||||
self.context = context
|
||||
self.plugin_map = self.context.plugin_map
|
||||
|
||||
def on_node_enter(self, node, already_visited):
|
||||
resource = node.value
|
||||
resource_type = resource.type
|
||||
LOG.info(_LI("on_node_enter, node resource_type: %s"), resource_type)
|
||||
protection_plugin = self._get_protection_plugin(resource_type)
|
||||
|
||||
# get node context
|
||||
is_first_visited = not already_visited
|
||||
context = self.context.get_node_context(node, is_first_visited)
|
||||
# do something in protection_plugin
|
||||
protection_plugin.on_resource_start(context)
|
||||
|
||||
if self.context.operation == constants.OPERATION_PROTECT \
|
||||
or self.context.operation == constants.OPERATION_DELETE:
|
||||
if not already_visited:
|
||||
self.context.status_getters.append(
|
||||
{"resource_id": resource.id,
|
||||
"get_resource_stats": protection_plugin.get_resource_stats
|
||||
}
|
||||
)
|
||||
|
||||
def on_node_exit(self, node):
|
||||
resource = node.value
|
||||
resource_type = resource.type
|
||||
LOG.info(_LI("on_node_exit, node resource_type: %s"), resource_type)
|
||||
protection_plugin = self._get_protection_plugin(resource_type)
|
||||
|
||||
# get node context
|
||||
context = self.context.get_node_context(node)
|
||||
# do something in protection_plugin
|
||||
protection_plugin.on_resource_end(context)
|
||||
|
||||
def _get_protection_plugin(self, resource_type):
|
||||
for plugin in self.plugin_map.values():
|
||||
if hasattr(plugin, "get_supported_resources_types"):
|
||||
if resource_type in plugin.get_supported_resources_types():
|
||||
return plugin
|
||||
LOG.error(_LE("no plugin support this resource_type:%s"),
|
||||
resource_type)
|
||||
raise Exception(_("No plugin support this resource_type"))
|
|
@ -170,6 +170,8 @@ class KarborBaseTest(base.BaseTestCase):
|
|||
self.karbor_client = _get_karbor_client_from_creds()
|
||||
self.keystone_endpoint = _get_keystone_endpoint_from_creds()
|
||||
self._testcase_store = ObjectStore()
|
||||
self.provider_id_noop = 'b766f37c-d011-4026-8228-28730d734a3f'
|
||||
self.provider_id_os = 'cf56bd3e-97a7-4078-b6d5-f36246333fd9'
|
||||
|
||||
def store(self, obj, close_func=None):
|
||||
return self._testcase_store.store(obj, close_func)
|
||||
|
|
|
@ -19,15 +19,14 @@ class CheckpointsTest(karbor_base.KarborBaseTest):
|
|||
"""Test Checkpoints operation """
|
||||
def setUp(self):
|
||||
super(CheckpointsTest, self).setUp()
|
||||
providers = self.provider_list()
|
||||
self.assertTrue(len(providers))
|
||||
self.provider_id = providers[0].id
|
||||
self.provider_id = self.provider_id_noop
|
||||
|
||||
def test_checkpoint_create(self):
|
||||
self.skipTest('Requires cinder protection plugin adjustment')
|
||||
volume = self.store(objects.Volume())
|
||||
volume.create(1)
|
||||
plan = self.store(objects.Plan())
|
||||
plan.create(self.provider_id, [volume, ])
|
||||
plan.create(self.provider_id_os, [volume, ])
|
||||
|
||||
backups = self.cinder_client.backups.list()
|
||||
before_num = len(backups)
|
||||
|
|
|
@ -19,9 +19,7 @@ class PlansTest(karbor_base.KarborBaseTest):
|
|||
"""Test Plans operation"""
|
||||
def setUp(self):
|
||||
super(PlansTest, self).setUp()
|
||||
providers = self.provider_list()
|
||||
self.assertTrue(len(providers))
|
||||
self.provider_id = providers[0].id
|
||||
self.provider_id = self.provider_id_noop
|
||||
|
||||
def test_plans_list(self):
|
||||
nplans_before = len(self.karbor_client.plans.list())
|
||||
|
|
|
@ -27,9 +27,7 @@ class RestoresTest(karbor_base.KarborBaseTest):
|
|||
|
||||
def setUp(self):
|
||||
super(RestoresTest, self).setUp()
|
||||
providers = self.provider_list()
|
||||
self.assertTrue(len(providers))
|
||||
self.provider_id = providers[0].id
|
||||
self.provider_id = self.provider_id_noop
|
||||
|
||||
def _store_volume(self, volumes_pre, volumes_post):
|
||||
volumes = list(set(volumes_post).difference(set(volumes_pre)))
|
||||
|
|
|
@ -25,7 +25,7 @@ class ScheduledOperationsTest(karbor_base.KarborBaseTest):
|
|||
super(ScheduledOperationsTest, self).setUp()
|
||||
providers = self.provider_list()
|
||||
self.assertTrue(len(providers))
|
||||
self.provider_id = providers[0].id
|
||||
self.provider_id = self.provider_id_noop
|
||||
|
||||
def _create_scheduled_operation(self,
|
||||
trigger_properties,
|
||||
|
|
|
@ -23,6 +23,7 @@ class FakeBankPlugin(bank_plugin.BankPlugin):
|
|||
def __init__(self, config=None):
|
||||
super(FakeBankPlugin, self).__init__(config)
|
||||
config.register_opts(fake_bank_opts, 'fake_bank')
|
||||
self.fake_host = config['fake_bank']['fake_host']
|
||||
|
||||
def create_object(self, key, value):
|
||||
return
|
||||
|
|
|
@ -1,49 +0,0 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from karbor.services.protection import protection_plugin
|
||||
|
||||
fake_plugin_opts = [
|
||||
cfg.StrOpt('fake_user'),
|
||||
]
|
||||
|
||||
|
||||
class FakeProtectionPlugin(protection_plugin.ProtectionPlugin):
|
||||
def __init__(self, config=None):
|
||||
super(FakeProtectionPlugin, self).__init__(config)
|
||||
config.register_opts(fake_plugin_opts, 'fake_plugin')
|
||||
|
||||
def get_supported_resources_types(self):
|
||||
return ['Test::Resource']
|
||||
|
||||
def get_options_schema(self, resource_type):
|
||||
return []
|
||||
|
||||
def get_saved_info_schema(self, resource_type):
|
||||
return []
|
||||
|
||||
def get_restore_schema(self, resource_type):
|
||||
return []
|
||||
|
||||
def get_saved_info(self, metadata_store, resource):
|
||||
pass
|
||||
|
||||
def get_resource_stats(self, checkpoint, resource_id):
|
||||
pass
|
||||
|
||||
def on_resource_start(self, context):
|
||||
pass
|
||||
|
||||
def on_resource_end(self, context):
|
||||
pass
|
|
@ -3,7 +3,7 @@ name = fake_provider1
|
|||
id = fake_id1
|
||||
description = Test Provider 1
|
||||
bank = karbor.tests.unit.fake_bank.FakeBankPlugin
|
||||
plugin = karbor.tests.unit.fake_protection.FakeProtectionPlugin
|
||||
plugin = karbor.tests.unit.protection.fakes.FakeProtectionPlugin
|
||||
|
||||
[fake_plugin]
|
||||
fake_user = user
|
||||
|
|
|
@ -14,7 +14,9 @@
|
|||
# under the License.
|
||||
|
||||
import futurist
|
||||
import mock
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
from karbor.i18n import _LE
|
||||
|
@ -23,11 +25,14 @@ from karbor.services.protection.bank_plugin import Bank
|
|||
from karbor.services.protection.bank_plugin import BankPlugin
|
||||
from karbor.services.protection.bank_plugin import BankSection
|
||||
from karbor.services.protection.graph import build_graph
|
||||
from karbor.services.protection import protection_plugin
|
||||
from karbor.services.protection import provider
|
||||
from karbor.services.protection import resource_flow
|
||||
|
||||
from taskflow import engines
|
||||
from taskflow.patterns import graph_flow
|
||||
from taskflow.patterns import linear_flow
|
||||
from taskflow import task
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
@ -73,6 +78,11 @@ class FakeBankPlugin(BankPlugin):
|
|||
def __init__(self, config=None):
|
||||
super(FakeBankPlugin, self).__init__(config=config)
|
||||
self._objects = {}
|
||||
fake_bank_opts = [
|
||||
cfg.StrOpt('fake_host'),
|
||||
]
|
||||
config.register_opts(fake_bank_opts, 'fake_bank')
|
||||
self.fake_host = config['fake_bank']['fake_host']
|
||||
|
||||
def create_object(self, key, value):
|
||||
self._objects[key] = value
|
||||
|
@ -128,35 +138,57 @@ class FakeProtectablePlugin(object):
|
|||
pass
|
||||
|
||||
|
||||
class FakeProtectionPlugin(object):
|
||||
def __init__(self, expected_event_stream):
|
||||
self._expected_event_stream = expected_event_stream
|
||||
class MockOperation(protection_plugin.Operation):
|
||||
def __init__(self):
|
||||
super(MockOperation, self).__init__()
|
||||
for hook_name in resource_flow.HOOKS:
|
||||
setattr(self, hook_name, mock.Mock())
|
||||
|
||||
def on_resource_start(self, context):
|
||||
resource = context.node.value
|
||||
workflow_engine = context.workflow_engine
|
||||
task_flow = context.task_flow
|
||||
if self._expected_event_stream.pop(0) != (
|
||||
"on_resource_start",
|
||||
resource.id,
|
||||
context.is_first_visited):
|
||||
raise Exception
|
||||
if context.is_first_visited and workflow_engine:
|
||||
task = workflow_engine.create_task("fake_task")
|
||||
workflow_engine.add_tasks(task_flow, task)
|
||||
|
||||
def on_resource_end(self, context):
|
||||
resource = context.node.value
|
||||
if self._expected_event_stream.pop(0) != (
|
||||
"on_resource_end",
|
||||
resource.id):
|
||||
raise Exception
|
||||
class FakeProtectionPlugin(protection_plugin.ProtectionPlugin):
|
||||
SUPPORTED_RESOURCES = [
|
||||
'Test::ResourceA',
|
||||
'Test::ResourceB',
|
||||
'Test::ResourceC',
|
||||
]
|
||||
|
||||
def get_resource_stats(self):
|
||||
pass
|
||||
def __init__(self, config=None, *args, **kwargs):
|
||||
super(FakeProtectionPlugin, self).__init__(config)
|
||||
fake_plugin_opts = [
|
||||
cfg.StrOpt('fake_user'),
|
||||
]
|
||||
if config:
|
||||
config.register_opts(fake_plugin_opts, 'fake_plugin')
|
||||
self.fake_user = config['fake_plugin']['fake_user']
|
||||
|
||||
def get_supported_resources_types(self):
|
||||
return ["fake"]
|
||||
def get_protect_operation(self, *args, **kwargs):
|
||||
return MockOperation()
|
||||
|
||||
def get_restore_operation(self, *args, **kwargs):
|
||||
return MockOperation()
|
||||
|
||||
def get_delete_operation(self, *args, **kwargs):
|
||||
return MockOperation()
|
||||
|
||||
@classmethod
|
||||
def get_supported_resources_types(cls):
|
||||
return cls.SUPPORTED_RESOURCES
|
||||
|
||||
@classmethod
|
||||
def get_options_schema(cls, resource_type):
|
||||
return {}
|
||||
|
||||
@classmethod
|
||||
def get_saved_info_schema(cls, resource_type):
|
||||
return {}
|
||||
|
||||
@classmethod
|
||||
def get_restore_schema(cls, resource_type):
|
||||
return {}
|
||||
|
||||
@classmethod
|
||||
def get_saved_info(cls, metadata_store, resource):
|
||||
return None
|
||||
|
||||
|
||||
class FakeCheckpoint(object):
|
||||
|
@ -199,12 +231,10 @@ class FakeProvider(provider.PluggableProtectionProvider):
|
|||
self._name = 'provider'
|
||||
self._description = 'fake_provider'
|
||||
self._extend_info_schema = {}
|
||||
|
||||
def build_task_flow(self, plan):
|
||||
status_getters = []
|
||||
return {'status_getters': status_getters,
|
||||
'task_flow': graph_flow.Flow('fake_flow')
|
||||
}
|
||||
self._config = None
|
||||
self._plugin_map = {
|
||||
'fake': FakeProtectionPlugin,
|
||||
}
|
||||
|
||||
def get_checkpoint_collection(self):
|
||||
return FakeCheckpointCollection()
|
||||
|
@ -214,11 +244,32 @@ class FakeFlowEngine(object):
|
|||
def __init__(self):
|
||||
super(FakeFlowEngine, self).__init__()
|
||||
|
||||
def create_task(self, function, requires=None, provides=None,
|
||||
inject=None, **kwargs):
|
||||
name = kwargs.get('name', None)
|
||||
auto_extract = kwargs.get('auto_extract', True)
|
||||
rebind = kwargs.get('rebind', None)
|
||||
revert = kwargs.get('revert', None)
|
||||
version = kwargs.get('version', None)
|
||||
if function:
|
||||
return task.FunctorTask(function,
|
||||
name=name,
|
||||
provides=provides,
|
||||
requires=requires,
|
||||
auto_extract=auto_extract,
|
||||
rebind=rebind,
|
||||
revert=revert,
|
||||
version=version,
|
||||
inject=inject)
|
||||
|
||||
def add_tasks(self, flow, *nodes, **kwargs):
|
||||
if flow is None:
|
||||
LOG.error(_LE("The flow is None, get it first"))
|
||||
flow.add(*nodes, **kwargs)
|
||||
|
||||
def link_task(self, flow, u, v):
|
||||
flow.link(u, v)
|
||||
|
||||
def build_flow(self, flow_name, flow_type='graph'):
|
||||
if flow_type == 'linear':
|
||||
return linear_flow.Flow(flow_name)
|
||||
|
|
|
@ -1,156 +0,0 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from karbor.common import constants
|
||||
from karbor.services.protection.graph import build_graph
|
||||
from karbor.services.protection.protectable_registry import ProtectableRegistry
|
||||
from karbor.services.protection.provider import ProviderRegistry
|
||||
|
||||
from karbor.tests import base
|
||||
from karbor.tests.unit.protection.fakes import fake_protection_plan
|
||||
from karbor.tests.unit.protection.fakes import fake_restore
|
||||
from karbor.tests.unit.protection.fakes import FakeCheckpoint
|
||||
from karbor.tests.unit.protection.fakes import FakeProtectionPlugin
|
||||
from karbor.tests.unit.protection.fakes import plan_resources
|
||||
from karbor.tests.unit.protection.fakes import resource_map
|
||||
|
||||
|
||||
class FakeWorkflowEngine(object):
|
||||
def build_flow(self, flow_name):
|
||||
self.flow_name = flow_name
|
||||
self.task_flow = []
|
||||
return self.task_flow
|
||||
|
||||
def add_tasks(self, task_flow, task):
|
||||
task_flow.append(task)
|
||||
|
||||
def create_task(self, func, **kwargs):
|
||||
return "fake_task"
|
||||
|
||||
|
||||
class FakeProtectableRegistry(object):
|
||||
def fetch_dependent_resources(self, resource):
|
||||
return resource_map.__getitem__(resource)
|
||||
|
||||
def build_graph(self, context, resources):
|
||||
return build_graph(
|
||||
start_nodes=resources,
|
||||
get_child_nodes_func=self.fetch_dependent_resources,
|
||||
)
|
||||
|
||||
|
||||
class PluggableProtectionProviderTest(base.TestCase):
|
||||
def setUp(self):
|
||||
super(PluggableProtectionProviderTest, self).setUp()
|
||||
|
||||
@mock.patch.object(ProtectableRegistry, 'build_graph')
|
||||
def test_build_protect_task_flow(self, mock_build_graph):
|
||||
pr = ProviderRegistry()
|
||||
self.assertEqual(len(pr.providers), 1)
|
||||
|
||||
plugable_provider = pr.providers["fake_id1"]
|
||||
cntxt = "fake_cntxt"
|
||||
plan = fake_protection_plan()
|
||||
workflow_engine = FakeWorkflowEngine()
|
||||
operation = constants.OPERATION_PROTECT
|
||||
|
||||
ctx = {"context": cntxt,
|
||||
"plan": plan,
|
||||
"workflow_engine": workflow_engine,
|
||||
"operation_type": operation,
|
||||
}
|
||||
|
||||
expected_calls = [
|
||||
("on_resource_start", 'A', True),
|
||||
("on_resource_start", 'C', True),
|
||||
("on_resource_start", 'D', True),
|
||||
("on_resource_end", 'D'),
|
||||
("on_resource_start", 'E', True),
|
||||
("on_resource_end", 'E'),
|
||||
("on_resource_end", 'C'),
|
||||
("on_resource_end", 'A'),
|
||||
("on_resource_start", 'B', True),
|
||||
("on_resource_start", 'C', False),
|
||||
("on_resource_start", 'D', False),
|
||||
("on_resource_end", 'D'),
|
||||
("on_resource_start", 'E', False),
|
||||
("on_resource_end", 'E'),
|
||||
("on_resource_end", 'C'),
|
||||
("on_resource_end", 'B'),
|
||||
]
|
||||
|
||||
fake_registry = FakeProtectableRegistry()
|
||||
plugable_provider.protectable_registry = fake_registry
|
||||
|
||||
fake_registry.build_graph = mock.MagicMock()
|
||||
resource_graph = build_graph(plan_resources, resource_map.__getitem__)
|
||||
mock_build_graph.return_value = resource_graph
|
||||
|
||||
fake_protection_plugin = FakeProtectionPlugin(expected_calls)
|
||||
plugable_provider._plugin_map = {
|
||||
"fake_plugin": fake_protection_plugin
|
||||
}
|
||||
|
||||
result = plugable_provider.build_task_flow(ctx)
|
||||
self.assertEqual(len(result["status_getters"]), 5)
|
||||
self.assertEqual(len(result["task_flow"]), 5)
|
||||
|
||||
def test_build_restore_task_flow(self):
|
||||
pr = ProviderRegistry()
|
||||
self.assertEqual(len(pr.providers), 1)
|
||||
|
||||
plugable_provider = pr.providers["fake_id1"]
|
||||
cntxt = "fake_cntxt"
|
||||
restore = fake_restore()
|
||||
checkpoint = FakeCheckpoint()
|
||||
workflow_engine = FakeWorkflowEngine()
|
||||
operation = constants.OPERATION_RESTORE
|
||||
|
||||
ctx = {"context": cntxt,
|
||||
"restore": restore,
|
||||
"workflow_engine": workflow_engine,
|
||||
"operation_type": operation,
|
||||
"checkpoint": checkpoint,
|
||||
"heat_template": "heat_template"
|
||||
}
|
||||
|
||||
expected_calls = [
|
||||
("on_resource_start", 'A', True),
|
||||
("on_resource_start", 'C', True),
|
||||
("on_resource_start", 'D', True),
|
||||
("on_resource_end", 'D'),
|
||||
("on_resource_start", 'E', True),
|
||||
("on_resource_end", 'E'),
|
||||
("on_resource_end", 'C'),
|
||||
("on_resource_end", 'A'),
|
||||
("on_resource_start", 'B', True),
|
||||
("on_resource_start", 'C', False),
|
||||
("on_resource_start", 'D', False),
|
||||
("on_resource_end", 'D'),
|
||||
("on_resource_start", 'E', False),
|
||||
("on_resource_end", 'E'),
|
||||
("on_resource_end", 'C'),
|
||||
("on_resource_end", 'B'),
|
||||
]
|
||||
|
||||
fake_protection_plugin = FakeProtectionPlugin(expected_calls)
|
||||
plugable_provider._plugin_map = {
|
||||
"fake_plugin": fake_protection_plugin
|
||||
}
|
||||
|
||||
result = plugable_provider.build_task_flow(ctx)
|
||||
self.assertEqual(len(result["task_flow"]), 5)
|
||||
|
||||
def tearDown(self):
|
||||
super(PluggableProtectionProviderTest, self).tearDown()
|
|
@ -88,15 +88,6 @@ class CinderProtectionPluginTest(base.TestCase):
|
|||
self.cinder_client = ClientFactory.create_client("cinder", self.cntxt)
|
||||
self.checkpoint = FakeCheckpoint()
|
||||
|
||||
def test_get_resource_stats(self):
|
||||
fake_resource_id = "123"
|
||||
fake_bank_section.get_object = mock.MagicMock()
|
||||
fake_bank_section.get_object.return_value = \
|
||||
constants.RESOURCE_STATUS_AVAILABLE
|
||||
status = self.plugin.get_resource_stats(self.checkpoint,
|
||||
fake_resource_id)
|
||||
self.assertEqual(status, constants.RESOURCE_STATUS_AVAILABLE)
|
||||
|
||||
def test_get_options_schema(self):
|
||||
options_schema = self.plugin.get_options_schema(
|
||||
'OS::Cinder::Volume')
|
||||
|
|
|
@ -87,16 +87,6 @@ class GlanceProtectionPluginTest(base.TestCase):
|
|||
self.glance_client = ClientFactory.create_client("glance", self.cntxt)
|
||||
self.checkpoint = CheckpointCollection()
|
||||
|
||||
def test_get_resource_stats(self):
|
||||
fake_resource_id = "123"
|
||||
|
||||
fake_bank_section.get_object = mock.MagicMock()
|
||||
fake_bank_section.get_object.return_value = \
|
||||
constants.RESOURCE_STATUS_PROTECTING
|
||||
status = self.plugin.get_resource_stats(self.checkpoint,
|
||||
fake_resource_id)
|
||||
self.assertEqual(status, constants.RESOURCE_STATUS_PROTECTING)
|
||||
|
||||
def test_get_options_schema(self):
|
||||
options_schema = self.plugin.get_options_schema(
|
||||
constants.IMAGE_RESOURCE_TYPE)
|
||||
|
|
|
@ -146,7 +146,8 @@ class ProtectionServiceTest(base.TestCase):
|
|||
mock_cp_collection_get):
|
||||
mock_provider.return_value = fakes.FakeProvider()
|
||||
context = mock.MagicMock()
|
||||
mock_cp_collection_get.side_effect = exception.CheckpointNotFound()
|
||||
mock_cp_collection_get.side_effect = exception.CheckpointNotFound(
|
||||
checkpoint_id='123')
|
||||
self.assertRaises(oslo_messaging.ExpectedException,
|
||||
self.pro_manager.show_checkpoint,
|
||||
context,
|
||||
|
|
|
@ -10,25 +10,39 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from karbor import exception
|
||||
from karbor.services.protection import provider
|
||||
from karbor.tests import base
|
||||
import mock
|
||||
|
||||
from karbor.resource import Resource
|
||||
from karbor.services.protection import provider
|
||||
from karbor.tests import base
|
||||
from karbor.tests.unit.protection import fakes
|
||||
from oslo_config import cfg
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
(parent_type, child_type, grandchild_type) = \
|
||||
fakes.FakeProtectionPlugin.SUPPORTED_RESOURCES
|
||||
|
||||
parent = Resource(id='A1', name='parent', type=parent_type)
|
||||
child = Resource(id='B1', name='child', type=child_type)
|
||||
grandchild = Resource(id='C1', name='grandchild', type=grandchild_type)
|
||||
resource_graph = {
|
||||
parent: [child],
|
||||
child: [grandchild],
|
||||
grandchild: [],
|
||||
}
|
||||
|
||||
|
||||
class ProviderRegistryTest(base.TestCase):
|
||||
def setUp(self):
|
||||
super(ProviderRegistryTest, self).setUp()
|
||||
|
||||
@mock.patch.object(provider.PluggableProtectionProvider, '_load_bank')
|
||||
@mock.patch.object(provider.PluggableProtectionProvider, '_load_plugin')
|
||||
def test_load_providers(self, mock_load_bank, mock_load_plugin):
|
||||
@mock.patch.object(provider.PluggableProtectionProvider,
|
||||
'_register_plugin')
|
||||
def test_load_providers(self, mock_load_bank, mock_register_plugin):
|
||||
pr = provider.ProviderRegistry()
|
||||
self.assertEqual(mock_load_plugin.call_count, 1)
|
||||
self.assertEqual(mock_register_plugin.call_count, 1)
|
||||
self.assertEqual(mock_load_bank.call_count, 1)
|
||||
self.assertEqual(len(pr.providers), 1)
|
||||
|
||||
|
@ -38,16 +52,13 @@ class ProviderRegistryTest(base.TestCase):
|
|||
def test_provider_bank_config(self):
|
||||
pr = provider.ProviderRegistry()
|
||||
provider1 = pr.show_provider('fake_id1')
|
||||
self.assertEqual(provider1.bank._plugin._config.fake_bank.fake_host,
|
||||
'thor')
|
||||
self.assertEqual(provider1.bank._plugin.fake_host, 'thor')
|
||||
|
||||
def test_provider_plugin_config(self):
|
||||
pr = provider.ProviderRegistry()
|
||||
provider1 = pr.show_provider('fake_id1')
|
||||
plugin_name = 'karbor.tests.unit.fake_protection.FakeProtectionPlugin'
|
||||
self.assertEqual(
|
||||
provider1.plugins[plugin_name]._config.fake_plugin.fake_user,
|
||||
'user')
|
||||
plugins = provider1.load_plugins()
|
||||
self.assertEqual(plugins['Test::ResourceA'].fake_user, 'user')
|
||||
|
||||
def test_list_provider(self):
|
||||
pr = provider.ProviderRegistry()
|
||||
|
@ -58,8 +69,3 @@ class ProviderRegistryTest(base.TestCase):
|
|||
provider_list = pr.list_providers()
|
||||
for provider_node in provider_list:
|
||||
self.assertTrue(pr.show_provider(provider_node['id']))
|
||||
|
||||
def test_show_non_existent_provider(self):
|
||||
pr = provider.ProviderRegistry()
|
||||
self.assertRaises(exception.ProviderNotFound, pr.show_provider,
|
||||
'garbage')
|
||||
|
|
|
@ -0,0 +1,199 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from functools import partial
|
||||
import mock
|
||||
|
||||
from karbor.common import constants
|
||||
from karbor.resource import Resource
|
||||
from karbor.services.protection.flows.workflow import TaskFlowEngine
|
||||
from karbor.services.protection import graph
|
||||
from karbor.services.protection import resource_flow
|
||||
from karbor.services.protection import restore_heat
|
||||
from karbor.tests import base
|
||||
from karbor.tests.unit.protection import fakes
|
||||
from oslo_config import cfg
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
(parent_type, child_type, grandchild_type) = \
|
||||
fakes.FakeProtectionPlugin.SUPPORTED_RESOURCES
|
||||
|
||||
parent = Resource(id='A1', name='parent', type=parent_type)
|
||||
child = Resource(id='B1', name='child', type=child_type)
|
||||
grandchild = Resource(id='C1', name='grandchild', type=grandchild_type)
|
||||
|
||||
|
||||
class ResourceFlowTest(base.TestCase):
|
||||
def setUp(self):
|
||||
super(ResourceFlowTest, self).setUp()
|
||||
|
||||
self.resource_graph = {
|
||||
parent: [child],
|
||||
child: [grandchild],
|
||||
grandchild: [],
|
||||
}
|
||||
|
||||
self.provider = fakes.FakeProvider()
|
||||
self.test_graph = graph.build_graph([parent],
|
||||
self.resource_graph.__getitem__)
|
||||
self.taskflow_engine = TaskFlowEngine()
|
||||
|
||||
def _walk_operation(self, protection, operation_type,
|
||||
checkpoint='checkpoint', parameters={}, context=None,
|
||||
heat_template=None, **kwargs):
|
||||
plugin_map = {
|
||||
parent_type: protection,
|
||||
child_type: protection,
|
||||
grandchild_type: protection,
|
||||
}
|
||||
flow = resource_flow.build_resource_flow(operation_type,
|
||||
context,
|
||||
self.taskflow_engine,
|
||||
plugin_map,
|
||||
self.test_graph,
|
||||
parameters)
|
||||
|
||||
store = {
|
||||
'checkpoint': checkpoint
|
||||
}
|
||||
if heat_template:
|
||||
store['heat_template'] = heat_template
|
||||
|
||||
engine = self.taskflow_engine.get_engine(flow,
|
||||
engine='parallel',
|
||||
store=store)
|
||||
self.taskflow_engine.run_engine(engine)
|
||||
|
||||
@mock.patch('karbor.tests.unit.protection.fakes.FakeProtectionPlugin')
|
||||
def test_resource_no_impl(self, mock_protection):
|
||||
for operation in constants.OPERATION_TYPES:
|
||||
heat_template = restore_heat.HeatTemplate() \
|
||||
if operation == constants.OPERATION_RESTORE else None
|
||||
self._walk_operation(mock_protection, operation,
|
||||
heat_template=heat_template)
|
||||
|
||||
@mock.patch('karbor.tests.unit.protection.fakes.FakeProtectionPlugin')
|
||||
def test_resource_flow_callbacks(self, mock_protection):
|
||||
for operation in constants.OPERATION_TYPES:
|
||||
mock_operation = fakes.MockOperation()
|
||||
get_operation_attr = 'get_{}_operation'.format(operation)
|
||||
getattr(mock_protection, get_operation_attr).return_value = \
|
||||
mock_operation
|
||||
|
||||
heat_template = restore_heat.HeatTemplate() \
|
||||
if operation == constants.OPERATION_RESTORE else None
|
||||
|
||||
self._walk_operation(mock_protection, operation,
|
||||
heat_template=heat_template)
|
||||
self.assertEqual(mock_operation.on_prepare_begin.call_count,
|
||||
len(self.resource_graph))
|
||||
self.assertEqual(mock_operation.on_prepare_finish.call_count,
|
||||
len(self.resource_graph))
|
||||
self.assertEqual(mock_operation.on_main.call_count,
|
||||
len(self.resource_graph))
|
||||
self.assertEqual(mock_operation.on_complete.call_count,
|
||||
len(self.resource_graph))
|
||||
|
||||
@mock.patch('karbor.tests.unit.protection.fakes.FakeProtectionPlugin')
|
||||
def test_resource_flow_parameters(self, mock_protection):
|
||||
resource_a1_id = "{}#{}".format(parent_type, 'A1')
|
||||
resource_b1_id = "{}#{}".format(child_type, 'B1')
|
||||
parameters = {
|
||||
resource_a1_id: {'option1': 'value1'},
|
||||
resource_b1_id: {'option2': 'value2', 'option3': 'value3'},
|
||||
parent_type: {'option4': 'value4'},
|
||||
child_type: {'option3': 'value5'}
|
||||
}
|
||||
|
||||
for operation in constants.OPERATION_TYPES:
|
||||
mock_operation = fakes.MockOperation()
|
||||
get_operation_attr = 'get_{}_operation'.format(operation)
|
||||
getattr(mock_protection, get_operation_attr).return_value = \
|
||||
mock_operation
|
||||
|
||||
kwargs = {
|
||||
'checkpoint': 'A',
|
||||
'context': 'B',
|
||||
}
|
||||
|
||||
if operation == constants.OPERATION_RESTORE:
|
||||
kwargs['heat_template'] = restore_heat.HeatTemplate()
|
||||
|
||||
self._walk_operation(mock_protection, operation,
|
||||
parameters=parameters, **kwargs)
|
||||
|
||||
for resource in self.resource_graph:
|
||||
resource_params = parameters.get(resource.type, {})
|
||||
resource_id = '{}#{}'.format(resource.type, resource.id)
|
||||
resource_params.update(parameters.get(resource_id, {}))
|
||||
mock_operation.on_prepare_begin.assert_any_call(
|
||||
resource=resource,
|
||||
parameters=resource_params,
|
||||
**kwargs)
|
||||
mock_operation.on_prepare_finish.assert_any_call(
|
||||
resource=resource,
|
||||
parameters=resource_params,
|
||||
**kwargs)
|
||||
mock_operation.on_main.assert_any_call(
|
||||
resource=resource,
|
||||
parameters=resource_params,
|
||||
**kwargs)
|
||||
mock_operation.on_complete.assert_any_call(
|
||||
resource=resource,
|
||||
parameters=resource_params,
|
||||
**kwargs)
|
||||
|
||||
@mock.patch('karbor.tests.unit.protection.fakes.FakeProtectionPlugin')
|
||||
def test_resource_flow_order(self, mock_protection):
|
||||
def test_order(order_list, hook_type, resource, *args, **kwargs):
|
||||
order_list.append((hook_type, resource.id))
|
||||
|
||||
operation = constants.OPERATION_PROTECT
|
||||
mock_operation = fakes.MockOperation()
|
||||
get_operation_attr = 'get_{}_operation'.format(operation)
|
||||
getattr(mock_protection, get_operation_attr).return_value = \
|
||||
mock_operation
|
||||
|
||||
order_list = []
|
||||
mock_operation.on_prepare_begin = partial(test_order, order_list,
|
||||
'pre_begin')
|
||||
mock_operation.on_prepare_finish = partial(test_order, order_list,
|
||||
'pre_finish')
|
||||
mock_operation.on_main = partial(test_order, order_list, 'main')
|
||||
mock_operation.on_complete = partial(test_order, order_list,
|
||||
'complete')
|
||||
|
||||
self._walk_operation(mock_protection, operation)
|
||||
|
||||
self.assertTrue(order_list.index(('pre_begin', parent.id)) <
|
||||
order_list.index(('pre_begin', child.id)))
|
||||
self.assertTrue(order_list.index(('pre_begin', child.id)) <
|
||||
order_list.index(('pre_begin', grandchild.id)))
|
||||
|
||||
self.assertTrue(order_list.index(('pre_finish', parent.id)) >
|
||||
order_list.index(('pre_finish', child.id)))
|
||||
self.assertTrue(order_list.index(('pre_finish', child.id)) >
|
||||
order_list.index(('pre_finish', grandchild.id)))
|
||||
|
||||
self.assertTrue(order_list.index(('complete', parent.id)) >
|
||||
order_list.index(('complete', child.id)))
|
||||
self.assertTrue(order_list.index(('complete', child.id)) >
|
||||
order_list.index(('complete', grandchild.id)))
|
||||
|
||||
for resource_id in (parent.id, child.id, grandchild.id):
|
||||
self.assertTrue(order_list.index(('pre_begin', resource_id)) <
|
||||
order_list.index(('pre_finish', resource_id)))
|
||||
self.assertTrue(order_list.index(('pre_finish', resource_id)) <
|
||||
order_list.index(('main', resource_id)))
|
||||
self.assertTrue(order_list.index(('main', resource_id)) <
|
||||
order_list.index(('complete', resource_id)))
|
|
@ -1,71 +0,0 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import karbor.services.protection.graph as graph
|
||||
from karbor.services.protection.resource_graph import ResourceGraphContext
|
||||
from karbor.services.protection.resource_graph \
|
||||
import ResourceGraphWalkerListener
|
||||
|
||||
from karbor.tests import base
|
||||
from karbor.tests.unit.protection.fakes import FakeProtectionPlugin
|
||||
from karbor.tests.unit.protection.fakes import plan_resources
|
||||
from karbor.tests.unit.protection.fakes import resource_graph
|
||||
from karbor.tests.unit.protection.fakes import resource_map
|
||||
|
||||
|
||||
class ResourceGraphWalkerListenerTest(base.TestCase):
|
||||
def test_resource_graph_walker_listener_plan(self):
|
||||
expected_calls = [
|
||||
("on_resource_start", 'A', True),
|
||||
("on_resource_start", 'C', True),
|
||||
("on_resource_start", 'D', True),
|
||||
("on_resource_end", 'D'),
|
||||
("on_resource_start", 'E', True),
|
||||
("on_resource_end", 'E'),
|
||||
("on_resource_end", 'C'),
|
||||
("on_resource_end", 'A'),
|
||||
("on_resource_start", 'B', True),
|
||||
("on_resource_start", 'C', False),
|
||||
("on_resource_start", 'D', False),
|
||||
("on_resource_end", 'D'),
|
||||
("on_resource_start", 'E', False),
|
||||
("on_resource_end", 'E'),
|
||||
("on_resource_end", 'C'),
|
||||
("on_resource_end", 'B'),
|
||||
]
|
||||
|
||||
fake_cntxt = "fake_cntxt"
|
||||
fake_protection_plugin = FakeProtectionPlugin(expected_calls)
|
||||
fake_context = ResourceGraphContext(
|
||||
fake_cntxt,
|
||||
plugin_map={"fake_plugin": fake_protection_plugin})
|
||||
listener = ResourceGraphWalkerListener(fake_context)
|
||||
|
||||
walker = graph.GraphWalker()
|
||||
walker.register_listener(listener)
|
||||
walker.walk_graph(graph.build_graph(plan_resources,
|
||||
resource_map.__getitem__))
|
||||
self.assertEqual(len(listener.context.status_getters), 5)
|
||||
|
||||
def tearDown(self):
|
||||
super(ResourceGraphWalkerListenerTest, self).tearDown()
|
||||
|
||||
|
||||
class SerializeResourceGraphTest(base.TestCase):
|
||||
def test_serialize_deserialize_packed_resource_graph(self):
|
||||
serialized_resource_graph = graph.serialize_resource_graph(
|
||||
resource_graph)
|
||||
deserialized_resource_graph = graph.deserialize_resource_graph(
|
||||
serialized_resource_graph)
|
||||
self.assertEqual(len(resource_graph), len(deserialized_resource_graph))
|
||||
for start_node in resource_graph:
|
||||
self.assertIn(start_node, deserialized_resource_graph)
|
|
@ -101,20 +101,24 @@ def get_bool_param(param_string, params):
|
|||
return strutils.bool_from_string(param, strict=True)
|
||||
|
||||
|
||||
def load_plugin(namespace, plugin_name, *args, **kwargs):
|
||||
def load_class(namespace, plugin_name):
|
||||
try:
|
||||
LOG.debug('Start load plugin %s. ', plugin_name)
|
||||
# Try to resolve plugin by name
|
||||
mgr = driver.DriverManager(namespace, plugin_name)
|
||||
plugin_class = mgr.driver
|
||||
return mgr.driver
|
||||
except RuntimeError as e1:
|
||||
# fallback to class name
|
||||
try:
|
||||
plugin_class = importutils.import_class(plugin_name)
|
||||
return importutils.import_class(plugin_name)
|
||||
except ImportError as e2:
|
||||
LOG.error(_LE("Error loading plugin by name, %s"), e1)
|
||||
LOG.error(_LE("Error loading plugin by class, %s"), e2)
|
||||
raise ImportError(_("Class not found."))
|
||||
|
||||
|
||||
def load_plugin(namespace, plugin_name, *args, **kwargs):
|
||||
plugin_class = load_class(namespace, plugin_name)
|
||||
return plugin_class(*args, **kwargs)
|
||||
|
||||
|
||||
|
|
|
@ -44,6 +44,7 @@ karbor.protections =
|
|||
karbor-volume-protection-plugin = karbor.services.protection.protection_plugins.volume.cinder_protection_plugin:CinderProtectionPlugin
|
||||
karbor-image-protection-plugin = karbor.services.protection.protection_plugins.image.image_protection_plugin:GlanceProtectionPlugin
|
||||
karbor-server-protection-plugin = karbor.services.protection.protection_plugins.server.nova_protection_plugin:NovaProtectionPlugin
|
||||
karbor-noop-protection-plugin = karbor.services.protection.protection_plugins.noop_plugin:NoopProtectionPlugin
|
||||
karbor.provider =
|
||||
provider-registry = karbor.services.protection.provider:ProviderRegistry
|
||||
karbor.protectables =
|
||||
|
|
Loading…
Reference in New Issue