Merge "Stabilize Protection Plugin API"

This commit is contained in:
Jenkins 2016-12-20 04:42:58 +00:00 committed by Gerrit Code Review
commit dc7aefed5d
38 changed files with 1005 additions and 895 deletions

19
etc/providers.d/noop.conf Normal file
View File

@ -0,0 +1,19 @@
# Provider definition that wires every resource type to the Noop
# protection plugin; useful for testing the protection flow end-to-end
# without touching real backends.
[provider]
name = Noop
description = This provider uses a Noop Protection Plugin for every resource
# Unique provider id referenced by protection plans.
id = b766f37c-d011-4026-8228-28730d734a3f
# Entry-point name of the protection plugin this provider loads.
plugin=karbor-noop-protection-plugin
# Bank plugin used to persist checkpoint metadata (Swift-backed).
bank=karbor-swift-bank-plugin

# Swift client credentials used by the bank plugin.
[swift_client]
swift_auth_url=http://127.0.0.1:5000/v2.0/
swift_auth_version=2
swift_user=admin
swift_key=password
swift_tenant_name=admin

# Lease tuning for the Swift bank plugin, in seconds.
[swift_bank_plugin]
lease_expire_window=120
lease_renew_window=100
lease_validity_window=100

View File

@ -390,12 +390,12 @@ class ProvidersController(wsgi.Controller):
"resources": plan.get("resources"),
}
}
checkpoint = self.protection_api.protect(context, plan)
if checkpoint is not None:
checkpoint_properties['id'] = checkpoint.get('checkpoint_id')
else:
msg = _("Get checkpoint failed.")
raise exc.HTTPNotFound(explanation=msg)
try:
checkpoint_id = self.protection_api.protect(context, plan)
except Exception as error:
msg = _("Create checkpoint failed: %s") % error.msg
raise exc.HTTPBadRequest(explanation=msg)
checkpoint_properties['id'] = checkpoint_id
returnval = self._checkpoint_view_builder.detail(
req, checkpoint_properties)

View File

@ -264,7 +264,7 @@ class RestoresController(wsgi.Controller):
'checkpoint_id': restore.get('checkpoint_id'),
'restore_target': restore.get('restore_target'),
'parameters': parameters,
'status': constants.RESOURCE_STATUS_STARTED,
'status': constants.RESTORE_STATUS_IN_PROGRESS,
}
restoreobj = objects.Restore(context=context,
@ -276,10 +276,9 @@ class RestoresController(wsgi.Controller):
try:
self.protection_api.restore(context, restoreobj, restore_auth)
except Exception:
status_update = constants.OPERATION_EXE_STATE_FAILED
# update the status of restore
update_dict = {
"status": status_update
"status": constants.RESTORE_STATUS_FAILURE
}
check_policy(context, 'update', restoreobj)
restoreobj = self._restore_update(context,

View File

@ -75,3 +75,7 @@ OPERATION_EXE_STATE_IN_PROGRESS = 'in_progress'
OPERATION_EXE_STATE_SUCCESS = 'success'
OPERATION_EXE_STATE_FAILED = 'failed'
OPERATION_EXE_STATE_DROPPED_OUT_OF_WINDOW = 'dropped_out_of_window'
RESTORE_STATUS_SUCCESS = 'success'
RESTORE_STATUS_FAILURE = 'fail'
RESTORE_STATUS_IN_PROGRESS = 'in_progress'

View File

@ -262,6 +262,11 @@ class ProtectableTypeNotFound(NotFound):
" not be found.")
class ProtectionPluginNotFound(NotFound):
    """Raised when no Protection Plugin is registered for a resource type."""
    message = _("Protection Plugin for %(type)s could"
                " not be found.")
class ProviderNotFound(NotFound):
    """Raised when the requested provider id is not in the registry."""
    message = _("Provider %(provider_id)s could"
                " not be found.")

View File

@ -151,7 +151,7 @@ class Checkpoint(object):
value={
"version": cls.VERSION,
"id": checkpoint_id,
"status": "protecting",
"status": constants.CHECKPOINT_STATUS_PROTECTING,
"owner_id": owner_id,
"provider_id": plan.get("provider_id"),
"project_id": plan.get("project_id"),

View File

@ -11,94 +11,66 @@
# under the License.
from karbor.common import constants
from karbor.i18n import _LI
from oslo_config import cfg
from karbor.resource import Resource
from karbor.services.protection.protectable_registry import ProtectableRegistry
from karbor.services.protection import resource_flow
from oslo_log import log as logging
from oslo_service import loopingcall
from taskflow import task
sync_status_opts = [
cfg.IntOpt('sync_status_interval',
default=60,
help='update protection status interval')
]
CONF = cfg.CONF
CONF.register_opts(sync_status_opts)
LOG = logging.getLogger(__name__)
class CreateCheckpointTask(task.Task):
def __init__(self, plan, provider, resource_graph):
provides = 'checkpoint'
super(CreateCheckpointTask, self).__init__(provides=provides)
self._plan = plan
self._provider = provider
self._resource_graph = resource_graph
class InitiateProtectTask(task.Task):
def __init__(self):
super(InitiateProtectTask, self).__init__()
def execute(self):
checkpoint_collection = self._provider.get_checkpoint_collection()
checkpoint = checkpoint_collection.create(self._plan)
checkpoint.resource_graph = self._resource_graph
def execute(self, checkpoint, *args, **kwargs):
LOG.debug("Initiate protect checkpoint_id: %s", checkpoint.id)
checkpoint.status = constants.CHECKPOINT_STATUS_PROTECTING
checkpoint.commit()
def revert(self, checkpoint, *args, **kwargs):
LOG.debug("Failed to protect checkpoint_id: %s", checkpoint.id)
checkpoint.status = constants.CHECKPOINT_STATUS_ERROR
checkpoint.commit()
return checkpoint
class SyncCheckpointStatusTask(task.Task):
def __init__(self, status_getters):
requires = ['checkpoint']
super(SyncCheckpointStatusTask, self).__init__(requires=requires)
self._status_getters = status_getters
class CompleteProtectTask(task.Task):
def __init__(self):
super(CompleteProtectTask, self).__init__()
def execute(self, checkpoint):
LOG.info(_LI("Start sync checkpoint status,checkpoint_id: %s"),
checkpoint.id)
sync_status = loopingcall.FixedIntervalLoopingCall(
self._sync_status, checkpoint, self._status_getters)
sync_status.start(interval=CONF.sync_status_interval)
def _sync_status(self, checkpoint, status_getters):
statuses = set()
for s in status_getters:
resource_id = s.get('resource_id')
get_resource_stats = s.get('get_resource_stats')
statuses.add(get_resource_stats(checkpoint, resource_id))
if constants.RESOURCE_STATUS_ERROR in statuses:
checkpoint.status = constants.CHECKPOINT_STATUS_ERROR
checkpoint.commit()
elif constants.RESOURCE_STATUS_PROTECTING in statuses:
checkpoint.status = constants.CHECKPOINT_STATUS_PROTECTING
checkpoint.commit()
elif constants.RESOURCE_STATUS_UNDEFINED in statuses:
checkpoint.status = constants.CHECKPOINT_STATUS_PROTECTING
checkpoint.commit()
else:
checkpoint.status = constants.CHECKPOINT_STATUS_AVAILABLE
checkpoint.commit()
LOG.info(_LI("Stop sync checkpoint status,checkpoint_id: "
"%(checkpoint_id)s,checkpoint status: "
"%(checkpoint_status)s"),
{"checkpoint_id": checkpoint.id,
"checkpoint_status": checkpoint.status})
raise loopingcall.LoopingCallDone()
LOG.debug("Complete protect checkpoint_id: %s", checkpoint.id)
checkpoint.status = constants.CHECKPOINT_STATUS_AVAILABLE
checkpoint.commit()
def get_flow(context, workflow_engine, operation_type, plan, provider):
ctx = {'context': context,
'plan': plan,
'workflow_engine': workflow_engine,
'operation_type': operation_type}
flow_name = "create_protection_" + plan.get('id')
def get_flow(context, workflow_engine, operation_type, plan, provider,
checkpoint):
protectable_registry = ProtectableRegistry()
resources = set(Resource(**item) for item in plan.get("resources"))
resource_graph = protectable_registry.build_graph(context,
resources)
checkpoint.resource_graph = resource_graph
checkpoint.commit()
flow_name = "Protect_" + plan.get('id')
protection_flow = workflow_engine.build_flow(flow_name, 'linear')
result = provider.build_task_flow(ctx)
status_getters = result.get('status_getters')
resource_flow = result.get('task_flow')
resource_graph = result.get('resource_graph')
workflow_engine.add_tasks(protection_flow,
CreateCheckpointTask(plan, provider,
resource_graph),
resource_flow,
SyncCheckpointStatusTask(status_getters))
flow_engine = workflow_engine.get_engine(protection_flow)
plugins = provider.load_plugins()
resources_task_flow = resource_flow.build_resource_flow(
operation_type=operation_type,
context=context,
workflow_engine=workflow_engine,
resource_graph=resource_graph,
plugins=plugins,
parameters=plan.get('parameters'),
)
workflow_engine.add_tasks(
protection_flow,
InitiateProtectTask(),
resources_task_flow,
CompleteProtectTask(),
)
flow_engine = workflow_engine.get_engine(protection_flow, store={
'checkpoint': checkpoint
})
return flow_engine

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import loopingcall
@ -17,13 +18,14 @@ from oslo_utils import uuidutils
from karbor.common import constants
from karbor.i18n import _LE, _LI
from karbor.services.protection.client_factory import ClientFactory
from karbor.services.protection.restore_heat import HeatTemplate
from karbor.services.protection import client_factory
from karbor.services.protection import resource_flow
from karbor.services.protection import restore_heat
from taskflow import task
sync_status_opts = [
cfg.IntOpt('sync_status_interval',
default=60,
default=20,
help='update protection status interval')
]
@ -33,106 +35,137 @@ CONF.register_opts(sync_status_opts)
LOG = logging.getLogger(__name__)
class CreateStackTask(task.Task):
def __init__(self, heat_client, template):
provides = 'stack_id'
super(CreateStackTask, self).__init__(provides=provides)
self._heat_client = heat_client
self._template = template
class InitiateRestoreTask(task.Task):
def __init__(self):
super(InitiateRestoreTask, self).__init__()
def execute(self):
def execute(self, restore, *args, **kwargs):
LOG.debug("Initiate restore restore_id: %s", restore.id)
restore['status'] = constants.RESTORE_STATUS_IN_PROGRESS
restore.save()
def revert(self, restore, *args, **kwargs):
LOG.debug("Failed to restore restore_id: %s", restore.id)
restore['status'] = constants.RESTORE_STATUS_FAILURE
restore.save()
class CompleteRestoreTask(task.Task):
    """Final task of the restore flow: record that the restore succeeded."""

    def execute(self, restore, *args, **kwargs):
        """Persist the terminal success status on the restore object."""
        LOG.debug("Complete restore restore_id: %s", restore.id)
        restore['status'] = constants.RESTORE_STATUS_SUCCESS
        restore.save()
class CreateHeatTask(task.Task):
    """Build the Heat client and an empty Heat template for the restore flow.

    Provides ``heat_client`` and ``heat_template`` to downstream tasks.
    ``heat_conf`` is injected by the flow author and may be empty, in which
    case the client is created from the current request context alone.
    """
    default_provides = ['heat_client', 'heat_template']

    def __init__(self, *args, **kwargs):
        super(CreateHeatTask, self).__init__(*args, **kwargs)

    def execute(self, context, heat_conf):
        """Create the Heat client and template.

        :param context: current operation context used to authenticate
        :param heat_conf: dict of client overrides (auth_url, username, ...)
        :returns: tuple of (heat client, empty HeatTemplate)
        """
        # Pass the value as a lazy argument instead of eagerly interpolating
        # with '%': the message is only rendered if INFO logging is enabled,
        # and translated (_LI) messages must not be %-formatted up front.
        LOG.info(_LI('Creating Heat template. Target: "%s"'),
                 heat_conf.get('auth_url', '(None)'))
        heat_client = client_factory.ClientFactory.create_client(
            'heat', context=context, **heat_conf)
        heat_template = restore_heat.HeatTemplate()
        return (heat_client, heat_template)
class CreateStackTask(task.Task):
default_provides = 'stack_id'
def __init__(self, *args, **kwargs):
super(CreateStackTask, self).__init__(*args, **kwargs)
def execute(self, heat_client, heat_template):
stack_name = "restore_%s" % uuidutils.generate_uuid()
LOG.info(_LI("creating stack, stack_name:%s"), stack_name)
LOG.info(_LI('Creating Heat stack, stack_name: %s'), stack_name)
try:
body = self._heat_client.stacks.create(
body = heat_client.stacks.create(
stack_name=stack_name,
template=self._template.to_dict())
template=heat_template.to_dict())
LOG.debug('Created stack with id: %s', body['stack']['id'])
return body['stack']['id']
except Exception:
LOG.error(_LE("use heat to create stack failed"))
raise
class SyncStackStatusTask(task.Task):
def __init__(self, checkpoint, heat_client, restore):
requires = ['stack_id']
super(SyncStackStatusTask, self).__init__(requires=requires)
self._heat_client = heat_client
self._checkpoint = checkpoint
self._restore = restore
class SyncRestoreStatusTask(task.Task):
def execute(self, stack_id):
LOG.info(_LI("syncing stack status, stack_id: %s"), stack_id)
def __init__(self, *args, **kwargs):
super(SyncRestoreStatusTask, self).__init__(*args, **kwargs)
def execute(self, stack_id, heat_client):
LOG.info(_LI('Syncing Heat stack status, stack_id: %s'), stack_id)
sync_status_loop = loopingcall.FixedIntervalLoopingCall(
self._sync_status, self._checkpoint, stack_id)
self._sync_status, heat_client, stack_id)
sync_status_loop.start(interval=CONF.sync_status_interval)
sync_status_loop.wait()
def _sync_status(self, checkpoint, stack_id):
def _sync_status(self, heat_client, stack_id):
try:
stack = self._heat_client.stacks.get(stack_id)
stack_status = getattr(stack, 'stack_status')
if stack_status == 'CREATE_IN_PROGRESS':
return
if stack_status == 'CREATE_FAILED':
status = constants.OPERATION_EXE_STATE_FAILED
elif stack_status == 'CREATE_COMPLETE':
status = constants.OPERATION_EXE_STATE_SUCCESS
status_dict = {
"status": status
}
self._restore.update(status_dict)
self._restore.save()
raise loopingcall.LoopingCallDone()
stack = heat_client.stacks.get(stack_id)
except Exception:
LOG.info(_LI("stop sync stack status, stack_id: %s"), stack_id)
LOG.debug('Heat error getting stack, stack_id: %s', stack_id)
raise
stack_status = getattr(stack, 'stack_status')
if stack_status == 'CREATE_IN_PROGRESS':
LOG.debug('Heat stack status: in progress, stack_id: %s',
stack_id)
return
if stack_status == 'CREATE_COMPLETE':
LOG.info(_LI('Heat stack status: complete, stack_id: %s'),
stack_id)
else:
LOG.info(_LI('Heat stack status: failure, stack_id: %s'), stack_id)
raise
raise loopingcall.LoopingCallDone()
def get_flow(context, workflow_engine, operation_type, checkpoint, provider,
restore, restore_auth):
target = restore.get('restore_target', None)
# TODO(luobin): create a heat_client
# Initialize a heat client using current login tenant when the
# restore_target and restore_auth is not provided.
heat_conf = {}
if target is not None:
username = None
password = None
heat_conf["auth_url"] = target
if restore_auth is not None:
auth_type = restore_auth.get("type", None)
if auth_type == "password":
username = restore_auth["username"]
password = restore_auth["password"]
kwargs = {"auth_url": target,
"username": username,
"password": password}
else:
kwargs = {}
heat_client = ClientFactory.create_client("heat",
context=context,
**kwargs)
heat_conf["username"] = restore_auth["username"]
heat_conf["password"] = restore_auth["password"]
# TODO(luobin): create a heat_template
heat_template = HeatTemplate()
ctx = {'context': context,
'checkpoint': checkpoint,
'workflow_engine': workflow_engine,
'operation_type': operation_type,
'restore': restore,
'heat_template': heat_template}
flow_name = "create_restoration_" + checkpoint.id
resource_graph = checkpoint.resource_graph
parameters = restore.parameters
flow_name = "Restore_" + checkpoint.id
restoration_flow = workflow_engine.build_flow(flow_name, 'linear')
result = provider.build_task_flow(ctx)
resource_flow = result.get('task_flow')
plugins = provider.load_plugins()
resources_task_flow = resource_flow.build_resource_flow(
operation_type=operation_type,
context=context,
workflow_engine=workflow_engine,
resource_graph=resource_graph,
plugins=plugins,
parameters=parameters
)
workflow_engine.add_tasks(
restoration_flow,
resource_flow,
CreateStackTask(heat_client, heat_template),
SyncStackStatusTask(checkpoint, heat_client, restore)
InitiateRestoreTask(),
CreateHeatTask(inject={'context': context, 'heat_conf': heat_conf}),
resources_task_flow,
CreateStackTask(),
SyncRestoreStatusTask(),
CompleteRestoreTask()
)
flow_engine = workflow_engine.get_engine(restoration_flow)
flow_engine = workflow_engine.get_engine(restoration_flow,
store={'checkpoint': checkpoint,
'restore': restore})
return flow_engine

View File

@ -12,78 +12,58 @@
from karbor.common import constants
from karbor.i18n import _LI
from oslo_config import cfg
from karbor.services.protection import resource_flow
from oslo_log import log as logging
from oslo_service import loopingcall
from taskflow import task
sync_status_opts = [
cfg.IntOpt('sync_status_interval',
default=60,
help='update protection status interval')
]
CONF = cfg.CONF
CONF.register_opts(sync_status_opts)
LOG = logging.getLogger(__name__)
class SyncCheckpointStatusTask(task.Task):
def __init__(self, checkpoint, status_getters):
super(SyncCheckpointStatusTask, self).__init__()
self._status_getters = status_getters
self._checkpoint = checkpoint
class InitiateDeleteTask(task.Task):
def __init__(self):
super(InitiateDeleteTask, self).__init__()
def execute(self):
LOG.info(_LI("Start sync checkpoint status,checkpoint_id:%s"),
self._checkpoint.id)
sync_status = loopingcall.FixedIntervalLoopingCall(
self._sync_status, self._checkpoint, self._status_getters)
sync_status.start(interval=CONF.sync_status_interval)
def execute(self, checkpoint, *args, **kwargs):
LOG.debug("Initiate delete checkpoint_id: %s", checkpoint.id)
checkpoint.status = constants.CHECKPOINT_STATUS_DELETING
checkpoint.commit()
def _sync_status(self, checkpoint, status_getters):
statuses = set()
for s in status_getters:
resource_id = s.get('resource_id')
get_resource_stats = s.get('get_resource_stats')
statuses.add(get_resource_stats(checkpoint, resource_id))
LOG.info(_LI("Start sync checkpoint status,checkpoint_id:"
"%(checkpoint_id)s, resource_status:"
"%(resource_status)s") %
{"checkpoint_id": checkpoint.id,
"resource_status": statuses})
if constants.RESOURCE_STATUS_ERROR in statuses:
checkpoint.status = constants.CHECKPOINT_STATUS_ERROR_DELETING
checkpoint.commit()
raise loopingcall.LoopingCallDone()
elif statuses == {constants.RESOURCE_STATUS_DELETED, }:
checkpoint.delete()
LOG.info(_LI("Stop sync checkpoint status,checkpoint_id: "
"%(checkpoint_id)s,checkpoint status: "
"%(checkpoint_status)s"),
{"checkpoint_id": checkpoint.id,
"checkpoint_status": checkpoint.status})
raise loopingcall.LoopingCallDone()
def revert(self, checkpoint, *args, **kwargs):
LOG.debug("Failed to delete checkpoint_id: %s", checkpoint.id)
checkpoint.status = constants.CHECKPOINT_STATUS_ERROR_DELETING
checkpoint.commit()
class CompleteDeleteTask(task.Task):
    """Final task of the delete flow: remove the checkpoint itself."""

    def execute(self, checkpoint):
        """Delete the checkpoint after all resources were cleaned up."""
        LOG.debug("Complete delete checkpoint_id: %s", checkpoint.id)
        checkpoint.delete()
def get_flow(context, workflow_engine, operation_type, checkpoint, provider):
ctx = {'context': context,
'checkpoint': checkpoint,
'workflow_engine': workflow_engine,
'operation_type': operation_type,
}
LOG.info(_LI("Start get checkpoint flow,checkpoint_id: %s"),
LOG.info(_LI("Start get checkpoint flow, checkpoint_id: %s"),
checkpoint.id)
flow_name = "delete_checkpoint_" + checkpoint.id
flow_name = "Delete_Checkpoint_" + checkpoint.id
delete_flow = workflow_engine.build_flow(flow_name, 'linear')
result = provider.build_task_flow(ctx)
status_getters = result.get('status_getters')
resource_flow = result.get('task_flow')
workflow_engine.add_tasks(delete_flow,
resource_flow,
SyncCheckpointStatusTask(checkpoint,
status_getters))
flow_engine = workflow_engine.get_engine(delete_flow)
resource_graph = checkpoint.resource_graph
plugins = provider.load_plugins()
resources_task_flow = resource_flow.build_resource_flow(
operation_type=operation_type,
context=context,
workflow_engine=workflow_engine,
resource_graph=resource_graph,
plugins=plugins,
parameters=None
)
workflow_engine.add_tasks(
delete_flow,
InitiateDeleteTask(),
resources_task_flow,
CompleteDeleteTask(),
)
flow_engine = workflow_engine.get_engine(delete_flow,
store={'checkpoint': checkpoint})
return flow_engine

View File

@ -52,13 +52,14 @@ class Worker(object):
if operation_type == constants.OPERATION_PROTECT:
plan = kwargs.get('plan', None)
provider = kwargs.get('provider', None)
checkpoint = kwargs.get('checkpoint')
protection_flow = create_protection.get_flow(context,
self.workflow_engine,
operation_type,
plan,
provider)
provider,
checkpoint)
return protection_flow
# TODO(wangliuan)implement the other operation
def get_restoration_flow(self, context, operation_type, checkpoint,
provider, restore, restore_auth):

View File

@ -15,6 +15,8 @@ Protection Service
"""
from datetime import datetime
from eventlet import greenpool
from eventlet import greenthread
import six
from oslo_config import cfg
@ -35,7 +37,12 @@ LOG = logging.getLogger(__name__)
protection_manager_opts = [
cfg.StrOpt('provider_registry',
default='karbor.services.protection.provider.ProviderRegistry',
help='the provider registry')
help='the provider registry'),
cfg.IntOpt('max_concurrent_operations',
default=0,
help='number of maximum concurrent operation (protect, restore,'
' delete) flows. 0 means no hard limit'
)
]
CONF = cfg.CONF
@ -60,6 +67,16 @@ class ProtectionManager(manager.Manager):
self.protectable_registry = ProtectableRegistry()
self.protectable_registry.load_plugins()
self.worker = flow_manager.Worker()
self._greenpool = None
self._greenpool_size = CONF.max_concurrent_operations
if self._greenpool_size != 0:
self._greenpool = greenpool.GreenPool(self._greenpool_size)
def _spawn(self, func, *args, **kwargs):
if self._greenpool is not None:
return self._greenpool.spawn_n(func, *args, **kwargs)
else:
return greenthread.spawn_n(func, *args, **kwargs)
def init_host(self, **kwargs):
"""Handle initialization if this is a standalone service"""
@ -84,32 +101,32 @@ class ProtectionManager(manager.Manager):
provider_id = plan.get('provider_id', None)
plan_id = plan.get('id', None)
provider = self.provider_registry.show_provider(provider_id)
checkpoint_collection = provider.get_checkpoint_collection()
try:
checkpoint = checkpoint_collection.create(plan)
except Exception as e:
LOG.exception(_LE("Failed to create checkpoint, plan: %s"),
plan_id)
exc = exception.FlowError(flow="protect",
error="Error creating checkpoint")
six.raise_from(exc, e)
try:
protection_flow = self.worker.get_flow(context,
constants.OPERATION_PROTECT,
plan=plan,
provider=provider)
except Exception:
provider=provider,
checkpoint=checkpoint)
except Exception as e:
LOG.exception(_LE("Failed to create protection flow, plan: %s"),
plan_id)
raise exception.FlowError(
flow="protect",
error=_("Failed to create flow"))
try:
self.worker.run_flow(protection_flow)
except Exception:
LOG.exception(_LE("Failed to run protection flow, plan: %s"),
plan_id)
error=e.msg if hasattr(e, 'msg') else 'Internal error')
self._spawn(self.worker.run_flow, protection_flow)
return checkpoint.id
raise exception.FlowError(
flow="protect",
error=_("Failed to run flow"))
finally:
checkpoint = self.worker.flow_outputs(protection_flow,
target='checkpoint')
return {'checkpoint_id': checkpoint.id}
@messaging.expected_exceptions(exception.InvalidInput,
@messaging.expected_exceptions(exception.ProviderNotFound,
exception.CheckpointNotFound,
exception.CheckpointNotAvailable,
exception.FlowError)
def restore(self, context, restore, restore_auth):
@ -118,13 +135,11 @@ class ProtectionManager(manager.Manager):
checkpoint_id = restore["checkpoint_id"]
provider_id = restore["provider_id"]
provider = self.provider_registry.show_provider(provider_id)
try:
checkpoint_collection = provider.get_checkpoint_collection()
checkpoint = checkpoint_collection.get(checkpoint_id)
except Exception:
LOG.error(_LE("Invalid checkpoint id: %s"), checkpoint_id)
raise exception.InvalidInput(
reason=_("Invalid checkpoint id"))
if not provider:
raise exception.ProviderNotFound(provider_id=provider_id)
checkpoint_collection = provider.get_checkpoint_collection()
checkpoint = checkpoint_collection.get(checkpoint_id)
if checkpoint.status != constants.CHECKPOINT_STATUS_AVAILABLE:
raise exception.CheckpointNotAvailable(
@ -145,15 +160,7 @@ class ProtectionManager(manager.Manager):
raise exception.FlowError(
flow="restore",
error=_("Failed to create flow"))
try:
self.worker.run_flow(restoration_flow)
except Exception:
LOG.exception(
_LE("Failed to run restoration flow checkpoint: %s"),
checkpoint_id)
raise exception.FlowError(
flow="restore",
error=_("Failed to run flow"))
self._spawn(self.worker.run_flow, restoration_flow)
def delete(self, context, provider_id, checkpoint_id):
LOG.info(_LI("Starting protection service:delete action"))
@ -191,12 +198,7 @@ class ProtectionManager(manager.Manager):
raise exception.KarborException(_(
"Failed to create delete checkpoint flow."
))
try:
self.worker.run_flow(delete_checkpoint_flow)
return True
except Exception:
LOG.exception(_LE("Failed to run delete checkpoint flow"))
raise
self._spawn(self.worker.run_flow, delete_checkpoint_flow)
def start(self, plan):
# TODO(wangliuan)

View File

@ -9,60 +9,129 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
from oslo_config import cfg
from oslo_log import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class Operation(object):
    """Base class for a per-resource protection operation.

    A Protection Plugin returns an Operation per resource for each action
    (protect, restore, delete).  Each hook is optional; subclasses override
    only the hooks they need.  Hook ordering relative to the resource graph
    is described on each hook.
    """
    def __init__(self):
        super(Operation, self).__init__()

    def on_prepare_begin(self, checkpoint, resource, context, parameters,
                         **kwargs):
        """on_prepare_begin hook runs before any child resource's hooks run

        Optional
        :param checkpoint: checkpoint object for this operation
        :param resource: a resource object for this operation
        :param context: current operation context (viable for clients)
        :param parameters: dictionary representing operation parameters
        :param heat_template: HeatTemplate for restore operation only
        """
        pass

    def on_prepare_finish(self, checkpoint, resource, context, parameters,
                          **kwargs):
        """on_prepare_finish hook runs after all child resources' prepare hooks

        Optional
        :param checkpoint: checkpoint object for this operation
        :param resource: a resource object for this operation
        :param context: current operation context (viable for clients)
        :param parameters: dictionary representing operation parameters
        :param heat_template: HeatTemplate for restore operation only
        """
        pass

    def on_main(self, checkpoint, resource, context, parameters, **kwargs):
        """on_main hook runs in parallel to other resources' on_main hooks

        Your main operation heavy lifting should probably be here.
        Optional
        :param checkpoint: checkpoint object for this operation
        :param resource: a resource object for this operation
        :param context: current operation context (viable for clients)
        :param parameters: dictionary representing operation parameters
        :param heat_template: HeatTemplate for restore operation only
        """
        pass

    def on_complete(self, checkpoint, resource, context, parameters, **kwargs):
        """on_complete hook runs after all dependent resource's hooks

        Optional
        :param checkpoint: checkpoint object for this operation
        :param resource: a resource object for this operation
        :param context: current operation context (viable for clients)
        :param parameters: dictionary representing operation parameters
        :param heat_template: HeatTemplate for restore operation only
        """
        pass
class ProtectionPlugin(object):
def __init__(self, config=None):
self._config = config
@abc.abstractmethod
def get_supported_resources_types(self):
# TODO(wangliuan)
pass
def get_protect_operation(self, resource):
"""Returns the protect Operation for this resource
@abc.abstractmethod
def get_options_schema(self, resources_type):
# TODO(wangliuan)
pass
:returns: Operation for the resource
"""
raise NotImplementedError
@abc.abstractmethod
def get_saved_info_schema(self, resources_type):
# TODO(wangliuan)
pass
def get_restore_operation(self, resource):
"""Returns the restore Operation for this resource
@abc.abstractmethod
def get_restore_schema(self, resources_type):
# TODO(wangliuan)
pass
:returns: Operation for the resource
"""
raise NotImplementedError
@abc.abstractmethod
def get_saved_info(self, metadata_store, resource):
# TODO(wangliuan)
pass
def get_delete_operation(self, resource):
"""Returns the delete Operation for this resource
@abc.abstractmethod
def get_resource_stats(self, checkpoint, resource_id):
# TODO(wangliuan)
pass
:returns: Operation for the resource
"""
raise NotImplementedError
@abc.abstractmethod
def on_resource_start(self, context):
# TODO(wangliuan)
pass
@classmethod
def get_supported_resources_types(cls):
"""Returns a list of resource types this plugin supports
@abc.abstractmethod
def on_resource_end(self, context):
# TODO(wangliuan)
pass
:returns: a list of resource types
"""
raise NotImplementedError
@classmethod
def get_options_schema(cls, resource_type):
"""Returns the protect options schema for a resource type
:returns: a dictionary representing the schema
"""
raise NotImplementedError
@classmethod
def get_saved_info_schema(cls, resource_type):
"""Returns the saved info schema for a resource type
:returns: a dictionary representing the schema
"""
raise NotImplementedError
@classmethod
def get_restore_schema(cls, resource_type):
"""Returns the restore schema for a resource type
:returns: a dictionary representing the schema
"""
raise NotImplementedError
@classmethod
def get_saved_info(cls, metadata_store, resource):
"""Returns the saved info for a resource
:returns: a dictionary representing the saved info
"""
raise NotImplementedError

View File

@ -98,12 +98,3 @@ class BaseProtectionPlugin(ProtectionPlugin):
child_task, parent_task)
else:
task_stack.pop()
def get_resource_stats(self, checkpoint, resource_id):
# Get the status of this resource
bank_section = checkpoint.get_resource_bank_section(resource_id)
try:
status = bank_section.get_object("status")
return status
except Exception:
return constants.RESOURCE_STATUS_UNDEFINED

View File

@ -50,25 +50,20 @@ class GlanceProtectionPlugin(BaseProtectionPlugin):
def _add_to_threadpool(self, func, *args, **kwargs):
self._tp.spawn_n(func, *args, **kwargs)
def get_resource_stats(self, checkpoint, resource_id):
# Get the status of this resource
bank_section = checkpoint.get_resource_bank_section(resource_id)
try:
status = bank_section.get_object("status")
return status
except Exception:
return "undefined"
def get_options_schema(self, resources_type):
@classmethod
def get_options_schema(cls, resources_type):
return image_schemas.OPTIONS_SCHEMA
def get_restore_schema(self, resources_type):
@classmethod
def get_restore_schema(cls, resources_type):
return image_schemas.RESTORE_SCHEMA
def get_saved_info_schema(self, resources_type):
@classmethod
def get_saved_info_schema(cls, resources_type):
return image_schemas.SAVED_INFO_SCHEMA
def get_saved_info(self, metadata_store, resource):
@classmethod
def get_saved_info(cls, metadata_store, resource):
pass
def _glance_client(self, cntxt):
@ -228,5 +223,6 @@ class GlanceProtectionPlugin(BaseProtectionPlugin):
resource_id=image_id,
resource_type=constants.IMAGE_RESOURCE_TYPE)
def get_supported_resources_types(self):
return self._SUPPORT_RESOURCE_TYPES
@classmethod
def get_supported_resources_types(cls):
return cls._SUPPORT_RESOURCE_TYPES

View File

@ -0,0 +1,69 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from karbor.common import constants
from karbor.services.protection import protection_plugin
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
class NoopOperation(protection_plugin.Operation):
    """Operation whose every hook does nothing.

    Used by the Noop plugin so each resource passes through the protection
    flow without any side effects.
    """

    def on_prepare_begin(self, *args, **kwargs):
        """Intentionally a no-op."""

    def on_prepare_finish(self, *args, **kwargs):
        """Intentionally a no-op."""

    def on_main(self, *args, **kwargs):
        """Intentionally a no-op."""

    def on_complete(self, *args, **kwargs):
        """Intentionally a no-op."""
class NoopProtectionPlugin(protection_plugin.ProtectionPlugin):
    """Protection plugin that supports every resource type and does nothing.

    Every action (protect, restore, delete) maps to a :class:`NoopOperation`,
    and all schemas are empty.  Useful for exercising the protection flow
    without real backends.
    """

    def get_protect_operation(self, resource):
        """Return a no-op operation for protecting *resource*."""
        return NoopOperation()

    def get_restore_operation(self, resource):
        """Return a no-op operation for restoring *resource*."""
        return NoopOperation()

    def get_delete_operation(self, resource):
        """Return a no-op operation for deleting *resource*."""
        return NoopOperation()

    @classmethod
    def get_supported_resources_types(cls):
        """Every known resource type is supported."""
        return constants.RESOURCE_TYPES

    @classmethod
    def get_options_schema(cls, resource_type):
        """No protect options: empty schema."""
        return {}

    @classmethod
    def get_saved_info_schema(cls, resource_type):
        """No saved info: empty schema."""
        return {}

    @classmethod
    def get_restore_schema(cls, resource_type):
        """No restore options: empty schema."""
        return {}

    @classmethod
    def get_saved_info(cls, metadata_store, resource):
        """Nothing is saved for any resource."""
        return None

View File

@ -53,16 +53,20 @@ class NovaProtectionPlugin(BaseProtectionPlugin):
def _add_to_threadpool(self, func, *args, **kwargs):
self._tp.spawn_n(func, *args, **kwargs)
def get_options_schema(self, resource_type):
@classmethod
def get_options_schema(cls, resource_type):
return server_plugin_schemas.OPTIONS_SCHEMA
def get_restore_schema(self, resource_type):
@classmethod
def get_restore_schema(cls, resource_type):
return server_plugin_schemas.RESTORE_SCHEMA
def get_saved_info_schema(self, resource_type):
@classmethod
def get_saved_info_schema(cls, resource_type):
return server_plugin_schemas.SAVED_INFO_SCHEMA
def get_saved_info(self, metadata_store, resource):
@classmethod
def get_saved_info(cls, metadata_store, resource):
# TODO(luobin)
pass
@ -487,5 +491,6 @@ class NovaProtectionPlugin(BaseProtectionPlugin):
resource_id=resource_id,
resource_type=constants.SERVER_RESOURCE_TYPE)
def get_supported_resources_types(self):
return self._SUPPORT_RESOURCE_TYPES
@classmethod
def get_supported_resources_types(cls):
return cls._SUPPORT_RESOURCE_TYPES

View File

@ -52,19 +52,24 @@ class CinderProtectionPlugin(BaseProtectionPlugin):
sync_status_loop.start(interval=self.protection_sync_interval,
initial_delay=self.protection_sync_interval)
def get_supported_resources_types(self):
return self._SUPPORT_RESOURCE_TYPES
@classmethod
def get_supported_resources_types(cls):
return cls._SUPPORT_RESOURCE_TYPES
def get_options_schema(self, resources_type):
@classmethod
def get_options_schema(cls, resources_type):
return cinder_schemas.OPTIONS_SCHEMA
def get_restore_schema(self, resources_type):
@classmethod
def get_restore_schema(cls, resources_type):
return cinder_schemas.RESTORE_SCHEMA
def get_saved_info_schema(self, resources_type):
@classmethod
def get_saved_info_schema(cls, resources_type):
return cinder_schemas.SAVED_INFO_SCHEMA
def get_saved_info(self, metadata_store, resource):
@classmethod
def get_saved_info(cls, metadata_store, resource):
# TODO(hurong)
pass

View File

@ -11,18 +11,12 @@
# under the License.
import os
import six
from karbor.common import constants
from karbor import exception
from karbor.i18n import _, _LE
from karbor.resource import Resource
from karbor.services.protection import bank_plugin
from karbor.services.protection.checkpoint import CheckpointCollection
from karbor.services.protection.graph import GraphWalker
from karbor.services.protection.protectable_registry import ProtectableRegistry
from karbor.services.protection.resource_graph import ResourceGraphContext
from karbor.services.protection.resource_graph \
import ResourceGraphWalkerListener
from karbor import utils
from oslo_config import cfg
from oslo_log import log as logging
@ -84,7 +78,7 @@ class PluggableProtectionProvider(object):
for plugin_name in self._config.provider.plugin:
if not plugin_name:
raise ImportError(_("Empty protection plugin"))
self._load_plugin(plugin_name)
self._register_plugin(plugin_name)
@property
def id(self):
@ -110,6 +104,12 @@ class PluggableProtectionProvider(object):
def plugins(self):
return self._plugin_map
def load_plugins(self):
    """Instantiate every registered plugin class with this provider's config.

    Returns a dict with the same keys as ``self.plugins`` (resource types,
    as registered by ``_register_plugin``) mapped to fresh plugin instances.
    """
    return {
        plugin_type: plugin_class(self._config)
        for plugin_type, plugin_class in six.iteritems(self.plugins)
    }
def _load_bank(self, bank_name):
try:
plugin = utils.load_plugin(PROTECTION_NAMESPACE, bank_name,
@ -121,17 +121,16 @@ class PluggableProtectionProvider(object):
else:
self._bank_plugin = plugin
def _load_plugin(self, plugin_name):
def _register_plugin(self, plugin_name):
try:
plugin = utils.load_plugin(PROTECTION_NAMESPACE, plugin_name,
self._config)
plugin = utils.load_class(PROTECTION_NAMESPACE, plugin_name)
except Exception:
LOG.exception(_LE("Load protection plugin: '%s' failed."),
plugin_name)
raise
else:
self._plugin_map[plugin_name] = plugin
for resource in plugin.get_supported_resources_types():
self._plugin_map[resource] = plugin
if hasattr(plugin, 'get_options_schema'):
self._extended_info_schema['options_schema'][resource] \
= plugin.get_options_schema(resource)
@ -155,88 +154,6 @@ class PluggableProtectionProvider(object):
limit=limit, marker=marker, plan_id=plan_id,
start_date=start_date, end_date=end_date, sort_dir=sort_dir)
def build_task_flow(self, ctx):
    """Build the taskflow flow for a protect/restore/delete operation.

    ``ctx`` carries the request context, the workflow engine, the operation
    type and the operation-specific inputs (plan, restore, checkpoint,
    heat template).  A ResourceGraphWalkerListener walks the resource graph
    and populates the flow; the return dict shape depends on the operation.
    """
    cntxt = ctx["context"]
    workflow_engine = ctx["workflow_engine"]
    operation = ctx["operation_type"]

    resource_context = None
    resource_graph = None

    if operation == constants.OPERATION_PROTECT:
        plan = ctx["plan"]
        task_flow = workflow_engine.build_flow(flow_name=plan.get('id'))
        resources = plan.get('resources')
        parameters = plan.get('parameters')
        graph_resources = []
        for resource in resources:
            # Plan resources are plain dicts; convert them to Resource
            # tuples before building the dependency graph.
            graph_resources.append(Resource(type=resource['type'],
                                            id=resource['id'],
                                            name=resource['name']))
        # TODO(luobin): pass registry in ctx
        registry = ProtectableRegistry()
        registry.load_plugins()
        resource_graph = registry.build_graph(cntxt, graph_resources)
        resource_context = ResourceGraphContext(
            cntxt=cntxt,
            operation=operation,
            workflow_engine=workflow_engine,
            task_flow=task_flow,
            plugin_map=self._plugin_map,
            parameters=parameters
        )
    if operation == constants.OPERATION_RESTORE:
        restore = ctx['restore']
        task_flow = workflow_engine.build_flow(
            flow_name=restore.get('id'))
        checkpoint = ctx["checkpoint"]
        # Reuse the graph persisted when the checkpoint was taken.
        resource_graph = checkpoint.resource_graph
        parameters = restore.get('parameters')
        heat_template = ctx["heat_template"]
        resource_context = ResourceGraphContext(
            cntxt=cntxt,
            checkpoint=checkpoint,
            operation=operation,
            workflow_engine=workflow_engine,
            task_flow=task_flow,
            plugin_map=self._plugin_map,
            parameters=parameters,
            heat_template=heat_template
        )
    if operation == constants.OPERATION_DELETE:
        checkpoint = ctx['checkpoint']
        task_flow = workflow_engine.build_flow(
            flow_name=checkpoint.id)
        resource_graph = checkpoint.resource_graph
        resource_context = ResourceGraphContext(
            cntxt=cntxt,
            checkpoint=checkpoint,
            operation=operation,
            workflow_engine=workflow_engine,
            task_flow=task_flow,
            plugin_map=self._plugin_map
        )
    # TODO(luobin): for other type operations

    # Walk the graph; the listener fills the flow (and, for protect/delete,
    # the per-resource status getters) as nodes are entered/exited.
    walker_listener = ResourceGraphWalkerListener(resource_context)
    graph_walker = GraphWalker()
    graph_walker.register_listener(walker_listener)
    graph_walker.walk_graph(resource_graph)

    if operation == constants.OPERATION_PROTECT:
        return {"task_flow": walker_listener.context.task_flow,
                "status_getters": walker_listener.context.status_getters,
                "resource_graph": resource_graph}
    if operation == constants.OPERATION_RESTORE:
        return {"task_flow": walker_listener.context.task_flow}
    if operation == constants.OPERATION_DELETE:
        return {"task_flow": walker_listener.context.task_flow,
                "status_getters": walker_listener.context.status_getters
                }
    # TODO(luobin): for other type operations
class ProviderRegistry(object):
def __init__(self):

View File

@ -0,0 +1,183 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import namedtuple
from karbor.common import constants
from karbor import exception
from karbor.i18n import _LI
from karbor.services.protection import graph
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
# Names of the four lifecycle hooks an Operation may implement, in
# execution order.
HOOKS = (
    HOOK_PRE_BEGIN,
    HOOK_PRE_FINISH,
    HOOK_MAIN,
    HOOK_COMPLETE
) = (
    'on_prepare_begin',
    'on_prepare_finish',
    'on_main',
    'on_complete'
)

# Bundle of the four taskflow tasks created for a single resource node.
ResourceHooks = namedtuple('ResourceHooks', [
    HOOK_PRE_BEGIN,
    HOOK_PRE_FINISH,
    HOOK_MAIN,
    HOOK_COMPLETE,
])

# Extra taskflow 'requires' injected per operation type: restore tasks
# additionally receive the generated heat template.
OPERATION_EXTRA_ARGS = {
    constants.OPERATION_RESTORE: ['heat_template'],
}
def noop_handle(*args, **kwargs):
    """Accept anything, do nothing.

    Used as the fallback when an Operation object does not implement a
    particular lifecycle hook.
    """
    return None
class ResourceFlowGraphWalkerListener(graph.GraphWalkerListener):
    """Graph-walker listener that turns a resource-graph traversal into a
    taskflow flow.

    For every resource node it asks the matching protection plugin for an
    Operation, wraps the operation's lifecycle hooks in taskflow tasks, and
    links the tasks so a parent's prepare phase precedes its children's and
    the children's completion precedes the parent's.
    """

    def __init__(self, resource_flow, operation_type, context, parameters,
                 plugins, workflow_engine):
        super(ResourceFlowGraphWalkerListener, self).__init__()
        self.operation_type = operation_type
        self.context = context
        self.parameters = parameters or {}
        # Mapping of resource type -> protection plugin instance.
        self.plugins = plugins
        self.workflow_engine = workflow_engine
        self.flow = resource_flow
        # resource.id -> ResourceHooks, so revisited nodes reuse their tasks.
        self.node_tasks = {}
        # Stack of ResourceHooks mirroring the walker's current path.
        self.task_stack = []
        self.current_resource = None

    def _create_hook_tasks(self, operation_obj, resource):
        """Create one taskflow task per lifecycle hook of ``operation_obj``."""
        pre_begin_task = self._create_hook_task(operation_obj, resource,
                                                HOOK_PRE_BEGIN)
        pre_finish_task = self._create_hook_task(operation_obj, resource,
                                                 HOOK_PRE_FINISH)
        main_task = self._create_hook_task(operation_obj, resource,
                                           HOOK_MAIN)
        post_task = self._create_hook_task(operation_obj, resource,
                                           HOOK_COMPLETE)
        return ResourceHooks(pre_begin_task, pre_finish_task, main_task,
                             post_task)

    def _create_hook_task(self, operation_obj, resource, hook_type):
        """Wrap one hook method in a uniquely named taskflow task."""
        # Fall back to a no-op when the operation omits this hook.
        method = getattr(operation_obj, hook_type, noop_handle)
        assert callable(method), 'Resource {} method "{}" is not callable' \
            .format(resource.type, hook_type)
        task_name = "{operation_type}_{hook_type}_{type}_{id}".format(
            type=resource.type,
            id=resource.id,
            hook_type=hook_type,
            operation_type=self.operation_type,
        )
        # Type-wide parameters first, then per-resource ("type#id")
        # overrides.
        parameters = {}
        parameters.update(self.parameters.get(resource.type, {}))
        resource_id = '{}#{}'.format(resource.type, resource.id)
        parameters.update(self.parameters.get(resource_id, {}))
        injects = {
            'context': self.context,
            'parameters': parameters,
            'resource': resource,
        }
        requires = list(injects)
        requires.append('checkpoint')
        requires.extend(OPERATION_EXTRA_ARGS.get(self.operation_type, []))
        task = self.workflow_engine.create_task(method,
                                                name=task_name,
                                                inject=injects,
                                                requires=requires)
        return task

    def on_node_enter(self, node, already_visited):
        resource = node.value
        LOG.debug(
            "Enter node (type: %(type)s id: %(id)s visited: %(visited)s)",
            {"type": resource.type, "id": resource.id, "visited":
             already_visited}
        )
        self.current_resource = resource
        if already_visited:
            # Reuse the tasks built on the first visit; only the stack is
            # updated so on_node_exit can still link parent/child tasks.
            self.task_stack.append(self.node_tasks[resource.id])
            return

        if resource.type not in self.plugins:
            raise exception.ProtectionPluginNotFound(type=resource.type)

        protection_plugin = self.plugins[resource.type]
        # e.g. get_protect_operation / get_restore_operation / ...
        operation_getter_name = 'get_{}_operation'.format(self.operation_type)
        operation_getter = getattr(protection_plugin, operation_getter_name)
        assert callable(operation_getter)

        operation_obj = operation_getter(resource)
        hooks = self._create_hook_tasks(operation_obj, resource)
        LOG.debug("added operation %s hooks", self.operation_type)
        self.node_tasks[resource.id] = hooks
        self.task_stack.append(hooks)
        self.workflow_engine.add_tasks(self.flow, hooks.on_prepare_begin,
                                       hooks.on_prepare_finish, hooks.on_main,
                                       hooks.on_complete)
        # Serialize the node's own hooks: begin -> finish -> main -> complete.
        self.workflow_engine.link_task(self.flow, hooks.on_prepare_begin,
                                       hooks.on_prepare_finish)
        self.workflow_engine.link_task(self.flow, hooks.on_prepare_finish,
                                       hooks.on_main)
        self.workflow_engine.link_task(self.flow, hooks.on_main,
                                       hooks.on_complete)

    def on_node_exit(self, node):
        resource = node.value
        LOG.debug(
            "Exit node (type: %(type)s id: %(id)s)",
            {"type": resource.type, "id": resource.id}
        )
        child_hooks = self.task_stack.pop()
        if len(self.task_stack) > 0:
            parent_hooks = self.task_stack[-1]
            # Parent begins preparing before the child; the child finishes
            # preparing and completes before the parent does.
            self.workflow_engine.link_task(self.flow,
                                           parent_hooks.on_prepare_begin,
                                           child_hooks.on_prepare_begin)
            self.workflow_engine.link_task(self.flow,
                                           child_hooks.on_prepare_finish,
                                           parent_hooks.on_prepare_finish)
            self.workflow_engine.link_task(self.flow, child_hooks.on_complete,
                                           parent_hooks.on_complete)
def build_resource_flow(operation_type, context, workflow_engine,
                        plugins, resource_graph, parameters):
    """Walk ``resource_graph`` and assemble the taskflow flow for
    ``operation_type``, using ``plugins`` to supply per-resource operations.
    """
    LOG.info(_LI("Build resource flow for operation %s"), operation_type)
    flow = workflow_engine.build_flow(
        'ResourceGraphFlow_{}'.format(operation_type), 'graph')
    listener = ResourceFlowGraphWalkerListener(
        flow, operation_type, context, parameters, plugins, workflow_engine)

    walker = graph.GraphWalker()
    walker.register_listener(listener)
    LOG.debug("Starting resource graph walk (operation %s)", operation_type)
    walker.walk_graph(resource_graph)
    LOG.debug("Finished resource graph walk (operation %s)", operation_type)
    return flow

View File

@ -1,105 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from karbor.common import constants
from karbor.i18n import _, _LE, _LI
from karbor.services.protection.graph import GraphWalkerListener
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
class ResourceGraphContext(object):
    """Mutable state shared across a resource-graph walk for one operation.

    Holds the request context, operation parameters, plugin map and flow
    objects; per-node clones produced by ``get_node_context`` share the
    same ``task_stack`` and ``status_getters`` lists with their parent.
    """

    def __init__(self, cntxt, is_first_visited=False, operation="protect",
                 parameters=None, plugin_map=None, node=None,
                 workflow_engine=None, task_flow=None, checkpoint=None,
                 heat_template=None):
        self.cntxt = cntxt
        self.is_first_visited = is_first_visited
        self.operation = operation
        self.parameters = parameters
        self.plugin_map = plugin_map
        self.node = node
        self.workflow_engine = workflow_engine
        self.task_flow = task_flow
        self.task_stack = []
        # Filled in during protection/deletion walks.
        self.status_getters = []
        # Restoration-only inputs.
        self.checkpoint = checkpoint
        self.heat_template = heat_template

    def get_node_context(self, node, is_first_visited=None):
        """Clone this context for one graph node, sharing mutable lists."""
        clone = ResourceGraphContext(self.cntxt,
                                     is_first_visited=is_first_visited,
                                     operation=self.operation,
                                     parameters=self.parameters,
                                     plugin_map=self.plugin_map,
                                     node=node,
                                     workflow_engine=self.workflow_engine,
                                     task_flow=self.task_flow,
                                     checkpoint=self.checkpoint,
                                     heat_template=self.heat_template)
        # Share (not copy) the accumulating lists with the parent context.
        clone.task_stack = self.task_stack
        clone.status_getters = self.status_getters
        return clone
class ResourceGraphWalkerListener(GraphWalkerListener):
    """Walker listener that delegates node enter/exit to protection plugins
    selected by resource type."""

    def __init__(self, context):
        self.context = context
        self.plugin_map = self.context.plugin_map

    def on_node_enter(self, node, already_visited):
        resource = node.value
        resource_type = resource.type
        LOG.info(_LI("on_node_enter, node resource_type: %s"), resource_type)
        protection_plugin = self._get_protection_plugin(resource_type)
        # Build a node-scoped context for the plugin callback.
        is_first_visited = not already_visited
        context = self.context.get_node_context(node, is_first_visited)
        protection_plugin.on_resource_start(context)
        # Protect/delete walks also collect one status getter per unique
        # resource so the operation's progress can be polled afterwards.
        if self.context.operation == constants.OPERATION_PROTECT \
                or self.context.operation == constants.OPERATION_DELETE:
            if not already_visited:
                self.context.status_getters.append(
                    {"resource_id": resource.id,
                     "get_resource_stats": protection_plugin.get_resource_stats
                     }
                )

    def on_node_exit(self, node):
        resource = node.value
        resource_type = resource.type
        LOG.info(_LI("on_node_exit, node resource_type: %s"), resource_type)
        protection_plugin = self._get_protection_plugin(resource_type)
        # Node-scoped context again (is_first_visited left as None here).
        context = self.context.get_node_context(node)
        protection_plugin.on_resource_end(context)

    def _get_protection_plugin(self, resource_type):
        # Linear scan: first plugin advertising support for the type wins.
        for plugin in self.plugin_map.values():
            if hasattr(plugin, "get_supported_resources_types"):
                if resource_type in plugin.get_supported_resources_types():
                    return plugin
        LOG.error(_LE("no plugin support this resource_type:%s"),
                  resource_type)
        raise Exception(_("No plugin support this resource_type"))

View File

@ -170,6 +170,8 @@ class KarborBaseTest(base.BaseTestCase):
self.karbor_client = _get_karbor_client_from_creds()
self.keystone_endpoint = _get_keystone_endpoint_from_creds()
self._testcase_store = ObjectStore()
self.provider_id_noop = 'b766f37c-d011-4026-8228-28730d734a3f'
self.provider_id_os = 'cf56bd3e-97a7-4078-b6d5-f36246333fd9'
def store(self, obj, close_func=None):
return self._testcase_store.store(obj, close_func)

View File

@ -19,15 +19,14 @@ class CheckpointsTest(karbor_base.KarborBaseTest):
"""Test Checkpoints operation """
def setUp(self):
super(CheckpointsTest, self).setUp()
providers = self.provider_list()
self.assertTrue(len(providers))
self.provider_id = providers[0].id
self.provider_id = self.provider_id_noop
def test_checkpoint_create(self):
self.skipTest('Requires cinder protection plugin adjustment')
volume = self.store(objects.Volume())
volume.create(1)
plan = self.store(objects.Plan())
plan.create(self.provider_id, [volume, ])
plan.create(self.provider_id_os, [volume, ])
backups = self.cinder_client.backups.list()
before_num = len(backups)

View File

@ -19,9 +19,7 @@ class PlansTest(karbor_base.KarborBaseTest):
"""Test Plans operation"""
def setUp(self):
super(PlansTest, self).setUp()
providers = self.provider_list()
self.assertTrue(len(providers))
self.provider_id = providers[0].id
self.provider_id = self.provider_id_noop
def test_plans_list(self):
nplans_before = len(self.karbor_client.plans.list())

View File

@ -27,9 +27,7 @@ class RestoresTest(karbor_base.KarborBaseTest):
def setUp(self):
super(RestoresTest, self).setUp()
providers = self.provider_list()
self.assertTrue(len(providers))
self.provider_id = providers[0].id
self.provider_id = self.provider_id_noop
def _store_volume(self, volumes_pre, volumes_post):
volumes = list(set(volumes_post).difference(set(volumes_pre)))

View File

@ -25,7 +25,7 @@ class ScheduledOperationsTest(karbor_base.KarborBaseTest):
super(ScheduledOperationsTest, self).setUp()
providers = self.provider_list()
self.assertTrue(len(providers))
self.provider_id = providers[0].id
self.provider_id = self.provider_id_noop
def _create_scheduled_operation(self,
trigger_properties,

View File

@ -23,6 +23,7 @@ class FakeBankPlugin(bank_plugin.BankPlugin):
def __init__(self, config=None):
super(FakeBankPlugin, self).__init__(config)
config.register_opts(fake_bank_opts, 'fake_bank')
self.fake_host = config['fake_bank']['fake_host']
def create_object(self, key, value):
return

View File

@ -1,49 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from karbor.services.protection import protection_plugin
fake_plugin_opts = [
cfg.StrOpt('fake_user'),
]
class FakeProtectionPlugin(protection_plugin.ProtectionPlugin):
    """Minimal protection-plugin test double used by the unit-test provider
    configuration; every method returns an empty/None placeholder."""

    def __init__(self, config=None):
        super(FakeProtectionPlugin, self).__init__(config)
        # Register the [fake_plugin] option group so tests can verify the
        # provider config is passed through to plugins.
        config.register_opts(fake_plugin_opts, 'fake_plugin')

    def get_supported_resources_types(self):
        return ['Test::Resource']

    def get_options_schema(self, resource_type):
        return []

    def get_saved_info_schema(self, resource_type):
        return []

    def get_restore_schema(self, resource_type):
        return []

    def get_saved_info(self, metadata_store, resource):
        pass

    def get_resource_stats(self, checkpoint, resource_id):
        pass

    def on_resource_start(self, context):
        pass

    def on_resource_end(self, context):
        pass

View File

@ -3,7 +3,7 @@ name = fake_provider1
id = fake_id1
description = Test Provider 1
bank = karbor.tests.unit.fake_bank.FakeBankPlugin
plugin = karbor.tests.unit.fake_protection.FakeProtectionPlugin
plugin = karbor.tests.unit.protection.fakes.FakeProtectionPlugin
[fake_plugin]
fake_user = user

View File

@ -14,7 +14,9 @@
# under the License.
import futurist
import mock
from oslo_config import cfg
from oslo_log import log as logging
from karbor.i18n import _LE
@ -23,11 +25,14 @@ from karbor.services.protection.bank_plugin import Bank
from karbor.services.protection.bank_plugin import BankPlugin
from karbor.services.protection.bank_plugin import BankSection
from karbor.services.protection.graph import build_graph
from karbor.services.protection import protection_plugin
from karbor.services.protection import provider
from karbor.services.protection import resource_flow
from taskflow import engines
from taskflow.patterns import graph_flow
from taskflow.patterns import linear_flow
from taskflow import task
LOG = logging.getLogger(__name__)
@ -73,6 +78,11 @@ class FakeBankPlugin(BankPlugin):
def __init__(self, config=None):
super(FakeBankPlugin, self).__init__(config=config)
self._objects = {}
fake_bank_opts = [
cfg.StrOpt('fake_host'),
]
config.register_opts(fake_bank_opts, 'fake_bank')
self.fake_host = config['fake_bank']['fake_host']
def create_object(self, key, value):
self._objects[key] = value
@ -128,35 +138,57 @@ class FakeProtectablePlugin(object):
pass
class FakeProtectionPlugin(object):
def __init__(self, expected_event_stream):
self._expected_event_stream = expected_event_stream
class MockOperation(protection_plugin.Operation):
    """Operation whose lifecycle hooks are all ``mock.Mock`` objects, so
    tests can assert which hooks were invoked and with what arguments."""

    def __init__(self):
        super(MockOperation, self).__init__()
        # One Mock per hook name declared in resource_flow.HOOKS.
        for hook_name in resource_flow.HOOKS:
            setattr(self, hook_name, mock.Mock())
def on_resource_start(self, context):
resource = context.node.value
workflow_engine = context.workflow_engine
task_flow = context.task_flow
if self._expected_event_stream.pop(0) != (
"on_resource_start",
resource.id,
context.is_first_visited):
raise Exception
if context.is_first_visited and workflow_engine:
task = workflow_engine.create_task("fake_task")
workflow_engine.add_tasks(task_flow, task)
def on_resource_end(self, context):
resource = context.node.value
if self._expected_event_stream.pop(0) != (
"on_resource_end",
resource.id):
raise Exception
class FakeProtectionPlugin(protection_plugin.ProtectionPlugin):
    """Stub protection plugin for unit tests.

    Supports three fake resource types and hands out MockOperation objects
    so tests can observe hook invocations.  Stale duplicate definitions of
    ``get_resource_stats`` and an instance-method ``get_supported_resources_types``
    (shadowed by the later classmethod and therefore dead code) have been
    removed.
    """

    SUPPORTED_RESOURCES = [
        'Test::ResourceA',
        'Test::ResourceB',
        'Test::ResourceC',
    ]

    def __init__(self, config=None, *args, **kwargs):
        super(FakeProtectionPlugin, self).__init__(config)
        fake_plugin_opts = [
            cfg.StrOpt('fake_user'),
        ]
        if config:
            # Mirrors real plugins: options are registered on the provider
            # config and read back immediately.
            config.register_opts(fake_plugin_opts, 'fake_plugin')
            self.fake_user = config['fake_plugin']['fake_user']

    def get_protect_operation(self, *args, **kwargs):
        return MockOperation()

    def get_restore_operation(self, *args, **kwargs):
        return MockOperation()

    def get_delete_operation(self, *args, **kwargs):
        return MockOperation()

    @classmethod
    def get_supported_resources_types(cls):
        return cls.SUPPORTED_RESOURCES

    @classmethod
    def get_options_schema(cls, resource_type):
        return {}

    @classmethod
    def get_saved_info_schema(cls, resource_type):
        return {}

    @classmethod
    def get_restore_schema(cls, resource_type):
        return {}

    @classmethod
    def get_saved_info(cls, metadata_store, resource):
        return None
class FakeCheckpoint(object):
@ -199,12 +231,10 @@ class FakeProvider(provider.PluggableProtectionProvider):
self._name = 'provider'
self._description = 'fake_provider'
self._extend_info_schema = {}
def build_task_flow(self, plan):
status_getters = []
return {'status_getters': status_getters,
'task_flow': graph_flow.Flow('fake_flow')
}
self._config = None
self._plugin_map = {
'fake': FakeProtectionPlugin,
}
def get_checkpoint_collection(self):
return FakeCheckpointCollection()
@ -214,11 +244,32 @@ class FakeFlowEngine(object):
def __init__(self):
super(FakeFlowEngine, self).__init__()
def create_task(self, function, requires=None, provides=None,
inject=None, **kwargs):
name = kwargs.get('name', None)
auto_extract = kwargs.get('auto_extract', True)
rebind = kwargs.get('rebind', None)
revert = kwargs.get('revert', None)
version = kwargs.get('version', None)
if function:
return task.FunctorTask(function,
name=name,
provides=provides,
requires=requires,
auto_extract=auto_extract,
rebind=rebind,
revert=revert,
version=version,
inject=inject)
def add_tasks(self, flow, *nodes, **kwargs):
if flow is None:
LOG.error(_LE("The flow is None, get it first"))
flow.add(*nodes, **kwargs)
def link_task(self, flow, u, v):
flow.link(u, v)
def build_flow(self, flow_name, flow_type='graph'):
if flow_type == 'linear':
return linear_flow.Flow(flow_name)

View File

@ -1,156 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from karbor.common import constants
from karbor.services.protection.graph import build_graph
from karbor.services.protection.protectable_registry import ProtectableRegistry
from karbor.services.protection.provider import ProviderRegistry
from karbor.tests import base
from karbor.tests.unit.protection.fakes import fake_protection_plan
from karbor.tests.unit.protection.fakes import fake_restore
from karbor.tests.unit.protection.fakes import FakeCheckpoint
from karbor.tests.unit.protection.fakes import FakeProtectionPlugin
from karbor.tests.unit.protection.fakes import plan_resources
from karbor.tests.unit.protection.fakes import resource_map
class FakeWorkflowEngine(object):
    """Trivial stand-in for the taskflow engine used in provider tests.

    A "flow" is just a plain list of tasks, and every created task is the
    placeholder string ``"fake_task"``.
    """

    def build_flow(self, flow_name):
        """Start a new empty flow, remembering its name on the engine."""
        self.flow_name = flow_name
        self.task_flow = list()
        return self.task_flow

    def add_tasks(self, task_flow, task):
        """Append one task to the given flow list."""
        task_flow.extend([task])

    def create_task(self, func, **kwargs):
        """Ignore the callable and hand back a placeholder marker."""
        return "fake_task"
class FakeProtectableRegistry(object):
    """Registry double that resolves resource dependencies from the static
    ``resource_map`` fixture instead of real protectable plugins."""

    def fetch_dependent_resources(self, resource):
        # Raises KeyError for resources absent from the fixture map.
        return resource_map.__getitem__(resource)

    def build_graph(self, context, resources):
        return build_graph(
            start_nodes=resources,
            get_child_nodes_func=self.fetch_dependent_resources,
        )
class PluggableProtectionProviderTest(base.TestCase):
    """Exercises PluggableProtectionProvider.build_task_flow for the
    protect and restore operations using fake plugins and engines."""

    def setUp(self):
        super(PluggableProtectionProviderTest, self).setUp()

    @mock.patch.object(ProtectableRegistry, 'build_graph')
    def test_build_protect_task_flow(self, mock_build_graph):
        pr = ProviderRegistry()
        self.assertEqual(len(pr.providers), 1)
        plugable_provider = pr.providers["fake_id1"]
        cntxt = "fake_cntxt"
        plan = fake_protection_plan()
        workflow_engine = FakeWorkflowEngine()
        operation = constants.OPERATION_PROTECT
        ctx = {"context": cntxt,
               "plan": plan,
               "workflow_engine": workflow_engine,
               "operation_type": operation,
               }
        # Expected plugin callbacks in depth-first walk order; the C/D/E
        # subtree is reached twice (via A and via B), the second time with
        # the "first visit" flag False.
        expected_calls = [
            ("on_resource_start", 'A', True),
            ("on_resource_start", 'C', True),
            ("on_resource_start", 'D', True),
            ("on_resource_end", 'D'),
            ("on_resource_start", 'E', True),
            ("on_resource_end", 'E'),
            ("on_resource_end", 'C'),
            ("on_resource_end", 'A'),
            ("on_resource_start", 'B', True),
            ("on_resource_start", 'C', False),
            ("on_resource_start", 'D', False),
            ("on_resource_end", 'D'),
            ("on_resource_start", 'E', False),
            ("on_resource_end", 'E'),
            ("on_resource_end", 'C'),
            ("on_resource_end", 'B'),
        ]
        fake_registry = FakeProtectableRegistry()
        plugable_provider.protectable_registry = fake_registry
        fake_registry.build_graph = mock.MagicMock()
        resource_graph = build_graph(plan_resources, resource_map.__getitem__)
        mock_build_graph.return_value = resource_graph
        fake_protection_plugin = FakeProtectionPlugin(expected_calls)
        plugable_provider._plugin_map = {
            "fake_plugin": fake_protection_plugin
        }
        result = plugable_provider.build_task_flow(ctx)
        # One status getter and one task per unique resource (A..E).
        self.assertEqual(len(result["status_getters"]), 5)
        self.assertEqual(len(result["task_flow"]), 5)

    def test_build_restore_task_flow(self):
        pr = ProviderRegistry()
        self.assertEqual(len(pr.providers), 1)
        plugable_provider = pr.providers["fake_id1"]
        cntxt = "fake_cntxt"
        restore = fake_restore()
        checkpoint = FakeCheckpoint()
        workflow_engine = FakeWorkflowEngine()
        operation = constants.OPERATION_RESTORE
        ctx = {"context": cntxt,
               "restore": restore,
               "workflow_engine": workflow_engine,
               "operation_type": operation,
               "checkpoint": checkpoint,
               "heat_template": "heat_template"
               }
        # Same traversal expectations as the protect case.
        expected_calls = [
            ("on_resource_start", 'A', True),
            ("on_resource_start", 'C', True),
            ("on_resource_start", 'D', True),
            ("on_resource_end", 'D'),
            ("on_resource_start", 'E', True),
            ("on_resource_end", 'E'),
            ("on_resource_end", 'C'),
            ("on_resource_end", 'A'),
            ("on_resource_start", 'B', True),
            ("on_resource_start", 'C', False),
            ("on_resource_start", 'D', False),
            ("on_resource_end", 'D'),
            ("on_resource_start", 'E', False),
            ("on_resource_end", 'E'),
            ("on_resource_end", 'C'),
            ("on_resource_end", 'B'),
        ]
        fake_protection_plugin = FakeProtectionPlugin(expected_calls)
        plugable_provider._plugin_map = {
            "fake_plugin": fake_protection_plugin
        }
        result = plugable_provider.build_task_flow(ctx)
        self.assertEqual(len(result["task_flow"]), 5)

    def tearDown(self):
        super(PluggableProtectionProviderTest, self).tearDown()

View File

@ -88,15 +88,6 @@ class CinderProtectionPluginTest(base.TestCase):
self.cinder_client = ClientFactory.create_client("cinder", self.cntxt)
self.checkpoint = FakeCheckpoint()
def test_get_resource_stats(self):
fake_resource_id = "123"
fake_bank_section.get_object = mock.MagicMock()
fake_bank_section.get_object.return_value = \
constants.RESOURCE_STATUS_AVAILABLE
status = self.plugin.get_resource_stats(self.checkpoint,
fake_resource_id)
self.assertEqual(status, constants.RESOURCE_STATUS_AVAILABLE)
def test_get_options_schema(self):
options_schema = self.plugin.get_options_schema(
'OS::Cinder::Volume')

View File

@ -87,16 +87,6 @@ class GlanceProtectionPluginTest(base.TestCase):
self.glance_client = ClientFactory.create_client("glance", self.cntxt)
self.checkpoint = CheckpointCollection()
def test_get_resource_stats(self):
fake_resource_id = "123"
fake_bank_section.get_object = mock.MagicMock()
fake_bank_section.get_object.return_value = \
constants.RESOURCE_STATUS_PROTECTING
status = self.plugin.get_resource_stats(self.checkpoint,
fake_resource_id)
self.assertEqual(status, constants.RESOURCE_STATUS_PROTECTING)
def test_get_options_schema(self):
options_schema = self.plugin.get_options_schema(
constants.IMAGE_RESOURCE_TYPE)

View File

@ -146,7 +146,8 @@ class ProtectionServiceTest(base.TestCase):
mock_cp_collection_get):
mock_provider.return_value = fakes.FakeProvider()
context = mock.MagicMock()
mock_cp_collection_get.side_effect = exception.CheckpointNotFound()
mock_cp_collection_get.side_effect = exception.CheckpointNotFound(
checkpoint_id='123')
self.assertRaises(oslo_messaging.ExpectedException,
self.pro_manager.show_checkpoint,
context,

View File

@ -10,25 +10,39 @@
# License for the specific language governing permissions and limitations
# under the License.
from karbor import exception
from karbor.services.protection import provider
from karbor.tests import base
import mock
from karbor.resource import Resource
from karbor.services.protection import provider
from karbor.tests import base
from karbor.tests.unit.protection import fakes
from oslo_config import cfg
CONF = cfg.CONF
# The three fake resource types supported by FakeProtectionPlugin mapped
# onto a three-level parent -> child -> grandchild chain used as the
# dependency fixture for the registry tests.
(parent_type, child_type, grandchild_type) = \
    fakes.FakeProtectionPlugin.SUPPORTED_RESOURCES

parent = Resource(id='A1', name='parent', type=parent_type)
child = Resource(id='B1', name='child', type=child_type)
grandchild = Resource(id='C1', name='grandchild', type=grandchild_type)
resource_graph = {
    parent: [child],
    child: [grandchild],
    grandchild: [],
}
class ProviderRegistryTest(base.TestCase):
def setUp(self):
super(ProviderRegistryTest, self).setUp()
@mock.patch.object(provider.PluggableProtectionProvider, '_load_bank')
@mock.patch.object(provider.PluggableProtectionProvider, '_load_plugin')
def test_load_providers(self, mock_load_bank, mock_load_plugin):
@mock.patch.object(provider.PluggableProtectionProvider,
'_register_plugin')
def test_load_providers(self, mock_load_bank, mock_register_plugin):
pr = provider.ProviderRegistry()
self.assertEqual(mock_load_plugin.call_count, 1)
self.assertEqual(mock_register_plugin.call_count, 1)
self.assertEqual(mock_load_bank.call_count, 1)
self.assertEqual(len(pr.providers), 1)
@ -38,16 +52,13 @@ class ProviderRegistryTest(base.TestCase):
def test_provider_bank_config(self):
pr = provider.ProviderRegistry()
provider1 = pr.show_provider('fake_id1')
self.assertEqual(provider1.bank._plugin._config.fake_bank.fake_host,
'thor')
self.assertEqual(provider1.bank._plugin.fake_host, 'thor')
def test_provider_plugin_config(self):
pr = provider.ProviderRegistry()
provider1 = pr.show_provider('fake_id1')
plugin_name = 'karbor.tests.unit.fake_protection.FakeProtectionPlugin'
self.assertEqual(
provider1.plugins[plugin_name]._config.fake_plugin.fake_user,
'user')
plugins = provider1.load_plugins()
self.assertEqual(plugins['Test::ResourceA'].fake_user, 'user')
def test_list_provider(self):
pr = provider.ProviderRegistry()
@ -58,8 +69,3 @@ class ProviderRegistryTest(base.TestCase):
provider_list = pr.list_providers()
for provider_node in provider_list:
self.assertTrue(pr.show_provider(provider_node['id']))
def test_show_non_existent_provider(self):
pr = provider.ProviderRegistry()
self.assertRaises(exception.ProviderNotFound, pr.show_provider,
'garbage')

View File

@ -0,0 +1,199 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from functools import partial
import mock
from karbor.common import constants
from karbor.resource import Resource
from karbor.services.protection.flows.workflow import TaskFlowEngine
from karbor.services.protection import graph
from karbor.services.protection import resource_flow
from karbor.services.protection import restore_heat
from karbor.tests import base
from karbor.tests.unit.protection import fakes
from oslo_config import cfg
CONF = cfg.CONF

# Shared module-level fixture: a three-level resource chain built from the
# resource types the fake protection plugin declares it supports
# (parent -> child -> grandchild).  Every test in this module walks this
# same chain.
(parent_type, child_type, grandchild_type) = \
    fakes.FakeProtectionPlugin.SUPPORTED_RESOURCES
parent = Resource(id='A1', name='parent', type=parent_type)
child = Resource(id='B1', name='child', type=child_type)
grandchild = Resource(id='C1', name='grandchild', type=grandchild_type)
class ResourceFlowTest(base.TestCase):
    """Tests for ``resource_flow.build_resource_flow``.

    Each test builds a flow over the module-level parent -> child ->
    grandchild graph, runs it with a TaskFlow engine, and inspects which
    protection-plugin operation hooks fired, with what arguments, and in
    what order.
    """

    def setUp(self):
        super(ResourceFlowTest, self).setUp()
        # Adjacency map of the fixture graph; also used as the expected
        # resource count in the callback-count assertions below.
        self.resource_graph = {
            parent: [child],
            child: [grandchild],
            grandchild: [],
        }
        self.provider = fakes.FakeProvider()
        self.test_graph = graph.build_graph([parent],
                                            self.resource_graph.__getitem__)
        self.taskflow_engine = TaskFlowEngine()

    # NOTE(review): ``parameters={}`` is a mutable default argument; safe
    # only because callers never mutate it through this default — confirm
    # before reusing the pattern.
    def _walk_operation(self, protection, operation_type,
                        checkpoint='checkpoint', parameters={}, context=None,
                        heat_template=None, **kwargs):
        # Map every resource type in the fixture graph to the same plugin,
        # build the flow for ``operation_type``, and run it to completion.
        plugin_map = {
            parent_type: protection,
            child_type: protection,
            grandchild_type: protection,
        }
        flow = resource_flow.build_resource_flow(operation_type,
                                                 context,
                                                 self.taskflow_engine,
                                                 plugin_map,
                                                 self.test_graph,
                                                 parameters)
        # The store seeds the taskflow engine's symbol table; restore
        # flows additionally require a heat template.
        store = {
            'checkpoint': checkpoint
        }
        if heat_template:
            store['heat_template'] = heat_template
        engine = self.taskflow_engine.get_engine(flow,
                                                 engine='parallel',
                                                 store=store)
        self.taskflow_engine.run_engine(engine)

    @mock.patch('karbor.tests.unit.protection.fakes.FakeProtectionPlugin')
    def test_resource_no_impl(self, mock_protection):
        # A plugin whose operations are bare mocks (no hook
        # implementations) must still let every operation type run through
        # the flow without raising.
        for operation in constants.OPERATION_TYPES:
            heat_template = restore_heat.HeatTemplate() \
                if operation == constants.OPERATION_RESTORE else None
            self._walk_operation(mock_protection, operation,
                                 heat_template=heat_template)

    @mock.patch('karbor.tests.unit.protection.fakes.FakeProtectionPlugin')
    def test_resource_flow_callbacks(self, mock_protection):
        # Each of the four operation hooks must fire exactly once per
        # resource in the graph, for every operation type.
        for operation in constants.OPERATION_TYPES:
            mock_operation = fakes.MockOperation()
            get_operation_attr = 'get_{}_operation'.format(operation)
            getattr(mock_protection, get_operation_attr).return_value = \
                mock_operation
            heat_template = restore_heat.HeatTemplate() \
                if operation == constants.OPERATION_RESTORE else None
            self._walk_operation(mock_protection, operation,
                                 heat_template=heat_template)
            self.assertEqual(mock_operation.on_prepare_begin.call_count,
                             len(self.resource_graph))
            self.assertEqual(mock_operation.on_prepare_finish.call_count,
                             len(self.resource_graph))
            self.assertEqual(mock_operation.on_main.call_count,
                             len(self.resource_graph))
            self.assertEqual(mock_operation.on_complete.call_count,
                             len(self.resource_graph))

    @mock.patch('karbor.tests.unit.protection.fakes.FakeProtectionPlugin')
    def test_resource_flow_parameters(self, mock_protection):
        # Parameters may be keyed by resource type or by "type#id"; the
        # hooks must receive the type-level entry updated with the
        # id-level entry for that specific resource.
        resource_a1_id = "{}#{}".format(parent_type, 'A1')
        resource_b1_id = "{}#{}".format(child_type, 'B1')
        parameters = {
            resource_a1_id: {'option1': 'value1'},
            resource_b1_id: {'option2': 'value2', 'option3': 'value3'},
            parent_type: {'option4': 'value4'},
            child_type: {'option3': 'value5'}
        }
        for operation in constants.OPERATION_TYPES:
            mock_operation = fakes.MockOperation()
            get_operation_attr = 'get_{}_operation'.format(operation)
            getattr(mock_protection, get_operation_attr).return_value = \
                mock_operation
            kwargs = {
                'checkpoint': 'A',
                'context': 'B',
            }
            if operation == constants.OPERATION_RESTORE:
                kwargs['heat_template'] = restore_heat.HeatTemplate()
            self._walk_operation(mock_protection, operation,
                                 parameters=parameters, **kwargs)
            for resource in self.resource_graph:
                # NOTE(review): ``get`` returns the dict stored in
                # ``parameters`` itself, so ``update`` mutates it across
                # loop iterations — presumably harmless here because the
                # id-level values never conflict between types; verify.
                resource_params = parameters.get(resource.type, {})
                resource_id = '{}#{}'.format(resource.type, resource.id)
                resource_params.update(parameters.get(resource_id, {}))
                mock_operation.on_prepare_begin.assert_any_call(
                    resource=resource,
                    parameters=resource_params,
                    **kwargs)
                mock_operation.on_prepare_finish.assert_any_call(
                    resource=resource,
                    parameters=resource_params,
                    **kwargs)
                mock_operation.on_main.assert_any_call(
                    resource=resource,
                    parameters=resource_params,
                    **kwargs)
                mock_operation.on_complete.assert_any_call(
                    resource=resource,
                    parameters=resource_params,
                    **kwargs)

    @mock.patch('karbor.tests.unit.protection.fakes.FakeProtectionPlugin')
    def test_resource_flow_order(self, mock_protection):
        # Record (hook, resource id) pairs as the flow executes, then
        # assert the ordering contract of a protect flow.
        def test_order(order_list, hook_type, resource, *args, **kwargs):
            order_list.append((hook_type, resource.id))

        operation = constants.OPERATION_PROTECT
        mock_operation = fakes.MockOperation()
        get_operation_attr = 'get_{}_operation'.format(operation)
        getattr(mock_protection, get_operation_attr).return_value = \
            mock_operation
        order_list = []
        mock_operation.on_prepare_begin = partial(test_order, order_list,
                                                  'pre_begin')
        mock_operation.on_prepare_finish = partial(test_order, order_list,
                                                   'pre_finish')
        mock_operation.on_main = partial(test_order, order_list, 'main')
        mock_operation.on_complete = partial(test_order, order_list,
                                             'complete')
        self._walk_operation(mock_protection, operation)
        # prepare_begin runs parent before child (top-down) ...
        self.assertTrue(order_list.index(('pre_begin', parent.id)) <
                        order_list.index(('pre_begin', child.id)))
        self.assertTrue(order_list.index(('pre_begin', child.id)) <
                        order_list.index(('pre_begin', grandchild.id)))
        # ... while prepare_finish and complete run child before parent
        # (bottom-up).
        self.assertTrue(order_list.index(('pre_finish', parent.id)) >
                        order_list.index(('pre_finish', child.id)))
        self.assertTrue(order_list.index(('pre_finish', child.id)) >
                        order_list.index(('pre_finish', grandchild.id)))
        self.assertTrue(order_list.index(('complete', parent.id)) >
                        order_list.index(('complete', child.id)))
        self.assertTrue(order_list.index(('complete', child.id)) >
                        order_list.index(('complete', grandchild.id)))
        # Per resource, hooks always run begin -> finish -> main -> complete.
        for resource_id in (parent.id, child.id, grandchild.id):
            self.assertTrue(order_list.index(('pre_begin', resource_id)) <
                            order_list.index(('pre_finish', resource_id)))
            self.assertTrue(order_list.index(('pre_finish', resource_id)) <
                            order_list.index(('main', resource_id)))
            self.assertTrue(order_list.index(('main', resource_id)) <
                            order_list.index(('complete', resource_id)))

View File

@ -1,71 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import karbor.services.protection.graph as graph
from karbor.services.protection.resource_graph import ResourceGraphContext
from karbor.services.protection.resource_graph \
import ResourceGraphWalkerListener
from karbor.tests import base
from karbor.tests.unit.protection.fakes import FakeProtectionPlugin
from karbor.tests.unit.protection.fakes import plan_resources
from karbor.tests.unit.protection.fakes import resource_graph
from karbor.tests.unit.protection.fakes import resource_map
class ResourceGraphWalkerListenerTest(base.TestCase):
    """Exercise ResourceGraphWalkerListener with a scripted fake plugin."""

    def test_resource_graph_walker_listener_plan(self):
        # The fake plugin consumes these events in order as the walker
        # visits the plan; the trailing boolean on each start event marks
        # whether the node is being visited for the first time.
        expected_event_sequence = [
            ("on_resource_start", 'A', True),
            ("on_resource_start", 'C', True),
            ("on_resource_start", 'D', True),
            ("on_resource_end", 'D'),
            ("on_resource_start", 'E', True),
            ("on_resource_end", 'E'),
            ("on_resource_end", 'C'),
            ("on_resource_end", 'A'),
            ("on_resource_start", 'B', True),
            ("on_resource_start", 'C', False),
            ("on_resource_start", 'D', False),
            ("on_resource_end", 'D'),
            ("on_resource_start", 'E', False),
            ("on_resource_end", 'E'),
            ("on_resource_end", 'C'),
            ("on_resource_end", 'B'),
        ]
        scripted_plugin = FakeProtectionPlugin(expected_event_sequence)
        walker_context = ResourceGraphContext(
            "fake_cntxt", plugin_map={"fake_plugin": scripted_plugin})
        walker = graph.GraphWalker()
        walker.register_listener(ResourceGraphWalkerListener(walker_context))
        plan_graph = graph.build_graph(plan_resources,
                                       resource_map.__getitem__)
        walker.walk_graph(plan_graph)
        # One status getter is registered per distinct resource (A..E).
        self.assertEqual(len(walker_context.status_getters), 5)

    def tearDown(self):
        super(ResourceGraphWalkerListenerTest, self).tearDown()
class SerializeResourceGraphTest(base.TestCase):
    """Round-trip test for the packed resource-graph serialization."""

    def test_serialize_deserialize_packed_resource_graph(self):
        packed = graph.serialize_resource_graph(resource_graph)
        restored = graph.deserialize_resource_graph(packed)
        # The round trip must preserve the node count and every start node.
        self.assertEqual(len(resource_graph), len(restored))
        for node in resource_graph:
            self.assertIn(node, restored)

View File

@ -101,20 +101,24 @@ def get_bool_param(param_string, params):
return strutils.bool_from_string(param, strict=True)
def load_plugin(namespace, plugin_name, *args, **kwargs):
def load_class(namespace, plugin_name):
try:
LOG.debug('Start load plugin %s. ', plugin_name)
# Try to resolve plugin by name
mgr = driver.DriverManager(namespace, plugin_name)
plugin_class = mgr.driver
return mgr.driver
except RuntimeError as e1:
# fallback to class name
try:
plugin_class = importutils.import_class(plugin_name)
return importutils.import_class(plugin_name)
except ImportError as e2:
LOG.error(_LE("Error loading plugin by name, %s"), e1)
LOG.error(_LE("Error loading plugin by class, %s"), e2)
raise ImportError(_("Class not found."))
def load_plugin(namespace, plugin_name, *args, **kwargs):
plugin_class = load_class(namespace, plugin_name)
return plugin_class(*args, **kwargs)

View File

@ -44,6 +44,7 @@ karbor.protections =
karbor-volume-protection-plugin = karbor.services.protection.protection_plugins.volume.cinder_protection_plugin:CinderProtectionPlugin
karbor-image-protection-plugin = karbor.services.protection.protection_plugins.image.image_protection_plugin:GlanceProtectionPlugin
karbor-server-protection-plugin = karbor.services.protection.protection_plugins.server.nova_protection_plugin:NovaProtectionPlugin
karbor-noop-protection-plugin = karbor.services.protection.protection_plugins.noop_plugin:NoopProtectionPlugin
karbor.provider =
provider-registry = karbor.services.protection.provider:ProviderRegistry
karbor.protectables =