
Fix [H405] rule in heat/engine python files. Implements bp docstring-improvements. Change-Id: Iaa1541eb03c4db837ef3a0e4eb22393ba32e270f
428 lines
14 KiB
Python
428 lines
14 KiB
Python
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import sys
|
|
import types
|
|
|
|
import eventlet
|
|
from oslo_log import log as logging
|
|
from oslo_utils import excutils
|
|
import six
|
|
from six import reraise as raise_
|
|
|
|
from heat.common.i18n import _
|
|
from heat.common.i18n import _LI
|
|
from heat.common import timeutils
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
# Whether TaskRunner._sleep actually does an eventlet sleep when called.
# NOTE(review): presumably toggled off by the test suite so tasks run
# without real delays -- confirm against the tests.
ENABLE_SLEEP = True
|
|
|
|
|
|
def task_description(task):
    """Return a human-readable description of a task.

    The description is used when logging the status of the task.
    """
    name = getattr(task, '__name__', None)
    if isinstance(task, types.MethodType):
        if name is not None and hasattr(task, '__self__'):
            return '%s from %s' % (name, task.__self__)
    elif isinstance(task, types.FunctionType):
        if name is not None:
            return six.text_type(name)
    return repr(task)
|
|
|
|
|
|
class Timeout(BaseException):
    """Raised when a task exceeds its allotted (wallclock) running time.

    Deriving from BaseException means the exception is not swallowed by
    ``except Exception`` handlers inside the task. The task may still
    perform any necessary cleanup, substitute a different exception to
    notify the controlling task, or suppress the exception altogether --
    in the latter case the task is cancelled but the controlling task is
    not notified of the timeout.
    """

    def __init__(self, task_runner, timeout):
        """Initialise with the TaskRunner and a timeout period in seconds."""
        message = _('%s Timed out') % six.text_type(task_runner)
        super(Timeout, self).__init__(message)

        # Start the wallclock countdown immediately.
        self._duration = timeutils.Duration(timeout)

    def expired(self):
        """Return True if the timeout period has elapsed."""
        return self._duration.expired()

    def trigger(self, generator):
        """Throw this timeout into the given generator.

        Returns True if the generator exited as a result, False if it
        suppressed the exception (in which case it is closed anyway).
        """
        try:
            generator.throw(self)
        except StopIteration:
            return True
        else:
            # The task swallowed the exception without exiting; make sure
            # it gets cleaned up regardless.
            generator.close()
            return False

    # All rich comparisons are derived from __lt__, which orders timeouts
    # by their absolute end time.

    def __lt__(self, other):
        if not isinstance(other, Timeout):
            return NotImplemented
        return self._duration.endtime() < other._duration.endtime()

    def __le__(self, other):
        return not other < self

    def __gt__(self, other):
        return other < self

    def __ge__(self, other):
        return not self < other

    def __eq__(self, other):
        return not (self < other or other < self)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __cmp__(self, other):
        # Python 2 fallback comparison protocol.
        return self < other
|
|
|
|
|
|
class TimedCancel(Timeout):
    """Timeout that cancels the task outright when triggered."""

    def trigger(self, generator):
        """Close the given generator immediately.

        Unlike Timeout.trigger(), the exception is not thrown into the
        task, so it gets no chance to intercept the timeout or substitute
        a different exception for the controlling task. Always returns
        False (the controlling task is not notified).
        """
        generator.close()
        return False
|
|
|
|
|
|
@six.python_2_unicode_compatible
class ExceptionGroup(Exception):
    """Container for multiple exceptions.

    Used by DependencyTaskGroup when the aggregate_exceptions flag is set
    to True: it is re-raised once all tasks are finished, so that it can
    be caught later on and the individual exceptions acted upon.
    """

    def __init__(self, exceptions=None):
        self.exceptions = [] if exceptions is None else list(exceptions)

    def __str__(self):
        texts = [six.text_type(ex) for ex in self.exceptions]
        return six.text_type(texts)
|
|
|
|
|
|
@six.python_2_unicode_compatible
class TaskRunner(object):
    """Wrapper for a resumable task (co-routine)."""

    def __init__(self, task, *args, **kwargs):
        """Initialise with a task function.

        The remaining arguments are passed to the task when it is
        started. The task function may be a co-routine that yields
        control flow between steps.
        """
        assert callable(task), "Task is not callable"

        self._task = task
        self._args = args
        self._kwargs = kwargs
        # _runner is None until started, a generator while running a
        # resumable task, or False if the task was not resumable.
        self._runner = None
        self._done = False
        # The pending Timeout (or TimedCancel), if any.
        self._timeout = None
        self.name = task_description(task)

    def __str__(self):
        """Return a human-readable string representation of the task."""
        text = 'Task %s' % self.name
        return six.text_type(text)

    def _sleep(self, wait_time):
        """Sleep for the specified number of seconds."""
        if ENABLE_SLEEP and wait_time is not None:
            # Pass format args to the logger (lazy interpolation) rather
            # than pre-formatting the message eagerly.
            LOG.debug('%s sleeping', self)
            eventlet.sleep(wait_time)

    def __call__(self, wait_time=1, timeout=None):
        """Start and run the task to completion.

        The task will first sleep for zero seconds, then sleep for
        `wait_time` seconds between steps. To avoid sleeping, pass `None`
        for `wait_time`.

        If a timeout is specified, a Timeout will be raised inside the
        task if it is still stepping after that many seconds.
        """
        self.start(timeout=timeout)
        # ensure that zero second sleep is applied only if task
        # has not completed.
        if not self.done() and wait_time:
            self._sleep(0)
        self.run_to_completion(wait_time=wait_time)

    def start(self, timeout=None):
        """Initialise the task and run its first step.

        If a timeout is specified, any attempt to step the task after
        that number of seconds has elapsed will result in a Timeout being
        raised inside the task.
        """
        assert self._runner is None, "Task already started"
        assert not self._done, "Task already cancelled"

        LOG.debug('%s starting', self)

        if timeout is not None:
            self._timeout = Timeout(self, timeout)

        result = self._task(*self._args, **self._kwargs)
        if isinstance(result, types.GeneratorType):
            self._runner = result
            self.step()
        else:
            # The task ran to completion synchronously; nothing to resume.
            self._runner = False
            self._done = True
            LOG.debug('%s done (not resumable)', self)

    def step(self):
        """Run another step of the task.

        :returns: True if the task is complete; False otherwise.
        """
        if not self.done():
            assert self._runner is not None, "Task not started"

            if self._timeout is not None and self._timeout.expired():
                LOG.info(_LI('%s timed out'), self)
                self._done = True

                # Give the task a chance to clean up (or suppress the
                # timeout) by throwing the exception into it.
                self._timeout.trigger(self._runner)
            else:
                LOG.debug('%s running', self)

                try:
                    next(self._runner)
                except StopIteration:
                    self._done = True
                    LOG.debug('%s complete', self)

        return self._done

    def run_to_completion(self, wait_time=1):
        """Run the task to completion.

        The task will sleep for `wait_time` seconds between steps. To
        avoid sleeping, pass `None` for `wait_time`.
        """
        while not self.step():
            self._sleep(wait_time)

    def cancel(self, grace_period=None):
        """Cancel the task and mark it as done.

        If a grace_period is given and the task is already running, it is
        instead scheduled for cancellation (via TimedCancel) after that
        many seconds.
        """
        if self.done():
            return

        if not self.started() or grace_period is None:
            LOG.debug('%s cancelled', self)
            self._done = True
            if self.started():
                self._runner.close()
        else:
            timeout = TimedCancel(self, grace_period)
            # Keep whichever deadline comes first: an existing timeout or
            # the new cancellation deadline.
            if self._timeout is None or timeout < self._timeout:
                self._timeout = timeout

    def started(self):
        """Return True if the task has been started."""
        return self._runner is not None

    def done(self):
        """Return True if the task is complete."""
        return self._done

    def __nonzero__(self):
        """Return True if there are steps remaining."""
        return not self.done()

    def __bool__(self):
        """Return True if there are steps remaining."""
        return self.__nonzero__()
|
|
|
|
|
|
def wrappertask(task):
    """Decorator for a task that needs to drive a subtask.

    This is essentially a replacement for the Python 3-only "yield from"
    keyword (PEP 380), using the "yield" keyword that is supported in
    Python 2. For example::

        @wrappertask
        def parent_task(self):
            self.setup()

            yield self.child_task()

            self.cleanup()
    """

    @six.wraps(task)
    def wrapper(*args, **kwargs):
        parent = task(*args, **kwargs)

        # The wrapper exits when the parent is exhausted.
        # NOTE(review): the StopIteration from next(parent) escaping a
        # generator would surface as RuntimeError under PEP 479
        # (Python 3.7+) -- presumably this targets Python 2/early 3;
        # confirm.
        subtask = next(parent)

        while True:
            try:
                if subtask is not None:
                    subtask_running = True
                    # Prime the subtask with its first step.
                    try:
                        step = next(subtask)
                    except StopIteration:
                        subtask_running = False

                    while subtask_running:
                        try:
                            # Pass the subtask's yielded value through to
                            # our own caller.
                            yield step
                        except GeneratorExit as ex:
                            # We are being closed: close the subtask too
                            # before propagating the exit.
                            subtask.close()
                            raise ex
                        except:  # noqa
                            # Deliberate bare except (not Exception) so
                            # that BaseExceptions such as Timeout are
                            # also forwarded into the subtask.
                            try:
                                step = subtask.throw(*sys.exc_info())
                            except StopIteration:
                                subtask_running = False
                        else:
                            try:
                                step = next(subtask)
                            except StopIteration:
                                subtask_running = False
                else:
                    # No subtask to drive this round; just yield control.
                    yield
            except GeneratorExit as ex:
                # Close the parent task before propagating the exit.
                parent.close()
                raise ex
            except:  # noqa
                # Forward any exception (including BaseExceptions) thrown
                # into us to the parent, which may yield a new subtask.
                subtask = parent.throw(*sys.exc_info())
            else:
                # Current subtask exhausted; ask the parent for the next.
                subtask = next(parent)

    return wrapper
|
|
|
|
|
|
class DependencyTaskGroup(object):
    """Task which manages group of subtasks that have ordering dependencies."""

    def __init__(self, dependencies, task=lambda o: o(),
                 reverse=False, name=None, error_wait_time=None,
                 aggregate_exceptions=False):
        """Initialise with the task dependencies.

        Optionally initialise with a task to run on each.

        If no task is supplied, it is assumed that the tasks are stored
        directly in the dependency tree. If a task is supplied, the object
        stored in the dependency tree is passed as an argument.

        If an error_wait_time is specified, tasks that are already running at
        the time of an error will continue to run for up to the specified
        time before being cancelled. Once all remaining tasks are complete or
        have been cancelled, the original exception is raised.

        If aggregate_exceptions is True, then execution of parallel operations
        will not be cancelled in the event of an error (operations downstream
        of the error will be cancelled). Once all chains are complete, any
        errors will be rolled up into an ExceptionGroup exception.
        """
        # One TaskRunner per node in the dependency graph, keyed by node.
        self._runners = dict((o, TaskRunner(task, o)) for o in dependencies)
        self._graph = dependencies.graph(reverse=reverse)
        self.error_wait_time = error_wait_time
        self.aggregate_exceptions = aggregate_exceptions

        if name is None:
            name = '(%s) %s' % (getattr(task, '__name__',
                                        task_description(task)),
                                six.text_type(dependencies))
        self.name = name

    def __repr__(self):
        """Return a string representation of the task."""
        text = '%s(%s)' % (type(self).__name__, self.name)
        return six.text_type(text)

    def __call__(self):
        """Return a co-routine which runs the task group."""
        raised_exceptions = []
        # Keep going until every runner reports done (a runner is falsy
        # once complete -- see TaskRunner.__bool__).
        while any(six.itervalues(self._runners)):
            try:
                # Start any tasks whose dependencies are now satisfied.
                for k, r in self._ready():
                    r.start()

                yield

                # Step each running task; drop completed nodes from the
                # graph so that their dependents become ready.
                for k, r in self._running():
                    if r.step():
                        del self._graph[k]
            except Exception:
                # NOTE(review): k and r are the loop variables of
                # whichever for-loop raised; if an exception were raised
                # before either loop bound them this would be a NameError
                # -- confirm that cannot happen in practice.
                exc_info = sys.exc_info()
                if self.aggregate_exceptions:
                    # Cancel only the failed task's downstream dependents;
                    # unrelated chains keep running.
                    self._cancel_recursively(k, r)
                else:
                    self.cancel_all(grace_period=self.error_wait_time)
                raised_exceptions.append(exc_info)
            except:  # noqa
                # A BaseException (e.g. Timeout, KeyboardInterrupt):
                # cancel everything immediately and re-raise.
                with excutils.save_and_reraise_exception():
                    self.cancel_all()

        if raised_exceptions:
            if self.aggregate_exceptions:
                raise ExceptionGroup(v for t, v, tb in raised_exceptions)
            else:
                # Re-raise the first recorded error with its original
                # traceback.
                exc_type, exc_val, traceback = raised_exceptions[0]
                raise_(exc_type, exc_val, traceback)

    def cancel_all(self, grace_period=None):
        # Cancel every runner, optionally allowing already-running tasks
        # a grace period to wind down.
        for r in six.itervalues(self._runners):
            r.cancel(grace_period=grace_period)

    def _cancel_recursively(self, key, runner):
        # Cancel the given runner and, transitively, every task that
        # depends on it, pruning each cancelled node from the graph.
        runner.cancel()
        node = self._graph[key]
        for dependent_node in node.required_by():
            node_runner = self._runners[dependent_node]
            self._cancel_recursively(dependent_node, node_runner)

        del self._graph[key]

    def _ready(self):
        """Iterate over all subtasks that are ready to start.

        All subtasks' dependencies have been satisfied but they have not yet
        been started.
        """
        for k, n in six.iteritems(self._graph):
            # Presumably a graph node is falsy once all of its
            # requirements are satisfied (matches the docstring above).
            if not n:
                runner = self._runners[k]
                if runner and not runner.started():
                    yield k, runner

    def _running(self):
        """Iterate over all subtasks that are currently running.

        Subtasks have been started but have not yet completed.
        """
        running = lambda k_r: k_r[0] in self._graph and k_r[1].started()
        return six.moves.filter(running, six.iteritems(self._runners))
|