# Copyright (C) 2012 Hewlett-Packard Development Company, L.P.
# Copyright (c) 2014 TrilioData, Inc
# Copyright (c) 2015 EMC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
Handles all requests relating to the volume backups service.
"""

from datetime import datetime
import random

from eventlet import greenthread
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import strutils
from pytz import timezone

from cinder.backup import rpcapi as backup_rpcapi
from cinder.common import constants
from cinder import context
from cinder.db import base
from cinder import exception
from cinder.i18n import _
from cinder import objects
from cinder.objects import fields
from cinder.policies import backup_actions as backup_action_policy
from cinder.policies import backups as policy
import cinder.policy
from cinder import quota
from cinder import quota_utils
import cinder.volume
from cinder.volume import utils as volume_utils

backup_opts = [
    cfg.BoolOpt('backup_use_same_host',
                default=False,
                help='Backup services use same backend.')
]

CONF = cfg.CONF
CONF.register_opts(backup_opts)
LOG = logging.getLogger(__name__)
QUOTAS = quota.QUOTAS
IMPORT_VOLUME_ID = '00000000-0000-0000-0000-000000000000'
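
# Illustrative only (not part of the original module): 'backup_use_same_host'
# is registered without a group, so it lives in the [DEFAULT] section of
# cinder.conf. An operator who wants backups pinned to the volume's own host
# would set roughly:
#
#     [DEFAULT]
#     backup_use_same_host = True
#
# Left at its default of False, any enabled backup service in the matching
# availability zone may be selected instead.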


class API(base.Base):
    """API for interacting with the volume backup manager."""

    def __init__(self, db=None):
        self.backup_rpcapi = backup_rpcapi.BackupAPI()
        self.volume_api = cinder.volume.API()
        super(API, self).__init__(db)

    def get(self, context, backup_id):
        backup = objects.Backup.get_by_id(context, backup_id)
        context.authorize(policy.GET_POLICY, target_obj=backup)
        return backup

    def _check_support_to_force_delete(self, context, backup_host):
        result = self.backup_rpcapi.check_support_to_force_delete(context,
                                                                  backup_host)
        return result

    def delete(self, context, backup, force=False):
        """Make the RPC call to delete a volume backup.

        Call the backup manager to execute a backup delete or force delete
        operation.

        :param context: running context
        :param backup: the Backup object retrieved from the DB
        :param force: indicate force delete or not
        :raises InvalidBackup:
        :raises BackupDriverException:
        :raises ServiceNotFound:
        """
        context.authorize(policy.DELETE_POLICY, target_obj=backup)
        if not force and backup.status not in [fields.BackupStatus.AVAILABLE,
                                               fields.BackupStatus.ERROR]:
            msg = _('Backup status must be available or error')
            raise exception.InvalidBackup(reason=msg)
        if force and not self._check_support_to_force_delete(context,
                                                             backup.host):
            msg = _('force delete')
            raise exception.NotSupportedOperation(operation=msg)

        # Don't allow backup to be deleted if there are incremental
        # backups dependent on it.
        deltas = self.get_all(context, search_opts={'parent_id': backup.id})
        if deltas and len(deltas):
            msg = _('Incremental backups exist for this backup.')
            raise exception.InvalidBackup(reason=msg)

        backup.status = fields.BackupStatus.DELETING
        backup.host = self._get_available_backup_service_host(
            backup.host, backup.availability_zone)
        backup.save()
        self.backup_rpcapi.delete_backup(context, backup)
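
    # Illustrative usage (an assumption, not part of the original module): a
    # caller such as the REST backups controller would typically do
    #
    #     backup = backup_api.get(ctxt, backup_id)
    #     backup_api.delete(ctxt, backup, force=False)
    #
    # where ``backup_api`` is an instance of this class; the deletion itself
    # then runs asynchronously in the backup manager via the RPC cast above.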

    def get_all(self, context, search_opts=None, marker=None, limit=None,
                offset=None, sort_keys=None, sort_dirs=None):
        context.authorize(policy.GET_ALL_POLICY)
        search_opts = search_opts or {}

        all_tenants = search_opts.pop('all_tenants', '0')
        if not strutils.is_valid_boolstr(all_tenants):
            msg = _("all_tenants must be a boolean, got '%s'.") % all_tenants
            raise exception.InvalidParameterValue(err=msg)

        if context.is_admin and strutils.bool_from_string(all_tenants):
            backups = objects.BackupList.get_all(context, search_opts,
                                                 marker, limit, offset,
                                                 sort_keys, sort_dirs)
        else:
            backups = objects.BackupList.get_all_by_project(
                context, context.project_id, search_opts,
                marker, limit, offset, sort_keys, sort_dirs
            )

        return backups

    def _az_matched(self, service, availability_zone):
        return ((not availability_zone) or
                service.availability_zone == availability_zone)

    def _is_backup_service_enabled(self, availability_zone, host):
        """Check if there is a backup service available."""
        topic = constants.BACKUP_TOPIC
        ctxt = context.get_admin_context()
        services = objects.ServiceList.get_all_by_topic(
            ctxt, topic, disabled=False)
        for srv in services:
            if (self._az_matched(srv, availability_zone) and
                    srv.host == host and srv.is_up):
                return True
        return False

    def _get_any_available_backup_service(self, availability_zone):
        """Get an available backup service host.

        Get an available backup service host in the specified
        availability zone.
        """
        services = [srv for srv in self._list_backup_services()]
        random.shuffle(services)
        # Get the next running service with matching availability zone.
        idx = 0
        while idx < len(services):
            srv = services[idx]
            if (self._az_matched(srv, availability_zone) and
                    srv.is_up):
                return srv.host
            idx = idx + 1
        return None

    def get_available_backup_service_host(self, host, az):
        return self._get_available_backup_service_host(host, az)

    def _get_available_backup_service_host(self, host, az):
        """Return an appropriate backup service host."""
        backup_host = None
        if not host or not CONF.backup_use_same_host:
            backup_host = self._get_any_available_backup_service(az)
        elif self._is_backup_service_enabled(az, host):
            backup_host = host
        if not backup_host:
            raise exception.ServiceNotFound(service_id='cinder-backup')
        return backup_host
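
    # Descriptive summary (added comment, not in the original file): when no
    # host is given or 'backup_use_same_host' is False, any enabled, running
    # backup service in the matching availability zone may be picked at
    # random; when a host is given and 'backup_use_same_host' is True, that
    # host itself must be running an enabled backup service. In either case
    # ServiceNotFound is raised if no suitable host is found.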

    def _list_backup_services(self):
        """List all enabled backup services.

        :returns: list -- services that are enabled for backup.
        """
        topic = constants.BACKUP_TOPIC
        ctxt = context.get_admin_context()
        services = objects.ServiceList.get_all_by_topic(
            ctxt, topic, disabled=False)
        return services

    def _list_backup_hosts(self):
        services = self._list_backup_services()
        return [srv.host for srv in services
                if not srv.disabled and srv.is_up]

    def create(self, context, name, description, volume_id,
               container, incremental=False, availability_zone=None,
               force=False, snapshot_id=None, metadata=None):
        """Make the RPC call to create a volume backup."""
        volume = self.volume_api.get(context, volume_id)
        context.authorize(policy.CREATE_POLICY, target_obj=volume)
        snapshot = None
        if snapshot_id:
            snapshot = self.volume_api.get_snapshot(context, snapshot_id)

            if volume_id != snapshot.volume_id:
                msg = (_('Volume %(vol1)s does not match with '
                         'snapshot.volume_id %(vol2)s.')
                       % {'vol1': volume_id,
                          'vol2': snapshot.volume_id})
                raise exception.InvalidVolume(reason=msg)
            if snapshot['status'] not in ["available"]:
                msg = (_('Snapshot to be backed up must be available, '
                         'but the current status is "%s".')
                       % snapshot['status'])
                raise exception.InvalidSnapshot(reason=msg)
        elif volume['status'] not in ["available", "in-use"]:
            msg = (_('Volume to be backed up must be available '
                     'or in-use, but the current status is "%s".')
                   % volume['status'])
            raise exception.InvalidVolume(reason=msg)
        elif volume['status'] in ["in-use"] and not force:
            msg = _('Backing up an in-use volume must use '
                    'the force flag.')
            raise exception.InvalidVolume(reason=msg)

        previous_status = volume['status']
        volume_host = volume_utils.extract_host(volume.host, 'host')
        availability_zone = availability_zone or volume.availability_zone
        host = self._get_available_backup_service_host(volume_host,
                                                       availability_zone)

        # Reserve a quota before setting volume status and backup status
        try:
            reserve_opts = {'backups': 1,
                            'backup_gigabytes': volume['size']}
            reservations = QUOTAS.reserve(context, **reserve_opts)
        except exception.OverQuota as e:
            quota_utils.process_reserve_over_quota(
                context, e,
                resource='backups',
                size=volume.size)

        # Find the latest backup and use it as the parent backup to do an
        # incremental backup.
        latest_backup = None
        if incremental:
            backups = objects.BackupList.get_all_by_volume(context.elevated(),
                                                           volume_id)
            if backups.objects:
                # NOTE(xyang): The 'data_timestamp' field records the time
                # when the data on the volume was first saved. If it is
                # a backup from volume, 'data_timestamp' will be the same
                # as 'created_at' for a backup. If it is a backup from a
                # snapshot, 'data_timestamp' will be the same as
                # 'created_at' for a snapshot.
                # If not backing up from snapshot, the backup with the latest
                # 'data_timestamp' will be the parent; If backing up from
                # snapshot, the backup with the latest 'data_timestamp' will
                # be chosen only if 'data_timestamp' is earlier than the
                # 'created_at' timestamp of the snapshot; Otherwise, the
                # backup will not be chosen as the parent.
                # For example, a volume has a backup taken at 8:00, then
                # a snapshot taken at 8:10, and then a backup at 8:20.
                # When taking an incremental backup of the snapshot, the
                # parent should be the backup at 8:00, not 8:20, and the
                # 'data_timestamp' of this new backup will be 8:10.
                latest_backup = max(
                    backups.objects,
                    key=lambda x: x['data_timestamp']
                    if (not snapshot or (snapshot and x['data_timestamp']
                                         < snapshot['created_at']))
                    else datetime(1, 1, 1, 1, 1, 1, tzinfo=timezone('UTC')))
            else:
                msg = _('No backups available to do an incremental backup.')
                raise exception.InvalidBackup(reason=msg)

        parent_id = None
        parent = None
        if latest_backup:
            parent = latest_backup
            parent_id = latest_backup.id
            if latest_backup['status'] != fields.BackupStatus.AVAILABLE:
                msg = _('The parent backup must be available for '
                        'incremental backup.')
                raise exception.InvalidBackup(reason=msg)

        data_timestamp = None
        if snapshot_id:
            snapshot = objects.Snapshot.get_by_id(context, snapshot_id)
            data_timestamp = snapshot.created_at
            self.db.snapshot_update(
                context, snapshot_id,
                {'status': fields.SnapshotStatus.BACKING_UP})
        else:
            self.db.volume_update(context, volume_id,
                                  {'status': 'backing-up',
                                   'previous_status': previous_status})

        backup = None
        try:
            kwargs = {
                'user_id': context.user_id,
                'project_id': context.project_id,
                'display_name': name,
                'display_description': description,
                'volume_id': volume_id,
                'status': fields.BackupStatus.CREATING,
                'container': container,
                'parent_id': parent_id,
                'size': volume['size'],
                'host': host,
                'availability_zone': availability_zone,
                'snapshot_id': snapshot_id,
                'data_timestamp': data_timestamp,
                'parent': parent,
                'metadata': metadata or {}
            }
            backup = objects.Backup(context=context, **kwargs)
            backup.create()
            if not snapshot_id:
                backup.data_timestamp = backup.created_at
                backup.save()
            QUOTAS.commit(context, reservations)
        except Exception:
            with excutils.save_and_reraise_exception():
                try:
                    if backup and 'id' in backup:
                        backup.destroy()
                finally:
                    QUOTAS.rollback(context, reservations)

        # TODO(DuncanT): In future, when we have a generic local attach,
        #                this can go via the scheduler, which enables
        #                better load balancing and isolation of services
        self.backup_rpcapi.create_backup(context, backup)

        return backup
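
    # Illustrative usage (an assumption, not part of the original module):
    # taking a full backup and then an incremental backup of the same volume
    # might look like
    #
    #     full = backup_api.create(ctxt, 'nightly-full', 'weekly base',
    #                              volume_id, container=None)
    #     incr = backup_api.create(ctxt, 'nightly-incr', 'daily delta',
    #                              volume_id, container=None,
    #                              incremental=True)
    #
    # The incremental call selects the most recent eligible backup as its
    # parent, following the data_timestamp rules in the NOTE above.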

    def restore(self, context, backup_id, volume_id=None, name=None):
        """Make the RPC call to restore a volume backup."""
        backup = self.get(context, backup_id)
        context.authorize(policy.RESTORE_POLICY, target_obj=backup)
        if backup['status'] != fields.BackupStatus.AVAILABLE:
            msg = _('Backup status must be available')
            raise exception.InvalidBackup(reason=msg)

        size = backup['size']
        if size is None:
            msg = _('Backup to be restored has invalid size')
            raise exception.InvalidBackup(reason=msg)

        # Create a volume if none specified. If a volume is specified check
        # it is large enough for the backup
        if volume_id is None:
            if name is None:
                name = 'restore_backup_%s' % backup_id

            description = 'auto-created_from_restore_from_backup'

            LOG.info("Creating volume of %(size)s GB for restore of "
                     "backup %(backup_id)s.",
                     {'size': size, 'backup_id': backup_id})
            volume = self.volume_api.create(context, size, name, description)
            volume_id = volume['id']

            while True:
                volume = self.volume_api.get(context, volume_id)
                if volume['status'] != 'creating':
                    break
                greenthread.sleep(1)

            if volume['status'] == "error":
                msg = (_('Error while creating volume %(volume_id)s '
                         'for restoring backup %(backup_id)s.') %
                       {'volume_id': volume_id, 'backup_id': backup_id})
                raise exception.InvalidVolume(reason=msg)
        else:
            volume = self.volume_api.get(context, volume_id)

        if volume['status'] != "available":
            msg = _('Volume to be restored to must be available')
            raise exception.InvalidVolume(reason=msg)

        LOG.debug('Checking backup size %(bs)s against volume size %(vs)s',
                  {'bs': size, 'vs': volume['size']})
        if size > volume['size']:
            msg = (_('volume size %(volume_size)d is too small to restore '
                     'backup of size %(size)d.') %
                   {'volume_size': volume['size'], 'size': size})
            raise exception.InvalidVolume(reason=msg)

        LOG.info("Overwriting volume %(volume_id)s with restore of "
                 "backup %(backup_id)s",
                 {'volume_id': volume_id, 'backup_id': backup_id})

        # Set the status here rather than at the start (which would require
        # unrolling it on each error condition above); the window is very
        # small.
        backup.host = self._get_available_backup_service_host(
            backup.host, backup.availability_zone)
        backup.status = fields.BackupStatus.RESTORING
        backup.restore_volume_id = volume.id
        backup.save()
        self.db.volume_update(context, volume_id, {'status':
                                                   'restoring-backup'})

        self.backup_rpcapi.restore_backup(context, backup.host, backup,
                                          volume_id)

        d = {'backup_id': backup_id,
             'volume_id': volume_id,
             'volume_name': volume['display_name'], }

        return d
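
    # Illustrative only: the dictionary returned above has the shape
    #
    #     {'backup_id': <backup uuid>,
    #      'volume_id': <uuid of the restored-to volume>,
    #      'volume_name': <display name of that volume>}
    #
    # while the actual data copy is performed asynchronously by the backup
    # manager on backup.host.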

    def reset_status(self, context, backup_id, status):
        """Make the RPC call to reset a volume backup's status.

        Call the backup manager to execute the backup status reset operation.

        :param context: running context
        :param backup_id: id of the backup whose status is to be reset
        :param status: the status to reset the backup to
        :raises InvalidBackup:
        """
        # get backup info
        backup = self.get(context, backup_id)
        context.authorize(
            backup_action_policy.BASE_POLICY_NAME % "reset_status",
            target_obj=backup)
        backup.host = self._get_available_backup_service_host(
            backup.host, backup.availability_zone)
        backup.save()
        # send to manager to do reset operation
        self.backup_rpcapi.reset_status(ctxt=context, backup=backup,
                                        status=status)

    def export_record(self, context, backup_id):
        """Make the RPC call to export a volume backup.

        Call the backup manager to execute the backup export.

        :param context: running context
        :param backup_id: backup id to export
        :returns: dictionary -- a description of how to import the backup
        :returns: contains 'backup_url' and 'backup_service'
        :raises InvalidBackup:
        """
        backup = self.get(context, backup_id)
        context.authorize(policy.EXPORT_POLICY, target_obj=backup)
        if backup['status'] != fields.BackupStatus.AVAILABLE:
            msg = (_('Backup status must be available and not %s.') %
                   backup['status'])
            raise exception.InvalidBackup(reason=msg)

        LOG.debug("Calling RPCAPI with context: "
                  "%(ctx)s, host: %(host)s, backup: %(id)s.",
                  {'ctx': context,
                   'host': backup['host'],
                   'id': backup['id']})

        backup.host = self._get_available_backup_service_host(
            backup.host, backup.availability_zone)
        backup.save()
        export_data = self.backup_rpcapi.export_record(context, backup)

        return export_data
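
    # Illustrative round trip (an assumption, not part of the original
    # module): the mapping returned above contains 'backup_url' and
    # 'backup_service', and can later be fed back through import_record(),
    # possibly on a different deployment:
    #
    #     record = backup_api.export_record(ctxt, backup_id)
    #     backup_api.import_record(ctxt, record['backup_service'],
    #                              record['backup_url'])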

    def _get_import_backup(self, context, backup_url):
        """Prepare database backup record for import.

        This method decodes the provided backup_url and expects to find the
        id of the backup in there.

        It then checks the DB for the presence of this backup record; if the
        record is found and is not deleted, an exception is raised because
        the record cannot be created or used.

        If the record is in deleted status then we must be trying to recover
        this record, so we'll reuse it.

        If the record doesn't already exist we create it with the provided
        id.

        :param context: running context
        :param backup_url: backup description to be used by the backup driver
        :return: BackupImport object
        :raises InvalidBackup:
        :raises InvalidInput:
        """
        reservations = None
        backup = None
        # Deserialize string backup record into a dictionary
        backup_record = objects.Backup.decode_record(backup_url)

        # ID is a required field since it's what links incremental backups
        if 'id' not in backup_record:
            msg = _('Provided backup record is missing an id')
            raise exception.InvalidInput(reason=msg)

        # Since we use size to reserve & commit quota, size is another
        # required field.
        if 'size' not in backup_record:
            msg = _('Provided backup record is missing size attribute')
            raise exception.InvalidInput(reason=msg)

        try:
            reserve_opts = {'backups': 1,
                            'backup_gigabytes': backup_record['size']}
            reservations = QUOTAS.reserve(context, **reserve_opts)
        except exception.OverQuota as e:
            quota_utils.process_reserve_over_quota(
                context, e,
                resource='backups',
                size=backup_record['size'])

        kwargs = {
            'user_id': context.user_id,
            'project_id': context.project_id,
            'volume_id': IMPORT_VOLUME_ID,
            'status': fields.BackupStatus.CREATING,
            'deleted_at': None,
            'deleted': False,
            'metadata': {}
        }

        try:
            try:
                # Try to get the backup with that ID in all projects even
                # among deleted entries.
                backup = objects.BackupImport.get_by_id(
                    context.elevated(read_deleted='yes'),
                    backup_record['id'],
                    project_only=False)

                # If the record exists and it's not deleted we cannot proceed
                # with the import
                if backup.status != fields.BackupStatus.DELETED:
                    msg = _('Backup already exists in database.')
                    raise exception.InvalidBackup(reason=msg)

                # Otherwise we'll "revive" the deleted backup record
                backup.update(kwargs)
                backup.save()
                QUOTAS.commit(context, reservations)
            except exception.BackupNotFound:
                # If the record doesn't exist create it with the specific ID
                backup = objects.BackupImport(context=context,
                                              id=backup_record['id'],
                                              **kwargs)
                backup.create()
                QUOTAS.commit(context, reservations)
        except Exception:
            with excutils.save_and_reraise_exception():
                try:
                    if backup and 'id' in backup:
                        backup.destroy()
                finally:
                    QUOTAS.rollback(context, reservations)
        return backup

    def import_record(self, context, backup_service, backup_url):
        """Make the RPC call to import a volume backup.

        :param context: running context
        :param backup_service: backup service name
        :param backup_url: backup description to be used by the backup driver
        :raises InvalidBackup:
        :raises ServiceNotFound:
        :raises InvalidInput:
        """
        context.authorize(policy.IMPORT_POLICY)

        # NOTE(ronenkat): since we don't have a backup-scheduler
        # we need to find a host that supports the backup service
        # that was used to create the backup.
        # We send it to the first backup service host, and the backup manager
        # on that host will forward it to other hosts on the hosts list if it
        # cannot support the correct service itself.
        hosts = self._list_backup_hosts()
        if len(hosts) == 0:
            raise exception.ServiceNotFound(service_id=backup_service)

        # Get the Backup object that will be used to import this backup
        # record
        backup = self._get_import_backup(context, backup_url)

        first_host = hosts.pop()
        self.backup_rpcapi.import_record(context,
                                         first_host,
                                         backup,
                                         backup_service,
                                         backup_url,
                                         hosts)

        return backup

    def update(self, context, backup_id, fields):
        backup = self.get(context, backup_id)
        context.authorize(policy.UPDATE_POLICY, target_obj=backup)
        backup.update(fields)
        backup.save()
        return backup