From 2c2b42a307513be87e9faf262cf2394ec346e683 Mon Sep 17 00:00:00 2001 From: Xing Yang Date: Tue, 26 May 2015 23:52:54 -0400 Subject: [PATCH] Add connector driver for the ScaleIO cinder driver This patch adds a connector driver for the ScaleIO cinder driver. This is needed for attaching/detaching volumes once Nova has switched to use os-brick. A Nova spec was submitted earlier for the ScaleIO libvirt volume driver. Since Nova is moving to use os-brick, I'm including it as a reference here if needed: https://review.openstack.org/#/c/181941/ partial-implements blueprint scaleio-cinder-volume-driver Change-Id: I1bfdb2c015cc0627d341bd6c3e31e2ae27cca3cc --- os_brick/initiator/connector.py | 433 ++++++++++++++++++++- os_brick/tests/initiator/test_connector.py | 213 ++++++++++ 2 files changed, 645 insertions(+), 1 deletion(-) diff --git a/os_brick/initiator/connector.py b/os_brick/initiator/connector.py index c75695f87..52a656cbb 100644 --- a/os_brick/initiator/connector.py +++ b/os_brick/initiator/connector.py @@ -21,9 +21,11 @@ each of the supported transport protocols. """ import copy +import json import os import platform import re +import requests import socket import sys import time @@ -34,18 +36,21 @@ from oslo_log import log as logging from oslo_service import loopingcall from oslo_utils import strutils import six +from six.moves import urllib S390X = "s390x" S390 = "s390" from os_brick import exception from os_brick import executor +from os_brick import utils + from os_brick.initiator import host_driver from os_brick.initiator import linuxfc from os_brick.initiator import linuxrbd from os_brick.initiator import linuxscsi from os_brick.remotefs import remotefs -from os_brick.i18n import _, _LE, _LW +from os_brick.i18n import _, _LE, _LI, _LW LOG = logging.getLogger(__name__) @@ -208,6 +213,14 @@ class InitiatorConnector(executor.Executor): execute=execute, device_scan_attempts=device_scan_attempts, *args, **kwargs) + elif protocol == "SCALEIO": + return ScaleIOConnector( + root_helper=root_helper, + driver=driver, + execute=execute, + device_scan_attempts=device_scan_attempts, + *args, **kwargs + ) else: msg = (_("Invalid InitiatorConnector protocol " "specified %(protocol)s") % @@ -1531,3 +1544,421 @@ class HGSTConnector(InitiatorConnector): msg = (_("Unable to set apphost for space %s") % connection_properties['name']) raise exception.BrickException(message=msg) + + +class ScaleIOConnector(InitiatorConnector): + """Class implements the connector driver for ScaleIO.""" + OK_STATUS_CODE = 200 + VOLUME_NOT_MAPPED_ERROR = 84 + VOLUME_ALREADY_MAPPED_ERROR = 81 + GET_GUID_CMD = ['drv_cfg', '--query_guid'] + + def __init__(self, root_helper, driver=None, execute=putils.execute, + device_scan_attempts=DEVICE_SCAN_ATTEMPTS_DEFAULT, + *args, **kwargs): + super(ScaleIOConnector, self).__init__( + root_helper, + driver=driver, + execute=execute, + device_scan_attempts=device_scan_attempts, + *args, **kwargs + ) + + self.local_sdc_ip = None + self.server_ip = None + self.server_port = None + self.server_username = None + self.server_password = None + self.server_token = None + self.volume_id = None + self.volume_name = None + self.volume_path = None + self.iops_limit = None + self.bandwidth_limit = None + + def _find_volume_path(self): + LOG.info(_LI( + "Looking for volume %(volume_id)s, maximum tries: %(tries)s"), + {'volume_id': self.volume_id, 'tries': self.device_scan_attempts} + ) + + # look for the volume in /dev/disk/by-id directory + by_id_path = "/dev/disk/by-id" + + disk_filename = self._wait_for_volume_path(by_id_path) + full_disk_name = ("%(path)s/%(filename)s" % + {'path': by_id_path, 'filename': disk_filename}) + LOG.info(_LI("Full disk name is %(full_path)s"), + {'full_path': full_disk_name}) + return full_disk_name + + # NOTE: Usually 3 retries is enough to find the volume. + # If there are network issues, it could take much longer. Set + # the max retries to 15 to make sure we can find the volume. + @utils.retry(exceptions=exception.BrickException, + retries=15, + backoff_rate=1) + def _wait_for_volume_path(self, path): + if not os.path.isdir(path): + msg = ( + _("ScaleIO volume %(volume_id)s not found at " + "expected path.") % {'volume_id': self.volume_id} + ) + + LOG.debug(msg) + raise exception.BrickException(message=msg) + + disk_filename = None + filenames = os.listdir(path) + LOG.info(_LI( + "Files found in %(path)s path: %(files)s "), + {'path': path, 'files': filenames} + ) + + for filename in filenames: + if (filename.startswith("emc-vol") and + filename.endswith(self.volume_id)): + disk_filename = filename + break + + if not disk_filename: + msg = (_("ScaleIO volume %(volume_id)s not found.") % + {'volume_id': self.volume_id}) + LOG.debug(msg) + raise exception.BrickException(message=msg) + + return disk_filename + + def _get_client_id(self): + request = ( + "https://%(server_ip)s:%(server_port)s/" + "api/types/Client/instances/getByIp::%(sdc_ip)s/" % + { + 'server_ip': self.server_ip, + 'server_port': self.server_port, + 'sdc_ip': self.local_sdc_ip + } + ) + + LOG.info(_LI("ScaleIO get client id by ip request: %(request)s"), + {'request': request}) + + r = requests.get( + request, + auth=(self.server_username, self.server_token), + verify=False + ) + + r = self._check_response(r, request) + sdc_id = r.json() + if not sdc_id: + msg = (_("Client with ip %(sdc_ip)s was not found.") % + {'sdc_ip': self.local_sdc_ip}) + raise exception.BrickException(message=msg) + + if r.status_code != 200 and "errorCode" in sdc_id: + msg = (_("Error getting sdc id from ip %(sdc_ip): %(err)s") % + {'sdc_ip': self.local_sdc_ip, 'err': sdc_id['message']}) + + LOG.error(msg) + raise exception.BrickException(message=msg) + + LOG.info(_LI("ScaleIO sdc id is %(sdc_id)s."), + {'sdc_id': sdc_id}) + return sdc_id + + def _get_volume_id(self): + volname_encoded = urllib.parse.quote(self.volume_name, '') + volname_double_encoded = urllib.parse.quote(volname_encoded, '') + LOG.debug(_( + "Volume name after double encoding is %(volume_name)s."), + {'volume_name': volname_double_encoded} + ) + + request = ( + "https://%(server_ip)s:%(server_port)s/api/types/Volume/instances" + "/getByName::%(encoded_volume_name)s" % + { + 'server_ip': self.server_ip, + 'server_port': self.server_port, + 'encoded_volume_name': volname_double_encoded + } + ) + + LOG.info( + _LI("ScaleIO get volume id by name request: %(request)s"), + {'request': request} + ) + + r = requests.get(request, + auth=(self.server_username, self.server_token), + verify=False) + + r = self._check_response(r, request) + + volume_id = r.json() + if not volume_id: + msg = (_("Volume with name %(volume_name)s wasn't found.") % + {'volume_name': self.volume_name}) + + LOG.error(msg) + raise exception.BrickException(message=msg) + + if r.status_code != self.OK_STATUS_CODE and "errorCode" in volume_id: + msg = ( + _("Error getting volume id from name %(volume_name)s: " + "%(err)s") % + {'volume_name': self.volume_name, 'err': volume_id['message']} + ) + + LOG.error(msg) + raise exception.BrickException(message=msg) + + LOG.info(_LI("ScaleIO volume id is %(volume_id)s."), + {'volume_id': volume_id}) + return volume_id + + def _check_response(self, response, request, is_get_request=True, + params=None): + if response.status_code == 401 or response.status_code == 403: + LOG.info(_LI("Token is invalid, " + "going to re-login to get a new one")) + + login_request = ( + "https://%(server_ip)s:%(server_port)s/api/login" % + {'server_ip': self.server_ip, 'server_port': self.server_port} + ) + + r = requests.get( + login_request, + auth=(self.server_username, self.server_password), + verify=False + ) + + token = r.json() + # repeat request with valid token + LOG.debug(_("Going to perform request %(request)s again " + "with valid token"), {'request': request}) + + if is_get_request: + res = requests.get(request, + auth=(self.server_username, token), + verify=False) + else: + headers = {'content-type': 'application/json'} + res = requests.post( + request, + data=json.dumps(params), + headers=headers, + auth=(self.server_username, token), + verify=False + ) + + self.server_token = token + return res + + return response + + def get_config(self, connection_properties): + self.local_sdc_ip = connection_properties['hostIP'] + self.volume_name = connection_properties['scaleIO_volname'] + self.server_ip = connection_properties['serverIP'] + self.server_port = connection_properties['serverPort'] + self.server_username = connection_properties['serverUsername'] + self.server_password = connection_properties['serverPassword'] + self.server_token = connection_properties['serverToken'] + self.iops_limit = connection_properties['iopsLimit'] + self.bandwidth_limit = connection_properties['bandwidthLimit'] + device_info = {'type': 'block', + 'path': self.volume_path} + return device_info + + def connect_volume(self, connection_properties): + """Connect the volume.""" + device_info = self.get_config(connection_properties) + LOG.debug( + _( + "scaleIO Volume name: %(volume_name)s, SDC IP: %(sdc_ip)s, " + "REST Server IP: %(server_ip)s, " + "REST Server username: %(username)s, " + "iops limit:%(iops_limit)s, " + "bandwidth limit: %(bandwidth_limit)s." + ), { + 'volume_name': self.volume_name, + 'sdc_ip': self.local_sdc_ip, + 'server_ip': self.server_ip, + 'username': self.server_username, + 'iops_limit': self.iops_limit, + 'bandwidth_limit': self.bandwidth_limit + } + ) + + LOG.info(_LI("ScaleIO sdc query guid command: %(cmd)s"), + {'cmd': self.GET_GUID_CMD}) + + try: + (out, err) = self._execute(*self.GET_GUID_CMD, run_as_root=True, + root_helper=self._root_helper) + + LOG.info(_LI("Map volume %(cmd)s: stdout=%(out)s " + "stderr=%(err)s"), + {'cmd': self.GET_GUID_CMD, 'out': out, 'err': err}) + + except putils.ProcessExecutionError as e: + msg = (_("Error querying sdc guid: %(err)s") % {'err': e.stderr}) + LOG.error(msg) + raise exception.BrickException(message=msg) + + guid = out + LOG.info(_LI("Current sdc guid: %(guid)s"), {'guid': guid}) + params = {'guid': guid, 'allowMultipleMappings': 'TRUE'} + self.volume_id = self._get_volume_id() + + headers = {'content-type': 'application/json'} + request = ( + "https://%(server_ip)s:%(server_port)s/api/instances/" + "Volume::%(volume_id)s/action/addMappedSdc" % + {'server_ip': self.server_ip, 'server_port': self.server_port, + 'volume_id': self.volume_id} + ) + + LOG.info(_LI("map volume request: %(request)s"), {'request': request}) + r = requests.post( + request, + data=json.dumps(params), + headers=headers, + auth=(self.server_username, self.server_token), + verify=False + ) + + r = self._check_response(r, request, False, params) + if r.status_code != self.OK_STATUS_CODE: + response = r.json() + error_code = response['errorCode'] + if error_code == self.VOLUME_ALREADY_MAPPED_ERROR: + LOG.warning(_LW( + "Ignoring error mapping volume %(volume_name)s: " + "volume already mapped."), + {'volume_name': self.volume_name} + ) + else: + msg = ( + _("Error mapping volume %(volume_name)s: %(err)s") % + {'volume_name': self.volume_name, + 'err': response['message']} + ) + + LOG.error(msg) + raise exception.BrickException(message=msg) + + self.volume_path = self._find_volume_path() + device_info['path'] = self.volume_path + + # Set QoS settings after map was performed + if self.iops_limit is not None or self.bandwidth_limit is not None: + params = {'guid': guid} + if self.bandwidth_limit is not None: + params['bandwidthLimitInKbps'] = self.bandwidth_limit + if self.iops_limit is not None: + params['iops_limit'] = self.iops_limit + + request = ( + "https://%(server_ip)s:%(server_port)s/api/instances/" + "Volume::%(volume_id)s/action/setMappedSdcLimits" % + {'server_ip': self.server_ip, 'server_port': self.server_port, + 'volume_id': self.volume_id} + ) + + LOG.info(_LI("Set client limit request: %(request)s"), + {'request': request}) + + r = requests.post( + request, + data=json.dumps(params), + headers=headers, + auth=(self.server_username, self.server_token), + verify=False + ) + r = self._check_response(r, request, False, params) + if r.status_code != self.OK_STATUS_CODE: + response = r.json() + LOG.info(_LI("Set client limit response: %(response)s"), + {'response': response}) + msg = ( + _("Error setting client limits for volume " + "%(volume_name)s: %(err)s") % + {'volume_name': self.volume_name, + 'err': response['message']} + ) + + LOG.error(msg) + + return device_info + + def disconnect_volume(self, connection_properties, device_info): + self.get_config(connection_properties) + self.volume_id = self._get_volume_id() + LOG.info(_LI( + "ScaleIO disconnect volume in ScaleIO brick volume driver." + )) + + LOG.debug( + _("ScaleIO Volume name: %(volume_name)s, SDC IP: %(sdc_ip)s, " + "REST Server IP: %(server_ip)s"), + {'volume_name': self.volume_name, 'sdc_ip': self.local_sdc_ip, + 'server_ip': self.server_ip} + ) + + LOG.info(_LI("ScaleIO sdc query guid command: %(cmd)s"), + {'cmd': self.GET_GUID_CMD}) + + try: + (out, err) = self._execute(*self.GET_GUID_CMD, run_as_root=True, + root_helper=self._root_helper) + LOG.info( + _LI("Unmap volume %(cmd)s: stdout=%(out)s stderr=%(err)s"), + {'cmd': self.GET_GUID_CMD, 'out': out, 'err': err} + ) + + except putils.ProcessExecutionError as e: + msg = _("Error querying sdc guid: %(err)s") % {'err': e.stderr} + LOG.error(msg) + raise exception.BrickException(message=msg) + + guid = out + LOG.info(_LI("Current sdc guid: %(guid)s"), {'guid': guid}) + + params = {'guid': guid} + headers = {'content-type': 'application/json'} + request = ( + "https://%(server_ip)s:%(server_port)s/api/instances/" + "Volume::%(volume_id)s/action/removeMappedSdc" % + {'server_ip': self.server_ip, 'server_port': self.server_port, + 'volume_id': self.volume_id} + ) + + LOG.info(_LI("Unmap volume request: %(request)s"), + {'request': request}) + r = requests.post( + request, + data=json.dumps(params), + headers=headers, + auth=(self.server_username, self.server_token), + verify=False + ) + + r = self._check_response(r, request, False, params) + if r.status_code != self.OK_STATUS_CODE: + response = r.json() + error_code = response['errorCode'] + if error_code == self.VOLUME_NOT_MAPPED_ERROR: + LOG.warning(_LW( + "Ignoring error unmapping volume %(volume_id)s: " + "volume not mapped."), {'volume_id': self.volume_name} + ) + else: + msg = (_("Error unmapping volume %(volume_id)s: %(err)s") % + {'volume_id': self.volume_name, + 'err': response['message']}) + LOG.error(msg) + raise exception.BrickException(message=msg) diff --git a/os_brick/tests/initiator/test_connector.py b/os_brick/tests/initiator/test_connector.py index 6a308390f..53d59549f 100644 --- a/os_brick/tests/initiator/test_connector.py +++ b/os_brick/tests/initiator/test_connector.py @@ -17,11 +17,13 @@ import platform import tempfile import time +import json import mock from oslo_concurrency import processutils as putils from oslo_log import log as logging from oslo_service import loopingcall from oslo_utils import encodeutils +import requests import six import testtools @@ -153,6 +155,9 @@ class ConnectorTestCase(base.TestCase): obj = connector.InitiatorConnector.factory('huaweisdshypervisor', None) self.assertEqual(obj.__class__.__name__, "HuaweiStorHyperConnector") + obj = connector.InitiatorConnector.factory("scaleio", None) + self.assertEqual("ScaleIOConnector", obj.__class__.__name__) + self.assertRaises(ValueError, connector.InitiatorConnector.factory, "bogus", None) @@ -1551,3 +1556,211 @@ class RBDConnectorTestCase(ConnectorTestCase): rbd.disconnect_volume(self.connection_properties, device_info) volume_close.assert_called_once() + + +class ScaleIOConnectorTestCase(ConnectorTestCase): + """Test cases for ScaleIO connector""" + # Fake volume information + vol = { + 'id': 'vol1', + 'name': 'test_volume' + } + + # Fake SDC GUID + fake_guid = 'FAKE_GUID' + + def setUp(self): + super(ScaleIOConnectorTestCase, self).setUp() + + self.fake_connection_properties = { + 'hostIP': MY_IP, + 'serverIP': MY_IP, + 'scaleIO_volname': self.vol['name'], + 'serverPort': 443, + 'serverUsername': 'test', + 'serverPassword': 'fake', + 'serverToken': 'fake_token', + 'iopsLimit': None, + 'bandwidthLimit': None + } + + # Formatting string for REST API calls + self.action_format = "instances/Volume::{}/action/{{}}".format( + self.vol['id']) + self.get_volume_api = 'types/Volume/instances/getByName::{}'.format( + self.vol['name']) + + # Map of REST API calls to responses + self.mock_calls = { + self.get_volume_api: + self.MockHTTPSResponse(json.dumps(self.vol['id'])), + self.action_format.format('addMappedSdc'): + self.MockHTTPSResponse(''), + self.action_format.format('setMappedSdcLimits'): + self.MockHTTPSResponse(''), + self.action_format.format('removeMappedSdc'): + self.MockHTTPSResponse(''), + } + + # Default error REST response + self.error_404 = self.MockHTTPSResponse(content=dict( + errorCode=0, + message='HTTP 404', + ), status_code=404) + + # Patch the request and os calls to fake versions + mock.patch.object( + requests, 'get', self.handle_scaleio_request).start() + mock.patch.object( + requests, 'post', self.handle_scaleio_request).start() + mock.patch.object(os.path, 'isdir', return_value=True).start() + mock.patch.object( + os, 'listdir', return_value=["emc-vol-{}".format(self.vol['id'])] + ).start() + self.addCleanup(mock.patch.stopall) + + # The actual ScaleIO connector + self.connector = connector.ScaleIOConnector( + 'sudo', execute=self.fake_execute) + + class MockHTTPSResponse(requests.Response): + """Mock HTTP Response + + Defines the https replies from the mocked calls to do_request() + """ + def __init__(self, content, status_code=200): + super(ScaleIOConnectorTestCase.MockHTTPSResponse, + self).__init__() + + self._content = content + self.encoding = 'UTF-8' + self.status_code = status_code + + def json(self, **kwargs): + if isinstance(self._content, six.string_types): + return super(ScaleIOConnectorTestCase.MockHTTPSResponse, + self).json(**kwargs) + + return self._content + + @property + def text(self): + if not isinstance(self._content, six.string_types): + return json.dumps(self._content) + + self._content = self._content.encode('utf-8') + return super(ScaleIOConnectorTestCase.MockHTTPSResponse, + self).text + + def fake_execute(self, *cmd, **kwargs): + """Fakes the rootwrap call""" + return self.fake_guid, None + + def fake_missing_execute(self, *cmd, **kwargs): + """Error when trying to call rootwrap drv_cfg""" + raise putils.ProcessExecutionError("Test missing drv_cfg.") + + def handle_scaleio_request(self, url, *args, **kwargs): + """Fake REST server""" + api_call = url.split(':', 2)[2].split('/', 1)[1].replace('api/', '') + + try: + return self.mock_calls[api_call] + except KeyError: + return self.error_404 + + def test_connect_volume(self): + """Successful connect to volume""" + self.connector.connect_volume(self.fake_connection_properties) + + def test_connect_with_bandwidth_limit(self): + """Successful connect to volume with bandwidth limit""" + self.fake_connection_properties['bandwidthLimit'] = '500' + self.test_connect_volume() + + def test_connect_with_iops_limit(self): + """Successful connect to volume with iops limit""" + self.fake_connection_properties['iopsLimit'] = '80' + self.test_connect_volume() + + def test_connect_with_iops_and_bandwidth_limits(self): + """Successful connect with iops and bandwidth limits""" + self.fake_connection_properties['bandwidthLimit'] = '500' + self.fake_connection_properties['iopsLimit'] = '80' + self.test_connect_volume() + + def test_disconnect_volume(self): + """Successful disconnect from volume""" + self.connector.disconnect_volume(self.fake_connection_properties, None) + + def test_error_id(self): + """Fail to connect with bad volume name""" + self.mock_calls[self.get_volume_api] = self.MockHTTPSResponse( + dict(errorCode='404', message='Test volume not found'), 404) + + self.assertRaises(exception.BrickException, self.test_connect_volume) + + def test_error_no_volume_id(self): + """Faile to connect with no volume id""" + self.mock_calls[self.get_volume_api] = self.MockHTTPSResponse( + 'null', 200) + + self.assertRaises(exception.BrickException, self.test_connect_volume) + + def test_error_bad_login(self): + """Fail to connect with bad authentication""" + self.mock_calls[self.get_volume_api] = self.MockHTTPSResponse( + 'null', 401) + + self.mock_calls['login'] = self.MockHTTPSResponse('null', 401) + + self.assertRaises(exception.BrickException, self.test_connect_volume) + + def test_error_bad_drv_cfg(self): + """Fail to connect with missing rootwrap executable""" + self.connector.set_execute(self.fake_missing_execute) + self.assertRaises(exception.BrickException, self.test_connect_volume) + + def test_error_map_volume(self): + """Fail to connect with REST API failure""" + self.mock_calls[self.action_format.format( + 'addMappedSdc')] = self.MockHTTPSResponse( + dict(errorCode=self.connector.VOLUME_NOT_MAPPED_ERROR, + message='Test error map volume'), 500) + + self.assertRaises(exception.BrickException, self.test_connect_volume) + + def test_error_path_not_found(self): + """Timeout waiting for volume to map to local file system""" + mock.patch.object( + os, 'listdir', return_value=["emc-vol-no-volume"] + ).start() + self.assertRaises(exception.BrickException, self.test_connect_volume) + + def test_map_volume_already_mapped(self): + """Ignore REST API failure for volume already mapped""" + self.mock_calls[self.action_format.format( + 'addMappedSdc')] = self.MockHTTPSResponse( + dict(errorCode=self.connector.VOLUME_ALREADY_MAPPED_ERROR, + message='Test error map volume'), 500) + + self.test_connect_volume() + + def test_error_disconnect_volume(self): + """Fail to disconnect with REST API failure""" + self.mock_calls[self.action_format.format( + 'removeMappedSdc')] = self.MockHTTPSResponse( + dict(errorCode=self.connector.VOLUME_ALREADY_MAPPED_ERROR, + message='Test error map volume'), 500) + + self.assertRaises(exception.BrickException, + self.test_disconnect_volume) + + def test_disconnect_volume_not_mapped(self): + """Ignore REST API failure for volume not mapped""" + self.mock_calls[self.action_format.format( + 'removeMappedSdc')] = self.MockHTTPSResponse( + dict(errorCode=self.connector.VOLUME_NOT_MAPPED_ERROR, + message='Test error map volume'), 500) + + self.test_disconnect_volume()