diff --git a/cinder/backup/drivers/ceph.py b/cinder/backup/drivers/ceph.py index 7b56b7a0ebd..d9c7341946d 100644 --- a/cinder/backup/drivers/ceph.py +++ b/cinder/backup/drivers/ceph.py @@ -13,19 +13,46 @@ # License for the specific language governing permissions and limitations # under the License. -"""Ceph Backup Service Implementation""" +"""Ceph Backup Service Implementation. -import os -import time +This driver supports backing up volumes of any type to a Ceph backend store. It +is also capable of detecting whether the volume to be backed up is a Ceph RBD +volume and if so, attempts to perform incremental/differential backups. + +Support is also included for the following in the case of source volume being a +Ceph RBD volume: + + * backing up within the same Ceph pool (not recommended) + * backing up between different Ceph pools + * backing up between different Ceph clusters + +At the time of writing, differential backup support in Ceph/librbd was quite +new so this driver accounts for this by first attempting differential backup +and falling back to full backup/copy if the former fails. + +If incremental backups are used, multiple backups of the same volume are stored +as snapshots so that minimal space is consumed in the backup store and +restoring the volume takes a far reduced amount of time compared to a full +copy. + +Note that Cinder supports restoring to a new volume or the original volume the +backup was taken from. For the latter case, a full copy is enforced since this +was deemed the safest action to take. It is therefore recommended to always +restore to a new volume (default). +""" import eventlet -from oslo.config import cfg +import os +import re +import time from cinder.backup.driver import BackupDriver from cinder import exception from cinder.openstack.common import log as logging from cinder import units -import cinder.volume.drivers.rbd as rbddriver +from cinder import utils +import cinder.volume.drivers as drivers +from oslo.config import cfg try: import rados @@ -57,58 +84,93 @@ CONF.register_opts(service_opts) class CephBackupDriver(BackupDriver): - """Backup up Cinder volumes to Ceph Object Store""" + """Backup up Cinder volumes to Ceph Object Store. - def __init__(self, context, db_driver=None): + This class enables backing up Cinder volumes to a Ceph object store. + Backups may be stored in their own pool or even cluster. Store location is + defined by the Ceph conf file and Service config options supplied. + + If the source volume is itself an RBD volume, the backup will be performed + using incremental differential backups which *should* give a performance + gain. + """ + + def __init__(self, context, db_driver=None, execute=None): super(CephBackupDriver, self).__init__(db_driver) self.rbd = rbd self.rados = rados self.context = context self.chunk_size = CONF.backup_ceph_chunk_size - if self._supports_stripingv2(): + self._execute = execute or utils.execute + + if self._supports_stripingv2: self.rbd_stripe_unit = CONF.backup_ceph_stripe_unit self.rbd_stripe_count = CONF.backup_ceph_stripe_count else: - LOG.info("rbd striping not supported - ignoring conf settings " - "for rbd striping") + LOG.info(_("rbd striping not supported - ignoring configuration " + "settings for rbd striping")) self.rbd_stripe_count = 0 self.rbd_stripe_unit = 0 - self._ceph_user = str(CONF.backup_ceph_user) - self._ceph_pool = str(CONF.backup_ceph_pool) - self._ceph_conf = str(CONF.backup_ceph_conf) + self._ceph_backup_user = str(CONF.backup_ceph_user) + self._ceph_backup_pool = str(CONF.backup_ceph_pool) + self._ceph_backup_conf = str(CONF.backup_ceph_conf) + def _validate_string_args(self, *args): + """Ensure all args are non-None and non-empty.""" + return all(args) + + def _ceph_args(self, user, conf=None, pool=None): + """Create default ceph args for executing rbd commands. + + If no --conf is provided, rbd will look in the default locations e.g. + /etc/ceph/ceph.conf + """ + + # Make sure user arg is valid since rbd command may not fail if + # invalid/no user provided, resulting in unexpected behaviour. + if not self._validate_string_args(user): + raise exception.BackupInvalidCephArgs(_("invalid user '%s'") % + (user)) + + args = ['--id', user] + if conf: + args += ['--conf', conf] + if pool: + args += '--pool', pool + + return args + + @property def _supports_layering(self): - """ - Determine whether copy-on-write is supported by our version of librbd - """ + """Determine if copy-on-write is supported by our version of librbd.""" return hasattr(self.rbd, 'RBD_FEATURE_LAYERING') + @property def _supports_stripingv2(self): - """ - Determine whether striping is supported by our version of librbd - """ + """Determine if striping is supported by our version of librbd.""" return hasattr(self.rbd, 'RBD_FEATURE_STRIPINGV2') def _get_rbd_support(self): + """Determine RBD features supported by our version of librbd.""" old_format = True features = 0 - if self._supports_layering(): + if self._supports_layering: old_format = False features |= self.rbd.RBD_FEATURE_LAYERING - if self._supports_stripingv2(): + if self._supports_stripingv2: old_format = False features |= self.rbd.RBD_FEATURE_STRIPINGV2 return (old_format, features) def _connect_to_rados(self, pool=None): - """Establish connection to the Ceph cluster""" - client = self.rados.Rados(rados_id=self._ceph_user, - conffile=self._ceph_conf) + """Establish connection to the backup Ceph cluster.""" + client = self.rados.Rados(rados_id=self._ceph_backup_user, + conffile=self._ceph_backup_conf) try: client.connect() - pool_to_open = str(pool or self._ceph_pool) + pool_to_open = str(pool or self._ceph_backup_pool) ioctx = client.open_ioctx(pool_to_open) return client, ioctx except self.rados.Error: @@ -117,157 +179,728 @@ class CephBackupDriver(BackupDriver): raise def _disconnect_from_rados(self, client, ioctx): - """Terminate connection with the Ceph cluster""" + """Terminate connection with the backup Ceph cluster.""" # closing an ioctx cannot raise an exception ioctx.close() client.shutdown() - def _get_backup_base_name(self, volume_id, backup_id): - """Return name of base image used for backup.""" + def _get_backup_base_name(self, volume_id, backup_id=None, + diff_format=False): + """Return name of base image used for backup. + + Incremental backups use a new base name so we support old and new style + format. + """ # Ensure no unicode - return str("volume-%s.backup.%s" % (volume_id, backup_id)) + if diff_format: + return str("volume-%s.backup.base" % (volume_id)) + else: + if backup_id is None: + msg = _("backup_id required") + raise exception.InvalidParameterValue(msg) + return str("volume-%s.backup.%s" % (volume_id, backup_id)) + + def _transfer_data(self, src, src_name, dest, dest_name, length): + """Transfer data between files (Python IO objects).""" + LOG.debug(_("transferring data between '%(src)s' and '%(dest)s'") % + {'src': src_name, 'dest': dest_name}) - def _transfer_data(self, src, dest, dest_name, length, dest_is_rbd=False): - """ - Transfer data between file and rbd. If destination is rbd, source is - assumed to be file, otherwise source is assumed to be rbd. - """ chunks = int(length / self.chunk_size) - LOG.debug("transferring %s chunks of %s bytes to '%s'" % - (chunks, self.chunk_size, dest_name)) + LOG.debug(_("%(chunks)s chunks of %(bytes)s bytes to be transferred") % + {'chunks': chunks, 'bytes': self.chunk_size}) + for chunk in xrange(0, chunks): - offset = chunk * self.chunk_size before = time.time() - - if dest_is_rbd: - dest.write(src.read(self.chunk_size), offset) - # note(dosaboy): librbd writes are synchronous so flush() will - # have not effect. Also, flush only supported in more recent - # versions of librbd. - else: - dest.write(src.read(offset, self.chunk_size)) - dest.flush() - + data = src.read(self.chunk_size) + dest.write(data) + dest.flush() delta = (time.time() - before) rate = (self.chunk_size / delta) / 1024 - LOG.debug("transferred chunk %s of %s (%dK/s)" % - (chunk, chunks, rate)) + LOG.debug((_("transferred chunk %(chunk)s of %(chunks)s " + "(%(rate)dK/s)") % + {'chunk': chunk, 'chunks': chunks, + 'rate': rate})) # yield to any other pending backups eventlet.sleep(0) rem = int(length % self.chunk_size) if rem: - LOG.debug("transferring remaining %s bytes" % (rem)) - offset = (length - rem) - if dest_is_rbd: - dest.write(src.read(rem), offset) - # note(dosaboy): librbd writes are synchronous so flush() will - # have not effect. Also, flush only supported in more recent - # versions of librbd. - else: - dest.write(src.read(offset, rem)) - dest.flush() - + LOG.debug(_("transferring remaining %s bytes") % (rem)) + data = src.read(rem) + dest.write(data) + dest.flush() # yield to any other pending backups eventlet.sleep(0) - def _backup_volume_from_file(self, backup_name, backup_size, volume_file): - """Backup a volume from file stream""" - LOG.debug("performing backup from file") + def _create_base_image(self, name, size, rados_client): + """Create a base backup image. + This will be the base image used for storing differential exports. + """ + LOG.debug(_("creating base image '%s'") % (name)) old_format, features = self._get_rbd_support() + self.rbd.RBD().create(ioctx=rados_client.ioctx, + name=name, + size=size, + old_format=old_format, + features=features, + stripe_unit=self.rbd_stripe_unit, + stripe_count=self.rbd_stripe_count) - with rbddriver.RADOSClient(self, self._ceph_pool) as client: + def _delete_backup_snapshots(self, rados_client, base_name, backup_id): + """Delete any snapshots associated with this backup. + + A backup should have at most *one* associated snapshot. + + This is required before attempting to delete the base image. The + snapshots on the original volume can be left as they will be purged + when the volume is deleted. + """ + backup_snaps = None + base_rbd = self.rbd.Image(rados_client.ioctx, base_name) + try: + snap_name = self._get_backup_snap_name(base_rbd, base_name, + backup_id) + if snap_name: + LOG.debug(_("deleting backup snapshot='%s'") % (snap_name)) + base_rbd.remove_snap(snap_name) + else: + LOG.debug(_("no backup snapshot to delete")) + + # Now check whether any snapshots remain on the base image + backup_snaps = self.get_backup_snaps(base_rbd) + finally: + base_rbd.close() + + if backup_snaps: + return len(backup_snaps) + else: + return 0 + + def _try_delete_base_image(self, backup_id, volume_id, base_name=None): + """Try to delete backup RBD image. + + If the rbd image is a base image for incremental backups, it may have + snapshots. Delete the snapshot associated with backup_id and if the + image has no more snapshots, delete it. Otherwise return. + + If no base name is provided try normal (full) format then diff format + image name. + + If a base name is provided but does not exist, ImageNotFound will be + raised. + + If the image is busy, a number of retries will be performed if + ImageBusy is received, after which the exception will be propagated to + the caller. + """ + retries = 3 + delay = 5 + try_diff_format = False + + if base_name is None: + try_diff_format = True + + base_name = self._get_backup_base_name(volume_id, backup_id) + LOG.debug(_("trying diff format name format basename='%s'") % + (base_name)) + + with drivers.rbd.RADOSClient(self) as client: + rbd_exists, base_name = \ + self._rbd_image_exists(base_name, volume_id, client, + try_diff_format=try_diff_format) + if not rbd_exists: + raise self.rbd.ImageNotFound(_("image %s not found") % + (base_name)) + + while retries >= 0: + # First delete associated snapshot (if exists) + rem = self._delete_backup_snapshots(client, base_name, + backup_id) + if rem: + msg = (_("base image still has %s snapshots so not " + "deleting base image") % (rem)) + LOG.info(msg) + return + + LOG.info(_("deleting base image='%s'") % (base_name)) + # Delete base if no more snapshots + try: + self.rbd.RBD().remove(client.ioctx, base_name) + except self.rbd.ImageBusy as exc: + # Allow a retry if the image is busy + if retries > 0: + LOG.info((_("image busy, retrying %(retries)s " + "more time(s) in %(delay)ss") % + {'retries': retries, 'delay': delay})) + eventlet.sleep(delay) + else: + LOG.error(_("max retries reached - raising error")) + raise exc + else: + LOG.debug(_("base backup image='%s' deleted)") % + (base_name)) + retries = 0 + finally: + retries -= 1 + + def _rbd_diff_transfer(self, src_name, src_pool, dest_name, dest_pool, + src_user, src_conf, dest_user, dest_conf, + src_snap=None, from_snap=None): + """Backup only extents changed between two points. + + If no snapshot is provided, the diff extents will be all those changed + since the rbd volume/base was created, otherwise it will be those + changed since the snapshot was created. + """ + LOG.debug(_("performing differential transfer from '%(src)s' to " + "'%(dest)s'") % + {'src': src_name, 'dest': dest_name}) + + # NOTE(dosaboy): Need to be tolerant of clusters/clients that do + # not support these operations since at the time of writing they + # were very new. + + src_ceph_args = self._ceph_args(src_user, src_conf, pool=src_pool) + dest_ceph_args = self._ceph_args(dest_user, dest_conf, pool=dest_pool) + + try: + cmd = ['rbd', 'export-diff'] + src_ceph_args + if from_snap is not None: + cmd += ['--from-snap', from_snap] + if src_snap: + path = str("%s/%s@%s" % (src_pool, src_name, src_snap)) + else: + path = str("%s/%s" % (src_pool, src_name)) + cmd += [path, '-'] + out, err = self._execute(*cmd) + except (exception.ProcessExecutionError, exception.Error) as exc: + LOG.info(_("rbd export-diff failed - %s") % (str(exc))) + raise exception.BackupRBDOperationFailed("rbd export-diff failed") + + try: + cmd = ['rbd', 'import-diff'] + dest_ceph_args + cmd += ['-', str("%s/%s" % (dest_pool, dest_name))] + out, err = self._execute(*cmd, process_input=out) + except (exception.ProcessExecutionError, exception.Error) as exc: + LOG.info(_("rbd import-diff failed - %s") % (str(exc))) + raise exception.BackupRBDOperationFailed("rbd import-diff failed") + + def _rbd_image_exists(self, name, volume_id, client, + try_diff_format=False): + """Return tuple (exists, name).""" + rbds = self.rbd.RBD().list(client.ioctx) + if name not in rbds: + msg = _("image '%s' not found - trying diff format name") % (name) + LOG.debug(msg) + if try_diff_format: + name = self._get_backup_base_name(volume_id, diff_format=True) + if name not in rbds: + msg = _("diff format image '%s' not found") % (name) + LOG.debug(msg) + return False, name + else: + return False, name + + return True, name + + def _snap_exists(self, base_name, snap_name, client): + """Return True if snapshot exists in base image.""" + base_rbd = self.rbd.Image(client.ioctx, base_name) + try: + snaps = base_rbd.list_snaps() + finally: + base_rbd.close() + + if snaps is None: + return False + + for snap in snaps: + if snap['name'] == snap_name: + return True + + return False + + def _backup_rbd(self, backup_id, volume_id, volume_file, volume_name, + length): + """Create a incremental backup from an RBD image.""" + rbd_user = volume_file.rbd_user + rbd_pool = volume_file.rbd_pool + rbd_conf = volume_file.rbd_conf + source_rbd_image = volume_file.rbd_image + + # Identify our --from-snap point (if one exists) + from_snap = self._get_most_recent_snap(source_rbd_image) + LOG.debug(_("using --from-snap '%s'") % from_snap) + + backup_name = self._get_backup_base_name(volume_id, diff_format=True) + image_created = False + force_full_backup = False + with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client: + # If from_snap does not exist in the destination, this implies a + # previous backup has failed. In this case we will force a full + # backup. + # + # TODO(dosaboy): find a way to repair the broken backup + # + if backup_name not in self.rbd.RBD().list(ioctx=client.ioctx): + # If a from_snap is defined then we cannot proceed (see above) + if from_snap is not None: + force_full_backup = True + + # Create new base image + self._create_base_image(backup_name, length, client) + image_created = True + else: + # If a from_snap is defined but does not exist in the back base + # then we cannot proceed (see above) + if not self._snap_exists(backup_name, from_snap, client): + force_full_backup = True + + if force_full_backup: + errmsg = (_("snap='%(snap)s' does not exist in base " + "image='%(base)s' - aborting incremental backup") % + {'snap': from_snap, 'base': backup_name}) + LOG.info(errmsg) + # Raise this exception so that caller can try another + # approach + raise exception.BackupRBDOperationFailed(errmsg) + + # Snapshot source volume so that we have a new point-in-time + new_snap = self._get_new_snap_name(backup_id) + LOG.debug(_("creating backup snapshot='%s'") % (new_snap)) + source_rbd_image.create_snap(new_snap) + + # Attempt differential backup. If this fails, perhaps because librbd + # or Ceph cluster version does not support it, do a full backup + # instead. + # + # TODO(dosaboy): find a way to determine if the operation is supported + # rather than brute force approach. + try: + before = time.time() + self._rbd_diff_transfer(volume_name, rbd_pool, backup_name, + self._ceph_backup_pool, + src_user=rbd_user, + src_conf=rbd_conf, + dest_user=self._ceph_backup_user, + dest_conf=self._ceph_backup_conf, + src_snap=new_snap, + from_snap=from_snap) + + LOG.debug(_("differential backup transfer completed in %.4fs") % + (time.time() - before)) + + # We don't need the previous snapshot (if there was one) anymore so + # delete it. + if from_snap: + source_rbd_image.remove_snap(from_snap) + + except exception.BackupRBDOperationFailed: + LOG.debug(_("differential backup transfer failed")) + + # Clean up if image was created as part of this operation + if image_created: + self._try_delete_base_image(backup_id, volume_id, + base_name=backup_name) + + # Delete snapshot + LOG.debug(_("deleting backup snapshot='%s'") % (new_snap)) + source_rbd_image.remove_snap(new_snap) + + # Re-raise the exception so that caller can try another approach + raise + + def _file_is_rbd(self, volume_file): + """Returns True if the volume_file is actually an RBD image.""" + return hasattr(volume_file, 'rbd_image') + + def _full_backup(self, backup_id, volume_id, src_volume, src_name, length): + """Perform a full backup of src volume. + + First creates a base backup image in our backup location then performs + an chunked copy of all data from source volume to a new backup rbd + image. + """ + backup_name = self._get_backup_base_name(volume_id, backup_id) + + with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client: + # First create base backup image + old_format, features = self._get_rbd_support() + LOG.debug(_("creating base image='%s'") % (backup_name)) self.rbd.RBD().create(ioctx=client.ioctx, name=backup_name, - size=backup_size, + size=length, old_format=old_format, features=features, stripe_unit=self.rbd_stripe_unit, stripe_count=self.rbd_stripe_count) + LOG.debug(_("copying data")) dest_rbd = self.rbd.Image(client.ioctx, backup_name) try: - self._transfer_data(volume_file, dest_rbd, backup_name, - backup_size, dest_is_rbd=True) + rbd_meta = drivers.rbd.RBDImageMetadata(dest_rbd, + self._ceph_backup_pool, + self._ceph_backup_user, + self._ceph_backup_conf) + rbd_fd = drivers.rbd.RBDImageIOWrapper(rbd_meta) + self._transfer_data(src_volume, src_name, rbd_fd, backup_name, + length) finally: dest_rbd.close() - def backup(self, backup, volume_file): - """Backup the given volume to Ceph object store""" - backup_id = backup['id'] - volume = self.db.volume_get(self.context, backup['volume_id']) - backup_name = self._get_backup_base_name(volume['id'], backup_id) + @staticmethod + def backup_snapshot_name_pattern(): + """Returns the pattern used to match backup snapshots. - LOG.debug("Starting backup of volume='%s' to rbd='%s'" % - (volume['name'], backup_name)) + It is essential that snapshots created for purposes other than backups + do not have this name format. + """ + return r"^backup\.([a-z0-9\-]+?)\.snap\.(.+)$" + @classmethod + def get_backup_snaps(cls, rbd_image, sort=False): + """Get all backup snapshots for the given rbd image. + + NOTE: this call is made public since these snapshots must be deleted + before the base volume can be deleted. + """ + snaps = rbd_image.list_snaps() + + backup_snaps = [] + for snap in snaps: + search_key = cls.backup_snapshot_name_pattern() + result = re.search(search_key, snap['name']) + if result: + backup_snaps.append({'name': result.group(0), + 'backup_id': result.group(1), + 'timestamp': result.group(2)}) + + if sort: + # Sort into ascending order of timestamp + backup_snaps.sort(key=lambda x: x['timestamp'], reverse=True) + + return backup_snaps + + def _get_new_snap_name(self, backup_id): + return str("backup.%s.snap.%s" % (backup_id, time.time())) + + def _get_backup_snap_name(self, rbd_image, name, backup_id): + """Return the name of the snapshot associated with backup_id. + + The rbd image provided must be the base image used for an incremental + backup. + + A back is only allowed to have one associated snapshot. If more than + one is found, exception.BackupOperationError is raised. + """ + snaps = self.get_backup_snaps(rbd_image) + + LOG.debug(_("looking for snapshot of backup base '%s'") % (name)) + + if not snaps: + LOG.debug(_("backup base '%s' has no snapshots") % (name)) + return None + + snaps = [snap['name'] for snap in snaps + if snap['backup_id'] == backup_id] + + if not snaps: + LOG.debug(_("backup '%s' has no snapshot") % (backup_id)) + return None + + if len(snaps) > 1: + msg = (_("backup should only have one snapshot but instead has %s") + % (len(snaps))) + LOG.error(msg) + raise exception.BackupOperationError(msg) + + LOG.debug(_("found snapshot '%s'") % (snaps[0])) + return snaps[0] + + def _get_most_recent_snap(self, rbd_image): + """Get the most recent backup snapshot of the provided image. + + Returns name of most recent backup snapshot or None if there are no + backup snapshot. + """ + backup_snaps = self.get_backup_snaps(rbd_image, sort=True) + if not backup_snaps: + return None + + return backup_snaps[0]['name'] + + def _get_volume_size_gb(self, volume): + """Return the size in gigabytes of the given volume. + + Raises exception.InvalidParameterValue if voluem size is 0. + """ if int(volume['size']) == 0: raise exception.InvalidParameterValue("need non-zero volume size") - else: - backup_size = int(volume['size']) * units.GiB - if volume_file: - self._backup_volume_from_file(backup_name, backup_size, - volume_file) - else: - errmsg = ("No volume_file was provided so I cannot do requested " - "backup (id=%s)" % (backup_id)) - raise exception.BackupVolumeInvalidType(errmsg) + return int(volume['size']) * units.GiB - self.db.backup_update(self.context, backup['id'], - {'container': self._ceph_pool}) + def backup(self, backup, volume_file): + """Backup the given volume to Ceph object store. + + If the source volume is an RBD we will attempt to do an + incremental/differential backup, otherwise a full copy is performed. + If this fails we will attempt to fall back to full copy. + """ + backup_id = backup['id'] + volume = self.db.volume_get(self.context, backup['volume_id']) + volume_id = volume['id'] + volume_name = volume['name'] + + LOG.debug(_("Starting backup of volume='%s'") % volume_name) + + # Ensure we are at the beginning of the volume + volume_file.seek(0) + length = self._get_volume_size_gb(volume) + + do_full_backup = False + if self._file_is_rbd(volume_file): + # If volume an RBD, attempt incremental backup. + try: + self._backup_rbd(backup_id, volume_id, volume_file, + volume_name, length) + except exception.BackupRBDOperationFailed: + LOG.debug(_("forcing full backup")) + do_full_backup = True + else: + do_full_backup = True + + if do_full_backup: + self._full_backup(backup_id, volume_id, volume_file, + volume_name, length) + + self.db.backup_update(self.context, backup_id, + {'container': self._ceph_backup_pool}) LOG.debug(_("backup '%s' finished.") % (backup_id)) - def restore(self, backup, volume_id, volume_file): - """Restore the given volume backup from Ceph object store""" - volume = self.db.volume_get(self.context, volume_id) - backup_name = self._get_backup_base_name(backup['volume_id'], - backup['id']) + def _full_restore(self, backup_id, volume_id, dest_file, dest_name, + length, src_snap=None): + """Restore the given volume file from backup RBD. - LOG.debug('starting backup restore from Ceph backup=%s ' - 'to volume=%s' % (backup['id'], volume['name'])) + This will result in all extents being copied from source to destination + """ + with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client: + + if src_snap: + # If a source snapshot is provided we assume the base is diff + # format. + backup_name = self._get_backup_base_name(volume_id, + diff_format=True) + else: + backup_name = self._get_backup_base_name(volume_id, backup_id) + + # Retrieve backup volume + src_rbd = self.rbd.Image(client.ioctx, backup_name, + snapshot=src_snap) + try: + rbd_meta = drivers.rbd.RBDImageMetadata(src_rbd, + self._ceph_backup_pool, + self._ceph_backup_user, + self._ceph_backup_conf) + rbd_fd = drivers.rbd.RBDImageIOWrapper(rbd_meta) + self._transfer_data(rbd_fd, backup_name, dest_file, dest_name, + length) + finally: + src_rbd.close() + + def _restore_rbd(self, base_name, volume_file, volume_name, restore_point): + """Restore RBD volume from RBD image.""" + rbd_user = volume_file.rbd_user + rbd_pool = volume_file.rbd_pool + rbd_conf = volume_file.rbd_conf + + LOG.debug(_("trying incremental restore from base='%(base)s' " + "snap='%(snap)s'") % + {'base': base_name, 'snap': restore_point}) + before = time.time() + try: + self._rbd_diff_transfer(base_name, self._ceph_backup_pool, + volume_name, rbd_pool, + src_user=self._ceph_backup_user, + src_conf=self._ceph_backup_conf, + dest_user=rbd_user, dest_conf=rbd_conf, + src_snap=restore_point) + except exception.BackupRBDOperationFailed: + LOG.exception(_("differential restore failed, trying full " + "restore")) + raise + + LOG.debug(_("restore transfer completed in %.4fs") % + (time.time() - before)) + + def _num_backup_snaps(self, backup_base_name): + """Return the number of snapshots that exist on the base image.""" + with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client: + base_rbd = self.rbd.Image(client.ioctx, backup_base_name) + try: + snaps = self.get_backup_snaps(base_rbd) + finally: + base_rbd.close() + + if snaps: + return len(snaps) + else: + return 0 + + def _get_restore_point(self, base_name, backup_id): + """Get restore point snapshot name for incremental backup. + + If the backup was not incremental None is returned. + """ + with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client: + base_rbd = self.rbd.Image(client.ioctx, base_name) + try: + restore_point = self._get_backup_snap_name(base_rbd, base_name, + backup_id) + finally: + base_rbd.close() + + return restore_point + + def _rbd_has_extents(self, rbd_volume): + """Check whether the given rbd volume has extents. + + Return True if has extents, otherwise False. + """ + extents = [] + + def iter_cb(offset, length, exists): + if exists: + extents.append(length) + + rbd_volume.diff_iterate(0, rbd_volume.size(), None, iter_cb) + + if extents: + LOG.debug("rbd has %s extents" % (sum(extents))) + return True + + return False + + def _diff_restore_allowed(self, base_name, backup, volume, volume_file, + rados_client): + """Determine whether a differential restore is possible/allowed. + + In order for a differential restore to be performed we need: + * destination volume must be RBD + * destination volume must have zero extents + * backup base image must exist + * backup must have a restore point + + Returns True if differential restore is allowed, False otherwise. + """ + not_allowed = (False, None) + + # If the volume we are restoring to is the volume the backup was made + # from, force a full restore since a diff will not work in this case. + if volume['id'] == backup['volume_id']: + LOG.debug("dest volume is original volume - forcing full copy") + return not_allowed + + if self._file_is_rbd(volume_file): + rbd_exists, base_name = self._rbd_image_exists(base_name, + backup['volume_id'], + rados_client) + + if not rbd_exists: + return not_allowed + + # Get the restore point. If no restore point is found, we assume + # that the backup was not performed using diff/incremental methods + # so we enforce full copy. + restore_point = self._get_restore_point(base_name, backup['id']) + if restore_point: + # If the destination volume has extents we cannot allow a diff + # restore. + if self._rbd_has_extents(volume_file.rbd_image): + # We return the restore point so that a full copy is done + # from snapshot. + LOG.debug("destination has extents - forcing full copy") + return False, restore_point + + return True, restore_point + else: + LOG.info(_("no restore point found for backup='%s', forcing " + "full copy") % (backup['id'])) + + return not_allowed + + def _try_restore(self, backup, volume, volume_file): + """Attempt to restore volume from backup.""" + volume_name = volume['name'] + backup_id = backup['id'] + backup_volume_id = backup['volume_id'] + length = int(volume['size']) * units.GiB + + base_name = self._get_backup_base_name(backup['volume_id'], + diff_format=True) + + with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client: + diff_restore, restore_point = \ + self._diff_restore_allowed(base_name, backup, volume, + volume_file, client) + + if diff_restore: + try: + do_full_restore = False + self._restore_rbd(base_name, volume_file, volume_name, + restore_point) + except exception.BackupRBDOperationFailed: + LOG.debug(_("forcing full restore")) + do_full_restore = True + else: + do_full_restore = True + + if do_full_restore: + # Otherwise full copy + self._full_restore(backup_id, backup_volume_id, volume_file, + volume_name, length, src_snap=restore_point) + + def restore(self, backup, volume_id, volume_file): + """Restore the given volume backup from Ceph object store.""" + target_volume = self.db.volume_get(self.context, volume_id) + LOG.debug(_('starting restore from Ceph backup=%(src)s to ' + 'volume=%(dest)s') % + {'src': backup['id'], 'dest': target_volume['name']}) # Ensure we are at the beginning of the volume volume_file.seek(0) - backup_size = int(volume['size']) * units.GiB - - with rbddriver.RADOSClient(self, self._ceph_pool) as client: - src_rbd = self.rbd.Image(client.ioctx, backup_name) - try: - self._transfer_data(src_rbd, volume_file, volume['name'], - backup_size) - finally: - src_rbd.close() - - # Be tolerant to IO implementations that do not support fileno() try: - fileno = volume_file.fileno() - except IOError: - LOG.info("volume_file does not support fileno() so skipping " - "fsync()") - else: - os.fsync(fileno) + self._try_restore(backup, target_volume, volume_file) - LOG.debug('restore %s to %s finished.' % (backup['id'], volume_id)) + # Be tolerant to IO implementations that do not support fileno() + try: + fileno = volume_file.fileno() + except IOError: + LOG.info(_("volume_file does not support fileno() so skipping " + "fsync()")) + else: + os.fsync(fileno) + + LOG.debug(_('restore finished.')) + except exception.BackupOperationError as e: + LOG.error(_('restore finished with error - %s') % (e)) + raise def delete(self, backup): - """Delete the given backup from Ceph object store""" + """Delete the given backup from Ceph object store.""" backup_id = backup['id'] - backup_name = self._get_backup_base_name(backup['volume_id'], - backup_id) - - LOG.debug('delete started for backup=%s', backup['id']) + LOG.debug(_('delete started for backup=%s') % backup['id']) try: - with rbddriver.RADOSClient(self) as client: - self.rbd.RBD().remove(client.ioctx, backup_name) + self._try_delete_base_image(backup['id'], backup['volume_id']) except self.rbd.ImageNotFound: - LOG.warning("rbd image '%s' not found but continuing anyway so " - "that db entry can be removed" % (backup_name)) + msg = _("rbd image not found but continuing anyway so " + "that db entry can be removed") + LOG.warning(msg) + LOG.info(_("delete '%s' finished with warning") % (backup_id)) LOG.debug(_("delete '%s' finished") % (backup_id)) diff --git a/cinder/exception.py b/cinder/exception.py index ac0cc157a30..da35a30b212 100644 --- a/cinder/exception.py +++ b/cinder/exception.py @@ -564,6 +564,18 @@ class ImageCopyFailure(Invalid): message = _("Failed to copy image to volume: %(reason)s") +class BackupInvalidCephArgs(Invalid): + message = _("Invalid Ceph args provided for backup rbd operation") + + +class BackupOperationError(Invalid): + message = _("An error has occurred during backup operation") + + +class BackupRBDOperationFailed(Invalid): + message = _("Backup RBD operation failed") + + class BackupVolumeInvalidType(Invalid): message = _("Backup volume %(volume_id)s type not recognised.") diff --git a/cinder/tests/backup/fake_rados.py b/cinder/tests/backup/fake_rados.py index 2169cc05fd7..b9d3cd6a695 100644 --- a/cinder/tests/backup/fake_rados.py +++ b/cinder/tests/backup/fake_rados.py @@ -16,7 +16,7 @@ class mock_rados(object): - class mock_ioctx(object): + class ioctx(object): def __init__(self, *args, **kwargs): pass @@ -32,7 +32,7 @@ class mock_rados(object): pass def open_ioctx(self, *args, **kwargs): - return mock_rados.mock_ioctx() + return mock_rados.ioctx() def shutdown(self, *args, **kwargs): pass @@ -44,22 +44,42 @@ class mock_rados(object): class mock_rbd(object): + class ImageBusy(Exception): + def __init__(self, *args, **kwargs): + pass + + class ImageNotFound(Exception): + def __init__(self, *args, **kwargs): + pass + class Image(object): def __init__(self, *args, **kwargs): pass - def read(self, *args, **kwargs): + def create_snap(self, *args, **kwargs): pass + def remove_snap(self, *args, **kwargs): + pass + + def read(self, *args, **kwargs): + raise NotImplementedError() + def write(self, *args, **kwargs): - pass + raise NotImplementedError() def resize(self, *args, **kwargs): + raise NotImplementedError() + + def close(self): pass - def close(self, *args, **kwargs): - pass + def list_snaps(self): + raise NotImplementedError() + + def size(self): + raise NotImplementedError() class RBD(object): @@ -72,6 +92,5 @@ class mock_rbd(object): def remove(self, *args, **kwargs): pass - class ImageNotFound(Exception): - def __init__(self, *args, **kwargs): - pass + def list(self, *args, **kwargs): + raise NotImplementedError() diff --git a/cinder/tests/test_backup_ceph.py b/cinder/tests/test_backup_ceph.py index a25fccde744..2dd69ace113 100644 --- a/cinder/tests/test_backup_ceph.py +++ b/cinder/tests/test_backup_ceph.py @@ -14,21 +14,22 @@ # under the License. """ Tests for Ceph backup service """ +import eventlet import hashlib import os import tempfile +import time import uuid -from cinder.backup.drivers.ceph import CephBackupDriver -from cinder.tests.backup.fake_rados import mock_rados -from cinder.tests.backup.fake_rados import mock_rbd - from cinder.backup.drivers import ceph from cinder import context from cinder import db from cinder import exception from cinder.openstack.common import log as logging from cinder import test +from cinder.tests.backup.fake_rados import mock_rados +from cinder.tests.backup.fake_rados import mock_rbd +from cinder.volume.drivers import rbd as rbddriver LOG = logging.getLogger(__name__) @@ -44,18 +45,30 @@ class BackupCephTestCase(test.TestCase): backup = {'id': backupid, 'size': size, 'volume_id': volid} return db.backup_create(self.ctxt, backup)['id'] + def fake_execute_w_exception(*args, **kwargs): + raise exception.ProcessExecutionError() + + def time_inc(self): + self.counter += 1 + return self.counter + + def _get_wrapped_rbd_io(self, rbd_image): + rbd_meta = rbddriver.RBDImageMetadata(rbd_image, 'pool_foo', + 'user_foo', 'conf_foo') + return rbddriver.RBDImageIOWrapper(rbd_meta) + def setUp(self): super(BackupCephTestCase, self).setUp() self.ctxt = context.get_admin_context() - self.vol_id = str(uuid.uuid4()) + self.volume_id = str(uuid.uuid4()) self.backup_id = str(uuid.uuid4()) # Setup librbd stubs self.stubs.Set(ceph, 'rados', mock_rados) self.stubs.Set(ceph, 'rbd', mock_rbd) - self._create_backup_db_entry(self.backup_id, self.vol_id, 1) + self._create_backup_db_entry(self.backup_id, self.volume_id, 1) self.chunk_size = 1024 self.num_chunks = 128 @@ -72,30 +85,108 @@ class BackupCephTestCase(test.TestCase): self.volume_file.seek(0) + # Always trigger an exception if a command is executed since it should + # always be dealt with gracefully. At time of writing on rbd + # export/import-diff is executed and if they fail we expect to find + # alternative means of backing up. + fake_exec = self.fake_execute_w_exception + self.service = ceph.CephBackupDriver(self.ctxt, execute=fake_exec) + + # Ensure that time.time() always returns more than the last time it was + # called to avoid div by zero errors. + self.counter = float(0) + self.stubs.Set(time, 'time', self.time_inc) + self.stubs.Set(eventlet, 'sleep', lambda *args: None) + def test_get_rbd_support(self): - service = CephBackupDriver(self.ctxt) + self.assertFalse(hasattr(self.service.rbd, 'RBD_FEATURE_LAYERING')) + self.assertFalse(hasattr(self.service.rbd, 'RBD_FEATURE_STRIPINGV2')) - self.assertFalse(hasattr(service.rbd, 'RBD_FEATURE_LAYERING')) - self.assertFalse(hasattr(service.rbd, 'RBD_FEATURE_STRIPINGV2')) - - oldformat, features = service._get_rbd_support() + oldformat, features = self.service._get_rbd_support() self.assertTrue(oldformat) self.assertEquals(features, 0) - service.rbd.RBD_FEATURE_LAYERING = 1 + self.service.rbd.RBD_FEATURE_LAYERING = 1 - oldformat, features = service._get_rbd_support() + oldformat, features = self.service._get_rbd_support() self.assertFalse(oldformat) self.assertEquals(features, 1) - service.rbd.RBD_FEATURE_STRIPINGV2 = 2 + self.service.rbd.RBD_FEATURE_STRIPINGV2 = 2 - oldformat, features = service._get_rbd_support() + oldformat, features = self.service._get_rbd_support() self.assertFalse(oldformat) self.assertEquals(features, 1 | 2) - def test_tranfer_data_from_rbd(self): - service = CephBackupDriver(self.ctxt) + def _set_common_backup_stubs(self, service): + self.stubs.Set(self.service, '_get_rbd_support', lambda: (True, 3)) + self.stubs.Set(self.service, 'get_backup_snaps', + lambda *args, **kwargs: None) + + def rbd_size(inst): + return self.chunk_size * self.num_chunks + + self.stubs.Set(self.service.rbd.Image, 'size', rbd_size) + + def _set_common_restore_stubs(self, service): + self._set_common_backup_stubs(self.service) + + def rbd_size(inst): + return self.chunk_size * self.num_chunks + + self.stubs.Set(self.service.rbd.Image, 'size', rbd_size) + + def test_get_most_recent_snap(self): + last = 'backup.%s.snap.9824923.1212' % (uuid.uuid4()) + + def list_snaps(inst, *args): + return [{'name': 'backup.%s.snap.6423868.2342' % (uuid.uuid4())}, + {'name': 'backup.%s.snap.1321319.3235' % (uuid.uuid4())}, + {'name': last}, + {'name': 'backup.%s.snap.3824923.1412' % (uuid.uuid4())}] + + self.stubs.Set(self.service.rbd.Image, 'list_snaps', list_snaps) + + snap = self.service._get_most_recent_snap(self.service.rbd.Image()) + + self.assertEquals(last, snap) + + def test_get_backup_snap_name(self): + snap_name = 'backup.%s.snap.3824923.1412' % (uuid.uuid4()) + + def mock_get_backup_snaps(inst, *args): + return [{'name': 'backup.%s.snap.6423868.2342' % (uuid.uuid4()), + 'backup_id': str(uuid.uuid4())}, + {'name': snap_name, + 'backup_id': self.backup_id}] + + self.stubs.Set(self.service, 'get_backup_snaps', lambda *args: None) + name = self.service._get_backup_snap_name(self.service.rbd.Image(), + 'base_foo', + self.backup_id) + self.assertIsNone(name) + + self.stubs.Set(self.service, 'get_backup_snaps', mock_get_backup_snaps) + name = self.service._get_backup_snap_name(self.service.rbd.Image(), + 'base_foo', + self.backup_id) + self.assertEquals(name, snap_name) + + def test_get_backup_snaps(self): + + def list_snaps(inst, *args): + return [{'name': 'backup.%s.snap.6423868.2342' % (uuid.uuid4())}, + {'name': 'backup.%s.wambam.6423868.2342' % (uuid.uuid4())}, + {'name': 'backup.%s.snap.1321319.3235' % (uuid.uuid4())}, + {'name': 'bbbackup.%s.snap.1321319.3235' % (uuid.uuid4())}, + {'name': 'backup.%s.snap.3824923.1412' % (uuid.uuid4())}] + + self.stubs.Set(self.service.rbd.Image, 'list_snaps', list_snaps) + snaps = self.service.get_backup_snaps(self.service.rbd.Image()) + self.assertTrue(len(snaps) == 3) + + def test_transfer_data_from_rbd_to_file(self): + self._set_common_backup_stubs(self.service) with tempfile.NamedTemporaryFile() as test_file: self.volume_file.seek(0) @@ -103,10 +194,11 @@ class BackupCephTestCase(test.TestCase): def read_data(inst, offset, length): return self.volume_file.read(self.length) - self.stubs.Set(service.rbd.Image, 'read', read_data) + self.stubs.Set(self.service.rbd.Image, 'read', read_data) - service._transfer_data(service.rbd.Image(), test_file, 'foo', - self.length) + rbd_io = self._get_wrapped_rbd_io(self.service.rbd.Image()) + self.service._transfer_data(rbd_io, 'src_foo', test_file, + 'dest_foo', self.length) checksum = hashlib.sha256() test_file.seek(0) @@ -116,26 +208,79 @@ class BackupCephTestCase(test.TestCase): # Ensure the files are equal self.assertEquals(checksum.digest(), self.checksum.digest()) - def test_tranfer_data_to_rbd(self): - service = CephBackupDriver(self.ctxt) + def test_transfer_data_from_rbd_to_rbd(self): + def rbd_size(inst): + return self.chunk_size * self.num_chunks with tempfile.NamedTemporaryFile() as test_file: + self.volume_file.seek(0) + checksum = hashlib.sha256() + + def read_data(inst, offset, length): + return self.volume_file.read(self.length) + + def write_data(inst, data, offset): + checksum.update(data) + test_file.write(data) + + self.stubs.Set(self.service.rbd.Image, 'read', read_data) + self.stubs.Set(self.service.rbd.Image, 'size', rbd_size) + self.stubs.Set(self.service.rbd.Image, 'write', write_data) + + rbd1 = self.service.rbd.Image() + rbd2 = self.service.rbd.Image() + + src_rbd_io = self._get_wrapped_rbd_io(rbd1) + dest_rbd_io = self._get_wrapped_rbd_io(rbd2) + self.service._transfer_data(src_rbd_io, 'src_foo', dest_rbd_io, + 'dest_foo', self.length) + + # Ensure the files are equal + self.assertEquals(checksum.digest(), self.checksum.digest()) + + def test_transfer_data_from_file_to_rbd(self): + self._set_common_backup_stubs(self.service) + + with tempfile.NamedTemporaryFile() as test_file: + self.volume_file.seek(0) checksum = hashlib.sha256() def write_data(inst, data, offset): checksum.update(data) test_file.write(data) - self.stubs.Set(service.rbd.Image, 'write', write_data) + self.stubs.Set(self.service.rbd.Image, 'write', write_data) - service._transfer_data(self.volume_file, service.rbd.Image(), - 'foo', self.length, dest_is_rbd=True) + rbd_io = self._get_wrapped_rbd_io(self.service.rbd.Image()) + self.service._transfer_data(self.volume_file, 'src_foo', + rbd_io, 'dest_foo', self.length) + + # Ensure the files are equal + self.assertEquals(checksum.digest(), self.checksum.digest()) + + def test_transfer_data_from_file_to_file(self): + self._set_common_backup_stubs(self.service) + + with tempfile.NamedTemporaryFile() as test_file: + self.volume_file.seek(0) + checksum = hashlib.sha256() + + self.service._transfer_data(self.volume_file, 'src_foo', test_file, + 'dest_foo', self.length) + + checksum = hashlib.sha256() + test_file.seek(0) + for c in xrange(0, self.num_chunks): + checksum.update(test_file.read(self.chunk_size)) # Ensure the files are equal self.assertEquals(checksum.digest(), self.checksum.digest()) def test_backup_volume_from_file(self): - service = CephBackupDriver(self.ctxt) + self._create_volume_db_entry(self.volume_id, 1) + backup = db.backup_get(self.ctxt, self.backup_id) + + self._set_common_backup_stubs(self.service) with tempfile.NamedTemporaryFile() as test_file: checksum = hashlib.sha256() @@ -144,65 +289,96 @@ class BackupCephTestCase(test.TestCase): checksum.update(data) test_file.write(data) - self.stubs.Set(service.rbd.Image, 'write', write_data) + self.stubs.Set(self.service.rbd.Image, 'write', write_data) - service._backup_volume_from_file('foo', self.length, - self.volume_file) + self.service.backup(backup, self.volume_file) # Ensure the files are equal self.assertEquals(checksum.digest(), self.checksum.digest()) - def tearDown(self): - self.volume_file.close() - super(BackupCephTestCase, self).tearDown() + def test_get_backup_base_name(self): + name = self.service._get_backup_base_name(self.volume_id, + diff_format=True) + self.assertEquals(name, "volume-%s.backup.base" % (self.volume_id)) - def test_backup_error1(self): - service = CephBackupDriver(self.ctxt) + self.assertRaises(exception.InvalidParameterValue, + self.service._get_backup_base_name, + self.volume_id) + + name = self.service._get_backup_base_name(self.volume_id, '1234') + self.assertEquals(name, "volume-%s.backup.%s" % + (self.volume_id, '1234')) + + def test_backup_volume_from_rbd(self): + self._create_volume_db_entry(self.volume_id, 1) backup = db.backup_get(self.ctxt, self.backup_id) - self._create_volume_db_entry(self.vol_id, 0) - self.assertRaises(exception.InvalidParameterValue, service.backup, + + self._set_common_backup_stubs(self.service) + + backup_name = self.service._get_backup_base_name(self.backup_id, + diff_format=True) + + self.stubs.Set(self.service, '_try_delete_base_image', + lambda *args, **kwargs: None) + + with tempfile.NamedTemporaryFile() as test_file: + checksum = hashlib.sha256() + + def write_data(inst, data, offset): + checksum.update(data) + test_file.write(data) + + def read_data(inst, offset, length): + return self.volume_file.read(self.length) + + def rbd_list(inst, ioctx): + return [backup_name] + + self.stubs.Set(self.service.rbd.Image, 'read', read_data) + self.stubs.Set(self.service.rbd.Image, 'write', write_data) + self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list) + + meta = rbddriver.RBDImageMetadata(self.service.rbd.Image(), + 'pool_foo', 'user_foo', + 'conf_foo') + rbd_io = rbddriver.RBDImageIOWrapper(meta) + + self.service.backup(backup, rbd_io) + + # Ensure the files are equal + self.assertEquals(checksum.digest(), self.checksum.digest()) + + def test_backup_vol_length_0(self): + self._set_common_backup_stubs(self.service) + + backup = db.backup_get(self.ctxt, self.backup_id) + self._create_volume_db_entry(self.volume_id, 0) + self.assertRaises(exception.InvalidParameterValue, self.service.backup, backup, self.volume_file) - def test_backup_error2(self): - service = CephBackupDriver(self.ctxt) - backup = db.backup_get(self.ctxt, self.backup_id) - self._create_volume_db_entry(self.vol_id, 1) - self.assertRaises(exception.BackupVolumeInvalidType, service.backup, - backup, None) - - def test_backup_good(self): - service = CephBackupDriver(self.ctxt) - backup = db.backup_get(self.ctxt, self.backup_id) - self._create_volume_db_entry(self.vol_id, 1) - - with tempfile.NamedTemporaryFile() as test_file: - checksum = hashlib.sha256() - - def write_data(inst, data, offset): - checksum.update(data) - test_file.write(data) - - self.stubs.Set(service.rbd.Image, 'write', write_data) - - service.backup(backup, self.volume_file) - - # Ensure the files are equal - self.assertEquals(checksum.digest(), self.checksum.digest()) - def test_restore(self): - service = CephBackupDriver(self.ctxt) - self._create_volume_db_entry(self.vol_id, 1) + self._create_volume_db_entry(self.volume_id, 1) backup = db.backup_get(self.ctxt, self.backup_id) + self._set_common_restore_stubs(self.service) + + backup_name = self.service._get_backup_base_name(self.backup_id, + diff_format=True) + + def rbd_list(inst, ioctx): + return [backup_name] + + self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list) + with tempfile.NamedTemporaryFile() as test_file: self.volume_file.seek(0) def read_data(inst, offset, length): return self.volume_file.read(self.length) - self.stubs.Set(service.rbd.Image, 'read', read_data) + self.stubs.Set(self.service.rbd.Image, 'read', read_data) - service.restore(backup, self.vol_id, test_file) + self.service.restore(backup, self.volume_id, test_file) checksum = hashlib.sha256() test_file.seek(0) @@ -212,17 +388,183 @@ class BackupCephTestCase(test.TestCase): # Ensure the files are equal self.assertEquals(checksum.digest(), self.checksum.digest()) - def test_delete(self): - service = CephBackupDriver(self.ctxt) - self._create_volume_db_entry(self.vol_id, 1) + def test_create_base_image_if_not_exists(self): + pass + + def test_delete_backup_snapshots(self): + snap_name = 'backup.%s.snap.3824923.1412' % (uuid.uuid4()) + base_name = self.service._get_backup_base_name(self.volume_id, + diff_format=True) + + self.stubs.Set(self.service, '_get_backup_snap_name', + lambda *args: snap_name) + + self.stubs.Set(self.service, 'get_backup_snaps', + lambda *args: None) + + rem = self.service._delete_backup_snapshots(mock_rados(), base_name, + self.backup_id) + + self.assertEquals(rem, 0) + + def test_try_delete_base_image_diff_format(self): + # don't create volume db entry since it should not be required backup = db.backup_get(self.ctxt, self.backup_id) + backup_name = self.service._get_backup_base_name(self.volume_id, + diff_format=True) + + snap_name = self.service._get_new_snap_name(self.backup_id) + snaps = [{'name': snap_name}] + + def rbd_list(*args): + return [backup_name] + + def list_snaps(*args): + return snaps + + def remove_snap(*args): + snaps.pop() + + self.stubs.Set(self.service.rbd.Image, 'remove_snap', remove_snap) + self.stubs.Set(self.service.rbd.Image, 'list_snaps', list_snaps) + self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list) + # Must be something mutable remove_called = [] def remove(inst, ioctx, name): remove_called.append(True) - self.stubs.Set(service.rbd.RBD, 'remove', remove) - service.delete(backup) + self.stubs.Set(self.service.rbd.RBD, 'remove', remove) + self.service.delete(backup) self.assertTrue(remove_called[0]) + + def test_try_delete_base_image(self): + # don't create volume db entry since it should not be required + self._create_volume_db_entry(self.volume_id, 1) + backup = db.backup_get(self.ctxt, self.backup_id) + + backup_name = self.service._get_backup_base_name(self.volume_id, + self.backup_id) + + def rbd_list(inst, ioctx): + return [backup_name] + + self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list) + + # Must be something mutable + remove_called = [] + + self.stubs.Set(self.service, 'get_backup_snaps', + lambda *args, **kwargs: None) + + def remove(inst, ioctx, name): + remove_called.append(True) + + self.stubs.Set(self.service.rbd.RBD, 'remove', remove) + self.service.delete(backup) + self.assertTrue(remove_called[0]) + + def test_try_delete_base_image_busy(self): + """This should induce retries then raise rbd.ImageBusy.""" + # don't create volume db entry since it should not be required + self._create_volume_db_entry(self.volume_id, 1) + backup = db.backup_get(self.ctxt, self.backup_id) + + backup_name = self.service._get_backup_base_name(self.volume_id, + self.backup_id) + + def rbd_list(inst, ioctx): + return [backup_name] + + self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list) + + # Must be something mutable + remove_called = [] + + self.stubs.Set(self.service, 'get_backup_snaps', + lambda *args, **kwargs: None) + + def remove(inst, ioctx, name): + raise self.service.rbd.ImageBusy("image busy") + + self.stubs.Set(self.service.rbd.RBD, 'remove', remove) + + self.assertRaises(self.service.rbd.ImageBusy, + self.service._try_delete_base_image, + backup['id'], backup['volume_id']) + + def test_delete(self): + backup = db.backup_get(self.ctxt, self.backup_id) + + def del_base_image(*args): + pass + + self.stubs.Set(self.service, '_try_delete_base_image', + lambda *args: None) + + self.service.delete(backup) + + def test_delete_image_not_found(self): + backup = db.backup_get(self.ctxt, self.backup_id) + + def del_base_image(*args): + raise self.service.rbd.ImageNotFound + + self.stubs.Set(self.service, '_try_delete_base_image', + lambda *args: None) + + # ImageNotFound exception is caught so that db entry can be cleared + self.service.delete(backup) + + def test_diff_restore_allowed_true(self): + is_allowed = (True, 'restore.foo') + backup = db.backup_get(self.ctxt, self.backup_id) + alt_volume_id = str(uuid.uuid4()) + self._create_volume_db_entry(alt_volume_id, 1) + alt_volume = db.volume_get(self.ctxt, alt_volume_id) + rbd_io = self._get_wrapped_rbd_io(self.service.rbd.Image()) + + self.stubs.Set(self.service, '_get_restore_point', + lambda *args: 'restore.foo') + + self.stubs.Set(self.service, '_rbd_has_extents', + lambda *args: False) + + self.stubs.Set(self.service, '_rbd_image_exists', + lambda *args: (True, 'foo')) + + self.stubs.Set(self.service, '_file_is_rbd', lambda *args: True) + + resp = self.service._diff_restore_allowed('foo', backup, alt_volume, + rbd_io, mock_rados()) + self.assertEquals(resp, is_allowed) + + def test_diff_restore_allowed_false(self): + not_allowed = (False, None) + backup = db.backup_get(self.ctxt, self.backup_id) + self._create_volume_db_entry(self.volume_id, 1) + original_volume = db.volume_get(self.ctxt, self.volume_id) + rbd_io = self._get_wrapped_rbd_io(self.service.rbd.Image()) + + self.stubs.Set(self.service, '_get_restore_point', + lambda *args: None) + + self.stubs.Set(self.service, '_rbd_has_extents', + lambda *args: True) + + self.stubs.Set(self.service, '_rbd_image_exists', + lambda *args: (False, 'foo')) + + self.stubs.Set(self.service, '_file_is_rbd', lambda *args: False) + + resp = self.service._diff_restore_allowed('foo', backup, + original_volume, rbd_io, + mock_rados()) + self.assertEquals(resp, not_allowed) + + def tearDown(self): + self.volume_file.close() + self.stubs.UnsetAll() + super(BackupCephTestCase, self).tearDown() diff --git a/cinder/tests/test_rbd.py b/cinder/tests/test_rbd.py index 024a046c230..473172c51f9 100644 --- a/cinder/tests/test_rbd.py +++ b/cinder/tests/test_rbd.py @@ -127,9 +127,13 @@ class RBDTestCase(test.TestCase): volume = dict(name=name) mock_client = self.mox.CreateMockAnything() self.mox.StubOutWithMock(driver, 'RADOSClient') + self.stubs.Set(self.driver, '_get_backup_snaps', lambda *args: None) driver.RADOSClient(self.driver).AndReturn(mock_client) mock_client.__enter__().AndReturn(mock_client) + mock_image = self.mox.CreateMockAnything() + self.rbd.Image(mox.IgnoreArg(), str(name)).AndReturn(mock_image) + mock_image.close() mock_rbd = self.mox.CreateMockAnything() self.rbd.RBD().AndReturn(mock_rbd) mock_rbd.remove(mox.IgnoreArg(), str(name)) diff --git a/cinder/volume/drivers/rbd.py b/cinder/volume/drivers/rbd.py index b5a8b0a71ad..5fa180b8c98 100644 --- a/cinder/volume/drivers/rbd.py +++ b/cinder/volume/drivers/rbd.py @@ -24,17 +24,15 @@ import os import tempfile import urllib -from oslo.config import cfg - +import cinder.backup.drivers.ceph as ceph_backup from cinder import exception from cinder.image import image_utils -from cinder import units -from cinder import utils - from cinder.openstack.common import fileutils from cinder.openstack.common import log as logging +from cinder import units +from cinder import utils from cinder.volume import driver - +from oslo.config import cfg try: import rados @@ -85,28 +83,53 @@ def ascii_str(string): return str(string) +class RBDImageMetadata(object): + """RBD image metadata to be used with RBDImageIOWrapper""" + def __init__(self, image, pool, user, conf): + self.image = image + self.pool = str(pool) + self.user = str(user) + self.conf = str(conf) + + class RBDImageIOWrapper(io.RawIOBase): """ Wrapper to provide standard Python IO interface to RBD images so that they can be treated as files. """ - def __init__(self, rbd_image): + def __init__(self, rbd_meta): super(RBDImageIOWrapper, self).__init__() - self.rbd_image = rbd_image + self._rbd_meta = rbd_meta self._offset = 0 def _inc_offset(self, length): self._offset += length + @property + def rbd_image(self): + return self._rbd_meta.image + + @property + def rbd_user(self): + return self._rbd_meta.user + + @property + def rbd_pool(self): + return self._rbd_meta.pool + + @property + def rbd_conf(self): + return self._rbd_meta.conf + def read(self, length=None): offset = self._offset - total = self.rbd_image.size() + total = self._rbd_meta.image.size() # (dosaboy): posix files do not barf if you read beyond their length # (they just return nothing) but rbd images do so we need to return # empty string if we are at the end of the image - if (offset == total): + if (offset >= total): return '' if length is None: @@ -116,10 +139,10 @@ class RBDImageIOWrapper(io.RawIOBase): length = total - offset self._inc_offset(length) - return self.rbd_image.read(int(offset), int(length)) + return self._rbd_meta.image.read(int(offset), int(length)) def write(self, data): - self.rbd_image.write(data, self._offset) + self._rbd_meta.image.write(data, self._offset) self._inc_offset(len(data)) def seekable(self): @@ -147,10 +170,9 @@ class RBDImageIOWrapper(io.RawIOBase): def flush(self): try: - self.rbd_image.flush() - except AttributeError as exc: - LOG.warning("flush() not supported in this version of librbd - " - "%s" % (str(rbd.RBD().version()))) + self._rbd_meta.image.flush() + except AttributeError: + LOG.warning(_("flush() not supported in this version of librbd")) def fileno(self): """ @@ -274,6 +296,14 @@ class RBDDriver(driver.VolumeDriver): ioctx.close() client.shutdown() + def _get_backup_snaps(self, rbd_image): + """Get list of any backup snapshots that exist on this volume. + + There should only ever be one but accept all since they need to be + deleted before the volume can be. + """ + return ceph_backup.CephBackupDriver.get_backup_snaps(rbd_image) + def _get_mon_addrs(self): args = ['ceph', 'mon', 'dump', '--format=json'] + self._ceph_args() out, _ = self._execute(*args) @@ -386,6 +416,16 @@ class RBDDriver(driver.VolumeDriver): def delete_volume(self, volume): """Deletes a logical volume.""" with RADOSClient(self) as client: + # Ensure any backup snapshots are deleted + rbd_image = self.rbd.Image(client.ioctx, str(volume['name'])) + try: + backup_snaps = self._get_backup_snaps(rbd_image) + if backup_snaps: + for snap in backup_snaps: + rbd_image.remove_snap(snap['name']) + finally: + rbd_image.close() + try: self.rbd.RBD().remove(client.ioctx, str(volume['name'])) except self.rbd.ImageHasSnapshots: @@ -540,8 +580,11 @@ class RBDDriver(driver.VolumeDriver): pool = self.configuration.rbd_pool volname = volume['name'] - with RBDVolumeProxy(self, volname, pool, read_only=True) as rbd_image: - rbd_fd = RBDImageIOWrapper(rbd_image) + with RBDVolumeProxy(self, volname, pool) as rbd_image: + rbd_meta = RBDImageMetadata(rbd_image, self.configuration.rbd_pool, + self.configuration.rbd_user, + self.configuration.rbd_ceph_conf) + rbd_fd = RBDImageIOWrapper(rbd_meta) backup_service.backup(backup, rbd_fd) LOG.debug("volume backup complete.") @@ -551,7 +594,10 @@ class RBDDriver(driver.VolumeDriver): pool = self.configuration.rbd_pool with RBDVolumeProxy(self, volume['name'], pool) as rbd_image: - rbd_fd = RBDImageIOWrapper(rbd_image) + rbd_meta = RBDImageMetadata(rbd_image, self.configuration.rbd_pool, + self.configuration.rbd_user, + self.configuration.rbd_ceph_conf) + rbd_fd = RBDImageIOWrapper(rbd_meta) backup_service.restore(backup, volume['id'], rbd_fd) LOG.debug("volume restore complete.") diff --git a/etc/cinder/cinder.conf.sample b/etc/cinder/cinder.conf.sample index 2a80c607188..c605266efdd 100644 --- a/etc/cinder/cinder.conf.sample +++ b/etc/cinder/cinder.conf.sample @@ -1127,6 +1127,9 @@ # Options defined in cinder.volume.drivers.rbd # +# path to the ceph configuration file to use (string) +#rbd_ceph_conf=/etc/ceph/ceph.conf + # the RADOS pool in which rbd volumes are stored (string # value) #rbd_pool=rbd