Use pipe between ceph backup diff export/import

We now use a piped transfer between the rbd export-diff
and import-diff for incremental backups/restores as
opposed to holding the entire diff in memory.

Change-Id: I33476d9b3934781413af5cd2867a11d825a5d78e
Fixes: bug 1244464
This commit is contained in:
Edward Hope-Morley 2013-10-25 10:57:55 -07:00
parent 75bcf70ed4
commit d384d28e1c
2 changed files with 104 additions and 35 deletions

View File

@ -41,8 +41,10 @@ was deemed the safest action to take. It is therefore recommended to always
restore to a new volume (default).
"""
import fcntl
import os
import re
import subprocess
import time
import eventlet
@ -51,7 +53,6 @@ from oslo.config import cfg
from cinder.backup.driver import BackupDriver
from cinder import exception
from cinder.openstack.common import log as logging
from cinder.openstack.common import processutils
from cinder import units
from cinder import utils
import cinder.volume.drivers.rbd as rbd_driver
@ -410,6 +411,36 @@ class CephBackupDriver(BackupDriver):
finally:
src_rbd.close()
def _piped_execute(self, cmd1, cmd2):
"""Pipe output of cmd1 into cmd2."""
LOG.debug("piping cmd1='%s' into..." % (' '.join(cmd1)))
LOG.debug("cmd2='%s'" % (' '.join(cmd2)))
try:
p1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
except OSError as e:
LOG.error("pipe1 failed - %s " % unicode(e))
raise
# NOTE(dosaboy): ensure that the pipe is blocking. This is to work
# around the case where evenlet.green.subprocess is used which seems to
# use a non-blocking pipe.
flags = fcntl.fcntl(p1.stdout, fcntl.F_GETFL) & (~os.O_NONBLOCK)
fcntl.fcntl(p1.stdout, fcntl.F_SETFL, flags)
try:
p2 = subprocess.Popen(cmd2, stdin=p1.stdout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
except OSError as e:
LOG.error("pipe2 failed - %s " % unicode(e))
raise
p1.stdout.close()
stdout, stderr = p2.communicate()
return p2.returncode, stderr
def _rbd_diff_transfer(self, src_name, src_pool, dest_name, dest_pool,
src_user, src_conf, dest_user, dest_conf,
src_snap=None, from_snap=None):
@ -430,29 +461,22 @@ class CephBackupDriver(BackupDriver):
src_ceph_args = self._ceph_args(src_user, src_conf, pool=src_pool)
dest_ceph_args = self._ceph_args(dest_user, dest_conf, pool=dest_pool)
cmd = ['rbd', 'export-diff'] + src_ceph_args
cmd1 = ['rbd', 'export-diff'] + src_ceph_args
if from_snap is not None:
cmd.extend(['--from-snap', from_snap])
cmd1.extend(['--from-snap', from_snap])
if src_snap:
path = self._utf8("%s/%s@%s" % (src_pool, src_name, src_snap))
else:
path = self._utf8("%s/%s" % (src_pool, src_name))
cmd.extend([path, '-'])
try:
out, err = self._execute(*cmd)
except (processutils.ProcessExecutionError,
processutils.UnknownArgumentError) as exc:
msg = _("rbd export-diff failed - %s") % (str(exc))
LOG.info(msg)
raise exception.BackupRBDOperationFailed(msg)
cmd1.extend([path, '-'])
cmd = ['rbd', 'import-diff'] + dest_ceph_args
cmd.extend(['-', self._utf8("%s/%s" % (dest_pool, dest_name))])
try:
out, err = self._execute(*cmd, process_input=out)
except (processutils.ProcessExecutionError,
processutils.UnknownArgumentError) as exc:
msg = _("rbd import-diff failed - %s") % (str(exc))
cmd2 = ['rbd', 'import-diff'] + dest_ceph_args
cmd2.extend(['-', self._utf8("%s/%s" % (dest_pool, dest_name))])
ret, stderr = self._piped_execute(cmd1, cmd2)
if ret:
msg = (_("rbd diff op failed - (ret=%(ret)s stderr=%(stderr)s)") %
({'ret': ret, 'stderr': stderr}))
LOG.info(msg)
raise exception.BackupRBDOperationFailed(msg)

View File

@ -14,8 +14,10 @@
# under the License.
""" Tests for Ceph backup service """
import fcntl
import hashlib
import os
import subprocess
import tempfile
import time
import uuid
@ -60,6 +62,31 @@ class BackupCephTestCase(test.TestCase):
'user_foo', 'conf_foo')
return rbddriver.RBDImageIOWrapper(rbd_meta)
def _setup_mock_popen(self, inst, retval=None, p1hook=None, p2hook=None):
class stdout(object):
def close(self):
inst.called.append('stdout_close')
class FakePopen(object):
PASS = 0
def __init__(self, cmd, *args, **kwargs):
inst.called.append('popen_init')
self.stdout = stdout()
self.returncode = 0
self.__class__.PASS += 1
if self.__class__.PASS == 1 and p1hook:
p1hook()
elif self.__class__.PASS == 2 and p2hook:
p2hook()
def communicate(self):
inst.called.append('communicate')
return retval
self.stubs.Set(subprocess, 'Popen', FakePopen)
def setUp(self):
super(BackupCephTestCase, self).setUp()
self.ctxt = context.get_admin_context()
@ -101,6 +128,13 @@ class BackupCephTestCase(test.TestCase):
self.stubs.Set(time, 'time', self.time_inc)
self.stubs.Set(eventlet, 'sleep', lambda *args: None)
# Used to collect info on what was called during a test
self.called = []
# Do this to ensure that any test ending up in a subprocess fails if
# not properly mocked.
self.stubs.Set(subprocess, 'Popen', None)
def test_get_rbd_support(self):
self.assertFalse(hasattr(self.service.rbd, 'RBD_FEATURE_LAYERING'))
self.assertFalse(hasattr(self.service.rbd, 'RBD_FEATURE_STRIPINGV2'))
@ -327,21 +361,30 @@ class BackupCephTestCase(test.TestCase):
self.stubs.Set(self.service, '_try_delete_base_image',
lambda *args, **kwargs: None)
self.stubs.Set(fcntl, 'fcntl', lambda *args, **kwargs: 0)
with tempfile.NamedTemporaryFile() as test_file:
checksum = hashlib.sha256()
def write_data(inst, data, offset):
def write_data():
self.volume_file.seek(0)
data = self.volume_file.read(self.length)
self.called.append('write')
checksum.update(data)
test_file.write(data)
def read_data(inst, offset, length):
def read_data():
self.called.append('read')
return self.volume_file.read(self.length)
def rbd_list(inst, ioctx):
self.called.append('list')
return [backup_name]
self.stubs.Set(self.service.rbd.Image, 'read', read_data)
self.stubs.Set(self.service.rbd.Image, 'write', write_data)
self._setup_mock_popen(self, ['out', 'err'],
p1hook=read_data,
p2hook=write_data)
self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
self.stubs.Set(self.service, '_discard_bytes',
@ -354,6 +397,10 @@ class BackupCephTestCase(test.TestCase):
self.service.backup(backup, rbd_io)
self.assertEquals(self.called, ['list', 'popen_init', 'read',
'popen_init', 'write',
'stdout_close', 'communicate'])
# Ensure the files are equal
self.assertEqual(checksum.digest(), self.checksum.digest())
@ -463,15 +510,12 @@ class BackupCephTestCase(test.TestCase):
self.stubs.Set(self.service.rbd.Image, 'list_snaps', list_snaps)
self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
# Must be something mutable
remove_called = []
def remove(inst, ioctx, name):
remove_called.append(True)
self.called.append('remove')
self.stubs.Set(self.service.rbd.RBD, 'remove', remove)
self.service.delete(backup)
self.assertTrue(remove_called[0])
self.assertEquals(self.called, ['remove'])
def test_try_delete_base_image(self):
# don't create volume db entry since it should not be required
@ -486,18 +530,15 @@ class BackupCephTestCase(test.TestCase):
self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
# Must be something mutable
remove_called = []
self.stubs.Set(self.service, 'get_backup_snaps',
lambda *args, **kwargs: None)
def remove(inst, ioctx, name):
remove_called.append(True)
self.called.append('remove')
self.stubs.Set(self.service.rbd.RBD, 'remove', remove)
self.service.delete(backup)
self.assertTrue(remove_called[0])
self.assertEquals(self.called, ['remove'])
def test_try_delete_base_image_busy(self):
"""This should induce retries then raise rbd.ImageBusy."""
@ -513,9 +554,6 @@ class BackupCephTestCase(test.TestCase):
self.stubs.Set(self.service.rbd.RBD, 'list', rbd_list)
# Must be something mutable
remove_called = []
self.stubs.Set(self.service, 'get_backup_snaps',
lambda *args, **kwargs: None)
@ -616,6 +654,13 @@ class BackupCephTestCase(test.TestCase):
self.assertEqual(resp, not_allowed)
self._set_service_stub('_file_is_rbd', True)
def test_piped_execute(self):
self.stubs.Set(fcntl, 'fcntl', lambda *args, **kwargs: 0)
self._setup_mock_popen(self, ['out', 'err'])
self.service._piped_execute(['foo'], ['bar'])
self.assertEquals(self.called, ['popen_init', 'popen_init',
'stdout_close', 'communicate'])
def tearDown(self):
self.volume_file.close()
self.stubs.UnsetAll()