Merge "Fixes ceph volume restore to larger image than source"

This commit is contained in:
Jenkins 2013-09-16 20:47:21 +00:00 committed by Gerrit Code Review
commit 2a725ef15d
5 changed files with 198 additions and 69 deletions

View File

@ -78,7 +78,10 @@ service_opts = [
cfg.IntOpt('backup_ceph_stripe_unit', default=0, cfg.IntOpt('backup_ceph_stripe_unit', default=0,
help='RBD stripe unit to use when creating a backup image'), help='RBD stripe unit to use when creating a backup image'),
cfg.IntOpt('backup_ceph_stripe_count', default=0, cfg.IntOpt('backup_ceph_stripe_count', default=0,
help='RBD stripe count to use when creating a backup image') help='RBD stripe count to use when creating a backup image'),
cfg.BoolOpt('restore_discard_excess_bytes', default=True,
help='If True, always discard excess bytes when restoring '
'volumes.')
] ]
CONF = cfg.CONF CONF = cfg.CONF
@ -210,6 +213,24 @@ class CephBackupDriver(BackupDriver):
raise exception.InvalidParameterValue(msg) raise exception.InvalidParameterValue(msg)
return self._utf8("volume-%s.backup.%s" % (volume_id, backup_id)) return self._utf8("volume-%s.backup.%s" % (volume_id, backup_id))
def _discard_bytes(self, volume, offset, length):
"""Trim length bytes from offset.
If the volume is an rbd do a discard() otherwise assume it is a file
and pad with zeroes.
"""
if length:
LOG.info("discarding %s bytes from offset %s" %
(length, offset))
if self._file_is_rbd(volume):
volume.rbd_image.discard(offset, length)
else:
zeroes = '\0' * length
chunks = int(length / self.chunk_size)
for chunk in xrange(0, chunks):
LOG.debug("writing zeroes chunk %d" % (chunk))
volume.write(zeroes)
def _transfer_data(self, src, src_name, dest, dest_name, length): def _transfer_data(self, src, src_name, dest, dest_name, length):
"""Transfer data between files (Python IO objects).""" """Transfer data between files (Python IO objects)."""
LOG.debug(_("transferring data between '%(src)s' and '%(dest)s'") % LOG.debug(_("transferring data between '%(src)s' and '%(dest)s'") %
@ -222,13 +243,22 @@ class CephBackupDriver(BackupDriver):
for chunk in xrange(0, chunks): for chunk in xrange(0, chunks):
before = time.time() before = time.time()
data = src.read(self.chunk_size) data = src.read(self.chunk_size)
# If we have reach end of source, discard any extraneous bytes from
# destination volume if trim is enabled and stop writing.
if data == '':
if CONF.restore_discard_excess_bytes:
self._discard_bytes(dest, dest.tell(),
length - dest.tell())
return
dest.write(data) dest.write(data)
dest.flush() dest.flush()
delta = (time.time() - before) delta = (time.time() - before)
rate = (self.chunk_size / delta) / 1024 rate = (self.chunk_size / delta) / 1024
LOG.debug((_("transferred chunk %(chunk)s of %(chunks)s " LOG.debug((_("transferred chunk %(chunk)s of %(chunks)s "
"(%(rate)dK/s)") % "(%(rate)dK/s)") %
{'chunk': chunk, 'chunks': chunks, {'chunk': chunk + 1, 'chunks': chunks,
'rate': rate})) 'rate': rate}))
# yield to any other pending backups # yield to any other pending backups
@ -238,6 +268,10 @@ class CephBackupDriver(BackupDriver):
if rem: if rem:
LOG.debug(_("transferring remaining %s bytes") % (rem)) LOG.debug(_("transferring remaining %s bytes") % (rem))
data = src.read(rem) data = src.read(rem)
if data == '':
if CONF.restore_discard_excess_bytes:
self._discard_bytes(dest, dest.tell(), rem)
else:
dest.write(data) dest.write(data)
dest.flush() dest.flush()
# yield to any other pending backups # yield to any other pending backups
@ -370,7 +404,7 @@ class CephBackupDriver(BackupDriver):
def _rbd_diff_transfer(self, src_name, src_pool, dest_name, dest_pool, def _rbd_diff_transfer(self, src_name, src_pool, dest_name, dest_pool,
src_user, src_conf, dest_user, dest_conf, src_user, src_conf, dest_user, dest_conf,
src_snap=None, from_snap=None): src_snap=None, from_snap=None):
"""Backup only extents changed between two points. """Copy only extents changed between two points.
If no snapshot is provided, the diff extents will be all those changed If no snapshot is provided, the diff extents will be all those changed
since the rbd volume/base was created, otherwise it will be those since the rbd volume/base was created, otherwise it will be those
@ -431,7 +465,7 @@ class CephBackupDriver(BackupDriver):
def _snap_exists(self, base_name, snap_name, client): def _snap_exists(self, base_name, snap_name, client):
"""Return True if snapshot exists in base image.""" """Return True if snapshot exists in base image."""
base_rbd = self.rbd.Image(client.ioctx, base_name) base_rbd = self.rbd.Image(client.ioctx, base_name, read_only=True)
try: try:
snaps = base_rbd.list_snaps() snaps = base_rbd.list_snaps()
finally: finally:
@ -707,23 +741,26 @@ class CephBackupDriver(BackupDriver):
def _full_restore(self, backup_id, volume_id, dest_file, dest_name, def _full_restore(self, backup_id, volume_id, dest_file, dest_name,
length, src_snap=None): length, src_snap=None):
"""Restore the given volume file from backup RBD. """Restore volume using full copy i.e. all extents.
This will result in all extents being copied from source to destination This will result in all extents being copied from source to
destination.
""" """
with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client: with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
if src_snap:
# If a source snapshot is provided we assume the base is diff # If a source snapshot is provided we assume the base is diff
# format. # format.
backup_name = self._get_backup_base_name(volume_id, if src_snap:
diff_format=True) diff_format = True
else: else:
backup_name = self._get_backup_base_name(volume_id, backup_id) diff_format = False
backup_name = self._get_backup_base_name(volume_id,
backup_id=backup_id,
diff_format=diff_format)
# Retrieve backup volume # Retrieve backup volume
src_rbd = self.rbd.Image(client.ioctx, backup_name, src_rbd = self.rbd.Image(client.ioctx, backup_name,
snapshot=src_snap) snapshot=src_snap, read_only=True)
try: try:
rbd_meta = drivers.rbd.RBDImageMetadata(src_rbd, rbd_meta = drivers.rbd.RBDImageMetadata(src_rbd,
self._ceph_backup_pool, self._ceph_backup_pool,
@ -735,11 +772,40 @@ class CephBackupDriver(BackupDriver):
finally: finally:
src_rbd.close() src_rbd.close()
def _restore_rbd(self, base_name, volume_file, volume_name, restore_point): def _check_restore_vol_size(self, backup_base, restore_vol, restore_length,
"""Restore RBD volume from RBD image.""" src_pool):
rbd_user = volume_file.rbd_user """Ensure that the restore volume is the correct size.
rbd_pool = volume_file.rbd_pool
rbd_conf = volume_file.rbd_conf If the restore volume was bigger than the backup, the diff restore will
shrink it to the size of the original backup so we need to
post-process and resize it back to its expected size.
"""
with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
adjust_size = 0
base_image = self.rbd.Image(client.ioctx, self._utf8(backup_base),
read_only=True)
try:
if restore_length != base_image.size():
adjust_size = restore_length
finally:
base_image.close()
if adjust_size:
with drivers.rbd.RADOSClient(self, src_pool) as client:
dest_image = self.rbd.Image(client.ioctx,
self._utf8(restore_vol))
try:
LOG.debug("adjusting restore vol size")
dest_image.resize(adjust_size)
finally:
dest_image.close()
def _diff_restore_rbd(self, base_name, restore_file, restore_name,
restore_point, restore_length):
"""Attempt restore rbd volume from backup using diff transfer."""
rbd_user = restore_file.rbd_user
rbd_pool = restore_file.rbd_pool
rbd_conf = restore_file.rbd_conf
LOG.debug(_("trying incremental restore from base='%(base)s' " LOG.debug(_("trying incremental restore from base='%(base)s' "
"snap='%(snap)s'") % "snap='%(snap)s'") %
@ -747,7 +813,7 @@ class CephBackupDriver(BackupDriver):
before = time.time() before = time.time()
try: try:
self._rbd_diff_transfer(base_name, self._ceph_backup_pool, self._rbd_diff_transfer(base_name, self._ceph_backup_pool,
volume_name, rbd_pool, restore_name, rbd_pool,
src_user=self._ceph_backup_user, src_user=self._ceph_backup_user,
src_conf=self._ceph_backup_conf, src_conf=self._ceph_backup_conf,
dest_user=rbd_user, dest_conf=rbd_conf, dest_user=rbd_user, dest_conf=rbd_conf,
@ -757,13 +823,21 @@ class CephBackupDriver(BackupDriver):
"restore")) "restore"))
raise raise
# If the volume we are restoring to is larger than the backup volume,
# we will need to resize it after the diff import since import-diff
# appears to shrink the target rbd volume to the size of the original
# backup volume.
self._check_restore_vol_size(base_name, restore_name, restore_length,
rbd_pool)
LOG.debug(_("restore transfer completed in %.4fs") % LOG.debug(_("restore transfer completed in %.4fs") %
(time.time() - before)) (time.time() - before))
def _num_backup_snaps(self, backup_base_name): def _num_backup_snaps(self, backup_base_name):
"""Return the number of snapshots that exist on the base image.""" """Return the number of snapshots that exist on the base image."""
with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client: with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
base_rbd = self.rbd.Image(client.ioctx, backup_base_name) base_rbd = self.rbd.Image(client.ioctx, backup_base_name,
read_only=True)
try: try:
snaps = self.get_backup_snaps(base_rbd) snaps = self.get_backup_snaps(base_rbd)
finally: finally:
@ -780,7 +854,7 @@ class CephBackupDriver(BackupDriver):
If the backup was not incremental None is returned. If the backup was not incremental None is returned.
""" """
with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client: with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
base_rbd = self.rbd.Image(client.ioctx, base_name) base_rbd = self.rbd.Image(client.ioctx, base_name, read_only=True)
try: try:
restore_point = self._get_backup_snap_name(base_rbd, base_name, restore_point = self._get_backup_snap_name(base_rbd, base_name,
backup_id) backup_id)
@ -856,8 +930,11 @@ class CephBackupDriver(BackupDriver):
return not_allowed return not_allowed
def _try_restore(self, backup, volume, volume_file): def _restore_volume(self, backup, volume, volume_file):
"""Attempt to restore volume from backup.""" """Restore volume from backup using diff transfer if possible.
Attempts a differential restore and reverts to full copy if diff fails.
"""
volume_name = volume['name'] volume_name = volume['name']
backup_id = backup['id'] backup_id = backup['id']
backup_volume_id = backup['volume_id'] backup_volume_id = backup['volume_id']
@ -867,20 +944,19 @@ class CephBackupDriver(BackupDriver):
diff_format=True) diff_format=True)
with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client: with drivers.rbd.RADOSClient(self, self._ceph_backup_pool) as client:
diff_restore, restore_point = \ diff_allowed, restore_point = \
self._diff_restore_allowed(base_name, backup, volume, self._diff_restore_allowed(base_name, backup, volume,
volume_file, client) volume_file, client)
if diff_restore: do_full_restore = True
if diff_allowed:
# Attempt diff
try: try:
self._diff_restore_rbd(base_name, volume_file, volume_name,
restore_point, length)
do_full_restore = False do_full_restore = False
self._restore_rbd(base_name, volume_file, volume_name,
restore_point)
except exception.BackupRBDOperationFailed: except exception.BackupRBDOperationFailed:
LOG.debug(_("forcing full restore")) LOG.debug(_("forcing full restore"))
do_full_restore = True
else:
do_full_restore = True
if do_full_restore: if do_full_restore:
# Otherwise full copy # Otherwise full copy
@ -894,13 +970,10 @@ class CephBackupDriver(BackupDriver):
'volume=%(dest)s') % 'volume=%(dest)s') %
{'src': backup['id'], 'dest': target_volume['name']}) {'src': backup['id'], 'dest': target_volume['name']})
# Ensure we are at the beginning of the volume
volume_file.seek(0)
try: try:
self._try_restore(backup, target_volume, volume_file) self._restore_volume(backup, target_volume, volume_file)
# Be tolerant to IO implementations that do not support fileno() # Be tolerant of IO implementations that do not support fileno()
try: try:
fileno = volume_file.fileno() fileno = volume_file.fileno()
except IOError: except IOError:
@ -909,7 +982,7 @@ class CephBackupDriver(BackupDriver):
else: else:
os.fsync(fileno) os.fsync(fileno)
LOG.debug(_('restore finished.')) LOG.debug(_('restore finished successfully.'))
except exception.BackupOperationError as e: except exception.BackupOperationError as e:
LOG.error(_('restore finished with error - %s') % (e)) LOG.error(_('restore finished with error - %s') % (e))
raise raise
@ -926,7 +999,7 @@ class CephBackupDriver(BackupDriver):
"that db entry can be removed") "that db entry can be removed")
LOG.warning(msg) LOG.warning(msg)
LOG.info(_("delete '%s' finished with warning") % (backup_id)) LOG.info(_("delete '%s' finished with warning") % (backup_id))
else:
LOG.debug(_("delete '%s' finished") % (backup_id)) LOG.debug(_("delete '%s' finished") % (backup_id))

View File

@ -95,6 +95,9 @@ class mock_rbd(object):
def resize(self, *args, **kwargs): def resize(self, *args, **kwargs):
raise NotImplementedError() raise NotImplementedError()
def discard(self, offset, length):
raise NotImplementedError()
def close(self): def close(self):
pass pass

View File

@ -31,6 +31,7 @@ from cinder.openstack.common import processutils
from cinder import test from cinder import test
from cinder.tests.backup.fake_rados import mock_rados from cinder.tests.backup.fake_rados import mock_rados
from cinder.tests.backup.fake_rados import mock_rbd from cinder.tests.backup.fake_rados import mock_rbd
from cinder import units
from cinder.volume.drivers import rbd as rbddriver from cinder.volume.drivers import rbd as rbddriver
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -293,6 +294,9 @@ class BackupCephTestCase(test.TestCase):
self.stubs.Set(self.service.rbd.Image, 'write', write_data) self.stubs.Set(self.service.rbd.Image, 'write', write_data)
self.stubs.Set(self.service, '_discard_bytes',
lambda *args: None)
self.service.backup(backup, self.volume_file) self.service.backup(backup, self.volume_file)
# Ensure the files are equal # Ensure the files are equal
@ -340,6 +344,9 @@ class BackupCephTestCase(test.TestCase):
self.stubs.Set(self.service.rbd.Image, 'write', write_data) self.stubs.Set(self.service.rbd.Image, 'write', write_data)
self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list) self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
self.stubs.Set(self.service, '_discard_bytes',
lambda *args: None)
meta = rbddriver.RBDImageMetadata(self.service.rbd.Image(), meta = rbddriver.RBDImageMetadata(self.service.rbd.Image(),
'pool_foo', 'user_foo', 'pool_foo', 'user_foo',
'conf_foo') 'conf_foo')
@ -372,6 +379,8 @@ class BackupCephTestCase(test.TestCase):
self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list) self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
self.stubs.Set(self.service, '_discard_bytes', lambda *args: None)
with tempfile.NamedTemporaryFile() as test_file: with tempfile.NamedTemporaryFile() as test_file:
self.volume_file.seek(0) self.volume_file.seek(0)
@ -390,8 +399,30 @@ class BackupCephTestCase(test.TestCase):
# Ensure the files are equal # Ensure the files are equal
self.assertEqual(checksum.digest(), self.checksum.digest()) self.assertEqual(checksum.digest(), self.checksum.digest())
def test_create_base_image_if_not_exists(self): def test_discard_bytes(self):
pass self.service._discard_bytes(mock_rbd(), 123456, 0)
calls = []
def _setter(*args, **kwargs):
calls.append(True)
self.stubs.Set(self.service.rbd.Image, 'discard', _setter)
self.service._discard_bytes(mock_rbd(), 123456, 0)
self.assertTrue(len(calls) == 0)
image = mock_rbd().Image()
wrapped_rbd = self._get_wrapped_rbd_io(image)
self.service._discard_bytes(wrapped_rbd, 123456, 1234)
self.assertTrue(len(calls) == 1)
self.stubs.Set(image, 'write', _setter)
wrapped_rbd = self._get_wrapped_rbd_io(image)
self.stubs.Set(self.service, '_file_is_rbd',
lambda *args: False)
self.service._discard_bytes(wrapped_rbd, 0,
self.service.chunk_size * 2)
self.assertTrue(len(calls) == 3)
def test_delete_backup_snapshot(self): def test_delete_backup_snapshot(self):
snap_name = 'backup.%s.snap.3824923.1412' % (uuid.uuid4()) snap_name = 'backup.%s.snap.3824923.1412' % (uuid.uuid4())
@ -521,50 +552,69 @@ class BackupCephTestCase(test.TestCase):
self.service.delete(backup) self.service.delete(backup)
def test_diff_restore_allowed_true(self): def test_diff_restore_allowed_true(self):
is_allowed = (True, 'restore.foo') restore_point = 'restore.foo'
is_allowed = (True, restore_point)
backup = db.backup_get(self.ctxt, self.backup_id) backup = db.backup_get(self.ctxt, self.backup_id)
alt_volume_id = str(uuid.uuid4()) alt_volume_id = str(uuid.uuid4())
self._create_volume_db_entry(alt_volume_id, 1) volume_size = 1
self._create_volume_db_entry(alt_volume_id, volume_size)
alt_volume = db.volume_get(self.ctxt, alt_volume_id) alt_volume = db.volume_get(self.ctxt, alt_volume_id)
rbd_io = self._get_wrapped_rbd_io(self.service.rbd.Image()) rbd_io = self._get_wrapped_rbd_io(self.service.rbd.Image())
self.stubs.Set(self.service, '_get_restore_point', self.stubs.Set(self.service, '_get_restore_point',
lambda *args: 'restore.foo') lambda *args: restore_point)
self.stubs.Set(self.service, '_rbd_has_extents', self.stubs.Set(self.service, '_rbd_has_extents',
lambda *args: False) lambda *args: False)
self.stubs.Set(self.service, '_rbd_image_exists', self.stubs.Set(self.service, '_rbd_image_exists',
lambda *args: (True, 'foo')) lambda *args: (True, 'foo'))
self.stubs.Set(self.service, '_file_is_rbd',
self.stubs.Set(self.service, '_file_is_rbd', lambda *args: True) lambda *args: True)
self.stubs.Set(self.service.rbd.Image, 'size',
lambda *args: volume_size * units.GiB)
resp = self.service._diff_restore_allowed('foo', backup, alt_volume, resp = self.service._diff_restore_allowed('foo', backup, alt_volume,
rbd_io, mock_rados()) rbd_io, mock_rados())
self.assertEqual(resp, is_allowed) self.assertEqual(resp, is_allowed)
def _set_service_stub(self, method, retval):
self.stubs.Set(self.service, method, lambda *args, **kwargs: retval)
def test_diff_restore_allowed_false(self): def test_diff_restore_allowed_false(self):
volume_size = 1
not_allowed = (False, None) not_allowed = (False, None)
backup = db.backup_get(self.ctxt, self.backup_id) backup = db.backup_get(self.ctxt, self.backup_id)
self._create_volume_db_entry(self.volume_id, 1) self._create_volume_db_entry(self.volume_id, volume_size)
original_volume = db.volume_get(self.ctxt, self.volume_id) original_volume = db.volume_get(self.ctxt, self.volume_id)
rbd_io = self._get_wrapped_rbd_io(self.service.rbd.Image()) rbd_io = self._get_wrapped_rbd_io(self.service.rbd.Image())
self.stubs.Set(self.service, '_get_restore_point', test_args = 'foo', backup, original_volume, rbd_io, mock_rados()
lambda *args: None)
self.stubs.Set(self.service, '_rbd_has_extents', self._set_service_stub('_get_restore_point', None)
lambda *args: True) resp = self.service._diff_restore_allowed(*test_args)
self.stubs.Set(self.service, '_rbd_image_exists',
lambda *args: (False, 'foo'))
self.stubs.Set(self.service, '_file_is_rbd', lambda *args: False)
resp = self.service._diff_restore_allowed('foo', backup,
original_volume, rbd_io,
mock_rados())
self.assertEqual(resp, not_allowed) self.assertEqual(resp, not_allowed)
self._set_service_stub('_get_restore_point', 'restore.foo')
self._set_service_stub('_rbd_has_extents', True)
resp = self.service._diff_restore_allowed(*test_args)
self.assertEqual(resp, not_allowed)
self._set_service_stub('_rbd_has_extents', False)
self._set_service_stub('_rbd_image_exists', (False, 'foo'))
resp = self.service._diff_restore_allowed(*test_args)
self.assertEqual(resp, not_allowed)
self._set_service_stub('_rbd_image_exists', None)
self.stubs.Set(self.service.rbd.Image, 'size',
lambda *args, **kwargs: volume_size * units.GiB * 2)
resp = self.service._diff_restore_allowed(*test_args)
self.assertEqual(resp, not_allowed)
self.stubs.Set(self.service.rbd.Image, 'size',
lambda *args, **kwargs: volume_size * units.GiB)
self._set_service_stub('_file_is_rbd', False)
resp = self.service._diff_restore_allowed(*test_args)
self.assertEqual(resp, not_allowed)
self._set_service_stub('_file_is_rbd', True)
def tearDown(self): def tearDown(self):
self.volume_file.close() self.volume_file.close()

View File

@ -155,7 +155,7 @@ class RBDImageIOWrapper(io.RawIOBase):
elif whence == 1: elif whence == 1:
new_offset = self._offset + offset new_offset = self._offset + offset
elif whence == 2: elif whence == 2:
new_offset = self.volume.size() - 1 new_offset = self._rbd_meta.image.size()
new_offset += offset new_offset += offset
else: else:
raise IOError(_("Invalid argument - whence=%s not supported") % raise IOError(_("Invalid argument - whence=%s not supported") %
@ -774,9 +774,8 @@ class RBDDriver(driver.VolumeDriver):
"""Create a new backup from an existing volume.""" """Create a new backup from an existing volume."""
volume = self.db.volume_get(context, backup['volume_id']) volume = self.db.volume_get(context, backup['volume_id'])
pool = self.configuration.rbd_pool pool = self.configuration.rbd_pool
volname = volume['name']
with RBDVolumeProxy(self, volname, pool) as rbd_image: with RBDVolumeProxy(self, volume['name'], pool) as rbd_image:
rbd_meta = RBDImageMetadata(rbd_image, self.configuration.rbd_pool, rbd_meta = RBDImageMetadata(rbd_image, self.configuration.rbd_pool,
self.configuration.rbd_user, self.configuration.rbd_user,
self.configuration.rbd_ceph_conf) self.configuration.rbd_ceph_conf)

View File

@ -172,6 +172,10 @@
# (integer value) # (integer value)
#backup_ceph_stripe_count=0 #backup_ceph_stripe_count=0
# If True, always discard excess bytes when restoring volumes.
# (boolean value)
#restore_discard_excess_bytes=true
# #
# Options defined in cinder.backup.drivers.swift # Options defined in cinder.backup.drivers.swift
@ -1764,4 +1768,4 @@
#volume_dd_blocksize=1M #volume_dd_blocksize=1M
# Total option count: 378 # Total option count: 379