Merge "libvirt:Rsync remote FS driver was added"
This commit is contained in:
@@ -9999,14 +9999,20 @@ class LibvirtConnTestCase(test.NoDBTestCase):
|
|||||||
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)
|
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)
|
||||||
drvr.get_host_ip_addr = mock.MagicMock(return_value='bar')
|
drvr.get_host_ip_addr = mock.MagicMock(return_value='bar')
|
||||||
mock_exists.return_value = is_same
|
mock_exists.return_value = is_same
|
||||||
with mock.patch('nova.utils.ssh_execute') as mock_ssh_method:
|
with contextlib.nested(
|
||||||
result = drvr._is_storage_shared_with('foo', '/path')
|
mock.patch.object(drvr._remotefs, 'create_file'),
|
||||||
mock_ssh_method.assert_any_call('foo', 'touch', mock.ANY)
|
mock.patch.object(drvr._remotefs, 'remove_file')
|
||||||
|
) as (mock_rem_fs_create, mock_rem_fs_remove):
|
||||||
|
result = drvr._is_storage_shared_with('host', '/path')
|
||||||
|
mock_rem_fs_create.assert_any_call('host', mock.ANY)
|
||||||
|
create_args, create_kwargs = mock_rem_fs_create.call_args
|
||||||
|
self.assertTrue(create_args[1].startswith('/path'))
|
||||||
if is_same:
|
if is_same:
|
||||||
mock_unlink.assert_called_once_with(mock.ANY)
|
mock_unlink.assert_called_once_with(mock.ANY)
|
||||||
else:
|
else:
|
||||||
self.assertEqual(2, mock_ssh_method.call_count)
|
mock_rem_fs_remove.assert_called_with('host', mock.ANY)
|
||||||
mock_ssh_method.assert_called_with('foo', 'rm', mock.ANY)
|
remove_args, remove_kwargs = mock_rem_fs_remove.call_args
|
||||||
|
self.assertTrue(remove_args[1].startswith('/path'))
|
||||||
return result
|
return result
|
||||||
|
|
||||||
def test_shared_storage_detection_same_host(self):
|
def test_shared_storage_detection_same_host(self):
|
||||||
|
|||||||
@@ -57,65 +57,23 @@ blah BLAH: bb
|
|||||||
self.assertEqual('raw', disk_type)
|
self.assertEqual('raw', disk_type)
|
||||||
|
|
||||||
@mock.patch('nova.utils.execute')
|
@mock.patch('nova.utils.execute')
|
||||||
def test_copy_image_local_cp(self, mock_execute):
|
def test_copy_image_local(self, mock_execute):
|
||||||
libvirt_utils.copy_image('src', 'dest')
|
libvirt_utils.copy_image('src', 'dest')
|
||||||
mock_execute.assert_called_once_with('cp', 'src', 'dest')
|
mock_execute.assert_called_once_with('cp', 'src', 'dest')
|
||||||
|
|
||||||
_rsync_call = functools.partial(mock.call,
|
@mock.patch('nova.virt.libvirt.volume.remotefs.SshDriver.copy_file')
|
||||||
'rsync', '--sparse', '--compress',
|
def test_copy_image_remote_ssh(self, mock_rem_fs_remove):
|
||||||
on_execute=None, on_completion=None)
|
self.flags(remote_filesystem_transport='ssh', group='libvirt')
|
||||||
|
|
||||||
@mock.patch('nova.utils.execute')
|
|
||||||
def test_copy_image_rsync(self, mock_execute):
|
|
||||||
libvirt_utils.copy_image('src', 'dest', host='host')
|
libvirt_utils.copy_image('src', 'dest', host='host')
|
||||||
|
mock_rem_fs_remove.assert_called_once_with('src', 'host:dest',
|
||||||
|
on_completion=None, on_execute=None)
|
||||||
|
|
||||||
mock_execute.assert_has_calls([
|
@mock.patch('nova.virt.libvirt.volume.remotefs.RsyncDriver.copy_file')
|
||||||
self._rsync_call('--dry-run', 'src', 'host:dest'),
|
def test_copy_image_remote_rsync(self, mock_rem_fs_remove):
|
||||||
self._rsync_call('src', 'host:dest'),
|
self.flags(remote_filesystem_transport='rsync', group='libvirt')
|
||||||
])
|
|
||||||
self.assertEqual(2, mock_execute.call_count)
|
|
||||||
|
|
||||||
@mock.patch('nova.utils.execute')
|
|
||||||
def test_copy_image_scp(self, mock_execute):
|
|
||||||
mock_execute.side_effect = [
|
|
||||||
processutils.ProcessExecutionError,
|
|
||||||
mock.DEFAULT,
|
|
||||||
]
|
|
||||||
|
|
||||||
libvirt_utils.copy_image('src', 'dest', host='host')
|
libvirt_utils.copy_image('src', 'dest', host='host')
|
||||||
|
mock_rem_fs_remove.assert_called_once_with('src', 'host:dest',
|
||||||
mock_execute.assert_has_calls([
|
on_completion=None, on_execute=None)
|
||||||
self._rsync_call('--dry-run', 'src', 'host:dest'),
|
|
||||||
mock.call('scp', 'src', 'host:dest',
|
|
||||||
on_execute=None, on_completion=None),
|
|
||||||
])
|
|
||||||
self.assertEqual(2, mock_execute.call_count)
|
|
||||||
|
|
||||||
@mock.patch('nova.utils.execute')
|
|
||||||
def test_copy_image_rsync_ipv6(self, mock_execute):
|
|
||||||
libvirt_utils.copy_image('src', 'dest', host='2600::')
|
|
||||||
|
|
||||||
mock_execute.assert_has_calls([
|
|
||||||
self._rsync_call('--dry-run', 'src', '[2600::]:dest'),
|
|
||||||
self._rsync_call('src', '[2600::]:dest'),
|
|
||||||
])
|
|
||||||
self.assertEqual(2, mock_execute.call_count)
|
|
||||||
|
|
||||||
@mock.patch('nova.utils.execute')
|
|
||||||
def test_copy_image_scp_ipv6(self, mock_execute):
|
|
||||||
mock_execute.side_effect = [
|
|
||||||
processutils.ProcessExecutionError,
|
|
||||||
mock.DEFAULT,
|
|
||||||
]
|
|
||||||
|
|
||||||
libvirt_utils.copy_image('src', 'dest', host='2600::')
|
|
||||||
|
|
||||||
mock_execute.assert_has_calls([
|
|
||||||
self._rsync_call('--dry-run', 'src', '[2600::]:dest'),
|
|
||||||
mock.call('scp', 'src', '[2600::]:dest',
|
|
||||||
on_execute=None, on_completion=None),
|
|
||||||
])
|
|
||||||
self.assertEqual(2, mock_execute.call_count)
|
|
||||||
|
|
||||||
@mock.patch('os.path.exists', return_value=True)
|
@mock.patch('os.path.exists', return_value=True)
|
||||||
def test_disk_type(self, mock_exists):
|
def test_disk_type(self, mock_exists):
|
||||||
|
|||||||
@@ -58,3 +58,125 @@ class RemoteFSTestCase(test.NoDBTestCase):
|
|||||||
mock_execute.assert_any_call('umount', mock.sentinel.mount_path,
|
mock_execute.assert_any_call('umount', mock.sentinel.mount_path,
|
||||||
run_as_root=True, attempts=3,
|
run_as_root=True, attempts=3,
|
||||||
delay_on_retry=True)
|
delay_on_retry=True)
|
||||||
|
|
||||||
|
@mock.patch('tempfile.mkdtemp', return_value='/tmp/Mercury')
|
||||||
|
@mock.patch('nova.utils.execute')
|
||||||
|
def test_remove_remote_file_rsync(self, mock_execute, mock_mkdtemp):
|
||||||
|
remotefs.RsyncDriver().remove_file('host', 'dest', None, None)
|
||||||
|
rsync_call_args = mock.call('rsync', '--archive',
|
||||||
|
'--delete', '--include',
|
||||||
|
'dest', '--exclude', '*',
|
||||||
|
'/tmp/Mercury/', 'host:',
|
||||||
|
on_completion=None, on_execute=None)
|
||||||
|
self.assertEqual(mock_execute.mock_calls[0], rsync_call_args)
|
||||||
|
rm_call_args = mock.call('rm', '-rf', '/tmp/Mercury')
|
||||||
|
self.assertEqual(mock_execute.mock_calls[1], rm_call_args)
|
||||||
|
self.assertEqual(2, mock_execute.call_count)
|
||||||
|
self.assertEqual(1, mock_mkdtemp.call_count)
|
||||||
|
|
||||||
|
@mock.patch('nova.utils.execute')
|
||||||
|
def test_remove_remote_file_ssh(self, mock_execute):
|
||||||
|
remotefs.SshDriver().remove_file('host', 'dest', None, None)
|
||||||
|
mock_execute.assert_called_once_with(
|
||||||
|
'ssh', 'host', 'rm', 'dest',
|
||||||
|
on_completion=None, on_execute=None)
|
||||||
|
|
||||||
|
@mock.patch('tempfile.mkdtemp', return_value='/tmp/Venus')
|
||||||
|
@mock.patch('nova.utils.execute')
|
||||||
|
def test_remove_remote_dir_rsync(self, mock_execute, mock_mkdtemp):
|
||||||
|
remotefs.RsyncDriver().remove_dir('host', 'dest', None, None)
|
||||||
|
rsync_call_args = mock.call('rsync', '--archive',
|
||||||
|
'--delete-excluded', '/tmp/Venus/',
|
||||||
|
'host:dest',
|
||||||
|
on_completion=None, on_execute=None)
|
||||||
|
self.assertEqual(mock_execute.mock_calls[0], rsync_call_args)
|
||||||
|
rsync_call_args = mock.call('rsync', '--archive',
|
||||||
|
'--delete', '--include',
|
||||||
|
'dest', '--exclude', '*',
|
||||||
|
'/tmp/Venus/', 'host:',
|
||||||
|
on_completion=None, on_execute=None)
|
||||||
|
self.assertEqual(mock_execute.mock_calls[1], rsync_call_args)
|
||||||
|
rm_call_args = mock.call('rm', '-rf', '/tmp/Venus')
|
||||||
|
self.assertEqual(mock_execute.mock_calls[2], rm_call_args)
|
||||||
|
self.assertEqual(3, mock_execute.call_count)
|
||||||
|
self.assertEqual(1, mock_mkdtemp.call_count)
|
||||||
|
|
||||||
|
@mock.patch('nova.utils.execute')
|
||||||
|
def test_remove_remote_dir_ssh(self, mock_execute):
|
||||||
|
remotefs.SshDriver().remove_dir('host', 'dest', None, None)
|
||||||
|
mock_execute.assert_called_once_with(
|
||||||
|
'ssh', 'host', 'rm', '-rf', 'dest', on_completion=None,
|
||||||
|
on_execute=None)
|
||||||
|
|
||||||
|
@mock.patch('tempfile.mkdtemp', return_value='/tmp/Mars')
|
||||||
|
@mock.patch('nova.utils.execute')
|
||||||
|
def test_create_remote_file_rsync(self, mock_execute, mock_mkdtemp):
|
||||||
|
remotefs.RsyncDriver().create_file('host', 'dest_dir', None, None)
|
||||||
|
mkdir_call_args = mock.call('mkdir', '-p', '/tmp/Mars/',
|
||||||
|
on_completion=None, on_execute=None)
|
||||||
|
self.assertEqual(mock_execute.mock_calls[0], mkdir_call_args)
|
||||||
|
touch_call_args = mock.call('touch', '/tmp/Mars/dest_dir',
|
||||||
|
on_completion=None, on_execute=None)
|
||||||
|
self.assertEqual(mock_execute.mock_calls[1], touch_call_args)
|
||||||
|
rsync_call_args = mock.call('rsync', '--archive', '--relative',
|
||||||
|
'--no-implied-dirs',
|
||||||
|
'/tmp/Mars/./dest_dir', 'host:/',
|
||||||
|
on_completion=None, on_execute=None)
|
||||||
|
self.assertEqual(mock_execute.mock_calls[2], rsync_call_args)
|
||||||
|
rm_call_args = mock.call('rm', '-rf', '/tmp/Mars')
|
||||||
|
self.assertEqual(mock_execute.mock_calls[3], rm_call_args)
|
||||||
|
self.assertEqual(4, mock_execute.call_count)
|
||||||
|
self.assertEqual(1, mock_mkdtemp.call_count)
|
||||||
|
|
||||||
|
@mock.patch('nova.utils.execute')
|
||||||
|
def test_create_remote_file_ssh(self, mock_execute):
|
||||||
|
remotefs.SshDriver().create_file('host', 'dest_dir', None, None)
|
||||||
|
mock_execute.assert_called_once_with('ssh', 'host',
|
||||||
|
'touch', 'dest_dir',
|
||||||
|
on_completion=None,
|
||||||
|
on_execute=None)
|
||||||
|
|
||||||
|
@mock.patch('tempfile.mkdtemp', return_value='/tmp/Jupiter')
|
||||||
|
@mock.patch('nova.utils.execute')
|
||||||
|
def test_create_remote_dir_rsync(self, mock_execute, mock_mkdtemp):
|
||||||
|
remotefs.RsyncDriver().create_dir('host', 'dest_dir', None, None)
|
||||||
|
mkdir_call_args = mock.call('mkdir', '-p', '/tmp/Jupiter/dest_dir',
|
||||||
|
on_completion=None, on_execute=None)
|
||||||
|
self.assertEqual(mock_execute.mock_calls[0], mkdir_call_args)
|
||||||
|
rsync_call_args = mock.call('rsync', '--archive', '--relative',
|
||||||
|
'--no-implied-dirs',
|
||||||
|
'/tmp/Jupiter/./dest_dir', 'host:/',
|
||||||
|
on_completion=None, on_execute=None)
|
||||||
|
self.assertEqual(mock_execute.mock_calls[1], rsync_call_args)
|
||||||
|
rm_call_args = mock.call('rm', '-rf', '/tmp/Jupiter')
|
||||||
|
self.assertEqual(mock_execute.mock_calls[2], rm_call_args)
|
||||||
|
self.assertEqual(3, mock_execute.call_count)
|
||||||
|
self.assertEqual(1, mock_mkdtemp.call_count)
|
||||||
|
|
||||||
|
@mock.patch('nova.utils.execute')
|
||||||
|
def test_create_remote_dir_ssh(self, mock_execute):
|
||||||
|
remotefs.SshDriver().create_dir('host', 'dest_dir', None, None)
|
||||||
|
mock_execute.assert_called_once_with('ssh', 'host', 'mkdir',
|
||||||
|
'-p', 'dest_dir',
|
||||||
|
on_completion=None,
|
||||||
|
on_execute=None)
|
||||||
|
|
||||||
|
@mock.patch('nova.utils.execute')
|
||||||
|
def test_remote_copy_file_rsync(self, mock_execute):
|
||||||
|
remotefs.RsyncDriver().copy_file('1.2.3.4:/home/star_wars',
|
||||||
|
'/home/favourite', None, None)
|
||||||
|
mock_execute.assert_called_once_with('rsync', '--sparse', '--compress',
|
||||||
|
'1.2.3.4:/home/star_wars',
|
||||||
|
'/home/favourite',
|
||||||
|
on_completion=None,
|
||||||
|
on_execute=None)
|
||||||
|
|
||||||
|
@mock.patch('nova.utils.execute')
|
||||||
|
def test_remote_copy_file_ssh(self, mock_execute):
|
||||||
|
remotefs.SshDriver().copy_file('1.2.3.4:/home/SpaceOdyssey',
|
||||||
|
'/home/favourite', None, None)
|
||||||
|
mock_execute.assert_called_once_with('scp',
|
||||||
|
'1.2.3.4:/home/SpaceOdyssey',
|
||||||
|
'/home/favourite',
|
||||||
|
on_completion=None,
|
||||||
|
on_execute=None)
|
||||||
|
|||||||
@@ -103,6 +103,7 @@ from nova.virt.libvirt.storage import lvm
|
|||||||
from nova.virt.libvirt.storage import rbd_utils
|
from nova.virt.libvirt.storage import rbd_utils
|
||||||
from nova.virt.libvirt import utils as libvirt_utils
|
from nova.virt.libvirt import utils as libvirt_utils
|
||||||
from nova.virt.libvirt import vif as libvirt_vif
|
from nova.virt.libvirt import vif as libvirt_vif
|
||||||
|
from nova.virt.libvirt.volume import remotefs
|
||||||
from nova.virt import netutils
|
from nova.virt import netutils
|
||||||
from nova.virt import watchdog_actions
|
from nova.virt import watchdog_actions
|
||||||
from nova import volume
|
from nova import volume
|
||||||
@@ -477,6 +478,7 @@ class LibvirtDriver(driver.ComputeDriver):
|
|||||||
sysinfo_serial_funcs.keys())})
|
sysinfo_serial_funcs.keys())})
|
||||||
|
|
||||||
self.job_tracker = instancejobtracker.InstanceJobTracker()
|
self.job_tracker = instancejobtracker.InstanceJobTracker()
|
||||||
|
self._remotefs = remotefs.RemoteFilesystem()
|
||||||
|
|
||||||
def _get_volume_drivers(self):
|
def _get_volume_drivers(self):
|
||||||
return libvirt_volume_drivers
|
return libvirt_volume_drivers
|
||||||
@@ -6270,7 +6272,7 @@ class LibvirtDriver(driver.ComputeDriver):
|
|||||||
utils.execute('rm', '-rf', inst_base)
|
utils.execute('rm', '-rf', inst_base)
|
||||||
utils.execute('mv', inst_base_resize, inst_base)
|
utils.execute('mv', inst_base_resize, inst_base)
|
||||||
if not shared_storage:
|
if not shared_storage:
|
||||||
utils.ssh_execute(dest, 'rm', '-rf', inst_base)
|
self._remotefs.remove_dir(dest, inst_base)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
@@ -6285,12 +6287,12 @@ class LibvirtDriver(driver.ComputeDriver):
|
|||||||
tmp_path = os.path.join(inst_base, tmp_file)
|
tmp_path = os.path.join(inst_base, tmp_file)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
utils.ssh_execute(dest, 'touch', tmp_path)
|
self._remotefs.create_file(dest, tmp_path)
|
||||||
if os.path.exists(tmp_path):
|
if os.path.exists(tmp_path):
|
||||||
shared_storage = True
|
shared_storage = True
|
||||||
os.unlink(tmp_path)
|
os.unlink(tmp_path)
|
||||||
else:
|
else:
|
||||||
utils.ssh_execute(dest, 'rm', tmp_path)
|
self._remotefs.remove_file(dest, tmp_path)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
return shared_storage
|
return shared_storage
|
||||||
@@ -6344,7 +6346,7 @@ class LibvirtDriver(driver.ComputeDriver):
|
|||||||
# failures here earlier
|
# failures here earlier
|
||||||
if not shared_storage:
|
if not shared_storage:
|
||||||
try:
|
try:
|
||||||
utils.ssh_execute(dest, 'mkdir', '-p', inst_base)
|
self._remotefs.create_dir(dest, inst_base)
|
||||||
except processutils.ProcessExecutionError as e:
|
except processutils.ProcessExecutionError as e:
|
||||||
reason = _("not able to execute ssh command: %s") % e
|
reason = _("not able to execute ssh command: %s") % e
|
||||||
raise exception.InstanceFaultRollback(
|
raise exception.InstanceFaultRollback(
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ from nova.i18n import _LI
|
|||||||
from nova import utils
|
from nova import utils
|
||||||
from nova.virt import images
|
from nova.virt import images
|
||||||
from nova.virt.libvirt import config as vconfig
|
from nova.virt.libvirt import config as vconfig
|
||||||
|
from nova.virt.libvirt.volume import remotefs
|
||||||
from nova.virt import volumeutils
|
from nova.virt import volumeutils
|
||||||
|
|
||||||
libvirt_opts = [
|
libvirt_opts = [
|
||||||
@@ -206,22 +207,10 @@ def copy_image(src, dest, host=None, receive=False,
|
|||||||
src = "%s:%s" % (utils.safe_ip_format(host), src)
|
src = "%s:%s" % (utils.safe_ip_format(host), src)
|
||||||
else:
|
else:
|
||||||
dest = "%s:%s" % (utils.safe_ip_format(host), dest)
|
dest = "%s:%s" % (utils.safe_ip_format(host), dest)
|
||||||
# Try rsync first as that can compress and create sparse dest files.
|
|
||||||
# Note however that rsync currently doesn't read sparse files
|
remote_filesystem_driver = remotefs.RemoteFilesystem()
|
||||||
# efficiently: https://bugzilla.samba.org/show_bug.cgi?id=8918
|
remote_filesystem_driver.copy_file(src, dest,
|
||||||
# At least network traffic is mitigated with compression.
|
on_execute=on_execute, on_completion=on_completion)
|
||||||
try:
|
|
||||||
# Do a relatively light weight test first, so that we
|
|
||||||
# can fall back to scp, without having run out of space
|
|
||||||
# on the destination for example.
|
|
||||||
execute('rsync', '--sparse', '--compress', '--dry-run', src, dest,
|
|
||||||
on_execute=on_execute, on_completion=on_completion)
|
|
||||||
except processutils.ProcessExecutionError:
|
|
||||||
execute('scp', src, dest, on_execute=on_execute,
|
|
||||||
on_completion=on_completion)
|
|
||||||
else:
|
|
||||||
execute('rsync', '--sparse', '--compress', src, dest,
|
|
||||||
on_execute=on_execute, on_completion=on_completion)
|
|
||||||
|
|
||||||
|
|
||||||
def write_to_file(path, contents, umask=None):
|
def write_to_file(path, contents, umask=None):
|
||||||
|
|||||||
@@ -13,14 +13,33 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import abc
|
||||||
|
import functools
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
|
||||||
from oslo_concurrency import processutils
|
from oslo_concurrency import processutils
|
||||||
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
|
from oslo_utils import importutils
|
||||||
|
import six
|
||||||
|
|
||||||
from nova.i18n import _LE, _LW
|
from nova.i18n import _LE, _LW
|
||||||
from nova import utils
|
from nova import utils
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
libvirt_opts = [
|
||||||
|
cfg.StrOpt('remote_filesystem_transport',
|
||||||
|
default='ssh',
|
||||||
|
choices=('ssh', 'rsync'),
|
||||||
|
help='Use ssh or rsync transport for creating, copying, '
|
||||||
|
'removing files on the remote host.'),
|
||||||
|
]
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
CONF.register_opts(libvirt_opts, 'libvirt')
|
||||||
|
|
||||||
|
|
||||||
def mount_share(mount_path, export_path,
|
def mount_share(mount_path, export_path,
|
||||||
export_type, options=None):
|
export_type, options=None):
|
||||||
@@ -62,3 +81,256 @@ def unmount_share(mount_path, export_path):
|
|||||||
else:
|
else:
|
||||||
LOG.exception(_LE("Couldn't unmount the share %s"),
|
LOG.exception(_LE("Couldn't unmount the share %s"),
|
||||||
export_path)
|
export_path)
|
||||||
|
|
||||||
|
|
||||||
|
class RemoteFilesystem(object):
|
||||||
|
"""Represents actions that can be taken on a remote host's filesystem."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
transport = CONF.libvirt.remote_filesystem_transport
|
||||||
|
cls_name = '.'.join([__name__, transport.capitalize()])
|
||||||
|
cls_name += 'Driver'
|
||||||
|
self.driver = importutils.import_object(cls_name)
|
||||||
|
|
||||||
|
def create_file(self, host, dst_path, on_execute=None,
|
||||||
|
on_completion=None):
|
||||||
|
LOG.debug("Creating file %s on remote host %s", dst_path, host)
|
||||||
|
self.driver.create_file(host, dst_path, on_execute=on_execute,
|
||||||
|
on_completion=on_completion)
|
||||||
|
|
||||||
|
def remove_file(self, host, dst_path, on_execute=None,
|
||||||
|
on_completion=None):
|
||||||
|
LOG.debug("Removing file %s on remote host %s", dst_path, host)
|
||||||
|
self.driver.remove_file(host, dst_path, on_execute=on_execute,
|
||||||
|
on_completion=on_completion)
|
||||||
|
|
||||||
|
def create_dir(self, host, dst_path, on_execute=None,
|
||||||
|
on_completion=None):
|
||||||
|
LOG.debug("Creating directory %s on remote host %s", dst_path, host)
|
||||||
|
self.driver.create_dir(host, dst_path, on_execute=on_execute,
|
||||||
|
on_completion=on_completion)
|
||||||
|
|
||||||
|
def remove_dir(self, host, dst_path, on_execute=None,
|
||||||
|
on_completion=None):
|
||||||
|
LOG.debug("Removing directory %s on remote host %s", dst_path, host)
|
||||||
|
self.driver.remove_dir(host, dst_path, on_execute=on_execute,
|
||||||
|
on_completion=on_completion)
|
||||||
|
|
||||||
|
def copy_file(self, src, dst, on_execute=None,
|
||||||
|
on_completion=None):
|
||||||
|
LOG.debug("Copying file %s to %s", src, dst)
|
||||||
|
self.driver.copy_file(src, dst, on_execute=on_execute,
|
||||||
|
on_completion=on_completion)
|
||||||
|
|
||||||
|
|
||||||
|
@six.add_metaclass(abc.ABCMeta)
|
||||||
|
class RemoteFilesystemDriver(object):
|
||||||
|
@abc.abstractmethod
|
||||||
|
def create_file(self, host, dst_path, on_execute, on_completion):
|
||||||
|
"""Create file on the remote system.
|
||||||
|
|
||||||
|
:param host: Remote host
|
||||||
|
:param dst_path: Destination path
|
||||||
|
:param on_execute: Callback method to store pid of process in cache
|
||||||
|
:param on_completion: Callback method to remove pid of process from
|
||||||
|
cache
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def remove_file(self, host, dst_path, on_execute, on_completion):
|
||||||
|
"""Removes a file on a remote host.
|
||||||
|
|
||||||
|
:param host: Remote host
|
||||||
|
:param dst_path: Destination path
|
||||||
|
:param on_execute: Callback method to store pid of process in cache
|
||||||
|
:param on_completion: Callback method to remove pid of process from
|
||||||
|
cache
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def create_dir(self, host, dst_path, on_execute, on_completion):
|
||||||
|
"""Create directory on the remote system.
|
||||||
|
|
||||||
|
:param host: Remote host
|
||||||
|
:param dst_path: Destination path
|
||||||
|
:param on_execute: Callback method to store pid of process in cache
|
||||||
|
:param on_completion: Callback method to remove pid of process from
|
||||||
|
cache
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def remove_dir(self, host, dst_path, on_execute, on_completion):
|
||||||
|
"""Removes a directory on a remote host.
|
||||||
|
|
||||||
|
:param host: Remote host
|
||||||
|
:param dst_path: Destination path
|
||||||
|
:param on_execute: Callback method to store pid of process in cache
|
||||||
|
:param on_completion: Callback method to remove pid of process from
|
||||||
|
cache
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def copy_file(self, src, dst, on_execute, on_completion):
|
||||||
|
"""Copy file to/from remote host.
|
||||||
|
|
||||||
|
Remote address must be specified in format:
|
||||||
|
REM_HOST_IP_ADDRESS:REM_HOST_PATH
|
||||||
|
For example:
|
||||||
|
192.168.1.10:/home/file
|
||||||
|
|
||||||
|
:param src: Source address
|
||||||
|
:param dst: Destination path
|
||||||
|
:param on_execute: Callback method to store pid of process in cache
|
||||||
|
:param on_completion: Callback method to remove pid of process from
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class SshDriver(RemoteFilesystemDriver):
|
||||||
|
|
||||||
|
def create_file(self, host, dst_path, on_execute, on_completion):
|
||||||
|
utils.execute('ssh', host, 'touch', dst_path,
|
||||||
|
on_execute=on_execute, on_completion=on_completion)
|
||||||
|
|
||||||
|
def remove_file(self, host, dst, on_execute, on_completion):
|
||||||
|
utils.execute('ssh', host, 'rm', dst,
|
||||||
|
on_execute=on_execute, on_completion=on_completion)
|
||||||
|
|
||||||
|
def create_dir(self, host, dst_path, on_execute, on_completion):
|
||||||
|
utils.execute('ssh', host, 'mkdir', '-p', dst_path,
|
||||||
|
on_execute=on_execute, on_completion=on_completion)
|
||||||
|
|
||||||
|
def remove_dir(self, host, dst, on_execute, on_completion):
|
||||||
|
utils.execute('ssh', host, 'rm', '-rf', dst,
|
||||||
|
on_execute=on_execute, on_completion=on_completion)
|
||||||
|
|
||||||
|
def copy_file(self, src, dst, on_execute, on_completion):
|
||||||
|
utils.execute('scp', src, dst,
|
||||||
|
on_execute=on_execute, on_completion=on_completion)
|
||||||
|
|
||||||
|
|
||||||
|
def create_tmp_dir(function):
|
||||||
|
"""Creates temporary directory for rsync purposes.
|
||||||
|
Removes created directory in the end.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@functools.wraps(function)
|
||||||
|
def decorated_function(*args, **kwargs):
|
||||||
|
# Create directory
|
||||||
|
tmp_dir_path = tempfile.mkdtemp()
|
||||||
|
kwargs['tmp_dir_path'] = tmp_dir_path
|
||||||
|
|
||||||
|
try:
|
||||||
|
return function(*args, **kwargs)
|
||||||
|
finally:
|
||||||
|
# Remove directory
|
||||||
|
utils.execute('rm', '-rf', tmp_dir_path)
|
||||||
|
|
||||||
|
return decorated_function
|
||||||
|
|
||||||
|
|
||||||
|
class RsyncDriver(RemoteFilesystemDriver):
|
||||||
|
|
||||||
|
@create_tmp_dir
|
||||||
|
def create_file(self, host, dst_path, on_execute, on_completion, **kwargs):
|
||||||
|
dir_path = os.path.dirname(os.path.normpath(dst_path))
|
||||||
|
|
||||||
|
# Create target dir inside temporary directory
|
||||||
|
local_tmp_dir = os.path.join(kwargs['tmp_dir_path'],
|
||||||
|
dir_path.strip(os.path.sep))
|
||||||
|
utils.execute('mkdir', '-p', local_tmp_dir,
|
||||||
|
on_execute=on_execute, on_completion=on_completion)
|
||||||
|
|
||||||
|
# Create file in directory
|
||||||
|
file_name = os.path.basename(os.path.normpath(dst_path))
|
||||||
|
local_tmp_file = os.path.join(local_tmp_dir, file_name)
|
||||||
|
utils.execute('touch', local_tmp_file,
|
||||||
|
on_execute=on_execute, on_completion=on_completion)
|
||||||
|
RsyncDriver._synchronize_object(kwargs['tmp_dir_path'],
|
||||||
|
host, dst_path,
|
||||||
|
on_execute=on_execute,
|
||||||
|
on_completion=on_completion)
|
||||||
|
|
||||||
|
@create_tmp_dir
|
||||||
|
def remove_file(self, host, dst, on_execute, on_completion, **kwargs):
|
||||||
|
# Delete file
|
||||||
|
RsyncDriver._remove_object(kwargs['tmp_dir_path'], host, dst,
|
||||||
|
on_execute=on_execute,
|
||||||
|
on_completion=on_completion)
|
||||||
|
|
||||||
|
@create_tmp_dir
|
||||||
|
def create_dir(self, host, dst_path, on_execute, on_completion, **kwargs):
|
||||||
|
dir_path = os.path.normpath(dst_path)
|
||||||
|
|
||||||
|
# Create target dir inside temporary directory
|
||||||
|
local_tmp_dir = os.path.join(kwargs['tmp_dir_path'],
|
||||||
|
dir_path.strip(os.path.sep))
|
||||||
|
utils.execute('mkdir', '-p', local_tmp_dir,
|
||||||
|
on_execute=on_execute, on_completion=on_completion)
|
||||||
|
RsyncDriver._synchronize_object(kwargs['tmp_dir_path'],
|
||||||
|
host, dst_path,
|
||||||
|
on_execute=on_execute,
|
||||||
|
on_completion=on_completion)
|
||||||
|
|
||||||
|
@create_tmp_dir
|
||||||
|
def remove_dir(self, host, dst, on_execute, on_completion, **kwargs):
|
||||||
|
# Remove remote directory's content
|
||||||
|
utils.execute('rsync', '--archive', '--delete-excluded',
|
||||||
|
kwargs['tmp_dir_path'] + os.path.sep,
|
||||||
|
'%s:%s' % (host, dst),
|
||||||
|
on_execute=on_execute, on_completion=on_completion)
|
||||||
|
|
||||||
|
# Delete empty directory
|
||||||
|
RsyncDriver._remove_object(kwargs['tmp_dir_path'], host, dst,
|
||||||
|
on_execute=on_execute,
|
||||||
|
on_completion=on_completion)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _remove_object(src, host, dst, on_execute, on_completion):
|
||||||
|
"""Removes a file or empty directory on a remote host.
|
||||||
|
|
||||||
|
:param src: Empty directory used for rsync purposes
|
||||||
|
:param host: Remote host
|
||||||
|
:param dst: Destination path
|
||||||
|
:param on_execute: Callback method to store pid of process in cache
|
||||||
|
:param on_completion: Callback method to remove pid of process from
|
||||||
|
cache
|
||||||
|
"""
|
||||||
|
utils.execute('rsync', '--archive', '--delete',
|
||||||
|
'--include', os.path.basename(os.path.normpath(dst)),
|
||||||
|
'--exclude', '*',
|
||||||
|
os.path.normpath(src) + os.path.sep,
|
||||||
|
'%s:%s' % (host, os.path.dirname(os.path.normpath(dst))),
|
||||||
|
on_execute=on_execute, on_completion=on_completion)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _synchronize_object(src, host, dst, on_execute, on_completion):
|
||||||
|
"""Creates a file or empty directory on a remote host.
|
||||||
|
|
||||||
|
:param src: Empty directory used for rsync purposes
|
||||||
|
:param host: Remote host
|
||||||
|
:param dst: Destination path
|
||||||
|
:param on_execute: Callback method to store pid of process in cache
|
||||||
|
:param on_completion: Callback method to remove pid of process from
|
||||||
|
cache
|
||||||
|
"""
|
||||||
|
|
||||||
|
# For creating path on the remote host rsync --relative path must
|
||||||
|
# be used. With a modern rsync on the sending side (beginning with
|
||||||
|
# 2.6.7), you can insert a dot and a slash into the source path,
|
||||||
|
# like this:
|
||||||
|
# rsync -avR /foo/./bar/baz.c remote:/tmp/
|
||||||
|
# That would create /tmp/bar/baz.c on the remote machine.
|
||||||
|
# (Note that the dot must be followed by a slash, so "/foo/."
|
||||||
|
# would not be abbreviated.)
|
||||||
|
relative_tmp_file_path = os.path.join(
|
||||||
|
src, './',
|
||||||
|
os.path.normpath(dst).strip(os.path.sep))
|
||||||
|
|
||||||
|
# Do relative rsync local directory with remote root directory
|
||||||
|
utils.execute('rsync', '--archive', '--relative', '--no-implied-dirs',
|
||||||
|
relative_tmp_file_path, '%s:%s' % (host, os.path.sep),
|
||||||
|
on_execute=on_execute, on_completion=on_completion)
|
||||||
|
|
||||||
|
def copy_file(self, src, dst, on_execute, on_completion):
|
||||||
|
utils.execute('rsync', '--sparse', '--compress', src, dst,
|
||||||
|
on_execute=on_execute, on_completion=on_completion)
|
||||||
|
|||||||
Reference in New Issue
Block a user