Application Data Protection as a Service in OpenStack
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

manager.py 22KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535
  1. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  2. # not use this file except in compliance with the License. You may obtain
  3. # a copy of the License at
  4. #
  5. # http://www.apache.org/licenses/LICENSE-2.0
  6. #
  7. # Unless required by applicable law or agreed to in writing, software
  8. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  9. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  10. # License for the specific language governing permissions and limitations
  11. # under the License.
  12. """
  13. Protection Service
  14. """
  15. from datetime import datetime
  16. from eventlet import greenpool
  17. from eventlet import greenthread
  18. import six
  19. from oslo_config import cfg
  20. from oslo_log import log as logging
  21. import oslo_messaging as messaging
  22. from oslo_utils import uuidutils
  23. from karbor.common import constants
  24. from karbor import exception
  25. from karbor.i18n import _
  26. from karbor import manager
  27. from karbor.resource import Resource
  28. from karbor.services.protection.flows import worker as flow_manager
  29. from karbor.services.protection.protectable_registry import ProtectableRegistry
  30. from karbor import utils
  31. LOG = logging.getLogger(__name__)
  32. protection_manager_opts = [
  33. cfg.StrOpt('provider_registry',
  34. default='provider-registry',
  35. help='the provider registry'),
  36. cfg.IntOpt('max_concurrent_operations',
  37. default=0,
  38. help='number of maximum concurrent operation (protect, restore,'
  39. ' delete) flows. 0 means no hard limit'
  40. )
  41. ]
  42. CONF = cfg.CONF
  43. CONF.register_opts(protection_manager_opts)
  44. PROVIDER_NAMESPACE = 'karbor.provider'
  45. class ProtectionManager(manager.Manager):
  46. """karbor Protection Manager."""
  47. RPC_API_VERSION = '1.0'
  48. target = messaging.Target(version=RPC_API_VERSION)
  49. def __init__(self, service_name=None,
  50. *args, **kwargs):
  51. super(ProtectionManager, self).__init__(*args, **kwargs)
  52. provider_reg = CONF.provider_registry
  53. self.provider_registry = utils.load_plugin(PROVIDER_NAMESPACE,
  54. provider_reg)
  55. self.protectable_registry = ProtectableRegistry()
  56. self.protectable_registry.load_plugins()
  57. self.worker = flow_manager.Worker()
  58. self._greenpool = None
  59. self._greenpool_size = CONF.max_concurrent_operations
  60. if self._greenpool_size != 0:
  61. self._greenpool = greenpool.GreenPool(self._greenpool_size)
  62. def _spawn(self, func, *args, **kwargs):
  63. if self._greenpool is not None:
  64. return self._greenpool.spawn_n(func, *args, **kwargs)
  65. else:
  66. return greenthread.spawn_n(func, *args, **kwargs)
  67. def init_host(self, **kwargs):
  68. """Handle initialization if this is a standalone service"""
  69. # TODO(wangliuan)
  70. LOG.info("Starting protection service")
  71. @messaging.expected_exceptions(exception.InvalidPlan,
  72. exception.ProviderNotFound,
  73. exception.FlowError)
  74. def protect(self, context, plan, checkpoint_properties=None):
  75. """create protection for the given plan
  76. :param plan: Define that protection plan should be done
  77. """
  78. LOG.info("Starting protection service:protect action")
  79. LOG.debug("protecting: %s checkpoint_properties:%s",
  80. plan, checkpoint_properties)
  81. if not plan:
  82. raise exception.InvalidPlan(
  83. reason=_('the protection plan is None'))
  84. provider_id = plan.get('provider_id', None)
  85. plan_id = plan.get('id', None)
  86. provider = self.provider_registry.show_provider(provider_id)
  87. checkpoint_collection = provider.get_checkpoint_collection()
  88. try:
  89. checkpoint = checkpoint_collection.create(plan,
  90. checkpoint_properties,
  91. context=context)
  92. except Exception as e:
  93. LOG.exception("Failed to create checkpoint, plan: %s", plan_id)
  94. exc = exception.FlowError(flow="protect",
  95. error="Error creating checkpoint")
  96. six.raise_from(exc, e)
  97. try:
  98. flow = self.worker.get_flow(
  99. context=context,
  100. protectable_registry=self.protectable_registry,
  101. operation_type=constants.OPERATION_PROTECT,
  102. plan=plan,
  103. provider=provider,
  104. checkpoint=checkpoint)
  105. except Exception as e:
  106. LOG.exception("Failed to create protection flow, plan: %s",
  107. plan_id)
  108. raise exception.FlowError(
  109. flow="protect",
  110. error=e.msg if hasattr(e, 'msg') else 'Internal error')
  111. self._spawn(self.worker.run_flow, flow)
  112. return checkpoint.id
  113. @messaging.expected_exceptions(exception.InvalidPlan,
  114. exception.ProviderNotFound,
  115. exception.FlowError)
  116. def copy(self, context, plan):
  117. """create copy of checkpoint for the given plan
  118. :param plan: Define that protection plan should be done
  119. """
  120. LOG.info("Starting protection service:copy action.")
  121. LOG.debug("Creating the checkpoint copy for the plan: %s", plan)
  122. if not plan:
  123. raise exception.InvalidPlan(
  124. reason=_('The protection plan is None'))
  125. provider_id = plan.get('provider_id', None)
  126. plan_id = plan.get('id', None)
  127. provider = self.provider_registry.show_provider(provider_id)
  128. checkpoints = None
  129. checkpoint_collection = provider.get_checkpoint_collection()
  130. try:
  131. checkpoints = self.list_checkpoints(context, provider_id,
  132. filters={'plan_id': plan_id})
  133. except Exception as e:
  134. LOG.exception("Failed to get checkpoints for the plan: %s",
  135. plan_id)
  136. exc = exception.FlowError(flow="copy",
  137. error="Failed to get checkpoints")
  138. six.raise_from(exc, e)
  139. try:
  140. flow, checkpoint_copy = self.worker.get_flow(
  141. context=context,
  142. protectable_registry=self.protectable_registry,
  143. operation_type=constants.OPERATION_COPY,
  144. plan=plan,
  145. provider=provider,
  146. checkpoint=checkpoints,
  147. checkpoint_collection=checkpoint_collection)
  148. except Exception as e:
  149. LOG.exception("Failed to create copy flow, plan: %s",
  150. plan_id)
  151. raise exception.FlowError(
  152. flow="copy",
  153. error=e.msg if hasattr(e, 'msg') else 'Internal error')
  154. self._spawn(self.worker.run_flow, flow)
  155. return checkpoint_copy
  156. @messaging.expected_exceptions(exception.ProviderNotFound,
  157. exception.CheckpointNotFound,
  158. exception.CheckpointNotAvailable,
  159. exception.FlowError,
  160. exception.InvalidInput)
  161. def restore(self, context, restore, restore_auth):
  162. LOG.info("Starting restore service:restore action")
  163. checkpoint_id = restore["checkpoint_id"]
  164. provider_id = restore["provider_id"]
  165. provider = self.provider_registry.show_provider(provider_id)
  166. if not provider:
  167. raise exception.ProviderNotFound(provider_id=provider_id)
  168. self.validate_restore_parameters(restore, provider)
  169. checkpoint_collection = provider.get_checkpoint_collection()
  170. checkpoint = checkpoint_collection.get(checkpoint_id)
  171. if checkpoint.status != constants.CHECKPOINT_STATUS_AVAILABLE:
  172. raise exception.CheckpointNotAvailable(
  173. checkpoint_id=checkpoint_id)
  174. try:
  175. flow = self.worker.get_flow(
  176. context=context,
  177. operation_type=constants.OPERATION_RESTORE,
  178. checkpoint=checkpoint,
  179. provider=provider,
  180. restore=restore,
  181. restore_auth=restore_auth)
  182. except Exception:
  183. LOG.exception("Failed to create restore flow checkpoint: %s",
  184. checkpoint_id)
  185. raise exception.FlowError(
  186. flow="restore",
  187. error=_("Failed to create flow"))
  188. self._spawn(self.worker.run_flow, flow)
  189. @messaging.expected_exceptions(exception.ProviderNotFound,
  190. exception.CheckpointNotFound,
  191. exception.CheckpointNotAvailable,
  192. exception.FlowError,
  193. exception.InvalidInput)
  194. def verification(self, context, verification):
  195. LOG.info("Starting verify service:verify action")
  196. checkpoint_id = verification["checkpoint_id"]
  197. provider_id = verification["provider_id"]
  198. provider = self.provider_registry.show_provider(provider_id)
  199. if not provider:
  200. raise exception.ProviderNotFound(provider_id=provider_id)
  201. self.validate_verify_parameters(verification, provider)
  202. checkpoint_collection = provider.get_checkpoint_collection()
  203. checkpoint = checkpoint_collection.get(checkpoint_id)
  204. if checkpoint.status != constants.CHECKPOINT_STATUS_AVAILABLE:
  205. raise exception.CheckpointNotAvailable(
  206. checkpoint_id=checkpoint_id)
  207. try:
  208. flow = self.worker.get_flow(
  209. context=context,
  210. operation_type=constants.OPERATION_VERIFY,
  211. checkpoint=checkpoint,
  212. provider=provider,
  213. verify=verification)
  214. except Exception:
  215. LOG.exception("Failed to create verify flow checkpoint: %s",
  216. checkpoint_id)
  217. raise exception.FlowError(
  218. flow="verify",
  219. error=_("Failed to create flow"))
  220. self._spawn(self.worker.run_flow, flow)
  221. def validate_restore_parameters(self, restore, provider):
  222. parameters = restore["parameters"]
  223. if not parameters:
  224. return
  225. restore_schema = provider.extended_info_schema.get(
  226. "restore_schema", None)
  227. if restore_schema is None:
  228. msg = _("The restore schema of plugin must be provided.")
  229. raise exception.InvalidInput(reason=msg)
  230. for resource_key, parameter_value in parameters.items():
  231. if "#" in resource_key:
  232. resource_type, resource_id = resource_key.split("#")
  233. if not uuidutils.is_uuid_like(resource_id):
  234. msg = _("The resource_id must be a uuid.")
  235. raise exception.InvalidInput(reason=msg)
  236. else:
  237. resource_type = resource_key
  238. if (resource_type not in constants.RESOURCE_TYPES) or (
  239. resource_type not in restore_schema):
  240. msg = _("The key of restore parameters is invalid.")
  241. raise exception.InvalidInput(reason=msg)
  242. properties = restore_schema[resource_type]["properties"]
  243. if not set(parameter_value.keys()).issubset(
  244. set(properties.keys())):
  245. msg = _("The restore property of restore parameters "
  246. "is invalid.")
  247. raise exception.InvalidInput(reason=msg)
  248. def validate_verify_parameters(self, verify, provider):
  249. parameters = verify["parameters"]
  250. if not parameters:
  251. return
  252. verify_schema = provider.extended_info_schema.get(
  253. "verify_schema", None)
  254. if verify_schema is None:
  255. msg = _("The verify schema of plugin must be provided.")
  256. raise exception.InvalidInput(reason=msg)
  257. for resource_key, parameter_value in parameters.items():
  258. if "#" in resource_key:
  259. resource_type, resource_id = resource_key.split("#")
  260. if not uuidutils.is_uuid_like(resource_id):
  261. msg = _("The resource_id must be a uuid.")
  262. raise exception.InvalidInput(reason=msg)
  263. else:
  264. resource_type = resource_key
  265. if (resource_type not in constants.RESOURCE_TYPES) or (
  266. resource_type not in verify_schema):
  267. msg = _("The key of verify parameters is invalid.")
  268. raise exception.InvalidInput(reason=msg)
  269. properties = verify_schema[resource_type]["properties"]
  270. if not set(parameter_value.keys()).issubset(
  271. set(properties.keys())):
  272. msg = _("The verify property of verify parameters "
  273. "is invalid.")
  274. raise exception.InvalidInput(reason=msg)
  275. @messaging.expected_exceptions(exception.DeleteCheckpointNotAllowed)
  276. def delete(self, context, provider_id, checkpoint_id):
  277. LOG.info("Starting protection service:delete action")
  278. LOG.debug('provider_id :%s checkpoint_id:%s', provider_id,
  279. checkpoint_id)
  280. provider = self.provider_registry.show_provider(provider_id)
  281. try:
  282. checkpoint_collection = provider.get_checkpoint_collection()
  283. checkpoint = checkpoint_collection.get(checkpoint_id,
  284. context=context)
  285. except Exception:
  286. LOG.error("get checkpoint failed, checkpoint_id:%s",
  287. checkpoint_id)
  288. raise exception.InvalidInput(
  289. reason=_("Invalid checkpoint_id or provider_id"))
  290. checkpoint_dict = checkpoint.to_dict()
  291. if not context.is_admin and (
  292. context.project_id != checkpoint_dict['project_id']):
  293. LOG.warn("Delete checkpoint(%s) is not allowed." % checkpoint_id)
  294. raise exception.DeleteCheckpointNotAllowed(
  295. checkpoint_id=checkpoint_id)
  296. if checkpoint.status not in [
  297. constants.CHECKPOINT_STATUS_AVAILABLE,
  298. constants.CHECKPOINT_STATUS_ERROR,
  299. ]:
  300. raise exception.CheckpointNotBeDeleted(
  301. checkpoint_id=checkpoint_id)
  302. checkpoint.status = constants.CHECKPOINT_STATUS_DELETING
  303. checkpoint.commit()
  304. try:
  305. flow = self.worker.get_flow(
  306. context=context,
  307. operation_type=constants.OPERATION_DELETE,
  308. checkpoint=checkpoint,
  309. provider=provider)
  310. except Exception:
  311. LOG.exception("Failed to create delete checkpoint flow,"
  312. "checkpoint:%s.", checkpoint_id)
  313. raise exception.KarborException(_(
  314. "Failed to create delete checkpoint flow."
  315. ))
  316. self._spawn(self.worker.run_flow, flow)
  317. def start(self, plan):
  318. # TODO(wangliuan)
  319. pass
  320. def suspend(self, plan):
  321. # TODO(wangliuan)
  322. pass
  323. @messaging.expected_exceptions(exception.ProviderNotFound,
  324. exception.CheckpointNotFound,
  325. exception.BankListObjectsFailed)
  326. def list_checkpoints(self, context, provider_id, marker=None, limit=None,
  327. sort_keys=None, sort_dirs=None, filters=None,
  328. all_tenants=False):
  329. LOG.info("Starting list checkpoints. provider_id:%s", provider_id)
  330. plan_id = filters.get("plan_id", None)
  331. start_date = None
  332. end_date = None
  333. if filters.get("start_date", None):
  334. start_date = datetime.strptime(
  335. filters.get("start_date"), "%Y-%m-%d")
  336. if filters.get("end_date", None):
  337. end_date = datetime.strptime(
  338. filters.get("end_date"), "%Y-%m-%d")
  339. sort_dir = None if sort_dirs is None else sort_dirs[0]
  340. provider = self.provider_registry.show_provider(provider_id)
  341. project_id = context.project_id
  342. checkpoint_ids = provider.list_checkpoints(
  343. project_id, provider_id, limit=limit, marker=marker,
  344. plan_id=plan_id, start_date=start_date, end_date=end_date,
  345. sort_dir=sort_dir, context=context, all_tenants=all_tenants)
  346. checkpoints = []
  347. for checkpoint_id in checkpoint_ids:
  348. checkpoint = provider.get_checkpoint(checkpoint_id,
  349. context=context)
  350. checkpoints.append(checkpoint.to_dict())
  351. return checkpoints
  352. @messaging.expected_exceptions(exception.ProviderNotFound,
  353. exception.CheckpointNotFound,
  354. exception.AccessCheckpointNotAllowed)
  355. def show_checkpoint(self, context, provider_id, checkpoint_id):
  356. provider = self.provider_registry.show_provider(provider_id)
  357. checkpoint = provider.get_checkpoint(checkpoint_id, context=context)
  358. checkpoint_dict = checkpoint.to_dict()
  359. if not context.is_admin and (
  360. context.project_id != checkpoint_dict['project_id']):
  361. raise exception.AccessCheckpointNotAllowed(
  362. checkpoint_id=checkpoint_id)
  363. return checkpoint_dict
  364. def list_protectable_types(self, context):
  365. LOG.info("Start to list protectable types.")
  366. return self.protectable_registry.list_resource_types()
  367. @messaging.expected_exceptions(exception.ProtectableTypeNotFound)
  368. def show_protectable_type(self, context, protectable_type):
  369. LOG.info("Start to show protectable type %s", protectable_type)
  370. plugin = self.protectable_registry.get_protectable_resource_plugin(
  371. protectable_type)
  372. if not plugin:
  373. raise exception.ProtectableTypeNotFound(
  374. protectable_type=protectable_type)
  375. dependents = []
  376. for t in self.protectable_registry.list_resource_types():
  377. if t == protectable_type:
  378. continue
  379. p = self.protectable_registry.get_protectable_resource_plugin(t)
  380. if p and protectable_type in p.get_parent_resource_types():
  381. dependents.append(t)
  382. return {
  383. 'name': plugin.get_resource_type(),
  384. "dependent_types": dependents
  385. }
  386. @messaging.expected_exceptions(exception.ListProtectableResourceFailed)
  387. def list_protectable_instances(self, context,
  388. protectable_type=None,
  389. marker=None,
  390. limit=None,
  391. sort_keys=None,
  392. sort_dirs=None,
  393. filters=None,
  394. parameters=None):
  395. LOG.info("Start to list protectable instances of type: %s",
  396. protectable_type)
  397. try:
  398. resource_instances = self.protectable_registry.list_resources(
  399. context, protectable_type, parameters)
  400. except exception.ListProtectableResourceFailed as err:
  401. LOG.error("List resources of type %(type)s failed: %(err)s",
  402. {'type': protectable_type, 'err': six.text_type(err)})
  403. raise
  404. result = []
  405. for resource in resource_instances:
  406. result.append(dict(id=resource.id, name=resource.name,
  407. extra_info=resource.extra_info))
  408. return result
  409. @messaging.expected_exceptions(exception.ListProtectableResourceFailed)
  410. def show_protectable_instance(self, context, protectable_type,
  411. protectable_id, parameters=None):
  412. LOG.info("Start to show protectable instance of type: %s",
  413. protectable_type)
  414. registry = self.protectable_registry
  415. try:
  416. resource_instance = registry.show_resource(
  417. context,
  418. protectable_type,
  419. protectable_id,
  420. parameters=parameters
  421. )
  422. except exception.ListProtectableResourceFailed as err:
  423. LOG.error("Show resources of type %(type)s id %(id)s "
  424. "failed: %(err)s",
  425. {'type': protectable_type,
  426. 'id': protectable_id,
  427. 'err': six.text_type(err)})
  428. raise
  429. return resource_instance.to_dict() if resource_instance else None
  430. @messaging.expected_exceptions(exception.ListProtectableResourceFailed)
  431. def list_protectable_dependents(self, context,
  432. protectable_id,
  433. protectable_type,
  434. protectable_name):
  435. LOG.info("Start to list dependents of resource (type:%(type)s, "
  436. "id:%(id)s, name:%(name)s)",
  437. {'type': protectable_type,
  438. 'id': protectable_id,
  439. 'name': protectable_name})
  440. parent_resource = Resource(type=protectable_type, id=protectable_id,
  441. name=protectable_name)
  442. registry = self.protectable_registry
  443. try:
  444. dependent_resources = registry.fetch_dependent_resources(
  445. context, parent_resource)
  446. except exception.ListProtectableResourceFailed as err:
  447. LOG.error("List dependent resources of (%(res)s) failed: %(err)s",
  448. {'res': parent_resource,
  449. 'err': six.text_type(err)})
  450. raise
  451. return [resource.to_dict() for resource in dependent_resources]
  452. def list_providers(self, context, marker=None, limit=None,
  453. sort_keys=None, sort_dirs=None, filters=None):
  454. return self.provider_registry.list_providers(marker=marker,
  455. limit=limit,
  456. sort_keys=sort_keys,
  457. sort_dirs=sort_dirs,
  458. filters=filters)
  459. @messaging.expected_exceptions(exception.ProviderNotFound)
  460. def show_provider(self, context, provider_id):
  461. provider = self.provider_registry.show_provider(provider_id)
  462. response = {'id': provider.id,
  463. 'name': provider.name,
  464. 'description': provider.description,
  465. 'extended_info_schema': provider.extended_info_schema,
  466. }
  467. return response