Merge "Add connector driver for the ScaleIO cinder driver"

This commit is contained in:
Jenkins 2015-07-09 18:49:27 +00:00 committed by Gerrit Code Review
commit 377d9fbb11
2 changed files with 645 additions and 1 deletions

View File

@ -22,9 +22,11 @@ each of the supported transport protocols.
import abc import abc
import copy import copy
import json
import os import os
import platform import platform
import re import re
import requests
import socket import socket
import sys import sys
import time import time
@ -35,18 +37,21 @@ from oslo_log import log as logging
from oslo_service import loopingcall from oslo_service import loopingcall
from oslo_utils import strutils from oslo_utils import strutils
import six import six
from six.moves import urllib
S390X = "s390x" S390X = "s390x"
S390 = "s390" S390 = "s390"
from os_brick import exception from os_brick import exception
from os_brick import executor from os_brick import executor
from os_brick import utils
from os_brick.initiator import host_driver from os_brick.initiator import host_driver
from os_brick.initiator import linuxfc from os_brick.initiator import linuxfc
from os_brick.initiator import linuxrbd from os_brick.initiator import linuxrbd
from os_brick.initiator import linuxscsi from os_brick.initiator import linuxscsi
from os_brick.remotefs import remotefs from os_brick.remotefs import remotefs
from os_brick.i18n import _, _LE, _LW from os_brick.i18n import _, _LE, _LI, _LW
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -210,6 +215,14 @@ class InitiatorConnector(executor.Executor):
execute=execute, execute=execute,
device_scan_attempts=device_scan_attempts, device_scan_attempts=device_scan_attempts,
*args, **kwargs) *args, **kwargs)
elif protocol == "SCALEIO":
return ScaleIOConnector(
root_helper=root_helper,
driver=driver,
execute=execute,
device_scan_attempts=device_scan_attempts,
*args, **kwargs
)
else: else:
msg = (_("Invalid InitiatorConnector protocol " msg = (_("Invalid InitiatorConnector protocol "
"specified %(protocol)s") % "specified %(protocol)s") %
@ -1548,3 +1561,421 @@ class HGSTConnector(InitiatorConnector):
msg = (_("Unable to set apphost for space %s") % msg = (_("Unable to set apphost for space %s") %
connection_properties['name']) connection_properties['name'])
raise exception.BrickException(message=msg) raise exception.BrickException(message=msg)
class ScaleIOConnector(InitiatorConnector):
"""Class implements the connector driver for ScaleIO."""
OK_STATUS_CODE = 200
VOLUME_NOT_MAPPED_ERROR = 84
VOLUME_ALREADY_MAPPED_ERROR = 81
GET_GUID_CMD = ['drv_cfg', '--query_guid']
def __init__(self, root_helper, driver=None, execute=putils.execute,
device_scan_attempts=DEVICE_SCAN_ATTEMPTS_DEFAULT,
*args, **kwargs):
super(ScaleIOConnector, self).__init__(
root_helper,
driver=driver,
execute=execute,
device_scan_attempts=device_scan_attempts,
*args, **kwargs
)
self.local_sdc_ip = None
self.server_ip = None
self.server_port = None
self.server_username = None
self.server_password = None
self.server_token = None
self.volume_id = None
self.volume_name = None
self.volume_path = None
self.iops_limit = None
self.bandwidth_limit = None
def _find_volume_path(self):
LOG.info(_LI(
"Looking for volume %(volume_id)s, maximum tries: %(tries)s"),
{'volume_id': self.volume_id, 'tries': self.device_scan_attempts}
)
# look for the volume in /dev/disk/by-id directory
by_id_path = "/dev/disk/by-id"
disk_filename = self._wait_for_volume_path(by_id_path)
full_disk_name = ("%(path)s/%(filename)s" %
{'path': by_id_path, 'filename': disk_filename})
LOG.info(_LI("Full disk name is %(full_path)s"),
{'full_path': full_disk_name})
return full_disk_name
# NOTE: Usually 3 retries is enough to find the volume.
# If there are network issues, it could take much longer. Set
# the max retries to 15 to make sure we can find the volume.
@utils.retry(exceptions=exception.BrickException,
retries=15,
backoff_rate=1)
def _wait_for_volume_path(self, path):
if not os.path.isdir(path):
msg = (
_("ScaleIO volume %(volume_id)s not found at "
"expected path.") % {'volume_id': self.volume_id}
)
LOG.debug(msg)
raise exception.BrickException(message=msg)
disk_filename = None
filenames = os.listdir(path)
LOG.info(_LI(
"Files found in %(path)s path: %(files)s "),
{'path': path, 'files': filenames}
)
for filename in filenames:
if (filename.startswith("emc-vol") and
filename.endswith(self.volume_id)):
disk_filename = filename
break
if not disk_filename:
msg = (_("ScaleIO volume %(volume_id)s not found.") %
{'volume_id': self.volume_id})
LOG.debug(msg)
raise exception.BrickException(message=msg)
return disk_filename
def _get_client_id(self):
request = (
"https://%(server_ip)s:%(server_port)s/"
"api/types/Client/instances/getByIp::%(sdc_ip)s/" %
{
'server_ip': self.server_ip,
'server_port': self.server_port,
'sdc_ip': self.local_sdc_ip
}
)
LOG.info(_LI("ScaleIO get client id by ip request: %(request)s"),
{'request': request})
r = requests.get(
request,
auth=(self.server_username, self.server_token),
verify=False
)
r = self._check_response(r, request)
sdc_id = r.json()
if not sdc_id:
msg = (_("Client with ip %(sdc_ip)s was not found.") %
{'sdc_ip': self.local_sdc_ip})
raise exception.BrickException(message=msg)
if r.status_code != 200 and "errorCode" in sdc_id:
msg = (_("Error getting sdc id from ip %(sdc_ip): %(err)s") %
{'sdc_ip': self.local_sdc_ip, 'err': sdc_id['message']})
LOG.error(msg)
raise exception.BrickException(message=msg)
LOG.info(_LI("ScaleIO sdc id is %(sdc_id)s."),
{'sdc_id': sdc_id})
return sdc_id
def _get_volume_id(self):
volname_encoded = urllib.parse.quote(self.volume_name, '')
volname_double_encoded = urllib.parse.quote(volname_encoded, '')
LOG.debug(_(
"Volume name after double encoding is %(volume_name)s."),
{'volume_name': volname_double_encoded}
)
request = (
"https://%(server_ip)s:%(server_port)s/api/types/Volume/instances"
"/getByName::%(encoded_volume_name)s" %
{
'server_ip': self.server_ip,
'server_port': self.server_port,
'encoded_volume_name': volname_double_encoded
}
)
LOG.info(
_LI("ScaleIO get volume id by name request: %(request)s"),
{'request': request}
)
r = requests.get(request,
auth=(self.server_username, self.server_token),
verify=False)
r = self._check_response(r, request)
volume_id = r.json()
if not volume_id:
msg = (_("Volume with name %(volume_name)s wasn't found.") %
{'volume_name': self.volume_name})
LOG.error(msg)
raise exception.BrickException(message=msg)
if r.status_code != self.OK_STATUS_CODE and "errorCode" in volume_id:
msg = (
_("Error getting volume id from name %(volume_name)s: "
"%(err)s") %
{'volume_name': self.volume_name, 'err': volume_id['message']}
)
LOG.error(msg)
raise exception.BrickException(message=msg)
LOG.info(_LI("ScaleIO volume id is %(volume_id)s."),
{'volume_id': volume_id})
return volume_id
def _check_response(self, response, request, is_get_request=True,
params=None):
if response.status_code == 401 or response.status_code == 403:
LOG.info(_LI("Token is invalid, "
"going to re-login to get a new one"))
login_request = (
"https://%(server_ip)s:%(server_port)s/api/login" %
{'server_ip': self.server_ip, 'server_port': self.server_port}
)
r = requests.get(
login_request,
auth=(self.server_username, self.server_password),
verify=False
)
token = r.json()
# repeat request with valid token
LOG.debug(_("Going to perform request %(request)s again "
"with valid token"), {'request': request})
if is_get_request:
res = requests.get(request,
auth=(self.server_username, token),
verify=False)
else:
headers = {'content-type': 'application/json'}
res = requests.post(
request,
data=json.dumps(params),
headers=headers,
auth=(self.server_username, token),
verify=False
)
self.server_token = token
return res
return response
def get_config(self, connection_properties):
self.local_sdc_ip = connection_properties['hostIP']
self.volume_name = connection_properties['scaleIO_volname']
self.server_ip = connection_properties['serverIP']
self.server_port = connection_properties['serverPort']
self.server_username = connection_properties['serverUsername']
self.server_password = connection_properties['serverPassword']
self.server_token = connection_properties['serverToken']
self.iops_limit = connection_properties['iopsLimit']
self.bandwidth_limit = connection_properties['bandwidthLimit']
device_info = {'type': 'block',
'path': self.volume_path}
return device_info
def connect_volume(self, connection_properties):
"""Connect the volume."""
device_info = self.get_config(connection_properties)
LOG.debug(
_(
"scaleIO Volume name: %(volume_name)s, SDC IP: %(sdc_ip)s, "
"REST Server IP: %(server_ip)s, "
"REST Server username: %(username)s, "
"iops limit:%(iops_limit)s, "
"bandwidth limit: %(bandwidth_limit)s."
), {
'volume_name': self.volume_name,
'sdc_ip': self.local_sdc_ip,
'server_ip': self.server_ip,
'username': self.server_username,
'iops_limit': self.iops_limit,
'bandwidth_limit': self.bandwidth_limit
}
)
LOG.info(_LI("ScaleIO sdc query guid command: %(cmd)s"),
{'cmd': self.GET_GUID_CMD})
try:
(out, err) = self._execute(*self.GET_GUID_CMD, run_as_root=True,
root_helper=self._root_helper)
LOG.info(_LI("Map volume %(cmd)s: stdout=%(out)s "
"stderr=%(err)s"),
{'cmd': self.GET_GUID_CMD, 'out': out, 'err': err})
except putils.ProcessExecutionError as e:
msg = (_("Error querying sdc guid: %(err)s") % {'err': e.stderr})
LOG.error(msg)
raise exception.BrickException(message=msg)
guid = out
LOG.info(_LI("Current sdc guid: %(guid)s"), {'guid': guid})
params = {'guid': guid, 'allowMultipleMappings': 'TRUE'}
self.volume_id = self._get_volume_id()
headers = {'content-type': 'application/json'}
request = (
"https://%(server_ip)s:%(server_port)s/api/instances/"
"Volume::%(volume_id)s/action/addMappedSdc" %
{'server_ip': self.server_ip, 'server_port': self.server_port,
'volume_id': self.volume_id}
)
LOG.info(_LI("map volume request: %(request)s"), {'request': request})
r = requests.post(
request,
data=json.dumps(params),
headers=headers,
auth=(self.server_username, self.server_token),
verify=False
)
r = self._check_response(r, request, False, params)
if r.status_code != self.OK_STATUS_CODE:
response = r.json()
error_code = response['errorCode']
if error_code == self.VOLUME_ALREADY_MAPPED_ERROR:
LOG.warning(_LW(
"Ignoring error mapping volume %(volume_name)s: "
"volume already mapped."),
{'volume_name': self.volume_name}
)
else:
msg = (
_("Error mapping volume %(volume_name)s: %(err)s") %
{'volume_name': self.volume_name,
'err': response['message']}
)
LOG.error(msg)
raise exception.BrickException(message=msg)
self.volume_path = self._find_volume_path()
device_info['path'] = self.volume_path
# Set QoS settings after map was performed
if self.iops_limit is not None or self.bandwidth_limit is not None:
params = {'guid': guid}
if self.bandwidth_limit is not None:
params['bandwidthLimitInKbps'] = self.bandwidth_limit
if self.iops_limit is not None:
params['iops_limit'] = self.iops_limit
request = (
"https://%(server_ip)s:%(server_port)s/api/instances/"
"Volume::%(volume_id)s/action/setMappedSdcLimits" %
{'server_ip': self.server_ip, 'server_port': self.server_port,
'volume_id': self.volume_id}
)
LOG.info(_LI("Set client limit request: %(request)s"),
{'request': request})
r = requests.post(
request,
data=json.dumps(params),
headers=headers,
auth=(self.server_username, self.server_token),
verify=False
)
r = self._check_response(r, request, False, params)
if r.status_code != self.OK_STATUS_CODE:
response = r.json()
LOG.info(_LI("Set client limit response: %(response)s"),
{'response': response})
msg = (
_("Error setting client limits for volume "
"%(volume_name)s: %(err)s") %
{'volume_name': self.volume_name,
'err': response['message']}
)
LOG.error(msg)
return device_info
def disconnect_volume(self, connection_properties, device_info):
self.get_config(connection_properties)
self.volume_id = self._get_volume_id()
LOG.info(_LI(
"ScaleIO disconnect volume in ScaleIO brick volume driver."
))
LOG.debug(
_("ScaleIO Volume name: %(volume_name)s, SDC IP: %(sdc_ip)s, "
"REST Server IP: %(server_ip)s"),
{'volume_name': self.volume_name, 'sdc_ip': self.local_sdc_ip,
'server_ip': self.server_ip}
)
LOG.info(_LI("ScaleIO sdc query guid command: %(cmd)s"),
{'cmd': self.GET_GUID_CMD})
try:
(out, err) = self._execute(*self.GET_GUID_CMD, run_as_root=True,
root_helper=self._root_helper)
LOG.info(
_LI("Unmap volume %(cmd)s: stdout=%(out)s stderr=%(err)s"),
{'cmd': self.GET_GUID_CMD, 'out': out, 'err': err}
)
except putils.ProcessExecutionError as e:
msg = _("Error querying sdc guid: %(err)s") % {'err': e.stderr}
LOG.error(msg)
raise exception.BrickException(message=msg)
guid = out
LOG.info(_LI("Current sdc guid: %(guid)s"), {'guid': guid})
params = {'guid': guid}
headers = {'content-type': 'application/json'}
request = (
"https://%(server_ip)s:%(server_port)s/api/instances/"
"Volume::%(volume_id)s/action/removeMappedSdc" %
{'server_ip': self.server_ip, 'server_port': self.server_port,
'volume_id': self.volume_id}
)
LOG.info(_LI("Unmap volume request: %(request)s"),
{'request': request})
r = requests.post(
request,
data=json.dumps(params),
headers=headers,
auth=(self.server_username, self.server_token),
verify=False
)
r = self._check_response(r, request, False, params)
if r.status_code != self.OK_STATUS_CODE:
response = r.json()
error_code = response['errorCode']
if error_code == self.VOLUME_NOT_MAPPED_ERROR:
LOG.warning(_LW(
"Ignoring error unmapping volume %(volume_id)s: "
"volume not mapped."), {'volume_id': self.volume_name}
)
else:
msg = (_("Error unmapping volume %(volume_id)s: %(err)s") %
{'volume_id': self.volume_name,
'err': response['message']})
LOG.error(msg)
raise exception.BrickException(message=msg)

View File

@ -17,11 +17,13 @@ import platform
import tempfile import tempfile
import time import time
import json
import mock import mock
from oslo_concurrency import processutils as putils from oslo_concurrency import processutils as putils
from oslo_log import log as logging from oslo_log import log as logging
from oslo_service import loopingcall from oslo_service import loopingcall
from oslo_utils import encodeutils from oslo_utils import encodeutils
import requests
import six import six
import testtools import testtools
@ -163,6 +165,9 @@ class ConnectorTestCase(base.TestCase):
obj = connector.InitiatorConnector.factory('huaweisdshypervisor', None) obj = connector.InitiatorConnector.factory('huaweisdshypervisor', None)
self.assertEqual(obj.__class__.__name__, "HuaweiStorHyperConnector") self.assertEqual(obj.__class__.__name__, "HuaweiStorHyperConnector")
obj = connector.InitiatorConnector.factory("scaleio", None)
self.assertEqual("ScaleIOConnector", obj.__class__.__name__)
self.assertRaises(ValueError, self.assertRaises(ValueError,
connector.InitiatorConnector.factory, connector.InitiatorConnector.factory,
"bogus", None) "bogus", None)
@ -1561,3 +1566,211 @@ class RBDConnectorTestCase(ConnectorTestCase):
rbd.disconnect_volume(self.connection_properties, device_info) rbd.disconnect_volume(self.connection_properties, device_info)
volume_close.assert_called_once() volume_close.assert_called_once()
class ScaleIOConnectorTestCase(ConnectorTestCase):
"""Test cases for ScaleIO connector"""
# Fake volume information
vol = {
'id': 'vol1',
'name': 'test_volume'
}
# Fake SDC GUID
fake_guid = 'FAKE_GUID'
def setUp(self):
super(ScaleIOConnectorTestCase, self).setUp()
self.fake_connection_properties = {
'hostIP': MY_IP,
'serverIP': MY_IP,
'scaleIO_volname': self.vol['name'],
'serverPort': 443,
'serverUsername': 'test',
'serverPassword': 'fake',
'serverToken': 'fake_token',
'iopsLimit': None,
'bandwidthLimit': None
}
# Formatting string for REST API calls
self.action_format = "instances/Volume::{}/action/{{}}".format(
self.vol['id'])
self.get_volume_api = 'types/Volume/instances/getByName::{}'.format(
self.vol['name'])
# Map of REST API calls to responses
self.mock_calls = {
self.get_volume_api:
self.MockHTTPSResponse(json.dumps(self.vol['id'])),
self.action_format.format('addMappedSdc'):
self.MockHTTPSResponse(''),
self.action_format.format('setMappedSdcLimits'):
self.MockHTTPSResponse(''),
self.action_format.format('removeMappedSdc'):
self.MockHTTPSResponse(''),
}
# Default error REST response
self.error_404 = self.MockHTTPSResponse(content=dict(
errorCode=0,
message='HTTP 404',
), status_code=404)
# Patch the request and os calls to fake versions
mock.patch.object(
requests, 'get', self.handle_scaleio_request).start()
mock.patch.object(
requests, 'post', self.handle_scaleio_request).start()
mock.patch.object(os.path, 'isdir', return_value=True).start()
mock.patch.object(
os, 'listdir', return_value=["emc-vol-{}".format(self.vol['id'])]
).start()
self.addCleanup(mock.patch.stopall)
# The actual ScaleIO connector
self.connector = connector.ScaleIOConnector(
'sudo', execute=self.fake_execute)
class MockHTTPSResponse(requests.Response):
"""Mock HTTP Response
Defines the https replies from the mocked calls to do_request()
"""
def __init__(self, content, status_code=200):
super(ScaleIOConnectorTestCase.MockHTTPSResponse,
self).__init__()
self._content = content
self.encoding = 'UTF-8'
self.status_code = status_code
def json(self, **kwargs):
if isinstance(self._content, six.string_types):
return super(ScaleIOConnectorTestCase.MockHTTPSResponse,
self).json(**kwargs)
return self._content
@property
def text(self):
if not isinstance(self._content, six.string_types):
return json.dumps(self._content)
self._content = self._content.encode('utf-8')
return super(ScaleIOConnectorTestCase.MockHTTPSResponse,
self).text
def fake_execute(self, *cmd, **kwargs):
"""Fakes the rootwrap call"""
return self.fake_guid, None
def fake_missing_execute(self, *cmd, **kwargs):
"""Error when trying to call rootwrap drv_cfg"""
raise putils.ProcessExecutionError("Test missing drv_cfg.")
def handle_scaleio_request(self, url, *args, **kwargs):
"""Fake REST server"""
api_call = url.split(':', 2)[2].split('/', 1)[1].replace('api/', '')
try:
return self.mock_calls[api_call]
except KeyError:
return self.error_404
def test_connect_volume(self):
"""Successful connect to volume"""
self.connector.connect_volume(self.fake_connection_properties)
def test_connect_with_bandwidth_limit(self):
"""Successful connect to volume with bandwidth limit"""
self.fake_connection_properties['bandwidthLimit'] = '500'
self.test_connect_volume()
def test_connect_with_iops_limit(self):
"""Successful connect to volume with iops limit"""
self.fake_connection_properties['iopsLimit'] = '80'
self.test_connect_volume()
def test_connect_with_iops_and_bandwidth_limits(self):
"""Successful connect with iops and bandwidth limits"""
self.fake_connection_properties['bandwidthLimit'] = '500'
self.fake_connection_properties['iopsLimit'] = '80'
self.test_connect_volume()
def test_disconnect_volume(self):
"""Successful disconnect from volume"""
self.connector.disconnect_volume(self.fake_connection_properties, None)
def test_error_id(self):
"""Fail to connect with bad volume name"""
self.mock_calls[self.get_volume_api] = self.MockHTTPSResponse(
dict(errorCode='404', message='Test volume not found'), 404)
self.assertRaises(exception.BrickException, self.test_connect_volume)
def test_error_no_volume_id(self):
"""Faile to connect with no volume id"""
self.mock_calls[self.get_volume_api] = self.MockHTTPSResponse(
'null', 200)
self.assertRaises(exception.BrickException, self.test_connect_volume)
def test_error_bad_login(self):
"""Fail to connect with bad authentication"""
self.mock_calls[self.get_volume_api] = self.MockHTTPSResponse(
'null', 401)
self.mock_calls['login'] = self.MockHTTPSResponse('null', 401)
self.assertRaises(exception.BrickException, self.test_connect_volume)
def test_error_bad_drv_cfg(self):
"""Fail to connect with missing rootwrap executable"""
self.connector.set_execute(self.fake_missing_execute)
self.assertRaises(exception.BrickException, self.test_connect_volume)
def test_error_map_volume(self):
"""Fail to connect with REST API failure"""
self.mock_calls[self.action_format.format(
'addMappedSdc')] = self.MockHTTPSResponse(
dict(errorCode=self.connector.VOLUME_NOT_MAPPED_ERROR,
message='Test error map volume'), 500)
self.assertRaises(exception.BrickException, self.test_connect_volume)
def test_error_path_not_found(self):
"""Timeout waiting for volume to map to local file system"""
mock.patch.object(
os, 'listdir', return_value=["emc-vol-no-volume"]
).start()
self.assertRaises(exception.BrickException, self.test_connect_volume)
def test_map_volume_already_mapped(self):
"""Ignore REST API failure for volume already mapped"""
self.mock_calls[self.action_format.format(
'addMappedSdc')] = self.MockHTTPSResponse(
dict(errorCode=self.connector.VOLUME_ALREADY_MAPPED_ERROR,
message='Test error map volume'), 500)
self.test_connect_volume()
def test_error_disconnect_volume(self):
"""Fail to disconnect with REST API failure"""
self.mock_calls[self.action_format.format(
'removeMappedSdc')] = self.MockHTTPSResponse(
dict(errorCode=self.connector.VOLUME_ALREADY_MAPPED_ERROR,
message='Test error map volume'), 500)
self.assertRaises(exception.BrickException,
self.test_disconnect_volume)
def test_disconnect_volume_not_mapped(self):
"""Ignore REST API failure for volume not mapped"""
self.mock_calls[self.action_format.format(
'removeMappedSdc')] = self.MockHTTPSResponse(
dict(errorCode=self.connector.VOLUME_NOT_MAPPED_ERROR,
message='Test error map volume'), 500)
self.test_disconnect_volume()