openstacksdk/openstack/utils.py
2024-09-12 22:25:39 +00:00

626 lines
20 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections.abc import Mapping
import hashlib
import queue
import string
import threading
import time
import typing as ty
import keystoneauth1
from keystoneauth1 import adapter as ks_adapter
from keystoneauth1 import discover
from openstack import _log
from openstack import exceptions
def urljoin(*args):
"""A custom version of urljoin that simply joins strings into a path.
The real urljoin takes into account web semantics like when joining a url
like /path this should be joined to http://host/path as it is an anchored
link. We generally won't care about that in client.
"""
return '/'.join(str(a or '').strip('/') for a in args)
def iterate_timeout(timeout, message, wait=2):
"""Iterate and raise an exception on timeout.
This is a generator that will continually yield and sleep for
wait seconds, and if the timeout is reached, will raise an exception
with <message>.
"""
log = _log.setup_logging('openstack.iterate_timeout')
try:
# None as a wait winds up flowing well in the per-resource cache
# flow. We could spread this logic around to all of the calling
# points, but just having this treat None as "I don't have a value"
# seems friendlier
if wait is None:
wait = 2
elif wait == 0:
# wait should be < timeout, unless timeout is None
wait = 0.1 if timeout is None else min(0.1, timeout)
wait = float(wait)
except ValueError:
raise exceptions.SDKException(
"Wait value must be an int or float value. {wait} given"
" instead".format(wait=wait)
)
start = time.time()
count = 0
while (timeout is None) or (time.time() < start + timeout):
count += 1
yield count
log.debug('Waiting %s seconds', wait)
time.sleep(wait)
raise exceptions.ResourceTimeout(message)
class _AccessSaver:
__slots__ = ('keys',)
def __init__(self):
self.keys = []
def __getitem__(self, key):
self.keys.append(key)
def get_string_format_keys(fmt_string, old_style=True):
"""Gets a list of required keys from a format string
Required mostly for parsing base_path urls for required keys, which
use the old style string formatting.
"""
if old_style:
a = _AccessSaver()
fmt_string % a
return a.keys
else:
keys = []
for t in string.Formatter().parse(fmt_string):
if t[1] is not None:
keys.append(t[1])
return keys
def supports_version(
adapter: ks_adapter.Adapter,
version: str,
raise_exception: bool = False,
) -> bool:
"""Determine if the given adapter supports the given version.
Checks the version asserted by the service and ensures this matches the
provided version. ``version`` can be a major version or a major-minor
version
:param adapter: :class:`~keystoneauth1.adapter.Adapter` instance.
:param version: String containing the desired version.
:param raise_exception: Raise exception when requested version
is not supported by the server.
:returns: ``True`` if the service supports the version, else ``False``.
:raises: :class:`~openstack.exceptions.SDKException` when
``raise_exception`` is ``True`` and requested version is not supported.
"""
required = discover.normalize_version_number(version)
if discover.version_match(required, adapter.get_api_major_version()):
return True
if raise_exception:
raise exceptions.SDKException(
f'Required version {version} is not supported by the server'
)
return False
def supports_microversion(adapter, microversion, raise_exception=False):
"""Determine if the given adapter supports the given microversion.
Checks the min and max microversion asserted by the service and ensures
``min <= microversion <= max``. If set, the current default microversion is
taken into consideration to ensure ``microversion <= default``.
:param adapter: :class:`~keystoneauth1.adapter.Adapter` instance.
:param str microversion: String containing the desired microversion.
:param bool raise_exception: Raise exception when requested microversion
is not supported by the server or is higher than the current default
microversion.
:returns: True if the service supports the microversion, else False.
:rtype: bool
:raises: :class:`~openstack.exceptions.SDKException` when
``raise_exception`` is ``True`` and requested microversion is not
supported.
"""
endpoint_data = adapter.get_endpoint_data()
if (
endpoint_data.min_microversion
and endpoint_data.max_microversion
and discover.version_between(
endpoint_data.min_microversion,
endpoint_data.max_microversion,
microversion,
)
):
if adapter.default_microversion is not None:
# If default_microversion is set - evaluate
# whether it match the expectation
candidate = discover.normalize_version_number(
adapter.default_microversion
)
required = discover.normalize_version_number(microversion)
supports = discover.version_match(required, candidate)
if raise_exception and not supports:
raise exceptions.SDKException(
'Required microversion {ver} is higher than currently '
'selected {curr}'.format(
ver=microversion, curr=adapter.default_microversion
)
)
return supports
return True
if raise_exception:
raise exceptions.SDKException(
'Required microversion {ver} is not supported '
'by the server side'.format(ver=microversion)
)
return False
def require_microversion(adapter, required):
"""Require microversion.
:param adapter: :class:`~keystoneauth1.adapter.Adapter` instance.
:param str microversion: String containing the desired microversion.
:raises: :class:`~openstack.exceptions.SDKException` when requested
microversion is not supported
"""
supports_microversion(adapter, required, raise_exception=True)
def pick_microversion(session, required):
"""Get a new microversion if it is higher than session's default.
:param session: The session to use for making this request.
:type session: :class:`~keystoneauth1.adapter.Adapter`
:param required: Minimum version that is required for an action.
:type required: String or tuple or None.
:return: ``required`` as a string if the ``session``'s default is too low,
otherwise the ``session``'s default. Returns ``None`` if both
are ``None``.
:raises: TypeError if ``required`` is invalid.
:raises: :class:`~openstack.exceptions.SDKException` if requested
microversion is not supported.
"""
if required is not None:
required = discover.normalize_version_number(required)
if session.default_microversion is not None:
default = discover.normalize_version_number(
session.default_microversion
)
if required is None:
required = default
else:
required = (
default
if discover.version_match(required, default)
else required
)
if required is not None:
if not supports_microversion(session, required):
raise exceptions.SDKException(
'Requested microversion is not supported by the server side '
'or the default microversion is too low'
)
return discover.version_to_string(required)
def maximum_supported_microversion(adapter, client_maximum):
"""Determine the maximum microversion supported by both client and server.
:param adapter: :class:`~keystoneauth1.adapter.Adapter` instance.
:param client_maximum: Maximum microversion supported by the client.
If ``None``, ``None`` is returned.
:returns: the maximum supported microversion as string or ``None``.
"""
if client_maximum is None:
return None
# NOTE(dtantsur): if we cannot determine supported microversions, fall back
# to the default one.
try:
endpoint_data = adapter.get_endpoint_data()
except keystoneauth1.exceptions.discovery.DiscoveryFailure:
endpoint_data = None
if endpoint_data is None:
log = _log.setup_logging('openstack')
log.warning(
'Cannot determine endpoint data for service %s',
adapter.service_type or adapter.service_name,
)
return None
if not endpoint_data.max_microversion:
return None
client_max = discover.normalize_version_number(client_maximum)
server_max = discover.normalize_version_number(
endpoint_data.max_microversion
)
if endpoint_data.min_microversion:
server_min = discover.normalize_version_number(
endpoint_data.min_microversion
)
if client_max < server_min:
# NOTE(dtantsur): we may want to raise in this case, but this keeps
# the current behavior intact.
return None
result = min(client_max, server_max)
return discover.version_to_string(result)
def _hashes_up_to_date(md5, sha256, md5_key, sha256_key):
"""Compare md5 and sha256 hashes for being up to date
md5 and sha256 are the current values.
md5_key and sha256_key are the previous values.
"""
up_to_date = False
if md5 and md5_key == md5:
up_to_date = True
if sha256 and sha256_key == sha256:
up_to_date = True
if md5 and md5_key != md5:
up_to_date = False
if sha256 and sha256_key != sha256:
up_to_date = False
return up_to_date
def _calculate_data_hashes(data):
_md5 = hashlib.md5(usedforsecurity=False)
_sha256 = hashlib.sha256()
if hasattr(data, 'read'):
for chunk in iter(lambda: data.read(8192), b''):
_md5.update(chunk)
_sha256.update(chunk)
else:
_md5.update(data)
_sha256.update(data)
return (_md5.hexdigest(), _sha256.hexdigest())
def _get_file_hashes(filename):
(_md5, _sha256) = (None, None)
with open(filename, 'rb') as file_obj:
(_md5, _sha256) = _calculate_data_hashes(file_obj)
return (_md5, _sha256)
class TinyDAG:
"""Tiny DAG
Bases on the Kahn's algorithm, and enables parallel visiting of the nodes
(parallel execution of the workflow items).
"""
def __init__(self, data=None):
self._reset()
self._lock = threading.Lock()
if data and isinstance(data, dict):
self.from_dict(data)
def _reset(self):
self._graph = dict()
self._wait_timeout = 120
@property
def graph(self):
"""Get graph as adjacency dict"""
return self._graph
def add_node(self, node):
self._graph.setdefault(node, set())
def add_edge(self, u, v):
self._graph[u].add(v)
def from_dict(self, data):
self._reset()
for k, v in data.items():
self.add_node(k)
for dep in v:
self.add_edge(k, dep)
def walk(self, timeout=None):
"""Start the walking from the beginning."""
if timeout:
self._wait_timeout = timeout
return self
def __iter__(self):
self._start_traverse()
return self
def __next__(self):
# Start waiting if it is expected to get something
# (counting down from graph length to 0).
if self._it_cnt > 0:
self._it_cnt -= 1
try:
res = self._queue.get(block=True, timeout=self._wait_timeout)
return res
except queue.Empty:
raise exceptions.SDKException(
'Timeout waiting for cleanup task to complete'
)
else:
raise StopIteration
def node_done(self, node):
"""Mark node as "processed" and put following items into the queue"""
self._done.add(node)
for v in self._graph[node]:
self._run_in_degree[v] -= 1
if self._run_in_degree[v] == 0:
self._queue.put(v)
def _start_traverse(self):
"""Initialize graph traversing"""
self._run_in_degree = self._get_in_degree()
self._queue: queue.Queue[str] = queue.Queue()
self._done = set()
self._it_cnt = len(self._graph)
for k, v in self._run_in_degree.items():
if v == 0:
self._queue.put(k)
def _get_in_degree(self):
"""Calculate the in_degree (count incoming) for nodes"""
_in_degree: ty.Dict[str, int] = {u: 0 for u in self._graph.keys()}
for u in self._graph:
for v in self._graph[u]:
_in_degree[v] += 1
return _in_degree
def topological_sort(self):
"""Return the graph nodes in the topological order"""
result = []
for node in self:
result.append(node)
self.node_done(node)
return result
def size(self):
return len(self._graph.keys())
def is_complete(self):
return len(self._done) == self.size()
# Importing Munch is a relatively expensive operation (0.3s) while we do not
# really even need much of it. Before we can rework all places where we rely on
# it we can have a reduced version.
class Munch(dict):
"""A slightly stripped version of munch.Munch class"""
def __init__(self, *args, **kwargs):
self.update(*args, **kwargs)
# only called if k not found in normal places
def __getattr__(self, k):
"""Gets key if it exists, otherwise throws AttributeError."""
try:
return object.__getattribute__(self, k)
except AttributeError:
try:
return self[k]
except KeyError:
raise AttributeError(k)
def __setattr__(self, k, v):
"""Sets attribute k if it exists, otherwise sets key k. A KeyError
raised by set-item (only likely if you subclass Munch) will
propagate as an AttributeError instead.
"""
try:
# Throws exception if not in prototype chain
object.__getattribute__(self, k)
except AttributeError:
try:
self[k] = v
except Exception:
raise AttributeError(k)
else:
object.__setattr__(self, k, v)
def __delattr__(self, k):
"""Deletes attribute k if it exists, otherwise deletes key k.
A KeyError raised by deleting the key - such as when the key is missing
- will propagate as an AttributeError instead.
"""
try:
# Throws exception if not in prototype chain
object.__getattribute__(self, k)
except AttributeError:
try:
del self[k]
except KeyError:
raise AttributeError(k)
else:
object.__delattr__(self, k)
def toDict(self):
"""Recursively converts a munch back into a dictionary."""
return unmunchify(self)
@property
def __dict__(self):
return self.toDict()
def __repr__(self):
"""Invertible* string-form of a Munch."""
return f'{self.__class__.__name__}({dict.__repr__(self)})'
def __dir__(self):
return list(self.keys())
def __getstate__(self):
"""Implement a serializable interface used for pickling.
See https://docs.python.org/3.6/library/pickle.html.
"""
return {k: v for k, v in self.items()}
def __setstate__(self, state):
"""Implement a serializable interface used for pickling.
See https://docs.python.org/3.6/library/pickle.html.
"""
self.clear()
self.update(state)
@classmethod
def fromDict(cls, d):
"""Recursively transforms a dictionary into a Munch via copy."""
return munchify(d, cls)
def copy(self):
return type(self).fromDict(self)
def update(self, *args, **kwargs):
"""
Override built-in method to call custom __setitem__ method that may
be defined in subclasses.
"""
for k, v in dict(*args, **kwargs).items():
self[k] = v
def get(self, k, d=None):
"""
D.get(k[,d]) -> D[k] if k in D, else d. d defaults to None.
"""
if k not in self:
return d
return self[k]
def setdefault(self, k, d=None):
"""
D.setdefault(k[,d]) -> D.get(k,d), also set D[k]=d if k not in D
"""
if k not in self:
self[k] = d
return self[k]
def munchify(x, factory=Munch):
"""Recursively transforms a dictionary into a Munch via copy."""
# Munchify x, using `seen` to track object cycles
seen: ty.Dict[int, ty.Any] = dict()
def munchify_cycles(obj):
try:
return seen[id(obj)]
except KeyError:
pass
seen[id(obj)] = partial = pre_munchify(obj)
return post_munchify(partial, obj)
def pre_munchify(obj):
if isinstance(obj, Mapping):
return factory({})
elif isinstance(obj, list):
return type(obj)()
elif isinstance(obj, tuple):
type_factory = getattr(obj, "_make", type(obj))
return type_factory(munchify_cycles(item) for item in obj)
else:
return obj
def post_munchify(partial, obj):
if isinstance(obj, Mapping):
partial.update((k, munchify_cycles(obj[k])) for k in obj.keys())
elif isinstance(obj, list):
partial.extend(munchify_cycles(item) for item in obj)
elif isinstance(obj, tuple):
for item_partial, item in zip(partial, obj):
post_munchify(item_partial, item)
return partial
return munchify_cycles(x)
def unmunchify(x):
"""Recursively converts a Munch into a dictionary."""
# Munchify x, using `seen` to track object cycles
seen: ty.Dict[int, ty.Any] = dict()
def unmunchify_cycles(obj):
try:
return seen[id(obj)]
except KeyError:
pass
seen[id(obj)] = partial = pre_unmunchify(obj)
return post_unmunchify(partial, obj)
def pre_unmunchify(obj):
if isinstance(obj, Mapping):
return dict()
elif isinstance(obj, list):
return type(obj)()
elif isinstance(obj, tuple):
type_factory = getattr(obj, "_make", type(obj))
return type_factory(unmunchify_cycles(item) for item in obj)
else:
return obj
def post_unmunchify(partial, obj):
if isinstance(obj, Mapping):
partial.update((k, unmunchify_cycles(obj[k])) for k in obj.keys())
elif isinstance(obj, list):
partial.extend(unmunchify_cycles(v) for v in obj)
elif isinstance(obj, tuple):
for value_partial, value in zip(partial, obj):
post_unmunchify(value_partial, value)
return partial
return unmunchify_cycles(x)