DS8K: add replication consistency group support

Adding replication consistency group capabilities for ds8k driver.

Supported operations:
* Create volume and put it into replication consistency group
* Adjust existing replication consistency group
* Enable replication on group
* Disable replication on group
* Fail over replication group back and forth
* Get error info for replicated groups and its volumes

DocImpact
Implements: blueprint replication-cg-ds8k
Depends-On: Ibd98fa3246118aa6ad5d86ecbfe46ae78de87716
Change-Id: I87c9554e5d96e1a9356a7b9f760794bfd419898e
This commit is contained in:
Jia Min 2017-05-18 02:09:41 -07:00
parent b401355c6f
commit b5e46bb9bb
5 changed files with 1432 additions and 488 deletions

View File

@ -16,6 +16,7 @@
"""Tests for the IBM DS8K family driver."""
import ast
import copy
import ddt
import eventlet
import json
import mock
@ -785,6 +786,7 @@ FAKE_ASSIGN_HOST_PORT_RESPONSE = FAKE_GENERIC_RESPONSE
FAKE_DELETE_MAPPINGS_RESPONSE = FAKE_GENERIC_RESPONSE
FAKE_DELETE_HOST_PORTS_RESPONSE = FAKE_GENERIC_RESPONSE
FAKE_DELETE_HOSTS_RESPONSE = FAKE_GENERIC_RESPONSE
FAKE_PAUSE_RESPONSE = FAKE_GENERIC_RESPONSE
FAKE_REST_API_RESPONSES = {
TEST_SOURCE_DS8K_IP + '/get':
@ -879,6 +881,10 @@ FAKE_REST_API_RESPONSES = {
FAKE_FAILBACK_RESPONSE,
TEST_TARGET_DS8K_IP + '/cs/pprcs/resume/post':
FAKE_FAILBACK_RESPONSE,
TEST_SOURCE_DS8K_IP + '/cs/pprcs/pause/post':
FAKE_PAUSE_RESPONSE,
TEST_TARGET_DS8K_IP + '/cs/pprcs/pause/post':
FAKE_PAUSE_RESPONSE,
TEST_SOURCE_DS8K_IP + '/cs/flashcopies/post':
FAKE_POST_FLASHCOPIES_RESPONSE,
TEST_SOURCE_DS8K_IP + '/cs/flashcopies/unfreeze/post':
@ -948,6 +954,9 @@ class FakeDS8KCommonHelper(helper.DS8KCommonHelper):
self._storage_pools = None
self.backend = {}
self.setup()
self._existing_pool_ids = [TEST_POOL_ID_1,
TEST_POOL_ID_2,
TEST_ECKD_POOL_ID]
def _get_value(self, key):
value = getattr(self.conf, key, None)
@ -1044,12 +1053,13 @@ class FakeDS8KProxy(ds8kproxy.DS8KProxy):
def _do_replication_setup(self, devices, src_helper):
self._replication = FakeReplication(src_helper, devices[0])
if self._active_backend_id:
self._switch_backend_connection(self._active_backend_id)
self._replication.switch_source_and_target_client()
else:
self._replication.check_physical_links()
self._replication_enabled = True
@ddt.ddt
class DS8KProxyTest(test.TestCase):
"""Test proxy for DS8K volume driver."""
@ -1209,6 +1219,8 @@ class DS8KProxyTest(test.TestCase):
"free_capacity_gb": 10,
"reserved_percentage": 0,
"consistent_group_snapshot_enabled": True,
"group_replication_enabled": True,
"consistent_group_replication_enabled": True,
"multiattach": False
}
@ -1429,23 +1441,6 @@ class DS8KProxyTest(test.TestCase):
# error group will be ignored, so LSS 20 can be used.
self.assertEqual(lss, '20')
def test_create_volume_and_assign_to_group_with_wrong_host(self):
# create volume for group which has wrong format of host.
self.configuration.lss_range_for_cg = '20-23'
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host="fake_invalid_host",
group_type_id=group_type.id)
volume = self._create_volume(group_id=group.id)
self.assertRaises(exception.VolumeDriverException,
self.driver.create_volume, volume)
@mock.patch.object(helper.DS8KCommonHelper, '_create_lun')
def test_create_volume_but_lss_full_afterwards(self, mock_create_lun):
"""create volume in a LSS which is full afterwards."""
@ -1629,7 +1624,8 @@ class DS8KProxyTest(test.TestCase):
@mock.patch.object(proxy.IBMStorageProxy, '__init__')
@mock.patch.object(replication, 'Replication')
@mock.patch.object(ds8kproxy.DS8KProxy, '_switch_backend_connection')
@mock.patch.object(replication.Replication,
'switch_source_and_target_client')
def test_switch_backend_connection(self, mock_switch_connection,
mock_replication, mock_proxy_init):
"""driver should switch connection if it has been failed over."""
@ -1988,11 +1984,12 @@ class DS8KProxyTest(test.TestCase):
self.assertRaises(restclient.APIException,
self.driver.create_cloned_volume, tgt_vol, src_vol)
@mock.patch.object(eventlet, 'sleep')
@mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy')
@mock.patch.object(helper.DS8KCommonHelper, 'lun_exists')
@mock.patch.object(helper.DS8KCommonHelper, 'create_lun')
def test_create_cloned_volume5(self, mock_create_lun, mock_lun_exists,
mock_get_flashcopy):
mock_get_flashcopy, mock_sleep):
"""clone a volume when target has volume ID but it is nonexistent."""
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
@ -2714,6 +2711,84 @@ class DS8KProxyTest(test.TestCase):
self.driver.create_group,
self.ctxt, group)
def test_create_generic_group_not_implemented(self):
"""create generic group is not implemented."""
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group'
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
self.assertRaises(NotImplementedError,
self.driver.create_group,
self.ctxt, group)
def test_create_replication_cg_should_verify_volume_types(self):
"""Cannot put non-replication volume type into replication cg."""
self.configuration.lss_range_for_cg = '20-23'
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', {})
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_replication_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id,
volume_type_ids=[vol_type.id])
self.assertRaises(exception.VolumeDriverException,
self.driver.create_group,
self.ctxt, group)
@ddt.data({'bundle_version': "5.7.51.1067"},
{'bundle_version': "5.8.20.1058"})
@mock.patch.object(helper.DS8KCommonHelper, '_get_version')
def test_create_replication_consisgroup_should_verify_rest_version(
self, rest_version, mock_get_version):
"""Driver should verify whether does REST support pprc cg or not."""
self.configuration.lss_range_for_cg = '20-23'
mock_get_version.return_value = rest_version
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
vol_type = volume_types.create(
self.ctxt, 'VOL_TYPE', {'replication_enabled': '<is> True'})
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_replication_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id,
volume_type_ids=[vol_type.id])
self.assertRaises(exception.VolumeDriverException,
self.driver.create_group,
self.ctxt, group)
def test_create_consistency_group_without_reserve_lss(self):
"""user should reserve LSS for group if it enables cg."""
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
self.assertRaises(exception.VolumeDriverException,
self.driver.create_group, self.ctxt, group)
def test_delete_consistency_group_sucessfully(self):
"""test a successful consistency group deletion."""
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
@ -2759,24 +2834,60 @@ class DS8KProxyTest(test.TestCase):
self.assertEqual(fields.GroupStatus.ERROR_DELETING,
model_update['status'])
def test_create_consistency_group_without_reserve_lss(self):
"""user should reserve LSS for group if it enables cg."""
def test_delete_replication_group_is_not_implemented(self):
"""delete replication group is not implemented."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
{'group_replication_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
self.assertRaises(exception.VolumeDriverException,
self.driver.create_group, self.ctxt, group)
def test_update_generic_group_without_enable_cg(self):
"""update group which not enable cg should return None."""
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE',
{'replication_enabled': '<is> True'})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
data = json.dumps({'vol_hex_id': TEST_VOLUME_ID})
volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
replication_driver_data=data,
group_id=group.id)
self.assertRaises(NotImplementedError,
self.driver.delete_group,
self.ctxt, group, [volume])
def test_update_replication_group_is_not_implemented(self):
"""update replication group is not implemented."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'group_replication_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE',
{'replication_enabled': '<is> True'})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
data = json.dumps({'vol_hex_id': TEST_VOLUME_ID})
volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
replication_driver_data=data,
group_id=group.id)
self.assertRaises(NotImplementedError,
self.driver.update_group,
self.ctxt, group, [volume], [])
def test_update_generic_group_is_not_implemented(self):
"""update group which not enable cg is not implemented."""
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
@ -2786,11 +2897,9 @@ class DS8KProxyTest(test.TestCase):
group_type_id=group_type.id)
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
volume = self._create_volume(provider_location=location)
model_update, add_volumes_update, remove_volumes_update = (
self.driver.update_group(self.ctxt, group, [volume], []))
self.assertIsNone(model_update)
self.assertIsNone(add_volumes_update)
self.assertIsNone(remove_volumes_update)
self.assertRaises(NotImplementedError,
self.driver.update_group,
self.ctxt, group, [volume], [])
@mock.patch.object(eventlet, 'sleep')
@mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy')
@ -2868,9 +2977,8 @@ class DS8KProxyTest(test.TestCase):
location = ast.literal_eval(add_volumes_update[0]['provider_location'])
self.assertEqual('2200', location['vol_hex_id'])
@mock.patch.object(helper.DS8KCommonHelper, 'delete_lun')
def test_delete_generic_group_failed(self, mock_delete_lun):
"""test a failed group deletion."""
def test_delete_generic_group_not_implemented(self):
"""delete generic group but it is not implemented."""
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
@ -2880,29 +2988,9 @@ class DS8KProxyTest(test.TestCase):
volume = self._create_volume(group_type_id=group_type.id,
provider_location=location,
group_id=group.id)
mock_delete_lun.side_effect = (
restclient.APIException('delete volume failed.'))
model_update, volumes_model_update = (
self.driver.delete_group(self.ctxt, group, [volume]))
self.assertEqual('error_deleting', volumes_model_update[0]['status'])
self.assertEqual(fields.GroupStatus.ERROR_DELETING,
model_update['status'])
def test_delete_generic_group_sucessfully(self):
"""test a successful generic group deletion."""
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(self.ctxt, 'CG', {})
group = self._create_group(group_type_id=group_type.id)
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
volume = self._create_volume(group_type_id=group_type.id,
provider_location=location,
group_id=group.id)
model_update, volumes_model_update = (
self.driver.delete_group(self.ctxt, group, [volume]))
self.assertEqual('deleted', volumes_model_update[0]['status'])
self.assertEqual(fields.GroupStatus.DELETED, model_update['status'])
self.assertRaises(NotImplementedError,
self.driver.delete_group,
self.ctxt, group, [volume])
@mock.patch.object(eventlet, 'sleep')
@mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy')
@ -3074,6 +3162,38 @@ class DS8KProxyTest(test.TestCase):
self.assertEqual(fields.GroupStatus.AVAILABLE,
model_update['status'])
def test_create_group_from_generic_group(self):
"""create group from generic group is not implemented."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'group_replication_enabled': '<is> True'}
)
src_group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE',
{'replication_enabled': '<is> True'})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
data = json.dumps({'vol_hex_id': TEST_VOLUME_ID})
src_volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
replication_driver_data=data,
group_id=src_group.id)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
volume = self._create_volume(group_id=group.id)
self.assertRaises(NotImplementedError,
self.driver.create_group_from_src,
self.ctxt, group, [volume],
None, None, src_group, [src_volume])
@mock.patch.object(eventlet, 'sleep')
@mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_pairs')
def test_failover_host_successfully(self, mock_get_pprc_pairs, mock_sleep):
@ -3100,8 +3220,91 @@ class DS8KProxyTest(test.TestCase):
self.ctxt, [volume], TEST_TARGET_DS8K_IP, [])
self.assertEqual(TEST_TARGET_DS8K_IP, secondary_id)
@mock.patch.object(replication.Replication, 'do_pprc_failover')
def test_failover_host_failed(self, mock_do_pprc_failover):
@mock.patch.object(eventlet, 'sleep')
@mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_pairs')
def test_failover_host_with_group(self, mock_get_pprc_pairs, mock_sleep):
"""Failover host with group."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'group_replication_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id,
replication_status='enabled')
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE',
{'replication_enabled': '<is> True'})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
data = json.dumps(
{TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}})
metadata = [{'key': 'data_type', 'value': 'FB 512'}]
volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
replication_driver_data=data,
volume_metadata=metadata,
group_id=group.id)
pprc_pairs = copy.deepcopy(FAKE_GET_PPRCS_RESPONSE['data']['pprcs'])
pprc_pairs[0]['state'] = 'suspended'
mock_get_pprc_pairs.side_effect = [pprc_pairs]
secondary_id, volume_update_list, group_update_list = (
self.driver.failover_host(self.ctxt, [volume],
TEST_TARGET_DS8K_IP, [group]))
self.assertEqual(TEST_TARGET_DS8K_IP, secondary_id)
volume_update = volume_update_list[0]
self.assertEqual(volume_update['volume_id'], volume.id)
self.assertEqual(fields.ReplicationStatus.FAILED_OVER,
volume_update['updates']['replication_status'])
group_update = group_update_list[0]
self.assertEqual(group_update['group_id'], group.id)
self.assertEqual(fields.ReplicationStatus.FAILED_OVER,
group_update['updates']['replication_status'])
@mock.patch.object(eventlet, 'sleep')
@mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_pairs')
def test_failover_host_with_group_failed_over(self, mock_get_pprc_pairs,
mock_sleep):
"""Failover host with group that has been failed over."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'group_replication_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id,
replication_status='failed-over')
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE',
{'replication_enabled': '<is> True'})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
data = json.dumps(
{'default': {'vol_hex_id': TEST_VOLUME_ID}})
metadata = [{'key': 'data_type', 'value': 'FB 512'}]
volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
replication_driver_data=data,
volume_metadata=metadata,
group_id=group.id)
pprc_pairs = copy.deepcopy(FAKE_GET_PPRCS_RESPONSE['data']['pprcs'])
pprc_pairs[0]['state'] = 'suspended'
mock_get_pprc_pairs.side_effect = [pprc_pairs]
secondary_id, volume_update_list, group_update_list = (
self.driver.failover_host(self.ctxt, [volume],
TEST_TARGET_DS8K_IP, [group]))
self.assertEqual(TEST_TARGET_DS8K_IP, secondary_id)
self.assertEqual(volume_update_list, [])
self.assertEqual(group_update_list, [])
@mock.patch.object(replication.Replication, 'start_host_pprc_failover')
def test_failover_host_failed(self, mock_host_pprc_failover):
"""Failover host should raise exception when failed."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
@ -3119,7 +3322,7 @@ class DS8KProxyTest(test.TestCase):
replication_driver_data=data,
volume_metadata=metadata)
mock_do_pprc_failover.side_effect = (
mock_host_pprc_failover.side_effect = (
restclient.APIException('failed to do failover.'))
self.assertRaises(exception.UnableToFailOver,
self.driver.failover_host, self.ctxt,
@ -3155,7 +3358,7 @@ class DS8KProxyTest(test.TestCase):
{'replication_enabled': '<is> True'})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
data = json.dumps(
{TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}})
{'default': {'vol_hex_id': TEST_VOLUME_ID}})
volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
replication_driver_data=data)
@ -3175,7 +3378,7 @@ class DS8KProxyTest(test.TestCase):
{'replication_enabled': '<is> True'})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
data = json.dumps(
{TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}})
{'default': {'vol_hex_id': TEST_VOLUME_ID}})
volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
replication_driver_data=data)
@ -3184,42 +3387,6 @@ class DS8KProxyTest(test.TestCase):
self.assertIsNone(secondary_id)
self.assertEqual([], volume_update_list)
def test_failover_host_which_only_has_unreplicated_volume(self):
"""Failover host which only has unreplicated volume."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', {})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location)
secondary_id, volume_update_list, __ = self.driver.failover_host(
self.ctxt, [volume], TEST_TARGET_DS8K_IP, [])
self.assertEqual(TEST_TARGET_DS8K_IP, secondary_id)
self.assertEqual('error', volume_update_list[0]['updates']['status'])
def test_failback_should_recover_status_of_unreplicated_volume(self):
"""Failback host should recover the status of unreplicated volume."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self, TEST_TARGET_DS8K_IP)
self.driver.setup(self.ctxt)
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', {})
location = six.text_type({
'vol_hex_id': TEST_VOLUME_ID,
'old_status': 'available'
})
volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location)
secondary_id, volume_update_list, __ = self.driver.failover_host(
self.ctxt, [volume], 'default', [])
self.assertEqual('default', secondary_id)
self.assertEqual('available',
volume_update_list[0]['updates']['status'])
@mock.patch.object(eventlet, 'sleep')
@mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_pairs')
def test_failback_host_successfully(self, mock_get_pprc_pairs, mock_sleep):
@ -3233,7 +3400,7 @@ class DS8KProxyTest(test.TestCase):
{'replication_enabled': '<is> True'})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
data = json.dumps(
{TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}})
{'default': {'vol_hex_id': TEST_VOLUME_ID}})
metadata = [{'key': 'data_type', 'value': 'FB 512'}]
volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
@ -3249,8 +3416,8 @@ class DS8KProxyTest(test.TestCase):
self.ctxt, [volume], 'default', [])
self.assertEqual('default', secondary_id)
@mock.patch.object(replication.Replication, 'start_pprc_failback')
def test_failback_host_failed(self, mock_start_pprc_failback):
@mock.patch.object(replication.Replication, 'start_host_pprc_failback')
def test_failback_host_failed(self, mock_start_host_pprc_failback):
"""Failback host should raise exception when failed."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
@ -3261,12 +3428,401 @@ class DS8KProxyTest(test.TestCase):
{'replication_enabled': '<is> True'})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
data = json.dumps(
{TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}})
{'default': {'vol_hex_id': TEST_VOLUME_ID}})
volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
replication_driver_data=data)
mock_start_pprc_failback.side_effect = (
mock_start_host_pprc_failback.side_effect = (
restclient.APIException('failed to do failback.'))
self.assertRaises(exception.UnableToFailOver,
self.driver.failover_host, self.ctxt,
[volume], 'default', [])
@mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_pairs')
def test_enable_replication_successfully(self, mock_get_pprc_pairs):
"""Enable replication for the group successfully."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id,
replication_status='disabled')
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE',
{'replication_enabled': '<is> True'})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
data = json.dumps(
{TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}})
metadata = [{'key': 'data_type', 'value': 'FB 512'}]
volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
replication_driver_data=data,
volume_metadata=metadata,
group_id=group.id,
replication_status='disabled')
pprc_pairs = copy.deepcopy(FAKE_GET_PPRCS_RESPONSE['data']['pprcs'])
pprc_pairs[0]['state'] = 'suspended'
mock_get_pprc_pairs.side_effect = [pprc_pairs]
model_update, volumes_update_list = self.driver.enable_replication(
self.ctxt, group, [volume])
self.assertEqual(fields.ReplicationStatus.ENABLED,
model_update.get('replication_status'))
for volume_update in volumes_update_list:
self.assertEqual(fields.ReplicationStatus.ENABLED,
volume_update.get('replication_status'))
@mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_pairs')
def test_enable_replication_if_pprc_in_invalid_state(
self, mock_get_pprc_pairs):
"""Enable replication but pprc relationship is in invalid state."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id,
replication_status='disabled')
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE',
{'replication_enabled': '<is> True'})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
data = json.dumps(
{TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}})
metadata = [{'key': 'data_type', 'value': 'FB 512'}]
volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
replication_driver_data=data,
volume_metadata=metadata,
group_id=group.id,
replication_status='disabled')
pprc_pairs = copy.deepcopy(FAKE_GET_PPRCS_RESPONSE['data']['pprcs'])
pprc_pairs[0]['state'] = 'invalid'
mock_get_pprc_pairs.side_effect = [pprc_pairs]
self.assertRaises(exception.VolumeDriverException,
self.driver.enable_replication,
self.ctxt, group, [volume])
@mock.patch.object(helper.DS8KCommonHelper, 'resume_pprc_pairs')
@mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_pairs')
def test_enable_replication_but_resume_fails(self, mock_get_pprc_pairs,
mock_resume_pprc_pairs):
"""Enable replication but resume fails."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id,
replication_status='disabled')
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE',
{'replication_enabled': '<is> True'})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
data = json.dumps(
{TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}})
metadata = [{'key': 'data_type', 'value': 'FB 512'}]
volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
replication_driver_data=data,
volume_metadata=metadata,
group_id=group.id,
replication_status='disabled')
pprc_pairs = copy.deepcopy(FAKE_GET_PPRCS_RESPONSE['data']['pprcs'])
pprc_pairs[0]['state'] = 'suspended'
mock_get_pprc_pairs.side_effect = [pprc_pairs]
mock_resume_pprc_pairs.side_effect = (
restclient.APIException('failed to resume replication.'))
self.assertRaises(exception.VolumeDriverException,
self.driver.enable_replication,
self.ctxt, group, [volume])
@mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_pairs')
def test_disable_replication_successfully(self, mock_get_pprc_pairs):
"""Disable replication for the group successfully."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id,
replication_status='enabled')
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE',
{'replication_enabled': '<is> True'})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
data = json.dumps(
{TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}})
metadata = [{'key': 'data_type', 'value': 'FB 512'}]
volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
replication_driver_data=data,
volume_metadata=metadata,
group_id=group.id,
replication_status='enabled')
pprc_pairs = copy.deepcopy(FAKE_GET_PPRCS_RESPONSE['data']['pprcs'])
pprc_pairs[0]['state'] = 'full_duplex'
mock_get_pprc_pairs.side_effect = [pprc_pairs]
model_update, volumes_update_list = self.driver.disable_replication(
self.ctxt, group, [volume])
self.assertEqual(fields.ReplicationStatus.DISABLED,
model_update.get('replication_status'))
for volume_update in volumes_update_list:
self.assertEqual(fields.ReplicationStatus.DISABLED,
volume_update.get('replication_status'))
@mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_pairs')
def test_disable_replication_if_pprc_in_invalid_state(
self, mock_get_pprc_pairs):
"""Disable replication but pprc relationship is in invalid state."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id,
replication_status='enabled')
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE',
{'replication_enabled': '<is> True'})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
data = json.dumps(
{TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}})
metadata = [{'key': 'data_type', 'value': 'FB 512'}]
volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
replication_driver_data=data,
volume_metadata=metadata,
group_id=group.id,
replication_status='enabled')
pprc_pairs = copy.deepcopy(FAKE_GET_PPRCS_RESPONSE['data']['pprcs'])
pprc_pairs[0]['state'] = 'invalid'
mock_get_pprc_pairs.side_effect = [pprc_pairs]
self.assertRaises(exception.VolumeDriverException,
self.driver.disable_replication,
self.ctxt, group, [volume])
@mock.patch.object(helper.DS8KCommonHelper, 'pause_pprc_pairs')
@mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_pairs')
def test_disable_replication_but_pause_fails(self, mock_get_pprc_pairs,
mock_pause_pprc_pairs):
"""Disable replication but pause fails."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id,
replication_status='enabled')
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE',
{'replication_enabled': '<is> True'})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
data = json.dumps(
{TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}})
metadata = [{'key': 'data_type', 'value': 'FB 512'}]
volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
replication_driver_data=data,
volume_metadata=metadata,
group_id=group.id,
replication_status='enabled')
pprc_pairs = copy.deepcopy(FAKE_GET_PPRCS_RESPONSE['data']['pprcs'])
pprc_pairs[0]['state'] = 'full_duplex'
mock_get_pprc_pairs.side_effect = [pprc_pairs]
mock_pause_pprc_pairs.side_effect = (
restclient.APIException('failed to pause replication.'))
self.assertRaises(exception.VolumeDriverException,
self.driver.disable_replication,
self.ctxt, group, [volume])
@mock.patch.object(eventlet, 'sleep')
@mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_pairs')
def test_failover_group_successfully(self, mock_get_pprc_pairs,
mock_sleep):
"""Failover group to valid secondary successfully."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE',
{'replication_enabled': '<is> True'})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
data = json.dumps(
{TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}})
metadata = [{'key': 'data_type', 'value': 'FB 512'}]
volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
replication_driver_data=data,
volume_metadata=metadata,
group_id=group.id)
pprc_pairs_1 = copy.deepcopy(FAKE_GET_PPRCS_RESPONSE['data']['pprcs'])
pprc_pairs_1[0]['state'] = 'suspended'
pprc_pairs_2 = copy.deepcopy(FAKE_GET_PPRCS_RESPONSE['data']['pprcs'])
pprc_pairs_2[0]['state'] = 'full_duplex'
mock_get_pprc_pairs.side_effect = [pprc_pairs_1, pprc_pairs_2]
model_update, volumes_update_list = self.driver.failover_replication(
self.ctxt, group, [volume], TEST_TARGET_DS8K_IP)
self.assertEqual(fields.ReplicationStatus.FAILED_OVER,
model_update.get('replication_status'))
for volume_update in volumes_update_list:
self.assertEqual(fields.ReplicationStatus.FAILED_OVER,
volume_update.get('replication_status'))
@mock.patch.object(replication.Replication, 'start_group_pprc_failover')
def test_failover_group_failed(self, mock_group_pprc_failover):
"""Failover group should raise exception when failed."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE',
{'replication_enabled': '<is> True'})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
data = json.dumps(
{TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}})
metadata = [{'key': 'data_type', 'value': 'FB 512'}]
volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
replication_driver_data=data,
volume_metadata=metadata,
group_id=group.id)
mock_group_pprc_failover.side_effect = (
restclient.APIException('failed to failover group.'))
self.assertRaises(exception.VolumeDriverException,
self.driver.failover_replication, self.ctxt,
group, [volume], TEST_TARGET_DS8K_IP)
def test_failover_group_to_invalid_target(self):
"""Failover group to invalid secondary should fail."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id)
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE',
{'replication_enabled': '<is> True'})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
data = json.dumps(
{TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}})
volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
replication_driver_data=data,
group_id=group.id)
self.assertRaises(exception.InvalidReplicationTarget,
self.driver.failover_replication, self.ctxt,
group, [volume], 'fake_target')
def test_failover_group_that_has_been_failed_over(self):
"""Failover group that has been failed over should just return."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id,
replication_status='failed-over')
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE',
{'replication_enabled': '<is> True'})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
data = json.dumps(
{'default': {'vol_hex_id': TEST_VOLUME_ID}})
volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
replication_driver_data=data,
group_id=group.id,
replication_status='failed-over')
model_update, volumes_update_list = self.driver.failover_replication(
self.ctxt, group, [volume], TEST_TARGET_DS8K_IP)
self.assertEqual({}, model_update)
self.assertEqual([], volumes_update_list)
def test_failback_group_that_has_been_failed_back(self):
"""Failback group that has been failed back should just return."""
self.configuration.replication_device = [TEST_REPLICATION_DEVICE]
self.driver = FakeDS8KProxy(self.storage_info, self.logger,
self.exception, self, TEST_TARGET_DS8K_IP)
self.driver.setup(self.ctxt)
group_type = group_types.create(
self.ctxt,
'group',
{'consistent_group_snapshot_enabled': '<is> True'}
)
group = self._create_group(host=TEST_GROUP_HOST,
group_type_id=group_type.id,
replication_status='enabled')
vol_type = volume_types.create(self.ctxt, 'VOL_TYPE',
{'replication_enabled': '<is> True'})
location = six.text_type({'vol_hex_id': TEST_VOLUME_ID})
data = json.dumps(
{TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}})
volume = self._create_volume(volume_type_id=vol_type.id,
provider_location=location,
replication_driver_data=data,
group_id=group.id,
replication_status='available')
model_update, volume_update_list = self.driver.failover_replication(
self.ctxt, group, [volume], 'default')
self.assertEqual({}, model_update)
self.assertEqual([], volume_update_list)

View File

@ -62,8 +62,9 @@ class DS8KCommonHelper(object):
OPTIONAL_PARAMS = ['ds8k_host_type', 'lss_range_for_cg']
# if use new REST API, please update the version below
VALID_REST_VERSION_5_7_MIN = '5.7.51.1047'
VALID_REST_VERSION_5_8_MIN = ''
INVALID_STORAGE_VERSION = '8.0.1'
REST_VERSION_5_7_MIN_PPRC_CG = '5.7.51.1068'
REST_VERSION_5_8_MIN_PPRC_CG = '5.8.20.1059'
def __init__(self, conf, HTTPConnectorObject=None):
self.conf = conf
@ -111,8 +112,8 @@ class DS8KCommonHelper(object):
self.backend['pools_str'] = self._get_value('san_clustername')
self._storage_pools = self.get_pools()
self.verify_pools(self._storage_pools)
self._get_lss_ids_for_cg()
self._verify_version()
self.backend['lss_ids_for_cg'] = self._get_lss_ids_for_cg()
self._verify_rest_version()
def update_client(self):
self._client.close()
@ -160,6 +161,7 @@ class DS8KCommonHelper(object):
self.backend['storage_version'] = storage_info['release']
def _get_lss_ids_for_cg(self):
lss_ids_for_cg = set()
lss_range = self._get_value('lss_range_for_cg')
if lss_range:
lss_range = lss_range.replace(' ', '').split('-')
@ -173,10 +175,9 @@ class DS8KCommonHelper(object):
raise exception.InvalidParameterValue(
err=_('Param [lss_range_for_cg] is invalid, it '
'should be within 00-FF.'))
self.backend['lss_ids_for_cg'] = set(
lss_ids_for_cg = set(
('%02x' % i).upper() for i in range(begin, end + 1))
else:
self.backend['lss_ids_for_cg'] = set()
return lss_ids_for_cg
def _check_host_type(self):
ds8k_host_type = self._get_value('ds8k_host_type')
@ -189,7 +190,7 @@ class DS8KCommonHelper(object):
self.backend['host_type_override'] = (
None if ds8k_host_type == 'auto' else ds8k_host_type)
def _verify_version(self):
def _verify_rest_version(self):
if self.backend['storage_version'] == self.INVALID_STORAGE_VERSION:
raise exception.VolumeDriverException(
message=(_("%s does not support bulk deletion of volumes, "
@ -205,6 +206,28 @@ class DS8KCommonHelper(object):
% {'invalid': self.backend['rest_version'],
'valid': self.VALID_REST_VERSION_5_7_MIN}))
def verify_rest_version_for_pprc_cg(self):
if '8.1' in self.backend['rest_version']:
raise exception.VolumeDriverException(
message=_("REST for DS8K 8.1 does not support PPRC "
"consistency group, please upgrade the CCL."))
valid_rest_version = None
if ('5.7' in self.backend['rest_version'] and
dist_version.LooseVersion(self.backend['rest_version']) <
dist_version.LooseVersion(self.REST_VERSION_5_7_MIN_PPRC_CG)):
valid_rest_version = self.REST_VERSION_5_7_MIN_PPRC_CG
elif ('5.8' in self.backend['rest_version'] and
dist_version.LooseVersion(self.backend['rest_version']) <
dist_version.LooseVersion(self.REST_VERSION_5_8_MIN_PPRC_CG)):
valid_rest_version = self.REST_VERSION_5_8_MIN_PPRC_CG
if valid_rest_version:
raise exception.VolumeDriverException(
message=(_("REST version %(invalid)s is lower than "
"%(valid)s, please upgrade it in DS8K.")
% {'invalid': self.backend['rest_version'],
'valid': valid_rest_version}))
def verify_pools(self, storage_pools):
if self._connection_type == storage.XIV_CONNECTION_TYPE_FC:
ptype = 'fb'
@ -821,14 +844,14 @@ class DS8KCommonHelper(object):
def delete_pprc_path(self, path_id):
self._client.send('DELETE', '/cs/pprcs/paths/%s' % path_id)
def create_pprc_pair(self, pairData):
self._client.send('POST', '/cs/pprcs', pairData)
def create_pprc_pair(self, pair_data):
self._client.send('POST', '/cs/pprcs', pair_data)
def delete_pprc_pair_by_pair_id(self, pids):
self._client.statusok('DELETE', '/cs/pprcs', params=pids)
def do_failback(self, pairData):
self._client.send('POST', '/cs/pprcs/resume', pairData)
def do_failback(self, pair_data):
self._client.send('POST', '/cs/pprcs/resume', pair_data)
def get_pprc_pairs(self, min_vol_id, max_vol_id):
return self._client.fetchall(
@ -844,14 +867,27 @@ class DS8KCommonHelper(object):
return None
# don't use pprc pair ID to delete it, because it may have
# communication issues.
pairData = {
pair_data = {
'volume_full_ids': [{
'volume_id': vol_id,
'system_id': self.backend['storage_unit']
}],
'options': ['unconditional', 'issue_source']
}
self._client.send('POST', '/cs/pprcs/delete', pairData)
self._client.send('POST', '/cs/pprcs/delete', pair_data)
def pause_pprc_pairs(self, pprc_pair_ids):
pair_data = {'pprc_ids': pprc_pair_ids}
self._client.send('POST', '/cs/pprcs/pause', pair_data)
def resume_pprc_pairs(self, pprc_pair_ids):
pair_data = {
'pprc_ids': pprc_pair_ids,
'type': 'metro_mirror',
'options': ['permit_space_efficient_target',
'initial_copy_out_of_sync']
}
self._client.send('POST', '/cs/pprcs/resume', pair_data)
class DS8KReplicationSourceHelper(DS8KCommonHelper):
@ -890,18 +926,19 @@ class DS8KReplicationSourceHelper(DS8KCommonHelper):
class DS8KReplicationTargetHelper(DS8KReplicationSourceHelper):
"""Manage target storage for replication."""
OPTIONAL_PARAMS = ['ds8k_host_type', 'port_pairs']
OPTIONAL_PARAMS = ['ds8k_host_type', 'port_pairs', 'lss_range_for_cg']
def setup(self):
self._create_client()
self._get_storage_information()
self._get_replication_information()
self._check_host_type()
self.backend['lss_ids_for_cg'] = self._get_lss_ids_for_cg()
self.backend['pools_str'] = self._get_value(
'san_clustername').replace('_', ',')
self._storage_pools = self.get_pools()
self.verify_pools(self._storage_pools)
self._verify_version()
self._verify_rest_version()
def _get_replication_information(self):
port_pairs = []
@ -917,20 +954,6 @@ class DS8KReplicationTargetHelper(DS8KReplicationSourceHelper):
self.backend['port_pairs'] = port_pairs
self.backend['id'] = self._get_value('backend_id')
@proxy.logger
def _find_lss_for_type_replication(self, node, excluded_lss):
# prefer to choose non-existing one first.
existing_lss = self.get_all_lss()
LOG.info("existing LSS IDs are %s",
','.join([lss['id'] for lss in existing_lss]))
lss_id = self._find_from_nonexistent_lss(node, existing_lss)
if not lss_id:
if excluded_lss:
existing_lss = [lss for lss in existing_lss
if lss['id'] not in excluded_lss]
lss_id = self._find_from_existing_lss(node, existing_lss)
return lss_id
def create_lun(self, lun):
volData = {
'cap': self._gb2b(lun.size),
@ -952,14 +975,14 @@ class DS8KReplicationTargetHelper(DS8KReplicationSourceHelper):
def delete_pprc_pair(self, vol_id):
if not self.get_pprc_pairs(vol_id, vol_id):
return None
pairData = {
pair_data = {
'volume_full_ids': [{
'volume_id': vol_id,
'system_id': self.backend['storage_unit']
}],
'options': ['unconditional', 'issue_target']
}
self._client.send('POST', '/cs/pprcs/delete', pairData)
self._client.send('POST', '/cs/pprcs/delete', pair_data)
class DS8KECKDHelper(DS8KCommonHelper):
@ -999,16 +1022,16 @@ class DS8KECKDHelper(DS8KCommonHelper):
self._create_client()
self._get_storage_information()
self._check_host_type()
self._get_lss_ids_for_cg()
self.backend['lss_ids_for_cg'] = self._get_lss_ids_for_cg()
self.backend['pools_str'] = self._get_value('san_clustername')
self._storage_pools = self.get_pools()
self.verify_pools(self._storage_pools)
ssid_prefix = self._get_value('ds8k_ssid_prefix')
self.backend['ssid_prefix'] = ssid_prefix if ssid_prefix else 'FF'
self.backend['device_mapping'] = self._get_device_mapping()
self._verify_version()
self._verify_rest_version()
def _verify_version(self):
def _verify_rest_version(self):
if self.backend['storage_version'] == self.INVALID_STORAGE_VERSION:
raise exception.VolumeDriverException(
message=(_("%s does not support bulk deletion of volumes, "
@ -1034,6 +1057,7 @@ class DS8KECKDHelper(DS8KCommonHelper):
in self.backend['rest_version'] else
self.VALID_REST_VERSION_5_8_MIN)}))
@proxy.logger
def _get_device_mapping(self):
map_str = self._get_value('ds8k_devadd_unitadd_mapping')
mappings = map_str.replace(' ', '').upper().split(';')
@ -1198,6 +1222,7 @@ class DS8KReplicationTargetECKDHelper(DS8KECKDHelper,
self._get_storage_information()
self._get_replication_information()
self._check_host_type()
self.backend['lss_ids_for_cg'] = self._get_lss_ids_for_cg()
self.backend['pools_str'] = self._get_value(
'san_clustername').replace('_', ',')
self._storage_pools = self.get_pools()
@ -1205,7 +1230,7 @@ class DS8KReplicationTargetECKDHelper(DS8KECKDHelper,
ssid_prefix = self._get_value('ds8k_ssid_prefix')
self.backend['ssid_prefix'] = ssid_prefix if ssid_prefix else 'FF'
self.backend['device_mapping'] = self._get_device_mapping()
self._verify_version()
self._verify_rest_version()
def create_lun(self, lun):
volData = {

View File

@ -57,7 +57,6 @@ connection_type = fibre_channel
"""
import ast
import collections
import json
import six
@ -135,9 +134,10 @@ class Lun(object):
1.0.0 - initial revision.
2.1.0 - Added support for specify pool and lss, also improve the code.
2.1.1 - Added support for replication consistency group.
"""
VERSION = "2.1.0"
VERSION = "2.1.1"
class FakeLun(object):
@ -226,6 +226,7 @@ class Lun(object):
self.replica_ds_name = (
"OS%s:%s" % ('Replica', helper.filter_alnum(self.cinder_name))
)[:16]
self.previous_status = volume.previous_status
self.replication_status = volume.replication_status
self.replication_driver_data = (
json.loads(volume.replication_driver_data)
@ -234,12 +235,20 @@ class Lun(object):
# now only support one replication target.
replication_target = sorted(
self.replication_driver_data.values())[0]
replica_id = replication_target['vol_hex_id']
self.replica_ds_id = replication_target['vol_hex_id']
self.pool_lss_pair = {
'source': (None, self.ds_id[0:2]),
'target': (None, replica_id[0:2])
'target': (None, self.replica_ds_id[0:2])
}
# Don't use self.replication_status to judge if volume has
# been failed over or not, because when user fail over a
# group, replication_status of each volume in group is
# failing over.
self.failed_over = (True if 'default' in
self.replication_driver_data.keys()
else False)
else:
self.failed_over = False
if os400:
if os400 not in VALID_OS400_VOLUME_TYPES.keys():
raise restclient.APIException(
@ -295,7 +304,6 @@ class Lun(object):
volume_update = {}
volume_update['provider_location'] = six.text_type(
{'vol_hex_id': self.ds_id})
# update metadata
if self.is_snapshot:
metadata = self._get_snapshot_metadata(self.volume)
@ -308,7 +316,9 @@ class Lun(object):
metadata.pop('replication', None)
volume_update['replication_driver_data'] = json.dumps(
self.replication_driver_data)
volume_update['replication_status'] = self.replication_status
volume_update['replication_status'] = (
self.replication_status or
fields.ReplicationStatus.NOT_CAPABLE)
metadata['data_type'] = (self.data_type if self.data_type else
metadata['data_type'])
@ -328,11 +338,23 @@ class Group(object):
def __init__(self, group, is_snapshot=False):
self.id = group.id
self.host = group.host
self.consisgroup_snapshot_enabled = (
utils.is_group_a_cg_snapshot_type(group))
self.group_replication_enabled = (
utils.is_group_a_type(group,
"group_replication_enabled"))
self.consisgroup_replication_enabled = (
utils.is_group_a_type(group,
"consistent_group_replication_enabled"))
if is_snapshot:
self.snapshots = group.snapshots
else:
self.failed_over = (
group.replication_status ==
fields.ReplicationStatus.FAILED_OVER)
# create_volume needs to check volumes in the group,
# so get it from volume.group object.
self.volumes = group.volumes
self.consisgroup_enabled = utils.is_group_a_cg_snapshot_type(group)
class DS8KProxy(proxy.IBMStorageProxy):
@ -388,16 +410,9 @@ class DS8KProxy(proxy.IBMStorageProxy):
self._replication.check_physical_links()
self._replication.check_connection_type()
if self._active_backend_id:
self._switch_backend_connection(self._active_backend_id)
self._replication.switch_source_and_target_client()
self._replication_enabled = True
@proxy.logger
def _switch_backend_connection(self, backend_id, repl_luns=None):
repl_luns = self._replication.switch_source_and_target(backend_id,
repl_luns)
self._helper = self._replication._source_helper
return repl_luns
@staticmethod
def _b2gb(b):
return b // (2 ** 30)
@ -431,6 +446,8 @@ class DS8KProxy(proxy.IBMStorageProxy):
sum(p['capavail'] for p in storage_pools.values())),
"reserved_percentage": self.configuration.reserved_percentage,
"consistent_group_snapshot_enabled": True,
"group_replication_enabled": True,
"consistent_group_replication_enabled": True,
"multiattach": False
}
@ -455,7 +472,7 @@ class DS8KProxy(proxy.IBMStorageProxy):
LOG.error(msg)
raise exception.VolumeDriverException(message=msg)
if lun.type_replication:
target_helper = self._replication._target_helper
target_helper = self._replication.get_target_helper()
# PPRC can not copy from ESE volume to standard volume
# or vice versa.
if target_helper.get_thin_provision():
@ -473,10 +490,10 @@ class DS8KProxy(proxy.IBMStorageProxy):
lun.pool_lss_pair = {
'source': self._find_pool_lss_pair_from_spec(
lun, excluded_lss)}
elif lun.group and lun.group.consisgroup_enabled:
lun.pool_lss_pair = {
'source': self._find_pool_lss_pair_for_cg(
lun, excluded_lss)}
elif lun.group and (lun.group.consisgroup_snapshot_enabled or
lun.group.consisgroup_replication_enabled):
lun.pool_lss_pair = (
self._find_pool_lss_pair_for_cg(lun, excluded_lss))
else:
if lun.type_replication and not lun.is_snapshot:
lun.pool_lss_pair = (
@ -493,7 +510,8 @@ class DS8KProxy(proxy.IBMStorageProxy):
excluded_lss.add(lun.pool_lss_pair['source'][1])
def _find_pool_lss_pair_from_spec(self, lun, excluded_lss):
if lun.group and lun.group.consisgroup_enabled:
if lun.group and (lun.group.consisgroup_snapshot_enabled or
lun.group.consisgroup_replication_enabled):
msg = _("No support for specifying pool or lss for "
"volumes that belong to consistency group.")
LOG.error(msg)
@ -505,83 +523,112 @@ class DS8KProxy(proxy.IBMStorageProxy):
@coordination.synchronized('{self.prefix}-consistency-group')
def _find_pool_lss_pair_for_cg(self, lun, excluded_lss):
lss_in_cache = self.consisgroup_cache.get(lun.group.id, set())
if not lss_in_cache:
lss_in_cg = self._get_lss_in_cg(lun.group, lun.is_snapshot)
LOG.debug("LSSs used by CG %(cg)s are %(lss)s.",
{'cg': lun.group.id, 'lss': ','.join(lss_in_cg)})
available_lss = lss_in_cg - excluded_lss
# NOTE: a group may have multiple LSSs.
lss_pairs_in_cache = self.consisgroup_cache.get(lun.group.id, set())
if not lss_pairs_in_cache:
lss_pairs_in_group = self._get_lss_pairs_in_group(lun.group,
lun.is_snapshot)
LOG.debug("LSSs used by group %(grp)s are %(lss_pair)s.",
{'grp': lun.group.id, 'lss_pair': lss_pairs_in_group})
available_lss_pairs = set(pair for pair in lss_pairs_in_group
if pair[0] != excluded_lss)
else:
available_lss = lss_in_cache - excluded_lss
if not available_lss:
available_lss = self._find_lss_for_cg()
available_lss_pairs = set(pair for pair in lss_pairs_in_cache
if pair[0] != excluded_lss)
if not available_lss_pairs:
available_lss_pairs = self._find_lss_pair_for_cg(lun.group)
pid, lss = self._find_pool_for_lss(available_lss)
if pid:
lss_in_cache.add(lss)
self.consisgroup_cache[lun.group.id] = lss_in_cache
pool_lss_pair, lss_pair = self._find_pool_for_lss(available_lss_pairs)
if pool_lss_pair:
lss_pairs_in_cache.add(lss_pair)
self.consisgroup_cache[lun.group.id] = lss_pairs_in_cache
else:
raise exception.VolumeDriverException(
message=_('There are still some available LSSs for CG, '
'but they are not in the same node as pool.'))
return (pid, lss)
message=(_('There are still some available LSSs %s for CG, '
'but they are not in the same node as pool.')
% available_lss_pairs))
return pool_lss_pair
def _get_lss_in_cg(self, group, is_snapshot=False):
# Driver can not support the case that dedicating LSS for CG while
# user enable multiple backends which use the same DS8K.
try:
volume_backend_name = (
group.host[group.host.index('@') + 1:group.host.index('#')])
except ValueError:
raise exception.VolumeDriverException(
message=(_('Invalid host %(host)s in group %(group)s')
% {'host': group.host, 'group': group.id}))
lss_in_cg = set()
if volume_backend_name == self.configuration.volume_backend_name:
if is_snapshot:
luns = [Lun(snapshot, is_snapshot=True)
for snapshot in group.snapshots]
else:
luns = [Lun(volume) for volume in group.volumes]
lss_in_cg = set(lun.ds_id[:2] for lun in luns if lun.ds_id)
return lss_in_cg
def _get_lss_pairs_in_group(self, group, is_snapshot=False):
lss_pairs_in_group = set()
if is_snapshot:
luns = [Lun(snapshot, is_snapshot=True)
for snapshot in group.snapshots]
else:
luns = [Lun(volume) for volume in group.volumes]
if group.consisgroup_replication_enabled and not is_snapshot:
lss_pairs_in_group = set((lun.ds_id[:2], lun.replica_ds_id[:2])
for lun in luns if lun.ds_id and
lun.replica_ds_id)
else:
lss_pairs_in_group = set((lun.ds_id[:2], None)
for lun in luns if lun.ds_id)
return lss_pairs_in_group
def _find_lss_for_cg(self):
# Unable to get CGs/groups belonging to the current tenant, so
# get all of them, this function will consume some time if there
# are so many CGs/groups.
lss_used = set()
def _find_lss_pair_for_cg(self, group):
lss_pairs_used = set()
ctxt = context.get_admin_context()
existing_groups = objects.GroupList.get_all(
ctxt, filters={'status': 'available'})
for group in existing_groups:
if Group(group).consisgroup_enabled:
lss_used = lss_used | self._get_lss_in_cg(group)
existing_groupsnapshots = objects.GroupSnapshotList.get_all(
ctxt, filters={'status': 'available'})
for group in existing_groupsnapshots:
if Group(group, True).consisgroup_enabled:
lss_used = lss_used | self._get_lss_in_cg(group, True)
available_lss = set(self._helper.backend['lss_ids_for_cg']) - lss_used
for lss_set in self.consisgroup_cache.values():
available_lss -= lss_set
self._assert(available_lss,
filters = {'host': group.host, 'status': 'available'}
groups = objects.GroupList.get_all(ctxt, filters=filters)
for grp in groups:
grp = Group(grp)
if (grp.consisgroup_snapshot_enabled or
grp.consisgroup_replication_enabled):
lss_pairs_used |= self._get_lss_pairs_in_group(grp)
group_snapshots = (
objects.GroupSnapshotList.get_all(ctxt, filters=filters))
for grp in group_snapshots:
grp = Group(grp, True)
if (grp.consisgroup_snapshot_enabled or
grp.consisgroup_replication_enabled):
lss_pairs_used |= self._get_lss_pairs_in_group(grp, True)
# in order to keep one-to-one pprc mapping relationship, zip LSSs
# which reserved by user.
if group.consisgroup_replication_enabled:
target_helper = self._replication.get_target_helper()
available_lss_pairs = zip(self._helper.backend['lss_ids_for_cg'],
target_helper.backend['lss_ids_for_cg'])
else:
available_lss_pairs = [(lss, None) for lss in
self._helper.backend['lss_ids_for_cg']]
source_lss_used = set()
for lss_pair in lss_pairs_used:
source_lss_used.add(lss_pair[0])
# in concurrency case, lss may be reversed in cache but the group has
# not been committed into DB.
for lss_pairs_set in self.consisgroup_cache.values():
source_lss_used |= set(lss_pair[0] for lss_pair in lss_pairs_set)
available_lss_pairs = [lss_pair for lss_pair in available_lss_pairs
if lss_pair[0] not in source_lss_used]
self._assert(available_lss_pairs,
"All LSSs reserved for CG have been used out, "
"please reserve more LSS for CG if there are still"
"some empty LSSs left.")
LOG.debug('_find_lss_for_cg: available LSSs for consistency '
'group are %s', ','.join(available_lss))
return available_lss
LOG.debug('_find_lss_pair_for_cg: available LSSs for consistency '
'group are %s', available_lss_pairs)
return available_lss_pairs
@proxy.logger
def _find_pool_for_lss(self, available_lss):
for lss in available_lss:
pid = self._helper.get_pool(lss)
if pid:
return (pid, lss)
def _find_pool_for_lss(self, available_lss_pairs):
# all LSS pairs have target LSS or do not have.
for src_lss, tgt_lss in available_lss_pairs:
src_pid = self._helper.get_pool(src_lss)
if not src_pid:
continue
if tgt_lss:
target_helper = self._replication.get_target_helper()
tgt_pid = target_helper.get_pool(tgt_lss)
if tgt_pid:
return ({'source': (src_pid, src_lss),
'target': (tgt_pid, tgt_lss)},
(src_lss, tgt_lss))
else:
return {'source': (src_pid, src_lss)}, (src_lss, tgt_lss)
raise exception.VolumeDriverException(
message=(_("Can not find pool for LSSs %s.")
% ','.join(available_lss)))
% ','.join(available_lss_pairs)))
@proxy.logger
def _clone_lun(self, src_lun, tgt_lun):
@ -648,7 +695,7 @@ class DS8KProxy(proxy.IBMStorageProxy):
def _create_replica_helper(self, lun):
if not lun.pool_lss_pair.get('target'):
lun = self._replication.enable_replication(lun, True)
lun = self._replication.establish_replication(lun, True)
else:
lun = self._replication.create_replica(lun)
return lun
@ -865,7 +912,8 @@ class DS8KProxy(proxy.IBMStorageProxy):
# exception happens during clean up can be ignored.
if new_type_replication:
new_lun.type_replication = True
new_lun = self._replication.enable_replication(new_lun, True)
new_lun = self._replication.establish_replication(new_lun,
True)
elif old_type_replication:
new_lun.type_replication = False
try:
@ -882,7 +930,7 @@ class DS8KProxy(proxy.IBMStorageProxy):
# lun when failed to enable replication or delete replica.
if not old_type_replication and new_type_replication:
lun.type_replication = True
lun = self._replication.enable_replication(lun)
lun = self._replication.establish_replication(lun)
elif old_type_replication and not new_type_replication:
lun = self._replication.delete_replica(lun)
lun.type_replication = False
@ -893,73 +941,125 @@ class DS8KProxy(proxy.IBMStorageProxy):
@proxy.logger
def initialize_connection(self, volume, connector, **kwargs):
"""Attach a volume to the host."""
vol_id = Lun(volume).ds_id
LOG.info('Attach the volume %s.', vol_id)
return self._helper.initialize_connection(vol_id, connector, **kwargs)
lun = Lun(volume)
LOG.info('Attach the volume %s.', lun.ds_id)
if lun.group and lun.failed_over:
backend_helper = self._replication.get_target_helper()
else:
backend_helper = self._helper
return backend_helper.initialize_connection(lun.ds_id, connector,
**kwargs)
@proxy._trace_time
@proxy.logger
def terminate_connection(self, volume, connector, force=False, **kwargs):
"""Detach a volume from a host."""
vol_id = Lun(volume).ds_id
LOG.info('Detach the volume %s.', vol_id)
return self._helper.terminate_connection(vol_id, connector,
force, **kwargs)
lun = Lun(volume)
LOG.info('Detach the volume %s.', lun.ds_id)
if lun.group and lun.failed_over:
backend_helper = self._replication.get_target_helper()
else:
backend_helper = self._helper
return backend_helper.terminate_connection(lun.ds_id, connector,
force, **kwargs)
@proxy.logger
def create_group(self, ctxt, group):
"""Create generic volume group."""
if Group(group).consisgroup_enabled:
"""Create consistency group of FlashCopy or RemoteCopy."""
grp = Group(group)
if (grp.group_replication_enabled or
grp.consisgroup_replication_enabled):
for volume_type in group.volume_types:
replication_type = utils.is_replicated_spec(
volume_type.extra_specs)
self._assert(replication_type,
'Unable to create group: group %(grp)s '
'is for replication type, but volume '
'%(vtype)s is a non-replication one.'
% {'grp': grp.id, 'vtype': volume_type.id})
if (grp.consisgroup_snapshot_enabled or
grp.consisgroup_replication_enabled):
self._assert(self._helper.backend['lss_ids_for_cg'],
'No LSS(s) for CG, please make sure you have '
'reserved LSS for CG via param lss_range_for_cg.')
return self._helper.create_group(group)
model_update = {}
if grp.consisgroup_replication_enabled:
self._helper.verify_rest_version_for_pprc_cg()
target_helper = self._replication.get_target_helper()
target_helper.verify_rest_version_for_pprc_cg()
model_update['replication_status'] = (
fields.ReplicationStatus.ENABLED)
model_update.update(self._helper.create_group(group))
return model_update
else:
# NOTE(jiamin): If grp.group_replication_enabled is True, the
# default implementation will handle the creation of the group
# and driver just makes sure each volume type in group has
# enabled replication.
raise NotImplementedError()
@proxy.logger
def delete_group(self, ctxt, group, volumes):
"""Delete group and the volumes in the group."""
luns = [Lun(volume) for volume in volumes]
if Group(group).consisgroup_enabled:
"""Delete consistency group and volumes in it."""
grp = Group(group)
if grp.consisgroup_snapshot_enabled:
luns = [Lun(volume) for volume in volumes]
return self._delete_group_with_lock(group, luns)
elif grp.consisgroup_replication_enabled:
self._assert(not grp.failed_over,
'Group %s has been failed over, it does '
'not support to delete it' % grp.id)
luns = [Lun(volume) for volume in volumes]
for lun in luns:
self._replication.delete_replica(lun)
return self._delete_group_with_lock(group, luns)
else:
return self._helper.delete_group(group, luns)
raise NotImplementedError()
@coordination.synchronized('{self.prefix}-consistency-group')
def _delete_group_with_lock(self, group, luns):
model_update, volumes_model_update = (
self._helper.delete_group(group, luns))
if model_update['status'] == fields.GroupStatus.DELETED:
self._update_consisgroup_cache(group.id)
self._remove_record_from_consisgroup_cache(group.id)
return model_update, volumes_model_update
@proxy.logger
def delete_group_snapshot(self, ctxt, group_snapshot, snapshots):
"""Delete volume group snapshot."""
tgt_luns = [Lun(s, is_snapshot=True) for s in snapshots]
if Group(group_snapshot, True).consisgroup_enabled:
grp = Group(group_snapshot, True)
if (grp.consisgroup_snapshot_enabled or
grp.consisgroup_replication_enabled):
tgt_luns = [Lun(s, is_snapshot=True) for s in snapshots]
return self._delete_group_snapshot_with_lock(
group_snapshot, tgt_luns)
else:
return self._helper.delete_group_snapshot(
group_snapshot, tgt_luns)
raise NotImplementedError()
@coordination.synchronized('{self.prefix}-consistency-group')
def _delete_group_snapshot_with_lock(self, group_snapshot, tgt_luns):
model_update, snapshots_model_update = (
self._helper.delete_group_snapshot(group_snapshot, tgt_luns))
if model_update['status'] == fields.GroupStatus.DELETED:
self._update_consisgroup_cache(group_snapshot.id)
self._remove_record_from_consisgroup_cache(group_snapshot.id)
return model_update, snapshots_model_update
@proxy.logger
def create_group_snapshot(self, ctxt, group_snapshot, snapshots):
"""Create volume group snapshot."""
tgt_group = Group(group_snapshot, True)
if (not tgt_group.consisgroup_snapshot_enabled and
not tgt_group.consisgroup_replication_enabled):
raise NotImplementedError()
src_group = Group(group_snapshot.group)
self._assert(not src_group.failed_over,
'Group %s has been failed over, it does not '
'support to create group snapshot.' % src_group.id)
snapshots_model_update = []
model_update = {'status': fields.GroupStatus.AVAILABLE}
src_luns = [Lun(snapshot['volume']) for snapshot in snapshots]
src_luns = [Lun(snapshot.volume) for snapshot in snapshots]
tgt_luns = [Lun(snapshot, is_snapshot=True) for snapshot in snapshots]
try:
if src_luns and tgt_luns:
self._clone_group(src_luns, tgt_luns)
@ -980,89 +1080,89 @@ class DS8KProxy(proxy.IBMStorageProxy):
@proxy.logger
def update_group(self, ctxt, group, add_volumes, remove_volumes):
"""Update generic volume group."""
if Group(group).consisgroup_enabled:
return self._update_group(group, add_volumes, remove_volumes)
grp = Group(group)
if (grp.consisgroup_snapshot_enabled or
grp.consisgroup_replication_enabled):
self._assert(not grp.failed_over,
'Group %s has been failed over, it does not '
'support to update it.' % grp.id)
return self._update_consisgroup(grp, add_volumes, remove_volumes)
else:
return None, None, None
raise NotImplementedError()
def _update_group(self, group, add_volumes, remove_volumes):
def _update_consisgroup(self, grp, add_volumes, remove_volumes):
add_volumes_update = []
group_volume_ids = [vol.id for vol in group.volumes]
add_volumes = [vol for vol in add_volumes
if vol.id not in group_volume_ids]
remove_volumes = [vol for vol in remove_volumes
if vol.id in group_volume_ids]
if add_volumes:
add_luns = [Lun(vol) for vol in add_volumes]
lss_in_cg = [Lun(vol).ds_id[:2] for vol in group.volumes]
if not lss_in_cg:
lss_in_cg = self._find_lss_for_empty_group(group, add_luns)
add_volumes_update = self._add_volumes_into_group(
group, add_luns, lss_in_cg)
add_volumes_update = self._add_volumes_into_consisgroup(
grp, add_volumes)
remove_volumes_update = []
if remove_volumes:
self._remove_volumes_in_group(group, add_volumes, remove_volumes)
return None, add_volumes_update, None
remove_volumes_update = self._remove_volumes_from_consisgroup(
grp, add_volumes, remove_volumes)
return None, add_volumes_update, remove_volumes_update
@coordination.synchronized('{self.prefix}-consistency-group')
def _find_lss_for_empty_group(self, group, luns):
sorted_lss_ids = collections.Counter([lun.ds_id[:2] for lun in luns])
available_lss = self._find_lss_for_cg()
lss_for_cg = None
for lss_id in sorted_lss_ids:
if lss_id in available_lss:
lss_for_cg = lss_id
break
if not lss_for_cg:
lss_for_cg = available_lss.pop()
self._update_consisgroup_cache(group.id, lss_for_cg)
return lss_for_cg
def _add_volumes_into_group(self, group, add_luns, lss_in_cg):
@proxy.logger
def _add_volumes_into_consisgroup(self, grp, add_volumes):
add_volumes_update = []
luns = [lun for lun in add_luns if lun.ds_id[:2] not in lss_in_cg]
for lun in luns:
if lun.type_replication:
new_lun = self._clone_lun_for_group(group, lun)
new_lun.type_replication = True
new_lun = self._replication.enable_replication(new_lun, True)
lun = self._replication.delete_replica(lun)
else:
new_lun = self._clone_lun_for_group(group, lun)
self._helper.delete_lun(lun)
volume_update = new_lun.update_volume(lun)
volume_update['id'] = new_lun.os_id
new_add_luns, old_add_luns = (
self._clone_lun_for_consisgroup(add_volumes, grp))
for new_add_lun, old_add_lun in zip(new_add_luns, old_add_luns):
volume_update = new_add_lun.update_volume(old_add_lun)
volume_update['id'] = new_add_lun.os_id
add_volumes_update.append(volume_update)
return add_volumes_update
def _clone_lun_for_group(self, group, lun):
lun.group = Group(group)
new_lun = lun.shallow_copy()
new_lun.type_replication = False
self._clone_lun(lun, new_lun)
return new_lun
@proxy.logger
@coordination.synchronized('{self.prefix}-consistency-group')
def _remove_volumes_in_group(self, group, add_volumes, remove_volumes):
if len(remove_volumes) == len(group.volumes) + len(add_volumes):
self._update_consisgroup_cache(group.id)
def _remove_volumes_from_consisgroup(self, grp, add_volumes,
remove_volumes):
remove_volumes_update = []
new_remove_luns, old_remove_luns = (
self._clone_lun_for_consisgroup(remove_volumes))
for new_remove_lun, old_remove_lun in zip(new_remove_luns,
old_remove_luns):
volume_update = new_remove_lun.update_volume(old_remove_lun)
volume_update['id'] = new_remove_lun.os_id
remove_volumes_update.append(volume_update)
if len(remove_volumes) == len(grp.volumes) + len(add_volumes):
self._remove_record_from_consisgroup_cache(grp.id)
return remove_volumes_update
def _clone_lun_for_consisgroup(self, volumes, grp=None):
new_luns = []
old_luns = []
for volume in volumes:
old_lun = Lun(volume)
if old_lun.ds_id:
new_lun = old_lun.shallow_copy()
new_lun.group = grp
self._clone_lun(old_lun, new_lun)
if old_lun.type_replication:
new_lun = self._create_replica_helper(new_lun)
old_lun = self._replication.delete_replica(old_lun)
self._helper.delete_lun(old_lun)
new_luns.append(new_lun)
old_luns.append(old_lun)
return new_luns, old_luns
@proxy.logger
def _update_consisgroup_cache(self, group_id, lss_id=None):
if lss_id:
self.consisgroup_cache[group_id] = set([lss_id])
else:
if self.consisgroup_cache.get(group_id):
LOG.debug('Group %(id)s owns LSS %(lss)s in the cache.', {
'id': group_id,
'lss': ','.join(self.consisgroup_cache[group_id])
})
self.consisgroup_cache.pop(group_id)
def _remove_record_from_consisgroup_cache(self, group_id):
lss_pairs = self.consisgroup_cache.get(group_id)
if lss_pairs:
LOG.debug('Consistecy Group %(id)s owns LSS %(lss)s in the cache.',
{'id': group_id, 'lss': lss_pairs})
self.consisgroup_cache.pop(group_id)
@proxy._trace_time
def create_group_from_src(self, ctxt, group, volumes, group_snapshot,
sorted_snapshots, source_group,
sorted_source_vols):
"""Create volume group from volume group or volume group snapshot."""
grp = Group(group)
if (not grp.consisgroup_snapshot_enabled and
not grp.consisgroup_replication_enabled):
raise NotImplementedError()
model_update = {'status': fields.GroupStatus.AVAILABLE}
volumes_model_update = []
@ -1072,6 +1172,10 @@ class DS8KProxy(proxy.IBMStorageProxy):
elif source_group and sorted_source_vols:
src_luns = [Lun(source_vol)
for source_vol in sorted_source_vols]
src_group = Group(source_group)
self._assert(not src_group.failed_over,
'Group %s has been failed over, it does not '
'support to create a group from it.' % src_group.id)
else:
msg = _("_create_group_from_src supports a group snapshot "
"source or a group source, other sources can not "
@ -1080,16 +1184,6 @@ class DS8KProxy(proxy.IBMStorageProxy):
raise exception.InvalidInput(message=msg)
try:
# Don't use paramter volumes because it has DetachedInstanceError
# issue frequently. here tries to get and sort new volumes, a lot
# of cases have been guaranteed by the _sort_source_vols in
# manange.py, so not verify again.
sorted_volumes = []
for vol in volumes:
found_vols = [v for v in group.volumes if v['id'] == vol['id']]
sorted_volumes.extend(found_vols)
volumes = sorted_volumes
tgt_luns = [Lun(volume) for volume in volumes]
if src_luns and tgt_luns:
self._clone_group(src_luns, tgt_luns)
@ -1124,7 +1218,7 @@ class DS8KProxy(proxy.IBMStorageProxy):
"source_volume": src_lun.ds_id,
"target_volume": tgt_lun.ds_id
})
if tgt_lun.group.consisgroup_enabled:
if tgt_lun.group.consisgroup_snapshot_enabled:
self._do_flashcopy_with_freeze(vol_pairs)
else:
self._helper.start_flashcopy(vol_pairs)
@ -1171,49 +1265,53 @@ class DS8KProxy(proxy.IBMStorageProxy):
self._active_backend_id)
return self._active_backend_id, volume_update_list, []
backend_id = self._replication._target_helper.backend['id']
target_helper = self._replication.get_target_helper()
if secondary_id is None:
secondary_id = backend_id
elif secondary_id != backend_id:
secondary_id = target_helper.backend['id']
elif secondary_id != target_helper.backend['id']:
raise exception.InvalidReplicationTarget(
message=(_('Invalid secondary_backend_id specified. '
'Valid backend id is %s.') % backend_id))
'Valid backend id is %s.')
% target_helper.backend['id']))
LOG.debug("Starting failover to %s.", secondary_id)
replicated_luns = []
for volume in volumes:
lun = Lun(volume)
if lun.type_replication and lun.status == "available":
replicated_luns.append(lun)
else:
volume_update = (
self._replication.failover_unreplicated_volume(lun))
volume_update_list.append(volume_update)
if replicated_luns:
LOG.debug("Starting failover host to %s.", secondary_id)
# all volumes passed to failover_host are replicated.
replicated_luns = [Lun(volume) for volume in volumes if
volume.status in ('available', 'in-use')]
# volumes in group may have been failed over.
if secondary_id != strings.PRIMARY_BACKEND_ID:
failover_luns = [lun for lun in replicated_luns if
not lun.failed_over]
else:
failover_luns = [lun for lun in replicated_luns if
lun.failed_over]
if failover_luns:
try:
if secondary_id != strings.PRIMARY_BACKEND_ID:
self._replication.do_pprc_failover(replicated_luns,
secondary_id)
self._replication.start_host_pprc_failover(
failover_luns, secondary_id)
self._active_backend_id = secondary_id
replicated_luns = self._switch_backend_connection(
secondary_id, replicated_luns)
else:
self._replication.start_pprc_failback(
replicated_luns, self._active_backend_id)
self._replication.start_host_pprc_failback(
failover_luns, secondary_id)
self._active_backend_id = ""
self._helper = self._replication._source_helper
self._helper = self._replication.get_source_helper()
except restclient.APIException as e:
raise exception.UnableToFailOver(
reason=(_("Unable to failover host to %(id)s. "
"Exception= %(ex)s")
% {'id': secondary_id, 'ex': six.text_type(e)}))
for lun in replicated_luns:
for lun in failover_luns:
volume_update = lun.get_volume_update()
# failover_host in base cinder has considered previous status
# of the volume, it doesn't need to return it for update.
volume_update['status'] = (
lun.previous_status or 'available')
volume_update['replication_status'] = (
'failed-over' if self._active_backend_id else 'enabled')
fields.ReplicationStatus.FAILED_OVER
if self._active_backend_id else
fields.ReplicationStatus.ENABLED)
model_update = {'volume_id': lun.os_id,
'updates': volume_update}
volume_update_list.append(model_update)
@ -1221,11 +1319,160 @@ class DS8KProxy(proxy.IBMStorageProxy):
LOG.info("No volume has replication capability.")
if secondary_id != strings.PRIMARY_BACKEND_ID:
LOG.info("Switch to the target %s", secondary_id)
self._switch_backend_connection(secondary_id)
self._replication.switch_source_and_target_client()
self._active_backend_id = secondary_id
else:
LOG.info("Switch to the primary %s", secondary_id)
self._switch_backend_connection(self._active_backend_id)
self._replication.switch_source_and_target_client()
self._active_backend_id = ""
return secondary_id, volume_update_list, []
# No group entity in DS8K, so just need to update replication_status
# of the group.
group_update_list = []
groups = [grp for grp in groups if grp.status == 'available']
if groups:
if secondary_id != strings.PRIMARY_BACKEND_ID:
update_groups = [grp for grp in groups
if grp.replication_status ==
fields.ReplicationStatus.ENABLED]
repl_status = fields.ReplicationStatus.FAILED_OVER
else:
update_groups = [grp for grp in groups
if grp.replication_status ==
fields.ReplicationStatus.FAILED_OVER]
repl_status = fields.ReplicationStatus.ENABLED
if update_groups:
for group in update_groups:
group_update = {
'group_id': group.id,
'updates': {'replication_status': repl_status}
}
group_update_list.append(group_update)
return secondary_id, volume_update_list, group_update_list
def enable_replication(self, context, group, volumes):
"""Resume pprc pairs.
if user wants to adjust group, he/she does not need to pause/resume
pprc pairs, here just provide a way to resume replicaiton.
"""
volumes_model_update = []
model_update = (
{'replication_status': fields.ReplicationStatus.ENABLED})
if volumes:
luns = [Lun(volume) for volume in volumes]
try:
self._replication.enable_replication(luns)
except restclient.APIException as e:
msg = (_('Failed to enable replication for group %(id)s, '
'Exception: %(ex)s.')
% {'id': group.id, 'ex': six.text_type(e)})
LOG.exception(msg)
raise exception.VolumeDriverException(message=msg)
for lun in luns:
volumes_model_update.append(
{'id': lun.os_id,
'replication_status': fields.ReplicationStatus.ENABLED})
return model_update, volumes_model_update
def disable_replication(self, context, group, volumes):
"""Pause pprc pairs.
if user wants to adjust group, he/she does not need to pause/resume
pprc pairs, here just provide a way to pause replicaiton.
"""
volumes_model_update = []
model_update = (
{'replication_status': fields.ReplicationStatus.DISABLED})
if volumes:
luns = [Lun(volume) for volume in volumes]
try:
self._replication.disable_replication(luns)
except restclient.APIException as e:
msg = (_('Failed to disable replication for group %(id)s, '
'Exception: %(ex)s.')
% {'id': group.id, 'ex': six.text_type(e)})
LOG.exception(msg)
raise exception.VolumeDriverException(message=msg)
for lun in luns:
volumes_model_update.append(
{'id': lun.os_id,
'replication_status': fields.ReplicationStatus.DISABLED})
return model_update, volumes_model_update
def failover_replication(self, context, group, volumes,
secondary_backend_id):
"""Fail over replication for a group and volumes in the group."""
volumes_model_update = []
model_update = {}
luns = [Lun(volume) for volume in volumes]
if secondary_backend_id == strings.PRIMARY_BACKEND_ID:
if luns:
if not luns[0].failed_over:
LOG.info("Group %s has been failed back. it doesn't "
"need to fail back again.", group.id)
return model_update, volumes_model_update
else:
return model_update, volumes_model_update
else:
target_helper = self._replication.get_target_helper()
backend_id = target_helper.backend['id']
if secondary_backend_id is None:
secondary_backend_id = backend_id
elif secondary_backend_id != backend_id:
raise exception.InvalidReplicationTarget(
message=(_('Invalid secondary_backend_id %(id)s. '
'Valid backend ids are %(ids)s.')
% {'id': secondary_backend_id,
'ids': (strings.PRIMARY_BACKEND_ID,
backend_id)}))
if luns:
if luns[0].failed_over:
LOG.info("Group %(grp)s has been failed over to %(id)s.",
{'grp': group.id, 'id': backend_id})
return model_update, volumes_model_update
else:
return model_update, volumes_model_update
LOG.debug("Starting failover group %(grp)s to %(id)s.",
{'grp': group.id, 'id': secondary_backend_id})
try:
if secondary_backend_id != strings.PRIMARY_BACKEND_ID:
self._replication.start_group_pprc_failover(
luns, secondary_backend_id)
model_update['replication_status'] = (
fields.ReplicationStatus.FAILED_OVER)
else:
self._replication.start_group_pprc_failback(
luns, secondary_backend_id)
model_update['replication_status'] = (
fields.ReplicationStatus.ENABLED)
except restclient.APIException as e:
raise exception.VolumeDriverException(
message=(_("Unable to failover group %(grp_id)s to "
"backend %(bck_id)s. Exception= %(ex)s")
% {'grp_id': group.id,
'bck_id': secondary_backend_id,
'ex': six.text_type(e)}))
for lun in luns:
volume_model_update = lun.get_volume_update()
# base cinder doesn't consider previous status of the volume
# in failover_replication, so here returns it for update.
volume_model_update['previous_status'] = lun.status
volume_model_update['status'] = (
lun.previous_status or 'available')
volume_model_update['replication_status'] = (
model_update['replication_status'])
volume_model_update['id'] = lun.os_id
volumes_model_update.append(volume_model_update)
return model_update, volumes_model_update
def get_replication_error_status(self, context, groups):
"""Return error info for replicated groups and its volumes.
all pprc copy related APIs wait until copy is finished, so it does
not need to check their status afterwards.
"""
return [], []

View File

@ -13,7 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
#
import ast
import eventlet
import six
@ -39,35 +38,36 @@ PPRC_PATH_FULL = 0x03
class MetroMirrorManager(object):
"""Manage metro mirror for replication."""
def __init__(self, source, target):
self._source = source
self._target = target
def __init__(self, source_helper, target_helper):
self._source_helper = source_helper
self._target_helper = target_helper
def switch_source_and_target(self):
self._source, self._target = self._target, self._source
self._source_helper, self._target_helper = (
self._target_helper, self._source_helper)
def check_physical_links(self):
ports = self._source.get_physical_links(
self._target.backend['storage_wwnn'])
ports = self._source_helper.get_physical_links(
self._target_helper.backend['storage_wwnn'])
if not ports:
raise exception.VolumeDriverException(
message=((_("%(tgt)s is not connected to %(src)s!") % {
'tgt': self._target.backend['storage_wwnn'],
'src': self._source.backend['storage_wwnn']
'tgt': self._target_helper.backend['storage_wwnn'],
'src': self._source_helper.backend['storage_wwnn']
})))
pairs = [{
'source_port_id': p['source_port_id'],
'target_port_id': p['target_port_id']
} for p in ports]
if not self._target.backend['port_pairs']:
if not self._target_helper.backend['port_pairs']:
# if there are more than eight physical links,
# choose eight of them.
self._target.backend['port_pairs'] = (
self._target_helper.backend['port_pairs'] = (
pairs[:8] if len(pairs) > 8 else pairs)
else:
# verify the port pairs user set
for pair in self._target.backend['port_pairs']:
for pair in self._target_helper.backend['port_pairs']:
if pair not in pairs:
valid_pairs = ';'.join(
["%s-%s" % (p['source_port_id'],
@ -80,14 +80,14 @@ class MetroMirrorManager(object):
"port pair(s) are: %(valid)s")
% {'invalid': invalid_pair,
'valid': valid_pairs})))
self._source.backend['port_pairs'] = [{
self._source_helper.backend['port_pairs'] = [{
'source_port_id': p['target_port_id'],
'target_port_id': p['source_port_id']
} for p in self._target.backend['port_pairs']]
} for p in self._target_helper.backend['port_pairs']]
def is_target_alive(self):
try:
self._target.get_systems()
self._target_helper.get_systems()
except restclient.TimeoutException as e:
LOG.info("REST request time out, backend may be not available "
"any more. Exception: %s", e)
@ -110,12 +110,12 @@ class MetroMirrorManager(object):
if excluded_lss:
paths = [p for p in paths
if p['source_lss_id'] not in excluded_lss]
# only enable_replication will specify the source LSS
# only establish_replication will specify the source LSS
# and it need to reuse LSS reserved for CG if this LSS
# is in PPRC path.
if not specified_lss:
paths = [p for p in paths if p['source_lss_id'] not in
self._source.backend['lss_ids_for_cg']]
self._source_helper.backend['lss_ids_for_cg']]
# sort pairs according to the number of luns in their LSSes,
# and get the pair which LSS has least luns.
@ -123,7 +123,7 @@ class MetroMirrorManager(object):
source_lss_set = set(p['source_lss_id'] for p in paths)
for lss in source_lss_set:
# get the number of luns in source.
src_luns = self._source.get_lun_number_in_lss(lss)
src_luns = self._source_helper.get_lun_number_in_lss(lss)
if src_luns == helper.LSS_VOL_SLOTS and not specified_lss:
continue
@ -131,7 +131,7 @@ class MetroMirrorManager(object):
for path in spec_paths:
# get the number of luns in target.
try:
tgt_luns = self._target.get_lun_number_in_lss(
tgt_luns = self._target_helper.get_lun_number_in_lss(
path['target_lss_id'])
except restclient.APIException:
# if DS8K can fix this problem, then remove the
@ -148,23 +148,24 @@ class MetroMirrorManager(object):
else:
src_lss, tgt_lss, num = sorted(candidates, key=lambda c: c[2])[0]
return PPRC_PATH_HEALTHY, {
'source': (self._source.get_pool(src_lss), src_lss),
'target': (self._target.get_pool(tgt_lss), tgt_lss)
'source': (self._source_helper.get_pool(src_lss), src_lss),
'target': (self._target_helper.get_pool(tgt_lss), tgt_lss)
}
def _filter_pprc_paths(self, lss):
paths = self._source.get_pprc_paths(lss)
paths = self._source_helper.get_pprc_paths(lss)
if paths:
# get the paths only connected to replication target
paths = [p for p in paths if p['target_system_wwnn'] in
self._target.backend['storage_wwnn']]
self._target_helper.backend['storage_wwnn']]
else:
LOG.info("No PPRC paths found in primary DS8K.")
return PPRC_PATH_NOT_EXIST, None
# get the paths whose port pairs have been set in configuration file.
expected_port_pairs = [(p['source_port_id'], p['target_port_id'])
for p in self._target.backend['port_pairs']]
expected_port_pairs = [
(port['source_port_id'], port['target_port_id'])
for port in self._target_helper.backend['port_pairs']]
for path in paths[:]:
port_pairs = [(p['source_port_id'], p['target_port_id'])
for p in path['port_pairs']]
@ -177,11 +178,11 @@ class MetroMirrorManager(object):
# abandon PPRC paths according to volume type(fb/ckd)
source_lss_set = set(p['source_lss_id'] for p in paths)
if self._source.backend.get('device_mapping'):
if self._source_helper.backend.get('device_mapping'):
source_lss_set = source_lss_set & set(
self._source.backend['device_mapping'].keys())
self._source_helper.backend['device_mapping'].keys())
else:
all_lss = self._source.get_all_lss(['id', 'type'])
all_lss = self._source_helper.get_all_lss(['id', 'type'])
fb_lss = set(
lss['id'] for lss in all_lss if lss['type'] == 'fb')
source_lss_set = source_lss_set & fb_lss
@ -196,13 +197,13 @@ class MetroMirrorManager(object):
discarded_tgt_lss = []
for lss in source_lss_set:
spec_paths = [p for p in paths if p['source_lss_id'] == lss]
if self._source.get_pool(lss) is None:
if self._source_helper.get_pool(lss) is None:
discarded_src_lss.append(lss)
continue
for spec_path in spec_paths:
tgt_lss = spec_path['target_lss_id']
if self._target.get_pool(tgt_lss) is None:
if self._target_helper.get_pool(tgt_lss) is None:
discarded_tgt_lss.append(tgt_lss)
if discarded_src_lss:
@ -228,13 +229,17 @@ class MetroMirrorManager(object):
return PPRC_PATH_HEALTHY, paths
def create_pprc_path(self, pool_lss_pair):
src_lss = pool_lss_pair['source'][1]
tgt_lss = pool_lss_pair['target'][1]
# check whether the pprc path exists and is healthy or not firstly.
pid = (self._source.backend['storage_wwnn'] + '_' + src_lss + ':' +
self._target.backend['storage_wwnn'] + '_' + tgt_lss)
state = self._is_pprc_paths_healthy(pid)
def create_pprc_path(self, lun, is_group=False):
switch = lun.failed_over if is_group else False
src_helper, tgt_helper = (
(self._target_helper, self._source_helper) if switch else
(self._source_helper, self._target_helper))
src_lss = lun.pool_lss_pair['source'][1]
tgt_lss = lun.pool_lss_pair['target'][1]
# check whether the pprc path exists and is healthy or not.
pid = (src_helper.backend['storage_wwnn'] + '_' + src_lss + ':' +
tgt_helper.backend['storage_wwnn'] + '_' + tgt_lss)
state = self._is_pprc_paths_healthy(pid, switch)
LOG.info("The state of PPRC path %(path)s is %(state)s.",
{'path': pid, 'state': state})
if state == PPRC_PATH_HEALTHY:
@ -242,31 +247,34 @@ class MetroMirrorManager(object):
# create the pprc path
pathData = {
'target_system_wwnn': self._target.backend['storage_wwnn'],
'target_system_wwnn': tgt_helper.backend['storage_wwnn'],
'source_lss_id': src_lss,
'target_lss_id': tgt_lss,
'port_pairs': self._target.backend['port_pairs']
'port_pairs': tgt_helper.backend['port_pairs']
}
if lun.group and lun.group.consisgroup_replication_enabled:
pathData['pprc_consistency_group'] = 'enable'
LOG.info("PPRC path %(src)s:%(tgt)s will be created.",
{'src': src_lss, 'tgt': tgt_lss})
self._source.create_pprc_path(pathData)
src_helper.create_pprc_path(pathData)
# check the state of the pprc path
LOG.debug("Checking the state of the new PPRC path.")
for retry in range(4):
eventlet.sleep(2)
if self._is_pprc_paths_healthy(pid) == PPRC_PATH_HEALTHY:
if self._is_pprc_paths_healthy(pid, switch) == PPRC_PATH_HEALTHY:
break
if retry == 3:
self._source.delete_pprc_path(pid)
src_helper.delete_pprc_path(pid)
raise restclient.APIException(
data=(_("Failed to create PPRC path %(src)s:%(tgt)s.")
% {'src': src_lss, 'tgt': tgt_lss}))
LOG.debug("Create the new PPRC path successfully.")
def _is_pprc_paths_healthy(self, path_id):
def _is_pprc_paths_healthy(self, path_id, switch):
bck_helper = self._target_helper if switch else self._source_helper
try:
path = self._source.get_pprc_path(path_id)
path = bck_helper.get_pprc_path(path_id)
except restclient.APIException:
return PPRC_PATH_NOT_EXIST
@ -278,99 +286,114 @@ class MetroMirrorManager(object):
def create_pprc_pairs(self, lun):
tgt_vol_id = lun.replication_driver_data[
self._target.backend['id']]['vol_hex_id']
tgt_stg_id = self._target.backend['storage_unit']
self._target_helper.backend['id']]['vol_hex_id']
tgt_stg_id = self._target_helper.backend['storage_unit']
vol_pairs = [{
'source_volume': lun.ds_id,
'source_system_id': self._source.backend['storage_unit'],
'source_system_id': self._source_helper.backend['storage_unit'],
'target_volume': tgt_vol_id,
'target_system_id': tgt_stg_id
}]
pairData = {
pair_data = {
"volume_pairs": vol_pairs,
"type": "metro_mirror",
"options": ["permit_space_efficient_target",
"initial_copy_full"]
}
LOG.debug("Creating pprc pair, pairData is %s.", pairData)
self._source.create_pprc_pair(pairData)
self._source.wait_pprc_copy_finished([lun.ds_id], 'full_duplex')
LOG.debug("Creating pprc pair, pair_data is %s.", pair_data)
self._source_helper.create_pprc_pair(pair_data)
self._source_helper.wait_pprc_copy_finished([lun.ds_id], 'full_duplex')
LOG.info("The state of PPRC pair has become full_duplex.")
def delete_pprc_pairs(self, lun):
self._source.delete_pprc_pair(lun.ds_id)
self._source_helper.delete_pprc_pair(lun.ds_id)
if self.is_target_alive() and lun.replication_driver_data:
replica = sorted(lun.replication_driver_data.values())[0]
self._target.delete_pprc_pair(replica['vol_hex_id'])
self._target_helper.delete_pprc_pair(replica['vol_hex_id'])
def do_pprc_failover(self, luns, backend_id):
def do_pprc_failover(self, luns, is_group=False):
switch = luns[0].failed_over if is_group else False
src_helper, tgt_helper = (
(self._target_helper, self._source_helper) if switch else
(self._source_helper, self._target_helper))
vol_pairs = []
target_vol_ids = []
for lun in luns:
target_vol_id = (
lun.replication_driver_data[backend_id]['vol_hex_id'])
if not self._target.lun_exists(target_vol_id):
if not tgt_helper.lun_exists(lun.replica_ds_id):
LOG.info("Target volume %(volid)s doesn't exist in "
"DS8K %(storage)s.",
{'volid': target_vol_id,
'storage': self._target.backend['storage_unit']})
{'volid': lun.replica_ds_id,
'storage': tgt_helper.backend['storage_unit']})
continue
vol_pairs.append({
'source_volume': target_vol_id,
'source_system_id': self._target.backend['storage_unit'],
'source_volume': lun.replica_ds_id,
'source_system_id': tgt_helper.backend['storage_unit'],
'target_volume': lun.ds_id,
'target_system_id': self._source.backend['storage_unit']
'target_system_id': src_helper.backend['storage_unit']
})
target_vol_ids.append(target_vol_id)
target_vol_ids.append(lun.replica_ds_id)
pairData = {
pair_data = {
"volume_pairs": vol_pairs,
"type": "metro_mirror",
"options": ["failover"]
}
LOG.info("Begin to fail over to %s",
self._target.backend['storage_unit'])
self._target.create_pprc_pair(pairData)
self._target.wait_pprc_copy_finished(target_vol_ids,
'suspended', False)
LOG.info("Begin to fail over to %(backend)s, "
"pair_data is %(pair_data)s.",
{'backend': tgt_helper.backend['storage_unit'],
'pair_data': pair_data})
tgt_helper.create_pprc_pair(pair_data)
tgt_helper.wait_pprc_copy_finished(target_vol_ids,
'suspended', switch)
LOG.info("Failover from %(src)s to %(tgt)s is finished.", {
'src': self._source.backend['storage_unit'],
'tgt': self._target.backend['storage_unit']
'src': src_helper.backend['storage_unit'],
'tgt': tgt_helper.backend['storage_unit']
})
def do_pprc_failback(self, luns, backend_id):
pprc_ids = []
vol_ids = []
def get_pprc_pair_ids(self, luns, switch=False):
if not luns:
return None
src_helper, tgt_helper = (
(self._target_helper, self._source_helper) if switch else
(self._source_helper, self._target_helper))
pprc_pair_ids = []
for lun in luns:
target_vol_id = (
lun.replication_driver_data[backend_id]['vol_hex_id'])
if not self._target.lun_exists(target_vol_id):
if switch:
is_lun_exist = tgt_helper.lun_exists(lun.replica_ds_id)
else:
is_lun_exist = src_helper.lun_exists(lun.ds_id)
if not is_lun_exist:
LOG.info("Target volume %(volume)s doesn't exist in "
"DS8K %(storage)s.",
{'volume': lun.ds_id,
'storage': self._target.backend['storage_unit']})
{'volume': (lun.replica_ds_id
if switch else lun.ds_id),
'storage': (tgt_helper.backend['storage_unit']
if switch else
src_helper.backend['storage_unit'])})
continue
pprc_pair_ids.append(
src_helper.backend['storage_unit'] + '_' + lun.ds_id + ':' +
tgt_helper.backend['storage_unit'] + '_' + lun.replica_ds_id)
return pprc_pair_ids
pprc_id = (self._source.backend['storage_unit'] + '_' +
lun.ds_id + ':' +
self._target.backend['storage_unit'] +
'_' + target_vol_id)
pprc_ids.append(pprc_id)
vol_ids.append(lun.ds_id)
pairData = {"pprc_ids": pprc_ids,
"type": "metro_mirror",
"options": ["failback"]}
LOG.info("Begin to run failback in %s.",
self._source.backend['storage_unit'])
self._source.do_failback(pairData)
self._source.wait_pprc_copy_finished(vol_ids, 'full_duplex', False)
def do_pprc_failback(self, luns, is_group=False):
switch = luns[0].failed_over if is_group else False
bck_helper = self._target_helper if switch else self._source_helper
pair_data = {"pprc_ids": self.get_pprc_pair_ids(luns, switch),
"type": "metro_mirror",
"options": ["failback"]}
LOG.info("Begin to run failback in %(backend)s, "
"pair_data is %(pair_data)s.",
{'backend': bck_helper.backend['storage_unit'],
'pair_data': pair_data})
bck_helper.do_failback(pair_data)
lun_ids = [lun.ds_id for lun in luns]
bck_helper.wait_pprc_copy_finished(lun_ids, 'full_duplex', switch)
LOG.info("Run failback in %s is finished.",
self._source.backend['storage_unit'])
bck_helper.backend['storage_unit'])
class Replication(object):
@ -383,9 +406,10 @@ class Replication(object):
1.0.0 - initial revision.
2.1.0 - ignore exception during cleanup when creating or deleting
replica failed.
2.1.1 - Adding support for replication consistency group.
"""
VERSION = "2.1.0"
VERSION = "2.1.1"
def __init__(self, source_helper, target_device):
self._source_helper = source_helper
@ -401,11 +425,25 @@ class Replication(object):
err=(_("Param [connection_type] %s in replication_device "
"is invalid.") % connection_type))
self._target_helper.backend['lss_ids_for_cg'] = (
self._source_helper.backend['lss_ids_for_cg'])
if self._target_helper.backend['lss_ids_for_cg']:
if (len(self._target_helper.backend['lss_ids_for_cg']) !=
len(self._source_helper.backend['lss_ids_for_cg'])):
raise exception.VolumeDriverException(
message=_("Please reserve the same number of LSS for "
"secondary DS8K just as the primary DS8K."))
else:
self._target_helper.backend['lss_ids_for_cg'] = (
self._source_helper.backend['lss_ids_for_cg'])
self._mm_manager = MetroMirrorManager(self._source_helper,
self._target_helper)
def get_target_helper(self):
return self._target_helper
def get_source_helper(self):
return self._source_helper
def check_connection_type(self):
src_conn_type = self._source_helper.get_connection_type()
tgt_conn_type = self._target_helper.get_connection_type()
@ -420,19 +458,25 @@ class Replication(object):
def check_physical_links(self):
self._mm_manager.check_physical_links()
def switch_source_and_target(self, secondary_id, luns=None):
def switch_source_and_target_client(self):
# switch the helper in metro mirror manager
self._mm_manager.switch_source_and_target()
# switch the helper
self._source_helper, self._target_helper = (
self._target_helper, self._source_helper)
# switch the volume id
if luns:
for lun in luns:
backend = lun.replication_driver_data.get(secondary_id, None)
lun.replication_driver_data.update(
{secondary_id: {'vol_hex_id': lun.ds_id}})
lun.ds_id = backend['vol_hex_id']
def _switch_source_and_target_volume(self, luns, secondary_backend_id):
for lun in luns:
if secondary_backend_id == 'default':
backend_id = self._target_helper.backend['id']
lun.failed_over = False
else:
backend_id = 'default'
lun.failed_over = True
# secondary_id is never blank here.
lun.replication_driver_data = (
{backend_id: {'vol_hex_id': lun.ds_id}})
lun.ds_id, lun.replica_ds_id = lun.replica_ds_id, lun.ds_id
return luns
@proxy.logger
@ -455,10 +499,10 @@ class Replication(object):
return {'target': (tgt_pid, tgt_lss)}
@proxy.logger
def enable_replication(self, lun, delete_source=False):
def establish_replication(self, lun, delete_source=False):
state, lun.pool_lss_pair = (
self._mm_manager.find_from_pprc_paths(lun.ds_id[0:2]))
LOG.debug("enable_replication: pool_lss_pair is %s.",
LOG.debug("establish_replication: pool_lss_pair is %s.",
lun.pool_lss_pair)
if state == PPRC_PATH_UNHEALTHY:
raise restclient.APIException(
@ -479,7 +523,7 @@ class Replication(object):
try:
self._target_helper.create_lun(lun)
# create PPRC paths if need.
self._mm_manager.create_pprc_path(lun.pool_lss_pair)
self._mm_manager.create_pprc_path(lun)
# create pprc pair
self._mm_manager.create_pprc_pairs(lun)
except restclient.APIException:
@ -545,11 +589,35 @@ class Replication(object):
def create_pprc_pairs(self, lun):
self._mm_manager.create_pprc_pairs(lun)
def do_pprc_failover(self, luns, backend_id):
self._mm_manager.do_pprc_failover(luns, backend_id)
def start_host_pprc_failover(self, luns, backend_id):
self._mm_manager.do_pprc_failover(luns)
self.switch_source_and_target_client()
self._switch_source_and_target_volume(luns, backend_id)
def start_group_pprc_failover(self, luns, backend_id):
# unlike host failover, group failover needs to fetch changes from
# target volumes to source volumes after group is failed over.
self._mm_manager.do_pprc_failover(luns, True)
self._switch_source_and_target_volume(luns, backend_id)
sample_luns = self._get_sample_luns(luns)
for lun in sample_luns:
self._mm_manager.create_pprc_path(lun, True)
self._mm_manager.do_pprc_failback(luns, True)
def _get_sample_luns(self, luns):
# choose sample lun according to position.
sample_luns = []
positions = []
for lun in luns:
position = (lun.pool_lss_pair['source'][1],
lun.pool_lss_pair['target'][1])
if position not in positions:
sample_luns.append(lun)
positions.append(position)
return sample_luns
@proxy.logger
def start_pprc_failback(self, luns, backend_id):
def start_host_pprc_failback(self, luns, backend_id):
# check whether primary client is alive or not.
if not self._mm_manager.is_target_alive():
try:
@ -559,28 +627,72 @@ class Replication(object):
"please make sure it is back.")
LOG.error(msg)
raise exception.UnableToFailOver(reason=msg)
LOG.debug("Failback starts, backend id is %s.", backend_id)
for lun in luns:
self._mm_manager.create_pprc_path(lun.pool_lss_pair)
self._mm_manager.do_pprc_failback(luns, backend_id)
LOG.debug("Failback host starts, backend id is %s.", backend_id)
sample_luns = self._get_sample_luns(luns)
for lun in sample_luns:
self._mm_manager.create_pprc_path(lun)
self._mm_manager.do_pprc_failback(luns)
# revert the relationship of source volume and target volume
self.do_pprc_failover(luns, backend_id)
self.switch_source_and_target(backend_id, luns)
self._mm_manager.do_pprc_failback(luns, backend_id)
LOG.debug("Failback ends, backend id is %s.", backend_id)
self.start_host_pprc_failover(luns, backend_id)
self._mm_manager.do_pprc_failback(luns)
LOG.debug("Failback host ends, backend id is %s.", backend_id)
@proxy.logger
def failover_unreplicated_volume(self, lun):
provider_location = ast.literal_eval(lun.volume['provider_location'])
if 'old_status' in provider_location:
updates = {'status': provider_location['old_status']}
del provider_location['old_status']
updates['provider_location'] = six.text_type(provider_location)
def start_group_pprc_failback(self, luns, backend_id):
# NOTE: unlike failover host, after group is failed over,
# source and target clients are not swapped.
LOG.debug("Failback group starts, backend id is %s.", backend_id)
self.start_group_pprc_failover(luns, backend_id)
LOG.debug("Failback group ends, backend id is %s.", backend_id)
def _get_expected_luns(self, luns, state, ignored_state=None):
lun_ids = set(lun.ds_id for lun in luns)
min_lun_id = min(lun_ids)
max_lun_id = max(lun_ids)
if not luns[0].failed_over:
pairs = self._source_helper.get_pprc_pairs(min_lun_id, max_lun_id)
else:
provider_location['old_status'] = lun.status
updates = {
'status': 'error',
'provider_location': six.text_type(provider_location)
}
return {'volume_id': lun.os_id, 'updates': updates}
pairs = self._target_helper.get_pprc_pairs(min_lun_id, max_lun_id)
pairs = {pair['source_volume']['name']: pair for pair in pairs}
expected_luns = []
for lun in luns:
pair = pairs.get(lun.ds_id)
if pair:
if ignored_state and pair['state'] == ignored_state:
continue
elif pair['state'] != state:
raise exception.VolumeDriverException(
message=(_("Source volume %(id)s has wrong pprc pair "
"state %(invalid_state)s, expected one is "
"%(valid_state)s")
% {'id': pair['source_volume']['name'],
'invalid_state': pair['state'],
'valid_state': state}))
else:
raise exception.VolumeDriverException(
message=_("There is no PPRC pair for source volume "
"%s.") % lun.ds_id)
expected_luns.append(lun)
return expected_luns
@proxy.logger
def enable_replication(self, luns):
# after group is failed over, user can not enable replication.
if not luns:
return None
luns = self._get_expected_luns(luns, 'suspended', 'full_duplex')
pprc_pair_ids = self._mm_manager.get_pprc_pair_ids(luns)
LOG.debug("enable_replication: pprc_pair_ids is %s", pprc_pair_ids)
if pprc_pair_ids:
self._source_helper.resume_pprc_pairs(pprc_pair_ids)
@proxy.logger
def disable_replication(self, luns):
# after group is failed over, user can not disable replication.
if not luns:
return None
luns = self._get_expected_luns(luns, 'full_duplex', 'suspended')
pprc_pair_ids = self._mm_manager.get_pprc_pair_ids(luns)
LOG.debug("disable_replication: pprc_pair_ids is %s", pprc_pair_ids)
if pprc_pair_ids:
self._source_helper.pause_pprc_pairs(pprc_pair_ids)

View File

@ -0,0 +1,4 @@
---
features:
- |
Add replication consistency group support in DS8K cinder driver.