Add support for file I/O volume migration
This patch changes the generic volume migration logic to support
non-attachable volumes. "Non-attachable" refers to volume drivers that
do not support attachment via iSCSI or similar protocols, where a block
device is made available on the host machine. Drivers such as RBD
instead make volumes available to Cinder via a file handle that proxies
read() and write() calls to the Ceph cluster. This patch improves the
generic migration logic to determine whether a migration operation can
proceed with dd using block device paths, or with file operations on
the handles returned from the os-brick connectors. Changes to the RBD
driver are included to correctly rename the target volume during the
completion phase of a successful migration.

Some work remains for migrating attached (in-use) volumes in certain
configurations.

Successful tests were seen for:

* LVM to LVM (available and in-use)
* LVM to/from NFS (available and in-use)
* LVM to/from Ceph (available)
* Ceph to LVM (in-use)
* NFS to/from Ceph (available)
* Ceph to NFS (in-use)

Failures were seen (due to Nova) for the following:

* LVM to Ceph (in-use)
* NFS to Ceph (in-use)

(Pulled from gate; cinder can no longer pass unit tests.)

Blueprint: generic-volume-migration
Closes-Bug: #1489335
Closes-Bug: #1489337
Change-Id: Iece2776fa751152f97b389ddab426e50c6f79bea
parent 0a3f0232d1
commit f586043fa9
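The heart of this change is a type-based dispatch in copy_volume() (last
hunk below): string arguments are treated as block-device paths and copied
with dd, anything else as a file-like handle copied with read()/write().
A minimal, self-contained sketch of that dispatch (Python 3 shown for
brevity; the patch itself checks six.string_types, and the stand-in helper
names here are illustrative, not the real internals):

    import io


    def _copy_with_dd(src_path, dest_path, size_in_m):
        # Stand-in for _copy_volume_with_path(), which shells out to dd
        # under an optional cgroup I/O throttle.
        print("dd if=%s of=%s bs=1M count=%d" % (src_path, dest_path, size_in_m))


    def _copy_with_file(src, dest, size_in_m):
        # Stand-in for _copy_volume_with_file(), which streams chunks
        # through tpool so the eventlet loop is not blocked.
        dest.write(src.read())


    def copy_volume(src, dest, size_in_m):
        # Paths mean block devices that dd can address; anything else is
        # assumed to be a file-like handle returned by an os-brick
        # connector (e.g. RBD).
        if isinstance(src, str) and isinstance(dest, str):
            _copy_with_dd(src, dest, size_in_m)
        else:
            _copy_with_file(src, dest, size_in_m)


    copy_volume('/dev/vg0/src', '/dev/vg0/dst', 1)      # dd branch
    copy_volume(io.BytesIO(b'data'), io.BytesIO(), 1)   # file-handle branch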
@@ -863,6 +863,30 @@ class RBDTestCase(test.TestCase):
         self.assertTrue(self.driver.retype(context, fake_volume,
                                            fake_type, diff, host))
 
+    @common_mocks
+    def test_update_migrated_volume(self):
+        client = self.mock_client.return_value
+        client.__enter__.return_value = client
+
+        with mock.patch.object(self.driver.rbd.RBD(), 'rename') as mock_rename:
+            context = {}
+            current_volume = {'id': 'curr_id',
+                              'name': 'curr_name',
+                              'provider_location': 'curr_provider_location'}
+            original_volume = {'id': 'orig_id',
+                               'name': 'orig_name',
+                               'provider_location': 'orig_provider_location'}
+            mock_rename.return_value = 0
+            model_update = self.driver.update_migrated_volume(context,
+                                                              original_volume,
+                                                              current_volume,
+                                                              'available')
+            mock_rename.assert_called_with(client.ioctx,
+                                           'volume-%s' % current_volume['id'],
+                                           'volume-%s' % original_volume['id'])
+            self.assertEqual({'_name_id': None,
+                              'provider_location': None}, model_update)
+
     def test_rbd_volume_proxy_init(self):
         mock_driver = mock.Mock(name='driver')
         mock_driver._connect_to_rados.return_value = (None, None)
@@ -40,7 +40,6 @@ from taskflow.engines.action_engine import engine
 
 from cinder.api import common
 from cinder.brick.local_dev import lvm as brick_lvm
-from cinder.compute import nova
 from cinder import context
 from cinder import db
 from cinder import exception
@@ -4243,20 +4242,21 @@ class VolumeTestCase(BaseVolumeTestCase):
         self.assertEqual('error', volume['migration_status'])
         self.assertEqual('available', volume['status'])
 
-    @mock.patch.object(nova.API, 'update_server_volume')
+    @mock.patch('cinder.compute.API')
     @mock.patch('cinder.volume.manager.VolumeManager.'
                 'migrate_volume_completion')
     @mock.patch('cinder.db.volume_get')
     def test_migrate_volume_generic(self, volume_get,
                                     migrate_volume_completion,
-                                    update_server_volume):
+                                    nova_api):
         fake_volume_id = 'fake_volume_id'
         fake_new_volume = {'status': 'available', 'id': fake_volume_id}
         host_obj = {'host': 'newhost', 'capabilities': {}}
         volume_get.return_value = fake_new_volume
+        update_server_volume = nova_api.return_value.update_server_volume
         volume = tests_utils.create_volume(self.context, size=1,
                                            host=CONF.host)
-        with mock.patch.object(self.volume.driver, 'copy_volume_data') as \
+        with mock.patch.object(self.volume, '_copy_volume_data') as \
                 mock_copy_volume:
             self.volume._migrate_volume_generic(self.context, volume,
                                                 host_obj, None)
@@ -4267,19 +4267,21 @@ class VolumeTestCase(BaseVolumeTestCase):
                                                      volume['id'],
                                                      fake_new_volume['id'],
                                                      error=False)
         self.assertFalse(update_server_volume.called)
 
-    @mock.patch.object(nova.API, 'update_server_volume')
+    @mock.patch('cinder.compute.API')
     @mock.patch('cinder.volume.manager.VolumeManager.'
                 'migrate_volume_completion')
     @mock.patch('cinder.db.volume_get')
     def test_migrate_volume_generic_attached_volume(self, volume_get,
                                                     migrate_volume_completion,
-                                                    update_server_volume):
+                                                    nova_api):
         attached_host = 'some-host'
         fake_volume_id = 'fake_volume_id'
         fake_new_volume = {'status': 'available', 'id': fake_volume_id}
         host_obj = {'host': 'newhost', 'capabilities': {}}
         fake_uuid = fakes.get_fake_uuid()
+        update_server_volume = nova_api.return_value.update_server_volume
         volume_get.return_value = fake_new_volume
         volume = tests_utils.create_volume(self.context, size=1,
                                            host=CONF.host)
@@ -4293,12 +4295,8 @@ class VolumeTestCase(BaseVolumeTestCase):
             self.volume._migrate_volume_generic(self.context, volume,
                                                 host_obj, None)
-            self.assertFalse(migrate_volume_completion.called)
-        with mock.patch.object(self.volume.driver, 'copy_volume_data') as \
-                mock_copy_volume:
-            self.volume._migrate_volume_generic(self.context, volume,
-                                                host_obj, None)
-            self.assertFalse(mock_copy_volume.called)
             self.assertFalse(migrate_volume_completion.called)
+            update_server_volume.assert_called_with(self.context, fake_uuid,
+                                                    volume['id'], fake_volume_id)
 
     @mock.patch.object(volume_rpcapi.VolumeAPI, 'update_migrated_volume')
     @mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume')
@@ -4312,7 +4310,7 @@ class VolumeTestCase(BaseVolumeTestCase):
         host_obj = {'host': 'newhost', 'capabilities': {}}
         with mock.patch.object(self.volume.driver, 'migrate_volume') as \
                 mock_migrate_volume,\
-                mock.patch.object(self.volume.driver, 'copy_volume_data'), \
+                mock.patch.object(self.volume, '_copy_volume_data'),\
                 mock.patch.object(self.volume.driver, 'delete_volume') as \
                 delete_volume:
             create_volume.side_effect = self._fake_create_volume
@@ -4331,7 +4329,7 @@ class VolumeTestCase(BaseVolumeTestCase):
         with mock.patch.object(self.volume.driver, 'migrate_volume'),\
                 mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')\
                 as mock_create_volume,\
-                mock.patch.object(self.volume.driver, 'copy_volume_data') as \
+                mock.patch.object(self.volume, '_copy_volume_data') as \
                 mock_copy_volume,\
                 mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume'),\
                 mock.patch.object(self.volume, 'migrate_volume_completion'),\
@@ -4455,7 +4453,7 @@ class VolumeTestCase(BaseVolumeTestCase):
         with mock.patch.object(self.volume.driver, 'migrate_volume'),\
                 mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')\
                 as mock_create_volume,\
-                mock.patch.object(self.volume.driver, 'copy_volume_data') as \
+                mock.patch.object(self.volume, '_copy_volume_data') as \
                 mock_copy_volume,\
                 mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume'),\
                 mock.patch.object(self.volume, 'migrate_volume_completion'),\
@@ -4489,16 +4487,21 @@ class VolumeTestCase(BaseVolumeTestCase):
         with mock.patch.object(self.volume.driver, 'migrate_volume'),\
                 mock.patch.object(volume_rpcapi.VolumeAPI, 'create_volume')\
                 as mock_create_volume,\
-                mock.patch.object(self.volume.driver, 'copy_volume_data'),\
                 mock.patch.object(volume_rpcapi.VolumeAPI, 'delete_volume'),\
                 mock.patch.object(self.volume, 'migrate_volume_completion')\
                 as mock_migrate_compl,\
-                mock.patch.object(self.volume.driver, 'create_export'):
+                mock.patch.object(self.volume.driver, 'create_export'), \
+                mock.patch.object(self.volume, '_attach_volume'), \
+                mock.patch.object(self.volume, '_detach_volume'), \
+                mock.patch.object(os_brick.initiator.connector,
+                                  'get_connector_properties') \
+                as mock_get_connector_properties:
 
             # Exception case at delete_volume
             # source_volume['migration_status'] is 'completing'
             mock_create_volume.side_effect = self._fake_create_volume
             mock_migrate_compl.side_effect = fake_migrate_volume_completion
+            mock_get_connector_properties.return_value = {}
             volume = tests_utils.create_volume(self.context, size=0,
                                                host=CONF.host)
             host_obj = {'host': 'newhost', 'capabilities': {}}
@@ -409,3 +409,9 @@ class VolumeRpcAPITestCase(test.TestCase):
                               host='fake_host',
                               discover=True,
                               version='1.29')
+
+    def test_remove_export(self):
+        self._test_volume_api('remove_export',
+                              rpc_method='cast',
+                              volume=self.fake_volume,
+                              version='1.30')
@@ -17,6 +17,7 @@
 
 
 import datetime
+import io
 import mock
 
 from oslo_concurrency import processutils
@@ -626,6 +627,23 @@ class CopyVolumeTestCase(test.TestCase):
                                      'iflag=direct', 'oflag=direct',
                                      'conv=sparse', run_as_root=True)
 
+    @mock.patch('cinder.volume.utils._copy_volume_with_file')
+    def test_copy_volume_handles(self, mock_copy):
+        handle1 = io.RawIOBase()
+        handle2 = io.RawIOBase()
+        output = volume_utils.copy_volume(handle1, handle2, 1024, 1)
+        self.assertIsNone(output)
+        mock_copy.assert_called_once_with(handle1, handle2, 1024)
+
+    @mock.patch('cinder.volume.utils._transfer_data')
+    @mock.patch('cinder.volume.utils._open_volume_with_path')
+    def test_copy_volume_handle_transfer(self, mock_open, mock_transfer):
+        handle = io.RawIOBase()
+        output = volume_utils.copy_volume('/foo/bar', handle, 1024, 1)
+        self.assertIsNone(output)
+        mock_transfer.assert_called_once_with(mock.ANY, mock.ANY,
+                                              1073741824, mock.ANY)
+
 
 class VolumeUtilsTestCase(test.TestCase):
     def test_null_safe_str(self):
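The 1073741824 expected in the second test is just the size argument
converted from megabytes to bytes: as the volume_utils hunk further down
shows, _copy_volume_with_file calls _transfer_data with size_in_m *
units.Mi and a 4 MiB chunk size. The arithmetic, for the record:

    # size_in_m=1024 (MB) and units.Mi == 2 ** 20, so the transfer
    # length is 1 GiB; the chunk size is units.Mi * 4 == 4 MiB.
    assert 1024 * 2 ** 20 == 1073741824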
@@ -266,7 +266,7 @@ class RADOSClient(object):
 
 class RBDDriver(driver.TransferVD, driver.ExtendVD,
                 driver.CloneableVD, driver.CloneableImageVD, driver.SnapshotVD,
-                driver.BaseVD):
+                driver.MigrateVD, driver.BaseVD):
     """Implements RADOS block device (RBD) volume commands."""
 
     VERSION = '1.2.0'
@@ -1057,3 +1057,40 @@ class RBDDriver(driver.TransferVD, driver.ExtendVD,
                    'size': image_size})
         raise exception.VolumeBackendAPIException(
             data=exception_message)
+
+    def update_migrated_volume(self, ctxt, volume, new_volume,
+                               original_volume_status):
+        """Return model update from RBD for migrated volume.
+
+        This method should rename the back-end volume name(id) on the
+        destination host back to its original name(id) on the source host.
+
+        :param ctxt: The context used to run the method update_migrated_volume
+        :param volume: The original volume that was migrated to this backend
+        :param new_volume: The migration volume object that was created on
+                           this backend as part of the migration process
+        :param original_volume_status: The status of the original volume
+        :return model_update to update DB with any needed changes
+        """
+        name_id = None
+        provider_location = None
+
+        existing_name = CONF.volume_name_template % new_volume['id']
+        wanted_name = CONF.volume_name_template % volume['id']
+        with RADOSClient(self) as client:
+            try:
+                self.RBDProxy().rename(client.ioctx,
+                                       utils.convert_str(existing_name),
+                                       utils.convert_str(wanted_name))
+            except self.rbd.ImageNotFound:
+                LOG.error(_LE('Unable to rename the logical volume '
+                              'for volume %s.'), volume['id'])
+                # If the rename fails, _name_id should be set to the new
+                # volume id and provider_location should be set to the
+                # one from the new volume as well.
+                name_id = new_volume['_name_id'] or new_volume['id']
+                provider_location = new_volume['provider_location']
+        return {'_name_id': name_id, 'provider_location': provider_location}
+
+    def migrate_volume(self, context, volume, host):
+        return (False, None)
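On a successful rename both returned fields are None and the volume keeps
its original name; on ImageNotFound the model_update instead redirects the
DB record at the migrated volume. Roughly how the manager side consumes
this result (an illustrative sketch with a hypothetical helper name, not
the actual cinder.volume.manager code):

    def finish_migration(db, ctxt, volume, new_volume, status, driver):
        # Hypothetical consumer: ask the driver to rename/bookkeep, then
        # persist whatever it could not undo into the volume row.
        model_update = driver.update_migrated_volume(ctxt, volume,
                                                     new_volume, status)
        if model_update:
            # _name_id=None means the rename succeeded and the original
            # name still maps to the data; otherwise the row now points
            # at the new backend volume via _name_id/provider_location.
            db.volume_update(ctxt, volume['id'], model_update)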
@@ -47,6 +47,7 @@ from oslo_service import periodic_task
 from oslo_utils import excutils
 from oslo_utils import importutils
 from oslo_utils import timeutils
+from oslo_utils import units
 from oslo_utils import uuidutils
 from osprofiler import profiler
 import six
@@ -190,7 +191,7 @@ def locked_snapshot_operation(f):
 class VolumeManager(manager.SchedulerDependentManager):
     """Manages attachable block storage devices."""
 
-    RPC_API_VERSION = '1.29'
+    RPC_API_VERSION = '1.30'
 
     target = messaging.Target(version=RPC_API_VERSION)
 
@@ -1332,6 +1333,21 @@ class VolumeManager(manager.SchedulerDependentManager):
         LOG.info(_LI("Terminate volume connection completed successfully."),
                  resource=volume_ref)
 
+    def remove_export(self, context, volume_id):
+        """Removes an export for a volume."""
+
+        utils.require_driver_initialized(self.driver)
+        volume_ref = self.db.volume_get(context, volume_id)
+        try:
+            self.driver.remove_export(context, volume_ref)
+        except Exception:
+            msg = _("Remove volume export failed.")
+            LOG.exception(msg, resource=volume_ref)
+            raise exception.VolumeBackendAPIException(data=msg)
+
+        LOG.info(_LI("Remove volume export completed successfully."),
+                 resource=volume_ref)
+
     def accept_transfer(self, context, volume_id, new_user, new_project):
         # NOTE(flaper87): Verify the driver is enabled
         # before going forward. The exception will be caught
@@ -1367,6 +1383,116 @@ class VolumeManager(manager.SchedulerDependentManager):
                  resource=volume_ref)
         return model_update
 
+    def _connect_device(self, conn):
+        use_multipath = self.configuration.use_multipath_for_image_xfer
+        device_scan_attempts = self.configuration.num_volume_device_scan_tries
+        protocol = conn['driver_volume_type']
+        connector = utils.brick_get_connector(
+            protocol,
+            use_multipath=use_multipath,
+            device_scan_attempts=device_scan_attempts,
+            conn=conn)
+        vol_handle = connector.connect_volume(conn['data'])
+
+        root_access = True
+
+        if not connector.check_valid_device(vol_handle['path'], root_access):
+            if isinstance(vol_handle['path'], six.string_types):
+                raise exception.DeviceUnavailable(
+                    path=vol_handle['path'],
+                    reason=(_("Unable to access the backend storage via the "
+                              "path %(path)s.") %
+                            {'path': vol_handle['path']}))
+            else:
+                raise exception.DeviceUnavailable(
+                    path=None,
+                    reason=(_("Unable to access the backend storage via file "
+                              "handle.")))
+
+        return {'conn': conn, 'device': vol_handle, 'connector': connector}
+
+    def _attach_volume(self, ctxt, volume, properties, remote=False):
+        status = volume['status']
+
+        if remote:
+            rpcapi = volume_rpcapi.VolumeAPI()
+            try:
+                conn = rpcapi.initialize_connection(ctxt, volume, properties)
+            except Exception:
+                with excutils.save_and_reraise_exception():
+                    LOG.error(_LE("Failed to attach volume %(vol)s."),
+                              {'vol': volume['id']})
+                    self.db.volume_update(ctxt, volume['id'],
+                                          {'status': status})
+        else:
+            conn = self.initialize_connection(ctxt, volume['id'], properties)
+
+        return self._connect_device(conn)
+
+    def _detach_volume(self, ctxt, attach_info, volume, properties,
+                       force=False, remote=False):
+        connector = attach_info['connector']
+        connector.disconnect_volume(attach_info['conn']['data'],
+                                    attach_info['device'])
+
+        if remote:
+            rpcapi = volume_rpcapi.VolumeAPI()
+            rpcapi.terminate_connection(ctxt, volume, properties, force=force)
+            rpcapi.remove_export(ctxt, volume)
+        else:
+            try:
+                self.terminate_connection(ctxt, volume['id'], properties,
+                                          force=force)
+                self.remove_export(ctxt, volume['id'])
+            except Exception as err:
+                with excutils.save_and_reraise_exception():
+                    LOG.error(_LE('Unable to terminate volume connection: '
                                   '%(err)s.') % {'err': err})
+
+    def _copy_volume_data(self, ctxt, src_vol, dest_vol, remote=None):
+        """Copy data from src_vol to dest_vol."""
+
+        LOG.debug('copy_data_between_volumes %(src)s -> %(dest)s.',
+                  {'src': src_vol['name'], 'dest': dest_vol['name']})
+
+        properties = utils.brick_get_connector_properties()
+
+        dest_remote = remote in ['dest', 'both']
+        dest_attach_info = self._attach_volume(ctxt, dest_vol, properties,
+                                               remote=dest_remote)
+
+        try:
+            src_remote = remote in ['src', 'both']
+            src_attach_info = self._attach_volume(ctxt, src_vol, properties,
+                                                  remote=src_remote)
+        except Exception:
+            with excutils.save_and_reraise_exception():
+                LOG.error(_LE("Failed to attach source volume for copy."))
+                self._detach_volume(ctxt, dest_attach_info, dest_vol,
+                                    properties, remote=dest_remote)
+
+        copy_error = True
+        try:
+            size_in_mb = int(src_vol['size']) * units.Ki  # vol size is in GB
+            vol_utils.copy_volume(src_attach_info['device']['path'],
+                                  dest_attach_info['device']['path'],
+                                  size_in_mb,
+                                  self.configuration.volume_dd_blocksize)
+            copy_error = False
+        except Exception:
+            with excutils.save_and_reraise_exception():
+                LOG.error(_LE("Failed to copy volume %(src)s to %(dest)s."),
+                          {'src': src_vol['id'], 'dest': dest_vol['id']})
+        finally:
+            try:
+                self._detach_volume(ctxt, dest_attach_info, dest_vol,
+                                    properties, force=copy_error,
+                                    remote=dest_remote)
+            finally:
+                self._detach_volume(ctxt, src_attach_info, src_vol,
+                                    properties, force=copy_error,
+                                    remote=src_remote)
+
     def _migrate_volume_generic(self, ctxt, volume, host, new_type_id):
         rpcapi = volume_rpcapi.VolumeAPI()
 
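A reading aid for the hunk above: the remote parameter of
_copy_volume_data selects which side of the copy is attached over RPC to
another volume service rather than locally. A small sketch of the
mapping, derived directly from the two "remote in [...]" checks (the
function name is illustrative):

    def attachment_plan(remote):
        """remote is None, 'src', 'dest', or 'both'."""
        return {
            'src_remote': remote in ('src', 'both'),    # attach source via RPC
            'dest_remote': remote in ('dest', 'both'),  # attach dest via RPC
        }

    # The generic migration path (next hunk) passes remote='dest': the
    # new volume lives on the destination host, the source is local.
    assert attachment_plan('dest') == {'src_remote': False,
                                       'dest_remote': True}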
@@ -1421,8 +1547,7 @@ class VolumeManager(manager.SchedulerDependentManager):
         try:
             attachments = volume['volume_attachment']
             if not attachments:
-                self.driver.copy_volume_data(ctxt, volume, new_volume,
-                                             remote='dest')
+                self._copy_volume_data(ctxt, volume, new_volume, remote='dest')
                 # The above call is synchronous so we complete the migration
                 self.migrate_volume_completion(ctxt, volume['id'],
                                                new_volume['id'],
@@ -75,6 +75,7 @@ class VolumeAPI(object):
         1.27 - Adds support for replication V2
         1.28 - Adds manage_existing_snapshot
         1.29 - Adds get_capabilities.
+        1.30 - Adds remove_export
     """
 
     BASE_RPC_API_VERSION = '1.0'
|
||||
target = messaging.Target(topic=CONF.volume_topic,
|
||||
version=self.BASE_RPC_API_VERSION)
|
||||
serializer = objects_base.CinderObjectSerializer()
|
||||
self.client = rpc.get_client(target, '1.29', serializer=serializer)
|
||||
self.client = rpc.get_client(target, '1.30', serializer=serializer)
|
||||
|
||||
def create_consistencygroup(self, ctxt, group, host):
|
||||
new_host = utils.extract_host(host)
|
||||
@@ -197,6 +198,11 @@ class VolumeAPI(object):
         return cctxt.call(ctxt, 'terminate_connection', volume_id=volume['id'],
                           connector=connector, force=force)
 
+    def remove_export(self, ctxt, volume):
+        new_host = utils.extract_host(volume['host'])
+        cctxt = self.client.prepare(server=new_host, version='1.30')
+        cctxt.cast(ctxt, 'remove_export', volume_id=volume['id'])
+
     def publish_service_capabilities(self, ctxt):
         cctxt = self.client.prepare(fanout=True, version='1.2')
         cctxt.cast(ctxt, 'publish_service_capabilities')
@@ -18,22 +18,26 @@
 import ast
+import math
 import re
+import time
 import uuid
 
 from Crypto.Random import random
+import eventlet
+from eventlet import tpool
 from oslo_concurrency import processutils
 from oslo_config import cfg
 from oslo_log import log as logging
 from oslo_utils import strutils
 from oslo_utils import timeutils
 from oslo_utils import units
 import six
 from six.moves import range
 
 from cinder.brick.local_dev import lvm as brick_lvm
 from cinder import context
 from cinder import db
 from cinder import exception
-from cinder.i18n import _LI, _LW
+from cinder.i18n import _, _LI, _LW, _LE
 from cinder import rpc
 from cinder import utils
 from cinder.volume import throttling
@@ -301,8 +305,9 @@ def check_for_odirect_support(src, dest, flag='oflag=direct'):
     return False
 
 
-def _copy_volume(prefix, srcstr, deststr, size_in_m, blocksize, sync=False,
-                 execute=utils.execute, ionice=None, sparse=False):
+def _copy_volume_with_path(prefix, srcstr, deststr, size_in_m, blocksize,
+                           sync=False, execute=utils.execute, ionice=None,
+                           sparse=False):
     # Use O_DIRECT to avoid thrashing the system buffer cache
     extra_flags = []
     if check_for_odirect_support(srcstr, deststr, 'iflag=direct'):
@@ -354,15 +359,107 @@ def _copy_volume(prefix, srcstr, deststr, size_in_m, blocksize, sync=False,
              {'size_in_m': size_in_m, 'mbps': mbps})
 
 
-def copy_volume(srcstr, deststr, size_in_m, blocksize, sync=False,
+def _open_volume_with_path(path, mode):
+    try:
+        with utils.temporary_chown(path):
+            handle = open(path, mode)
+            return handle
+    except Exception:
+        LOG.error(_LE("Failed to open volume from %(path)s."), {'path': path})
+
+
+def _transfer_data(src, dest, length, chunk_size):
+    """Transfer data between files (Python IO objects)."""
+
+    chunks = int(math.ceil(length / chunk_size))
+    remaining_length = length
+
+    LOG.debug("%(chunks)s chunks of %(bytes)s bytes to be transferred.",
+              {'chunks': chunks, 'bytes': chunk_size})
+
+    for chunk in range(0, chunks):
+        before = time.time()
+        data = tpool.execute(src.read, min(chunk_size, remaining_length))
+
+        # If we have reached end of source, discard any extraneous bytes from
+        # destination volume if trim is enabled and stop writing.
+        if data == '':
+            break
+
+        tpool.execute(dest.write, data)
+        remaining_length -= len(data)
+        delta = (time.time() - before)
+        rate = (chunk_size / delta) / units.Ki
+        LOG.debug("Transferred chunk %(chunk)s of %(chunks)s (%(rate)dK/s).",
+                  {'chunk': chunk + 1, 'chunks': chunks, 'rate': rate})
+
+        # yield to any other pending operations
+        eventlet.sleep(0)
+
+    tpool.execute(dest.flush)
+
+
+def _copy_volume_with_file(src, dest, size_in_m):
+    src_handle = src
+    if isinstance(src, six.string_types):
+        src_handle = _open_volume_with_path(src, 'rb')
+
+    dest_handle = dest
+    if isinstance(dest, six.string_types):
+        dest_handle = _open_volume_with_path(dest, 'wb')
+
+    if not src_handle:
+        raise exception.DeviceUnavailable(
+            _("Failed to copy volume, source device unavailable."))
+
+    if not dest_handle:
+        raise exception.DeviceUnavailable(
+            _("Failed to copy volume, destination device unavailable."))
+
+    start_time = timeutils.utcnow()
+
+    _transfer_data(src_handle, dest_handle, size_in_m * units.Mi, units.Mi * 4)
+
+    duration = max(1, timeutils.delta_seconds(start_time, timeutils.utcnow()))
+
+    if isinstance(src, six.string_types):
+        src_handle.close()
+    if isinstance(dest, six.string_types):
+        dest_handle.close()
+
+    mbps = (size_in_m / duration)
+    LOG.info(_LI("Volume copy completed (%(size_in_m).2f MB at "
+                 "%(mbps).2f MB/s)."),
+             {'size_in_m': size_in_m, 'mbps': mbps})
+
+
+def copy_volume(src, dest, size_in_m, blocksize, sync=False,
                 execute=utils.execute, ionice=None, throttle=None,
                 sparse=False):
-    if not throttle:
-        throttle = throttling.Throttle.get_default()
-    with throttle.subcommand(srcstr, deststr) as throttle_cmd:
-        _copy_volume(throttle_cmd['prefix'], srcstr, deststr,
-                     size_in_m, blocksize, sync=sync,
-                     execute=execute, ionice=ionice, sparse=sparse)
+    """Copy data from the source volume to the destination volume.
+
+    The parameters 'src' and 'dest' are both typically of type str, which
+    represents the path to each volume on the filesystem.  Connectors can
+    optionally return a volume handle of type RawIOBase for volumes that are
+    not available on the local filesystem for open/close operations.
+
+    If either 'src' or 'dest' are not of type str, then they are assumed to be
+    of type RawIOBase or any derivative that supports file operations such as
+    read and write.  In this case, the handles are treated as file handles
+    instead of file paths and, at present, throttling is unavailable.
+    """
+
+    if (isinstance(src, six.string_types) and
+            isinstance(dest, six.string_types)):
+        if not throttle:
+            throttle = throttling.Throttle.get_default()
+        with throttle.subcommand(src, dest) as throttle_cmd:
+            _copy_volume_with_path(throttle_cmd['prefix'], src, dest,
+                                   size_in_m, blocksize, sync=sync,
+                                   execute=execute, ionice=ionice,
+                                   sparse=sparse)
+    else:
+        _copy_volume_with_file(src, dest, size_in_m)
 
 
 def clear_volume(volume_size, volume_path, volume_clear=None,
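For readers who want to poke at the handle-based path, here is a
dependency-free rendition of the _transfer_data loop above (the
eventlet/tpool offloading and rate logging are dropped, "if not data"
replaces the string comparison for the EOF check, and float() keeps the
chunk count correct under Python 2 division):

    import io
    import math


    def transfer_data(src, dest, length, chunk_size):
        # Read up to chunk_size bytes at a time, stop early at EOF,
        # and never request more than `length` bytes in total.
        chunks = int(math.ceil(length / float(chunk_size)))
        remaining = length
        for _ in range(chunks):
            data = src.read(min(chunk_size, remaining))
            if not data:
                break
            dest.write(data)
            remaining -= len(data)
        dest.flush()


    src = io.BytesIO(b'x' * 10)
    dest = io.BytesIO()
    transfer_data(src, dest, length=10, chunk_size=4)   # 3 chunks: 4 + 4 + 2
    assert dest.getvalue() == b'x' * 10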