Volume discovery and local storage management lib
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

523 lines
19 KiB

# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import os
import requests
import six
from six.moves import configparser
from six.moves import urllib
from oslo_concurrency import lockutils
from oslo_concurrency import processutils as putils
from oslo_log import log as logging
from os_brick import exception
from os_brick.i18n import _
from os_brick import initiator
from os_brick.initiator.connectors import base
from os_brick import utils
LOG = logging.getLogger(__name__)
DEVICE_SCAN_ATTEMPTS_DEFAULT = 3
CONNECTOR_CONF_PATH = '/opt/emc/scaleio/openstack/connector.conf'
synchronized = lockutils.synchronized_with_prefix('os-brick-')
class ScaleIOConnector(base.BaseLinuxConnector):
"""Class implements the connector driver for ScaleIO."""
OK_STATUS_CODE = 200
VOLUME_NOT_MAPPED_ERROR = 84
VOLUME_ALREADY_MAPPED_ERROR = 81
GET_GUID_CMD = ['/opt/emc/scaleio/sdc/bin/drv_cfg', '--query_guid']
GET_PASSWORD_CMD = ['cat', CONNECTOR_CONF_PATH]
def __init__(self, root_helper, driver=None,
device_scan_attempts=initiator.DEVICE_SCAN_ATTEMPTS_DEFAULT,
*args, **kwargs):
super(ScaleIOConnector, self).__init__(
root_helper,
driver=driver,
device_scan_attempts=device_scan_attempts,
*args, **kwargs
)
self.local_sdc_ip = None
self.server_ip = None
self.server_port = None
self.server_username = None
self.server_password = None
self.server_token = None
self.volume_id = None
self.volume_name = None
self.volume_path = None
self.iops_limit = None
self.bandwidth_limit = None
@staticmethod
def get_connector_properties(root_helper, *args, **kwargs):
"""The ScaleIO connector properties."""
return {}
def get_search_path(self):
return "/dev/disk/by-id"
def get_volume_paths(self, connection_properties):
self.get_config(connection_properties)
volume_paths = []
device_paths = [self._find_volume_path()]
for path in device_paths:
if os.path.exists(path):
volume_paths.append(path)
return volume_paths
def _find_volume_path(self):
LOG.info(
"Looking for volume %(volume_id)s, maximum tries: %(tries)s",
{'volume_id': self.volume_id, 'tries': self.device_scan_attempts}
)
# look for the volume in /dev/disk/by-id directory
by_id_path = self.get_search_path()
disk_filename = self._wait_for_volume_path(by_id_path)
full_disk_name = ("%(path)s/%(filename)s" %
{'path': by_id_path, 'filename': disk_filename})
LOG.info("Full disk name is %(full_path)s",
{'full_path': full_disk_name})
return full_disk_name
# NOTE: Usually 3 retries is enough to find the volume.
# If there are network issues, it could take much longer. Set
# the max retries to 15 to make sure we can find the volume.
@utils.retry(exceptions=exception.BrickException,
retries=15,
backoff_rate=1)
def _wait_for_volume_path(self, path):
if not os.path.isdir(path):
msg = (
_("ScaleIO volume %(volume_id)s not found at "
"expected path.") % {'volume_id': self.volume_id}
)
LOG.debug(msg)
raise exception.BrickException(message=msg)
disk_filename = None
filenames = os.listdir(path)
LOG.info(
"Files found in %(path)s path: %(files)s ",
{'path': path, 'files': filenames}
)
for filename in filenames:
if (filename.startswith("emc-vol") and
filename.endswith(self.volume_id)):
disk_filename = filename
break
if not disk_filename:
msg = (_("ScaleIO volume %(volume_id)s not found.") %
{'volume_id': self.volume_id})
LOG.debug(msg)
raise exception.BrickException(message=msg)
return disk_filename
def _get_client_id(self):
request = (
"https://%(server_ip)s:%(server_port)s/"
"api/types/Client/instances/getByIp::%(sdc_ip)s/" %
{
'server_ip': self.server_ip,
'server_port': self.server_port,
'sdc_ip': self.local_sdc_ip
}
)
LOG.info("ScaleIO get client id by ip request: %(request)s",
{'request': request})
r = requests.get(
request,
auth=(self.server_username, self.server_token),
verify=False
)
r = self._check_response(r, request)
sdc_id = r.json()
if not sdc_id:
msg = (_("Client with ip %(sdc_ip)s was not found.") %
{'sdc_ip': self.local_sdc_ip})
raise exception.BrickException(message=msg)
if r.status_code != 200 and "errorCode" in sdc_id:
msg = (_("Error getting sdc id from ip %(sdc_ip)s: %(err)s") %
{'sdc_ip': self.local_sdc_ip, 'err': sdc_id['message']})
LOG.error(msg)
raise exception.BrickException(message=msg)
LOG.info("ScaleIO sdc id is %(sdc_id)s.",
{'sdc_id': sdc_id})
return sdc_id
def _get_volume_id(self):
volname_encoded = urllib.parse.quote(self.volume_name, '')
volname_double_encoded = urllib.parse.quote(volname_encoded, '')
LOG.debug(_(
"Volume name after double encoding is %(volume_name)s."),
{'volume_name': volname_double_encoded}
)
request = (
"https://%(server_ip)s:%(server_port)s/api/types/Volume/instances"
"/getByName::%(encoded_volume_name)s" %
{
'server_ip': self.server_ip,
'server_port': self.server_port,
'encoded_volume_name': volname_double_encoded
}
)
LOG.info(
"ScaleIO get volume id by name request: %(request)s",
{'request': request}
)
r = requests.get(request,
auth=(self.server_username, self.server_token),
verify=False)
r = self._check_response(r, request)
volume_id = r.json()
if not volume_id:
msg = (_("Volume with name %(volume_name)s wasn't found.") %
{'volume_name': self.volume_name})
LOG.error(msg)
raise exception.BrickException(message=msg)
if r.status_code != self.OK_STATUS_CODE and "errorCode" in volume_id:
msg = (
_("Error getting volume id from name %(volume_name)s: "
"%(err)s") %
{'volume_name': self.volume_name, 'err': volume_id['message']}
)
LOG.error(msg)
raise exception.BrickException(message=msg)
LOG.info("ScaleIO volume id is %(volume_id)s.",
{'volume_id': volume_id})
return volume_id
def _get_connector_password(self, config_group):
LOG.info("Get ScaleIO connector password from configuration file")
if not os.path.isfile(CONNECTOR_CONF_PATH):
msg = ("ScaleIO connector configuration file "
"is not found in path %s." % CONNECTOR_CONF_PATH)
raise exception.BrickException(message=msg)
try:
(out, err) = self._execute(*self.GET_PASSWORD_CMD,
run_as_root=True,
root_helper=self._root_helper)
conf = configparser.ConfigParser()
conf.readfp(six.StringIO(out))
return conf[config_group]["san_password"]
except putils.ProcessExecutionError as e:
msg = _("Error reading ScaleIO connector "
"configuration file: %s") % e.stderr
LOG.error(msg)
raise exception.BrickException(message=msg)
except Exception as e:
msg = _("Error getting ScaleIO connector password from "
"configuration file: %s") % e
LOG.error(msg)
raise exception.BrickException(message=msg)
def _check_response(self, response, request, is_get_request=True,
params=None):
if response.status_code == 401 or response.status_code == 403:
LOG.info("Token is invalid, "
"going to re-login to get a new one")
login_request = (
"https://%(server_ip)s:%(server_port)s/api/login" %
{'server_ip': self.server_ip, 'server_port': self.server_port}
)
r = requests.get(
login_request,
auth=(self.server_username, self.server_password),
verify=False
)
token = r.json()
# repeat request with valid token
LOG.debug(_("Going to perform request %(request)s again "
"with valid token"), {'request': request})
if is_get_request:
res = requests.get(request,
auth=(self.server_username, token),
verify=False)
else:
headers = {'content-type': 'application/json'}
res = requests.post(
request,
data=json.dumps(params),
headers=headers,
auth=(self.server_username, token),
verify=False
)
self.server_token = token
return res
return response
def get_config(self, connection_properties):
self.local_sdc_ip = connection_properties['hostIP']
self.volume_name = connection_properties['scaleIO_volname']
self.volume_id = connection_properties['scaleIO_volume_id']
self.server_ip = connection_properties['serverIP']
self.server_port = connection_properties['serverPort']
self.server_username = connection_properties['serverUsername']
self.server_password = self._get_connector_password(
connection_properties['config_group'],
)
self.iops_limit = connection_properties['iopsLimit']
self.bandwidth_limit = connection_properties['bandwidthLimit']
device_info = {'type': 'block',
'path': self.volume_path}
return device_info
@utils.trace
@lockutils.synchronized('scaleio', 'scaleio-')
def connect_volume(self, connection_properties):
"""Connect the volume.
:param connection_properties: The dictionary that describes all
of the target volume attributes.
:type connection_properties: dict
:returns: dict
"""
device_info = self.get_config(connection_properties)
LOG.debug(
_(
"scaleIO Volume name: %(volume_name)s, SDC IP: %(sdc_ip)s, "
"REST Server IP: %(server_ip)s, "
"REST Server username: %(username)s, "
"iops limit:%(iops_limit)s, "
"bandwidth limit: %(bandwidth_limit)s."
), {
'volume_name': self.volume_name,
'volume_id': self.volume_id,
'sdc_ip': self.local_sdc_ip,
'server_ip': self.server_ip,
'username': self.server_username,
'iops_limit': self.iops_limit,
'bandwidth_limit': self.bandwidth_limit
}
)
LOG.info("ScaleIO sdc query guid command: %(cmd)s",
{'cmd': self.GET_GUID_CMD})
try:
(out, err) = self._execute(*self.GET_GUID_CMD, run_as_root=True,
root_helper=self._root_helper)
LOG.info("Map volume %(cmd)s: stdout=%(out)s "
"stderr=%(err)s",
{'cmd': self.GET_GUID_CMD, 'out': out, 'err': err})
except putils.ProcessExecutionError as e:
msg = (_("Error querying sdc guid: %(err)s") % {'err': e.stderr})
LOG.error(msg)
raise exception.BrickException(message=msg)
guid = out
LOG.info("Current sdc guid: %(guid)s", {'guid': guid})
params = {'guid': guid, 'allowMultipleMappings': 'TRUE'}
self.volume_id = self.volume_id or self._get_volume_id()
headers = {'content-type': 'application/json'}
request = (
"https://%(server_ip)s:%(server_port)s/api/instances/"
"Volume::%(volume_id)s/action/addMappedSdc" %
{'server_ip': self.server_ip, 'server_port': self.server_port,
'volume_id': self.volume_id}
)
LOG.info("map volume request: %(request)s", {'request': request})
r = requests.post(
request,
data=json.dumps(params),
headers=headers,
auth=(self.server_username, self.server_token),
verify=False
)
r = self._check_response(r, request, False, params)
if r.status_code != self.OK_STATUS_CODE:
response = r.json()
error_code = response['errorCode']
if error_code == self.VOLUME_ALREADY_MAPPED_ERROR:
LOG.warning(
"Ignoring error mapping volume %(volume_name)s: "
"volume already mapped.",
{'volume_name': self.volume_name}
)
else:
msg = (
_("Error mapping volume %(volume_name)s: %(err)s") %
{'volume_name': self.volume_name,
'err': response['message']}
)
LOG.error(msg)
raise exception.BrickException(message=msg)
self.volume_path = self._find_volume_path()
device_info['path'] = self.volume_path
# Set QoS settings after map was performed
if self.iops_limit is not None or self.bandwidth_limit is not None:
params = {'guid': guid}
if self.bandwidth_limit is not None:
params['bandwidthLimitInKbps'] = self.bandwidth_limit
if self.iops_limit is not None:
params['iopsLimit'] = self.iops_limit
request = (
"https://%(server_ip)s:%(server_port)s/api/instances/"
"Volume::%(volume_id)s/action/setMappedSdcLimits" %
{'server_ip': self.server_ip, 'server_port': self.server_port,
'volume_id': self.volume_id}
)
LOG.info("Set client limit request: %(request)s",
{'request': request})
r = requests.post(
request,
data=json.dumps(params),
headers=headers,
auth=(self.server_username, self.server_token),
verify=False
)
r = self._check_response(r, request, False, params)
if r.status_code != self.OK_STATUS_CODE:
response = r.json()
LOG.info("Set client limit response: %(response)s",
{'response': response})
msg = (
_("Error setting client limits for volume "
"%(volume_name)s: %(err)s") %
{'volume_name': self.volume_name,
'err': response['message']}
)
LOG.error(msg)
return device_info
@utils.trace
@lockutils.synchronized('scaleio', 'scaleio-')
def disconnect_volume(self, connection_properties, device_info,
force=False, ignore_errors=False):
"""Disconnect the ScaleIO volume.
:param connection_properties: The dictionary that describes all
of the target volume attributes.
:type connection_properties: dict
:param device_info: historical difference, but same as connection_props
:type device_info: dict
"""
self.get_config(connection_properties)
self.volume_id = self.volume_id or self._get_volume_id()
LOG.info(
"ScaleIO disconnect volume in ScaleIO brick volume driver."
)
LOG.debug(
_("ScaleIO Volume name: %(volume_name)s, SDC IP: %(sdc_ip)s, "
"REST Server IP: %(server_ip)s"),
{'volume_name': self.volume_name, 'sdc_ip': self.local_sdc_ip,
'server_ip': self.server_ip}
)
LOG.info("ScaleIO sdc query guid command: %(cmd)s",
{'cmd': self.GET_GUID_CMD})
try:
(out, err) = self._execute(*self.GET_GUID_CMD, run_as_root=True,
root_helper=self._root_helper)
LOG.info(
"Unmap volume %(cmd)s: stdout=%(out)s stderr=%(err)s",
{'cmd': self.GET_GUID_CMD, 'out': out, 'err': err}
)
except putils.ProcessExecutionError as e:
msg = _("Error querying sdc guid: %(err)s") % {'err': e.stderr}
LOG.error(msg)
raise exception.BrickException(message=msg)
guid = out
LOG.info("Current sdc guid: %(guid)s", {'guid': guid})
params = {'guid': guid}
headers = {'content-type': 'application/json'}
request = (
"https://%(server_ip)s:%(server_port)s/api/instances/"
"Volume::%(volume_id)s/action/removeMappedSdc" %
{'server_ip': self.server_ip, 'server_port': self.server_port,
'volume_id': self.volume_id}
)
LOG.info("Unmap volume request: %(request)s",
{'request': request})
r = requests.post(
request,
data=json.dumps(params),
headers=headers,
auth=(self.server_username, self.server_token),
verify=False
)
r = self._check_response(r, request, False, params)
if r.status_code != self.OK_STATUS_CODE:
response = r.json()
error_code = response['errorCode']
if error_code == self.VOLUME_NOT_MAPPED_ERROR:
LOG.warning(
"Ignoring error unmapping volume %(volume_id)s: "
"volume not mapped.", {'volume_id': self.volume_name}
)
else:
msg = (_("Error unmapping volume %(volume_id)s: %(err)s") %
{'volume_id': self.volume_name,
'err': response['message']})
LOG.error(msg)
raise exception.BrickException(message=msg)
def extend_volume(self, connection_properties):
# TODO(walter-boring): is this possible?
raise NotImplementedError