Update in-tree Datera Cinder driver

New attempt to update Datera Cinder in-tree driver. This review request
builds on https://review.opendev.org/#/c/661359/ with fixed zuul gating
tests.

Full list of changes since last update:

* Added Pypi packaging installation support
* Dropping support for v2 API.  No Datera products exist in production
  with this API version.
* Added v2.2 API support
* Rewrite of the driver to use the Datera Python-SDK instead of
  hand-rolled connections.  Usage requires the dfs_sdk python package
* Dropping support for default_storage_name and default_volume_name
  volume-type keys
* Added CHAP support
* Implemented fast-path Glance-->Datera image cloning with clone_image RPC
* Implemented fast-path volume retype
* Rewrote unit tests from scratch
* Added iops_per_gb and bandwidth_per_gb volume-type keys
* Implemented update_migrated_volume
* Increased number of stats reported with get_volume_stats
* API fallback now only occurs during driver initialization.  This
  increases driver performance
* Added config option for customizing volume-type default values
* Implemented template size override
* Implemented LDAP support
* Added support for filter_functions and goodness_functions
* Changed version string to date-based
* Implemented manage_existing_snapshot and related RPCs
* Removed ancient py25 compatibility imports
* Updated Copyright to 2020
* Fixed almost all requests From walter and Sean
* Added comprehensive version history

Change-Id: I56a1a24d60a7bc0dc59bfcfa89da23f43696a31e
This commit is contained in:
Mark Korondi 2020-01-24 13:26:23 +00:00
parent c29285e7dc
commit 8113e9a379
No known key found for this signature in database
GPG Key ID: 8EA189A141E6D1F0
10 changed files with 2736 additions and 2774 deletions

View File

@ -244,6 +244,7 @@ def list_opts():
cinder_volume_driver.scst_opts,
cinder_volume_driver.backup_opts,
cinder_volume_driver.image_opts,
cinder_volume_drivers_datera_dateraiscsi.d_opts,
cinder_volume_drivers_fusionstorage_dsware.volume_opts,
cinder_volume_drivers_infortrend_raidcmd_cli_commoncli.
infortrend_opts,
@ -279,7 +280,6 @@ def list_opts():
cinder_volume_driver.nvmet_opts,
cinder_volume_driver.scst_opts,
cinder_volume_driver.image_opts,
cinder_volume_drivers_datera_dateraiscsi.d_opts,
cinder_volume_drivers_dell_emc_powermax_common.powermax_opts,
cinder_volume_drivers_dell_emc_sc_storagecentercommon.
common_opts,

File diff suppressed because it is too large Load Diff

View File

@ -1,763 +0,0 @@
# Copyright 2017 Datera
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ipaddress
import re
import uuid
import eventlet
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import units
import six
from cinder import exception
from cinder.i18n import _
import cinder.volume.drivers.datera.datera_common as datc
from cinder.volume import volume_utils
LOG = logging.getLogger(__name__)
class DateraApi(object):
# =================
# = Create Volume =
# =================
def _create_volume_2(self, volume):
# Generate App Instance, Storage Instance and Volume
# Volume ID will be used as the App Instance Name
# Storage Instance and Volumes will have standard names
policies = self._get_policies_for_resource(volume)
num_replicas = int(policies['replica_count'])
storage_name = policies['default_storage_name']
volume_name = policies['default_volume_name']
template = policies['template']
if template:
app_params = (
{
'create_mode': "openstack",
# 'uuid': str(volume['id']),
'name': datc._get_name(volume['id']),
'app_template': '/app_templates/{}'.format(template)
})
else:
app_params = (
{
'create_mode': "openstack",
'uuid': str(volume['id']),
'name': datc._get_name(volume['id']),
'access_control_mode': 'deny_all',
'storage_instances': {
storage_name: {
'name': storage_name,
'volumes': {
volume_name: {
'name': volume_name,
'size': volume['size'],
'replica_count': num_replicas,
'snapshot_policies': {
}
}
}
}
}
})
self._issue_api_request(
datc.URL_TEMPLATES['ai'](),
'post',
body=app_params,
api_version='2')
self._update_qos(volume, policies)
# =================
# = Extend Volume =
# =================
def _extend_volume_2(self, volume, new_size):
# Current product limitation:
# If app_instance is bound to template resizing is not possible
# Once policies are implemented in the product this can go away
policies = self._get_policies_for_resource(volume)
template = policies['template']
if template:
LOG.warning("Volume size not extended due to template binding:"
" volume: %(volume)s, template: %(template)s",
volume=volume, template=template)
return
# Offline App Instance, if necessary
reonline = False
app_inst = self._issue_api_request(
datc.URL_TEMPLATES['ai_inst']().format(
datc._get_name(volume['id'])),
api_version='2')
if app_inst['admin_state'] == 'online':
reonline = True
self._detach_volume_2(None, volume)
# Change Volume Size
app_inst = datc._get_name(volume['id'])
data = {
'size': new_size
}
store_name, vol_name = self._scrape_template(policies)
self._issue_api_request(
datc.URL_TEMPLATES['vol_inst'](
store_name, vol_name).format(app_inst),
method='put',
body=data,
api_version='2')
# Online Volume, if it was online before
if reonline:
self._create_export_2(None, volume, None)
# =================
# = Cloned Volume =
# =================
def _create_cloned_volume_2(self, volume, src_vref):
policies = self._get_policies_for_resource(volume)
store_name, vol_name = self._scrape_template(policies)
src = "/" + datc.URL_TEMPLATES['vol_inst'](
store_name, vol_name).format(datc._get_name(src_vref['id']))
data = {
'create_mode': 'openstack',
'name': datc._get_name(volume['id']),
'uuid': str(volume['id']),
'clone_src': src,
}
self._issue_api_request(
datc.URL_TEMPLATES['ai'](), 'post', body=data, api_version='2')
if volume['size'] > src_vref['size']:
self._extend_volume_2(volume, volume['size'])
# =================
# = Delete Volume =
# =================
def _delete_volume_2(self, volume):
self.detach_volume(None, volume)
app_inst = datc._get_name(volume['id'])
try:
self._issue_api_request(datc.URL_TEMPLATES['ai_inst']().format(
app_inst),
method='delete',
api_version='2')
except exception.NotFound:
LOG.info("Tried to delete volume %s, but it was not found in the "
"Datera cluster. Continuing with delete.",
datc._get_name(volume['id']))
# =================
# = Ensure Export =
# =================
def _ensure_export_2(self, context, volume, connector):
return self._create_export_2(context, volume, connector)
# =========================
# = Initialize Connection =
# =========================
def _initialize_connection_2(self, volume, connector):
# Now online the app_instance (which will online all storage_instances)
multipath = connector.get('multipath', False)
url = datc.URL_TEMPLATES['ai_inst']().format(
datc._get_name(volume['id']))
data = {
'admin_state': 'online'
}
app_inst = self._issue_api_request(
url, method='put', body=data, api_version='2')
storage_instances = app_inst["storage_instances"]
si_names = list(storage_instances.keys())
portal = storage_instances[si_names[0]]['access']['ips'][0] + ':3260'
iqn = storage_instances[si_names[0]]['access']['iqn']
if multipath:
portals = [p + ':3260' for p in
storage_instances[si_names[0]]['access']['ips']]
iqns = [iqn for _ in
storage_instances[si_names[0]]['access']['ips']]
lunids = [self._get_lunid() for _ in
storage_instances[si_names[0]]['access']['ips']]
return {
'driver_volume_type': 'iscsi',
'data': {
'target_discovered': False,
'target_iqn': iqn,
'target_iqns': iqns,
'target_portal': portal,
'target_portals': portals,
'target_lun': self._get_lunid(),
'target_luns': lunids,
'volume_id': volume['id'],
'discard': False}}
else:
return {
'driver_volume_type': 'iscsi',
'data': {
'target_discovered': False,
'target_iqn': iqn,
'target_portal': portal,
'target_lun': self._get_lunid(),
'volume_id': volume['id'],
'discard': False}}
# =================
# = Create Export =
# =================
def _create_export_2(self, context, volume, connector):
# Online volume in case it hasn't been already
url = datc.URL_TEMPLATES['ai_inst']().format(
datc._get_name(volume['id']))
data = {
'admin_state': 'online'
}
self._issue_api_request(url, method='put', body=data, api_version='2')
# Check if we've already setup everything for this volume
url = (datc.URL_TEMPLATES['si']().format(datc._get_name(volume['id'])))
storage_instances = self._issue_api_request(url, api_version='2')
# Handle adding initiator to product if necessary
# Then add initiator to ACL
policies = self._get_policies_for_resource(volume)
store_name, _ = self._scrape_template(policies)
if (connector and
connector.get('initiator') and
not policies['acl_allow_all']):
initiator_name = "OpenStack_{}_{}".format(
self.driver_prefix, str(uuid.uuid4())[:4])
initiator_group = datc.INITIATOR_GROUP_PREFIX + volume['id']
found = False
initiator = connector['initiator']
current_initiators = self._issue_api_request(
'initiators', api_version='2')
for iqn, values in current_initiators.items():
if initiator == iqn:
found = True
break
# If we didn't find a matching initiator, create one
if not found:
data = {'id': initiator, 'name': initiator_name}
# Try and create the initiator
# If we get a conflict, ignore it because race conditions
self._issue_api_request("initiators",
method="post",
body=data,
conflict_ok=True,
api_version='2')
# Create initiator group with initiator in it
initiator_path = "/initiators/{}".format(initiator)
initiator_group_path = "/initiator_groups/{}".format(
initiator_group)
ig_data = {'name': initiator_group, 'members': [initiator_path]}
self._issue_api_request("initiator_groups",
method="post",
body=ig_data,
conflict_ok=True,
api_version='2')
# Create ACL with initiator group as reference for each
# storage_instance in app_instance
# TODO(_alastor_): We need to avoid changing the ACLs if the
# template already specifies an ACL policy.
for si_name in storage_instances.keys():
acl_url = (datc.URL_TEMPLATES['si']() +
"/{}/acl_policy").format(
datc._get_name(volume['id']), si_name)
existing_acl = self._issue_api_request(acl_url,
method="get",
api_version='2')
data = {}
data['initiators'] = existing_acl['initiators']
data['initiator_groups'] = existing_acl['initiator_groups']
data['initiator_groups'].append(initiator_group_path)
self._issue_api_request(acl_url,
method="put",
body=data,
api_version='2')
if connector and connector.get('ip'):
try:
# Case where volume_type has non default IP Pool info
if policies['ip_pool'] != 'default':
initiator_ip_pool_path = self._issue_api_request(
"access_network_ip_pools/{}".format(
policies['ip_pool']), api_version='2')['path']
# Fallback to trying reasonable IP based guess
else:
initiator_ip_pool_path = self._get_ip_pool_for_string_ip(
connector['ip'])
ip_pool_url = datc.URL_TEMPLATES['si_inst'](
store_name).format(datc._get_name(volume['id']))
ip_pool_data = {'ip_pool': initiator_ip_pool_path}
self._issue_api_request(ip_pool_url,
method="put",
body=ip_pool_data,
api_version='2')
except exception.DateraAPIException:
# Datera product 1.0 support
pass
# Check to ensure we're ready for go-time
self._si_poll(volume, policies)
# =================
# = Detach Volume =
# =================
def _detach_volume_2(self, context, volume, attachment=None):
url = datc.URL_TEMPLATES['ai_inst']().format(
datc._get_name(volume['id']))
data = {
'admin_state': 'offline',
'force': True
}
try:
self._issue_api_request(url, method='put', body=data,
api_version='2')
except exception.NotFound:
msg = ("Tried to detach volume %s, but it was not found in the "
"Datera cluster. Continuing with detach.")
LOG.info(msg, volume['id'])
# TODO(_alastor_): Make acl cleaning multi-attach aware
self._clean_acl_2(volume)
def _check_for_acl_2(self, initiator_path):
"""Returns True if an acl is found for initiator_path """
# TODO(_alastor_) when we get a /initiators/:initiator/acl_policies
# endpoint use that instead of this monstrosity
initiator_groups = self._issue_api_request("initiator_groups",
api_version='2')
for ig, igdata in initiator_groups.items():
if initiator_path in igdata['members']:
LOG.debug("Found initiator_group: %s for initiator: %s",
ig, initiator_path)
return True
LOG.debug("No initiator_group found for initiator: %s", initiator_path)
return False
def _clean_acl_2(self, volume):
policies = self._get_policies_for_resource(volume)
store_name, _ = self._scrape_template(policies)
acl_url = (datc.URL_TEMPLATES["si_inst"](
store_name) + "/acl_policy").format(datc._get_name(volume['id']))
try:
initiator_group = self._issue_api_request(
acl_url, api_version='2')['initiator_groups'][0]
initiator_iqn_path = self._issue_api_request(
initiator_group.lstrip("/"))["members"][0]
# Clear out ACL and delete initiator group
self._issue_api_request(acl_url,
method="put",
body={'initiator_groups': []},
api_version='2')
self._issue_api_request(initiator_group.lstrip("/"),
method="delete",
api_version='2')
if not self._check_for_acl_2(initiator_iqn_path):
self._issue_api_request(initiator_iqn_path.lstrip("/"),
method="delete",
api_version='2')
except (IndexError, exception.NotFound):
LOG.debug("Did not find any initiator groups for volume: %s",
volume)
# ===================
# = Create Snapshot =
# ===================
def _create_snapshot_2(self, snapshot):
policies = self._get_policies_for_resource(snapshot)
store_name, vol_name = self._scrape_template(policies)
url_template = datc.URL_TEMPLATES['vol_inst'](
store_name, vol_name) + '/snapshots'
url = url_template.format(datc._get_name(snapshot['volume_id']))
snap_params = {
'uuid': snapshot['id'],
}
snap = self._issue_api_request(url, method='post', body=snap_params,
api_version='2')
snapu = "/".join((url, snap['timestamp']))
self._snap_poll(snapu)
# ===================
# = Delete Snapshot =
# ===================
def _delete_snapshot_2(self, snapshot):
policies = self._get_policies_for_resource(snapshot)
store_name, vol_name = self._scrape_template(policies)
snap_temp = datc.URL_TEMPLATES['vol_inst'](
store_name, vol_name) + '/snapshots'
snapu = snap_temp.format(datc._get_name(snapshot['volume_id']))
snapshots = self._issue_api_request(snapu, method='get',
api_version='2')
try:
for ts, snap in snapshots.items():
if snap['uuid'] == snapshot['id']:
url_template = snapu + '/{}'
url = url_template.format(ts)
self._issue_api_request(url, method='delete',
api_version='2')
break
else:
raise exception.NotFound
except exception.NotFound:
msg = ("Tried to delete snapshot %s, but was not found in "
"Datera cluster. Continuing with delete.")
LOG.info(msg, datc._get_name(snapshot['id']))
# ========================
# = Volume From Snapshot =
# ========================
def _create_volume_from_snapshot_2(self, volume, snapshot):
policies = self._get_policies_for_resource(snapshot)
store_name, vol_name = self._scrape_template(policies)
snap_temp = datc.URL_TEMPLATES['vol_inst'](
store_name, vol_name) + '/snapshots'
snapu = snap_temp.format(datc._get_name(snapshot['volume_id']))
snapshots = self._issue_api_request(snapu, method='get',
api_version='2')
for ts, snap in snapshots.items():
if snap['uuid'] == snapshot['id']:
found_ts = ts
break
else:
raise exception.NotFound
snap_url = (snap_temp + '/{}').format(
datc._get_name(snapshot['volume_id']), found_ts)
self._snap_poll(snap_url)
src = "/" + snap_url
app_params = (
{
'create_mode': 'openstack',
'uuid': str(volume['id']),
'name': datc._get_name(volume['id']),
'clone_src': src,
})
self._issue_api_request(
datc.URL_TEMPLATES['ai'](),
method='post',
body=app_params,
api_version='2')
if (volume['size'] > snapshot['volume_size']):
self._extend_volume_2(volume, volume['size'])
# ==========
# = Manage =
# ==========
def _manage_existing_2(self, volume, existing_ref):
existing_ref = existing_ref['source-name']
if existing_ref.count(":") != 2:
raise exception.ManageExistingInvalidReference(
_("existing_ref argument must be of this format:"
"app_inst_name:storage_inst_name:vol_name"))
app_inst_name = existing_ref.split(":")[0]
LOG.debug("Managing existing Datera volume %s. "
"Changing name to %s",
datc._get_name(volume['id']),
existing_ref)
data = {'name': datc._get_name(volume['id'])}
self._issue_api_request(datc.URL_TEMPLATES['ai_inst']().format(
app_inst_name), method='put', body=data, api_version='2')
# ===================
# = Manage Get Size =
# ===================
def _manage_existing_get_size_2(self, volume, existing_ref):
existing_ref = existing_ref['source-name']
if existing_ref.count(":") != 2:
raise exception.ManageExistingInvalidReference(
_("existing_ref argument must be of this format:"
"app_inst_name:storage_inst_name:vol_name"))
app_inst_name, si_name, vol_name = existing_ref.split(":")
app_inst = self._issue_api_request(
datc.URL_TEMPLATES['ai_inst']().format(app_inst_name),
api_version='2')
return self._get_size_2(volume, app_inst, si_name, vol_name)
def _get_size_2(self, volume, app_inst=None, si_name=None, vol_name=None):
"""Helper method for getting the size of a backend object
If app_inst is provided, we'll just parse the dict to get
the size instead of making a separate http request
"""
policies = self._get_policies_for_resource(volume)
si_name = si_name if si_name else policies['default_storage_name']
vol_name = vol_name if vol_name else policies['default_volume_name']
if not app_inst:
vol_url = datc.URL_TEMPLATES['ai_inst']().format(
datc._get_name(volume['id']))
app_inst = self._issue_api_request(vol_url)
size = app_inst[
'storage_instances'][si_name]['volumes'][vol_name]['size']
return size
# =========================
# = Get Manageable Volume =
# =========================
def _get_manageable_volumes_2(self, cinder_volumes, marker, limit, offset,
sort_keys, sort_dirs):
LOG.debug("Listing manageable Datera volumes")
app_instances = self._issue_api_request(
datc.URL_TEMPLATES['ai'](), api_version='2').values()
results = []
cinder_volume_ids = [vol['id'] for vol in cinder_volumes]
for ai in app_instances:
ai_name = ai['name']
reference = None
size = None
safe_to_manage = False
reason_not_safe = None
cinder_id = None
extra_info = None
if re.match(datc.UUID4_RE, ai_name):
cinder_id = ai_name.lstrip(datc.OS_PREFIX)
if (not cinder_id and
ai_name.lstrip(datc.OS_PREFIX) not in cinder_volume_ids):
safe_to_manage = self._is_manageable(ai)
if safe_to_manage:
si = list(ai['storage_instances'].values())[0]
si_name = si['name']
vol = list(si['volumes'].values())[0]
vol_name = vol['name']
size = vol['size']
reference = {"source-name": "{}:{}:{}".format(
ai_name, si_name, vol_name)}
results.append({
'reference': reference,
'size': size,
'safe_to_manage': safe_to_manage,
'reason_not_safe': reason_not_safe,
'cinder_id': cinder_id,
'extra_info': extra_info})
page_results = volume_utils.paginate_entries_list(
results, marker, limit, offset, sort_keys, sort_dirs)
return page_results
# ============
# = Unmanage =
# ============
def _unmanage_2(self, volume):
LOG.debug("Unmanaging Cinder volume %s. Changing name to %s",
volume['id'], datc._get_unmanaged(volume['id']))
data = {'name': datc._get_unmanaged(volume['id'])}
self._issue_api_request(datc.URL_TEMPLATES['ai_inst']().format(
datc._get_name(volume['id'])),
method='put',
body=data,
api_version='2')
# ================
# = Volume Stats =
# ================
def _get_volume_stats_2(self, refresh=False):
if refresh or not self.cluster_stats:
try:
LOG.debug("Updating cluster stats info.")
results = self._issue_api_request('system', api_version='2')
if 'uuid' not in results:
LOG.error(
'Failed to get updated stats from Datera Cluster.')
backend_name = self.configuration.safe_get(
'volume_backend_name')
stats = {
'volume_backend_name': backend_name or 'Datera',
'vendor_name': 'Datera',
'driver_version': self.VERSION,
'storage_protocol': 'iSCSI',
'total_capacity_gb': (
int(results['total_capacity']) / units.Gi),
'free_capacity_gb': (
int(results['available_capacity']) / units.Gi),
'reserved_percentage': 0,
}
self.cluster_stats = stats
except exception.DateraAPIException:
LOG.error('Failed to get updated stats from Datera cluster.')
return self.cluster_stats
def _is_manageable(self, app_inst):
if len(app_inst['storage_instances']) == 1:
si = list(app_inst['storage_instances'].values())[0]
if len(si['volumes']) == 1:
return True
return False
# =========
# = Login =
# =========
def _login_2(self):
"""Use the san_login and san_password to set token."""
body = {
'name': self.username,
'password': self.password
}
# Unset token now, otherwise potential expired token will be sent
# along to be used for authorization when trying to login.
self.datera_api_token = None
try:
LOG.debug('Getting Datera auth token.')
results = self._issue_api_request('login', 'put', body=body,
sensitive=True, api_version='2')
self.datera_api_token = results['key']
except exception.NotAuthorized:
with excutils.save_and_reraise_exception():
LOG.error('Logging into the Datera cluster failed. Please '
'check your username and password set in the '
'cinder.conf and start the cinder-volume '
'service again.')
# ===========
# = Polling =
# ===========
def _snap_poll(self, url):
eventlet.sleep(datc.DEFAULT_SNAP_SLEEP)
TIMEOUT = 10
retry = 0
poll = True
while poll and retry < TIMEOUT:
retry += 1
snap = self._issue_api_request(url, api_version='2')
if snap['op_state'] == 'available':
poll = False
else:
eventlet.sleep(1)
if retry >= TIMEOUT:
raise exception.VolumeDriverException(
message=_('Snapshot not ready.'))
def _si_poll(self, volume, policies):
# Initial 4 second sleep required for some Datera versions
eventlet.sleep(datc.DEFAULT_SI_SLEEP_API_2)
TIMEOUT = 10
retry = 0
check_url = datc.URL_TEMPLATES['si_inst'](
policies['default_storage_name']).format(
datc._get_name(volume['id']))
poll = True
while poll and retry < TIMEOUT:
retry += 1
si = self._issue_api_request(check_url, api_version='2')
if si['op_state'] == 'available':
poll = False
else:
eventlet.sleep(1)
if retry >= TIMEOUT:
raise exception.VolumeDriverException(
message=_('Resource not ready.'))
# ============
# = IP Pools =
# ============
def _get_ip_pool_for_string_ip(self, ip):
"""Takes a string ipaddress and return the ip_pool API object dict """
pool = 'default'
ip_obj = ipaddress.ip_address(six.text_type(ip))
ip_pools = self._issue_api_request('access_network_ip_pools',
api_version='2')
for ip_pool, ipdata in ip_pools.items():
for access, adata in ipdata['network_paths'].items():
if not adata.get('start_ip'):
continue
pool_if = ipaddress.ip_interface(
"/".join((adata['start_ip'], str(adata['netmask']))))
if ip_obj in pool_if.network:
pool = ip_pool
return self._issue_api_request(
"access_network_ip_pools/{}".format(pool), api_version='2')['path']
# =============
# = Templates =
# =============
def _scrape_template(self, policies):
sname = policies['default_storage_name']
vname = policies['default_volume_name']
template = policies['template']
if template:
result = self._issue_api_request(
datc.URL_TEMPLATES['at']().format(template), api_version='2')
sname, st = list(result['storage_templates'].items())[0]
vname = list(st['volume_templates'].keys())[0]
return sname, vname
# =======
# = QoS =
# =======
def _update_qos(self, resource, policies):
url = datc.URL_TEMPLATES['vol_inst'](
policies['default_storage_name'],
policies['default_volume_name']) + '/performance_policy'
url = url.format(datc._get_name(resource['id']))
type_id = resource.get('volume_type_id', None)
if type_id is not None:
# Filter for just QOS policies in result. All of their keys
# should end with "max"
fpolicies = {k: int(v) for k, v in
policies.items() if k.endswith("max")}
# Filter all 0 values from being passed
fpolicies = dict(filter(lambda _v: _v[1] > 0, fpolicies.items()))
if fpolicies:
self._issue_api_request(url, 'post', body=fpolicies,
api_version='2')

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,4 @@
# Copyright 2017 Datera
# Copyright 2020 Datera
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -14,228 +14,138 @@
# under the License.
import functools
import json
import random
import re
import string
import time
import types
import uuid
import eventlet
from glanceclient import exc as glance_exc
from oslo_log import log as logging
import requests
import six
from six.moves import http_client
from oslo_utils import importutils
from cinder import context
from cinder import exception
from cinder.i18n import _
from cinder.image import glance
from cinder.volume import qos_specs
from cinder.volume import volume_types
LOG = logging.getLogger(__name__)
OS_PREFIX = "OS-"
UNMANAGE_PREFIX = "UNMANAGED-"
dfs_sdk = importutils.try_import('dfs_sdk')
OS_PREFIX = "OS"
UNMANAGE_PREFIX = "UNMANAGED"
# Taken from this SO post :
# http://stackoverflow.com/a/18516125
# Using old-style string formatting because of the nature of the regex
# conflicting with new-style curly braces
UUID4_STR_RE = ("%s[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab]"
"[a-f0-9]{3}-?[a-f0-9]{12}")
UUID4_STR_RE = ("%s.*([a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab]"
"[a-f0-9]{3}-?[a-f0-9]{12})")
UUID4_RE = re.compile(UUID4_STR_RE % OS_PREFIX)
SNAP_RE = re.compile(r"\d{10,}\.\d+")
# Recursive dict to assemble basic url structure for the most common
# API URL endpoints. Most others are constructed from these
URL_TEMPLATES = {
'ai': lambda: 'app_instances',
'ai_inst': lambda: (URL_TEMPLATES['ai']() + '/{}'),
'si': lambda: (URL_TEMPLATES['ai_inst']() + '/storage_instances'),
'si_inst': lambda storage_name: (
(URL_TEMPLATES['si']() + '/{}').format(
'{}', storage_name)),
'vol': lambda storage_name: (
(URL_TEMPLATES['si_inst'](storage_name) + '/volumes')),
'vol_inst': lambda storage_name, volume_name: (
(URL_TEMPLATES['vol'](storage_name) + '/{}').format(
'{}', volume_name)),
'at': lambda: 'app_templates/{}'}
DEFAULT_SI_SLEEP = 1
DEFAULT_SI_SLEEP_API_2 = 5
DEFAULT_SNAP_SLEEP = 1
INITIATOR_GROUP_PREFIX = "IG-"
API_VERSIONS = ["2", "2.1"]
API_VERSIONS = ["2.1", "2.2"]
API_TIMEOUT = 20
###############
# METADATA KEYS
###############
M_TYPE = 'cinder_volume_type'
M_CALL = 'cinder_calls'
M_CLONE = 'cinder_clone_from'
M_MANAGED = 'cinder_managed'
M_KEYS = [M_TYPE, M_CALL, M_CLONE, M_MANAGED]
VALID_CHARS = set(string.ascii_letters + string.digits + "-_.")
class DateraAPIException(exception.VolumeBackendAPIException):
message = _("Bad response from Datera API")
def _get_name(name):
return "".join((OS_PREFIX, name))
def get_name(resource):
dn = resource.get('display_name')
cid = resource.get('id')
if dn:
dn = filter_chars(dn)
# Check to ensure the name is short enough to fit. Prioritize
# the prefix and Cinder ID, strip all invalid characters
nl = len(OS_PREFIX) + len(dn) + len(cid) + 2
if nl >= 64:
dn = dn[:-(nl - 63)]
return "-".join((OS_PREFIX, dn, cid))
return "-".join((OS_PREFIX, cid))
def _get_unmanaged(name):
return "".join((UNMANAGE_PREFIX, name))
def get_unmanaged(name):
return "-".join((UNMANAGE_PREFIX, name))
def _authenticated(func):
"""Ensure the driver is authenticated to make a request.
In do_setup() we fetch an auth token and store it. If that expires when
we do API request, we'll fetch a new one.
"""
@functools.wraps(func)
def func_wrapper(driver, *args, **kwargs):
try:
return func(driver, *args, **kwargs)
except exception.NotAuthorized:
# Prevent recursion loop. After the driver arg is the
# resource_type arg from _issue_api_request(). If attempt to
# login failed, we should just give up.
if args[0] == 'login':
raise
# Token might've expired, get a new one, try again.
driver.login()
return func(driver, *args, **kwargs)
return func_wrapper
def filter_chars(s):
if s:
return ''.join([c for c in s if c in VALID_CHARS])
return s
def _api_lookup(func):
"""Perform a dynamic API implementation lookup for a call
Naming convention follows this pattern:
# original_func(args) --> _original_func_X_?Y?(args)
# where X and Y are the major and minor versions of the latest
# supported API version
# From the Datera box we've determined that it supports API
# versions ['2', '2.1']
# This is the original function call
@_api_lookup
def original_func(arg1, arg2):
print("I'm a shim, this won't get executed!")
pass
# This is the function that is actually called after determining
# the correct API version to use
def _original_func_2_1(arg1, arg2):
some_version_2_1_implementation_here()
# This is the function that would be called if the previous function
# did not exist:
def _original_func_2(arg1, arg2):
some_version_2_implementation_here()
# This function would NOT be called, because the connected Datera box
# does not support the 1.5 version of the API
def _original_func_1_5(arg1, arg2):
some_version_1_5_implementation_here()
"""
def lookup(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
obj = args[0]
api_versions = _get_supported_api_versions(obj)
api_version = None
index = -1
while True:
try:
api_version = api_versions[index]
except (IndexError, KeyError):
msg = _("No compatible API version found for this product: "
"api_versions -> %(api_version)s, %(func)s")
LOG.error(msg, api_version=api_version, func=func)
raise DateraAPIException(msg % {
'api_version': api_version, 'func': func})
# Py27
try:
name = "_" + "_".join(
(func.func_name, api_version.replace(".", "_")))
# Py3+
except AttributeError:
name = "_" + "_".join(
(func.__name__, api_version.replace(".", "_")))
try:
if obj.do_profile:
LOG.info("Trying method: %s", name)
call_id = uuid.uuid4()
LOG.debug("Profiling method: %s, id %s", name, call_id)
t1 = time.time()
obj.thread_local.trace_id = call_id
result = getattr(obj, name)(*args[1:], **kwargs)
if obj.do_profile:
t2 = time.time()
timedelta = round(t2 - t1, 3)
LOG.debug("Profile for method %s, id %s: %ss",
name, call_id, timedelta)
return result
except AttributeError as e:
# If we find the attribute name in the error message
# then we continue otherwise, raise to prevent masking
# errors
if name not in six.text_type(e):
raise
else:
LOG.info(e)
index -= 1
except DateraAPIException as e:
if "UnsupportedVersionError" in six.text_type(e):
index -= 1
else:
raise
name = "_" + func.__name__ + "_" + obj.apiv.replace(".", "_")
LOG.debug("Trying method: %s", name)
call_id = uuid.uuid4()
if obj.do_profile:
LOG.debug("Profiling method: %s, id %s", name, call_id)
t1 = time.time()
obj.thread_local.trace_id = call_id
result = getattr(obj, name)(*args[1:], **kwargs)
if obj.do_profile:
t2 = time.time()
timedelta = round(t2 - t1, 3)
LOG.debug("Profile for method %s, id %s: %ss",
name, call_id, timedelta)
return result
return wrapper
def _get_supported_api_versions(driver):
t = time.time()
if driver.api_cache and driver.api_timeout - t < API_TIMEOUT:
return driver.api_cache
driver.api_timeout = t + API_TIMEOUT
results = []
host = driver.configuration.san_ip
port = driver.configuration.datera_api_port
client_cert = driver.configuration.driver_client_cert
client_cert_key = driver.configuration.driver_client_cert_key
cert_data = None
header = {'Content-Type': 'application/json; charset=utf-8',
'Datera-Driver': 'OpenStack-Cinder-{}'.format(driver.VERSION)}
protocol = 'http'
if client_cert:
protocol = 'https'
cert_data = (client_cert, client_cert_key)
def _parse_vol_ref(ref):
if ref.count(":") not in (2, 3):
raise exception.ManageExistingInvalidReference(
_("existing_ref argument must be of this format: "
"tenant:app_inst_name:storage_inst_name:vol_name or "
"app_inst_name:storage_inst_name:vol_name"))
try:
url = '%s://%s:%s/api_versions' % (protocol, host, port)
resp = driver._request(url, "get", None, header, cert_data)
data = resp.json()
results = [elem.strip("v") for elem in data['api_versions']]
except (DateraAPIException, KeyError):
# Fallback to pre-endpoint logic
for version in API_VERSIONS[0:-1]:
url = '%s://%s:%s/v%s' % (protocol, host, port, version)
resp = driver._request(url, "get", None, header, cert_data)
if ("api_req" in resp.json() or
str(resp.json().get("code")) == "99"):
results.append(version)
else:
LOG.error("No supported API versions available, "
"Please upgrade your Datera EDF software")
return results
(tenant, app_inst_name, storage_inst_name,
vol_name) = ref.split(":")
if tenant == "root":
tenant = None
except (TypeError, ValueError):
app_inst_name, storage_inst_name, vol_name = ref.split(
":")
tenant = None
return app_inst_name, storage_inst_name, vol_name, tenant
def _check_snap_ref(ref):
if not SNAP_RE.match(ref):
raise exception.ManageExistingInvalidReference(
_("existing_ref argument must be of this format: "
"1234567890.12345678"))
return True
def _get_size(app_inst):
"""Helper method for getting the size of a backend object
If app_inst is provided, we'll just parse the dict to get
the size instead of making a separate http request
"""
if 'data' in app_inst:
app_inst = app_inst['data']
sis = app_inst['storage_instances']
found_si = sis[0]
found_vol = found_si['volumes'][0]
return found_vol['size']
def _get_volume_type_obj(driver, resource):
@ -251,16 +161,20 @@ def _get_volume_type_obj(driver, resource):
def _get_policies_for_resource(driver, resource):
volume_type = driver._get_volume_type_obj(resource)
return driver._get_policies_for_volume_type(volume_type)
def _get_policies_for_volume_type(driver, volume_type):
"""Get extra_specs and qos_specs of a volume_type.
This fetches the scoped keys from the volume type. Anything set from
qos_specs will override key/values set from extra_specs.
"""
volume_type = driver._get_volume_type_obj(resource)
# Handle case of volume with no type. We still want the
# specified defaults from above
if volume_type:
specs = volume_type.get('extra_specs')
specs = volume_type.get('extra_specs', {})
else:
specs = {}
@ -269,19 +183,19 @@ def _get_policies_for_resource(driver, resource):
in driver._init_vendor_properties()[0].items()}
if volume_type:
# Populate updated value
for key, value in specs.items():
if ':' in key:
fields = key.split(':')
key = fields[1]
policies[key] = value
qos_specs_id = volume_type.get('qos_specs_id')
if qos_specs_id is not None:
ctxt = context.get_admin_context()
qos_kvs = qos_specs.get_qos_specs(ctxt, qos_specs_id)['specs']
if qos_kvs:
policies.update(qos_kvs)
specs.update(qos_kvs)
# Populate updated value
for key, value in specs.items():
if ':' in key:
fields = key.split(':')
key = fields[1]
policies[key] = value
# Cast everything except booleans int that can be cast
for k, v in policies.items():
# Handle String Boolean case
@ -296,199 +210,152 @@ def _get_policies_for_resource(driver, resource):
return policies
# ================
# = API Requests =
# ================
def _request(driver, connection_string, method, payload, header, cert_data):
LOG.debug("Endpoint for Datera API call: %s", connection_string)
LOG.debug("Payload for Datera API call: %s", payload)
try:
response = getattr(requests, method)(connection_string,
data=payload, headers=header,
verify=False, cert=cert_data)
return response
except requests.exceptions.RequestException as ex:
msg = _(
'Failed to make a request to Datera cluster endpoint due '
'to the following reason: %s') % six.text_type(
ex.message)
LOG.error(msg)
raise DateraAPIException(msg)
def _image_accessible(driver, context, volume, image_meta):
# Determine if image is accessible by current project
pid = volume.get('project_id', '')
public = False
visibility = image_meta.get('visibility', None)
LOG.debug("Image %(image)s visibility: %(vis)s",
{"image": image_meta['id'], "vis": visibility})
if visibility and visibility in ['public', 'community']:
public = True
elif visibility and visibility in ['shared', 'private']:
# Do membership check. Newton and before didn't have a 'shared'
# visibility option, so we have to do this check for 'private'
# as well
gclient = glance.get_default_image_service()
members = []
# list_members is only available in Rocky+
try:
members = gclient.list_members(context, image_meta['id'])
except AttributeError:
# This is the fallback method for the same query
try:
members = gclient._client.call(context,
'list',
controller='image_members',
image_id=image_meta['id'])
except glance_exc.HTTPForbidden as e:
LOG.warning(e)
except glance_exc.HTTPForbidden as e:
LOG.warning(e)
members = list(members)
LOG.debug("Shared image %(image)s members: %(members)s",
{"image": image_meta['id'], "members": members})
for member in members:
if (member['member_id'] == pid and
member['status'] == 'accepted'):
public = True
break
if image_meta.get('is_public', False):
public = True
else:
if image_meta.get('owner', '') == pid:
public = True
if not public:
LOG.warning("Requested image is not "
"accessible by current Project.")
return public
def _raise_response(driver, response):
msg = _('Request to Datera cluster returned bad status:'
' %(status)s | %(reason)s') % {
'status': response.status_code,
'reason': response.reason}
LOG.error(msg)
raise DateraAPIException(msg)
def _format_tenant(tenant):
if tenant == "all" or (tenant and ('/root' in tenant or 'root' in tenant)):
return '/root'
elif tenant and ('/root' not in tenant and 'root' not in tenant):
return "/" + "/".join(('root', tenant)).strip('/')
return tenant
def _handle_bad_status(driver,
response,
connection_string,
method,
payload,
header,
cert_data,
sensitive=False,
conflict_ok=False):
if (response.status_code == http_client.BAD_REQUEST and
connection_string.endswith("api_versions")):
# Raise the exception, but don't log any error. We'll just fall
# back to the old style of determining API version. We make this
# request a lot, so logging it is just noise
raise DateraAPIException
if response.status_code == http_client.NOT_FOUND:
raise exception.NotFound(response.json()['message'])
elif response.status_code in [http_client.FORBIDDEN,
http_client.UNAUTHORIZED]:
raise exception.NotAuthorized()
elif response.status_code == http_client.CONFLICT and conflict_ok:
# Don't raise, because we're expecting a conflict
pass
elif response.status_code == http_client.SERVICE_UNAVAILABLE:
current_retry = 0
while current_retry <= driver.retry_attempts:
LOG.debug("Datera 503 response, trying request again")
eventlet.sleep(driver.interval)
resp = driver._request(connection_string,
method,
payload,
header,
cert_data)
if resp.ok:
return response.json()
elif resp.status_code != http_client.SERVICE_UNAVAILABLE:
driver._raise_response(resp)
def get_ip_pool(policies):
ip_pool = policies['ip_pool']
if ',' in ip_pool:
ip_pools = ip_pool.split(',')
ip_pool = random.choice(ip_pools)
return ip_pool
def create_tenant(driver, project_id):
if driver.tenant_id.lower() == 'map':
name = get_name({'id': project_id})
elif driver.tenant_id:
name = driver.tenant_id.replace('root', '').strip('/')
else:
driver._raise_response(response)
name = 'root'
if name:
try:
driver.api.tenants.create(name=name)
except dfs_sdk.exceptions.ApiConflictError:
LOG.debug("Tenant {} already exists".format(name))
return _format_tenant(name)
@_authenticated
def _issue_api_request(driver, resource_url, method='get', body=None,
sensitive=False, conflict_ok=False,
api_version='2', tenant=None):
"""All API requests to Datera cluster go through this method.
def get_tenant(driver, project_id):
if driver.tenant_id.lower() == 'map':
return _format_tenant(get_name({'id': project_id}))
elif not driver.tenant_id:
return _format_tenant('root')
return _format_tenant(driver.tenant_id)
:param resource_url: the url of the resource
:param method: the request verb
:param body: a dict with options for the action_type
:param sensitive: Bool, whether request should be obscured from logs
:param conflict_ok: Bool, True to suppress ConflictError exceptions
during this request
:param api_version: The Datera api version for the request
:param tenant: The tenant header value for the request (only applicable
to 2.1 product versions and later)
:returns: a dict of the response from the Datera cluster
"""
host = driver.configuration.san_ip
port = driver.configuration.datera_api_port
api_token = driver.datera_api_token
payload = json.dumps(body, ensure_ascii=False)
payload.encode('utf-8')
def cvol_to_ai(driver, resource, tenant=None):
if not tenant:
tenant = get_tenant(driver, resource['project_id'])
try:
# api.tenants.get needs a non '/'-prefixed tenant id
driver.api.tenants.get(tenant.strip('/'))
except dfs_sdk.exceptions.ApiNotFoundError:
create_tenant(driver, resource['project_id'])
cid = resource.get('id', None)
if not cid:
raise ValueError('Unsure what id key to use for object', resource)
ais = driver.api.app_instances.list(
filter='match(name,.*{}.*)'.format(cid),
tenant=tenant)
if not ais:
raise exception.VolumeNotFound(volume_id=cid)
return ais[0]
header = {'Content-Type': 'application/json; charset=utf-8'}
header.update(driver.HEADER_DATA)
protocol = 'http'
if driver.configuration.driver_use_ssl:
protocol = 'https'
def cvol_to_dvol(driver, resource, tenant=None):
if not tenant:
tenant = get_tenant(driver, resource['project_id'])
ai = cvol_to_ai(driver, resource, tenant=tenant)
si = ai.storage_instances.list(tenant=tenant)[0]
vol = si.volumes.list(tenant=tenant)[0]
return vol
if api_token:
header['Auth-Token'] = api_token
if tenant == "all":
header['tenant'] = tenant
elif tenant and '/root' not in tenant:
header['tenant'] = "".join(("/root/", tenant))
elif tenant and '/root' in tenant:
header['tenant'] = tenant
elif driver.tenant_id and driver.tenant_id.lower() != "map":
header['tenant'] = driver.tenant_id
def _version_to_int(ver):
# Using a factor of 100 per digit so up to 100 versions are supported
# per major/minor/patch/subpatch digit in this calculation
# Example:
# In [2]: _version_to_int("3.3.0.0")
# Out[2]: 303000000
# In [3]: _version_to_int("2.2.7.1")
# Out[3]: 202070100
VERSION_DIGITS = 4
factor = pow(10, VERSION_DIGITS * 2)
div = pow(10, 2)
val = 0
for c in ver.split("."):
val += int(int(c) * factor)
factor /= div
return val
client_cert = driver.configuration.driver_client_cert
client_cert_key = driver.configuration.driver_client_cert_key
cert_data = None
if client_cert:
protocol = 'https'
cert_data = (client_cert, client_cert_key)
connection_string = '%s://%s:%s/v%s/%s' % (protocol, host, port,
api_version, resource_url)
request_id = uuid.uuid4()
if driver.do_profile:
t1 = time.time()
if not sensitive:
LOG.debug("\nDatera Trace ID: %(tid)s\n"
"Datera Request ID: %(rid)s\n"
"Datera Request URL: /v%(api)s/%(url)s\n"
"Datera Request Method: %(method)s\n"
"Datera Request Payload: %(payload)s\n"
"Datera Request Headers: %(header)s\n",
{'tid': driver.thread_local.trace_id,
'rid': request_id,
'api': api_version,
'url': resource_url,
'method': method,
'payload': payload,
'header': header})
response = driver._request(connection_string,
method,
payload,
header,
cert_data)
data = response.json()
timedelta = "Profiling disabled"
if driver.do_profile:
t2 = time.time()
timedelta = round(t2 - t1, 3)
if not sensitive:
LOG.debug("\nDatera Trace ID: %(tid)s\n"
"Datera Response ID: %(rid)s\n"
"Datera Response TimeDelta: %(delta)ss\n"
"Datera Response URL: %(url)s\n"
"Datera Response Payload: %(payload)s\n"
"Datera Response Object: %(obj)s\n",
{'tid': driver.thread_local.trace_id,
'rid': request_id,
'delta': timedelta,
'url': response.url,
'payload': payload,
'obj': vars(response)})
if not response.ok:
driver._handle_bad_status(response,
connection_string,
method,
payload,
header,
cert_data,
conflict_ok=conflict_ok)
return data
def dat_version_gte(version_a, version_b):
return _version_to_int(version_a) >= _version_to_int(version_b)
def register_driver(driver):
for func in [_get_supported_api_versions,
_get_volume_type_obj,
for func in [_get_volume_type_obj,
_get_policies_for_resource,
_request,
_raise_response,
_handle_bad_status,
_issue_api_request]:
# PY27
_get_policies_for_volume_type,
_image_accessible,
get_tenant,
create_tenant,
cvol_to_ai,
cvol_to_dvol]:
f = types.MethodType(func, driver)
try:
setattr(driver, func.func_name, f)
# PY3+
except AttributeError:
setattr(driver, func.__name__, f)
setattr(driver, func.__name__, f)

View File

@ -1,4 +1,4 @@
# Copyright 2017 Datera
# Copyright 2020 Datera
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -19,28 +19,34 @@ import uuid
from eventlet.green import threading
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import importutils
import six
from cinder import exception
from cinder.i18n import _
from cinder import interface
from cinder import utils
from cinder.volume import configuration
import cinder.volume.drivers.datera.datera_api2 as api2
import cinder.volume.drivers.datera.datera_api21 as api21
import cinder.volume.drivers.datera.datera_api22 as api22
import cinder.volume.drivers.datera.datera_common as datc
from cinder.volume.drivers.san import san
LOG = logging.getLogger(__name__)
dfs_sdk = importutils.try_import('dfs_sdk')
d_opts = [
cfg.StrOpt('datera_api_port',
default='7717',
deprecated_for_removal=True,
help='Datera API port.'),
cfg.StrOpt('datera_api_version',
default='2',
default='2.2',
deprecated_for_removal=True,
help='Datera API version.'),
cfg.StrOpt('datera_ldap_server',
default=None,
help='LDAP authentication server'),
cfg.IntOpt('datera_503_timeout',
default='120',
help='Timeout for HTTP 503 retry messages'),
@ -58,25 +64,51 @@ d_opts = [
default=None,
help="If set to 'Map' --> OpenStack project ID will be mapped "
"implicitly to Datera tenant ID\n"
"If set to 'None' --> Datera tenant ID will not be used "
"If set to None --> Datera tenant ID will not be used "
"during volume provisioning\n"
"If set to anything else --> Datera tenant ID will be the "
"provided value"),
cfg.BoolOpt('datera_enable_image_cache',
default=False,
help="Set to True to enable Datera backend image caching"),
cfg.StrOpt('datera_image_cache_volume_type_id',
default=None,
help="Cinder volume type id to use for cached volumes"),
cfg.BoolOpt('datera_disable_profiler',
default=False,
help="Set to True to disable profiling in the Datera driver"),
cfg.BoolOpt('datera_disable_extended_metadata',
default=False,
help="Set to True to disable sending additional metadata to "
"the Datera backend"),
cfg.BoolOpt('datera_disable_template_override',
default=False,
help="Set to True to disable automatic template override of "
"the size attribute when creating from a template"),
cfg.DictOpt('datera_volume_type_defaults',
default={},
help="Settings here will be used as volume-type defaults if "
"the volume-type setting is not provided. This can be "
"used, for example, to set a very low total_iops_max "
"value if none is specified in the volume-type to "
"prevent accidental overusage. Options are specified "
"via the following format, WITHOUT ANY 'DF:' PREFIX: "
"'datera_volume_type_defaults="
"iops_per_gb:100,bandwidth_per_gb:200...etc'."),
]
CONF = cfg.CONF
CONF.import_opt('driver_use_ssl', 'cinder.volume.driver')
CONF.register_opts(d_opts, group=configuration.SHARED_CONF_GROUP)
CONF.register_opts(d_opts)
@six.add_metaclass(utils.TraceWrapperWithABCMetaclass)
class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
@interface.volumedriver
class DateraDriver(san.SanISCSIDriver, api21.DateraApi, api22.DateraApi):
"""The OpenStack Datera iSCSI volume driver.
"""The OpenStack Datera Driver
.. code-block:: none
Version history:
* 1.0 - Initial driver
@ -91,21 +123,85 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
* 2.3.1 - Scalability bugfixes
* 2.3.2 - Volume Placement, ACL multi-attach bugfix
* 2.4.0 - Fast Retype Support
* 2.5.0 - Glance Image Caching, retyping/QoS bugfixes
* 2.6.0 - Api 2.2 support
* 2.6.1 - Glance interoperability fix
* 2.7.0 - IOPS/GB and BW/GB settings, driver level overrides
(API 2.1+ only)
* 2.7.2 - Allowing DF: QoS Spec prefix, QoS type leak bugfix
* 2.7.3 - Fixed bug in clone_image where size was not set correctly
* 2.7.4 - Fix for create_tenant incorrect API call
Temporary fix for DAT-15931
* 2.7.5 - Removed "force" parameter from /initiators v2.1 API requests
* 2.8.0 - iops_per_gb and bandwidth_per_gb are now limited by
total_iops_max and total_bandwidth_max (API 2.1+ only)
Bugfix for cinder retype with online volume
* 2.8.1 - Bugfix for missing default dict during retype
* 2.8.2 - Updated most retype operations to not detach volume
* 2.8.3 - Bugfix for not allowing fast clones for shared/community
volumes
* 2.8.4 - Fixed missing API version pinning in _offline_flip
* 2.8.5 - Membership check for fast image cloning. Metadata API pinning
* 2.8.6 - Added LDAP support and CHAP support
* 2.8.7 - Bugfix for missing tenancy calls in offline_flip
* 2.9.0 - Volumes now correctly renamed during backend migration.
Implemented update_migrated_volume (API 2.1+ only),
Prevent non-raw image cloning
* 2.9.1 - Added extended metadata attributes during volume creation
and attachment. Added datera_disable_extended_metadata
option to disable it.
* 2.9.2 - Made ensure_export a no-op. Removed usage of
initiator-groups
* 2018.4.5.0 - Switch to new date-based versioning scheme. Removed v2
API support
* 2018.4.17.1 - Bugfixes to IP Pools, Templates and Initiators
* 2018.4.25.0 - Snapshot Manage. List Manageable Snapshots support
* 2018.4.27.0 - Major driver revamp/restructure, no functionality
change
* 2018.5.1.0 - Bugfix for Map tenant auto-creation
* 2018.5.18.0 - Bugfix for None tenant handling
* 2018.6.7.0 - Bugfix for missing project_id during image clone
* 2018.7.13.0 - Massive update porting to use the Datera Python-SDK
* 2018.7.20.0 - Driver now includes display_name in created backend
app_instances.
* 2018.9.17.0 - Requirements and doc changes
* 2018.10.8.0 - Added extra_headers to Python-SDK constructor call.
This allows for the SDK to send the type of driver
performing each request along with the request. This
functionality existed before the Python-SDK revamp, so
this change adds the functionality back in.
* 2018.10.8.1 - Adding thread_local to Python-SDK constructor call.
This preserves trace_id in the logs
* 2018.10.30.0 - Adding template_override support. Added
datera_disable_template_override cfgOpt to disable
this feature. Updated required requests version to
>=2.20.0 because of a security vulnerability in
<=2.19.X. Added support for filter_function and
goodness_function.
* 2018.11.1.0 - Adding flash and hybrid capacity info to
get_volume_stats
* 2018.11.8.0 - Fixing bug that broke 2.2.X support
* 2018.11.14.0 - Bugfixes for v2.1 API support and unicode character
support
* 2019.1.24.0 - Python-SDK requirements update, README updates
* 2019.2.25.0 - Scalability fixes and utility script updates
* 2019.6.4.1 - Added Pypi packaging installation support
* 2019.12.10.0 - Python 3.x support, tox tests, CI ready, live
migration support, image cache, bugfixes.
"""
VERSION = '2.4.0'
VERSION = '2019.12.10.0'
CI_WIKI_NAME = "datera-ci"
HEADER_DATA = {'Datera-Driver': 'OpenStack-Cinder-{}'.format(VERSION)}
# TODO(jsbryant) Remove driver in the 'U' release if CI is not fixed.
SUPPORTED = False
def __init__(self, *args, **kwargs):
super(DateraDriver, self).__init__(*args, **kwargs)
self.configuration.append_config_values(d_opts)
self.username = self.configuration.san_login
self.password = self.configuration.san_password
self.ldap = self.configuration.datera_ldap_server
self.cluster_stats = {}
self.datera_api_token = None
self.interval = self.configuration.datera_503_interval
@ -113,23 +209,39 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
self.interval)
self.driver_prefix = str(uuid.uuid4())[:4]
self.datera_debug = self.configuration.datera_debug
self.datera_api_versions = []
if self.datera_debug:
utils.setup_tracing(['method'])
self.tenant_id = self.configuration.datera_tenant_id
if self.tenant_id is None:
self.tenant_id = ''
self.defaults = self.configuration.datera_volume_type_defaults
if self.tenant_id and self.tenant_id.lower() == 'none':
self.tenant_id = None
self.template_override = (
not self.configuration.datera_disable_template_override)
self.api_check = time.time()
self.api_cache = []
self.api_timeout = 0
self.do_profile = not self.configuration.datera_disable_profiler
self.thread_local = threading.local()
self.do_metadata = (
not self.configuration.datera_disable_extended_metadata)
self.image_cache = self.configuration.datera_enable_image_cache
self.image_type = self.configuration.datera_image_cache_volume_type_id
self.thread_local = threading.local() # pylint: disable=no-member
self.datera_version = None
self.apiv = None
self.api = None
self.filterf = self.get_filter_function()
self.goodnessf = self.get_goodness_function()
self.use_chap_auth = self.configuration.use_chap_auth
self.chap_username = self.configuration.chap_username
self.chap_password = self.configuration.chap_password
backend_name = self.configuration.safe_get(
'volume_backend_name')
self.backend_name = backend_name or 'Datera'
datc.register_driver(self)
def do_setup(self, context):
@ -142,8 +254,25 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
LOG.error(msg)
raise exception.InvalidInput(msg)
self.login()
self._create_tenant()
# Try each valid api version starting with the latest until we find
# one that works
for apiv in reversed(datc.API_VERSIONS):
try:
api = dfs_sdk.get_api(self.configuration.san_ip,
self.username,
self.password,
'v{}'.format(apiv),
disable_log=True,
extra_headers=self.HEADER_DATA,
thread_local=self.thread_local,
ldap_server=self.ldap)
system = api.system.get()
LOG.debug('Connected successfully to cluster: %s', system.name)
self.api = api
self.apiv = apiv
break
except Exception as e:
LOG.warning(e)
# =================
@ -151,7 +280,7 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
# = Create Volume =
# =================
@datc._api_lookup
@datc.lookup
def create_volume(self, volume):
"""Create a logical volume."""
pass
@ -160,7 +289,7 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
# = Extend Volume =
# =================
@datc._api_lookup
@datc.lookup
def extend_volume(self, volume, new_size):
pass
@ -170,7 +299,7 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
# = Cloned Volume =
# =================
@datc._api_lookup
@datc.lookup
def create_cloned_volume(self, volume, src_vref):
pass
@ -178,7 +307,7 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
# = Delete Volume =
# =================
@datc._api_lookup
@datc.lookup
def delete_volume(self, volume):
pass
@ -186,7 +315,7 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
# = Ensure Export =
# =================
@datc._api_lookup
@datc.lookup
def ensure_export(self, context, volume, connector=None):
"""Gets the associated account, retrieves CHAP info and updates."""
@ -194,7 +323,7 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
# = Initialize Connection =
# =========================
@datc._api_lookup
@datc.lookup
def initialize_connection(self, volume, connector):
pass
@ -202,7 +331,7 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
# = Create Export =
# =================
@datc._api_lookup
@datc.lookup
def create_export(self, context, volume, connector):
pass
@ -210,7 +339,7 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
# = Detach Volume =
# =================
@datc._api_lookup
@datc.lookup
def detach_volume(self, context, volume, attachment=None):
pass
@ -218,7 +347,7 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
# = Create Snapshot =
# ===================
@datc._api_lookup
@datc.lookup
def create_snapshot(self, snapshot):
pass
@ -226,7 +355,7 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
# = Delete Snapshot =
# ===================
@datc._api_lookup
@datc.lookup
def delete_snapshot(self, snapshot):
pass
@ -234,7 +363,7 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
# = Volume From Snapshot =
# ========================
@datc._api_lookup
@datc.lookup
def create_volume_from_snapshot(self, volume, snapshot):
pass
@ -242,12 +371,11 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
# = Retype =
# ==========
@datc._api_lookup
@datc.lookup
def retype(self, ctxt, volume, new_type, diff, host):
"""Convert the volume to be of the new type.
Returns a boolean indicating whether the retype occurred.
:param ctxt: Context
:param volume: A dictionary describing the volume to migrate
:param new_type: A dictionary describing the volume type to convert to
@ -262,7 +390,7 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
# = Manage =
# ==========
@datc._api_lookup
@datc.lookup
def manage_existing(self, volume, existing_ref):
"""Manage an existing volume on the Datera backend
@ -276,7 +404,6 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
(existing_ref['source-name'] ==
tenant:app_inst_name:storage_inst_name:vol_name)
if using Datera 2.1 API
or
@ -292,11 +419,41 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
"""
pass
@datc.lookup
def manage_existing_snapshot(self, snapshot, existing_ref):
"""Brings an existing backend storage object under Cinder management.
existing_ref is passed straight through from the API request's
manage_existing_ref value, and it is up to the driver how this should
be interpreted. It should be sufficient to identify a storage object
that the driver should somehow associate with the newly-created cinder
snapshot structure.
There are two ways to do this:
1. Rename the backend storage object so that it matches the
snapshot['name'] which is how drivers traditionally map between a
cinder snapshot and the associated backend storage object.
2. Place some metadata on the snapshot, or somewhere in the backend,
that allows other driver requests (e.g. delete) to locate the
backend storage object when required.
If the existing_ref doesn't make sense, or doesn't refer to an existing
backend storage object, raise a ManageExistingInvalidReference
exception.
:param snapshot: Cinder volume snapshot to manage
:param existing_ref: Driver-specific information used to identify a
volume snapshot
"""
pass
# ===================
# = Manage Get Size =
# ===================
@datc._api_lookup
@datc.lookup
def manage_existing_get_size(self, volume, existing_ref):
"""Get the size of an unmanaged volume on the Datera backend
@ -316,20 +473,32 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
"""
pass
@datc.lookup
def manage_existing_snapshot_get_size(self, snapshot, existing_ref):
"""Return size of snapshot to be managed by manage_existing.
When calculating the size, round up to the next GB.
:param snapshot: Cinder volume snapshot to manage
:param existing_ref: Driver-specific information used to identify a
volume snapshot
:returns size: Volume snapshot size in GiB (integer)
"""
pass
# =========================
# = Get Manageable Volume =
# =========================
@datc._api_lookup
@datc.lookup
def get_manageable_volumes(self, cinder_volumes, marker, limit, offset,
sort_keys, sort_dirs):
"""List volumes on the backend available for management by Cinder.
Returns a list of dictionaries, each specifying a volume in the host,
with the following keys:
- reference (dictionary): The reference for a volume, which can be
passed to 'manage_existing'.
passed to "manage_existing".
- size (int): The size of the volume according to the storage
backend, rounded up to the nearest GB.
- safe_to_manage (boolean): Whether or not this volume is safe to
@ -353,11 +522,50 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
"""
pass
# ============================
# = Get Manageable Snapshots =
# ============================
@datc.lookup
def get_manageable_snapshots(self, cinder_snapshots, marker, limit,
offset, sort_keys, sort_dirs):
"""List snapshots on the backend available for management by Cinder.
Returns a list of dictionaries, each specifying a snapshot in the host,
with the following keys:
- reference (dictionary): The reference for a snapshot, which can be
passed to "manage_existing_snapshot".
- size (int): The size of the snapshot according to the storage
backend, rounded up to the nearest GB.
- safe_to_manage (boolean): Whether or not this snapshot is safe to
manage according to the storage backend. For example, is the snapshot
in use or invalid for any reason.
- reason_not_safe (string): If safe_to_manage is False, the reason why.
- cinder_id (string): If already managed, provide the Cinder ID.
- extra_info (string): Any extra information to return to the user
- source_reference (string): Similar to "reference", but for the
snapshot's source volume.
:param cinder_snapshots: A list of snapshots in this host that Cinder
currently manages, used to determine if
a snapshot is manageable or not.
:param marker: The last item of the previous page; we return the
next results after this value (after sorting)
:param limit: Maximum number of items to return
:param offset: Number of items to skip after marker
:param sort_keys: List of keys to sort results by (valid keys are
'identifier' and 'size')
:param sort_dirs: List of directions to sort by, corresponding to
sort_keys (valid directions are 'asc' and 'desc')
"""
pass
# ============
# = Unmanage =
# ============
@datc._api_lookup
@datc.lookup
def unmanage(self, volume):
"""Unmanage a currently managed volume in Cinder
@ -365,11 +573,43 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
"""
pass
# ====================
# = Fast Image Clone =
# ====================
@datc.lookup
def clone_image(self, context, volume, image_location, image_meta,
image_service):
"""Clone an existing image volume."""
pass
# ====================
# = Volume Migration =
# ====================
@datc.lookup
def update_migrated_volume(self, context, volume, new_volume,
volume_status):
"""Return model update for migrated volume.
Each driver implementing this method needs to be responsible for the
values of _name_id and provider_location. If None is returned or either
key is not set, it means the volume table does not need to change the
value(s) for the key(s).
The return format is {"_name_id": value, "provider_location": value}.
:param volume: The original volume that was migrated to this backend
:param new_volume: The migration volume object that was created on
this backend as part of the migration process
:param original_volume_status: The status of the original volume
:returns: model_update to update DB with any needed changes
"""
pass
# ================
# = Volume Stats =
# ================
@datc._api_lookup
@datc.lookup
def get_volume_stats(self, refresh=False):
"""Get volume stats.
@ -384,31 +624,10 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
# = Login =
# =========
@datc._api_lookup
@datc.lookup
def login(self):
pass
# =======
# = QoS =
# =======
def _update_qos(self, resource, policies):
url = datc.URL_TEMPLATES['vol_inst'](
policies['default_storage_name'],
policies['default_volume_name']) + '/performance_policy'
url = url.format(datc._get_name(resource['id']))
type_id = resource.get('volume_type_id', None)
if type_id is not None:
# Filter for just QOS policies in result. All of their keys
# should end with "max"
fpolicies = {k: int(v) for k, v in
policies.items() if k.endswith("max")}
# Filter all 0 values from being passed
fpolicies = dict(filter(lambda _v: _v[1] > 0, fpolicies.items()))
if fpolicies:
self._issue_api_request(url, 'post', body=fpolicies,
api_version='2')
def _get_lunid(self):
return 0
@ -447,18 +666,62 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
prefix: DF --> Datera Fabric
"""
LOG.debug("Using the following volume-type defaults: %s",
self.defaults)
properties = {}
self._set_property(
properties,
"DF:iops_per_gb",
"Datera IOPS Per GB Setting",
_("Setting this value will calculate IOPS for each volume of "
"this type based on their size. Eg. A setting of 100 will "
"give a 1 GB volume 100 IOPS, but a 10 GB volume 1000 IOPS. "
"A setting of '0' is unlimited. This value is applied to "
"total_iops_max and will be overridden by total_iops_max if "
"iops_per_gb is set and a large enough volume is provisioned "
"which would exceed total_iops_max"),
"integer",
minimum=0,
default=int(self.defaults.get('iops_per_gb', 0)))
self._set_property(
properties,
"DF:bandwidth_per_gb",
"Datera Bandwidth Per GB Setting",
_("Setting this value will calculate bandwidth for each volume of "
"this type based on their size in KiB/s. Eg. A setting of 100 "
"will give a 1 GB volume 100 KiB/s bandwidth, but a 10 GB "
"volume 1000 KiB/s bandwidth. A setting of '0' is unlimited. "
"This value is applied to total_bandwidth_max and will be "
"overridden by total_bandwidth_max if set and a large enough "
"volume is provisioned which woudl exceed total_bandwidth_max"),
"integer",
minimum=0,
default=int(self.defaults.get('bandwidth_per_gb', 0)))
self._set_property(
properties,
"DF:placement_mode",
"Datera Volume Placement",
_("'single_flash' for single-flash-replica placement, "
"Datera Volume Placement Mode (deprecated)",
_("'DEPRECATED: PLEASE USE 'placement_policy' on 3.3.X+ versions "
" of the Datera product. 'single_flash' for "
"single-flash-replica placement, "
"'all_flash' for all-flash-replica placement, "
"'hybrid' for hybrid placement"),
"string",
default="hybrid")
default=self.defaults.get('placement_mode', 'hybrid'))
self._set_property(
properties,
"DF:placement_policy",
"Datera Volume Placement Policy",
_("Valid path to a media placement policy. Example: "
"/placement_policies/all-flash"),
"string",
default=self.defaults.get('placement_policy',
'default'))
self._set_property(
properties,
@ -466,7 +729,7 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
"Datera Round Robin Portals",
_("True to round robin the provided portals for a target"),
"boolean",
default=False)
default="True" == self.defaults.get('round_robin', "False"))
if self.configuration.get('datera_debug_replica_count_override'):
replica_count = 1
@ -480,24 +743,20 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
"increased once volume is created"),
"integer",
minimum=1,
default=replica_count)
self._set_property(
properties,
"DF:acl_allow_all",
"Datera ACL Allow All",
_("True to set acl 'allow_all' on volumes created. Cannot be "
"changed on volume once set"),
"boolean",
default=False)
default=int(self.defaults.get('replica_count', replica_count)))
self._set_property(
properties,
"DF:ip_pool",
"Datera IP Pool",
_("Specifies IP pool to use for volume"),
_("Specifies IP pool to use for volume. If provided string "
"contains commas, it will be split on the commas and each "
"substring will be uses as a separate IP pool and the volume's "
"IP pool will be chosen randomly from the list. Example: "
"'my-ip-pool1,my-ip-pool2,my-ip-pool3', next attach "
"my-ip-pool2 was chosen randomly as the volume IP pool"),
"string",
default="default")
default=self.defaults.get('ip_pool', 'default'))
self._set_property(
properties,
@ -505,7 +764,7 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
"Datera Template",
_("Specifies Template to use for volume provisioning"),
"string",
default="")
default=self.defaults.get('template', ''))
# ###### QoS Settings ###### #
self._set_property(
@ -516,23 +775,7 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
"use 0 for unlimited"),
"integer",
minimum=0,
default=0)
self._set_property(
properties,
"DF:default_storage_name",
"Datera Default Storage Instance Name",
_("The name to use for storage instances created"),
"string",
default="storage-1")
self._set_property(
properties,
"DF:default_volume_name",
"Datera Default Volume Name",
_("The name to use for volumes created"),
"string",
default="volume-1")
default=int(self.defaults.get('read_bandwidth_max', 0)))
self._set_property(
properties,
@ -542,7 +785,7 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
"use 0 for unlimited"),
"integer",
minimum=0,
default=0)
default=int(self.defaults.get('write_bandwidth_max', 0)))
self._set_property(
properties,
@ -552,7 +795,7 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
"use 0 for unlimited"),
"integer",
minimum=0,
default=0)
default=int(self.defaults.get('total_bandwidth_max', 0)))
self._set_property(
properties,
@ -562,7 +805,7 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
"use 0 for unlimited"),
"integer",
minimum=0,
default=0)
default=int(self.defaults.get('read_iops_max', 0)))
self._set_property(
properties,
@ -572,7 +815,7 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
"use 0 for unlimited"),
"integer",
minimum=0,
default=0)
default=int(self.defaults.get('write_iops_max', 0)))
self._set_property(
properties,
@ -582,7 +825,7 @@ class DateraDriver(san.SanISCSIDriver, api2.DateraApi, api21.DateraApi):
"use 0 for unlimited"),
"integer",
minimum=0,
default=0)
default=int(self.defaults.get('total_iops_max', 0)))
# ###### End QoS Settings ###### #
return properties, 'DF'

View File

@ -39,3 +39,6 @@ infi.dtypes.iqn # PSF
# Storpool
storpool>=4.0.0 # Apache-2.0
storpool.spopenstack>=2.2.1 # Apache-2.0
# Datera
dfs_sdk>=1.2.25 # Apache-2.0

View File

@ -175,3 +175,4 @@ purestorage==1.6.0
rsd-lib==1.1.0
storpool==4.0.0
storpool.spopenstack==2.2.1
dfs_sdk==1.2.25

View File

@ -132,4 +132,6 @@ rsd =
storpool =
storpool>=4.0.0 # Apache-2.0
storpool.spopenstack>=2.2.1 # Apache-2.0
datera =
dfs_sdk>=1.2.25 # Apache-2.0