# Copyright 2017 Huawei Technologies Co.,LTD. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. """Utilities and helper functions.""" from concurrent.futures import ThreadPoolExecutor as CFThreadPoolExecutor from functools import wraps import queue import time import traceback from keystoneauth1 import exceptions as ks_exc from keystoneauth1 import loading as ks_loading from openstack import connection from openstack import exceptions as sdk_exc from os_service_types import service_types from oslo_concurrency import lockutils from oslo_log import log from cyborg.common import exception from cyborg.common.i18n import _ import cyborg.conf LOG = log.getLogger(__name__) synchronized = lockutils.synchronized_with_prefix('cyborg-') _SERVICE_TYPES = service_types.ServiceTypes() CONF = cyborg.conf.CONF def safe_rstrip(value, chars=None): """Removes trailing characters from a string if that does not make it empty :param value: A string value that will be stripped. :param chars: Characters to remove. :return: Stripped value. """ if not isinstance(value, str): LOG.warning("Failed to remove trailing character. Returning " "original object. Supplied object is not a string: " "%s,", value) return value return value.rstrip(chars) or value def get_ksa_adapter(service_type, ksa_auth=None, ksa_session=None, min_version=None, max_version=None): """Construct a keystoneauth1 Adapter for a given service type. We expect to find a conf group whose name corresponds to the service_type's project according to the service-types-authority. That conf group must provide at least ksa adapter options. Depending how the result is to be used, ksa auth and/or session options may also be required, or the relevant parameter supplied. :param service_type: String name of the service type for which the Adapter is to be constructed. :param ksa_auth: A keystoneauth1 auth plugin. If not specified, we attempt to find one in ksa_session. Failing that, we attempt to load one from the conf. :param ksa_session: A keystoneauth1 Session. If not specified, we attempt to load one from the conf. :param min_version: The minimum major version of the adapter's endpoint, intended to be used as the lower bound of a range with max_version. If min_version is given with no max_version it is as if max version is 'latest'. :param max_version: The maximum major version of the adapter's endpoint, intended to be used as the upper bound of a range with min_version. :return: A keystoneauth1 Adapter object for the specified service_type. :raise: ConfGroupForServiceTypeNotFound If no conf group name could be found for the specified service_type. """ # Get the conf group corresponding to the service type. confgrp = _SERVICE_TYPES.get_project_name(service_type) if not confgrp or not hasattr(CONF, confgrp): # Try the service type as the conf group. This is necessary for e.g. # placement, while it's still part of the nova project. # Note that this might become the first thing we try if/as we move to # using service types for conf group names in general. confgrp = service_type if not confgrp or not hasattr(CONF, confgrp): raise exception.ConfGroupForServiceTypeNotFound(stype=service_type) # Ensure we have an auth. # NOTE(efried): This could be None, and that could be okay - e.g. if the # result is being used for get_endpoint() and the conf only contains # endpoint_override. if not ksa_auth: if ksa_session and ksa_session.auth: ksa_auth = ksa_session.auth else: ksa_auth = ks_loading.load_auth_from_conf_options(CONF, confgrp) if not ksa_session: ksa_session = ks_loading.load_session_from_conf_options( CONF, confgrp, auth=ksa_auth) return ks_loading.load_adapter_from_conf_options( CONF, confgrp, session=ksa_session, auth=ksa_auth, min_version=min_version, max_version=max_version) def _get_conf_group(service_type): # Get the conf group corresponding to the service type. confgrp = _SERVICE_TYPES.get_project_name(service_type) if not confgrp or not hasattr(CONF, confgrp): raise exception.ConfGroupForServiceTypeNotFound(stype=service_type) return confgrp def _get_auth_and_session(confgrp): ksa_auth = ks_loading.load_auth_from_conf_options(CONF, confgrp) return ks_loading.load_session_from_conf_options( CONF, confgrp, auth=ksa_auth) def get_sdk_adapter(service_type, check_service=False): """Construct an openstacksdk-brokered Adapter for a given service type. We expect to find a conf group whose name corresponds to the service_type's project according to the service-types-authority. That conf group must provide ksa auth, session, and adapter options. :param service_type: String name of the service type for which the Adapter is to be constructed. :param check_service: If True, we will query the endpoint to make sure the service is alive, raising ServiceUnavailable if it is not. :return: An openstack.proxy.Proxy object for the specified service_type. :raise: ConfGroupForServiceTypeNotFound If no conf group name could be found for the specified service_type. :raise: ServiceUnavailable if check_service is True and the service is down """ confgrp = _get_conf_group(service_type) sess = _get_auth_and_session(confgrp) try: conn = connection.Connection( session=sess, oslo_conf=CONF, service_types={service_type}, strict_proxies=check_service) except sdk_exc.ServiceDiscoveryException as e: raise exception.ServiceUnavailable( _("The %(service_type)s service is unavailable: %(error)s") % {'service_type': service_type, 'error': str(e)}) return getattr(conn, service_type) def get_endpoint(ksa_adapter): """Get the endpoint URL represented by a keystoneauth1 Adapter. This method is equivalent to what ksa_adapter.get_endpoint() should do, if it weren't for a panoply of bugs. :param ksa_adapter: keystoneauth1.adapter.Adapter, appropriately set up with an endpoint_override; or service_type, interface (list) and auth/service_catalog. :return: String endpoint URL. :raise EndpointNotFound: If endpoint discovery fails. """ # TODO(efried): The remainder of this method reduces to # TODO(efried): return ksa_adapter.get_endpoint() # TODO(efried): once bug #1709118 is fixed. # NOTE(efried): Id9bd19cca68206fc64d23b0eaa95aa3e5b01b676 may also do the # trick, once it's in a ksa release. # The EndpointNotFound exception happens when _ContextAuthPlugin is in play # because its get_endpoint() method isn't yet set up to handle interface as # a list. (It could also happen with a real auth if the endpoint isn't # there; but that's covered below.) try: return ksa_adapter.get_endpoint() except ks_exc.EndpointNotFound: pass interfaces = list(ksa_adapter.interface) for interface in interfaces: ksa_adapter.interface = interface try: return ksa_adapter.get_endpoint() except ks_exc.EndpointNotFound: pass raise ks_exc.EndpointNotFound( "Could not find requested endpoint for any of the following " "interfaces: %s" % interfaces) class _Singleton(type): """A metaclass that creates a Singleton base class when called.""" _instances = {} def __call__(cls, *args, **kwargs): ins = cls._instances.get(cls) if not ins or (hasattr(ins, "_reset") and isinstance(ins, cls) and ins._reset()): cls._instances[cls] = super( _Singleton, cls).__call__(*args, **kwargs) return cls._instances[cls] class Singleton(_Singleton('SingletonMeta', (object,), {})): """A class for Singleton pattern.""" pass class ThreadPoolExecutor(CFThreadPoolExecutor): """Derived from concurrent.futures.ThreadPoolExecutor""" def __init__(self, *args, **kwargs): """Initializes a new ThreadPoolExecutor instance. Args: max_workers: The maximum number of threads that can be used to execute the given calls. thread_name_prefix: An optional name prefix to give our threads. initializer: A callable used to initialize worker threads. initargs: A tuple of arguments to pass to the initializer. """ super(ThreadPoolExecutor, self).__init__(*args, **kwargs) # NOTE(Shaohe): py37/38 will use SimpleQueue as _work_queue, it will # cause hang the main thread with eventlet.monkey_patch. Change it # to queue._PySimpleQueue if hasattr(queue, "SimpleQueue") and not isinstance( self._work_queue, queue._PySimpleQueue): self._work_queue = queue._PySimpleQueue() class ThreadWorks(Singleton): """Passthrough method for ThreadPoolExecutor. It will also grab the context from the threadlocal store and add it to the store on the new thread. This allows for continuity in logging the context when using this method to spawn a new thread. """ def __init__(self, pool_size=CONF.thread_pool_size): """Singleton ThreadWorks init.""" # Ref: https://pythonhosted.org/futures/ # NOTE(Shaohe) We can let eventlet greening ThreadPoolExecutor # eventlet.patcher.monkey_patch(os=False, socket=True, # select=True, thread=True) # futures = eventlet.import_patched('concurrent.futures') # ThreadPoolExecutor = futures.ThreadPoolExecutor self.executor = ThreadPoolExecutor(max_workers=pool_size) self.masters = {} def spawn(self, func, *args, **kwargs): """Put a job in thread pool.""" LOG.debug("Add an async jobs. func: %s is with parameters args: %s, " "kwargs: %s", func, args, kwargs) future = self.executor.submit(func, *args, **kwargs) return future def spawn_master(self, func, *args, **kwargs): """Start a new thread for a job.""" executor = ThreadPoolExecutor() # TODO(Shaohe) every submit func should be wrapped with exception catch job = executor.submit(func, *args, **kwargs) LOG.debug("Spawn master job. func: %s is with parameters args: %s, " "kwargs: %s", func, args, kwargs) # NOTE(Shaohe) shutdown should be after job submit executor.shutdown(wait=False) # TODO(Shaohe) we need to consider resouce collection such as the # follow code to recoder them with timestemp? # master = {tag: { # "executor": executor, # "job": f, # "timestemp": time.time(), # "timeout": timeout}} # self.masters.update(master) return job def _reset(self): return self.executor._shutdown def map(self, func, *iterables, **kwargs): """Batch for job function.""" return self.executor.map(func, *iterables, **kwargs) @classmethod def get_workers_result(cls, fs=(), **kwargs): """get a jobs worker result. Waits workers util it finish or raise any Exception. It will cancel the rest if one job worker fails. If the future is cancelled before completing then CancelledError will be raised. Parameters: fs: the workers list spawn return. timeout: Wait workers timeout, it can be an int or float. If the worker hasn't yet completed then this method will wait up to timeout seconds. If the worker hasn't completed in timeout seconds, then a concurrent.futures.TimeoutError will be raised. If timeout is not specified or None, there is no limit to the wait time. return a generator which include: result: the value returned by the job workers. exception: the exception details raised from workers. state: The work state. """ def future_iterator(): # Yield must be hidden in closure so that the futures are submitted # before the first iterator value is required. try: # reverse to keep finishing order fs.reverse() while fs: # Careful not to keep a reference to the popped future f = None if timeout is None: f = fs.pop() yield f.result(), f.exception(), f._state, None else: f = fs.pop() yield (f.result(end_time - time.time()), f.exception(), f._state, None) except Exception as e: err = traceback.format_exc() LOG.error("Error during check the worker status. Exception " "info: %s", err) if f: LOG.error("Error during check the worker status. " "Exception info: %s, result: %s, state: %s. " "Reason %s", f.exception(), f._result, f._state, str(e)) yield f._result, f.exception(), f._state, err finally: # Do best to cancel remain jobs. if fs: LOG.info("Cancel the remained pending jobs") for future in fs: future.cancel() timeout = kwargs.get('timeout') if timeout is not None: end_time = timeout + time.time() LOG.info("Job timeout set as %s", timeout) fs = list(fs) return future_iterator() # info https://www.oreilly.com/library/view/python-cookbook/ # 0596001673/ch14s05.html def format_tb(tb, limit=None): """Fromat traceback to a string list. Print the usual traceback information, followed by a listing of all the local variables in each frame. """ if not tb: return [] tbs = ['Traceback (most recent call last):\n'] while 1: tbs = tbs + traceback.format_tb(tb, limit) if not tb.tb_next: break tb = tb.tb_next return tbs def wrap_job_tb(msg="Reason: %s"): """Wrap a function with a is_job tag added, and catch Excetpion.""" def _wrap_job_tb(method): @wraps(method) def _impl(self, *args, **kwargs): try: output = method(self, *args, **kwargs) except Exception as e: LOG.error(msg, str(e)) LOG.error(traceback.format_exc()) raise return output setattr(_impl, "is_job", True) return _impl return _wrap_job_tb def factory_register(SuperClass, ClassName): """Register an concrete class to a factory Class.""" def decorator(Class): # return Class if not hasattr(SuperClass, "_factory"): setattr(SuperClass, "_factory", {}) SuperClass._factory[ClassName] = Class setattr(Class, "_factory_type", ClassName) return Class return decorator class FactoryMixin(object): """A factory Mixin to create an concrete class.""" @classmethod def factory(cls, typ, *args, **kwargs): """factory to create an concrete class.""" f = getattr(cls, "_factory", {}) sclass = f.get(typ, None) if sclass: LOG.info("Find %s of concrete %s by %s.", sclass.__name__, cls.__name__, typ) return sclass for sclass in cls.__subclasses__(): if typ == getattr(cls, "_factory_type", None): return sclass else: return cls LOG.info("Use default %s, do not find concrete class" "by %s.", cls.__name__, typ) def strtime(at): return at.strftime("%Y-%m-%dT%H:%M:%S.%f")