diff --git a/cinder/tests/unit/volume/drivers/ibm/test_ds8k_proxy.py b/cinder/tests/unit/volume/drivers/ibm/test_ds8k_proxy.py index a0ceece4467..f4ddec602c3 100644 --- a/cinder/tests/unit/volume/drivers/ibm/test_ds8k_proxy.py +++ b/cinder/tests/unit/volume/drivers/ibm/test_ds8k_proxy.py @@ -16,6 +16,7 @@ """Tests for the IBM DS8K family driver.""" import ast import copy +import ddt import eventlet import json import mock @@ -785,6 +786,7 @@ FAKE_ASSIGN_HOST_PORT_RESPONSE = FAKE_GENERIC_RESPONSE FAKE_DELETE_MAPPINGS_RESPONSE = FAKE_GENERIC_RESPONSE FAKE_DELETE_HOST_PORTS_RESPONSE = FAKE_GENERIC_RESPONSE FAKE_DELETE_HOSTS_RESPONSE = FAKE_GENERIC_RESPONSE +FAKE_PAUSE_RESPONSE = FAKE_GENERIC_RESPONSE FAKE_REST_API_RESPONSES = { TEST_SOURCE_DS8K_IP + '/get': @@ -879,6 +881,10 @@ FAKE_REST_API_RESPONSES = { FAKE_FAILBACK_RESPONSE, TEST_TARGET_DS8K_IP + '/cs/pprcs/resume/post': FAKE_FAILBACK_RESPONSE, + TEST_SOURCE_DS8K_IP + '/cs/pprcs/pause/post': + FAKE_PAUSE_RESPONSE, + TEST_TARGET_DS8K_IP + '/cs/pprcs/pause/post': + FAKE_PAUSE_RESPONSE, TEST_SOURCE_DS8K_IP + '/cs/flashcopies/post': FAKE_POST_FLASHCOPIES_RESPONSE, TEST_SOURCE_DS8K_IP + '/cs/flashcopies/unfreeze/post': @@ -948,6 +954,9 @@ class FakeDS8KCommonHelper(helper.DS8KCommonHelper): self._storage_pools = None self.backend = {} self.setup() + self._existing_pool_ids = [TEST_POOL_ID_1, + TEST_POOL_ID_2, + TEST_ECKD_POOL_ID] def _get_value(self, key): value = getattr(self.conf, key, None) @@ -1044,12 +1053,13 @@ class FakeDS8KProxy(ds8kproxy.DS8KProxy): def _do_replication_setup(self, devices, src_helper): self._replication = FakeReplication(src_helper, devices[0]) if self._active_backend_id: - self._switch_backend_connection(self._active_backend_id) + self._replication.switch_source_and_target_client() else: self._replication.check_physical_links() self._replication_enabled = True +@ddt.ddt class DS8KProxyTest(test.TestCase): """Test proxy for DS8K volume driver.""" @@ -1209,6 +1219,8 @@ class DS8KProxyTest(test.TestCase): "free_capacity_gb": 10, "reserved_percentage": 0, "consistent_group_snapshot_enabled": True, + "group_replication_enabled": True, + "consistent_group_replication_enabled": True, "multiattach": False } @@ -1429,23 +1441,6 @@ class DS8KProxyTest(test.TestCase): # error group will be ignored, so LSS 20 can be used. self.assertEqual(lss, '20') - def test_create_volume_and_assign_to_group_with_wrong_host(self): - # create volume for group which has wrong format of host. - self.configuration.lss_range_for_cg = '20-23' - self.driver = FakeDS8KProxy(self.storage_info, self.logger, - self.exception, self) - self.driver.setup(self.ctxt) - group_type = group_types.create( - self.ctxt, - 'group', - {'consistent_group_snapshot_enabled': ' True'} - ) - group = self._create_group(host="fake_invalid_host", - group_type_id=group_type.id) - volume = self._create_volume(group_id=group.id) - self.assertRaises(exception.VolumeDriverException, - self.driver.create_volume, volume) - @mock.patch.object(helper.DS8KCommonHelper, '_create_lun') def test_create_volume_but_lss_full_afterwards(self, mock_create_lun): """create volume in a LSS which is full afterwards.""" @@ -1629,7 +1624,8 @@ class DS8KProxyTest(test.TestCase): @mock.patch.object(proxy.IBMStorageProxy, '__init__') @mock.patch.object(replication, 'Replication') - @mock.patch.object(ds8kproxy.DS8KProxy, '_switch_backend_connection') + @mock.patch.object(replication.Replication, + 'switch_source_and_target_client') def test_switch_backend_connection(self, mock_switch_connection, mock_replication, mock_proxy_init): """driver should switch connection if it has been failed over.""" @@ -1988,11 +1984,12 @@ class DS8KProxyTest(test.TestCase): self.assertRaises(restclient.APIException, self.driver.create_cloned_volume, tgt_vol, src_vol) + @mock.patch.object(eventlet, 'sleep') @mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy') @mock.patch.object(helper.DS8KCommonHelper, 'lun_exists') @mock.patch.object(helper.DS8KCommonHelper, 'create_lun') def test_create_cloned_volume5(self, mock_create_lun, mock_lun_exists, - mock_get_flashcopy): + mock_get_flashcopy, mock_sleep): """clone a volume when target has volume ID but it is nonexistent.""" self.driver = FakeDS8KProxy(self.storage_info, self.logger, self.exception, self) @@ -2714,6 +2711,84 @@ class DS8KProxyTest(test.TestCase): self.driver.create_group, self.ctxt, group) + def test_create_generic_group_not_implemented(self): + """create generic group is not implemented.""" + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group' + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + self.assertRaises(NotImplementedError, + self.driver.create_group, + self.ctxt, group) + + def test_create_replication_cg_should_verify_volume_types(self): + """Cannot put non-replication volume type into replication cg.""" + self.configuration.lss_range_for_cg = '20-23' + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', {}) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_replication_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id, + volume_type_ids=[vol_type.id]) + self.assertRaises(exception.VolumeDriverException, + self.driver.create_group, + self.ctxt, group) + + @ddt.data({'bundle_version': "5.7.51.1067"}, + {'bundle_version': "5.8.20.1058"}) + @mock.patch.object(helper.DS8KCommonHelper, '_get_version') + def test_create_replication_consisgroup_should_verify_rest_version( + self, rest_version, mock_get_version): + """Driver should verify whether does REST support pprc cg or not.""" + self.configuration.lss_range_for_cg = '20-23' + mock_get_version.return_value = rest_version + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + vol_type = volume_types.create( + self.ctxt, 'VOL_TYPE', {'replication_enabled': ' True'}) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_replication_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id, + volume_type_ids=[vol_type.id]) + self.assertRaises(exception.VolumeDriverException, + self.driver.create_group, + self.ctxt, group) + + def test_create_consistency_group_without_reserve_lss(self): + """user should reserve LSS for group if it enables cg.""" + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + self.assertRaises(exception.VolumeDriverException, + self.driver.create_group, self.ctxt, group) + def test_delete_consistency_group_sucessfully(self): """test a successful consistency group deletion.""" self.driver = FakeDS8KProxy(self.storage_info, self.logger, @@ -2759,24 +2834,60 @@ class DS8KProxyTest(test.TestCase): self.assertEqual(fields.GroupStatus.ERROR_DELETING, model_update['status']) - def test_create_consistency_group_without_reserve_lss(self): - """user should reserve LSS for group if it enables cg.""" + def test_delete_replication_group_is_not_implemented(self): + """delete replication group is not implemented.""" + self.configuration.replication_device = [TEST_REPLICATION_DEVICE] self.driver = FakeDS8KProxy(self.storage_info, self.logger, self.exception, self) self.driver.setup(self.ctxt) - group_type = group_types.create( self.ctxt, 'group', - {'consistent_group_snapshot_enabled': ' True'} + {'group_replication_enabled': ' True'} ) group = self._create_group(host=TEST_GROUP_HOST, group_type_id=group_type.id) - self.assertRaises(exception.VolumeDriverException, - self.driver.create_group, self.ctxt, group) - def test_update_generic_group_without_enable_cg(self): - """update group which not enable cg should return None.""" + vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', + {'replication_enabled': ' True'}) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + data = json.dumps({'vol_hex_id': TEST_VOLUME_ID}) + volume = self._create_volume(volume_type_id=vol_type.id, + provider_location=location, + replication_driver_data=data, + group_id=group.id) + self.assertRaises(NotImplementedError, + self.driver.delete_group, + self.ctxt, group, [volume]) + + def test_update_replication_group_is_not_implemented(self): + """update replication group is not implemented.""" + self.configuration.replication_device = [TEST_REPLICATION_DEVICE] + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + group_type = group_types.create( + self.ctxt, + 'group', + {'group_replication_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + + vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', + {'replication_enabled': ' True'}) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + data = json.dumps({'vol_hex_id': TEST_VOLUME_ID}) + volume = self._create_volume(volume_type_id=vol_type.id, + provider_location=location, + replication_driver_data=data, + group_id=group.id) + self.assertRaises(NotImplementedError, + self.driver.update_group, + self.ctxt, group, [volume], []) + + def test_update_generic_group_is_not_implemented(self): + """update group which not enable cg is not implemented.""" self.driver = FakeDS8KProxy(self.storage_info, self.logger, self.exception, self) self.driver.setup(self.ctxt) @@ -2786,11 +2897,9 @@ class DS8KProxyTest(test.TestCase): group_type_id=group_type.id) location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) volume = self._create_volume(provider_location=location) - model_update, add_volumes_update, remove_volumes_update = ( - self.driver.update_group(self.ctxt, group, [volume], [])) - self.assertIsNone(model_update) - self.assertIsNone(add_volumes_update) - self.assertIsNone(remove_volumes_update) + self.assertRaises(NotImplementedError, + self.driver.update_group, + self.ctxt, group, [volume], []) @mock.patch.object(eventlet, 'sleep') @mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy') @@ -2868,9 +2977,8 @@ class DS8KProxyTest(test.TestCase): location = ast.literal_eval(add_volumes_update[0]['provider_location']) self.assertEqual('2200', location['vol_hex_id']) - @mock.patch.object(helper.DS8KCommonHelper, 'delete_lun') - def test_delete_generic_group_failed(self, mock_delete_lun): - """test a failed group deletion.""" + def test_delete_generic_group_not_implemented(self): + """delete generic group but it is not implemented.""" self.driver = FakeDS8KProxy(self.storage_info, self.logger, self.exception, self) self.driver.setup(self.ctxt) @@ -2880,29 +2988,9 @@ class DS8KProxyTest(test.TestCase): volume = self._create_volume(group_type_id=group_type.id, provider_location=location, group_id=group.id) - mock_delete_lun.side_effect = ( - restclient.APIException('delete volume failed.')) - model_update, volumes_model_update = ( - self.driver.delete_group(self.ctxt, group, [volume])) - self.assertEqual('error_deleting', volumes_model_update[0]['status']) - self.assertEqual(fields.GroupStatus.ERROR_DELETING, - model_update['status']) - - def test_delete_generic_group_sucessfully(self): - """test a successful generic group deletion.""" - self.driver = FakeDS8KProxy(self.storage_info, self.logger, - self.exception, self) - self.driver.setup(self.ctxt) - group_type = group_types.create(self.ctxt, 'CG', {}) - group = self._create_group(group_type_id=group_type.id) - location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) - volume = self._create_volume(group_type_id=group_type.id, - provider_location=location, - group_id=group.id) - model_update, volumes_model_update = ( - self.driver.delete_group(self.ctxt, group, [volume])) - self.assertEqual('deleted', volumes_model_update[0]['status']) - self.assertEqual(fields.GroupStatus.DELETED, model_update['status']) + self.assertRaises(NotImplementedError, + self.driver.delete_group, + self.ctxt, group, [volume]) @mock.patch.object(eventlet, 'sleep') @mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy') @@ -3074,6 +3162,38 @@ class DS8KProxyTest(test.TestCase): self.assertEqual(fields.GroupStatus.AVAILABLE, model_update['status']) + def test_create_group_from_generic_group(self): + """create group from generic group is not implemented.""" + self.configuration.replication_device = [TEST_REPLICATION_DEVICE] + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'group_replication_enabled': ' True'} + ) + src_group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', + {'replication_enabled': ' True'}) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + data = json.dumps({'vol_hex_id': TEST_VOLUME_ID}) + src_volume = self._create_volume(volume_type_id=vol_type.id, + provider_location=location, + replication_driver_data=data, + group_id=src_group.id) + + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + volume = self._create_volume(group_id=group.id) + + self.assertRaises(NotImplementedError, + self.driver.create_group_from_src, + self.ctxt, group, [volume], + None, None, src_group, [src_volume]) + @mock.patch.object(eventlet, 'sleep') @mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_pairs') def test_failover_host_successfully(self, mock_get_pprc_pairs, mock_sleep): @@ -3100,8 +3220,91 @@ class DS8KProxyTest(test.TestCase): self.ctxt, [volume], TEST_TARGET_DS8K_IP, []) self.assertEqual(TEST_TARGET_DS8K_IP, secondary_id) - @mock.patch.object(replication.Replication, 'do_pprc_failover') - def test_failover_host_failed(self, mock_do_pprc_failover): + @mock.patch.object(eventlet, 'sleep') + @mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_pairs') + def test_failover_host_with_group(self, mock_get_pprc_pairs, mock_sleep): + """Failover host with group.""" + self.configuration.replication_device = [TEST_REPLICATION_DEVICE] + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'group_replication_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id, + replication_status='enabled') + vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', + {'replication_enabled': ' True'}) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + data = json.dumps( + {TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}}) + metadata = [{'key': 'data_type', 'value': 'FB 512'}] + volume = self._create_volume(volume_type_id=vol_type.id, + provider_location=location, + replication_driver_data=data, + volume_metadata=metadata, + group_id=group.id) + pprc_pairs = copy.deepcopy(FAKE_GET_PPRCS_RESPONSE['data']['pprcs']) + pprc_pairs[0]['state'] = 'suspended' + mock_get_pprc_pairs.side_effect = [pprc_pairs] + secondary_id, volume_update_list, group_update_list = ( + self.driver.failover_host(self.ctxt, [volume], + TEST_TARGET_DS8K_IP, [group])) + self.assertEqual(TEST_TARGET_DS8K_IP, secondary_id) + volume_update = volume_update_list[0] + self.assertEqual(volume_update['volume_id'], volume.id) + self.assertEqual(fields.ReplicationStatus.FAILED_OVER, + volume_update['updates']['replication_status']) + group_update = group_update_list[0] + self.assertEqual(group_update['group_id'], group.id) + self.assertEqual(fields.ReplicationStatus.FAILED_OVER, + group_update['updates']['replication_status']) + + @mock.patch.object(eventlet, 'sleep') + @mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_pairs') + def test_failover_host_with_group_failed_over(self, mock_get_pprc_pairs, + mock_sleep): + """Failover host with group that has been failed over.""" + self.configuration.replication_device = [TEST_REPLICATION_DEVICE] + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'group_replication_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id, + replication_status='failed-over') + vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', + {'replication_enabled': ' True'}) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + data = json.dumps( + {'default': {'vol_hex_id': TEST_VOLUME_ID}}) + metadata = [{'key': 'data_type', 'value': 'FB 512'}] + volume = self._create_volume(volume_type_id=vol_type.id, + provider_location=location, + replication_driver_data=data, + volume_metadata=metadata, + group_id=group.id) + pprc_pairs = copy.deepcopy(FAKE_GET_PPRCS_RESPONSE['data']['pprcs']) + pprc_pairs[0]['state'] = 'suspended' + mock_get_pprc_pairs.side_effect = [pprc_pairs] + secondary_id, volume_update_list, group_update_list = ( + self.driver.failover_host(self.ctxt, [volume], + TEST_TARGET_DS8K_IP, [group])) + self.assertEqual(TEST_TARGET_DS8K_IP, secondary_id) + self.assertEqual(volume_update_list, []) + self.assertEqual(group_update_list, []) + + @mock.patch.object(replication.Replication, 'start_host_pprc_failover') + def test_failover_host_failed(self, mock_host_pprc_failover): """Failover host should raise exception when failed.""" self.configuration.replication_device = [TEST_REPLICATION_DEVICE] self.driver = FakeDS8KProxy(self.storage_info, self.logger, @@ -3119,7 +3322,7 @@ class DS8KProxyTest(test.TestCase): replication_driver_data=data, volume_metadata=metadata) - mock_do_pprc_failover.side_effect = ( + mock_host_pprc_failover.side_effect = ( restclient.APIException('failed to do failover.')) self.assertRaises(exception.UnableToFailOver, self.driver.failover_host, self.ctxt, @@ -3155,7 +3358,7 @@ class DS8KProxyTest(test.TestCase): {'replication_enabled': ' True'}) location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) data = json.dumps( - {TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}}) + {'default': {'vol_hex_id': TEST_VOLUME_ID}}) volume = self._create_volume(volume_type_id=vol_type.id, provider_location=location, replication_driver_data=data) @@ -3175,7 +3378,7 @@ class DS8KProxyTest(test.TestCase): {'replication_enabled': ' True'}) location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) data = json.dumps( - {TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}}) + {'default': {'vol_hex_id': TEST_VOLUME_ID}}) volume = self._create_volume(volume_type_id=vol_type.id, provider_location=location, replication_driver_data=data) @@ -3184,42 +3387,6 @@ class DS8KProxyTest(test.TestCase): self.assertIsNone(secondary_id) self.assertEqual([], volume_update_list) - def test_failover_host_which_only_has_unreplicated_volume(self): - """Failover host which only has unreplicated volume.""" - self.configuration.replication_device = [TEST_REPLICATION_DEVICE] - self.driver = FakeDS8KProxy(self.storage_info, self.logger, - self.exception, self) - self.driver.setup(self.ctxt) - - vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', {}) - location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) - volume = self._create_volume(volume_type_id=vol_type.id, - provider_location=location) - secondary_id, volume_update_list, __ = self.driver.failover_host( - self.ctxt, [volume], TEST_TARGET_DS8K_IP, []) - self.assertEqual(TEST_TARGET_DS8K_IP, secondary_id) - self.assertEqual('error', volume_update_list[0]['updates']['status']) - - def test_failback_should_recover_status_of_unreplicated_volume(self): - """Failback host should recover the status of unreplicated volume.""" - self.configuration.replication_device = [TEST_REPLICATION_DEVICE] - self.driver = FakeDS8KProxy(self.storage_info, self.logger, - self.exception, self, TEST_TARGET_DS8K_IP) - self.driver.setup(self.ctxt) - - vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', {}) - location = six.text_type({ - 'vol_hex_id': TEST_VOLUME_ID, - 'old_status': 'available' - }) - volume = self._create_volume(volume_type_id=vol_type.id, - provider_location=location) - secondary_id, volume_update_list, __ = self.driver.failover_host( - self.ctxt, [volume], 'default', []) - self.assertEqual('default', secondary_id) - self.assertEqual('available', - volume_update_list[0]['updates']['status']) - @mock.patch.object(eventlet, 'sleep') @mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_pairs') def test_failback_host_successfully(self, mock_get_pprc_pairs, mock_sleep): @@ -3233,7 +3400,7 @@ class DS8KProxyTest(test.TestCase): {'replication_enabled': ' True'}) location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) data = json.dumps( - {TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}}) + {'default': {'vol_hex_id': TEST_VOLUME_ID}}) metadata = [{'key': 'data_type', 'value': 'FB 512'}] volume = self._create_volume(volume_type_id=vol_type.id, provider_location=location, @@ -3249,8 +3416,8 @@ class DS8KProxyTest(test.TestCase): self.ctxt, [volume], 'default', []) self.assertEqual('default', secondary_id) - @mock.patch.object(replication.Replication, 'start_pprc_failback') - def test_failback_host_failed(self, mock_start_pprc_failback): + @mock.patch.object(replication.Replication, 'start_host_pprc_failback') + def test_failback_host_failed(self, mock_start_host_pprc_failback): """Failback host should raise exception when failed.""" self.configuration.replication_device = [TEST_REPLICATION_DEVICE] self.driver = FakeDS8KProxy(self.storage_info, self.logger, @@ -3261,12 +3428,401 @@ class DS8KProxyTest(test.TestCase): {'replication_enabled': ' True'}) location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) data = json.dumps( - {TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}}) + {'default': {'vol_hex_id': TEST_VOLUME_ID}}) volume = self._create_volume(volume_type_id=vol_type.id, provider_location=location, replication_driver_data=data) - mock_start_pprc_failback.side_effect = ( + mock_start_host_pprc_failback.side_effect = ( restclient.APIException('failed to do failback.')) self.assertRaises(exception.UnableToFailOver, self.driver.failover_host, self.ctxt, [volume], 'default', []) + + @mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_pairs') + def test_enable_replication_successfully(self, mock_get_pprc_pairs): + """Enable replication for the group successfully.""" + self.configuration.replication_device = [TEST_REPLICATION_DEVICE] + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id, + replication_status='disabled') + vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', + {'replication_enabled': ' True'}) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + data = json.dumps( + {TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}}) + metadata = [{'key': 'data_type', 'value': 'FB 512'}] + volume = self._create_volume(volume_type_id=vol_type.id, + provider_location=location, + replication_driver_data=data, + volume_metadata=metadata, + group_id=group.id, + replication_status='disabled') + pprc_pairs = copy.deepcopy(FAKE_GET_PPRCS_RESPONSE['data']['pprcs']) + pprc_pairs[0]['state'] = 'suspended' + mock_get_pprc_pairs.side_effect = [pprc_pairs] + model_update, volumes_update_list = self.driver.enable_replication( + self.ctxt, group, [volume]) + self.assertEqual(fields.ReplicationStatus.ENABLED, + model_update.get('replication_status')) + for volume_update in volumes_update_list: + self.assertEqual(fields.ReplicationStatus.ENABLED, + volume_update.get('replication_status')) + + @mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_pairs') + def test_enable_replication_if_pprc_in_invalid_state( + self, mock_get_pprc_pairs): + """Enable replication but pprc relationship is in invalid state.""" + self.configuration.replication_device = [TEST_REPLICATION_DEVICE] + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id, + replication_status='disabled') + vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', + {'replication_enabled': ' True'}) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + data = json.dumps( + {TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}}) + metadata = [{'key': 'data_type', 'value': 'FB 512'}] + volume = self._create_volume(volume_type_id=vol_type.id, + provider_location=location, + replication_driver_data=data, + volume_metadata=metadata, + group_id=group.id, + replication_status='disabled') + pprc_pairs = copy.deepcopy(FAKE_GET_PPRCS_RESPONSE['data']['pprcs']) + pprc_pairs[0]['state'] = 'invalid' + mock_get_pprc_pairs.side_effect = [pprc_pairs] + self.assertRaises(exception.VolumeDriverException, + self.driver.enable_replication, + self.ctxt, group, [volume]) + + @mock.patch.object(helper.DS8KCommonHelper, 'resume_pprc_pairs') + @mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_pairs') + def test_enable_replication_but_resume_fails(self, mock_get_pprc_pairs, + mock_resume_pprc_pairs): + """Enable replication but resume fails.""" + self.configuration.replication_device = [TEST_REPLICATION_DEVICE] + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id, + replication_status='disabled') + vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', + {'replication_enabled': ' True'}) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + data = json.dumps( + {TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}}) + metadata = [{'key': 'data_type', 'value': 'FB 512'}] + volume = self._create_volume(volume_type_id=vol_type.id, + provider_location=location, + replication_driver_data=data, + volume_metadata=metadata, + group_id=group.id, + replication_status='disabled') + pprc_pairs = copy.deepcopy(FAKE_GET_PPRCS_RESPONSE['data']['pprcs']) + pprc_pairs[0]['state'] = 'suspended' + mock_get_pprc_pairs.side_effect = [pprc_pairs] + mock_resume_pprc_pairs.side_effect = ( + restclient.APIException('failed to resume replication.')) + self.assertRaises(exception.VolumeDriverException, + self.driver.enable_replication, + self.ctxt, group, [volume]) + + @mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_pairs') + def test_disable_replication_successfully(self, mock_get_pprc_pairs): + """Disable replication for the group successfully.""" + self.configuration.replication_device = [TEST_REPLICATION_DEVICE] + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id, + replication_status='enabled') + vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', + {'replication_enabled': ' True'}) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + data = json.dumps( + {TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}}) + metadata = [{'key': 'data_type', 'value': 'FB 512'}] + volume = self._create_volume(volume_type_id=vol_type.id, + provider_location=location, + replication_driver_data=data, + volume_metadata=metadata, + group_id=group.id, + replication_status='enabled') + pprc_pairs = copy.deepcopy(FAKE_GET_PPRCS_RESPONSE['data']['pprcs']) + pprc_pairs[0]['state'] = 'full_duplex' + mock_get_pprc_pairs.side_effect = [pprc_pairs] + model_update, volumes_update_list = self.driver.disable_replication( + self.ctxt, group, [volume]) + self.assertEqual(fields.ReplicationStatus.DISABLED, + model_update.get('replication_status')) + for volume_update in volumes_update_list: + self.assertEqual(fields.ReplicationStatus.DISABLED, + volume_update.get('replication_status')) + + @mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_pairs') + def test_disable_replication_if_pprc_in_invalid_state( + self, mock_get_pprc_pairs): + """Disable replication but pprc relationship is in invalid state.""" + self.configuration.replication_device = [TEST_REPLICATION_DEVICE] + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id, + replication_status='enabled') + vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', + {'replication_enabled': ' True'}) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + data = json.dumps( + {TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}}) + metadata = [{'key': 'data_type', 'value': 'FB 512'}] + volume = self._create_volume(volume_type_id=vol_type.id, + provider_location=location, + replication_driver_data=data, + volume_metadata=metadata, + group_id=group.id, + replication_status='enabled') + pprc_pairs = copy.deepcopy(FAKE_GET_PPRCS_RESPONSE['data']['pprcs']) + pprc_pairs[0]['state'] = 'invalid' + mock_get_pprc_pairs.side_effect = [pprc_pairs] + self.assertRaises(exception.VolumeDriverException, + self.driver.disable_replication, + self.ctxt, group, [volume]) + + @mock.patch.object(helper.DS8KCommonHelper, 'pause_pprc_pairs') + @mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_pairs') + def test_disable_replication_but_pause_fails(self, mock_get_pprc_pairs, + mock_pause_pprc_pairs): + """Disable replication but pause fails.""" + self.configuration.replication_device = [TEST_REPLICATION_DEVICE] + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id, + replication_status='enabled') + vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', + {'replication_enabled': ' True'}) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + data = json.dumps( + {TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}}) + metadata = [{'key': 'data_type', 'value': 'FB 512'}] + volume = self._create_volume(volume_type_id=vol_type.id, + provider_location=location, + replication_driver_data=data, + volume_metadata=metadata, + group_id=group.id, + replication_status='enabled') + pprc_pairs = copy.deepcopy(FAKE_GET_PPRCS_RESPONSE['data']['pprcs']) + pprc_pairs[0]['state'] = 'full_duplex' + mock_get_pprc_pairs.side_effect = [pprc_pairs] + mock_pause_pprc_pairs.side_effect = ( + restclient.APIException('failed to pause replication.')) + self.assertRaises(exception.VolumeDriverException, + self.driver.disable_replication, + self.ctxt, group, [volume]) + + @mock.patch.object(eventlet, 'sleep') + @mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_pairs') + def test_failover_group_successfully(self, mock_get_pprc_pairs, + mock_sleep): + """Failover group to valid secondary successfully.""" + self.configuration.replication_device = [TEST_REPLICATION_DEVICE] + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', + {'replication_enabled': ' True'}) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + data = json.dumps( + {TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}}) + metadata = [{'key': 'data_type', 'value': 'FB 512'}] + volume = self._create_volume(volume_type_id=vol_type.id, + provider_location=location, + replication_driver_data=data, + volume_metadata=metadata, + group_id=group.id) + pprc_pairs_1 = copy.deepcopy(FAKE_GET_PPRCS_RESPONSE['data']['pprcs']) + pprc_pairs_1[0]['state'] = 'suspended' + pprc_pairs_2 = copy.deepcopy(FAKE_GET_PPRCS_RESPONSE['data']['pprcs']) + pprc_pairs_2[0]['state'] = 'full_duplex' + mock_get_pprc_pairs.side_effect = [pprc_pairs_1, pprc_pairs_2] + model_update, volumes_update_list = self.driver.failover_replication( + self.ctxt, group, [volume], TEST_TARGET_DS8K_IP) + self.assertEqual(fields.ReplicationStatus.FAILED_OVER, + model_update.get('replication_status')) + for volume_update in volumes_update_list: + self.assertEqual(fields.ReplicationStatus.FAILED_OVER, + volume_update.get('replication_status')) + + @mock.patch.object(replication.Replication, 'start_group_pprc_failover') + def test_failover_group_failed(self, mock_group_pprc_failover): + """Failover group should raise exception when failed.""" + self.configuration.replication_device = [TEST_REPLICATION_DEVICE] + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', + {'replication_enabled': ' True'}) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + data = json.dumps( + {TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}}) + metadata = [{'key': 'data_type', 'value': 'FB 512'}] + volume = self._create_volume(volume_type_id=vol_type.id, + provider_location=location, + replication_driver_data=data, + volume_metadata=metadata, + group_id=group.id) + + mock_group_pprc_failover.side_effect = ( + restclient.APIException('failed to failover group.')) + self.assertRaises(exception.VolumeDriverException, + self.driver.failover_replication, self.ctxt, + group, [volume], TEST_TARGET_DS8K_IP) + + def test_failover_group_to_invalid_target(self): + """Failover group to invalid secondary should fail.""" + self.configuration.replication_device = [TEST_REPLICATION_DEVICE] + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', + {'replication_enabled': ' True'}) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + data = json.dumps( + {TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}}) + volume = self._create_volume(volume_type_id=vol_type.id, + provider_location=location, + replication_driver_data=data, + group_id=group.id) + self.assertRaises(exception.InvalidReplicationTarget, + self.driver.failover_replication, self.ctxt, + group, [volume], 'fake_target') + + def test_failover_group_that_has_been_failed_over(self): + """Failover group that has been failed over should just return.""" + self.configuration.replication_device = [TEST_REPLICATION_DEVICE] + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id, + replication_status='failed-over') + vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', + {'replication_enabled': ' True'}) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + data = json.dumps( + {'default': {'vol_hex_id': TEST_VOLUME_ID}}) + volume = self._create_volume(volume_type_id=vol_type.id, + provider_location=location, + replication_driver_data=data, + group_id=group.id, + replication_status='failed-over') + model_update, volumes_update_list = self.driver.failover_replication( + self.ctxt, group, [volume], TEST_TARGET_DS8K_IP) + self.assertEqual({}, model_update) + self.assertEqual([], volumes_update_list) + + def test_failback_group_that_has_been_failed_back(self): + """Failback group that has been failed back should just return.""" + self.configuration.replication_device = [TEST_REPLICATION_DEVICE] + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self, TEST_TARGET_DS8K_IP) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id, + replication_status='enabled') + vol_type = volume_types.create(self.ctxt, 'VOL_TYPE', + {'replication_enabled': ' True'}) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + data = json.dumps( + {TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}}) + volume = self._create_volume(volume_type_id=vol_type.id, + provider_location=location, + replication_driver_data=data, + group_id=group.id, + replication_status='available') + model_update, volume_update_list = self.driver.failover_replication( + self.ctxt, group, [volume], 'default') + self.assertEqual({}, model_update) + self.assertEqual([], volume_update_list) diff --git a/cinder/volume/drivers/ibm/ibm_storage/ds8k_helper.py b/cinder/volume/drivers/ibm/ibm_storage/ds8k_helper.py index 59c412d560f..d1090b5078a 100644 --- a/cinder/volume/drivers/ibm/ibm_storage/ds8k_helper.py +++ b/cinder/volume/drivers/ibm/ibm_storage/ds8k_helper.py @@ -62,8 +62,9 @@ class DS8KCommonHelper(object): OPTIONAL_PARAMS = ['ds8k_host_type', 'lss_range_for_cg'] # if use new REST API, please update the version below VALID_REST_VERSION_5_7_MIN = '5.7.51.1047' - VALID_REST_VERSION_5_8_MIN = '' INVALID_STORAGE_VERSION = '8.0.1' + REST_VERSION_5_7_MIN_PPRC_CG = '5.7.51.1068' + REST_VERSION_5_8_MIN_PPRC_CG = '5.8.20.1059' def __init__(self, conf, HTTPConnectorObject=None): self.conf = conf @@ -111,8 +112,8 @@ class DS8KCommonHelper(object): self.backend['pools_str'] = self._get_value('san_clustername') self._storage_pools = self.get_pools() self.verify_pools(self._storage_pools) - self._get_lss_ids_for_cg() - self._verify_version() + self.backend['lss_ids_for_cg'] = self._get_lss_ids_for_cg() + self._verify_rest_version() def update_client(self): self._client.close() @@ -160,6 +161,7 @@ class DS8KCommonHelper(object): self.backend['storage_version'] = storage_info['release'] def _get_lss_ids_for_cg(self): + lss_ids_for_cg = set() lss_range = self._get_value('lss_range_for_cg') if lss_range: lss_range = lss_range.replace(' ', '').split('-') @@ -173,10 +175,9 @@ class DS8KCommonHelper(object): raise exception.InvalidParameterValue( err=_('Param [lss_range_for_cg] is invalid, it ' 'should be within 00-FF.')) - self.backend['lss_ids_for_cg'] = set( + lss_ids_for_cg = set( ('%02x' % i).upper() for i in range(begin, end + 1)) - else: - self.backend['lss_ids_for_cg'] = set() + return lss_ids_for_cg def _check_host_type(self): ds8k_host_type = self._get_value('ds8k_host_type') @@ -189,7 +190,7 @@ class DS8KCommonHelper(object): self.backend['host_type_override'] = ( None if ds8k_host_type == 'auto' else ds8k_host_type) - def _verify_version(self): + def _verify_rest_version(self): if self.backend['storage_version'] == self.INVALID_STORAGE_VERSION: raise exception.VolumeDriverException( message=(_("%s does not support bulk deletion of volumes, " @@ -205,6 +206,28 @@ class DS8KCommonHelper(object): % {'invalid': self.backend['rest_version'], 'valid': self.VALID_REST_VERSION_5_7_MIN})) + def verify_rest_version_for_pprc_cg(self): + if '8.1' in self.backend['rest_version']: + raise exception.VolumeDriverException( + message=_("REST for DS8K 8.1 does not support PPRC " + "consistency group, please upgrade the CCL.")) + valid_rest_version = None + if ('5.7' in self.backend['rest_version'] and + dist_version.LooseVersion(self.backend['rest_version']) < + dist_version.LooseVersion(self.REST_VERSION_5_7_MIN_PPRC_CG)): + valid_rest_version = self.REST_VERSION_5_7_MIN_PPRC_CG + elif ('5.8' in self.backend['rest_version'] and + dist_version.LooseVersion(self.backend['rest_version']) < + dist_version.LooseVersion(self.REST_VERSION_5_8_MIN_PPRC_CG)): + valid_rest_version = self.REST_VERSION_5_8_MIN_PPRC_CG + + if valid_rest_version: + raise exception.VolumeDriverException( + message=(_("REST version %(invalid)s is lower than " + "%(valid)s, please upgrade it in DS8K.") + % {'invalid': self.backend['rest_version'], + 'valid': valid_rest_version})) + def verify_pools(self, storage_pools): if self._connection_type == storage.XIV_CONNECTION_TYPE_FC: ptype = 'fb' @@ -821,14 +844,14 @@ class DS8KCommonHelper(object): def delete_pprc_path(self, path_id): self._client.send('DELETE', '/cs/pprcs/paths/%s' % path_id) - def create_pprc_pair(self, pairData): - self._client.send('POST', '/cs/pprcs', pairData) + def create_pprc_pair(self, pair_data): + self._client.send('POST', '/cs/pprcs', pair_data) def delete_pprc_pair_by_pair_id(self, pids): self._client.statusok('DELETE', '/cs/pprcs', params=pids) - def do_failback(self, pairData): - self._client.send('POST', '/cs/pprcs/resume', pairData) + def do_failback(self, pair_data): + self._client.send('POST', '/cs/pprcs/resume', pair_data) def get_pprc_pairs(self, min_vol_id, max_vol_id): return self._client.fetchall( @@ -844,14 +867,27 @@ class DS8KCommonHelper(object): return None # don't use pprc pair ID to delete it, because it may have # communication issues. - pairData = { + pair_data = { 'volume_full_ids': [{ 'volume_id': vol_id, 'system_id': self.backend['storage_unit'] }], 'options': ['unconditional', 'issue_source'] } - self._client.send('POST', '/cs/pprcs/delete', pairData) + self._client.send('POST', '/cs/pprcs/delete', pair_data) + + def pause_pprc_pairs(self, pprc_pair_ids): + pair_data = {'pprc_ids': pprc_pair_ids} + self._client.send('POST', '/cs/pprcs/pause', pair_data) + + def resume_pprc_pairs(self, pprc_pair_ids): + pair_data = { + 'pprc_ids': pprc_pair_ids, + 'type': 'metro_mirror', + 'options': ['permit_space_efficient_target', + 'initial_copy_out_of_sync'] + } + self._client.send('POST', '/cs/pprcs/resume', pair_data) class DS8KReplicationSourceHelper(DS8KCommonHelper): @@ -890,18 +926,19 @@ class DS8KReplicationSourceHelper(DS8KCommonHelper): class DS8KReplicationTargetHelper(DS8KReplicationSourceHelper): """Manage target storage for replication.""" - OPTIONAL_PARAMS = ['ds8k_host_type', 'port_pairs'] + OPTIONAL_PARAMS = ['ds8k_host_type', 'port_pairs', 'lss_range_for_cg'] def setup(self): self._create_client() self._get_storage_information() self._get_replication_information() self._check_host_type() + self.backend['lss_ids_for_cg'] = self._get_lss_ids_for_cg() self.backend['pools_str'] = self._get_value( 'san_clustername').replace('_', ',') self._storage_pools = self.get_pools() self.verify_pools(self._storage_pools) - self._verify_version() + self._verify_rest_version() def _get_replication_information(self): port_pairs = [] @@ -917,20 +954,6 @@ class DS8KReplicationTargetHelper(DS8KReplicationSourceHelper): self.backend['port_pairs'] = port_pairs self.backend['id'] = self._get_value('backend_id') - @proxy.logger - def _find_lss_for_type_replication(self, node, excluded_lss): - # prefer to choose non-existing one first. - existing_lss = self.get_all_lss() - LOG.info("existing LSS IDs are %s", - ','.join([lss['id'] for lss in existing_lss])) - lss_id = self._find_from_nonexistent_lss(node, existing_lss) - if not lss_id: - if excluded_lss: - existing_lss = [lss for lss in existing_lss - if lss['id'] not in excluded_lss] - lss_id = self._find_from_existing_lss(node, existing_lss) - return lss_id - def create_lun(self, lun): volData = { 'cap': self._gb2b(lun.size), @@ -952,14 +975,14 @@ class DS8KReplicationTargetHelper(DS8KReplicationSourceHelper): def delete_pprc_pair(self, vol_id): if not self.get_pprc_pairs(vol_id, vol_id): return None - pairData = { + pair_data = { 'volume_full_ids': [{ 'volume_id': vol_id, 'system_id': self.backend['storage_unit'] }], 'options': ['unconditional', 'issue_target'] } - self._client.send('POST', '/cs/pprcs/delete', pairData) + self._client.send('POST', '/cs/pprcs/delete', pair_data) class DS8KECKDHelper(DS8KCommonHelper): @@ -999,16 +1022,16 @@ class DS8KECKDHelper(DS8KCommonHelper): self._create_client() self._get_storage_information() self._check_host_type() - self._get_lss_ids_for_cg() + self.backend['lss_ids_for_cg'] = self._get_lss_ids_for_cg() self.backend['pools_str'] = self._get_value('san_clustername') self._storage_pools = self.get_pools() self.verify_pools(self._storage_pools) ssid_prefix = self._get_value('ds8k_ssid_prefix') self.backend['ssid_prefix'] = ssid_prefix if ssid_prefix else 'FF' self.backend['device_mapping'] = self._get_device_mapping() - self._verify_version() + self._verify_rest_version() - def _verify_version(self): + def _verify_rest_version(self): if self.backend['storage_version'] == self.INVALID_STORAGE_VERSION: raise exception.VolumeDriverException( message=(_("%s does not support bulk deletion of volumes, " @@ -1034,6 +1057,7 @@ class DS8KECKDHelper(DS8KCommonHelper): in self.backend['rest_version'] else self.VALID_REST_VERSION_5_8_MIN)})) + @proxy.logger def _get_device_mapping(self): map_str = self._get_value('ds8k_devadd_unitadd_mapping') mappings = map_str.replace(' ', '').upper().split(';') @@ -1198,6 +1222,7 @@ class DS8KReplicationTargetECKDHelper(DS8KECKDHelper, self._get_storage_information() self._get_replication_information() self._check_host_type() + self.backend['lss_ids_for_cg'] = self._get_lss_ids_for_cg() self.backend['pools_str'] = self._get_value( 'san_clustername').replace('_', ',') self._storage_pools = self.get_pools() @@ -1205,7 +1230,7 @@ class DS8KReplicationTargetECKDHelper(DS8KECKDHelper, ssid_prefix = self._get_value('ds8k_ssid_prefix') self.backend['ssid_prefix'] = ssid_prefix if ssid_prefix else 'FF' self.backend['device_mapping'] = self._get_device_mapping() - self._verify_version() + self._verify_rest_version() def create_lun(self, lun): volData = { diff --git a/cinder/volume/drivers/ibm/ibm_storage/ds8k_proxy.py b/cinder/volume/drivers/ibm/ibm_storage/ds8k_proxy.py index 37a221b7bed..10318d52c71 100644 --- a/cinder/volume/drivers/ibm/ibm_storage/ds8k_proxy.py +++ b/cinder/volume/drivers/ibm/ibm_storage/ds8k_proxy.py @@ -57,7 +57,6 @@ connection_type = fibre_channel """ import ast -import collections import json import six @@ -135,9 +134,10 @@ class Lun(object): 1.0.0 - initial revision. 2.1.0 - Added support for specify pool and lss, also improve the code. + 2.1.1 - Added support for replication consistency group. """ - VERSION = "2.1.0" + VERSION = "2.1.1" class FakeLun(object): @@ -226,6 +226,7 @@ class Lun(object): self.replica_ds_name = ( "OS%s:%s" % ('Replica', helper.filter_alnum(self.cinder_name)) )[:16] + self.previous_status = volume.previous_status self.replication_status = volume.replication_status self.replication_driver_data = ( json.loads(volume.replication_driver_data) @@ -234,12 +235,20 @@ class Lun(object): # now only support one replication target. replication_target = sorted( self.replication_driver_data.values())[0] - replica_id = replication_target['vol_hex_id'] + self.replica_ds_id = replication_target['vol_hex_id'] self.pool_lss_pair = { 'source': (None, self.ds_id[0:2]), - 'target': (None, replica_id[0:2]) + 'target': (None, self.replica_ds_id[0:2]) } - + # Don't use self.replication_status to judge if volume has + # been failed over or not, because when user fail over a + # group, replication_status of each volume in group is + # failing over. + self.failed_over = (True if 'default' in + self.replication_driver_data.keys() + else False) + else: + self.failed_over = False if os400: if os400 not in VALID_OS400_VOLUME_TYPES.keys(): raise restclient.APIException( @@ -295,7 +304,6 @@ class Lun(object): volume_update = {} volume_update['provider_location'] = six.text_type( {'vol_hex_id': self.ds_id}) - # update metadata if self.is_snapshot: metadata = self._get_snapshot_metadata(self.volume) @@ -308,7 +316,9 @@ class Lun(object): metadata.pop('replication', None) volume_update['replication_driver_data'] = json.dumps( self.replication_driver_data) - volume_update['replication_status'] = self.replication_status + volume_update['replication_status'] = ( + self.replication_status or + fields.ReplicationStatus.NOT_CAPABLE) metadata['data_type'] = (self.data_type if self.data_type else metadata['data_type']) @@ -328,11 +338,23 @@ class Group(object): def __init__(self, group, is_snapshot=False): self.id = group.id self.host = group.host + self.consisgroup_snapshot_enabled = ( + utils.is_group_a_cg_snapshot_type(group)) + self.group_replication_enabled = ( + utils.is_group_a_type(group, + "group_replication_enabled")) + self.consisgroup_replication_enabled = ( + utils.is_group_a_type(group, + "consistent_group_replication_enabled")) if is_snapshot: self.snapshots = group.snapshots else: + self.failed_over = ( + group.replication_status == + fields.ReplicationStatus.FAILED_OVER) + # create_volume needs to check volumes in the group, + # so get it from volume.group object. self.volumes = group.volumes - self.consisgroup_enabled = utils.is_group_a_cg_snapshot_type(group) class DS8KProxy(proxy.IBMStorageProxy): @@ -388,16 +410,9 @@ class DS8KProxy(proxy.IBMStorageProxy): self._replication.check_physical_links() self._replication.check_connection_type() if self._active_backend_id: - self._switch_backend_connection(self._active_backend_id) + self._replication.switch_source_and_target_client() self._replication_enabled = True - @proxy.logger - def _switch_backend_connection(self, backend_id, repl_luns=None): - repl_luns = self._replication.switch_source_and_target(backend_id, - repl_luns) - self._helper = self._replication._source_helper - return repl_luns - @staticmethod def _b2gb(b): return b // (2 ** 30) @@ -431,6 +446,8 @@ class DS8KProxy(proxy.IBMStorageProxy): sum(p['capavail'] for p in storage_pools.values())), "reserved_percentage": self.configuration.reserved_percentage, "consistent_group_snapshot_enabled": True, + "group_replication_enabled": True, + "consistent_group_replication_enabled": True, "multiattach": False } @@ -455,7 +472,7 @@ class DS8KProxy(proxy.IBMStorageProxy): LOG.error(msg) raise exception.VolumeDriverException(message=msg) if lun.type_replication: - target_helper = self._replication._target_helper + target_helper = self._replication.get_target_helper() # PPRC can not copy from ESE volume to standard volume # or vice versa. if target_helper.get_thin_provision(): @@ -473,10 +490,10 @@ class DS8KProxy(proxy.IBMStorageProxy): lun.pool_lss_pair = { 'source': self._find_pool_lss_pair_from_spec( lun, excluded_lss)} - elif lun.group and lun.group.consisgroup_enabled: - lun.pool_lss_pair = { - 'source': self._find_pool_lss_pair_for_cg( - lun, excluded_lss)} + elif lun.group and (lun.group.consisgroup_snapshot_enabled or + lun.group.consisgroup_replication_enabled): + lun.pool_lss_pair = ( + self._find_pool_lss_pair_for_cg(lun, excluded_lss)) else: if lun.type_replication and not lun.is_snapshot: lun.pool_lss_pair = ( @@ -493,7 +510,8 @@ class DS8KProxy(proxy.IBMStorageProxy): excluded_lss.add(lun.pool_lss_pair['source'][1]) def _find_pool_lss_pair_from_spec(self, lun, excluded_lss): - if lun.group and lun.group.consisgroup_enabled: + if lun.group and (lun.group.consisgroup_snapshot_enabled or + lun.group.consisgroup_replication_enabled): msg = _("No support for specifying pool or lss for " "volumes that belong to consistency group.") LOG.error(msg) @@ -505,83 +523,112 @@ class DS8KProxy(proxy.IBMStorageProxy): @coordination.synchronized('{self.prefix}-consistency-group') def _find_pool_lss_pair_for_cg(self, lun, excluded_lss): - lss_in_cache = self.consisgroup_cache.get(lun.group.id, set()) - if not lss_in_cache: - lss_in_cg = self._get_lss_in_cg(lun.group, lun.is_snapshot) - LOG.debug("LSSs used by CG %(cg)s are %(lss)s.", - {'cg': lun.group.id, 'lss': ','.join(lss_in_cg)}) - available_lss = lss_in_cg - excluded_lss + # NOTE: a group may have multiple LSSs. + lss_pairs_in_cache = self.consisgroup_cache.get(lun.group.id, set()) + if not lss_pairs_in_cache: + lss_pairs_in_group = self._get_lss_pairs_in_group(lun.group, + lun.is_snapshot) + LOG.debug("LSSs used by group %(grp)s are %(lss_pair)s.", + {'grp': lun.group.id, 'lss_pair': lss_pairs_in_group}) + available_lss_pairs = set(pair for pair in lss_pairs_in_group + if pair[0] != excluded_lss) else: - available_lss = lss_in_cache - excluded_lss - if not available_lss: - available_lss = self._find_lss_for_cg() + available_lss_pairs = set(pair for pair in lss_pairs_in_cache + if pair[0] != excluded_lss) + if not available_lss_pairs: + available_lss_pairs = self._find_lss_pair_for_cg(lun.group) - pid, lss = self._find_pool_for_lss(available_lss) - if pid: - lss_in_cache.add(lss) - self.consisgroup_cache[lun.group.id] = lss_in_cache + pool_lss_pair, lss_pair = self._find_pool_for_lss(available_lss_pairs) + if pool_lss_pair: + lss_pairs_in_cache.add(lss_pair) + self.consisgroup_cache[lun.group.id] = lss_pairs_in_cache else: raise exception.VolumeDriverException( - message=_('There are still some available LSSs for CG, ' - 'but they are not in the same node as pool.')) - return (pid, lss) + message=(_('There are still some available LSSs %s for CG, ' + 'but they are not in the same node as pool.') + % available_lss_pairs)) + return pool_lss_pair - def _get_lss_in_cg(self, group, is_snapshot=False): - # Driver can not support the case that dedicating LSS for CG while - # user enable multiple backends which use the same DS8K. - try: - volume_backend_name = ( - group.host[group.host.index('@') + 1:group.host.index('#')]) - except ValueError: - raise exception.VolumeDriverException( - message=(_('Invalid host %(host)s in group %(group)s') - % {'host': group.host, 'group': group.id})) - lss_in_cg = set() - if volume_backend_name == self.configuration.volume_backend_name: - if is_snapshot: - luns = [Lun(snapshot, is_snapshot=True) - for snapshot in group.snapshots] - else: - luns = [Lun(volume) for volume in group.volumes] - lss_in_cg = set(lun.ds_id[:2] for lun in luns if lun.ds_id) - return lss_in_cg + def _get_lss_pairs_in_group(self, group, is_snapshot=False): + lss_pairs_in_group = set() + if is_snapshot: + luns = [Lun(snapshot, is_snapshot=True) + for snapshot in group.snapshots] + else: + luns = [Lun(volume) for volume in group.volumes] + if group.consisgroup_replication_enabled and not is_snapshot: + lss_pairs_in_group = set((lun.ds_id[:2], lun.replica_ds_id[:2]) + for lun in luns if lun.ds_id and + lun.replica_ds_id) + else: + lss_pairs_in_group = set((lun.ds_id[:2], None) + for lun in luns if lun.ds_id) + return lss_pairs_in_group - def _find_lss_for_cg(self): - # Unable to get CGs/groups belonging to the current tenant, so - # get all of them, this function will consume some time if there - # are so many CGs/groups. - lss_used = set() + def _find_lss_pair_for_cg(self, group): + lss_pairs_used = set() ctxt = context.get_admin_context() - existing_groups = objects.GroupList.get_all( - ctxt, filters={'status': 'available'}) - for group in existing_groups: - if Group(group).consisgroup_enabled: - lss_used = lss_used | self._get_lss_in_cg(group) - existing_groupsnapshots = objects.GroupSnapshotList.get_all( - ctxt, filters={'status': 'available'}) - for group in existing_groupsnapshots: - if Group(group, True).consisgroup_enabled: - lss_used = lss_used | self._get_lss_in_cg(group, True) - available_lss = set(self._helper.backend['lss_ids_for_cg']) - lss_used - for lss_set in self.consisgroup_cache.values(): - available_lss -= lss_set - self._assert(available_lss, + filters = {'host': group.host, 'status': 'available'} + groups = objects.GroupList.get_all(ctxt, filters=filters) + for grp in groups: + grp = Group(grp) + if (grp.consisgroup_snapshot_enabled or + grp.consisgroup_replication_enabled): + lss_pairs_used |= self._get_lss_pairs_in_group(grp) + group_snapshots = ( + objects.GroupSnapshotList.get_all(ctxt, filters=filters)) + for grp in group_snapshots: + grp = Group(grp, True) + if (grp.consisgroup_snapshot_enabled or + grp.consisgroup_replication_enabled): + lss_pairs_used |= self._get_lss_pairs_in_group(grp, True) + # in order to keep one-to-one pprc mapping relationship, zip LSSs + # which reserved by user. + if group.consisgroup_replication_enabled: + target_helper = self._replication.get_target_helper() + available_lss_pairs = zip(self._helper.backend['lss_ids_for_cg'], + target_helper.backend['lss_ids_for_cg']) + else: + available_lss_pairs = [(lss, None) for lss in + self._helper.backend['lss_ids_for_cg']] + + source_lss_used = set() + for lss_pair in lss_pairs_used: + source_lss_used.add(lss_pair[0]) + # in concurrency case, lss may be reversed in cache but the group has + # not been committed into DB. + for lss_pairs_set in self.consisgroup_cache.values(): + source_lss_used |= set(lss_pair[0] for lss_pair in lss_pairs_set) + + available_lss_pairs = [lss_pair for lss_pair in available_lss_pairs + if lss_pair[0] not in source_lss_used] + self._assert(available_lss_pairs, "All LSSs reserved for CG have been used out, " "please reserve more LSS for CG if there are still" "some empty LSSs left.") - LOG.debug('_find_lss_for_cg: available LSSs for consistency ' - 'group are %s', ','.join(available_lss)) - return available_lss + LOG.debug('_find_lss_pair_for_cg: available LSSs for consistency ' + 'group are %s', available_lss_pairs) + return available_lss_pairs @proxy.logger - def _find_pool_for_lss(self, available_lss): - for lss in available_lss: - pid = self._helper.get_pool(lss) - if pid: - return (pid, lss) + def _find_pool_for_lss(self, available_lss_pairs): + # all LSS pairs have target LSS or do not have. + for src_lss, tgt_lss in available_lss_pairs: + src_pid = self._helper.get_pool(src_lss) + if not src_pid: + continue + if tgt_lss: + target_helper = self._replication.get_target_helper() + tgt_pid = target_helper.get_pool(tgt_lss) + if tgt_pid: + return ({'source': (src_pid, src_lss), + 'target': (tgt_pid, tgt_lss)}, + (src_lss, tgt_lss)) + else: + return {'source': (src_pid, src_lss)}, (src_lss, tgt_lss) raise exception.VolumeDriverException( message=(_("Can not find pool for LSSs %s.") - % ','.join(available_lss))) + % ','.join(available_lss_pairs))) @proxy.logger def _clone_lun(self, src_lun, tgt_lun): @@ -648,7 +695,7 @@ class DS8KProxy(proxy.IBMStorageProxy): def _create_replica_helper(self, lun): if not lun.pool_lss_pair.get('target'): - lun = self._replication.enable_replication(lun, True) + lun = self._replication.establish_replication(lun, True) else: lun = self._replication.create_replica(lun) return lun @@ -865,7 +912,8 @@ class DS8KProxy(proxy.IBMStorageProxy): # exception happens during clean up can be ignored. if new_type_replication: new_lun.type_replication = True - new_lun = self._replication.enable_replication(new_lun, True) + new_lun = self._replication.establish_replication(new_lun, + True) elif old_type_replication: new_lun.type_replication = False try: @@ -882,7 +930,7 @@ class DS8KProxy(proxy.IBMStorageProxy): # lun when failed to enable replication or delete replica. if not old_type_replication and new_type_replication: lun.type_replication = True - lun = self._replication.enable_replication(lun) + lun = self._replication.establish_replication(lun) elif old_type_replication and not new_type_replication: lun = self._replication.delete_replica(lun) lun.type_replication = False @@ -893,73 +941,125 @@ class DS8KProxy(proxy.IBMStorageProxy): @proxy.logger def initialize_connection(self, volume, connector, **kwargs): """Attach a volume to the host.""" - vol_id = Lun(volume).ds_id - LOG.info('Attach the volume %s.', vol_id) - return self._helper.initialize_connection(vol_id, connector, **kwargs) + lun = Lun(volume) + LOG.info('Attach the volume %s.', lun.ds_id) + if lun.group and lun.failed_over: + backend_helper = self._replication.get_target_helper() + else: + backend_helper = self._helper + return backend_helper.initialize_connection(lun.ds_id, connector, + **kwargs) @proxy._trace_time @proxy.logger def terminate_connection(self, volume, connector, force=False, **kwargs): """Detach a volume from a host.""" - vol_id = Lun(volume).ds_id - LOG.info('Detach the volume %s.', vol_id) - return self._helper.terminate_connection(vol_id, connector, - force, **kwargs) + lun = Lun(volume) + LOG.info('Detach the volume %s.', lun.ds_id) + if lun.group and lun.failed_over: + backend_helper = self._replication.get_target_helper() + else: + backend_helper = self._helper + return backend_helper.terminate_connection(lun.ds_id, connector, + force, **kwargs) @proxy.logger def create_group(self, ctxt, group): - """Create generic volume group.""" - if Group(group).consisgroup_enabled: + """Create consistency group of FlashCopy or RemoteCopy.""" + grp = Group(group) + if (grp.group_replication_enabled or + grp.consisgroup_replication_enabled): + for volume_type in group.volume_types: + replication_type = utils.is_replicated_spec( + volume_type.extra_specs) + self._assert(replication_type, + 'Unable to create group: group %(grp)s ' + 'is for replication type, but volume ' + '%(vtype)s is a non-replication one.' + % {'grp': grp.id, 'vtype': volume_type.id}) + if (grp.consisgroup_snapshot_enabled or + grp.consisgroup_replication_enabled): self._assert(self._helper.backend['lss_ids_for_cg'], 'No LSS(s) for CG, please make sure you have ' 'reserved LSS for CG via param lss_range_for_cg.') - return self._helper.create_group(group) + model_update = {} + if grp.consisgroup_replication_enabled: + self._helper.verify_rest_version_for_pprc_cg() + target_helper = self._replication.get_target_helper() + target_helper.verify_rest_version_for_pprc_cg() + model_update['replication_status'] = ( + fields.ReplicationStatus.ENABLED) + model_update.update(self._helper.create_group(group)) + return model_update + else: + # NOTE(jiamin): If grp.group_replication_enabled is True, the + # default implementation will handle the creation of the group + # and driver just makes sure each volume type in group has + # enabled replication. + raise NotImplementedError() @proxy.logger def delete_group(self, ctxt, group, volumes): - """Delete group and the volumes in the group.""" - luns = [Lun(volume) for volume in volumes] - if Group(group).consisgroup_enabled: + """Delete consistency group and volumes in it.""" + grp = Group(group) + if grp.consisgroup_snapshot_enabled: + luns = [Lun(volume) for volume in volumes] + return self._delete_group_with_lock(group, luns) + elif grp.consisgroup_replication_enabled: + self._assert(not grp.failed_over, + 'Group %s has been failed over, it does ' + 'not support to delete it' % grp.id) + luns = [Lun(volume) for volume in volumes] + for lun in luns: + self._replication.delete_replica(lun) return self._delete_group_with_lock(group, luns) else: - return self._helper.delete_group(group, luns) + raise NotImplementedError() @coordination.synchronized('{self.prefix}-consistency-group') def _delete_group_with_lock(self, group, luns): model_update, volumes_model_update = ( self._helper.delete_group(group, luns)) if model_update['status'] == fields.GroupStatus.DELETED: - self._update_consisgroup_cache(group.id) + self._remove_record_from_consisgroup_cache(group.id) return model_update, volumes_model_update @proxy.logger def delete_group_snapshot(self, ctxt, group_snapshot, snapshots): """Delete volume group snapshot.""" - tgt_luns = [Lun(s, is_snapshot=True) for s in snapshots] - if Group(group_snapshot, True).consisgroup_enabled: + grp = Group(group_snapshot, True) + if (grp.consisgroup_snapshot_enabled or + grp.consisgroup_replication_enabled): + tgt_luns = [Lun(s, is_snapshot=True) for s in snapshots] return self._delete_group_snapshot_with_lock( group_snapshot, tgt_luns) else: - return self._helper.delete_group_snapshot( - group_snapshot, tgt_luns) + raise NotImplementedError() @coordination.synchronized('{self.prefix}-consistency-group') def _delete_group_snapshot_with_lock(self, group_snapshot, tgt_luns): model_update, snapshots_model_update = ( self._helper.delete_group_snapshot(group_snapshot, tgt_luns)) if model_update['status'] == fields.GroupStatus.DELETED: - self._update_consisgroup_cache(group_snapshot.id) + self._remove_record_from_consisgroup_cache(group_snapshot.id) return model_update, snapshots_model_update @proxy.logger def create_group_snapshot(self, ctxt, group_snapshot, snapshots): """Create volume group snapshot.""" + tgt_group = Group(group_snapshot, True) + if (not tgt_group.consisgroup_snapshot_enabled and + not tgt_group.consisgroup_replication_enabled): + raise NotImplementedError() + + src_group = Group(group_snapshot.group) + self._assert(not src_group.failed_over, + 'Group %s has been failed over, it does not ' + 'support to create group snapshot.' % src_group.id) snapshots_model_update = [] model_update = {'status': fields.GroupStatus.AVAILABLE} - - src_luns = [Lun(snapshot['volume']) for snapshot in snapshots] + src_luns = [Lun(snapshot.volume) for snapshot in snapshots] tgt_luns = [Lun(snapshot, is_snapshot=True) for snapshot in snapshots] - try: if src_luns and tgt_luns: self._clone_group(src_luns, tgt_luns) @@ -980,89 +1080,89 @@ class DS8KProxy(proxy.IBMStorageProxy): @proxy.logger def update_group(self, ctxt, group, add_volumes, remove_volumes): """Update generic volume group.""" - if Group(group).consisgroup_enabled: - return self._update_group(group, add_volumes, remove_volumes) + grp = Group(group) + if (grp.consisgroup_snapshot_enabled or + grp.consisgroup_replication_enabled): + self._assert(not grp.failed_over, + 'Group %s has been failed over, it does not ' + 'support to update it.' % grp.id) + return self._update_consisgroup(grp, add_volumes, remove_volumes) else: - return None, None, None + raise NotImplementedError() - def _update_group(self, group, add_volumes, remove_volumes): + def _update_consisgroup(self, grp, add_volumes, remove_volumes): add_volumes_update = [] - group_volume_ids = [vol.id for vol in group.volumes] - add_volumes = [vol for vol in add_volumes - if vol.id not in group_volume_ids] - remove_volumes = [vol for vol in remove_volumes - if vol.id in group_volume_ids] if add_volumes: - add_luns = [Lun(vol) for vol in add_volumes] - lss_in_cg = [Lun(vol).ds_id[:2] for vol in group.volumes] - if not lss_in_cg: - lss_in_cg = self._find_lss_for_empty_group(group, add_luns) - add_volumes_update = self._add_volumes_into_group( - group, add_luns, lss_in_cg) + add_volumes_update = self._add_volumes_into_consisgroup( + grp, add_volumes) + remove_volumes_update = [] if remove_volumes: - self._remove_volumes_in_group(group, add_volumes, remove_volumes) - return None, add_volumes_update, None + remove_volumes_update = self._remove_volumes_from_consisgroup( + grp, add_volumes, remove_volumes) + return None, add_volumes_update, remove_volumes_update - @coordination.synchronized('{self.prefix}-consistency-group') - def _find_lss_for_empty_group(self, group, luns): - sorted_lss_ids = collections.Counter([lun.ds_id[:2] for lun in luns]) - available_lss = self._find_lss_for_cg() - lss_for_cg = None - for lss_id in sorted_lss_ids: - if lss_id in available_lss: - lss_for_cg = lss_id - break - if not lss_for_cg: - lss_for_cg = available_lss.pop() - self._update_consisgroup_cache(group.id, lss_for_cg) - return lss_for_cg - - def _add_volumes_into_group(self, group, add_luns, lss_in_cg): + @proxy.logger + def _add_volumes_into_consisgroup(self, grp, add_volumes): add_volumes_update = [] - luns = [lun for lun in add_luns if lun.ds_id[:2] not in lss_in_cg] - for lun in luns: - if lun.type_replication: - new_lun = self._clone_lun_for_group(group, lun) - new_lun.type_replication = True - new_lun = self._replication.enable_replication(new_lun, True) - lun = self._replication.delete_replica(lun) - else: - new_lun = self._clone_lun_for_group(group, lun) - self._helper.delete_lun(lun) - volume_update = new_lun.update_volume(lun) - volume_update['id'] = new_lun.os_id + new_add_luns, old_add_luns = ( + self._clone_lun_for_consisgroup(add_volumes, grp)) + for new_add_lun, old_add_lun in zip(new_add_luns, old_add_luns): + volume_update = new_add_lun.update_volume(old_add_lun) + volume_update['id'] = new_add_lun.os_id add_volumes_update.append(volume_update) return add_volumes_update - def _clone_lun_for_group(self, group, lun): - lun.group = Group(group) - new_lun = lun.shallow_copy() - new_lun.type_replication = False - self._clone_lun(lun, new_lun) - return new_lun - + @proxy.logger @coordination.synchronized('{self.prefix}-consistency-group') - def _remove_volumes_in_group(self, group, add_volumes, remove_volumes): - if len(remove_volumes) == len(group.volumes) + len(add_volumes): - self._update_consisgroup_cache(group.id) + def _remove_volumes_from_consisgroup(self, grp, add_volumes, + remove_volumes): + remove_volumes_update = [] + new_remove_luns, old_remove_luns = ( + self._clone_lun_for_consisgroup(remove_volumes)) + for new_remove_lun, old_remove_lun in zip(new_remove_luns, + old_remove_luns): + volume_update = new_remove_lun.update_volume(old_remove_lun) + volume_update['id'] = new_remove_lun.os_id + remove_volumes_update.append(volume_update) + if len(remove_volumes) == len(grp.volumes) + len(add_volumes): + self._remove_record_from_consisgroup_cache(grp.id) + return remove_volumes_update + + def _clone_lun_for_consisgroup(self, volumes, grp=None): + new_luns = [] + old_luns = [] + for volume in volumes: + old_lun = Lun(volume) + if old_lun.ds_id: + new_lun = old_lun.shallow_copy() + new_lun.group = grp + self._clone_lun(old_lun, new_lun) + if old_lun.type_replication: + new_lun = self._create_replica_helper(new_lun) + old_lun = self._replication.delete_replica(old_lun) + self._helper.delete_lun(old_lun) + new_luns.append(new_lun) + old_luns.append(old_lun) + return new_luns, old_luns @proxy.logger - def _update_consisgroup_cache(self, group_id, lss_id=None): - if lss_id: - self.consisgroup_cache[group_id] = set([lss_id]) - else: - if self.consisgroup_cache.get(group_id): - LOG.debug('Group %(id)s owns LSS %(lss)s in the cache.', { - 'id': group_id, - 'lss': ','.join(self.consisgroup_cache[group_id]) - }) - self.consisgroup_cache.pop(group_id) + def _remove_record_from_consisgroup_cache(self, group_id): + lss_pairs = self.consisgroup_cache.get(group_id) + if lss_pairs: + LOG.debug('Consistecy Group %(id)s owns LSS %(lss)s in the cache.', + {'id': group_id, 'lss': lss_pairs}) + self.consisgroup_cache.pop(group_id) @proxy._trace_time def create_group_from_src(self, ctxt, group, volumes, group_snapshot, sorted_snapshots, source_group, sorted_source_vols): """Create volume group from volume group or volume group snapshot.""" + grp = Group(group) + if (not grp.consisgroup_snapshot_enabled and + not grp.consisgroup_replication_enabled): + raise NotImplementedError() + model_update = {'status': fields.GroupStatus.AVAILABLE} volumes_model_update = [] @@ -1072,6 +1172,10 @@ class DS8KProxy(proxy.IBMStorageProxy): elif source_group and sorted_source_vols: src_luns = [Lun(source_vol) for source_vol in sorted_source_vols] + src_group = Group(source_group) + self._assert(not src_group.failed_over, + 'Group %s has been failed over, it does not ' + 'support to create a group from it.' % src_group.id) else: msg = _("_create_group_from_src supports a group snapshot " "source or a group source, other sources can not " @@ -1080,16 +1184,6 @@ class DS8KProxy(proxy.IBMStorageProxy): raise exception.InvalidInput(message=msg) try: - # Don't use paramter volumes because it has DetachedInstanceError - # issue frequently. here tries to get and sort new volumes, a lot - # of cases have been guaranteed by the _sort_source_vols in - # manange.py, so not verify again. - sorted_volumes = [] - for vol in volumes: - found_vols = [v for v in group.volumes if v['id'] == vol['id']] - sorted_volumes.extend(found_vols) - volumes = sorted_volumes - tgt_luns = [Lun(volume) for volume in volumes] if src_luns and tgt_luns: self._clone_group(src_luns, tgt_luns) @@ -1124,7 +1218,7 @@ class DS8KProxy(proxy.IBMStorageProxy): "source_volume": src_lun.ds_id, "target_volume": tgt_lun.ds_id }) - if tgt_lun.group.consisgroup_enabled: + if tgt_lun.group.consisgroup_snapshot_enabled: self._do_flashcopy_with_freeze(vol_pairs) else: self._helper.start_flashcopy(vol_pairs) @@ -1171,49 +1265,53 @@ class DS8KProxy(proxy.IBMStorageProxy): self._active_backend_id) return self._active_backend_id, volume_update_list, [] - backend_id = self._replication._target_helper.backend['id'] + target_helper = self._replication.get_target_helper() if secondary_id is None: - secondary_id = backend_id - elif secondary_id != backend_id: + secondary_id = target_helper.backend['id'] + elif secondary_id != target_helper.backend['id']: raise exception.InvalidReplicationTarget( message=(_('Invalid secondary_backend_id specified. ' - 'Valid backend id is %s.') % backend_id)) + 'Valid backend id is %s.') + % target_helper.backend['id'])) - LOG.debug("Starting failover to %s.", secondary_id) - - replicated_luns = [] - for volume in volumes: - lun = Lun(volume) - if lun.type_replication and lun.status == "available": - replicated_luns.append(lun) - else: - volume_update = ( - self._replication.failover_unreplicated_volume(lun)) - volume_update_list.append(volume_update) - - if replicated_luns: + LOG.debug("Starting failover host to %s.", secondary_id) + # all volumes passed to failover_host are replicated. + replicated_luns = [Lun(volume) for volume in volumes if + volume.status in ('available', 'in-use')] + # volumes in group may have been failed over. + if secondary_id != strings.PRIMARY_BACKEND_ID: + failover_luns = [lun for lun in replicated_luns if + not lun.failed_over] + else: + failover_luns = [lun for lun in replicated_luns if + lun.failed_over] + if failover_luns: try: if secondary_id != strings.PRIMARY_BACKEND_ID: - self._replication.do_pprc_failover(replicated_luns, - secondary_id) + self._replication.start_host_pprc_failover( + failover_luns, secondary_id) self._active_backend_id = secondary_id - replicated_luns = self._switch_backend_connection( - secondary_id, replicated_luns) else: - self._replication.start_pprc_failback( - replicated_luns, self._active_backend_id) + self._replication.start_host_pprc_failback( + failover_luns, secondary_id) self._active_backend_id = "" - self._helper = self._replication._source_helper + self._helper = self._replication.get_source_helper() except restclient.APIException as e: raise exception.UnableToFailOver( reason=(_("Unable to failover host to %(id)s. " "Exception= %(ex)s") % {'id': secondary_id, 'ex': six.text_type(e)})) - for lun in replicated_luns: + for lun in failover_luns: volume_update = lun.get_volume_update() + # failover_host in base cinder has considered previous status + # of the volume, it doesn't need to return it for update. + volume_update['status'] = ( + lun.previous_status or 'available') volume_update['replication_status'] = ( - 'failed-over' if self._active_backend_id else 'enabled') + fields.ReplicationStatus.FAILED_OVER + if self._active_backend_id else + fields.ReplicationStatus.ENABLED) model_update = {'volume_id': lun.os_id, 'updates': volume_update} volume_update_list.append(model_update) @@ -1221,11 +1319,160 @@ class DS8KProxy(proxy.IBMStorageProxy): LOG.info("No volume has replication capability.") if secondary_id != strings.PRIMARY_BACKEND_ID: LOG.info("Switch to the target %s", secondary_id) - self._switch_backend_connection(secondary_id) + self._replication.switch_source_and_target_client() self._active_backend_id = secondary_id else: LOG.info("Switch to the primary %s", secondary_id) - self._switch_backend_connection(self._active_backend_id) + self._replication.switch_source_and_target_client() self._active_backend_id = "" - return secondary_id, volume_update_list, [] + # No group entity in DS8K, so just need to update replication_status + # of the group. + group_update_list = [] + groups = [grp for grp in groups if grp.status == 'available'] + if groups: + if secondary_id != strings.PRIMARY_BACKEND_ID: + update_groups = [grp for grp in groups + if grp.replication_status == + fields.ReplicationStatus.ENABLED] + repl_status = fields.ReplicationStatus.FAILED_OVER + else: + update_groups = [grp for grp in groups + if grp.replication_status == + fields.ReplicationStatus.FAILED_OVER] + repl_status = fields.ReplicationStatus.ENABLED + if update_groups: + for group in update_groups: + group_update = { + 'group_id': group.id, + 'updates': {'replication_status': repl_status} + } + group_update_list.append(group_update) + + return secondary_id, volume_update_list, group_update_list + + def enable_replication(self, context, group, volumes): + """Resume pprc pairs. + + if user wants to adjust group, he/she does not need to pause/resume + pprc pairs, here just provide a way to resume replicaiton. + """ + volumes_model_update = [] + model_update = ( + {'replication_status': fields.ReplicationStatus.ENABLED}) + if volumes: + luns = [Lun(volume) for volume in volumes] + try: + self._replication.enable_replication(luns) + except restclient.APIException as e: + msg = (_('Failed to enable replication for group %(id)s, ' + 'Exception: %(ex)s.') + % {'id': group.id, 'ex': six.text_type(e)}) + LOG.exception(msg) + raise exception.VolumeDriverException(message=msg) + for lun in luns: + volumes_model_update.append( + {'id': lun.os_id, + 'replication_status': fields.ReplicationStatus.ENABLED}) + return model_update, volumes_model_update + + def disable_replication(self, context, group, volumes): + """Pause pprc pairs. + + if user wants to adjust group, he/she does not need to pause/resume + pprc pairs, here just provide a way to pause replicaiton. + """ + volumes_model_update = [] + model_update = ( + {'replication_status': fields.ReplicationStatus.DISABLED}) + if volumes: + luns = [Lun(volume) for volume in volumes] + try: + self._replication.disable_replication(luns) + except restclient.APIException as e: + msg = (_('Failed to disable replication for group %(id)s, ' + 'Exception: %(ex)s.') + % {'id': group.id, 'ex': six.text_type(e)}) + LOG.exception(msg) + raise exception.VolumeDriverException(message=msg) + for lun in luns: + volumes_model_update.append( + {'id': lun.os_id, + 'replication_status': fields.ReplicationStatus.DISABLED}) + return model_update, volumes_model_update + + def failover_replication(self, context, group, volumes, + secondary_backend_id): + """Fail over replication for a group and volumes in the group.""" + volumes_model_update = [] + model_update = {} + luns = [Lun(volume) for volume in volumes] + if secondary_backend_id == strings.PRIMARY_BACKEND_ID: + if luns: + if not luns[0].failed_over: + LOG.info("Group %s has been failed back. it doesn't " + "need to fail back again.", group.id) + return model_update, volumes_model_update + else: + return model_update, volumes_model_update + else: + target_helper = self._replication.get_target_helper() + backend_id = target_helper.backend['id'] + if secondary_backend_id is None: + secondary_backend_id = backend_id + elif secondary_backend_id != backend_id: + raise exception.InvalidReplicationTarget( + message=(_('Invalid secondary_backend_id %(id)s. ' + 'Valid backend ids are %(ids)s.') + % {'id': secondary_backend_id, + 'ids': (strings.PRIMARY_BACKEND_ID, + backend_id)})) + if luns: + if luns[0].failed_over: + LOG.info("Group %(grp)s has been failed over to %(id)s.", + {'grp': group.id, 'id': backend_id}) + return model_update, volumes_model_update + else: + return model_update, volumes_model_update + + LOG.debug("Starting failover group %(grp)s to %(id)s.", + {'grp': group.id, 'id': secondary_backend_id}) + try: + if secondary_backend_id != strings.PRIMARY_BACKEND_ID: + self._replication.start_group_pprc_failover( + luns, secondary_backend_id) + model_update['replication_status'] = ( + fields.ReplicationStatus.FAILED_OVER) + else: + self._replication.start_group_pprc_failback( + luns, secondary_backend_id) + model_update['replication_status'] = ( + fields.ReplicationStatus.ENABLED) + except restclient.APIException as e: + raise exception.VolumeDriverException( + message=(_("Unable to failover group %(grp_id)s to " + "backend %(bck_id)s. Exception= %(ex)s") + % {'grp_id': group.id, + 'bck_id': secondary_backend_id, + 'ex': six.text_type(e)})) + + for lun in luns: + volume_model_update = lun.get_volume_update() + # base cinder doesn't consider previous status of the volume + # in failover_replication, so here returns it for update. + volume_model_update['previous_status'] = lun.status + volume_model_update['status'] = ( + lun.previous_status or 'available') + volume_model_update['replication_status'] = ( + model_update['replication_status']) + volume_model_update['id'] = lun.os_id + volumes_model_update.append(volume_model_update) + return model_update, volumes_model_update + + def get_replication_error_status(self, context, groups): + """Return error info for replicated groups and its volumes. + + all pprc copy related APIs wait until copy is finished, so it does + not need to check their status afterwards. + """ + return [], [] diff --git a/cinder/volume/drivers/ibm/ibm_storage/ds8k_replication.py b/cinder/volume/drivers/ibm/ibm_storage/ds8k_replication.py index f15499aa64f..e689df987d9 100644 --- a/cinder/volume/drivers/ibm/ibm_storage/ds8k_replication.py +++ b/cinder/volume/drivers/ibm/ibm_storage/ds8k_replication.py @@ -13,7 +13,6 @@ # License for the specific language governing permissions and limitations # under the License. # -import ast import eventlet import six @@ -39,35 +38,36 @@ PPRC_PATH_FULL = 0x03 class MetroMirrorManager(object): """Manage metro mirror for replication.""" - def __init__(self, source, target): - self._source = source - self._target = target + def __init__(self, source_helper, target_helper): + self._source_helper = source_helper + self._target_helper = target_helper def switch_source_and_target(self): - self._source, self._target = self._target, self._source + self._source_helper, self._target_helper = ( + self._target_helper, self._source_helper) def check_physical_links(self): - ports = self._source.get_physical_links( - self._target.backend['storage_wwnn']) + ports = self._source_helper.get_physical_links( + self._target_helper.backend['storage_wwnn']) if not ports: raise exception.VolumeDriverException( message=((_("%(tgt)s is not connected to %(src)s!") % { - 'tgt': self._target.backend['storage_wwnn'], - 'src': self._source.backend['storage_wwnn'] + 'tgt': self._target_helper.backend['storage_wwnn'], + 'src': self._source_helper.backend['storage_wwnn'] }))) pairs = [{ 'source_port_id': p['source_port_id'], 'target_port_id': p['target_port_id'] } for p in ports] - if not self._target.backend['port_pairs']: + if not self._target_helper.backend['port_pairs']: # if there are more than eight physical links, # choose eight of them. - self._target.backend['port_pairs'] = ( + self._target_helper.backend['port_pairs'] = ( pairs[:8] if len(pairs) > 8 else pairs) else: # verify the port pairs user set - for pair in self._target.backend['port_pairs']: + for pair in self._target_helper.backend['port_pairs']: if pair not in pairs: valid_pairs = ';'.join( ["%s-%s" % (p['source_port_id'], @@ -80,14 +80,14 @@ class MetroMirrorManager(object): "port pair(s) are: %(valid)s") % {'invalid': invalid_pair, 'valid': valid_pairs}))) - self._source.backend['port_pairs'] = [{ + self._source_helper.backend['port_pairs'] = [{ 'source_port_id': p['target_port_id'], 'target_port_id': p['source_port_id'] - } for p in self._target.backend['port_pairs']] + } for p in self._target_helper.backend['port_pairs']] def is_target_alive(self): try: - self._target.get_systems() + self._target_helper.get_systems() except restclient.TimeoutException as e: LOG.info("REST request time out, backend may be not available " "any more. Exception: %s", e) @@ -110,12 +110,12 @@ class MetroMirrorManager(object): if excluded_lss: paths = [p for p in paths if p['source_lss_id'] not in excluded_lss] - # only enable_replication will specify the source LSS + # only establish_replication will specify the source LSS # and it need to reuse LSS reserved for CG if this LSS # is in PPRC path. if not specified_lss: paths = [p for p in paths if p['source_lss_id'] not in - self._source.backend['lss_ids_for_cg']] + self._source_helper.backend['lss_ids_for_cg']] # sort pairs according to the number of luns in their LSSes, # and get the pair which LSS has least luns. @@ -123,7 +123,7 @@ class MetroMirrorManager(object): source_lss_set = set(p['source_lss_id'] for p in paths) for lss in source_lss_set: # get the number of luns in source. - src_luns = self._source.get_lun_number_in_lss(lss) + src_luns = self._source_helper.get_lun_number_in_lss(lss) if src_luns == helper.LSS_VOL_SLOTS and not specified_lss: continue @@ -131,7 +131,7 @@ class MetroMirrorManager(object): for path in spec_paths: # get the number of luns in target. try: - tgt_luns = self._target.get_lun_number_in_lss( + tgt_luns = self._target_helper.get_lun_number_in_lss( path['target_lss_id']) except restclient.APIException: # if DS8K can fix this problem, then remove the @@ -148,23 +148,24 @@ class MetroMirrorManager(object): else: src_lss, tgt_lss, num = sorted(candidates, key=lambda c: c[2])[0] return PPRC_PATH_HEALTHY, { - 'source': (self._source.get_pool(src_lss), src_lss), - 'target': (self._target.get_pool(tgt_lss), tgt_lss) + 'source': (self._source_helper.get_pool(src_lss), src_lss), + 'target': (self._target_helper.get_pool(tgt_lss), tgt_lss) } def _filter_pprc_paths(self, lss): - paths = self._source.get_pprc_paths(lss) + paths = self._source_helper.get_pprc_paths(lss) if paths: # get the paths only connected to replication target paths = [p for p in paths if p['target_system_wwnn'] in - self._target.backend['storage_wwnn']] + self._target_helper.backend['storage_wwnn']] else: LOG.info("No PPRC paths found in primary DS8K.") return PPRC_PATH_NOT_EXIST, None # get the paths whose port pairs have been set in configuration file. - expected_port_pairs = [(p['source_port_id'], p['target_port_id']) - for p in self._target.backend['port_pairs']] + expected_port_pairs = [ + (port['source_port_id'], port['target_port_id']) + for port in self._target_helper.backend['port_pairs']] for path in paths[:]: port_pairs = [(p['source_port_id'], p['target_port_id']) for p in path['port_pairs']] @@ -177,11 +178,11 @@ class MetroMirrorManager(object): # abandon PPRC paths according to volume type(fb/ckd) source_lss_set = set(p['source_lss_id'] for p in paths) - if self._source.backend.get('device_mapping'): + if self._source_helper.backend.get('device_mapping'): source_lss_set = source_lss_set & set( - self._source.backend['device_mapping'].keys()) + self._source_helper.backend['device_mapping'].keys()) else: - all_lss = self._source.get_all_lss(['id', 'type']) + all_lss = self._source_helper.get_all_lss(['id', 'type']) fb_lss = set( lss['id'] for lss in all_lss if lss['type'] == 'fb') source_lss_set = source_lss_set & fb_lss @@ -196,13 +197,13 @@ class MetroMirrorManager(object): discarded_tgt_lss = [] for lss in source_lss_set: spec_paths = [p for p in paths if p['source_lss_id'] == lss] - if self._source.get_pool(lss) is None: + if self._source_helper.get_pool(lss) is None: discarded_src_lss.append(lss) continue for spec_path in spec_paths: tgt_lss = spec_path['target_lss_id'] - if self._target.get_pool(tgt_lss) is None: + if self._target_helper.get_pool(tgt_lss) is None: discarded_tgt_lss.append(tgt_lss) if discarded_src_lss: @@ -228,13 +229,17 @@ class MetroMirrorManager(object): return PPRC_PATH_HEALTHY, paths - def create_pprc_path(self, pool_lss_pair): - src_lss = pool_lss_pair['source'][1] - tgt_lss = pool_lss_pair['target'][1] - # check whether the pprc path exists and is healthy or not firstly. - pid = (self._source.backend['storage_wwnn'] + '_' + src_lss + ':' + - self._target.backend['storage_wwnn'] + '_' + tgt_lss) - state = self._is_pprc_paths_healthy(pid) + def create_pprc_path(self, lun, is_group=False): + switch = lun.failed_over if is_group else False + src_helper, tgt_helper = ( + (self._target_helper, self._source_helper) if switch else + (self._source_helper, self._target_helper)) + src_lss = lun.pool_lss_pair['source'][1] + tgt_lss = lun.pool_lss_pair['target'][1] + # check whether the pprc path exists and is healthy or not. + pid = (src_helper.backend['storage_wwnn'] + '_' + src_lss + ':' + + tgt_helper.backend['storage_wwnn'] + '_' + tgt_lss) + state = self._is_pprc_paths_healthy(pid, switch) LOG.info("The state of PPRC path %(path)s is %(state)s.", {'path': pid, 'state': state}) if state == PPRC_PATH_HEALTHY: @@ -242,31 +247,34 @@ class MetroMirrorManager(object): # create the pprc path pathData = { - 'target_system_wwnn': self._target.backend['storage_wwnn'], + 'target_system_wwnn': tgt_helper.backend['storage_wwnn'], 'source_lss_id': src_lss, 'target_lss_id': tgt_lss, - 'port_pairs': self._target.backend['port_pairs'] + 'port_pairs': tgt_helper.backend['port_pairs'] } + if lun.group and lun.group.consisgroup_replication_enabled: + pathData['pprc_consistency_group'] = 'enable' LOG.info("PPRC path %(src)s:%(tgt)s will be created.", {'src': src_lss, 'tgt': tgt_lss}) - self._source.create_pprc_path(pathData) + src_helper.create_pprc_path(pathData) # check the state of the pprc path LOG.debug("Checking the state of the new PPRC path.") for retry in range(4): eventlet.sleep(2) - if self._is_pprc_paths_healthy(pid) == PPRC_PATH_HEALTHY: + if self._is_pprc_paths_healthy(pid, switch) == PPRC_PATH_HEALTHY: break if retry == 3: - self._source.delete_pprc_path(pid) + src_helper.delete_pprc_path(pid) raise restclient.APIException( data=(_("Failed to create PPRC path %(src)s:%(tgt)s.") % {'src': src_lss, 'tgt': tgt_lss})) LOG.debug("Create the new PPRC path successfully.") - def _is_pprc_paths_healthy(self, path_id): + def _is_pprc_paths_healthy(self, path_id, switch): + bck_helper = self._target_helper if switch else self._source_helper try: - path = self._source.get_pprc_path(path_id) + path = bck_helper.get_pprc_path(path_id) except restclient.APIException: return PPRC_PATH_NOT_EXIST @@ -278,99 +286,114 @@ class MetroMirrorManager(object): def create_pprc_pairs(self, lun): tgt_vol_id = lun.replication_driver_data[ - self._target.backend['id']]['vol_hex_id'] - tgt_stg_id = self._target.backend['storage_unit'] + self._target_helper.backend['id']]['vol_hex_id'] + tgt_stg_id = self._target_helper.backend['storage_unit'] vol_pairs = [{ 'source_volume': lun.ds_id, - 'source_system_id': self._source.backend['storage_unit'], + 'source_system_id': self._source_helper.backend['storage_unit'], 'target_volume': tgt_vol_id, 'target_system_id': tgt_stg_id }] - pairData = { + pair_data = { "volume_pairs": vol_pairs, "type": "metro_mirror", "options": ["permit_space_efficient_target", "initial_copy_full"] } - LOG.debug("Creating pprc pair, pairData is %s.", pairData) - self._source.create_pprc_pair(pairData) - self._source.wait_pprc_copy_finished([lun.ds_id], 'full_duplex') + LOG.debug("Creating pprc pair, pair_data is %s.", pair_data) + self._source_helper.create_pprc_pair(pair_data) + self._source_helper.wait_pprc_copy_finished([lun.ds_id], 'full_duplex') LOG.info("The state of PPRC pair has become full_duplex.") def delete_pprc_pairs(self, lun): - self._source.delete_pprc_pair(lun.ds_id) + self._source_helper.delete_pprc_pair(lun.ds_id) if self.is_target_alive() and lun.replication_driver_data: replica = sorted(lun.replication_driver_data.values())[0] - self._target.delete_pprc_pair(replica['vol_hex_id']) + self._target_helper.delete_pprc_pair(replica['vol_hex_id']) - def do_pprc_failover(self, luns, backend_id): + def do_pprc_failover(self, luns, is_group=False): + switch = luns[0].failed_over if is_group else False + src_helper, tgt_helper = ( + (self._target_helper, self._source_helper) if switch else + (self._source_helper, self._target_helper)) vol_pairs = [] target_vol_ids = [] for lun in luns: - target_vol_id = ( - lun.replication_driver_data[backend_id]['vol_hex_id']) - if not self._target.lun_exists(target_vol_id): + if not tgt_helper.lun_exists(lun.replica_ds_id): LOG.info("Target volume %(volid)s doesn't exist in " "DS8K %(storage)s.", - {'volid': target_vol_id, - 'storage': self._target.backend['storage_unit']}) + {'volid': lun.replica_ds_id, + 'storage': tgt_helper.backend['storage_unit']}) continue vol_pairs.append({ - 'source_volume': target_vol_id, - 'source_system_id': self._target.backend['storage_unit'], + 'source_volume': lun.replica_ds_id, + 'source_system_id': tgt_helper.backend['storage_unit'], 'target_volume': lun.ds_id, - 'target_system_id': self._source.backend['storage_unit'] + 'target_system_id': src_helper.backend['storage_unit'] }) - target_vol_ids.append(target_vol_id) + target_vol_ids.append(lun.replica_ds_id) - pairData = { + pair_data = { "volume_pairs": vol_pairs, "type": "metro_mirror", "options": ["failover"] } - LOG.info("Begin to fail over to %s", - self._target.backend['storage_unit']) - self._target.create_pprc_pair(pairData) - self._target.wait_pprc_copy_finished(target_vol_ids, - 'suspended', False) + LOG.info("Begin to fail over to %(backend)s, " + "pair_data is %(pair_data)s.", + {'backend': tgt_helper.backend['storage_unit'], + 'pair_data': pair_data}) + tgt_helper.create_pprc_pair(pair_data) + tgt_helper.wait_pprc_copy_finished(target_vol_ids, + 'suspended', switch) LOG.info("Failover from %(src)s to %(tgt)s is finished.", { - 'src': self._source.backend['storage_unit'], - 'tgt': self._target.backend['storage_unit'] + 'src': src_helper.backend['storage_unit'], + 'tgt': tgt_helper.backend['storage_unit'] }) - def do_pprc_failback(self, luns, backend_id): - pprc_ids = [] - vol_ids = [] + def get_pprc_pair_ids(self, luns, switch=False): + if not luns: + return None + src_helper, tgt_helper = ( + (self._target_helper, self._source_helper) if switch else + (self._source_helper, self._target_helper)) + pprc_pair_ids = [] for lun in luns: - target_vol_id = ( - lun.replication_driver_data[backend_id]['vol_hex_id']) - if not self._target.lun_exists(target_vol_id): + if switch: + is_lun_exist = tgt_helper.lun_exists(lun.replica_ds_id) + else: + is_lun_exist = src_helper.lun_exists(lun.ds_id) + if not is_lun_exist: LOG.info("Target volume %(volume)s doesn't exist in " "DS8K %(storage)s.", - {'volume': lun.ds_id, - 'storage': self._target.backend['storage_unit']}) + {'volume': (lun.replica_ds_id + if switch else lun.ds_id), + 'storage': (tgt_helper.backend['storage_unit'] + if switch else + src_helper.backend['storage_unit'])}) continue + pprc_pair_ids.append( + src_helper.backend['storage_unit'] + '_' + lun.ds_id + ':' + + tgt_helper.backend['storage_unit'] + '_' + lun.replica_ds_id) + return pprc_pair_ids - pprc_id = (self._source.backend['storage_unit'] + '_' + - lun.ds_id + ':' + - self._target.backend['storage_unit'] + - '_' + target_vol_id) - pprc_ids.append(pprc_id) - vol_ids.append(lun.ds_id) - - pairData = {"pprc_ids": pprc_ids, - "type": "metro_mirror", - "options": ["failback"]} - - LOG.info("Begin to run failback in %s.", - self._source.backend['storage_unit']) - self._source.do_failback(pairData) - self._source.wait_pprc_copy_finished(vol_ids, 'full_duplex', False) + def do_pprc_failback(self, luns, is_group=False): + switch = luns[0].failed_over if is_group else False + bck_helper = self._target_helper if switch else self._source_helper + pair_data = {"pprc_ids": self.get_pprc_pair_ids(luns, switch), + "type": "metro_mirror", + "options": ["failback"]} + LOG.info("Begin to run failback in %(backend)s, " + "pair_data is %(pair_data)s.", + {'backend': bck_helper.backend['storage_unit'], + 'pair_data': pair_data}) + bck_helper.do_failback(pair_data) + lun_ids = [lun.ds_id for lun in luns] + bck_helper.wait_pprc_copy_finished(lun_ids, 'full_duplex', switch) LOG.info("Run failback in %s is finished.", - self._source.backend['storage_unit']) + bck_helper.backend['storage_unit']) class Replication(object): @@ -383,9 +406,10 @@ class Replication(object): 1.0.0 - initial revision. 2.1.0 - ignore exception during cleanup when creating or deleting replica failed. + 2.1.1 - Adding support for replication consistency group. """ - VERSION = "2.1.0" + VERSION = "2.1.1" def __init__(self, source_helper, target_device): self._source_helper = source_helper @@ -401,11 +425,25 @@ class Replication(object): err=(_("Param [connection_type] %s in replication_device " "is invalid.") % connection_type)) - self._target_helper.backend['lss_ids_for_cg'] = ( - self._source_helper.backend['lss_ids_for_cg']) + if self._target_helper.backend['lss_ids_for_cg']: + if (len(self._target_helper.backend['lss_ids_for_cg']) != + len(self._source_helper.backend['lss_ids_for_cg'])): + raise exception.VolumeDriverException( + message=_("Please reserve the same number of LSS for " + "secondary DS8K just as the primary DS8K.")) + else: + self._target_helper.backend['lss_ids_for_cg'] = ( + self._source_helper.backend['lss_ids_for_cg']) + self._mm_manager = MetroMirrorManager(self._source_helper, self._target_helper) + def get_target_helper(self): + return self._target_helper + + def get_source_helper(self): + return self._source_helper + def check_connection_type(self): src_conn_type = self._source_helper.get_connection_type() tgt_conn_type = self._target_helper.get_connection_type() @@ -420,19 +458,25 @@ class Replication(object): def check_physical_links(self): self._mm_manager.check_physical_links() - def switch_source_and_target(self, secondary_id, luns=None): + def switch_source_and_target_client(self): # switch the helper in metro mirror manager self._mm_manager.switch_source_and_target() # switch the helper self._source_helper, self._target_helper = ( self._target_helper, self._source_helper) - # switch the volume id - if luns: - for lun in luns: - backend = lun.replication_driver_data.get(secondary_id, None) - lun.replication_driver_data.update( - {secondary_id: {'vol_hex_id': lun.ds_id}}) - lun.ds_id = backend['vol_hex_id'] + + def _switch_source_and_target_volume(self, luns, secondary_backend_id): + for lun in luns: + if secondary_backend_id == 'default': + backend_id = self._target_helper.backend['id'] + lun.failed_over = False + else: + backend_id = 'default' + lun.failed_over = True + # secondary_id is never blank here. + lun.replication_driver_data = ( + {backend_id: {'vol_hex_id': lun.ds_id}}) + lun.ds_id, lun.replica_ds_id = lun.replica_ds_id, lun.ds_id return luns @proxy.logger @@ -455,10 +499,10 @@ class Replication(object): return {'target': (tgt_pid, tgt_lss)} @proxy.logger - def enable_replication(self, lun, delete_source=False): + def establish_replication(self, lun, delete_source=False): state, lun.pool_lss_pair = ( self._mm_manager.find_from_pprc_paths(lun.ds_id[0:2])) - LOG.debug("enable_replication: pool_lss_pair is %s.", + LOG.debug("establish_replication: pool_lss_pair is %s.", lun.pool_lss_pair) if state == PPRC_PATH_UNHEALTHY: raise restclient.APIException( @@ -479,7 +523,7 @@ class Replication(object): try: self._target_helper.create_lun(lun) # create PPRC paths if need. - self._mm_manager.create_pprc_path(lun.pool_lss_pair) + self._mm_manager.create_pprc_path(lun) # create pprc pair self._mm_manager.create_pprc_pairs(lun) except restclient.APIException: @@ -545,11 +589,35 @@ class Replication(object): def create_pprc_pairs(self, lun): self._mm_manager.create_pprc_pairs(lun) - def do_pprc_failover(self, luns, backend_id): - self._mm_manager.do_pprc_failover(luns, backend_id) + def start_host_pprc_failover(self, luns, backend_id): + self._mm_manager.do_pprc_failover(luns) + self.switch_source_and_target_client() + self._switch_source_and_target_volume(luns, backend_id) + + def start_group_pprc_failover(self, luns, backend_id): + # unlike host failover, group failover needs to fetch changes from + # target volumes to source volumes after group is failed over. + self._mm_manager.do_pprc_failover(luns, True) + self._switch_source_and_target_volume(luns, backend_id) + sample_luns = self._get_sample_luns(luns) + for lun in sample_luns: + self._mm_manager.create_pprc_path(lun, True) + self._mm_manager.do_pprc_failback(luns, True) + + def _get_sample_luns(self, luns): + # choose sample lun according to position. + sample_luns = [] + positions = [] + for lun in luns: + position = (lun.pool_lss_pair['source'][1], + lun.pool_lss_pair['target'][1]) + if position not in positions: + sample_luns.append(lun) + positions.append(position) + return sample_luns @proxy.logger - def start_pprc_failback(self, luns, backend_id): + def start_host_pprc_failback(self, luns, backend_id): # check whether primary client is alive or not. if not self._mm_manager.is_target_alive(): try: @@ -559,28 +627,72 @@ class Replication(object): "please make sure it is back.") LOG.error(msg) raise exception.UnableToFailOver(reason=msg) - - LOG.debug("Failback starts, backend id is %s.", backend_id) - for lun in luns: - self._mm_manager.create_pprc_path(lun.pool_lss_pair) - self._mm_manager.do_pprc_failback(luns, backend_id) + LOG.debug("Failback host starts, backend id is %s.", backend_id) + sample_luns = self._get_sample_luns(luns) + for lun in sample_luns: + self._mm_manager.create_pprc_path(lun) + self._mm_manager.do_pprc_failback(luns) # revert the relationship of source volume and target volume - self.do_pprc_failover(luns, backend_id) - self.switch_source_and_target(backend_id, luns) - self._mm_manager.do_pprc_failback(luns, backend_id) - LOG.debug("Failback ends, backend id is %s.", backend_id) + self.start_host_pprc_failover(luns, backend_id) + self._mm_manager.do_pprc_failback(luns) + LOG.debug("Failback host ends, backend id is %s.", backend_id) @proxy.logger - def failover_unreplicated_volume(self, lun): - provider_location = ast.literal_eval(lun.volume['provider_location']) - if 'old_status' in provider_location: - updates = {'status': provider_location['old_status']} - del provider_location['old_status'] - updates['provider_location'] = six.text_type(provider_location) + def start_group_pprc_failback(self, luns, backend_id): + # NOTE: unlike failover host, after group is failed over, + # source and target clients are not swapped. + LOG.debug("Failback group starts, backend id is %s.", backend_id) + self.start_group_pprc_failover(luns, backend_id) + LOG.debug("Failback group ends, backend id is %s.", backend_id) + + def _get_expected_luns(self, luns, state, ignored_state=None): + lun_ids = set(lun.ds_id for lun in luns) + min_lun_id = min(lun_ids) + max_lun_id = max(lun_ids) + if not luns[0].failed_over: + pairs = self._source_helper.get_pprc_pairs(min_lun_id, max_lun_id) else: - provider_location['old_status'] = lun.status - updates = { - 'status': 'error', - 'provider_location': six.text_type(provider_location) - } - return {'volume_id': lun.os_id, 'updates': updates} + pairs = self._target_helper.get_pprc_pairs(min_lun_id, max_lun_id) + pairs = {pair['source_volume']['name']: pair for pair in pairs} + expected_luns = [] + for lun in luns: + pair = pairs.get(lun.ds_id) + if pair: + if ignored_state and pair['state'] == ignored_state: + continue + elif pair['state'] != state: + raise exception.VolumeDriverException( + message=(_("Source volume %(id)s has wrong pprc pair " + "state %(invalid_state)s, expected one is " + "%(valid_state)s") + % {'id': pair['source_volume']['name'], + 'invalid_state': pair['state'], + 'valid_state': state})) + else: + raise exception.VolumeDriverException( + message=_("There is no PPRC pair for source volume " + "%s.") % lun.ds_id) + expected_luns.append(lun) + return expected_luns + + @proxy.logger + def enable_replication(self, luns): + # after group is failed over, user can not enable replication. + if not luns: + return None + luns = self._get_expected_luns(luns, 'suspended', 'full_duplex') + pprc_pair_ids = self._mm_manager.get_pprc_pair_ids(luns) + LOG.debug("enable_replication: pprc_pair_ids is %s", pprc_pair_ids) + if pprc_pair_ids: + self._source_helper.resume_pprc_pairs(pprc_pair_ids) + + @proxy.logger + def disable_replication(self, luns): + # after group is failed over, user can not disable replication. + if not luns: + return None + luns = self._get_expected_luns(luns, 'full_duplex', 'suspended') + pprc_pair_ids = self._mm_manager.get_pprc_pair_ids(luns) + LOG.debug("disable_replication: pprc_pair_ids is %s", pprc_pair_ids) + if pprc_pair_ids: + self._source_helper.pause_pprc_pairs(pprc_pair_ids) diff --git a/releasenotes/notes/ds8k-replication-group-3f2e8cd3c2e291a3.yaml b/releasenotes/notes/ds8k-replication-group-3f2e8cd3c2e291a3.yaml new file mode 100644 index 00000000000..2c638e35b75 --- /dev/null +++ b/releasenotes/notes/ds8k-replication-group-3f2e8cd3c2e291a3.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + Add replication consistency group support in DS8K cinder driver.