Implementation for CoprHD Cinder Drivers

This patch set includes following CoprHD Cinder drivers

1-Cinder iSCSI Block Volume Driver for CoprHD
2-Cinder FC Block Volume Driver for CoprHD
3-Cinder ScaleIO Block Volume Driver for CoprHD

CI trigger: run emc-coprhd
DocImpact
Implements: blueprint coprhd-cinder-drivers
Change-Id: I5fe7ac8190edb2405981c4750dcecde00159a3ec
This commit is contained in:
hallur, parashuram 2016-05-17 23:06:01 +05:30
parent eccc58f7af
commit a7c715b4d0
22 changed files with 5794 additions and 0 deletions

View File

@ -67,6 +67,10 @@ from cinder.volume.drivers import blockbridge as \
from cinder.volume.drivers.cloudbyte import options as \
cinder_volume_drivers_cloudbyte_options
from cinder.volume.drivers import coho as cinder_volume_drivers_coho
from cinder.volume.drivers.coprhd import common as \
cinder_volume_drivers_coprhd_common
from cinder.volume.drivers.coprhd import scaleio as \
cinder_volume_drivers_coprhd_scaleio
from cinder.volume.drivers import datera as cinder_volume_drivers_datera
from cinder.volume.drivers.dell import dell_storagecenter_common as \
cinder_volume_drivers_dell_dellstoragecentercommon
@ -217,6 +221,7 @@ def list_opts():
cinder_volume_drivers_ibm_storwize_svc_storwizesvciscsi.
storwize_svc_iscsi_opts,
cinder_backup_drivers_glusterfs.glusterfsbackup_service_opts,
cinder_volume_drivers_coprhd_scaleio.scaleio_opts,
cinder_backup_drivers_tsm.tsm_opts,
cinder_volume_drivers_fujitsu_eternusdxcommon.
FJ_ETERNUS_DX_OPT_opts,
@ -234,6 +239,7 @@ def list_opts():
cinder_volume_drivers_sheepdog.sheepdog_opts,
[cinder_api_middleware_sizelimit.max_request_body_size_opt],
cinder_volume_drivers_solidfire.sf_opts,
cinder_volume_drivers_coprhd_common.volume_opts,
cinder_backup_drivers_swift.swiftbackup_service_opts,
cinder_volume_drivers_cloudbyte_options.
cloudbyte_add_qosgroup_opts,

View File

@ -0,0 +1,919 @@
# Copyright (c) 2012 - 2016 EMC Corporation, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mock import Mock
from cinder import context
from cinder.objects import fields
from cinder import test
from cinder.volume.drivers.coprhd import common as coprhd_common
from cinder.volume.drivers.coprhd import fc as coprhd_fc
from cinder.volume.drivers.coprhd import iscsi as coprhd_iscsi
from cinder.volume.drivers.coprhd import scaleio as coprhd_scaleio
from cinder.volume import volume_types
"""
Test Data required for mocking
"""
export_group_details_data = {
"inactive": False,
"initiators": [{"creation_time": 1392194176020,
"host": {"id": "urn:storageos:Host:3e21edff-8662-4e60-ab5",
"link": {"href": "/compute/hosts/urn:storageos:H",
"rel": "self"}},
"hostname": "lglw7134",
"id": "urn:storageos:Initiator:13945431-06b7-44a0-838c-50",
"inactive": False,
"initiator_node": "20:00:00:90:FA:13:81:8D",
"initiator_port": "iqn.1993-08.org.deb:01:222",
"link": {"href": "/compute/initiators/urn:storageos:Initi",
"rel": "self"},
"protocol": "iSCSI",
"registration_status": "REGISTERED",
"tags": []}],
"name": "ccgroup",
"project": 'project',
"tags": [],
"tenant": 'tenant',
"type": "Host",
"varray": {"id": "urn:storageos:VirtualArray:5af376e9-ce2f-493d-9079-a872",
"link": {"href": "/vdc/varrays/urn:storageos:VirtualArray:5af3",
"rel": "self"}
},
"volumes": [{"id": "urn:storageos:Volume:6dc64865-bb25-431c-b321-ac268f16"
"a7ae:vdc1",
"lun": 1
}]
}
varray_detail_data = {"name": "varray"}
export_group_list = ["urn:storageos:ExportGroup:2dbce233-7da0-47cb-8ff3-68f48"]
iscsi_itl_list = {"itl": [{"hlu": 3,
"initiator": {"id": "urn:storageos:Initiator:13945",
"link": {"rel": "self",
"href": "/comput"},
"port": "iqn.1993-08.org.deb:01:222"},
"export": {"id": "urn:storageos:ExportGroup:2dbce2",
"name": "ccgroup",
"link": {"rel": "self",
"href": "/block/expo"}},
"device": {"id": "urn:storageos:Volume:aa1fc84a-af",
"link": {"rel": "self",
"href": "/block/volumes/urn:s"},
"wwn": "600009700001957015735330303535"},
"target": {"id": "urn:storageos:StoragePort:d7e42",
"link": {"rel": "self",
"href": "/vdc/stor:"},
"port": "50:00:09:73:00:18:95:19",
'ip_address': "10.10.10.10",
'tcp_port': '22'}},
{"hlu": 3,
"initiator": {"id": "urn:storageos:Initiator:13945",
"link": {"rel": "self",
"href": "/comput"},
"port": "iqn.1993-08.org.deb:01:222"},
"export": {"id": "urn:storageos:ExportGroup:2dbce2",
"name": "ccgroup",
"link": {"rel": "self",
"href": "/block/expo"}},
"device": {"id": "urn:storageos:Volume:aa1fc84a-af",
"link": {"rel": "self",
"href": "/block/volumes/urn:s"},
"wwn": "600009700001957015735330303535"},
"target": {"id": "urn:storageos:StoragePort:d7e42",
"link": {"rel": "self",
"href": "/vdc/stor:"},
"port": "50:00:09:73:00:18:95:19",
'ip_address': "10.10.10.10",
'tcp_port': '22'}}]}
fcitl_itl_list = {"itl": [{"hlu": 3,
"initiator": {"id": "urn:storageos:Initiator:13945",
"link": {"rel": "self",
"href": "/comput"},
"port": "12:34:56:78:90:12:34:56"},
"export": {"id": "urn:storageos:ExportGroup:2dbce2",
"name": "ccgroup",
"link": {"rel": "self",
"href": "/block/expo"}},
"device": {"id": "urn:storageos:Volume:aa1fc84a-af",
"link": {"rel": "self",
"href": "/block/volumes/urn:s"},
"wwn": "600009700001957015735330303535"},
"target": {"id": "urn:storageos:StoragePort:d7e42",
"link": {"rel": "self",
"href": "/vdc/stor:"},
"port": "12:34:56:78:90:12:34:56",
'ip_address': "10.10.10.10",
'tcp_port': '22'}},
{"hlu": 3,
"initiator": {"id": "urn:storageos:Initiator:13945",
"link": {"rel": "self",
"href": "/comput"},
"port": "12:34:56:78:90:12:34:56"},
"export": {"id": "urn:storageos:ExportGroup:2dbce2",
"name": "ccgroup",
"link": {"rel": "self",
"href": "/block/expo"}},
"device": {"id": "urn:storageos:Volume:aa1fc84a-af",
"link": {"rel": "self",
"href": "/block/volumes/urn:s"},
"wwn": "600009700001957015735330303535"},
"target": {"id": "urn:storageos:StoragePort:d7e42",
"link": {"rel": "self",
"href": "/vdc/stor:"},
"port": "12:34:56:78:90:12:34:56",
'ip_address': "10.10.10.10",
'tcp_port': '22'}}]}
scaleio_itl_list = {"itl": [{"hlu": -1,
"initiator": {"id":
"urn:storageos:Initiator:920aee",
"link": {"rel": "self",
"href":
"/compute/initiators"},
"port": "bfdf432500000004"},
"export": {"id":
"urn:storageos:ExportGroup:5449235",
"name": "10.108.225.109",
"link": {"rel": "self",
"href":
"/block/exports/urn:stor"}},
"device": {"id":
"urn:storageos:Volume:b3624a83-3eb",
"link": {"rel": "self",
"href": "/block/volume"},
"wwn":
"4F48CC4C27A43248092128B400000004"},
"target": {}},
{"hlu": -1,
"initiator": {"id":
"urn:storageos:Initiator:920aee",
"link": {"rel": "self",
"href":
"/compute/initiators/"},
"port": "bfdf432500000004"},
"export": {"id":
"urn:storageos:ExportGroup:5449235",
"name": "10.108.225.109",
"link": {"rel": "self",
"href":
"/block/exports/urn:stor"}},
"device": {"id":
"urn:storageos:Volume:c014e96a-557",
"link": {"rel": "self",
"href":
"/block/volumes/urn:stor"},
"wwn":
"4F48CC4C27A43248092129320000000E"},
"target": {}}]}
def get_test_volume_data(volume_type_id):
test_volume = {'name': 'test-vol1',
'size': 1,
'volume_name': 'test-vol1',
'id': '1',
'consistencygroup_id': None,
'provider_auth': None,
'project_id': 'project',
'display_name': 'test-vol1',
'display_description': 'test volume',
'volume_type_id': volume_type_id}
return test_volume
def get_source_test_volume_data(volume_type_id):
test_volume = {'name': 'source_test-vol1',
'size': 1,
'volume_name': 'source_test-vol1',
'id': '1234',
'consistencygroup_id': None,
'provider_auth': None,
'project_id': 'project',
'display_name': 'source_test-vol1',
'display_description': 'test volume',
'volume_type_id': volume_type_id}
return test_volume
def get_clone_volume_data(volume_type_id):
clone_test_volume = {'name': 'clone-test-vol1',
'size': 1,
'volume_name': 'clone-test-vol1',
'id': '2',
'provider_auth': None,
'project_id': 'project',
'display_name': 'clone-test-vol1',
'display_description': 'clone test volume',
'volume_type_id': volume_type_id}
return clone_test_volume
def get_test_snapshot_data(src_volume):
test_snapshot = {'name': 'snapshot1',
'display_name': 'snapshot1',
'size': 1,
'id': '1111',
'volume_name': 'test-vol1',
'volume_id': '1234',
'volume': src_volume,
'volume_size': 1,
'project_id': 'project'}
return test_snapshot
def get_connector_data():
connector = {'ip': '10.0.0.2',
'initiator': 'iqn.1993-08.org.deb:01:222',
'wwpns': ["1234567890123456", "1234567890543211"],
'wwnns': ["223456789012345", "223456789054321"],
'host': 'fakehost'}
return connector
def get_test_CG_data(volume_type_id):
test_CG = {'name': 'consistency_group_name',
'id': '12345abcde',
'volume_type_id': volume_type_id,
'status': fields.ConsistencyGroupStatus.AVAILABLE
}
return test_CG
def get_test_CG_snap_data(volume_type_id):
test_CG_snapshot = {'name': 'cg_snap_name',
'id': '12345abcde',
'consistencygroup_id': '123456789',
'status': fields.ConsistencyGroupStatus.AVAILABLE,
'snapshots': [],
'consistencygroup': get_test_CG_data(volume_type_id)
}
return test_CG_snapshot
class MockedEMCCoprHDDriverCommon(coprhd_common.EMCCoprHDDriverCommon):
def __init__(self, protocol, default_backend_name,
configuration=None):
super(MockedEMCCoprHDDriverCommon, self).__init__(
protocol, default_backend_name, configuration)
def authenticate_user(self):
pass
def get_exports_count_by_initiators(self, initiator_ports):
return 0
def _get_coprhd_volume_name(self, vol, verbose=False):
if verbose is True:
return {'volume_name': "coprhd_vol_name",
'volume_uri': "coprhd_vol_uri"}
else:
return "coprhd_vol_name"
def _get_coprhd_snapshot_name(self, snapshot, resUri):
return "coprhd_snapshot_name"
def _get_coprhd_cgid(self, cgid):
return "cg_uri"
def init_volume_api(self):
self.volume_api = Mock()
self.volume_api.get.return_value = {
'name': 'source_test-vol1',
'size': 1,
'volume_name': 'source_test-vol1',
'id': '1234',
'consistencygroup_id': '12345',
'provider_auth': None,
'project_id': 'project',
'display_name': 'source_test-vol1',
'display_description': 'test volume',
'volume_type_id': "vol_type_id-for-snap"}
def init_coprhd_api_components(self):
self.volume_obj = Mock()
self.volume_obj.create.return_value = "volume_created"
self.volume_obj.volume_query.return_value = "volume_uri"
self.volume_obj.get_storageAttributes.return_value = (
'block', 'volume_name')
self.volume_obj.storage_resource_query.return_value = "volume_uri"
self.volume_obj.is_volume_detachable.return_value = False
self.volume_obj.volume_clone_detach.return_value = 'detached'
self.volume_obj.getTags.return_value = (
["Openstack-vol", "Openstack-vol1"])
self.volume_obj.tag.return_value = "tagged"
self.volume_obj.clone.return_value = "volume-cloned"
if(self.protocol == "iSCSI"):
self.volume_obj.get_exports_by_uri.return_value = (
iscsi_itl_list)
elif(self.protocol == "FC"):
self.volume_obj.get_exports_by_uri.return_value = (
fcitl_itl_list)
else:
self.volume_obj.get_exports_by_uri.return_value = (
scaleio_itl_list)
self.volume_obj.list_volumes.return_value = []
self.volume_obj.show.return_value = {"id": "vol_id"}
self.volume_obj.expand.return_value = "expanded"
self.tag_obj = Mock()
self.tag_obj.list_tags.return_value = [
"Openstack-vol", "Openstack-vol1"]
self.tag_obj.tag_resource.return_value = "Tagged"
self.exportgroup_obj = Mock()
self.exportgroup_obj.exportgroup_list.return_value = (
export_group_list)
self.exportgroup_obj.exportgroup_show.return_value = (
export_group_details_data)
self.exportgroup_obj.exportgroup_add_volumes.return_value = (
"volume-added")
self.host_obj = Mock()
self.host_obj.list_by_tenant.return_value = []
self.host_obj.search_by_name.return_value = []
self.host_obj.list_all.return_value = [{'id': "host1_id",
'name': "host1"}]
self.host_obj.list_initiators.return_value = [
{'name': "12:34:56:78:90:12:34:56"},
{'name': "12:34:56:78:90:54:32:11"},
{'name': "bfdf432500000004"}]
self.hostinitiator_obj = Mock()
self.varray_obj = Mock()
self.varray_obj.varray_show.return_value = varray_detail_data
self.snapshot_obj = Mock()
mocked_snap_obj = self.snapshot_obj.return_value
mocked_snap_obj.storageResource_query.return_value = (
"resourceUri")
mocked_snap_obj.snapshot_create.return_value = (
"snapshot_created")
mocked_snap_obj.snapshot_query.return_value = "snapshot_uri"
self.consistencygroup_obj = Mock()
mocked_cg_object = self.consistencygroup_obj.return_value
mocked_cg_object.create.return_value = "CG-Created"
mocked_cg_object.consistencygroup_query.return_value = "CG-uri"
class EMCCoprHDISCSIDriverTest(test.TestCase):
def setUp(self):
super(EMCCoprHDISCSIDriverTest, self).setUp()
self.create_coprhd_setup()
def create_coprhd_setup(self):
self.configuration = Mock()
self.configuration.coprhd_hostname = "10.10.10.10"
self.configuration.coprhd_port = "4443"
self.configuration.volume_backend_name = "EMCCoprHDISCSIDriver"
self.configuration.coprhd_username = "user-name"
self.configuration.coprhd_password = "password"
self.configuration.coprhd_tenant = "tenant"
self.configuration.coprhd_project = "project"
self.configuration.coprhd_varray = "varray"
self.configuration.coprhd_emulate_snapshot = False
self.volume_type_id = self.create_coprhd_volume_type()
self.stubs.Set(coprhd_iscsi.EMCCoprHDISCSIDriver,
'_get_common_driver',
self._get_mocked_common_driver)
self.driver = coprhd_iscsi.EMCCoprHDISCSIDriver(
configuration=self.configuration)
def tearDown(self):
self._cleanUp()
super(EMCCoprHDISCSIDriverTest, self).tearDown()
def _cleanUp(self):
self.delete_vipr_volume_type()
def create_coprhd_volume_type(self):
ctx = context.get_admin_context()
vipr_volume_type = volume_types.create(ctx,
"coprhd-volume-type",
{'CoprHD:VPOOL':
'vpool_coprhd'})
volume_id = vipr_volume_type['id']
return volume_id
def _get_mocked_common_driver(self):
return MockedEMCCoprHDDriverCommon(
protocol="iSCSI",
default_backend_name="EMCViPRISCSIDriver",
configuration=self.configuration)
def delete_vipr_volume_type(self):
ctx = context.get_admin_context()
volume_types.destroy(ctx, self.volume_type_id)
def test_create_destroy(self):
volume = get_test_volume_data(self.volume_type_id)
self.driver.create_volume(volume)
self.driver.delete_volume(volume)
def test_get_volume_stats(self):
vol_stats = self.driver.get_volume_stats(True)
self.assertTrue(vol_stats['free_capacity_gb'], 'unknown')
def test_create_volume_clone(self):
src_volume_data = get_test_volume_data(self.volume_type_id)
clone_volume_data = get_clone_volume_data(self.volume_type_id)
self.driver.create_volume(src_volume_data)
self.driver.create_cloned_volume(clone_volume_data, src_volume_data)
self.driver.delete_volume(src_volume_data)
self.driver.delete_volume(clone_volume_data)
def test_create_destroy_snapshot(self):
volume_data = get_test_volume_data(self.volume_type_id)
snapshot_data = get_test_snapshot_data(
get_source_test_volume_data(self.volume_type_id))
self.driver.create_volume(volume_data)
self.driver.create_snapshot(snapshot_data)
self.driver.delete_snapshot(snapshot_data)
self.driver.delete_volume(volume_data)
def test_create_volume_from_snapshot(self):
src_vol_data = get_source_test_volume_data(self.volume_type_id)
self.driver.create_volume(src_vol_data)
volume_data = get_test_volume_data(self.volume_type_id)
snapshot_data = get_test_snapshot_data(src_vol_data)
self.driver.create_snapshot(snapshot_data)
self.driver.create_volume_from_snapshot(volume_data, snapshot_data)
self.driver.delete_snapshot(snapshot_data)
self.driver.delete_volume(src_vol_data)
self.driver.delete_volume(volume_data)
def test_extend_volume(self):
volume_data = get_test_volume_data(self.volume_type_id)
self.driver.create_volume(volume_data)
self.driver.extend_volume(volume_data, 2)
self.driver.delete_volume(volume_data)
def test_initialize_and_terminate_connection(self):
connector_data = get_connector_data()
volume_data = get_test_volume_data(self.volume_type_id)
self.driver.create_volume(volume_data)
res_initialize = self.driver.initialize_connection(
volume_data, connector_data)
expected_initialize = {'driver_volume_type': 'iscsi',
'data': {'target_lun': 3,
'target_portal': '10.10.10.10:22',
'target_iqn':
'50:00:09:73:00:18:95:19',
'target_discovered': False,
'volume_id': '1'}}
self.assertEqual(
expected_initialize, res_initialize, 'Unexpected return data')
self.driver.terminate_connection(volume_data, connector_data)
self.driver.delete_volume(volume_data)
def test_create_delete_empty_CG(self):
cg_data = get_test_CG_data(self.volume_type_id)
ctx = context.get_admin_context()
self.driver.create_consistencygroup(ctx, cg_data)
model_update, volumes_model_update = \
self.driver.delete_consistencygroup(ctx, cg_data, [])
self.assertEqual([], volumes_model_update, 'Unexpected return data')
def test_create_update_delete_CG(self):
cg_data = get_test_CG_data(self.volume_type_id)
ctx = context.get_admin_context()
self.driver.create_consistencygroup(ctx, cg_data)
volume = get_test_volume_data(self.volume_type_id)
self.driver.create_volume(volume)
model_update, ret1, ret2 = \
self.driver.update_consistencygroup(ctx, cg_data, [volume], [])
self.assertEqual({'status': fields.ConsistencyGroupStatus.AVAILABLE},
model_update)
model_update, volumes_model_update = \
self.driver.delete_consistencygroup(ctx, cg_data, [volume])
self.assertEqual({'status': fields.ConsistencyGroupStatus.AVAILABLE},
model_update)
self.assertEqual([{'status': 'deleted', 'id': '1'}],
volumes_model_update)
def test_create_delete_CG_snap(self):
cg_snap_data = get_test_CG_snap_data(self.volume_type_id)
ctx = context.get_admin_context()
model_update, snapshots_model_update = \
self.driver.create_cgsnapshot(ctx, cg_snap_data, [])
self.assertEqual({'status': fields.ConsistencyGroupStatus.AVAILABLE},
model_update)
self.assertEqual([], snapshots_model_update, 'Unexpected return data')
model_update, snapshots_model_update = \
self.driver.delete_cgsnapshot(ctx, cg_snap_data, [])
self.assertEqual({}, model_update, 'Unexpected return data')
self.assertEqual([], snapshots_model_update, 'Unexpected return data')
class EMCCoprHDFCDriverTest(test.TestCase):
def setUp(self):
super(EMCCoprHDFCDriverTest, self).setUp()
self.create_coprhd_setup()
def create_coprhd_setup(self):
self.configuration = Mock()
self.configuration.coprhd_hostname = "10.10.10.10"
self.configuration.coprhd_port = "4443"
self.configuration.volume_backend_name = "EMCCoprHDFCDriver"
self.configuration.coprhd_username = "user-name"
self.configuration.coprhd_password = "password"
self.configuration.coprhd_tenant = "tenant"
self.configuration.coprhd_project = "project"
self.configuration.coprhd_varray = "varray"
self.configuration.coprhd_emulate_snapshot = False
self.volume_type_id = self.create_coprhd_volume_type()
self.stubs.Set(coprhd_fc.EMCCoprHDFCDriver,
'_get_common_driver',
self._get_mocked_common_driver)
self.driver = coprhd_fc.EMCCoprHDFCDriver(
configuration=self.configuration)
def tearDown(self):
self._cleanUp()
super(EMCCoprHDFCDriverTest, self).tearDown()
def _cleanUp(self):
self.delete_vipr_volume_type()
def create_coprhd_volume_type(self):
ctx = context.get_admin_context()
vipr_volume_type = volume_types.create(ctx,
"coprhd-volume-type",
{'CoprHD:VPOOL': 'vpool_vipr'})
volume_id = vipr_volume_type['id']
return volume_id
def _get_mocked_common_driver(self):
return MockedEMCCoprHDDriverCommon(
protocol="FC",
default_backend_name="EMCViPRFCDriver",
configuration=self.configuration)
def delete_vipr_volume_type(self):
ctx = context.get_admin_context()
volume_types.destroy(ctx, self.volume_type_id)
def test_create_destroy(self):
volume = get_test_volume_data(self.volume_type_id)
self.driver.create_volume(volume)
self.driver.delete_volume(volume)
def test_get_volume_stats(self):
vol_stats = self.driver.get_volume_stats(True)
self.assertTrue(vol_stats['free_capacity_gb'], 'unknown')
def test_create_volume_clone(self):
src_volume_data = get_test_volume_data(self.volume_type_id)
clone_volume_data = get_clone_volume_data(self.volume_type_id)
self.driver.create_volume(src_volume_data)
self.driver.create_cloned_volume(clone_volume_data, src_volume_data)
self.driver.delete_volume(src_volume_data)
self.driver.delete_volume(clone_volume_data)
def test_create_destroy_snapshot(self):
volume_data = get_test_volume_data(self.volume_type_id)
snapshot_data = get_test_snapshot_data(
get_source_test_volume_data(self.volume_type_id))
self.driver.create_volume(volume_data)
self.driver.create_snapshot(snapshot_data)
self.driver.delete_snapshot(snapshot_data)
self.driver.delete_volume(volume_data)
def test_create_volume_from_snapshot(self):
src_vol_data = get_source_test_volume_data(self.volume_type_id)
self.driver.create_volume(src_vol_data)
volume_data = get_test_volume_data(self.volume_type_id)
snapshot_data = get_test_snapshot_data(src_vol_data)
self.driver.create_snapshot(snapshot_data)
self.driver.create_volume_from_snapshot(volume_data, snapshot_data)
self.driver.delete_snapshot(snapshot_data)
self.driver.delete_volume(src_vol_data)
self.driver.delete_volume(volume_data)
def test_extend_volume(self):
volume_data = get_test_volume_data(self.volume_type_id)
self.driver.create_volume(volume_data)
self.driver.extend_volume(volume_data, 2)
self.driver.delete_volume(volume_data)
def test_initialize_and_terminate_connection(self):
connector_data = get_connector_data()
volume_data = get_test_volume_data(self.volume_type_id)
self.driver.create_volume(volume_data)
res_initiatlize = self.driver.initialize_connection(
volume_data, connector_data)
expected_initialize = {'driver_volume_type': 'fibre_channel',
'data': {'target_lun': 3,
'initiator_target_map':
{'1234567890543211':
['1234567890123456',
'1234567890123456'],
'1234567890123456':
['1234567890123456',
'1234567890123456']},
'target_wwn': ['1234567890123456',
'1234567890123456'],
'target_discovered': False,
'volume_id': '1'}}
self.assertEqual(
expected_initialize, res_initiatlize, 'Unexpected return data')
res_terminate = self.driver.terminate_connection(
volume_data, connector_data)
expected_terminate = {'driver_volume_type': 'fibre_channel',
'data': {'initiator_target_map':
{'1234567890543211':
['1234567890123456',
'1234567890123456'],
'1234567890123456':
['1234567890123456',
'1234567890123456']},
'target_wwn': ['1234567890123456',
'1234567890123456']}}
self.assertEqual(
expected_terminate, res_terminate, 'Unexpected return data')
self.driver.delete_volume(volume_data)
def test_create_delete_empty_CG(self):
cg_data = get_test_CG_data(self.volume_type_id)
ctx = context.get_admin_context()
self.driver.create_consistencygroup(ctx, cg_data)
model_update, volumes_model_update = \
self.driver.delete_consistencygroup(ctx, cg_data, [])
self.assertEqual([], volumes_model_update, 'Unexpected return data')
def test_create_update_delete_CG(self):
cg_data = get_test_CG_data(self.volume_type_id)
ctx = context.get_admin_context()
self.driver.create_consistencygroup(ctx, cg_data)
volume = get_test_volume_data(self.volume_type_id)
self.driver.create_volume(volume)
model_update, ret1, ret2 = \
self.driver.update_consistencygroup(ctx, cg_data, [volume], [])
self.assertEqual({'status': fields.ConsistencyGroupStatus.AVAILABLE},
model_update)
model_update, volumes_model_update = \
self.driver.delete_consistencygroup(ctx, cg_data, [volume])
self.assertEqual({'status': fields.ConsistencyGroupStatus.AVAILABLE},
model_update)
self.assertEqual([{'status': 'deleted', 'id': '1'}],
volumes_model_update)
def test_create_delete_CG_snap(self):
cg_snap_data = get_test_CG_snap_data(self.volume_type_id)
ctx = context.get_admin_context()
model_update, snapshots_model_update = \
self.driver.create_cgsnapshot(ctx, cg_snap_data, [])
self.assertEqual({'status': fields.ConsistencyGroupStatus.AVAILABLE},
model_update)
self.assertEqual([], snapshots_model_update, 'Unexpected return data')
model_update, snapshots_model_update = \
self.driver.delete_cgsnapshot(ctx, cg_snap_data, [])
self.assertEqual({}, model_update, 'Unexpected return data')
self.assertEqual([], snapshots_model_update, 'Unexpected return data')
class EMCCoprHDScaleIODriverTest(test.TestCase):
def setUp(self):
super(EMCCoprHDScaleIODriverTest, self).setUp()
self.create_coprhd_setup()
def create_coprhd_setup(self):
self.configuration = Mock()
self.configuration.coprhd_hostname = "10.10.10.10"
self.configuration.coprhd_port = "4443"
self.configuration.volume_backend_name = "EMCCoprHDFCDriver"
self.configuration.coprhd_username = "user-name"
self.configuration.coprhd_password = "password"
self.configuration.coprhd_tenant = "tenant"
self.configuration.coprhd_project = "project"
self.configuration.coprhd_varray = "varray"
self.configuration.coprhd_scaleio_rest_gateway_ip = "10.10.10.11"
self.configuration.coprhd_scaleio_rest_gateway_port = 443
self.configuration.coprhd_scaleio_rest_server_username = (
"scaleio_username")
self.configuration.coprhd_scaleio_rest_server_password = (
"scaleio_password")
self.configuration.scaleio_verify_server_certificate = False
self.configuration.scaleio_server_certificate_path = (
"/etc/scaleio/certs")
self.volume_type_id = self.create_coprhd_volume_type()
self.stubs.Set(coprhd_scaleio.EMCCoprHDScaleIODriver,
'_get_common_driver',
self._get_mocked_common_driver)
self.stubs.Set(coprhd_scaleio.EMCCoprHDScaleIODriver,
'_get_client_id',
self._get_client_id)
self.driver = coprhd_scaleio.EMCCoprHDScaleIODriver(
configuration=self.configuration)
def tearDown(self):
self._cleanUp()
super(EMCCoprHDScaleIODriverTest, self).tearDown()
def _cleanUp(self):
self.delete_vipr_volume_type()
def create_coprhd_volume_type(self):
ctx = context.get_admin_context()
vipr_volume_type = volume_types.create(ctx,
"coprhd-volume-type",
{'CoprHD:VPOOL': 'vpool_vipr'})
volume_id = vipr_volume_type['id']
return volume_id
def _get_mocked_common_driver(self):
return MockedEMCCoprHDDriverCommon(
protocol="scaleio",
default_backend_name="EMCCoprHDScaleIODriver",
configuration=self.configuration)
def _get_client_id(self, server_ip, server_port, server_username,
server_password, sdc_ip):
return "bfdf432500000004"
def delete_vipr_volume_type(self):
ctx = context.get_admin_context()
volume_types.destroy(ctx, self.volume_type_id)
def test_create_destroy(self):
volume = get_test_volume_data(self.volume_type_id)
self.driver.create_volume(volume)
self.driver.delete_volume(volume)
def test_get_volume_stats(self):
vol_stats = self.driver.get_volume_stats(True)
self.assertTrue(vol_stats['free_capacity_gb'], 'unknown')
def test_create_volume_clone(self):
src_volume_data = get_test_volume_data(self.volume_type_id)
clone_volume_data = get_clone_volume_data(self.volume_type_id)
self.driver.create_volume(src_volume_data)
self.driver.create_cloned_volume(clone_volume_data, src_volume_data)
self.driver.delete_volume(src_volume_data)
self.driver.delete_volume(clone_volume_data)
def test_create_destroy_snapshot(self):
volume_data = get_test_volume_data(self.volume_type_id)
snapshot_data = get_test_snapshot_data(
get_source_test_volume_data(self.volume_type_id))
self.driver.create_volume(volume_data)
self.driver.create_snapshot(snapshot_data)
self.driver.delete_snapshot(snapshot_data)
self.driver.delete_volume(volume_data)
def test_create_volume_from_snapshot(self):
src_vol_data = get_source_test_volume_data(self.volume_type_id)
self.driver.create_volume(src_vol_data)
volume_data = get_test_volume_data(self.volume_type_id)
snapshot_data = get_test_snapshot_data(src_vol_data)
self.driver.create_snapshot(snapshot_data)
self.driver.create_volume_from_snapshot(volume_data, snapshot_data)
self.driver.delete_snapshot(snapshot_data)
self.driver.delete_volume(src_vol_data)
self.driver.delete_volume(volume_data)
def test_extend_volume(self):
volume_data = get_test_volume_data(self.volume_type_id)
self.driver.create_volume(volume_data)
self.driver.extend_volume(volume_data, 2)
self.driver.delete_volume(volume_data)
def test_initialize_and_terminate_connection(self):
connector_data = get_connector_data()
volume_data = get_test_volume_data(self.volume_type_id)
self.driver.create_volume(volume_data)
res_initiatlize = self.driver.initialize_connection(
volume_data, connector_data)
expected_initialize = {'data': {'bandwidthLimit': None,
'hostIP': '10.0.0.2',
'iopsLimit': None,
'scaleIO_volname': 'test-vol1',
'serverIP': '10.10.10.11',
'serverPassword': 'scaleio_password',
'serverPort': 443,
'serverToken': None,
'serverUsername': 'scaleio_username'},
'driver_volume_type': 'scaleio'}
self.assertEqual(
expected_initialize, res_initiatlize, 'Unexpected return data')
self.driver.terminate_connection(
volume_data, connector_data)
self.driver.delete_volume(volume_data)
def test_create_delete_empty_CG(self):
cg_data = get_test_CG_data(self.volume_type_id)
ctx = context.get_admin_context()
self.driver.create_consistencygroup(ctx, cg_data)
model_update, volumes_model_update = \
self.driver.delete_consistencygroup(ctx, cg_data, [])
self.assertEqual([], volumes_model_update, 'Unexpected return data')
def test_create_update_delete_CG(self):
cg_data = get_test_CG_data(self.volume_type_id)
ctx = context.get_admin_context()
self.driver.create_consistencygroup(ctx, cg_data)
volume = get_test_volume_data(self.volume_type_id)
self.driver.create_volume(volume)
model_update, ret1, ret2 = \
self.driver.update_consistencygroup(ctx, cg_data, [volume], [])
self.assertEqual({'status': fields.ConsistencyGroupStatus.AVAILABLE},
model_update)
model_update, volumes_model_update = \
self.driver.delete_consistencygroup(ctx, cg_data, [volume])
self.assertEqual({'status': fields.ConsistencyGroupStatus.AVAILABLE},
model_update)
self.assertEqual([{'status': 'deleted', 'id': '1'}],
volumes_model_update)
def test_create_delete_CG_snap(self):
cg_snap_data = get_test_CG_snap_data(self.volume_type_id)
ctx = context.get_admin_context()
model_update, snapshots_model_update = \
self.driver.create_cgsnapshot(ctx, cg_snap_data, [])
self.assertEqual({'status': fields.ConsistencyGroupStatus.AVAILABLE},
model_update)
self.assertEqual([], snapshots_model_update, 'Unexpected return data')
model_update, snapshots_model_update = \
self.driver.delete_cgsnapshot(ctx, cg_snap_data, [])
self.assertEqual({}, model_update, 'Unexpected return data')
self.assertEqual([], snapshots_model_update, 'Unexpected return data')

View File

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,222 @@
# Copyright (c) 2016 EMC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Driver for EMC CoprHD FC volumes."""
import re
from oslo_log import log as logging
from cinder import interface
from cinder.volume import driver
from cinder.volume.drivers.coprhd import common as coprhd_common
from cinder.zonemanager import utils
LOG = logging.getLogger(__name__)
@interface.volumedriver
class EMCCoprHDFCDriver(driver.FibreChannelDriver):
"""CoprHD FC Driver."""
def __init__(self, *args, **kwargs):
super(EMCCoprHDFCDriver, self).__init__(*args, **kwargs)
self.common = self._get_common_driver()
def _get_common_driver(self):
return coprhd_common.EMCCoprHDDriverCommon(
protocol='FC',
default_backend_name=self.__class__.__name__,
configuration=self.configuration)
def check_for_setup_error(self):
self.common.check_for_setup_error()
def create_volume(self, volume):
"""Creates a Volume."""
self.common.create_volume(volume, self)
self.common.set_volume_tags(volume, ['_obj_volume_type'])
def create_cloned_volume(self, volume, src_vref):
"""Creates a cloned Volume."""
self.common.create_cloned_volume(volume, src_vref)
self.common.set_volume_tags(volume, ['_obj_volume_type'])
def create_volume_from_snapshot(self, volume, snapshot):
"""Creates a volume from a snapshot."""
self.common.create_volume_from_snapshot(snapshot, volume)
self.common.set_volume_tags(volume, ['_obj_volume_type'])
def extend_volume(self, volume, new_size):
"""expands the size of the volume."""
self.common.expand_volume(volume, new_size)
def delete_volume(self, volume):
"""Deletes a volume."""
self.common.delete_volume(volume)
def create_snapshot(self, snapshot):
"""Creates a snapshot."""
self.common.create_snapshot(snapshot)
def delete_snapshot(self, snapshot):
"""Deletes a snapshot."""
self.common.delete_snapshot(snapshot)
def ensure_export(self, context, volume):
"""Driver entry point to get the export info for an existing volume."""
pass
def create_export(self, context, volume, connector=None):
"""Driver entry point to get the export info for a new volume."""
pass
def remove_export(self, context, volume):
"""Driver exntry point to remove an export for a volume."""
pass
def create_consistencygroup(self, context, group):
"""Creates a consistencygroup."""
return self.common.create_consistencygroup(context, group)
def update_consistencygroup(self, context, group, add_volumes,
remove_volumes):
"""Updates volumes in consistency group."""
return self.common.update_consistencygroup(group, add_volumes,
remove_volumes)
def delete_consistencygroup(self, context, group, volumes):
"""Deletes a consistency group."""
return self.common.delete_consistencygroup(context, group, volumes)
def create_cgsnapshot(self, context, cgsnapshot, snapshots):
"""Creates a cgsnapshot."""
return self.common.create_cgsnapshot(cgsnapshot, snapshots)
def delete_cgsnapshot(self, context, cgsnapshot, snapshots):
"""Deletes a cgsnapshot."""
return self.common.delete_cgsnapshot(cgsnapshot, snapshots)
def check_for_export(self, context, volume_id):
"""Make sure volume is exported."""
pass
@utils.AddFCZone
def initialize_connection(self, volume, connector):
"""Initializes the connection and returns connection info."""
properties = {}
properties['volume_id'] = volume['id']
properties['target_discovered'] = False
properties['target_wwn'] = []
init_ports = self._build_initport_list(connector)
itls = self.common.initialize_connection(volume,
'FC',
init_ports,
connector['host'])
target_wwns = None
initiator_target_map = None
if itls:
properties['target_lun'] = itls[0]['hlu']
target_wwns, initiator_target_map = (
self._build_initiator_target_map(itls, connector))
properties['target_wwn'] = target_wwns
properties['initiator_target_map'] = initiator_target_map
auth = volume['provider_auth']
if auth:
(auth_method, auth_username, auth_secret) = auth.split()
properties['auth_method'] = auth_method
properties['auth_username'] = auth_username
properties['auth_password'] = auth_secret
LOG.debug('FC properties: %s', properties)
return {
'driver_volume_type': 'fibre_channel',
'data': properties
}
@utils.RemoveFCZone
def terminate_connection(self, volume, connector, **kwargs):
"""Driver entry point to detach a volume from an instance."""
init_ports = self._build_initport_list(connector)
itls = self.common.terminate_connection(volume,
'FC',
init_ports,
connector['host'])
volumes_count = self.common.get_exports_count_by_initiators(init_ports)
if volumes_count > 0:
# return empty data
data = {'driver_volume_type': 'fibre_channel', 'data': {}}
else:
target_wwns, initiator_target_map = (
self._build_initiator_target_map(itls, connector))
data = {
'driver_volume_type': 'fibre_channel',
'data': {
'target_wwn': target_wwns,
'initiator_target_map': initiator_target_map}}
LOG.debug('Return FC data: %s', data)
return data
def _build_initiator_target_map(self, itls, connector):
target_wwns = []
for itl in itls:
target_wwns.append(itl['target']['port'].replace(':', '').lower())
initiator_wwns = connector['wwpns']
initiator_target_map = {}
for initiator in initiator_wwns:
initiator_target_map[initiator] = target_wwns
return target_wwns, initiator_target_map
def _build_initport_list(self, connector):
init_ports = []
for i in range(len(connector['wwpns'])):
initiator_port = ':'.join(re.findall(
'..',
connector['wwpns'][i])).upper() # Add ":" every two digits
init_ports.append(initiator_port)
return init_ports
def get_volume_stats(self, refresh=False):
"""Get volume status.
If 'refresh' is True, run update the stats first.
"""
if refresh:
self.update_volume_stats()
return self._stats
def update_volume_stats(self):
"""Retrieve stats info from virtual pool/virtual array."""
LOG.debug("Updating volume stats")
self._stats = self.common.update_volume_stats()
def retype(self, ctxt, volume, new_type, diff, host):
"""Change the volume type."""
return self.common.retype(ctxt, volume, new_type, diff, host)

View File

@ -0,0 +1,216 @@
# Copyright (c) 2016 EMC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
try:
import cookielib as cookie_lib
except ImportError:
import http.cookiejar as cookie_lib
import socket
import requests
from requests import exceptions
import six
from cinder.i18n import _
from cinder.volume.drivers.coprhd.helpers import commoncoprhdapi as common
class Authentication(common.CoprHDResource):
# Commonly used URIs for the 'Authentication' module
URI_SERVICES_BASE = ''
URI_AUTHENTICATION = '/login'
HEADERS = {'Content-Type': 'application/json',
'ACCEPT': 'application/json', 'X-EMC-REST-CLIENT': 'TRUE'}
def authenticate_user(self, username, password):
"""Makes REST API call to generate the authentication token.
Authentication token is generated for the specified user after
validation
:param username: Name of the user
:param password: Password for the user
:returns: The authtoken
"""
SEC_REDIRECT = 302
SEC_AUTHTOKEN_HEADER = 'X-SDS-AUTH-TOKEN'
LB_API_PORT = 4443
# Port on which load-balancer/reverse-proxy listens to all incoming
# requests for CoprHD REST APIs
APISVC_PORT = 8443 # Port on which apisvc listens to incoming requests
cookiejar = cookie_lib.LWPCookieJar()
url = ('https://%(ip)s:%(port)d%(uri)s' %
{'ip': self.ipaddr, 'port': self.port,
'uri': self.URI_AUTHENTICATION})
try:
if self.port == APISVC_PORT:
login_response = requests.get(
url, headers=self.HEADERS, verify=False,
auth=(username, password), cookies=cookiejar,
allow_redirects=False, timeout=common.TIMEOUT_SEC)
if login_response.status_code == SEC_REDIRECT:
location = login_response.headers['Location']
if not location:
raise common.CoprHdError(
common.CoprHdError.HTTP_ERR, (_("The redirect"
" location of the"
" authentication"
" service is not"
" provided")))
# Make the second request
login_response = requests.get(
location, headers=self.HEADERS, verify=False,
cookies=cookiejar, allow_redirects=False,
timeout=common.TIMEOUT_SEC)
if (login_response.status_code !=
requests.codes['unauthorized']):
raise common.CoprHdError(
common.CoprHdError.HTTP_ERR, (_("The"
" authentication"
" service failed"
" to reply with"
" 401")))
# Now provide the credentials
login_response = requests.get(
location, headers=self.HEADERS,
auth=(username, password), verify=False,
cookies=cookiejar, allow_redirects=False,
timeout=common.TIMEOUT_SEC)
if login_response.status_code != SEC_REDIRECT:
raise common.CoprHdError(
common.CoprHdError.HTTP_ERR,
(_("Access forbidden: Authentication required")))
location = login_response.headers['Location']
if not location:
raise common.CoprHdError(
common.CoprHdError.HTTP_ERR,
(_("The"
" authentication service failed to provide the"
" location of the service URI when redirecting"
" back")))
authtoken = login_response.headers[SEC_AUTHTOKEN_HEADER]
if not authtoken:
details_str = self.extract_error_detail(login_response)
raise common.CoprHdError(common.CoprHdError.HTTP_ERR,
(_("The token is not"
" generated by"
" authentication service."
"%s") %
details_str))
# Make the final call to get the page with the token
new_headers = self.HEADERS
new_headers[SEC_AUTHTOKEN_HEADER] = authtoken
login_response = requests.get(
location, headers=new_headers, verify=False,
cookies=cookiejar, allow_redirects=False,
timeout=common.TIMEOUT_SEC)
if login_response.status_code != requests.codes['ok']:
raise common.CoprHdError(
common.CoprHdError.HTTP_ERR, (_(
"Login failure code: "
"%(statuscode)s Error: %(responsetext)s") %
{'statuscode': six.text_type(
login_response.status_code),
'responsetext': login_response.text}))
elif self.port == LB_API_PORT:
login_response = requests.get(
url, headers=self.HEADERS, verify=False,
cookies=cookiejar, allow_redirects=False)
if(login_response.status_code ==
requests.codes['unauthorized']):
# Now provide the credentials
login_response = requests.get(
url, headers=self.HEADERS, auth=(username, password),
verify=False, cookies=cookiejar, allow_redirects=False)
authtoken = None
if SEC_AUTHTOKEN_HEADER in login_response.headers:
authtoken = login_response.headers[SEC_AUTHTOKEN_HEADER]
else:
raise common.CoprHdError(
common.CoprHdError.HTTP_ERR,
(_("Incorrect port number. Load balanced port is: "
"%(lb_api_port)s, api service port is: "
"%(apisvc_port)s") %
{'lb_api_port': LB_API_PORT,
'apisvc_port': APISVC_PORT}))
if not authtoken:
details_str = self.extract_error_detail(login_response)
raise common.CoprHdError(
common.CoprHdError.HTTP_ERR,
(_("The token is not generated by authentication service."
" %s") % details_str))
if login_response.status_code != requests.codes['ok']:
error_msg = None
if login_response.status_code == 401:
error_msg = _("Access forbidden: Authentication required")
elif login_response.status_code == 403:
error_msg = _("Access forbidden: You don't have"
" sufficient privileges to perform"
" this operation")
elif login_response.status_code == 500:
error_msg = _("Bourne internal server error")
elif login_response.status_code == 404:
error_msg = _(
"Requested resource is currently unavailable")
elif login_response.status_code == 405:
error_msg = (_("GET method is not supported by resource:"
" %s"),
url)
elif login_response.status_code == 503:
error_msg = _("Service temporarily unavailable:"
" The server is temporarily unable"
" to service your request")
else:
error_msg = login_response.text
raise common.CoprHdError(common.CoprHdError.HTTP_ERR,
(_("HTTP code: %(status_code)s"
", response: %(reason)s"
" [%(error_msg)s]") % {
'status_code': six.text_type(
login_response.status_code),
'reason': six.text_type(
login_response.reason),
'error_msg': six.text_type(
error_msg)
}))
except (exceptions.SSLError, socket.error, exceptions.ConnectionError,
exceptions.Timeout) as e:
raise common.CoprHdError(
common.CoprHdError.HTTP_ERR, six.text_type(e))
return authtoken
def extract_error_detail(self, login_response):
details_str = ""
try:
if login_response.content:
json_object = common.json_decode(login_response.content)
if 'details' in json_object:
details_str = json_object['details']
return details_str
except common.CoprHdError:
return details_str

View File

@ -0,0 +1,517 @@
# Copyright (c) 2016 EMC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Contains some commonly used utility methods."""
try:
import cookielib as cookie_lib
except ImportError:
import http.cookiejar as cookie_lib
import json
import re
import socket
import threading
import oslo_serialization
from oslo_utils import units
import requests
from requests import exceptions
import six
from cinder import exception
from cinder.i18n import _
from cinder.volume.drivers.coprhd.helpers.urihelper import (
singletonURIHelperInstance)
PROD_NAME = 'storageos'
TIMEOUT_SEC = 20 # 20 SECONDS
IS_TASK_TIMEOUT = False
global AUTH_TOKEN
AUTH_TOKEN = None
TASK_TIMEOUT = 300
def _decode_list(data):
rv = []
for item in data:
if isinstance(item, unicode):
item = item.encode('utf-8')
elif isinstance(item, list):
item = _decode_list(item)
elif isinstance(item, dict):
item = _decode_dict(item)
rv.append(item)
return rv
def _decode_dict(data):
rv = {}
for key, value in data.items():
if isinstance(key, unicode):
key = key.encode('utf-8')
if isinstance(value, unicode):
value = value.encode('utf-8')
elif isinstance(value, list):
value = _decode_list(value)
elif isinstance(value, dict):
value = _decode_dict(value)
rv[key] = value
return rv
def json_decode(rsp):
"""Used to decode the JSON encoded response."""
o = ""
try:
o = json.loads(rsp, object_hook=_decode_dict)
except ValueError:
raise CoprHdError(CoprHdError.VALUE_ERR,
(_("Failed to recognize JSON payload:\n[%s]") % rsp))
return o
def service_json_request(ip_addr, port, http_method, uri, body,
contenttype='application/json', customheaders=None):
"""Used to make an HTTP request and get the response.
The message body is encoded in JSON format
:param ip_addr: IP address or host name of the server
:param port: port number of the server on which it
is listening to HTTP requests
:param http_method: one of GET, POST, PUT, DELETE
:param uri: the request URI
:param body: the request payload
:returns: a tuple of two elements: (response body, response headers)
:raises: CoprHdError in case of HTTP errors with err_code 3
"""
SEC_AUTHTOKEN_HEADER = 'X-SDS-AUTH-TOKEN'
headers = {'Content-Type': contenttype,
'ACCEPT': 'application/json, application/octet-stream',
'X-EMC-REST-CLIENT': 'TRUE'}
if customheaders:
headers.update(customheaders)
try:
protocol = "https://"
if port == 8080:
protocol = "http://"
url = protocol + ip_addr + ":" + six.text_type(port) + uri
cookiejar = cookie_lib.LWPCookieJar()
headers[SEC_AUTHTOKEN_HEADER] = AUTH_TOKEN
if http_method == 'GET':
response = requests.get(url, headers=headers, verify=False,
cookies=cookiejar)
elif http_method == 'POST':
response = requests.post(url, data=body, headers=headers,
verify=False, cookies=cookiejar)
elif http_method == 'PUT':
response = requests.put(url, data=body, headers=headers,
verify=False, cookies=cookiejar)
elif http_method == 'DELETE':
response = requests.delete(url, headers=headers, verify=False,
cookies=cookiejar)
else:
raise CoprHdError(CoprHdError.HTTP_ERR,
(_("Unknown/Unsupported HTTP method: %s") %
http_method))
if (response.status_code == requests.codes['ok'] or
response.status_code == 202):
return (response.text, response.headers)
error_msg = None
if response.status_code == 500:
responseText = json_decode(response.text)
errorDetails = ""
if 'details' in responseText:
errorDetails = responseText['details']
error_msg = (_("CoprHD internal server error. Error details: %s"),
errorDetails)
elif response.status_code == 401:
error_msg = _("Access forbidden: Authentication required")
elif response.status_code == 403:
error_msg = ""
errorDetails = ""
errorDescription = ""
responseText = json_decode(response.text)
if 'details' in responseText:
errorDetails = responseText['details']
error_msg = (_("%(error_msg)s Error details:"
" %(errorDetails)s"),
{'error_msg': error_msg,
'errorDetails': errorDetails
})
elif 'description' in responseText:
errorDescription = responseText['description']
error_msg = (_("%(error_msg)s Error description:"
" %(errorDescription)s"),
{'error_msg': error_msg,
'errorDescription': errorDescription
})
else:
error_msg = _("Access forbidden: You don't have"
" sufficient privileges to perform this"
" operation")
elif response.status_code == 404:
error_msg = "Requested resource not found"
elif response.status_code == 405:
error_msg = six.text_type(response.text)
elif response.status_code == 503:
error_msg = ""
errorDetails = ""
errorDescription = ""
responseText = json_decode(response.text)
if 'code' in responseText:
errorCode = responseText['code']
error_msg = error_msg + "Error " + six.text_type(errorCode)
if 'details' in responseText:
errorDetails = responseText['details']
error_msg = error_msg + ": " + errorDetails
elif 'description' in responseText:
errorDescription = responseText['description']
error_msg = error_msg + ": " + errorDescription
else:
error_msg = _("Service temporarily unavailable:"
" The server is temporarily unable to"
" service your request")
else:
error_msg = response.text
if isinstance(error_msg, unicode):
error_msg = error_msg.encode('utf-8')
raise CoprHdError(CoprHdError.HTTP_ERR,
(_("HTTP code: %(status_code)s"
", %(reason)s"
" [%(error_msg)s]") % {
'status_code': six.text_type(
response.status_code),
'reason': six.text_type(
response.reason),
'error_msg': six.text_type(
error_msg)
}))
except (CoprHdError, socket.error, exceptions.SSLError,
exceptions.ConnectionError, exceptions.TooManyRedirects,
exceptions.Timeout) as e:
raise CoprHdError(CoprHdError.HTTP_ERR, six.text_type(e))
# TODO(Ravi) : Either following exception should have proper message or
# IOError should just be combined with the above statement
except IOError as e:
raise CoprHdError(CoprHdError.HTTP_ERR, six.text_type(e))
def is_uri(name):
"""Checks whether the name is a URI or not.
:param name: Name of the resource
:returns: True if name is URI, False otherwise
"""
try:
(urn, prod, trailer) = name.split(':', 2)
return (urn == 'urn' and prod == PROD_NAME)
except Exception:
return False
def format_json_object(obj):
"""Formats JSON object to make it readable by proper indentation.
:param obj: JSON object
:returns: a string of formatted JSON object
"""
return oslo_serialization.jsonutils.dumps(obj, sort_keys=True, indent=3)
def get_parent_child_from_xpath(name):
"""Returns the parent and child elements from XPath."""
if '/' in name:
(pname, label) = name.rsplit('/', 1)
else:
pname = None
label = name
return (pname, label)
def to_bytes(in_str):
"""Converts a size to bytes.
:param in_str: a number suffixed with a unit: {number}{unit}
units supported:
K, KB, k or kb - kilobytes
M, MB, m or mb - megabytes
G, GB, g or gb - gigabytes
T, TB, t or tb - terabytes
:returns: number of bytes
None; if input is incorrect
"""
match = re.search('^([0-9]+)([a-zA-Z]{0,2})$', in_str)
if not match:
return None
unit = match.group(2).upper()
value = match.group(1)
size_count = int(value)
if unit in ['K', 'KB']:
multiplier = int(units.Ki)
elif unit in ['M', 'MB']:
multiplier = int(units.Mi)
elif unit in ['G', 'GB']:
multiplier = int(units.Gi)
elif unit in ['T', 'TB']:
multiplier = int(units.Ti)
elif unit == "":
return size_count
else:
return None
size_in_bytes = int(size_count * multiplier)
return size_in_bytes
def get_list(json_object, parent_node_name, child_node_name=None):
"""Returns a list of values from child_node_name.
If child_node is not given, then it will retrieve list from parent node
"""
if not json_object:
return []
return_list = []
if isinstance(json_object[parent_node_name], list):
for detail in json_object[parent_node_name]:
if child_node_name:
return_list.append(detail[child_node_name])
else:
return_list.append(detail)
else:
if child_node_name:
return_list.append(json_object[parent_node_name][child_node_name])
else:
return_list.append(json_object[parent_node_name])
return return_list
def get_node_value(json_object, parent_node_name, child_node_name=None):
"""Returns value of given child_node.
If child_node is not given, then value of parent node is returned
returns None: If json_object or parent_node is not given,
If child_node is not found under parent_node
"""
if not json_object:
return None
if not parent_node_name:
return None
detail = json_object[parent_node_name]
if not child_node_name:
return detail
return_value = None
if child_node_name in detail:
return_value = detail[child_node_name]
else:
return_value = None
return return_value
def format_err_msg_and_raise(operation_type, component,
error_message, error_code):
"""Method to format error message.
:param operation_type: create, update, add, etc
:param component: storagesystem, vpool, etc
:param error_code: Error code from the API call
:param error_message: Detailed error message
"""
formated_err_msg = (_("Error: Failed to %(operation_type)s"
" %(component)s") %
{'operation_type': operation_type,
'component': component
})
if error_message.startswith("\"\'") and error_message.endswith("\'\""):
# stripping the first 2 and last 2 characters, which are quotes.
error_message = error_message[2:len(error_message) - 2]
formated_err_msg = formated_err_msg + "\nReason:" + error_message
raise CoprHdError(error_code, formated_err_msg)
def search_by_tag(resource_search_uri, ipaddr, port):
"""Fetches the list of resources with a given tag.
:param resource_search_uri: The tag based search uri
Example: '/block/volumes/search?tag=tagexample1'
:param ipaddr: IP address of CoprHD host
:param port: Port number
"""
# check if the URI passed has both project and name parameters
strUri = six.text_type(resource_search_uri)
if strUri.__contains__("search") and strUri.__contains__("?tag="):
# Get the project URI
(s, h) = service_json_request(
ipaddr, port, "GET",
resource_search_uri, None)
o = json_decode(s)
if not o:
return None
resources = get_node_value(o, "resource")
resource_uris = []
for resource in resources:
resource_uris.append(resource["id"])
return resource_uris
else:
raise CoprHdError(CoprHdError.VALUE_ERR, (_("Search URI %s"
" is not in the expected"
" format, it should end"
" with ?tag={0}")
% strUri))
# Timeout handler for synchronous operations
def timeout_handler():
global IS_TASK_TIMEOUT
IS_TASK_TIMEOUT = True
# Blocks the operation until the task is complete/error out/timeout
def block_until_complete(component_type,
resource_uri,
task_id,
ipAddr,
port,
synctimeout=0):
global IS_TASK_TIMEOUT
IS_TASK_TIMEOUT = False
if synctimeout:
t = threading.Timer(synctimeout, timeout_handler)
else:
synctimeout = TASK_TIMEOUT
t = threading.Timer(synctimeout, timeout_handler)
t.start()
while True:
out = get_task_by_resourceuri_and_taskId(
component_type, resource_uri, task_id, ipAddr, port)
if out:
if out["state"] == "ready":
# cancel the timer and return
t.cancel()
break
# if the status of the task is 'error' then cancel the timer
# and raise exception
if out["state"] == "error":
# cancel the timer
t.cancel()
if ("service_error" in out and
"details" in out["service_error"]):
error_message = out["service_error"]["details"]
raise CoprHdError(CoprHdError.VALUE_ERR,
(_("Task: %(task_id)s"
" is failed with"
" error: %(error_message)s") %
{'task_id': task_id,
'error_message': error_message
}))
if IS_TASK_TIMEOUT:
IS_TASK_TIMEOUT = False
raise CoprHdError(CoprHdError.TIME_OUT,
(_("Task did not complete in %d secs."
" Operation timed out. Task in CoprHD"
" will continue") % synctimeout))
return
def get_task_by_resourceuri_and_taskId(component_type, resource_uri,
task_id, ipAddr, port):
"""Returns the single task details."""
task_uri_constant = singletonURIHelperInstance.getUri(
component_type, "task")
(s, h) = service_json_request(
ipAddr, port, "GET",
task_uri_constant.format(resource_uri, task_id), None)
if not s:
return None
o = json_decode(s)
return o
class CoprHdError(exception.VolumeBackendAPIException):
"""Custom exception class used to report logical errors.
Attributes:
err_code - String error code
msg - String text
"""
SOS_FAILURE_ERR = 1
CMD_LINE_ERR = 2
HTTP_ERR = 3
VALUE_ERR = 4
NOT_FOUND_ERR = 1
ENTRY_ALREADY_EXISTS_ERR = 5
MAX_COUNT_REACHED = 6
TIME_OUT = 7
def __init__(self, err_code, msg):
self.err_code = err_code
self.msg = msg
def __str__(self):
return repr(self.msg)
class CoprHDResource(object):
def __init__(self, ipaddr, port):
"""Constructor: takes IP address and port of the CoprHD instance.
These are needed to make http requests for REST API
"""
self.ipaddr = ipaddr
self.port = port

View File

@ -0,0 +1,220 @@
# Copyright (c) 2016 EMC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import oslo_serialization
from cinder.i18n import _
from cinder.volume.drivers.coprhd.helpers import commoncoprhdapi as common
from cinder.volume.drivers.coprhd.helpers import project
class ConsistencyGroup(common.CoprHDResource):
URI_CONSISTENCY_GROUP = "/block/consistency-groups"
URI_CONSISTENCY_GROUPS_INSTANCE = URI_CONSISTENCY_GROUP + "/{0}"
URI_CONSISTENCY_GROUPS_DEACTIVATE = (URI_CONSISTENCY_GROUPS_INSTANCE +
"/deactivate")
URI_CONSISTENCY_GROUPS_SEARCH = (
'/block/consistency-groups/search?project={0}')
URI_SEARCH_CONSISTENCY_GROUPS_BY_TAG = (
'/block/consistency-groups/search?tag={0}')
URI_CONSISTENCY_GROUP_TAGS = (
'/block/consistency-groups/{0}/tags')
def list(self, project_name, tenant):
"""This function gives list of comma separated consistency group uris.
:param project_name: Name of the project path
:param tenant: Name of the tenant
:returns: list of consistency group ids separated by comma
"""
if tenant is None:
tenant = ""
projobj = project.Project(self.ipaddr, self.port)
fullproj = tenant + "/" + project_name
projuri = projobj.project_query(fullproj)
(s, h) = common.service_json_request(
self.ipaddr, self.port, "GET",
self.URI_CONSISTENCY_GROUPS_SEARCH.format(projuri), None)
o = common.json_decode(s)
if not o:
return []
congroups = []
resources = common.get_node_value(o, "resource")
for resource in resources:
congroups.append(resource["id"])
return congroups
def show(self, name, project, tenant):
"""This function will display the consistency group with details.
:param name : Name of the consistency group
:param project: Name of the project
:param tenant: Name of the tenant
:returns: details of consistency group
"""
uri = self.consistencygroup_query(name, project, tenant)
(s, h) = common.service_json_request(
self.ipaddr, self.port, "GET",
self.URI_CONSISTENCY_GROUPS_INSTANCE.format(uri), None)
o = common.json_decode(s)
if o['inactive']:
return None
return o
def consistencygroup_query(self, name, project, tenant):
"""This function will return consistency group id.
:param name : Name/id of the consistency group
:param project: Name of the project
:param tenant: Name of the tenant
:returns: id of the consistency group
"""
if common.is_uri(name):
return name
uris = self.list(project, tenant)
for uri in uris:
congroup = self.show(uri, project, tenant)
if congroup and congroup['name'] == name:
return congroup['id']
raise common.CoprHdError(common.CoprHdError.NOT_FOUND_ERR,
(_("Consistency Group %s: not found") % name))
# Blocks the operation until the task is complete/error out/timeout
def check_for_sync(self, result, sync, synctimeout=0):
if len(result["resource"]) > 0:
resource = result["resource"]
return (
common.block_until_complete("consistencygroup", resource["id"],
result["id"], self.ipaddr,
self.port, synctimeout)
)
else:
raise common.CoprHdError(
common.CoprHdError.SOS_FAILURE_ERR,
_("error: task list is empty, no task response found"))
def create(self, name, project_name, tenant):
"""This function will create consistency group with the given name.
:param name : Name of the consistency group
:param project_name: Name of the project path
:param tenant: Container tenant name
:returns: status of creation
"""
# check for existence of consistency group.
try:
status = self.show(name, project_name, tenant)
except common.CoprHdError as e:
if e.err_code == common.CoprHdError.NOT_FOUND_ERR:
if tenant is None:
tenant = ""
fullproj = tenant + "/" + project_name
projobj = project.Project(self.ipaddr, self.port)
projuri = projobj.project_query(fullproj)
parms = {'name': name, 'project': projuri, }
body = oslo_serialization.jsonutils.dumps(parms)
(s, h) = common.service_json_request(
self.ipaddr, self.port, "POST",
self.URI_CONSISTENCY_GROUP, body)
o = common.json_decode(s)
return o
else:
raise
if status:
common.format_err_msg_and_raise(
"create", "consistency group",
(_("consistency group with name: %s already exists") % name),
common.CoprHdError.ENTRY_ALREADY_EXISTS_ERR)
def delete(self, name, project, tenant, coprhdonly=False):
"""This function marks a particular consistency group as delete.
:param name: Name of the consistency group
:param project: Name of the project
:param tenant: Name of the tenant
:returns: status of the delete operation
false, incase it fails to do delete
"""
params = ''
if coprhdonly is True:
params += "?type=" + 'CoprHD_ONLY'
uri = self.consistencygroup_query(name, project, tenant)
(s, h) = common.service_json_request(
self.ipaddr, self.port,
"POST",
self.URI_CONSISTENCY_GROUPS_DEACTIVATE.format(uri) + params,
None)
return
def update(self, uri, project, tenant, add_volumes, remove_volumes,
sync, synctimeout=0):
"""Function used to add or remove volumes from consistency group.
It will update the consistency group with given volumes
:param uri : URI of the consistency group
:param project : Name of the project path
:param tenant : Container tenant name
:param add_volumes : volumes to be added to the consistency group
:param remove_volumes: volumes to be removed from CG
:param sync : synchronous request
:param synctimeout : Query for task status for "synctimeout" secs.
If the task doesn't complete in synctimeout
secs, an exception is thrown
:returns: status of creation
"""
if tenant is None:
tenant = ""
parms = []
add_voluris = []
remove_voluris = []
from cinder.volume.drivers.coprhd.helpers.volume import Volume
volobj = Volume(self.ipaddr, self.port)
if add_volumes:
for volname in add_volumes:
full_project_name = tenant + "/" + project
add_voluris.append(
volobj.volume_query(full_project_name, volname))
volumes = {'volume': add_voluris}
parms = {'add_volumes': volumes}
if remove_volumes:
for volname in remove_volumes:
full_project_name = tenant + "/" + project
remove_voluris.append(
volobj.volume_query(full_project_name, volname))
volumes = {'volume': remove_voluris}
parms = {'remove_volumes': volumes}
body = oslo_serialization.jsonutils.dumps(parms)
(s, h) = common.service_json_request(
self.ipaddr, self.port, "PUT",
self.URI_CONSISTENCY_GROUPS_INSTANCE.format(uri),
body)
o = common.json_decode(s)
if sync:
return self.check_for_sync(o, sync, synctimeout)
else:
return o

View File

@ -0,0 +1,303 @@
# Copyright (c) 2016 EMC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import oslo_serialization
from cinder.i18n import _
from cinder.volume.drivers.coprhd.helpers import commoncoprhdapi as common
from cinder.volume.drivers.coprhd.helpers import host
from cinder.volume.drivers.coprhd.helpers import project
from cinder.volume.drivers.coprhd.helpers import virtualarray
from cinder.volume.drivers.coprhd.helpers import volume
class ExportGroup(common.CoprHDResource):
URI_EXPORT_GROUP = "/block/exports"
URI_EXPORT_GROUPS_SHOW = URI_EXPORT_GROUP + "/{0}"
URI_EXPORT_GROUP_SEARCH = '/block/exports/search'
URI_EXPORT_GROUP_UPDATE = '/block/exports/{0}'
def exportgroup_remove_volumes_by_uri(self, exportgroup_uri,
volume_id_list, sync=False,
tenantname=None, projectname=None,
cg=None, synctimeout=0):
"""Remove volumes from the exportgroup, given the uris of volume."""
volume_list = volume_id_list
parms = {}
parms['volume_changes'] = self._remove_list(volume_list)
o = self.send_json_request(exportgroup_uri, parms)
return self.check_for_sync(o, sync, synctimeout)
def _remove_list(self, uris):
resChanges = {}
if not isinstance(uris, list):
resChanges['remove'] = [uris]
else:
resChanges['remove'] = uris
return resChanges
def send_json_request(self, exportgroup_uri, param):
body = oslo_serialization.jsonutils.dumps(param)
(s, h) = common.service_json_request(
self.ipaddr, self.port, "PUT",
self.URI_EXPORT_GROUP_UPDATE.format(exportgroup_uri), body)
return common.json_decode(s)
def check_for_sync(self, result, sync, synctimeout=0):
if sync:
if len(result["resource"]) > 0:
resource = result["resource"]
return (
common.block_until_complete("export", resource["id"],
result["id"], self.ipaddr,
self.port, synctimeout)
)
else:
raise common.CoprHdError(
common.CoprHdError.SOS_FAILURE_ERR, _(
"error: task list is empty, no task response found"))
else:
return result
def exportgroup_list(self, project_name, tenant):
"""This function gives list of export group uris separated by comma.
:param project_name: Name of the project path
:param tenant: Name of the tenant
:returns: list of export group ids separated by comma
"""
if tenant is None:
tenant = ""
projobj = project.Project(self.ipaddr, self.port)
fullproj = tenant + "/" + project_name
projuri = projobj.project_query(fullproj)
uri = self.URI_EXPORT_GROUP_SEARCH
if '?' in uri:
uri += '&project=' + projuri
else:
uri += '?project=' + projuri
(s, h) = common.service_json_request(self.ipaddr, self.port, "GET",
uri, None)
o = common.json_decode(s)
if not o:
return []
exportgroups = []
resources = common.get_node_value(o, "resource")
for resource in resources:
exportgroups.append(resource["id"])
return exportgroups
def exportgroup_show(self, name, project, tenant, varray=None):
"""This function displays the Export group with details.
:param name: Name of the export group
:param project: Name of the project
:param tenant: Name of the tenant
:returns: Details of export group
"""
varrayuri = None
if varray:
varrayObject = virtualarray.VirtualArray(
self.ipaddr, self.port)
varrayuri = varrayObject.varray_query(varray)
uri = self.exportgroup_query(name, project, tenant, varrayuri)
(s, h) = common.service_json_request(
self.ipaddr,
self.port,
"GET",
self.URI_EXPORT_GROUPS_SHOW.format(uri), None)
o = common.json_decode(s)
if o['inactive']:
return None
return o
def exportgroup_create(self, name, project_name, tenant, varray,
exportgrouptype, export_destination=None):
"""This function creates the Export group with given name.
:param name: Name of the export group
:param project_name: Name of the project path
:param tenant: Container tenant name
:param varray: Name of the virtual array
:param exportgrouptype: Type of the export group. Ex:Host etc
:returns: status of creation
"""
# check for existence of export group.
try:
status = self.exportgroup_show(name, project_name, tenant)
except common.CoprHdError as e:
if e.err_code == common.CoprHdError.NOT_FOUND_ERR:
if tenant is None:
tenant = ""
fullproj = tenant + "/" + project_name
projObject = project.Project(self.ipaddr, self.port)
projuri = projObject.project_query(fullproj)
varrayObject = virtualarray.VirtualArray(
self.ipaddr, self.port)
nhuri = varrayObject.varray_query(varray)
parms = {
'name': name,
'project': projuri,
'varray': nhuri,
'type': exportgrouptype
}
if exportgrouptype and export_destination:
host_obj = host.Host(self.ipaddr, self.port)
host_uri = host_obj.query_by_name(export_destination)
parms['hosts'] = [host_uri]
body = oslo_serialization.jsonutils.dumps(parms)
(s, h) = common.service_json_request(self.ipaddr,
self.port, "POST",
self.URI_EXPORT_GROUP,
body)
o = common.json_decode(s)
return o
else:
raise
if status:
raise common.CoprHdError(
common.CoprHdError.ENTRY_ALREADY_EXISTS_ERR, (_(
"Export group with name %s"
" already exists") % name))
def exportgroup_query(self, name, project, tenant, varrayuri=None):
"""Makes REST API call to query the exportgroup by name.
:param name: Name/id of the export group
:param project: Name of the project
:param tenant: Name of the tenant
:param varrayuri: URI of the virtual array
:returns: id of the export group
"""
if common.is_uri(name):
return name
uris = self.exportgroup_list(project, tenant)
for uri in uris:
exportgroup = self.exportgroup_show(uri, project, tenant)
if exportgroup and exportgroup['name'] == name:
if varrayuri:
varrayobj = exportgroup['varray']
if varrayobj['id'] == varrayuri:
return exportgroup['id']
else:
continue
else:
return exportgroup['id']
raise common.CoprHdError(
common.CoprHdError.NOT_FOUND_ERR,
(_("Export Group %s: not found") % name))
def exportgroup_add_volumes(self, sync, exportgroupname, tenantname,
maxpaths, minpaths, pathsperinitiator,
projectname, volumenames,
cg=None, synctimeout=0, varray=None):
"""Add volume to export group.
:param sync : synchronous request
:param exportgroupname : Name/id of the export group
:param tenantname : tenant name
:param maxpaths : Maximum number of paths
:param minpaths : Minimum number of paths
:param pathsperinitiator : Paths per initiator
:param projectname : name of project
:param volumenames : names of volumes that needs
to be added to exportgroup
:param cg : consistency group
:param synctimeout : Query for task status for "synctimeout" secs
If the task doesn't complete in synctimeout
secs, an exception is thrown
:param varray : Name of varray
:returns: action result
"""
varrayuri = None
if varray:
varrayObject = virtualarray.VirtualArray(
self.ipaddr, self.port)
varrayuri = varrayObject.varray_query(varray)
exportgroup_uri = self.exportgroup_query(exportgroupname,
projectname,
tenantname,
varrayuri)
# get volume uri
if tenantname is None:
tenantname = ""
# List of volumes
volume_list = []
if volumenames:
volume_list = self._get_resource_lun_tuple(
volumenames, "volumes", None, tenantname,
projectname, None)
parms = {}
# construct the body
volChanges = {}
volChanges['add'] = volume_list
parms['volume_changes'] = volChanges
o = self.send_json_request(exportgroup_uri, parms)
return self.check_for_sync(o, sync, synctimeout)
def _get_resource_lun_tuple(self, resources, resType, baseResUri,
tenantname, projectname, blockTypeName):
"""Function to validate input volumes and return list of ids and luns.
"""
copyEntries = []
volumeObject = volume.Volume(self.ipaddr, self.port)
for copy in resources:
copyParam = []
try:
copyParam = copy.split(":")
except Exception:
raise common.CoprHdError(
common.CoprHdError.CMD_LINE_ERR,
(_("Please provide valid format volume:"
" lun for parameter %s") %
resType))
copy = dict()
if not len(copyParam):
raise common.CoprHdError(
common.CoprHdError.CMD_LINE_ERR,
(_("Please provide at least one volume for parameter %s") %
resType))
if resType == "volumes":
full_project_name = tenantname + "/" + projectname
copy['id'] = volumeObject.volume_query(
full_project_name, copyParam[0])
if len(copyParam) > 1:
copy['lun'] = copyParam[1]
copyEntries.append(copy)
return copyEntries

View File

@ -0,0 +1,104 @@
# Copyright (c) 2016 EMC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinder.i18n import _
from cinder.volume.drivers.coprhd.helpers import commoncoprhdapi as common
from cinder.volume.drivers.coprhd.helpers import tenant
class Host(common.CoprHDResource):
# All URIs for the Host operations
URI_HOST_DETAILS = "/compute/hosts/{0}"
URI_HOST_LIST_INITIATORS = "/compute/hosts/{0}/initiators"
URI_COMPUTE_HOST = "/compute/hosts"
URI_HOSTS_SEARCH_BY_NAME = "/compute/hosts/search?name={0}"
def query_by_name(self, host_name, tenant_name=None):
"""Search host matching host_name and tenant if tenant_name provided.
tenant_name is optional
"""
hostList = self.list_all(tenant_name)
for host in hostList:
hostUri = host['id']
hostDetails = self.show_by_uri(hostUri)
if hostDetails:
if hostDetails['name'] == host_name:
return hostUri
raise common.CoprHdError(common.CoprHdError.NOT_FOUND_ERR, (_(
"Host with name: %s not found") % host_name))
def list_initiators(self, host_name):
"""Lists all initiators for the given host.
:param host_name: The name of the host
"""
if not common.is_uri(host_name):
hostUri = self.query_by_name(host_name, None)
else:
hostUri = host_name
(s, h) = common.service_json_request(
self.ipaddr, self.port, "GET",
Host.URI_HOST_LIST_INITIATORS.format(hostUri),
None)
o = common.json_decode(s)
if not o or "initiator" not in o:
return []
return common.get_node_value(o, 'initiator')
def list_all(self, tenant_name):
"""Gets the ids and self links for all compute elements."""
restapi = self.URI_COMPUTE_HOST
tenant_obj = tenant.Tenant(self.ipaddr, self.port)
if tenant_name is None:
tenant_uri = tenant_obj.tenant_getid()
else:
tenant_uri = tenant_obj.tenant_query(tenant_name)
restapi = restapi + "?tenant=" + tenant_uri
(s, h) = common.service_json_request(
self.ipaddr, self.port,
"GET",
restapi,
None)
o = common.json_decode(s)
return o['host']
def show_by_uri(self, uri):
"""Makes REST API call to retrieve Host details based on its UUID."""
(s, h) = common.service_json_request(self.ipaddr, self.port, "GET",
Host.URI_HOST_DETAILS.format(uri),
None)
o = common.json_decode(s)
inactive = common.get_node_value(o, 'inactive')
if inactive:
return None
return o
def search_by_name(self, host_name):
"""Search host by its name."""
(s, h) = common.service_json_request(
self.ipaddr, self.port, "GET",
self.URI_HOSTS_SEARCH_BY_NAME.format(host_name), None)
o = common.json_decode(s)
if not o:
return []
return common.get_node_value(o, "resource")

View File

@ -0,0 +1,88 @@
# Copyright (c) 2016 EMC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinder.i18n import _
from cinder.volume.drivers.coprhd.helpers import commoncoprhdapi as common
from cinder.volume.drivers.coprhd.helpers import tenant
class Project(common.CoprHDResource):
# Commonly used URIs for the 'Project' module
URI_PROJECT_LIST = '/tenants/{0}/projects'
URI_PROJECT = '/projects/{0}'
def project_query(self, name):
"""Retrieves UUID of project based on its name.
:param name: name of project
:returns: UUID of project
:raises: CoprHdError - when project name is not found
"""
if common.is_uri(name):
return name
(tenant_name, project_name) = common.get_parent_child_from_xpath(name)
tenant_obj = tenant.Tenant(self.ipaddr, self.port)
tenant_uri = tenant_obj.tenant_query(tenant_name)
projects = self.project_list(tenant_uri)
if projects:
for project in projects:
if project:
project_detail = self.project_show_by_uri(
project['id'])
if(project_detail and
project_detail['name'] == project_name):
return project_detail['id']
raise common.CoprHdError(common.CoprHdError.NOT_FOUND_ERR, (_(
"Project: %s not found") % project_name))
def project_list(self, tenant_name):
"""Makes REST API call and retrieves projects based on tenant UUID.
:param tenant_name: Name of the tenant
:returns: List of project UUIDs in JSON response payload
"""
tenant_obj = tenant.Tenant(self.ipaddr, self.port)
tenant_uri = tenant_obj.tenant_query(tenant_name)
(s, h) = common.service_json_request(self.ipaddr, self.port, "GET",
Project.URI_PROJECT_LIST.format(
tenant_uri),
None)
o = common.json_decode(s)
if "project" in o:
return common.get_list(o, 'project')
return []
def project_show_by_uri(self, uri):
"""Makes REST API call and retrieves project derails based on UUID.
:param uri: UUID of project
:returns: Project details in JSON response payload
"""
(s, h) = common.service_json_request(self.ipaddr, self.port,
"GET",
Project.URI_PROJECT.format(uri),
None)
o = common.json_decode(s)
inactive = common.get_node_value(o, 'inactive')
if inactive:
return None
return o

View File

@ -0,0 +1,314 @@
# Copyright (c) 2016 EMC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
import oslo_serialization
from cinder.i18n import _
from cinder.volume.drivers.coprhd.helpers import commoncoprhdapi as common
from cinder.volume.drivers.coprhd.helpers import consistencygroup
from cinder.volume.drivers.coprhd.helpers import volume
class Snapshot(common.CoprHDResource):
# Commonly used URIs for the 'Snapshot' module
URI_SNAPSHOTS = '/{0}/snapshots/{1}'
URI_BLOCK_SNAPSHOTS = '/block/snapshots/{0}'
URI_SEARCH_SNAPSHOT_BY_TAG = '/block/snapshots/search?tag={0}'
URI_SNAPSHOT_LIST = '/{0}/{1}/{2}/protection/snapshots'
URI_SNAPSHOT_TASKS_BY_OPID = '/vdc/tasks/{0}'
URI_RESOURCE_DEACTIVATE = '{0}/deactivate'
URI_CONSISTENCY_GROUP = "/block/consistency-groups"
URI_CONSISTENCY_GROUPS_SNAPSHOT_INSTANCE = (
URI_CONSISTENCY_GROUP + "/{0}/protection/snapshots/{1}")
URI_CONSISTENCY_GROUPS_SNAPSHOT_DEACTIVATE = (
URI_CONSISTENCY_GROUPS_SNAPSHOT_INSTANCE + "/deactivate")
URI_BLOCK_SNAPSHOTS_TAG = URI_BLOCK_SNAPSHOTS + '/tags'
VOLUMES = 'volumes'
CG = 'consistency-groups'
BLOCK = 'block'
is_timeout = False
timeout = 300
def snapshot_list_uri(self, otype, otypename, ouri):
"""Makes REST API call to list snapshots under a volume.
:param otype : block
:param otypename : either volume or consistency-group should be
provided
:param ouri : uri of volume or consistency-group
:returns: list of snapshots
"""
(s, h) = common.service_json_request(
self.ipaddr, self.port,
"GET",
Snapshot.URI_SNAPSHOT_LIST.format(otype, otypename, ouri), None)
o = common.json_decode(s)
return o['snapshot']
def snapshot_show_uri(self, otype, resource_uri, suri):
"""Retrieves snapshot details based on snapshot Name or Label.
:param otype : block
:param suri : uri of the Snapshot.
:param resource_uri: uri of the source resource
:returns: Snapshot details in JSON response payload
"""
if(resource_uri is not None and
resource_uri.find('BlockConsistencyGroup') > 0):
(s, h) = common.service_json_request(
self.ipaddr, self.port,
"GET",
Snapshot.URI_CONSISTENCY_GROUPS_SNAPSHOT_INSTANCE.format(
resource_uri,
suri),
None)
else:
(s, h) = common.service_json_request(
self.ipaddr, self.port,
"GET",
Snapshot.URI_SNAPSHOTS.format(otype, suri), None)
return common.json_decode(s)
def snapshot_query(self, storageres_type,
storageres_typename, resuri, snapshot_name):
if resuri is not None:
uris = self.snapshot_list_uri(
storageres_type,
storageres_typename,
resuri)
for uri in uris:
snapshot = self.snapshot_show_uri(
storageres_type,
resuri,
uri['id'])
if (False == common.get_node_value(snapshot, 'inactive') and
snapshot['name'] == snapshot_name):
return snapshot['id']
raise common.CoprHdError(
common.CoprHdError.SOS_FAILURE_ERR,
(_("snapshot with the name: "
"%s Not Found") % snapshot_name))
def snapshot_show_task_opid(self, otype, snap, taskid):
(s, h) = common.service_json_request(
self.ipaddr, self.port,
"GET",
Snapshot.URI_SNAPSHOT_TASKS_BY_OPID.format(taskid),
None)
if (not s):
return None
o = common.json_decode(s)
return o
# Blocks the operation until the task is complete/error out/timeout
def block_until_complete(self, storageres_type, resuri,
task_id, synctimeout=0):
if synctimeout:
t = threading.Timer(synctimeout, common.timeout_handler)
else:
synctimeout = self.timeout
t = threading.Timer(synctimeout, common.timeout_handler)
t.start()
while True:
out = self.snapshot_show_task_opid(
storageres_type, resuri, task_id)
if out:
if out["state"] == "ready":
# cancel the timer and return
t.cancel()
break
# if the status of the task is 'error' then cancel the timer
# and raise exception
if out["state"] == "error":
# cancel the timer
t.cancel()
error_message = "Please see logs for more details"
if("service_error" in out and
"details" in out["service_error"]):
error_message = out["service_error"]["details"]
raise common.CoprHdError(
common.CoprHdError.VALUE_ERR,
(_("Task: %(task_id)s is failed with error: "
"%(error_message)s") %
{'task_id': task_id,
'error_message': error_message}))
if self.is_timeout:
self.is_timeout = False
raise common.CoprHdError(common.CoprHdError.TIME_OUT,
(_("Task did not complete in %d secs."
" Operation timed out. Task in"
" CoprHD will continue") %
synctimeout))
return
def storage_resource_query(self,
storageres_type,
volume_name,
cg_name,
project,
tenant):
resourcepath = "/" + project
if tenant is not None:
resourcepath = tenant + resourcepath
resUri = None
resourceObj = None
if Snapshot.BLOCK == storageres_type and volume_name is not None:
resourceObj = volume.Volume(self.ipaddr, self.port)
resUri = resourceObj.volume_query(resourcepath, volume_name)
elif Snapshot.BLOCK == storageres_type and cg_name is not None:
resourceObj = consistencygroup.ConsistencyGroup(
self.ipaddr,
self.port)
resUri = resourceObj.consistencygroup_query(
cg_name,
project,
tenant)
else:
resourceObj = None
return resUri
def snapshot_create(self, otype, typename, ouri,
snaplabel, inactive, sync,
readonly=False, synctimeout=0):
"""New snapshot is created, for a given volume.
:param otype : block type should be provided
:param typename : either volume or consistency-groups should
be provided
:param ouri : uri of volume
:param snaplabel : name of the snapshot
:param inactive : if true, the snapshot will not activate the
synchronization between source and target volumes
:param sync : synchronous request
:param synctimeout : Query for task status for "synctimeout" secs.
If the task doesn't complete in synctimeout
secs, an exception is thrown
"""
# check snapshot is already exist
is_snapshot_exist = True
try:
self.snapshot_query(otype, typename, ouri, snaplabel)
except common.CoprHdError as e:
if e.err_code == common.CoprHdError.NOT_FOUND_ERR:
is_snapshot_exist = False
else:
raise
if is_snapshot_exist:
raise common.CoprHdError(
common.CoprHdError.ENTRY_ALREADY_EXISTS_ERR,
(_("Snapshot with name %(snaplabel)s"
" already exists under %(typename)s") %
{'snaplabel': snaplabel,
'typename': typename
}))
parms = {
'name': snaplabel,
# if true, the snapshot will not activate the synchronization
# between source and target volumes
'create_inactive': inactive
}
if readonly is True:
parms['read_only'] = readonly
body = oslo_serialization.jsonutils.dumps(parms)
# REST api call
(s, h) = common.service_json_request(
self.ipaddr, self.port,
"POST",
Snapshot.URI_SNAPSHOT_LIST.format(otype, typename, ouri), body)
o = common.json_decode(s)
task = o["task"][0]
if sync:
return (
self.block_until_complete(
otype,
task['resource']['id'],
task["id"], synctimeout)
)
else:
return o
def snapshot_delete_uri(self, otype, resource_uri,
suri, sync, synctimeout=0):
"""Delete a snapshot by uri.
:param otype : block
:param resource_uri: uri of the source resource
:param suri : Uri of the Snapshot
:param sync : To perform operation synchronously
:param synctimeout : Query for task status for "synctimeout" secs. If
the task doesn't complete in synctimeout secs, an
exception is thrown
"""
s = None
if resource_uri.find("Volume") > 0:
(s, h) = common.service_json_request(
self.ipaddr, self.port,
"POST",
Snapshot.URI_RESOURCE_DEACTIVATE.format(
Snapshot.URI_BLOCK_SNAPSHOTS.format(suri)),
None)
elif resource_uri.find("BlockConsistencyGroup") > 0:
(s, h) = common.service_json_request(
self.ipaddr, self.port,
"POST",
Snapshot.URI_CONSISTENCY_GROUPS_SNAPSHOT_DEACTIVATE.format(
resource_uri,
suri),
None)
o = common.json_decode(s)
task = o["task"][0]
if sync:
return (
self.block_until_complete(
otype,
task['resource']['id'],
task["id"], synctimeout)
)
else:
return o
def snapshot_delete(self, storageres_type,
storageres_typename, resource_uri,
name, sync, synctimeout=0):
snapshotUri = self.snapshot_query(
storageres_type,
storageres_typename,
resource_uri,
name)
self.snapshot_delete_uri(
storageres_type,
resource_uri,
snapshotUri,
sync, synctimeout)

View File

@ -0,0 +1,55 @@
# Copyright (c) 2016 EMC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Contains tagging related methods."""
import oslo_serialization
from cinder.i18n import _
from cinder.volume.drivers.coprhd.helpers import commoncoprhdapi as common
class Tag(common.CoprHDResource):
def tag_resource(self, uri, resource_id, add, remove):
params = {
'add': add,
'remove': remove
}
body = oslo_serialization.jsonutils.dumps(params)
(s, h) = common.service_json_request(self.ipaddr, self.port, "PUT",
uri.format(resource_id), body)
o = common.json_decode(s)
return o
def list_tags(self, resource_uri):
if resource_uri.__contains__("tag") is False:
raise common.CoprHdError(
common.CoprHdError.VALUE_ERR, _("URI should end with /tag"))
(s, h) = common.service_json_request(self.ipaddr,
self.port,
"GET",
resource_uri,
None)
allTags = []
o = common.json_decode(s)
allTags = o['tag']
return allTags

View File

@ -0,0 +1,117 @@
# Copyright (c) 2016 EMC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinder.i18n import _
from cinder.volume.drivers.coprhd.helpers import commoncoprhdapi as common
class Tenant(common.CoprHDResource):
URI_SERVICES_BASE = ''
URI_TENANT = URI_SERVICES_BASE + '/tenant'
URI_TENANTS = URI_SERVICES_BASE + '/tenants/{0}'
URI_TENANTS_SUBTENANT = URI_TENANTS + '/subtenants'
def tenant_query(self, label):
"""Returns the UID of the tenant specified by the hierarchical name.
(ex tenant1/tenant2/tenant3)
"""
if common.is_uri(label):
return label
tenant_id = self.tenant_getid()
if not label:
return tenant_id
subtenants = self.tenant_list(tenant_id)
subtenants.append(self.tenant_show(None))
for tenant in subtenants:
if tenant['name'] == label:
rslt = self.tenant_show_by_uri(tenant['id'])
if rslt:
return tenant['id']
raise common.CoprHdError(common.CoprHdError.NOT_FOUND_ERR,
(_("Tenant %s: not found") % label))
def tenant_show(self, label):
"""Returns the details of the tenant based on its name."""
if label:
tenant_id = self.tenant_query(label)
else:
tenant_id = self.tenant_getid()
return self.tenant_show_by_uri(tenant_id)
def tenant_getid(self):
(s, h) = common.service_json_request(self.ipaddr, self.port,
"GET", Tenant.URI_TENANT, None)
o = common.json_decode(s)
return o['id']
def tenant_list(self, uri=None):
"""Returns all the tenants under a parent tenant.
:param uri: The parent tenant name
:returns: JSON payload of tenant list
"""
if not uri:
uri = self.tenant_getid()
tenantdtls = self.tenant_show_by_uri(uri)
if(tenantdtls and not ('parent_tenant' in tenantdtls and
("id" in tenantdtls['parent_tenant']))):
(s, h) = common.service_json_request(
self.ipaddr, self.port,
"GET", self.URI_TENANTS_SUBTENANT.format(uri), None)
o = common.json_decode(s)
return o['subtenant']
else:
return []
def tenant_show_by_uri(self, uri):
"""Makes REST API call to retrieve tenant details based on UUID."""
(s, h) = common.service_json_request(self.ipaddr, self.port, "GET",
Tenant.URI_TENANTS.format(uri),
None)
o = common.json_decode(s)
if 'inactive' in o and o['inactive']:
return None
return o
def get_tenant_by_name(self, tenant):
uri = None
if not tenant:
uri = self.tenant_getid()
else:
if not common.is_uri(tenant):
uri = self.tenant_query(tenant)
else:
uri = tenant
if not uri:
raise common.CoprHdError(common.CoprHdError.NOT_FOUND_ERR,
(_("Tenant %s: not found") % tenant))
return uri

View File

@ -0,0 +1,84 @@
# Copyright (c) 2016 EMC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class URIHelper(object):
"""This map will be a map of maps.
e.g for project component type, it will hold a map
of its operations vs their uris
"""
COMPONENT_TYPE_VS_URIS_MAP = dict()
"""Volume URIs."""
VOLUME_URIS_MAP = dict()
URI_VOLUMES = '/block/volumes'
URI_VOLUME = URI_VOLUMES + '/{0}'
URI_VOLUME_TASK_LIST = URI_VOLUME + '/tasks'
URI_VOLUME_TASK = URI_VOLUME_TASK_LIST + '/{1}'
"""Consistencygroup URIs."""
CG_URIS_MAP = dict()
URI_CGS = '/block/consistency-groups'
URI_CG = URI_CGS + '/{0}'
URI_CG_TASK_LIST = URI_CG + '/tasks'
URI_CG_TASK = URI_CG_TASK_LIST + '/{1}'
"""Export Group URIs."""
# Map to hold all export group uris
EXPORT_GROUP_URIS_MAP = dict()
URI_EXPORT_GROUP_TASKS_LIST = '/block/exports/{0}/tasks'
URI_EXPORT_GROUP_TASK = URI_EXPORT_GROUP_TASKS_LIST + '/{1}'
def __init__(self):
"""During initialization of the class, lets fill all the maps."""
self.__fillExportGroupMap()
self.__fillVolumeMap()
self.__fillConsistencyGroupMap()
self.__initializeComponentVsUriMap()
def __call__(self):
return self
def __initializeComponentVsUriMap(self):
self.COMPONENT_TYPE_VS_URIS_MAP["export"] = self.EXPORT_GROUP_URIS_MAP
self.COMPONENT_TYPE_VS_URIS_MAP[
"volume"] = self.VOLUME_URIS_MAP
self.COMPONENT_TYPE_VS_URIS_MAP[
"consistencygroup"] = self.CG_URIS_MAP
def __fillExportGroupMap(self):
self.EXPORT_GROUP_URIS_MAP["task"] = self.URI_EXPORT_GROUP_TASK
def __fillVolumeMap(self):
self.VOLUME_URIS_MAP["task"] = self.URI_VOLUME_TASK
def __fillConsistencyGroupMap(self):
self.CG_URIS_MAP["task"] = self.URI_CG_TASK
def getUri(self, componentType, operationType):
return (
self.COMPONENT_TYPE_VS_URIS_MAP.get(
componentType).get(
operationType)
)
return None
"""Defining the singleton instance.
Use this instance any time the access is required for this module/class
"""
singletonURIHelperInstance = URIHelper()

View File

@ -0,0 +1,79 @@
# Copyright (c) 2016 EMC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinder.i18n import _
from cinder.volume.drivers.coprhd.helpers import commoncoprhdapi as common
class VirtualArray(common.CoprHDResource):
# Commonly used URIs for the 'varrays' module
URI_VIRTUALARRAY = '/vdc/varrays'
URI_VIRTUALARRAY_BY_VDC_ID = '/vdc/varrays?vdc-id={0}'
URI_VIRTUALARRAY_URI = '/vdc/varrays/{0}'
def varray_query(self, name):
"""Returns the UID of the varray specified by the name."""
if common.is_uri(name):
return name
uris = self.varray_list()
for uri in uris:
varray = self.varray_show(uri)
if varray and varray['name'] == name:
return varray['id']
raise common.CoprHdError(common.CoprHdError.NOT_FOUND_ERR,
(_("varray %s: not found") % name))
def varray_list(self, vdcname=None):
"""Returns all the varrays in a vdc.
:param vdcname: Name of the Virtual Data Center
:returns: JSON payload of varray list
"""
vdcrestapi = None
if vdcname is not None:
vdcrestapi = VirtualArray.URI_VIRTUALARRAY_BY_VDC_ID.format(
vdcname)
else:
vdcrestapi = VirtualArray.URI_VIRTUALARRAY
(s, h) = common.service_json_request(
self.ipaddr, self.port, "GET",
vdcrestapi, None)
o = common.json_decode(s)
returnlst = []
for item in o['varray']:
returnlst.append(item['id'])
return returnlst
def varray_show(self, label):
"""Makes REST API call to retrieve varray details based on name."""
uri = self.varray_query(label)
(s, h) = common.service_json_request(
self.ipaddr, self.port, "GET",
VirtualArray.URI_VIRTUALARRAY_URI.format(uri),
None)
o = common.json_decode(s)
if 'inactive' in o and o['inactive'] is True:
return None
else:
return o

View File

@ -0,0 +1,77 @@
# Copyright (c) 2016 EMC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinder.i18n import _
from cinder.volume.drivers.coprhd.helpers import commoncoprhdapi as common
class VirtualPool(common.CoprHDResource):
URI_VPOOL = "/{0}/vpools"
URI_VPOOL_SHOW = URI_VPOOL + "/{1}"
URI_VPOOL_SEARCH = URI_VPOOL + "/search?name={1}"
def vpool_show_uri(self, vpooltype, uri):
"""Makes REST API call and retrieves vpool details based on UUID.
This function will take uri as input and returns with
all parameters of VPOOL like label, urn and type.
:param vpooltype : Type of virtual pool {'block'}
:param uri : unique resource identifier of the vpool
:returns: object containing all the details of vpool
"""
(s, h) = common.service_json_request(
self.ipaddr, self.port,
"GET",
self.URI_VPOOL_SHOW.format(vpooltype, uri), None)
o = common.json_decode(s)
if o['inactive']:
return None
return o
def vpool_query(self, name, vpooltype):
"""Makes REST API call to query the vpool by name and type.
This function will take the VPOOL name and type of VPOOL
as input and get uri of the first occurance of given VPOOL.
:param name: Name of the VPOOL
:param vpooltype: Type of the VPOOL {'block'}
:returns: uri of the given vpool
"""
if common.is_uri(name):
return name
(s, h) = common.service_json_request(
self.ipaddr, self.port, "GET",
self.URI_VPOOL_SEARCH.format(vpooltype, name), None)
o = common.json_decode(s)
if len(o['resource']) > 0:
# Get the Active vpool ID.
for vpool in o['resource']:
if self.vpool_show_uri(vpooltype, vpool['id']) is not None:
return vpool['id']
# Raise not found exception. as we did not find any active vpool.
raise common.CoprHdError(common.CoprHdError.NOT_FOUND_ERR,
(_("VPool %(name)s ( %(vpooltype)s ) :"
" not found") %
{'name': name,
'vpooltype': vpooltype
}))

View File

@ -0,0 +1,523 @@
# Copyright (c) 2016 EMC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import oslo_serialization
from oslo_utils import units
import six
from cinder.i18n import _
from cinder.volume.drivers.coprhd.helpers import commoncoprhdapi as common
from cinder.volume.drivers.coprhd.helpers import consistencygroup
from cinder.volume.drivers.coprhd.helpers import project
from cinder.volume.drivers.coprhd.helpers import virtualarray
from cinder.volume.drivers.coprhd.helpers import virtualpool
class Volume(common.CoprHDResource):
# Commonly used URIs for the 'Volume' module
URI_SEARCH_VOLUMES = '/block/volumes/search?project={0}'
URI_SEARCH_VOLUMES_BY_TAG = '/block/volumes/search?tag={0}'
URI_VOLUMES = '/block/volumes'
URI_VOLUME = URI_VOLUMES + '/{0}'
URI_VOLUME_EXPORTS = URI_VOLUME + '/exports'
URI_BULK_DELETE = URI_VOLUMES + '/deactivate'
URI_DEACTIVATE = URI_VOLUME + '/deactivate'
URI_EXPAND = URI_VOLUME + '/expand'
URI_TAG_VOLUME = URI_VOLUME + "/tags"
URI_VOLUME_CHANGE_VPOOL = URI_VOLUMES + "/vpool-change"
# Protection REST APIs - clone
URI_VOLUME_PROTECTION_FULLCOPIES = (
'/block/volumes/{0}/protection/full-copies')
URI_SNAPSHOT_PROTECTION_FULLCOPIES = (
'/block/snapshots/{0}/protection/full-copies')
URI_VOLUME_CLONE_DETACH = "/block/full-copies/{0}/detach"
# New CG URIs
URI_CG_CLONE = "/block/consistency-groups/{0}/protection/full-copies"
URI_CG_CLONE_DETACH = (
"/block/consistency-groups/{0}/protection/full-copies/{1}/detach")
VOLUMES = 'volumes'
CG = 'consistency-groups'
BLOCK = 'block'
SNAPSHOTS = 'snapshots'
# Lists volumes in a project
def list_volumes(self, project):
"""Makes REST API call to list volumes under a project.
:param project: name of project
:returns: List of volumes uuids in JSON response payload
"""
volume_uris = self.search_volumes(project)
volumes = []
for uri in volume_uris:
volume = self.show_by_uri(uri)
if volume:
volumes.append(volume)
return volumes
def search_volumes(self, project_name):
proj = project.Project(self.ipaddr, self.port)
project_uri = proj.project_query(project_name)
(s, h) = common.service_json_request(self.ipaddr, self.port,
"GET",
Volume.URI_SEARCH_VOLUMES.format(
project_uri),
None)
o = common.json_decode(s)
if not o:
return []
volume_uris = []
resources = common.get_node_value(o, "resource")
for resource in resources:
volume_uris.append(resource["id"])
return volume_uris
# Shows volume information given its uri
def show_by_uri(self, uri):
"""Makes REST API call and retrieves volume details based on UUID.
:param uri: UUID of volume
:returns: Volume details in JSON response payload
"""
(s, h) = common.service_json_request(self.ipaddr, self.port,
"GET",
Volume.URI_VOLUME.format(uri),
None)
o = common.json_decode(s)
inactive = common.get_node_value(o, 'inactive')
if inactive:
return None
return o
# Creates a volume given label, project, vpool and size
def create(self, project_name, label, size, varray, vpool,
sync, consistencygroup, synctimeout=0):
"""Makes REST API call to create volume under a project.
:param project_name : name of the project under which the volume
will be created
:param label : name of volume
:param size : size of volume
:param varray : name of varray
:param vpool : name of vpool
:param sync : synchronous request
:param consistencygroup : To create volume under a consistencygroup
:param synctimeout : Query for task status for "synctimeout" secs.
If the task doesn't complete in synctimeout
secs, an exception is thrown
:returns: Created task details in JSON response payload
"""
proj_obj = project.Project(self.ipaddr, self.port)
project_uri = proj_obj.project_query(project_name)
vpool_obj = virtualpool.VirtualPool(self.ipaddr, self.port)
vpool_uri = vpool_obj.vpool_query(vpool, "block")
varray_obj = virtualarray.VirtualArray(self.ipaddr, self.port)
varray_uri = varray_obj.varray_query(varray)
request = {
'name': label,
'size': size,
'varray': varray_uri,
'project': project_uri,
'vpool': vpool_uri,
'count': 1
}
if consistencygroup:
request['consistency_group'] = consistencygroup
body = oslo_serialization.jsonutils.dumps(request)
(s, h) = common.service_json_request(self.ipaddr, self.port,
"POST",
Volume.URI_VOLUMES,
body)
o = common.json_decode(s)
if sync:
# check task empty
if len(o["task"]) > 0:
task = o["task"][0]
return self.check_for_sync(task, sync, synctimeout)
else:
raise common.CoprHdError(
common.CoprHdError.SOS_FAILURE_ERR,
_("error: task list is empty, no task response found"))
else:
return o
# Blocks the operation until the task is complete/error out/timeout
def check_for_sync(self, result, sync, synctimeout=0):
if sync:
if len(result["resource"]) > 0:
resource = result["resource"]
return (
common.block_until_complete("volume", resource["id"],
result["id"], self.ipaddr,
self.port, synctimeout)
)
else:
raise common.CoprHdError(
common.CoprHdError.SOS_FAILURE_ERR,
_("error: task list is empty, no task response found"))
else:
return result
# Queries a volume given its name
def volume_query(self, full_project_name, volume_name):
"""Makes REST API call to query the volume by name.
:param volume_name: name of volume
:param full_project_name: Full project path
:returns: Volume details in JSON response payload
"""
if common.is_uri(volume_name):
return volume_name
if not full_project_name:
raise common.CoprHdError(common.CoprHdError.NOT_FOUND_ERR,
_("Project name not specified"))
uris = self.search_volumes(full_project_name)
for uri in uris:
volume = self.show_by_uri(uri)
if volume and 'name' in volume and volume['name'] == volume_name:
return volume['id']
raise common.CoprHdError(common.CoprHdError.NOT_FOUND_ERR,
(_("Volume"
"%s: not found") % volume_name))
def get_storageAttributes(self, volume_name, cg_name, snapshot_name=None):
storageres_type = None
storageres_typename = None
if snapshot_name is not None:
storageres_type = Volume.BLOCK
storageres_typename = Volume.SNAPSHOTS
elif volume_name is not None:
storageres_type = Volume.BLOCK
storageres_typename = Volume.VOLUMES
elif cg_name is not None:
storageres_type = Volume.BLOCK
storageres_typename = Volume.CG
else:
storageres_type = None
storageres_typename = None
return (storageres_type, storageres_typename)
def storage_resource_query(self,
storageres_type,
volume_name,
cg_name,
snapshot_name,
project,
tenant):
resourcepath = "/" + project
if tenant is not None:
resourcepath = tenant + resourcepath
resUri = None
resourceObj = None
if Volume.BLOCK == storageres_type and volume_name is not None:
resUri = self.volume_query(resourcepath, volume_name)
if snapshot_name is not None:
from cinder.volume.drivers.coprhd.helpers import snapshot
snapobj = snapshot.Snapshot(self.ipaddr, self.port)
resUri = snapobj.snapshot_query(storageres_type,
Volume.VOLUMES, resUri,
snapshot_name)
elif Volume.BLOCK == storageres_type and cg_name is not None:
resourceObj = consistencygroup.ConsistencyGroup(
self.ipaddr, self.port)
resUri = resourceObj.consistencygroup_query(
cg_name,
project,
tenant)
else:
resourceObj = None
return resUri
# Creates volume(s) from given source volume
def clone(self, new_vol_name, resource_uri,
sync, synctimeout=0):
"""Makes REST API call to clone volume.
:param new_vol_name: name of volume
:param resource_uri: uri of source volume
:param sync : synchronous request
:param synctimeout : Query for task status for "synctimeout" secs.
If the task doesn't complete in synctimeout
secs, an exception is thrown
:returns: Created task details in JSON response payload
"""
from cinder.volume.drivers.coprhd.helpers import snapshot
snap_obj = snapshot.Snapshot(self.ipaddr, self.port)
is_snapshot_clone = False
clone_full_uri = None
# consistency group
if resource_uri.find("BlockConsistencyGroup") > 0:
clone_full_uri = Volume.URI_CG_CLONE.format(resource_uri)
elif resource_uri.find("BlockSnapshot") > 0:
is_snapshot_clone = True
clone_full_uri = (
Volume.URI_SNAPSHOT_PROTECTION_FULLCOPIES.format(resource_uri))
else:
clone_full_uri = (
Volume.URI_VOLUME_PROTECTION_FULLCOPIES.format(resource_uri))
request = {
'name': new_vol_name,
'type': None,
'count': 1
}
request["count"] = 1
body = oslo_serialization.jsonutils.dumps(request)
(s, h) = common.service_json_request(self.ipaddr, self.port,
"POST",
clone_full_uri,
body)
o = common.json_decode(s)
if sync:
task = o["task"][0]
if is_snapshot_clone:
return (
snap_obj.block_until_complete(
"block",
task["resource"]["id"],
task["id"])
)
else:
return self.check_for_sync(task, sync, synctimeout)
else:
return o
# To check whether a cloned volume is in detachable state or not
def is_volume_detachable(self, full_project_name, name):
volume_uri = self.volume_query(full_project_name, name)
vol = self.show_by_uri(volume_uri)
# Filtering based on "replicaState" attribute value of Cloned volume.
# If "replicaState" value is "SYNCHRONIZED" then only Cloned volume
# would be in detachable state.
if(vol and 'protection' in vol and
'full_copies' in vol['protection'] and
'replicaState' in vol['protection']['full_copies']):
if(vol['protection']['full_copies']['replicaState'] ==
'SYNCHRONIZED'):
return True
return False
return False
def volume_clone_detach(self, resource_uri, full_project_name,
name, sync, synctimeout=0):
volume_uri = self.volume_query(full_project_name, name)
# consistency group
if resource_uri.find("BlockConsistencyGroup") > 0:
(s, h) = common.service_json_request(
self.ipaddr, self.port,
"POST",
Volume.URI_CG_CLONE_DETACH.format(
resource_uri,
volume_uri), None)
else:
(s, h) = common.service_json_request(
self.ipaddr, self.port,
"POST",
Volume.URI_VOLUME_CLONE_DETACH.format(volume_uri), None)
o = common.json_decode(s)
if sync:
task = o["task"][0]
return self.check_for_sync(task, sync, synctimeout)
else:
return o
# Shows volume information given its name
def show(self, full_project_name, name):
"""Retrieves volume details based on volume name.
:param full_project_name : project path of the volume
:param name: name of the volume. If the volume is under a project,
then full XPath needs to be specified.
Example: If VOL1 is a volume under project PROJ1, then the name
of volume is PROJ1/VOL1
:returns: Volume details in JSON response payload
"""
if common.is_uri(name):
return name
if full_project_name is None:
raise common.CoprHdError(common.CoprHdError.NOT_FOUND_ERR,
(_("Volume %s : not found") %
six.text_type(name)))
uris = self.search_volumes(full_project_name)
for uri in uris:
volume = self.show_by_uri(uri)
if volume and 'name' in volume and volume['name'] == name:
return volume
raise common.CoprHdError(common.CoprHdError.NOT_FOUND_ERR,
(_("Volume"
" %s : not found") % six.text_type(name)))
def expand(self, full_project_name, volume_name, new_size,
sync=False, synctimeout=0):
volume_detail = self.show(full_project_name, volume_name)
from decimal import Decimal
new_size_in_gb = Decimal(Decimal(new_size) / (units.Gi))
current_size = Decimal(volume_detail["provisioned_capacity_gb"])
if new_size_in_gb <= current_size:
raise common.CoprHdError(
common.CoprHdError.VALUE_ERR,
(_("error: Incorrect value of new size: %(new_size_in_gb)s"
" GB\nNew size must be greater than current size: "
"%(current_size)s GB") % {'new_size_in_gb': new_size_in_gb,
'current_size': current_size}))
body = oslo_serialization.jsonutils.dumps({
"new_size": new_size
})
(s, h) = common.service_json_request(self.ipaddr, self.port,
"POST",
Volume.URI_EXPAND.format(
volume_detail["id"]),
body)
if not s:
return None
o = common.json_decode(s)
if sync:
return self.check_for_sync(o, sync, synctimeout)
return o
# Deletes a volume given a volume name
def delete(self, full_project_name, name, sync=False,
force_delete=False, coprhdonly=False, synctimeout=0):
"""Deletes a volume based on volume name.
:param full_project_name: project name
:param name : name of volume to be deleted
:param sync : synchronous request
:param force_delete: if true, it will force the delete of internal
volumes that have the SUPPORTS_FORCE flag
:param coprhdonly : to delete volumes from coprHD only
:param synctimeout: Query for task status for "synctimeout" secs. If
the task doesn't complete in synctimeout secs, an
exception is thrown
"""
volume_uri = self.volume_query(full_project_name, name)
return self.delete_by_uri(volume_uri, sync, force_delete,
coprhdonly, synctimeout)
# Deletes a volume given a volume uri
def delete_by_uri(self, uri, sync=False,
force_delete=False, coprhdonly=False, synctimeout=0):
"""Deletes a volume based on volume uri."""
params = ''
if force_delete:
params += '&' if ('?' in params) else '?'
params += "force=" + "true"
if coprhdonly is True:
params += '&' if ('?' in params) else '?'
params += "type=" + 'CoprHD_ONLY'
(s, h) = common.service_json_request(self.ipaddr, self.port,
"POST",
Volume.URI_DEACTIVATE.format(
uri) + params,
None)
if not s:
return None
o = common.json_decode(s)
if sync:
return self.check_for_sync(o, sync, synctimeout)
return o
# Gets the exports info given a volume uri
def get_exports_by_uri(self, uri):
"""Makes REST API call to get exports info of a volume.
:param uri: URI of the volume
:returns: Exports details in JSON response payload
"""
(s, h) = common.service_json_request(self.ipaddr, self.port,
"GET",
Volume.URI_VOLUME_EXPORTS.format(
uri),
None)
return common.json_decode(s)
# Update a volume information
# Changed the volume vpool
def update(self, prefix_path, name, vpool):
"""Makes REST API call to update a volume information.
:param name: name of the volume to be updated
:param vpool: name of vpool
:returns: Created task details in JSON response payload
"""
namelist = []
if type(name) is list:
namelist = name
else:
namelist.append(name)
volumeurilist = []
for item in namelist:
volume_uri = self.volume_query(prefix_path, item)
volumeurilist.append(volume_uri)
vpool_obj = virtualpool.VirtualPool(self.ipaddr, self.port)
vpool_uri = vpool_obj.vpool_query(vpool, "block")
params = {
'vpool': vpool_uri,
'volumes': volumeurilist
}
body = oslo_serialization.jsonutils.dumps(params)
(s, h) = common.service_json_request(
self.ipaddr, self.port, "POST",
Volume.URI_VOLUME_CHANGE_VPOOL,
body)
o = common.json_decode(s)
return o

View File

@ -0,0 +1,173 @@
# Copyright (c) 2016 EMC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Driver for EMC CoprHD iSCSI volumes."""
from oslo_log import log as logging
from cinder import interface
from cinder.volume import driver
from cinder.volume.drivers.coprhd import common as coprhd_common
LOG = logging.getLogger(__name__)
@interface.volumedriver
class EMCCoprHDISCSIDriver(driver.ISCSIDriver):
"""CoprHD iSCSI Driver."""
def __init__(self, *args, **kwargs):
super(EMCCoprHDISCSIDriver, self).__init__(*args, **kwargs)
self.common = self._get_common_driver()
def _get_common_driver(self):
return coprhd_common.EMCCoprHDDriverCommon(
protocol='iSCSI',
default_backend_name=self.__class__.__name__,
configuration=self.configuration)
def check_for_setup_error(self):
self.common.check_for_setup_error()
def create_volume(self, volume):
"""Creates a Volume."""
self.common.create_volume(volume, self)
self.common.set_volume_tags(volume, ['_obj_volume_type'])
def create_cloned_volume(self, volume, src_vref):
"""Creates a cloned Volume."""
self.common.create_cloned_volume(volume, src_vref)
self.common.set_volume_tags(volume, ['_obj_volume_type'])
def create_volume_from_snapshot(self, volume, snapshot):
"""Creates a volume from a snapshot."""
self.common.create_volume_from_snapshot(snapshot, volume)
self.common.set_volume_tags(volume, ['_obj_volume_type'])
def extend_volume(self, volume, new_size):
"""expands the size of the volume."""
self.common.expand_volume(volume, new_size)
def delete_volume(self, volume):
"""Deletes an volume."""
self.common.delete_volume(volume)
def create_snapshot(self, snapshot):
"""Creates a snapshot."""
self.common.create_snapshot(snapshot)
def delete_snapshot(self, snapshot):
"""Deletes a snapshot."""
self.common.delete_snapshot(snapshot)
def ensure_export(self, context, volume):
"""Driver entry point to get the export info for an existing volume."""
pass
def create_export(self, context, volume, connector=None):
"""Driver entry point to get the export info for a new volume."""
pass
def remove_export(self, context, volume):
"""Driver entry point to remove an export for a volume."""
pass
def create_consistencygroup(self, context, group):
"""Creates a consistencygroup."""
return self.common.create_consistencygroup(context, group)
def delete_consistencygroup(self, context, group, volumes):
"""Deletes a consistency group."""
return self.common.delete_consistencygroup(context, group, volumes)
def update_consistencygroup(self, context, group,
add_volumes, remove_volumes):
"""Updates volumes in consistency group."""
return self.common.update_consistencygroup(group, add_volumes,
remove_volumes)
def create_cgsnapshot(self, context, cgsnapshot, snapshots):
"""Creates a cgsnapshot."""
return self.common.create_cgsnapshot(cgsnapshot, snapshots)
def delete_cgsnapshot(self, context, cgsnapshot, snapshots):
"""Deletes a cgsnapshot."""
return self.common.delete_cgsnapshot(cgsnapshot, snapshots)
def check_for_export(self, context, volume_id):
"""Make sure volume is exported."""
pass
def initialize_connection(self, volume, connector):
"""Initializes the connection and returns connection info."""
initiator_ports = []
initiator_ports.append(connector['initiator'])
itls = self.common.initialize_connection(volume,
'iSCSI',
initiator_ports,
connector['host'])
properties = {}
properties['target_discovered'] = False
properties['volume_id'] = volume['id']
if itls:
properties['target_iqn'] = itls[0]['target']['port']
properties['target_portal'] = '%s:%s' % (
itls[0]['target']['ip_address'],
itls[0]['target']['tcp_port'])
properties['target_lun'] = itls[0]['hlu']
auth = volume['provider_auth']
if auth:
(auth_method, auth_username, auth_secret) = auth.split()
properties['auth_method'] = auth_method
properties['auth_username'] = auth_username
properties['auth_password'] = auth_secret
LOG.debug("ISCSI properties: %s", properties)
return {
'driver_volume_type': 'iscsi',
'data': properties,
}
def terminate_connection(self, volume, connector, **kwargs):
"""Disallow connection from connector."""
init_ports = []
init_ports.append(connector['initiator'])
self.common.terminate_connection(volume,
'iSCSI',
init_ports,
connector['host'])
def get_volume_stats(self, refresh=False):
"""Get volume status.
If 'refresh' is True, run update the stats first.
"""
if refresh:
self.update_volume_stats()
return self._stats
def update_volume_stats(self):
"""Retrieve stats info from virtual pool/virtual array."""
LOG.debug("Updating volume stats")
self._stats = self.common.update_volume_stats()
def retype(self, ctxt, volume, new_type, diff, host):
"""Change the volume type."""
return self.common.retype(ctxt, volume, new_type, diff, host)

View File

@ -0,0 +1,324 @@
# Copyright (c) 2016 EMC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Driver for EMC CoprHD ScaleIO volumes."""
from oslo_config import cfg
from oslo_log import log as logging
import requests
import six
from six.moves import urllib
from cinder import exception
from cinder.i18n import _
from cinder.i18n import _LI
from cinder import interface
from cinder.volume import driver
from cinder.volume.drivers.coprhd import common as coprhd_common
LOG = logging.getLogger(__name__)
scaleio_opts = [
cfg.StrOpt('coprhd_scaleio_rest_gateway_ip',
default='None',
help='Rest Gateway for Scaleio'),
cfg.PortOpt('coprhd_scaleio_rest_gateway_port',
default=4984,
help='Rest Gateway Port for Scaleio'),
cfg.StrOpt('coprhd_scaleio_rest_server_username',
default=None,
help='Username for Rest Gateway'),
cfg.StrOpt('coprhd_scaleio_rest_server_password',
default=None,
help='Rest Gateway Password',
secret=True),
cfg.BoolOpt('scaleio_verify_server_certificate',
default=False,
help='verify server certificate'),
cfg.StrOpt('scaleio_server_certificate_path',
default=None,
help='Server certificate path')
]
CONF = cfg.CONF
CONF.register_opts(scaleio_opts)
@interface.volumedriver
class EMCCoprHDScaleIODriver(driver.VolumeDriver):
"""CoprHD ScaleIO Driver."""
server_token = None
def __init__(self, *args, **kwargs):
super(EMCCoprHDScaleIODriver, self).__init__(*args, **kwargs)
self.configuration.append_config_values(scaleio_opts)
self.common = self._get_common_driver()
def _get_common_driver(self):
return coprhd_common.EMCCoprHDDriverCommon(
protocol='scaleio',
default_backend_name=self.__class__.__name__,
configuration=self.configuration)
def check_for_setup_error(self):
self.common.check_for_setup_error()
if (self.configuration.scaleio_verify_server_certificate is True and
self.configuration.scaleio_server_certificate_path is None):
message = _("scaleio_verify_server_certificate is True but"
" scaleio_server_certificate_path is not provided"
" in cinder configuration")
raise exception.VolumeBackendAPIException(data=message)
def create_volume(self, volume):
"""Creates a Volume."""
self.common.create_volume(volume, self, True)
self.common.set_volume_tags(volume, ['_obj_volume_type'], True)
vol_size = self._update_volume_size(int(volume['size']))
return {'size': vol_size}
def _update_volume_size(self, vol_size):
"""update the openstack volume size."""
default_size = 8
if (vol_size % default_size) != 0:
return (vol_size / default_size) * default_size + default_size
else:
return vol_size
def create_cloned_volume(self, volume, src_vref):
"""Creates a cloned Volume."""
self.common.create_cloned_volume(volume, src_vref, True)
self.common.set_volume_tags(volume, ['_obj_volume_type'], True)
def create_volume_from_snapshot(self, volume, snapshot):
"""Creates a volume from a snapshot."""
self.common.create_volume_from_snapshot(snapshot, volume, True)
self.common.set_volume_tags(volume, ['_obj_volume_type'], True)
def extend_volume(self, volume, new_size):
"""expands the size of the volume."""
self.common.expand_volume(volume, new_size)
def delete_volume(self, volume):
"""Deletes an volume."""
self.common.delete_volume(volume)
def create_snapshot(self, snapshot):
"""Creates a snapshot."""
self.common.create_snapshot(snapshot, True)
def delete_snapshot(self, snapshot):
"""Deletes a snapshot."""
self.common.delete_snapshot(snapshot)
def ensure_export(self, context, volume):
"""Driver entry point to get the export info for an existing volume."""
pass
def create_export(self, context, volume, connector=None):
"""Driver entry point to get the export info for a new volume."""
pass
def remove_export(self, context, volume):
"""Driver exntry point to remove an export for a volume."""
pass
def create_consistencygroup(self, context, group):
"""Creates a consistencygroup."""
return self.common.create_consistencygroup(context, group, True)
def update_consistencygroup(self, context, group,
add_volumes, remove_volumes):
"""Updates volumes in consistency group."""
return self.common.update_consistencygroup(group, add_volumes,
remove_volumes)
def delete_consistencygroup(self, context, group, volumes):
"""Deletes a consistency group."""
return self.common.delete_consistencygroup(context, group,
volumes, True)
def create_cgsnapshot(self, context, cgsnapshot, snapshots):
"""Creates a cgsnapshot."""
return self.common.create_cgsnapshot(cgsnapshot, snapshots, True)
def delete_cgsnapshot(self, context, cgsnapshot, snapshots):
"""Deletes a cgsnapshot."""
return self.common.delete_cgsnapshot(cgsnapshot, snapshots, True)
def check_for_export(self, context, volume_id):
"""Make sure volume is exported."""
pass
def initialize_connection(self, volume, connector):
"""Initializes the connection and returns connection info."""
volname = self.common._get_resource_name(volume, True)
properties = {}
properties['scaleIO_volname'] = volname
properties['hostIP'] = connector['ip']
properties[
'serverIP'] = self.configuration.coprhd_scaleio_rest_gateway_ip
properties[
'serverPort'] = self.configuration.coprhd_scaleio_rest_gateway_port
properties[
'serverUsername'] = (
self.configuration.coprhd_scaleio_rest_server_username)
properties[
'serverPassword'] = (
self.configuration.coprhd_scaleio_rest_server_password)
properties['iopsLimit'] = None
properties['bandwidthLimit'] = None
properties['serverToken'] = self.server_token
initiatorPorts = []
initiatorPort = self._get_client_id(properties['serverIP'],
properties['serverPort'],
properties['serverUsername'],
properties['serverPassword'],
properties['hostIP'])
initiatorPorts.append(initiatorPort)
properties['serverToken'] = self.server_token
self.common.initialize_connection(volume,
'scaleio',
initiatorPorts,
connector['host'])
dictobj = {
'driver_volume_type': 'scaleio',
'data': properties
}
return dictobj
def terminate_connection(self, volume, connector, **kwargs):
"""Disallow connection from connector."""
volname = volume['display_name']
properties = {}
properties['scaleIO_volname'] = volname
properties['hostIP'] = connector['ip']
properties[
'serverIP'] = self.configuration.coprhd_scaleio_rest_gateway_ip
properties[
'serverPort'] = self.configuration.coprhd_scaleio_rest_gateway_port
properties[
'serverUsername'] = (
self.configuration.coprhd_scaleio_rest_server_username)
properties[
'serverPassword'] = (
self.configuration.coprhd_scaleio_rest_server_password)
properties['serverToken'] = self.server_token
initiatorPort = self._get_client_id(properties['serverIP'],
properties['serverPort'],
properties['serverUsername'],
properties['serverPassword'],
properties['hostIP'])
initPorts = []
initPorts.append(initiatorPort)
self.common.terminate_connection(volume,
'scaleio',
initPorts,
connector['host'])
def get_volume_stats(self, refresh=False):
"""Get volume status.
If 'refresh' is True, run update the stats first.
"""
if refresh:
self.update_volume_stats()
return self._stats
def update_volume_stats(self):
"""Retrieve stats info from virtual pool/virtual array."""
LOG.debug("Updating volume stats")
self._stats = self.common.update_volume_stats()
def _get_client_id(self, server_ip, server_port, server_username,
server_password, sdc_ip):
ip_encoded = urllib.parse.quote(sdc_ip, '')
ip_double_encoded = urllib.parse.quote(ip_encoded, '')
request = ("https://%s:%s/api/types/Sdc/instances/getByIp::%s/" %
(server_ip, six.text_type(server_port), ip_double_encoded))
LOG.info(_LI("ScaleIO get client id by ip request: %s"), request)
if self.configuration.scaleio_verify_server_certificate:
verify_cert = self.configuration.scaleio_server_certificate_path
else:
verify_cert = False
r = requests.get(
request, auth=(server_username, self.server_token),
verify=verify_cert)
r = self._check_response(
r, request, server_ip, server_port,
server_username, server_password)
sdc_id = r.json()
if not sdc_id:
msg = (_("Client with ip %s wasn't found ") % sdc_ip)
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
if r.status_code != 200 and "errorCode" in sdc_id:
msg = (_("Error getting sdc id from ip %(sdc_ip)s:"
" %(sdc_id_message)s") % {'sdc_ip': sdc_ip,
'sdc_id_message': sdc_id[
'message']})
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
LOG.info(_LI("ScaleIO sdc id is %s"), sdc_id)
return sdc_id
def _check_response(self, response, request,
server_ip, server_port,
server_username, server_password):
if response.status_code == 401 or response.status_code == 403:
LOG.info(
_LI("Token is invalid, going to re-login and get a new one"))
login_request = ("https://%s:%s/api/login" %
(server_ip, six.text_type(server_port)))
if self.configuration.scaleio_verify_server_certificate:
verify_cert = (
self.configuration.scaleio_server_certificate_path)
else:
verify_cert = False
r = requests.get(
login_request, auth=(server_username, server_password),
verify=verify_cert)
token = r.json()
self.server_token = token
# repeat request with valid token
LOG.info(_LI("Going to perform request again %s with valid token"),
request)
res = requests.get(
request, auth=(server_username, self.server_token),
verify=verify_cert)
return res
return response
def retype(self, ctxt, volume, new_type, diff, host):
"""Change the volume type."""
return self.common.retype(ctxt, volume, new_type, diff, host)

View File

@ -0,0 +1,2 @@
features:
- Added volume backend drivers for CoprHD FC, iSCSI and Scaleio.