Datera 2.4.0 driver update

Updating the Datera Cinder driver to version 2.4.0 and implemented
    the following features:

    - Scalability bugfixes
    - Volume Placement, ACL multi-attach bugfix
    - Fast retype support

Change-Id: I7a742a8d7590f6a29e4cb24c57615ea94c79ee83
This commit is contained in:
Matt Smith 2017-05-18 15:58:10 -05:00
parent 429db9d9f3
commit 1e23faf82a
6 changed files with 558 additions and 352 deletions

View File

@ -1,4 +1,4 @@
# Copyright 2015 Datera
# Copyright 2017 Datera
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -20,12 +20,14 @@ from cinder import context
from cinder import exception
from cinder import test
from cinder.volume import configuration as conf
from cinder.volume.drivers.datera import datera_common as datc
from cinder.volume.drivers.datera import datera_iscsi as datera
from cinder.volume import volume_types
datera.datc.DEFAULT_SI_SLEEP = 0
datera.datc.DEFAULT_SNAP_SLEEP = 0
datc.DEFAULT_SI_SLEEP = 0
datc.DEFAULT_SI_SLEEP_API_2 = 0
datc.DEFAULT_SNAP_SLEEP = 0
URL_TEMPLATES = datera.datc.URL_TEMPLATES
OS_PREFIX = datera.datc.OS_PREFIX
UNMANAGE_PREFIX = datera.datc.UNMANAGE_PREFIX
@ -51,6 +53,8 @@ class DateraVolumeTestCasev2(test.TestCase):
self.cfg.datera_tenant_id = 'test-tenant'
self.cfg.driver_client_cert = None
self.cfg.driver_client_cert_key = None
self.cfg.datera_disable_profiler = False
self.cfg.driver_use_ssl = False
mock_exec = mock.Mock()
mock_exec.return_value = ('', '')
@ -60,18 +64,16 @@ class DateraVolumeTestCasev2(test.TestCase):
self.driver.set_initialized()
self.driver.configuration.get = _config_getter
self.volume = _stub_volume()
self.api_patcher = mock.patch('cinder.volume.drivers.datera.'
'datera_iscsi.DateraDriver.'
'_issue_api_request')
self.driver._request = mock.Mock()
m = mock.Mock()
m.json.return_value = {'api_versions': ['v2']}
self.driver._request.return_value = m
self.mock_api = self.api_patcher.start()
self.mock_api = mock.Mock()
self.driver._issue_api_request = self.mock_api
self._apiv = "2"
self._tenant = None
self.addCleanup(self.api_patcher.stop)
# self.addCleanup(self.api_patcher.stop)
def test_volume_create_success(self):
self.mock_api.return_value = stub_single_ai
@ -165,6 +167,31 @@ class DateraVolumeTestCasev2(test.TestCase):
source_volume)
def test_delete_volume_success(self):
if self._apiv == '2':
self.mock_api.side_effect = [
{},
self._generate_fake_api_request()(
"acl_policy", api_version=self._apiv, tenant=self._tenant),
self._generate_fake_api_request()(
"ig_group", api_version=self._apiv, tenant=self._tenant),
{},
{},
{},
{},
{}]
else:
self.mock_api.side_effect = [
{},
{},
self._generate_fake_api_request()(
"acl_policy", api_version=self._apiv, tenant=self._tenant),
self._generate_fake_api_request()(
"ig_group", api_version=self._apiv, tenant=self._tenant),
{},
{},
{},
{},
{}]
self.assertIsNone(self.driver.delete_volume(self.volume))
def test_delete_volume_not_found(self):
@ -313,6 +340,10 @@ class DateraVolumeTestCasev2(test.TestCase):
self.driver.create_snapshot, snapshot)
def test_delete_snapshot_success(self):
if self._apiv == '2':
self.mock_api.return_value = stub_return_snapshots
else:
self.mock_api.return_value = stub_return_snapshots_21
snapshot = _stub_snapshot(volume_id=self.volume['id'])
self.assertIsNone(self.driver.delete_snapshot(snapshot))
@ -386,6 +417,17 @@ class DateraVolumeTestCasev2(test.TestCase):
def test_extend_volume_success(self):
volume = _stub_volume(size=1)
self.mock_api.side_effect = [
stub_get_export,
{'data': stub_get_export},
self._generate_fake_api_request()(
"acl_policy", api_version=self._apiv, tenant=self._tenant),
self._generate_fake_api_request()(
"ig_group", api_version=self._apiv, tenant=self._tenant),
self._generate_fake_api_request()(
"acl_policy", api_version=self._apiv, tenant=self._tenant),
{}, {}, {}, {}, {}, {}, stub_get_export,
{'data': stub_get_export}]
self.assertIsNone(self.driver.extend_volume(volume, 2))
def test_extend_volume_fails(self):

View File

@ -1,4 +1,4 @@
# Copyright 2016 Datera
# Copyright 2017 Datera
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -688,7 +688,7 @@ class DateraApi(object):
def _si_poll(self, volume, policies):
# Initial 4 second sleep required for some Datera versions
eventlet.sleep(datc.DEFAULT_SI_SLEEP)
eventlet.sleep(datc.DEFAULT_SI_SLEEP_API_2)
TIMEOUT = 10
retry = 0
check_url = datc.URL_TEMPLATES['si_inst'](

View File

@ -1,4 +1,4 @@
# Copyright 2016 Datera
# Copyright 2017 Datera
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -13,11 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import random
import re
import uuid
import eventlet
import ipaddress
import six
from oslo_log import log as logging
from oslo_utils import excutils
@ -45,6 +47,7 @@ class DateraApi(object):
storage_name = policies['default_storage_name']
volume_name = policies['default_volume_name']
template = policies['template']
placement = policies['placement_mode']
if template:
app_params = (
@ -70,6 +73,7 @@ class DateraApi(object):
{
'name': volume_name,
'size': volume['size'],
'placement_mode': placement,
'replica_count': num_replicas,
'snapshot_policies': [
]
@ -86,15 +90,6 @@ class DateraApi(object):
tenant=tenant)
self._update_qos_2_1(volume, policies, tenant)
metadata = {}
volume_type = self._get_volume_type_obj(volume)
if volume_type:
metadata.update({datc.M_TYPE: volume_type['name']})
metadata.update(self.HEADER_DATA)
url = datc.URL_TEMPLATES['ai_inst']().format(
datc._get_name(volume['id']))
self._store_metadata(url, metadata, "create_volume_2_1", tenant)
# =================
# = Extend Volume =
# =================
@ -158,23 +153,13 @@ class DateraApi(object):
if volume['size'] > src_vref['size']:
self._extend_volume_2_1(volume, volume['size'])
url = datc.URL_TEMPLATES['ai_inst']().format(
datc._get_name(volume['id']))
volume_type = self._get_volume_type_obj(volume)
if volume_type:
vtype = volume_type['name']
else:
vtype = None
metadata = {datc.M_TYPE: vtype,
datc.M_CLONE: datc._get_name(src_vref['id'])}
self._store_metadata(url, metadata, "create_cloned_volume_2_1", tenant)
# =================
# = Delete Volume =
# =================
def _delete_volume_2_1(self, volume):
self.detach_volume(None, volume)
self._detach_volume_2_1(None, volume)
tenant = self._create_tenant(volume)
app_inst = datc._get_name(volume['id'])
try:
@ -192,7 +177,7 @@ class DateraApi(object):
# = Ensure Export =
# =================
def _ensure_export_2_1(self, context, volume, connector):
def _ensure_export_2_1(self, context, volume, connector=None):
self.create_export(context, volume, connector)
# =========================
@ -214,7 +199,12 @@ class DateraApi(object):
storage_instances = app_inst["storage_instances"]
si = storage_instances[0]
portal = si['access']['ips'][0] + ':3260'
# randomize portal chosen
choice = 0
policies = self._get_policies_for_resource(volume)
if policies["round_robin"]:
choice = random.randint(0, 1)
portal = si['access']['ips'][choice] + ':3260'
iqn = si['access']['iqn']
if multipath:
portals = [p + ':3260' for p in si['access']['ips']]
@ -244,9 +234,6 @@ class DateraApi(object):
'volume_id': volume['id'],
'discard': False}}
url = datc.URL_TEMPLATES['ai_inst']().format(
datc._get_name(volume['id']))
self._store_metadata(url, {}, "initialize_connection_2_1", tenant)
return result
# =================
@ -255,6 +242,37 @@ class DateraApi(object):
def _create_export_2_1(self, context, volume, connector):
tenant = self._create_tenant(volume)
url = datc.URL_TEMPLATES['ai_inst']().format(
datc._get_name(volume['id']))
data = {
'admin_state': 'offline',
'force': True
}
self._issue_api_request(
url, method='put', body=data, api_version='2.1', tenant=tenant)
policies = self._get_policies_for_resource(volume)
store_name, _ = self._scrape_template(policies)
if connector and connector.get('ip'):
# Case where volume_type has non default IP Pool info
if policies['ip_pool'] != 'default':
initiator_ip_pool_path = self._issue_api_request(
"access_network_ip_pools/{}".format(
policies['ip_pool']),
api_version='2.1',
tenant=tenant)['path']
# Fallback to trying reasonable IP based guess
else:
initiator_ip_pool_path = self._get_ip_pool_for_string_ip_2_1(
connector['ip'])
ip_pool_url = datc.URL_TEMPLATES['si_inst'](
store_name).format(datc._get_name(volume['id']))
ip_pool_data = {'ip_pool': {'path': initiator_ip_pool_path}}
self._issue_api_request(ip_pool_url,
method="put",
body=ip_pool_data,
api_version='2.1',
tenant=tenant)
url = datc.URL_TEMPLATES['ai_inst']().format(
datc._get_name(volume['id']))
data = {
@ -268,29 +286,18 @@ class DateraApi(object):
url, api_version='2.1', tenant=tenant)
# Handle adding initiator to product if necessary
# Then add initiator to ACL
policies = self._get_policies_for_resource(volume)
store_name, _ = self._scrape_template(policies)
if (connector and
connector.get('initiator') and
not policies['acl_allow_all']):
initiator_name = "OpenStack_{}_{}".format(
self.driver_prefix, str(uuid.uuid4())[:4])
initiator_group = datc.INITIATOR_GROUP_PREFIX + volume['id']
initiator_group = datc.INITIATOR_GROUP_PREFIX + str(uuid.uuid4())
found = False
initiator = connector['initiator']
current_initiators = self._issue_api_request(
'initiators', api_version='2.1', tenant=tenant)
for iqn, values in current_initiators.items():
if initiator == iqn:
found = True
break
# If we didn't find a matching initiator, create one
if not found:
data = {'id': initiator, 'name': initiator_name}
# Try and create the initiator
# If we get a conflict, ignore it because race conditions
# If we get a conflict, ignore it
self._issue_api_request("initiators",
method="post",
body=data,
@ -330,37 +337,8 @@ class DateraApi(object):
body=data,
api_version='2.1',
tenant=tenant)
if connector and connector.get('ip'):
# Case where volume_type has non default IP Pool info
if policies['ip_pool'] != 'default':
initiator_ip_pool_path = self._issue_api_request(
"access_network_ip_pools/{}".format(
policies['ip_pool']),
api_version='2.1',
tenant=tenant)['path']
# Fallback to trying reasonable IP based guess
else:
initiator_ip_pool_path = self._get_ip_pool_for_string_ip(
connector['ip'])
ip_pool_url = datc.URL_TEMPLATES['si_inst'](
store_name).format(datc._get_name(volume['id']))
ip_pool_data = {'ip_pool': {'path': initiator_ip_pool_path}}
self._issue_api_request(ip_pool_url,
method="put",
body=ip_pool_data,
api_version='2.1',
tenant=tenant)
# Check to ensure we're ready for go-time
self._si_poll_2_1(volume, policies, tenant)
url = datc.URL_TEMPLATES['ai_inst']().format(
datc._get_name(volume['id']))
metadata = {}
# TODO(_alastor_): Figure out what we want to post with a create_export
# call
self._store_metadata(url, metadata, "create_export_2_1", tenant)
# =================
# = Detach Volume =
@ -384,15 +362,19 @@ class DateraApi(object):
# TODO(_alastor_): Make acl cleaning multi-attach aware
self._clean_acl_2_1(volume, tenant)
url = datc.URL_TEMPLATES['ai_inst']().format(
datc._get_name(volume['id']))
metadata = {}
try:
self._store_metadata(url, metadata, "detach_volume_2_1", tenant)
except exception.NotFound:
# If the object isn't found, we probably are deleting/detaching
# an already deleted object
pass
def _check_for_acl_2_1(self, initiator_path):
"""Returns True if an acl is found for initiator_path """
# TODO(_alastor_) when we get a /initiators/:initiator/acl_policies
# endpoint use that instead of this monstrosity
initiator_groups = self._issue_api_request("initiator_groups",
api_version='2.1')
for ig, igdata in initiator_groups.items():
if initiator_path in igdata['members']:
LOG.debug("Found initiator_group: %s for initiator: %s",
ig, initiator_path)
return True
LOG.debug("No initiator_group found for initiator: %s", initiator_path)
return False
def _clean_acl_2_1(self, volume, tenant):
policies = self._get_policies_for_resource(volume)
@ -405,9 +387,12 @@ class DateraApi(object):
initiator_group = self._issue_api_request(
acl_url, api_version='2.1', tenant=tenant)['data'][
'initiator_groups'][0]['path']
initiator_iqn_path = self._issue_api_request(
initiator_group.lstrip("/"), api_version='2.1', tenant=tenant)[
"data"]["members"][0]["path"]
# TODO(_alastor_): Re-enable this when we get a force-delete
# option on the /initiators endpoint
# initiator_iqn_path = self._issue_api_request(
# initiator_group.lstrip("/"), api_version='2.1',
# tenant=tenant)[
# "data"]["members"][0]["path"]
# Clear out ACL and delete initiator group
self._issue_api_request(acl_url,
method="put",
@ -418,11 +403,13 @@ class DateraApi(object):
method="delete",
api_version='2.1',
tenant=tenant)
if not self._check_for_acl_2(initiator_iqn_path):
self._issue_api_request(initiator_iqn_path.lstrip("/"),
method="delete",
api_version='2.1',
tenant=tenant)
# TODO(_alastor_): Re-enable this when we get a force-delete
# option on the /initiators endpoint
# if not self._check_for_acl_2_1(initiator_iqn_path):
# self._issue_api_request(initiator_iqn_path.lstrip("/"),
# method="delete",
# api_version='2.1',
# tenant=tenant)
except (IndexError, exception.NotFound):
LOG.debug("Did not find any initiator groups for volume: %s",
volume)
@ -462,10 +449,19 @@ class DateraApi(object):
snap_temp = datc.URL_TEMPLATES['vol_inst'](
store_name, vol_name) + '/snapshots'
snapu = snap_temp.format(datc._get_name(snapshot['volume_id']))
snapshots = self._issue_api_request(snapu,
method='get',
api_version='2.1',
tenant=tenant)
snapshots = []
try:
snapshots = self._issue_api_request(snapu,
method='get',
api_version='2.1',
tenant=tenant)
except exception.NotFound:
msg = ("Tried to delete snapshot %s, but parent volume %s was "
"not found in Datera cluster. Continuing with delete.")
LOG.info(msg,
datc._get_name(snapshot['id']),
datc._get_name(snapshot['volume_id']))
return
try:
for snap in snapshots['data']:
@ -531,6 +527,50 @@ class DateraApi(object):
if (volume['size'] > snapshot['volume_size']):
self._extend_volume_2_1(volume, volume['size'])
# ==========
# = Retype =
# ==========
def _retype_2_1(self, ctxt, volume, new_type, diff, host):
LOG.debug("Retype called\n"
"Volume: %(volume)s\n"
"NewType: %(new_type)s\n"
"Diff: %(diff)s\n"
"Host: %(host)s\n", {'volume': volume, 'new_type': new_type,
'diff': diff, 'host': host})
# We'll take the fast route only if the types share the same backend
# And that backend matches this driver
old_pol = self._get_policies_for_resource(volume)
new_pol = self._get_policies_for_volume_type(new_type)
if (host['capabilities']['vendor_name'].lower() ==
self.backend_name.lower()):
LOG.debug("Starting fast volume retype")
if old_pol.get('template') or new_pol.get('template'):
LOG.warning(
"Fast retyping between template-backed volume-types "
"unsupported. Type1: %s, Type2: %s",
volume['volume_type_id'], new_type)
tenant = self._create_tenant(volume)
self._update_qos_2_1(volume, new_pol, tenant)
vol_params = (
{
'placement_mode': new_pol['placement_mode'],
'replica_count': new_pol['replica_count'],
})
url = datc.URL_TEMPLATES['vol_inst'](
old_pol['default_storage_name'],
old_pol['default_volume_name']).format(
datc._get_name(volume['id']))
self._issue_api_request(url, method='put', body=vol_params,
api_version='2.1', tenant=tenant)
return True
else:
LOG.debug("Couldn't fast-retype volume between specified types")
return False
# ==========
# = Manage =
# ==========
@ -723,32 +763,6 @@ class DateraApi(object):
api_version='2.1')
return tenant
# ============
# = Metadata =
# ============
def _get_metadata(self, obj_url, tenant):
url = "/".join((obj_url.rstrip("/"), "metadata"))
mdata = self._issue_api_request(
url, api_version="2.1", tenant=tenant).get("data")
# Make sure we only grab the relevant keys
filter_mdata = {k: json.loads(mdata[k])
for k in mdata if k in datc.M_KEYS}
return filter_mdata
def _store_metadata(self, obj_url, data, calling_func_name, tenant):
mdata = self._get_metadata(obj_url, tenant)
new_call_entry = (calling_func_name, self.HEADER_DATA['Datera-Driver'])
if mdata.get(datc.M_CALL):
mdata[datc.M_CALL].append(new_call_entry)
else:
mdata[datc.M_CALL] = [new_call_entry]
mdata.update(data)
mdata.update(self.HEADER_DATA)
data_s = {k: json.dumps(v) for k, v in data.items()}
url = "/".join((obj_url.rstrip("/"), "metadata"))
return self._issue_api_request(url, method="put", api_version="2.1",
body=data_s, tenant=tenant)
# =========
# = Login =
# =========
@ -783,7 +797,7 @@ class DateraApi(object):
def _snap_poll_2_1(self, url, tenant):
eventlet.sleep(datc.DEFAULT_SNAP_SLEEP)
TIMEOUT = 10
TIMEOUT = 20
retry = 0
poll = True
while poll and not retry >= TIMEOUT:
@ -837,10 +851,8 @@ class DateraApi(object):
LOG.error(
'Failed to get updated stats from Datera Cluster.')
backend_name = self.configuration.safe_get(
'volume_backend_name')
stats = {
'volume_backend_name': backend_name or 'Datera',
'volume_backend_name': self.backend_name,
'vendor_name': 'Datera',
'driver_version': self.VERSION,
'storage_protocol': 'iSCSI',
@ -875,5 +887,29 @@ class DateraApi(object):
# Filter all 0 values from being passed
fpolicies = dict(filter(lambda _v: _v[1] > 0, fpolicies.items()))
if fpolicies:
self._issue_api_request(url, 'delete', api_version='2.1',
tenant=tenant)
self._issue_api_request(url, 'post', body=fpolicies,
api_version='2.1', tenant=tenant)
# ============
# = IP Pools =
# ============
def _get_ip_pool_for_string_ip_2_1(self, ip):
"""Takes a string ipaddress and return the ip_pool API object dict """
pool = 'default'
ip_obj = ipaddress.ip_address(six.text_type(ip))
ip_pools = self._issue_api_request('access_network_ip_pools',
api_version='2.1')
for ipdata in ip_pools['data']:
for adata in ipdata['network_paths']:
if not adata.get('start_ip'):
continue
pool_if = ipaddress.ip_interface(
"/".join((adata['start_ip'], str(adata['netmask']))))
if ip_obj in pool_if.network:
pool = ipdata['name']
return self._issue_api_request(
"access_network_ip_pools/{}".format(pool),
api_version='2.1')['path']

View File

@ -1,4 +1,4 @@
# Copyright 2016 Datera
# Copyright 2017 Datera
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -14,14 +14,24 @@
# under the License.
import functools
import json
import re
import six
import time
import types
import uuid
import eventlet
import requests
from oslo_log import log as logging
from six.moves import http_client
from cinder import context
from cinder import exception
from cinder.i18n import _
from cinder.volume import qos_specs
from cinder.volume import volume_types
LOG = logging.getLogger(__name__)
@ -52,8 +62,9 @@ URL_TEMPLATES = {
'{}', volume_name)),
'at': lambda: 'app_templates/{}'}
DEFAULT_SI_SLEEP = 10
DEFAULT_SNAP_SLEEP = 5
DEFAULT_SI_SLEEP = 1
DEFAULT_SI_SLEEP_API_2 = 5
DEFAULT_SNAP_SLEEP = 1
INITIATOR_GROUP_PREFIX = "IG-"
API_VERSIONS = ["2", "2.1"]
API_TIMEOUT = 20
@ -146,7 +157,8 @@ def _api_lookup(func):
msg = _("No compatible API version found for this product: "
"api_versions -> %(api_version)s, %(func)s")
LOG.error(msg, api_version=api_version, func=func)
raise exception.DateraAPIException(msg % (api_version, func))
raise exception.DateraAPIException(msg % {
'api_version': api_version, 'func': func})
# Py27
try:
name = "_" + "_".join(
@ -156,8 +168,19 @@ def _api_lookup(func):
name = "_" + "_".join(
(func.__name__, api_version.replace(".", "_")))
try:
LOG.info("Trying method: %s", name)
return getattr(obj, name)(*args[1:], **kwargs)
if obj.do_profile:
LOG.info("Trying method: %s", name)
call_id = uuid.uuid4()
LOG.debug("Profiling method: %s, id %s", name, call_id)
t1 = time.time()
obj.thread_local.trace_id = call_id
result = getattr(obj, name)(*args[1:], **kwargs)
if obj.do_profile:
t2 = time.time()
timedelta = round(t2 - t1, 3)
LOG.debug("Profile for method %s, id %s: %ss",
name, call_id, timedelta)
return result
except AttributeError as e:
# If we find the attribute name in the error message
# then we continue otherwise, raise to prevent masking
@ -180,6 +203,7 @@ def _get_supported_api_versions(driver):
t = time.time()
if driver.api_cache and driver.api_timeout - t < API_TIMEOUT:
return driver.api_cache
driver.api_timeout = t + API_TIMEOUT
results = []
host = driver.configuration.san_ip
port = driver.configuration.datera_api_port
@ -209,3 +233,259 @@ def _get_supported_api_versions(driver):
LOG.error("No supported API versions available, "
"Please upgrade your Datera EDF software")
return results
def _get_volume_type_obj(driver, resource):
type_id = resource.get('volume_type_id', None)
# Handle case of volume with no type. We still want the
# specified defaults from above
if type_id:
ctxt = context.get_admin_context()
volume_type = volume_types.get_volume_type(ctxt, type_id)
else:
volume_type = None
return volume_type
def _get_policies_for_resource(driver, resource):
"""Get extra_specs and qos_specs of a volume_type.
This fetches the scoped keys from the volume type. Anything set from
qos_specs will override key/values set from extra_specs.
"""
volume_type = driver._get_volume_type_obj(resource)
# Handle case of volume with no type. We still want the
# specified defaults from above
if volume_type:
specs = volume_type.get('extra_specs')
else:
specs = {}
# Set defaults:
policies = {k.lstrip('DF:'): str(v['default']) for (k, v)
in driver._init_vendor_properties()[0].items()}
if volume_type:
# Populate updated value
for key, value in specs.items():
if ':' in key:
fields = key.split(':')
key = fields[1]
policies[key] = value
qos_specs_id = volume_type.get('qos_specs_id')
if qos_specs_id is not None:
ctxt = context.get_admin_context()
qos_kvs = qos_specs.get_qos_specs(ctxt, qos_specs_id)['specs']
if qos_kvs:
policies.update(qos_kvs)
# Cast everything except booleans int that can be cast
for k, v in policies.items():
# Handle String Boolean case
if v == 'True' or v == 'False':
policies[k] = policies[k] == 'True'
continue
# Int cast
try:
policies[k] = int(v)
except ValueError:
pass
return policies
# ================
# = API Requests =
# ================
def _request(driver, connection_string, method, payload, header, cert_data):
LOG.debug("Endpoint for Datera API call: %s", connection_string)
LOG.debug("Payload for Datera API call: %s", payload)
try:
response = getattr(requests, method)(connection_string,
data=payload, headers=header,
verify=False, cert=cert_data)
return response
except requests.exceptions.RequestException as ex:
msg = _(
'Failed to make a request to Datera cluster endpoint due '
'to the following reason: %s') % six.text_type(
ex.message)
LOG.error(msg)
raise exception.DateraAPIException(msg)
def _raise_response(driver, response):
msg = _('Request to Datera cluster returned bad status:'
' %(status)s | %(reason)s') % {
'status': response.status_code,
'reason': response.reason}
LOG.error(msg)
raise exception.DateraAPIException(msg)
def _handle_bad_status(driver,
response,
connection_string,
method,
payload,
header,
cert_data,
sensitive=False,
conflict_ok=False):
if (response.status_code == http_client.BAD_REQUEST and
connection_string.endswith("api_versions")):
# Raise the exception, but don't log any error. We'll just fall
# back to the old style of determining API version. We make this
# request a lot, so logging it is just noise
raise exception.DateraAPIException
if response.status_code == http_client.NOT_FOUND:
raise exception.NotFound(response.json()['message'])
elif response.status_code in [http_client.FORBIDDEN,
http_client.UNAUTHORIZED]:
raise exception.NotAuthorized()
elif response.status_code == http_client.CONFLICT and conflict_ok:
# Don't raise, because we're expecting a conflict
pass
elif response.status_code == http_client.SERVICE_UNAVAILABLE:
current_retry = 0
while current_retry <= driver.retry_attempts:
LOG.debug("Datera 503 response, trying request again")
eventlet.sleep(driver.interval)
resp = driver._request(connection_string,
method,
payload,
header,
cert_data)
if resp.ok:
return response.json()
elif resp.status_code != http_client.SERVICE_UNAVAILABLE:
driver._raise_response(resp)
else:
driver._raise_response(response)
@_authenticated
def _issue_api_request(driver, resource_url, method='get', body=None,
sensitive=False, conflict_ok=False,
api_version='2', tenant=None):
"""All API requests to Datera cluster go through this method.
:param resource_url: the url of the resource
:param method: the request verb
:param body: a dict with options for the action_type
:param sensitive: Bool, whether request should be obscured from logs
:param conflict_ok: Bool, True to suppress ConflictError exceptions
during this request
:param api_version: The Datera api version for the request
:param tenant: The tenant header value for the request (only applicable
to 2.1 product versions and later)
:returns: a dict of the response from the Datera cluster
"""
host = driver.configuration.san_ip
port = driver.configuration.datera_api_port
api_token = driver.datera_api_token
payload = json.dumps(body, ensure_ascii=False)
payload.encode('utf-8')
header = {'Content-Type': 'application/json; charset=utf-8'}
header.update(driver.HEADER_DATA)
protocol = 'http'
if driver.configuration.driver_use_ssl:
protocol = 'https'
if api_token:
header['Auth-Token'] = api_token
if tenant == "all":
header['tenant'] = tenant
elif tenant and '/root' not in tenant:
header['tenant'] = "".join(("/root/", tenant))
elif tenant and '/root' in tenant:
header['tenant'] = tenant
elif driver.tenant_id and driver.tenant_id.lower() != "map":
header['tenant'] = driver.tenant_id
client_cert = driver.configuration.driver_client_cert
client_cert_key = driver.configuration.driver_client_cert_key
cert_data = None
if client_cert:
protocol = 'https'
cert_data = (client_cert, client_cert_key)
connection_string = '%s://%s:%s/v%s/%s' % (protocol, host, port,
api_version, resource_url)
request_id = uuid.uuid4()
if driver.do_profile:
t1 = time.time()
if not sensitive:
LOG.debug("\nDatera Trace ID: %(tid)s\n"
"Datera Request ID: %(rid)s\n"
"Datera Request URL: /v%(api)s/%(url)s\n"
"Datera Request Method: %(method)s\n"
"Datera Request Payload: %(payload)s\n"
"Datera Request Headers: %(header)s\n",
{'tid': driver.thread_local.trace_id,
'rid': request_id,
'api': api_version,
'url': resource_url,
'method': method,
'payload': payload,
'header': header})
response = driver._request(connection_string,
method,
payload,
header,
cert_data)
data = response.json()
timedelta = "Profiling disabled"
if driver.do_profile:
t2 = time.time()
timedelta = round(t2 - t1, 3)
if not sensitive:
LOG.debug("\nDatera Trace ID: %(tid)s\n"
"Datera Response ID: %(rid)s\n"
"Datera Response TimeDelta: %(delta)ss\n"
"Datera Response URL: %(url)s\n"
"Datera Response Payload: %(payload)s\n"
"Datera Response Object: %(obj)s\n",
{'tid': driver.thread_local.trace_id,
'rid': request_id,
'delta': timedelta,
'url': response.url,
'payload': payload,
'obj': vars(response)})
if not response.ok:
driver._handle_bad_status(response,
connection_string,
method,
payload,
header,
cert_data,
conflict_ok=conflict_ok)
return data
def register_driver(driver):
for func in [_get_supported_api_versions,
_get_volume_type_obj,
_get_policies_for_resource,
_request,
_raise_response,
_handle_bad_status,
_issue_api_request]:
# PY27
f = types.MethodType(func, driver)
try:
setattr(driver, func.func_name, f)
# PY3+
except AttributeError:
setattr(driver, func.__name__, f)

View File

@ -1,4 +1,4 @@
# Copyright 2016 Datera
# Copyright 2017 Datera
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -13,25 +13,18 @@
# License for the specific language governing permissions and limitations
# under the License.
import json
import time
import uuid
import eventlet
from eventlet.green import threading
from oslo_config import cfg
from oslo_log import log as logging
import requests
import six
from six.moves import http_client
from cinder import context
from cinder import exception
from cinder.i18n import _
from cinder import interface
from cinder import utils
from cinder.volume.drivers.san import san
from cinder.volume import qos_specs
from cinder.volume import volume_types
import cinder.volume.drivers.datera.datera_api2 as api2
import cinder.volume.drivers.datera.datera_api21 as api21
@ -68,7 +61,10 @@ d_opts = [
"If set to 'None' --> Datera tenant ID will not be used "
"during volume provisioning\n"
"If set to anything else --> Datera tenant ID will be the "
"provided value")
"provided value"),
cfg.BoolOpt('datera_disable_profiler',
default=False,
help="Set to True to disable profiling in the Datera driver"),
]
@ -77,7 +73,6 @@ CONF.import_opt('driver_use_ssl', 'cinder.volume.driver')
CONF.register_opts(d_opts)
@interface.volumedriver
@six.add_metaclass(utils.TraceWrapperWithABCMetaclass)
class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
@ -93,8 +88,11 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
Volume Manage/Unmanage support
2.3 - Templates, Tenants, Snapshot Polling,
2.1 Api Version Support, Restructure
2.3.1 - Scalability bugfixes
2.3.2 - Volume Placement, ACL multi-attach bugfix
2.4.0 - Fast Retype Support
"""
VERSION = '2.3'
VERSION = '2.4.0'
CI_WIKI_NAME = "datera-ci"
@ -121,6 +119,15 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
self.tenant_id = None
self.api_check = time.time()
self.api_cache = []
self.api_timeout = 0
self.do_profile = not self.configuration.datera_disable_profiler
self.thread_local = threading.local()
backend_name = self.configuration.safe_get(
'volume_backend_name')
self.backend_name = backend_name or 'Datera'
datc.register_driver(self)
def do_setup(self, context):
# If we can't authenticate through the old and new method, just fail
@ -177,7 +184,7 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
# =================
@datc._api_lookup
def ensure_export(self, context, volume, connector):
def ensure_export(self, context, volume, connector=None):
"""Gets the associated account, retrieves CHAP info and updates."""
# =========================
@ -228,6 +235,25 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
def create_volume_from_snapshot(self, volume, snapshot):
pass
# ==========
# = Retype =
# ==========
@datc._api_lookup
def retype(self, ctxt, volume, new_type, diff, host):
"""Convert the volume to be of the new type.
Returns a boolean indicating whether the retype occurred.
:param ctxt: Context
:param volume: A dictionary describing the volume to migrate
:param new_type: A dictionary describing the volume type to convert to
:param diff: A dictionary with the difference between the two types
:param host: A dictionary describing the host to migrate to, where
host['host'] is its name, and host['capabilities'] is a
dictionary of its reported capabilities (Not Used).
"""
pass
# ==========
# = Manage =
# ==========
@ -418,6 +444,24 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
properties = {}
self._set_property(
properties,
"DF:placement_mode",
"Datera Volume Placement",
_("'single_flash' for single-flash-replica placement, "
"'all_flash' for all-flash-replica placement, "
"'hybrid' for hybrid placement"),
"string",
default="hybrid")
self._set_property(
properties,
"DF:round_robin",
"Datera Round Robin Portals",
_("True to round robin the provided portals for a target"),
"boolean",
default=False)
if self.configuration.get('datera_debug_replica_count_override'):
replica_count = 1
else:
@ -536,206 +580,3 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
# ###### End QoS Settings ###### #
return properties, 'DF'
def _get_volume_type_obj(self, resource):
type_id = resource.get('volume_type_id', None)
# Handle case of volume with no type. We still want the
# specified defaults from above
if type_id:
ctxt = context.get_admin_context()
volume_type = volume_types.get_volume_type(ctxt, type_id)
else:
volume_type = None
return volume_type
def _get_policies_for_resource(self, resource):
"""Get extra_specs and qos_specs of a volume_type.
This fetches the scoped keys from the volume type. Anything set from
qos_specs will override key/values set from extra_specs.
"""
volume_type = self._get_volume_type_obj(resource)
# Handle case of volume with no type. We still want the
# specified defaults from above
if volume_type:
specs = volume_type.get('extra_specs')
else:
specs = {}
# Set defaults:
policies = {k.lstrip('DF:'): str(v['default']) for (k, v)
in self._init_vendor_properties()[0].items()}
if volume_type:
# Populate updated value
for key, value in specs.items():
if ':' in key:
fields = key.split(':')
key = fields[1]
policies[key] = value
qos_specs_id = volume_type.get('qos_specs_id')
if qos_specs_id is not None:
ctxt = context.get_admin_context()
qos_kvs = qos_specs.get_qos_specs(ctxt, qos_specs_id)['specs']
if qos_kvs:
policies.update(qos_kvs)
# Cast everything except booleans int that can be cast
for k, v in policies.items():
# Handle String Boolean case
if v == 'True' or v == 'False':
policies[k] = policies[k] == 'True'
continue
# Int cast
try:
policies[k] = int(v)
except ValueError:
pass
return policies
# ================
# = API Requests =
# ================
def _request(self, connection_string, method, payload, header, cert_data):
LOG.debug("Endpoint for Datera API call: %s", connection_string)
try:
response = getattr(requests, method)(connection_string,
data=payload, headers=header,
verify=False, cert=cert_data)
return response
except requests.exceptions.RequestException as ex:
msg = _(
'Failed to make a request to Datera cluster endpoint due '
'to the following reason: %s') % six.text_type(
ex.message)
LOG.error(msg)
raise exception.DateraAPIException(msg)
def _raise_response(self, response):
msg = _('Request to Datera cluster returned bad status:'
' %(status)s | %(reason)s') % {
'status': response.status_code,
'reason': response.reason}
LOG.error(msg)
raise exception.DateraAPIException(msg)
def _handle_bad_status(self,
response,
connection_string,
method,
payload,
header,
cert_data,
sensitive=False,
conflict_ok=False):
if (response.status_code == http_client.BAD_REQUEST and
connection_string.endswith("api_versions")):
# Raise the exception, but don't log any error. We'll just fall
# back to the old style of determining API version. We make this
# request a lot, so logging it is just noise
raise exception.DateraAPIException
if not sensitive:
LOG.debug(("Datera Response URL: %s\n"
"Datera Response Payload: %s\n"
"Response Object: %s\n"),
response.url,
payload,
vars(response))
if response.status_code == http_client.NOT_FOUND:
raise exception.NotFound(response.json()['message'])
elif response.status_code in [http_client.FORBIDDEN,
http_client.UNAUTHORIZED]:
raise exception.NotAuthorized()
elif response.status_code == http_client.CONFLICT and conflict_ok:
# Don't raise, because we're expecting a conflict
pass
elif response.status_code == http_client.SERVICE_UNAVAILABLE:
current_retry = 0
while current_retry <= self.retry_attempts:
LOG.debug("Datera 503 response, trying request again")
eventlet.sleep(self.interval)
resp = self._request(connection_string,
method,
payload,
header,
cert_data)
if resp.ok:
return response.json()
elif resp.status_code != http_client.SERVICE_UNAVAILABLE:
self._raise_response(resp)
else:
self._raise_response(response)
@datc._authenticated
def _issue_api_request(self, resource_url, method='get', body=None,
sensitive=False, conflict_ok=False,
api_version='2', tenant=None):
"""All API requests to Datera cluster go through this method.
:param resource_url: the url of the resource
:param method: the request verb
:param body: a dict with options for the action_type
:param sensitive: Bool, whether request should be obscured from logs
:param conflict_ok: Bool, True to suppress ConflictError exceptions
during this request
:param api_version: The Datera api version for the request
:param tenant: The tenant header value for the request (only applicable
to 2.1 product versions and later)
:returns: a dict of the response from the Datera cluster
"""
host = self.configuration.san_ip
port = self.configuration.datera_api_port
api_token = self.datera_api_token
payload = json.dumps(body, ensure_ascii=False)
payload.encode('utf-8')
header = {'Content-Type': 'application/json; charset=utf-8'}
header.update(self.HEADER_DATA)
protocol = 'http'
if self.configuration.driver_use_ssl:
protocol = 'https'
if api_token:
header['Auth-Token'] = api_token
if tenant == "all":
header['tenant'] = tenant
elif tenant and '/root' not in tenant:
header['tenant'] = "".join(("/root/", tenant))
elif tenant and '/root' in tenant:
header['tenant'] = tenant
elif self.tenant_id and self.tenant_id.lower() != "map":
header['tenant'] = self.tenant_id
client_cert = self.configuration.driver_client_cert
client_cert_key = self.configuration.driver_client_cert_key
cert_data = None
if client_cert:
protocol = 'https'
cert_data = (client_cert, client_cert_key)
connection_string = '%s://%s:%s/v%s/%s' % (protocol, host, port,
api_version, resource_url)
response = self._request(connection_string,
method,
payload,
header,
cert_data)
data = response.json()
if not response.ok:
self._handle_bad_status(response,
connection_string,
method,
payload,
header,
cert_data,
conflict_ok=conflict_ok)
return data

View File

@ -0,0 +1,7 @@
---
features:
- Added ``datera_disable_profiler`` boolean config option.
- Added Cinder fast-retype support to Datera EDF driver.
- Added Volume Placement extra-specs support to Datera EDF driver.
- Fixed ACL multi-attach bug in Datera EDF driver.
- Fixed a few scalability bugs in the Datera EDF driver.