DS8K: support clone volume asynchronously

when cloning volume in DS8K, driver doesn't need to wait until
flashcopy finished in some cases, so this patch provides an option
in volume metadata that can be used to tell driver to wait for
flashcopy completed or not.

Implements: blueprint ds8k-async-clone
Change-Id: I51829e7667d3ac148978e1799cecfea1920f838f
This commit is contained in:
Jia Min 2017-05-23 04:36:39 -07:00 committed by jiamin
parent 3df3dec2de
commit 1d9d7c00b4
7 changed files with 252 additions and 73 deletions

View File

@ -44,9 +44,12 @@ from cinder.volume.drivers.ibm.ibm_storage import ds8k_restclient as restclient
mock_logger.stop()
TEST_VOLUME_ID = '0001'
TEST_VOLUME_ID_2 = '0002'
TEST_HOST_ID = 'H1'
TEST_VOLUME_BACKEND_NAME = 'ds8k_backend'
TEST_GROUP_HOST = 'test_host@' + TEST_VOLUME_BACKEND_NAME + '#fakepool'
TEST_HOST_1 = 'test_host@' + TEST_VOLUME_BACKEND_NAME
TEST_HOST_2 = TEST_GROUP_HOST
TEST_LUN_ID = '00'
TEST_POOLS_STR = 'P0,P1'
TEST_POOL_ID_1 = 'P0'
@ -780,6 +783,7 @@ FAKE_FAILBACK_RESPONSE = FAKE_GENERIC_RESPONSE
FAKE_FAILOVER_RESPONSE = FAKE_GENERIC_RESPONSE
FAKE_CHANGE_VOLUME_RESPONSE = FAKE_GENERIC_RESPONSE
FAKE_POST_FLASHCOPIES_RESPONSE = FAKE_GENERIC_RESPONSE
FAKE_DELETE_FLASHCOPIES_RESPONSE = FAKE_GENERIC_RESPONSE
FAKE_POST_UNFREEZE_FLASHCOPIES_RESPONSE = FAKE_GENERIC_RESPONSE
FAKE_CREATE_LCU_RESPONSE = FAKE_GENERIC_RESPONSE
FAKE_ASSIGN_HOST_PORT_RESPONSE = FAKE_GENERIC_RESPONSE
@ -809,12 +813,16 @@ FAKE_REST_API_RESPONSES = {
FAKE_CHANGE_VOLUME_RESPONSE,
TEST_TARGET_DS8K_IP + '/volumes/' + TEST_VOLUME_ID + '/put':
FAKE_CHANGE_VOLUME_RESPONSE,
TEST_SOURCE_DS8K_IP + '/volumes/' + TEST_VOLUME_ID_2 + '/get':
FAKE_GET_VOLUME_RESPONSE,
TEST_SOURCE_DS8K_IP + '/volumes/delete':
FAKE_DELETE_VOLUME_RESPONSE,
TEST_SOURCE_DS8K_IP + '/volumes/' + TEST_VOLUME_ID + '/delete':
FAKE_DELETE_VOLUME_RESPONSE,
TEST_TARGET_DS8K_IP + '/volumes/' + TEST_VOLUME_ID + '/delete':
FAKE_DELETE_VOLUME_RESPONSE,
TEST_SOURCE_DS8K_IP + '/volumes/' + TEST_VOLUME_ID_2 + '/delete':
FAKE_DELETE_VOLUME_RESPONSE,
TEST_SOURCE_DS8K_IP + '/lss/get':
FAKE_GET_LSS_RESPONSE,
TEST_TARGET_DS8K_IP + '/lss/get':
@ -887,6 +895,9 @@ FAKE_REST_API_RESPONSES = {
FAKE_PAUSE_RESPONSE,
TEST_SOURCE_DS8K_IP + '/cs/flashcopies/post':
FAKE_POST_FLASHCOPIES_RESPONSE,
TEST_SOURCE_DS8K_IP + '/cs/flashcopies/' + TEST_VOLUME_ID + ":" +
TEST_VOLUME_ID_2 + '/delete':
FAKE_DELETE_FLASHCOPIES_RESPONSE,
TEST_SOURCE_DS8K_IP + '/cs/flashcopies/unfreeze/post':
FAKE_POST_UNFREEZE_FLASHCOPIES_RESPONSE,
TEST_SOURCE_DS8K_IP + '/cs/pprcs/physical_links/get':
@ -1017,7 +1028,7 @@ class FakeDS8KProxy(ds8kproxy.DS8KProxy):
def __init__(self, storage_info, logger, exception,
driver=None, active_backend_id=None,
HTTPConnectorObject=None):
HTTPConnectorObject=None, host=TEST_HOST_1):
with mock.patch.object(proxy.IBMStorageProxy,
'_get_safely_from_configuration') as get_conf:
get_conf.side_effect = [{}, False]
@ -1031,6 +1042,7 @@ class FakeDS8KProxy(ds8kproxy.DS8KProxy):
self._active_backend_id = active_backend_id
self.configuration = driver.configuration
self.consisgroup_cache = {}
self._host = host
self.setup(None)
def setup(self, context):
@ -1049,6 +1061,7 @@ class FakeDS8KProxy(ds8kproxy.DS8KProxy):
# set up replication target
if repl_devices:
self._do_replication_setup(repl_devices, self._helper)
self._check_async_cloned_volumes()
def _do_replication_setup(self, devices, src_helper):
self._replication = FakeReplication(src_helper, devices[0])
@ -1087,7 +1100,7 @@ class DS8KProxyTest(test.TestCase):
def _create_volume(self, **kwargs):
properties = {
'host': 'openstack@ds8k_backend#ds8k_pool',
'host': TEST_HOST_2,
'size': 1
}
for p in properties.keys():
@ -1901,6 +1914,97 @@ class DS8KProxyTest(test.TestCase):
self.assertTrue(mock_delete_lun_by_id.called)
self.assertTrue(mock_delete_lun.called)
@mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy')
def test_async_clone_volume(self, mock_get_flashcopy):
"""clone the volume asynchronously."""
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', {})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
src_vol = self._create_volume(volume_type_id=vol_type.id,
provider_location=location)
location = six.text_type({'vol_hex_id': None})
metadata = [{'key': 'async_clone', 'value': True}]
tgt_vol = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
volume_metadata=metadata)
self.mock_object(eventlet, 'spawn')
mock_get_flashcopy.return_value = [TEST_FLASHCOPY]
volume_update = self.driver.create_cloned_volume(tgt_vol, src_vol)
self.assertEqual(
TEST_VOLUME_ID,
ast.literal_eval(volume_update['provider_location'])['vol_hex_id'])
self.assertEqual('started', volume_update['metadata']['flashcopy'])
eventlet.spawn.assert_called()
def test_check_async_cloned_volumes_when_initialize_driver(self):
"""initialize driver should check volumes cloned asynchronously."""
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', {})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
src_vol = self._create_volume(volume_type_id=vol_type.id,
provider_location=location)
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID_2})
metadata = [{'key': 'flashcopy', 'value': 'started'}]
self._create_volume(volume_type_id=vol_type.id,
source_volid=src_vol.id,
provider_location=location,
volume_metadata=metadata)
self.mock_object(eventlet, 'spawn')
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
eventlet.spawn.assert_called()
@mock.patch.object(eventlet, 'sleep')
@mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy')
def test_wait_flashcopy_when_async_clone_volume(
self, mock_get_flashcopy, mock_sleep):
"""clone volume asynchronously when flashcopy failed."""
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', {})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
src_vol = self._create_volume(volume_type_id=vol_type.id,
provider_location=location)
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID_2})
metadata = [{'key': 'async_clone', 'value': True}]
tgt_vol = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
volume_metadata=metadata)
src_lun = ds8kproxy.Lun(src_vol)
tgt_lun = ds8kproxy.Lun(tgt_vol)
mock_get_flashcopy.side_effect = (
restclient.APIException('flashcopy fails.'))
self.driver._wait_flashcopy([src_lun], [tgt_lun])
self.assertEqual('error', tgt_lun.status)
self.assertEqual('error', tgt_vol.metadata['flashcopy'])
self.assertEqual('error', tgt_vol.status)
self.assertIsNotNone(tgt_vol.metadata.get('error_msg'))
@mock.patch.object(eventlet, 'sleep')
@mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy')
def test_wait_flashcopy_when_async_clone_volume_2(
self, mock_get_flashcopy, mock_sleep):
"""clone volume asynchronously when flashcopy successed."""
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', {})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
src_vol = self._create_volume(volume_type_id=vol_type.id,
provider_location=location)
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID_2})
metadata = [{'key': 'async_clone', 'value': True}]
tgt_vol = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
volume_metadata=metadata)
src_lun = ds8kproxy.Lun(src_vol)
tgt_lun = ds8kproxy.Lun(tgt_vol)
mock_get_flashcopy.return_value = {}
self.driver._wait_flashcopy([src_lun], [tgt_lun])
self.assertEqual('available', tgt_lun.status)
self.assertEqual('success', tgt_vol.metadata['flashcopy'])
@mock.patch.object(eventlet, 'sleep')
@mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy')
def test_create_cloned_volume(self, mock_get_flashcopy, mock_sleep):

View File

@ -94,7 +94,7 @@ class IBMStorageFakeProxyDriver(object):
"""
def __init__(self, ibm_storage_info, logger, expt,
driver=None, active_backend_id=None):
driver=None, active_backend_id=None, host=None):
"""Initialize Proxy."""
self.ibm_storage_info = ibm_storage_info

View File

@ -481,36 +481,49 @@ class DS8KCommonHelper(object):
return lss_ids
def wait_flashcopy_finished(self, src_luns, tgt_luns):
finished = False
try:
fc_state = [False] * len(tgt_luns)
while True:
eventlet.sleep(5)
for i in range(len(tgt_luns)):
if not fc_state[i]:
fcs = self.get_flashcopy(tgt_luns[i].ds_id)
valid_fc_states = ('valid', 'validation_required')
for tgt_lun in tgt_luns:
tgt_lun.status = 'checking'
while True:
eventlet.sleep(5)
for src_lun, tgt_lun in zip(src_luns, tgt_luns):
if tgt_lun.status == 'checking':
try:
fcs = self.get_flashcopy(tgt_lun.ds_id)
if not fcs:
fc_state[i] = True
continue
if fcs[0]['state'] not in ('valid',
'validation_required'):
raise restclient.APIException(
data=(_('Flashcopy ended up in bad state %s. '
'Rolling back.') % fcs[0]['state']))
if fc_state.count(False) == 0:
break
finished = True
finally:
if not finished:
for src_lun, tgt_lun in zip(src_luns, tgt_luns):
self.delete_flashcopy(src_lun.ds_id, tgt_lun.ds_id)
return finished
tgt_lun.status = 'available'
elif fcs[0]['state'] not in valid_fc_states:
LOG.error('Flashcopy %(src)s:%(tgt)s ended '
'up in bad state %(state)s.',
{'src': src_lun.ds_id,
'tgt': tgt_lun.ds_id,
'state': fcs[0]['state']})
tgt_lun.status = 'error'
except restclient.APIException:
LOG.error('Can not get flashcopy relationship '
'%(src)s:%(tgt)s',
{'src': src_lun.ds_id,
'tgt': tgt_lun.ds_id})
tgt_lun.status = 'error'
if not [lun for lun in tgt_luns if lun.status == 'checking']:
break
# cleanup error flashcopy relationship.
for src_lun, tgt_lun in zip(src_luns, tgt_luns):
if tgt_lun.status == 'error':
self.delete_flashcopy(src_lun.ds_id, tgt_lun.ds_id)
def wait_pprc_copy_finished(self, vol_ids, state, delete=True):
LOG.info("Wait for PPRC pair to enter into state %s", state)
vol_ids = sorted(vol_ids)
min_vol_id = min(vol_ids)
max_vol_id = max(vol_ids)
invalid_states = ('target_suspended',
'invalid',
'volume_inaccessible')
if state == 'full_duplex':
invalid_states += ('suspended',)
elif state == 'suspended':
invalid_states += ('valid',)
try:
finished = False
while True:
@ -522,17 +535,6 @@ class DS8KCommonHelper(object):
if len(finished_pairs) == len(pairs):
finished = True
break
invalid_states = [
'target_suspended',
'invalid',
'volume_inaccessible'
]
if state == 'full_duplex':
invalid_states.append('suspended')
elif state == 'suspended':
invalid_states.append('valid')
unfinished_pairs = [p for p in pairs if p['state'] != state]
for p in unfinished_pairs:
if p['state'] in invalid_states:

View File

@ -64,6 +64,7 @@ import ast
import json
import six
import eventlet
from oslo_config import cfg
from oslo_log import log as logging
@ -100,7 +101,8 @@ EXTRA_SPECS_DEFAULTS = {
'consistency': False,
'os400': '',
'storage_pool_ids': '',
'storage_lss_ids': ''
'storage_lss_ids': '',
'async_clone': False
}
ds8k_opts = [
@ -139,9 +141,10 @@ class Lun(object):
1.0.0 - initial revision.
2.1.0 - Added support for specify pool and lss, also improve the code.
2.1.1 - Added support for replication consistency group.
2.1.2 - Added support for cloning volume asynchronously.
"""
VERSION = "2.1.1"
VERSION = "2.1.2"
class FakeLun(object):
@ -159,6 +162,8 @@ class Lun(object):
self.group = lun.group
self.specified_pool = lun.specified_pool
self.specified_lss = lun.specified_lss
self.async_clone = lun.async_clone
self.status = lun.status
if not self.is_snapshot:
self.replica_ds_name = lun.replica_ds_name
self.replication_driver_data = (
@ -221,6 +226,8 @@ class Lun(object):
self.ds_name = (
"OS%s:%s" % ('snap', helper.filter_alnum(self.cinder_name))
)[:16]
self.metadata = self._get_snapshot_metadata(volume)
self.source_volid = volume.volume_id
else:
self.group = Group(volume.group) if volume.group else None
self.size = volume.size
@ -253,6 +260,13 @@ class Lun(object):
else False)
else:
self.failed_over = False
self.metadata = self._get_volume_metadata(volume)
self.source_volid = volume.source_volid
self.async_clone = self.metadata.get(
'async_clone',
'%s' % EXTRA_SPECS_DEFAULTS['async_clone']
).upper() == 'TRUE'
if os400:
if os400 not in VALID_OS400_VOLUME_TYPES.keys():
raise restclient.APIException(
@ -309,25 +323,22 @@ class Lun(object):
volume_update['provider_location'] = six.text_type(
{'vol_hex_id': self.ds_id})
# update metadata
if self.is_snapshot:
metadata = self._get_snapshot_metadata(self.volume)
else:
metadata = self._get_volume_metadata(self.volume)
if not self.is_snapshot:
if self.type_replication:
metadata['replication'] = six.text_type(
self.metadata['replication'] = six.text_type(
self.replication_driver_data)
else:
metadata.pop('replication', None)
self.metadata.pop('replication', None)
volume_update['replication_driver_data'] = json.dumps(
self.replication_driver_data)
volume_update['replication_status'] = (
self.replication_status or
fields.ReplicationStatus.NOT_CAPABLE)
metadata['data_type'] = (self.data_type if self.data_type else
metadata['data_type'])
metadata['vol_hex_id'] = self.ds_id
volume_update['metadata'] = metadata
self.metadata['data_type'] = (self.data_type or
self.metadata['data_type'])
self.metadata['vol_hex_id'] = self.ds_id
volume_update['metadata'] = self.metadata
# need to update volume size for OS400
if self.type_os400:
@ -365,12 +376,13 @@ class DS8KProxy(proxy.IBMStorageProxy):
prefix = "[IBM DS8K STORAGE]:"
def __init__(self, storage_info, logger, exception, driver,
active_backend_id=None, HTTPConnectorObject=None):
active_backend_id=None, HTTPConnectorObject=None, host=None):
proxy.IBMStorageProxy.__init__(
self, storage_info, logger, exception, driver, active_backend_id)
self._helper = None
self._replication = None
self._connector_obj = HTTPConnectorObject
self._host = host
self._replication_enabled = False
self._active_backend_id = active_backend_id
self.configuration = driver.configuration
@ -402,6 +414,31 @@ class DS8KProxy(proxy.IBMStorageProxy):
if replication_devices:
self._do_replication_setup(replication_devices, self._helper)
# checking volumes which are still in clone process.
self._check_async_cloned_volumes()
@proxy.logger
def _check_async_cloned_volumes(self):
ctxt = context.get_admin_context()
volumes = objects.VolumeList.get_all_by_host(ctxt, self._host)
src_luns = []
tgt_luns = []
for volume in volumes:
tgt_lun = Lun(volume)
if tgt_lun.metadata.get('flashcopy') == 'started':
try:
src_vol = objects.Volume.get_by_id(
ctxt, tgt_lun.source_volid)
except exception.VolumeNotFound:
LOG.error("Failed to get source volume %(src) for "
"target volume %(tgt)s",
{'src': tgt_lun.source_volid,
'tgt': tgt_lun.ds_id})
else:
src_luns.append(Lun(src_vol))
tgt_luns.append(tgt_lun)
if src_luns and tgt_luns:
eventlet.spawn(self._wait_flashcopy, src_luns, tgt_luns)
@proxy.logger
def _do_replication_setup(self, devices, src_helper):
@ -666,30 +703,58 @@ class DS8KProxy(proxy.IBMStorageProxy):
_('When target volume is pre-created, it must be equal '
'in size to source volume.'))
finished = False
vol_pairs = [{
"source_volume": src_lun.ds_id,
"target_volume": tgt_lun.ds_id
}]
try:
vol_pairs = [{
"source_volume": src_lun.ds_id,
"target_volume": tgt_lun.ds_id
}]
self._helper.start_flashcopy(vol_pairs)
fc_finished = self._helper.wait_flashcopy_finished(
[src_lun], [tgt_lun])
if (fc_finished and
tgt_lun.type_thin and
tgt_lun.size > src_lun.size):
param = {
'cap': self._helper._gb2b(tgt_lun.size),
'captype': 'bytes'
}
self._helper.change_lun(tgt_lun.ds_id, param)
finished = fc_finished
if ((tgt_lun.type_thin and tgt_lun.size > src_lun.size) or
(not tgt_lun.async_clone)):
self._helper.wait_flashcopy_finished([src_lun], [tgt_lun])
if (tgt_lun.status == 'available' and
tgt_lun.type_thin and
tgt_lun.size > src_lun.size):
param = {
'cap': self._helper._gb2b(tgt_lun.size),
'captype': 'bytes'
}
self._helper.change_lun(tgt_lun.ds_id, param)
else:
LOG.info("Clone volume %(tgt)s from volume %(src)s "
"in the background.",
{'src': src_lun.ds_id, 'tgt': tgt_lun.ds_id})
tgt_lun.metadata['flashcopy'] = "started"
eventlet.spawn(self._wait_flashcopy, [src_lun], [tgt_lun])
finally:
if not finished:
if not tgt_lun.async_clone and tgt_lun.status == 'error':
self._helper.delete_lun(tgt_lun)
return tgt_lun
def _wait_flashcopy(self, src_luns, tgt_luns):
# please note that the order of volumes should be fixed.
self._helper.wait_flashcopy_finished(src_luns, tgt_luns)
for src_lun, tgt_lun in zip(src_luns, tgt_luns):
if tgt_lun.status == 'available':
tgt_lun.volume.metadata['flashcopy'] = 'success'
elif tgt_lun.status == 'error':
tgt_lun.volume.metadata['flashcopy'] = "error"
tgt_lun.volume.metadata['error_msg'] = (
"FlashCopy from source volume %(src)s to target volume "
"%(tgt)s fails, the state of target volume %(id)s is set "
"to error." % {'src': src_lun.ds_id,
'tgt': tgt_lun.ds_id,
'id': tgt_lun.os_id})
tgt_lun.volume.status = 'error'
self._helper.delete_lun(tgt_lun)
else:
self._helper.delete_lun(tgt_lun)
raise exception.VolumeDriverException(
message=_("Volume %(id)s is in unexpected state "
"%(state)s.") % {'id': tgt_lun.ds_id,
'state': tgt_lun.status})
tgt_lun.volume.save()
def _ensure_vol_not_fc_target(self, vol_hex_id):
for cp in self._helper.get_flashcopy(vol_hex_id):
if cp['targetvolume']['id'] == vol_hex_id:
@ -1211,7 +1276,6 @@ class DS8KProxy(proxy.IBMStorageProxy):
def _clone_group(self, src_luns, tgt_luns):
for src_lun in src_luns:
self._ensure_vol_not_fc_target(src_lun.ds_id)
finished = False
try:
vol_pairs = []
for src_lun, tgt_lun in zip(src_luns, tgt_luns):
@ -1226,9 +1290,11 @@ class DS8KProxy(proxy.IBMStorageProxy):
self._do_flashcopy_with_freeze(vol_pairs)
else:
self._helper.start_flashcopy(vol_pairs)
finished = self._helper.wait_flashcopy_finished(src_luns, tgt_luns)
self._helper.wait_flashcopy_finished(src_luns, tgt_luns)
finally:
if not finished:
# if one of volume failed, delete all volumes.
error_luns = [lun for lun in tgt_luns if lun.status == 'error']
if error_luns:
self._helper.delete_lun(tgt_luns)
@coordination.synchronized('{self.prefix}-consistency-group')

View File

@ -111,7 +111,8 @@ class IBMStorageDriver(san.SanDriver,
LOG,
exception,
driver=self,
active_backend_id=active_backend_id)
active_backend_id=active_backend_id,
host=self.host)
def do_setup(self, context):
"""Setup and verify connection to IBM Storage."""

View File

@ -117,7 +117,7 @@ class XIVProxy(proxy.IBMStorageProxy):
"""
def __init__(self, storage_info, logger, exception,
driver=None, active_backend_id=None):
driver=None, active_backend_id=None, host=None):
"""Initialize Proxy."""
if not active_backend_id:
active_backend_id = strings.PRIMARY_BACKEND_ID

View File

@ -0,0 +1,6 @@
---
features:
- |
Added support for cloning volume asynchronously, it can be enabled by
option async_clone set to true in parameter metadata when creating
volume from volume or snapshot.