Added incremental backup support to Ceph backup driver

The Ceph backup driver is now capable of doing differential
and incremental backups between or within Ceph clusters.

Implements: blueprint cinder-backup-to-ceph

Change-Id: Id59bf1963c6d35aae4baf6f49be17340982c205c
Edward Hope-Morley 2013-07-02 09:40:44 +01:00
parent b123edca4f
commit 4796efe60d
7 changed files with 1277 additions and 218 deletions
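
For context, the driver is configured through the backup_ceph_* options referenced in the code below (backup_ceph_user, backup_ceph_pool, backup_ceph_conf, backup_ceph_chunk_size, backup_ceph_stripe_unit, backup_ceph_stripe_count). A minimal sketch of how such options could be registered with oslo.config follows; the default values are assumptions for illustration and are not taken from this change.

from oslo.config import cfg

service_opts = [
    cfg.StrOpt('backup_ceph_conf', default='/etc/ceph/ceph.conf',
               help='Ceph configuration file for the backup cluster '
                    '(assumed default)'),
    cfg.StrOpt('backup_ceph_user', default='cinder',
               help='Ceph user to connect as (assumed default)'),
    cfg.StrOpt('backup_ceph_pool', default='backups',
               help='Ceph pool in which backups are stored (assumed default)'),
    cfg.IntOpt('backup_ceph_chunk_size', default=134217728,
               help='Chunk size in bytes for full copies (assumed default)'),
    cfg.IntOpt('backup_ceph_stripe_unit', default=0,
               help='RBD stripe unit for backup images (assumed default)'),
    cfg.IntOpt('backup_ceph_stripe_count', default=0,
               help='RBD stripe count for backup images (assumed default)'),
]

CONF = cfg.CONF
CONF.register_opts(service_opts)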


@@ -13,19 +13,46 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Ceph Backup Service Implementation"""
"""Ceph Backup Service Implementation.
import os
import time
This driver supports backing up volumes of any type to a Ceph backend store. It
is also capable of detecting whether the volume to be backed up is a Ceph RBD
volume and if so, attempts to perform incremental/differential backups.
Support is also included for the following when the source volume is a Ceph
RBD volume:
* backing up within the same Ceph pool (not recommended)
* backing up between different Ceph pools
* backing up between different Ceph clusters
At the time of writing, differential backup support in Ceph/librbd was quite
new so this driver accounts for this by first attempting differential backup
and falling back to full backup/copy if the former fails.
If incremental backups are used, multiple backups of the same volume are stored
as snapshots so that minimal space is consumed in the backup store and
restoring the volume takes far less time than a full copy.
Note that Cinder supports restoring to a new volume or the original volume the
backup was taken from. For the latter case, a full copy is enforced since this
was deemed the safest action to take. It is therefore recommended to always
restore to a new volume (default).
"""
import eventlet
from oslo.config import cfg
import os
import re
import time
from cinder.backup.driver import BackupDriver
from cinder import exception
from cinder.openstack.common import log as logging
from cinder import units
import cinder.volume.drivers.rbd as rbddriver
from cinder import utils
import cinder.volume.drivers as drivers
from oslo.config import cfg
try:
import rados
@@ -57,58 +84,93 @@ CONF.register_opts(service_opts)
class CephBackupDriver(BackupDriver):
"""Backup up Cinder volumes to Ceph Object Store"""
"""Backup up Cinder volumes to Ceph Object Store.
def __init__(self, context, db_driver=None):
This class enables backing up Cinder volumes to a Ceph object store.
Backups may be stored in their own pool or even cluster. Store location is
defined by the Ceph conf file and Service config options supplied.
If the source volume is itself an RBD volume, the backup will be performed
using incremental differential backups which *should* give a performance
gain.
"""
def __init__(self, context, db_driver=None, execute=None):
super(CephBackupDriver, self).__init__(db_driver)
self.rbd = rbd
self.rados = rados
self.context = context
self.chunk_size = CONF.backup_ceph_chunk_size
if self._supports_stripingv2():
self._execute = execute or utils.execute
if self._supports_stripingv2:
self.rbd_stripe_unit = CONF.backup_ceph_stripe_unit
self.rbd_stripe_count = CONF.backup_ceph_stripe_count
else:
LOG.info("rbd striping not supported - ignoring conf settings "
"for rbd striping")
LOG.info(_("rbd striping not supported - ignoring configuration "
"settings for rbd striping"))
self.rbd_stripe_count = 0
self.rbd_stripe_unit = 0
self._ceph_user = str(CONF.backup_ceph_user)
self._ceph_pool = str(CONF.backup_ceph_pool)
self._ceph_conf = str(CONF.backup_ceph_conf)
self._ceph_backup_user = str(CONF.backup_ceph_user)
self._ceph_backup_pool = str(CONF.backup_ceph_pool)
self._ceph_backup_conf = str(CONF.backup_ceph_conf)
def _validate_string_args(self, *args):
"""Ensure all args are non-None and non-empty."""
return all(args)
def _ceph_args(self, user, conf=None, pool=None):
"""Create default ceph args for executing rbd commands.
If no --conf is provided, rbd will look in the default locations e.g.
/etc/ceph/ceph.conf
"""
# Make sure user arg is valid since rbd command may not fail if
# invalid/no user provided, resulting in unexpected behaviour.
if not self._validate_string_args(user):
raise exception.BackupInvalidCephArgs(_("invalid user '%s'") %
(user))
args = ['--id', user]
if conf:
args += ['--conf', conf]
if pool:
args += '--pool', pool
return args
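# For illustration (not part of this change): with user='cinder',
# conf='/etc/ceph/ceph.conf' and pool='backups', _ceph_args() returns
# ['--id', 'cinder', '--conf', '/etc/ceph/ceph.conf', '--pool', 'backups'],
# which _rbd_diff_transfer() below appends to its 'rbd' command lines.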
@property
def _supports_layering(self):
"""
Determine whether copy-on-write is supported by our version of librbd
"""
"""Determine if copy-on-write is supported by our version of librbd."""
return hasattr(self.rbd, 'RBD_FEATURE_LAYERING')
@property
def _supports_stripingv2(self):
"""
Determine whether striping is supported by our version of librbd
"""
"""Determine if striping is supported by our version of librbd."""
return hasattr(self.rbd, 'RBD_FEATURE_STRIPINGV2')
def _get_rbd_support(self):
"""Determine RBD features supported by our version of librbd."""
old_format = True
features = 0
if self._supports_layering():
if self._supports_layering:
old_format = False
features |= self.rbd.RBD_FEATURE_LAYERING
if self._supports_stripingv2():
if self._supports_stripingv2:
old_format = False
features |= self.rbd.RBD_FEATURE_STRIPINGV2
return (old_format, features)
def _connect_to_rados(self, pool=None):
"""Establish connection to the Ceph cluster"""
client = self.rados.Rados(rados_id=self._ceph_user,
conffile=self._ceph_conf)
"""Establish connection to the backup Ceph cluster."""
client = self.rados.Rados(rados_id=self._ceph_backup_user,
conffile=self._ceph_backup_conf)
try:
client.connect()
pool_to_open = str(pool or self._ceph_pool)
pool_to_open = str(pool or self._ceph_backup_pool)
ioctx = client.open_ioctx(pool_to_open)
return client, ioctx
except self.rados.Error:
@@ -117,157 +179,728 @@ class CephBackupDriver(BackupDriver):
raise
def _disconnect_from_rados(self, client, ioctx):
"""Terminate connection with the Ceph cluster"""
"""Terminate connection with the backup Ceph cluster."""
# closing an ioctx cannot raise an exception
ioctx.close()
client.shutdown()
def _get_backup_base_name(self, volume_id, backup_id):
"""Return name of base image used for backup."""
def _get_backup_base_name(self, volume_id, backup_id=None,
diff_format=False):
"""Return name of base image used for backup.
Incremental backups use a new base name so we support old and new style
formats.
"""
# Ensure no unicode
return str("volume-%s.backup.%s" % (volume_id, backup_id))
if diff_format:
return str("volume-%s.backup.base" % (volume_id))
else:
if backup_id is None:
msg = _("backup_id required")
raise exception.InvalidParameterValue(msg)
return str("volume-%s.backup.%s" % (volume_id, backup_id))
def _transfer_data(self, src, src_name, dest, dest_name, length):
"""Transfer data between files (Python IO objects)."""
LOG.debug(_("transferring data between '%(src)s' and '%(dest)s'") %
{'src': src_name, 'dest': dest_name})
def _transfer_data(self, src, dest, dest_name, length, dest_is_rbd=False):
"""
Transfer data between file and rbd. If destination is rbd, source is
assumed to be file, otherwise source is assumed to be rbd.
"""
chunks = int(length / self.chunk_size)
LOG.debug("transferring %s chunks of %s bytes to '%s'" %
(chunks, self.chunk_size, dest_name))
LOG.debug(_("%(chunks)s chunks of %(bytes)s bytes to be transferred") %
{'chunks': chunks, 'bytes': self.chunk_size})
for chunk in xrange(0, chunks):
offset = chunk * self.chunk_size
before = time.time()
if dest_is_rbd:
dest.write(src.read(self.chunk_size), offset)
# note(dosaboy): librbd writes are synchronous so flush() will
# have no effect. Also, flush is only supported in more recent
# versions of librbd.
else:
dest.write(src.read(offset, self.chunk_size))
dest.flush()
data = src.read(self.chunk_size)
dest.write(data)
dest.flush()
delta = (time.time() - before)
rate = (self.chunk_size / delta) / 1024
LOG.debug("transferred chunk %s of %s (%dK/s)" %
(chunk, chunks, rate))
LOG.debug((_("transferred chunk %(chunk)s of %(chunks)s "
"(%(rate)dK/s)") %
{'chunk': chunk, 'chunks': chunks,
'rate': rate}))
# yield to any other pending backups
eventlet.sleep(0)
rem = int(length % self.chunk_size)
if rem:
LOG.debug("transferring remaining %s bytes" % (rem))
offset = (length - rem)
if dest_is_rbd:
dest.write(src.read(rem), offset)
# note(dosaboy): librbd writes are synchronous so flush() will
# have no effect. Also, flush is only supported in more recent
# versions of librbd.
else:
dest.write(src.read(offset, rem))
dest.flush()
LOG.debug(_("transferring remaining %s bytes") % (rem))
data = src.read(rem)
dest.write(data)
dest.flush()
# yield to any other pending backups
eventlet.sleep(0)
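# For illustration (not part of this change): assuming chunk_size is 128 MiB,
# a 1 GiB source transfers as 8 full chunks with no remainder, while a source
# of 1 GiB + 1 byte transfers 8 chunks followed by a final 1-byte write.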
def _backup_volume_from_file(self, backup_name, backup_size, volume_file):
"""Backup a volume from file stream"""
LOG.debug("performing backup from file")
def _create_base_image(self, name, size, rados_client):
"""Create a base backup image.
This will be the base image used for storing differential exports.
"""
LOG.debug(_("creating base image '%s'") % (name))
old_format, features = self._get_rbd_support()
self.rbd.RBD().create(ioctx=rados_client.ioctx,
name=name,
size=size,
old_format=old_format,
features=features,
stripe_unit=self.rbd_stripe_unit,
stripe_count=self.rbd_stripe_count)
with rbddriver.RADOSClient(self, self._ceph_pool) as client:
def _delete_backup_snapshots(self, rados_client, base_name, backup_id):
"""Delete any snapshots associated with this backup.
A backup should have at most *one* associated snapshot.
This is required before attempting to delete the base image. The
snapshots on the original volume can be left as they will be purged
when the volume is deleted.
"""
backup_snaps = None
base_rbd = self.rbd.Image(rados_client.ioctx, base_name)
try:
snap_name = self._get_backup_snap_name(base_rbd, base_name,
backup_id)
if snap_name:
LOG.debug(_("deleting backup snapshot='%s'") % (snap_name))
base_rbd.remove_snap(snap_name)
else:
LOG.debug(_("no backup snapshot to delete"))
# Now check whether any snapshots remain on the base image
backup_snaps = self.get_backup_snaps(base_rbd)
finally:
base_rbd.close()
if backup_snaps:
return len(backup_snaps)
else:
return 0
def _try_delete_base_image(self, backup_id, volume_id, base_name=None):
"""Try to delete backup RBD image.
If the rbd image is a base image for incremental backups, it may have
snapshots. Delete the snapshot associated with backup_id and if the
image has no more snapshots, delete it. Otherwise return.
If no base name is provided, try the normal (full) format then the diff
format image name.
If a base name is provided but does not exist, ImageNotFound will be
raised.
If the image is busy, a number of retries will be performed if
ImageBusy is received, after which the exception will be propagated to
the caller.
"""
retries = 3
delay = 5
try_diff_format = False
if base_name is None:
try_diff_format = True
base_name = self._get_backup_base_name(volume_id, backup_id)
LOG.debug(_("trying diff format name format basename='%s'") %
(base_name))
with drivers.rbd.RADOSClient(self) as client:
rbd_exists, base_name = \
self._rbd_image_exists(base_name, volume_id, client,
try_diff_format=try_diff_format)
if not rbd_exists:
raise self.rbd.ImageNotFound(_("image %s not found") %
(base_name))
while retries >= 0:
# First delete associated snapshot (if exists)
rem = self._delete_backup_snapshots(client, base_name,
backup_id)
if rem:
msg = (_("base image still has %s snapshots so not "
"deleting base image") % (rem))
LOG.info(msg)
return
LOG.info(_("deleting base image='%s'") % (base_name))
# Delete base if no more snapshots
try:
self.rbd.RBD().remove(client.ioctx, base_name)
except self.rbd.ImageBusy as exc:
# Allow a retry if the image is busy
if retries > 0:
LOG.info((_("image busy, retrying %(retries)s "
"more time(s) in %(delay)ss") %
{'retries': retries, 'delay': delay}))
eventlet.sleep(delay)
else:
LOG.error(_("max retries reached - raising error"))
raise exc
else:
LOG.debug(_("base backup image='%s' deleted)") %
(base_name))
retries = 0
finally:
retries -= 1
def _rbd_diff_transfer(self, src_name, src_pool, dest_name, dest_pool,
src_user, src_conf, dest_user, dest_conf,
src_snap=None, from_snap=None):
"""Backup only extents changed between two points.
If no snapshot is provided, the diff extents will be all those changed
since the rbd volume/base was created, otherwise it will be those
changed since the snapshot was created.
"""
LOG.debug(_("performing differential transfer from '%(src)s' to "
"'%(dest)s'") %
{'src': src_name, 'dest': dest_name})
# NOTE(dosaboy): Need to be tolerant of clusters/clients that do
# not support these operations since at the time of writing they
# were very new.
src_ceph_args = self._ceph_args(src_user, src_conf, pool=src_pool)
dest_ceph_args = self._ceph_args(dest_user, dest_conf, pool=dest_pool)
try:
cmd = ['rbd', 'export-diff'] + src_ceph_args
if from_snap is not None:
cmd += ['--from-snap', from_snap]
if src_snap:
path = str("%s/%s@%s" % (src_pool, src_name, src_snap))
else:
path = str("%s/%s" % (src_pool, src_name))
cmd += [path, '-']
out, err = self._execute(*cmd)
except (exception.ProcessExecutionError, exception.Error) as exc:
LOG.info(_("rbd export-diff failed - %s") % (str(exc)))
raise exception.BackupRBDOperationFailed("rbd export-diff failed")
try:
cmd = ['rbd', 'import-diff'] + dest_ceph_args
cmd += ['-', str("%s/%s" % (dest_pool, dest_name))]
out, err = self._execute(*cmd, process_input=out)
except (exception.ProcessExecutionError, exception.Error) as exc:
LOG.info(_("rbd import-diff failed - %s") % (str(exc)))
raise exception.BackupRBDOperationFailed("rbd import-diff failed")
def _rbd_image_exists(self, name, volume_id, client,
try_diff_format=False):
"""Return tuple (exists, name)."""
rbds = self.rbd.RBD().list(client.ioctx)
if name not in rbds:
msg = _("image '%s' not found - trying diff format name") % (name)
LOG.debug(msg)
if try_diff_format:
name = self._get_backup_base_name(volume_id, diff_format=True)
if name not in rbds:
msg = _("diff format image '%s' not found") % (name)
LOG.debug(msg)
return False, name
else:
return False, name
return True, name
def _snap_exists(self, base_name, snap_name, client):
"""Return True if snapshot exists in base image."""
base_rbd = self.rbd.Image(client.ioctx, base_name)
try:
snaps = base_rbd.list_snaps()
finally:
base_rbd.close()
if snaps is None:
return False
for snap in snaps:
if snap['name'] == snap_name:
return True
return False
def _backup_rbd(self, backup_id, volume_id, volume_file, volume_name,
length):
"""Create a incremental backup from an RBD image."""
rbd_user = volume_file.rbd_user
rbd_pool = volume_file.rbd_pool
rbd_conf = volume_file.rbd_conf
source_rbd_image = volume_file.rbd_image
# Identify our --from-snap point (if one exists)
from_snap = self._get_most_recent_snap(source_rbd_image)
LOG.debug(_("using --from-snap '%s'") % from_snap)
backup_name = self._get_backup_base_name(volume_id, diff_format=True)
image_created = False
force_full_backup = False
with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
# If from_snap does not exist in the destination, this implies a
# previous backup has failed. In this case we will force a full
# backup.
#
# TODO(dosaboy): find a way to repair the broken backup
#
if backup_name not in self.rbd.RBD().list(ioctx=client.ioctx):
# If a from_snap is defined then we cannot proceed (see above)
if from_snap is not None:
force_full_backup = True
# Create new base image
self._create_base_image(backup_name, length, client)
image_created = True
else:
# If a from_snap is defined but does not exist in the backup base
# then we cannot proceed (see above)
if not self._snap_exists(backup_name, from_snap, client):
force_full_backup = True
if force_full_backup:
errmsg = (_("snap='%(snap)s' does not exist in base "
"image='%(base)s' - aborting incremental backup") %
{'snap': from_snap, 'base': backup_name})
LOG.info(errmsg)
# Raise this exception so that caller can try another
# approach
raise exception.BackupRBDOperationFailed(errmsg)
# Snapshot source volume so that we have a new point-in-time
new_snap = self._get_new_snap_name(backup_id)
LOG.debug(_("creating backup snapshot='%s'") % (new_snap))
source_rbd_image.create_snap(new_snap)
# Attempt differential backup. If this fails, perhaps because librbd
# or Ceph cluster version does not support it, do a full backup
# instead.
#
# TODO(dosaboy): find a way to determine if the operation is supported
# rather than brute force approach.
try:
before = time.time()
self._rbd_diff_transfer(volume_name, rbd_pool, backup_name,
self._ceph_backup_pool,
src_user=rbd_user,
src_conf=rbd_conf,
dest_user=self._ceph_backup_user,
dest_conf=self._ceph_backup_conf,
src_snap=new_snap,
from_snap=from_snap)
LOG.debug(_("differential backup transfer completed in %.4fs") %
(time.time() - before))
# We don't need the previous snapshot (if there was one) anymore so
# delete it.
if from_snap:
source_rbd_image.remove_snap(from_snap)
except exception.BackupRBDOperationFailed:
LOG.debug(_("differential backup transfer failed"))
# Clean up if image was created as part of this operation
if image_created:
self._try_delete_base_image(backup_id, volume_id,
base_name=backup_name)
# Delete snapshot
LOG.debug(_("deleting backup snapshot='%s'") % (new_snap))
source_rbd_image.remove_snap(new_snap)
# Re-raise the exception so that caller can try another approach
raise
def _file_is_rbd(self, volume_file):
"""Returns True if the volume_file is actually an RBD image."""
return hasattr(volume_file, 'rbd_image')
def _full_backup(self, backup_id, volume_id, src_volume, src_name, length):
"""Perform a full backup of src volume.
First creates a base backup image in our backup location then performs
a chunked copy of all data from the source volume to a new backup rbd
image.
"""
backup_name = self._get_backup_base_name(volume_id, backup_id)
with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
# First create base backup image
old_format, features = self._get_rbd_support()
LOG.debug(_("creating base image='%s'") % (backup_name))
self.rbd.RBD().create(ioctx=client.ioctx,
name=backup_name,
size=backup_size,
size=length,
old_format=old_format,
features=features,
stripe_unit=self.rbd_stripe_unit,
stripe_count=self.rbd_stripe_count)
LOG.debug(_("copying data"))
dest_rbd = self.rbd.Image(client.ioctx, backup_name)
try:
self._transfer_data(volume_file, dest_rbd, backup_name,
backup_size, dest_is_rbd=True)
rbd_meta = drivers.rbd.RBDImageMetadata(dest_rbd,
self._ceph_backup_pool,
self._ceph_backup_user,
self._ceph_backup_conf)
rbd_fd = drivers.rbd.RBDImageIOWrapper(rbd_meta)
self._transfer_data(src_volume, src_name, rbd_fd, backup_name,
length)
finally:
dest_rbd.close()
def backup(self, backup, volume_file):
"""Backup the given volume to Ceph object store"""
backup_id = backup['id']
volume = self.db.volume_get(self.context, backup['volume_id'])
backup_name = self._get_backup_base_name(volume['id'], backup_id)
@staticmethod
def backup_snapshot_name_pattern():
"""Returns the pattern used to match backup snapshots.
LOG.debug("Starting backup of volume='%s' to rbd='%s'" %
(volume['name'], backup_name))
It is essential that snapshots created for purposes other than backups
do not have this name format.
"""
return r"^backup\.([a-z0-9\-]+?)\.snap\.(.+)$"
@classmethod
def get_backup_snaps(cls, rbd_image, sort=False):
"""Get all backup snapshots for the given rbd image.
NOTE: this call is made public since these snapshots must be deleted
before the base volume can be deleted.
"""
snaps = rbd_image.list_snaps()
backup_snaps = []
for snap in snaps:
search_key = cls.backup_snapshot_name_pattern()
result = re.search(search_key, snap['name'])
if result:
backup_snaps.append({'name': result.group(0),
'backup_id': result.group(1),
'timestamp': result.group(2)})
if sort:
# Sort into ascending order of timestamp
backup_snaps.sort(key=lambda x: x['timestamp'], reverse=True)
return backup_snaps
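# For illustration (not part of this change): a snapshot created by
# _get_new_snap_name(), e.g. 'backup.3f2c7d6a.snap.1372752000.12', matches
# backup_snapshot_name_pattern() with group(1) == '3f2c7d6a' (the backup id)
# and group(2) == '1372752000.12' (the timestamp), so get_backup_snaps()
# reports it while unrelated snapshot names are ignored.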
def _get_new_snap_name(self, backup_id):
return str("backup.%s.snap.%s" % (backup_id, time.time()))
def _get_backup_snap_name(self, rbd_image, name, backup_id):
"""Return the name of the snapshot associated with backup_id.
The rbd image provided must be the base image used for an incremental
backup.
A backup is only allowed to have one associated snapshot. If more than
one is found, exception.BackupOperationError is raised.
"""
snaps = self.get_backup_snaps(rbd_image)
LOG.debug(_("looking for snapshot of backup base '%s'") % (name))
if not snaps:
LOG.debug(_("backup base '%s' has no snapshots") % (name))
return None
snaps = [snap['name'] for snap in snaps
if snap['backup_id'] == backup_id]
if not snaps:
LOG.debug(_("backup '%s' has no snapshot") % (backup_id))
return None
if len(snaps) > 1:
msg = (_("backup should only have one snapshot but instead has %s")
% (len(snaps)))
LOG.error(msg)
raise exception.BackupOperationError(msg)
LOG.debug(_("found snapshot '%s'") % (snaps[0]))
return snaps[0]
def _get_most_recent_snap(self, rbd_image):
"""Get the most recent backup snapshot of the provided image.
Returns name of most recent backup snapshot or None if there are no
backup snapshots.
"""
backup_snaps = self.get_backup_snaps(rbd_image, sort=True)
if not backup_snaps:
return None
return backup_snaps[0]['name']
def _get_volume_size_gb(self, volume):
"""Return the size in gigabytes of the given volume.
Raises exception.InvalidParameterValue if volume size is 0.
"""
if int(volume['size']) == 0:
raise exception.InvalidParameterValue("need non-zero volume size")
else:
backup_size = int(volume['size']) * units.GiB
if volume_file:
self._backup_volume_from_file(backup_name, backup_size,
volume_file)
else:
errmsg = ("No volume_file was provided so I cannot do requested "
"backup (id=%s)" % (backup_id))
raise exception.BackupVolumeInvalidType(errmsg)
return int(volume['size']) * units.GiB
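# For illustration (not part of this change): despite the _gb suffix this
# returns a byte count, e.g. a 2 GB volume gives 2 * units.GiB =
# 2147483648 bytes, which becomes the length passed to the transfer methods.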
self.db.backup_update(self.context, backup['id'],
{'container': self._ceph_pool})
def backup(self, backup, volume_file):
"""Backup the given volume to Ceph object store.
If the source volume is an RBD we will attempt to do an
incremental/differential backup, otherwise a full copy is performed.
If this fails we will attempt to fall back to full copy.
"""
backup_id = backup['id']
volume = self.db.volume_get(self.context, backup['volume_id'])
volume_id = volume['id']
volume_name = volume['name']
LOG.debug(_("Starting backup of volume='%s'") % volume_name)
# Ensure we are at the beginning of the volume
volume_file.seek(0)
length = self._get_volume_size_gb(volume)
do_full_backup = False
if self._file_is_rbd(volume_file):
# If volume an RBD, attempt incremental backup.
try:
self._backup_rbd(backup_id, volume_id, volume_file,
volume_name, length)
except exception.BackupRBDOperationFailed:
LOG.debug(_("forcing full backup"))
do_full_backup = True
else:
do_full_backup = True
if do_full_backup:
self._full_backup(backup_id, volume_id, volume_file,
volume_name, length)
self.db.backup_update(self.context, backup_id,
{'container': self._ceph_backup_pool})
LOG.debug(_("backup '%s' finished.") % (backup_id))
def restore(self, backup, volume_id, volume_file):
"""Restore the given volume backup from Ceph object store"""
volume = self.db.volume_get(self.context, volume_id)
backup_name = self._get_backup_base_name(backup['volume_id'],
backup['id'])
def _full_restore(self, backup_id, volume_id, dest_file, dest_name,
length, src_snap=None):
"""Restore the given volume file from backup RBD.
LOG.debug('starting backup restore from Ceph backup=%s '
'to volume=%s' % (backup['id'], volume['name']))
This will result in all extents being copied from source to destination.
"""
with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
if src_snap:
# If a source snapshot is provided we assume the base is diff
# format.
backup_name = self._get_backup_base_name(volume_id,
diff_format=True)
else:
backup_name = self._get_backup_base_name(volume_id, backup_id)
# Retrieve backup volume
src_rbd = self.rbd.Image(client.ioctx, backup_name,
snapshot=src_snap)
try:
rbd_meta = drivers.rbd.RBDImageMetadata(src_rbd,
self._ceph_backup_pool,
self._ceph_backup_user,
self._ceph_backup_conf)
rbd_fd = drivers.rbd.RBDImageIOWrapper(rbd_meta)
self._transfer_data(rbd_fd, backup_name, dest_file, dest_name,
length)
finally:
src_rbd.close()
def _restore_rbd(self, base_name, volume_file, volume_name, restore_point):
"""Restore RBD volume from RBD image."""
rbd_user = volume_file.rbd_user
rbd_pool = volume_file.rbd_pool
rbd_conf = volume_file.rbd_conf
LOG.debug(_("trying incremental restore from base='%(base)s' "
"snap='%(snap)s'") %
{'base': base_name, 'snap': restore_point})
before = time.time()
try:
self._rbd_diff_transfer(base_name, self._ceph_backup_pool,
volume_name, rbd_pool,
src_user=self._ceph_backup_user,
src_conf=self._ceph_backup_conf,
dest_user=rbd_user, dest_conf=rbd_conf,
src_snap=restore_point)
except exception.BackupRBDOperationFailed:
LOG.exception(_("differential restore failed, trying full "
"restore"))
raise
LOG.debug(_("restore transfer completed in %.4fs") %
(time.time() - before))
def _num_backup_snaps(self, backup_base_name):
"""Return the number of snapshots that exist on the base image."""
with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
base_rbd = self.rbd.Image(client.ioctx, backup_base_name)
try:
snaps = self.get_backup_snaps(base_rbd)
finally:
base_rbd.close()
if snaps:
return len(snaps)
else:
return 0
def _get_restore_point(self, base_name, backup_id):
"""Get restore point snapshot name for incremental backup.
If the backup was not incremental None is returned.
"""
with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
base_rbd = self.rbd.Image(client.ioctx, base_name)
try:
restore_point = self._get_backup_snap_name(base_rbd, base_name,
backup_id)
finally:
base_rbd.close()
return restore_point
def _rbd_has_extents(self, rbd_volume):
"""Check whether the given rbd volume has extents.
Return True if it has extents, otherwise False.
"""
extents = []
def iter_cb(offset, length, exists):
if exists:
extents.append(length)
rbd_volume.diff_iterate(0, rbd_volume.size(), None, iter_cb)
if extents:
LOG.debug("rbd has %s extents" % (sum(extents)))
return True
return False
def _diff_restore_allowed(self, base_name, backup, volume, volume_file,
rados_client):
"""Determine whether a differential restore is possible/allowed.
In order for a differential restore to be performed we need:
* destination volume must be RBD
* destination volume must have zero extents
* backup base image must exist
* backup must have a restore point
Returns True if differential restore is allowed, False otherwise.
"""
not_allowed = (False, None)
# If the volume we are restoring to is the volume the backup was made
# from, force a full restore since a diff will not work in this case.
if volume['id'] == backup['volume_id']:
LOG.debug("dest volume is original volume - forcing full copy")
return not_allowed
if self._file_is_rbd(volume_file):
rbd_exists, base_name = self._rbd_image_exists(base_name,
backup['volume_id'],
rados_client)
if not rbd_exists:
return not_allowed
# Get the restore point. If no restore point is found, we assume
# that the backup was not performed using diff/incremental methods
# so we enforce full copy.
restore_point = self._get_restore_point(base_name, backup['id'])
if restore_point:
# If the destination volume has extents we cannot allow a diff
# restore.
if self._rbd_has_extents(volume_file.rbd_image):
# We return the restore point so that a full copy is done
# from snapshot.
LOG.debug("destination has extents - forcing full copy")
return False, restore_point
return True, restore_point
else:
LOG.info(_("no restore point found for backup='%s', forcing "
"full copy") % (backup['id']))
return not_allowed
def _try_restore(self, backup, volume, volume_file):
"""Attempt to restore volume from backup."""
volume_name = volume['name']
backup_id = backup['id']
backup_volume_id = backup['volume_id']
length = int(volume['size']) * units.GiB
base_name = self._get_backup_base_name(backup['volume_id'],
diff_format=True)
with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
diff_restore, restore_point = \
self._diff_restore_allowed(base_name, backup, volume,
volume_file, client)
if diff_restore:
try:
do_full_restore = False
self._restore_rbd(base_name, volume_file, volume_name,
restore_point)
except exception.BackupRBDOperationFailed:
LOG.debug(_("forcing full restore"))
do_full_restore = True
else:
do_full_restore = True
if do_full_restore:
# Otherwise full copy
self._full_restore(backup_id, backup_volume_id, volume_file,
volume_name, length, src_snap=restore_point)
def restore(self, backup, volume_id, volume_file):
"""Restore the given volume backup from Ceph object store."""
target_volume = self.db.volume_get(self.context, volume_id)
LOG.debug(_('starting restore from Ceph backup=%(src)s to '
'volume=%(dest)s') %
{'src': backup['id'], 'dest': target_volume['name']})
# Ensure we are at the beginning of the volume
volume_file.seek(0)
backup_size = int(volume['size']) * units.GiB
with rbddriver.RADOSClient(self, self._ceph_pool) as client:
src_rbd = self.rbd.Image(client.ioctx, backup_name)
try:
self._transfer_data(src_rbd, volume_file, volume['name'],
backup_size)
finally:
src_rbd.close()
# Be tolerant to IO implementations that do not support fileno()
try:
fileno = volume_file.fileno()
except IOError:
LOG.info("volume_file does not support fileno() so skipping "
"fsync()")
else:
os.fsync(fileno)
self._try_restore(backup, target_volume, volume_file)
LOG.debug('restore %s to %s finished.' % (backup['id'], volume_id))
# Be tolerant to IO implementations that do not support fileno()
try:
fileno = volume_file.fileno()
except IOError:
LOG.info(_("volume_file does not support fileno() so skipping "
"fsync()"))
else:
os.fsync(fileno)
LOG.debug(_('restore finished.'))
except exception.BackupOperationError as e:
LOG.error(_('restore finished with error - %s') % (e))
raise
def delete(self, backup):
"""Delete the given backup from Ceph object store"""
"""Delete the given backup from Ceph object store."""
backup_id = backup['id']
backup_name = self._get_backup_base_name(backup['volume_id'],
backup_id)
LOG.debug('delete started for backup=%s', backup['id'])
LOG.debug(_('delete started for backup=%s') % backup['id'])
try:
with rbddriver.RADOSClient(self) as client:
self.rbd.RBD().remove(client.ioctx, backup_name)
self._try_delete_base_image(backup['id'], backup['volume_id'])
except self.rbd.ImageNotFound:
LOG.warning("rbd image '%s' not found but continuing anyway so "
"that db entry can be removed" % (backup_name))
msg = _("rbd image not found but continuing anyway so "
"that db entry can be removed")
LOG.warning(msg)
LOG.info(_("delete '%s' finished with warning") % (backup_id))
LOG.debug(_("delete '%s' finished") % (backup_id))


@@ -564,6 +564,18 @@ class ImageCopyFailure(Invalid):
message = _("Failed to copy image to volume: %(reason)s")
class BackupInvalidCephArgs(Invalid):
message = _("Invalid Ceph args provided for backup rbd operation")
class BackupOperationError(Invalid):
message = _("An error has occurred during backup operation")
class BackupRBDOperationFailed(Invalid):
message = _("Backup RBD operation failed")
class BackupVolumeInvalidType(Invalid):
message = _("Backup volume %(volume_id)s type not recognised.")


@@ -16,7 +16,7 @@
class mock_rados(object):
class mock_ioctx(object):
class ioctx(object):
def __init__(self, *args, **kwargs):
pass
@@ -32,7 +32,7 @@ class mock_rados(object):
pass
def open_ioctx(self, *args, **kwargs):
return mock_rados.mock_ioctx()
return mock_rados.ioctx()
def shutdown(self, *args, **kwargs):
pass
@@ -44,22 +44,42 @@ class mock_rados(object):
class mock_rbd(object):
class ImageBusy(Exception):
def __init__(self, *args, **kwargs):
pass
class ImageNotFound(Exception):
def __init__(self, *args, **kwargs):
pass
class Image(object):
def __init__(self, *args, **kwargs):
pass
def read(self, *args, **kwargs):
def create_snap(self, *args, **kwargs):
pass
def remove_snap(self, *args, **kwargs):
pass
def read(self, *args, **kwargs):
raise NotImplementedError()
def write(self, *args, **kwargs):
pass
raise NotImplementedError()
def resize(self, *args, **kwargs):
raise NotImplementedError()
def close(self):
pass
def close(self, *args, **kwargs):
pass
def list_snaps(self):
raise NotImplementedError()
def size(self):
raise NotImplementedError()
class RBD(object):
@@ -72,6 +92,5 @@ class mock_rbd(object):
def remove(self, *args, **kwargs):
pass
class ImageNotFound(Exception):
def __init__(self, *args, **kwargs):
pass
def list(self, *args, **kwargs):
raise NotImplementedError()


@@ -14,21 +14,22 @@
# under the License.
""" Tests for Ceph backup service """
import eventlet
import hashlib
import os
import tempfile
import time
import uuid
from cinder.backup.drivers.ceph import CephBackupDriver
from cinder.tests.backup.fake_rados import mock_rados
from cinder.tests.backup.fake_rados import mock_rbd
from cinder.backup.drivers import ceph
from cinder import context
from cinder import db
from cinder import exception
from cinder.openstack.common import log as logging
from cinder import test
from cinder.tests.backup.fake_rados import mock_rados
from cinder.tests.backup.fake_rados import mock_rbd
from cinder.volume.drivers import rbd as rbddriver
LOG = logging.getLogger(__name__)
@@ -44,18 +45,30 @@ class BackupCephTestCase(test.TestCase):
backup = {'id': backupid, 'size': size, 'volume_id': volid}
return db.backup_create(self.ctxt, backup)['id']
def fake_execute_w_exception(*args, **kwargs):
raise exception.ProcessExecutionError()
def time_inc(self):
self.counter += 1
return self.counter
def _get_wrapped_rbd_io(self, rbd_image):
rbd_meta = rbddriver.RBDImageMetadata(rbd_image, 'pool_foo',
'user_foo', 'conf_foo')
return rbddriver.RBDImageIOWrapper(rbd_meta)
def setUp(self):
super(BackupCephTestCase, self).setUp()
self.ctxt = context.get_admin_context()
self.vol_id = str(uuid.uuid4())
self.volume_id = str(uuid.uuid4())
self.backup_id = str(uuid.uuid4())
# Setup librbd stubs
self.stubs.Set(ceph, 'rados', mock_rados)
self.stubs.Set(ceph, 'rbd', mock_rbd)
self._create_backup_db_entry(self.backup_id, self.vol_id, 1)
self._create_backup_db_entry(self.backup_id, self.volume_id, 1)
self.chunk_size = 1024
self.num_chunks = 128
@@ -72,30 +85,108 @@ class BackupCephTestCase(test.TestCase):
self.volume_file.seek(0)
# Always trigger an exception if a command is executed since it should
# always be dealt with gracefully. At the time of writing only rbd
# export/import-diff is executed and if they fail we expect to find
# alternative means of backing up.
fake_exec = self.fake_execute_w_exception
self.service = ceph.CephBackupDriver(self.ctxt, execute=fake_exec)
# Ensure that time.time() always returns more than the last time it was
# called to avoid div by zero errors.
self.counter = float(0)
self.stubs.Set(time, 'time', self.time_inc)
self.stubs.Set(eventlet, 'sleep', lambda *args: None)
def test_get_rbd_support(self):
service = CephBackupDriver(self.ctxt)
self.assertFalse(hasattr(self.service.rbd, 'RBD_FEATURE_LAYERING'))
self.assertFalse(hasattr(self.service.rbd, 'RBD_FEATURE_STRIPINGV2'))
self.assertFalse(hasattr(service.rbd, 'RBD_FEATURE_LAYERING'))
self.assertFalse(hasattr(service.rbd, 'RBD_FEATURE_STRIPINGV2'))
oldformat, features = service._get_rbd_support()
oldformat, features = self.service._get_rbd_support()
self.assertTrue(oldformat)
self.assertEquals(features, 0)
service.rbd.RBD_FEATURE_LAYERING = 1
self.service.rbd.RBD_FEATURE_LAYERING = 1
oldformat, features = service._get_rbd_support()
oldformat, features = self.service._get_rbd_support()
self.assertFalse(oldformat)
self.assertEquals(features, 1)
service.rbd.RBD_FEATURE_STRIPINGV2 = 2
self.service.rbd.RBD_FEATURE_STRIPINGV2 = 2
oldformat, features = service._get_rbd_support()
oldformat, features = self.service._get_rbd_support()
self.assertFalse(oldformat)
self.assertEquals(features, 1 | 2)
def test_tranfer_data_from_rbd(self):
service = CephBackupDriver(self.ctxt)
def _set_common_backup_stubs(self, service):
self.stubs.Set(self.service, '_get_rbd_support', lambda: (True, 3))
self.stubs.Set(self.service, 'get_backup_snaps',
lambda *args, **kwargs: None)
def rbd_size(inst):
return self.chunk_size * self.num_chunks
self.stubs.Set(self.service.rbd.Image, 'size', rbd_size)
def _set_common_restore_stubs(self, service):
self._set_common_backup_stubs(self.service)
def rbd_size(inst):
return self.chunk_size * self.num_chunks
self.stubs.Set(self.service.rbd.Image, 'size', rbd_size)
def test_get_most_recent_snap(self):
last = 'backup.%s.snap.9824923.1212' % (uuid.uuid4())
def list_snaps(inst, *args):
return [{'name': 'backup.%s.snap.6423868.2342' % (uuid.uuid4())},
{'name': 'backup.%s.snap.1321319.3235' % (uuid.uuid4())},
{'name': last},
{'name': 'backup.%s.snap.3824923.1412' % (uuid.uuid4())}]
self.stubs.Set(self.service.rbd.Image, 'list_snaps', list_snaps)
snap = self.service._get_most_recent_snap(self.service.rbd.Image())
self.assertEquals(last, snap)
def test_get_backup_snap_name(self):
snap_name = 'backup.%s.snap.3824923.1412' % (uuid.uuid4())
def mock_get_backup_snaps(inst, *args):
return [{'name': 'backup.%s.snap.6423868.2342' % (uuid.uuid4()),
'backup_id': str(uuid.uuid4())},
{'name': snap_name,
'backup_id': self.backup_id}]
self.stubs.Set(self.service, 'get_backup_snaps', lambda *args: None)
name = self.service._get_backup_snap_name(self.service.rbd.Image(),
'base_foo',
self.backup_id)
self.assertIsNone(name)
self.stubs.Set(self.service, 'get_backup_snaps', mock_get_backup_snaps)
name = self.service._get_backup_snap_name(self.service.rbd.Image(),
'base_foo',
self.backup_id)
self.assertEquals(name, snap_name)
def test_get_backup_snaps(self):
def list_snaps(inst, *args):
return [{'name': 'backup.%s.snap.6423868.2342' % (uuid.uuid4())},
{'name': 'backup.%s.wambam.6423868.2342' % (uuid.uuid4())},
{'name': 'backup.%s.snap.1321319.3235' % (uuid.uuid4())},
{'name': 'bbbackup.%s.snap.1321319.3235' % (uuid.uuid4())},
{'name': 'backup.%s.snap.3824923.1412' % (uuid.uuid4())}]
self.stubs.Set(self.service.rbd.Image, 'list_snaps', list_snaps)
snaps = self.service.get_backup_snaps(self.service.rbd.Image())
self.assertTrue(len(snaps) == 3)
def test_transfer_data_from_rbd_to_file(self):
self._set_common_backup_stubs(self.service)
with tempfile.NamedTemporaryFile() as test_file:
self.volume_file.seek(0)
@@ -103,10 +194,11 @@ class BackupCephTestCase(test.TestCase):
def read_data(inst, offset, length):
return self.volume_file.read(self.length)
self.stubs.Set(service.rbd.Image, 'read', read_data)
self.stubs.Set(self.service.rbd.Image, 'read', read_data)
service._transfer_data(service.rbd.Image(), test_file, 'foo',
self.length)
rbd_io = self._get_wrapped_rbd_io(self.service.rbd.Image())
self.service._transfer_data(rbd_io, 'src_foo', test_file,
'dest_foo', self.length)
checksum = hashlib.sha256()
test_file.seek(0)
@@ -116,26 +208,79 @@ class BackupCephTestCase(test.TestCase):
# Ensure the files are equal
self.assertEquals(checksum.digest(), self.checksum.digest())
def test_tranfer_data_to_rbd(self):
service = CephBackupDriver(self.ctxt)
def test_transfer_data_from_rbd_to_rbd(self):
def rbd_size(inst):
return self.chunk_size * self.num_chunks
with tempfile.NamedTemporaryFile() as test_file:
self.volume_file.seek(0)
checksum = hashlib.sha256()
def read_data(inst, offset, length):
return self.volume_file.read(self.length)
def write_data(inst, data, offset):
checksum.update(data)
test_file.write(data)
self.stubs.Set(self.service.rbd.Image, 'read', read_data)
self.stubs.Set(self.service.rbd.Image, 'size', rbd_size)
self.stubs.Set(self.service.rbd.Image, 'write', write_data)
rbd1 = self.service.rbd.Image()
rbd2 = self.service.rbd.Image()
src_rbd_io = self._get_wrapped_rbd_io(rbd1)
dest_rbd_io = self._get_wrapped_rbd_io(rbd2)
self.service._transfer_data(src_rbd_io, 'src_foo', dest_rbd_io,
'dest_foo', self.length)
# Ensure the files are equal
self.assertEquals(checksum.digest(), self.checksum.digest())
def test_transfer_data_from_file_to_rbd(self):
self._set_common_backup_stubs(self.service)
with tempfile.NamedTemporaryFile() as test_file:
self.volume_file.seek(0)
checksum = hashlib.sha256()
def write_data(inst, data, offset):
checksum.update(data)
test_file.write(data)
self.stubs.Set(service.rbd.Image, 'write', write_data)
self.stubs.Set(self.service.rbd.Image, 'write', write_data)
service._transfer_data(self.volume_file, service.rbd.Image(),
'foo', self.length, dest_is_rbd=True)
rbd_io = self._get_wrapped_rbd_io(self.service.rbd.Image())
self.service._transfer_data(self.volume_file, 'src_foo',
rbd_io, 'dest_foo', self.length)
# Ensure the files are equal
self.assertEquals(checksum.digest(), self.checksum.digest())
def test_transfer_data_from_file_to_file(self):
self._set_common_backup_stubs(self.service)
with tempfile.NamedTemporaryFile() as test_file:
self.volume_file.seek(0)
checksum = hashlib.sha256()
self.service._transfer_data(self.volume_file, 'src_foo', test_file,
'dest_foo', self.length)
checksum = hashlib.sha256()
test_file.seek(0)
for c in xrange(0, self.num_chunks):
checksum.update(test_file.read(self.chunk_size))
# Ensure the files are equal
self.assertEquals(checksum.digest(), self.checksum.digest())
def test_backup_volume_from_file(self):
service = CephBackupDriver(self.ctxt)
self._create_volume_db_entry(self.volume_id, 1)
backup = db.backup_get(self.ctxt, self.backup_id)
self._set_common_backup_stubs(self.service)
with tempfile.NamedTemporaryFile() as test_file:
checksum = hashlib.sha256()
@@ -144,65 +289,96 @@ class BackupCephTestCase(test.TestCase):
checksum.update(data)
test_file.write(data)
self.stubs.Set(service.rbd.Image, 'write', write_data)
self.stubs.Set(self.service.rbd.Image, 'write', write_data)
service._backup_volume_from_file('foo', self.length,
self.volume_file)
self.service.backup(backup, self.volume_file)
# Ensure the files are equal
self.assertEquals(checksum.digest(), self.checksum.digest())
def tearDown(self):
self.volume_file.close()
super(BackupCephTestCase, self).tearDown()
def test_get_backup_base_name(self):
name = self.service._get_backup_base_name(self.volume_id,
diff_format=True)
self.assertEquals(name, "volume-%s.backup.base" % (self.volume_id))
def test_backup_error1(self):
service = CephBackupDriver(self.ctxt)
self.assertRaises(exception.InvalidParameterValue,
self.service._get_backup_base_name,
self.volume_id)
name = self.service._get_backup_base_name(self.volume_id, '1234')
self.assertEquals(name, "volume-%s.backup.%s" %
(self.volume_id, '1234'))
def test_backup_volume_from_rbd(self):
self._create_volume_db_entry(self.volume_id, 1)
backup = db.backup_get(self.ctxt, self.backup_id)
self._create_volume_db_entry(self.vol_id, 0)
self.assertRaises(exception.InvalidParameterValue, service.backup,
self._set_common_backup_stubs(self.service)
backup_name = self.service._get_backup_base_name(self.backup_id,
diff_format=True)
self.stubs.Set(self.service, '_try_delete_base_image',
lambda *args, **kwargs: None)
with tempfile.NamedTemporaryFile() as test_file:
checksum = hashlib.sha256()
def write_data(inst, data, offset):
checksum.update(data)
test_file.write(data)
def read_data(inst, offset, length):
return self.volume_file.read(self.length)
def rbd_list(inst, ioctx):
return [backup_name]
self.stubs.Set(self.service.rbd.Image, 'read', read_data)
self.stubs.Set(self.service.rbd.Image, 'write', write_data)
self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
meta = rbddriver.RBDImageMetadata(self.service.rbd.Image(),
'pool_foo', 'user_foo',
'conf_foo')
rbd_io = rbddriver.RBDImageIOWrapper(meta)
self.service.backup(backup, rbd_io)
# Ensure the files are equal
self.assertEquals(checksum.digest(), self.checksum.digest())
def test_backup_vol_length_0(self):
self._set_common_backup_stubs(self.service)
backup = db.backup_get(self.ctxt, self.backup_id)
self._create_volume_db_entry(self.volume_id, 0)
self.assertRaises(exception.InvalidParameterValue, self.service.backup,
backup, self.volume_file)
def test_backup_error2(self):
service = CephBackupDriver(self.ctxt)
backup = db.backup_get(self.ctxt, self.backup_id)
self._create_volume_db_entry(self.vol_id, 1)
self.assertRaises(exception.BackupVolumeInvalidType, service.backup,
backup, None)
def test_backup_good(self):
service = CephBackupDriver(self.ctxt)
backup = db.backup_get(self.ctxt, self.backup_id)
self._create_volume_db_entry(self.vol_id, 1)
with tempfile.NamedTemporaryFile() as test_file:
checksum = hashlib.sha256()
def write_data(inst, data, offset):
checksum.update(data)
test_file.write(data)
self.stubs.Set(service.rbd.Image, 'write', write_data)
service.backup(backup, self.volume_file)
# Ensure the files are equal
self.assertEquals(checksum.digest(), self.checksum.digest())
def test_restore(self):
service = CephBackupDriver(self.ctxt)
self._create_volume_db_entry(self.vol_id, 1)
self._create_volume_db_entry(self.volume_id, 1)
backup = db.backup_get(self.ctxt, self.backup_id)
self._set_common_restore_stubs(self.service)
backup_name = self.service._get_backup_base_name(self.backup_id,
diff_format=True)
def rbd_list(inst, ioctx):
return [backup_name]
self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
with tempfile.NamedTemporaryFile() as test_file:
self.volume_file.seek(0)
def read_data(inst, offset, length):
return self.volume_file.read(self.length)
self.stubs.Set(service.rbd.Image, 'read', read_data)
self.stubs.Set(self.service.rbd.Image, 'read', read_data)
service.restore(backup, self.vol_id, test_file)
self.service.restore(backup, self.volume_id, test_file)
checksum = hashlib.sha256()
test_file.seek(0)
@@ -212,17 +388,183 @@ class BackupCephTestCase(test.TestCase):
# Ensure the files are equal
self.assertEquals(checksum.digest(), self.checksum.digest())
def test_delete(self):
service = CephBackupDriver(self.ctxt)
self._create_volume_db_entry(self.vol_id, 1)
def test_create_base_image_if_not_exists(self):
pass
def test_delete_backup_snapshots(self):
snap_name = 'backup.%s.snap.3824923.1412' % (uuid.uuid4())
base_name = self.service._get_backup_base_name(self.volume_id,
diff_format=True)
self.stubs.Set(self.service, '_get_backup_snap_name',
lambda *args: snap_name)
self.stubs.Set(self.service, 'get_backup_snaps',
lambda *args: None)
rem = self.service._delete_backup_snapshots(mock_rados(), base_name,
self.backup_id)
self.assertEquals(rem, 0)
def test_try_delete_base_image_diff_format(self):
# don't create volume db entry since it should not be required
backup = db.backup_get(self.ctxt, self.backup_id)
backup_name = self.service._get_backup_base_name(self.volume_id,
diff_format=True)
snap_name = self.service._get_new_snap_name(self.backup_id)
snaps = [{'name': snap_name}]
def rbd_list(*args):
return [backup_name]
def list_snaps(*args):
return snaps
def remove_snap(*args):
snaps.pop()
self.stubs.Set(self.service.rbd.Image, 'remove_snap', remove_snap)
self.stubs.Set(self.service.rbd.Image, 'list_snaps', list_snaps)
self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
# Must be something mutable
remove_called = []
def remove(inst, ioctx, name):
remove_called.append(True)
self.stubs.Set(service.rbd.RBD, 'remove', remove)
service.delete(backup)
self.stubs.Set(self.service.rbd.RBD, 'remove', remove)
self.service.delete(backup)
self.assertTrue(remove_called[0])
def test_try_delete_base_image(self):
# don't create volume db entry since it should not be required
self._create_volume_db_entry(self.volume_id, 1)
backup = db.backup_get(self.ctxt, self.backup_id)
backup_name = self.service._get_backup_base_name(self.volume_id,
self.backup_id)
def rbd_list(inst, ioctx):
return [backup_name]
self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
# Must be something mutable
remove_called = []
self.stubs.Set(self.service, 'get_backup_snaps',
lambda *args, **kwargs: None)
def remove(inst, ioctx, name):
remove_called.append(True)
self.stubs.Set(self.service.rbd.RBD, 'remove', remove)
self.service.delete(backup)
self.assertTrue(remove_called[0])
def test_try_delete_base_image_busy(self):
"""This should induce retries then raise rbd.ImageBusy."""
# don't create volume db entry since it should not be required
self._create_volume_db_entry(self.volume_id, 1)
backup = db.backup_get(self.ctxt, self.backup_id)
backup_name = self.service._get_backup_base_name(self.volume_id,
self.backup_id)
def rbd_list(inst, ioctx):
return [backup_name]
self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
# Must be something mutable
remove_called = []
self.stubs.Set(self.service, 'get_backup_snaps',
lambda *args, **kwargs: None)
def remove(inst, ioctx, name):
raise self.service.rbd.ImageBusy("image busy")
self.stubs.Set(self.service.rbd.RBD, 'remove', remove)
self.assertRaises(self.service.rbd.ImageBusy,
self.service._try_delete_base_image,
backup['id'], backup['volume_id'])
def test_delete(self):
backup = db.backup_get(self.ctxt, self.backup_id)
def del_base_image(*args):
pass
self.stubs.Set(self.service, '_try_delete_base_image',
lambda *args: None)
self.service.delete(backup)
def test_delete_image_not_found(self):
backup = db.backup_get(self.ctxt, self.backup_id)
def del_base_image(*args):
raise self.service.rbd.ImageNotFound
self.stubs.Set(self.service, '_try_delete_base_image',
lambda *args: None)
# ImageNotFound exception is caught so that db entry can be cleared
self.service.delete(backup)
def test_diff_restore_allowed_true(self):
is_allowed = (True, 'restore.foo')
backup = db.backup_get(self.ctxt, self.backup_id)
alt_volume_id = str(uuid.uuid4())
self._create_volume_db_entry(alt_volume_id, 1)
alt_volume = db.volume_get(self.ctxt, alt_volume_id)
rbd_io = self._get_wrapped_rbd_io(self.service.rbd.Image())
self.stubs.Set(self.service, '_get_restore_point',
lambda *args: 'restore.foo')
self.stubs.Set(self.service, '_rbd_has_extents',
lambda *args: False)
self.stubs.Set(self.service, '_rbd_image_exists',
lambda *args: (True, 'foo'))
self.stubs.Set(self.service, '_file_is_rbd', lambda *args: True)
resp = self.service._diff_restore_allowed('foo', backup, alt_volume,
rbd_io, mock_rados())
self.assertEquals(resp, is_allowed)
def test_diff_restore_allowed_false(self):
not_allowed = (False, None)
backup = db.backup_get(self.ctxt, self.backup_id)
self._create_volume_db_entry(self.volume_id, 1)
original_volume = db.volume_get(self.ctxt, self.volume_id)
rbd_io = self._get_wrapped_rbd_io(self.service.rbd.Image())
self.stubs.Set(self.service, '_get_restore_point',
lambda *args: None)
self.stubs.Set(self.service, '_rbd_has_extents',
lambda *args: True)
self.stubs.Set(self.service, '_rbd_image_exists',
lambda *args: (False, 'foo'))
self.stubs.Set(self.service, '_file_is_rbd', lambda *args: False)
resp = self.service._diff_restore_allowed('foo', backup,
original_volume, rbd_io,
mock_rados())
self.assertEquals(resp, not_allowed)
def tearDown(self):
self.volume_file.close()
self.stubs.UnsetAll()
super(BackupCephTestCase, self).tearDown()


@@ -127,9 +127,13 @@ class RBDTestCase(test.TestCase):
volume = dict(name=name)
mock_client = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(driver, 'RADOSClient')
self.stubs.Set(self.driver, '_get_backup_snaps', lambda *args: None)
driver.RADOSClient(self.driver).AndReturn(mock_client)
mock_client.__enter__().AndReturn(mock_client)
mock_image = self.mox.CreateMockAnything()
self.rbd.Image(mox.IgnoreArg(), str(name)).AndReturn(mock_image)
mock_image.close()
mock_rbd = self.mox.CreateMockAnything()
self.rbd.RBD().AndReturn(mock_rbd)
mock_rbd.remove(mox.IgnoreArg(), str(name))


@@ -24,17 +24,15 @@ import os
import tempfile
import urllib
from oslo.config import cfg
import cinder.backup.drivers.ceph as ceph_backup
from cinder import exception
from cinder.image import image_utils
from cinder import units
from cinder import utils
from cinder.openstack.common import fileutils
from cinder.openstack.common import log as logging
from cinder import units
from cinder import utils
from cinder.volume import driver
from oslo.config import cfg
try:
import rados
@@ -85,28 +83,53 @@ def ascii_str(string):
return str(string)
class RBDImageMetadata(object):
"""RBD image metadata to be used with RBDImageIOWrapper"""
def __init__(self, image, pool, user, conf):
self.image = image
self.pool = str(pool)
self.user = str(user)
self.conf = str(conf)
class RBDImageIOWrapper(io.RawIOBase):
"""
Wrapper to provide standard Python IO interface to RBD images so that they
can be treated as files.
"""
def __init__(self, rbd_image):
def __init__(self, rbd_meta):
super(RBDImageIOWrapper, self).__init__()
self.rbd_image = rbd_image
self._rbd_meta = rbd_meta
self._offset = 0
def _inc_offset(self, length):
self._offset += length
@property
def rbd_image(self):
return self._rbd_meta.image
@property
def rbd_user(self):
return self._rbd_meta.user
@property
def rbd_pool(self):
return self._rbd_meta.pool
@property
def rbd_conf(self):
return self._rbd_meta.conf
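# For illustration (not part of this change): the backup driver consumes this
# wrapper roughly as
#   meta = RBDImageMetadata(image, 'volumes', 'cinder', '/etc/ceph/ceph.conf')
#   volume_file = RBDImageIOWrapper(meta)
# giving it read()/write()/seek() on the image as if it were a plain file,
# while rbd_pool/rbd_user/rbd_conf remain available for building rbd CLI
# arguments (the pool/user/conf values here are placeholders).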
def read(self, length=None):
offset = self._offset
total = self.rbd_image.size()
total = self._rbd_meta.image.size()
# (dosaboy): posix files do not barf if you read beyond their length
# (they just return nothing) but rbd images do so we need to return
# empty string if we are at the end of the image
if (offset == total):
if (offset >= total):
return ''
if length is None:
@@ -116,10 +139,10 @@ class RBDImageIOWrapper(io.RawIOBase):
length = total - offset
self._inc_offset(length)
return self.rbd_image.read(int(offset), int(length))
return self._rbd_meta.image.read(int(offset), int(length))
def write(self, data):
self.rbd_image.write(data, self._offset)
self._rbd_meta.image.write(data, self._offset)
self._inc_offset(len(data))
def seekable(self):
@@ -147,10 +170,9 @@ class RBDImageIOWrapper(io.RawIOBase):
def flush(self):
try:
self.rbd_image.flush()
except AttributeError as exc:
LOG.warning("flush() not supported in this version of librbd - "
"%s" % (str(rbd.RBD().version())))
self._rbd_meta.image.flush()
except AttributeError:
LOG.warning(_("flush() not supported in this version of librbd"))
def fileno(self):
"""
@@ -274,6 +296,14 @@ class RBDDriver(driver.VolumeDriver):
ioctx.close()
client.shutdown()
def _get_backup_snaps(self, rbd_image):
"""Get list of any backup snapshots that exist on this volume.
There should only ever be one but accept all since they need to be
deleted before the volume can be.
"""
return ceph_backup.CephBackupDriver.get_backup_snaps(rbd_image)
def _get_mon_addrs(self):
args = ['ceph', 'mon', 'dump', '--format=json'] + self._ceph_args()
out, _ = self._execute(*args)
@@ -386,6 +416,16 @@ class RBDDriver(driver.VolumeDriver):
def delete_volume(self, volume):
"""Deletes a logical volume."""
with RADOSClient(self) as client:
# Ensure any backup snapshots are deleted
rbd_image = self.rbd.Image(client.ioctx, str(volume['name']))
try:
backup_snaps = self._get_backup_snaps(rbd_image)
if backup_snaps:
for snap in backup_snaps:
rbd_image.remove_snap(snap['name'])
finally:
rbd_image.close()
try:
self.rbd.RBD().remove(client.ioctx, str(volume['name']))
except self.rbd.ImageHasSnapshots:
@@ -540,8 +580,11 @@ class RBDDriver(driver.VolumeDriver):
pool = self.configuration.rbd_pool
volname = volume['name']
with RBDVolumeProxy(self, volname, pool, read_only=True) as rbd_image:
rbd_fd = RBDImageIOWrapper(rbd_image)
with RBDVolumeProxy(self, volname, pool) as rbd_image:
rbd_meta = RBDImageMetadata(rbd_image, self.configuration.rbd_pool,
self.configuration.rbd_user,
self.configuration.rbd_ceph_conf)
rbd_fd = RBDImageIOWrapper(rbd_meta)
backup_service.backup(backup, rbd_fd)
LOG.debug("volume backup complete.")
@@ -551,7 +594,10 @@ class RBDDriver(driver.VolumeDriver):
pool = self.configuration.rbd_pool
with RBDVolumeProxy(self, volume['name'], pool) as rbd_image:
rbd_fd = RBDImageIOWrapper(rbd_image)
rbd_meta = RBDImageMetadata(rbd_image, self.configuration.rbd_pool,
self.configuration.rbd_user,
self.configuration.rbd_ceph_conf)
rbd_fd = RBDImageIOWrapper(rbd_meta)
backup_service.restore(backup, volume['id'], rbd_fd)
LOG.debug("volume restore complete.")


@@ -1127,6 +1127,9 @@
# Options defined in cinder.volume.drivers.rbd
#
# path to the ceph configuration file to use (string)
#rbd_ceph_conf=/etc/ceph/ceph.conf
# the RADOS pool in which rbd volumes are stored (string
# value)
#rbd_pool=rbd