Add a locally running threaded flow

Propose a new graph flow that will run every task
in the graph at the same time but will use a
countdown latch concept to ensure that a task's
dependencies are satisfied before the task itself
can run. This allows tasks to run in parallel (if
they have no dependencies or are placed in disjoint
parts of the graph).

Note: although this flow uses threads it is typically
expected that the underlying threads would be greenthreads
since Python native threading is still suboptimal (chiefly
because of the GIL).

Implements: blueprint locally-run-many-at-once

Change-Id: If434abd77758aa12fc99da395a2559995305a853
Joshua Harlow 2013-06-25 18:46:55 -07:00
parent 392925d013
commit fceabee18f
11 changed files with 1299 additions and 144 deletions
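
To illustrate the countdown-latch idea described above, here is a minimal
standalone sketch (not taskflow code; the Latch and run_task names are
hypothetical): each task blocks on a latch sized to its number of
dependencies, and counts down the latches of its successors when it
finishes.

import threading

class Latch(object):
    def __init__(self, count):
        self._count = count
        self._cond = threading.Condition()

    def count_down(self):
        with self._cond:
            self._count -= 1
            if self._count <= 0:
                self._cond.notify_all()

    def wait(self):
        with self._cond:
            while self._count > 0:
                self._cond.wait()

def run_task(name, latch, successor_latches):
    latch.wait()                 # block until all dependencies finish
    print("running %s" % name)
    for s in successor_latches:  # unblock anything that depends on us
        s.count_down()

# A depends on nothing, B depends on A; B is started first but still
# runs second because its latch gates it on A finishing.
latch_a, latch_b = Latch(0), Latch(1)
tb = threading.Thread(target=run_task, args=("B", latch_b, []))
ta = threading.Thread(target=run_task, args=("A", latch_a, [latch_b]))
tb.start()
ta.start()
ta.join()
tb.join()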

@@ -35,14 +35,28 @@ def wraps(fn):
return wrapper
def locked(f):
def locked(*args, **kwargs):
@wraps(f)
def wrapper(self, *args, **kwargs):
with self._lock:
return f(self, *args, **kwargs)
def decorator(f):
attr_name = kwargs.get('lock', '_lock')
return wrapper
@wraps(f)
def wrapper(*args, **kwargs):
lock = getattr(args[0], attr_name)
with lock:
return f(*args, **kwargs)
return wrapper
# This is needed to handle when the decorator has args or the decorator
# doesn't have args, python is rather weird here...
if kwargs or not args:
return decorator
else:
if len(args) == 1:
return decorator(args[0])
else:
return decorator
def _original_function(fun):
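
A usage sketch of the reworked decorator above (illustrative only; the
Example class is hypothetical), showing both the bare call style and the
argument-taking call style:

import threading

from taskflow import decorators

class Example(object):
    def __init__(self):
        self._lock = threading.RLock()
        self._state_lock = threading.RLock()

    @decorators.locked                      # guards with self._lock
    def run(self):
        return 'ran'

    @decorators.locked(lock='_state_lock')  # guards with self._state_lock
    def change_state(self):
        return 'changed'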

@@ -68,10 +68,10 @@ class JobNotFound(TaskFlowException):
class MissingDependencies(InvalidStateException):
"""Raised when a task has dependencies that can not be satisified."""
message = ("%(task)s requires %(requirements)s but no other task produces"
"""Raised when a entity has dependencies that can not be satisified."""
message = ("%(who)s requires %(requirements)s but no other entity produces"
" said requirements")
def __init__(self, task, requirements):
message = self.message % {'task': task, 'requirements': requirements}
def __init__(self, who, requirements):
message = self.message % {'who': who, 'requirements': requirements}
super(MissingDependencies, self).__init__(message)

@@ -21,7 +21,6 @@ import threading
from taskflow.openstack.common import uuidutils
from taskflow import decorators
from taskflow import exceptions as exc
from taskflow import states
from taskflow import utils
@@ -86,14 +85,14 @@ class Flow(object):
# storage backend).
self.notifier = utils.TransitionNotifier()
self.task_notifier = utils.TransitionNotifier()
# Ensure that modifications and/or multiple runs aren't happening
# in the same flow at the same time.
self._lock = threading.RLock()
# Assign this flow a unique identifier.
if uuid:
self._id = str(uuid)
else:
self._id = uuidutils.generate_uuid()
# Ensure we can not change the state at the same time in 2 different
# threads.
self._state_lock = threading.RLock()
@property
def name(self):
@@ -109,21 +108,26 @@ class Flow(object):
"""Provides a read-only view of the flow state."""
return self._state
def _change_state(self, context, new_state):
was_changed = False
old_state = self.state
with self._lock:
def _change_state(self, context, new_state, check_func=None, notify=True):
old_state = None
changed = False
with self._state_lock:
if self.state != new_state:
old_state = self.state
self._state = new_state
was_changed = True
if was_changed:
# Don't notify while holding the lock.
if (not check_func or
(check_func and check_func(self.state))):
changed = True
old_state = self.state
self._state = new_state
# Don't notify while holding the lock so that the receiver of said
# notifications can actually perform operations on the given flow
# without getting into deadlock.
if notify and changed:
self.notifier.notify(self.state, details={
'context': context,
'flow': self,
'old_state': old_state,
})
return changed
def __str__(self):
lines = ["Flow: %s" % (self.name)]
@@ -141,7 +145,6 @@ class Flow(object):
"""
raise NotImplementedError()
@decorators.locked
def add_many(self, tasks):
"""Adds many tasks to this flow.
@@ -158,54 +161,54 @@ class Flow(object):
Returns how many tasks were interrupted (if any).
"""
if self.state in self.UNINTERRUPTIBLE_STATES:
raise exc.InvalidStateException(("Can not interrupt when"
" in state %s") % (self.state))
# Note(harlowja): Do *not* acquire the lock here so that the flow may
# be interrupted while running. This does mean that the above check may
# not be valid but we can worry about that if it becomes an issue.
old_state = self.state
if old_state != states.INTERRUPTED:
self._state = states.INTERRUPTED
self.notifier.notify(self.state, details={
'context': None,
'flow': self,
'old_state': old_state,
})
return 0
def check():
if self.state in self.UNINTERRUPTIBLE_STATES:
raise exc.InvalidStateException(("Can not interrupt when"
" in state %s") % self.state)
check()
with self._state_lock:
check()
self._change_state(None, states.INTERRUPTED)
return 0
@decorators.locked
def reset(self):
"""Fully resets the internal state of this flow, allowing for the flow
to be run again.
Note: Listeners are also reset.
"""
if self.state not in self.RESETTABLE_STATES:
raise exc.InvalidStateException(("Can not reset when"
" in state %s") % (self.state))
self.notifier.reset()
self.task_notifier.reset()
self._change_state(None, states.PENDING)
def check():
if self.state not in self.RESETTABLE_STATES:
raise exc.InvalidStateException(("Can not reset when"
" in state %s") % self.state)
check()
with self._state_lock:
check()
self.notifier.reset()
self.task_notifier.reset()
self._change_state(None, states.PENDING)
@decorators.locked
def soft_reset(self):
"""Partially resets the internal state of this flow, allowing for the
flow to be run again from an interrupted state only.
flow to be run again from an interrupted state.
"""
if self.state not in self.SOFT_RESETTABLE_STATES:
raise exc.InvalidStateException(("Can not soft reset when"
" in state %s") % (self.state))
self._change_state(None, states.PENDING)
def check():
if self.state not in self.SOFT_RESETTABLE_STATES:
raise exc.InvalidStateException(("Can not soft reset when"
" in state %s") % self.state)
@decorators.locked
check()
with self._state_lock:
check()
self._change_state(None, states.PENDING)
@abc.abstractmethod
def run(self, context, *args, **kwargs):
"""Executes the workflow."""
if self.state not in self.RUNNABLE_STATES:
raise exc.InvalidStateException("Unable to run flow when "
"in state %s" % (self.state))
raise NotImplementedError()
@decorators.locked
def rollback(self, context, cause):
"""Performs rollback of this workflow and any attached parent workflows
if present.

@@ -49,7 +49,7 @@ class Flow(linear_flow.Flow):
# to infer the edges at this stage we likely will fail finding
# dependencies from nodes that don't exist.
assert isinstance(task, collections.Callable)
r = utils.Runner(task)
r = utils.AOTRunner(task)
self._graph.add_node(r, uuid=r.uuid, infer=infer)
self._reset_internals()
return r.uuid

@@ -17,7 +17,9 @@
# under the License.
import collections
import functools
import logging
import threading
from taskflow.openstack.common import excutils
@@ -54,6 +56,7 @@ class Flow(flow.Flow):
# All runners to run are collected here.
self._runners = []
self._connected = False
self._lock = threading.RLock()
# The resumption strategy to use.
self.resumer = None
@@ -61,7 +64,7 @@ class Flow(flow.Flow):
def add(self, task):
"""Adds a given task to this flow."""
assert isinstance(task, collections.Callable)
r = utils.Runner(task)
r = utils.AOTRunner(task)
r.runs_before = list(reversed(self._runners))
self._runners.append(r)
self._reset_internals()
@@ -136,20 +139,27 @@ class Flow(flow.Flow):
@decorators.locked
def run(self, context, *args, **kwargs):
super(Flow, self).run(context, *args, **kwargs)
def abort_if(current_state, ok_states):
if current_state not in ok_states:
return False
return True
def resume_it():
if self._leftoff_at is not None:
return ([], self._leftoff_at)
if self.resumer:
(finished, leftover) = self.resumer.resume(self,
self._ordering())
(finished, leftover) = self.resumer(self, self._ordering())
else:
finished = []
leftover = self._ordering()
return (finished, leftover)
self._change_state(context, states.STARTED)
start_check_functor = functools.partial(abort_if,
ok_states=self.RUNNABLE_STATES)
if not self._change_state(context, states.STARTED,
check_func=start_check_functor):
return
try:
those_finished, leftover = resume_it()
except Exception:
@@ -211,8 +221,13 @@ class Flow(flow.Flow):
})
self.rollback(context, cause)
run_check_functor = functools.partial(abort_if,
ok_states=[states.STARTED,
states.RESUMING])
if len(those_finished):
self._change_state(context, states.RESUMING)
if not self._change_state(context, states.RESUMING,
check_func=run_check_functor):
return
for (r, details) in those_finished:
# Fake running the task so that we trigger the same
# notifications and state changes (and rollback that
@@ -222,8 +237,8 @@ class Flow(flow.Flow):
run_it(r, failed=failed, result=result, simulate_run=True)
self._leftoff_at = leftover
self._change_state(context, states.RUNNING)
if self.state == states.INTERRUPTED:
if not self._change_state(context, states.RUNNING,
check_func=run_check_functor):
return
was_interrupted = False

@@ -111,7 +111,7 @@ class Resumption(object):
details = self._reconcile_versions(immediate_version, details)
return (True, details)
def resume(self, flow, ordering):
def __call__(self, flow, ordering):
"""Splits the initial ordering into two segments, the first which
has already completed (or errored) and the second which has not
completed or errored.

@@ -0,0 +1,636 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from taskflow import exceptions as exc
from taskflow import flow
from taskflow import graph_utils
from taskflow import states
from taskflow import utils
import collections
import functools
import logging
import sys
import threading
import weakref
from networkx.algorithms import cycles
from networkx.classes import digraph
LOG = logging.getLogger(__name__)
class DependencyTimeout(exc.InvalidStateException):
"""When running in parallel a task has the ability to timeout waiting for
its dependent tasks to finish, this will be raised when that occurs.
"""
pass
class Flow(flow.Flow):
"""This flow pattern establishes tasks into a graph where each task is a
node in the graph and dependencies between tasks are edges in the graph.
When running (in parallel) each task will only be activated when its
dependencies have been satisfied. When a graph is split into two or more
segments, all of those segments will be run in parallel.
For example, let's take this small *somewhat complicated* graph:
X--Y--C--D
      |  |
A--B--   --G--
      |  |    |--Z(end)
E--F--   --H--
In this flow the following will be run in parallel at the start:
1. X--Y
2. A--B
3. E--F
Note the C--D nodes will not be able to run until [Y,B,F] have completed.
After C--D completes the following will be run in parallel:
1. G
2. H
Then finally Z will run (after [G,H] complete) and the flow will then have
finished executing.
"""
MUTABLE_STATES = set([states.PENDING, states.FAILURE, states.SUCCESS])
REVERTABLE_STATES = set([states.FAILURE, states.INCOMPLETE])
CANCELLABLE_STATES = set([states.PENDING, states.RUNNING])
def __init__(self, name):
super(Flow, self).__init__(name)
self._graph = digraph.DiGraph(name=name)
self._run_lock = threading.RLock()
self._cancel_lock = threading.RLock()
self._mutate_lock = threading.RLock()
# NOTE(harlowja) The locking order in this list actually matters since
# we need to make sure that users of this list do not get deadlocked
# by out of order lock access.
self._core_locks = [
self._run_lock,
self._mutate_lock,
self._cancel_lock,
]
self._run_locks = [
self._run_lock,
self._mutate_lock,
]
self._cancel_locks = [
self._cancel_lock,
]
self.results = {}
self.resumer = None
def __str__(self):
lines = ["ParallelFlow: %s" % (self.name)]
lines.append("%s" % (self._graph.number_of_nodes()))
lines.append("%s" % (self.state))
return "; ".join(lines)
def soft_reset(self):
# The way this flow works does not allow (at the current moment) for
# you to suspend the threads and then resume them at a later time;
# instead it only supports cancellation (which will cancel the threads)
# and then a full reset.
raise NotImplementedError("Threaded flow does not currently support"
" soft resetting, please try using"
" reset() instead")
def interrupt(self):
"""Currently we can not pause threads and then resume them later, not
really thinking that we should likely ever do this.
"""
raise NotImplementedError("Threaded flow does not currently support"
" interruption, please try using"
" cancel() instead")
def reset(self):
# All locks are used so that resets can not happen while running or
# cancelling or modifying.
with utils.MultiLock(self._core_locks):
super(Flow, self).reset()
self.results = {}
self.resumer = None
def cancel(self):
def check():
if self.state not in self.CANCELLABLE_STATES:
raise exc.InvalidStateException("Can not attempt cancellation"
" when in state %s" %
self.state)
check()
cancelled = 0
was_empty = False
# We don't lock the other locks so that the flow can be cancelled while
# running. Further state management logic is then used while running
# to verify that the flow should still be running when it has been
# cancelled.
with utils.MultiLock(self._cancel_locks):
check()
if len(self._graph) == 0:
was_empty = True
else:
for r in self._graph.nodes_iter():
try:
if r.cancel(blocking=False):
cancelled += 1
except exc.InvalidStateException:
pass
if cancelled or was_empty:
self._change_state(None, states.CANCELLED)
return cancelled
def _find_uuid(self, uuid):
# Finds the runner for the given uuid (or returns none)
for r in self._graph.nodes_iter():
if r.uuid == uuid:
return r
return None
def add(self, task, timeout=None, infer=True):
"""Adds a task to the given flow using the given timeout which will be
used as the timeout to wait for dependencies (if any) to be
fulfilled.
"""
def check():
if self.state not in self.MUTABLE_STATES:
raise exc.InvalidStateException("Flow is currently in a"
" non-mutable %s state" %
(self.state))
# Do a quick check to see if we can even perform this addition
# before going about actually acquiring the lock to perform it.
check()
# All locks must be acquired so that modifications can not be made
# while running, cancelling or performing a simultaneous mutation.
with utils.MultiLock(self._core_locks):
check()
runner = ThreadRunner(task, self, timeout)
self._graph.add_node(runner, infer=infer)
return runner.uuid
def _connect(self):
"""Infers and connects the edges of the given tasks by examining the
associated tasks' provides and requires attributes and connecting tasks
that require items to tasks that produce said items.
"""
# Disconnect all edges not manually created before we attempt to infer
# them so that we don't retain edges that are invalid.
def disconnect_non_user(u, v, e_data):
if e_data and e_data.get('reason') != 'manual':
return True
return False
# Link providers to requirers.
graph_utils.connect(self._graph,
discard_func=disconnect_non_user)
# Connect the successors & predecessors and related siblings
for r in self._graph.nodes_iter():
r._predecessors = []
r._successors = []
for (r2, _me) in self._graph.in_edges_iter([r]):
r._predecessors.append(r2)
for (_me, r2) in self._graph.out_edges_iter([r]):
r._successors.append(r2)
r._siblings = []
for r2 in self._graph.nodes_iter():
if r2 is r or r2 in r._predecessors or r2 in r._successors:
continue
r._siblings.append(r2)
def add_many(self, tasks):
"""Adds a list of tasks to the flow."""
def check():
if self.state not in self.MUTABLE_STATES:
raise exc.InvalidStateException("Flow is currently in a"
" non-mutable state %s"
% (self.state))
# Ensure that we do a quick check to see if we can even perform this
# addition before we go about actually acquiring the lock.
check()
# All locks must be acquired so that modifications can not be made
# while running, cancelling or performing a simultaneous mutation.
with utils.MultiLock(self._core_locks):
check()
added = []
for t in tasks:
added.append(self.add(t))
return added
def add_dependency(self, provider_uuid, consumer_uuid):
"""Manually adds a dependency between a provider and a consumer."""
def check_and_fetch():
if self.state not in self.MUTABLE_STATES:
raise exc.InvalidStateException("Flow is currently in a"
" non-mutable state %s"
% (self.state))
provider = self._find_uuid(provider_uuid)
if not provider or not self._graph.has_node(provider):
raise exc.InvalidStateException("Can not add a dependency "
"from unknown uuid %s" %
(provider_uuid))
consumer = self._find_uuid(consumer_uuid)
if not consumer or not self._graph.has_node(consumer):
raise exc.InvalidStateException("Can not add a dependency "
"to unknown uuid %s"
% (consumer_uuid))
if provider is consumer:
raise exc.InvalidStateException("Can not add a dependency "
"to loop via uuid %s"
% (consumer_uuid))
return (provider, consumer)
check_and_fetch()
# All locks must be acquired so that modifications can not be made
# while running, cancelling or performing a simultaneous mutation.
with utils.MultiLock(self._core_locks):
(provider, consumer) = check_and_fetch()
self._graph.add_edge(provider, consumer, reason='manual')
LOG.debug("Connecting %s as a manual provider for %s",
provider, consumer)
def run(self, context, *args, **kwargs):
"""Executes the given flow using the given context and args/kwargs."""
def abort_if(current_state, ok_states):
if current_state in (states.CANCELLED,):
return False
if current_state not in ok_states:
return False
return True
def check():
if self.state not in self.RUNNABLE_STATES:
raise exc.InvalidStateException("Flow is currently unable "
"to be ran in state %s"
% (self.state))
def connect_and_verify():
"""Do basic sanity tests on the graph structure."""
if len(self._graph) == 0:
return
self._connect()
degrees = [g[1] for g in self._graph.in_degree_iter()]
zero_degrees = [d for d in degrees if d == 0]
if not zero_degrees:
# If every task depends on something else to produce its input
# then we will be in a deadlock situation.
raise exc.InvalidStateException("No task has an in-degree"
" of zero")
self_loops = self._graph.nodes_with_selfloops()
if self_loops:
# A task that has a dependency on itself will never be able
# to run.
raise exc.InvalidStateException("%s tasks have been detected"
" with dependencies on"
" themselves" %
len(self_loops))
simple_cycles = len(cycles.recursive_simple_cycles(self._graph))
if simple_cycles:
# A task loop will never be able to run, unless it somehow
# breaks that loop.
raise exc.InvalidStateException("%s tasks have been detected"
" with dependency loops" %
simple_cycles)
def run_it(result_cb, args, kwargs):
check_runnable = functools.partial(abort_if,
ok_states=self.RUNNABLE_STATES)
if self._change_state(context, states.RUNNING,
check_func=check_runnable):
self.results = {}
if len(self._graph) == 0:
return
for r in self._graph.nodes_iter():
r.reset()
r._result_cb = result_cb
executor = utils.ThreadGroupExecutor()
for r in self._graph.nodes_iter():
executor.submit(r, *args, **kwargs)
executor.await_termination()
def trigger_rollback(failures):
if not failures:
return
causes = []
for r in failures:
causes.append(utils.FlowFailure(r, self,
r.exc, r.exc_info))
try:
self.rollback(context, causes)
except exc.InvalidStateException:
pass
finally:
# TODO(harlowja): re-raise a combined exception when
# there is more than one failure??
for f in failures:
if all(f.exc_info):
raise f.exc_info[0], f.exc_info[1], f.exc_info[2]
def handle_results():
# Isolate each runner state into groups so that we can easily tell
# which ones failed, cancelled, completed...
groups = collections.defaultdict(list)
for r in self._graph.nodes_iter():
groups[r.state].append(r)
for r in self._graph.nodes_iter():
if r not in groups.get(states.FAILURE, []) and r.has_ran():
self.results[r.uuid] = r.result
if groups[states.FAILURE]:
self._change_state(context, states.FAILURE)
trigger_rollback(groups[states.FAILURE])
elif (groups[states.CANCELLED] or groups[states.PENDING]
or groups[states.TIMED_OUT] or groups[states.STARTED]):
self._change_state(context, states.INCOMPLETE)
else:
check_ran = functools.partial(abort_if,
ok_states=[states.RUNNING])
self._change_state(context, states.SUCCESS,
check_func=check_ran)
def get_resumer_cb():
if not self.resumer:
return None
(ran, _others) = self.resumer(self, self._graph.nodes_iter())
def fetch_results(runner):
for (r, metadata) in ran:
if r is runner:
return (True, metadata.get('result'))
return (False, None)
result_cb = fetch_results
return result_cb
args = [context] + list(args)
check()
# Only acquire the run lock (but use further state checking) and the
# mutation lock to stop simultaneous running and simultaneous mutating
# which are not allowed on a running flow. Allow simultaneous cancel
# by performing repeated state checking while running.
with utils.MultiLock(self._run_locks):
check()
connect_and_verify()
try:
run_it(get_resumer_cb(), args, kwargs)
finally:
handle_results()
def rollback(self, context, cause):
"""Rolls back all tasks that are *not* still pending or cancelled."""
def check():
if self.state not in self.REVERTABLE_STATES:
raise exc.InvalidStateException("Flow is currently unable "
"to be rolled back in "
"state %s" % (self.state))
check()
# All locks must be acquired so that modifications can not be made
# while another entity is running, rolling-back, cancelling or
# performing a mutation operation.
with utils.MultiLock(self._core_locks):
check()
accum = utils.RollbackAccumulator()
for r in self._graph.nodes_iter():
if r.has_ran():
accum.add(utils.RollbackTask(context, r.task, r.result))
try:
self._change_state(context, states.REVERTING)
accum.rollback(cause)
finally:
self._change_state(context, states.FAILURE)
class ThreadRunner(utils.Runner):
"""A helper class that will use a countdown latch to avoid calling its
callable object until said countdown latch has emptied. After it has
been emptied the predecessor tasks will be examined for dependent results
and said results will then be provided to call the runner's callable
object.
TODO(harlowja): this could be a 'future' like object in the future since it
is starting to have the same purpose and usage (in a way). Likely switch
this over to the task details object or a subclass of it???
"""
RESETTABLE_STATES = set([states.PENDING, states.SUCCESS, states.FAILURE,
states.CANCELLED])
RUNNABLE_STATES = set([states.PENDING])
CANCELABLE_STATES = set([states.PENDING])
SUCCESS_STATES = set([states.SUCCESS])
CANCEL_SUCCESSORS_WHEN = set([states.FAILURE, states.CANCELLED,
states.TIMED_OUT])
NO_RAN_STATES = set([states.CANCELLED, states.PENDING, states.TIMED_OUT,
states.RUNNING])
def __init__(self, task, flow, timeout):
super(ThreadRunner, self).__init__(task)
# Use weak references to give the GC a break.
self._flow = weakref.proxy(flow)
self._notifier = flow.task_notifier
self._timeout = timeout
self._state = states.PENDING
self._run_lock = threading.RLock()
# Use the flow's state lock so that state notifications are not sent
# simultaneously for a given flow.
self._state_lock = flow._state_lock
self._cancel_lock = threading.RLock()
self._latch = utils.CountDownLatch()
# Any related family.
self._predecessors = []
self._successors = []
self._siblings = []
# Ensure we capture any exceptions that may have been triggered.
self.exc = None
self.exc_info = (None, None, None)
# This callback will be called before the underlying task is actually
# run and it should return a tuple of (has_result, result).
self._result_cb = None
@property
def state(self):
return self._state
def has_ran(self):
if self.state in self.NO_RAN_STATES:
return False
return True
def _change_state(self, context, new_state):
old_state = None
changed = False
with self._state_lock:
if self.state != new_state:
old_state = self.state
self._state = new_state
changed = True
# Don't notify while holding the lock so that the receiver of said
# notifications can actually perform operations on the given runner
# without getting into deadlock.
if changed and self._notifier:
self._notifier.notify(self.state, details={
'context': context,
'flow': self._flow,
'old_state': old_state,
'runner': self,
})
def cancel(self, blocking=True):
def check():
if self.state not in self.CANCELABLE_STATES:
raise exc.InvalidStateException("Runner not in a cancelable"
" state: %s" % (self.state))
# Check before as a quick way out of attempting to acquire the more
# heavy-weight lock. Then acquire the lock (which should not be
# possible if we are currently running) and set the state (if still
# applicable).
check()
acquired = False
cancelled = False
try:
acquired = self._cancel_lock.acquire(blocking=blocking)
if acquired:
check()
cancelled = True
self._change_state(None, states.CANCELLED)
finally:
if acquired:
self._cancel_lock.release()
return cancelled
def reset(self):
def check():
if self.state not in self.RESETTABLE_STATES:
raise exc.InvalidStateException("Runner not in a resettable"
" state: %s" % (self.state))
def do_reset():
self._latch.count = len(self._predecessors)
self.exc = None
self.exc_info = (None, None, None)
self.result = None
self._change_state(None, states.PENDING)
# We need to acquire both locks here so that we can not be running
# or being cancelled at the same time we are resetting.
check()
with self._run_lock:
check()
with self._cancel_lock:
check()
do_reset()
@property
def runs_before(self):
# NOTE(harlowja): this list may change, depending on which other
# runners have completed (or are currently actively running), so
# this is why this is a property instead of a semi-static defined list
# like in the AOT class. The list should only get bigger and not
# smaller so it should be fine to filter on runners that have completed
# successfully.
finished_ok = []
for r in self._siblings:
if r.has_ran() and r.state in self.SUCCESS_STATES:
finished_ok.append(r)
return finished_ok
def __call__(self, context, *args, **kwargs):
def is_runnable():
if self.state not in self.RUNNABLE_STATES:
return False
return True
def run(*args, **kwargs):
try:
self._change_state(context, states.RUNNING)
has_result = False
if self._result_cb:
has_result, self.result = self._result_cb(self)
if not has_result:
super(ThreadRunner, self).__call__(*args, **kwargs)
self._change_state(context, states.SUCCESS)
except Exception as e:
self._change_state(context, states.FAILURE)
self.exc = e
self.exc_info = sys.exc_info()
def signal():
if not self._successors:
return
if self.state in self.CANCEL_SUCCESSORS_WHEN:
for r in self._successors:
try:
r.cancel(blocking=False)
except exc.InvalidStateException:
pass
for r in self._successors:
try:
r._latch.countDown()
except Exception:
LOG.exception("Failed decrementing %s latch", r)
# We check before to avoid attempting to acquire the lock when we are
# known to be in a non-runnable state.
if not is_runnable():
return
args = [context] + list(args)
with self._run_lock:
# We check again now that we own the run lock, since a previous
# thread could have set the state to not runnable before exiting
# and releasing that lock.
if not is_runnable():
return
may_proceed = self._latch.await(self._timeout)
# We now acquire the cancel lock so that we can be assured that
# we have not been cancelled by another entity.
with self._cancel_lock:
try:
# If we timed out while awaiting, ensure that we alter the
# state to show timed out (but not if we have been cancelled,
# since our state should remain cancelled instead). This is
# done after acquiring the cancel lock so that we will not try
# to overwrite another entity trying to set the runner to the
# cancel state.
if not may_proceed and self.state != states.CANCELLED:
self._change_state(context, states.TIMED_OUT)
# At this point we should only have been able to time out or
# be cancelled; no other state transitions should have been
# possible.
if self.state not in (states.CANCELLED, states.TIMED_OUT):
run(*args, **kwargs)
finally:
signal()
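
For reference, a small usage sketch of this new pattern (a sketch only,
assuming the decorators module and the task semantics exercised by the
tests below; make_a/make_b are hypothetical tasks):

from taskflow import decorators
from taskflow.patterns import threaded_flow

@decorators.task(provides=['a'])
def make_a(context):
    return {'a': 1}

@decorators.task(provides=['b'])
def make_b(context, a):  # requires 'a', so it runs after make_a
    return {'b': a + 1}

flo = threaded_flow.Flow('demo')
flo.add(make_a)
b_uuid = flo.add(make_b, timeout=5)  # wait at most 5s for dependencies
flo.run({})
print(flo.results[b_uuid])  # {'b': 2}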

@@ -33,8 +33,12 @@ REVERTING = 'REVERTING'
RUNNING = RUNNING
STARTED = 'STARTED'
SUCCESS = SUCCESS
CANCELLED = 'CANCELLED'
INCOMPLETE = 'INCOMPLETE'
# Task states.
FAILURE = FAILURE
STARTED = STARTED
SUCCESS = SUCCESS
TIMED_OUT = 'TIMED_OUT'
CANCELLED = CANCELLED

@@ -0,0 +1,388 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
import time
import unittest2
from taskflow import decorators
from taskflow import exceptions as excp
from taskflow import states
from taskflow.patterns import threaded_flow as tf
from taskflow.tests import utils
def _find_idx(what, search_where):
for i, j in enumerate(search_where):
if j == what:
return i
return -1
class ThreadedFlowTest(unittest2.TestCase):
def _make_tracking_flow(self, name):
notify_lock = threading.RLock()
flo = tf.Flow(name)
notifications = []
def save_notify(state, details):
runner = details.get('runner')
if not runner:
return
with notify_lock:
notifications.append((runner.uuid, state, dict(details)))
flo.task_notifier.register('*', save_notify)
return (flo, notifications)
def _make_watched_flow(self, name):
history_lock = threading.RLock()
flo = tf.Flow(name)
history = {}
def save_state(state, details):
runner = details.get('runner')
if not runner:
return
with history_lock:
old_state = details.get('old_state')
old_states = history.get(runner.uuid, [])
if not old_states:
old_states.append(old_state)
old_states.append(state)
history[runner.uuid] = old_states
flo.task_notifier.register('*', save_state)
return (flo, history)
def test_somewhat_complicated(self):
"""Tests a somewhat complicated dependency graph.
X--Y--C--D
      |  |
A--B--   --G--
      |  |    |--Z(end)
E--F--   --H--
"""
(flo, notifications) = self._make_tracking_flow("sanity-test")
# X--Y
x = flo.add(utils.ProvidesRequiresTask("X",
provides=['x'],
requires=[]))
y = flo.add(utils.ProvidesRequiresTask("Y",
provides=['y'],
requires=['x']))
# A--B
a = flo.add(utils.ProvidesRequiresTask("A",
provides=['a'],
requires=[]))
b = flo.add(utils.ProvidesRequiresTask("B",
provides=['b'],
requires=['a']))
# E--F
e = flo.add(utils.ProvidesRequiresTask("E",
provides=['e'],
requires=[]))
f = flo.add(utils.ProvidesRequiresTask("F",
provides=['f'],
requires=['e']))
# C--D
c = flo.add(utils.ProvidesRequiresTask("C",
provides=['c'],
requires=['f', 'b', 'y']))
d = flo.add(utils.ProvidesRequiresTask("D",
provides=['d'],
requires=['c']))
# G
g = flo.add(utils.ProvidesRequiresTask("G",
provides=['g'],
requires=['d']))
# H
h = flo.add(utils.ProvidesRequiresTask("H",
provides=['h'],
requires=['d']))
# Z
z = flo.add(utils.ProvidesRequiresTask("Z",
provides=['z'],
requires=['g', 'h']))
all_uuids = [z, h, g, d, c, f, e, b, a, y, x]
self.assertEquals(states.PENDING, flo.state)
flo.run({})
self.assertEquals(states.SUCCESS, flo.state)
# Analyze the notifications to determine that the correct ordering
# occurred
# Discard states we aren't really interested in.
c_notifications = []
uuids_ran = set()
for (uuid, state, details) in notifications:
if state not in [states.RUNNING, states.SUCCESS, states.FAILURE]:
continue
uuids_ran.add(uuid)
c_notifications.append((uuid, state, details))
notifications = c_notifications
self.assertEquals(len(all_uuids), len(uuids_ran))
# Select out the run order
just_ran_uuids = []
for (uuid, state, details) in notifications:
if state not in [states.RUNNING]:
continue
just_ran_uuids.append(uuid)
def ran_before(ran_uuid, before_what):
before_idx = just_ran_uuids.index(ran_uuid)
other_idxs = [just_ran_uuids.index(u) for u in before_what]
was_before = True
for idx in other_idxs:
if idx < before_idx:
was_before = False
return was_before
def ran_after(ran_uuid, after_what):
after_idx = just_ran_uuids.index(ran_uuid)
other_idxs = [just_ran_uuids.index(u) for u in after_what]
was_after = True
for idx in other_idxs:
if idx > after_idx:
was_after = False
return was_after
# X, A, E should always run before the others
self.assertTrue(ran_before(x, [c, d, g, h, z]))
self.assertTrue(ran_before(a, [c, d, g, h, z]))
self.assertTrue(ran_before(e, [c, d, g, h, z]))
# Y, B, F should always run before C
self.assertTrue(ran_before(y, [c]))
self.assertTrue(ran_before(b, [c]))
self.assertTrue(ran_before(f, [c]))
# C runs before D
self.assertTrue(ran_before(c, [d]))
# G and H are before Z
self.assertTrue(ran_before(g, [z]))
self.assertTrue(ran_before(h, [z]))
# C, D runs after X, Y, B, E, F
self.assertTrue(ran_after(c, [x, y, b, e, f]))
self.assertTrue(ran_after(d, [x, y, b, c, e, f]))
# Z is last
all_uuids_no_z = list(all_uuids)
all_uuids_no_z.remove(z)
self.assertTrue(ran_after(z, all_uuids_no_z))
def test_empty_cancel(self):
(flo, history) = self._make_watched_flow("sanity-test")
self.assertEquals(states.PENDING, flo.state)
flo.cancel()
self.assertEquals(states.CANCELLED, flo.state)
def test_self_loop_flo(self):
(flo, history) = self._make_watched_flow("sanity-test")
flo.add(utils.ProvidesRequiresTask("do-that",
provides=['c'],
requires=['c']))
self.assertRaises(excp.InvalidStateException, flo.run, {})
def test_circular_flo(self):
(flo, history) = self._make_watched_flow("sanity-test")
flo.add(utils.ProvidesRequiresTask("do-that",
provides=['c'],
requires=['a']))
flo.add(utils.ProvidesRequiresTask("do-this",
provides=['a'],
requires=['c']))
self.assertRaises(excp.InvalidStateException, flo.run, {})
def test_no_input_flo(self):
(flo, history) = self._make_watched_flow("sanity-test")
flo.add(utils.ProvidesRequiresTask("do-that",
provides=['c'],
requires=['a']))
flo.add(utils.ProvidesRequiresTask("do-this",
provides=['b'],
requires=['c']))
self.assertRaises(excp.InvalidStateException, flo.run, {})
def test_simple_resume(self):
(flo, history) = self._make_watched_flow("sanity-test")
f_uuid = flo.add(utils.ProvidesRequiresTask("do-this",
provides=['a'],
requires=[]))
flo.add(utils.ProvidesRequiresTask("do-that",
provides=['c'],
requires=['a']))
def resume_it(flow, ordering):
ran_already = []
not_ran = []
for r in ordering:
if r.uuid == f_uuid:
ran_already.append((r, {
'result': 'b',
'states': [states.SUCCESS],
}))
else:
not_ran.append(r)
return (ran_already, not_ran)
flo.resumer = resume_it
flo.run({})
self.assertEquals('b', flo.results[f_uuid])
self.assertEquals(states.SUCCESS, flo.state)
def test_active_cancel(self):
(flo, history) = self._make_watched_flow("sanity-test")
flo.add(utils.ProvidesRequiresTask("do-this",
provides=['a'],
requires=[]))
flo.add(utils.ProvidesRequiresTask("do-that",
provides=['c'],
requires=['a']))
@decorators.task(provides=['d'], requires=['c'])
def cancel_it(context, c):
am_cancelled = flo.cancel()
return am_cancelled
uuid = flo.add(cancel_it)
flo.add(utils.ProvidesRequiresTask("do-the-other",
provides=['e'],
requires=['d']))
flo.run({})
self.assertIn(uuid, flo.results)
self.assertEquals(states.INCOMPLETE, flo.state)
self.assertEquals(1, flo.results[uuid])
def test_sanity_run(self):
(flo, history) = self._make_watched_flow("sanity-test")
flo.add(utils.ProvidesRequiresTask("do-this",
provides=['a'],
requires=[]))
flo.add(utils.ProvidesRequiresTask("do-that",
provides=['c'],
requires=['a']))
flo.add(utils.ProvidesRequiresTask("do-other",
provides=['d'],
requires=[]))
flo.add(utils.ProvidesRequiresTask("do-thing",
provides=['e'],
requires=['d']))
self.assertEquals(states.PENDING, flo.state)
context = {}
flo.run(context)
self.assertEquals(states.SUCCESS, flo.state)
self.assertTrue(len(context) > 0)
# Even when running in parallel this will be the required order since
# 'do-that' depends on 'do-this' finishing first.
expected_order = ['do-this', 'do-that']
this_that = [t for t in context[utils.ORDER_KEY]
if t in expected_order]
self.assertEquals(expected_order, this_that)
expected_order = ['do-other', 'do-thing']
this_that = [t for t in context[utils.ORDER_KEY]
if t in expected_order]
self.assertEquals(expected_order, this_that)
def test_single_failure(self):
def reverter(context, result, cause):
context['reverted'] = True
@decorators.task(revert_with=reverter)
def fail_quick(context):
raise IOError("Broken")
(flo, history) = self._make_watched_flow('test-single-fail')
f_uuid = flo.add(fail_quick)
context = {}
self.assertRaises(IOError, flo.run, context)
self.assertEquals(states.FAILURE, flo.state)
self.assertEquals(states.FAILURE, history[f_uuid][-1])
self.assertTrue(context.get('reverted'))
def test_failure_cancel_successors(self):
(flo, history) = self._make_watched_flow("failure-cancel")
@decorators.task(provides=['b', 'c'])
def fail_quick(context):
raise IOError("Broken")
@decorators.task
def after_fail(context, b):
pass
@decorators.task
def after_fail2(context, c):
pass
fq, af, af2 = flo.add_many([fail_quick, after_fail, after_fail2])
self.assertEquals(states.PENDING, flo.state)
context = {}
self.assertRaises(IOError, flo.run, context)
self.assertEquals(states.FAILURE, flo.state)
self.assertEquals(states.FAILURE, history[fq][-1])
self.assertEquals(states.CANCELLED, history[af][-1])
self.assertEquals(states.CANCELLED, history[af2][-1])
def test_live_timeout(self):
@decorators.task(provides=['a'])
def task_long(context):
time.sleep(1)
return {
'a': 2,
}
@decorators.task(provides=['b'])
def wait_short(context, a):
pass
@decorators.task
def wait_ok_long(context, a):
pass
@decorators.task
def wait_after_short(context, b):
pass
(flo, history) = self._make_watched_flow('test-live')
flo.add(task_long)
ws_uuid = flo.add(wait_short, timeout=0.1)
flo.add(wait_ok_long)
was_uuid = flo.add(wait_after_short)
flo.run({})
self.assertEquals(states.INCOMPLETE, flo.state)
self.assertEquals(states.TIMED_OUT, history[ws_uuid][-1])
self.assertEquals(states.CANCELLED, history[was_uuid][-1])

@@ -24,6 +24,7 @@ import logging
import re
import sys
import threading
import threading2
import time
from taskflow.openstack.common import uuidutils
@@ -32,6 +33,25 @@ TASK_FACTORY_ATTRIBUTE = '_TaskFlow_task_factory'
LOG = logging.getLogger(__name__)
def await(check_functor, timeout=None):
if timeout is not None:
end_time = time.time() + max(0, timeout)
else:
end_time = None
# Use the same/similar scheme that the python condition class uses.
delay = 0.0005
while not check_functor():
time.sleep(delay)
if end_time is not None:
remaining = end_time - time.time()
if remaining <= 0:
return False
delay = min(delay * 2, remaining, 0.05)
else:
delay = min(delay * 2, 0.05)
return True
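# Example usage of await (an illustrative sketch; work_queue is a
# hypothetical Queue.Queue): poll until some condition holds, for at
# most two seconds.
#
#   drained = await(lambda: work_queue.empty(), timeout=2.0)
#   if not drained:
#       ...  # timed out before the condition became true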
def get_task_version(task):
"""Gets a tasks *string* version, whether it is a task object/function."""
task_version = getattr(task, 'version')
@@ -79,23 +99,80 @@ def is_version_compatible(version_1, version_2):
return True
class MultiLock(object):
"""A class which can attempt to obtain many locks at once and release
said locks when exiting.
Useful as a context manager around many locks (instead of having to nest
said individual context managers).
"""
def __init__(self, locks):
assert len(locks) > 0, "Zero locks requested"
self._locks = locks
self._locked = [False] * len(locks)
def __enter__(self):
def is_locked(lock):
# NOTE(harlowja): the threading2 lock doesn't seem to have this
# attribute, so that's why we check that it exists first.
if hasattr(lock, 'locked'):
return lock.locked()
return False
for i in xrange(0, len(self._locked)):
if self._locked[i] or is_locked(self._locks[i]):
raise threading.ThreadError("Lock %s not previously released"
% (i + 1))
self._locked[i] = False
for (i, lock) in enumerate(self._locks):
self._locked[i] = lock.acquire()
def __exit__(self, type, value, traceback):
for (i, locked) in enumerate(self._locked):
try:
if locked:
self._locks[i].release()
self._locked[i] = False
except threading.ThreadError:
LOG.exception("Unable to release lock %s", i + 1)
class CountDownLatch(object):
"""Similar in concept to the java count down latch."""
def __init__(self, count=0):
self.count = count
self.lock = threading.Condition()
def countDown(self):
with self.lock:
self.count -= 1
if self.count <= 0:
self.lock.notifyAll()
def await(self, timeout=None):
end_time = None
if timeout is not None:
timeout = max(0, timeout)
end_time = time.time() + timeout
time_up = False
with self.lock:
while True:
# Stop waiting on these 2 conditions.
if time_up or self.count <= 0:
break
# Was this a spurious wakeup or did we really end??
self.lock.wait(timeout=timeout)
if end_time is not None:
if time.time() >= end_time:
time_up = True
else:
# Reduce the timeout so that we don't wait extra time
# over what we initially were requested to.
timeout = end_time - time.time()
return self.count <= 0
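# Example usage of CountDownLatch (an illustrative sketch): wait for two
# predecessor threads to finish, with a five second timeout.
#
#   latch = CountDownLatch(2)
#   # ... each predecessor calls latch.countDown() when it completes ...
#   if latch.await(timeout=5.0):
#       ...  # all predecessors finished in time
#   else:
#       ...  # timed out waiting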
class LastFedIter(object):
@@ -113,16 +190,45 @@ class LastFedIter(object):
yield i
class ThreadGroupExecutor(object):
"""A simple thread executor that spins up new threads (or greenthreads) for
each task to be completed (no pool limit is enforced).
TODO(harlowja): Likely if we use the more advanced executors that come with
the concurrent.futures library we can just get rid of this.
"""
def __init__(self, daemonize=True):
self._threads = []
self._group = threading2.ThreadGroup()
self._daemonize = daemonize
def submit(self, fn, *args, **kwargs):
t = threading2.Thread(target=fn, group=self._group,
args=args, kwargs=kwargs)
t.daemon = self._daemonize
self._threads.append(t)
t.start()
def await_termination(self, timeout=None):
if not self._threads:
return
return self._group.join(timeout)
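# Example usage of ThreadGroupExecutor (an illustrative sketch;
# process/work_items are hypothetical): run a callable per work item
# and wait for all of them to finish.
#
#   executor = ThreadGroupExecutor()
#   for item in work_items:
#       executor.submit(process, item)
#   executor.await_termination()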
class FlowFailure(object):
"""When a task failure occurs the following object will be given to revert
and can be used to interrogate what caused the failure.
"""
def __init__(self, runner, flow, exception):
def __init__(self, runner, flow, exc, exc_info=None):
self.runner = runner
self.flow = flow
self.exc = exception
self.exc_info = sys.exc_info()
self.exc = exc
if not exc_info:
self.exc_info = sys.exc_info()
else:
self.exc_info = exc_info
class RollbackTask(object):
@@ -161,7 +267,6 @@ class Runner(object):
else:
self.task = task
self.providers = {}
self.runs_before = []
self.result = None
if not uuid:
self._id = uuidutils.generate_uuid()
@@ -184,6 +289,10 @@ class Runner(object):
def optional(self):
return self.task.optional
@property
def runs_before(self):
return []
@property
def version(self):
return get_task_version(self.task)
@@ -205,24 +314,52 @@ class Runner(object):
# Find all of our inputs first.
kwargs = dict(kwargs)
for (k, who_made) in self.providers.iteritems():
if who_made.result and k in who_made.result:
if k in kwargs:
continue
try:
kwargs[k] = who_made.result[k]
else:
kwargs[k] = None
except (TypeError, KeyError):
pass
optional_keys = self.optional
optional_missing_keys = optional_keys - set(kwargs.keys())
if optional_missing_keys:
for k in optional_missing_keys:
for r in self.runs_before:
r_provides = r.provides
if k in r_provides and r.result and k in r.result:
kwargs[k] = r.result[k]
break
optional_keys = optional_keys - set(kwargs.keys())
for k in optional_keys:
for who_ran in self.runs_before:
matched = False
if k in who_ran.provides:
try:
kwargs[k] = who_ran.result[k]
matched = True
except (TypeError, KeyError):
pass
if matched:
break
# Ensure all required keys are either existent or set to None.
for k in self.requires:
if k not in kwargs:
kwargs[k] = None
# And now finally run.
self.result = self.task(*args, **kwargs)
return self.result
class AOTRunner(Runner):
"""A runner that knows who runs before this runner ahead of time from a
known list of previous runners.
"""
def __init__(self, task):
super(AOTRunner, self).__init__(task)
self._runs_before = []
@property
def runs_before(self):
return self._runs_before
@runs_before.setter
def runs_before(self, runs_before):
self._runs_before = list(runs_before)
class TransitionNotifier(object):
"""A utility helper class that can be used to subscribe to
notifications of events occurring as well as allow an entity to post said
@@ -300,16 +437,12 @@ class RollbackAccumulator(object):
def __len__(self):
return len(self._rollbacks)
def __iter__(self):
# Rollbacks happen in the reverse order that they were added.
return reversed(self._rollbacks)
def __enter__(self):
return self
def rollback(self, cause):
LOG.warn("Activating %s rollbacks due to %s.", len(self), cause)
for (i, f) in enumerate(self):
for (i, f) in enumerate(reversed(self._rollbacks)):
LOG.debug("Calling rollback %s: %s", i + 1, f)
try:
f(cause)
@@ -399,45 +532,6 @@ class ReaderWriterLock(object):
self.readers_ok.release()
class MultiLock(object):
"""A class which can attempt to obtain many locks at once and release
said locks when exiting.
Useful as a context manager around many locks (instead of having to nest
said individual context managers).
"""
def __init__(self, locks):
self._locks = locks
self._locked = list([False] * len(locks))
def __enter__(self):
def is_locked(lock):
# NOTE(harlowja): the threading2 lock doesn't seem to have this
# attribute, so that's why we check that it exists first.
if hasattr(lock, 'locked'):
return lock.locked()
return False
for i in xrange(0, len(self._locked)):
if self._locked[i] or is_locked(self._locks[i]):
raise threading.ThreadError("Lock %s not previously released"
% (i + 1))
self._locked[i] = False
for (i, lock) in enumerate(self._locks):
self._locked[i] = lock.acquire()
def __exit__(self, type, value, traceback):
for (i, locked) in enumerate(self._locked):
try:
if locked:
self._locks[i].release()
self._locked[i] = False
except threading.ThreadError:
LOG.exception("Unable to release lock %s", i + 1)
class LazyPluggable(object):
"""A pluggable backend loaded lazily based on some value."""

@@ -3,8 +3,6 @@ anyjson>=0.2.4
eventlet>=0.9.17
iso8601
netaddr>=0.7.6
# Very nice graph library
networkx>=1.5
# Need currently the newest version of oslo.config for the DeprecatedOpt
# used inside oslo database layer.
-f http://tarballs.openstack.org/oslo.config/oslo.config-1.2.0a3.tar.gz#egg=oslo.config-1.2.0a3
@@ -13,4 +11,7 @@ six
# Only needed if database backend used.
sqlalchemy>=0.7,<=0.7.99
alembic>=0.4.1
# Very nice graph library
networkx>=1.8.1
threading2
Babel>=0.9.6