
399 lines
14 KiB

# Copyright (c) 2010 Citrix Systems, Inc.
# Copyright (c) 2013 OpenStack Foundation
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
Helper methods for operations related to the management of volumes,
and storage repositories
import re
import uuid
from eventlet import greenthread
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import strutils
from oslo_utils import versionutils
import six
import nova.conf
from nova import exception
from nova.i18n import _
CONF = nova.conf.CONF
LOG = logging.getLogger(__name__)
# Namespace for SRs so we can reliably generate a UUID
# Generated from uuid.uuid5(uuid.UUID(int=0), 'volume_utils-SR_UUID')
SR_NAMESPACE = uuid.UUID("3cca4135-a809-5bb3-af62-275fbfe87178")
def parse_sr_info(connection_data, description=''):
params = {}
if 'sr_uuid' not in connection_data:
params = _parse_volume_info(connection_data)
sr_identity = "%s/%s/%s" % (params['target'], params['port'],
# PY2 can only support taking an ascii string to uuid5
if six.PY2 and isinstance(sr_identity, six.text_type):
sr_identity = sr_identity.encode('utf-8')
sr_uuid = str(uuid.uuid5(SR_NAMESPACE, sr_identity))
sr_uuid = connection_data['sr_uuid']
for k in connection_data.get('introduce_sr_keys', {}):
params[k] = connection_data[k]
label = connection_data.pop('name_label',
'tempSR-%s' % sr_uuid)
params['name_description'] = connection_data.get('name_description',
return (sr_uuid, label, params)
def _parse_volume_info(connection_data):
"""Parse device_path and mountpoint as they can be used by XenAPI.
In particular, the mountpoint (e.g. /dev/sdc) must be translated
into a numeric literal.
volume_id = connection_data['volume_id']
target_portal = connection_data['target_portal']
target_host = _get_target_host(target_portal)
target_port = _get_target_port(target_portal)
target_iqn = connection_data['target_iqn']
log_params = {
"vol_id": volume_id,
"host": target_host,
"port": target_port,
"iqn": target_iqn
LOG.debug('(vol_id,host,port,iqn): '
'(%(vol_id)s,%(host)s,%(port)s,%(iqn)s)', log_params)
if (volume_id is None or
target_host is None or
target_iqn is None):
raise exception.StorageError(
reason=_('Unable to obtain target information %s') %
volume_info = {}
volume_info['id'] = volume_id
volume_info['target'] = target_host
volume_info['port'] = target_port
volume_info['targetIQN'] = target_iqn
if ('auth_method' in connection_data and
connection_data['auth_method'] == 'CHAP'):
volume_info['chapuser'] = connection_data['auth_username']
volume_info['chappassword'] = connection_data['auth_password']
return volume_info
def _get_target_host(iscsi_string):
"""Retrieve target host."""
if iscsi_string:
host = iscsi_string.split(':')[0]
if len(host) > 0:
return host
return CONF.xenserver.target_host
def _get_target_port(iscsi_string):
"""Retrieve target port."""
if iscsi_string and ':' in iscsi_string:
return iscsi_string.split(':')[1]
return CONF.xenserver.target_port
def introduce_sr(session, sr_uuid, label, params):
LOG.debug('Introducing SR %s', label)
sr_type, sr_desc = _handle_sr_params(params)
if _requires_backend_kind(session.product_version) and sr_type == 'iscsi':
params['backend-kind'] = 'vbd'
sr_ref = session.call_xenapi('SR.introduce', sr_uuid, label, sr_desc,
sr_type, '', False, params)
LOG.debug('Creating PBD for SR')
pbd_ref = _create_pbd(session, sr_ref, params)
LOG.debug('Plugging SR')
session.call_xenapi("PBD.plug", pbd_ref)
session.call_xenapi("SR.scan", sr_ref)
return sr_ref
def _requires_backend_kind(version):
# Fix for Bug #1502929
version_as_string = '.'.join(str(v) for v in version)
return (versionutils.is_compatible('6.5', version_as_string))
def _handle_sr_params(params):
if 'id' in params:
del params['id']
sr_type = params.pop('sr_type', 'iscsi')
sr_desc = params.pop('name_description', '')
return sr_type, sr_desc
def _create_pbd(session, sr_ref, params):
pbd_rec = {}
pbd_rec['host'] = session.host_ref
pbd_rec['SR'] = sr_ref
pbd_rec['device_config'] = params
pbd_ref = session.call_xenapi("PBD.create", pbd_rec)
return pbd_ref
def introduce_vdi(session, sr_ref, vdi_uuid=None, target_lun=None):
"""Introduce VDI in the host."""
vdi_ref = _get_vdi_ref(session, sr_ref, vdi_uuid, target_lun)
if vdi_ref is None:
session.call_xenapi("SR.scan", sr_ref)
vdi_ref = _get_vdi_ref(session, sr_ref, vdi_uuid, target_lun)
except session.XenAPI.Failure:
LOG.exception('Unable to introduce VDI on SR')
raise exception.StorageError(
reason=_('Unable to introduce VDI on SR %s') % sr_ref)
if not vdi_ref:
raise exception.StorageError(
reason=_('VDI not found on SR %(sr)s (vdi_uuid '
'%(vdi_uuid)s, target_lun %(target_lun)s)') %
{'sr': sr_ref, 'vdi_uuid': vdi_uuid,
'target_lun': target_lun})
vdi_rec = session.call_xenapi("VDI.get_record", vdi_ref)
except session.XenAPI.Failure:
LOG.exception('Unable to get record of VDI')
raise exception.StorageError(
reason=_('Unable to get record of VDI %s on') % vdi_ref)
if vdi_rec['managed']:
# We do not need to introduce the vdi
return vdi_ref
return session.call_xenapi("VDI.introduce",
except session.XenAPI.Failure:
LOG.exception('Unable to introduce VDI for SR')
raise exception.StorageError(
reason=_('Unable to introduce VDI for SR %s') % sr_ref)
def _get_vdi_ref(session, sr_ref, vdi_uuid, target_lun):
if vdi_uuid:
LOG.debug("vdi_uuid: %s", vdi_uuid)
return session.call_xenapi("VDI.get_by_uuid", vdi_uuid)
elif target_lun:
vdi_refs = session.call_xenapi("SR.get_VDIs", sr_ref)
for curr_ref in vdi_refs:
curr_rec = session.call_xenapi("VDI.get_record", curr_ref)
if ('sm_config' in curr_rec and
'LUNid' in curr_rec['sm_config'] and
curr_rec['sm_config']['LUNid'] == str(target_lun)):
return curr_ref
return (session.call_xenapi("SR.get_VDIs", sr_ref))[0]
return None
def purge_sr(session, sr_ref):
# Make sure no VBDs are referencing the SR VDIs
vdi_refs = session.call_xenapi("SR.get_VDIs", sr_ref)
for vdi_ref in vdi_refs:
vbd_refs = session.call_xenapi("VDI.get_VBDs", vdi_ref)
if vbd_refs:
LOG.warning('Cannot purge SR with referenced VDIs')
forget_sr(session, sr_ref)
def forget_sr(session, sr_ref):
"""Forgets the storage repository without destroying the VDIs within."""
LOG.debug('Forgetting SR...')
_unplug_pbds(session, sr_ref)
session.call_xenapi("SR.forget", sr_ref)
def _unplug_pbds(session, sr_ref):
pbds = session.call_xenapi("SR.get_PBDs", sr_ref)
except session.XenAPI.Failure as exc:
LOG.warning('Ignoring exception %(exc)s when getting PBDs'
' for %(sr_ref)s', {'exc': exc, 'sr_ref': sr_ref})
for pbd in pbds:
session.call_xenapi("PBD.unplug", pbd)
except session.XenAPI.Failure as exc:
LOG.warning('Ignoring exception %(exc)s when unplugging'
' PBD %(pbd)s', {'exc': exc, 'pbd': pbd})
def get_device_number(mountpoint):
device_number = _mountpoint_to_number(mountpoint)
if device_number < 0:
raise exception.StorageError(
reason=_('Unable to obtain target information %s') %
return device_number
def _mountpoint_to_number(mountpoint):
"""Translate a mountpoint like /dev/sdc into a numeric."""
if mountpoint.startswith('/dev/'):
mountpoint = mountpoint[5:]
if re.match('^[hs]d[a-p]$', mountpoint):
return (ord(mountpoint[2:3]) - ord('a'))
elif re.match('^x?vd[a-p]$', mountpoint):
return (ord(mountpoint[-1]) - ord('a'))
elif re.match('^[0-9]+$', mountpoint):
return int(mountpoint, 10)
LOG.warning('Mountpoint cannot be translated: %s', mountpoint)
return -1
def find_sr_by_uuid(session, sr_uuid):
"""Return the storage repository given a uuid."""
return session.call_xenapi("SR.get_by_uuid", sr_uuid)
except session.XenAPI.Failure as exc:
if exc.details[0] == 'UUID_INVALID':
return None
def find_sr_from_vbd(session, vbd_ref):
"""Find the SR reference from the VBD reference."""
vdi_ref = session.call_xenapi("VBD.get_VDI", vbd_ref)
sr_ref = session.call_xenapi("VDI.get_SR", vdi_ref)
except session.XenAPI.Failure:
LOG.exception('Unable to find SR from VBD')
raise exception.StorageError(
reason=_('Unable to find SR from VBD %s') % vbd_ref)
return sr_ref
def find_sr_from_vdi(session, vdi_ref):
"""Find the SR reference from the VDI reference."""
sr_ref = session.call_xenapi("VDI.get_SR", vdi_ref)
except session.XenAPI.Failure:
LOG.exception('Unable to find SR from VDI')
raise exception.StorageError(
reason=_('Unable to find SR from VDI %s') % vdi_ref)
return sr_ref
def find_vbd_by_number(session, vm_ref, dev_number):
"""Get the VBD reference from the device number."""
vbd_refs = session.VM.get_VBDs(vm_ref)
requested_device = str(dev_number)
if vbd_refs:
for vbd_ref in vbd_refs:
user_device = session.VBD.get_userdevice(vbd_ref)
if user_device == requested_device:
return vbd_ref
except session.XenAPI.Failure:
msg = "Error looking up VBD %s for %s" % (vbd_ref, vm_ref)
LOG.debug(msg, exc_info=True)
def is_booted_from_volume(session, vm_ref, user_device=0):
"""Determine if the root device is a volume."""
# TODO(bkaminski): We have opened the scope of this method to accept
# userdevice. We should rename this method and its references for clarity.
vbd_ref = find_vbd_by_number(session, vm_ref, user_device)
vbd_other_config = session.VBD.get_other_config(vbd_ref)
if vbd_other_config.get('osvol', False):
return True
return False
def _get_vdi_import_path(session, task_ref, vdi_ref, disk_format):
session_id = session.get_session_id()
str_fmt = '/import_raw_vdi?session_id={}&task_id={}&vdi={}&format={}'
return str_fmt.format(session_id, task_ref, vdi_ref, disk_format)
def _stream_to_vdi(conn, vdi_import_path, file_size, file_obj):
headers = {'Content-Type': 'application/octet-stream',
'Content-Length': '%s' % file_size}
CHUNK_SIZE = 16 * 1024
LOG.debug('Initialising PUT request to %s (Headers: %s)',
vdi_import_path, headers)
conn.request('PUT', vdi_import_path, headers=headers)
remain_size = file_size
while remain_size >= CHUNK_SIZE:
trunk =
remain_size -= CHUNK_SIZE
if remain_size != 0:
trunk =
resp = conn.getresponse()
LOG.debug("Connection response status:reason is "
{'status': resp.status, 'reason': resp.reason})
def stream_to_vdi(session, instance, disk_format,
file_obj, file_size, vdi_ref):
task_name_label = 'VDI_IMPORT_for_' + instance['name']
with session.custom_task(task_name_label) as task_ref:
vdi_import_path = _get_vdi_import_path(session, task_ref, vdi_ref,
with session.http_connection() as conn:
_stream_to_vdi(conn, vdi_import_path, file_size, file_obj)
except Exception as e:
with excutils.save_and_reraise_exception():
LOG.error('Streaming disk to VDI failed with error: %s',
e, instance=instance)