Adrian Vladu 5dba5c60f6 Check all configdrive types if one errors out
By default, all config drive types are checked.
However, if the implementation for one of the types
fails, the metadata service errors out and does not
check the next type.

The issue was reported here:
https://ask.cloudbase.it/question/3094/windows-server-2016-extendvolumespluginp-doesnt-work/

In that case, in the method is_vfat_drive:
  match = VOLUME_LABEL_REGEX.search(out)
  return match.group(1) in CONFIG_DRIVE_LABELS

If match is None, the return line raises an error:
  AttributeError: 'NoneType' object has no attribute 'group'

To make sure that no other implementation will bubble up
the error, we catch the error in the config_drive metadata
service.

Catching the error allows the next config_drive
type to be checked for metadata.

Change-Id: I0d9967ec6a81214c7d78be667cffa4a98758587a
2019-10-28 15:54:28 +02:00

218 lines
8.4 KiB
Python

# Copyright 2012 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import os
import shutil
import struct
import tempfile
import uuid
from oslo_log import log as oslo_logging
from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit import exception
from cloudbaseinit.metadata.services.osconfigdrive import base
from cloudbaseinit.osutils import factory as osutils_factory
from cloudbaseinit.utils.windows import disk
from cloudbaseinit.utils.windows import vfat
# Project-wide configuration object and the logger for this module.
CONF = cloudbaseinit_conf.CONF
LOG = oslo_logging.getLogger(__name__)
# Volume label identifying a config drive; it is compared against the
# lower-cased label reported for a drive.
CONFIG_DRIVE_LABEL = 'config-2'
# Largest number of bytes requested from a device in a single read while
# a raw ISO image is copied to a file.
MAX_SECTOR_SIZE = 4096
# Absolute offset values and the ISO magic string.
# NOTE(review): 0x8000 matches the 32 KiB system area that precedes the
# volume descriptors in ISO 9660 - confirm against the specification.
OFFSET_BOOT_RECORD = 0x8000
OFFSET_ISO_ID = OFFSET_BOOT_RECORD + 1
ISO_ID = b'CD001'
# Absolute offsets of the volume size and block size fields, which are
# decoded as little-endian integers; PEEK_SIZE is the width, in bytes,
# of an unsigned short.
OFFSET_VOLUME_SIZE = OFFSET_BOOT_RECORD + 80
OFFSET_BLOCK_SIZE = OFFSET_BOOT_RECORD + 128
PEEK_SIZE = 2
class WindowsConfigDriveManager(base.BaseConfigDriveManager):
    """Locate a config drive on Windows and copy its content locally.

    Every ``_get_config_drive_from_*`` method probes one kind of source
    (CD-ROM drive, raw disk, VFAT disk, partition or mounted volume) and
    returns True once the content was copied or extracted to
    ``self.target_path``.

    NOTE(review): ``target_path`` is not set in this class; presumably
    it is provided by ``base.BaseConfigDriveManager`` - confirm.
    """

    def __init__(self):
        super(WindowsConfigDriveManager, self).__init__()
        self._osutils = osutils_factory.get_os_utils()

    def _check_for_config_drive(self, drive):
        """Tell whether `drive` is a mounted config drive.

        A drive qualifies when its volume label equals
        CONFIG_DRIVE_LABEL (case-insensitive) and it contains
        ``openstack\\latest\\meta_data.json``.

        :param drive: root path of the drive or volume to inspect.
        :returns: True if the drive is a config drive, False otherwise.
        """
        label = self._osutils.get_volume_label(drive)
        if (label and label.lower() == CONFIG_DRIVE_LABEL and
                os.path.exists(os.path.join(drive,
                                            'openstack\\latest\\'
                                            'meta_data.json'))):
            LOG.info('Config Drive found on %s', drive)
            return True
        return False

    def _get_iso_file_size(self, device):
        """Return the size in bytes of a raw ISO 9660 image on `device`.

        The size is the volume space size (a number of logical blocks)
        multiplied by the logical block size, both read from the volume
        descriptor located at OFFSET_BOOT_RECORD.

        :param device: object exposing `fixed`, `size`, `seek(offset)`
            and `read(size, skip=...)`.
        :returns: the image size, or None when the device is not fixed,
            is too small to hold the inspected fields, or does not carry
            the ISO magic string.
        """
        if not device.fixed:
            return None
        if not device.size > (OFFSET_BLOCK_SIZE + PEEK_SIZE):
            return None

        # seek() reports the offset it actually reached; the distance to
        # the requested offset is skipped by the read that follows.
        off = device.seek(OFFSET_ISO_ID)
        magic = device.read(len(ISO_ID), skip=OFFSET_ISO_ID - off)
        if ISO_ID != magic:
            return None

        # ISO 9660 stores the volume space size as a 32-bit value whose
        # little-endian copy comes first. Decoding only 16 bits would
        # truncate the block count of images with more than 65535
        # blocks and lead to an incomplete copy of the ISO.
        volume_size_format = "<I"
        off = device.seek(OFFSET_VOLUME_SIZE)
        volume_size_bytes = device.read(
            struct.calcsize(volume_size_format),
            skip=OFFSET_VOLUME_SIZE - off)

        # The logical block size is a 16-bit value, little-endian first.
        off = device.seek(OFFSET_BLOCK_SIZE)
        block_size_bytes = device.read(PEEK_SIZE,
                                       skip=OFFSET_BLOCK_SIZE - off)

        volume_size = struct.unpack(volume_size_format,
                                    volume_size_bytes)[0]
        block_size = struct.unpack("<H", block_size_bytes)[0]
        return volume_size * block_size

    def _write_iso_file(self, device, iso_file_path, iso_file_size):
        """Copy the first `iso_file_size` bytes of `device` into a file.

        :param device: object exposing `seek(offset)` and
            `read(size, skip=...)`.
        :param iso_file_path: path of the file to create or overwrite.
        :param iso_file_size: number of bytes to copy.
        """
        with open(iso_file_path, 'wb') as stream:
            offset = 0
            # Read multiples of the sector size bytes
            # until the entire ISO content is written.
            while offset < iso_file_size:
                real_offset = device.seek(offset)
                bytes_to_read = min(MAX_SECTOR_SIZE,
                                    iso_file_size - offset)
                data = device.read(bytes_to_read,
                                   skip=offset - real_offset)
                stream.write(data)
                offset += bytes_to_read

    def _extract_files_from_iso(self, iso_file_path):
        """Unpack the ISO at `iso_file_path` into ``self.target_path``.

        The extraction is delegated to the bsdtar executable configured
        through ``CONF.bsdtar_path``.

        :raises exception.CloudbaseInitException: if bsdtar returns a
            non-zero exit code.
        """
        args = [CONF.bsdtar_path, '-xf', iso_file_path,
                '-C', self.target_path]
        # NOTE(review): the second argument is presumably the `shell`
        # flag of execute_process - confirm in osutils.
        (out, err, exit_code) = self._osutils.execute_process(args, False)

        if exit_code:
            raise exception.CloudbaseInitException(
                'Failed to execute "bsdtar" from path "%(bsdtar_path)s" with '
                'exit code: %(exit_code)s\n%(out)s\n%(err)s' % {
                    'bsdtar_path': CONF.bsdtar_path,
                    'exit_code': exit_code,
                    'out': out, 'err': err})

    def _extract_iso_from_devices(self, devices):
        """Search across multiple devices for a raw ISO.

        The first device holding an ISO image has the image dumped to a
        temporary file, which is then extracted to ``self.target_path``.
        A failure on one device is logged and the next one is tried.

        :param devices: iterable of context-manager device objects.
        :returns: True if an ISO was found and extracted, else False.
        """
        extracted = False
        iso_file_path = os.path.join(tempfile.gettempdir(),
                                     str(uuid.uuid4()) + '.iso')

        try:
            for device in devices:
                try:
                    with device:
                        iso_file_size = self._get_iso_file_size(device)
                        if iso_file_size:
                            LOG.info('ISO9660 disk found on %s', device)
                            self._write_iso_file(device, iso_file_path,
                                                 iso_file_size)
                            self._extract_files_from_iso(iso_file_path)
                            extracted = True
                            break
                except Exception as exc:
                    LOG.warning('ISO extraction failed on %(device)s with '
                                '%(error)r',
                                {"device": device, "error": exc})
        finally:
            # Remove the temporary image even when iterating `devices`
            # itself raises, so that a partially written file from an
            # earlier attempt is never left behind.
            if os.path.isfile(iso_file_path):
                os.remove(iso_file_path)

        return extracted

    def _get_config_drive_from_cdrom_drive(self):
        """Copy the config drive content from the first matching CD-ROM.

        :returns: True if a config drive was found and copied.
        """
        for drive_letter in self._osutils.get_cdrom_drives():
            if self._check_for_config_drive(drive_letter):
                # os.rmdir() only removes an empty directory; it is
                # called so that copytree() can create the destination
                # itself.
                os.rmdir(self.target_path)
                shutil.copytree(drive_letter, self.target_path)
                return True

        return False

    def _get_config_drive_from_raw_hdd(self):
        """Look for a raw ISO image written directly on a physical disk.

        :returns: True if an ISO was found and extracted.
        """
        disks = map(disk.Disk, self._osutils.get_physical_disks())
        return self._extract_iso_from_devices(disks)

    def _get_config_drive_from_vfat(self):
        """Look for a VFAT formatted config drive on the physical disks.

        :returns: True if a VFAT config drive was found and
            ``vfat.copy_from_vfat_drive`` was called for it.
        """
        for drive_path in self._osutils.get_physical_disks():
            if vfat.is_vfat_drive(self._osutils, drive_path):
                LOG.info('Config Drive found on disk %r', drive_path)
                vfat.copy_from_vfat_drive(self._osutils, drive_path,
                                          self.target_path)
                return True

        return False

    def _get_config_drive_from_partition(self):
        """Look for a raw ISO image stored on a disk partition.

        :returns: True if an ISO was found on a partition and extracted.
        """
        for disk_path in self._osutils.get_physical_disks():
            physical_drive = disk.Disk(disk_path)
            # NOTE(review): the disk is kept open only while its
            # partitions are listed; each partition is entered as its
            # own context manager during extraction - confirm this
            # scoping against the original layout.
            with physical_drive:
                partitions = physical_drive.partitions()
            extracted = self._extract_iso_from_devices(partitions)
            if extracted:
                return True

        return False

    def _get_config_drive_from_volume(self):
        """Look through all the volumes for config drive.

        :returns: True if a config drive volume was found and copied.
        """
        volumes = self._osutils.get_volumes()
        for volume in volumes:
            if self._check_for_config_drive(volume):
                # os.rmdir() only removes an empty directory; it is
                # called so that copytree() can create the destination
                # itself.
                os.rmdir(self.target_path)
                shutil.copytree(volume, self.target_path)
                return True

        return False

    def _get_config_drive_files(self, cd_type, cd_location):
        """Try a single (type, location) combination.

        The handler is looked up in `config_drive_type_location` under
        the key ``"<cd_location>_<cd_type>"``. Any exception raised by
        the handler is logged and swallowed so that the caller can move
        on to the next combination.

        :returns: the handler result, or False when no handler exists
            for the combination or the handler raised.
        """
        try:
            get_config_drive = self.config_drive_type_location.get(
                "{}_{}".format(cd_location, cd_type))
            if get_config_drive:
                return get_config_drive()
            else:
                LOG.debug("Irrelevant type %(type)s in %(location)s "
                          "location; skip",
                          {"type": cd_type, "location": cd_location})
        except Exception as exc:
            LOG.warning("Config type %(type)s not found in %(loc)s "
                        "location; Error: '%(err)r'",
                        {"type": cd_type, "loc": cd_location, "err": exc})

        return False

    def get_config_drive_files(self, searched_types=None,
                               searched_locations=None):
        """Search for a config drive and fetch its files.

        Every combination of the given types and locations is tried,
        iterating over the locations for each type in turn, until one
        of them succeeds.

        :param searched_types: iterable of config drive types, such as
            "iso" or "vfat"; None is treated as empty.
        :param searched_locations: iterable of locations, such as
            "cdrom", "hdd" or "partition"; None is treated as empty.
        :returns: True as soon as one combination yields the config
            drive files, False if none does.
        """
        searched_types = searched_types or []
        searched_locations = searched_locations or []

        for cd_type, cd_location in itertools.product(searched_types,
                                                      searched_locations):
            LOG.debug('Looking for Config Drive %(type)s in %(location)s',
                      {"type": cd_type, "location": cd_location})
            if self._get_config_drive_files(cd_type, cd_location):
                return True

        return False

    @property
    def config_drive_type_location(self):
        """Map ``"<location>_<type>"`` keys to their lookup methods."""
        return {
            "cdrom_iso": self._get_config_drive_from_cdrom_drive,
            "hdd_iso": self._get_config_drive_from_raw_hdd,
            "hdd_vfat": self._get_config_drive_from_vfat,
            "partition_iso": self._get_config_drive_from_partition,
            "partition_vfat": self._get_config_drive_from_volume,
        }