Merge "Add a locally running threaded flow"
This commit is contained in:
@@ -35,14 +35,28 @@ def wraps(fn):
|
||||
return wrapper
|
||||
|
||||
|
||||
def locked(f):
|
||||
def locked(*args, **kwargs):
|
||||
|
||||
@wraps(f)
|
||||
def wrapper(self, *args, **kwargs):
|
||||
with self._lock:
|
||||
return f(self, *args, **kwargs)
|
||||
def decorator(f):
|
||||
attr_name = kwargs.get('lock', '_lock')
|
||||
|
||||
return wrapper
|
||||
@wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
lock = getattr(args[0], attr_name)
|
||||
with lock:
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
# This is needed to handle when the decorator has args or the decorator
|
||||
# doesn't have args, python is rather weird here...
|
||||
if kwargs or not args:
|
||||
return decorator
|
||||
else:
|
||||
if len(args) == 1:
|
||||
return decorator(args[0])
|
||||
else:
|
||||
return decorator
|
||||
|
||||
|
||||
def _original_function(fun):
|
||||
|
||||
@@ -68,10 +68,10 @@ class JobNotFound(TaskFlowException):
|
||||
|
||||
|
||||
class MissingDependencies(InvalidStateException):
|
||||
"""Raised when a task has dependencies that can not be satisified."""
|
||||
message = ("%(task)s requires %(requirements)s but no other task produces"
|
||||
"""Raised when a entity has dependencies that can not be satisified."""
|
||||
message = ("%(who)s requires %(requirements)s but no other entity produces"
|
||||
" said requirements")
|
||||
|
||||
def __init__(self, task, requirements):
|
||||
message = self.message % {'task': task, 'requirements': requirements}
|
||||
def __init__(self, who, requirements):
|
||||
message = self.message % {'who': who, 'requirements': requirements}
|
||||
super(MissingDependencies, self).__init__(message)
|
||||
|
||||
@@ -21,7 +21,6 @@ import threading
|
||||
|
||||
from taskflow.openstack.common import uuidutils
|
||||
|
||||
from taskflow import decorators
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow import states
|
||||
from taskflow import utils
|
||||
@@ -86,14 +85,14 @@ class Flow(object):
|
||||
# storage backend).
|
||||
self.notifier = utils.TransitionNotifier()
|
||||
self.task_notifier = utils.TransitionNotifier()
|
||||
# Ensure that modifications and/or multiple runs aren't happening
|
||||
# at the same time in the same flow at the same time.
|
||||
self._lock = threading.RLock()
|
||||
# Assign this flow a unique identifer.
|
||||
if uuid:
|
||||
self._id = str(uuid)
|
||||
else:
|
||||
self._id = uuidutils.generate_uuid()
|
||||
# Ensure we can not change the state at the same time in 2 different
|
||||
# threads.
|
||||
self._state_lock = threading.RLock()
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
@@ -109,21 +108,26 @@ class Flow(object):
|
||||
"""Provides a read-only view of the flow state."""
|
||||
return self._state
|
||||
|
||||
def _change_state(self, context, new_state):
|
||||
was_changed = False
|
||||
old_state = self.state
|
||||
with self._lock:
|
||||
def _change_state(self, context, new_state, check_func=None, notify=True):
|
||||
old_state = None
|
||||
changed = False
|
||||
with self._state_lock:
|
||||
if self.state != new_state:
|
||||
old_state = self.state
|
||||
self._state = new_state
|
||||
was_changed = True
|
||||
if was_changed:
|
||||
# Don't notify while holding the lock.
|
||||
if (not check_func or
|
||||
(check_func and check_func(self.state))):
|
||||
changed = True
|
||||
old_state = self.state
|
||||
self._state = new_state
|
||||
# Don't notify while holding the lock so that the reciever of said
|
||||
# notifications can actually perform operations on the given flow
|
||||
# without getting into deadlock.
|
||||
if notify and changed:
|
||||
self.notifier.notify(self.state, details={
|
||||
'context': context,
|
||||
'flow': self,
|
||||
'old_state': old_state,
|
||||
})
|
||||
return changed
|
||||
|
||||
def __str__(self):
|
||||
lines = ["Flow: %s" % (self.name)]
|
||||
@@ -141,7 +145,6 @@ class Flow(object):
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@decorators.locked
|
||||
def add_many(self, tasks):
|
||||
"""Adds many tasks to this flow.
|
||||
|
||||
@@ -158,54 +161,54 @@ class Flow(object):
|
||||
|
||||
Returns how many tasks were interrupted (if any).
|
||||
"""
|
||||
if self.state in self.UNINTERRUPTIBLE_STATES:
|
||||
raise exc.InvalidStateException(("Can not interrupt when"
|
||||
" in state %s") % (self.state))
|
||||
# Note(harlowja): Do *not* acquire the lock here so that the flow may
|
||||
# be interrupted while running. This does mean the the above check may
|
||||
# not be valid but we can worry about that if it becomes an issue.
|
||||
old_state = self.state
|
||||
if old_state != states.INTERRUPTED:
|
||||
self._state = states.INTERRUPTED
|
||||
self.notifier.notify(self.state, details={
|
||||
'context': None,
|
||||
'flow': self,
|
||||
'old_state': old_state,
|
||||
})
|
||||
return 0
|
||||
def check():
|
||||
if self.state in self.UNINTERRUPTIBLE_STATES:
|
||||
raise exc.InvalidStateException(("Can not interrupt when"
|
||||
" in state %s") % self.state)
|
||||
|
||||
check()
|
||||
with self._state_lock:
|
||||
check()
|
||||
self._change_state(None, states.INTERRUPTED)
|
||||
return 0
|
||||
|
||||
@decorators.locked
|
||||
def reset(self):
|
||||
"""Fully resets the internal state of this flow, allowing for the flow
|
||||
to be ran again.
|
||||
|
||||
Note: Listeners are also reset.
|
||||
"""
|
||||
if self.state not in self.RESETTABLE_STATES:
|
||||
raise exc.InvalidStateException(("Can not reset when"
|
||||
" in state %s") % (self.state))
|
||||
self.notifier.reset()
|
||||
self.task_notifier.reset()
|
||||
self._change_state(None, states.PENDING)
|
||||
def check():
|
||||
if self.state not in self.RESETTABLE_STATES:
|
||||
raise exc.InvalidStateException(("Can not reset when"
|
||||
" in state %s") % self.state)
|
||||
|
||||
check()
|
||||
with self._state_lock:
|
||||
check()
|
||||
self.notifier.reset()
|
||||
self.task_notifier.reset()
|
||||
self._change_state(None, states.PENDING)
|
||||
|
||||
@decorators.locked
|
||||
def soft_reset(self):
|
||||
"""Partially resets the internal state of this flow, allowing for the
|
||||
flow to be ran again from an interrupted state only.
|
||||
flow to be ran again from an interrupted state.
|
||||
"""
|
||||
if self.state not in self.SOFT_RESETTABLE_STATES:
|
||||
raise exc.InvalidStateException(("Can not soft reset when"
|
||||
" in state %s") % (self.state))
|
||||
self._change_state(None, states.PENDING)
|
||||
def check():
|
||||
if self.state not in self.SOFT_RESETTABLE_STATES:
|
||||
raise exc.InvalidStateException(("Can not soft reset when"
|
||||
" in state %s") % self.state)
|
||||
|
||||
@decorators.locked
|
||||
check()
|
||||
with self._state_lock:
|
||||
check()
|
||||
self._change_state(None, states.PENDING)
|
||||
|
||||
@abc.abstractmethod
|
||||
def run(self, context, *args, **kwargs):
|
||||
"""Executes the workflow."""
|
||||
if self.state not in self.RUNNABLE_STATES:
|
||||
raise exc.InvalidStateException("Unable to run flow when "
|
||||
"in state %s" % (self.state))
|
||||
raise NotImplementedError()
|
||||
|
||||
@decorators.locked
|
||||
def rollback(self, context, cause):
|
||||
"""Performs rollback of this workflow and any attached parent workflows
|
||||
if present.
|
||||
|
||||
@@ -49,7 +49,7 @@ class Flow(linear_flow.Flow):
|
||||
# to infer the edges at this stage we likely will fail finding
|
||||
# dependencies from nodes that don't exist.
|
||||
assert isinstance(task, collections.Callable)
|
||||
r = utils.Runner(task)
|
||||
r = utils.AOTRunner(task)
|
||||
self._graph.add_node(r, uuid=r.uuid, infer=infer)
|
||||
self._reset_internals()
|
||||
return r.uuid
|
||||
|
||||
@@ -17,7 +17,9 @@
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import functools
|
||||
import logging
|
||||
import threading
|
||||
|
||||
from taskflow.openstack.common import excutils
|
||||
|
||||
@@ -54,6 +56,7 @@ class Flow(flow.Flow):
|
||||
# All runners to run are collected here.
|
||||
self._runners = []
|
||||
self._connected = False
|
||||
self._lock = threading.RLock()
|
||||
# The resumption strategy to use.
|
||||
self.resumer = None
|
||||
|
||||
@@ -61,7 +64,7 @@ class Flow(flow.Flow):
|
||||
def add(self, task):
|
||||
"""Adds a given task to this flow."""
|
||||
assert isinstance(task, collections.Callable)
|
||||
r = utils.Runner(task)
|
||||
r = utils.AOTRunner(task)
|
||||
r.runs_before = list(reversed(self._runners))
|
||||
self._runners.append(r)
|
||||
self._reset_internals()
|
||||
@@ -136,20 +139,27 @@ class Flow(flow.Flow):
|
||||
|
||||
@decorators.locked
|
||||
def run(self, context, *args, **kwargs):
|
||||
super(Flow, self).run(context, *args, **kwargs)
|
||||
|
||||
def abort_if(current_state, ok_states):
|
||||
if current_state not in ok_states:
|
||||
return False
|
||||
return True
|
||||
|
||||
def resume_it():
|
||||
if self._leftoff_at is not None:
|
||||
return ([], self._leftoff_at)
|
||||
if self.resumer:
|
||||
(finished, leftover) = self.resumer.resume(self,
|
||||
self._ordering())
|
||||
(finished, leftover) = self.resumer(self, self._ordering())
|
||||
else:
|
||||
finished = []
|
||||
leftover = self._ordering()
|
||||
return (finished, leftover)
|
||||
|
||||
self._change_state(context, states.STARTED)
|
||||
start_check_functor = functools.partial(abort_if,
|
||||
ok_states=self.RUNNABLE_STATES)
|
||||
if not self._change_state(context, states.STARTED,
|
||||
check_func=start_check_functor):
|
||||
return
|
||||
try:
|
||||
those_finished, leftover = resume_it()
|
||||
except Exception:
|
||||
@@ -211,8 +221,13 @@ class Flow(flow.Flow):
|
||||
})
|
||||
self.rollback(context, cause)
|
||||
|
||||
run_check_functor = functools.partial(abort_if,
|
||||
ok_states=[states.STARTED,
|
||||
states.RESUMING])
|
||||
if len(those_finished):
|
||||
self._change_state(context, states.RESUMING)
|
||||
if not self._change_state(context, states.RESUMING,
|
||||
check_func=run_check_functor):
|
||||
return
|
||||
for (r, details) in those_finished:
|
||||
# Fake running the task so that we trigger the same
|
||||
# notifications and state changes (and rollback that
|
||||
@@ -222,8 +237,8 @@ class Flow(flow.Flow):
|
||||
run_it(r, failed=failed, result=result, simulate_run=True)
|
||||
|
||||
self._leftoff_at = leftover
|
||||
self._change_state(context, states.RUNNING)
|
||||
if self.state == states.INTERRUPTED:
|
||||
if not self._change_state(context, states.RUNNING,
|
||||
check_func=run_check_functor):
|
||||
return
|
||||
|
||||
was_interrupted = False
|
||||
|
||||
@@ -111,7 +111,7 @@ class Resumption(object):
|
||||
details = self._reconcile_versions(immediate_version, details)
|
||||
return (True, details)
|
||||
|
||||
def resume(self, flow, ordering):
|
||||
def __call__(self, flow, ordering):
|
||||
"""Splits the initial ordering into two segments, the first which
|
||||
has already completed (or errored) and the second which has not
|
||||
completed or errored.
|
||||
|
||||
636
taskflow/patterns/threaded_flow.py
Normal file
636
taskflow/patterns/threaded_flow.py
Normal file
@@ -0,0 +1,636 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow import flow
|
||||
from taskflow import graph_utils
|
||||
from taskflow import states
|
||||
from taskflow import utils
|
||||
|
||||
import collections
|
||||
import functools
|
||||
import logging
|
||||
import sys
|
||||
import threading
|
||||
import weakref
|
||||
|
||||
from networkx.algorithms import cycles
|
||||
from networkx.classes import digraph
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DependencyTimeout(exc.InvalidStateException):
|
||||
"""When running in parallel a task has the ability to timeout waiting for
|
||||
its dependent tasks to finish, this will be raised when that occurs.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class Flow(flow.Flow):
|
||||
"""This flow pattern establishes tasks into a graph where each task is a
|
||||
node in the graph and dependencies between tasks are edges in the graph.
|
||||
When running (in parallel) each task will only be activated when its
|
||||
dependencies have been satisified. When a graph is split into two or more
|
||||
segments, both of those segments will be ran in parallel.
|
||||
|
||||
For example lets take this small little *somewhat complicated* graph:
|
||||
|
||||
X--Y--C--D
|
||||
| |
|
||||
A--B-- --G--
|
||||
| | |--Z(end)
|
||||
E--F-- --H--
|
||||
|
||||
In this flow the following will be ran in parallel at start:
|
||||
1. X--Y
|
||||
2. A--B
|
||||
3. E--F
|
||||
Note the C--D nodes will not be able to run until [Y,B,F] has completed.
|
||||
After C--D completes the following will be ran in parallel:
|
||||
1. G
|
||||
2. H
|
||||
Then finally Z will run (after [G,H] complete) and the flow will then have
|
||||
finished executing.
|
||||
"""
|
||||
MUTABLE_STATES = set([states.PENDING, states.FAILURE, states.SUCCESS])
|
||||
REVERTABLE_STATES = set([states.FAILURE, states.INCOMPLETE])
|
||||
CANCELLABLE_STATES = set([states.PENDING, states.RUNNING])
|
||||
|
||||
def __init__(self, name):
|
||||
super(Flow, self).__init__(name)
|
||||
self._graph = digraph.DiGraph(name=name)
|
||||
self._run_lock = threading.RLock()
|
||||
self._cancel_lock = threading.RLock()
|
||||
self._mutate_lock = threading.RLock()
|
||||
# NOTE(harlowja) The locking order in this list actually matters since
|
||||
# we need to make sure that users of this list do not get deadlocked
|
||||
# by out of order lock access.
|
||||
self._core_locks = [
|
||||
self._run_lock,
|
||||
self._mutate_lock,
|
||||
self._cancel_lock,
|
||||
]
|
||||
self._run_locks = [
|
||||
self._run_lock,
|
||||
self._mutate_lock,
|
||||
]
|
||||
self._cancel_locks = [
|
||||
self._cancel_lock,
|
||||
]
|
||||
self.results = {}
|
||||
self.resumer = None
|
||||
|
||||
def __str__(self):
|
||||
lines = ["ParallelFlow: %s" % (self.name)]
|
||||
lines.append("%s" % (self._graph.number_of_nodes()))
|
||||
lines.append("%s" % (self.state))
|
||||
return "; ".join(lines)
|
||||
|
||||
def soft_reset(self):
|
||||
# The way this flow works does not allow (at the current moment) for
|
||||
# you to suspend the threads and then resume them at a later time,
|
||||
# instead it only supports interruption (which will cancel the threads)
|
||||
# and then a full reset.
|
||||
raise NotImplementedError("Threaded flow does not currently support"
|
||||
" soft resetting, please try using"
|
||||
" reset() instead")
|
||||
|
||||
def interrupt(self):
|
||||
"""Currently we can not pause threads and then resume them later, not
|
||||
really thinking that we should likely ever do this.
|
||||
"""
|
||||
raise NotImplementedError("Threaded flow does not currently support"
|
||||
" interruption, please try using"
|
||||
" cancel() instead")
|
||||
|
||||
def reset(self):
|
||||
# All locks are used so that resets can not happen while running or
|
||||
# cancelling or modifying.
|
||||
with utils.MultiLock(self._core_locks):
|
||||
super(Flow, self).reset()
|
||||
self.results = {}
|
||||
self.resumer = None
|
||||
|
||||
def cancel(self):
|
||||
|
||||
def check():
|
||||
if self.state not in self.CANCELLABLE_STATES:
|
||||
raise exc.InvalidStateException("Can not attempt cancellation"
|
||||
" when in state %s" %
|
||||
self.state)
|
||||
|
||||
check()
|
||||
cancelled = 0
|
||||
was_empty = False
|
||||
|
||||
# We don't lock the other locks so that the flow can be cancelled while
|
||||
# running. Further state management logic is then used while running
|
||||
# to verify that the flow should still be running when it has been
|
||||
# cancelled.
|
||||
with utils.MultiLock(self._cancel_locks):
|
||||
check()
|
||||
if len(self._graph) == 0:
|
||||
was_empty = True
|
||||
else:
|
||||
for r in self._graph.nodes_iter():
|
||||
try:
|
||||
if r.cancel(blocking=False):
|
||||
cancelled += 1
|
||||
except exc.InvalidStateException:
|
||||
pass
|
||||
if cancelled or was_empty:
|
||||
self._change_state(None, states.CANCELLED)
|
||||
|
||||
return cancelled
|
||||
|
||||
def _find_uuid(self, uuid):
|
||||
# Finds the runner for the given uuid (or returns none)
|
||||
for r in self._graph.nodes_iter():
|
||||
if r.uuid == uuid:
|
||||
return r
|
||||
return None
|
||||
|
||||
def add(self, task, timeout=None, infer=True):
|
||||
"""Adds a task to the given flow using the given timeout which will be
|
||||
used a the timeout to wait for dependencies (if any) to be
|
||||
fulfilled.
|
||||
"""
|
||||
def check():
|
||||
if self.state not in self.MUTABLE_STATES:
|
||||
raise exc.InvalidStateException("Flow is currently in a"
|
||||
" non-mutable %s state" %
|
||||
(self.state))
|
||||
|
||||
# Ensure that we do a quick check to see if we can even perform this
|
||||
# addition before we go about actually acquiring the lock to perform
|
||||
# the actual addition.
|
||||
check()
|
||||
|
||||
# All locks must be acquired so that modifications can not be made
|
||||
# while running, cancelling or performing a simultaneous mutation.
|
||||
with utils.MultiLock(self._core_locks):
|
||||
check()
|
||||
runner = ThreadRunner(task, self, timeout)
|
||||
self._graph.add_node(runner, infer=infer)
|
||||
return runner.uuid
|
||||
|
||||
def _connect(self):
|
||||
"""Infers and connects the edges of the given tasks by examining the
|
||||
associated tasks provides and requires attributes and connecting tasks
|
||||
that require items to tasks that produce said items.
|
||||
"""
|
||||
|
||||
# Disconnect all edges not manually created before we attempt to infer
|
||||
# them so that we don't retain edges that are invalid.
|
||||
def disconnect_non_user(u, v, e_data):
|
||||
if e_data and e_data.get('reason') != 'manual':
|
||||
return True
|
||||
return False
|
||||
|
||||
# Link providers to requirers.
|
||||
graph_utils.connect(self._graph,
|
||||
discard_func=disconnect_non_user)
|
||||
|
||||
# Connect the successors & predecessors and related siblings
|
||||
for r in self._graph.nodes_iter():
|
||||
r._predecessors = []
|
||||
r._successors = []
|
||||
for (r2, _me) in self._graph.in_edges_iter([r]):
|
||||
r._predecessors.append(r2)
|
||||
for (_me, r2) in self._graph.out_edges_iter([r]):
|
||||
r._successors.append(r2)
|
||||
r.siblings = []
|
||||
for r2 in self._graph.nodes_iter():
|
||||
if r2 is r or r2 in r._predecessors or r2 in r._successors:
|
||||
continue
|
||||
r._siblings.append(r2)
|
||||
|
||||
def add_many(self, tasks):
|
||||
"""Adds a list of tasks to the flow."""
|
||||
|
||||
def check():
|
||||
if self.state not in self.MUTABLE_STATES:
|
||||
raise exc.InvalidStateException("Flow is currently in a"
|
||||
" non-mutable state %s"
|
||||
% (self.state))
|
||||
|
||||
# Ensure that we do a quick check to see if we can even perform this
|
||||
# addition before we go about actually acquiring the lock.
|
||||
check()
|
||||
|
||||
# All locks must be acquired so that modifications can not be made
|
||||
# while running, cancelling or performing a simultaneous mutation.
|
||||
with utils.MultiLock(self._core_locks):
|
||||
check()
|
||||
added = []
|
||||
for t in tasks:
|
||||
added.append(self.add(t))
|
||||
return added
|
||||
|
||||
def add_dependency(self, provider_uuid, consumer_uuid):
|
||||
"""Manually adds a dependency between a provider and a consumer."""
|
||||
|
||||
def check_and_fetch():
|
||||
if self.state not in self.MUTABLE_STATES:
|
||||
raise exc.InvalidStateException("Flow is currently in a"
|
||||
" non-mutable state %s"
|
||||
% (self.state))
|
||||
provider = self._find_uuid(provider_uuid)
|
||||
if not provider or not self._graph.has_node(provider):
|
||||
raise exc.InvalidStateException("Can not add a dependency "
|
||||
"from unknown uuid %s" %
|
||||
(provider_uuid))
|
||||
consumer = self._find_uuid(consumer_uuid)
|
||||
if not consumer or not self._graph.has_node(consumer):
|
||||
raise exc.InvalidStateException("Can not add a dependency "
|
||||
"to unknown uuid %s"
|
||||
% (consumer_uuid))
|
||||
if provider is consumer:
|
||||
raise exc.InvalidStateException("Can not add a dependency "
|
||||
"to loop via uuid %s"
|
||||
% (consumer_uuid))
|
||||
return (provider, consumer)
|
||||
|
||||
check_and_fetch()
|
||||
|
||||
# All locks must be acquired so that modifications can not be made
|
||||
# while running, cancelling or performing a simultaneous mutation.
|
||||
with utils.MultiLock(self._core_locks):
|
||||
(provider, consumer) = check_and_fetch()
|
||||
self._graph.add_edge(provider, consumer, reason='manual')
|
||||
LOG.debug("Connecting %s as a manual provider for %s",
|
||||
provider, consumer)
|
||||
|
||||
def run(self, context, *args, **kwargs):
|
||||
"""Executes the given flow using the given context and args/kwargs."""
|
||||
|
||||
def abort_if(current_state, ok_states):
|
||||
if current_state in (states.CANCELLED,):
|
||||
return False
|
||||
if current_state not in ok_states:
|
||||
return False
|
||||
return True
|
||||
|
||||
def check():
|
||||
if self.state not in self.RUNNABLE_STATES:
|
||||
raise exc.InvalidStateException("Flow is currently unable "
|
||||
"to be ran in state %s"
|
||||
% (self.state))
|
||||
|
||||
def connect_and_verify():
|
||||
"""Do basic sanity tests on the graph structure."""
|
||||
if len(self._graph) == 0:
|
||||
return
|
||||
self._connect()
|
||||
degrees = [g[1] for g in self._graph.in_degree_iter()]
|
||||
zero_degrees = [d for d in degrees if d == 0]
|
||||
if not zero_degrees:
|
||||
# If every task depends on something else to produce its input
|
||||
# then we will be in a deadlock situation.
|
||||
raise exc.InvalidStateException("No task has an in-degree"
|
||||
" of zero")
|
||||
self_loops = self._graph.nodes_with_selfloops()
|
||||
if self_loops:
|
||||
# A task that has a dependency on itself will never be able
|
||||
# to run.
|
||||
raise exc.InvalidStateException("%s tasks have been detected"
|
||||
" with dependencies on"
|
||||
" themselves" %
|
||||
len(self_loops))
|
||||
simple_cycles = len(cycles.recursive_simple_cycles(self._graph))
|
||||
if simple_cycles:
|
||||
# A task loop will never be able to run, unless it somehow
|
||||
# breaks that loop.
|
||||
raise exc.InvalidStateException("%s tasks have been detected"
|
||||
" with dependency loops" %
|
||||
simple_cycles)
|
||||
|
||||
def run_it(result_cb, args, kwargs):
|
||||
check_runnable = functools.partial(abort_if,
|
||||
ok_states=self.RUNNABLE_STATES)
|
||||
if self._change_state(context, states.RUNNING,
|
||||
check_func=check_runnable):
|
||||
self.results = {}
|
||||
if len(self._graph) == 0:
|
||||
return
|
||||
for r in self._graph.nodes_iter():
|
||||
r.reset()
|
||||
r._result_cb = result_cb
|
||||
executor = utils.ThreadGroupExecutor()
|
||||
for r in self._graph.nodes_iter():
|
||||
executor.submit(r, *args, **kwargs)
|
||||
executor.await_termination()
|
||||
|
||||
def trigger_rollback(failures):
|
||||
if not failures:
|
||||
return
|
||||
causes = []
|
||||
for r in failures:
|
||||
causes.append(utils.FlowFailure(r, self,
|
||||
r.exc, r.exc_info))
|
||||
try:
|
||||
self.rollback(context, causes)
|
||||
except exc.InvalidStateException:
|
||||
pass
|
||||
finally:
|
||||
# TODO(harlowja): re-raise a combined exception when
|
||||
# there are more than one failures??
|
||||
for f in failures:
|
||||
if all(f.exc_info):
|
||||
raise f.exc_info[0], f.exc_info[1], f.exc_info[2]
|
||||
|
||||
def handle_results():
|
||||
# Isolate each runner state into groups so that we can easily tell
|
||||
# which ones failed, cancelled, completed...
|
||||
groups = collections.defaultdict(list)
|
||||
for r in self._graph.nodes_iter():
|
||||
groups[r.state].append(r)
|
||||
for r in self._graph.nodes_iter():
|
||||
if r not in groups.get(states.FAILURE, []) and r.has_ran():
|
||||
self.results[r.uuid] = r.result
|
||||
if groups[states.FAILURE]:
|
||||
self._change_state(context, states.FAILURE)
|
||||
trigger_rollback(groups[states.FAILURE])
|
||||
elif (groups[states.CANCELLED] or groups[states.PENDING]
|
||||
or groups[states.TIMED_OUT] or groups[states.STARTED]):
|
||||
self._change_state(context, states.INCOMPLETE)
|
||||
else:
|
||||
check_ran = functools.partial(abort_if,
|
||||
ok_states=[states.RUNNING])
|
||||
self._change_state(context, states.SUCCESS,
|
||||
check_func=check_ran)
|
||||
|
||||
def get_resumer_cb():
|
||||
if not self.resumer:
|
||||
return None
|
||||
(ran, _others) = self.resumer(self, self._graph.nodes_iter())
|
||||
|
||||
def fetch_results(runner):
|
||||
for (r, metadata) in ran:
|
||||
if r is runner:
|
||||
return (True, metadata.get('result'))
|
||||
return (False, None)
|
||||
|
||||
result_cb = fetch_results
|
||||
return result_cb
|
||||
|
||||
args = [context] + list(args)
|
||||
check()
|
||||
|
||||
# Only acquire the run lock (but use further state checking) and the
|
||||
# mutation lock to stop simultaneous running and simultaneous mutating
|
||||
# which are not allowed on a running flow. Allow simultaneous cancel
|
||||
# by performing repeated state checking while running.
|
||||
with utils.MultiLock(self._run_locks):
|
||||
check()
|
||||
connect_and_verify()
|
||||
try:
|
||||
run_it(get_resumer_cb(), args, kwargs)
|
||||
finally:
|
||||
handle_results()
|
||||
|
||||
def rollback(self, context, cause):
|
||||
"""Rolls back all tasks that are *not* still pending or cancelled."""
|
||||
|
||||
def check():
|
||||
if self.state not in self.REVERTABLE_STATES:
|
||||
raise exc.InvalidStateException("Flow is currently unable "
|
||||
"to be rolled back in "
|
||||
"state %s" % (self.state))
|
||||
|
||||
check()
|
||||
|
||||
# All locks must be acquired so that modifications can not be made
|
||||
# while another entity is running, rolling-back, cancelling or
|
||||
# performing a mutation operation.
|
||||
with utils.MultiLock(self._core_locks):
|
||||
check()
|
||||
accum = utils.RollbackAccumulator()
|
||||
for r in self._graph.nodes_iter():
|
||||
if r.has_ran():
|
||||
accum.add(utils.RollbackTask(context, r.task, r.result))
|
||||
try:
|
||||
self._change_state(context, states.REVERTING)
|
||||
accum.rollback(cause)
|
||||
finally:
|
||||
self._change_state(context, states.FAILURE)
|
||||
|
||||
|
||||
class ThreadRunner(utils.Runner):
|
||||
"""A helper class that will use a countdown latch to avoid calling its
|
||||
callable object until said countdown latch has emptied. After it has
|
||||
been emptied the predecessor tasks will be examined for dependent results
|
||||
and said results will then be provided to call the runners callable
|
||||
object.
|
||||
|
||||
TODO(harlowja): this could be a 'future' like object in the future since it
|
||||
is starting to have the same purpose and usage (in a way). Likely switch
|
||||
this over to the task details object or a subclass of it???
|
||||
"""
|
||||
RESETTABLE_STATES = set([states.PENDING, states.SUCCESS, states.FAILURE,
|
||||
states.CANCELLED])
|
||||
RUNNABLE_STATES = set([states.PENDING])
|
||||
CANCELABLE_STATES = set([states.PENDING])
|
||||
SUCCESS_STATES = set([states.SUCCESS])
|
||||
CANCEL_SUCCESSORS_WHEN = set([states.FAILURE, states.CANCELLED,
|
||||
states.TIMED_OUT])
|
||||
NO_RAN_STATES = set([states.CANCELLED, states.PENDING, states.TIMED_OUT,
|
||||
states.RUNNING])
|
||||
|
||||
def __init__(self, task, flow, timeout):
|
||||
super(ThreadRunner, self).__init__(task)
|
||||
# Use weak references to give the GC a break.
|
||||
self._flow = weakref.proxy(flow)
|
||||
self._notifier = flow.task_notifier
|
||||
self._timeout = timeout
|
||||
self._state = states.PENDING
|
||||
self._run_lock = threading.RLock()
|
||||
# Use the flows state lock so that state notifications are not sent
|
||||
# simultaneously for a given flow.
|
||||
self._state_lock = flow._state_lock
|
||||
self._cancel_lock = threading.RLock()
|
||||
self._latch = utils.CountDownLatch()
|
||||
# Any related family.
|
||||
self._predecessors = []
|
||||
self._successors = []
|
||||
self._siblings = []
|
||||
# Ensure we capture any exceptions that may have been triggered.
|
||||
self.exc = None
|
||||
self.exc_info = (None, None, None)
|
||||
# This callback will be called before the underlying task is actually
|
||||
# returned and it should either return a tuple of (has_result, result)
|
||||
self._result_cb = None
|
||||
|
||||
@property
|
||||
def state(self):
|
||||
return self._state
|
||||
|
||||
def has_ran(self):
|
||||
if self.state in self.NO_RAN_STATES:
|
||||
return False
|
||||
return True
|
||||
|
||||
def _change_state(self, context, new_state):
|
||||
old_state = None
|
||||
changed = False
|
||||
with self._state_lock:
|
||||
if self.state != new_state:
|
||||
old_state = self.state
|
||||
self._state = new_state
|
||||
changed = True
|
||||
# Don't notify while holding the lock so that the reciever of said
|
||||
# notifications can actually perform operations on the given runner
|
||||
# without getting into deadlock.
|
||||
if changed and self._notifier:
|
||||
self._notifier.notify(self.state, details={
|
||||
'context': context,
|
||||
'flow': self._flow,
|
||||
'old_state': old_state,
|
||||
'runner': self,
|
||||
})
|
||||
|
||||
def cancel(self, blocking=True):
|
||||
|
||||
def check():
|
||||
if self.state not in self.CANCELABLE_STATES:
|
||||
raise exc.InvalidStateException("Runner not in a cancelable"
|
||||
" state: %s" % (self.state))
|
||||
|
||||
# Check before as a quick way out of attempting to acquire the more
|
||||
# heavy-weight lock. Then acquire the lock (which should not be
|
||||
# possible if we are currently running) and set the state (if still
|
||||
# applicable).
|
||||
check()
|
||||
acquired = False
|
||||
cancelled = False
|
||||
try:
|
||||
acquired = self._cancel_lock.acquire(blocking=blocking)
|
||||
if acquired:
|
||||
check()
|
||||
cancelled = True
|
||||
self._change_state(None, states.CANCELLED)
|
||||
finally:
|
||||
if acquired:
|
||||
self._cancel_lock.release()
|
||||
return cancelled
|
||||
|
||||
def reset(self):
|
||||
|
||||
def check():
|
||||
if self.state not in self.RESETTABLE_STATES:
|
||||
raise exc.InvalidStateException("Runner not in a resettable"
|
||||
" state: %s" % (self.state))
|
||||
|
||||
def do_reset():
|
||||
self._latch.count = len(self._predecessors)
|
||||
self.exc = None
|
||||
self.exc_info = (None, None, None)
|
||||
self.result = None
|
||||
self._change_state(None, states.PENDING)
|
||||
|
||||
# We need to acquire both locks here so that we can not be running
|
||||
# or being cancelled at the same time we are resetting.
|
||||
check()
|
||||
with self._run_lock:
|
||||
check()
|
||||
with self._cancel_lock:
|
||||
check()
|
||||
do_reset()
|
||||
|
||||
@property
|
||||
def runs_before(self):
|
||||
# NOTE(harlowja): this list may change, depending on which other
|
||||
# runners have completed (or are currently actively running), so
|
||||
# this is why this is a property instead of a semi-static defined list
|
||||
# like in the AOT class. The list should only get bigger and not
|
||||
# smaller so it should be fine to filter on runners that have completed
|
||||
# successfully.
|
||||
finished_ok = []
|
||||
for r in self._siblings:
|
||||
if r.has_ran() and r.state in self.SUCCESS_STATES:
|
||||
finished_ok.append(r)
|
||||
return finished_ok
|
||||
|
||||
def __call__(self, context, *args, **kwargs):
|
||||
|
||||
def is_runnable():
|
||||
if self.state not in self.RUNNABLE_STATES:
|
||||
return False
|
||||
return True
|
||||
|
||||
def run(*args, **kwargs):
|
||||
try:
|
||||
self._change_state(context, states.RUNNING)
|
||||
has_result = False
|
||||
if self._result_cb:
|
||||
has_result, self.result = self._result_cb(self)
|
||||
if not has_result:
|
||||
super(ThreadRunner, self).__call__(*args, **kwargs)
|
||||
self._change_state(context, states.SUCCESS)
|
||||
except Exception as e:
|
||||
self._change_state(context, states.FAILURE)
|
||||
self.exc = e
|
||||
self.exc_info = sys.exc_info()
|
||||
|
||||
def signal():
|
||||
if not self._successors:
|
||||
return
|
||||
if self.state in self.CANCEL_SUCCESSORS_WHEN:
|
||||
for r in self._successors:
|
||||
try:
|
||||
r.cancel(blocking=False)
|
||||
except exc.InvalidStateException:
|
||||
pass
|
||||
for r in self._successors:
|
||||
try:
|
||||
r._latch.countDown()
|
||||
except Exception:
|
||||
LOG.exception("Failed decrementing %s latch", r)
|
||||
|
||||
# We check before to avoid attempting to acquire the lock when we are
|
||||
# known to be in a non-runnable state.
|
||||
if not is_runnable():
|
||||
return
|
||||
args = [context] + list(args)
|
||||
with self._run_lock:
|
||||
# We check after we now own the run lock since a previous thread
|
||||
# could have exited and released that lock and set the state to
|
||||
# not runnable.
|
||||
if not is_runnable():
|
||||
return
|
||||
may_proceed = self._latch.await(self._timeout)
|
||||
# We now acquire the cancel lock so that we can be assured that
|
||||
# we have not been cancelled by another entity.
|
||||
with self._cancel_lock:
|
||||
try:
|
||||
# If we have been cancelled after awaiting and timing out
|
||||
# ensure that we alter the state to show timed out (but
|
||||
# not if we have been cancelled, since our state should
|
||||
# be cancelled instead). This is done after acquiring the
|
||||
# cancel lock so that we will not try to overwrite another
|
||||
# entity trying to set the runner to the cancel state.
|
||||
if not may_proceed and self.state != states.CANCELLED:
|
||||
self._change_state(context, states.TIMED_OUT)
|
||||
# We at this point should only have been able to time out
|
||||
# or be cancelled, no other state transitions should have
|
||||
# been possible.
|
||||
if self.state not in (states.CANCELLED, states.TIMED_OUT):
|
||||
run(*args, **kwargs)
|
||||
finally:
|
||||
signal()
|
||||
@@ -33,8 +33,12 @@ REVERTING = 'REVERTING'
|
||||
RUNNING = RUNNING
|
||||
STARTED = 'STARTED'
|
||||
SUCCESS = SUCCESS
|
||||
CANCELLED = 'CANCELLED'
|
||||
INCOMPLETE = 'INCOMPLETE'
|
||||
|
||||
# Task states.
|
||||
FAILURE = FAILURE
|
||||
STARTED = STARTED
|
||||
SUCCESS = SUCCESS
|
||||
TIMED_OUT = 'TIMED_OUT'
|
||||
CANCELLED = CANCELLED
|
||||
|
||||
388
taskflow/tests/unit/test_threaded_flow.py
Normal file
388
taskflow/tests/unit/test_threaded_flow.py
Normal file
@@ -0,0 +1,388 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import threading
|
||||
import time
|
||||
import unittest2
|
||||
|
||||
from taskflow import decorators
|
||||
from taskflow import exceptions as excp
|
||||
from taskflow import states
|
||||
|
||||
from taskflow.patterns import threaded_flow as tf
|
||||
from taskflow.tests import utils
|
||||
|
||||
|
||||
def _find_idx(what, search_where):
|
||||
for i, j in enumerate(search_where):
|
||||
if i == what:
|
||||
return j
|
||||
return -1
|
||||
|
||||
|
||||
class ThreadedFlowTest(unittest2.TestCase):
|
||||
def _make_tracking_flow(self, name):
|
||||
notify_lock = threading.RLock()
|
||||
flo = tf.Flow(name)
|
||||
notifications = []
|
||||
|
||||
def save_notify(state, details):
|
||||
runner = details.get('runner')
|
||||
if not runner:
|
||||
return
|
||||
with notify_lock:
|
||||
notifications.append((runner.uuid, state, dict(details)))
|
||||
|
||||
flo.task_notifier.register('*', save_notify)
|
||||
return (flo, notifications)
|
||||
|
||||
def _make_watched_flow(self, name):
|
||||
history_lock = threading.RLock()
|
||||
flo = tf.Flow(name)
|
||||
history = {}
|
||||
|
||||
def save_state(state, details):
|
||||
runner = details.get('runner')
|
||||
if not runner:
|
||||
return
|
||||
with history_lock:
|
||||
old_state = details.get('old_state')
|
||||
old_states = history.get(runner.uuid, [])
|
||||
if not old_states:
|
||||
old_states.append(old_state)
|
||||
old_states.append(state)
|
||||
history[runner.uuid] = old_states
|
||||
|
||||
flo.task_notifier.register('*', save_state)
|
||||
return (flo, history)
|
||||
|
||||
def test_somewhat_complicated(self):
|
||||
"""Tests a somewhat complicated dependency graph.
|
||||
|
||||
X--Y--C--D
|
||||
| |
|
||||
A--B-- --G--
|
||||
| | |--Z(end)
|
||||
E--F-- --H--
|
||||
"""
|
||||
(flo, notifications) = self._make_tracking_flow("sanity-test")
|
||||
|
||||
# X--Y
|
||||
x = flo.add(utils.ProvidesRequiresTask("X",
|
||||
provides=['x'],
|
||||
requires=[]))
|
||||
y = flo.add(utils.ProvidesRequiresTask("Y",
|
||||
provides=['y'],
|
||||
requires=['x']))
|
||||
|
||||
# A--B
|
||||
a = flo.add(utils.ProvidesRequiresTask("A",
|
||||
provides=['a'],
|
||||
requires=[]))
|
||||
b = flo.add(utils.ProvidesRequiresTask("B",
|
||||
provides=['b'],
|
||||
requires=['a']))
|
||||
|
||||
# E--F
|
||||
e = flo.add(utils.ProvidesRequiresTask("E",
|
||||
provides=['e'],
|
||||
requires=[]))
|
||||
f = flo.add(utils.ProvidesRequiresTask("F",
|
||||
provides=['f'],
|
||||
requires=['e']))
|
||||
|
||||
# C--D
|
||||
c = flo.add(utils.ProvidesRequiresTask("C",
|
||||
provides=['c'],
|
||||
requires=['f', 'b', 'y']))
|
||||
d = flo.add(utils.ProvidesRequiresTask("D",
|
||||
provides=['d'],
|
||||
requires=['c']))
|
||||
|
||||
# G
|
||||
g = flo.add(utils.ProvidesRequiresTask("G",
|
||||
provides=['g'],
|
||||
requires=['d']))
|
||||
|
||||
# H
|
||||
h = flo.add(utils.ProvidesRequiresTask("H",
|
||||
provides=['h'],
|
||||
requires=['d']))
|
||||
|
||||
# Z
|
||||
z = flo.add(utils.ProvidesRequiresTask("Z",
|
||||
provides=['z'],
|
||||
requires=['g', 'h']))
|
||||
|
||||
all_uuids = [z, h, g, d, c, f, e, b, a, y, x]
|
||||
self.assertEquals(states.PENDING, flo.state)
|
||||
flo.run({})
|
||||
self.assertEquals(states.SUCCESS, flo.state)
|
||||
|
||||
# Analyze the notifications to determine that the correct ordering
|
||||
# occurred
|
||||
|
||||
# Discard states we aren't really interested in.
|
||||
c_notifications = []
|
||||
uuids_ran = set()
|
||||
for (uuid, state, details) in notifications:
|
||||
if state not in [states.RUNNING, states.SUCCESS, states.FAILURE]:
|
||||
continue
|
||||
uuids_ran.add(uuid)
|
||||
c_notifications.append((uuid, state, details))
|
||||
notifications = c_notifications
|
||||
self.assertEquals(len(all_uuids), len(uuids_ran))
|
||||
|
||||
# Select out the run order
|
||||
just_ran_uuids = []
|
||||
for (uuid, state, details) in notifications:
|
||||
if state not in [states.RUNNING]:
|
||||
continue
|
||||
just_ran_uuids.append(uuid)
|
||||
|
||||
def ran_before(ran_uuid, before_what):
|
||||
before_idx = just_ran_uuids.index(ran_uuid)
|
||||
other_idxs = [just_ran_uuids.index(u) for u in before_what]
|
||||
was_before = True
|
||||
for idx in other_idxs:
|
||||
if idx < before_idx:
|
||||
was_before = False
|
||||
return was_before
|
||||
|
||||
def ran_after(ran_uuid, after_what):
|
||||
after_idx = just_ran_uuids.index(ran_uuid)
|
||||
other_idxs = [just_ran_uuids.index(u) for u in after_what]
|
||||
was_after = True
|
||||
for idx in other_idxs:
|
||||
if idx > after_idx:
|
||||
was_after = False
|
||||
return was_after
|
||||
|
||||
# X, A, E should always run before the others
|
||||
self.assertTrue(ran_before(x, [c, d, g, h, z]))
|
||||
self.assertTrue(ran_before(a, [c, d, g, h, z]))
|
||||
self.assertTrue(ran_before(e, [c, d, g, h, z]))
|
||||
|
||||
# Y, B, F should always run before C
|
||||
self.assertTrue(ran_before(y, [c]))
|
||||
self.assertTrue(ran_before(b, [c]))
|
||||
self.assertTrue(ran_before(f, [c]))
|
||||
|
||||
# C runs before D
|
||||
self.assertTrue(ran_before(c, [d]))
|
||||
|
||||
# G and H are before Z
|
||||
self.assertTrue(ran_before(g, [z]))
|
||||
self.assertTrue(ran_before(h, [z]))
|
||||
|
||||
# C, D runs after X, Y, B, E, F
|
||||
self.assertTrue(ran_after(c, [x, y, b, e, f]))
|
||||
self.assertTrue(ran_after(d, [x, y, b, c, e, f]))
|
||||
|
||||
# Z is last
|
||||
all_uuids_no_z = list(all_uuids)
|
||||
all_uuids_no_z.remove(z)
|
||||
self.assertTrue(ran_after(z, all_uuids_no_z))
|
||||
|
||||
def test_empty_cancel(self):
|
||||
(flo, history) = self._make_watched_flow("sanity-test")
|
||||
self.assertEquals(states.PENDING, flo.state)
|
||||
flo.cancel()
|
||||
self.assertEquals(states.CANCELLED, flo.state)
|
||||
|
||||
def test_self_loop_flo(self):
|
||||
(flo, history) = self._make_watched_flow("sanity-test")
|
||||
flo.add(utils.ProvidesRequiresTask("do-that",
|
||||
provides=['c'],
|
||||
requires=['c']))
|
||||
self.assertRaises(excp.InvalidStateException, flo.run, {})
|
||||
|
||||
def test_circular_flo(self):
|
||||
(flo, history) = self._make_watched_flow("sanity-test")
|
||||
flo.add(utils.ProvidesRequiresTask("do-that",
|
||||
provides=['c'],
|
||||
requires=['a']))
|
||||
flo.add(utils.ProvidesRequiresTask("do-this",
|
||||
provides=['a'],
|
||||
requires=['c']))
|
||||
self.assertRaises(excp.InvalidStateException, flo.run, {})
|
||||
|
||||
def test_no_input_flo(self):
|
||||
(flo, history) = self._make_watched_flow("sanity-test")
|
||||
flo.add(utils.ProvidesRequiresTask("do-that",
|
||||
provides=['c'],
|
||||
requires=['a']))
|
||||
flo.add(utils.ProvidesRequiresTask("do-this",
|
||||
provides=['b'],
|
||||
requires=['c']))
|
||||
self.assertRaises(excp.InvalidStateException, flo.run, {})
|
||||
|
||||
def test_simple_resume(self):
|
||||
(flo, history) = self._make_watched_flow("sanity-test")
|
||||
f_uuid = flo.add(utils.ProvidesRequiresTask("do-this",
|
||||
provides=['a'],
|
||||
requires=[]))
|
||||
flo.add(utils.ProvidesRequiresTask("do-that",
|
||||
provides=['c'],
|
||||
requires=['a']))
|
||||
|
||||
def resume_it(flow, ordering):
|
||||
ran_already = []
|
||||
not_ran = []
|
||||
for r in ordering:
|
||||
if r.uuid == f_uuid:
|
||||
ran_already.append((r, {
|
||||
'result': 'b',
|
||||
'states': [states.SUCCESS],
|
||||
}))
|
||||
else:
|
||||
not_ran.append(r)
|
||||
return (ran_already, not_ran)
|
||||
|
||||
flo.resumer = resume_it
|
||||
flo.run({})
|
||||
self.assertEquals('b', flo.results[f_uuid])
|
||||
self.assertEquals(states.SUCCESS, flo.state)
|
||||
|
||||
def test_active_cancel(self):
|
||||
(flo, history) = self._make_watched_flow("sanity-test")
|
||||
flo.add(utils.ProvidesRequiresTask("do-this",
|
||||
provides=['a'],
|
||||
requires=[]))
|
||||
flo.add(utils.ProvidesRequiresTask("do-that",
|
||||
provides=['c'],
|
||||
requires=['a']))
|
||||
|
||||
@decorators.task(provides=['d'], requires=['c'])
|
||||
def cancel_it(context, c):
|
||||
am_cancelled = flo.cancel()
|
||||
return am_cancelled
|
||||
|
||||
uuid = flo.add(cancel_it)
|
||||
flo.add(utils.ProvidesRequiresTask("do-the-other",
|
||||
provides=['e'],
|
||||
requires=['d']))
|
||||
|
||||
flo.run({})
|
||||
self.assertIn(uuid, flo.results)
|
||||
self.assertEquals(states.INCOMPLETE, flo.state)
|
||||
self.assertEquals(1, flo.results[uuid])
|
||||
|
||||
def test_sanity_run(self):
|
||||
(flo, history) = self._make_watched_flow("sanity-test")
|
||||
flo.add(utils.ProvidesRequiresTask("do-this",
|
||||
provides=['a'],
|
||||
requires=[]))
|
||||
flo.add(utils.ProvidesRequiresTask("do-that",
|
||||
provides=['c'],
|
||||
requires=['a']))
|
||||
flo.add(utils.ProvidesRequiresTask("do-other",
|
||||
provides=['d'],
|
||||
requires=[]))
|
||||
flo.add(utils.ProvidesRequiresTask("do-thing",
|
||||
provides=['e'],
|
||||
requires=['d']))
|
||||
self.assertEquals(states.PENDING, flo.state)
|
||||
context = {}
|
||||
flo.run(context)
|
||||
self.assertEquals(states.SUCCESS, flo.state)
|
||||
self.assertTrue(len(context) > 0)
|
||||
# Even when running in parallel this will be the required order since
|
||||
# 'do-that' depends on 'do-this' finishing first.
|
||||
expected_order = ['do-this', 'do-that']
|
||||
this_that = [t for t in context[utils.ORDER_KEY]
|
||||
if t in expected_order]
|
||||
self.assertEquals(expected_order, this_that)
|
||||
expected_order = ['do-other', 'do-thing']
|
||||
this_that = [t for t in context[utils.ORDER_KEY]
|
||||
if t in expected_order]
|
||||
self.assertEquals(expected_order, this_that)
|
||||
|
||||
def test_single_failure(self):
|
||||
|
||||
def reverter(context, result, cause):
|
||||
context['reverted'] = True
|
||||
|
||||
@decorators.task(revert_with=reverter)
|
||||
def fail_quick(context):
|
||||
raise IOError("Broken")
|
||||
|
||||
(flo, history) = self._make_watched_flow('test-single-fail')
|
||||
f_uuid = flo.add(fail_quick)
|
||||
context = {}
|
||||
self.assertRaises(IOError, flo.run, context)
|
||||
self.assertEquals(states.FAILURE, flo.state)
|
||||
self.assertEquals(states.FAILURE, history[f_uuid][-1])
|
||||
self.assertTrue(context.get('reverted'))
|
||||
|
||||
def test_failure_cancel_successors(self):
|
||||
(flo, history) = self._make_watched_flow("failure-cancel")
|
||||
|
||||
@decorators.task(provides=['b', 'c'])
|
||||
def fail_quick(context):
|
||||
raise IOError("Broken")
|
||||
|
||||
@decorators.task
|
||||
def after_fail(context, b):
|
||||
pass
|
||||
|
||||
@decorators.task
|
||||
def after_fail2(context, c):
|
||||
pass
|
||||
|
||||
fq, af, af2 = flo.add_many([fail_quick, after_fail, after_fail2])
|
||||
self.assertEquals(states.PENDING, flo.state)
|
||||
|
||||
context = {}
|
||||
self.assertRaises(IOError, flo.run, context)
|
||||
self.assertEquals(states.FAILURE, flo.state)
|
||||
self.assertEquals(states.FAILURE, history[fq][-1])
|
||||
self.assertEquals(states.CANCELLED, history[af][-1])
|
||||
self.assertEquals(states.CANCELLED, history[af2][-1])
|
||||
|
||||
def test_live_timeout(self):
|
||||
|
||||
@decorators.task(provides=['a'])
|
||||
def task_long(context):
|
||||
time.sleep(1)
|
||||
return {
|
||||
'a': 2,
|
||||
}
|
||||
|
||||
@decorators.task(provides=['b'])
|
||||
def wait_short(context, a):
|
||||
pass
|
||||
|
||||
@decorators.task
|
||||
def wait_ok_long(context, a):
|
||||
pass
|
||||
|
||||
@decorators.task
|
||||
def wait_after_short(context, b):
|
||||
pass
|
||||
|
||||
(flo, history) = self._make_watched_flow('test-live')
|
||||
flo.add(task_long)
|
||||
ws_uuid = flo.add(wait_short, timeout=0.1)
|
||||
flo.add(wait_ok_long)
|
||||
was_uuid = flo.add(wait_after_short)
|
||||
|
||||
flo.run({})
|
||||
self.assertEquals(states.INCOMPLETE, flo.state)
|
||||
self.assertEquals(states.TIMED_OUT, history[ws_uuid][-1])
|
||||
self.assertEquals(states.CANCELLED, history[was_uuid][-1])
|
||||
@@ -24,6 +24,7 @@ import logging
|
||||
import re
|
||||
import sys
|
||||
import threading
|
||||
import threading2
|
||||
import time
|
||||
|
||||
from taskflow.openstack.common import uuidutils
|
||||
@@ -32,6 +33,25 @@ TASK_FACTORY_ATTRIBUTE = '_TaskFlow_task_factory'
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def await(check_functor, timeout=None):
|
||||
if timeout is not None:
|
||||
end_time = time.time() + max(0, timeout)
|
||||
else:
|
||||
end_time = None
|
||||
# Use the same/similar scheme that the python condition class uses.
|
||||
delay = 0.0005
|
||||
while not check_functor():
|
||||
time.sleep(delay)
|
||||
if end_time is not None:
|
||||
remaining = end_time - time.time()
|
||||
if remaining <= 0:
|
||||
return False
|
||||
delay = min(delay * 2, remaining, 0.05)
|
||||
else:
|
||||
delay = min(delay * 2, 0.05)
|
||||
return True
|
||||
|
||||
|
||||
def get_task_version(task):
|
||||
"""Gets a tasks *string* version, whether it is a task object/function."""
|
||||
task_version = getattr(task, 'version')
|
||||
@@ -79,23 +99,80 @@ def is_version_compatible(version_1, version_2):
|
||||
return True
|
||||
|
||||
|
||||
def await(check_functor, timeout=None):
|
||||
if timeout is not None:
|
||||
end_time = time.time() + max(0, timeout)
|
||||
else:
|
||||
class MultiLock(object):
|
||||
"""A class which can attempt to obtain many locks at once and release
|
||||
said locks when exiting.
|
||||
|
||||
Useful as a context manager around many locks (instead of having to nest
|
||||
said individual context managers).
|
||||
"""
|
||||
|
||||
def __init__(self, locks):
|
||||
assert len(locks) > 0, "Zero locks requested"
|
||||
self._locks = locks
|
||||
self._locked = [False] * len(locks)
|
||||
|
||||
def __enter__(self):
|
||||
|
||||
def is_locked(lock):
|
||||
# NOTE(harlowja): the threading2 lock doesn't seem to have this
|
||||
# attribute, so thats why we are checking it existing first.
|
||||
if hasattr(lock, 'locked'):
|
||||
return lock.locked()
|
||||
return False
|
||||
|
||||
for i in xrange(0, len(self._locked)):
|
||||
if self._locked[i] or is_locked(self._locks[i]):
|
||||
raise threading.ThreadError("Lock %s not previously released"
|
||||
% (i + 1))
|
||||
self._locked[i] = False
|
||||
for (i, lock) in enumerate(self._locks):
|
||||
self._locked[i] = lock.acquire()
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
for (i, locked) in enumerate(self._locked):
|
||||
try:
|
||||
if locked:
|
||||
self._locks[i].release()
|
||||
self._locked[i] = False
|
||||
except threading.ThreadError:
|
||||
LOG.exception("Unable to release lock %s", i + 1)
|
||||
|
||||
|
||||
class CountDownLatch(object):
|
||||
"""Similar in concept to the java count down latch."""
|
||||
|
||||
def __init__(self, count=0):
|
||||
self.count = count
|
||||
self.lock = threading.Condition()
|
||||
|
||||
def countDown(self):
|
||||
with self.lock:
|
||||
self.count -= 1
|
||||
if self.count <= 0:
|
||||
self.lock.notifyAll()
|
||||
|
||||
def await(self, timeout=None):
|
||||
end_time = None
|
||||
# Use the same/similar scheme that the python condition class uses.
|
||||
delay = 0.0005
|
||||
while not check_functor():
|
||||
time.sleep(delay)
|
||||
if end_time is not None:
|
||||
remaining = end_time - time.time()
|
||||
if remaining <= 0:
|
||||
return False
|
||||
delay = min(delay * 2, remaining, 0.05)
|
||||
else:
|
||||
delay = min(delay * 2, 0.05)
|
||||
return True
|
||||
if timeout is not None:
|
||||
timeout = max(0, timeout)
|
||||
end_time = time.time() + timeout
|
||||
time_up = False
|
||||
with self.lock:
|
||||
while True:
|
||||
# Stop waiting on these 2 conditions.
|
||||
if time_up or self.count <= 0:
|
||||
break
|
||||
# Was this a spurious wakeup or did we really end??
|
||||
self.lock.wait(timeout=timeout)
|
||||
if end_time is not None:
|
||||
if time.time() >= end_time:
|
||||
time_up = True
|
||||
else:
|
||||
# Reduce the timeout so that we don't wait extra time
|
||||
# over what we initially were requested to.
|
||||
timeout = end_time - time.time()
|
||||
return self.count <= 0
|
||||
|
||||
|
||||
class LastFedIter(object):
|
||||
@@ -113,16 +190,45 @@ class LastFedIter(object):
|
||||
yield i
|
||||
|
||||
|
||||
class ThreadGroupExecutor(object):
|
||||
"""A simple thread executor that spins up new threads (or greenthreads) for
|
||||
each task to be completed (no pool limit is enforced).
|
||||
|
||||
TODO(harlowja): Likely if we use the more advanced executors that come with
|
||||
the concurrent.futures library we can just get rid of this.
|
||||
"""
|
||||
|
||||
def __init__(self, daemonize=True):
|
||||
self._threads = []
|
||||
self._group = threading2.ThreadGroup()
|
||||
self._daemonize = daemonize
|
||||
|
||||
def submit(self, fn, *args, **kwargs):
|
||||
t = threading2.Thread(target=fn, group=self._group,
|
||||
args=args, kwargs=kwargs)
|
||||
t.daemon = self._daemonize
|
||||
self._threads.append(t)
|
||||
t.start()
|
||||
|
||||
def await_termination(self, timeout=None):
|
||||
if not self._threads:
|
||||
return
|
||||
return self._group.join(timeout)
|
||||
|
||||
|
||||
class FlowFailure(object):
|
||||
"""When a task failure occurs the following object will be given to revert
|
||||
and can be used to interrogate what caused the failure.
|
||||
"""
|
||||
|
||||
def __init__(self, runner, flow, exception):
|
||||
def __init__(self, runner, flow, exc, exc_info=None):
|
||||
self.runner = runner
|
||||
self.flow = flow
|
||||
self.exc = exception
|
||||
self.exc_info = sys.exc_info()
|
||||
self.exc = exc
|
||||
if not exc_info:
|
||||
self.exc_info = sys.exc_info()
|
||||
else:
|
||||
self.exc_info = exc_info
|
||||
|
||||
|
||||
class RollbackTask(object):
|
||||
@@ -161,7 +267,6 @@ class Runner(object):
|
||||
else:
|
||||
self.task = task
|
||||
self.providers = {}
|
||||
self.runs_before = []
|
||||
self.result = None
|
||||
if not uuid:
|
||||
self._id = uuidutils.generate_uuid()
|
||||
@@ -184,6 +289,10 @@ class Runner(object):
|
||||
def optional(self):
|
||||
return self.task.optional
|
||||
|
||||
@property
|
||||
def runs_before(self):
|
||||
return []
|
||||
|
||||
@property
|
||||
def version(self):
|
||||
return get_task_version(self.task)
|
||||
@@ -205,24 +314,52 @@ class Runner(object):
|
||||
# Find all of our inputs first.
|
||||
kwargs = dict(kwargs)
|
||||
for (k, who_made) in self.providers.iteritems():
|
||||
if who_made.result and k in who_made.result:
|
||||
if k in kwargs:
|
||||
continue
|
||||
try:
|
||||
kwargs[k] = who_made.result[k]
|
||||
else:
|
||||
kwargs[k] = None
|
||||
except (TypeError, KeyError):
|
||||
pass
|
||||
optional_keys = self.optional
|
||||
optional_missing_keys = optional_keys - set(kwargs.keys())
|
||||
if optional_missing_keys:
|
||||
for k in optional_missing_keys:
|
||||
for r in self.runs_before:
|
||||
r_provides = r.provides
|
||||
if k in r_provides and r.result and k in r.result:
|
||||
kwargs[k] = r.result[k]
|
||||
break
|
||||
optional_keys = optional_keys - set(kwargs.keys())
|
||||
for k in optional_keys:
|
||||
for who_ran in self.runs_before:
|
||||
matched = False
|
||||
if k in who_ran.provides:
|
||||
try:
|
||||
kwargs[k] = who_ran.result[k]
|
||||
matched = True
|
||||
except (TypeError, KeyError):
|
||||
pass
|
||||
if matched:
|
||||
break
|
||||
# Ensure all required keys are either existent or set to none.
|
||||
for k in self.requires:
|
||||
if k not in kwargs:
|
||||
kwargs[k] = None
|
||||
# And now finally run.
|
||||
self.result = self.task(*args, **kwargs)
|
||||
return self.result
|
||||
|
||||
|
||||
class AOTRunner(Runner):
|
||||
"""A runner that knows who runs before this runner ahead of time from a
|
||||
known list of previous runners.
|
||||
"""
|
||||
|
||||
def __init__(self, task):
|
||||
super(AOTRunner, self).__init__(task)
|
||||
self._runs_before = []
|
||||
|
||||
@property
|
||||
def runs_before(self):
|
||||
return self._runs_before
|
||||
|
||||
@runs_before.setter
|
||||
def runs_before(self, runs_before):
|
||||
self._runs_before = list(runs_before)
|
||||
|
||||
|
||||
class TransitionNotifier(object):
|
||||
"""A utility helper class that can be used to subscribe to
|
||||
notifications of events occuring as well as allow a entity to post said
|
||||
@@ -300,16 +437,12 @@ class RollbackAccumulator(object):
|
||||
def __len__(self):
|
||||
return len(self._rollbacks)
|
||||
|
||||
def __iter__(self):
|
||||
# Rollbacks happen in the reverse order that they were added.
|
||||
return reversed(self._rollbacks)
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def rollback(self, cause):
|
||||
LOG.warn("Activating %s rollbacks due to %s.", len(self), cause)
|
||||
for (i, f) in enumerate(self):
|
||||
for (i, f) in enumerate(reversed(self._rollbacks)):
|
||||
LOG.debug("Calling rollback %s: %s", i + 1, f)
|
||||
try:
|
||||
f(cause)
|
||||
@@ -399,45 +532,6 @@ class ReaderWriterLock(object):
|
||||
self.readers_ok.release()
|
||||
|
||||
|
||||
class MultiLock(object):
|
||||
"""A class which can attempt to obtain many locks at once and release
|
||||
said locks when exiting.
|
||||
|
||||
Useful as a context manager around many locks (instead of having to nest
|
||||
said individual context managers).
|
||||
"""
|
||||
|
||||
def __init__(self, locks):
|
||||
self._locks = locks
|
||||
self._locked = list([False] * len(locks))
|
||||
|
||||
def __enter__(self):
|
||||
|
||||
def is_locked(lock):
|
||||
# NOTE(harlowja): the threading2 lock doesn't seem to have this
|
||||
# attribute, so thats why we are checking it existing first.
|
||||
if hasattr(lock, 'locked'):
|
||||
return lock.locked()
|
||||
return False
|
||||
|
||||
for i in xrange(0, len(self._locked)):
|
||||
if self._locked[i] or is_locked(self._locks[i]):
|
||||
raise threading.ThreadError("Lock %s not previously released"
|
||||
% (i + 1))
|
||||
self._locked[i] = False
|
||||
for (i, lock) in enumerate(self._locks):
|
||||
self._locked[i] = lock.acquire()
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
for (i, locked) in enumerate(self._locked):
|
||||
try:
|
||||
if locked:
|
||||
self._locks[i].release()
|
||||
self._locked[i] = False
|
||||
except threading.ThreadError:
|
||||
LOG.exception("Unable to release lock %s", i + 1)
|
||||
|
||||
|
||||
class LazyPluggable(object):
|
||||
"""A pluggable backend loaded lazily based on some value."""
|
||||
|
||||
|
||||
@@ -3,8 +3,6 @@ anyjson>=0.2.4
|
||||
eventlet>=0.9.17
|
||||
iso8601
|
||||
netaddr>=0.7.6
|
||||
# Very nice graph library
|
||||
networkx>=1.5
|
||||
# Need currently the newest version of oslo.config for the DeprecatedOpt
|
||||
# used inside oslo database layer.
|
||||
-f http://tarballs.openstack.org/oslo.config/oslo.config-1.2.0a3.tar.gz#egg=oslo.config-1.2.0a3
|
||||
@@ -13,4 +11,7 @@ six
|
||||
# Only needed if database backend used.
|
||||
sqlalchemy>=0.7,<=0.7.99
|
||||
alembic>=0.4.1
|
||||
# Very nice graph library
|
||||
networkx>=1.8.1
|
||||
threading2
|
||||
Babel>=0.9.6
|
||||
|
||||
Reference in New Issue
Block a user