542 lines
22 KiB
Python
542 lines
22 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""
|
|
Protection Service
|
|
"""
|
|
|
|
from datetime import datetime
|
|
from eventlet import greenpool
|
|
from eventlet import greenthread
|
|
import six
|
|
|
|
from oslo_config import cfg
|
|
from oslo_log import log as logging
|
|
import oslo_messaging as messaging
|
|
|
|
from oslo_utils import uuidutils
|
|
|
|
from karbor.common import constants
|
|
from karbor import exception
|
|
from karbor.i18n import _
|
|
from karbor import manager
|
|
from karbor.resource import Resource
|
|
from karbor.services.protection.flows import worker as flow_manager
|
|
from karbor.services.protection.protectable_registry import ProtectableRegistry
|
|
from karbor import utils
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
protection_manager_opts = [
|
|
cfg.StrOpt('provider_registry',
|
|
default='provider-registry',
|
|
help='the provider registry'),
|
|
cfg.IntOpt('max_concurrent_operations',
|
|
default=0,
|
|
help='number of maximum concurrent operation (protect, restore,'
|
|
' delete) flows. 0 means no hard limit'
|
|
)
|
|
]
|
|
|
|
CONF = cfg.CONF
|
|
CONF.register_opts(protection_manager_opts)
|
|
|
|
PROVIDER_NAMESPACE = 'karbor.provider'
|
|
|
|
|
|
class ProtectionManager(manager.Manager):
|
|
"""karbor Protection Manager."""
|
|
|
|
RPC_API_VERSION = '1.0'
|
|
|
|
target = messaging.Target(version=RPC_API_VERSION)
|
|
|
|
def __init__(self, service_name=None,
|
|
*args, **kwargs):
|
|
super(ProtectionManager, self).__init__(*args, **kwargs)
|
|
provider_reg = CONF.provider_registry
|
|
self.provider_registry = utils.load_plugin(PROVIDER_NAMESPACE,
|
|
provider_reg)
|
|
self.protectable_registry = ProtectableRegistry()
|
|
self.protectable_registry.load_plugins()
|
|
self.worker = flow_manager.Worker()
|
|
self._greenpool = None
|
|
self._greenpool_size = CONF.max_concurrent_operations
|
|
if self._greenpool_size != 0:
|
|
self._greenpool = greenpool.GreenPool(self._greenpool_size)
|
|
|
|
def _spawn(self, func, *args, **kwargs):
|
|
if self._greenpool is not None:
|
|
return self._greenpool.spawn_n(func, *args, **kwargs)
|
|
else:
|
|
return greenthread.spawn_n(func, *args, **kwargs)
|
|
|
|
def init_host(self, **kwargs):
|
|
"""Handle initialization if this is a standalone service"""
|
|
# TODO(wangliuan)
|
|
LOG.info("Starting protection service")
|
|
|
|
@messaging.expected_exceptions(exception.InvalidPlan,
|
|
exception.ProviderNotFound,
|
|
exception.FlowError)
|
|
def protect(self, context, plan, checkpoint_properties=None):
|
|
"""create protection for the given plan
|
|
|
|
:param plan: Define that protection plan should be done
|
|
"""
|
|
|
|
LOG.info("Starting protection service:protect action")
|
|
LOG.debug("protecting: %s checkpoint_properties:%s",
|
|
plan, checkpoint_properties)
|
|
|
|
if not plan:
|
|
raise exception.InvalidPlan(
|
|
reason=_('the protection plan is None'))
|
|
provider_id = plan.get('provider_id', None)
|
|
plan_id = plan.get('id', None)
|
|
provider = self.provider_registry.show_provider(provider_id)
|
|
checkpoint_collection = provider.get_checkpoint_collection()
|
|
try:
|
|
checkpoint = checkpoint_collection.create(plan,
|
|
checkpoint_properties,
|
|
context=context)
|
|
except Exception as e:
|
|
LOG.exception("Failed to create checkpoint, plan: %s", plan_id)
|
|
exc = exception.FlowError(flow="protect",
|
|
error="Error creating checkpoint")
|
|
six.raise_from(exc, e)
|
|
try:
|
|
flow = self.worker.get_flow(
|
|
context=context,
|
|
protectable_registry=self.protectable_registry,
|
|
operation_type=constants.OPERATION_PROTECT,
|
|
plan=plan,
|
|
provider=provider,
|
|
checkpoint=checkpoint)
|
|
except Exception as e:
|
|
LOG.exception("Failed to create protection flow, plan: %s",
|
|
plan_id)
|
|
raise exception.FlowError(
|
|
flow="protect",
|
|
error=e.msg if hasattr(e, 'msg') else 'Internal error')
|
|
self._spawn(self.worker.run_flow, flow)
|
|
return checkpoint.id
|
|
|
|
@messaging.expected_exceptions(exception.InvalidPlan,
|
|
exception.ProviderNotFound,
|
|
exception.FlowError)
|
|
def copy(self, context, plan):
|
|
"""create copy of checkpoint for the given plan
|
|
|
|
:param plan: Define that protection plan should be done
|
|
"""
|
|
|
|
LOG.info("Starting protection service:copy action.")
|
|
LOG.debug("Creating the checkpoint copy for the plan: %s", plan)
|
|
|
|
if not plan:
|
|
raise exception.InvalidPlan(
|
|
reason=_('The protection plan is None'))
|
|
provider_id = plan.get('provider_id', None)
|
|
plan_id = plan.get('id', None)
|
|
provider = self.provider_registry.show_provider(provider_id)
|
|
checkpoints = None
|
|
checkpoint_collection = provider.get_checkpoint_collection()
|
|
try:
|
|
checkpoints = self.list_checkpoints(context, provider_id,
|
|
filters={'plan_id': plan_id})
|
|
except Exception as e:
|
|
LOG.exception("Failed to get checkpoints for the plan: %s",
|
|
plan_id)
|
|
exc = exception.FlowError(flow="copy",
|
|
error="Failed to get checkpoints")
|
|
six.raise_from(exc, e)
|
|
try:
|
|
flow, checkpoint_copy = self.worker.get_flow(
|
|
context=context,
|
|
protectable_registry=self.protectable_registry,
|
|
operation_type=constants.OPERATION_COPY,
|
|
plan=plan,
|
|
provider=provider,
|
|
checkpoint=checkpoints,
|
|
checkpoint_collection=checkpoint_collection)
|
|
except Exception as e:
|
|
LOG.exception("Failed to create copy flow, plan: %s",
|
|
plan_id)
|
|
raise exception.FlowError(
|
|
flow="copy",
|
|
error=e.msg if hasattr(e, 'msg') else 'Internal error')
|
|
self._spawn(self.worker.run_flow, flow)
|
|
return checkpoint_copy
|
|
|
|
@messaging.expected_exceptions(exception.ProviderNotFound,
|
|
exception.CheckpointNotFound,
|
|
exception.CheckpointNotAvailable,
|
|
exception.FlowError,
|
|
exception.InvalidInput)
|
|
def restore(self, context, restore, restore_auth):
|
|
LOG.info("Starting restore service:restore action")
|
|
|
|
checkpoint_id = restore["checkpoint_id"]
|
|
provider_id = restore["provider_id"]
|
|
provider = self.provider_registry.show_provider(provider_id)
|
|
if not provider:
|
|
raise exception.ProviderNotFound(provider_id=provider_id)
|
|
|
|
self.validate_restore_parameters(restore, provider)
|
|
|
|
checkpoint_collection = provider.get_checkpoint_collection()
|
|
checkpoint = checkpoint_collection.get(checkpoint_id)
|
|
|
|
if checkpoint.status != constants.CHECKPOINT_STATUS_AVAILABLE:
|
|
raise exception.CheckpointNotAvailable(
|
|
checkpoint_id=checkpoint_id)
|
|
|
|
try:
|
|
flow = self.worker.get_flow(
|
|
context=context,
|
|
operation_type=constants.OPERATION_RESTORE,
|
|
checkpoint=checkpoint,
|
|
provider=provider,
|
|
restore=restore,
|
|
restore_auth=restore_auth)
|
|
except Exception:
|
|
LOG.exception("Failed to create restore flow checkpoint: %s",
|
|
checkpoint_id)
|
|
raise exception.FlowError(
|
|
flow="restore",
|
|
error=_("Failed to create flow"))
|
|
self._spawn(self.worker.run_flow, flow)
|
|
|
|
@messaging.expected_exceptions(exception.ProviderNotFound,
|
|
exception.CheckpointNotFound,
|
|
exception.CheckpointNotAvailable,
|
|
exception.FlowError,
|
|
exception.InvalidInput)
|
|
def verification(self, context, verification):
|
|
LOG.info("Starting verify service:verify action")
|
|
|
|
checkpoint_id = verification["checkpoint_id"]
|
|
provider_id = verification["provider_id"]
|
|
provider = self.provider_registry.show_provider(provider_id)
|
|
if not provider:
|
|
raise exception.ProviderNotFound(provider_id=provider_id)
|
|
|
|
self.validate_verify_parameters(verification, provider)
|
|
|
|
checkpoint_collection = provider.get_checkpoint_collection()
|
|
checkpoint = checkpoint_collection.get(checkpoint_id)
|
|
|
|
if checkpoint.status != constants.CHECKPOINT_STATUS_AVAILABLE:
|
|
raise exception.CheckpointNotAvailable(
|
|
checkpoint_id=checkpoint_id)
|
|
|
|
try:
|
|
flow = self.worker.get_flow(
|
|
context=context,
|
|
operation_type=constants.OPERATION_VERIFY,
|
|
checkpoint=checkpoint,
|
|
provider=provider,
|
|
verify=verification)
|
|
except Exception:
|
|
LOG.exception("Failed to create verify flow checkpoint: %s",
|
|
checkpoint_id)
|
|
raise exception.FlowError(
|
|
flow="verify",
|
|
error=_("Failed to create flow"))
|
|
self._spawn(self.worker.run_flow, flow)
|
|
|
|
def validate_restore_parameters(self, restore, provider):
|
|
parameters = restore["parameters"]
|
|
if not parameters:
|
|
return
|
|
restore_schema = provider.extended_info_schema.get(
|
|
"restore_schema", None)
|
|
if restore_schema is None:
|
|
msg = _("The restore schema of plugin must be provided.")
|
|
raise exception.InvalidInput(reason=msg)
|
|
for resource_key, parameter_value in parameters.items():
|
|
if "#" in resource_key:
|
|
resource_type, resource_id = resource_key.split("#")
|
|
if not uuidutils.is_uuid_like(resource_id):
|
|
msg = _("The resource_id must be a uuid.")
|
|
raise exception.InvalidInput(reason=msg)
|
|
else:
|
|
resource_type = resource_key
|
|
if (resource_type not in constants.RESOURCE_TYPES) or (
|
|
resource_type not in restore_schema):
|
|
msg = _("The key of restore parameters is invalid.")
|
|
raise exception.InvalidInput(reason=msg)
|
|
properties = restore_schema[resource_type]["properties"]
|
|
if not set(parameter_value.keys()).issubset(
|
|
set(properties.keys())):
|
|
msg = _("The restore property of restore parameters "
|
|
"is invalid.")
|
|
raise exception.InvalidInput(reason=msg)
|
|
|
|
def validate_verify_parameters(self, verify, provider):
|
|
parameters = verify["parameters"]
|
|
if not parameters:
|
|
return
|
|
verify_schema = provider.extended_info_schema.get(
|
|
"verify_schema", None)
|
|
if verify_schema is None:
|
|
msg = _("The verify schema of plugin must be provided.")
|
|
raise exception.InvalidInput(reason=msg)
|
|
for resource_key, parameter_value in parameters.items():
|
|
if "#" in resource_key:
|
|
resource_type, resource_id = resource_key.split("#")
|
|
if not uuidutils.is_uuid_like(resource_id):
|
|
msg = _("The resource_id must be a uuid.")
|
|
raise exception.InvalidInput(reason=msg)
|
|
else:
|
|
resource_type = resource_key
|
|
if (resource_type not in constants.RESOURCE_TYPES) or (
|
|
resource_type not in verify_schema):
|
|
msg = _("The key of verify parameters is invalid.")
|
|
raise exception.InvalidInput(reason=msg)
|
|
properties = verify_schema[resource_type]["properties"]
|
|
if not set(parameter_value.keys()).issubset(
|
|
set(properties.keys())):
|
|
msg = _("The verify property of verify parameters "
|
|
"is invalid.")
|
|
raise exception.InvalidInput(reason=msg)
|
|
|
|
@messaging.expected_exceptions(exception.DeleteCheckpointNotAllowed)
|
|
def delete(self, context, provider_id, checkpoint_id):
|
|
LOG.info("Starting protection service:delete action")
|
|
LOG.debug('provider_id :%s checkpoint_id:%s', provider_id,
|
|
checkpoint_id)
|
|
provider = self.provider_registry.show_provider(provider_id)
|
|
try:
|
|
checkpoint_collection = provider.get_checkpoint_collection()
|
|
checkpoint = checkpoint_collection.get(checkpoint_id,
|
|
context=context)
|
|
except Exception:
|
|
LOG.error("get checkpoint failed, checkpoint_id:%s",
|
|
checkpoint_id)
|
|
raise exception.InvalidInput(
|
|
reason=_("Invalid checkpoint_id or provider_id"))
|
|
|
|
checkpoint_dict = checkpoint.to_dict()
|
|
if not context.is_admin and (
|
|
context.project_id != checkpoint_dict['project_id']):
|
|
LOG.warn("Delete checkpoint(%s) is not allowed." % checkpoint_id)
|
|
raise exception.DeleteCheckpointNotAllowed(
|
|
checkpoint_id=checkpoint_id)
|
|
|
|
if checkpoint.status not in [
|
|
constants.CHECKPOINT_STATUS_AVAILABLE,
|
|
constants.CHECKPOINT_STATUS_ERROR,
|
|
]:
|
|
raise exception.CheckpointNotBeDeleted(
|
|
checkpoint_id=checkpoint_id)
|
|
checkpoint.status = constants.CHECKPOINT_STATUS_DELETING
|
|
checkpoint.commit()
|
|
|
|
try:
|
|
flow = self.worker.get_flow(
|
|
context=context,
|
|
operation_type=constants.OPERATION_DELETE,
|
|
checkpoint=checkpoint,
|
|
provider=provider)
|
|
except Exception:
|
|
LOG.exception("Failed to create delete checkpoint flow,"
|
|
"checkpoint:%s.", checkpoint_id)
|
|
raise exception.KarborException(_(
|
|
"Failed to create delete checkpoint flow."
|
|
))
|
|
self._spawn(self.worker.run_flow, flow)
|
|
|
|
def start(self, plan):
|
|
# TODO(wangliuan)
|
|
pass
|
|
|
|
def suspend(self, plan):
|
|
# TODO(wangliuan)
|
|
pass
|
|
|
|
@messaging.expected_exceptions(exception.ProviderNotFound,
|
|
exception.CheckpointNotFound,
|
|
exception.BankListObjectsFailed)
|
|
def list_checkpoints(self, context, provider_id, marker=None, limit=None,
|
|
sort_keys=None, sort_dirs=None, filters=None,
|
|
all_tenants=False):
|
|
LOG.info("Starting list checkpoints. provider_id:%s", provider_id)
|
|
plan_id = filters.get("plan_id", None)
|
|
start_date = None
|
|
end_date = None
|
|
if filters.get("start_date", None):
|
|
start_date = datetime.strptime(
|
|
filters.get("start_date"), "%Y-%m-%d")
|
|
if filters.get("end_date", None):
|
|
end_date = datetime.strptime(
|
|
filters.get("end_date"), "%Y-%m-%d")
|
|
|
|
sort_dir = None if sort_dirs is None else sort_dirs[0]
|
|
provider = self.provider_registry.show_provider(provider_id)
|
|
|
|
if filters.get('project_id', None) and all_tenants:
|
|
project_id = filters.get('project_id')
|
|
all_tenants = False
|
|
else:
|
|
project_id = context.project_id
|
|
|
|
checkpoint_ids = provider.list_checkpoints(
|
|
project_id, provider_id, limit=limit, marker=marker,
|
|
plan_id=plan_id, start_date=start_date, end_date=end_date,
|
|
sort_dir=sort_dir, context=context, all_tenants=all_tenants)
|
|
checkpoints = []
|
|
for checkpoint_id in checkpoint_ids:
|
|
checkpoint = provider.get_checkpoint(checkpoint_id,
|
|
context=context)
|
|
checkpoints.append(checkpoint.to_dict())
|
|
return checkpoints
|
|
|
|
@messaging.expected_exceptions(exception.ProviderNotFound,
|
|
exception.CheckpointNotFound,
|
|
exception.AccessCheckpointNotAllowed)
|
|
def show_checkpoint(self, context, provider_id, checkpoint_id):
|
|
provider = self.provider_registry.show_provider(provider_id)
|
|
|
|
checkpoint = provider.get_checkpoint(checkpoint_id, context=context)
|
|
checkpoint_dict = checkpoint.to_dict()
|
|
if not context.is_admin and (
|
|
context.project_id != checkpoint_dict['project_id']):
|
|
raise exception.AccessCheckpointNotAllowed(
|
|
checkpoint_id=checkpoint_id)
|
|
return checkpoint_dict
|
|
|
|
def list_protectable_types(self, context):
|
|
LOG.info("Start to list protectable types.")
|
|
return self.protectable_registry.list_resource_types()
|
|
|
|
@messaging.expected_exceptions(exception.ProtectableTypeNotFound)
|
|
def show_protectable_type(self, context, protectable_type):
|
|
LOG.info("Start to show protectable type %s", protectable_type)
|
|
|
|
plugin = self.protectable_registry.get_protectable_resource_plugin(
|
|
protectable_type)
|
|
if not plugin:
|
|
raise exception.ProtectableTypeNotFound(
|
|
protectable_type=protectable_type)
|
|
|
|
dependents = []
|
|
for t in self.protectable_registry.list_resource_types():
|
|
if t == protectable_type:
|
|
continue
|
|
|
|
p = self.protectable_registry.get_protectable_resource_plugin(t)
|
|
if p and protectable_type in p.get_parent_resource_types():
|
|
dependents.append(t)
|
|
|
|
return {
|
|
'name': plugin.get_resource_type(),
|
|
"dependent_types": dependents
|
|
}
|
|
|
|
@messaging.expected_exceptions(exception.ListProtectableResourceFailed)
|
|
def list_protectable_instances(self, context,
|
|
protectable_type=None,
|
|
marker=None,
|
|
limit=None,
|
|
sort_keys=None,
|
|
sort_dirs=None,
|
|
filters=None,
|
|
parameters=None):
|
|
|
|
LOG.info("Start to list protectable instances of type: %s",
|
|
protectable_type)
|
|
|
|
try:
|
|
resource_instances = self.protectable_registry.list_resources(
|
|
context, protectable_type, parameters)
|
|
except exception.ListProtectableResourceFailed as err:
|
|
LOG.error("List resources of type %(type)s failed: %(err)s",
|
|
{'type': protectable_type, 'err': six.text_type(err)})
|
|
raise
|
|
|
|
result = []
|
|
for resource in resource_instances:
|
|
result.append(dict(id=resource.id, name=resource.name,
|
|
extra_info=resource.extra_info))
|
|
|
|
return result
|
|
|
|
@messaging.expected_exceptions(exception.ListProtectableResourceFailed)
|
|
def show_protectable_instance(self, context, protectable_type,
|
|
protectable_id, parameters=None):
|
|
LOG.info("Start to show protectable instance of type: %s",
|
|
protectable_type)
|
|
|
|
registry = self.protectable_registry
|
|
try:
|
|
resource_instance = registry.show_resource(
|
|
context,
|
|
protectable_type,
|
|
protectable_id,
|
|
parameters=parameters
|
|
)
|
|
except exception.ListProtectableResourceFailed as err:
|
|
LOG.error("Show resources of type %(type)s id %(id)s "
|
|
"failed: %(err)s",
|
|
{'type': protectable_type,
|
|
'id': protectable_id,
|
|
'err': six.text_type(err)})
|
|
raise
|
|
|
|
return resource_instance.to_dict() if resource_instance else None
|
|
|
|
@messaging.expected_exceptions(exception.ListProtectableResourceFailed)
|
|
def list_protectable_dependents(self, context,
|
|
protectable_id,
|
|
protectable_type,
|
|
protectable_name):
|
|
LOG.info("Start to list dependents of resource (type:%(type)s, "
|
|
"id:%(id)s, name:%(name)s)",
|
|
{'type': protectable_type,
|
|
'id': protectable_id,
|
|
'name': protectable_name})
|
|
|
|
parent_resource = Resource(type=protectable_type, id=protectable_id,
|
|
name=protectable_name)
|
|
|
|
registry = self.protectable_registry
|
|
try:
|
|
dependent_resources = registry.fetch_dependent_resources(
|
|
context, parent_resource)
|
|
except exception.ListProtectableResourceFailed as err:
|
|
LOG.error("List dependent resources of (%(res)s) failed: %(err)s",
|
|
{'res': parent_resource,
|
|
'err': six.text_type(err)})
|
|
raise
|
|
|
|
return [resource.to_dict() for resource in dependent_resources]
|
|
|
|
def list_providers(self, context, marker=None, limit=None,
|
|
sort_keys=None, sort_dirs=None, filters=None):
|
|
return self.provider_registry.list_providers(marker=marker,
|
|
limit=limit,
|
|
sort_keys=sort_keys,
|
|
sort_dirs=sort_dirs,
|
|
filters=filters)
|
|
|
|
@messaging.expected_exceptions(exception.ProviderNotFound)
|
|
def show_provider(self, context, provider_id):
|
|
provider = self.provider_registry.show_provider(provider_id)
|
|
response = {'id': provider.id,
|
|
'name': provider.name,
|
|
'description': provider.description,
|
|
'extended_info_schema': provider.extended_info_schema,
|
|
}
|
|
return response
|