Fix backup temp snapshot path on remote node

The backup-using-temp-snapshot code path was broken
in Mitaka. The following patches fixed it on the
local node so that the fix could be backported:

  https://review.openstack.org/#/c/321943/
  https://review.openstack.org/#/c/331835/

Continuing that effort, this patch addresses the
problem on the remote node, so that the
backup-using-temp-snapshot code path works when the
backup and volume services run on different nodes.

Co-Authored-By: Accela Zhao <accelazh@gmail.com>
Closes-Bug: #1596305
Change-Id: I361458adbc1851a99a9bcffe82a02ba96a9fd460
Author: xing-yang
Date:   2016-05-20 03:59:14 -04:00
Parent: 2ef76fb251
Commit: ea7d0860c5
6 changed files with 259 additions and 242 deletions
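At a high level, the backup manager stops importing volume drivers locally and instead asks the volume service, over RPC, to export and tear down the temp snapshot; only the os-brick connect/disconnect happens on the backup node. A minimal sketch of the resulting flow, using the RPC signatures added below (the helpers connect_device, disconnect_device, and do_backup are illustrative names, not the actual methods):

    # Sketch only: export the snapshot on the volume node, attach locally.
    conn = volume_rpcapi.initialize_connection_snapshot(ctxt, snapshot,
                                                        connector_properties)
    attach_info = connect_device(conn)   # os-brick connect on the backup node
    try:
        do_backup(attach_info['device'])
    finally:
        disconnect_device(attach_info)
        volume_rpcapi.terminate_connection_snapshot(ctxt, snapshot,
                                                    connector_properties,
                                                    force=False)
        volume_rpcapi.remove_export_snapshot(ctxt, snapshot)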

@@ -90,101 +90,10 @@ class BackupManager(manager.ThreadPoolManager):
self.service = importutils.import_module(self.driver_name)
self.az = CONF.storage_availability_zone
self.volume_managers = {}
# TODO(xyang): If backup_use_same_host is True, we'll find
# the volume backend on the backup node. This allows us
# to use a temp snapshot to backup an in-use volume if the
# driver supports it. This code should go away when we add
# support for backing up in-use volume using a temp snapshot
# on a remote node.
if CONF.backup_use_same_host:
self._setup_volume_drivers()
self.backup_rpcapi = backup_rpcapi.BackupAPI()
self.volume_rpcapi = volume_rpcapi.VolumeAPI()
super(BackupManager, self).__init__(*args, **kwargs)
def _get_volume_backend(self, host=None, allow_null_host=False):
if host is None:
if not allow_null_host:
msg = _("NULL host not allowed for volume backend lookup.")
raise exception.BackupFailedToGetVolumeBackend(msg)
else:
LOG.debug("Checking hostname '%s' for backend info.", host)
# NOTE(xyang): If host='myhost@lvmdriver', backend='lvmdriver'
# by the logic below. This is different from extract_host.
# vol_utils.extract_host(host, 'backend')='myhost@lvmdriver'.
part = host.partition('@')
if (part[1] == '@') and (part[2] != ''):
backend = part[2]
LOG.debug("Got backend '%s'.", backend)
return backend
LOG.info("Backend not found in hostname (%s) so using default.",
host)
if 'default' not in self.volume_managers:
# For multi-backend we just pick the top of the list.
return next(iter(self.volume_managers))
return 'default'
def _get_manager(self, backend):
LOG.debug("Manager requested for volume_backend '%s'.",
backend)
if backend is None:
LOG.debug("Fetching default backend.")
backend = self._get_volume_backend(allow_null_host=True)
if backend not in self.volume_managers:
msg = (_("Volume manager for backend '%s' does not exist.") %
(backend))
raise exception.BackupFailedToGetVolumeBackend(msg)
return self.volume_managers[backend]
def _get_driver(self, backend=None):
LOG.debug("Driver requested for volume_backend '%s'.",
backend)
if backend is None:
LOG.debug("Fetching default backend.")
backend = self._get_volume_backend(allow_null_host=True)
mgr = self._get_manager(backend)
mgr.driver.db = self.db
return mgr.driver
def _setup_volume_drivers(self):
if CONF.enabled_backends:
for backend in filter(None, CONF.enabled_backends):
host = "%s@%s" % (CONF.host, backend)
mgr = importutils.import_object(CONF.volume_manager,
host=host,
service_name=backend)
config = mgr.configuration
backend_name = config.safe_get('volume_backend_name')
LOG.debug("Registering backend %(backend)s (host=%(host)s "
"backend_name=%(backend_name)s).",
{'backend': backend, 'host': host,
'backend_name': backend_name})
self.volume_managers[backend] = mgr
else:
default = importutils.import_object(CONF.volume_manager)
LOG.debug("Registering default backend %s.", default)
self.volume_managers['default'] = default
def _init_volume_driver(self, ctxt, driver):
LOG.info("Starting volume driver %(driver_name)s (%(version)s).",
{'driver_name': driver.__class__.__name__,
'version': driver.get_version()})
try:
driver.do_setup(ctxt)
driver.check_for_setup_error()
except Exception:
LOG.exception("Error encountered during initialization of "
"driver: %(name)s.",
{'name': driver.__class__.__name__})
# we don't want to continue since we failed
# to initialize the driver correctly.
return
driver.set_initialized()
@property
def driver_name(self):
"""This function maps old backup services to backup drivers."""
@@ -207,9 +116,6 @@ class BackupManager(manager.ThreadPoolManager):
"""Run initialization needed for a standalone service."""
ctxt = context.get_admin_context()
for mgr in self.volume_managers.values():
self._init_volume_driver(ctxt, mgr.driver)
try:
self._cleanup_incomplete_backup_operations(ctxt)
except Exception:
@@ -317,12 +223,7 @@ class BackupManager(manager.ThreadPoolManager):
try:
temp_snapshot = objects.Snapshot.get_by_id(
ctxt, backup.temp_snapshot_id)
volume = objects.Volume.get_by_id(
ctxt, backup.volume_id)
# The temp snapshot should be deleted directly through the
# volume driver, not through the volume manager.
self.volume_rpcapi.delete_snapshot(ctxt, temp_snapshot,
volume.host)
self.volume_rpcapi.delete_snapshot(ctxt, temp_snapshot)
except exception.SnapshotNotFound:
LOG.debug("Could not find temp snapshot %(snap)s to clean "
"up for backup %(backup)s.",
@@ -932,18 +833,13 @@ class BackupManager(manager.ThreadPoolManager):
backup_service = self.service.get_backup_driver(context)
return backup_service.support_force_delete
def _attach_device(self, context, backup_device,
def _attach_device(self, ctxt, backup_device,
properties, is_snapshot=False):
"""Attach backup device."""
if not is_snapshot:
return self._attach_volume(context, backup_device, properties)
return self._attach_volume(ctxt, backup_device, properties)
else:
volume = self.db.volume_get(context, backup_device.volume_id)
host = volume_utils.extract_host(volume['host'], 'backend')
backend = self._get_volume_backend(host=host)
rc = self._get_driver(backend)._attach_snapshot(
context, backup_device, properties)
return rc
return self._attach_snapshot(ctxt, backup_device, properties)
def _attach_volume(self, context, volume, properties):
"""Attach a volume."""
@@ -965,6 +861,24 @@ class BackupManager(manager.ThreadPoolManager):
"acceptable.",
{'volume_id': volume.id})
def _attach_snapshot(self, ctxt, snapshot, properties):
"""Attach a snapshot."""
try:
conn = self.volume_rpcapi.initialize_connection_snapshot(
ctxt, snapshot, properties)
return self._connect_device(conn)
except Exception:
with excutils.save_and_reraise_exception():
try:
self.volume_rpcapi.terminate_connection_snapshot(
ctxt, snapshot, properties, force=True)
except Exception:
LOG.warning("Failed to terminate the connection "
"of snapshot %(snapshot_id)s, but it is "
"acceptable.",
{'snapshot_id': snapshot.id})
def _connect_device(self, conn):
"""Establish connection to device."""
use_multipath = CONF.use_multipath_for_image_xfer
@@ -979,20 +893,18 @@ class BackupManager(manager.ThreadPoolManager):
return {'conn': conn, 'device': vol_handle, 'connector': connector}
def _detach_device(self, context, attach_info, device,
def _detach_device(self, ctxt, attach_info, device,
properties, is_snapshot=False, force=False):
"""Disconnect the volume or snapshot from the host. """
if not is_snapshot:
connector = attach_info['connector']
connector.disconnect_volume(attach_info['conn']['data'],
attach_info['device'])
rpcapi = self.volume_rpcapi
rpcapi.terminate_connection(context, device, properties,
if not is_snapshot:
rpcapi.terminate_connection(ctxt, device, properties,
force=force)
rpcapi.remove_export(context, device)
rpcapi.remove_export(ctxt, device)
else:
volume = self.db.volume_get(context, device.volume_id)
host = volume_utils.extract_host(volume['host'], 'backend')
backend = self._get_volume_backend(host=host)
self._get_driver(backend)._detach_snapshot(
context, attach_info, device, properties, force)
rpcapi.terminate_connection_snapshot(ctxt, device,
properties, force=force)
rpcapi.remove_export_snapshot(ctxt, device)
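For context, a rough sketch of how create_backup is expected to drive these helpers when the backup device is a temp snapshot; the surrounding bookkeeping, the backup_device variable, and get_connector_properties are simplified assumptions, not the literal create_backup body:

    # Sketch only, not the full create_backup path.
    properties = get_connector_properties()   # e.g. os-brick connector props
    attach_info = self._attach_device(ctxt, backup_device,
                                      properties, is_snapshot=True)
    try:
        device_path = attach_info['device']['path']
        # stream device_path into the backup driver here
    finally:
        self._detach_device(ctxt, attach_info, backup_device,
                            properties, is_snapshot=True)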

@@ -38,7 +38,7 @@ from cinder import test
from cinder.tests import fake_driver
from cinder.tests.unit.backup import fake_service_with_verify as fake_service
from cinder.tests.unit import utils
from cinder.volume import driver
from cinder.volume import rpcapi as volume_rpcapi
CONF = cfg.CONF
@@ -257,12 +257,9 @@ class BackupTestCase(BaseBackupTest):
mock_get_admin_context.side_effect = get_admin_context
self.volume = importutils.import_object(CONF.volume_manager)
self.backup_mgr.volume_managers = {'driver': self.volume}
self.backup_mgr.init_host()
mock_setup.assert_called_once_with(self.ctxt)
mock_check.assert_called_once_with()
mock_set_initialized.assert_called_once_with()
self.assertEqual({}, self.backup_mgr.volume_managers)
vol1 = db.volume_get(self.ctxt, vol1_id)
self.assertEqual('available', vol1['status'])
@@ -347,13 +344,6 @@ class BackupTestCase(BaseBackupTest):
def test_is_working(self):
self.assertTrue(self.backup_mgr.is_working())
def test_get_volume_backend(self):
backup_mgr = manager.BackupManager()
backup_mgr.volume_managers = {'backend1': 'backend1',
'backend2': 'backend2'}
backend = backup_mgr._get_volume_backend(allow_null_host=True)
self.assertIn(backend, backup_mgr.volume_managers)
def test_cleanup_incomplete_backup_operations_with_exceptions(self):
"""Test cleanup resilience in the face of exceptions."""
@@ -710,7 +700,6 @@ class BackupTestCase(BaseBackupTest):
mock_get_conn):
"""Test backup in-use volume using temp snapshot."""
self.override_config('backup_use_same_host', True)
self.backup_mgr._setup_volume_drivers()
vol_size = 1
vol_id = self._create_volume_db_entry(size=vol_size,
previous_status='in-use')
@@ -728,29 +717,34 @@ class BackupTestCase(BaseBackupTest):
'device': {'path': '/dev/null'},
'conn': {'data': {}},
'connector': fake.FakeConnector(None)}
mock_detach_snapshot = self.mock_object(driver.BaseVD,
'_detach_snapshot')
mock_attach_snapshot = self.mock_object(driver.BaseVD,
'_attach_snapshot')
mock_attach_snapshot.return_value = attach_info
mock_terminate_connection_snapshot = self.mock_object(
volume_rpcapi.VolumeAPI,
'terminate_connection_snapshot')
mock_initialize_connection_snapshot = self.mock_object(
volume_rpcapi.VolumeAPI,
'initialize_connection_snapshot')
mock_connect_device = self.mock_object(
manager.BackupManager,
'_connect_device')
mock_connect_device.return_value = attach_info
properties = {}
mock_get_conn.return_value = properties
mock_open.return_value = open('/dev/null', 'rb')
self.backup_mgr.create_backup(self.ctxt, backup)
mock_temporary_chown.assert_called_once_with('/dev/null')
mock_attach_snapshot.assert_called_once_with(self.ctxt, snap,
properties)
mock_initialize_connection_snapshot.assert_called_once_with(
self.ctxt, snap, properties)
mock_get_backup_device.assert_called_once_with(self.ctxt, backup, vol)
mock_get_conn.assert_called_once_with()
mock_detach_snapshot.assert_called_once_with(self.ctxt, attach_info,
snap, properties, False)
mock_terminate_connection_snapshot.assert_called_once_with(
self.ctxt, snap, properties, force=False)
vol = objects.Volume.get_by_id(self.ctxt, vol_id)
self.assertEqual('in-use', vol['status'])
self.assertEqual('backing-up', vol['previous_status'])
backup = db.backup_get(self.ctxt, backup.id)
self.assertEqual(fields.BackupStatus.AVAILABLE, backup['status'])
self.assertEqual(vol_size, backup['size'])
backup = objects.Backup.get_by_id(self.ctxt, backup.id)
self.assertEqual(fields.BackupStatus.AVAILABLE, backup.status)
self.assertEqual(vol_size, backup.size)
@mock.patch.object(fake_driver.FakeLoggingVolumeDriver, 'create_snapshot')
def test_create_temp_snapshot(self, mock_create_snapshot):

@@ -592,3 +592,40 @@ class VolumeRPCAPITestCase(test.RPCAPITestCase):
service=service,
log_request='log_request',
version='3.12')
@ddt.data(None, 'mycluster')
def test_initialize_connection_snapshot(self, cluster_name):
self._change_cluster_name(self.fake_snapshot.volume, cluster_name)
self._test_rpc_api('initialize_connection_snapshot',
rpc_method='call',
server=(cluster_name or
self.fake_snapshot.volume.host),
connector='fake_connector',
snapshot=self.fake_snapshot,
expected_kwargs_diff={
'snapshot_id': self.fake_snapshot.id},
version='3.13')
@ddt.data(None, 'mycluster')
def test_terminate_connection_snapshot(self, cluster_name):
self._change_cluster_name(self.fake_snapshot.volume, cluster_name)
self._test_rpc_api('terminate_connection_snapshot',
rpc_method='call',
server=(cluster_name or
self.fake_snapshot.volume.host),
snapshot=self.fake_snapshot,
connector='fake_connector',
force=False,
retval=None,
expected_kwargs_diff={
'snapshot_id': self.fake_snapshot.id},
version='3.13')
def test_remove_export_snapshot(self):
self._test_rpc_api('remove_export_snapshot',
rpc_method='cast',
server=self.fake_volume_obj.host,
snapshot=self.fake_snapshot,
expected_kwargs_diff={
'snapshot_id': self.fake_snapshot.id},
version='3.13')

@@ -466,42 +466,20 @@ class BaseVD(object):
raise exception.RemoveExportException(volume=volume['id'],
reason=ex)
def _detach_snapshot(self, context, attach_info, snapshot, properties,
force=False, remote=False):
def _detach_snapshot(self, ctxt, snapshot, properties, force=False):
"""Disconnect the snapshot from the host."""
# Use Brick's code to do attach/detach
connector = attach_info['connector']
connector.disconnect_volume(attach_info['conn']['data'],
attach_info['device'])
# NOTE(xyang): This method is introduced for non-disruptive backup.
# Currently backup service has to be on the same node as the volume
# driver. Therefore it is not possible to call a volume driver on a
# remote node. In the future, if backup can be done from a remote
# node, this function can be modified to allow RPC calls. The remote
# flag in the interface is for anticipation that it will be enabled
# in the future.
if remote:
LOG.error("Detaching snapshot from a remote node "
"is not supported.")
raise exception.NotSupportedOperation(
operation=_("detach snapshot from remote node"))
else:
# Call local driver's terminate_connection and remove export.
# NOTE(avishay) This is copied from the manager's code - need to
# clean this up in the future.
try:
self.terminate_connection_snapshot(snapshot, properties,
force=force)
except Exception as err:
err_msg = (_('Unable to terminate volume connection: %(err)s')
err_msg = (_('Unable to terminate snapshot connection: %(err)s')
% {'err': six.text_type(err)})
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
try:
LOG.debug("Snapshot %s: removing export.", snapshot.id)
self.remove_export_snapshot(context, snapshot)
self.remove_export_snapshot(ctxt, snapshot)
except Exception as ex:
LOG.exception("Error detaching snapshot %(snapshot)s, "
"due to remove export failure.",
@@ -1016,28 +994,12 @@ class BaseVD(object):
return (attach_info, volume)
def _attach_snapshot(self, context, snapshot, properties, remote=False):
def _attach_snapshot(self, ctxt, snapshot, properties):
"""Attach the snapshot."""
# NOTE(xyang): This method is introduced for non-disruptive backup.
# Currently backup service has to be on the same node as the volume
# driver. Therefore it is not possible to call a volume driver on a
# remote node. In the future, if backup can be done from a remote
# node, this function can be modified to allow RPC calls. The remote
# flag in the interface is for anticipation that it will be enabled
# in the future.
if remote:
LOG.error("Attaching snapshot from a remote node "
"is not supported.")
raise exception.NotSupportedOperation(
operation=_("attach snapshot from remote node"))
else:
# Call local driver's create_export and initialize_connection.
# NOTE(avishay) This is copied from the manager's code - need to
# clean this up in the future.
model_update = None
try:
LOG.debug("Snapshot %s: creating export.", snapshot.id)
model_update = self.create_export_snapshot(context, snapshot,
model_update = self.create_export_snapshot(ctxt, snapshot,
properties)
if model_update:
snapshot.provider_location = model_update.get(
@@ -1064,7 +1026,7 @@ class BaseVD(object):
{'err': six.text_type(err)})
LOG.error(err_msg)
LOG.debug("Cleaning up failed connect initialization.")
self.remove_export_snapshot(context, snapshot)
self.remove_export_snapshot(ctxt, snapshot)
except Exception as ex:
ex_msg = (_('Error encountered during cleanup '
'of a failed attach: %(ex)s') %
@@ -1072,7 +1034,7 @@ class BaseVD(object):
LOG.error(err_msg)
raise exception.VolumeBackendAPIException(data=ex_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
return self._connect_device(conn)
return conn
def _connect_device(self, conn):
# Use Brick's code to do attach/detach
@@ -1129,7 +1091,7 @@ class BaseVD(object):
"""
backup_device = None
is_snapshot = False
if self.backup_use_temp_snapshot() and CONF.backup_use_same_host:
if self.backup_use_temp_snapshot():
(backup_device, is_snapshot) = (
self._get_backup_volume_temp_snapshot(context, backup))
else:

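With the same-host restriction dropped in the hunk above, whether get_backup_device hands back a temp snapshot now depends only on driver capability; roughly (a sketch reconstructed from that hunk; the temp-clone fallback name is an assumption):

    if self.backup_use_temp_snapshot():
        # The backup service attaches the snapshot through the new
        # snapshot RPC calls, whether it runs locally or on another node.
        backup_device, is_snapshot = (
            self._get_backup_volume_temp_snapshot(context, backup))
    else:
        backup_device, is_snapshot = (
            self._get_backup_volume_temp_cloned(context, backup))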
@@ -1543,6 +1543,66 @@ class VolumeManager(manager.CleanableManager,
resource=volume)
return conn_info
def initialize_connection_snapshot(self, ctxt, snapshot_id, connector):
utils.require_driver_initialized(self.driver)
snapshot = objects.Snapshot.get_by_id(ctxt, snapshot_id)
try:
self.driver.validate_connector(connector)
except exception.InvalidConnectorException as err:
raise exception.InvalidInput(reason=six.text_type(err))
except Exception as err:
err_msg = (_("Validate snapshot connection failed "
"(error: %(err)s).") % {'err': six.text_type(err)})
LOG.exception(err_msg, resource=snapshot)
raise exception.VolumeBackendAPIException(data=err_msg)
model_update = None
try:
LOG.debug("Snapshot %s: creating export.", snapshot.id)
model_update = self.driver.create_export_snapshot(
ctxt.elevated(), snapshot, connector)
if model_update:
snapshot.provider_location = model_update.get(
'provider_location', None)
snapshot.provider_auth = model_update.get(
'provider_auth', None)
snapshot.save()
except exception.CinderException as ex:
msg = _("Create export of snapshot failed (%s)") % ex.msg
LOG.exception(msg, resource=snapshot)
raise exception.VolumeBackendAPIException(data=msg)
try:
if model_update:
snapshot.update(model_update)
snapshot.save()
except exception.CinderException as ex:
LOG.exception("Model update failed.", resource=snapshot)
raise exception.ExportFailure(reason=six.text_type(ex))
try:
conn = self.driver.initialize_connection_snapshot(snapshot,
connector)
except Exception as err:
try:
err_msg = (_('Unable to fetch connection information from '
'backend: %(err)s') %
{'err': six.text_type(err)})
LOG.error(err_msg)
LOG.debug("Cleaning up failed connect initialization.")
self.driver.remove_export_snapshot(ctxt.elevated(), snapshot)
except Exception as ex:
ex_msg = (_('Error encountered during cleanup '
'of a failed attach: %(ex)s') %
{'ex': six.text_type(ex)})
LOG.error(ex_msg)
raise exception.VolumeBackendAPIException(data=ex_msg)
raise exception.VolumeBackendAPIException(data=err_msg)
LOG.info("Initialize snapshot connection completed successfully.",
resource=snapshot)
return conn
def terminate_connection(self, context, volume_id, connector, force=False):
"""Cleanup connection from host represented by connector.
@@ -1565,6 +1625,22 @@ class VolumeManager(manager.CleanableManager,
LOG.info("Terminate volume connection completed successfully.",
resource=volume_ref)
def terminate_connection_snapshot(self, ctxt, snapshot_id,
connector, force=False):
utils.require_driver_initialized(self.driver)
snapshot = objects.Snapshot.get_by_id(ctxt, snapshot_id)
try:
self.driver.terminate_connection_snapshot(snapshot, connector,
force=force)
except Exception as err:
err_msg = (_('Terminate snapshot connection failed: %(err)s')
% {'err': six.text_type(err)})
LOG.exception(err_msg, resource=snapshot)
raise exception.VolumeBackendAPIException(data=err_msg)
LOG.info("Terminate snapshot connection completed successfully.",
resource=snapshot)
def remove_export(self, context, volume_id):
"""Removes an export for a volume."""
utils.require_driver_initialized(self.driver)
@@ -1579,6 +1655,20 @@ class VolumeManager(manager.CleanableManager,
LOG.info("Remove volume export completed successfully.",
resource=volume_ref)
def remove_export_snapshot(self, ctxt, snapshot_id):
"""Removes an export for a snapshot."""
utils.require_driver_initialized(self.driver)
snapshot = objects.Snapshot.get_by_id(ctxt, snapshot_id)
try:
self.driver.remove_export_snapshot(ctxt, snapshot)
except Exception:
msg = _("Remove snapshot export failed.")
LOG.exception(msg, resource=snapshot)
raise exception.VolumeBackendAPIException(data=msg)
LOG.info("Remove snapshot export completed successfully.",
resource=snapshot)
def accept_transfer(self, context, volume_id, new_user, new_project):
# NOTE(flaper87): Verify the driver is enabled
# before going forward. The exception will be caught

@@ -128,9 +128,11 @@ class VolumeAPI(rpc.RPCAPI):
create_cgsnapshot, delete_cgsnapshot, update_consistencygroup,
and create_consistencygroup_from_src.
3.12 - Adds set_log_levels and get_log_levels
3.13 - Add initialize_connection_snapshot,
terminate_connection_snapshot, and remove_export_snapshot.
"""
RPC_API_VERSION = '3.12'
RPC_API_VERSION = '3.13'
RPC_DEFAULT_VERSION = '3.0'
TOPIC = constants.VOLUME_TOPIC
BINARY = 'cinder-volume'
@@ -399,6 +401,26 @@ class VolumeAPI(rpc.RPCAPI):
cctxt.cast(ctxt, 'delete_group_snapshot',
group_snapshot=group_snapshot)
@rpc.assert_min_rpc_version('3.13')
def initialize_connection_snapshot(self, ctxt, snapshot, connector):
cctxt = self._get_cctxt(snapshot.service_topic_queue, version='3.13')
return cctxt.call(ctxt, 'initialize_connection_snapshot',
snapshot_id=snapshot.id,
connector=connector)
@rpc.assert_min_rpc_version('3.13')
def terminate_connection_snapshot(self, ctxt, snapshot, connector,
force=False):
cctxt = self._get_cctxt(snapshot.service_topic_queue, version='3.13')
return cctxt.call(ctxt, 'terminate_connection_snapshot',
snapshot_id=snapshot.id,
connector=connector, force=force)
@rpc.assert_min_rpc_version('3.13')
def remove_export_snapshot(self, ctxt, snapshot):
cctxt = self._get_cctxt(snapshot.service_topic_queue, version='3.13')
cctxt.cast(ctxt, 'remove_export_snapshot', snapshot_id=snapshot.id)
@rpc.assert_min_rpc_version('3.9')
def attachment_update(self, ctxt, vref, connector, attachment_id):
version = self._compat_ver('3.9')
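Because the three new calls are guarded by assert_min_rpc_version('3.13'), a mixed-version deployment still pinned below 3.13 refuses the remote-snapshot path rather than casting a method the volume service cannot handle. A rough illustration of caller-side handling (the exception name and the fallback are assumptions, not part of this patch):

    try:
        conn = volume_rpcapi.initialize_connection_snapshot(ctxt, snapshot,
                                                            properties)
    except exception.ServiceTooOld:
        # Rolling upgrade in progress: keep using the old same-host
        # behaviour until every cinder-volume service speaks 3.13.
        conn = None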