From d94651db183c099c81bc3dfc6ebd0688a74fd64c Mon Sep 17 00:00:00 2001 From: Jia Min Date: Thu, 12 Jan 2017 01:29:48 -0700 Subject: [PATCH] DS8K driver: change the design of CG When making consistency group snapshot, flashcopy in DS8K needs to freeze the whole LSS which holds volumes. So it will affect volumes not belonging to the CG if they are in the LSS used by volumes belonging to CG. In order to dedicate LSS to CG, we need to allocate volume not belonging to CG in other LSSs. On the other hand, this change switches to generic volume group from consistency group at the same time, but generic volume group driver interfaces are added in another review, please refer to https://review.openstack.org/#/c/445860. DocImpact Depends-On: Id9964f868c2dc14aaee37e4bcbba330fb4575492 Closes-Bug: #1655841 Change-Id: Ic6406a558c035ebb34177f87076adaec55b08f65 --- .../volume/drivers/ibm/test_ds8k_proxy.py | 673 ++++++++++++------ .../drivers/ibm/ibm_storage/ds8k_helper.py | 317 ++++++--- .../drivers/ibm/ibm_storage/ds8k_proxy.py | 493 ++++++++----- .../ibm/ibm_storage/ds8k_replication.py | 204 +++--- .../ibm/ibm_storage/ds8k_restclient.py | 44 +- 5 files changed, 1104 insertions(+), 627 deletions(-) diff --git a/cinder/tests/unit/volume/drivers/ibm/test_ds8k_proxy.py b/cinder/tests/unit/volume/drivers/ibm/test_ds8k_proxy.py index f4fcabd4ff4..1b8aae874d6 100644 --- a/cinder/tests/unit/volume/drivers/ibm/test_ds8k_proxy.py +++ b/cinder/tests/unit/volume/drivers/ibm/test_ds8k_proxy.py @@ -29,6 +29,7 @@ from cinder.tests.unit import utils as testutils from cinder.volume import configuration as conf import cinder.volume.drivers.ibm.ibm_storage as storage from cinder.volume.drivers.ibm.ibm_storage import proxy +from cinder.volume import group_types from cinder.volume import volume_types # mock decorator logger for all unit test cases. @@ -44,6 +45,7 @@ mock_logger.stop() TEST_VOLUME_ID = '0001' TEST_HOST_ID = 'H1' TEST_VOLUME_BACKEND_NAME = 'ds8k_backend' +TEST_GROUP_HOST = 'test_host@' + TEST_VOLUME_BACKEND_NAME + '#fakepool' TEST_LUN_ID = '00' TEST_POOLS_STR = 'P0,P1' TEST_POOL_ID_1 = 'P0' @@ -858,6 +860,10 @@ FAKE_REST_API_RESPONSES = { FAKE_GET_FB_LSS_RESPONSE_3, TEST_TARGET_DS8K_IP + '/lss/' + TEST_LSS_ID_3 + '/get': FAKE_GET_FB_LSS_RESPONSE_3, + TEST_SOURCE_DS8K_IP + '/lss/' + TEST_LCU_ID + '/get': + FAKE_GET_CKD_LSS_RESPONSE, + TEST_TARGET_DS8K_IP + '/lss/' + TEST_LCU_ID + '/get': + FAKE_GET_CKD_LSS_RESPONSE, TEST_SOURCE_DS8K_IP + '/lss/fb/get': FAKE_GET_FB_LSS_RESPONSE_1, TEST_SOURCE_DS8K_IP + '/lss/ckd/get': @@ -978,7 +984,9 @@ class FakeDS8KCommonHelper(helper.DS8KCommonHelper): def _get_value(self, key): value = getattr(self.conf, key, None) - return value if value else self.conf.get(key) + if not value and key not in self.OPTIONAL_PARAMS: + value = self.conf.get(key) + return value def _create_client(self): self._client = FakeRESTScheduler(self._get_value('san_ip'), @@ -1040,13 +1048,13 @@ class FakeDS8KProxy(ds8kproxy.DS8KProxy): proxy.IBMStorageProxy.__init__(self, storage_info, logger, exception, driver, active_backend_id) - self._helper = None self._replication = None self._connector_obj = HTTPConnectorObject self._replication_enabled = False self._active_backend_id = active_backend_id self.configuration = driver.configuration + self.consisgroup_cache = {} self.setup(None) def setup(self, context): @@ -1113,13 +1121,13 @@ class DS8KProxyTest(test.TestCase): def _create_snapshot(self, **kwargs): return testutils.create_snapshot(self.ctxt, **kwargs) - def _create_consistencygroup(self, **kwargs): - return testutils.create_consistencygroup(self.ctxt, **kwargs) + def _create_group(self, **kwargs): + return testutils.create_group(self.ctxt, **kwargs) - def _create_cgsnapshot(self, cg_id, **kwargs): - return testutils.create_cgsnapshot(self.ctxt, - consistencygroup_id= cg_id, - **kwargs) + def _create_group_snapshot(self, group_id, **kwargs): + return testutils.create_group_snapshot(self.ctxt, + group_id=group_id, + **kwargs) def test_check_host_type(self): """host type should be a valid one.""" @@ -1192,7 +1200,6 @@ class DS8KProxyTest(test.TestCase): "total_capacity_gb": 10, "free_capacity_gb": 10, "reserved_percentage": 0, - "consistencygroup_support": True, "consistent_group_snapshot_enabled": True, "multiattach": False } @@ -1225,7 +1232,7 @@ class DS8KProxyTest(test.TestCase): """create volume should choose biggest pool.""" self.configuration.san_clustername = TEST_POOLS_STR cmn_helper = FakeDS8KCommonHelper(self.configuration, None) - pool_id, lss_id = cmn_helper.find_available_lss(None, False, None) + pool_id, lss_id = cmn_helper.find_pool_lss_pair(None, False, None) self.assertEqual(TEST_POOL_ID_1, pool_id) @mock.patch.object(helper.DS8KCommonHelper, 'get_all_lss') @@ -1239,7 +1246,7 @@ class DS8KProxyTest(test.TestCase): "configvols": "0" }] cmn_helper = FakeDS8KCommonHelper(self.configuration, None) - pool_id, lss_id = cmn_helper.find_available_lss(None, False, None) + pool_id, lss_id = cmn_helper.find_pool_lss_pair(None, False, None) self.assertNotEqual(TEST_LSS_ID_1, lss_id) @mock.patch.object(helper.DS8KCommonHelper, 'get_all_lss') @@ -1254,7 +1261,7 @@ class DS8KProxyTest(test.TestCase): "configvols": "0" }] cmn_helper = FakeDS8KCommonHelper(self.configuration, None) - pool_id, lss_id = cmn_helper.find_available_lss(None, False, None) + pool_id, lss_id = cmn_helper.find_pool_lss_pair(None, False, None) self.assertEqual(TEST_LSS_ID_2, lss_id) @mock.patch.object(helper.DS8KCommonHelper, 'get_all_lss') @@ -1284,11 +1291,11 @@ class DS8KProxyTest(test.TestCase): } ] cmn_helper = FakeDS8KCommonHelper(self.configuration, None) - pool_id, lss_id = cmn_helper.find_available_lss(None, False, None) + pool_id, lss_id = cmn_helper.find_pool_lss_pair(None, False, None) self.assertEqual(TEST_LSS_ID_2, lss_id) @mock.patch.object(helper.DS8KCommonHelper, 'get_all_lss') - @mock.patch.object(helper.DS8KCommonHelper, '_find_from_unexisting_lss') + @mock.patch.object(helper.DS8KCommonHelper, '_find_from_nonexistent_lss') def test_find_lss_when_no_existing_lss_available(self, mock_find_lss, mock_get_all_lss): """find LSS when no existing LSSs are available.""" @@ -1300,7 +1307,7 @@ class DS8KProxyTest(test.TestCase): "configvols": "256" }] cmn_helper = FakeDS8KCommonHelper(self.configuration, None) - pool_id, lss_id = cmn_helper.find_available_lss(None, False, None) + pool_id, lss_id = cmn_helper.find_pool_lss_pair(None, False, None) self.assertTrue(mock_find_lss.called) @mock.patch.object(helper.DS8KCommonHelper, '_find_lss') @@ -1309,7 +1316,127 @@ class DS8KProxyTest(test.TestCase): cmn_helper = FakeDS8KCommonHelper(self.configuration, None) mock_find_lss.return_value = None self.assertRaises(restclient.LssIDExhaustError, - cmn_helper.find_available_lss, None, False, None) + cmn_helper.find_pool_lss_pair, None, False, None) + + def test_find_lss_for_volume_which_belongs_to_cg(self): + """find lss for volume, which is in empty CG.""" + self.configuration.lss_range_for_cg = '20-23' + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + volume = self._create_volume(group_id=group.id) + lun = ds8kproxy.Lun(volume) + self.driver._create_lun_helper(lun) + pid, lss = lun.pool_lss_pair['source'] + self.assertTrue(lss in ['20', '21', '22', '23']) + + def test_find_lss_for_volume_which_belongs_to_cg2(self): + """find lss for volume, which is in CG having volumes.""" + self.configuration.lss_range_for_cg = '20-23' + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + location = six.text_type({'vol_hex_id': '2000'}) + self._create_volume(group_id=group.id, + provider_location=location) + volume = self._create_volume(group_id=group.id) + lun = ds8kproxy.Lun(volume) + self.driver._create_lun_helper(lun) + pid, lss = lun.pool_lss_pair['source'] + self.assertEqual(lss, '20') + + def test_find_lss_for_volume_which_belongs_to_cg3(self): + """find lss for volume, and other CGs have volumes.""" + self.configuration.lss_range_for_cg = '20-23' + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + volume = self._create_volume(group_id=group.id) + + group_type2 = group_types.create( + self.ctxt, + 'group2', + {'consistent_group_snapshot_enabled': ' True'} + ) + group2 = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type2.id) + location = six.text_type({'vol_hex_id': '2000'}) + self._create_volume(group_id=group2.id, + provider_location=location) + lun = ds8kproxy.Lun(volume) + self.driver._create_lun_helper(lun) + pid, lss = lun.pool_lss_pair['source'] + self.assertNotEqual(lss, '20') + + def test_find_lss_for_volume_which_belongs_to_cg4(self): + """find lss for volume, and other CGs are in error state.""" + self.configuration.lss_range_for_cg = '20' + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + volume = self._create_volume(group_id=group.id) + + group_type2 = group_types.create( + self.ctxt, + 'group2', + {'consistent_group_snapshot_enabled': ' True'} + ) + group2 = self._create_group(status='error', + host=TEST_GROUP_HOST, + group_type_id=group_type2.id) + location = six.text_type({'vol_hex_id': '2000'}) + self._create_volume(group_id=group2.id, + provider_location=location) + lun = ds8kproxy.Lun(volume) + self.driver._create_lun_helper(lun) + pid, lss = lun.pool_lss_pair['source'] + # error group will be ignored, so LSS 20 can be used. + self.assertEqual(lss, '20') + + def test_create_volume_and_assign_to_group_with_wrong_host(self): + # create volume for group which has wrong format of host. + self.configuration.lss_range_for_cg = '20-23' + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host="fake_invalid_host", + group_type_id=group_type.id) + volume = self._create_volume(group_id=group.id) + self.assertRaises(exception.VolumeDriverException, + self.driver.create_volume, volume) @mock.patch.object(helper.DS8KCommonHelper, '_create_lun') def test_create_volume_but_lss_full_afterwards(self, mock_create_lun): @@ -1342,11 +1469,7 @@ class DS8KProxyTest(test.TestCase): TEST_VOLUME_ID, ast.literal_eval(vol['provider_location'])['vol_hex_id']) - @mock.patch.object( - volume_types, - 'get_volume_type_extra_specs', - return_value={'drivers:os400': '050'}) - def test_create_volume_of_OS400_050(self, mock_volume_types): + def test_create_volume_of_OS400_050(self): """create volume which type is OS400 050.""" self.driver = FakeDS8KProxy(self.storage_info, self.logger, self.exception, self) @@ -1360,12 +1483,8 @@ class DS8KProxyTest(test.TestCase): ast.literal_eval(vol['provider_location'])['vol_hex_id']) self.assertEqual('050 FB 520UV', vol['metadata']['data_type']) - @mock.patch.object( - volume_types, - 'get_volume_type_extra_specs', - return_value={'drivers:thin_provision': 'False'}) @mock.patch.object(helper.DS8KCommonHelper, '_create_lun') - def test_create_eckd_volume(self, mock_create_lun, mock_volume_types): + def test_create_eckd_volume(self, mock_create_lun): """create volume which type is ECKD.""" self.configuration.connection_type = ( storage.XIV_CONNECTION_TYPE_FC_ECKD) @@ -1470,18 +1589,14 @@ class DS8KProxyTest(test.TestCase): device['san_clustername'] = TEST_ECKD_POOL_ID repl = FakeReplication(src_helper, device) repl.check_physical_links() - lss_pair = repl.find_available_lss_pair(None) + pool_lss_pair = repl.find_pool_lss_pair(None) - expected_lss_pair = {'source': (TEST_ECKD_POOL_ID, TEST_LCU_ID), - 'target': (TEST_ECKD_POOL_ID, TEST_LCU_ID)} - self.assertDictEqual(expected_lss_pair, lss_pair) + expected_pair = {'source': (TEST_ECKD_POOL_ID, TEST_LCU_ID), + 'target': (TEST_ECKD_POOL_ID, TEST_LCU_ID)} + self.assertDictEqual(expected_pair, pool_lss_pair) - @mock.patch.object( - volume_types, - 'get_volume_type_extra_specs', - return_value={'replication_enabled': ' True'}) @mock.patch.object(eventlet, 'sleep') - def test_create_fb_replicated_volume(self, mock_sleep, mock_volume_types): + def test_create_fb_replicated_volume(self, mock_sleep): """create FB volume when enable replication.""" self.configuration.replication_device = [TEST_REPLICATION_DEVICE] self.driver = FakeDS8KProxy(self.storage_info, self.logger, @@ -1499,17 +1614,12 @@ class DS8KProxyTest(test.TestCase): self.assertEqual(TEST_VOLUME_ID, repl[TEST_TARGET_DS8K_IP]['vol_hex_id']) - @mock.patch.object( - volume_types, - 'get_volume_type_extra_specs', - return_value={'replication_enabled': ' True'}) @mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_paths') @mock.patch.object(replication.MetroMirrorManager, 'create_pprc_path') @mock.patch.object(eventlet, 'sleep') def test_create_fb_replicated_vol_but_no_path_available(self, mock_sleep, create_pprc_path, - get_pprc_paths, - mock_volume_types): + get_pprc_paths): """create replicated volume but no pprc paths are available.""" self.configuration.replication_device = [TEST_REPLICATION_DEVICE] self.driver = FakeDS8KProxy(self.storage_info, self.logger, @@ -1536,14 +1646,10 @@ class DS8KProxyTest(test.TestCase): self.driver.create_volume(volume) self.assertTrue(create_pprc_path.called) - @mock.patch.object( - volume_types, - 'get_volume_type_extra_specs', - return_value={'replication_enabled': ' True'}) @mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_paths') @mock.patch.object(eventlet, 'sleep') def test_create_fb_replicated_vol_and_verify_lss_in_path( - self, mock_sleep, get_pprc_paths, mock_volume_types): + self, mock_sleep, get_pprc_paths): """create replicated volume should verify the LSS in pprc paths.""" self.configuration.replication_device = [TEST_REPLICATION_DEVICE] self.driver = FakeDS8KProxy(self.storage_info, self.logger, @@ -1588,14 +1694,10 @@ class DS8KProxyTest(test.TestCase): self.assertEqual(TEST_LSS_ID_1, repl[TEST_TARGET_DS8K_IP]['vol_hex_id'][:2]) - @mock.patch.object( - volume_types, - 'get_volume_type_extra_specs', - return_value={'replication_enabled': ' True'}) @mock.patch.object(helper.DS8KCommonHelper, 'get_pprc_paths') @mock.patch.object(eventlet, 'sleep') def test_create_fb_replicated_vol_when_paths_available( - self, mock_sleep, get_pprc_paths, mock_volume_types): + self, mock_sleep, get_pprc_paths): """create replicated volume when multiple pprc paths are available.""" self.configuration.replication_device = [TEST_REPLICATION_DEVICE] self.driver = FakeDS8KProxy(self.storage_info, self.logger, @@ -1640,14 +1742,10 @@ class DS8KProxyTest(test.TestCase): self.assertEqual(TEST_LSS_ID_1, repl[TEST_TARGET_DS8K_IP]['vol_hex_id'][:2]) - @mock.patch.object( - volume_types, - 'get_volume_type_extra_specs', - return_value={'replication_enabled': ' True'}) @mock.patch.object(helper.DS8KCommonHelper, '_create_lun') @mock.patch.object(eventlet, 'sleep') def test_create_replicated_vol_but_lss_full_afterwards( - self, mock_sleep, create_lun, mock_volume_types): + self, mock_sleep, create_lun): """create replicated volume but lss is full afterwards.""" self.configuration.replication_device = [TEST_REPLICATION_DEVICE] self.driver = FakeDS8KProxy(self.storage_info, self.logger, @@ -1715,15 +1813,10 @@ class DS8KProxyTest(test.TestCase): self.driver.delete_volume(volume) self.assertFalse(mock_delete_lun.called) - @mock.patch.object( - volume_types, - 'get_volume_type_extra_specs', - return_value={'replication_enabled': ' True'}) @mock.patch.object(helper.DS8KCommonHelper, 'delete_lun_by_id') @mock.patch.object(helper.DS8KCommonHelper, 'delete_lun') def test_delete_fb_replicated_volume(self, mock_delete_lun, - mock_delete_lun_by_id, - mock_volume_types): + mock_delete_lun_by_id): """Delete volume when enable replication.""" self.configuration.replication_device = [TEST_REPLICATION_DEVICE] self.driver = FakeDS8KProxy(self.storage_info, self.logger, @@ -1801,7 +1894,7 @@ class DS8KProxyTest(test.TestCase): location = six.text_type({'vol_hex_id': None}) tgt_vol = self._create_volume(volume_type_id=vol_type.id, provider_location=location) - self.assertRaises(restclient.APIException, + self.assertRaises(exception.VolumeDriverException, self.driver.create_cloned_volume, tgt_vol, src_vol) @mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy') @@ -1877,12 +1970,7 @@ class DS8KProxyTest(test.TestCase): replication_driver_data=data) self.driver.extend_volume(volume, 2) - @mock.patch.object( - volume_types, - 'get_volume_type_extra_specs', - return_value={'replication_enabled': ' True'}) - def test_extend_replicated_volume_that_has_been_failed_over( - self, mock_volume_types): + def test_extend_replicated_volume_that_has_been_failed_over(self): """extend replicated volume which has been failed over should fail.""" self.configuration.replication_device = [TEST_REPLICATION_DEVICE] self.driver = FakeDS8KProxy(self.storage_info, self.logger, @@ -2415,210 +2503,375 @@ class DS8KProxyTest(test.TestCase): fake_connector = {'ip': '127.0.0.1', 'initiator': 'iqn.fake'} self.driver.terminate_connection(volume, fake_connector) - def test_delete_consistencygroup_sucessfully(self): - """test a successful cg deletion.""" + def test_create_consistency_group(self): + """user should reserve LSS for consistency group.""" self.driver = FakeDS8KProxy(self.storage_info, self.logger, self.exception, self) self.driver.setup(self.ctxt) - cg_type = volume_types.create(self.ctxt, 'CG', {}) - cg = self._create_consistencygroup(volume_type_id=cg_type.id) + + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + self.assertRaises(exception.VolumeDriverException, + self.driver.create_group, + self.ctxt, group) + + def test_delete_consistency_group_sucessfully(self): + """test a successful consistency group deletion.""" + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) - volume = self._create_volume(volume_type_id=cg_type.id, - provider_location=location, - consistencygroup_id=cg.id) + volume = self._create_volume(provider_location=location, + group_id=group.id) model_update, volumes_model_update = ( - self.driver.delete_consistencygroup(self.ctxt, cg, [volume])) + self.driver.delete_group(self.ctxt, group, [volume])) self.assertEqual('deleted', volumes_model_update[0]['status']) - self.assertEqual('deleted', model_update['status']) + self.assertEqual(fields.GroupStatus.DELETED, + model_update['status']) @mock.patch.object(helper.DS8KCommonHelper, 'delete_lun') - def test_delete_consistencygroup_failed(self, mock_delete_lun): - """test a failed cg deletion.""" + def test_delete_consistency_group_failed(self, mock_delete_lun): + """test a failed consistency group deletion.""" self.driver = FakeDS8KProxy(self.storage_info, self.logger, self.exception, self) self.driver.setup(self.ctxt) - cg_type = volume_types.create(self.ctxt, 'CG', {}) - cg = self._create_consistencygroup(volume_type_id=cg_type.id) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) - volume = self._create_volume(volume_type_id=cg_type.id, - provider_location=location, - consistencygroup_id=cg.id) + volume = self._create_volume(provider_location=location, + group_id=group.id) mock_delete_lun.side_effect = ( restclient.APIException('delete volume failed.')) model_update, volumes_model_update = ( - self.driver.delete_consistencygroup(self.ctxt, cg, [volume])) + self.driver.delete_group(self.ctxt, group, [volume])) self.assertEqual('error_deleting', volumes_model_update[0]['status']) - self.assertEqual('error_deleting', model_update['status']) + self.assertEqual(fields.GroupStatus.ERROR_DELETING, + model_update['status']) + + def test_create_consistency_group_without_reserve_lss(self): + """user should reserve LSS for group if it enables cg.""" + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + self.assertRaises(exception.VolumeDriverException, + self.driver.create_group, self.ctxt, group) + + def test_update_generic_group_without_enable_cg(self): + """update group which not enable cg should return None.""" + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create(self.ctxt, 'group', {}) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + volume = self._create_volume(provider_location=location) + model_update, add_volumes_update, remove_volumes_update = ( + self.driver.update_group(self.ctxt, group, [volume], [])) + self.assertIsNone(model_update) + self.assertIsNone(add_volumes_update) + self.assertIsNone(remove_volumes_update) + + @mock.patch.object(eventlet, 'sleep') + @mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy') + @mock.patch.object(helper.DS8KCommonHelper, '_create_lun') + def test_update_generic_group_when_enable_cg(self, mock_create_lun, + mock_get_flashcopy, + mock_sleep): + """update group, but volume is not in LSS which belongs to group.""" + self.configuration.lss_range_for_cg = '20-23' + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + metadata = [{'key': 'data_type', 'value': 'FB 512'}] + volume = self._create_volume(provider_location=location, + volume_metadata=metadata) + + mock_get_flashcopy.side_effect = [[TEST_FLASHCOPY], {}] + mock_create_lun.return_value = '2200' + model_update, add_volumes_update, remove_volumes_update = ( + self.driver.update_group(self.ctxt, group, [volume], [])) + location = ast.literal_eval(add_volumes_update[0]['provider_location']) + self.assertEqual('2200', location['vol_hex_id']) + + @mock.patch.object(eventlet, 'sleep') + @mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy') + @mock.patch.object(helper.DS8KCommonHelper, '_create_lun') + def test_update_generic_group_when_enable_cg2(self, mock_create_lun, + mock_get_flashcopy, + mock_sleep): + """add replicated volume into group.""" + self.configuration.replication_device = [TEST_REPLICATION_DEVICE] + self.configuration.lss_range_for_cg = '20-23' + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + + vol_type = volume_types.create( + self.ctxt, 'VOL_TYPE', {'replication_enabled': ' True'}) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + data = json.dumps( + {TEST_TARGET_DS8K_IP: {'vol_hex_id': TEST_VOLUME_ID}}) + metadata = [{'key': 'data_type', 'value': 'FB 512'}] + volume = self._create_volume(volume_type_id=vol_type.id, + provider_location=location, + replication_driver_data=data, + volume_metadata=metadata) + + mock_get_flashcopy.side_effect = [[TEST_FLASHCOPY], {}] + mock_create_lun.return_value = '2200' + model_update, add_volumes_update, remove_volumes_update = ( + self.driver.update_group(self.ctxt, group, [volume], [])) + location = ast.literal_eval(add_volumes_update[0]['provider_location']) + self.assertEqual('2200', location['vol_hex_id']) + + @mock.patch.object(helper.DS8KCommonHelper, 'delete_lun') + def test_delete_generic_group_failed(self, mock_delete_lun): + """test a failed group deletion.""" + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + group_type = group_types.create(self.ctxt, 'group', {}) + group = self._create_group(group_type_id=group_type.id) + location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) + volume = self._create_volume(group_type_id=group_type.id, + provider_location=location, + group_id=group.id) + mock_delete_lun.side_effect = ( + restclient.APIException('delete volume failed.')) + model_update, volumes_model_update = ( + self.driver.delete_group(self.ctxt, group, [volume])) + self.assertEqual('error_deleting', volumes_model_update[0]['status']) + self.assertEqual(fields.GroupStatus.ERROR_DELETING, + model_update['status']) def test_delete_generic_group_sucessfully(self): """test a successful generic group deletion.""" self.driver = FakeDS8KProxy(self.storage_info, self.logger, self.exception, self) self.driver.setup(self.ctxt) - cg_type = volume_types.create(self.ctxt, 'CG', {}) - cg = self._create_consistencygroup(volume_type_id=cg_type.id) + group_type = group_types.create(self.ctxt, 'CG', {}) + group = self._create_group(group_type_id=group_type.id) location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) - volume = self._create_volume(volume_type_id=cg_type.id, + volume = self._create_volume(group_type_id=group_type.id, provider_location=location, - consistencygroup_id=cg.id) + group_id=group.id) model_update, volumes_model_update = ( - self.driver.delete_group(self.ctxt, cg, [volume])) + self.driver.delete_group(self.ctxt, group, [volume])) self.assertEqual('deleted', volumes_model_update[0]['status']) - self.assertEqual('deleted', model_update['status']) + self.assertEqual(fields.GroupStatus.DELETED, model_update['status']) - @mock.patch.object(helper.DS8KCommonHelper, 'delete_lun') - def test_delete_generic_group_failed(self, mock_delete_lun): - """test a failed cg deletion.""" - self.driver = FakeDS8KProxy(self.storage_info, self.logger, - self.exception, self) - self.driver.setup(self.ctxt) - cg_type = volume_types.create(self.ctxt, 'CG', {}) - cg = self._create_consistencygroup(volume_type_id=cg_type.id) - location = six.text_type({'vol_hex_id': TEST_VOLUME_ID}) - volume = self._create_volume(volume_type_id=cg_type.id, - provider_location=location, - consistencygroup_id=cg.id) - mock_delete_lun.side_effect = ( - restclient.APIException('delete volume failed.')) - model_update, volumes_model_update = ( - self.driver.delete_group(self.ctxt, cg, [volume])) - self.assertEqual('error_deleting', volumes_model_update[0]['status']) - self.assertEqual('error_deleting', model_update['status']) - - @mock.patch('oslo_concurrency.lockutils.external_lock', - new=mock.MagicMock()) @mock.patch.object(eventlet, 'sleep') @mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy') - def test_create_cgsnapshot_sucessfully(self, mock_get_flashcopy, - mock_sleep): - """test a successful cgsnapshot creation.""" + @mock.patch.object(helper.DS8KCommonHelper, '_create_lun') + def test_create_consistency_group_snapshot_sucessfully( + self, mock_create_lun, mock_get_flashcopy, mock_sleep): + """test a successful consistency group snapshot creation.""" + self.configuration.lss_range_for_cg = '20-23' self.driver = FakeDS8KProxy(self.storage_info, self.logger, self.exception, self) self.driver.setup(self.ctxt) - cg_type = volume_types.create(self.ctxt, 'CG', {}) - cg = self._create_consistencygroup(volume_type_id=cg_type.id) - location = six.text_type({'vol_hex_id': '0002'}) - volume = self._create_volume(volume_type_id=cg_type.id, - provider_location=location, - consistencygroup_id=cg.id) - cg_snapshot = self._create_cgsnapshot(cg_id=cg.id) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(group_type_id=group_type.id) + location = six.text_type({'vol_hex_id': '2000'}) + volume = self._create_volume(provider_location=location, + group_id=group.id) + group_snapshot = ( + self._create_group_snapshot(group_id=group.id, + group_type_id=group_type.id)) snapshot = self._create_snapshot(volume_id=volume.id, - volume_type_id=cg_type.id, - cgsnapshot_id=cg_snapshot.id) + group_snapshot_id=group_snapshot.id) mock_get_flashcopy.side_effect = [[TEST_FLASHCOPY], {}] + mock_create_lun.return_value = '2200' model_update, snapshots_model_update = ( - self.driver.create_cgsnapshot(self.ctxt, cg_snapshot, [snapshot])) + self.driver.create_group_snapshot( + self.ctxt, group_snapshot, [snapshot])) + location = ast.literal_eval( + snapshots_model_update[0]['provider_location']) + self.assertEqual('2200', location['vol_hex_id']) self.assertEqual('available', snapshots_model_update[0]['status']) - self.assertEqual('available', model_update['status']) + self.assertEqual(fields.GroupStatus.AVAILABLE, model_update['status']) - def test_delete_cgsnapshot_sucessfully(self): - """test a successful cgsnapshot deletion.""" + def test_delete_consistency_group_snapshot_sucessfully(self): + """test a successful consistency group snapshot deletion.""" self.driver = FakeDS8KProxy(self.storage_info, self.logger, self.exception, self) self.driver.setup(self.ctxt) - cg_type = volume_types.create(self.ctxt, 'CG', {}) - cg = self._create_consistencygroup(volume_type_id=cg_type.id) - location = six.text_type({'vol_hex_id': '0002'}) - volume = self._create_volume(volume_type_id=cg_type.id, - provider_location=location, - consistencygroup_id=cg.id) - cg_snapshot = self._create_cgsnapshot(cg_id=cg.id) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(group_type_id=group_type.id) + location = six.text_type({'vol_hex_id': '2000'}) + volume = self._create_volume(provider_location=location, + group_id=group.id) + group_snapshot = ( + self._create_group_snapshot(group_id=group.id, + group_type_id=group_type.id)) snapshot = self._create_snapshot(volume_id=volume.id, - volume_type_id=cg_type.id, - cgsnapshot_id=cg_snapshot.id) + group_snapshot_id=group_snapshot.id) model_update, snapshots_model_update = ( - self.driver.delete_cgsnapshot(self.ctxt, cg_snapshot, [snapshot])) + self.driver.delete_group_snapshot( + self.ctxt, group_snapshot, [snapshot])) self.assertEqual('deleted', snapshots_model_update[0]['status']) - self.assertEqual('deleted', model_update['status']) + self.assertEqual(fields.GroupSnapshotStatus.DELETED, + model_update['status']) @mock.patch.object(helper.DS8KCommonHelper, 'delete_lun') - def test_delete_cgsnapshot_failed(self, mock_delete_lun): - """test a failed cgsnapshot deletion.""" + def test_delete_consistency_group_snapshot_failed(self, + mock_delete_lun): + """test a failed consistency group snapshot deletion.""" self.driver = FakeDS8KProxy(self.storage_info, self.logger, self.exception, self) self.driver.setup(self.ctxt) - cg_type = volume_types.create(self.ctxt, 'CG', {}) - cg = self._create_consistencygroup(volume_type_id=cg_type.id) - location = six.text_type({'vol_hex_id': '0002'}) - volume = self._create_volume(volume_type_id=cg_type.id, - provider_location=location, - consistencygroup_id=cg.id) - cg_snapshot = self._create_cgsnapshot(cg_id=cg.id) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + group = self._create_group(group_type_id=group_type.id) + location = six.text_type({'vol_hex_id': '2000'}) + volume = self._create_volume(provider_location=location, + group_id=group.id) + group_snapshot = ( + self._create_group_snapshot(group_id=group.id, + group_type_id=group_type.id)) snapshot = self._create_snapshot(volume_id=volume.id, - volume_type_id=cg_type.id, - cgsnapshot_id=cg_snapshot.id) + group_snapshot_id=group_snapshot.id) mock_delete_lun.side_effect = ( restclient.APIException('delete snapshot failed.')) model_update, snapshots_model_update = ( - self.driver.delete_cgsnapshot(self.ctxt, cg_snapshot, [snapshot])) + self.driver.delete_group_snapshot( + self.ctxt, group_snapshot, [snapshot])) self.assertEqual('error_deleting', snapshots_model_update[0]['status']) - self.assertEqual('error_deleting', model_update['status']) - - @mock.patch('oslo_concurrency.lockutils.external_lock', - new=mock.MagicMock()) - @mock.patch.object(eventlet, 'sleep') - @mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy') - def test_create_consisgroup_from_consisgroup(self, mock_get_flashcopy, - mock_sleep): - """test creation of consistency group from consistency group.""" - self.driver = FakeDS8KProxy(self.storage_info, self.logger, - self.exception, self) - self.driver.setup(self.ctxt) - - cg_type = volume_types.create(self.ctxt, 'CG', {}) - src_cg = self._create_consistencygroup(volume_type_id=cg_type.id) - location = six.text_type({'vol_hex_id': '0002'}) - src_vol = self._create_volume(volume_type_id=cg_type.id, - provider_location=location, - consistencygroup_id=src_cg.id) - cg = self._create_consistencygroup(volume_type_id=cg_type.id) - volume = self._create_volume(volume_type_id=cg_type.id, - consistencygroup_id=cg.id) - mock_get_flashcopy.side_effect = [[TEST_FLASHCOPY], {}] - model_update, volumes_model_update = ( - self.driver.create_consistencygroup_from_src( - self.ctxt, cg, [volume], None, None, src_cg, [src_vol])) - self.assertEqual(TEST_VOLUME_ID, - volumes_model_update[0]['metadata']['vol_hex_id']) - self.assertEqual(fields.ConsistencyGroupStatus.AVAILABLE, + self.assertEqual(fields.GroupSnapshotStatus.ERROR_DELETING, model_update['status']) - @mock.patch('oslo_concurrency.lockutils.external_lock', - new=mock.MagicMock()) @mock.patch.object(eventlet, 'sleep') + @mock.patch.object(helper.DS8KCommonHelper, '_create_lun') @mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy') - def test_create_consisgroup_from_cgsnapshot(self, mock_get_flashcopy, - mock_sleep): - """test creation of consistency group from cgsnapshot.""" + def test_create_consisgroup_from_consisgroup(self, mock_get_flashcopy, + mock_create_lun, mock_sleep): + """test creation of consistency group from consistency group.""" + self.configuration.lss_range_for_cg = '20-23' self.driver = FakeDS8KProxy(self.storage_info, self.logger, self.exception, self) self.driver.setup(self.ctxt) - cg_type = volume_types.create(self.ctxt, 'CG', {}) - src_cg = self._create_consistencygroup(volume_type_id=cg_type.id) - src_vol = self._create_volume(volume_type_id=cg_type.id, - consistencygroup_id=src_cg.id) - cg_snapshot = self._create_cgsnapshot(cg_id=src_cg.id) - location = six.text_type({'vol_hex_id': '0002'}) + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + src_group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + location = six.text_type({'vol_hex_id': '2000'}) + src_vol = self._create_volume(provider_location=location, + group_id=src_group.id) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + volume = self._create_volume(group_id=group.id) + mock_get_flashcopy.side_effect = [[TEST_FLASHCOPY], {}] + mock_create_lun.return_value = '2200' + model_update, volumes_model_update = ( + self.driver.create_group_from_src( + self.ctxt, group, [volume], None, None, src_group, [src_vol])) + self.assertEqual('2200', + volumes_model_update[0]['metadata']['vol_hex_id']) + self.assertEqual(fields.GroupStatus.AVAILABLE, + model_update['status']) + + @mock.patch.object(eventlet, 'sleep') + @mock.patch.object(helper.DS8KCommonHelper, '_create_lun') + @mock.patch.object(helper.DS8KCommonHelper, 'get_flashcopy') + def test_create_consisgroup_from_cgsnapshot(self, mock_get_flashcopy, + mock_create_lun, mock_sleep): + """test creation of consistency group from cgsnapshot.""" + self.configuration.lss_range_for_cg = '20-23' + self.driver = FakeDS8KProxy(self.storage_info, self.logger, + self.exception, self) + self.driver.setup(self.ctxt) + + group_type = group_types.create( + self.ctxt, + 'group', + {'consistent_group_snapshot_enabled': ' True'} + ) + src_group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + src_vol = self._create_volume(group_id=src_group.id) + group_snapshot = ( + self._create_group_snapshot(group_id=src_group.id, + group_type_id=group_type.id)) + location = six.text_type({'vol_hex_id': '2000'}) snapshot = self._create_snapshot(volume_id=src_vol.id, - volume_type_id=cg_type.id, provider_location=location, - cgsnapshot_id=cg_snapshot.id) - cg = self._create_consistencygroup(volume_type_id=cg_type.id) - volume = self._create_volume(volume_type_id=cg_type.id, - consistencygroup_id=cg.id) + group_snapshot_id=group_snapshot.id) + group = self._create_group(host=TEST_GROUP_HOST, + group_type_id=group_type.id) + volume = self._create_volume(group_id=group.id) mock_get_flashcopy.side_effect = [[TEST_FLASHCOPY], {}] + mock_create_lun.return_value = '2200' model_update, volumes_model_update = ( - self.driver.create_consistencygroup_from_src( - self.ctxt, cg, [volume], cg_snapshot, [snapshot], None, None)) - self.assertEqual(TEST_VOLUME_ID, - volumes_model_update[0]['metadata']['vol_hex_id']) - self.assertEqual(fields.ConsistencyGroupStatus.AVAILABLE, + self.driver.create_group_from_src( + self.ctxt, group, [volume], group_snapshot, + [snapshot], None, None)) + self.assertEqual( + '2200', volumes_model_update[0]['metadata']['vol_hex_id']) + self.assertEqual(fields.GroupStatus.AVAILABLE, model_update['status']) @mock.patch.object(eventlet, 'sleep') @@ -2647,13 +2900,8 @@ class DS8KProxyTest(test.TestCase): self.ctxt, [volume], TEST_TARGET_DS8K_IP) self.assertEqual(TEST_TARGET_DS8K_IP, secondary_id) - @mock.patch.object( - volume_types, - 'get_volume_type_extra_specs', - return_value={'replication_enabled': ' True'}) @mock.patch.object(replication.Replication, 'do_pprc_failover') - def test_failover_host_failed(self, mock_do_pprc_failover, - mock_volume_types): + def test_failover_host_failed(self, mock_do_pprc_failover): """Failover host should raise exception when failed.""" self.configuration.replication_device = [TEST_REPLICATION_DEVICE] self.driver = FakeDS8KProxy(self.storage_info, self.logger, @@ -2801,13 +3049,8 @@ class DS8KProxyTest(test.TestCase): self.ctxt, [volume], 'default') self.assertEqual('default', secondary_id) - @mock.patch.object( - volume_types, - 'get_volume_type_extra_specs', - return_value={'replication_enabled': ' True'}) @mock.patch.object(replication.Replication, 'start_pprc_failback') - def test_failback_host_failed(self, mock_start_pprc_failback, - mock_volume_types): + def test_failback_host_failed(self, mock_start_pprc_failback): """Failback host should raise exception when failed.""" self.configuration.replication_device = [TEST_REPLICATION_DEVICE] self.driver = FakeDS8KProxy(self.storage_info, self.logger, diff --git a/cinder/volume/drivers/ibm/ibm_storage/ds8k_helper.py b/cinder/volume/drivers/ibm/ibm_storage/ds8k_helper.py index 4a2a865fd35..30b07cbfb9c 100644 --- a/cinder/volume/drivers/ibm/ibm_storage/ds8k_helper.py +++ b/cinder/volume/drivers/ibm/ibm_storage/ds8k_helper.py @@ -61,6 +61,8 @@ def filter_alnum(s): class DS8KCommonHelper(object): """Manage the primary backend, it is common class too.""" + OPTIONAL_PARAMS = ['ds8k_host_type', 'lss_range_for_cg'] + def __init__(self, conf, HTTPConnectorObject=None): self.conf = conf self._connector_obj = HTTPConnectorObject @@ -76,9 +78,13 @@ class DS8KCommonHelper(object): def _get_value(self, key): if getattr(self.conf, 'safe_get', 'get') == 'get': - return self.conf.get(key) + value = self.conf.get(key) else: - return self.conf.safe_get(key) + value = self.conf.safe_get(key) + if not value and key not in self.OPTIONAL_PARAMS: + raise exception.InvalidParameterValue( + err=(_('Param [%s] should be provided.') % key)) + return value def get_thin_provision(self): return self._disable_thin_provision @@ -100,6 +106,7 @@ class DS8KCommonHelper(object): self._get_storage_information() self._check_host_type() self.backend['pools_str'] = self._get_value('san_clustername') + self._get_lss_ids_for_cg() self._verify_version() self._verify_pools() @@ -109,8 +116,8 @@ class DS8KCommonHelper(object): def _get_certificate(self, host): cert_file = strings.CERTIFICATES_PATH + host + '.pem' - msg = "certificate file for DS8K %(host)s: %(cert)s" - LOG.debug(msg, {'host': host, 'cert': cert_file}) + LOG.debug("certificate file for DS8K %(host)s: %(cert)s", + {'host': host, 'cert': cert_file}) # Use the certificate if it exists, otherwise use the System CA Bundle if os.path.exists(cert_file): return cert_file @@ -119,23 +126,23 @@ class DS8KCommonHelper(object): return True def _create_client(self): + san_ip = self._get_value('san_ip') try: clear_pass = cryptish.decrypt(self._get_value('san_password')) except TypeError: - err = _('Param [san_password] is invalid.') - raise exception.InvalidParameterValue(err=err) - verify = self._get_certificate(self._get_value('san_ip')) + raise exception.InvalidParameterValue( + err=_('Param [san_password] is invalid.')) + verify = self._get_certificate(san_ip) try: self._client = restclient.RESTScheduler( - self._get_value('san_ip'), + san_ip, self._get_value('san_login'), clear_pass, self._connector_obj, verify) except restclient.TimeoutException: - msg = (_("Can't connect to %(host)s") % - {'host': self._get_value('san_ip')}) - raise restclient.APIException(data=msg) + raise restclient.APIException( + data=(_("Can't connect to %(host)s") % {'host': san_ip})) self.backend['rest_version'] = self._get_version()['bundle_version'] LOG.info("Connection to DS8K storage system %(host)s has been " "established successfully, the version of REST is %(rest)s.", @@ -148,12 +155,31 @@ class DS8KCommonHelper(object): self.backend['storage_wwnn'] = storage_info['wwnn'] self.backend['storage_version'] = storage_info['release'] + def _get_lss_ids_for_cg(self): + lss_range = self._get_value('lss_range_for_cg') + if lss_range: + lss_range = lss_range.replace(' ', '').split('-') + if len(lss_range) == 1: + begin = int(lss_range[0], 16) + end = begin + else: + begin = int(lss_range[0], 16) + end = int(lss_range[1], 16) + if begin > 0xFF or end > 0xFF or begin > end: + raise exception.InvalidParameterValue( + err=_('Param [lss_range_for_cg] is invalid, it ' + 'should be within 00-FF.')) + self.backend['lss_ids_for_cg'] = set( + ('%02x' % i).upper() for i in range(begin, end + 1)) + else: + self.backend['lss_ids_for_cg'] = set() + def _check_host_type(self): ds8k_host_type = self._get_value('ds8k_host_type') - if ((ds8k_host_type is not None) and + if (ds8k_host_type and (ds8k_host_type not in VALID_HOST_TYPES)): - msg = (_("Param [ds8k_host_type] must be one of: %(values)s.") % - {'values': VALID_HOST_TYPES[1:-1]}) + msg = (_("Param [ds8k_host_type] must be one of: %(values)s.") + % {'values': VALID_HOST_TYPES[1:-1]}) LOG.error(msg) raise exception.InvalidParameterValue(err=msg) self.backend['host_type_override'] = ( @@ -161,12 +187,12 @@ class DS8KCommonHelper(object): def _verify_version(self): if self.backend['storage_version'] == '8.0.1': - msg = (_("8.0.1 does not support bulk deletion of volumes, " - "if you want to use this version of driver, " - "please upgrade the CCL, and make sure the REST " - "version is not lower than %s.") - % VALID_REST_VERSION_5_8_MIN) - raise exception.VolumeDriverException(data=msg) + raise exception.VolumeDriverException( + data=(_("8.0.1 does not support bulk deletion of volumes, " + "if you want to use this version of driver, " + "please upgrade the CCL, and make sure the REST " + "version is not lower than %s.") + % VALID_REST_VERSION_5_8_MIN)) else: if (('5.7' in self.backend['rest_version'] and dist_version.LooseVersion(self.backend['rest_version']) < @@ -174,13 +200,13 @@ class DS8KCommonHelper(object): ('5.8' in self.backend['rest_version'] and dist_version.LooseVersion(self.backend['rest_version']) < dist_version.LooseVersion(VALID_REST_VERSION_5_8_MIN))): - msg = (_("REST version %(invalid)s is lower than " - "%(valid)s, please upgrade it in DS8K.") - % {'invalid': self.backend['rest_version'], - 'valid': (VALID_REST_VERSION_5_7_MIN if '5.7' in - self.backend['rest_version'] else - VALID_REST_VERSION_5_8_MIN)}) - raise exception.VolumeDriverException(data=msg) + raise exception.VolumeDriverException( + data=(_("REST version %(invalid)s is lower than " + "%(valid)s, please upgrade it in DS8K.") + % {'invalid': self.backend['rest_version'], + 'valid': (VALID_REST_VERSION_5_7_MIN if '5.7' in + self.backend['rest_version'] else + VALID_REST_VERSION_5_8_MIN)})) if self._connection_type == storage.XIV_CONNECTION_TYPE_FC_ECKD: if (dist_version.LooseVersion(self.backend['storage_version']) < @@ -193,15 +219,15 @@ class DS8KCommonHelper(object): elif self._connection_type == storage.XIV_CONNECTION_TYPE_FC_ECKD: ptype = 'ckd' else: - err = _('Param [connection_type] is invalid.') - raise exception.InvalidParameterValue(err=err) + raise exception.InvalidParameterValue( + err=_('Param [connection_type] is invalid.')) self._storage_pools = self.get_pools() for pid, p in self._storage_pools.items(): if p['stgtype'] != ptype: LOG.error('The stgtype of pool %(pool)s is %(ptype)s.', {'pool': pid, 'ptype': p['stgtype']}) - err = _('Param [san_clustername] is invalid.') - raise exception.InvalidParameterValue(err=err) + raise exception.InvalidParameterValue( + err='Param [san_clustername] is invalid.') @proxy.logger def get_pools(self, new_pools=None): @@ -229,7 +255,7 @@ class DS8KCommonHelper(object): }) for p in pools) @proxy.logger - def find_available_lss(self, pool, find_new_pid, excluded_lss): + def find_pool_lss_pair(self, pool, find_new_pid, excluded_lss): if pool: node = int(pool[1:], 16) % 2 lss = self._find_lss(node, excluded_lss) @@ -237,9 +263,9 @@ class DS8KCommonHelper(object): return (pool, lss) else: if not find_new_pid: - msg = _('All LSS/LCU IDs for configured pools on ' - 'storage are exhausted.') - raise restclient.LssIDExhaustError(message=msg) + raise restclient.LssIDExhaustError( + message=_('All LSS/LCU IDs for configured pools ' + 'on storage are exhausted.')) # find new pool id and lss for lun return self.find_biggest_pool_and_lss(excluded_lss) @@ -250,8 +276,8 @@ class DS8KCommonHelper(object): lss = self._find_lss(pool['node'], excluded_lss) if lss: return pool_id, lss - msg = _("All LSS/LCU IDs for configured pools are exhausted.") - raise restclient.LssIDExhaustError(message=msg) + raise restclient.LssIDExhaustError( + message=_("All LSS/LCU IDs for configured pools are exhausted.")) @proxy.logger def _find_lss(self, node, excluded_lss): @@ -259,20 +285,35 @@ class DS8KCommonHelper(object): existing_lss = self.get_all_lss(fileds) LOG.info("existing LSS IDs are: %s.", ','.join([lss['id'] for lss in existing_lss])) + existing_lss_cg, nonexistent_lss_cg = ( + self._classify_lss_for_cg(existing_lss)) + # exclude LSSs that are full. if excluded_lss: existing_lss = [lss for lss in existing_lss if lss['id'] not in excluded_lss] - lss = self._find_from_existing_lss(node, existing_lss) - lss = lss if lss else self._find_from_unexisting_lss(node, - existing_lss) + # exclude LSSs that reserved for CG. + candidates = [lss for lss in existing_lss + if lss['id'] not in existing_lss_cg] + lss = self._find_from_existing_lss(node, candidates) + if not lss: + lss = self._find_from_nonexistent_lss(node, existing_lss, + nonexistent_lss_cg) return lss + def _classify_lss_for_cg(self, existing_lss): + existing_lss_ids = set(lss['id'] for lss in existing_lss) + existing_lss_cg = existing_lss_ids & self.backend['lss_ids_for_cg'] + nonexistent_lss_cg = self.backend['lss_ids_for_cg'] - existing_lss_cg + return existing_lss_cg, nonexistent_lss_cg + def _find_from_existing_lss(self, node, existing_lss): + # exclude LSSs that are used by PPRC paths. lss_in_pprc = self.get_lss_in_pprc_paths() if lss_in_pprc: existing_lss = [lss for lss in existing_lss if lss['id'] not in lss_in_pprc] + # exclude wrong type of LSSs and those that are not in expected node. existing_lss = [lss for lss in existing_lss if lss['type'] == 'fb' and int(lss['group']) == node] lss_id = None @@ -286,18 +327,19 @@ class DS8KCommonHelper(object): {'lss': lss_id, 'num': lss['configvols']}) return lss_id - def _find_from_unexisting_lss(self, node, existing_lss): + def _find_from_nonexistent_lss(self, node, existing_lss, lss_cg=None): addrgrps = set(int(lss['addrgrp'], 16) for lss in existing_lss if lss['type'] == 'ckd' and int(lss['group']) == node) - fulllss = set(int(lss['id'], 16) for lss in existing_lss if lss['type'] == 'fb' and int(lss['group']) == node) - - # look for an available lss from unexisting lss + cglss = set(int(lss, 16) for lss in lss_cg) if lss_cg else set() + # look for an available lss from nonexistent lss lss_id = None for lss in range(node, LSS_SLOTS, 2): addrgrp = lss // 16 - if addrgrp not in addrgrps and lss not in fulllss: + if (addrgrp not in addrgrps and + lss not in fulllss and + lss not in cglss): lss_id = ("%02x" % lss).upper() break LOG.info('_find_from_unexisting_lss: choose %s.', lss_id) @@ -314,7 +356,7 @@ class DS8KCommonHelper(object): if lun.type_os400: volData['os400'] = lun.type_os400 volData['name'] = lun.ds_name - volData['pool'], volData['lss'] = lun.lss_pair['source'] + volData['pool'], volData['lss'] = lun.pool_lss_pair['source'] lun.ds_id = self._create_lun(volData) return lun @@ -339,7 +381,7 @@ class DS8KCommonHelper(object): else: lun_ids_str = ','.join(lun_ids) lun_ids = [] - LOG.error("Deleting volumes: %s.", lun_ids_str) + LOG.info("Deleting volumes: %s.", lun_ids_str) self._delete_lun(lun_ids_str) def get_lss_in_pprc_paths(self): @@ -361,7 +403,7 @@ class DS8KCommonHelper(object): vol_ids = [vol['volume_id'] for vol in host['mappings_briefs']] if vol_id in vol_ids: host_ids.append(host['id']) - LOG.info('_find_host: host IDs are: %s.', host_ids) + LOG.info('_find_host: host IDs are %s.', ','.join(host_ids)) return host_ids def wait_flashcopy_finished(self, src_luns, tgt_luns): @@ -378,9 +420,9 @@ class DS8KCommonHelper(object): continue if fcs[0]['state'] not in ('valid', 'validation_required'): - msg = (_('Flashcopy ended up in bad state %s. ' - 'Rolling back.') % fcs[0]['state']) - raise restclient.APIException(data=msg) + raise restclient.APIException( + data=(_('Flashcopy ended up in bad state %s. ' + 'Rolling back.') % fcs[0]['state'])) if fc_state.count(False) == 0: break finished = True @@ -420,10 +462,10 @@ class DS8KCommonHelper(object): unfinished_pairs = [p for p in pairs if p['state'] != state] for p in unfinished_pairs: if p['state'] in invalid_states: - msg = (_('Metro Mirror pair %(id)s enters into ' - 'state %(state)s. ') % - {'id': p['id'], 'state': p['state']}) - raise restclient.APIException(data=msg) + raise restclient.APIException( + data=(_('Metro Mirror pair %(id)s enters into ' + 'state %(state)s. ') + % {'id': p['id'], 'state': p['state']})) finally: if not finished and delete: pair_ids = {'ids': ','.join([p['id'] for p in pairs])} @@ -459,22 +501,19 @@ class DS8KCommonHelper(object): hp['wwpn'] for hp in host_ports) unconfigured_ports = set( hp['wwpn'] for hp in host_ports if not hp['host_id']) - msg = ("initialize_connection: defined_hosts: %(defined)s, " - "unknown_ports: %(unknown)s, unconfigured_ports: " - "%(unconfigured)s.") - LOG.debug(msg, { - "defined": defined_hosts, - "unknown": unknown_ports, - "unconfigured": unconfigured_ports - }) + LOG.debug("initialize_connection: defined_hosts: %(defined)s, " + "unknown_ports: %(unknown)s, unconfigured_ports: " + "%(unconfigured)s.", {"defined": defined_hosts, + "unknown": unknown_ports, + "unconfigured": unconfigured_ports}) # Create host if it is not defined if not defined_hosts: host_id = self._create_host(host)['id'] elif len(defined_hosts) == 1: host_id = defined_hosts.pop() else: - msg = _('More than one host defined for requested ports.') - raise restclient.APIException(message=msg) + raise restclient.APIException( + message='More than one host defined for requested ports.') LOG.info('Volume will be attached to host %s.', host_id) # Create missing host ports @@ -511,13 +550,11 @@ class DS8KCommonHelper(object): host_ports = None delete_ports = None defined_hosts = self._find_host(vol_id) - msg = ("terminate_connection: host_ports: %(host)s, defined_hosts: " - "%(defined)s, delete_ports: %(delete)s.") - LOG.debug(msg, { - "host": host_ports, - "defined": defined_hosts, - "delete": delete_ports - }) + LOG.debug("terminate_connection: host_ports: %(host)s, " + "defined_hosts: %(defined)s, delete_ports: %(delete)s.", + {"host": host_ports, + "defined": defined_hosts, + "delete": delete_ports}) if not defined_hosts: LOG.info('Could not find host.') @@ -552,33 +589,49 @@ class DS8KCommonHelper(object): target_map = {initiator.upper(): target_ports for initiator in connector['wwpns']} ret_info['data']['initiator_target_map'] = target_map - return ret_info return ret_info - def create_group(self, ctxt, group): + def create_group(self, group): return {'status': fields.GroupStatus.AVAILABLE} - def delete_group(self, ctxt, group, luns): + def delete_group(self, group, src_luns): volumes_model_update = [] model_update = {'status': fields.GroupStatus.DELETED} - if luns: + if src_luns: try: - self.delete_lun(luns) - except restclient.APIException: + self.delete_lun(src_luns) + except restclient.APIException as e: model_update['status'] = fields.GroupStatus.ERROR_DELETING LOG.exception( - "Failed to delete the volumes in group %(group)s", - {'group': group.id}) + "Failed to delete the volumes in group %(group)s, " + "Exception = %(ex)s", + {'group': group.id, 'ex': e}) - for lun in luns: + for src_lun in src_luns: volumes_model_update.append({ - 'id': lun.os_id, + 'id': src_lun.os_id, 'status': model_update['status'] }) return model_update, volumes_model_update - def update_group(self, ctxt, group, add_volumes, remove_volumes): - return None, None, None + def delete_group_snapshot(self, group_snapshot, tgt_luns): + snapshots_model_update = [] + model_update = {'status': fields.GroupSnapshotStatus.DELETED} + if tgt_luns: + try: + self.delete_lun(tgt_luns) + except restclient.APIException as e: + model_update['status'] = ( + fields.GroupSnapshotStatus.ERROR_DELETING) + LOG.error("Failed to delete snapshots in group snapshot " + "%(gsnapshot)s, Exception = %(ex)s", + {'gsnapshot': group_snapshot.id, 'ex': e}) + for tgt_lun in tgt_luns: + snapshots_model_update.append({ + 'id': tgt_lun.os_id, + 'status': model_update['status'] + }) + return model_update, snapshots_model_update def _delete_lun(self, lun_ids_str): self._client.send('DELETE', '/volumes', @@ -768,28 +821,35 @@ class DS8KReplicationSourceHelper(DS8KCommonHelper): excluded_lss) if lss: return pool_id, lss - msg = _("All LSS/LCU IDs for configured pools are exhausted.") - raise restclient.LssIDExhaustError(message=msg) + raise restclient.LssIDExhaustError( + message=_("All LSS/LCU IDs for configured pools are exhausted.")) @proxy.logger def _find_lss_for_type_replication(self, node, excluded_lss): - # prefer to choose the non-existing one firstly + # prefer to choose non-existing one first. fileds = ['id', 'type', 'addrgrp', 'group', 'configvols'] existing_lss = self.get_all_lss(fileds) LOG.info("existing LSS IDs are %s", ','.join([lss['id'] for lss in existing_lss])) - lss_id = self._find_from_unexisting_lss(node, existing_lss) + existing_lss_cg, nonexistent_lss_cg = ( + self._classify_lss_for_cg(existing_lss)) + lss_id = self._find_from_nonexistent_lss(node, existing_lss, + nonexistent_lss_cg) if not lss_id: if excluded_lss: existing_lss = [lss for lss in existing_lss if lss['id'] not in excluded_lss] - lss_id = self._find_from_existing_lss(node, existing_lss) + candidates = [lss for lss in existing_lss + if lss['id'] not in existing_lss_cg] + lss_id = self._find_from_existing_lss(node, candidates) return lss_id class DS8KReplicationTargetHelper(DS8KReplicationSourceHelper): """Manage target storage for replication.""" + OPTIONAL_PARAMS = ['ds8k_host_type', 'port_pairs'] + def setup(self): self._create_client() self._get_storage_information() @@ -814,6 +874,21 @@ class DS8KReplicationTargetHelper(DS8KReplicationSourceHelper): self.backend['port_pairs'] = port_pairs self.backend['id'] = self._get_value('backend_id') + @proxy.logger + def _find_lss_for_type_replication(self, node, excluded_lss): + # prefer to choose non-existing one first. + fileds = ['id', 'type', 'addrgrp', 'group', 'configvols'] + existing_lss = self.get_all_lss(fileds) + LOG.info("existing LSS IDs are %s", + ','.join([lss['id'] for lss in existing_lss])) + lss_id = self._find_from_nonexistent_lss(node, existing_lss) + if not lss_id: + if excluded_lss: + existing_lss = [lss for lss in existing_lss + if lss['id'] not in excluded_lss] + lss_id = self._find_from_existing_lss(node, existing_lss) + return lss_id + def create_lun(self, lun): volData = { 'cap': self._gb2b(lun.size), @@ -826,7 +901,7 @@ class DS8KReplicationTargetHelper(DS8KReplicationSourceHelper): volData['os400'] = lun.type_os400 volData['name'] = lun.replica_ds_name - volData['pool'], volData['lss'] = lun.lss_pair['target'] + volData['pool'], volData['lss'] = lun.pool_lss_pair['target'] volID = self._create_lun(volData) lun.replication_driver_data.update( {self.backend['id']: {'vol_hex_id': volID}}) @@ -848,16 +923,19 @@ class DS8KReplicationTargetHelper(DS8KReplicationSourceHelper): class DS8KECKDHelper(DS8KCommonHelper): """Manage ECKD volume.""" + OPTIONAL_PARAMS = ['ds8k_host_type', 'port_pairs', 'ds8k_ssid_prefix', + 'lss_range_for_cg'] + @staticmethod def _gb2cyl(gb): # now only support 3390, no 3380 or 3390-A cyl = int(math.ceil(gb * 1263.28)) if cyl > 65520: - msg = (_("For 3390 volume, capacity can be in the range " - "1-65520(849KiB to 55.68GiB) cylinders, now it " - "is %(gb)d GiB, equals to %(cyl)d cylinders.") % - {'gb': gb, 'cyl': cyl}) - raise exception.VolumeDriverException(data=msg) + raise exception.VolumeDriverException( + message=(_("For 3390 volume, capacity can be in the range " + "1-65520(849KiB to 55.68GiB) cylinders, now it " + "is %(gb)d GiB, equals to %(cyl)d cylinders.") + % {'gb': gb, 'cyl': cyl})) return cyl @staticmethod @@ -874,6 +952,7 @@ class DS8KECKDHelper(DS8KCommonHelper): self._create_client() self._get_storage_information() self._check_host_type() + self._get_lss_ids_for_cg() self.backend['pools_str'] = self._get_value('san_clustername') ssid_prefix = self._get_value('ds8k_ssid_prefix') self.backend['ssid_prefix'] = ssid_prefix if ssid_prefix else 'FF' @@ -885,10 +964,10 @@ class DS8KECKDHelper(DS8KCommonHelper): def _check_and_verify_lcus(self): map_str = self._get_value('ds8k_devadd_unitadd_mapping') if not map_str: - err = _('Param [ds8k_devadd_unitadd_mapping] is not ' - 'provided, please provide the mapping between ' - 'IODevice address and unit address.') - raise exception.InvalidParameterValue(err=err) + raise exception.InvalidParameterValue( + err=_('Param [ds8k_devadd_unitadd_mapping] is not ' + 'provided, please provide the mapping between ' + 'IODevice address and unit address.')) # verify the LCU mappings = map_str.replace(' ', '').upper().split(';') @@ -896,9 +975,9 @@ class DS8KECKDHelper(DS8KCommonHelper): dev_mapping = {p[1]: int(p[0], 16) for p in pairs} for lcu in dev_mapping.keys(): if int(lcu, 16) > 255: - err = (_('LCU %s in param [ds8k_devadd_unitadd_mapping]' - 'is invalid, it should be within 00-FF.') % lcu) - raise exception.InvalidParameterValue(err=err) + raise exception.InvalidParameterValue( + err=(_('LCU %s in param [ds8k_devadd_unitadd_mapping]' + 'is invalid, it should be within 00-FF.') % lcu)) # verify address group all_lss = self.get_all_lss(['id', 'type']) @@ -907,23 +986,24 @@ class DS8KECKDHelper(DS8KCommonHelper): ckd_addrgrp = set((int(lcu, 16) // 16) for lcu in dev_mapping.keys()) intersection = ckd_addrgrp & fb_addrgrp if intersection: - msg = (_('Invaild LCUs which first digit is %s, they are' - 'for fb volume.') % ', '.join(intersection)) - raise exception.VolumeDriverException(data=msg) + raise exception.VolumeDriverException( + message=(_('LCUs which first digit is %s are invalid, they ' + 'are for FB volume.') % ', '.join(intersection))) # create LCU that doesn't exist ckd_lss = set(lss['id'] for lss in all_lss if lss['type'] == 'ckd') - unexisting_lcu = set(dev_mapping.keys()) - ckd_lss - if unexisting_lcu: - LOG.info('LCUs %s do not exist in DS8K, they will be created.', - ','.join(unexisting_lcu)) - for lcu in unexisting_lcu: + nonexistent_lcu = set(dev_mapping.keys()) - ckd_lss + if nonexistent_lcu: + LOG.info('LCUs %s do not exist in DS8K, they will be ' + 'created.', ','.join(nonexistent_lcu)) + for lcu in nonexistent_lcu: try: self._create_lcu(self.backend['ssid_prefix'], lcu) except restclient.APIException as e: - msg = (_('can not create lcu %(lcu)s, Exception= ' - '%(e)s') % {'lcu': lcu, 'e': six.text_type(e)}) - raise exception.VolumeDriverException(data=msg) + raise exception.VolumeDriverException( + message=(_('Can not create lcu %(lcu)s, ' + 'Exception= %(e)s.') + % {'lcu': lcu, 'e': six.text_type(e)})) return dev_mapping def _format_pools(self, pools): @@ -944,14 +1024,19 @@ class DS8KECKDHelper(DS8KCommonHelper): # all LCUs have existed, not like LSS all_lss = self.get_all_lss(['id', 'type', 'group', 'configvols']) existing_lcu = [lss for lss in all_lss if lss['type'] == 'ckd'] + excluded_lcu = excluded_lcu or [] candidate_lcu = [lcu for lcu in existing_lcu if ( lcu['id'] in self.backend['device_mapping'].keys() and lcu['id'] not in excluded_lcu and lcu['group'] == str(node))] + + # exclude LCUs reserved for CG. + candidate_lcu = [lss for lss in candidate_lcu if lss['id'] + not in self.backend['lss_ids_for_cg']] if not candidate_lcu: return None - # perfer to use LCU that is not in PPRC path first. + # prefer to use LCU that is not in PPRC path first. lcu_pprc = self.get_lss_in_pprc_paths() & set( self.backend['device_mapping'].keys()) if lcu_pprc: @@ -984,7 +1069,7 @@ class DS8KECKDHelper(DS8KCommonHelper): } lun.data_type = '3390' volData['name'] = lun.ds_name - volData['pool'], volData['lss'] = lun.lss_pair['source'] + volData['pool'], volData['lss'] = lun.pool_lss_pair['source'] lun.ds_id = self._create_lun(volData) return lun @@ -1030,7 +1115,7 @@ class DS8KReplicationTargetECKDHelper(DS8KECKDHelper, lun.data_type = '3390' volData['name'] = lun.replica_ds_name - volData['pool'], volData['lss'] = lun.lss_pair['target'] + volData['pool'], volData['lss'] = lun.pool_lss_pair['target'] volID = self._create_lun(volData) lun.replication_driver_data.update( {self.backend['id']: {'vol_hex_id': volID}}) diff --git a/cinder/volume/drivers/ibm/ibm_storage/ds8k_proxy.py b/cinder/volume/drivers/ibm/ibm_storage/ds8k_proxy.py index f9e11c30e7f..9a579882a92 100644 --- a/cinder/volume/drivers/ibm/ibm_storage/ds8k_proxy.py +++ b/cinder/volume/drivers/ibm/ibm_storage/ds8k_proxy.py @@ -37,7 +37,7 @@ volume_driver = cinder.volume.drivers.ibm.ibm_storage.ibm_storage.IBMStorageDriver chap = disabled connection_type = fibre_channel -replication_device = backend_id: bar, +replication_device = connection_type: fibre_channel, backend_id: bar, san_ip: bar.com, san_login: actual_username, san_password: actual_password, san_clustername: P4, port_pairs: I0236-I0306; I0237-I0307 @@ -57,24 +57,28 @@ connection_type = fibre_channel """ import ast +import collections import json import six from oslo_config import cfg from oslo_log import log as logging +from cinder import context +from cinder import coordination from cinder import exception from cinder.i18n import _ +from cinder import objects from cinder.objects import fields from cinder.utils import synchronized import cinder.volume.drivers.ibm.ibm_storage as storage +from cinder.volume.drivers.ibm.ibm_storage import ( + ds8k_replication as replication) from cinder.volume.drivers.ibm.ibm_storage import ds8k_helper as helper -from cinder.volume.drivers.ibm.ibm_storage \ - import ds8k_replication as replication from cinder.volume.drivers.ibm.ibm_storage import ds8k_restclient as restclient from cinder.volume.drivers.ibm.ibm_storage import proxy from cinder.volume.drivers.ibm.ibm_storage import strings -from cinder.volume import group_types +from cinder.volume import utils from cinder.volume import volume_types LOG = logging.getLogger(__name__) @@ -91,10 +95,7 @@ EXTRA_SPECS_DEFAULTS = { 'thin': True, 'replication_enabled': False, 'consistency': False, - 'os400': '', - 'consistent_group_replication_enabled': False, - 'group_replication_enabled': False, - 'consistent_group_snapshot_enabled': False, + 'os400': '' } ds8k_opts = [ @@ -105,7 +106,11 @@ ds8k_opts = [ cfg.StrOpt( 'ds8k_ssid_prefix', default='FF', - help='Set the first two digits of SSID'), + help='Set the first two digits of SSID.'), + cfg.StrOpt( + 'lss_range_for_cg', + default='', + help='Reserve LSSs for consistency group.'), cfg.StrOpt( 'ds8k_host_type', default='auto', @@ -135,16 +140,23 @@ class Lun(object): self.type_os400 = lun.type_os400 self.data_type = lun.data_type self.type_replication = lun.type_replication + self.group = lun.group if not self.is_snapshot and self.type_replication: self.replica_ds_name = lun.replica_ds_name - self.replication_driver_data = lun.replication_driver_data + self.replication_driver_data = ( + lun.replication_driver_data.copy()) self.replication_status = lun.replication_status - self.lss_pair = lun.lss_pair + self.pool_lss_pair = lun.pool_lss_pair def update_volume(self, lun): volume_update = lun.get_volume_update() volume_update['provider_location'] = six.text_type({ 'vol_hex_id': self.ds_id}) + if self.type_replication: + volume_update['replication_driver_data'] = json.dumps( + self.replication_driver_data) + volume_update['metadata']['replication'] = six.text_type( + self.replication_driver_data) volume_update['metadata']['vol_hex_id'] = self.ds_id return volume_update @@ -165,19 +177,22 @@ class Lun(object): if volume.provider_location: provider_location = ast.literal_eval(volume.provider_location) - self.ds_id = provider_location[six.text_type('vol_hex_id')] + self.ds_id = provider_location['vol_hex_id'] else: self.ds_id = None self.cinder_name = volume.display_name - self.lss_pair = {} + self.pool_lss_pair = {} self.is_snapshot = is_snapshot if self.is_snapshot: + self.group = (Group(volume.group_snapshot, True) + if volume.group_snapshot else None) self.size = volume.volume_size # ds8k supports at most 16 chars self.ds_name = ( "OS%s:%s" % ('snap', helper.filter_alnum(self.cinder_name)) )[:16] else: + self.group = Group(volume.group) if volume.group else None self.size = volume.size self.ds_name = ( "OS%s:%s" % ('vol', helper.filter_alnum(self.cinder_name)) @@ -193,17 +208,17 @@ class Lun(object): # now only support one replication target. replication_target = sorted( self.replication_driver_data.values())[0] - replica_id = replication_target[six.text_type('vol_hex_id')] - self.lss_pair = { + replica_id = replication_target['vol_hex_id'] + self.pool_lss_pair = { 'source': (None, self.ds_id[0:2]), 'target': (None, replica_id[0:2]) } if os400: if os400 not in VALID_OS400_VOLUME_TYPES.keys(): - msg = (_("The OS400 volume type provided, %s, is not " - "a valid volume type.") % os400) - raise restclient.APIException(data=msg) + raise restclient.APIException( + data=(_("The OS400 volume type provided, %s, is not " + "a valid volume type.") % os400)) self.type_os400 = os400 if os400 not in ['050', '099']: self.size = VALID_OS400_VOLUME_TYPES[os400] @@ -284,13 +299,14 @@ class Lun(object): class Group(object): """provide group information for driver from group db object.""" - def __init__(self, group): - gid = group.get('group_type_id') - specs = group_types.get_group_type_specs(gid) if gid else {} - self.type_cg_snapshot = specs.get( - 'consistent_group_snapshot_enabled', ' %s' % - EXTRA_SPECS_DEFAULTS['consistent_group_snapshot_enabled'] - ).upper() == strings.METADATA_IS_TRUE + def __init__(self, group, is_snapshot=False): + self.id = group.id + self.host = group.host + if is_snapshot: + self.snapshots = group.snapshots + else: + self.volumes = group.volumes + self.consisgroup_enabled = utils.is_group_a_cg_snapshot_type(group) class DS8KProxy(proxy.IBMStorageProxy): @@ -307,6 +323,9 @@ class DS8KProxy(proxy.IBMStorageProxy): self._active_backend_id = active_backend_id self.configuration = driver.configuration self.configuration.append_config_values(ds8k_opts) + # TODO(jiamin): this cache is used to handle concurrency issue, but it + # hurts HA, we will find whether is it possible to store it in storage. + self.consisgroup_cache = {} @proxy._trace_time def setup(self, ctxt): @@ -325,9 +344,9 @@ class DS8KProxy(proxy.IBMStorageProxy): self._helper = helper.DS8KECKDHelper(self.configuration, self._connector_obj) else: - err = (_("Param [connection_type] %s is invalid.") - % connection_type) - raise exception.InvalidParameterValue(err=err) + raise exception.InvalidParameterValue( + err=(_("Param [connection_type] %s is invalid.") + % connection_type)) if replication_devices: self._do_replication_setup(replication_devices, self._helper) @@ -335,9 +354,9 @@ class DS8KProxy(proxy.IBMStorageProxy): @proxy.logger def _do_replication_setup(self, devices, src_helper): if len(devices) >= 2: - err = _("Param [replication_device] is invalid, Driver " - "support only one replication target.") - raise exception.InvalidParameterValue(err=err) + raise exception.InvalidParameterValue( + err=_("Param [replication_device] is invalid, Driver " + "support only one replication target.")) self._replication = replication.Replication(src_helper, devices[0]) self._replication.check_physical_links() @@ -368,9 +387,9 @@ class DS8KProxy(proxy.IBMStorageProxy): LOG.error(msg) raise exception.CinderException(message=msg) else: - msg = (_('Backend %s is not initialized.') - % self.configuration.volume_backend_name) - raise exception.CinderException(data=msg) + raise exception.VolumeDriverException( + message=(_('Backend %s is not initialized.') + % self.configuration.volume_backend_name)) stats = { "volume_backend_name": self.configuration.volume_backend_name, @@ -384,7 +403,6 @@ class DS8KProxy(proxy.IBMStorageProxy): "free_capacity_gb": self._b2gb( sum(p['capavail'] for p in storage_pools.values())), "reserved_percentage": self.configuration.reserved_percentage, - "consistencygroup_support": True, "consistent_group_snapshot_enabled": True, "multiattach": False } @@ -397,7 +415,7 @@ class DS8KProxy(proxy.IBMStorageProxy): def _assert(self, assert_condition, exception_message=''): if not assert_condition: LOG.error(exception_message) - raise restclient.APIException(data=exception_message) + raise exception.VolumeDriverException(message=exception_message) @proxy.logger def _create_lun_helper(self, lun, pool=None, find_new_pid=True): @@ -415,20 +433,107 @@ class DS8KProxy(proxy.IBMStorageProxy): raise restclient.APIException(message=msg) # There is a time gap between find available LSS slot and # lun actually occupies it. - excluded_lss = [] + excluded_lss = set() while True: try: - if lun.type_replication and not lun.is_snapshot: - lun.lss_pair = self._replication.find_available_lss_pair( - excluded_lss) + if lun.group and lun.group.consisgroup_enabled: + lun.pool_lss_pair = { + 'source': self._find_pool_lss_pair_for_cg( + lun, excluded_lss)} else: - lun.lss_pair['source'] = self._helper.find_available_lss( - pool, find_new_pid, excluded_lss) + if lun.type_replication and not lun.is_snapshot: + lun.pool_lss_pair = ( + self._replication.find_pool_lss_pair( + excluded_lss)) + else: + lun.pool_lss_pair = { + 'source': self._helper.find_pool_lss_pair( + pool, find_new_pid, excluded_lss)} return self._helper.create_lun(lun) except restclient.LssFullException: LOG.warning("LSS %s is full, find another one.", - lun.lss_pair['source'][1]) - excluded_lss.append(lun.lss_pair['source'][1]) + lun.pool_lss_pair['source'][1]) + excluded_lss.add(lun.pool_lss_pair['source'][1]) + + @coordination.synchronized('{self.prefix}-consistency-group') + def _find_pool_lss_pair_for_cg(self, lun, excluded_lss): + lss_in_cache = self.consisgroup_cache.get(lun.group.id, set()) + if not lss_in_cache: + lss_in_cg = self._get_lss_in_cg(lun.group, lun.is_snapshot) + LOG.debug("LSSs used by CG %(cg)s are %(lss)s.", + {'cg': lun.group.id, 'lss': ','.join(lss_in_cg)}) + available_lss = lss_in_cg - excluded_lss + else: + available_lss = lss_in_cache - excluded_lss + if not available_lss: + available_lss = self._find_lss_for_cg() + + pid, lss = self._find_pool_for_lss(available_lss) + if pid: + lss_in_cache.add(lss) + self.consisgroup_cache[lun.group.id] = lss_in_cache + else: + raise exception.VolumeDriverException( + message=_('There are still some available LSSs for CG, ' + 'but they are not in the same node as pool.')) + return (pid, lss) + + def _get_lss_in_cg(self, group, is_snapshot=False): + # Driver can not support the case that dedicating LSS for CG while + # user enable multiple backends which use the same DS8K. + try: + volume_backend_name = ( + group.host[group.host.index('@') + 1:group.host.index('#')]) + except ValueError: + raise exception.VolumeDriverException( + message=(_('Invalid host %(host)s in group %(group)s') + % {'host': group.host, 'group': group.id})) + lss_in_cg = set() + if volume_backend_name == self.configuration.volume_backend_name: + if is_snapshot: + luns = [Lun(snapshot, is_snapshot=True) + for snapshot in group.snapshots] + else: + luns = [Lun(volume) for volume in group.volumes] + lss_in_cg = set(lun.ds_id[:2] for lun in luns if lun.ds_id) + return lss_in_cg + + def _find_lss_for_cg(self): + # Unable to get CGs/groups belonging to the current tenant, so + # get all of them, this function will consume some time if there + # are so many CGs/groups. + lss_used = set() + ctxt = context.get_admin_context() + existing_groups = objects.GroupList.get_all( + ctxt, filters={'status': 'available'}) + for group in existing_groups: + if Group(group).consisgroup_enabled: + lss_used = lss_used | self._get_lss_in_cg(group) + existing_groupsnapshots = objects.GroupSnapshotList.get_all( + ctxt, filters={'status': 'available'}) + for group in existing_groupsnapshots: + if Group(group, True).consisgroup_enabled: + lss_used = lss_used | self._get_lss_in_cg(group, True) + available_lss = set(self._helper.backend['lss_ids_for_cg']) - lss_used + for lss_set in self.consisgroup_cache.values(): + available_lss -= lss_set + self._assert(available_lss, + "All LSSs reserved for CG have been used out, " + "please reserve more LSS for CG if there are still" + "some empty LSSs left.") + LOG.debug('_find_lss_for_cg: available LSSs for consistency ' + 'group are %s', ','.join(available_lss)) + return available_lss + + @proxy.logger + def _find_pool_for_lss(self, available_lss): + for lss in available_lss: + pid = self._helper.get_pool(lss) + if pid: + return (pid, lss) + raise exception.VolumeDriverException( + message=(_("Can not find pool for LSSs %s.") + % ','.join(available_lss))) @proxy.logger def _clone_lun(self, src_lun, tgt_lun): @@ -485,29 +590,36 @@ class DS8KProxy(proxy.IBMStorageProxy): def _ensure_vol_not_fc_target(self, vol_hex_id): for cp in self._helper.get_flashcopy(vol_hex_id): if cp['targetvolume']['id'] == vol_hex_id: - msg = (_('Volume %s is currently a target of another ' - 'FlashCopy operation') % vol_hex_id) - raise restclient.APIException(data=msg) + raise restclient.APIException( + data=(_('Volume %s is currently a target of another ' + 'FlashCopy operation') % vol_hex_id)) + + def _create_replica_helper(self, lun): + if not lun.pool_lss_pair.get('target'): + lun = self._replication.enable_replication(lun, True) + else: + lun = self._replication.create_replica(lun) + return lun @proxy._trace_time def create_volume(self, volume): lun = self._create_lun_helper(Lun(volume)) if lun.type_replication: - lun = self._replication.create_replica(lun) + lun = self._create_replica_helper(lun) return lun.get_volume_update() @proxy._trace_time def create_cloned_volume(self, target_vol, source_vol): lun = self._clone_lun(Lun(source_vol), Lun(target_vol)) if lun.type_replication: - lun = self._replication.create_replica(lun) + lun = self._create_replica_helper(lun) return lun.get_volume_update() @proxy._trace_time def create_volume_from_snapshot(self, volume, snapshot): lun = self._clone_lun(Lun(snapshot, is_snapshot=True), Lun(volume)) if lun.type_replication: - lun = self._replication.create_replica(lun) + lun = self._create_replica_helper(lun) return lun.get_volume_update() @proxy._trace_time @@ -524,9 +636,9 @@ class DS8KProxy(proxy.IBMStorageProxy): self._replication.extend_replica(lun, param) self._replication.create_pprc_pairs(lun) else: - msg = (_("The volume %s has been failed over, it is " - "not suggested to extend it.") % lun.ds_id) - raise exception.CinderException(data=msg) + raise exception.CinderException( + message=(_("The volume %s has been failed over, it is " + "not suggested to extend it.") % lun.ds_id)) else: self._helper.change_lun(lun.ds_id, param) @@ -658,10 +770,11 @@ class DS8KProxy(proxy.IBMStorageProxy): lun = self._replication.delete_replica(lun) lun = _convert_thin_and_thick(lun, new_type_thin) else: - msg = (_("The volume %s is in replication relationship, " - "it is not supported to retype from thin to " - "thick or vice versus.") % lun.ds_id) - raise exception.CinderException(msg) + raise exception.CinderException( + message=(_("The volume %s is in replication " + "relationship, it is not supported to " + "retype from thin to thick or vice " + "versa.") % lun.ds_id)) else: lun = _convert_thin_and_thick(lun, new_type_thin) if new_type_replication: @@ -697,23 +810,53 @@ class DS8KProxy(proxy.IBMStorageProxy): force, **kwargs) @proxy.logger - def create_consistencygroup(self, ctxt, group): - """Create a consistency group.""" - return self._helper.create_group(ctxt, group) + def create_group(self, ctxt, group): + """Create generic volume group.""" + if Group(group).consisgroup_enabled: + self._assert(self._helper.backend['lss_ids_for_cg'], + 'No LSS(s) for CG, please make sure you have ' + 'reserved LSS for CG via param lss_range_for_cg.') + return self._helper.create_group(group) @proxy.logger - def delete_consistencygroup(self, ctxt, group, volumes): - """Delete a consistency group.""" + def delete_group(self, ctxt, group, volumes): + """Delete group and the volumes in the group.""" luns = [Lun(volume) for volume in volumes] - return self._helper.delete_group(ctxt, group, luns) + if Group(group).consisgroup_enabled: + return self._delete_group_with_lock(group, luns) + else: + return self._helper.delete_group(group, luns) - @proxy._trace_time - def create_cgsnapshot(self, ctxt, cgsnapshot, snapshots): - """Create a consistency group snapshot.""" - return self._create_group_snapshot(ctxt, cgsnapshot, snapshots, True) + @coordination.synchronized('{self.prefix}-consistency-group') + def _delete_group_with_lock(self, group, luns): + model_update, volumes_model_update = ( + self._helper.delete_group(group, luns)) + if model_update['status'] == fields.GroupStatus.DELETED: + self._update_consisgroup_cache(group.id) + return model_update, volumes_model_update - def _create_group_snapshot(self, ctxt, cgsnapshot, snapshots, - cg_enabled=False): + @proxy.logger + def delete_group_snapshot(self, ctxt, group_snapshot, snapshots): + """Delete volume group snapshot.""" + tgt_luns = [Lun(s, is_snapshot=True) for s in snapshots] + if Group(group_snapshot, True).consisgroup_enabled: + return self._delete_group_snapshot_with_lock( + group_snapshot, tgt_luns) + else: + return self._helper.delete_group_snapshot( + group_snapshot, tgt_luns) + + @coordination.synchronized('{self.prefix}-consistency-group') + def _delete_group_snapshot_with_lock(self, group_snapshot, tgt_luns): + model_update, snapshots_model_update = ( + self._helper.delete_group_snapshot(group_snapshot, tgt_luns)) + if model_update['status'] == fields.GroupStatus.DELETED: + self._update_consisgroup_cache(group_snapshot.id) + return model_update, snapshots_model_update + + @proxy.logger + def create_group_snapshot(self, ctxt, group_snapshot, snapshots): + """Create volume group snapshot.""" snapshots_model_update = [] model_update = {'status': fields.GroupStatus.AVAILABLE} @@ -722,7 +865,7 @@ class DS8KProxy(proxy.IBMStorageProxy): try: if src_luns and tgt_luns: - self._clone_group(src_luns, tgt_luns, cg_enabled) + self._clone_group(src_luns, tgt_luns) except restclient.APIException: model_update['status'] = fields.GroupStatus.ERROR LOG.exception('Failed to create group snapshot.') @@ -737,70 +880,100 @@ class DS8KProxy(proxy.IBMStorageProxy): return model_update, snapshots_model_update - @proxy._trace_time @proxy.logger - def delete_cgsnapshot(self, ctxt, cgsnapshot, snapshots): - """Delete a consistency group snapshot.""" - return self._delete_group_snapshot(ctxt, cgsnapshot, snapshots) + def update_group(self, ctxt, group, add_volumes, remove_volumes): + """Update generic volume group.""" + if Group(group).consisgroup_enabled: + return self._update_group(group, add_volumes, remove_volumes) + else: + return None, None, None - def _delete_group_snapshot(self, ctxt, group_snapshot, snapshots): - snapshots_model_update = [] - model_update = {'status': fields.GroupStatus.DELETED} + def _update_group(self, group, add_volumes, remove_volumes): + add_volumes_update = [] + group_volume_ids = [vol.id for vol in group.volumes] + add_volumes = [vol for vol in add_volumes + if vol.id not in group_volume_ids] + remove_volumes = [vol for vol in remove_volumes + if vol.id in group_volume_ids] + if add_volumes: + add_luns = [Lun(vol) for vol in add_volumes] + lss_in_cg = [Lun(vol).ds_id[:2] for vol in group.volumes] + if not lss_in_cg: + lss_in_cg = self._find_lss_for_empty_group(group, add_luns) + add_volumes_update = self._add_volumes_into_group( + group, add_luns, lss_in_cg) + if remove_volumes: + self._remove_volumes_in_group(group, add_volumes, remove_volumes) + return None, add_volumes_update, None - snapshots = [Lun(s, is_snapshot=True) for s in snapshots] - if snapshots: - try: - self._helper.delete_lun(snapshots) - except restclient.APIException as e: - model_update['status'] = fields.GroupStatus.ERROR_DELETING - LOG.error("Failed to delete group snapshot. " - "Error: %(err)s", - {'err': e}) + @coordination.synchronized('{self.prefix}-consistency-group') + def _find_lss_for_empty_group(self, group, luns): + sorted_lss_ids = collections.Counter([lun.ds_id[:2] for lun in luns]) + available_lss = self._find_lss_for_cg() + lss_for_cg = None + for lss_id in sorted_lss_ids: + if lss_id in available_lss: + lss_for_cg = lss_id + break + if not lss_for_cg: + lss_for_cg = available_lss.pop() + self._update_consisgroup_cache(group.id, lss_for_cg) + return lss_for_cg - for snapshot in snapshots: - snapshots_model_update.append({ - 'id': snapshot.os_id, - 'status': model_update['status'] - }) - return model_update, snapshots_model_update + def _add_volumes_into_group(self, group, add_luns, lss_in_cg): + add_volumes_update = [] + luns = [lun for lun in add_luns if lun.ds_id[:2] not in lss_in_cg] + for lun in luns: + if lun.type_replication: + new_lun = self._clone_lun_for_group(group, lun) + new_lun.type_replication = True + new_lun = self._replication.enable_replication(new_lun, True) + lun = self._replication.delete_replica(lun) + else: + new_lun = self._clone_lun_for_group(group, lun) + self._helper.delete_lun(lun) + volume_update = new_lun.update_volume(lun) + volume_update['id'] = lun.os_id + add_volumes_update.append(volume_update) + return add_volumes_update + + def _clone_lun_for_group(self, group, lun): + lun.group = Group(group) + new_lun = lun.shallow_copy() + new_lun.type_replication = False + self._create_lun_helper(new_lun) + self._clone_lun(lun, new_lun) + return new_lun + + @coordination.synchronized('{self.prefix}-consistency-group') + def _remove_volumes_in_group(self, group, add_volumes, remove_volumes): + if len(remove_volumes) == len(group.volumes) + len(add_volumes): + self._update_consisgroup_cache(group.id) @proxy.logger - def update_consistencygroup(self, ctxt, group, - add_volumes, remove_volumes): - """Add or remove volume(s) to/from an existing consistency group.""" - return self._helper.update_group(ctxt, group, - add_volumes, remove_volumes) + def _update_consisgroup_cache(self, group_id, lss_id=None): + if lss_id: + self.consisgroup_cache[group_id] = set([lss_id]) + else: + if self.consisgroup_cache.get(group_id): + LOG.debug('Group %(id)s owns LSS %(lss)s in the cache.', { + 'id': group_id, + 'lss': ','.join(self.consisgroup_cache[group_id]) + }) + self.consisgroup_cache.pop(group_id) @proxy._trace_time - def create_consistencygroup_from_src(self, ctxt, group, volumes, - cgsnapshot, snapshots, - source_cg, sorted_source_vols): - """Create a consistencygroup from source. - - :param ctxt: the context of the caller. - :param group: the dictionary of the consistency group to be created. - :param volumes: a list of volume dictionaries in the group. - :param cgsnapshot: the dictionary of the cgsnapshot as source. - :param snapshots: a list of snapshot dictionaries in the cgsnapshot. - :param source_cg: the dictionary of the consisgroup as source. - :param sorted_source_vols: a list of volume dictionaries - in the consisgroup. - :return model_update, volumes_model_update - """ - return self._create_group_from_src(ctxt, group, volumes, cgsnapshot, - snapshots, source_cg, - sorted_source_vols, True) - - def _create_group_from_src(self, ctxt, group, volumes, cgsnapshot, - snapshots, source_cg, sorted_source_vols, - cg_enabled=False): + def create_group_from_src(self, ctxt, group, volumes, group_snapshot, + sorted_snapshots, source_group, + sorted_source_vols): + """Create volume group from volume group or volume group snapshot.""" model_update = {'status': fields.GroupStatus.AVAILABLE} volumes_model_update = [] - if cgsnapshot and snapshots: + if group_snapshot and sorted_snapshots: src_luns = [Lun(snapshot, is_snapshot=True) - for snapshot in snapshots] - elif source_cg and sorted_source_vols: + for snapshot in sorted_snapshots] + elif source_group and sorted_source_vols: src_luns = [Lun(source_vol) for source_vol in sorted_source_vols] else: @@ -811,9 +984,22 @@ class DS8KProxy(proxy.IBMStorageProxy): raise exception.InvalidInput(message=msg) try: + # Don't use paramter volumes because it has DetachedInstanceError + # issue frequently. here tries to get and sort new volumes, a lot + # of cases have been guaranteed by the _sort_source_vols in + # manange.py, so not verify again. + sorted_volumes = [] + for vol in volumes: + found_vols = [v for v in group.volumes if v['id'] == vol['id']] + sorted_volumes.extend(found_vols) + volumes = sorted_volumes + tgt_luns = [Lun(volume) for volume in volumes] if src_luns and tgt_luns: - self._clone_group(src_luns, tgt_luns, cg_enabled) + self._clone_group(src_luns, tgt_luns) + for tgt_lun in tgt_luns: + if tgt_lun.type_replication: + self._create_replica_helper(tgt_lun) except restclient.APIException: model_update['status'] = fields.GroupStatus.ERROR LOG.exception("Failed to create group from group snapshot.") @@ -828,7 +1014,7 @@ class DS8KProxy(proxy.IBMStorageProxy): return model_update, volumes_model_update - def _clone_group(self, src_luns, tgt_luns, cg_enabled): + def _clone_group(self, src_luns, tgt_luns): for src_lun in src_luns: self._ensure_vol_not_fc_target(src_lun.ds_id) finished = False @@ -842,7 +1028,7 @@ class DS8KProxy(proxy.IBMStorageProxy): "source_volume": src_lun.ds_id, "target_volume": tgt_lun.ds_id }) - if cg_enabled: + if tgt_lun.group.consisgroup_enabled: self._do_flashcopy_with_freeze(vol_pairs) else: self._helper.start_flashcopy(vol_pairs) @@ -851,7 +1037,6 @@ class DS8KProxy(proxy.IBMStorageProxy): if not finished: self._helper.delete_lun(tgt_luns) - @synchronized('OpenStackCinderIBMDS8KMutex-CG-', external=True) @proxy._trace_time def _do_flashcopy_with_freeze(self, vol_pairs): # issue flashcopy with freeze @@ -861,48 +1046,6 @@ class DS8KProxy(proxy.IBMStorageProxy): LOG.debug('Unfreezing the LSS: %s', ','.join(lss_ids)) self._helper.unfreeze_lss(lss_ids) - @proxy.logger - def create_group(self, ctxt, group): - """Create generic volume group.""" - return self._helper.create_group(ctxt, group) - - @proxy.logger - def delete_group(self, ctxt, group, volumes): - """Delete group and the volumes in the group.""" - luns = [Lun(volume) for volume in volumes] - return self._helper.delete_group(ctxt, group, luns) - - @proxy.logger - def update_group(self, ctxt, group, add_volumes, remove_volumes): - """Update generic volume group.""" - return self._helper.update_group(ctxt, group, - add_volumes, remove_volumes) - - @proxy.logger - def create_group_snapshot(self, ctxt, group_snapshot, snapshots): - """Create volume group snapshot.""" - snapshot_group = Group(group_snapshot) - cg_enabled = True if snapshot_group.type_cg_snapshot else False - return self._create_group_snapshot(ctxt, group_snapshot, - snapshots, cg_enabled) - - @proxy.logger - def delete_group_snapshot(self, ctxt, group_snapshot, snapshots): - """Delete volume group snapshot.""" - return self._delete_group_snapshot(ctxt, group_snapshot, snapshots) - - @proxy._trace_time - def create_group_from_src(self, ctxt, group, volumes, group_snapshot, - sorted_snapshots, source_group, - sorted_source_vols): - """Create volume group from volume group or volume group snapshot.""" - volume_group = Group(group) - cg_enabled = True if volume_group.type_cg_snapshot else False - return self._create_group_from_src(ctxt, group, volumes, - group_snapshot, sorted_snapshots, - source_group, sorted_source_vols, - cg_enabled) - def freeze_backend(self, ctxt): """Notify the backend that it's frozen.""" pass @@ -935,9 +1078,9 @@ class DS8KProxy(proxy.IBMStorageProxy): if secondary_id is None: secondary_id = backend_id elif secondary_id != backend_id: - msg = (_('Invalid secondary_backend_id specified. ' - 'Valid backend id is %s.') % backend_id) - raise exception.InvalidReplicationTarget(message=msg) + raise exception.InvalidReplicationTarget( + message=(_('Invalid secondary_backend_id specified. ' + 'Valid backend id is %s.') % backend_id)) LOG.debug("Starting failover to %s.", secondary_id) @@ -965,10 +1108,10 @@ class DS8KProxy(proxy.IBMStorageProxy): self._active_backend_id = "" self._helper = self._replication._source_helper except restclient.APIException as e: - msg = (_("Unable to failover host to %(id)s. " - "Exception= %(ex)s") - % {'id': secondary_id, 'ex': six.text_type(e)}) - raise exception.UnableToFailOver(reason=msg) + raise exception.UnableToFailOver( + reason=(_("Unable to failover host to %(id)s. " + "Exception= %(ex)s") + % {'id': secondary_id, 'ex': six.text_type(e)})) for lun in replicated_luns: volume_update = lun.get_volume_update() diff --git a/cinder/volume/drivers/ibm/ibm_storage/ds8k_replication.py b/cinder/volume/drivers/ibm/ibm_storage/ds8k_replication.py index f29f436d8a2..72cc6c390e9 100644 --- a/cinder/volume/drivers/ibm/ibm_storage/ds8k_replication.py +++ b/cinder/volume/drivers/ibm/ibm_storage/ds8k_replication.py @@ -50,10 +50,11 @@ class MetroMirrorManager(object): ports = self._source.get_physical_links( self._target.backend['storage_wwnn']) if not ports: - msg = (_("DS8K %(tgt)s is not connected to the DS8K %(src)s!") % - {'tgt': self._target.backend['storage_wwnn'], - 'src': self._source.backend['storage_wwnn']}) - raise exception.CinderException(msg) + raise exception.VolumeDriverException( + message=((_("%(tgt)s is not connected to %(src)s!") % { + 'tgt': self._target.backend['storage_wwnn'], + 'src': self._source.backend['storage_wwnn'] + }))) pairs = [{ 'source_port_id': p['source_port_id'], @@ -72,15 +73,13 @@ class MetroMirrorManager(object): ["%s-%s" % (p['source_port_id'], p['target_port_id']) for p in pairs]) - invalid_pair = "%s-%s" % (pair['source_port_id'], pair['target_port_id']) - - msg = (_("Invalid port pair: %(invalid)s, valid port " - "pair(s) are: %(valid)s") % - {'invalid': invalid_pair, - 'valid': valid_pairs}) - raise exception.CinderException(msg) + raise exception.VolumeDriverException( + message=((_("Invalid port pair: %(invalid)s, valid " + "port pair(s) are: %(valid)s") + % {'invalid': invalid_pair, + 'valid': valid_pairs}))) self._source.backend['port_pairs'] = [{ 'source_port_id': p['target_port_id'], 'target_port_id': p['source_port_id'] @@ -96,13 +95,13 @@ class MetroMirrorManager(object): return True - def find_available_pprc_path(self, lss=None, excluded_lss=None): - """find lss from existed pprc path. + def find_from_pprc_paths(self, specified_lss=None, excluded_lss=None): + """find lss from existing pprc paths and pool id for it. - the format of lss_pair returned is as below: + the format of pool_lss_pair returned is as below: {'source': (pid, lss), 'target': (pid, lss)} """ - state, paths = self._filter_pprc_paths(lss) + state, paths = self._filter_pprc_paths(specified_lss) if state != PPRC_PATH_HEALTHY: # check whether the physical links are available or not, # or have been changed. @@ -111,43 +110,47 @@ class MetroMirrorManager(object): if excluded_lss: paths = [p for p in paths if p['source_lss_id'] not in excluded_lss] + # only enable_replication will specify the source LSS + # and it need to reuse LSS reserved for CG if this LSS + # is in PPRC path. + if not specified_lss: + paths = [p for p in paths if p['source_lss_id'] not in + self._source.backend['lss_ids_for_cg']] - lss_pair = {} - if len(paths) == 1: - path = paths[0] - pid = self._source.get_pool(path['source_lss_id']) - lss_pair['source'] = (pid, path['source_lss_id']) - else: - # sort the lss pairs according to the number of luns, - # get the lss pair which has least luns. - candidates = [] - source_lss_set = set(p['source_lss_id'] for p in paths) - for lss in source_lss_set: - # get the number of lun in source. - src_luns = self._source.get_lun_number_in_lss(lss) - if src_luns == helper.LSS_VOL_SLOTS: - continue + # sort pairs according to the number of luns in their LSSes, + # and get the pair which LSS has least luns. + candidates = [] + source_lss_set = set(p['source_lss_id'] for p in paths) + for lss in source_lss_set: + # get the number of luns in source. + src_luns = self._source.get_lun_number_in_lss(lss) + if src_luns == helper.LSS_VOL_SLOTS and not specified_lss: + continue - spec_paths = [p for p in paths if p['source_lss_id'] == lss] - for path in spec_paths: - # get the number of lun in target. + spec_paths = [p for p in paths if p['source_lss_id'] == lss] + for path in spec_paths: + # get the number of luns in target. + try: tgt_luns = self._target.get_lun_number_in_lss( path['target_lss_id']) - candidates.append((lss, path, src_luns + tgt_luns)) - - if candidates: - candidate = sorted(candidates, key=lambda c: c[2])[0] - pid = self._source.get_pool(candidate[0]) - lss_pair['source'] = (pid, candidate[0]) - path = candidate[1] - else: - return PPRC_PATH_FULL, None - - # format the target in lss_pair. - pid = self._target.get_pool(path['target_lss_id']) - lss_pair['target'] = (pid, path['target_lss_id']) - - return PPRC_PATH_HEALTHY, lss_pair + except restclient.APIException: + # if DS8K can fix this problem, then remove the + # exception here. + LOG.error("Target LSS %s in PPRC path may doesn't " + "exist although PPRC path is available.", + path['target_lss_id']) + tgt_luns = 0 + candidates.append((path['source_lss_id'], + path['target_lss_id'], + src_luns + tgt_luns)) + if not candidates: + return PPRC_PATH_FULL, None + else: + src_lss, tgt_lss, num = sorted(candidates, key=lambda c: c[2])[0] + return PPRC_PATH_HEALTHY, { + 'source': (self._source.get_pool(src_lss), src_lss), + 'target': (self._target.get_pool(tgt_lss), tgt_lss) + } def _filter_pprc_paths(self, lss): paths = self._source.get_pprc_paths(lss) @@ -225,9 +228,9 @@ class MetroMirrorManager(object): return PPRC_PATH_HEALTHY, paths - def create_pprc_path(self, lss_pair): - src_lss = lss_pair['source'][1] - tgt_lss = lss_pair['target'][1] + def create_pprc_path(self, pool_lss_pair): + src_lss = pool_lss_pair['source'][1] + tgt_lss = pool_lss_pair['target'][1] # check whether the pprc path exists and is healthy or not firstly. pid = (self._source.backend['storage_wwnn'] + '_' + src_lss + ':' + self._target.backend['storage_wwnn'] + '_' + tgt_lss) @@ -256,9 +259,9 @@ class MetroMirrorManager(object): break if retry == 3: self._source.delete_pprc_path(pid) - msg = (_("Fail to create PPRC path %(src)s:%(tgt)s.") % - {'src': src_lss, 'tgt': tgt_lss}) - raise restclient.APIException(data=msg) + raise restclient.APIException( + data=(_("Failed to create PPRC path %(src)s:%(tgt)s.") + % {'src': src_lss, 'tgt': tgt_lss})) LOG.debug("Create the new PPRC path successfully.") def _is_pprc_paths_healthy(self, path_id): @@ -280,8 +283,7 @@ class MetroMirrorManager(object): vol_pairs = [{ 'source_volume': lun.ds_id, - 'source_system_id': - self._source.backend['storage_unit'], + 'source_system_id': self._source.backend['storage_unit'], 'target_volume': tgt_vol_id, 'target_system_id': tgt_stg_id }] @@ -298,10 +300,9 @@ class MetroMirrorManager(object): def delete_pprc_pairs(self, lun): self._source.delete_pprc_pair(lun.ds_id) - if self.is_target_alive(): + if self.is_target_alive() and lun.replication_driver_data: replica = sorted(lun.replication_driver_data.values())[0] - self._target.delete_pprc_pair( - six.text_type(replica['vol_hex_id'])) + self._target.delete_pprc_pair(replica['vol_hex_id']) def do_pprc_failover(self, luns, backend_id): vol_pairs = [] @@ -317,12 +318,10 @@ class MetroMirrorManager(object): continue vol_pairs.append({ - 'source_volume': six.text_type(target_vol_id), - 'source_system_id': six.text_type( - self._target.backend['storage_unit']), - 'target_volume': six.text_type(lun.ds_id), - 'target_system_id': six.text_type( - self._source.backend['storage_unit']) + 'source_volume': target_vol_id, + 'source_system_id': self._target.backend['storage_unit'], + 'target_volume': lun.ds_id, + 'target_system_id': self._source.backend['storage_unit'] }) target_vol_ids.append(target_vol_id) @@ -383,9 +382,16 @@ class Replication(object): if connection_type == storage.XIV_CONNECTION_TYPE_FC: self._target_helper = ( helper.DS8KReplicationTargetHelper(target_device)) - else: + elif connection_type == storage.XIV_CONNECTION_TYPE_FC_ECKD: self._target_helper = ( helper.DS8KReplicationTargetECKDHelper(target_device)) + else: + raise exception.InvalidParameterValue( + err=(_("Param [connection_type] %s in replication_device " + "is invalid.") % connection_type)) + + self._target_helper.backend['lss_ids_for_cg'] = ( + self._source_helper.backend['lss_ids_for_cg']) self._mm_manager = MetroMirrorManager(self._source_helper, self._target_helper) @@ -393,11 +399,12 @@ class Replication(object): src_conn_type = self._source_helper.get_connection_type() tgt_conn_type = self._target_helper.get_connection_type() if src_conn_type != tgt_conn_type: - msg = (_("The connection type in primary backend is " - "%(primary)s, but in secondary backend it is " - "%(secondary)s") % - {'primary': src_conn_type, 'secondary': tgt_conn_type}) - raise exception.CinderException(msg) + raise exception.VolumeDriverException( + message=(_("The connection type in primary backend is " + "%(primary)s, but in secondary backend it is " + "%(secondary)s") + % {'primary': src_conn_type, + 'secondary': tgt_conn_type})) # PPRC can not copy from ESE volume to standard volume or vice versus. if src_conn_type == storage.XIV_CONNECTION_TYPE_FC_ECKD: src_thin = self._source_helper.get_thin_provision() @@ -425,13 +432,13 @@ class Replication(object): return luns @proxy.logger - def find_available_lss_pair(self, excluded_lss): - state, lss_pair = ( - self._mm_manager.find_available_pprc_path(None, excluded_lss)) - if lss_pair is None: - lss_pair = self.find_new_lss_for_source(excluded_lss) - lss_pair.update(self.find_new_lss_for_target()) - return lss_pair + def find_pool_lss_pair(self, excluded_lss): + state, pool_lss_pair = ( + self._mm_manager.find_from_pprc_paths(None, excluded_lss)) + if pool_lss_pair is None: + pool_lss_pair = self.find_new_lss_for_source(excluded_lss) + pool_lss_pair.update(self.find_new_lss_for_target()) + return pool_lss_pair @proxy.logger def find_new_lss_for_source(self, excluded_lss): @@ -444,23 +451,22 @@ class Replication(object): return {'target': (tgt_pid, tgt_lss)} @proxy.logger - def enable_replication(self, lun): - state, lun.lss_pair = ( - self._mm_manager.find_available_pprc_path(lun.ds_id[0:2])) + def enable_replication(self, lun, delete_source=False): + state, lun.pool_lss_pair = ( + self._mm_manager.find_from_pprc_paths(lun.ds_id[0:2])) + LOG.debug("enable_replication: pool_lss_pair is %s.", + lun.pool_lss_pair) if state == PPRC_PATH_UNHEALTHY: - msg = (_("The path(s) for volume %(name)s isn't available " - "any more, please make sure the state of the path(s) " - "which source LSS is %(lss)s is success.") % - {'name': lun.cinder_name, 'lss': lun.ds_id[0:2]}) - raise restclient.APIException(data=msg) + raise restclient.APIException( + data=(_("The path(s) for volume %(name)s isn't available " + "any more, please make sure the state of the path(s) " + "which source LSS is %(lss)s is success.") + % {'name': lun.cinder_name, 'lss': lun.ds_id[0:2]})) elif state == PPRC_PATH_NOT_EXIST: pid = self._source_helper.get_pool(lun.ds_id[0:2]) - lss_pair = {'source': (pid, lun.ds_id[0:2])} - lss_pair.update(self.find_new_lss_for_target()) - lun.lss_pair = lss_pair - LOG.debug("Begin to create replication volume, lss_pair is %s." % - lun.lss_pair) - lun = self.create_replica(lun, False) + lun.pool_lss_pair = {'source': (pid, lun.ds_id[0:2])} + lun.pool_lss_pair.update(self.find_new_lss_for_target()) + lun = self.create_replica(lun, delete_source) return lun @proxy.logger @@ -469,7 +475,7 @@ class Replication(object): try: self._target_helper.create_lun(lun) # create PPRC paths if need. - self._mm_manager.create_pprc_path(lun.lss_pair) + self._mm_manager.create_pprc_path(lun.pool_lss_pair) # create pprc pair self._mm_manager.create_pprc_pairs(lun) except restclient.APIException: @@ -477,7 +483,6 @@ class Replication(object): self.delete_replica(lun) if delete_source: self._source_helper.delete_lun(lun) - lun.replication_status = 'enabled' return lun @@ -488,11 +493,10 @@ class Replication(object): self._mm_manager.delete_pprc_pairs(lun) self._delete_replica(lun) except restclient.APIException as e: - msg = (_('Failed to delete the target volume for volume ' - '%(volume)s, Exception: %(ex)s.') % - {'volume': lun.ds_id, 'ex': six.text_type(e)}) - raise exception.CinderException(msg) - + raise exception.VolumeDriverException( + message=(_('Failed to delete the target volume for ' + 'volume %(volume)s, Exception: %(ex)s.') + % {'volume': lun.ds_id, 'ex': six.text_type(e)})) lun.replication_status = 'disabled' lun.replication_driver_data = {} return lun @@ -542,7 +546,7 @@ class Replication(object): LOG.debug("Failback starts, backend id is %s.", backend_id) for lun in luns: - self._mm_manager.create_pprc_path(lun.lss_pair) + self._mm_manager.create_pprc_path(lun.pool_lss_pair) self._mm_manager.do_pprc_failback(luns, backend_id) # revert the relationship of source volume and target volume self.do_pprc_failover(luns, backend_id) diff --git a/cinder/volume/drivers/ibm/ibm_storage/ds8k_restclient.py b/cinder/volume/drivers/ibm/ibm_storage/ds8k_restclient.py index 45123e09dd4..d11e6f40626 100644 --- a/cinder/volume/drivers/ibm/ibm_storage/ds8k_restclient.py +++ b/cinder/volume/drivers/ibm/ibm_storage/ds8k_restclient.py @@ -271,18 +271,19 @@ class RESTScheduler(object): attempts == 0): self.connect() elif response['server'].get('code') in AUTHENTICATION_ERROR_CODES: - msg = (_('Authentication failed for host %(host)s. ' - 'Exception= %(e)s') % - {'host': self.host, 'e': response['server']['message']}) - raise APIAuthenticationException(data=msg) + raise APIAuthenticationException( + data=(_('Authentication failed for host %(host)s. ' + 'Exception= %(e)s') % + {'host': self.host, + 'e': response['server']['message']})) elif response['server'].get('code') in LSS_ERROR_CODES: - msg = (_('Can not put the volume in LSS: %s') % - response['server']['message']) - raise LssFullException(data=msg) + raise LssFullException( + data=(_('Can not put the volume in LSS: %s') + % response['server']['message'])) elif response['server']['status'] == 'timeout': - msg = (_('Request to storage API time out: %s') % - response['server']['message']) - raise TimeoutException(data=msg) + raise TimeoutException( + data=(_('Request to storage API time out: %s') + % response['server']['message'])) elif (response['server']['status'] != 'ok' and (badStatusException or 'code' not in response['server'])): # if code is not in response means that error was in @@ -290,10 +291,11 @@ class RESTScheduler(object): # via badStatusException=False, but will retry it to # confirm the problem. if attempts == 1: - msg = (_("Request to storage API failed: %(err)s, " - "(%(url)s).") % - {'err': response['server']['message'], 'url': url}) - raise APIException(data=msg) + raise APIException( + data=(_("Request to storage API failed: %(err)s, " + "(%(url)s).") + % {'err': response['server']['message'], + 'url': url})) eventlet.sleep(2) else: return response @@ -303,8 +305,8 @@ class RESTScheduler(object): def fetchall(self, *args, **kwargs): r = self.send(*args, **kwargs)['data'] if len(r) != 1: - msg = _('Expected one result but got %d.') % len(r) - raise APIException(data=msg) + raise APIException( + data=(_('Expected one result but got %d.') % len(r))) else: return r.popitem()[1] @@ -313,8 +315,8 @@ class RESTScheduler(object): def fetchone(self, *args, **kwargs): r = self.fetchall(*args, **kwargs) if len(r) != 1: - msg = _('Expected one item in result but got %d.') % len(r) - raise APIException(data=msg) + raise APIException( + data=(_('Expected one item in result but got %d.') % len(r))) return r[0] # same as the send method above but returns the last element of the @@ -323,9 +325,9 @@ class RESTScheduler(object): r = self.send(*args, **kwargs) if 'responses' in r: if len(r['responses']) != 1: - msg = (_('Expected one item in result responses but ' - 'got %d.') % len(r['responses'])) - raise APIException(data=msg) + raise APIException( + data=(_('Expected one item in result responses but ' + 'got %d.') % len(r['responses']))) r = r['responses'][0] return r['link']['href'].split('/')[-1]