cinder/cinder/utils.py

1057 lines
38 KiB
Python

# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Utilities and helper functions for all Cinder code.
This file is for utilities useful in all of Cinder,
including cinder-manage, the api service, the scheduler,
etc.
Code related to volume drivers and connecting to volumes
should be placed in volume_utils instead.
"""
from __future__ import annotations # Remove when only supporting python 3.9+
from collections import OrderedDict
import contextlib
import datetime
import functools
import inspect
import logging as py_logging
import math
import multiprocessing
import operator
import os
import pyclbr
import re
import shutil
import stat
import sys
import tempfile
import typing
from typing import Callable, Iterable, Iterator # noqa: H301
from typing import Optional, Type, Union # noqa: H301
import eventlet
from eventlet import tpool
from oslo_concurrency import lockutils
from oslo_concurrency import processutils
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import excutils
from oslo_utils import importutils
from oslo_utils import strutils
from oslo_utils import timeutils
import tenacity
from cinder import coordination
from cinder import exception
from cinder.i18n import _
# Global oslo.config object and the logger for this module.
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
# strftime-style timestamp formats; the second one adds microseconds (%f).
ISO_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
# Ratio returned by calculate_max_over_subscription_ratio() when the ratio is
# 'auto' and provisioned_capacity_gb is 0.
INITIAL_AUTO_MOSR = 20
# Capacity values for which calculate_max_over_subscription_ratio() skips the
# ratio computation and returns 1.0.
INFINITE_UNKNOWN_VALUES = ('infinite', 'unknown')
# oslo.concurrency lock helpers bound to the 'cinder-' prefix: a locking
# decorator factory and the matching external lock file remover.
synchronized = lockutils.synchronized_with_prefix('cinder-')
synchronized_remove = lockutils.remove_external_lock_file_with_prefix(
'cinder-')
def clean_volume_file_locks(volume_id, driver):
    """Remove the file locks Cinder keeps for a volume (best effort).

    The three volume lock files are removed first, then the driver is asked
    to remove its own locks through ``driver.clean_volume_file_locks``.
    Every failure is logged as a warning and otherwise ignored.

    :param volume_id: ID of the volume whose locks are removed
    :param driver: object exposing ``clean_volume_file_locks(volume_id)``
    """
    lock_names = tuple(volume_id + suffix
                       for suffix in ('-delete_volume', '', '-detach_volume'))
    for lock_name in lock_names:
        try:
            synchronized_remove(lock_name)
        except Exception as err:
            LOG.warning('Failed to cleanup volume lock %(name)s: %(exc)s',
                        {'name': lock_name, 'exc': err})

    try:
        driver.clean_volume_file_locks(volume_id)
    except Exception as err:
        LOG.warning('Failed to cleanup driver locks for volume %(id)s: '
                    '%(exc)s', {'id': volume_id, 'exc': err})
def api_clean_volume_file_locks(volume_id):
    """Remove the ``attachment_update-<volume_id>-*`` coordination locks.

    :param volume_id: ID of the volume whose locks are removed
    """
    lock_pattern = 'attachment_update-' + volume_id + '-*'
    coordination.synchronized_remove(lock_pattern)
def clean_snapshot_file_locks(snapshot_id, driver):
    """Remove the file locks Cinder keeps for a snapshot (best effort).

    The ``<snapshot_id>-delete_snapshot`` lock file is removed first, then
    the driver is asked to remove its own locks through
    ``driver.clean_snapshot_file_locks``. Failures are logged as warnings.

    :param snapshot_id: ID of the snapshot whose locks are removed
    :param driver: object exposing ``clean_snapshot_file_locks(snapshot_id)``
    """
    try:
        name = snapshot_id + '-delete_snapshot'
        synchronized_remove(name)
    except Exception as err:
        LOG.warning('Failed to cleanup snapshot lock %(name)s: %(exc)s',
                    {'name': name, 'exc': err})

    try:
        driver.clean_snapshot_file_locks(snapshot_id)
    except Exception as err:
        LOG.warning('Failed to cleanup driver locks for snapshot %(id)s: '
                    '%(exc)s', {'id': snapshot_id, 'exc': err})
def as_int(obj: Union[int, float, str], quiet: bool = True) -> int:
    """Convert ``obj`` to an integer on a best-effort basis.

    ``int(obj)`` is tried first ("2" -> 2), then ``int(float(obj))``
    ("2.5" -> 2).

    :param obj: value to convert
    :param quiet: when True (default) an unconvertible value is returned
                  unchanged; when False a TypeError is raised instead
    :returns: the integer value, or ``obj`` itself if it cannot be converted
              and ``quiet`` is True
    :raises TypeError: if ``obj`` cannot be converted and ``quiet`` is False
    """
    # OverflowError covers infinities ("inf", float('inf'), "1e999"), which
    # float() accepts but int() refuses to convert.
    not_convertible = (ValueError, TypeError, OverflowError)

    # Try "2" -> 2
    try:
        return int(obj)
    except not_convertible:
        pass

    # Try "2.5" -> 2
    try:
        return int(float(obj))
    except not_convertible:
        pass

    # Eck, not sure what this is then.
    if not quiet:
        # One-element tuple so that a tuple ``obj`` is formatted as a whole.
        raise TypeError(_("Can not translate %s to integer.") % (obj,))
    return typing.cast(int, obj)
def check_exclusive_options(
        **kwargs: Optional[Union[dict, str, bool]]) -> None:
    """Ensure that at most one of the given options is not None.

    The special keyword ``pretty_keys`` (default True) is not treated as an
    option; it controls whether underscores in option names are shown as
    spaces in the error message ('the_key' -> 'the key').

    :raises InvalidInput: if more than one option is not None. The message
                          lists the sorted names of all options passed in.
    """
    if not kwargs:
        return

    pretty_keys = kwargs.pop("pretty_keys", True)
    provided = [key for key, value in kwargs.items() if value is not None]
    if len(provided) <= 1:
        return

    if pretty_keys:
        labels = [key.replace('_', ' ') for key in kwargs]
    else:
        labels = list(kwargs.keys())
    names = ", ".join(sorted(labels))
    raise exception.InvalidInput(
        reason=(_("May specify only one of %s") % (names)))
def execute(*cmd: str, **kwargs: Union[bool, str]) -> tuple[str, str]:
    """Run a command through oslo.concurrency's ``processutils.execute``.

    If ``run_as_root`` is passed (whatever its value) and ``root_helper`` is
    not, ``root_helper`` is filled in from :func:`get_root_helper`.

    :returns: whatever ``processutils.execute`` returns
    """
    wants_default_helper = ('run_as_root' in kwargs and
                            'root_helper' not in kwargs)
    if wants_default_helper:
        kwargs['root_helper'] = get_root_helper()
    return processutils.execute(*cmd, **kwargs)
def check_ssh_injection(cmd_list: list[str]) -> None:
    """Reject command arguments that look like shell injection attempts.

    Each argument is stripped and then checked twice: quoting (a quoted
    argument must not contain unescaped quotes, an unquoted one must not
    contain whitespace) and shell operators (an operator is only accepted
    when its first occurrence is preceded by a backslash).

    :param cmd_list: the command arguments to inspect
    :raises SSHInjectionThreat: if any argument fails a check
    """
    shell_operators: tuple[str, ...] = ('`', '$', '|', '||', ';', '&',
                                        '&&', '>', '>>', '<')

    for raw_arg in cmd_list:
        arg = raw_arg.strip()

        # Is the whole argument wrapped in a matching pair of quotes?
        quoted_match = re.match(
            '^(?P<quote>[\'"])(?P<quoted>.*)(?P=quote)$', arg)
        if quoted_match:
            # Unescaped quotes inside the quoted text are not allowed.
            inner = quoted_match.group('quoted')
            if inner and (re.match('[\'"]', inner) or
                          re.search('[^\\\\][\'"]', inner)):
                raise exception.SSHInjectionThreat(command=cmd_list)
        elif len(arg.split()) > 1:
            # Spaces are only allowed within quoted arguments.
            raise exception.SSHInjectionThreat(command=cmd_list)

        # A shell special operator is only tolerated when it is escaped.
        for token in shell_operators:
            position = arg.find(token)
            if position == -1:
                continue
            if position == 0 or arg[position - 1] != '\\':
                raise exception.SSHInjectionThreat(command=cmd_list)
def check_metadata_properties(
        metadata: Optional[dict[str, str]]) -> None:
    """Validate the keys and values of volume metadata.

    A falsy ``metadata`` is treated as an empty dict.

    :param metadata: mapping of metadata keys to values
    :raises InvalidInput: if ``metadata`` is not a dict
    :raises InvalidVolumeMetadata: if a key or value fails the string
                                   length check
    :raises InvalidVolumeMetadataSize: if a key or value is longer than 255
                                       characters
    """
    if not metadata:
        metadata = {}
    if not isinstance(metadata, dict):
        raise exception.InvalidInput(_("Metadata should be a dict."))

    for key, value in metadata.items():
        try:
            check_string_length(key, "Metadata key: %s" % key, min_length=1)
            check_string_length(value, "Value for metadata key: %s" % key)
        except exception.InvalidInput as exc:
            raise exception.InvalidVolumeMetadata(reason=exc)

        # for backward compatibility
        if len(key) > 255:
            raise exception.InvalidVolumeMetadataSize(
                reason=_("Metadata property key %s greater than 255 "
                         "characters.") % key)
        if len(value) > 255:
            raise exception.InvalidVolumeMetadataSize(
                reason=_("Metadata property key %s value greater than "
                         "255 characters.") % key)
def last_completed_audit_period(unit: Optional[str] = None) -> \
        tuple[Union[datetime.datetime, datetime.timedelta],
              Union[datetime.datetime, datetime.timedelta]]:
    """Return the most recently *completed* audit period.

    :param unit: one of 'hour', 'day', 'month' or 'year', optionally
                 followed by '@<offset>'. Defaults to
                 ``CONF.volume_usage_audit_period``.

                 Without an offset a period starts at the beginning of its
                 unit (midnight for 'day', the 1st for 'month', January 1st
                 for 'year'). The offset moves that start: 'day@18' begins
                 at 18:00, 'month@15' on the 15th, 'year@3' on March 1st and
                 'hour@30' at minute 30.
    :returns: 2-tuple of datetimes ``(begin, end)``, based on
              ``timeutils.utcnow()``. The begin timestamp of this period is
              the end of the previous one.
    :raises ValueError: if the unit is not one of the supported values or
                        the offset is not an integer
    """
    def _month_before(year, month):
        # Step one month back, wrapping from January to December.
        if month <= 1:
            return year - 1, 12 + (month - 1)
        return year, month - 1

    if not unit:
        unit = CONF.volume_usage_audit_period
    unit = typing.cast(str, unit)

    offset: Union[str, int] = 0
    if '@' in unit:
        unit, offset = unit.split("@", 1)
        offset = int(offset)
    offset = typing.cast(int, offset)

    rightnow = timeutils.utcnow()
    if unit not in ('month', 'day', 'year', 'hour'):
        raise ValueError('Time period must be hour, day, month or year')

    if unit == 'month':
        if offset == 0:
            offset = 1
        end = datetime.datetime(day=offset,
                                month=rightnow.month,
                                year=rightnow.year)
        if end >= rightnow:
            # This month's boundary is still ahead: use last month's.
            end_year, end_month = _month_before(rightnow.year,
                                                rightnow.month)
            end = datetime.datetime(day=offset,
                                    month=end_month,
                                    year=end_year)
        begin_year, begin_month = _month_before(end.year, end.month)
        begin = datetime.datetime(day=offset,
                                  month=begin_month,
                                  year=begin_year)
    elif unit == 'year':
        if offset == 0:
            offset = 1
        end = datetime.datetime(day=1, month=offset, year=rightnow.year)
        if end >= rightnow:
            # This year's boundary is still ahead: use last year's.
            end = datetime.datetime(day=1,
                                    month=offset,
                                    year=rightnow.year - 1)
        begin = datetime.datetime(day=1, month=offset, year=end.year - 1)
    elif unit == 'day':
        one_day = datetime.timedelta(days=1)
        end = datetime.datetime(hour=offset,
                                day=rightnow.day,
                                month=rightnow.month,
                                year=rightnow.year)
        if end >= rightnow:
            end -= one_day
        begin = end - one_day
    else:
        # unit == 'hour': the only value left after the validation above.
        one_hour = datetime.timedelta(hours=1)
        end = rightnow.replace(minute=offset, second=0, microsecond=0)
        if end >= rightnow:
            end -= one_hour
        begin = end - one_hour

    return (begin, end)
def monkey_patch() -> None:
    """Wrap the functions of configured modules with a decorator.

    Nothing happens unless ``CONF.monkey_patch`` is set. Otherwise every
    entry of ``CONF.monkey_patch_modules`` is read as
    "Module path:Decorator function", for example::

        cinder.api.ec2.cloud:<dotted path of the decorator function>

    All functions found in the classes of the module, and all module level
    functions, are replaced by ``decorator(name, function)`` where:

    :param name: dotted name of the function
    :param function: the function object being wrapped
    """
    if not CONF.monkey_patch:
        return

    for entry in CONF.monkey_patch_modules:
        module, decorator_name = entry.split(':')
        # import decorator function
        decorator = importutils.import_class(decorator_name)
        __import__(module)

        # pyclbr lists the classes and functions the module defines.
        for key, descriptor in pyclbr.readmodule_ex(module).items():
            dotted_name = "%s.%s" % (module, key)
            if isinstance(descriptor, pyclbr.Class):
                # Decorate every plain function found on the class; on
                # Python 3, unbound methods are regular functions.
                clz = importutils.import_class(dotted_name)
                members = inspect.getmembers(clz, inspect.isfunction)
                for method, func in members:
                    wrapped = decorator(
                        "%s.%s.%s" % (module, key, method), func)
                    setattr(clz, method, wrapped)
            elif isinstance(descriptor, pyclbr.Function):
                # Decorate the module level function.
                func = importutils.import_class(dotted_name)
                setattr(sys.modules[module], key,
                        decorator(dotted_name, func))
def make_dev_path(dev: str,
                  partition: Optional[str] = None,
                  base: str = '/dev') -> str:
    """Build the path of a device, optionally with a partition suffix.

    >>> make_dev_path('xvdc')
    '/dev/xvdc'
    >>> make_dev_path('xvdc', 1)
    '/dev/xvdc1'

    :param dev: device name
    :param partition: appended to the path (via ``str()``) when truthy
    :param base: directory the device lives in
    """
    device_path = os.path.join(base, dev)
    if not partition:
        return device_path
    return device_path + str(partition)
def robust_file_write(directory: str, filename: str, data: str) -> None:
    """Write a file using the "write to temp file and rename" model.

    The data is UTF-8 encoded into a temporary file created in
    ``directory``, synced, and then renamed over the target. The directory
    is fsynced after the temporary file is written and after the rename.

    :param directory: Target directory to create a file.
    :param filename: File name to store specified data.
    :param data: String data.
    :raises OSError: re-raised after logging and after removing the
                     temporary file, if there is one
    """
    tempname = None
    dirfd = None
    try:
        dirfd = os.open(directory, os.O_DIRECTORY)

        # write data to temporary file; leaving the block closes it
        with tempfile.NamedTemporaryFile(prefix=filename,
                                         dir=directory,
                                         delete=False) as tmp_file:
            tempname = tmp_file.name
            tmp_file.write(data.encode('utf-8'))
            tmp_file.flush()
            os.fdatasync(tmp_file.fileno())

        # Make sure the existence of the temp file hits the disk.
        os.fsync(dirfd)
        # If destination file exists, it will be replaced silently.
        os.rename(tempname, os.path.join(directory, filename))
        # Make sure the rename hits the disk.
        os.fsync(dirfd)
    except OSError:
        with excutils.save_and_reraise_exception():
            LOG.error("Failed to write persistence file: %(path)s.",
                      {'path': os.path.join(directory, filename)})
            if tempname is not None and os.path.isfile(tempname):
                os.unlink(tempname)
    finally:
        if dirfd is not None:
            os.close(dirfd)
@contextlib.contextmanager
def temporary_chown(path: str,
                    owner_uid: Optional[int] = None) -> Iterator[None]:
    """Context manager that temporarily changes the owner of a path.

    The ownership is changed (and later restored) with ``chown`` run as
    root, and only when the current owner differs from the requested one.
    On Windows nothing is changed.

    :param path: path to chown
    :param owner_uid: UID of temporary owner (defaults to current user)
    """
    if os.name == 'nt':
        LOG.debug("Skipping chown for %s as this operation is "
                  "not available on Windows.", path)
        yield
        return

    if owner_uid is None:
        owner_uid = os.getuid()
    orig_uid = os.stat(path).st_uid
    must_chown = orig_uid != owner_uid

    if must_chown:
        execute('chown', str(owner_uid), path, run_as_root=True)
    try:
        yield
    finally:
        if must_chown:
            execute('chown', str(orig_uid), path, run_as_root=True)
@contextlib.contextmanager
def tempdir(**kwargs) -> Iterator[str]:
    """Context manager yielding a temporary directory removed on exit.

    :param kwargs: passed through to ``tempfile.mkdtemp``
    :returns: yields the path of the created directory. A failure to remove
              it is only logged at debug level.
    """
    created_dir = tempfile.mkdtemp(**kwargs)
    try:
        yield created_dir
    finally:
        try:
            shutil.rmtree(created_dir)
        except OSError as err:
            LOG.debug('Could not remove tmpdir: %s', str(err))
def get_root_helper() -> str:
    """Return the rootwrap command built from ``CONF.rootwrap_config``."""
    rootwrap_config = CONF.rootwrap_config
    return 'sudo cinder-rootwrap %s' % rootwrap_config
def get_file_mode(path: str) -> int:
    """Return the permission bits of ``path``.

    This primarily exists to make unit testing easier.
    """
    file_stat = os.stat(path)
    return stat.S_IMODE(file_stat.st_mode)
def get_file_gid(path: str) -> int:
    """Return the group ID owning ``path``.

    This primarily exists to make unit testing easier.
    """
    file_stat = os.stat(path)
    return file_stat.st_gid
def get_file_size(path: str) -> int:
    """Return the size of ``path`` as reported by ``os.stat``."""
    file_stat = os.stat(path)
    return file_stat.st_size
def _get_disk_of_partition(
        devpath: str,
        st: Optional[os.stat_result] = None) -> tuple[str, os.stat_result]:
    """Return the disk device path, and its stat, for a partition path.

    The trailing partition number is stripped from ``devpath`` (together
    with a 'p' separator when it follows a digit), so '/dev/sda1' maps to
    '/dev/sda' and '/dev/disk1p1' to '/dev/disk1'. The stripped path is only
    used when it exists and is a block device; otherwise ``devpath`` is
    returned as it is.

    :param devpath: device path, possibly a partition
    :param st: stat result of ``devpath``; looked up when omitted and needed
    :returns: tuple of (device path, stat result of that path)
    """
    diskpath = re.sub(r'(?:(?<=\d)p)?\d+$', '', devpath)
    if diskpath != devpath:
        try:
            st_disk = os.stat(diskpath)
        except OSError:
            pass
        else:
            if stat.S_ISBLK(st_disk.st_mode):
                return (diskpath, st_disk)

    # devpath is not a partition
    return (devpath, os.stat(devpath) if st is None else st)
def get_bool_param(param_string: str,
                   params: dict,
                   default: bool = False) -> bool:
    """Read a boolean parameter from ``params``.

    :param param_string: name of the parameter to read
    :param params: mapping the parameter is read from
    :param default: value used when the parameter is missing
    :returns: the parameter converted with ``strutils.bool_from_string``
    :raises InvalidParameterValue: if the value is not a valid boolean
                                   string
    """
    raw_value = params.get(param_string, default)
    if strutils.is_valid_boolstr(raw_value):
        return strutils.bool_from_string(raw_value, strict=True)

    msg = _("Value '%(param)s' for '%(param_string)s' is not "
            "a boolean.") % {'param': raw_value,
                             'param_string': param_string}
    raise exception.InvalidParameterValue(err=msg)
def get_blkdev_major_minor(path: str,
                           lookup_for_file: bool = True) -> Optional[str]:
    """Get 'major:minor' number of block device.

    The number is meant to be used to control the I/O ratelimit of the
    specified path.

    :param path: block device, character device or file path
    :param lookup_for_file: if True and the path is not a device, look up
                            with ``df`` the device the file lies on and
                            return the result for that device
    :returns: 'major:minor' of the (whole disk) block device, or None for
              character devices and for files whose ``df`` device does not
              start with '/'
    :raises CinderException: if the path is not a device and
                             ``lookup_for_file`` is False
    """
    st = os.stat(path)
    mode = st.st_mode

    if stat.S_ISBLK(mode):
        path, st = _get_disk_of_partition(path, st)
        return '%d:%d' % (os.major(st.st_rdev), os.minor(st.st_rdev))

    if stat.S_ISCHR(mode):
        # No I/O ratelimit control is provided for character devices
        return None

    if not lookup_for_file:
        msg = _("Unable to get a block device for file \'%s\'") % path
        raise exception.CinderException(msg)

    # lookup the mounted disk which the file lies on
    out, _err = execute('df', path)
    devpath = out.split("\n")[1].split()[0]
    if devpath[0] != '/':
        # the file is on a network file system
        return None
    return get_blkdev_major_minor(devpath, False)
def check_string_length(value: str, name: str, min_length: int = 0,
                        max_length: Optional[int] = None,
                        allow_all_spaces: bool = True) -> None:
    """Check the length of specified string.

    :param value: the value of the string
    :param name: the name of the string
    :param min_length: the min_length of the string
    :param max_length: the max_length of the string
    :param allow_all_spaces: when False, a value made only of whitespace is
                             rejected
    :raises InvalidInput: if ``strutils.check_string_length`` rejects the
                          value, or if it is all spaces and
                          ``allow_all_spaces`` is False
    """
    try:
        strutils.check_string_length(value, name=name,
                                     min_length=min_length,
                                     max_length=max_length)
    except (ValueError, TypeError) as exc:
        raise exception.InvalidInput(reason=exc)

    if not allow_all_spaces and value.isspace():
        # Interpolate the name; without this the user would be shown the
        # raw '%(name)s' placeholder.
        msg = _('%(name)s cannot be all spaces.') % {'name': name}
        raise exception.InvalidInput(reason=msg)
def is_blk_device(dev: str) -> bool:
    """Tell whether ``dev`` is a block device.

    :returns: True for a block device; False otherwise, including when the
              path cannot be inspected (which is logged at debug level)
    """
    try:
        mode = os.stat(dev).st_mode
        return True if stat.S_ISBLK(mode) else False
    except Exception:
        LOG.debug('Path %s not found in is_blk_device check', dev)
        return False
class ComparableMixin(object):
    """Mixin implementing rich comparisons on top of ``_cmpkey()``.

    Classes using it provide a ``_cmpkey()`` method; every comparison is
    delegated to the keys of the two operands.
    """

    def _compare(self, other: object, method: Callable):
        """Apply ``method`` to both keys, or return NotImplemented."""
        try:
            own_key = self._cmpkey()  # type: ignore
            other_key = other._cmpkey()  # type: ignore
            return method(own_key, other_key)
        except (AttributeError, TypeError):
            # _cmpkey not implemented, or return different type,
            # so I can't compare with "other".
            return NotImplemented

    def __lt__(self, other: object):
        return self._compare(other, operator.lt)

    def __le__(self, other: object):
        return self._compare(other, operator.le)

    def __eq__(self, other: object):
        return self._compare(other, operator.eq)

    def __ge__(self, other: object):
        return self._compare(other, operator.ge)

    def __gt__(self, other: object):
        return self._compare(other, operator.gt)

    def __ne__(self, other: object):
        return self._compare(other, operator.ne)
class retry_if_exit_code(tenacity.retry_if_exception):
    """Retry on ProcessExecutionError specific exit codes.

    :param codes: a single exit code (int) or a container of exit codes
    """

    def __init__(self, codes):
        if isinstance(codes, int):
            codes = (codes,)
        self.codes = codes
        super().__init__(self._check_exit_code)

    def _check_exit_code(self, exc):
        """Predicate: truthy for a ProcessExecutionError with a listed code."""
        return (exc and
                isinstance(exc, processutils.ProcessExecutionError) and
                exc.exit_code in self.codes)
def retry(retry_param: Union[None,
                             Type[Exception],
                             tuple[Type[Exception], ...],
                             int,
                             tuple[int, ...]],
          interval: float = 1,
          retries: int = 3,
          backoff_rate: float = 2,
          wait_random: bool = False,
          retry=tenacity.retry_if_exception_type) -> Callable:
    """Build a decorator that retries the decorated callable with tenacity.

    :param retry_param: argument handed to ``retry`` to build the retry
                        condition (exception types by default)
    :param interval: multiplier of the exponential wait between attempts
    :param retries: maximum number of attempts, at least 1
    :param backoff_rate: base of the exponential wait (unused when
                         ``wait_random`` is set)
    :param wait_random: use tenacity's random exponential wait instead
    :param retry: callable building the tenacity retry condition from
                  ``retry_param``
    :returns: the decorator; the last exception is re-raised when all
              attempts fail
    :raises ValueError: if ``retries`` is lower than 1
    """
    if retries < 1:
        raise ValueError('Retries must be greater than or '
                         'equal to 1 (received: %s). ' % retries)

    wait = (tenacity.wait_random_exponential(multiplier=interval)
            if wait_random else
            tenacity.wait_exponential(
                multiplier=interval, min=0, exp_base=backoff_rate))

    def _decorator(f: Callable) -> Callable:

        @functools.wraps(f)
        def _wrapper(*args, **kwargs):
            # A fresh Retrying object is built for every call.
            retrying = tenacity.Retrying(
                sleep=tenacity.nap.sleep,
                before_sleep=tenacity.before_sleep_log(LOG, logging.DEBUG),
                after=tenacity.after_log(LOG, logging.DEBUG),
                stop=tenacity.stop_after_attempt(retries),
                reraise=True,
                retry=retry(retry_param),
                wait=wait)
            return retrying(f, *args, **kwargs)

        return _wrapper

    return _decorator
def convert_str(text: Union[str, bytes]) -> str:
    """Convert to native string.

    ``bytes`` are decoded from UTF-8; anything else is returned unchanged.
    """
    return text.decode('utf-8') if isinstance(text, bytes) else text
def build_or_str(elements: Union[None, str, Iterable[str]],
                 str_format: Optional[str] = None) -> str:
    """Build a string of elements joined by 'or'.

    A single string is used as it is; an iterable of strings is joined with
    the (translated) ' or ' separator.

    :param elements: Elements we want to join.
    :type elements: String or iterable of strings.
    :param str_format: %-style format applied to the joined string.
    :type str_format: String.
    :returns: the (optionally formatted) joined string, or an empty string
              if there are no elements
    """
    if not elements:
        return ''

    if isinstance(elements, str):
        joined = elements
    else:
        joined = _(' or ').join(elements)
    joined = typing.cast(str, joined)

    return str_format % joined if str_format else joined
def calculate_capacity_factors(total_capacity: float,
                               free_capacity: float,
                               provisioned_capacity: float,
                               thin_provisioning_support: bool,
                               max_over_subscription_ratio: float,
                               reserved_percentage: int,
                               thin: bool) -> dict:
    """Create the various capacity factors of a particular backend.

    Based off of definition of terms
    cinder-specs/specs/queens/provisioning-improvements.html

    Factors calculated (values flagged "rounded" have 2 decimal places):

    reserved_capacity - floor of reserved_percentage percent of the
        total_capacity.
    total_reserved_available_capacity - total capacity minus the reserved
        capacity (rounded).
    total_available_capacity - total_reserved_available_capacity for thick,
        or total_reserved_available_capacity * max_over_subscription_ratio
        for thin (rounded).
    calculated_free_capacity - total_available_capacity minus
        provisioned_capacity (rounded).
    virtual_free_capacity - for thin the calculated free capacity; for
        thick the smaller of the calculated free capacity and the reported
        free_capacity (rounded).
    free_percent - virtual_free_capacity as a percentage of
        total_available_capacity, 0 if the latter is 0 (rounded).
    provisioned_ratio - provisioned_capacity divided by
        total_available_capacity, 0 if the latter is 0 (rounded).
    provisioned_type - 'thin' or 'thick'.

    The result also echoes total_capacity (as float), free_capacity,
    provisioned_capacity and max_over_subscription_ratio (None for thick).

    :param total_capacity: The reported total capacity in the backend.
    :type total_capacity: float
    :param free_capacity: The free space/capacity as reported by the backend.
    :type free_capacity: float
    :param provisioned_capacity: as reported by backend or volume manager from
                                 allocated_capacity_gb
    :type provisioned_capacity: float
    :param thin_provisioning_support: Is thin provisioning supported?
    :type thin_provisioning_support: bool
    :param max_over_subscription_ratio: as reported by the backend
    :type max_over_subscription_ratio: float
    :param reserved_percentage: the % amount to reserve as unavailable. 0-100
    :type reserved_percentage: int, 0-100
    :param thin: calculate based on thin provisioning if enabled by
                 thin_provisioning_support
    :type thin: bool
    :return: A dictionary of all of the capacity factors.
    :rtype: dict
    """
    total = float(total_capacity)
    reserved_fraction = float(reserved_percentage) / 100
    reserved_capacity = math.floor(total * reserved_fraction)
    total_reserved_available = total - reserved_capacity

    if thin and thin_provisioning_support:
        provisioned_type = 'thin'
        total_available_capacity = (
            total_reserved_available * max_over_subscription_ratio)
    else:
        provisioned_type = 'thick'
        total_available_capacity = total_reserved_available

    # Free space left once what is already provisioned is accounted for.
    calculated_free = total_available_capacity - provisioned_capacity
    if provisioned_type == 'thin':
        virtual_free = calculated_free
    else:
        # The reported free capacity caps the calculated one for thick.
        virtual_free = min(calculated_free, free_capacity)

    if total_available_capacity:
        provisioned_ratio = provisioned_capacity / total_available_capacity
        free_percent = (virtual_free / total_available_capacity) * 100
    else:
        provisioned_ratio = 0
        free_percent = 0

    reported_ratio = (
        max_over_subscription_ratio if provisioned_type == 'thin' else None)

    # Derived floating point values are limited to 2 decimal places.
    return {
        "total_capacity": total,
        "free_capacity": free_capacity,
        "reserved_capacity": reserved_capacity,
        "total_reserved_available_capacity": round(
            total_reserved_available, 2),
        "max_over_subscription_ratio": reported_ratio,
        "total_available_capacity": round(total_available_capacity, 2),
        "provisioned_capacity": provisioned_capacity,
        "calculated_free_capacity": round(calculated_free, 2),
        "virtual_free_capacity": round(virtual_free, 2),
        "free_percent": round(free_percent, 2),
        "provisioned_ratio": round(provisioned_ratio, 2),
        "provisioned_type": provisioned_type
    }
def calculate_virtual_free_capacity(total_capacity: float,
                                    free_capacity: float,
                                    provisioned_capacity: float,
                                    thin_provisioning_support: bool,
                                    max_over_subscription_ratio: float,
                                    reserved_percentage: int,
                                    thin: bool) -> float:
    """Calculate the virtual free capacity based on multiple factors.

    This is the ``virtual_free_capacity`` entry computed by
    :func:`calculate_capacity_factors` for the same arguments.

    :param total_capacity:  total_capacity_gb of a host_state or pool.
    :param free_capacity:   free_capacity_gb of a host_state or pool.
    :param provisioned_capacity:    provisioned_capacity_gb of a host_state
                                    or pool.
    :param thin_provisioning_support:   thin_provisioning_support of
                                        a host_state or a pool.
    :param max_over_subscription_ratio: max_over_subscription_ratio of
                                        a host_state or a pool
    :param reserved_percentage: reserved_percentage of a host_state or
                                a pool.
    :param thin: whether volume to be provisioned is thin
    :returns: the calculated virtual free capacity.
    """
    return calculate_capacity_factors(
        total_capacity,
        free_capacity,
        provisioned_capacity,
        thin_provisioning_support,
        max_over_subscription_ratio,
        reserved_percentage,
        thin)["virtual_free_capacity"]
def calculate_max_over_subscription_ratio(
        capability: dict,
        global_max_over_subscription_ratio: float) -> float:
    """Work out the max_over_subscription_ratio of a backend or pool.

    :param capability: capabilities reported for the backend or pool
    :param global_max_over_subscription_ratio: ratio used when the
        capabilities carry no (truthy) ``max_over_subscription_ratio``
    :returns: 1.0 if thin provisioning is not supported or the total or
              free capacity is infinite/unknown; the configured ratio as a
              float; or, when the ratio is 'auto', a ratio derived from the
              provisioned and used capacity (INITIAL_AUTO_MOSR if nothing
              is provisioned yet)
    """
    # provisioned_capacity_gb is the apparent total capacity of
    # all the volumes created on a backend, which is greater than
    # or equal to allocated_capacity_gb, which is the apparent
    # total capacity of all the volumes created on a backend
    # in Cinder. Using allocated_capacity_gb as the default of
    # provisioned_capacity_gb if it is not set.
    allocated_gb = capability.get('allocated_capacity_gb', 0)
    provisioned_gb = capability.get('provisioned_capacity_gb', allocated_gb)
    thin_supported = capability.get('thin_provisioning_support', False)
    total_gb = capability.get('total_capacity_gb', 0)
    free_gb = capability.get('free_capacity_gb', 0)
    pool_name = capability.get('pool_name',
                               capability.get('volume_backend_name'))

    # If thin provisioning is not supported the capacity filter will not use
    # the value we return, no matter what it is.
    if not thin_supported:
        LOG.debug("Trying to retrieve max_over_subscription_ratio from a "
                  "service that does not support thin provisioning")
        return 1.0

    # Again, if total or free capacity is infinite or unknown, the capacity
    # filter will not use the max_over_subscription_ratio at all. So, does
    # not matter what we return here.
    if (total_gb in INFINITE_UNKNOWN_VALUES or
            free_gb in INFINITE_UNKNOWN_VALUES):
        return 1.0

    configured_ratio = (capability.get('max_over_subscription_ratio') or
                        global_max_over_subscription_ratio)
    if configured_ratio != 'auto':
        return float(configured_ratio)

    # The automatic ratio is only calculated when the global or driver conf
    # is set to auto and provisioned_capacity_gb is not 0; otherwise the
    # initial default is used.
    if provisioned_gb != 0:
        used_capacity = total_gb - free_gb
        LOG.debug("Calculating max_over_subscription_ratio for "
                  "pool %s: provisioned_capacity_gb=%s, "
                  "used_capacity=%s",
                  pool_name, provisioned_gb, used_capacity)
        auto_ratio = 1 + (float(provisioned_gb) / (used_capacity + 1))
    else:
        auto_ratio = INITIAL_AUTO_MOSR

    LOG.info("Auto max_over_subscription_ratio for pool %s is "
             "%s", pool_name, auto_ratio)
    return auto_ratio
def validate_dictionary_string_length(specs: dict) -> None:
    """Check the length of each key and value of dictionary.

    Keys that are not None must be 1 to 255 characters long, values that are
    not None at most 255 characters long.

    :param specs: dictionary to validate
    :raises InvalidInput: if ``specs`` is not a dictionary, or a key or
                          value fails the length check
    """
    if not isinstance(specs, dict):
        raise exception.InvalidInput(
            reason=_('specs must be a dictionary.'))

    for spec_key, spec_value in specs.items():
        if spec_key is not None:
            check_string_length(spec_key, 'Key "%s"' % spec_key,
                                min_length=1, max_length=255)
        if spec_value is not None:
            check_string_length(spec_value,
                                'Value for key "%s"' % spec_key,
                                min_length=0, max_length=255)
def service_expired_time(
        with_timezone: Optional[bool] = False) -> datetime.datetime:
    """Return the current UTC time minus ``CONF.service_down_time`` seconds.

    :param with_timezone: passed through to ``timeutils.utcnow``
    """
    now = timeutils.utcnow(with_timezone=with_timezone)
    return now - datetime.timedelta(seconds=CONF.service_down_time)
class DoNothing(str):
    """Class that literally does nothing.

    Calling an instance, or reading an attribute it does not have, returns
    the instance itself, so calls and attribute accesses can be chained.

    We inherit from str in case it's called with json.dumps.
    """

    def __call__(self, *args, **kwargs):
        return self

    def __getattr__(self, name):
        return self


# Shared instance returned in place of skipped results.
DO_NOTHING = DoNothing()
def notifications_enabled(conf):
    """Check if oslo notifications are enabled.

    :param conf: configuration object exposing
                 ``oslo_messaging_notifications.driver``
    :returns: a truthy value if at least one driver is configured and the
              drivers are not just 'noop'; a falsy value otherwise
    """
    configured_drivers = set(conf.oslo_messaging_notifications.driver)
    return configured_drivers and configured_drivers != {'noop'}
def if_notifications_enabled(f: Callable) -> Callable:
    """Calls decorated method only if notifications are enabled.

    When notifications are disabled the call is skipped and ``DO_NOTHING``
    is returned instead.
    """
    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        if not notifications_enabled(CONF):
            return DO_NOTHING
        return f(*args, **kwargs)

    return wrapped
# Log level names accepted by get_log_method().
LOG_LEVELS = ('INFO', 'WARNING', 'ERROR', 'DEBUG')


def get_log_method(level_string: str) -> int:
    """Return the ``logging`` module attribute for a log level name.

    :param level_string: case-insensitive name of one of ``LOG_LEVELS``;
                         a falsy value is treated as an empty string
    :returns: the attribute of ``logging`` named after the upper-cased level
    :raises InvalidInput: if the name is not one of ``LOG_LEVELS``
    """
    level_string = level_string or ''
    level_name = level_string.upper()
    if level_name in LOG_LEVELS:
        return getattr(logging, level_name)
    raise exception.InvalidInput(
        reason=_('%s is not a valid log level.') % level_string)
def set_log_levels(prefix: str, level_string: str) -> None:
    """Set the level of every known logger whose name starts with a prefix.

    :param prefix: logger name prefix; a falsy value matches every named
                   logger
    :param level_string: level name, resolved with :func:`get_log_method`
    """
    level = get_log_method(level_string)
    prefix = prefix or ''
    for logger_name, adapter in logging.get_loggers().items():
        if not logger_name or not logger_name.startswith(prefix):
            continue
        adapter.logger.setLevel(level)
def get_log_levels(prefix: str) -> dict:
    """Return the effective level name of every matching known logger.

    :param prefix: logger name prefix; a falsy value matches every named
                   logger
    :returns: dictionary mapping logger names to their effective level name
    """
    prefix = prefix or ''
    levels = {}
    for logger_name, adapter in logging.get_loggers().items():
        if logger_name and logger_name.startswith(prefix):
            effective_level = adapter.logger.getEffectiveLevel()
            levels[logger_name] = py_logging.getLevelName(effective_level)
    return levels
def paths_normcase_equal(path_a: str, path_b: str) -> bool:
    """Tell whether two paths are equal once ``os.path.normcase`` is applied."""
    normalized_a, normalized_b = map(os.path.normcase, (path_a, path_b))
    return normalized_a == normalized_b
def create_ordereddict(adict: dict) -> OrderedDict:
    """Given a dict, return an OrderedDict sorted by key."""
    ordered = OrderedDict()
    for key in sorted(adict):
        ordered[key] = adict[key]
    return ordered
class Semaphore(object):
    """Custom semaphore to workaround eventlet issues with multiprocessing.

    Context manager wrapping a ``multiprocessing.Semaphore``.

    :param limit: initial value of the semaphore
    """

    def __init__(self, limit):
        self.limit = limit
        self.semaphore = multiprocessing.Semaphore(limit)

    def __enter__(self):
        # Eventlet does not work with multiprocessing's Semaphore, so the
        # acquisition runs in a native thread to avoid getting blocked.
        acquire = self.semaphore.__enter__
        return tpool.execute(acquire)

    def __exit__(self, *args):
        # Don't use native thread for exit, as it will only add overhead
        release = self.semaphore.__exit__
        return release(*args)
def semaphore_factory(limit: int,
                      concurrent_processes: int) -> Union[eventlet.Semaphore,
                                                          Semaphore]:
    """Get a semaphore to limit concurrent operations.

    :param limit: maximum number of concurrent operations; 0 means no limit
    :param concurrent_processes: number of processes that need to be limited
    :returns: a ``contextlib.suppress()`` context manager when there is no
              limit, eventlet's Semaphore for a single process, and this
              module's ``Semaphore`` otherwise
    """
    # Limit of 0 is no limit, so we won't use a semaphore
    if not limit:
        return contextlib.suppress()

    # If we only have 1 process we can use eventlet's Semaphore
    if concurrent_processes == 1:
        return eventlet.Semaphore(limit)

    # Use our own Semaphore for interprocess because eventlet blocks with
    # the standard one
    return Semaphore(limit)
def limit_operations(func: Callable) -> Callable:
"""Decorator to limit the number of concurrent operations.
This method decorator expects to have a _semaphore attribute holding an
initialized semaphore in the self instance object.
We can get the appropriate semaphore with the semaphore_factory method.
"""
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
with self._semaphore:
return func(self, *args, **kwargs)
return wrapper